Bitcoin ABC  0.22.13
P2P Digital Currency
zmqnotificationinterface.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2018 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
7 
8 #include <primitives/block.h>
9 #include <util/system.h>
10 
11 void zmqError(const char *str) {
12  LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str,
13  zmq_strerror(errno));
14 }
15 
17 
19  Shutdown();
20 
21  for (std::list<CZMQAbstractNotifier *>::iterator i = notifiers.begin();
22  i != notifiers.end(); ++i) {
23  delete *i;
24  }
25 }
26 
27 std::list<const CZMQAbstractNotifier *>
29  std::list<const CZMQAbstractNotifier *> result;
30  for (const auto *n : notifiers) {
31  result.push_back(n);
32  }
33  return result;
34 }
35 
37  CZMQNotificationInterface *notificationInterface = nullptr;
38  std::map<std::string, CZMQNotifierFactory> factories;
39  std::list<CZMQAbstractNotifier *> notifiers;
40 
41  factories["pubhashblock"] =
42  CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
43  factories["pubhashtx"] =
44  CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
45  factories["pubrawblock"] =
46  CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
47  factories["pubrawtx"] =
48  CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
49 
50  for (const auto &entry : factories) {
51  std::string arg("-zmq" + entry.first);
52  if (gArgs.IsArgSet(arg)) {
53  CZMQNotifierFactory factory = entry.second;
54  std::string address = gArgs.GetArg(arg, "");
55  CZMQAbstractNotifier *notifier = factory();
56  notifier->SetType(entry.first);
57  notifier->SetAddress(address);
59  static_cast<int>(gArgs.GetArg(
61  notifiers.push_back(notifier);
62  }
63  }
64 
65  if (!notifiers.empty()) {
66  notificationInterface = new CZMQNotificationInterface();
67  notificationInterface->notifiers = notifiers;
68 
69  if (!notificationInterface->Initialize()) {
70  delete notificationInterface;
71  notificationInterface = nullptr;
72  }
73  }
74 
75  return notificationInterface;
76 }
77 
78 // Called at startup to conditionally set up ZMQ socket(s)
80  int major = 0, minor = 0, patch = 0;
81  zmq_version(&major, &minor, &patch);
82  LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch);
83 
84  LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n");
85  assert(!pcontext);
86 
87  pcontext = zmq_ctx_new();
88 
89  if (!pcontext) {
90  zmqError("Unable to initialize context");
91  return false;
92  }
93 
94  std::list<CZMQAbstractNotifier *>::iterator i = notifiers.begin();
95  for (; i != notifiers.end(); ++i) {
96  CZMQAbstractNotifier *notifier = *i;
97  if (notifier->Initialize(pcontext)) {
98  LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n",
99  notifier->GetType(), notifier->GetAddress());
100  } else {
101  LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n",
102  notifier->GetType(), notifier->GetAddress());
103  break;
104  }
105  }
106 
107  if (i != notifiers.end()) {
108  return false;
109  }
110 
111  return true;
112 }
113 
114 // Called during shutdown sequence
116  LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n");
117  if (pcontext) {
118  for (std::list<CZMQAbstractNotifier *>::iterator i = notifiers.begin();
119  i != notifiers.end(); ++i) {
120  CZMQAbstractNotifier *notifier = *i;
121  LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n",
122  notifier->GetType(), notifier->GetAddress());
123  notifier->Shutdown();
124  }
125  zmq_ctx_term(pcontext);
126 
127  pcontext = nullptr;
128  }
129 }
130 
132  const CBlockIndex *pindexFork,
133  bool fInitialDownload) {
134  // In IBD or blocks were disconnected without any new ones
135  if (fInitialDownload || pindexNew == pindexFork) {
136  return;
137  }
138 
139  for (std::list<CZMQAbstractNotifier *>::iterator i = notifiers.begin();
140  i != notifiers.end();) {
141  CZMQAbstractNotifier *notifier = *i;
142  if (notifier->NotifyBlock(pindexNew)) {
143  i++;
144  } else {
145  notifier->Shutdown();
146  i = notifiers.erase(i);
147  }
148  }
149 }
150 
152  const CTransactionRef &ptx) {
153  // Used by BlockConnected and BlockDisconnected as well, because they're all
154  // the same external callback.
155  const CTransaction &tx = *ptx;
156 
157  for (std::list<CZMQAbstractNotifier *>::iterator i = notifiers.begin();
158  i != notifiers.end();) {
159  CZMQAbstractNotifier *notifier = *i;
160  if (notifier->NotifyTransaction(tx)) {
161  i++;
162  } else {
163  notifier->Shutdown();
164  i = notifiers.erase(i);
165  }
166  }
167 }
168 
170  const std::shared_ptr<const CBlock> &pblock,
171  const CBlockIndex *pindexConnected) {
172  for (const CTransactionRef &ptx : pblock->vtx) {
173  // Do a normal notify for each transaction added in the block
175  }
176 }
177 
179  const std::shared_ptr<const CBlock> &pblock,
180  const CBlockIndex *pindexDisconnected) {
181  for (const CTransactionRef &ptx : pblock->vtx) {
182  // Do a normal notify for each transaction removed in block
183  // disconnection
185  }
186 }
187 
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:336
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexDisconnected) override
Notifies listeners of a block being disconnected.
bool IsArgSet(const std::string &strArg) const
Return true if the given argument has been manually set.
Definition: system.cpp:390
std::list< CZMQAbstractNotifier * > notifiers
#define LogPrint(category,...)
Definition: logging.h:192
void TransactionAddedToMempool(const CTransactionRef &tx) override
Notifies listeners of a transaction having been added to mempool.
virtual bool NotifyBlock(const CBlockIndex *pindex)
std::string GetAddress() const
static CZMQNotificationInterface * Create()
virtual bool NotifyTransaction(const CTransaction &transaction)
void SetOutboundMessageHighWaterMark(const int sndhwm)
void BlockConnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected) override
Notifies listeners of a block being connected.
void SetAddress(const std::string &a)
static const int DEFAULT_ZMQ_SNDHWM
virtual void Shutdown()=0
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners when the block chain tip advances.
void zmqError(const char *str)
The block chain is a tree shaped structure starting with the genesis block at the root...
Definition: blockindex.h:23
std::string GetArg(const std::string &strArg, const std::string &strDefault) const
Return string argument or default value.
Definition: system.cpp:479
ArgsManager gArgs
Definition: system.cpp:76
CZMQAbstractNotifier *(* CZMQNotifierFactory)()
virtual bool Initialize(void *pcontext)=0
std::list< const CZMQAbstractNotifier * > GetActiveNotifiers() const
The basic transaction that is broadcasted on the network and contained in blocks. ...
Definition: transaction.h:211
void SetType(const std::string &t)
std::string GetType() const
CZMQNotificationInterface * g_zmq_notification_interface