这篇文章主要为大家展示了“flume如何自定义source、sink”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“flume如何自定义source、sink”这篇文章吧。

创新互联是一家专业提供大田企业网站建设,专注与网站制作、做网站、H5开发、小程序制作等业务。10年已为大田众多企业、政府机构等服务。创新互联专业网站设计公司优惠进行中。
自定义source开发:
source是收集日志存入channel。
Source提供了两种机制:PollableSource(轮训拉取)和EventDrivenSource(事件驱动),
如果使用EventDrivenSource,你可以在start方法中启动额外的线程,不断的往channel中发数据。如果使用PollableSource,你可以在process()实现不断重发。
public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;
  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");
    // Process the myProp value (e.g. validation, convert to another type, ...)
    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }
  @Override
  public void start() {
    // Initialize the connection to the external client
  }
  @Override
  public void stop () {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }
  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;
    try {
      // This try clause includes whatever Channel/Event operations you want to do
      // Receive new data
      Event e = getSomeData();
      // Store the Event into this Source's associated Channel(s)
      getChannelProcessor().processEvent(e);
      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed
      status = Status.BACKOFF;
      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      txn.close();
    }
    return status;
  }}或者
package  
 org.apache.flume;import  
 org.apache.flume.conf.Configurable;import  
 org.apache.flume.source.AbstractSource;public  
 class  
 TailSource  
 extends  
 AbstractSource  
 implements  
 EventDrivenSource,         
 Configurable {     
 @Override     
 public  
 void  
 configure(Context context) {     
 }     
 @Override     
 public  
 synchronized  
 void  
 start() {     
 }     
 @Override     
 public  
 synchronized  
 void  
 stop() {     
 }}自定义sink:
sink是从channel中拉取日志处理。
process会不断调用,你只需在process中去取channel的数据即可。
public class MySink extends AbstractSink implements Configurable {
  private String myProp;
  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");
    // Process the myProp value (e.g. validation)
    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }
  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }
  @Override
  public void stop () {
    // Disconnect from the external respository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }
  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;
    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do
      Event event = ch.take();
      // Send the Event to the external repository.
      // storeSomeData(e);
      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();
      // Log exception, handle individual exceptions as needed
      status = Status.BACKOFF;
      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    }
    return status;
  }}以上是“flume如何自定义source、sink”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注创新互联行业资讯频道!
Copyright © 2009-2022 www.fjjierui.cn 青羊区广皓图文设计工作室(个体工商户)达州站 版权所有 蜀ICP备19037934号