mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-11 18:49:14 -06:00
Implement JS BlobReader.seek (#1673)
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@attic/noms",
|
||||
"version": "41.5.0",
|
||||
"version": "41.6.0",
|
||||
"description": "Noms JS SDK",
|
||||
"repository": "https://github.com/attic-labs/noms",
|
||||
"main": "dist/commonjs/noms.js",
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import {blobType, refOfBlobType} from './type.js';
|
||||
import {assert} from 'chai';
|
||||
import Blob, {BlobWriter} from './blob.js';
|
||||
import Blob, {BlobReader, BlobWriter} from './blob.js';
|
||||
import {suite, test} from 'mocha';
|
||||
import {
|
||||
assertChunkCountAndType,
|
||||
@@ -19,17 +19,19 @@ import {equals} from './compare.js';
|
||||
|
||||
suite('Blob', () => {
|
||||
|
||||
async function assertReadFull(expect: Uint8Array, blob: Blob): Promise<void> {
|
||||
async function assertReadFull(expect: Uint8Array, reader: BlobReader): Promise<void> {
|
||||
const length = expect.length;
|
||||
const reader = blob.getReader();
|
||||
let i = 0;
|
||||
let pos = reader._pos;
|
||||
|
||||
while (i < length) {
|
||||
const next = await reader.read();
|
||||
assert.isFalse(next.done);
|
||||
const arr = next.value;
|
||||
invariant(arr);
|
||||
for (let j = 0; j < arr.length; j++) {
|
||||
assert.strictEqual(arr.length + pos, reader._pos);
|
||||
pos = reader._pos;
|
||||
for (let j = 0; j < arr.length && i < length; j++) {
|
||||
assert.strictEqual(expect[i], arr[j]);
|
||||
i++;
|
||||
}
|
||||
@@ -58,6 +60,40 @@ suite('Blob', () => {
|
||||
assert.strictEqual(expectCount, chunkDiffCount(blob, v2));
|
||||
}
|
||||
|
||||
async function testRandomRead(buff: Uint8Array, blob: Blob): Promise<void> {
|
||||
const checkByteRange = async (start: number, rel: number, count: number) => {
|
||||
const buffSlice = new Uint8Array(buff.buffer, buff.byteOffset + rel + start, count);
|
||||
const blobReader = blob.getReader();
|
||||
assert.strictEqual(start, await blobReader.seek(start));
|
||||
assert.strictEqual(start, blobReader._pos);
|
||||
assert.strictEqual(start + rel, await blobReader.seek(rel, 1));
|
||||
assert.strictEqual(start + rel, blobReader._pos);
|
||||
await assertReadFull(buffSlice, blobReader);
|
||||
};
|
||||
|
||||
const checkByteRangeFromEnd = async (length: number, offset: number, count: number) => {
|
||||
const buffSlice = new Uint8Array(buff.buffer,
|
||||
buff.byteOffset + buff.byteLength + offset,
|
||||
count);
|
||||
const blobReader = blob.getReader();
|
||||
assert.strictEqual(length + offset, await blobReader.seek(offset, 2));
|
||||
assert.strictEqual(length + offset, blobReader._pos);
|
||||
await assertReadFull(buffSlice, blobReader);
|
||||
};
|
||||
|
||||
const length = buff.byteLength;
|
||||
let start = 0;
|
||||
let count = length / 2;
|
||||
while (count > 2) {
|
||||
await checkByteRange(start, 0, count);
|
||||
await checkByteRange(0, start, count);
|
||||
await checkByteRange(Math.floor(start / 2), Math.ceil(start / 2), count);
|
||||
await checkByteRangeFromEnd(length, start - length, count);
|
||||
start += count;
|
||||
count = (length - start) / 2;
|
||||
}
|
||||
}
|
||||
|
||||
function randomBuff(len: number): Uint8Array {
|
||||
const r = new CountingByteReader();
|
||||
const a = new Uint8Array(len);
|
||||
@@ -80,13 +116,12 @@ suite('Blob', () => {
|
||||
assertChunkCountAndType(expectChunkCount, refOfBlobType, blob);
|
||||
|
||||
await testRoundTripAndValidate(blob, async(b2) => {
|
||||
await assertReadFull(buff, b2);
|
||||
await assertReadFull(buff, b2.getReader());
|
||||
});
|
||||
|
||||
// TODO: Random Read
|
||||
|
||||
await testPrependChunkDiff(buff, blob, expectPrependChunkDiff);
|
||||
await testAppendChunkDiff(buff, blob, expectAppendChunkDiff);
|
||||
await testRandomRead(buff, blob);
|
||||
}
|
||||
|
||||
class CountingByteReader {
|
||||
|
||||
@@ -23,7 +23,7 @@ export default class Blob extends Collection<IndexedSequence> {
|
||||
}
|
||||
|
||||
getReader(): BlobReader {
|
||||
return new BlobReader(this.sequence.newCursorAt(0));
|
||||
return new BlobReader(this.sequence);
|
||||
}
|
||||
|
||||
get length(): number {
|
||||
@@ -32,31 +32,92 @@ export default class Blob extends Collection<IndexedSequence> {
|
||||
}
|
||||
|
||||
export class BlobReader {
|
||||
_sequence: IndexedSequence;
|
||||
_cursor: Promise<SequenceCursor<number, IndexedSequence<number>>>;
|
||||
_lock: boolean;
|
||||
_pos: number;
|
||||
_lock: string;
|
||||
|
||||
constructor(cursor: Promise<SequenceCursor<number, IndexedSequence<number>>>) {
|
||||
this._cursor = cursor;
|
||||
this._lock = false;
|
||||
constructor(sequence: IndexedSequence) {
|
||||
this._sequence = sequence;
|
||||
this._cursor = sequence.newCursorAt(0);
|
||||
this._pos = 0;
|
||||
this._lock = '';
|
||||
}
|
||||
|
||||
async read(): Promise<{done: boolean, value?: Uint8Array}> {
|
||||
invariant(!this._lock, 'cannot read without completing current read');
|
||||
this._lock = true;
|
||||
/**
|
||||
* Reads the next chunk of bytes from this blob.
|
||||
*
|
||||
* Returns {done: false, value: chunk} if there is more data, or {done: true} if there is none.
|
||||
*/
|
||||
read(): Promise<{done: boolean, value?: Uint8Array}> {
|
||||
invariant(this._lock === '', `cannot read without completing current ${this._lock}`);
|
||||
this._lock = 'read';
|
||||
|
||||
const cur = await this._cursor;
|
||||
if (!cur.valid) {
|
||||
return {done: true};
|
||||
return this._cursor.then(cur => {
|
||||
if (!cur.valid) {
|
||||
return {done: true};
|
||||
}
|
||||
return this._readCur(cur).then(arr => ({done: false, value: arr}));
|
||||
}).then(res => {
|
||||
this._lock = '';
|
||||
return res;
|
||||
});
|
||||
}
|
||||
|
||||
_readCur(cur: SequenceCursor): Promise<Uint8Array> {
|
||||
let arr = cur.sequence.items;
|
||||
invariant(arr instanceof Uint8Array);
|
||||
|
||||
const idx = cur.indexInChunk;
|
||||
if (idx > 0) {
|
||||
invariant(idx < arr.byteLength);
|
||||
arr = new Uint8Array(arr.buffer, arr.byteOffset + idx, arr.byteLength - idx);
|
||||
}
|
||||
|
||||
const arr = cur.sequence.items;
|
||||
await cur.advanceChunk();
|
||||
return cur.advanceChunk().then(() => {
|
||||
this._pos += arr.byteLength;
|
||||
return arr;
|
||||
});
|
||||
}
|
||||
|
||||
// No more awaits after this, so we can't be interrupted.
|
||||
this._lock = false;
|
||||
/**
|
||||
* Seeks the reader to a position either relative to the start, the current position, or end of
|
||||
* the blob.
|
||||
*
|
||||
* If |whence| is 0, |offset| will be relative to the start.
|
||||
* If |whence| is 1, |offset| will be relative to the current position.
|
||||
* If |whence| is 2, |offset| will be relative to the end.
|
||||
*/
|
||||
seek(offset: number, whence: number = 0): Promise<number> {
|
||||
invariant(this._lock === '', `cannot seek without completing current ${this._lock}`);
|
||||
this._lock = 'seek';
|
||||
|
||||
invariant(arr instanceof Uint8Array);
|
||||
return {done: false, value: arr};
|
||||
let abs = this._pos;
|
||||
|
||||
switch (whence) {
|
||||
case 0:
|
||||
abs = offset;
|
||||
break;
|
||||
case 1:
|
||||
abs += offset;
|
||||
break;
|
||||
case 2:
|
||||
abs = this._sequence.numLeaves + offset;
|
||||
break;
|
||||
default:
|
||||
throw new Error(`invalid whence ${whence}`);
|
||||
}
|
||||
|
||||
invariant(abs >= 0, `cannot seek to negative position ${abs}`);
|
||||
|
||||
this._cursor = this._sequence.newCursorAt(abs);
|
||||
|
||||
// Wait for the seek to complete so that reads will be relative to the new position.
|
||||
return this._cursor.then(() => {
|
||||
this._pos = abs;
|
||||
this._lock = '';
|
||||
return abs;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -119,20 +119,40 @@ func (suite *blobTestSuite) TestRandomRead() {
|
||||
buffReader := bytes.NewReader(suite.buff)
|
||||
blobReader := suite.col.(Blob).Reader()
|
||||
|
||||
readByteRange := func(r io.ReadSeeker, start int64, count int64) []byte {
|
||||
readByteRange := func(r io.ReadSeeker, start, rel, count int64) []byte {
|
||||
bytes := make([]byte, count)
|
||||
n, err := r.Seek(start, 0)
|
||||
suite.NoError(err)
|
||||
suite.Equal(start, n)
|
||||
n2, err := r.Seek(rel, 1)
|
||||
suite.NoError(err)
|
||||
suite.Equal(start+rel, n2)
|
||||
n3, err := io.ReadFull(r, bytes)
|
||||
suite.NoError(err)
|
||||
suite.Equal(int(count), n3)
|
||||
return bytes
|
||||
}
|
||||
|
||||
readByteRangeFromEnd := func(r io.ReadSeeker, length, offset, count int64) []byte {
|
||||
bytes := make([]byte, count)
|
||||
n, err := r.Seek(offset, 2)
|
||||
suite.NoError(err)
|
||||
suite.Equal(length+offset, n)
|
||||
n2, err := io.ReadFull(r, bytes)
|
||||
suite.NoError(err)
|
||||
suite.Equal(int(count), n2)
|
||||
return bytes
|
||||
}
|
||||
|
||||
checkByteRange := func(start int64, count int64) {
|
||||
expect := readByteRange(buffReader, start, count)
|
||||
actual := readByteRange(blobReader, start, count)
|
||||
checkByteRange := func(start, rel, count int64) {
|
||||
expect := readByteRange(buffReader, start, rel, count)
|
||||
actual := readByteRange(blobReader, start, rel, count)
|
||||
suite.Equal(expect, actual)
|
||||
}
|
||||
|
||||
checkByteRangeFromEnd := func(length, offset, count int64) {
|
||||
expect := readByteRangeFromEnd(buffReader, length, offset, count)
|
||||
actual := readByteRangeFromEnd(blobReader, length, offset, count)
|
||||
suite.Equal(expect, actual)
|
||||
}
|
||||
|
||||
@@ -140,7 +160,10 @@ func (suite *blobTestSuite) TestRandomRead() {
|
||||
start := int64(0)
|
||||
count := int64(length / 2)
|
||||
for count > 2 {
|
||||
checkByteRange(start, count)
|
||||
checkByteRange(start, 0, count)
|
||||
checkByteRange(0, start, count)
|
||||
checkByteRange(start/2, start-(start/2), count)
|
||||
checkByteRangeFromEnd(length, start-length, count)
|
||||
start = start + count
|
||||
count = (length - start) / 2
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user