Bitcoin ABC  0.22.15
P2P Digital Currency
processor.cpp
Go to the documentation of this file.
1 // Copyright (c) 2018-2019 The Bitcoin developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <avalanche/processor.h>
6 
9 #include <chain.h>
10 #include <key_io.h> // For DecodeSecret
11 #include <net_processing.h> // For ::PeerManager
12 #include <netmessagemaker.h>
13 #include <reverse_iterator.h>
14 #include <scheduler.h>
15 #include <util/bitmanip.h>
16 #include <validation.h>
17 
18 #include <chrono>
19 #include <tuple>
20 
24 static constexpr std::chrono::milliseconds AVALANCHE_TIME_STEP{10};
25 
26 // Unfortunately, the bitcoind codebase is full of global and we are kinda
27 // forced into it here.
28 std::unique_ptr<avalanche::Processor> g_avalanche;
29 
30 namespace avalanche {
31 
32 bool VoteRecord::registerVote(NodeId nodeid, uint32_t error) {
33  // We just got a new vote, so there is one less inflight request.
35 
36  // We want to avoid having the same node voting twice in a quorum.
37  if (!addNodeToQuorum(nodeid)) {
38  return false;
39  }
40 
48  votes = (votes << 1) | (error == 0);
49  consider = (consider << 1) | (int32_t(error) >= 0);
50 
60  bool yes = countBits(votes & consider & 0xff) > 6;
61  if (!yes) {
62  bool no = countBits(~votes & consider & 0xff) > 6;
63  if (!no) {
64  // The round is inconclusive.
65  return false;
66  }
67  }
68 
69  // If the round is in agreement with previous rounds, increase confidence.
70  if (isAccepted() == yes) {
71  confidence += 2;
73  }
74 
75  // The round changed our state. We reset the confidence.
76  confidence = yes;
77  return true;
78 }
79 
81  if (nodeid == NO_NODE) {
82  // Helpful for testing.
83  return true;
84  }
85 
86  // MMIX Linear Congruent Generator.
87  const uint64_t r1 =
88  6364136223846793005 * uint64_t(nodeid) + 1442695040888963407;
89  // Fibonacci hashing.
90  const uint64_t r2 = 11400714819323198485ull * (nodeid ^ seed);
91  // Combine and extract hash.
92  const uint16_t h = (r1 + r2) >> 48;
93 
97  for (size_t i = 1; i < nodeFilter.size(); i++) {
98  if (nodeFilter[(successfulVotes + i) % nodeFilter.size()] == h) {
99  return false;
100  }
101  }
102 
106  nodeFilter[successfulVotes % nodeFilter.size()] = h;
107  successfulVotes++;
108  return true;
109 }
110 
112  uint8_t count = inflight.load();
113  while (count < AVALANCHE_MAX_INFLIGHT_POLL) {
114  if (inflight.compare_exchange_weak(count, count + 1)) {
115  return true;
116  }
117  }
118 
119  return false;
120 }
121 
122 static bool IsWorthPolling(const CBlockIndex *pindex) {
124 
125  if (pindex->nStatus.isInvalid()) {
126  // No point polling invalid blocks.
127  return false;
128  }
129 
130  if (::ChainstateActive().IsBlockFinalized(pindex)) {
131  // There is no point polling finalized block.
132  return false;
133  }
134 
135  return true;
136 }
137 
141 };
142 
146 
147 public:
148  NotificationsHandler(Processor *p) : m_processor(p) {}
149 
150  void updatedBlockTip() override {
151  LOCK(m_processor->cs_peerManager);
152  m_processor->peerManager->updatedBlockTip();
153  }
154 };
155 
157  NodePeerManager *nodePeerManagerIn)
158  : connman(connmanIn), nodePeerManager(nodePeerManagerIn),
159  queryTimeoutDuration(AVALANCHE_DEFAULT_QUERY_TIMEOUT), round(0),
160  peerManager(std::make_unique<PeerManager>()) {
161  if (gArgs.IsArgSet("-avasessionkey")) {
162  sessionKey = DecodeSecret(gArgs.GetArg("-avasessionkey", ""));
163  } else {
164  // Pick a random key for the session.
165  sessionKey.MakeNewKey(true);
166  }
167 
168  if (gArgs.IsArgSet("-avaproof")) {
169  peerData = std::make_unique<PeerData>();
170 
171  {
172  // The proof.
173  CDataStream stream(ParseHex(gArgs.GetArg("-avaproof", "")),
174  SER_NETWORK, 0);
175  stream >> peerData->proof;
176 
177  // Ensure the peer manager knows about it.
178  // FIXME: There is no way to register the proof at this time because
179  // we might not have the proper chainstate at the moment. We need to
180  // find a way to delay the registration of the proof until after IBD
181  // has finished and the chain state is settled.
182  // LOCK(cs_peerManager);
183  // peerManager->getPeerId(peerData->proof);
184  }
185 
186  // Generate the delegation to the session key.
187  DelegationBuilder dgb(peerData->proof);
188  if (sessionKey.GetPubKey() != peerData->proof.getMaster()) {
189  dgb.addLevel(DecodeSecret(gArgs.GetArg("-avamasterkey", "")),
191  }
192  peerData->delegation = dgb.build();
193  }
194 
195  // Make sure we get notified of chain state changes.
197  chain.handleNotifications(std::make_shared<NotificationsHandler>(this));
198 }
199 
202  stopEventLoop();
203 }
204 
206  bool isAccepted;
207 
208  {
209  LOCK(cs_main);
210  if (!IsWorthPolling(pindex)) {
211  // There is no point polling this block.
212  return false;
213  }
214 
215  isAccepted = ::ChainActive().Contains(pindex);
216  }
217 
218  return vote_records.getWriteView()
219  ->insert(std::make_pair(pindex, VoteRecord(isAccepted)))
220  .second;
221 }
222 
223 bool Processor::isAccepted(const CBlockIndex *pindex) const {
224  auto r = vote_records.getReadView();
225  auto it = r->find(pindex);
226  if (it == r.end()) {
227  return false;
228  }
229 
230  return it->second.isAccepted();
231 }
232 
233 int Processor::getConfidence(const CBlockIndex *pindex) const {
234  auto r = vote_records.getReadView();
235  auto it = r->find(pindex);
236  if (it == r.end()) {
237  return -1;
238  }
239 
240  return it->second.getConfidence();
241 }
242 
243 namespace {
248  class TCPResponse {
249  Response response;
251 
252  public:
253  TCPResponse(Response responseIn, const CKey &key)
254  : response(std::move(responseIn)) {
255  CHashWriter hasher(SER_GETHASH, 0);
256  hasher << response;
257  const uint256 hash = hasher.GetHash();
258 
259  // Now let's sign!
260  if (!key.SignSchnorr(hash, sig)) {
261  sig.fill(0);
262  }
263  }
264 
265  // serialization support
266  SERIALIZE_METHODS(TCPResponse, obj) {
267  READWRITE(obj.response, obj.sig);
268  }
269  };
270 } // namespace
271 
274  pfrom, CNetMsgMaker(pfrom->GetCommonVersion())
276  TCPResponse(std::move(response), sessionKey)));
277 }
278 
280  std::vector<BlockUpdate> &updates) {
281  {
282  // Save the time at which we can query again.
284 
285  // FIXME: This will override the time even when we received an old stale
286  // message. This should check that the message is indeed the most up to
287  // date one before updating the time.
288  peerManager->updateNextRequestTime(
289  nodeid, std::chrono::steady_clock::now() +
290  std::chrono::milliseconds(response.getCooldown()));
291  }
292 
293  std::vector<CInv> invs;
294 
295  {
296  // Check that the query exists.
297  auto w = queries.getWriteView();
298  auto it = w->find(std::make_tuple(nodeid, response.getRound()));
299  if (it == w.end()) {
300  nodePeerManager->Misbehaving(nodeid, 2, "unexpcted-ava-response");
301  return false;
302  }
303 
304  invs = std::move(it->invs);
305  w->erase(it);
306  }
307 
308  // Verify that the request and the vote are consistent.
309  const std::vector<Vote> &votes = response.GetVotes();
310  size_t size = invs.size();
311  if (votes.size() != size) {
312  nodePeerManager->Misbehaving(nodeid, 100, "invalid-ava-response-size");
313  return false;
314  }
315 
316  for (size_t i = 0; i < size; i++) {
317  if (invs[i].hash != votes[i].GetHash()) {
318  nodePeerManager->Misbehaving(nodeid, 100,
319  "invalid-ava-response-content");
320  return false;
321  }
322  }
323 
324  std::map<CBlockIndex *, Vote> responseIndex;
325 
326  {
327  LOCK(cs_main);
328  for (const auto &v : votes) {
329  auto pindex = LookupBlockIndex(BlockHash(v.GetHash()));
330  if (!pindex) {
331  // This should not happen, but just in case...
332  continue;
333  }
334 
335  if (!IsWorthPolling(pindex)) {
336  // There is no point polling this block.
337  continue;
338  }
339 
340  responseIndex.insert(std::make_pair(pindex, v));
341  }
342  }
343 
344  {
345  // Register votes.
346  auto w = vote_records.getWriteView();
347  for (const auto &p : responseIndex) {
348  CBlockIndex *pindex = p.first;
349  const Vote &v = p.second;
350 
351  auto it = w->find(pindex);
352  if (it == w.end()) {
353  // We are not voting on that item anymore.
354  continue;
355  }
356 
357  auto &vr = it->second;
358  if (!vr.registerVote(nodeid, v.GetError())) {
359  // This vote did not provide any extra information, move on.
360  continue;
361  }
362 
363  if (!vr.hasFinalized()) {
364  // This item has note been finalized, so we have nothing more to
365  // do.
366  updates.emplace_back(
367  pindex, vr.isAccepted() ? BlockUpdate::Status::Accepted
368  : BlockUpdate::Status::Rejected);
369  continue;
370  }
371 
372  // We just finalized a vote. If it is valid, then let the caller
373  // know. Either way, remove the item from the map.
374  updates.emplace_back(pindex, vr.isAccepted()
375  ? BlockUpdate::Status::Finalized
376  : BlockUpdate::Status::Invalid);
377  w->erase(it);
378  }
379  }
380 
381  return true;
382 }
383 
384 bool Processor::addNode(NodeId nodeid, const Proof &proof,
385  const Delegation &delegation) {
387  return peerManager->addNode(nodeid, proof, delegation);
388 }
389 
391  std::function<bool(const Node &n)> func) const {
393  return peerManager->forNode(nodeid, std::move(func));
394 }
395 
397  return sessionKey.GetPubKey();
398 }
399 
400 bool Processor::sendHello(CNode *pfrom) const {
401  if (!peerData) {
402  // We do not have a delegation to advertise.
403  return false;
404  }
405 
406  // Now let's sign!
407  SchnorrSig sig;
408 
409  {
410  CHashWriter hasher(SER_GETHASH, 0);
411  hasher << peerData->delegation.getId();
412  hasher << pfrom->GetLocalNonce();
413  hasher << pfrom->nRemoteHostNonce;
414  hasher << pfrom->GetLocalExtraEntropy();
415  hasher << pfrom->nRemoteExtraEntropy;
416  const uint256 hash = hasher.GetHash();
417 
418  if (!sessionKey.SignSchnorr(hash, sig)) {
419  return false;
420  }
421  }
422 
424  .Make(NetMsgType::AVAHELLO,
425  Hello(peerData->delegation, sig)));
426 
427  return true;
428 }
429 
431  return eventLoop.startEventLoop(
432  scheduler, [this]() { this->runEventLoop(); }, AVALANCHE_TIME_STEP);
433 }
434 
436  return eventLoop.stopEventLoop();
437 }
438 
439 std::vector<CInv> Processor::getInvsForNextPoll(bool forPoll) {
440  std::vector<CInv> invs;
441 
442  // First remove all blocks that are not worth polling.
443  {
444  LOCK(cs_main);
445  auto w = vote_records.getWriteView();
446  for (auto it = w->begin(); it != w->end();) {
447  const CBlockIndex *pindex = it->first;
448  if (!IsWorthPolling(pindex)) {
449  w->erase(it++);
450  } else {
451  ++it;
452  }
453  }
454  }
455 
456  auto r = vote_records.getReadView();
457  for (const std::pair<const CBlockIndex *const, VoteRecord> &p :
458  reverse_iterate(r)) {
459  // Check if we can run poll.
460  const bool shouldPoll =
461  forPoll ? p.second.registerPoll() : p.second.shouldPoll();
462  if (!shouldPoll) {
463  continue;
464  }
465 
466  // We don't have a decision, we need more votes.
467  invs.emplace_back(MSG_BLOCK, p.first->GetBlockHash());
468  if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) {
469  // Make sure we do not produce more invs than specified by the
470  // protocol.
471  return invs;
472  }
473  }
474 
475  return invs;
476 }
477 
480  return peerManager->selectNode();
481 }
482 
484  auto now = std::chrono::steady_clock::now();
485  std::map<CInv, uint8_t> timedout_items{};
486 
487  {
488  // Clear expired requests.
489  auto w = queries.getWriteView();
490  auto it = w->get<query_timeout>().begin();
491  while (it != w->get<query_timeout>().end() && it->timeout < now) {
492  for (const auto &i : it->invs) {
493  timedout_items[i]++;
494  }
495 
496  w->get<query_timeout>().erase(it++);
497  }
498  }
499 
500  if (timedout_items.empty()) {
501  return;
502  }
503 
504  // In flight request accounting.
505  for (const auto &p : timedout_items) {
506  const CInv &inv = p.first;
507  assert(inv.type == MSG_BLOCK);
508 
509  CBlockIndex *pindex;
510 
511  {
512  LOCK(cs_main);
513  pindex = LookupBlockIndex(BlockHash(inv.hash));
514  if (!pindex) {
515  continue;
516  }
517  }
518 
519  auto w = vote_records.getWriteView();
520  auto it = w->find(pindex);
521  if (it == w.end()) {
522  continue;
523  }
524 
525  it->second.clearInflightRequest(p.second);
526  }
527 }
528 
530  // First things first, check if we have requests that timed out and clear
531  // them.
533 
534  // Make sure there is at least one suitable node to query before gathering
535  // invs.
536  NodeId nodeid = getSuitableNodeToQuery();
537  if (nodeid == NO_NODE) {
538  return;
539  }
540  std::vector<CInv> invs = getInvsForNextPoll();
541  if (invs.empty()) {
542  return;
543  }
544 
545  do {
551  bool hasSent = connman->ForNode(nodeid, [this, &invs](CNode *pnode) {
552  uint64_t current_round = round++;
553 
554  {
555  // Compute the time at which this requests times out.
556  auto timeout =
557  std::chrono::steady_clock::now() + queryTimeoutDuration;
558  // Register the query.
559  queries.getWriteView()->insert(
560  {pnode->GetId(), current_round, timeout, invs});
561  // Set the timeout.
563  peerManager->updateNextRequestTime(pnode->GetId(), timeout);
564  }
565 
566  // Send the query to the node.
568  pnode, CNetMsgMaker(pnode->GetCommonVersion())
569  .Make(NetMsgType::AVAPOLL,
570  Poll(current_round, std::move(invs))));
571  return true;
572  });
573 
574  // Success!
575  if (hasSent) {
576  return;
577  }
578 
579  {
580  // This node is obsolete, delete it.
582  peerManager->removeNode(nodeid);
583  }
584 
585  // Get next suitable node to try again
586  nodeid = getSuitableNodeToQuery();
587  } while (nodeid != NO_NODE);
588 }
589 
590 } // namespace avalanche
uint32_t getCooldown() const
Definition: protocol.h:44
int getConfidence(const CBlockIndex *pindex) const
Definition: processor.cpp:233
uint32_t successfulVotes
Definition: processor.h:91
uint64_t nRemoteHostNonce
Definition: net.h:850
void clearTimedoutRequests()
Definition: processor.cpp:483
static constexpr NodeId NO_NODE
Special NodeId that represent no node.
Definition: net.h:106
bool IsArgSet(const std::string &strArg) const
Return true if the given argument has been manually set.
Definition: system.cpp:391
void sendResponse(CNode *pfrom, Response response) const
Definition: processor.cpp:272
bool sendHello(CNode *pfrom) const
Definition: processor.cpp:400
virtual std::unique_ptr< Handler > handleNotifications(std::shared_ptr< Notifications > notifications)=0
Register handler for notifications.
CChain & ChainActive()
Definition: validation.cpp:85
std::atomic< uint8_t > inflight
Definition: processor.h:85
const char * AVAHELLO
Contains a delegation and a signature.
Definition: protocol.cpp:54
CPubKey GetPubKey() const
Compute the public key from a private key.
Definition: key.cpp:210
reverse_range< T > reverse_iterate(T &x)
Inv(ventory) message data.
Definition: protocol.h:505
bool forNode(NodeId nodeid, std::function< bool(const Node &n)> func) const
Definition: processor.cpp:390
BlockStatus nStatus
Verification status of this block. See enum BlockStatus.
Definition: blockindex.h:76
std::vector< CInv > getInvsForNextPoll(bool forPoll=true)
Definition: processor.cpp:439
std::array< uint8_t, CPubKey::SCHNORR_SIZE > SchnorrSig
a Schnorr signature
Definition: key.h:25
void PushMessage(CNode *pnode, CSerializedNetMsg &&msg)
Definition: net.cpp:3035
std::unique_ptr< PeerData > peerData
Definition: processor.h:247
std::unique_ptr< interfaces::Handler > chainNotificationsHandler
Definition: processor.h:255
Double ended buffer combining vector and stream-like interfaces.
Definition: streams.h:197
uint64_t GetLocalNonce() const
Definition: net.h:1076
const std::vector< Vote > & GetVotes() const
Definition: protocol.h:45
bool addBlockToReconcile(const CBlockIndex *pindex)
Definition: processor.cpp:205
WriteView getWriteView()
Definition: rwcollection.h:82
NodeId getSuitableNodeToQuery()
Definition: processor.cpp:478
bool registerVote(NodeId nodeid, uint32_t error)
Register a new vote for an item and update confidence accordingly.
Definition: processor.cpp:32
static constexpr std::chrono::milliseconds AVALANCHE_DEFAULT_QUERY_TIMEOUT
How long before we consider that a query timed out.
Definition: processor.h:57
static constexpr size_t AVALANCHE_MAX_ELEMENT_POLL
Maximum item that can be polled at once.
Definition: processor.h:49
bool ForNode(NodeId id, std::function< bool(CNode *pnode)> func)
Definition: net.cpp:3072
CChainState & ChainstateActive()
Definition: validation.cpp:79
bool addNode(NodeId nodeid, const Proof &proof, const Delegation &delegation)
Definition: processor.cpp:384
uint64_t nRemoteExtraEntropy
Definition: net.h:852
bool IsBlockFinalized(const CBlockIndex *pindex) const EXCLUSIVE_LOCKS_REQUIRED(cs_main)
Checks if a block is finalized.
Response response
Definition: processor.cpp:249
SchnorrSig sig
Definition: processor.cpp:250
static constexpr int AVALANCHE_MAX_INFLIGHT_POLL
How many inflight requests can exist for one item.
Definition: processor.h:63
bool addLevel(const CKey &key, const CPubKey &newMaster)
#define LOCK(cs)
Definition: sync.h:230
bool Contains(const CBlockIndex *pindex) const
Efficiently check whether a block is present in this chain.
Definition: chain.h:190
Chain notifications.
Definition: chain.h:270
std::unique_ptr< avalanche::Processor > g_avalanche
Global avalanche instance.
Definition: processor.cpp:28
An encapsulated public key.
Definition: pubkey.h:31
RWCollection< QuerySet > queries
Definition: processor.h:244
std::chrono::milliseconds queryTimeoutDuration
Definition: processor.h:198
NodePeerManager * nodePeerManager
Definition: processor.h:197
void MakeNewKey(bool fCompressed)
Generate a new private key using a cryptographic PRNG.
Definition: key.cpp:183
bool stopEventLoop()
Definition: eventloop.cpp:45
CConnman * connman
Definition: processor.h:196
RecursiveMutex cs_main
Global state.
Definition: validation.cpp:102
bool isInvalid() const
Definition: blockstatus.h:102
static constexpr std::chrono::milliseconds AVALANCHE_TIME_STEP
Run the avalanche event loop every 10ms.
Definition: processor.cpp:24
uint256 hash
Definition: protocol.h:508
bool isAccepted(const CBlockIndex *pindex) const
Definition: processor.cpp:223
const uint32_t seed
Definition: processor.h:88
int64_t NodeId
Definition: net.h:101
Definition: net.h:167
static constexpr int AVALANCHE_FINALIZATION_SCORE
Finalization score.
Definition: processor.h:44
uint32_t GetError() const
Definition: protocol.h:27
NodeId GetId() const
Definition: net.h:1074
EventLoop eventLoop
Event loop machinery.
Definition: processor.h:252
void Misbehaving(const NodeId pnode, const int howmuch, const std::string &message)
Increment peer&#39;s misbehavior score.
#define SERIALIZE_METHODS(cls, obj)
Implement the Serialize and Unserialize methods by delegating to a single templated static method tha...
Definition: serialize.h:226
uint256 GetHash()
Definition: hash.h:118
static bool IsWorthPolling(const CBlockIndex *pindex)
Definition: processor.cpp:122
256-bit opaque blob.
Definition: uint256.h:123
RWCollection< BlockVoteMap > vote_records
Blocks to run avalanche on.
Definition: processor.h:203
const char * AVARESPONSE
Contains an avalanche::Response.
Definition: protocol.cpp:56
Vote history.
Definition: processor.h:74
bool registerVotes(NodeId nodeid, const Response &response, std::vector< BlockUpdate > &updates)
Definition: processor.cpp:279
A BlockHash is a unqiue identifier for a block.
Definition: blockhash.h:13
Interface giving clients (wallet processes, maybe other analysis tools in the future) ability to acce...
Definition: chain.h:105
The block chain is a tree shaped structure starting with the genesis block at the root...
Definition: blockindex.h:23
bool addNodeToQuorum(NodeId nodeid)
Add the node to the quorum.
Definition: processor.cpp:80
std::string GetArg(const std::string &strArg, const std::string &strDefault) const
Return string argument or default value.
Definition: system.cpp:480
std::vector< uint8_t > ParseHex(const char *psz)
ReadView getReadView() const
Definition: rwcollection.h:75
CBlockIndex * LookupBlockIndex(const BlockHash &hash)
Definition: validation.cpp:151
uint32_t type
Definition: protocol.h:507
ArgsManager gArgs
Definition: system.cpp:77
uint64_t getRound() const
Definition: protocol.h:43
const char * AVAPOLL
Contains an avalanche::Poll.
Definition: protocol.cpp:55
static int count
Definition: tests.c:35
uint16_t getConfidence() const
Definition: processor.h:113
A writer stream (for serialization) that computes a 256-bit hash.
Definition: hash.h:99
bool startEventLoop(CScheduler &scheduler, std::function< void()> runEventLoop, std::chrono::milliseconds delta)
Definition: eventloop.cpp:13
void clearInflightRequest(uint8_t count=1)
Clear count inflight requests.
Definition: processor.h:139
An encapsulated secp256k1 private key.
Definition: key.h:28
uint64_t GetLocalExtraEntropy() const
Definition: net.h:1077
Information about a peer.
Definition: net.h:810
CKey DecodeSecret(const std::string &str)
Definition: key_io.cpp:80
bool registerPoll() const
Register that a request is being made regarding that item.
Definition: processor.cpp:111
Mutex cs_peerManager
Keep track of the peers and associated infos.
Definition: processor.h:213
AssertLockHeld(g_cs_orphans)
#define READWRITE(...)
Definition: serialize.h:179
int GetCommonVersion() const
Definition: net.h:1092
bool startEventLoop(CScheduler &scheduler)
Definition: processor.cpp:430
std::atomic< uint64_t > round
Keep track of peers and queries sent.
Definition: processor.h:208
std::array< uint16_t, 8 > nodeFilter
Definition: processor.h:94
bool SignSchnorr(const uint256 &hash, SchnorrSig &sig, uint32_t test_case=0) const
Create a Schnorr signature.
Definition: key.cpp:288
bool error(const char *fmt, const Args &... args)
Definition: system.h:47
uint32_t countBits(uint32_t v)
Definition: bitmanip.h:12
Processor(interfaces::Chain &chain, CConnman *connmanIn, NodePeerManager *nodePeerManagerIn)
Definition: processor.cpp:156
CPubKey getSessionPubKey() const
Definition: processor.cpp:396
bool isAccepted() const
Vote accounting facilities.
Definition: processor.h:111