作者:freewind
比原項(xiàng)目倉庫:https://github.com/Bytom/bytom
在前一篇中,我們說到,當(dāng)比原向其它節(jié)點(diǎn)請求區(qū)塊數(shù)據(jù)時(shí),BlockKeeper
會(huì)發(fā)送一個(gè)BlockRequestMessage
把需要的區(qū)塊height
告訴對方,并把該信息對應(yīng)的二進(jìn)制數(shù)據(jù)放入ProtocolReactor
對應(yīng)的sendQueue
通道中,等待發(fā)送。而具體的發(fā)送細(xì)節(jié),由于邏輯比較復(fù)雜,所以在前一篇中并未詳解,放到本篇中。
網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)!專注于網(wǎng)頁設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、微信小程序開發(fā)、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了盈江免費(fèi)建站歡迎大家使用!
由于sendQueue
是一個(gè)通道,數(shù)據(jù)放進(jìn)去后,到底是由誰在什么情況下取走并發(fā)送,BlockKeeper
這邊是不知道的。經(jīng)過我們在代碼中搜索,發(fā)現(xiàn)只有一個(gè)類型會(huì)直接監(jiān)視sendQueue
中的數(shù)據(jù),它就是前文出現(xiàn)的MConnection
。MConnection
的對象在它的OnStart
方法中,會(huì)監(jiān)視sendQueue
中的數(shù)據(jù),然后,等發(fā)現(xiàn)數(shù)據(jù)時(shí),會(huì)將之取走并放入一個(gè)叫sending
的通道里。
事情變得有點(diǎn)復(fù)雜了:
MConnection
對應(yīng)了一個(gè)與peer的連接,而比原節(jié)點(diǎn)之間建立連接的情況又有多種:比如主動(dòng)連接別的節(jié)點(diǎn),或者別的節(jié)點(diǎn)主動(dòng)連上我sending
之后,我們還需要知道又是誰在什么情況下會(huì)監(jiān)視sending
,取走它里面的數(shù)據(jù)sending
中的數(shù)據(jù)被取走后,又是如何被發(fā)送到其它節(jié)點(diǎn)的呢?還是像以前一樣,遇到復(fù)雜的問題,我們先通過“相互獨(dú)立,完全窮盡”的原則,把它分解成一個(gè)個(gè)小問題,然后依次解決。
那么首先我們需要弄清楚的是:
MConnection
的對象并調(diào)用其OnStart
方法?(從而我們知道sendQueue
中的數(shù)據(jù)是如何被監(jiān)視的)
經(jīng)過分析,我們發(fā)現(xiàn)MConnection
的啟動(dòng),只出現(xiàn)在一個(gè)地方,即Peer
的OnStart
方法中。那么就這個(gè)問題就變成了:比原在什么情況下,會(huì)創(chuàng)建Peer
的對象并調(diào)用其OnStart
方法?
再經(jīng)過一番折騰,終于確定,在比原中,在下列4種情況Peer.OnStart
方法最終會(huì)被調(diào)用:
addrbook.json
中保存的節(jié)點(diǎn)的時(shí)候PEXReactor
,并使用它自己的協(xié)議與當(dāng)前連接上的節(jié)點(diǎn)進(jìn)行通信的時(shí)候Switch.Connect2Switches
方法中(可忽略)第4種情況我們完全忽略。第3種情況中,由于PEXReactor
會(huì)使用類似于BitTorrent的文件分享協(xié)議與其它節(jié)點(diǎn)分享數(shù)據(jù),邏輯比較獨(dú)立,算是一種輔助作用,我們也暫不考慮。這樣我們就只需要分析前兩種情況了。
MConnection.OnStart
方法的?首先我們快速走到SyncManager.Start
方法:
cmd/bytomd/main.go#L54
func main() {
cmd := cli.PrepareBaseCmd(commands.RootCmd, "TM", os.ExpandEnv(config.DefaultDataDir()))
cmd.Execute()
}
cmd/bytomd/commands/run_node.go#L41
func runNode(cmd *cobra.Command, args []string) error {
n := node.NewNode(config)
if _, err := n.Start(); err != nil {
// ...
}
node/node.go#L169
func (n *Node) OnStart() error {
// ...
n.syncManager.Start()
// ...
}
netsync/handle.go#L141
func (sm *SyncManager) Start() {
go sm.netStart()
// ...
}
然后我們將進(jìn)入netStart()
方法。在這個(gè)方法中,比原將主動(dòng)連接其它節(jié)點(diǎn):
func (sm *SyncManager) netStart() error {
// ...
if sm.config.P2P.Seeds != "" {
// dial out
seeds := strings.Split(sm.config.P2P.Seeds, ",")
if err := sm.DialSeeds(seeds); err != nil {
return err
}
}
return nil
}
這里出現(xiàn)的sm.config.P2P.Seeds
,對應(yīng)的就是本地?cái)?shù)據(jù)目錄中config.toml
中的p2p.seeds
中的種子結(jié)點(diǎn)。
接著通過sm.DialSeeds
去主動(dòng)連接每個(gè)種子:
netsync/handle.go#L229-L231
func (sm *SyncManager) DialSeeds(seeds []string) error {
return sm.sw.DialSeeds(sm.addrBook, seeds)
}
p2p/switch.go#L311-L340
func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
// ...
for i := 0; i < len(perm)/2; i++ {
j := perm[i]
sw.dialSeed(netAddrs[j])
}
// ...
}
p2p/switch.go#L342-L349
func (sw *Switch) dialSeed(addr *NetAddress) {
peer, err := sw.DialPeerWithAddress(addr, false)
// ...
}
p2p/switch.go#L351-L392
func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
// ...
peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
// ...
err = sw.AddPeer(peer)
// ...
}
先是通過newOutboundPeerWithConfig
創(chuàng)建了peer
,然后把它加入到sw
(即Switch
對象)中。
p2p/switch.go#L226-L275
func (sw *Switch) AddPeer(peer *Peer) error {
// ...
// Start peer
if sw.IsRunning() {
if err := sw.startInitPeer(peer); err != nil {
return err
}
}
// ...
}
在sw.startInitPeer
中,將會(huì)調(diào)用peer.Start
:
p2p/switch.go#L300-L308
func (sw *Switch) startInitPeer(peer *Peer) error {
peer.Start()
// ...
}
而peer.Start
對應(yīng)了Peer.OnStart
,最后就是:
p2p/peer.go#L207-L211
func (p *Peer) OnStart() error {
p.BaseService.OnStart()
_, err := p.mconn.Start()
return err
}
可以看到,在這里調(diào)用了mconn.Start
,終于找到了??偨Y(jié)一下就是:
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.DialSeeds
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
那么,第一種主動(dòng)連接別的節(jié)點(diǎn)的情況就到這里分析完了。下面是第二種情況:
MConnection.OnStart
方法這一步的?比原節(jié)點(diǎn)啟動(dòng)后,會(huì)監(jiān)聽本地的p2p端口,等待別的節(jié)點(diǎn)連接上來。那么這個(gè)流程又是什么樣的呢?
由于比原節(jié)點(diǎn)的啟動(dòng)流程在目前的文章中已經(jīng)多次出現(xiàn),這里就不貼了,我們直接從Switch.OnStart
開始(它是在SyncManager
啟動(dòng)的時(shí)候啟動(dòng)的):
p2p/switch.go#L186-L185
func (sw *Switch) OnStart() error {
// ...
for _, peer := range sw.peers.List() {
sw.startInitPeer(peer)
}
// Start listeners
for _, listener := range sw.listeners {
go sw.listenerRoutine(listener)
}
// ...
}
這個(gè)方法經(jīng)過省略以后,還剩兩塊代碼,一塊是startInitPeer(...)
,一塊是sw.listenerRoutine(listener)
。
如果你剛才在讀前一節(jié)時(shí)留意了,就會(huì)發(fā)現(xiàn),startInitPeer(...)
方法馬上就會(huì)調(diào)用Peer.Start
。然而在這里需要說明的是,經(jīng)過我的分析,發(fā)現(xiàn)這塊代碼實(shí)際上沒有起到任何作用,因?yàn)樵诋?dāng)前這個(gè)時(shí)刻,sw.peers
總是空的,它里面還沒有來得及被其它的代碼添加進(jìn)peer。所以我覺得它可以刪掉,以免誤導(dǎo)讀者。(提了一個(gè)issue,參見#902)
第二塊代碼,listenerRoutine
,如果你還有印象的話,它就是用來監(jiān)聽本地p2p端口的,在前面“比原是如何監(jiān)聽p2p端口的”一文中有詳細(xì)的講解。
我們今天還是需要再挖掘一下它,看看它到底是怎么走到MConnection.OnStart
的:
p2p/switch.go#L498-L536
func (sw *Switch) listenerRoutine(l Listener) {
for {
inConn, ok := <-l.Connections()
// ...
err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
// ...
}
}
這里的l
就是監(jiān)聽本地p2p端口的Listener。通過一個(gè)for
循環(huán),拿到連接到該端口的節(jié)點(diǎn)的連接,生成新peer。
func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
// ...
peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
// ...
if err = sw.AddPeer(peer); err != nil {
// ...
}
// ...
}
生成新的peer之后,調(diào)用了Switch
的AddPeer
方法。到了這里,就跟前一節(jié)一樣了,在AddPeer
中將調(diào)用sw.startInitPeer(peer)
,然后調(diào)用peer.Start()
,最后調(diào)用了MConnection.OnStart()
。由于代碼一模一樣,就不貼出來了。
總結(jié)一下,就是:
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.OnStart
-> Switch.listenerRoutine
-> Switch.addPeerWithConnectionAndConfig
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
那么,第二種情況我們也分析完了。
不過到目前為止,我們只解決了這次問題中的第一個(gè)小問題,即:我們終于知道了比原代碼會(huì)在什么情況來啟動(dòng)一個(gè)MConnection
,從而監(jiān)視sendQueue
通道,把要發(fā)送的信息數(shù)據(jù),轉(zhuǎn)到了sending
通道中。
那么,我們進(jìn)入下一個(gè)小問題:
sending
之后,誰又會(huì)來取走它們呢?經(jīng)過分析之后,發(fā)現(xiàn)通道sendQueue
和sending
都屬于類型Channel
,只不過兩者作用不同。sendQueue
是用來存放待發(fā)送的完整的信息數(shù)據(jù),而sending
更底層一些,它持有的數(shù)據(jù)可能會(huì)被分成多個(gè)塊發(fā)送。如果只有sendQueue
一個(gè)通道,那么很難實(shí)現(xiàn)分塊的操作的。
而Channel
的發(fā)送是由MConnection
來調(diào)用的,幸運(yùn)的是,當(dāng)我們一直往回追溯下去,發(fā)現(xiàn)竟走到了MConnection.OnStart
這里。也就是說,我們在這個(gè)小問題中,研究的正好是前面兩個(gè)鏈條后面的部分:
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.DialSeeds
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
-> ???
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.OnStart
-> Switch.listenerRoutine
-> Switch.addPeerWithConnectionAndConfig
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
-> ???
也就是上面的???
部分。
那么我們就直接從MConnection.OnStart
開始:
p2p/connection.go#L152-L159
func (c *MConnection) OnStart() error {
// ...
go c.sendRoutine()
// ...
}
c.sendRoutine()
方法就是我們需要的。當(dāng)MConnection
啟動(dòng)以后,就會(huì)開始進(jìn)行發(fā)送操作(等待數(shù)據(jù)到來)。它的代碼如下:
p2p/connection.go#L289-L343
func (c *MConnection) sendRoutine() {
// ...
case <-c.send:
// Send some msgPackets
eof := c.sendSomeMsgPackets()
if !eof {
// Keep sendRoutine awake.
select {
case c.send <- struct{}{}:
default:
}
}
}
// ...
}
這個(gè)方法本來很長,只是我們省略掉了很多無關(guān)的代碼。里面的c.sendSomeMsgPackets()
就是我們要找的,但是,我們突然發(fā)現(xiàn),怎么又出來了一個(gè)c.send
通道?它又有什么用?而且看起來好像只有當(dāng)這個(gè)通道里有東西的時(shí)候,我們才會(huì)去調(diào)用c.sendSomeMsgPackets()
,似乎像是一個(gè)鈴鐺一樣用來提醒我們。
那么c.send
什么時(shí)候會(huì)有東西呢?檢查了代碼之后,發(fā)現(xiàn)在以下3個(gè)地方:
p2p/connection.go#L206-L239
func (c *MConnection) Send(chID byte, msg interface{}) bool {
// ...
success := channel.sendBytes(wire.BinaryBytes(msg))
if success {
// Wake up sendRoutine if necessary
select {
case c.send <- struct{}{}:
// ..
}
p2p/connection.go#L243-L271
func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
// ...
ok = channel.trySendBytes(wire.BinaryBytes(msg))
if ok {
// Wake up sendRoutine if necessary
select {
case c.send <- struct{}{}:
// ...
}
p2p/connection.go#L289-L343
func (c *MConnection) sendRoutine() {
// ....
case <-c.send:
// Send some msgPackets
eof := c.sendSomeMsgPackets()
if !eof {
// Keep sendRoutine awake.
select {
case c.send <- struct{}{}:
// ...
}
如果我們對前一篇文章還有印象,就會(huì)記得channel.trySendBytes
是在我們想給對方節(jié)點(diǎn)發(fā)信息時(shí)調(diào)用的,調(diào)用完以后,它會(huì)把信息對應(yīng)的二進(jìn)制數(shù)據(jù)放入到channel.sendQueue
通道(所以才有了本文)。channel.sendBytes
我們目前雖然還沒用到,但是它也應(yīng)該是類似的。在它們兩個(gè)調(diào)用完之后,它們都會(huì)向c.send
通道里放入一個(gè)數(shù)據(jù),用來通知Channel
有數(shù)據(jù)可以發(fā)送了。
而第三個(gè)sendRoutine()
就是我們剛剛走到的地方。當(dāng)我們調(diào)用c.sendSomeMsgPackets()
發(fā)送了sending
中的一部分之后,如果還有剩余的,則繼續(xù)向c.send
放個(gè)數(shù)據(jù),提醒可以繼續(xù)發(fā)送。
那到目前為止,發(fā)送數(shù)據(jù)涉及到的Channel就有三個(gè)了,分別是sendQueue
、sending
和send
。之所以這么復(fù)雜,根本原因就是想把數(shù)據(jù)分塊發(fā)送。
為什么要分塊發(fā)送呢?這是因?yàn)楸仍M芸刂瓢l(fā)送速率,讓節(jié)點(diǎn)之間的網(wǎng)速能保持在一個(gè)合理的水平。如果不限制的話,一下子發(fā)出大量的數(shù)據(jù),一是可能會(huì)讓接收者來不及處理,二是有可能會(huì)被惡意節(jié)點(diǎn)利用,請求大量區(qū)塊數(shù)據(jù)把帶寬占滿。
擔(dān)心sendQueue
、sending
和send
這三個(gè)通道不太好理解,我想到了一個(gè)“燒鴨店”的比喻,來理解它們:
sendQueue
就像是用來掛烤好的燒鴨的勾子,可以有多個(gè)(但對于比原來說,默認(rèn)只有一個(gè),因?yàn)?code>sendQueue的容量默認(rèn)為1
),當(dāng)有燒鴨烤好以后,就掛在勾子上;sending
是砧板,可以把燒鴨從sendQueue
勾子上取下來一只,放在上面切成塊,等待裝盤,一只燒鴨可能可以裝成好幾盤;send
是鈴鐺,當(dāng)有人點(diǎn)單后,服務(wù)員就會(huì)按一下鈴鐺,廚師就從sending
砧板上拿幾塊燒鴨放在小盤中放在出餐口。由于廚師非常忙,每次切出一盤后都可能會(huì)去做別的事情,而忘了sending
砧板上還有燒鴨沒裝盤,所以為了防止自己忘記,他每切出一盤之后,都會(huì)看一眼sending
砧板,如果還有肉,就會(huì)按一下鈴鐺提醒自己繼續(xù)裝盤。好了,理解了send
后,我們就可以回到主線,繼續(xù)看c.sendSomeMsgPackets()
的代碼了:
p2p/connection.go#L347-L360
func (c *MConnection) sendSomeMsgPackets() bool {
// Block until .sendMonitor says we can write.
// Once we're ready we send more than we asked for,
// but amortized it should even out.
c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
// Now send some msgPackets.
for i := 0; i < numBatchMsgPackets; i++ {
if c.sendMsgPacket() {
return true
}
}
return false
}
c.sendMonitor.Limit
的作用是限制發(fā)送速率,其中maxMsgPacketTotalSize
即每個(gè)packet的最大長度為常量10240
,第二個(gè)參數(shù)是預(yù)先指定的發(fā)送速率,默認(rèn)值為500KB/s
,第三個(gè)參數(shù)是說,當(dāng)實(shí)際速度過大時(shí),是否暫停發(fā)送,直到變得正常。
經(jīng)過限速的調(diào)整后,后面一段就可以正常發(fā)送數(shù)據(jù)了,其中的c.sendMsgPacket
是我們繼續(xù)要看的方法:
p2p/connection.go#L363-L398
func (c *MConnection) sendMsgPacket() bool {
// ...
n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
// ..
c.sendMonitor.Update(int(n))
// ...
return false
}
這個(gè)方法最前面我省略了一大段代碼,其作用是檢查多個(gè)channel,結(jié)合它們的優(yōu)先級和已經(jīng)發(fā)的數(shù)據(jù)量,找到當(dāng)前最需要發(fā)送數(shù)據(jù)的那個(gè)channel,記為leastChannel
。
然后就是調(diào)用leastChannel.writeMsgPacketTo(c.bufWriter)
,把當(dāng)前要發(fā)送的一塊數(shù)據(jù),寫到bufWriter
中。這個(gè)bufWriter
就是真正與連接對象綁定的一個(gè)緩存區(qū),寫入到它里面的數(shù)據(jù),會(huì)被Go發(fā)送出去。它的定義是在創(chuàng)建MConnection
的地方:
p2p/connection.go#L114-L118
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
mconn := &MConnection{
conn: conn,
bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
其中minReadBufferSize
為1024
,minWriteBufferSize
為65536
。
數(shù)據(jù)寫到bufWriter
以后,我們就不需要關(guān)心了,交給Go來操作了。
在leastChannel.writeMsgPacketTo(c.bufWriter)
調(diào)用完以后,后面會(huì)更新c.sendMonitor
,這樣它才能繼續(xù)正確的限速。
這時(shí)我們已經(jīng)知道數(shù)據(jù)是怎么發(fā)出去的了,但是我們還沒有找到是誰在監(jiān)視sending
里的數(shù)據(jù),那讓我們繼續(xù)看leastChannel.writeMsgPacketTo
:
p2p/connection.go#L655-L663
func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
packet := ch.nextMsgPacket()
wire.WriteByte(packetTypeMsg, w, &n, &err)
wire.WriteBinary(packet, w, &n, &err)
if err == nil {
ch.recentlySent += int64(n)
}
return
}
其中的ch.nextMsgPacket()
是取出下一個(gè)要發(fā)送的數(shù)據(jù)塊,那么是從哪里取出呢?是從sending
嗎?
其后的代碼是把數(shù)據(jù)塊對象變成二進(jìn)制,放入到前面的bufWriter
中發(fā)送。
繼續(xù)ch.nextMsgPacket()
:
p2p/connection.go#L638-L651
func (ch *Channel) nextMsgPacket() msgPacket {
packet := msgPacket{}
packet.ChannelID = byte(ch.id)
packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
if len(ch.sending) <= maxMsgPacketPayloadSize {
packet.EOF = byte(0x01)
ch.sending = nil
atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
} else {
packet.EOF = byte(0x00)
ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
}
return packet
}
終于看到sending
了。從這里可以看出,sending
的確是放著很多塊鴨肉的砧板,而packet
就是一個(gè)小盤,所以需要從先sending
中拿出不超過指定長度的數(shù)據(jù)放到packet
中,然后判斷sending
里還有沒有剩下的。如果有,則packet
的EOF
值為0x00
,否則為0x01
,這樣調(diào)用者就知道數(shù)據(jù)有沒有發(fā)完,還需不需要去按那個(gè)叫send
的鈴。
那么到這里為止,我們就知道原來還是Channel自己在關(guān)注sending
,并且為了限制發(fā)送速度,需要把它切成一個(gè)個(gè)小塊。
最后就我們的第三個(gè)小問題了,其實(shí)我們剛才在第二問里已經(jīng)弄清楚了。
sending
中的數(shù)據(jù)被取走后,又是如何被發(fā)送到其它節(jié)點(diǎn)的呢?答案就是,sending
中的數(shù)據(jù)被分成一塊塊取出來后,會(huì)放入到bufWriter
中,就直接被Go的net.Conn
對象發(fā)送出去了。到這一層面,就不需要我們再繼續(xù)深入了。
由于本篇中涉及的方法調(diào)用比較多,可能看完都亂了,所以在最后,我們前面調(diào)用鏈補(bǔ)充完整,放在最后:
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.DialSeeds
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
-> ...
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.OnStart
-> Switch.listenerRoutine
-> Switch.addPeerWithConnectionAndConfig
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
-> ...
然后是:
MConnection.sendRoutine
-> MConnection.send
-> MConnection.sendSomeMsgPackets
-> MConnection.sendMsgPacket
-> MConnection.writeMsgPacketTo
-> MConnection.nextMsgPacket
-> MConnection.sending
到了最后,我的感覺就是,一個(gè)復(fù)雜問題最開始看起來很可怕,但是一旦把它分解成小問題之后,每次只關(guān)注一個(gè),各個(gè)擊破,好像就沒那么復(fù)雜了。