這篇文章主要為大家展示了“flume如何自定義source、sink”,內(nèi)容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“flume如何自定義source、sink”這篇文章吧。
創(chuàng)新互聯(lián)是一家專業(yè)提供大田企業(yè)網(wǎng)站建設(shè),專注與網(wǎng)站制作、做網(wǎng)站、H5開發(fā)、小程序制作等業(yè)務(wù)。10年已為大田眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)站設(shè)計(jì)公司優(yōu)惠進(jìn)行中。
自定義source開發(fā):
source是收集日志存入channel。
Source提供了兩種機(jī)制:PollableSource(輪訓(xùn)拉取)和EventDrivenSource(事件驅(qū)動(dòng)),
如果使用EventDrivenSource,你可以在start方法中啟動(dòng)額外的線程,不斷的往channel中發(fā)數(shù)據(jù)。如果使用PollableSource,你可以在process()實(shí)現(xiàn)不斷重發(fā)。
public class MySource extends AbstractSource implements Configurable, PollableSource { private String myProp; @Override public void configure(Context context) { String myProp = context.getString("myProp", "defaultValue"); // Process the myProp value (e.g. validation, convert to another type, ...) // Store myProp for later retrieval by process() method this.myProp = myProp; } @Override public void start() { // Initialize the connection to the external client } @Override public void stop () { // Disconnect from external client and do any additional cleanup // (e.g. releasing resources or nulling-out field values) .. } @Override public Status process() throws EventDeliveryException { Status status = null; try { // This try clause includes whatever Channel/Event operations you want to do // Receive new data Event e = getSomeData(); // Store the Event into this Source's associated Channel(s) getChannelProcessor().processEvent(e); status = Status.READY; } catch (Throwable t) { // Log exception, handle individual exceptions as needed status = Status.BACKOFF; // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); } return status; }}
或者
package
org.apache.flume;
import
org.apache.flume.conf.Configurable;
import
org.apache.flume.source.AbstractSource;
public
class
TailSource
extends
AbstractSource
implements
EventDrivenSource,
Configurable {
@Override
public
void
configure(Context context) {
}
@Override
public
synchronized
void
start() {
}
@Override
public
synchronized
void
stop() {
}
}
自定義sink:
sink是從channel中拉取日志處理。
process會(huì)不斷調(diào)用,你只需在process中去取channel的數(shù)據(jù)即可。
public class MySink extends AbstractSink implements Configurable { private String myProp; @Override public void configure(Context context) { String myProp = context.getString("myProp", "defaultValue"); // Process the myProp value (e.g. validation) // Store myProp for later retrieval by process() method this.myProp = myProp; } @Override public void start() { // Initialize the connection to the external repository (e.g. HDFS) that // this Sink will forward Events to .. } @Override public void stop () { // Disconnect from the external respository and do any // additional cleanup (e.g. releasing resources or nulling-out // field values) .. } @Override public Status process() throws EventDeliveryException { Status status = null; // Start transaction Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { // This try clause includes whatever Channel operations you want to do Event event = ch.take(); // Send the Event to the external repository. // storeSomeData(e); txn.commit(); status = Status.READY; } catch (Throwable t) { txn.rollback(); // Log exception, handle individual exceptions as needed status = Status.BACKOFF; // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } return status; }}
以上是“flume如何自定義source、sink”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!