小編給大家分享一下如何實(shí)現(xiàn)ceph SimpleMessenger模塊消息的接收,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!
創(chuàng)新互聯(lián)自2013年起,公司以成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、系統(tǒng)開發(fā)、網(wǎng)絡(luò)推廣、文化傳媒、企業(yè)宣傳、平面廣告設(shè)計(jì)等為主要業(yè)務(wù),適用行業(yè)近百種。服務(wù)企業(yè)客戶千余家,涉及國內(nèi)多個(gè)省份客戶。擁有多年網(wǎng)站建設(shè)開發(fā)經(jīng)驗(yàn)。為企業(yè)提供專業(yè)的網(wǎng)站建設(shè)、創(chuàng)意設(shè)計(jì)、宣傳推廣等服務(wù)。 通過專業(yè)的設(shè)計(jì)、獨(dú)特的風(fēng)格,為不同客戶提供各種風(fēng)格的特色服務(wù)。
OSD服務(wù)端消息的接收起始于OSD::init()中的messenger::add_dispatcher_head(Dispatcher *d)函數(shù)
|- 358 void add_dispatcher_head(Dispatcher *d) { || 359 bool first = dispatchers.empty(); || 360 dispatchers.push_front(d); || 361 if (d->ms_can_fast_dispatch_any()) || 362 fast_dispatchers.push_front(d); || 363 if (first) || 364 ready(); //如果dispatcher list空,啟動(dòng)SimpleMessenger::ready,不為空證明SimpleMessenger已經(jīng)啟動(dòng)了 || 365 }
在SimpleMessenger::ready()中,啟動(dòng)DispatchQueue等待mqueue,如果綁定了端口就啟動(dòng) accepter接收線程
76 void SimpleMessenger::ready() - 77 { | 78 ldout(cct,10) << "ready " << get_myaddr() << dendl; | 79 dispatch_queue.start(); //啟動(dòng)DispatchQueue,等待mqueue | 80 | 81 lock.Lock(); | 82 if (did_bind) | 83 accepter.start(); | 84 lock.Unlock(); | 85 }
Accepter是Thread的繼承類,Accepter::start()最終調(diào)用Accepter::entry(),在entry中 accept并把接收到的sd加入到Pipe類中
void *Accepter::entry() { ... struct pollfd pfd; pfd.fd = listen_sd; pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP; while (!done) { int r = poll(&pfd, 1, -1); if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP)) break; // accept entity_addr_t addr; socklen_t slen = sizeof(addr.ss_addr()); int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen); if (sd >= 0) { errors = 0; ldout(msgr->cct,10) << "accepted incoming on sd " << sd << dendl; msgr->add_accept_pipe(sd); //注冊一個(gè)pipe,啟動(dòng)讀線程,從該sd中讀取數(shù)據(jù) } else { ldout(msgr->cct,0) << "accepter no incoming connection? sd = " << sd << " errno " << errno << " " << cpp_strerror(errno) << dendl; if (++errors > 4) break; } } ... return 0;
在SimpleMessenger::add_accept_pipe(int sd)中,申請一個(gè)Pipe類并把sd加入到Pipe中,開始Pipe::start_reader()
340 Pipe *SimpleMessenger::add_accept_pipe(int sd) - 341 { | 342 lock.Lock(); | 343 Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL); | 344 p->sd = sd; | 345 p->pipe_lock.Lock(); | 346 p->start_reader(); | 347 p->pipe_lock.Unlock(); | 348 pipes.insert(p); | 349 accepting_pipes.insert(p); | 350 lock.Unlock(); | 351 return p; | 352 }
Pipe類內(nèi)部有一個(gè)Reader和Writer線程類,Pipe::start_reader()啟動(dòng)Pipe::Reader::entry(),最終啟動(dòng)Pipe::reader函數(shù)
134 void Pipe::start_reader() - 135 { | 136 assert(pipe_lock.is_locked()); | 137 assert(!reader_running); |- 138 if (reader_needs_join) { || 139 reader_thread.join(); || 140 reader_needs_join = false; || 141 } | 142 reader_running = true; | 143 reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes); | 144 }
|- 48 class Reader : public Thread { || 49 Pipe *pipe; || 50 public: || 51 explicit Reader(Pipe *p) : pipe(p) {} || 52 void *entry() { pipe->reader(); return 0; } || 53 } reader_thread;
在Pipe::reader函數(shù)中根據(jù)tag接收不同類型的消息,如果是CEPH_MSGR_TAG_MSG類型消息調(diào)用read_message接收消息,并把消息加入到mqueue中
void Pipe::reader() { pipe_lock.Lock(); if (state == STATE_ACCEPTING) { accept(); //第一次進(jìn)入此函數(shù)處理 assert(pipe_lock.is_locked()); } // loop. while (state != STATE_CLOSED && state != STATE_CONNECTING) { assert(pipe_lock.is_locked()); ...... ...... else if (tag == CEPH_MSGR_TAG_MSG) { ldout(msgr->cct,20) << "reader got MSG" << dendl; Message *m = 0; int r = read_message(&m, auth_handler.get()); pipe_lock.Lock(); if (!m) { if (r < 0) fault(true); continue; } ...... ...... ...... // note last received message. in_seq = m->get_seq(); cond.Signal(); // wake up writer, to ack this ldout(msgr->cct,10) << "reader got message " << m->get_seq() << " " << m << " " << *m << dendl; in_q->fast_preprocess(m); //mds 、mon不會(huì)進(jìn)入此函數(shù),預(yù)處理 if (delay_thread) { utime_t release; if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { release = m->get_recv_stamp(); release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl; } delay_thread->queue(release, m); } else { if (in_q->can_fast_dispatch(m)) { reader_dispatching = true; pipe_lock.Unlock(); in_q->fast_dispatch(m); pipe_lock.Lock(); reader_dispatching = false; if (state == STATE_CLOSED || notify_on_dispatch_done) { // there might be somebody waiting notify_on_dispatch_done = false; cond.Signal(); } } else { //mds進(jìn)入此else in_q->enqueue(m, m->get_priority(), conn_id); //把接收到的messenger加入到mqueue中 } } } ...... ...... } // reap? reader_running = false; reader_needs_join = true; unlock_maybe_reap(); ldout(msgr->cct,10) << "reader done" << dendl; }
在Pipe::DispatchQueue::enqueue函數(shù)中加入到mqueue中
void DispatchQueue::enqueue(Message *m, int priority, uint64_t id) { Mutex::Locker l(lock); ldout(cct,20) << "queue " << m << " prio " << priority << dendl; add_arrival(m); if (priority >= CEPH_MSG_PRIO_LOW) { mqueue.enqueue_strict( id, priority, QueueItem(m)); } else { mqueue.enqueue( id, priority, m->get_cost(), QueueItem(m)); } cond.Signal(); //喚醒dispatch_queue.start() 啟動(dòng)的dispatchThread,進(jìn)入entry進(jìn)行處理 }
看完了這篇文章,相信你對“如何實(shí)現(xiàn)ceph SimpleMessenger模塊消息的接收”有了一定的了解,如果想了解更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!