72 return connect(
m_socket, addr, addr_len);
76 socklen_t *opt_len)
const {
77 return getsockopt(
m_socket, level, opt_name,
static_cast<char *
>(opt_val),
82 Event *occurred)
const {
87 if (requested &
RECV) {
90 if (requested &
SEND) {
98 if (occurred !=
nullptr) {
100 if (fd.revents & POLLIN) {
103 if (fd.revents & POLLOUT) {
116 FD_ZERO(&fdset_recv);
117 FD_ZERO(&fdset_send);
119 if (requested &
RECV) {
123 if (requested &
SEND) {
129 if (select(
m_socket + 1, &fdset_recv, &fdset_send,
nullptr,
134 if (occurred !=
nullptr) {
136 if (FD_ISSET(
m_socket, &fdset_recv)) {
139 if (FD_ISSET(
m_socket, &fdset_send)) {
149 std::chrono::milliseconds timeout,
151 const auto deadline = GetTime<std::chrono::milliseconds>() + timeout;
159 sent +=
static_cast<size_t>(ret);
160 if (sent == data.size()) {
166 throw std::runtime_error(
171 const auto now = GetTime<std::chrono::milliseconds>();
173 if (now >= deadline) {
174 throw std::runtime_error(
175 strprintf(
"Send timeout (sent only %u of %u bytes before that)",
181 "Send interrupted (sent only %u of %u bytes before that)", sent,
187 const auto wait_time = std::min(
194 std::chrono::milliseconds timeout,
196 size_t max_data)
const {
197 const auto deadline = GetTime<std::chrono::milliseconds>() + timeout;
199 bool terminator_found{
false};
211 if (data.size() >= max_data) {
212 throw std::runtime_error(
213 strprintf(
"Received too many bytes without a terminator (%u)",
219 const ssize_t peek_ret{
220 Recv(buf, std::min(
sizeof(buf), max_data - data.size()), MSG_PEEK)};
226 throw std::runtime_error(
232 throw std::runtime_error(
233 "Connection unexpectedly closed by peer");
235 auto end = buf + peek_ret;
236 auto terminator_pos = std::find(buf, end, terminator);
237 terminator_found = terminator_pos != end;
239 const size_t try_len{terminator_found
240 ? terminator_pos - buf + 1
241 :
static_cast<size_t>(peek_ret)};
243 const ssize_t read_ret{
Recv(buf, try_len, 0)};
245 if (read_ret < 0 ||
static_cast<size_t>(read_ret) != try_len) {
246 throw std::runtime_error(
247 strprintf(
"recv() returned %u bytes on attempt to read "
248 "%u bytes but previous "
249 "peek claimed %u bytes are available",
250 read_ret, try_len, peek_ret));
254 const size_t append_len{terminator_found ? try_len - 1
257 data.append(buf, buf + append_len);
259 if (terminator_found) {
264 const auto now = GetTime<std::chrono::milliseconds>();
266 if (now >= deadline) {
267 throw std::runtime_error(
268 strprintf(
"Receive timeout (received %u bytes without "
269 "terminator before that)",
274 throw std::runtime_error(
275 strprintf(
"Receive interrupted (received %u bytes without "
276 "terminator before that)",
282 const auto wait_time = std::min(
290 errmsg =
"not connected";
295 switch (
Recv(&c,
sizeof(c), MSG_PEEK)) {
316 if (FormatMessageW(FORMAT_MESSAGE_FROM_SYSTEM |
317 FORMAT_MESSAGE_IGNORE_INSERTS |
318 FORMAT_MESSAGE_MAX_WIDTH_MASK,
319 nullptr, err, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
320 buf, ARRAYSIZE(buf),
nullptr)) {
323 std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>,
wchar_t>()
327 return strprintf(
"Unknown error (%d)", err);
339 #ifdef STRERROR_R_CHAR_P
341 s = strerror_r(err, buf,
sizeof(buf));
345 if (strerror_r(err, buf,
sizeof(buf))) {
358 int ret = closesocket(hSocket);
360 int ret = close(hSocket);
363 LogPrintf(
"Socket close failed: %d. Error: %s\n", hSocket,
A helper class for interruptible sleeps.
RAII helper class that manages a socket.
virtual ssize_t Send(const void *data, size_t len, int flags) const
send(2) wrapper.
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
SOCKET m_socket
Contained socket.
virtual void SendComplete(const std::string &data, std::chrono::milliseconds timeout, CThreadInterrupt &interrupt) const
Send the given data, retrying on transient errors.
virtual bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const
Wait for readiness for input (recv) or output (send).
virtual ~Sock()
Destructor, close the socket or do nothing if empty.
Sock()
Default constructor, creates an empty object that does nothing when destroyed.
virtual SOCKET Release()
Get the value of the contained socket and drop ownership.
virtual bool IsConnected(std::string &errmsg) const
Check if still connected.
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
virtual SOCKET Get() const
Get the value of the contained socket.
virtual int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const
getsockopt(2) wrapper.
virtual int Connect(const sockaddr *addr, socklen_t addr_len) const
connect(2) wrapper.
Sock & operator=(const Sock &)=delete
Copy assignment operator, disabled because closing the same socket twice is undesirable.
virtual void Reset()
Close if non-empty.
virtual ssize_t Recv(void *buf, size_t len, int flags) const
recv(2) wrapper.
virtual std::string RecvUntilTerminator(uint8_t terminator, std::chrono::milliseconds timeout, CThreadInterrupt &interrupt, size_t max_data) const
Read from socket until a terminator character is encountered.
#define WSAGetLastError()
static bool IsSelectableSocket(const SOCKET &s)
static bool IOErrorIsPermanent(int err)
std::string NetworkErrorString(int err)
Return readable error string for a network error code.
bool CloseSocket(SOCKET &hSocket)
Close socket and set hSocket to INVALID_SOCKET.
static constexpr auto MAX_WAIT_FOR_IO
Maximum time to wait for I/O readiness.
struct timeval MillisToTimeval(int64_t nTimeout)
Convert milliseconds to a struct timeval for e.g. select().
constexpr int64_t count_milliseconds(std::chrono::milliseconds t)