mirror of
https://github.com/XTXMarkets/ternfs.git
synced 2026-01-06 02:49:45 -06:00
core: UDPSocketPair and use IpPort AddrsInfo everywhere
* core: UDPSocketPair and use IpPort AddrsInfo everywhere * Refactor UDPSocketPair a bit * ci: kmod always delete img before create * shuckle: fix scripts/json marshal --------- Co-authored-by: Francesco Mazzoli <francesco.mazzoli@xtxmarkets.com>
This commit is contained in:
committed by
GitHub Enterprise
parent
c9ba44ea1e
commit
8a0ea10cde
84
cpp/core/UDPSocketPair.cpp
Normal file
84
cpp/core/UDPSocketPair.cpp
Normal file
@@ -0,0 +1,84 @@
|
||||
#include "UDPSocketPair.hpp"
|
||||
|
||||
#include "Common.hpp"
|
||||
#include "Loop.hpp"
|
||||
|
||||
#include <arpa/inet.h>
|
||||
#include <cstddef>
|
||||
#include <ostream>
|
||||
|
||||
UDPSocketPair::UDPSocketPair(Env& env, const AddrsInfo& addr_, int32_t sockBufSize) : _addr(addr_) {
|
||||
sockaddr_in saddr;
|
||||
for (int i = 0; i < 2; i++) {
|
||||
bool hasIp = _addr[i].ip != Ip({0,0,0,0});
|
||||
ALWAYS_ASSERT(i > 0 || hasIp, "The first IP address must be specified");
|
||||
if (!hasIp) { continue; }
|
||||
_addr[i].toSockAddrIn(saddr);
|
||||
_initSock(i, saddr, sockBufSize);
|
||||
_addr[i].port = ntohs(saddr.sin_port);
|
||||
}
|
||||
LOG_INFO(env, "Bound to addresses %s", _addr);
|
||||
}
|
||||
|
||||
void UDPSocketPair::_initSock(uint8_t sockIdx, sockaddr_in& addr, int32_t sockBufSize) {
|
||||
auto sock = Sock::UDPSock();
|
||||
if (sock.error()) {
|
||||
throw SYSCALL_EXCEPTION("cannot create socket");
|
||||
}
|
||||
|
||||
if (bind(sock.get(), (sockaddr*)&addr, sizeof(addr)) != 0) {
|
||||
char ip[INET_ADDRSTRLEN];
|
||||
throw SYSCALL_EXCEPTION("cannot bind socket to addr %s:%s", inet_ntop(AF_INET, &addr.sin_addr, ip, INET_ADDRSTRLEN), ntohs(addr.sin_port));
|
||||
}
|
||||
{
|
||||
socklen_t addrLen = sizeof(addr);
|
||||
if (getsockname(sock.get(), (sockaddr*)&addr, &addrLen) < 0) {
|
||||
throw SYSCALL_EXCEPTION("getsockname");
|
||||
}
|
||||
}
|
||||
{
|
||||
if (setsockopt(sock.get(), SOL_SOCKET, SO_RCVBUF, (void*)&sockBufSize, sizeof(sockBufSize)) < 0) {
|
||||
throw SYSCALL_EXCEPTION("setsockopt");
|
||||
}
|
||||
}
|
||||
_socks[sockIdx] = std::move(sock);
|
||||
}
|
||||
|
||||
|
||||
void UDPSender::sendMessages(Env& env, const UDPSocketPair& socks) {
|
||||
for (size_t i = 0; i < _sendAddrs.size(); ++i) {
|
||||
if (_sendAddrs[i].size() == 0) { continue; }
|
||||
LOG_DEBUG(env, "sending %s messages to socket (%s)[%s]", _sendAddrs[i].size(), socks.addr(), i);
|
||||
for (size_t j = 0; j < _sendAddrs[i].size(); ++j) {
|
||||
auto& vec = _sendVecs[i][j];
|
||||
vec.iov_base = &_sendBuf[(size_t)vec.iov_base];
|
||||
auto& hdr = _sendHdrs[i].emplace_back();
|
||||
ALWAYS_ASSERT(_sendAddrs[i][j].sin_port != 0);
|
||||
hdr.msg_hdr = {
|
||||
.msg_name = (sockaddr_in*)&_sendAddrs[i][j],
|
||||
.msg_namelen = sizeof(_sendAddrs[i][j]),
|
||||
.msg_iov = &vec,
|
||||
.msg_iovlen = 1,
|
||||
};
|
||||
hdr.msg_len = vec.iov_len;
|
||||
}
|
||||
int ret = sendmmsg(socks.socks()[i].get(), &_sendHdrs[i][0], _sendHdrs[i].size(), 0);
|
||||
if (unlikely(ret < 0)) {
|
||||
// we get this when nf drops packets
|
||||
if (errno != EPERM) {
|
||||
throw SYSCALL_EXCEPTION("sendmmsg");
|
||||
} else {
|
||||
LOG_INFO(env, "dropping %s messages because of EPERM", _sendHdrs[i].size());
|
||||
}
|
||||
} else if (unlikely(ret < _sendHdrs[i].size())) {
|
||||
LOG_INFO(env, "dropping %s out of %s messages since `sendmmsg` could not send them all", _sendHdrs[i].size()-ret, _sendHdrs[i].size());
|
||||
}
|
||||
}
|
||||
|
||||
_sendBuf.clear();
|
||||
for(size_t i = 0; i < _sendHdrs.size(); ++i) {
|
||||
_sendAddrs[i].clear();
|
||||
_sendHdrs[i].clear();
|
||||
_sendVecs[i].clear();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user