TaildirSource類(lèi)圖如下(列出主要類(lèi))
創(chuàng)新互聯(lián)專注于通州網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠(chéng)為您提供通州營(yíng)銷(xiāo)型網(wǎng)站建設(shè),通州網(wǎng)站制作、通州網(wǎng)頁(yè)設(shè)計(jì)、通州網(wǎng)站官網(wǎng)定制、小程序開(kāi)發(fā)服務(wù),打造通州網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供通州網(wǎng)站排名全網(wǎng)營(yíng)銷(xiāo)落地服務(wù)。
TailDirSource類(lèi)
TailDirSource繼承了AbstractSource類(lèi),而AbstractSource類(lèi)中channelProcessor屬性負(fù)責(zé)將Source中的Event提交給Channel組件
TailDirSource類(lèi)通過(guò)配置參數(shù)匹配日志文件,獲取日志文件更新內(nèi)容并且將已經(jīng)讀取的偏移量記錄到特定的文件當(dāng)中(position file)
configure()方法:
1.判斷從配置文件加載的配置是否合法,其中包括了對(duì)filegroups,以及以filegroups為單位的文件路徑是否存在等條件。
2.對(duì)batchSize,skipToEnd,writePosInterval,idleTimeout等變量進(jìn)行初始化工作
batchSize定義了往Channel中發(fā)送Event的批量處理大小
skipToEnd定義了每次程序啟動(dòng),對(duì)文件進(jìn)行讀取的時(shí)候,是否從文件尾部開(kāi)始讀取數(shù)據(jù),或者從文件最開(kāi)始讀取。
writePosInterval,TaildirSource讀取每個(gè)監(jiān)控文件都在位置文件中記錄監(jiān)控文件的已經(jīng)讀取的偏移量,writePosInterval則是定義了更新位置文件的間隔。
idleTimeout日志文件在idleTimeout間隔時(shí)間,沒(méi)有被修改,文件將被關(guān)閉
start()方法:
通過(guò)configure()初始化后的變量創(chuàng)建了ReliableTaildirEventReader對(duì)象,同時(shí)創(chuàng)建兩個(gè)線程池idleFileChecker和positionWriter,分別用于監(jiān)控日志文件和記錄日志文件讀取的偏移量。
idleFileChecker實(shí)現(xiàn)一個(gè)Runnable接口,遍歷reader所有監(jiān)控的文件,檢查文件最后修改時(shí)間+idleTimeout是否小于當(dāng)前時(shí)間,說(shuō)明日志文件在idleTimeout時(shí)間內(nèi)沒(méi)有被修改,該文件將被關(guān)閉。
private class idleFileCheckerRunnable implements Runnable {
@Override
public void run() {
try {
long now = System.currentTimeMillis();
for (TailFile tf : reader.getTailFiles().values()) {
if (tf.getLastUpdated() + idleTimeout < now && tf.getRaf() != null) {
idleInodes.add(tf.getInode());
}
}
} catch (Throwable t) {
logger.error("Uncaught exception in IdleFileChecker thread", t);
}
}
}
positionWriter主要作用是記錄日志文件讀取的偏移量,以json格式("inode", inode, "pos", tf.getPos(), "file", tf.getPath()),其中inode是linux系統(tǒng)中特有屬性,在適應(yīng)其他系統(tǒng)(Windows等)日志采集時(shí)ReliableTaildirEventReader.getInode()方法需要修改(注意:在利用Linux系統(tǒng)上inode實(shí)現(xiàn)上,文件是通過(guò)inode記錄日志讀取偏移量。所以即使文件名改變了,也不影響日志讀取,在我實(shí)現(xiàn)Window版本上,只采用了文件名對(duì)應(yīng)日志讀取偏移量,文件名改變影響日志讀取
)。pos則是記錄的日志讀取的偏移量,file記錄了日志文件的路徑
process()方法:
process方法記錄了TailDirSource類(lèi)中主要的邏輯,獲取每個(gè)監(jiān)控的日志文件,調(diào)用tailFileProcess獲取每個(gè)日志文件的更新數(shù)據(jù),并將每條記錄轉(zhuǎn)換為Event(具體細(xì)節(jié)要看ReliableTaildirEventReader的readEvents方法)
public Status process() {
Status status = Status.READY;
try {
existingInodes.clear();
existingInodes.addAll(reader.updateTailFiles());
for (long inode : existingInodes) {
TailFile tf = reader.getTailFiles().get(inode);
if (tf.needTail()) {
tailFileProcess(tf, true);
}
}
closeTailFiles();
try {
TimeUnit.MILLISECONDS.sleep(retryInterval);
} catch (InterruptedException e) {
logger.info("Interrupted while sleeping");
}
} catch (Throwable t) {
logger.error("Unable to tail files", t);
status = Status.BACKOFF;
}
return status;
}
ReliableTaildirEventReader類(lèi)
構(gòu)造ReliableTaildirEventReader對(duì)象的時(shí)候,首先會(huì)判斷各種必須參數(shù)是否合法等,然后加載position file獲取每個(gè)文件上次記錄的日志文件讀取的偏移量
loadPositionFile(String filePath) 不粘貼方法的具體代碼,主要就是獲取每個(gè)監(jiān)控日志文件的讀取偏移量
readEvents()的各個(gè)不同參數(shù)方法中,下面這個(gè)是最主要的,該方法獲取當(dāng)前日志文件的偏移量,調(diào)用TailFile.readEvents(numEvents, backoffWithoutNL, addByteOffset)方法將日志文件每行轉(zhuǎn)換為Flume的消息對(duì)象Event,并循環(huán)將每個(gè)event添加header信息。
public List readEvents(int numEvents, boolean backoffWithoutNL)
throws IOException {
if (!committed) {
if (currentFile == null) {
throw new IllegalStateException("current file does not exist. " + currentFile.getPath());
}
logger.info("Last read was never committed - resetting position");
long lastPos = currentFile.getPos();
currentFile.updateFilePos(lastPos);
}
List events = currentFile.readEvents(numEvents, backoffWithoutNL, addByteOffset);
if (events.isEmpty()) {
return events;
}
Map headers = currentFile.getHeaders();
if (annotateFileName || (headers != null && !headers.isEmpty())) {
for (Event event : events) {
if (headers != null && !headers.isEmpty()) {
event.getHeaders().putAll(headers);
}
if (annotateFileName) {
event.getHeaders().put(fileNameHeader, currentFile.getPath());
}
}
}
committed = false;
return events;
}
openFile(File file, Map
TailFile類(lèi)
TaildirSource通過(guò)TailFile類(lèi)操作處理每個(gè)日志文件,包含了RandomAccessFile類(lèi),以及記錄日志文件偏移量pos,最新更新時(shí)間lastUpdated等屬性
RandomAccessFile完美的符合TaildirSource的應(yīng)用場(chǎng)景,RandomAccessFile支持使用seek()方法隨機(jī)訪問(wèn)文件,配合position file中記錄的日志文件讀取偏移量,能夠輕松簡(jiǎn)單的seek到文件偏移量,然后向后讀取日志內(nèi)容,并重新將新的偏移量記錄到position file中。
readEvent(boolean backoffWithoutNL, boolean addByteOffset)方法:
下圖描述了該方法的調(diào)用層級(jí),readEvent簡(jiǎn)單的理解就是將每行日志轉(zhuǎn)為Event消息體,方法最終調(diào)用的是readFile()方法。
readLine()方法,有點(diǎn)難還在研究
public LineResult readLine() throws IOException {
LineResult lineResult = null;
while (true) {
if (bufferPos == NEED_READING) {
if (raf.getFilePointer() < raf.length()) {//當(dāng)文件指針位置小于文件總長(zhǎng)度的時(shí)候,就需要讀取指針位置到文件最后的數(shù)據(jù)
readFile();
} else {
if (oldBuffer.length > 0) {
lineResult = new LineResult(false, oldBuffer);
oldBuffer = new byte[0];
setLineReadPos(lineReadPos + lineResult.line.length);
}
break;
}
}
for (int i = bufferPos; i < buffer.length; i++) {
if (buffer[i] == BYTE_NL) {
int oldLen = oldBuffer.length;
// Don't copy last byte(NEW_LINE)
int lineLen = i - bufferPos;
// For windows, check for CR
if (i > 0 && buffer[i - 1] == BYTE_CR) {
lineLen -= 1;
} else if (oldBuffer.length > 0 && oldBuffer[oldBuffer.length - 1] == BYTE_CR) {
oldLen -= 1;
}
lineResult = new LineResult(true,
concatByteArrays(oldBuffer, 0, oldLen, buffer, bufferPos, lineLen));
setLineReadPos(lineReadPos + (oldBuffer.length + (i - bufferPos + 1)));
oldBuffer = new byte[0];
if (i + 1 < buffer.length) {
bufferPos = i + 1;
} else {
bufferPos = NEED_READING;
}
break;
}
}
if (lineResult != null) {
break;
}
// NEW_LINE not showed up at the end of the buffer
oldBuffer = concatByteArrays(oldBuffer, 0, oldBuffer.length,
buffer, bufferPos, buffer.length - bufferPos);
bufferPos = NEED_READING;
}
return lineResult;
}
readFile()按BUFFER_SIZE(默認(rèn)8KB)作為緩沖讀取日志文件數(shù)據(jù)
private void readFile() throws IOException {
if ((raf.length() - raf.getFilePointer()) < BUFFER_SIZE) {
buffer = new byte[(int) (raf.length() - raf.getFilePointer())];
} else {
buffer = new byte[BUFFER_SIZE];
}
raf.read(buffer, 0, buffer.length);
bufferPos = 0;
}