Bitcoin ABC  0.22.13
P2P Digital Currency
zmqpublishnotifier.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2016 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
6 
7 #include <blockdb.h>
8 #include <chain.h>
9 #include <chainparams.h>
10 #include <config.h>
11 #include <primitives/blockhash.h>
12 #include <primitives/txid.h>
13 #include <rpc/server.h>
14 #include <streams.h>
15 #include <util/system.h>
16 
17 #include <cstdarg>
18 
19 static std::multimap<std::string, CZMQAbstractPublishNotifier *>
21 
22 static const char *MSG_HASHBLOCK = "hashblock";
23 static const char *MSG_HASHTX = "hashtx";
24 static const char *MSG_RAWBLOCK = "rawblock";
25 static const char *MSG_RAWTX = "rawtx";
26 
27 // Internal function to send multipart message
28 static int zmq_send_multipart(void *sock, const void *data, size_t size, ...) {
29  va_list args;
30  va_start(args, size);
31 
32  while (1) {
33  zmq_msg_t msg;
34 
35  int rc = zmq_msg_init_size(&msg, size);
36  if (rc != 0) {
37  zmqError("Unable to initialize ZMQ msg");
38  va_end(args);
39  return -1;
40  }
41 
42  void *buf = zmq_msg_data(&msg);
43  memcpy(buf, data, size);
44 
45  data = va_arg(args, const void *);
46 
47  rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
48  if (rc == -1) {
49  zmqError("Unable to send ZMQ msg");
50  zmq_msg_close(&msg);
51  va_end(args);
52  return -1;
53  }
54 
55  zmq_msg_close(&msg);
56 
57  if (!data) {
58  break;
59  }
60 
61  size = va_arg(args, size_t);
62  }
63  va_end(args);
64  return 0;
65 }
66 
68  assert(!psocket);
69 
70  // check if address is being used by other publish notifier
71  std::multimap<std::string, CZMQAbstractPublishNotifier *>::iterator i =
73 
74  if (i == mapPublishNotifiers.end()) {
75  psocket = zmq_socket(pcontext, ZMQ_PUB);
76  if (!psocket) {
77  zmqError("Failed to create socket");
78  return false;
79  }
80 
82  "zmq: Outbound message high water mark for %s at %s is %d\n",
84 
85  int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM,
88  if (rc != 0) {
89  zmqError("Failed to set outbound message high water mark");
90  zmq_close(psocket);
91  return false;
92  }
93 
94  rc = zmq_bind(psocket, address.c_str());
95  if (rc != 0) {
96  zmqError("Failed to bind address");
97  zmq_close(psocket);
98  return false;
99  }
100 
101  // register this notifier for the address, so it can be reused for other
102  // publish notifier
103  mapPublishNotifiers.insert(std::make_pair(address, this));
104  return true;
105  } else {
106  LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
108  "zmq: Outbound message high water mark for %s at %s is %d\n",
110 
111  psocket = i->second->psocket;
112  mapPublishNotifiers.insert(std::make_pair(address, this));
113 
114  return true;
115  }
116 }
117 
119  // Early return if Initialize was not called
120  if (!psocket) {
121  return;
122  }
123 
124  int count = mapPublishNotifiers.count(address);
125 
126  // remove this notifier from the list of publishers using this address
127  typedef std::multimap<std::string, CZMQAbstractPublishNotifier *>::iterator
128  iterator;
129  std::pair<iterator, iterator> iterpair =
130  mapPublishNotifiers.equal_range(address);
131 
132  for (iterator it = iterpair.first; it != iterpair.second; ++it) {
133  if (it->second == this) {
134  mapPublishNotifiers.erase(it);
135  break;
136  }
137  }
138 
139  if (count == 1) {
140  LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
141  int linger = 0;
142  zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
143  zmq_close(psocket);
144  }
145 
146  psocket = nullptr;
147 }
148 
150  const void *data, size_t size) {
151  assert(psocket);
152 
153  /* send three parts, command & data & a LE 4byte sequence number */
154  uint8_t msgseq[sizeof(uint32_t)];
155  WriteLE32(&msgseq[0], nSequence);
156  int rc = zmq_send_multipart(psocket, command, strlen(command), data, size,
157  msgseq, (size_t)sizeof(uint32_t), nullptr);
158  if (rc == -1) {
159  return false;
160  }
161 
162  /* increment memory only sequence number after sending */
163  nSequence++;
164 
165  return true;
166 }
167 
169  BlockHash hash = pindex->GetBlockHash();
170  LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex());
171  char data[32];
172  for (unsigned int i = 0; i < 32; i++) {
173  data[31 - i] = hash.begin()[i];
174  }
175  return SendMessage(MSG_HASHBLOCK, data, 32);
176 }
177 
179  const CTransaction &transaction) {
180  TxId txid = transaction.GetId();
181  LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", txid.GetHex());
182  char data[32];
183  for (unsigned int i = 0; i < 32; i++) {
184  data[31 - i] = txid.begin()[i];
185  }
186  return SendMessage(MSG_HASHTX, data, 32);
187 }
188 
190  LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n",
191  pindex->GetBlockHash().GetHex());
192 
193  const Config &config = GetConfig();
195  {
196  LOCK(cs_main);
197  CBlock block;
198  if (!ReadBlockFromDisk(block, pindex,
199  config.GetChainParams().GetConsensus())) {
200  zmqError("Can't read block from disk");
201  return false;
202  }
203 
204  ss << block;
205  }
206 
207  return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
208 }
209 
211  const CTransaction &transaction) {
212  TxId txid = transaction.GetId();
213  LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", txid.GetHex());
215  ss << transaction;
216  return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
217 }
static const char * MSG_HASHBLOCK
bool NotifyTransaction(const CTransaction &transaction) override
bool ReadBlockFromDisk(CBlock &block, const FlatFilePos &pos, const Consensus::Params &params)
Functions for disk access for blocks.
Definition: blockdb.cpp:33
uint32_t nSequence
Up-counting, per-message sequence number.
#define LogPrint(category,...)
Definition: logging.h:192
static void WriteLE32(uint8_t *ptr, uint32_t x)
Definition: common.h:40
Definition: block.h:62
Double ended buffer combining vector and stream-like interfaces.
Definition: streams.h:196
static const char * MSG_HASHTX
bool NotifyTransaction(const CTransaction &transaction) override
bool NotifyBlock(const CBlockIndex *pindex) override
BlockHash GetBlockHash() const
Definition: blockindex.h:133
Definition: config.h:19
size_type size() const
Definition: streams.h:279
#define LOCK(cs)
Definition: sync.h:230
bool SendMessage(const char *command, const void *data, size_t size)
bool NotifyBlock(const CBlockIndex *pindex) override
RecursiveMutex cs_main
Global state.
Definition: validation.cpp:95
static int zmq_send_multipart(void *sock, const void *data, size_t size,...)
const Config & GetConfig()
Definition: config.cpp:34
uint8_t * begin()
Definition: uint256.h:76
static const char * MSG_RAWTX
const_iterator begin() const
Definition: streams.h:275
static const char * MSG_RAWBLOCK
A BlockHash is a unique identifier for a block.
Definition: blockhash.h:13
The block chain is a tree-shaped structure starting with the genesis block at the root...
Definition: blockindex.h:23
int RPCSerializationFlags()
Retrieves any serialization flags requested in command line argument.
Definition: server.cpp:559
A TxId is the identifier of a transaction.
Definition: txid.h:14
static const int PROTOCOL_VERSION
network protocol versioning
Definition: version.h:11
std::string GetHex() const
Definition: uint256.cpp:16
static int count
Definition: tests.c:35
The basic transaction that is broadcast on the network and contained in blocks. ...
Definition: transaction.h:211
void zmqError(const char *str)
static std::multimap< std::string, CZMQAbstractPublishNotifier * > mapPublishNotifiers
const TxId GetId() const
Definition: transaction.h:261
bool Initialize(void *pcontext) override