消息处理上篇-消息线程分析
P2P 网络的建立是在系统启动的第 12 步,最后时刻调用 CConnman::Start
方法开始的。
本部分内容在 net.cpp
、net_processing.cpp
等文件中。
终于,来到了我们非常非常关心比特币消息处理,通过比特币消息处理,我们会理解比特币的协义,理解比特币是如何同步区块,如何发送交易,从而建立起理解比特币的至关重要一步。
本部分内容是如此的重要,也是相当的长,所以我们分上下两部分来介绍具体的消息处理。
上篇主要以消息处理线程的分析为主,下篇以具体的比特币消息即比特币协义分析为主。
下面我们来看消息处理线程相关的代码。
ThreadMessageHandler
处理所有消息的线程。主体是一个 while
循环,退出条件为 flagInterruptMsgProc
为假,循环体如下:
生成一个对等节点容器对象
vNodesCopy
,类型为CNode*
。然后,设置其值为vNodes
。后者,代表当前节点保存的所有对等节点信息。遍历所有的
vNodesCopy
节点,调用当前节点的AddRef
方法,把对等节点的nRefCount
属性加1。遍历所有的
vNodesCopy
节点,进行如下处理:如果当前节点已断开连接,则处理下一个。
调用
PeerLogicValidation::ProcessMessages
方法,处理当前远程对等节点发送给本对等节点的消息。调用
PeerLogicValidation::SendMessages
方法,处理本对等节点发送给当前远程对等节点的消息。
遍历所有的
vNodesCopy
节点,调用当前节点的Release
方法,把对等节点的nRefCount
属性减1。
1、ProcessMessages
本方法主要处理对等节点接收到的相关消息,具体代码在 net_processing.cpp
文件中。
如果对等节点的已接收请求数据集合不为空,也就是保存别的对等节点请求数据的集合不为空,则调用
ProcessGetData
方法,处理别的对等获取数据的请求。vRecvGetData
属性是一个 inv 消息(CInv
)的双端队列。下面,我们来看
ProcessGetData
方法是怎样处理收到的消息。从对等节点的
vRecvGetData
集合中,取得其迭代器。std::deque::iterator it = pfrom->vRecvGetData.begin(); std::vector vNotFound;
生成一个
CNetMsgMaker
类型的对象msgMaker
。其nVersionIn
属性是对等节点已经发送的版本消息。遍历请求数据集合,如果请求数据的类型是交易或者见证隔离交易,进行如下的处理:
如果已经终止处理消息信号为真,则直接返回。如果当前节点处于暂停状态(缓冲区太慢而不能响应),则推出循环。
取得当前的
inv
消息。从mapRelay
集合中取得当前inv
消息。这个mapRelay
是一个 Map 集合,Key 是交易的哈希,Value 是指向交易的智能指针。如果查找的交易存在于集合中,则调用位于
net.cpp
文件中的CConnman::PushMessage
方法,发送交易;否则,如果节点最后一次MEMPOOL
请求存在,则从内存池中取得对应的交易信息,然后调用CConnman::PushMessage
方法,发送交易。如果没有达到请求数据集合尾部,且节点对象不是暂停状态,并且当前请求的
inv
消息类型是区块、过滤型区块、紧凑型区块、隔离见证型区块之一,则调用ProcessGetBlockData
方法处理区块数据。 遍历收到的所有数据,如果数据类型是交易或者见证隔离交易,进行如下的处理:ProcessGetBlockData
方法,因为处理过程比较长,所以其处理过程放在下面进行详细说明。清空已收到数据集合从起始部分到当前位置之间的
inv
数据。
如果对等节点已断开,则直接返回。
如果对等节点要处理的数据不为空,则直接返回。
这样维护了响应的顺序。
如果对等节点已暂停,则直接返回。
缓冲区太满而不能进行继续处理。
如果节点对象待处理的消息列表
vProcessMsg
为空,则返回假。否则,取出第一个消息对象,放入消息列表msgs
中,并待处理的消息列表中删除。将节点对象处理队列大小减去已删除的消息对象收到的数据长度与消息头部大小之和。根据节点对象处理队列大小与允许的接收上限比较,如果大于接收上限,则设置节点暂停接收消息。生成一个消息对象,设置其版本为对等节点已接收到的版本。
验证消息的
MESSAGESTART
是否有效。如果无效,则设置对等节点为断开,然后返回。从消息对象中取得消息头部,并进行验证。如果无效,则返回。
从消息对象中取得具体的命令和消息大小。
调用消息对象自身的哈希,并与消息头部的校验和进行验证。如果验证失败,则返回。
const uint256& hash = msg.GetMessageHash(); if (memcmp(hash.begin(), hdr.pchChecksum, CMessageHeader::CHECKSUMSIZE) != 0) { LogPrint(BCLog::NET, "%s(%s, %u bytes): CHECKSUM ERROR expected %s was %s\n", _func, SanitizeString(strCommand), nMessageSize, HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE), HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE)); return fMoreWork; }
调用
ProcessMessage
方法,进行消息处理。对于比特币网络来说,最最重要的方法来了。这个方法处理比特币的所有具体,比如:版本消息、获取区块消息等。
因为这个方法是如此的重要,所以我们把留在下一篇文章中进行说明。
调用
SendRejectsAndCheckIfBanned
方法,进行可能的reject
处理。具体如下:取得节点的状态对象。
如果启用 BIP61,则遍历状态对象中保存的
reject
,调用PushMessage
方法,发送reject
消息。清空状态对象中保存的
reject
.如果状态对象的禁止属性
fShouldBan
为真,则:设置禁止属性为假。
如果节点在白名单中,或者是手动连接的,则进行警告。否则,进行下面的处理:
设置对等节点的断开连接属性为真;如果节点的地址是本地地址,则进行警告,否则,调用
Ban
方法,禁止对等节点连接。
ProcessGetBlockData
生成一些内部变量,并设置为已缓存的变量值。
调用
LookupBlockIndex
方法,查询消息对应的区块索引。如果区块索引存在,并且索引对应的区块在链上的交易也存在,但区块还没有验证过,那么设置变量need_activate_chain
为真。const CBlockIndex* pindex = LookupBlockIndex(inv.hash); if (pindex) { if (pindex->nChainTx && !pindex->IsValid(BLOCK_VALID_SCRIPTS) && pindex->IsValid(BLOCK_VALID_TREE)) { need_activate_chain = true; } }
如果需要激活区块链,那么调用
ActivateBestChain
方法来激活。如果区块索引存在,调用
BlockRequestAllowed
方法检查是否允许发送数据。如果允许发送,且达到历史区块服务限额的情况下,断开节点连接,并设置发送标志为假。
如果允许发送,且节点不在白名单中,且节点支持的服务是
NODE_NETWORK_LIMITED
(只支持 288个区块,即2天内生成的区块),且不支持NODE_NETWORK
服务,且区块链栈顶的高度与当前区块索引的高度之差大于网络限制允许的最小区块数(NODE_NETWORK_LIMITED_MIN_BLOCKS
,288个区块)加上额外的两个区块(为了防止竞争,指的是分叉?所以增加两个区块缓冲),则断开节点连接,并设置发送标志为假。如果允许发送,并且这个区块索引的状态等于
BLOCK_HAVE_DATA
(全部数据都blk*.dat
文件中可用),则进行下面的处理:生成一个指向区块对象的智能指针对象
pblock
。如果最近区块对象存在,且其哈希与区块索引对象的哈希一样,那么设置
pblock
为最近区块对象;否则,如果消息对象类型是隔离见证区块,那么:调用
ReadRawBlockFromDisk
方法,从磁盘中读取原始的区块数据。如果可以读到,则调用PushMessage
方法,发送区块数据。这种情况下,直接发送了区块数据,所以不设置pblock
变量。否则,调用
ReadBlockFromDisk
方法,从磁盘中读取原始的区块数据。如果可以读到,则设置pblock
为读取到的数据。如果
pblock
为真,则发送消息。如果消息对象类型是区块,则调用
PushMessage
方法,发送标志为SERIALIZE_TRANSACTION_NO_WITNESS
的区块消息;如果消息类型是隔离见证区块,则调用
PushMessage
方法,发送区块消息;如果消息类型是过滤区块,则:如果对节对象的 Bloom 过滤器存在,那么生成默克尔区块,并设置发送过滤区块标志为真;如果发送过滤区块标志为真,则调用
PushMessage
方法,发送默克尔区块,然后调用PushMessage
方法,发送默克尔区块的每个交易数据,标志为SERIALIZE_TRANSACTION_NO_WITNESS
;如果消息类型是紧凑区块,同样调用
PushMessage
方法,发送区块消息。如果消息的哈希等于节点的继续发送属性(
hashContinue
属性,代表继续发送的哈希),则:生成一个
CInv
向量;然后,构造一个CInv
对象,类型为区块,哈希为区块链栈顶元素的哈希;然后,调用PushMessage
方法,发送inv
消息;最后,设置节点的继续发送属性为空。
代码如下:
2、SendMessages
本方法主要处理对等节点发送的相关逻辑,具体代码在 net_processing.cpp
文件中。
如果对等节点间还没有完成握手,或者已经断开连接,则返回。
如果对等节点的 Ping 已经被请求,则设置
pingSend
变量为真。如果对等节点没有期望 Pong 回复(即对等节点的
nPingNonceSent
等于0),且用户开始 ping 的时间(nPingUsecStart
)加上规定的节点 ping 间隔小于当前时间,则设置pingSend
变量为真。如果
pingSend
为真,处理如下:设置对等节点的 Ping 已经被请求假。
设置开始 ping 的时间为当前时间。
如果对等节点的版本大于 BIP 0031 规定的版本(60000),则设置对等节点的
nPingNonceSent
为随机变量nonce
,调用PushMessage
方法,发送ping
消息,消息中包括nonce
;否则,即对等节点不支持带随机数的 Ping 命令,则设置设置对等节点的nPingNonceSent
为0,调用PushMessage
方法,发送ping
消息,消息中不包括nonce
。
if (pingSend) { uint64_t nonce = 0; while (nonce == 0) { GetRandBytes((unsigned char*)&nonce, sizeof(nonce)); } pto->fPingQueued = false; pto->nPingUsecStart = GetTimeMicros(); if (pto->nVersion > BIP0031_VERSION) { pto->nPingNonceSent = nonce; connman->PushMessage(pto, msgMaker.Make(NetMsgType::PING, nonce)); } else { pto->nPingNonceSent = 0; connman->PushMessage(pto, msgMaker.Make(NetMsgType::PING)); } }
调用
SendRejectsAndCheckIfBanned
方法,进行可能的reject
处理。如果该函数返回为真,则返回。获取节点的状态对象。
如果当前没有在 IBD 下载中(
IsInitialBlockDownload
函数为假),且节点下次发送本地地址的时间(nNextLocalAddrSend
)小于当前时间,那么进行如下处理:调用
AdvertiseLocal
方法,发送我们自己本身的地址给对等节点。方法的主要逻辑是调用节点对象的
PushAddress
方法,把要发送的地址保存在vAddrToSend
集合中。调用
PoissonNextSend
方法,计算下次发送地址的时间,并设置节点的下次发送本地地址的时间为该值。
如果节点的下次发送地址时间小于当前时间,则:
调用
PoissonNextSend
方法,计算下次发送地址的时间,并保存为节点的下次发送地址时间nNextAddrSend
属性。生成一个地址向量集合
vAddr
。遍历节点待发送的地址向量
vAddrToSend
,如果当前地址不在节点的概率“跟踪最近插入的”集合addrKnown
中,则:保存当前地址到节点的概率“跟踪最近插入的”集合中。
保存地址到
vAddr
向量中。如果当前的地址向量集合数量大于等于 1000,则调用
PushMessage
方法,发送地址向量;然后清空地址向量集合。
清空节点的发送地址集合
vAddrToSend
。如果地址向量集合不空,即地址向量集合的数量不超过 1000个,则调用
PushMessage
方法,发送地址消息。如果节点的待发送的地址向量集合的预分配的内存空间(
capacity()
)大于40,则调用其shrink_to_fit
方法来缩减空间,即只允许发送一次大的地址包。
if (pto->nNextAddrSend < nNow) { pto->nNextAddrSend = PoissonNextSend(nNow, AVG_ADDRESS_BROADCAST_INTERVAL); std::vector vAddr; vAddr.reserve(pto->vAddrToSend.size()); for (const CAddress& addr : pto->vAddrToSend) { if (!pto->addrKnown.contains(addr.GetKey())) { pto->addrKnown.insert(addr.GetKey()); vAddr.push_back(addr); // receiver rejects addr messages larger than 1000 if (vAddr.size() >= 1000) { connman->PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr)); vAddr.clear(); } } } pto->vAddrToSend.clear(); if (!vAddr.empty()) connman->PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr)); if (pto->vAddrToSend.capacity() > 40) pto->vAddrToSend.shrink_to_fit(); }
接下来,开始区块同步。
如果指向最佳区块链头部的指针(
pindexBestHeader
)为空指针,则设置其为当前活跃区块链的栈顶元素指针。如果还没有从这个节点同步区块头部,并且节点的
fClient
、fImporting
、fReindex
等属性为假,进一步,如果已经同步的节点数量为 0 且需要获取区块数据,或者最佳区块头部的区块时间距离现在已超过 24 小时,那么调用PushMessage
方法,发出请求GETHEADERS
命令,开始同步区块头部。bool fFetch = state.fPreferredDownload || (nPreferredDownload == 0 && !pto->fClient && !pto->fOneShot); // Download if this is a nice peer, or we have no nice peers and this one might do. if (!state.fSyncStarted && !pto->fClient && !fImporting && !fReindex) { if ((nSyncStarted == 0 && fFetch) || pindexBestHeader->GetBlockTime() > GetAdjustedTime() - 24 60 60) { state.fSyncStarted = true; state.nHeadersSyncTimeout = GetTimeMicros() + HEADERS_DOWNLOAD_TIMEOUT_BASE + HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER (GetAdjustedTime() - pindexBestHeader->GetBlockTime())/(consensusParams.nPowTargetSpacing); nSyncStarted++; const CBlockIndex pindexStart = pindexBestHeader; if (pindexStart->pprev) pindexStart = pindexStart->pprev; LogPrint(BCLog::NET, "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->GetId(), pto->nStartingHeight); connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexStart), uint256())); } }
如果当前不是重建索引、重新导入和 IBD 下载期间,则重新发送尚未进入区块的钱包交易。
如果不需要转化为
Inv
消息,那么进行如下处理:遍历区块头部进行区块公告的集合(
vBlockHashesToAnnounce
),按如下处理:调用
LookupBlockIndex
方法,查找当前对应的区块索引。如果当前活跃区块链上没有这个索引,那么设置需要转化为
Inv
消息为真,然后退出当前循环。如果最佳索引不是空指针,并且当前区块索引不等于最佳指针,那么设置需要转化为
Inv
消息为真,然后退出当前循环。接下来,设置当前区块索引为最佳索引,处理哪些区块索引可以放入头部集合。
如果不需要转化成
Inv
消息,并且区块头部集合(vHeaders
)不空,进行下面的处理:如果区块头部长度为1,并且需要下载头部和ID,那么:
生成发送标志,如果对等节点想要紧凑的隔离见证类型,则设置发送标志为 0,否则设置为
SERIALIZE_TRANSACTION_NO_WITNESS
。如果最近的区块哈希与最佳区块索引的哈希相等,则调用
PushMessage
方法,发送消息,类型为CMPCTBLOCK
,然后设置区块是从缓存区中加载的标志为真。如果区块不是从缓存区中加载的,那么就需要调用
ReadBlockFromDisk
方法,从硬盘中加载区块,然后再调用PushMessage
方法,发送消息,类型为CMPCTBLOCK
。否则,如果不是优先下载头部(即区块状态对象
fPreferHeaders
)为假,那么就调用PushMessage
方法,发送消息,类型为HEADERS
,然后设置区块状态对象的pindexBestHeaderSent
属性为当前的最佳索引区块。
具体代码如下:
如果需要转化成
Inv
消息,进一步,如果使用区块头部进行区块公告的集合(vBlockHashesToAnnounce
),则进行如下处理:返回集合最后一个元素,调用
LookupBlockIndex
方法,查找这个元素对应的区块索引。如果活跃区块链在区块索引指定的高度上对应的索引不是我们找到的索引,即要公告的区块不在主链上,则打印一个警告。
如果这个区块不在节点的区块链上,那么就把这个区块放在区块库存清单中。
生成一个
Inv
消息,类型是区块,然后调用节点对象的PushInventory
方法,放入节点对象的库存清单集合中。
清空节点对象的区块头部进行区块公告的集合。
生成
vInv
集合,并设置其长度。遍历已经公告的区块 ID 列表,如果没有达到
vInv
集合的最大长度,则加入集合尾部,如果已经达到则调用PushMessage
方法,发送INV
消息。然后,清空已经公告的区块 ID 列表。如果节点对象下次发送
Inv
消息的时间已经小于当前时间,那么设置fSendTrickle
变量为真,根据是否为入门节点,设置不同的节点对象下次发送Inv
消息。如果发送时间已到,但是节点请求我们不要发送中继交易,那么清空节点对象的发送
Inv
消息的集合集合setInventoryTxToSend
。如果发送时间已到,并且节点请求过 BIP35 规定的
mempool
,那么:调用内存池对象的
infoAll
方法,返回内存池交易信息集合。设置区块对象的
mempool
为假。获取节点设置的最小交易费用过滤器。默认为 0。
遍历内存池交易信息集合并进行处理。
用当前交易信息生成一个 inv 对象,然后从区块对象的
setInventoryTxToSend
集合中删除对应的交易信息。如果设置了最小交易费用,并且当前交易的费用小于设置的最小费用,那么处理下一个。如果区块对象设置了布隆过滤器,并且当前交易不符合要求,那么处理下一个。
把当前交易的哈希加入区块对象的
filterInventoryKnown
集合;把 inv 对象加入vInv
集合。如果集合已经达到规定的最大数量,那么调用PushMessage
方法,发送INV
消息给远程对等节点,然后清空集合。设置区块对象的
timeLastMempoolReq
属性。
如果需要发送,即发送时间已到,那么:
生成交易的向量集合,并设置其大小。然后从区块对象的
setInventoryTxToSend
集合中取得其迭代器放进新生成的向量集合。设置区块对象的最小交易费用,默认为0。
如果
vInvTx
集合不空,并且需要中继的交易数量小于规定的最大 INV 广播数量,那就进行while
循环。下面是循环体:
如果
vInv
集合不空,那么就调用PushMessage
方法,发送INV
消息。如果区块状态对象的停止下载区块的时间不等于0,并且小于当前时间减去规定的时间,那么设置区块对象为断开,然后返回真。
如果区块下载超时,那么设置区块对象为断开,然后返回真。
接下来,检查区块头部下载是否超时。
获取所有节点请求的数据,包括区块与非区块,并放在
vGetData
集合中,然后调用PushMessage
方法,发送给远程节点。
Last updated
Was this helpful?