真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

flume如何自定義source、sink

這篇文章主要為大家展示了“flume如何自定義source、sink”,內(nèi)容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“flume如何自定義source、sink”這篇文章吧。

創(chuàng)新互聯(lián)是一家專業(yè)提供大田企業(yè)網(wǎng)站建設(shè),專注與網(wǎng)站制作、做網(wǎng)站、H5開發(fā)、小程序制作等業(yè)務(wù)。10年已為大田眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)站設(shè)計(jì)公司優(yōu)惠進(jìn)行中。

自定義source開發(fā):

source是收集日志存入channel。

Source提供了兩種機(jī)制:PollableSource(輪訓(xùn)拉取)和EventDrivenSource(事件驅(qū)動(dòng)),

如果使用EventDrivenSource,你可以在start方法中啟動(dòng)額外的線程,不斷的往channel中發(fā)數(shù)據(jù)。如果使用PollableSource,你可以在process()實(shí)現(xiàn)不斷重發(fā)。

public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation, convert to another type, ...)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
  }

  @Override
  public void stop () {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    try {
      // This try clause includes whatever Channel/Event operations you want to do

      // Receive new data
      Event e = getSomeData();

      // Store the Event into this Source's associated Channel(s)
      getChannelProcessor().processEvent(e);

      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      txn.close();
    }
    return status;
  }}

或者

package  org.apache.flume;
 
import  org.apache.flume.conf.Configurable;
import  org.apache.flume.source.AbstractSource;
 
public  class  TailSource  extends  AbstractSource  implements  EventDrivenSource,
         Configurable {
     @Override
     public  void  configure(Context context) {
 
     }
 
     @Override
     public  synchronized  void  start() {
 
     }
 
     @Override
     public  synchronized  void  stop() {
 
     }
}

自定義sink:

sink是從channel中拉取日志處理。

process會(huì)不斷調(diào)用,你只需在process中去取channel的數(shù)據(jù)即可。

public class MySink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }

  @Override
  public void stop () {
    // Disconnect from the external respository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do

      Event event = ch.take();

      // Send the Event to the external repository.
      // storeSomeData(e);

      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    }
    return status;
  }}

以上是“flume如何自定義source、sink”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!


分享標(biāo)題:flume如何自定義source、sink
標(biāo)題網(wǎng)址:http://weahome.cn/article/gdjiee.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部