From 2d1cb0050ddf0db44784f76231c5afa7792015c5 Mon Sep 17 00:00:00 2001 From: KernelDeimos Date: Tue, 27 Aug 2024 22:20:32 -0400 Subject: [PATCH] dev: puter-wisp --- src/puter-wisp/README.md | 102 +++++ src/puter-wisp/basic.html | 412 ++++++++++++++++++ .../devlog/unit_test_usefulness/a.js | 297 +++++++++++++ .../devlog/unit_test_usefulness/b.js | 307 +++++++++++++ .../devlog/unit_test_usefulness/test_a_b.diff | 18 + src/puter-wisp/package.json | 15 + src/puter-wisp/src/exports.js | 298 +++++++++++++ src/puter-wisp/test/test.js | 50 +++ 8 files changed, 1499 insertions(+) create mode 100644 src/puter-wisp/README.md create mode 100644 src/puter-wisp/basic.html create mode 100644 src/puter-wisp/devlog/unit_test_usefulness/a.js create mode 100644 src/puter-wisp/devlog/unit_test_usefulness/b.js create mode 100644 src/puter-wisp/devlog/unit_test_usefulness/test_a_b.diff create mode 100644 src/puter-wisp/package.json create mode 100644 src/puter-wisp/src/exports.js create mode 100644 src/puter-wisp/test/test.js diff --git a/src/puter-wisp/README.md b/src/puter-wisp/README.md new file mode 100644 index 00000000..bfbab20d --- /dev/null +++ b/src/puter-wisp/README.md @@ -0,0 +1,102 @@ +# Wisp Utilities + +This is still a work in progress. Thes utilities use my own stream interface +to avoid browser/node compatibility issues and because I found it more +convenient. These streams can by used as async iterator objects just like +other conventional implementations. Currently there is no logic for closing +streams or knowing if a stream has been closed, but this is planned. + +## Classes and Factory Functions + +### WispPacket (class) + +Wraps a Uint8Array containing a Wisp packet. `data` should be a Uint8Array +containing only the Wisp frame, starting at the Packet Type and ending at +the last byte of the payload (inclusive). + +```javascript +const packet = new WispPacket({ + data: new Uint8Array(...), + direction: WispPacket.SEND, // or RECV + + // `extra` is optional, for debugging + extra: { some: 'value', }, +}); + +packet.type; // ex: WispPacket.CONTINUE +``` + +#### Methods + +- `describe()` - outputs a summary string + ```javascript + packet.describe(); + // ex: "INFO v2.0 f000000000" + ``` +- `toVirtioFrame` - prepends the size of the Wisp frame (u32LE) +- `log()` - prints a collapsed console group +- `reflect()` - returns a reflected version of the packet (flips `SEND` and `RECV`) + +### NewCallbackByteStream (function) + +Returns a stream for values that get passed through a callback interface. +The stream object (an async iterator object) has a property called +`listener` which can be passed as a listener or called directly. This +listener expects only one argument which is the data to pass through the +stream (typically a value of type `Uint8Array`). + +```javascript +const byteStream = NewCallbackByteStream(); +emulator.add_listener('virtio-console0-output-bytes', + byteStream.listener); +``` + +### NewVirtioFrameStream (function) + +Takes in a byte stream (stream of `Uint8Array`) and assumes that this byte +stream contains integers (u32LE) describing the length (in bytes) of data, +followed by the data. Returns a stream which outputs a complete chunk of +data (as per the specified length) as each value, excluding the bytes that +describe the length. + +```javascript +const virtioStream = NewVirtioFrameStream(byteStream); +``` + +### NewWispPacketStream (function) + +Takes in a stream of `Uint8Array`s, each containing a complete Wisp packet, +and outputs a stream of instances of **WispPacket** + +## Example Use with v86 + +```javascript +const emulator = new V86(...); + +// Get a byte stream for /dev/hvc0 +const byteStream = NewCallbackByteStream(); +emulator.add_listener('virtio-console0-output-bytes', + byteStream.listener); + +// Get a stream of frames with prepended byte lengths +// (for example, `twisp` uses this format) +const virtioStream = NewVirtioFrameStream(byteStream); + +// Get a stream of WispPacket objects +const wispStream = NewWispPacketStream(virtioStream); + +// Async iterator +(async () => { + for ( const packet of wispStream ) { + console.log('Wisp packet!', packet.describe()); + + // Let's send back a reflected packet for INFO! + if ( packet.type === WispPacket.INFO ) { + emulator.bus.send( + 'virtio-console0-input-bytes', + packet.toVirtioFrame(), + ); + } + } +})(); +``` diff --git a/src/puter-wisp/basic.html b/src/puter-wisp/basic.html new file mode 100644 index 00000000..07bbfb3f --- /dev/null +++ b/src/puter-wisp/basic.html @@ -0,0 +1,412 @@ + +Basic Emulator + + + + + + +
+
+ +
diff --git a/src/puter-wisp/devlog/unit_test_usefulness/a.js b/src/puter-wisp/devlog/unit_test_usefulness/a.js new file mode 100644 index 00000000..75b99e7d --- /dev/null +++ b/src/puter-wisp/devlog/unit_test_usefulness/a.js @@ -0,0 +1,297 @@ +const lib = {}; + +// SO: 40031688 +lib.buf2hex = (buffer) => { // buffer is an ArrayBuffer + return [...new Uint8Array(buffer)] + .map(x => x.toString(16).padStart(2, '0')) + .join(''); +} + +// Tiny inline little-endian integer library +lib.get_int = (n_bytes, array8, signed=false) => { + return (v => signed ? v : v >>> 0)( + array8.slice(0,n_bytes).reduce((v,e,i)=>v|=e<<8*i,0)); +} +lib.to_int = (n_bytes, num) => { + return (new Uint8Array()).map((_,i)=>(num>>8*i)&0xFF); +} + +class ATStream { + constructor ({ delegate, acc, transform, observe }) { + this.delegate = delegate; + if ( acc ) this.acc = acc; + if ( transform ) this.transform = transform; + if ( observe ) this.observe = observe; + this.state = {}; + this.carry = []; + } + [Symbol.asyncIterator]() { return this; } + async next_value_ () { + if ( this.carry.length > 0 ) { + console.log('got from carry!', this.carry); + return { + value: this.carry.shift(), + done: false, + }; + } + return await this.delegate.next(); + } + async acc ({ value }) { + return value; + } + async next_ () { + for (;;) { + const ret = await this.next_value_(); + if ( ret.done ) return ret; + const v = await this.acc({ + state: this.state, + value: ret.value, + carry: v => this.carry.push(v), + }); + if ( this.carry.length >= 0 && v === undefined ) { + throw new Error(`no value, but carry value exists`); + } + if ( v === undefined ) continue; + // We have a value, clear the state! + this.state = {}; + if ( this.transform ) { + const new_value = await this.transform( + { value: ret.value }); + return { ...ret, value: new_value }; + } + return { ...ret, value: v }; + } + } + async next () { + const ret = await this.next_(); + if ( this.observe && !ret.done ) { + this.observe(ret); + } + return ret; + } + async enqueue_ (v) { + this.queue.push(v); + } +} + +const NewCallbackByteStream = () => { + let listener; + let queue = []; + const NOOP = () => {}; + let signal = NOOP; + (async () => { + for (;;) { + const v = await new Promise((rslv, rjct) => { + listener = rslv; + }); + queue.push(v); + signal(); + } + })(); + const stream = { + [Symbol.asyncIterator](){ + return this; + }, + async next () { + if ( queue.length > 0 ) { + return { + value: queue.shift(), + done: false, + }; + } + await new Promise(rslv => { + signal = rslv; + }); + signal = NOOP; + const v = queue.shift(); + return { value: v, done: false }; + } + }; + stream.listener = data => { + listener(data); + }; + return stream; +} + +const NewVirtioFrameStream = byteStream => { + return new ATStream({ + delegate: byteStream, + async acc ({ value, carry }) { + if ( ! this.state.buffer ) { + const size = lib.get_int(4, value); + // 512MiB limit in case of attempted abuse or a bug + // (assuming this won't happen under normal conditions) + if ( size > 512*(1024**2) ) { + throw new Error(`Way too much data! (${size} bytes)`); + } + value = value.slice(4); + this.state.buffer = new Uint8Array(size); + this.state.index = 0; + } + + const needed = this.state.buffer.length - this.state.index; + if ( value.length > needed ) { + const remaining = value.slice(needed); + console.log('we got more bytes than we needed', + needed, + remaining, + value.length, + this.state.buffer.length, + this.state.index, + ); + carry(remaining); + } + + const amount = Math.min(value.length, needed); + const added = value.slice(0, amount); + this.state.buffer.set(added, this.state.index); + this.state.index += amount; + + if ( this.state.index > this.state.buffer.length ) { + throw new Error('WUT'); + } + if ( this.state.index == this.state.buffer.length ) { + return this.state.buffer; + } + } + }); +}; + +const wisp_types = [ + { + id: 3, + label: 'CONTINUE', + describe: ({ payload }) => { + return `buffer: ${lib.get_int(4, payload)}B`; + }, + getAttributes ({ payload }) { + return { + buffer_size: lib.get_int(4, payload), + }; + } + }, + { + id: 5, + label: 'INFO', + describe: ({ payload }) => { + return `v${payload[0]}.${payload[1]} ` + + lib.buf2hex(payload.slice(2)); + }, + getAttributes ({ payload }) { + return { + version_major: payload[0], + version_minor: payload[1], + extensions: payload.slice(2), + } + } + }, +]; + +class WispPacket { + static SEND = Symbol('SEND'); + static RECV = Symbol('RECV'); + constructor ({ data, direction, extra }) { + this.direction = direction; + this.data_ = data; + this.extra = extra ?? {}; + this.types_ = { + 1: { label: 'CONNECT' }, + 2: { label: 'DATA' }, + 4: { label: 'CLOSE' }, + }; + for ( const item of wisp_types ) { + this.types_[item.id] = item; + } + } + get type () { + const i_ = this.data_[0]; + return this.types_[i_]; + } + get attributes () { + if ( ! this.type.getAttributes ) return {}; + const attrs = {}; + Object.assign(attrs, this.type.getAttributes({ + payload: this.data_.slice(5), + })); + Object.assign(attrs, this.extra); + return attrs; + } + toVirtioFrame () { + const arry = new Uint8Array(this.data_.length + 4); + arry.set(lib.to_int(4, this.data_.length), 0); + arry.set(this.data_, 4); + return arry; + } + describe () { + return this.type.label + '(' + + (this.type.describe?.({ + payload: this.data_.slice(5), + }) ?? '?') + ')'; + } + log () { + const arrow = + this.direction === this.constructor.SEND ? '->' : + this.direction === this.constructor.RECV ? '<-' : + '<>' ; + console.groupCollapsed(`WISP ${arrow} ${this.describe()}`); + const attrs = this.attributes; + for ( const k in attrs ) { + console.log(k, attrs[k]); + } + console.groupEnd(); + } + reflect () { + const reflected = new WispPacket({ + data: this.data_, + direction: + this.direction === this.constructor.SEND ? + this.constructor.RECV : + this.direction === this.constructor.RECV ? + this.constructor.SEND : + undefined, + extra: { + reflectedFrom: this, + } + }); + return reflected; + } +} + +for ( const item of wisp_types ) { + WispPacket[item.label] = item; +} + +const NewWispPacketStream = frameStream => { + return new ATStream({ + delegate: frameStream, + transform ({ value }) { + return new WispPacket({ + data: value, + direction: WispPacket.RECV, + }); + }, + observe ({ value }) { + value.log(); + } + }); +} + +class WispClient { + constructor ({ + packetStream, + sendFn, + }) { + this.packetStream = packetStream; + this.sendFn = sendFn; + } + send (packet) { + packet.log(); + this.sendFn(packet); + } +} + +module.exports = { + NewVirtioFrameStream, + NewWispPacketStream, + WispPacket, +}; diff --git a/src/puter-wisp/devlog/unit_test_usefulness/b.js b/src/puter-wisp/devlog/unit_test_usefulness/b.js new file mode 100644 index 00000000..4cadc526 --- /dev/null +++ b/src/puter-wisp/devlog/unit_test_usefulness/b.js @@ -0,0 +1,307 @@ +const lib = {}; + +// SO: 40031688 +lib.buf2hex = (buffer) => { // buffer is an ArrayBuffer + return [...new Uint8Array(buffer)] + .map(x => x.toString(16).padStart(2, '0')) + .join(''); +} + +// Tiny inline little-endian integer library +lib.get_int = (n_bytes, array8, signed=false) => { + return (v => signed ? v : v >>> 0)( + array8.slice(0,n_bytes).reduce((v,e,i)=>v|=e<<8*i,0)); +} +lib.to_int = (n_bytes, num) => { + return (new Uint8Array()).map((_,i)=>(num>>8*i)&0xFF); +} + +class ATStream { + constructor ({ delegate, acc, transform, observe }) { + this.delegate = delegate; + if ( acc ) this.acc = acc; + if ( transform ) this.transform = transform; + if ( observe ) this.observe = observe; + this.state = {}; + this.carry = []; + } + [Symbol.asyncIterator]() { return this; } + async next_value_ () { + if ( this.carry.length > 0 ) { + return { + value: this.carry.shift(), + done: false, + }; + } + return await this.delegate.next(); + } + async acc ({ value }) { + return value; + } + async next_ () { + for (;;) { + const ret = await this.next_value_(); + if ( ret.done ) return ret; + const v = await this.acc({ + state: this.state, + value: ret.value, + carry: v => this.carry.push(v), + }); + if ( this.carry.length > 0 && v === undefined ) { + throw new Error(`no value, but carry value exists`); + } + if ( v === undefined ) continue; + // We have a value, clear the state! + this.state = {}; + if ( this.transform ) { + const new_value = await this.transform( + { value: ret.value }); + return { ...ret, value: new_value }; + } + return { ...ret, value: v }; + } + } + async next () { + const ret = await this.next_(); + if ( this.observe && !ret.done ) { + this.observe(ret); + } + return ret; + } + async enqueue_ (v) { + this.queue.push(v); + } +} + +const NewCallbackByteStream = () => { + let listener; + let queue = []; + const NOOP = () => {}; + let signal = NOOP; + (async () => { + for (;;) { + const v = await new Promise((rslv, rjct) => { + listener = rslv; + }); + queue.push(v); + signal(); + } + })(); + const stream = { + [Symbol.asyncIterator](){ + return this; + }, + async next () { + if ( queue.length > 0 ) { + return { + value: queue.shift(), + done: false, + }; + } + await new Promise(rslv => { + signal = rslv; + }); + signal = NOOP; + const v = queue.shift(); + return { value: v, done: false }; + } + }; + stream.listener = data => { + listener(data); + }; + return stream; +} + +const NewVirtioFrameStream = byteStream => { + return new ATStream({ + delegate: byteStream, + async acc ({ value, carry }) { + if ( ! this.state.buffer ) { + if ( this.state.hold ) { + const old_val = value; + let size = this.state.hold.length + value.length; + value = new Uint8Array(size); + value.set(this.state.hold, 0); + value.set(old_val, this.state.hold.length); + } + if ( value.length < 4 ) { + this.state.hold = value; + return undefined; + } + const size = lib.get_int(4, value); + // 512MiB limit in case of attempted abuse or a bug + // (assuming this won't happen under normal conditions) + if ( size > 512*(1024**2) ) { + throw new Error(`Way too much data! (${size} bytes)`); + } + value = value.slice(4); + this.state.buffer = new Uint8Array(size); + this.state.index = 0; + } + + const needed = this.state.buffer.length - this.state.index; + if ( value.length > needed ) { + const remaining = value.slice(needed); + console.log('we got more bytes than we needed', + needed, + remaining, + value.length, + this.state.buffer.length, + this.state.index, + ); + carry(remaining); + } + + const amount = Math.min(value.length, needed); + const added = value.slice(0, amount); + this.state.buffer.set(added, this.state.index); + this.state.index += amount; + + if ( this.state.index > this.state.buffer.length ) { + throw new Error('WUT'); + } + if ( this.state.index == this.state.buffer.length ) { + return this.state.buffer; + } + } + }); +}; + +const wisp_types = [ + { + id: 3, + label: 'CONTINUE', + describe: ({ payload }) => { + return `buffer: ${lib.get_int(4, payload)}B`; + }, + getAttributes ({ payload }) { + return { + buffer_size: lib.get_int(4, payload), + }; + } + }, + { + id: 5, + label: 'INFO', + describe: ({ payload }) => { + return `v${payload[0]}.${payload[1]} ` + + lib.buf2hex(payload.slice(2)); + }, + getAttributes ({ payload }) { + return { + version_major: payload[0], + version_minor: payload[1], + extensions: payload.slice(2), + } + } + }, +]; + +class WispPacket { + static SEND = Symbol('SEND'); + static RECV = Symbol('RECV'); + constructor ({ data, direction, extra }) { + this.direction = direction; + this.data_ = data; + this.extra = extra ?? {}; + this.types_ = { + 1: { label: 'CONNECT' }, + 2: { label: 'DATA' }, + 4: { label: 'CLOSE' }, + }; + for ( const item of wisp_types ) { + this.types_[item.id] = item; + } + } + get type () { + const i_ = this.data_[0]; + return this.types_[i_]; + } + get attributes () { + if ( ! this.type.getAttributes ) return {}; + const attrs = {}; + Object.assign(attrs, this.type.getAttributes({ + payload: this.data_.slice(5), + })); + Object.assign(attrs, this.extra); + return attrs; + } + toVirtioFrame () { + const arry = new Uint8Array(this.data_.length + 4); + arry.set(lib.to_int(4, this.data_.length), 0); + arry.set(this.data_, 4); + return arry; + } + describe () { + return this.type.label + '(' + + (this.type.describe?.({ + payload: this.data_.slice(5), + }) ?? '?') + ')'; + } + log () { + const arrow = + this.direction === this.constructor.SEND ? '->' : + this.direction === this.constructor.RECV ? '<-' : + '<>' ; + console.groupCollapsed(`WISP ${arrow} ${this.describe()}`); + const attrs = this.attributes; + for ( const k in attrs ) { + console.log(k, attrs[k]); + } + console.groupEnd(); + } + reflect () { + const reflected = new WispPacket({ + data: this.data_, + direction: + this.direction === this.constructor.SEND ? + this.constructor.RECV : + this.direction === this.constructor.RECV ? + this.constructor.SEND : + undefined, + extra: { + reflectedFrom: this, + } + }); + return reflected; + } +} + +for ( const item of wisp_types ) { + WispPacket[item.label] = item; +} + +const NewWispPacketStream = frameStream => { + return new ATStream({ + delegate: frameStream, + transform ({ value }) { + return new WispPacket({ + data: value, + direction: WispPacket.RECV, + }); + }, + observe ({ value }) { + value.log(); + } + }); +} + +class WispClient { + constructor ({ + packetStream, + sendFn, + }) { + this.packetStream = packetStream; + this.sendFn = sendFn; + } + send (packet) { + packet.log(); + this.sendFn(packet); + } +} + +module.exports = { + NewVirtioFrameStream, + NewWispPacketStream, + WispPacket, +}; diff --git a/src/puter-wisp/devlog/unit_test_usefulness/test_a_b.diff b/src/puter-wisp/devlog/unit_test_usefulness/test_a_b.diff new file mode 100644 index 00000000..311e9820 --- /dev/null +++ b/src/puter-wisp/devlog/unit_test_usefulness/test_a_b.diff @@ -0,0 +1,18 @@ +31d30 +< console.log('got from carry!', this.carry); +51c50 +< if ( this.carry.length >= 0 && v === undefined ) { +--- +> if ( this.carry.length > 0 && v === undefined ) { +120a120,130 +> if ( this.state.hold ) { +> const old_val = value; +> let size = this.state.hold.length + value.length; +> value = new Uint8Array(size); +> value.set(this.state.hold, 0); +> value.set(old_val, this.state.hold.length); +> } +> if ( value.length < 4 ) { +> this.state.hold = value; +> return undefined; +> } diff --git a/src/puter-wisp/package.json b/src/puter-wisp/package.json new file mode 100644 index 00000000..bcb02763 --- /dev/null +++ b/src/puter-wisp/package.json @@ -0,0 +1,15 @@ +{ + "name": "puter-wisp", + "version": "1.0.0", + "main": "exports.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [], + "author": "", + "license": "UNLICENSED", + "directories": { + "test": "test" + }, + "description": "" +} diff --git a/src/puter-wisp/src/exports.js b/src/puter-wisp/src/exports.js new file mode 100644 index 00000000..709bb3eb --- /dev/null +++ b/src/puter-wisp/src/exports.js @@ -0,0 +1,298 @@ +const lib = {}; + +// SO: 40031688 +lib.buf2hex = (buffer) => { // buffer is an ArrayBuffer + return [...new Uint8Array(buffer)] + .map(x => x.toString(16).padStart(2, '0')) + .join(''); +} + +// Tiny inline little-endian integer library +lib.get_int = (n_bytes, array8, signed=false) => { + return (v => signed ? v : v >>> 0)( + array8.slice(0,n_bytes).reduce((v,e,i)=>v|=e<<8*i,0)); +} +lib.to_int = (n_bytes, num) => { + return (new Uint8Array()).map((_,i)=>(num>>8*i)&0xFF); +} + +// Accumulator and/or Transformer (and/or Observer) Stream +// The Swiss Army Knife* of Streams! +// (* this code is not affiliated with the Swiss Army Knife corporation) +class ATStream { + constructor ({ delegate, acc, transform, observe }) { + this.delegate = delegate; + if ( acc ) this.acc = acc; + if ( transform ) this.transform = transform; + if ( observe ) this.observe = observe; + this.state = {}; + this.carry = []; + } + [Symbol.asyncIterator]() { return this; } + async next_value_ () { + if ( this.carry.length > 0 ) { + return { + value: this.carry.shift(), + done: false, + }; + } + return await this.delegate.next(); + } + async acc ({ value }) { + return value; + } + async next_ () { + for (;;) { + const ret = await this.next_value_(); + if ( ret.done ) return ret; + const v = await this.acc({ + state: this.state, + value: ret.value, + carry: v => this.carry.push(v), + }); + if ( this.carry.length > 0 && v === undefined ) { + throw new Error(`no value, but carry value exists`); + } + if ( v === undefined ) continue; + // We have a value, clear the state! + this.state = {}; + if ( this.transform ) { + const new_value = await this.transform( + { value: ret.value }); + return { ...ret, value: new_value }; + } + return { ...ret, value: v }; + } + } + async next () { + const ret = await this.next_(); + if ( this.observe && !ret.done ) { + this.observe(ret); + } + return ret; + } + async enqueue_ (v) { + this.queue.push(v); + } +} + +const NewCallbackByteStream = () => { + let listener; + let queue = []; + const NOOP = () => {}; + let signal = NOOP; + (async () => { + for (;;) { + const v = await new Promise((rslv, rjct) => { + listener = rslv; + }); + queue.push(v); + signal(); + } + })(); + const stream = { + [Symbol.asyncIterator](){ + return this; + }, + async next () { + if ( queue.length > 0 ) { + return { + value: queue.shift(), + done: false, + }; + } + await new Promise(rslv => { + signal = rslv; + }); + signal = NOOP; + const v = queue.shift(); + return { value: v, done: false }; + } + }; + stream.listener = data => { + listener(data); + }; + return stream; +} + +const NewVirtioFrameStream = byteStream => { + return new ATStream({ + delegate: byteStream, + async acc ({ value, carry }) { + if ( ! this.state.buffer ) { + if ( this.state.hold ) { + const old_val = value; + let size = this.state.hold.length + value.length; + value = new Uint8Array(size); + value.set(this.state.hold, 0); + value.set(old_val, this.state.hold.length); + } + if ( value.length < 4 ) { + this.state.hold = value; + return undefined; + } + const size = lib.get_int(4, value); + // 512MiB limit in case of attempted abuse or a bug + // (assuming this won't happen under normal conditions) + if ( size > 512*(1024**2) ) { + throw new Error(`Way too much data! (${size} bytes)`); + } + value = value.slice(4); + this.state.buffer = new Uint8Array(size); + this.state.index = 0; + } + + const needed = this.state.buffer.length - this.state.index; + if ( value.length > needed ) { + const remaining = value.slice(needed); + console.log('we got more bytes than we needed', + needed, + remaining, + value.length, + this.state.buffer.length, + this.state.index, + ); + carry(remaining); + } + + const amount = Math.min(value.length, needed); + const added = value.slice(0, amount); + this.state.buffer.set(added, this.state.index); + this.state.index += amount; + + if ( this.state.index > this.state.buffer.length ) { + throw new Error('WUT'); + } + if ( this.state.index == this.state.buffer.length ) { + return this.state.buffer; + } + } + }); +}; + +const wisp_types = [ + { + id: 3, + label: 'CONTINUE', + describe: ({ payload }) => { + return `buffer: ${lib.get_int(4, payload)}B`; + }, + getAttributes ({ payload }) { + return { + buffer_size: lib.get_int(4, payload), + }; + } + }, + { + id: 5, + label: 'INFO', + describe: ({ payload }) => { + return `v${payload[0]}.${payload[1]} ` + + lib.buf2hex(payload.slice(2)); + }, + getAttributes ({ payload }) { + return { + version_major: payload[0], + version_minor: payload[1], + extensions: payload.slice(2), + } + } + }, +]; + +class WispPacket { + static SEND = Symbol('SEND'); + static RECV = Symbol('RECV'); + constructor ({ data, direction, extra }) { + this.direction = direction; + this.data_ = data; + this.extra = extra ?? {}; + this.types_ = { + 1: { label: 'CONNECT' }, + 2: { label: 'DATA' }, + 4: { label: 'CLOSE' }, + }; + for ( const item of wisp_types ) { + this.types_[item.id] = item; + } + } + get type () { + const i_ = this.data_[0]; + return this.types_[i_]; + } + get attributes () { + if ( ! this.type.getAttributes ) return {}; + const attrs = {}; + Object.assign(attrs, this.type.getAttributes({ + payload: this.data_.slice(5), + })); + Object.assign(attrs, this.extra); + return attrs; + } + toVirtioFrame () { + const arry = new Uint8Array(this.data_.length + 4); + arry.set(lib.to_int(4, this.data_.length), 0); + arry.set(this.data_, 4); + return arry; + } + describe () { + return this.type.label + '(' + + (this.type.describe?.({ + payload: this.data_.slice(5), + }) ?? '?') + ')'; + } + log () { + const arrow = + this.direction === this.constructor.SEND ? '->' : + this.direction === this.constructor.RECV ? '<-' : + '<>' ; + console.groupCollapsed(`WISP ${arrow} ${this.describe()}`); + const attrs = this.attributes; + for ( const k in attrs ) { + console.log(k, attrs[k]); + } + console.groupEnd(); + } + reflect () { + const reflected = new WispPacket({ + data: this.data_, + direction: + this.direction === this.constructor.SEND ? + this.constructor.RECV : + this.direction === this.constructor.RECV ? + this.constructor.SEND : + undefined, + extra: { + reflectedFrom: this, + } + }); + return reflected; + } +} + +for ( const item of wisp_types ) { + WispPacket[item.label] = item; +} + +const NewWispPacketStream = frameStream => { + return new ATStream({ + delegate: frameStream, + transform ({ value }) { + return new WispPacket({ + data: value, + direction: WispPacket.RECV, + }); + }, + observe ({ value }) { + // TODO: configurable behavior, or a separate stream decorator + value.log(); + } + }); +} + +module.exports = { + NewCallbackByteStream, + NewVirtioFrameStream, + NewWispPacketStream, + WispPacket, +}; diff --git a/src/puter-wisp/test/test.js b/src/puter-wisp/test/test.js new file mode 100644 index 00000000..be063cee --- /dev/null +++ b/src/puter-wisp/test/test.js @@ -0,0 +1,50 @@ +const assert = require('assert'); +const { + NewVirtioFrameStream, + NewWispPacketStream, + WispPacket, +} = require('../src/exports'); + +const NewTestByteStream = uint8array => { + return (async function * () { + for ( const item of uint8array ) { + yield Uint8Array.from([item]); + } + })(); +}; + +const NewTestFullByteStream = uint8array => { + return (async function * () { + yield uint8array; + })(); +}; + +(async () => { + const stream_behaviors = [ + NewTestByteStream, + NewTestFullByteStream, + ]; + for ( const stream_behavior of stream_behaviors ) { + const byteStream = stream_behavior( + Uint8Array.from([ + 9, 0, 0, 0, // size of frame: 9 bytes (u32-L) + 3, // CONTINUE (u8) + 0, 0, 0, 0, // stream id: 0 (u32-L) + 0x0F, 0x0F, 0, 0, // buffer size (u32-L) + ]) + ); + const virtioStream = NewVirtioFrameStream(byteStream); + const wispStream = NewWispPacketStream(virtioStream); + + const packets = []; + for await ( const packet of wispStream ) { + packets.push(packet); + } + + assert.strictEqual(packets.length, 1); + const packet = packets[0]; + assert.strictEqual(packet.type.id, 3); + assert.strictEqual(packet.type.label, 'CONTINUE'); + assert.strictEqual(packet.type, WispPacket.CONTINUE); + } +})(); \ No newline at end of file