Implement Resolve message in raftless_metadata.py

- It's a drop-in replacement for resolve of raftless_metadata.rs
- This is mainly a proof-of-concept, I want to completely change the
interface we have now. However, it demonstrates that we can match the
bincode protocol in python without issues.
This commit is contained in:
James Chicken
2022-07-04 18:50:14 +01:00
parent fbf1afb2ea
commit cfe5b108e0
2 changed files with 218 additions and 9 deletions

161
pyeggsfs/metadata_msgs.py Normal file
View File

@@ -0,0 +1,161 @@
#!/usr/bin/env python3
import enum
from typing import NamedTuple, Optional, Union
import bincode
class RequestKind(enum.IntEnum):
# values derrived from metadata_msgs.rs
MK_DIR = 10
RESOLVE = 13
class ResponseKind(enum.IntEnum):
MK_DIR = 0
RESOLVE = 1
class ResolveReq(NamedTuple):
parent_id: int
subname: str
ts: int
class MkDirReq(NamedTuple):
parent_id: int
subname: str
ReqBodyTy = Union[ResolveReq, MkDirReq]
class MetadataRequest(NamedTuple):
ver: int
request_id: int
body: ReqBodyTy
def pack_request(r: MetadataRequest) -> bytearray:
b = bytearray()
bincode.pack_unsigned_into(r.ver, b)
bincode.pack_unsigned_into(r.request_id, b)
if isinstance(r.body, ResolveReq):
bincode.pack_unsigned_into(RequestKind.RESOLVE, b)
bincode.pack_unsigned_into(r.body.parent_id, b)
bincode.pack_bytes_into(r.body.subname.encode(), b)
bincode.pack_unsigned_into(r.body.ts, b)
else:
raise ValueError(f'Unrecognised body: {r.body}')
return b
def unpack_request(b: bytes) -> MetadataRequest:
u = bincode.UnpackWrapper(b)
ver = bincode.unpack_unsigned(u)
request_id = bincode.unpack_unsigned(u)
kind = bincode.unpack_unsigned(u)
if kind == RequestKind.RESOLVE:
parent_id = bincode.unpack_unsigned(u)
subname_len = bincode.unpack_unsigned(u)
subname = u.read()[:subname_len].decode()
u.advance(subname_len)
ts = bincode.unpack_unsigned(u)
body = ResolveReq(parent_id, subname, ts)
else:
raise ValueError(f'Unrecognised kind: {kind}')
if u.idx != len(b):
raise ValueError(f'Extra bytes found after request: {b[u.idx:]!r}')
return MetadataRequest(ver, request_id, body)
class MetadataErrorKind(enum.IntEnum):
TOO_SOON = 0
INODE_ALREADY_EXISTS = 1
NAME_TOO_LONG = 2
ROCKS_DB_ERROR = 3
NETWORK_ERROR = 4
BINCODE_ERROR = 5
LOGIC_ERROR = 6
UNSUPPORTED_VERSION = 7
class MetadataError(NamedTuple):
kind: MetadataErrorKind
text: str
class ResolvedInode(NamedTuple):
id: int
creation_time: int
deletion_time: int
is_file: bool
class ResolveResp(NamedTuple):
f: Optional[ResolvedInode]
RespBodyTy = Union[MetadataError, ResolveResp]
class MetadataResponse(NamedTuple):
request_id: int
body: RespBodyTy
def pack_response(r: MetadataResponse) -> bytearray:
b = bytearray()
bincode.pack_unsigned_into(r.request_id, b)
if isinstance(r.body, MetadataError):
bincode.pack_unsigned_into(1, b) # Result::Err
bincode.pack_unsigned_into(r.body.kind, b)
bincode.pack_bytes_into(r.body.text.encode(), b)
return b
bincode.pack_unsigned_into(0, b) # Result::Ok
if isinstance(r.body, ResolveResp):
bincode.pack_unsigned_into(ResponseKind.RESOLVE, b)
if r.body.f is None:
bincode.pack_unsigned_into(0, b) # Option::None
else:
bincode.pack_unsigned_into(1, b) # Option::Some
bincode.pack_unsigned_into(r.body.f.id, b)
bincode.pack_unsigned_into(r.body.f.creation_time, b)
bincode.pack_unsigned_into(r.body.f.deletion_time, b)
bincode.pack_unsigned_into(r.body.f.is_file, b)
else:
raise ValueError(f'Unrecognised body: {r.body}')
return b
def __tests() -> None:
packed = bytes([0, 123, 13, 251, 0, 2, 11, 104, 101, 108, 108, 111, 95, 119,
111, 114, 108, 100, 251, 57, 48])
unpacked = MetadataRequest(ver=0, request_id=123,
body=ResolveReq(parent_id=512, subname='hello_world', ts=12345))
unpack_res = unpack_request(packed)
assert(unpack_res == unpacked)
pack_res = pack_request(unpacked)
assert(pack_res == packed)
packed2 = bytes([251, 200, 1, 0, 1, 1, 251, 41, 35, 251, 164, 9, 251, 251,
13, 1])
unpacked2 = MetadataResponse(
request_id=456,
body=ResolveResp(f=ResolvedInode(
id=9001, creation_time=2468, deletion_time=3579, is_file=True)))
pack_res2 = pack_response(unpacked2)
assert(pack_res2 == packed2)
if __name__ == '__main__':
__tests()

View File

@@ -2,14 +2,18 @@
import argparse
import enum
from typing import Dict, List, NamedTuple, Optional, Union
from typing import Dict, List, NamedTuple, Optional, Tuple, Union
from sortedcontainers import SortedDict
import os
import pickle
import socket
import sys
import metadata_msgs
ROOT_INODE_NUMBER = 0
PROTOCOL_VERSION = 0
def shard_from_inode(inode_number: int) -> int:
@@ -47,7 +51,6 @@ class DeadValue:
class Directory:
inode_id: int
parent_inode_id: Optional[int]
mtime: int
living_items: 'SortedDict[LivingKey, LivingValue]'
@@ -77,7 +80,6 @@ class Span:
class File:
inode_id: int
mtime: int
is_eden: bool
cookie: int # or maybe str?
@@ -86,17 +88,63 @@ class File:
spans: List[Span]
class StorageNode:
write_weight: float # used to weight random variable for block creation
private_key: bytes
addr: Tuple[str, int]
class MetadataShard:
def __init__(self, shard: int):
assert 0 <= shard <= 255
self.shard = shard
self.next_inode_id = shard if shard != 0 else 0x100 # never root
self.next_block_id = shard
self.shard_id = shard
self.next_inode_id = shard | 0x100 # 00-FF is reserved
self.next_block_id = shard | 0x100
self.directories: Dict[int, Directory] = {}
self.files: Dict[int, File] = {}
self.storage_nodes: Dict[int, StorageNode] = {}
def run_forever(self) -> None:
pass
def resolve(self, r: metadata_msgs.ResolveReq) -> Optional[metadata_msgs.ResolvedInode]:
parent = self.directories.get(r.parent_id)
if parent is None:
return None
hashed_name = hash(r.subname)
res = parent.living_items.get(LivingKey(hashed_name, r.subname))
if res is None:
#TODO: maybe check dead items? (or not since interface will change)
return None
return metadata_msgs.ResolvedInode(
id=res.inode_id,
creation_time=res.creation_time,
deletion_time=0,
is_file=(res.type != InodeType.DIRECTORY),
)
def run_forever(shard: MetadataShard) -> None:
port = shard.shard_id + 22272
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.bind(('', port))
while True:
data, addr = sock.recvfrom(8192)
request = metadata_msgs.unpack_request(data)
if request.ver != PROTOCOL_VERSION:
print('Ignoring request, unsupported ver:', request.ver,
file=sys.stderr)
continue
if isinstance(request.body, metadata_msgs.ResolveReq):
result = shard.resolve(request.body)
resp_body = metadata_msgs.ResolveResp(result)
else:
print('Ignoring request, unrecognised body:', request.body,
file=sys.stderr)
continue
resp = metadata_msgs.MetadataResponse(
request_id=request.request_id,
body=resp_body
)
print(request, resp, '', sep='\n')
packed = metadata_msgs.pack_response(resp)
sock.sendto(packed, addr)
def main() -> None:
@@ -123,7 +171,7 @@ def main() -> None:
shard_object = MetadataShard(config.shard)
try:
shard_object.run_forever()
run_forever(shard_object)
finally:
print(f'Dumping to {db_fn}')
with open(db_fn, 'wb') as f: