真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Flume-ng源碼解析之Channel組件-創(chuàng)新互聯(lián)

如果還沒看過Flume-ng源碼解析之啟動流程,可以點擊Flume-ng源碼解析之啟動流程 查看

成都創(chuàng)新互聯(lián)IDC提供業(yè)務(wù):西部信息服務(wù)器托管,成都服務(wù)器租用,西部信息服務(wù)器托管,重慶服務(wù)器租用等四川省內(nèi)主機托管與主機租用業(yè)務(wù);數(shù)據(jù)中心含:雙線機房,BGP機房,電信機房,移動機房,聯(lián)通機房。

1 接口介紹

組件的分析順序是按照上一篇中啟動順序來分析的,首先是Channel,然后是Sink,最后是Source,在開始看組件源碼之前我們先來看一下兩個重要的接口,一個是LifecycleAware ,另一個是NamedComponent

1.1 LifecycleAware

@InterfaceAudience.Public@InterfaceStability.Stablepublic interface LifecycleAware {  public void start();  public void stop();  public LifecycleState getLifecycleState();

}

非常簡單就是三個方法,start()、stop()和getLifecycleState,這個接口是flume好多類都要實現(xiàn)的接口,包括Flume-ng源碼解析之啟動流程
所中提到PollingPropertiesFileConfigurationProvider(),只要涉及到生命周期的都會實現(xiàn)該接口,當然組件們也是要實現(xiàn)的!

1.2 NamedComponent

@InterfaceAudience.Public@InterfaceStability.Stablepublic interface NamedComponent {  public void setName(String name);  public String getName();

}

這個沒什么好講的,就是用來設(shè)置名字的。

2 Channel

作為Flume三大核心組件之一的Channel,我們有必要來看看它的構(gòu)成:

@InterfaceAudience.Public@InterfaceStability.Stablepublic interface Channel extends LifecycleAware, NamedComponent {  public void put(Event event) throws ChannelException;  public Event take() throws ChannelException;  public Transaction getTransaction();
}

那么從上面的接口中我們可以看到Channel的主要功能就是put()和take(),那么我們就來看一下它的具體實現(xiàn)。這里我們選擇MemoryChannel作為例子,但是MemoryChannel太長了,我們就截取一小段來看看

public class MemoryChannel extends BasicChannelSemantics {    private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);    private static final Integer defaultCapacity = Integer.valueOf(100);    private static final Integer defaultTransCapacity = Integer.valueOf(100);
    
    public MemoryChannel() {
    }

    ...
}

我們又看到它繼承了BasicChannelSemantics ,從名字我們可以看出它是一個基礎(chǔ)的Channel,我們繼續(xù)看看看它的實現(xiàn)

@InterfaceAudience.Public@InterfaceStability.Stablepublic abstract class BasicChannelSemantics extends AbstractChannel {  private ThreadLocal currentTransaction
      = new ThreadLocal();  private boolean initialized = false;  protected void initialize() {}  protected abstract BasicTransactionSemantics createTransaction();  @Override
  public void put(Event event) throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,        "No transaction exists for this thread");
    transaction.put(event);
  }  @Override
  public Event take() throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,        "No transaction exists for this thread");    return transaction.take();
  }  @Override
  public Transaction getTransaction() {    if (!initialized) {      synchronized (this) {        if (!initialized) {
          initialize();
          initialized = true;
        }
      }
    }

    BasicTransactionSemantics transaction = currentTransaction.get();    if (transaction == null || transaction.getState().equals(
            BasicTransactionSemantics.State.CLOSED)) {
      transaction = createTransaction();
      currentTransaction.set(transaction);
    }    return transaction;
  }
}

找了許久,終于發(fā)現(xiàn)了put()和take(),但是仔細一看,它們內(nèi)部調(diào)用的是BasicTransactionSemantics 的put()和take(),有點失望,繼續(xù)來看看BasicTransactionSemantics

public abstract class BasicTransactionSemantics implements Transaction {  private State state;  private long initialThreadId;  protected void doBegin() throws InterruptedException {}  protected abstract void doPut(Event event) throws InterruptedException;  protected abstract Event doTake() throws InterruptedException;  protected abstract void doCommit() throws InterruptedException;  protected abstract void doRollback() throws InterruptedException;  protected void doClose() {}  protected BasicTransactionSemantics() {
    state = State.NEW;
    initialThreadId = Thread.currentThread().getId();
  }  protected void put(Event event) {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,        "put() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),        "put() called when transaction is %s!", state);
    Preconditions.checkArgument(event != null,        "put() called with null event!");    try {
      doPut(event);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();      throw new ChannelException(e.toString(), e);
    }
  }  protected Event take() {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,        "take() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),        "take() called when transaction is %s!", state);    try {      return doTake();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();      return null;
    }
  }  protected State getState() {    return state;
  }

  ...//我們這里只是討論put和take,所以一些暫時不涉及的方法就被我干掉,有興趣恩典朋友可以自行閱讀

  protected static enum State {
    NEW, OPEN, COMPLETED, CLOSED
  }
}

又是一個抽象類,put()和take()內(nèi)部調(diào)用的還是抽象方法doPut()和doTake(),看到這里,我相信沒有耐心的同學已經(jīng)崩潰了,但是就差最后一步了,既然是抽象類,那么最終Channel所使用的肯定是它的一個實現(xiàn)類,這時候我們可以回到一開始使用的MemoryChannel,到里面找找有沒有線索,一看,MemoryChannel中就藏著個內(nèi)部類

private class MemoryTransaction extends BasicTransactionSemantics {    private LinkedBlockingDeque takeList;    private LinkedBlockingDeque putList;    private final ChannelCounter channelCounter;    private int putByteCounter = 0;    private int takeByteCounter = 0;    public MemoryTransaction(int transCapacity, ChannelCounter counter) {
      putList = new LinkedBlockingDeque(transCapacity);
      takeList = new LinkedBlockingDeque(transCapacity);

      channelCounter = counter;
    }

    @Override    protected void doPut(Event event) throws InterruptedException {
      channelCounter.incrementEventPutAttemptCount();      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);      if (!putList.offer(event)) {        throw new ChannelException(            "Put queue for MemoryTransaction of capacity " +
            putList.size() + " full, consider committing more frequently, " +            "increasing capacity or increasing thread count");
      }
      putByteCounter += eventByteSize;
    }

    @Override    protected Event doTake() throws InterruptedException {
      channelCounter.incrementEventTakeAttemptCount();      if (takeList.remainingCapacity() == 0) {        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +            "increasing capacity, or increasing thread count");
      }      if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {        return null;
      }
      Event event;
      synchronized (queueLock) {        event = queue.poll();
      }
      Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +          "signalling existence of entry");
      takeList.put(event);      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
      takeByteCounter += eventByteSize;      return event;
    }   //...依然刪除暫時不需要的方法

  }

在這個類中我們可以看到doPut()和doTake()的實現(xiàn)方法,也明白MemoryChannel的put()和take()最終調(diào)用的是MemoryTransaction 的doPut()和doTake()。

有朋友看到這里以為這次解析就要結(jié)束了,其實好戲還在后頭,Channel中還有兩個重要的類ChannelProcessor和ChannelSelector,耐心地聽我慢慢道來。

3 ChannelProcessor

ChannelProcessor 的作用就是執(zhí)行put操作,將數(shù)據(jù)放到channel里面。每個ChannelProcessor實例都會配備一個ChannelSelector來決定event要put到那個channl當中

public class ChannelProcessor implements Configurable {    private static final Logger LOG = LoggerFactory.getLogger(ChannelProcessor.class);    private final ChannelSelector selector;    private final InterceptorChain interceptorChain;    public ChannelProcessor(ChannelSelector selector) {        this.selector = selector;        this.interceptorChain = new InterceptorChain();
    }    public void initialize() {        this.interceptorChain.initialize();
    }    public void close() {        this.interceptorChain.close();
    }    public void configure(Context context) {        this.configureInterceptors(context);
    }    private void configureInterceptors(Context context) {        //配置攔截器
    }    public ChannelSelector getSelector() {        return this.selector;
    }    public void processEventBatch(List events) {
        ...        while(i$.hasNext()) {
            Event optChannel = (Event)i$.next();
            List tx = this.selector.getRequiredChannels(optChannel);

            ...//將event放到Required隊列

            t1 = this.selector.getOptionalChannels(optChannel);

            Object eventQueue;
            ...//將event放到Optional隊列
           
        }

        ...//event的分配操作

    }    public void processEvent(Event event) {        event = this.interceptorChain.intercept(event);        if(event != null) {
            List requiredChannels = this.selector.getRequiredChannels(event);
            Iterator optionalChannels = requiredChannels.iterator();

            ...//event的分配操作

            List optionalChannels1 = this.selector.getOptionalChannels(event);
            Iterator i$1 = optionalChannels1.iterator();

            ...//event的分配操作
        }
    }
}

為了簡化代碼,我進行了一些刪除,只保留需要講解的部分,說白了Channel中的兩個寫入方法,都是需要從作為參數(shù)傳入的selector中獲取對應(yīng)的channel來執(zhí)行event的put操作。接下來我們來看看ChannelSelector

4 ChannelSelector

ChannelSelector是一個接口,我們可以通過ChannelSelectorFactory來創(chuàng)建它的子類,F(xiàn)lume提供了兩個實現(xiàn)類MultiplexingChannelSelector和ReplicatingChannelSelector。

public interface ChannelSelector extends NamedComponent, Configurable {    void setChannels(List var1);    List getRequiredChannels(Event var1);    List getOptionalChannels(Event var1);    List getAllChannels();
}

通過ChannelSelectorFactory 的create來創(chuàng)建,create中調(diào)用getSelectorForType來獲得一個selector,通過配置文件中的type來創(chuàng)建相應(yīng)的子類

public class ChannelSelectorFactory {  private static final Logger LOGGER = LoggerFactory.getLogger(
      ChannelSelectorFactory.class);  public static ChannelSelector create(List channels,
      Map config) {

      ...
  }  public static ChannelSelector create(List channels,
      ChannelSelectorConfiguration conf) {
    String type = ChannelSelectorType.REPLICATING.toString();    if (conf != null) {      type = conf.getType();
    }
    ChannelSelector selector = getSelectorForType(type);
    selector.setChannels(channels);
    Configurables.configure(selector, conf);    return selector;
  }  private static ChannelSelector getSelectorForType(String type) {
    if (type == null || type.trim().length() == 0) {      return new ReplicatingChannelSelector();
    }

    String selectorClassName = type;
    ChannelSelectorType  selectorType = ChannelSelectorType.OTHER;    try {
      selectorType = ChannelSelectorType.valueOf(type.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException ex) {
      LOGGER.debug("Selector type {} is a custom type", type);
    }    if (!selectorType.equals(ChannelSelectorType.OTHER)) {
      selectorClassName = selectorType.getChannelSelectorClassName();
    }

    ChannelSelector selector = null;    try {
      @SuppressWarnings("unchecked")
      Class selectorClass =
          (Class) Class.forName(selectorClassName);
      selector = selectorClass.newInstance();
    } catch (Exception ex) {
      throw new FlumeException("Unable to load selector type: " + type
          + ", class: " + selectorClassName, ex);
    }    return selector;
  }

}

對于這兩種Selector簡單說一下:

1)MultiplexingChannelSelector
下面是一個channel selector 配置文件

agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

MultiplexingChannelSelector類中定義了三個屬性,用于存儲不同類型的channel

    private Map> channelMapping;    private Map> optionalChannels;    private List defaultChannels;

那么具體分配原則如下:

  • 如果設(shè)置了maping,那么會event肯定會給指定的channel,如果同時設(shè)置了optional,也會發(fā)送給optionalchannel

  • 如果沒有設(shè)置maping,設(shè)置default,那么event會發(fā)送給defaultchannel,如果還同時設(shè)置了optional,那么也會發(fā)送給optionalchannel

  • 如果maping和default都沒指定,如果有指定option,那么會發(fā)送給optionalchannel,但是發(fā)送給optionalchannel不會進行失敗重試

2)ReplicatingChannelSelector

分配原則比較簡單

  • 如果是replicating的話,那么如果沒有指定optional,那么全部channel都有,如果某個channel指定為option的話,那么就要從requiredChannel移除,只發(fā)送給optionalchannel

5 總結(jié):

作為一個承上啟下的組件,Channel的作用就是將source來的數(shù)據(jù)通過自己流向sink,那么ChannelProcessor就起到將event put到分配好的channel中,而分配的規(guī)則是由selector決定的,flume提供的selector有multiplexing和replicating兩種。所以ChannelProcessor一般都是在Source中被調(diào)用。那么Channel的take()肯定是在Sink中調(diào)用的。

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。


網(wǎng)頁題目:Flume-ng源碼解析之Channel組件-創(chuàng)新互聯(lián)
標題鏈接:http://weahome.cn/article/dpiosi.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部