Flume NG 编程实践

JerryXia 发表于 , 阅读 (38)

前言

Flume已经自带了几个比较常用的source,但是在特定情况下还是有一些需求不能满足,因此需要特定开发的程序。

我们在使用的过程中,遇到了遇到对source和sink开发的情况,因此下面以这两个为例解释一下。

我们的需求主要是功能方面的,因此只写了source和sink的程序,没有对channal端没有做开发,直接用了file channal,之前看过美团对flume的使用,感觉对channal的定制开发还是不错的,感兴趣的可以参考一下。

定制Source

Simple Source Example

下面是一个PollableSource的简单例子,从官网copy下来的。

主要就是configure、start、stop和process,里面的注释还是挺清晰的。

public class MySource extends AbstractSource implements Configurable, PollableSource {  private String myProp;  @Override  public void configure(Context context) {    String myProp = context.getString("myProp", "defaultValue");    // Process the myProp value (e.g. validation, convert to another type, ...)    // Store myProp for later retrieval by process() method    this.myProp = myProp;  }  @Override  public void start() {    // Initialize the connection to the external client  }  @Override  public void stop () {    // Disconnect from external client and do any additional cleanup    // (e.g. releasing resources or nulling-out field values) ..  }  @Override  public Status process() throws EventDeliveryException {    Status status = null;    try {      // This try clause includes whatever Channel/Event operations you want to do      // Receive new data      Event e = getSomeData();      // Store the Event into this Source's associated Channel(s)      getChannelProcessor().processEvent(e);      status = Status.READY;    } catch (Throwable t) {      // Log exception, handle individual exceptions as needed      status = Status.BACKOFF;      // re-throw all Errors      if (t instanceof Error) {        throw (Error)t;      }    } finally {      txn.close();    }