Flume NG 编程实践
前言
Flume已经自带了几个比较常用的source,但是在特定情况下还是有一些需求不能满足,因此需要特定开发的程序。
我们在使用的过程中,遇到了遇到对source和sink开发的情况,因此下面以这两个为例解释一下。
我们的需求主要是功能方面的,因此只写了source和sink的程序,没有对channal端没有做开发,直接用了file channal,之前看过美团对flume的使用,感觉对channal的定制开发还是不错的,感兴趣的可以参考一下。
定制Source
Simple Source Example
下面是一个PollableSource的简单例子,从官网copy下来的。
主要就是configure、start、stop和process,里面的注释还是挺清晰的。
public class MySource extends AbstractSource implements Configurable, PollableSource { private String myProp; @Override public void configure(Context context) { String myProp = context.getString("myProp", "defaultValue"); // Process the myProp value (e.g. validation, convert to another type, ...) // Store myProp for later retrieval by process() method this.myProp = myProp; } @Override public void start() { // Initialize the connection to the external client } @Override public void stop () { // Disconnect from external client and do any additional cleanup // (e.g. releasing resources or nulling-out field values) .. } @Override public Status process() throws EventDeliveryException { Status status = null; try { // This try clause includes whatever Channel/Event operations you want to do // Receive new data Event e = getSomeData(); // Store the Event into this Source's associated Channel(s) getChannelProcessor().processEvent(e); status = Status.READY; } catch (Throwable t) { // Log exception, handle individual exceptions as needed status = Status.BACKOFF; // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); }