unify send recv loops (#76)

* kmod: unify recv loops

* kmod: unify send loops

* kmod: make control flow explicit
This commit is contained in:
Isabella Bosia
2025-11-27 11:16:33 +00:00
committed by GitHub
parent b262674e9c
commit c0039428e3

View File

@@ -38,6 +38,37 @@ static void ternfs_free_fs_info(struct ternfs_fs_info* info) {
kfree(info);
}
static inline int recvloop(struct socket *sock, char *target, size_t size) {
int read_so_far;
for (read_so_far = 0; read_so_far < size;) {
struct kvec iov = {
.iov_base = target + read_so_far,
.iov_len = size - read_so_far,
};
struct msghdr msg = {NULL};
int read = kernel_recvmsg(sock, &msg, &iov, 1, iov.iov_len, 0);
if (read == 0) return -ECONNRESET;
if (read < 0) return read;
read_so_far += read;
}
return read_so_far;
}
static inline int sendloop(struct socket *sock, char *target, size_t size) {
int written_so_far;
for (written_so_far = 0; written_so_far < size;) {
struct kvec iov = {
.iov_base = target + written_so_far,
.iov_len = size - written_so_far,
};
struct msghdr msg = {NULL};
int written = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
if (written < 0) return written;
written_so_far += written;
}
return written_so_far;
}
static int ternfs_refresh_fs_info(struct ternfs_fs_info* info) {
int err;
@@ -45,31 +76,14 @@ static int ternfs_refresh_fs_info(struct ternfs_fs_info* info) {
err = ternfs_create_registry_socket(&info->registry_addr, &registry_sock);
if (err < 0) { return err; }
struct kvec iov;
struct msghdr msg = {NULL};
{
static_assert(TERNFS_LOCAL_SHARDS_REQ_SIZE == 0);
char registry_req[TERNFS_REGISTRY_REQ_HEADER_SIZE];
ternfs_write_registry_req_header(registry_req, TERNFS_LOCAL_SHARDS_REQ_SIZE, TERNFS_REGISTRY_LOCAL_SHARDS);
int written_so_far;
for (written_so_far = 0; written_so_far < sizeof(registry_req);) {
iov.iov_base = registry_req + written_so_far;
iov.iov_len = sizeof(registry_req) - written_so_far;
int written = kernel_sendmsg(registry_sock, &msg, &iov, 1, iov.iov_len);
if (written < 0) { err = written; goto out_sock; }
written_so_far += written;
}
if (err = sendloop(registry_sock, registry_req, sizeof registry_req), err < 0) goto out_sock;
char shards_resp_header[TERNFS_REGISTRY_RESP_HEADER_SIZE + 2]; // + 2 = list len
int read_so_far;
for (read_so_far = 0; read_so_far < sizeof(shards_resp_header);) {
iov.iov_base = shards_resp_header + read_so_far;
iov.iov_len = sizeof(shards_resp_header) - read_so_far;
int read = kernel_recvmsg(registry_sock, &msg, &iov, 1, iov.iov_len, 0);
if (read == 0) { err = -ECONNRESET; goto out_sock; }
if (read < 0) { err = read; goto out_sock; }
read_so_far += read;
}
if (err = recvloop(registry_sock, shards_resp_header, sizeof shards_resp_header), err < 0) goto out_sock;
u32 registry_resp_len;
u8 registry_resp_kind;
err = ternfs_read_registry_resp_header(shards_resp_header, &registry_resp_len, &registry_resp_kind);
@@ -87,14 +101,7 @@ static int ternfs_refresh_fs_info(struct ternfs_fs_info* info) {
int shid;
for (shid = 0; shid < 256; shid++) {
char shard_info_resp[TERNFS_SHARD_INFO_SIZE];
for (read_so_far = 0; read_so_far < TERNFS_SHARD_INFO_SIZE;) {
iov.iov_base = shard_info_resp + read_so_far;
iov.iov_len = sizeof(shard_info_resp) - read_so_far;
int read = kernel_recvmsg(registry_sock, &msg, &iov, 1, iov.iov_len, 0);
if (read == 0) { err = -ECONNRESET; goto out_sock; }
if (read < 0) { err = read; goto out_sock; }
read_so_far += read;
}
if (err = recvloop(registry_sock, shard_info_resp, sizeof shard_info_resp), err < 0) goto out_sock;
struct ternfs_bincode_get_ctx ctx = {
.buf = shard_info_resp,
.end = shard_info_resp + sizeof(shard_info_resp),
@@ -124,25 +131,10 @@ static int ternfs_refresh_fs_info(struct ternfs_fs_info* info) {
static_assert(TERNFS_LOCAL_CDC_REQ_SIZE == 0);
char cdc_req[TERNFS_REGISTRY_REQ_HEADER_SIZE];
ternfs_write_registry_req_header(cdc_req, TERNFS_LOCAL_CDC_REQ_SIZE, TERNFS_REGISTRY_LOCAL_CDC);
int written_so_far;
for (written_so_far = 0; written_so_far < sizeof(cdc_req);) {
iov.iov_base = cdc_req + written_so_far;
iov.iov_len = sizeof(cdc_req) - written_so_far;
int written = kernel_sendmsg(registry_sock, &msg, &iov, 1, iov.iov_len);
if (written < 0) { err = written; goto out_sock; }
written_so_far += written;
}
if (err = sendloop(registry_sock, cdc_req, sizeof cdc_req), err < 0) goto out_sock;
char cdc_resp_header[TERNFS_REGISTRY_RESP_HEADER_SIZE];
int read_so_far;
for (read_so_far = 0; read_so_far < sizeof(cdc_resp_header);) {
iov.iov_base = cdc_resp_header + read_so_far;
iov.iov_len = sizeof(cdc_resp_header) - read_so_far;
int read = kernel_recvmsg(registry_sock, &msg, &iov, 1, iov.iov_len, 0);
if (read == 0) { err = -ECONNRESET; goto out_sock; }
if (read < 0) { err = read; goto out_sock; }
read_so_far += read;
}
if (err = recvloop(registry_sock, cdc_resp_header, sizeof cdc_resp_header), err < 0) goto out_sock;
u32 registry_resp_len;
u8 registry_resp_kind;
err = ternfs_read_registry_resp_header(cdc_resp_header, &registry_resp_len, &registry_resp_kind);
@@ -153,14 +145,7 @@ static int ternfs_refresh_fs_info(struct ternfs_fs_info* info) {
}
{
char cdc_resp[TERNFS_LOCAL_CDC_RESP_SIZE];
for (read_so_far = 0; read_so_far < sizeof(cdc_resp);) {
iov.iov_base = (char*)&cdc_resp + read_so_far;
iov.iov_len = sizeof(cdc_resp) - read_so_far;
int read = kernel_recvmsg(registry_sock, &msg, &iov, 1, iov.iov_len, 0);
if (read == 0) { err = -ECONNRESET; goto out_sock; }
if (read < 0) { err = read; goto out_sock; }
read_so_far += read;
}
if (err = recvloop(registry_sock, cdc_resp, sizeof cdc_resp), err < 0) goto out_sock;
struct ternfs_bincode_get_ctx ctx = {
.buf = cdc_resp,
.end = cdc_resp + sizeof(cdc_resp),
@@ -199,28 +184,13 @@ static int ternfs_refresh_fs_info(struct ternfs_fs_info* info) {
ternfs_local_changed_block_services_req_put_changed_since(&ctx, start, changed_since, info->block_services_last_changed_time);
ternfs_local_changed_block_services_req_put_end(&ctx, changed_since, end);
ternfs_write_registry_req_header(changed_block_services_req, TERNFS_LOCAL_CHANGED_BLOCK_SERVICES_REQ_SIZE, TERNFS_REGISTRY_LOCAL_CHANGED_BLOCK_SERVICES);
int written_so_far;
for (written_so_far = 0; written_so_far < sizeof(changed_block_services_req);) {
iov.iov_base = changed_block_services_req + written_so_far;
iov.iov_len = sizeof(changed_block_services_req) - written_so_far;
int written = kernel_sendmsg(registry_sock, &msg, &iov, 1, iov.iov_len);
if (written < 0) { err = written; goto out_sock; }
written_so_far += written;
}
if (err = sendloop(registry_sock, changed_block_services_req, sizeof changed_block_services_req), err < 0) goto out_sock;
}
u32 registry_resp_len;
u8 registry_resp_kind;
{
char block_services_resp_header[TERNFS_REGISTRY_RESP_HEADER_SIZE];
int read_so_far;
for (read_so_far = 0; read_so_far < sizeof(block_services_resp_header);) {
iov.iov_base = block_services_resp_header + read_so_far;
iov.iov_len = sizeof(block_services_resp_header) - read_so_far;
int read = kernel_recvmsg(registry_sock, &msg, &iov, 1, iov.iov_len, 0);
if (read == 0) { err = -ECONNRESET; goto out_sock; }
if (read < 0) { err = read; goto out_sock; }
read_so_far += read;
}
if (err = recvloop(registry_sock, block_services_resp_header, sizeof block_services_resp_header), err < 0) goto out_sock;
err = ternfs_read_registry_resp_header(block_services_resp_header, &registry_resp_len, &registry_resp_kind);
if (err < 0) { goto out_sock; }
}
@@ -233,15 +203,7 @@ static int ternfs_refresh_fs_info(struct ternfs_fs_info* info) {
err = -EINVAL;
goto out_sock;
}
int read_so_far;
for (read_so_far = 0; read_so_far < sizeof(last_changed_and_len);) {
iov.iov_base = last_changed_and_len + read_so_far;
iov.iov_len = sizeof(last_changed_and_len) - read_so_far;
int read = kernel_recvmsg(registry_sock, &msg, &iov, 1, iov.iov_len, 0);
if (read == 0) { err = -ECONNRESET; goto out_sock; }
if (read < 0) { err = read; goto out_sock; }
read_so_far += read;
}
if (err = recvloop(registry_sock, last_changed_and_len, sizeof last_changed_and_len), err < 0) goto out_sock;
last_changed = get_unaligned_le64(last_changed_and_len);
block_services_len = get_unaligned_le16(last_changed_and_len + sizeof(last_changed));
registry_resp_len -= sizeof(last_changed_and_len);
@@ -254,17 +216,9 @@ static int ternfs_refresh_fs_info(struct ternfs_fs_info* info) {
goto out_sock;
}
u16 block_service_idx;
int read_so_far;
for (block_service_idx = 0; block_service_idx < block_services_len; block_service_idx++) {
char block_service_buf[TERNFS_BLOCK_SERVICE_SIZE];
for (read_so_far = 0; read_so_far < sizeof(block_service_buf);) {
iov.iov_base = block_service_buf + read_so_far;
iov.iov_len = sizeof(block_service_buf) - read_so_far;
int read = kernel_recvmsg(registry_sock, &msg, &iov, 1, iov.iov_len, 0);
if (read == 0) { err = -ECONNRESET; goto out_sock; }
if (read < 0) { err = read; goto out_sock; }
read_so_far += read;
}
if (err = recvloop(registry_sock, block_service_buf, sizeof block_service_buf), err < 0) goto out_sock;
struct ternfs_bincode_get_ctx bs_ctx = {
.buf = block_service_buf,
.end = block_service_buf + sizeof(block_service_buf),
@@ -307,25 +261,10 @@ static int ternfs_refresh_fs_info(struct ternfs_fs_info* info) {
static_assert(TERNFS_INFO_REQ_SIZE == 0);
char info_req[TERNFS_REGISTRY_REQ_HEADER_SIZE];
ternfs_write_registry_req_header(info_req, 0, TERNFS_REGISTRY_INFO);
int written_so_far;
for (written_so_far = 0; written_so_far < sizeof(info_req);) {
iov.iov_base = info_req + written_so_far;
iov.iov_len = sizeof(info_req) - written_so_far;
int written = kernel_sendmsg(registry_sock, &msg, &iov, 1, iov.iov_len);
if (written < 0) { err = written; goto out_sock; }
written_so_far += written;
}
if (err = sendloop(registry_sock, info_req, sizeof info_req), err < 0) goto out_sock;
char info_resp_header[TERNFS_REGISTRY_RESP_HEADER_SIZE];
int read_so_far;
for (read_so_far = 0; read_so_far < sizeof(info_resp_header);) {
iov.iov_base = info_resp_header + read_so_far;
iov.iov_len = sizeof(info_resp_header) - read_so_far;
int read = kernel_recvmsg(registry_sock, &msg, &iov, 1, iov.iov_len, 0);
if (read == 0) { err = -ECONNRESET; goto out_sock; }
if (read < 0) { err = read; goto out_sock; }
read_so_far += read;
}
if (err = recvloop(registry_sock, info_resp_header, sizeof info_resp_header), err < 0) goto out_sock;
u32 info_resp_len;
u8 info_resp_kind;
err = ternfs_read_registry_resp_header(info_resp_header, &info_resp_len, &info_resp_kind);
@@ -337,14 +276,7 @@ static int ternfs_refresh_fs_info(struct ternfs_fs_info* info) {
err = -EIO; goto out_sock;
}
char info_resp[TERNFS_INFO_RESP_SIZE];
for (read_so_far = 0; read_so_far < sizeof(info_resp);) {
iov.iov_base = (char*)&info_resp + read_so_far;
iov.iov_len = sizeof(info_resp) - read_so_far;
int read = kernel_recvmsg(registry_sock, &msg, &iov, 1, iov.iov_len, 0);
if (read == 0) { err = -ECONNRESET; goto out_sock; }
if (read < 0) { err = read; goto out_sock; }
read_so_far += read;
}
if (err = recvloop(registry_sock, info_resp, sizeof info_resp), err < 0) goto out_sock;
struct ternfs_bincode_get_ctx ctx = {
.buf = info_resp,
.end = info_resp + sizeof(info_resp),