這篇文章主要介紹MDLog的示例分析,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!
創(chuàng)新互聯(lián)公司一直在為企業(yè)提供服務(wù),多年的磨煉,使我們?cè)趧?chuàng)意設(shè)計(jì),成都全網(wǎng)營(yíng)銷到技術(shù)研發(fā)擁有了開發(fā)經(jīng)驗(yàn)。我們擅長(zhǎng)傾聽企業(yè)需求,挖掘用戶對(duì)產(chǎn)品需求服務(wù)價(jià)值,為企業(yè)制作有用的創(chuàng)意設(shè)計(jì)體驗(yàn)。核心團(tuán)隊(duì)擁有超過(guò)十多年以上行業(yè)經(jīng)驗(yàn),涵蓋創(chuàng)意,策化,開發(fā)等專業(yè)領(lǐng)域,公司涉及領(lǐng)域有基礎(chǔ)互聯(lián)網(wǎng)服務(wù)成都溫江機(jī)房、App定制開發(fā)、手機(jī)移動(dòng)建站、網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)絡(luò)整合營(yíng)銷。
ReplayThread類:負(fù)責(zé)log的replay事件處理。
RecoveryThread類:負(fù)責(zé)log的recovery事件處理。
SubmitThread類:負(fù)責(zé)log的submit事件處理。
map
set
set
uint64_t event_seq; 記錄log event的當(dāng)前序列值
int expiring_events; 記錄expiring的log個(gè)數(shù)
int expired_events; 記錄expired的log個(gè)數(shù)
MDLog::write_head()
|__Journaler::write_head() 直接調(diào)用Journaler類對(duì)應(yīng)的函數(shù)進(jìn)行處理
MDLog::get_read_pos()
|__Journaler::get_read_pos()
MDLog::get_write_pos()
|__Journaler::get_write_pos()
MDLog::get_safe_pos()
|__Journaler::get_write_safe_pos()
MDLog::create()
|__創(chuàng)建C_GatherBuilder類對(duì)象
|__設(shè)置C_GatherBuilder類的finisher函數(shù)
|__確定inode的默認(rèn)journal號(hào),即:ino = MDS_INO_LOG_OFFSET+mds->get_nodeid()
|__創(chuàng)建Journaler類對(duì)象,該類對(duì)象寫入到metadata pool里
|__設(shè)置Journaler類對(duì)象的寫出錯(cuò)處理函數(shù)
|__設(shè)置Journaler類對(duì)象可寫
|__Journaler::create()
|__Journaler::write_head()
|__創(chuàng)建JournalPointer類對(duì)象,該類對(duì)象寫入到metadata pool里
|__設(shè)置JournalPointer的front值為inode號(hào)
|__設(shè)置JournalPointer的back為0
|__JournalPointer::save()
|__C_GatherBuilder::activate()
|__SubmitThread::create() 創(chuàng)建SubmitThread線程
MDLog::open()
|__RecoveryThread::set_completion() 設(shè)置RecoveryThread的completion回調(diào)函數(shù)
|__RecoveryThread::create() 創(chuàng)建RecoveryThread線程
|__SubmitThread::create() 創(chuàng)建SubmitThread線程
MDLog::reopen()
|__刪除Journaler類對(duì)象
|__RecoveryThread::join() 等待RecoveryThread結(jié)束
|__RecoveryThread::set_completion() 設(shè)置RecoveryThrad結(jié)束時(shí)的回調(diào)函數(shù)MDLog::append()
|__RecoveryThread::create() 創(chuàng)建RecoveryThread線程
MDLog::append()
|__Journaler::set_read_pos(Journaler::get_write_pos()) 設(shè)置Journaler類對(duì)象的read/write position指向同一個(gè)地方
|__Journaler::set_expire_pos(Journaler::get_write_pos()) 設(shè)置Journaler類對(duì)象的expire position為write position
|__Journaler::set_writable() 設(shè)置Journaler為可寫
MDLog::_start_entry()
|__設(shè)置cur_event=e 設(shè)置cur_event為當(dāng)前待處理的LogEvent
|__event_seq++ 增加Event的序列號(hào)
|__從LogEvent類對(duì)象中得到EMetaBlob對(duì)象,即:LogEvent::get_metablob()
|__設(shè)置EMetaBlob的event_seq為當(dāng)前的event_seq,last_subtree_map為當(dāng)前最后一個(gè)segment
MDLog::cancel_entry()
|__設(shè)置cur_event=NULL 清空cur_event
|__刪除LogEvent參數(shù)類對(duì)象
MDLog::_submit_entry()
|__設(shè)置cur_event = NULL
|__從segments集合中得到最后一個(gè)LogSegment類對(duì)象
|__增加LogSement類對(duì)象的event個(gè)數(shù),即:num_events++
|__設(shè)置LogEvent::_segment=LogSement類對(duì)象
|__LogEvent::update_segment() 更新LogEvent的segment字段
|__以LogEvent作為參數(shù),更新pending_events集合
|__num_events++ 增加events的個(gè)數(shù)
|__unflushed++ 增加unflushed的個(gè)數(shù)
|__若LogEvent的type是EVENT_SUBTREEMAP或EVENT_IMPORTFINISH且mds處于resolve狀態(tài)
|__直接退出
|__若LogSegment::end/Journaler::get_layout_period()!=LogSegment::offset/Journaler::get_layout_period()
|___start_new_segment()
|__若LogEvent的類型是EVENT_SUBTREEMAP_TEST
|__創(chuàng)建LogEvent類對(duì)象
|__設(shè)置LogEvent類對(duì)象的類型是EVENT_SUBTREEMAP_TEST
|___submit_entry()
MDLog::_submit_thread()
|__判斷當(dāng)前MDS進(jìn)程是否stopping,若是則直接退出
|__判斷mds_log_pause是否為真,若為真則調(diào)用submit_cond.Wait()
|__判斷pending_events集合是否為空,若為空則調(diào)用submit_cond.Wait()
|__從pending_event中得到PendingEvent類對(duì)象
|__若PendingEvent中包含有效的LogEvent
|__得到LogEvent和LogSegment
|__encode LogEvent到bufferlist中
|__設(shè)置LogEvent::set_start_off(write_pos) 設(shè)置LogEvent的start offset值為write position值
|__Journaler::append_entry(bufferlist) 將encoded LogEvent寫入到Journaler中
|__更新LogSegment::end為最新的write position
|__Journaler::wait_for_flush() 等待Journaler flush LogEvent到磁盤上,完成flush后調(diào)用回調(diào)函數(shù)設(shè)置MDLog::safe_pos=最新的write position
|__Journaler::flush()
|__刪除LogEvent類對(duì)象
|__若PendingEvent中沒(méi)有包含有效的LogEvent
|__Journaler::wait_for_flush() 等待Journaler flush LogEvent到磁盤上,完成flush后調(diào)用回調(diào)函數(shù)設(shè)置MDLog::safe_pos=Journaler::get_write_pos()
|__Journaler::flush()
MDLog::wait_for_safe() 若目前仍然還有pending的event,則不做wait_for_flush
|__判斷pending_events是否不為空
|__向pending_events集合的末尾添加一個(gè)NULL的PendingEvent類對(duì)象
|__設(shè)置no_pending=false
|__submit_cond.Signal()
|__判斷no_pending==true
|__Journaler::wait_for_flush()
MDLog::flush() 若目前仍然還有pending的event,則不做flush
|__判斷pending_events集合不為空
|__向pending_event集合末尾插入一個(gè)NULL的PendingEvent類對(duì)象
|__設(shè)置do_flush=false
|__判斷do_flush==true
|__Journaler::flush()
MDLog::shutdown()
|__判斷SubmitThread是否正在運(yùn)行
|__SubmitThread::join() 等待SubmitThread進(jìn)程停止
|__Journaler::shutdown() 調(diào)用Journaler的shutdown()函數(shù)
|__判斷ReplayThread是否正在運(yùn)行
|__ReplayThread::join() 等待ReplayThread進(jìn)程停止
|__判斷RecoveryThread是否正在運(yùn)行
|__RecoveryThread::join() 等待RecoveryThread進(jìn)程停止
MDLog::_prepare_new_segment()
|__得到seq的值為event_seq + 1
|__在segments[seq]處新創(chuàng)建一個(gè)LogSegment類對(duì)象
|__MDCache::advance_stray()
MDLog::_journal_segment_subtree_map()
|__MDCache::create_subtree_map() 從MDCache中得到一個(gè)ESubtreeMap
|__設(shè)置改ESubtreeMap的event_seq值為segment的最后一個(gè)元素
|___submit_entry()
MDLog::_start_new_segment()
|___prepare_new_segment()
|___journal_segment_subtree_map()
MDLog::trim()
|__根據(jù)配置文件得到max_segments和max_events
|__遍歷segments集合
|__若該LogSegment在pending_events集合中,則直接退出,不能進(jìn)行trim操作
|__若該LogSegment在expiring_segments或expired_segments集合中,則遍歷下一個(gè)
|__將該LogSegment插入到expiring_segments集合中
|__try_expire()
|___trim_expired_segments()
MDLog::trim_all() 處理過(guò)程與MDLog::trim()類似,只不過(guò)是遍歷所有的segments并沒(méi)有trim數(shù)量上的限制
MDLog::try_expire()
|__從expiring_segments集合中刪除指定的LogSegment
|___expired()
|__將該LogSegment插入到expired_segments集合中
MDLog::_trim_expired_segments()
|__遍歷segments集合
|__判斷LogSegment是否在expired_segments集合中,若不在則停止遍歷
|__將LogSegment從expired_segments集合中刪除
|__將LogSegment從segments集合中刪除
|__刪除LogSegment
|__設(shè)置trim=true
|__若trim==true
|__Journaler::write_head(0)
MDLog::_maybe_expired()
|__try_expire()
MDLog::replay()
|__判斷Journaler的read pos是否和write pos一致
|__不需要replay,直接返回
|__waitfor_replay.push_back(c) 添加replay waiter
|__ReplayThread::create() 創(chuàng)建一個(gè)新的ReplayThread
MDLog::_replay_thread() 執(zhí)行replay操作的獨(dú)立線程
|__判斷Journaler是否還有未讀數(shù)據(jù)
|__Journaler::wait_for_readable() 等待Journaler可讀并且沒(méi)有未讀的數(shù)據(jù)
|__判斷Journaler是否有出錯(cuò)信息
|__處理Journaler的出錯(cuò)信息
|__若Journaler不可讀并且Journaler的read pos==write pos
|__直接退出
|__得到Journaler的read position
|__Journaler::try_read_entry() 從Journaler類對(duì)象中讀取數(shù)據(jù)
|__得到LogEvent類對(duì)象
|__設(shè)置LogEvent的start offset為read position
|__LogEvent的類型是EVENT_SUBTREEMAP或EVENT_RESETJOURNAL
|__在segments集合中創(chuàng)建一個(gè)新的LogSegment類對(duì)象
|__若segments集合不為空
|__設(shè)置LogEvent::_segment=get_current_segment()
|__設(shè)置LogEvent::_segment->end = Journaler::get_read_pos()
|__LogEvent::replay()
|__設(shè)置safe_pos = Journaler::get_write_safe_pos()
MDLog::_recovery_thread()
|__創(chuàng)建JournalPointer類對(duì)象
|__JournalPointer::load(mds->objecter) 從JournalPointer處load出數(shù)據(jù)
|__若JournalPointer.back不為空,則說(shuō)明有寫Journal未完成的情況
|__根據(jù)JournalPointer.back創(chuàng)建Journaler類對(duì)象
|__Journaler::recover() 調(diào)用Journaler類對(duì)象的recover()函數(shù)進(jìn)行recover操作
|__從JournalPointer.front創(chuàng)建Journaler類對(duì)象
|__Journaler::recover() 調(diào)用Journaler類對(duì)象的recover()函數(shù)進(jìn)行recover操作
|__若MDS處于standby replay模式或者stream_format() >= mds_journal_format
|__Journaler::set_write_error_handler() 設(shè)置Journaler的write error handler
|___reformat_journal()
MDLog::_reformat_journal()
|__根據(jù)參數(shù)JournalPointer的front值來(lái)確定JournalPointer的back值
|__根據(jù)參數(shù)JournalPointer的back,創(chuàng)建一個(gè)新的Journaler
|__Journaler::set_writeable()
|__Journaler::create()
|__Journaler::write_head()
|__從參數(shù)的old_journaler處讀取數(shù)據(jù)到bufferlist
|__Journaler::append_entry(bufferlist) 將old_journaler讀取到的數(shù)據(jù)寫入到新的Journaler中
|__Journaler::flush()
補(bǔ)充一下針對(duì)MDLog的理解:
MDLog::create()核心處理流程如下:
1、得到Journaler對(duì)應(yīng)的inode號(hào),即:MDS_INO_LOG_OFFSET+mds->get_nodeid()
2、創(chuàng)建Journaler類對(duì)象(Journaler寫入到metadata pool中)
3、創(chuàng)建JournalPointer類對(duì)象(以mds->get_nodeid()和metadata pool為參數(shù))
4、設(shè)置JournalPointer類對(duì)象的front = inode,back = 0
5、保存JournalPointer類對(duì)象,即:jp->save(mds->objecter, gather.new_sub())
6、創(chuàng)建SubmitThread線程類,即:submit_thread.create(),之后執(zhí)行SubmitThread類的entry()函數(shù),即:_submit_thread()
MDLog::_submit_thread()主要處理流程如下:
1、遍歷pending_event數(shù)組,若數(shù)組為空或數(shù)組中的PendingEvent為空則重新遍歷
2、從pending_event數(shù)組中得到PendingEvent類對(duì)象
3、從PendingEvent類對(duì)象中得到LogEvent,之后從LogEvent類對(duì)象中得到LogSegment
4、對(duì)于LogSegment,序列化其header信息。序列化的header信息如下:
EVENT_NEW_ENCODING
1
_type
mdsmap->get_up_features()
5、得到當(dāng)前Journaler類對(duì)象寫入日志的位置,即:journaler->get_write_pos()
6、將序列化的header信息寫入到j(luò)ournaler,即:journaler->append_entry(bl)
7、flush journaler到磁盤,即:journaler->wait_for_flush()
MDLog::_recovery_thread()
1、根據(jù)mds->get_nodeid()以及metadata pool得到JournalPointer類對(duì)象
2、讀取JournalPointer的object,即:jp.load(mds->objecter)
3、若JournalPointer沒(méi)有object,則創(chuàng)建一個(gè)JournalPointer的front inode號(hào)且保存JournalPointer(jp.save(mds->objecter))
4、若jp.back不為空,則說(shuō)明之前日志回寫的時(shí)候有錯(cuò)誤出現(xiàn),因此需要?jiǎng)h除jp.back對(duì)應(yīng)的日志
|__根據(jù)jp.back創(chuàng)建Journaler類對(duì)象
|__執(zhí)行Journaler::recover()
|__執(zhí)行Journaler::erase()
|__更新JournalPointer
|__設(shè)置jp.back=0
|__調(diào)用jp.save(mds->objecter)
5、根據(jù)jp.front創(chuàng)建Journaler類對(duì)象
6、執(zhí)行Journaler::recover()
7、執(zhí)行_reformat_journal()
MDLog::_reformat_journal() 將日志信息寫入到JournalPointer的back處,若寫入成功則設(shè)置JournalPointer的front=back以及back=0
1、得到JournalPointer的back值
2、保存JournalPointer類對(duì)象,即:jp.save()
3、根據(jù)jp.back創(chuàng)建Journaler類對(duì)象
4、從老的日志中讀取日志信息,即:old_journal->try_read_entry(bl)
5、將老的日志信息寫入到新的日志中,即:new_journal->append_entry(bl)
6、刷新日志到磁盤,即:new_journal->flush()
7、調(diào)換JournalPointer的front和back值且保存JournalPointer,即:jp.save()
8、刪除老的日志,即:old_journal->erase()
9、更新JournalPointer,即:jp.back=0且jp.save()
MDLog::_start_entry()
1、設(shè)置cur_event為當(dāng)前待處理的entry
2、遞增event_seq的值
3、更新LogEvent的event_seq值和last_subtree_map值
MDLog::_submit_entry()
1、清空cur_event值
2、從segments數(shù)組的末尾得到LogSegment
3、設(shè)置LogEvent的_segment為segments數(shù)組中的LogSegment
4、更新pending_events數(shù)組,即:pending_events[ls->seq].push_back(PendingEvent(le, c))
5、更新num_events值和unflushed值
MDLog::_prepare_new_segment()
1、根據(jù)event_seq值,得到seq值,即:seq = event_seq + 1
2、更新segments數(shù)組,即:segments[seq] = new LogSegment(seq)
MDLog::trim()
MDLog::trim_all() 這兩個(gè)函數(shù)用于從segments數(shù)組中trim掉滿足條件的LogSegment。這里使用了兩個(gè)數(shù)組expiring_segments和expired_segments保存trim過(guò)程中的LogSegment。
MDLog::replay() 當(dāng)journaler的read_pos和write_pos不一致時(shí),需要進(jìn)行replay操作
1、更新waitfor_replay數(shù)組
2、設(shè)置already_replayed=true
3、創(chuàng)建ReplayThread線程類,之后執(zhí)行ReplayThread.entry()函數(shù),即:執(zhí)行_replay_thread()
MDLog::_replay_thread()
1、從Journaler中讀取日志內(nèi)容,即:journaler->try_read_entry(bl)
2、從日志中解析出LogEvent,即:LogEvent::decode(bl)
3、更新segments數(shù)組,即:segments[event_seq] = new LogSegment(event_seq, pos)
4、更新LogEvent的_segment
5、執(zhí)行LogEvent的replay()操作
以上是“MDLog的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!