套接字的处理
ThreadSocketHandler
if (!fNetworkActive) { // Disconnect any connected nodes for (CNode* pnode : vNodes) { if (!pnode->fDisconnect) { LogPrint(BCLog::NET, "Network not active, dropping peer=%d\n", pnode->GetId()); pnode->fDisconnect = true; } } }void CNode::CloseSocketDisconnect() { fDisconnect = true; LOCK(cs_hSocket); if (hSocket != INVALID_SOCKET) { LogPrint(BCLog::NET, "disconnecting peer=%d\n", id); CloseSocket(hSocket); } }
void CConnman::DeleteNode(CNode* pnode) { assert(pnode); bool fUpdateConnectionTime = false; m_msgproc->FinalizeNode(pnode->GetId(), fUpdateConnectionTime); if(fUpdateConnectionTime) { addrman.Connected(pnode->addr); } delete pnode; }int static EraseOrphanTx(uint256 hash) EXCLUSIVE_LOCKS_REQUIRED(g_cs_orphans) { std::map<uint256, COrphanTx>::iterator it = mapOrphanTransactions.find(hash); if (it == mapOrphanTransactions.end()) return 0; for (const CTxIn& txin : it->second.tx->vin) { std::set<std::map<uint256, COrphanTx>::iterator, IteratorComparator> auto itPrev = mapOrphanTransactionsByPrev.find(txin.prevout); if (itPrev == mapOrphanTransactionsByPrev.end()) continue; itPrev->second.erase(it); if (itPrev->second.empty()) mapOrphanTransactionsByPrev.erase(itPrev); } mapOrphanTransactions.erase(it); return 1; }
{ LOCK(cs_vNodes); vNodesSize = vNodes.size(); } if(vNodesSize != nPrevNodeCount) { nPrevNodeCount = vNodesSize; if(clientInterface) clientInterface->NotifyNumConnectionsChanged(nPrevNodeCount); }struct timeval timeout; timeout.tv_sec = 0; timeout.tv_usec = 50000; // frequency to poll pnode->vSend fd_set fdsetRecv; fd_set fdsetSend; fd_set fdsetError; FD_ZERO(&fdsetRecv); FD_ZERO(&fdsetSend); FD_ZERO(&fdsetError);for (const ListenSocket& hListenSocket : vhListenSocket) { FD_SET(hListenSocket.socket, &fdsetRecv); hSocketMax = std::max(hSocketMax, hListenSocket.socket); have_fds = true; }for (CNode* pnode : vNodes) { bool select_recv = !pnode->fPauseRecv; bool select_send; { LOCK(pnode->cs_vSend); select_send = !pnode->vSendMsg.empty(); } LOCK(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) continue; FD_SET(pnode->hSocket, &fdsetError); hSocketMax = std::max(hSocketMax, pnode->hSocket); have_fds = true; if (select_send) { FD_SET(pnode->hSocket, &fdsetSend); continue; } if (select_recv) { FD_SET(pnode->hSocket, &fdsetRecv); } }int nSelect = select(have_fds ? hSocketMax + 1 : 0,&fdsetRecv, &fdsetSend, &fdsetError, &timeout);if (nSelect == SOCKET_ERROR) { if (have_fds) { int nErr = WSAGetLastError(); LogPrintf("socket select error %s\n", NetworkErrorString(nErr)); for (unsigned int i = 0; i <= hSocketMax; i++) FD_SET(i, &fdsetRecv); } FD_ZERO(&fdsetSend); FD_ZERO(&fdsetError); if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000))) return; }for (const ListenSocket& hListenSocket : vhListenSocket) { if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv)) { AcceptConnection(hListenSocket); } }int nMaxInbound = nMaxConnections - (nMaxOutbound + nMaxFeeler);if (hSocket != INVALID_SOCKET) { if (!addr.SetSockAddr((const struct sockaddr*)&sockaddr)) { LogPrintf("Warning: Unknown socket family\n"); } }{ LOCK(cs_vNodes); for (const CNode* pnode : vNodes) { if (pnode->fInbound) nInbound++; } }if (hSocket == INVALID_SOCKET) { int nErr = WSAGetLastError(); if (nErr != WSAEWOULDBLOCK) LogPrintf("socket error accept failed: %s\n", NetworkErrorString(nErr)); return; }if (!fNetworkActive) { LogPrintf("connection from %s dropped: not accepting new connections\n", addr.ToString()); CloseSocket(hSocket); return; }if (!IsSelectableSocket(hSocket)) { LogPrintf("connection from %s dropped: non-selectable socket\n", addr.ToString()); CloseSocket(hSocket); return; }if (nInbound >= nMaxInbound) { if (!AttemptToEvictConnection()) { LogPrint(BCLog::NET, "failed to find an eviction candidate - connection dropped (full)\n"); CloseSocket(hSocket); return; } }NodeId id = GetNewNodeId(); uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize(); CAddress addr_bind = GetBindAddress(hSocket); CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, addr_bind, "", true); pnode->AddRef(); pnode->fWhitelisted = whitelisted; m_msgproc->InitializeNode(pnode); LogPrint(BCLog::NET, "connection from %s accepted\n", addr.ToString()); { LOCK(cs_vNodes); vNodes.push_back(pnode); }
std::vector<CNode*> vNodesCopy; { LOCK(cs_vNodes); vNodesCopy = vNodes; for (CNode* pnode : vNodesCopy) pnode->AddRef(); }bool recvSet = false; bool sendSet = false; bool errorSet = false; { LOCK(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) continue; recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv); sendSet = FD_ISSET(pnode->hSocket, &fdsetSend); errorSet = FD_ISSET(pnode->hSocket, &fdsetError); }char pchBuf[0x10000]; int nBytes = 0; { LOCK(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) continue; nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); }if (recvSet || errorSet) { // typical socket buffer is 8K-64K char pchBuf[0x10000]; int nBytes = 0; { LOCK(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) continue; nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); } if (nBytes > 0) { bool notify = false; if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); RecordBytesRecv(nBytes); if (notify) { size_t nSizeAdded = 0; auto it(pnode->vRecvMsg.begin()); for (; it != pnode->vRecvMsg.end(); ++it) { if (!it->complete()) break; nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; } { LOCK(pnode->cs_vProcessMsg); pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); pnode->nProcessQueueSize += nSizeAdded; pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; } WakeMessageHandler(); } } else if (nBytes == 0) { // socket closed gracefully if (!pnode->fDisconnect) { LogPrint(BCLog::NET, "socket closed\n"); } pnode->CloseSocketDisconnect(); } else if (nBytes < 0) { // error int nErr = WSAGetLastError(); if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) { if (!pnode->fDisconnect) LogPrintf("socket recv error %s\n", NetworkErrorString(nErr)); pnode->CloseSocketDisconnect(); } } }if (sendSet) { LOCK(pnode->cs_vSend); size_t nBytes = SocketSendData(pnode); if (nBytes) { RecordBytesSent(nBytes); } }size_t CConnman::SocketSendData(CNode *pnode) const { auto it = pnode->vSendMsg.begin(); size_t nSentSize = 0; while (it != pnode->vSendMsg.end()) { const auto &data = *it; assert(data.size() > pnode->nSendOffset); int nBytes = 0; { LOCK(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) break; nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); } if (nBytes > 0) { pnode->nLastSend = GetSystemTimeInSeconds(); pnode->nSendBytes += nBytes; pnode->nSendOffset += nBytes; nSentSize += nBytes; if (pnode->nSendOffset == data.size()) { pnode->nSendOffset = 0; pnode->nSendSize -= data.size(); pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize; it++; } else { // could not send full message; stop sending more break; } } else { if (nBytes < 0) { // error int nErr = WSAGetLastError(); if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) { LogPrintf("socket send error %s\n", NetworkErrorString(nErr)); pnode->CloseSocketDisconnect(); } } // couldn't send anything at all break; } } if (it == pnode->vSendMsg.end()) { assert(pnode->nSendOffset == 0); assert(pnode->nSendSize == 0); } pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it); return nSentSize; }
int64_t nTime = GetSystemTimeInSeconds(); if (nTime - pnode->nTimeConnected > 60) { if (pnode->nLastRecv == 0 || pnode->nLastSend == 0) { LogPrint(BCLog::NET, "socket no message in first 60 seconds, %d %d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->GetId()); pnode->fDisconnect = true; } else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL) { LogPrintf("socket sending timeout: %is\n", nTime - pnode->nLastSend); pnode->fDisconnect = true; } else if (nTime - pnode->nLastRecv > (pnode->nVersion > BIP0031_VERSION ? TIMEOUT_INTERVAL : 90*60)) { LogPrintf("socket receive timeout: %is\n", nTime - pnode->nLastRecv); pnode->fDisconnect = true; } else if (pnode->nPingNonceSent && pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 < GetTimeMicros()) { LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart)); pnode->fDisconnect = true; } else if (!pnode->fSuccessfullyConnected) { LogPrint(BCLog::NET, "version handshake timeout from %d\n", pnode->GetId()); pnode->fDisconnect = true; } }{ LOCK(cs_vNodes); for (CNode* pnode : vNodesCopy) pnode->Release(); }
Last updated