Files
dolt/js/src/blob.js
T
Rafael Weinstein 74ba1012c2 sequence.getOffset => sequence.cumulativeNumLeaves (#2193)
sequence.getOffset was problematic and didn't have a clear meaning. In addition it was causing a bunch of +1 code at call sites. This patch replaces it with cumulativeNumLeaves which has a clearer meaning.
2016-07-28 16:55:41 -07:00

203 lines
5.5 KiB
JavaScript

// @flow
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
import * as Bytes from './bytes.js';
import Collection from './collection.js';
import RollingValueHasher from './rolling-value-hasher.js';
import SequenceChunker from './sequence-chunker.js';
import type {EqualsFn} from './edit-distance.js';
import type {ValueReader, ValueReadWriter} from './value-store.js';
import type {makeChunkFn} from './sequence-chunker.js';
import {IndexedSequence} from './indexed-sequence.js';
import {Kind} from './noms-kind.js';
import {OrderedKey, newIndexedMetaSequenceChunkFn} from './meta-sequence.js';
import {SequenceCursor} from './sequence.js';
import {blobType} from './type.js';
import {invariant} from './assert.js';
export default class Blob extends Collection<IndexedSequence> {
constructor(bytes: Uint8Array) {
const chunker = new SequenceChunker(null, null, newBlobLeafChunkFn(null),
newIndexedMetaSequenceChunkFn(Kind.Blob, null), blobHashValueBytes);
for (let i = 0; i < bytes.length; i++) {
chunker.append(bytes[i]);
}
const seq = chunker.doneSync();
invariant(seq instanceof IndexedSequence);
super(seq);
}
getReader(): BlobReader {
return new BlobReader(this.sequence);
}
get length(): number {
return this.sequence.numLeaves;
}
}
export class BlobReader {
_sequence: IndexedSequence;
_cursor: Promise<SequenceCursor<number, IndexedSequence<number>>>;
_pos: number;
_lock: string;
constructor(sequence: IndexedSequence) {
this._sequence = sequence;
this._cursor = sequence.newCursorAt(0);
this._pos = 0;
this._lock = '';
}
/**
* Reads the next chunk of bytes from this blob.
*
* Returns {done: false, value: chunk} if there is more data, or {done: true} if there is none.
*/
read(): Promise<{done: boolean, value?: Uint8Array}> {
invariant(this._lock === '', `cannot read without completing current ${this._lock}`);
this._lock = 'read';
return this._cursor.then(cur => {
if (!cur.valid) {
return {done: true};
}
return this._readCur(cur).then(arr => ({done: false, value: arr}));
}).then(res => {
this._lock = '';
return res;
});
}
_readCur(cur: SequenceCursor): Promise<Uint8Array> {
let arr = cur.sequence.items;
invariant(arr instanceof Uint8Array);
const idx = cur.indexInChunk;
if (idx > 0) {
invariant(idx < arr.byteLength);
arr = Bytes.subarray(arr, idx, arr.byteLength);
}
return cur.advanceChunk().then(() => {
this._pos += arr.byteLength;
return arr;
});
}
/**
* Seeks the reader to a position either relative to the start, the current position, or end of
* the blob.
*
* If |whence| is 0, |offset| will be relative to the start.
* If |whence| is 1, |offset| will be relative to the current position.
* If |whence| is 2, |offset| will be relative to the end.
*/
seek(offset: number, whence: number = 0): Promise<number> {
invariant(this._lock === '', `cannot seek without completing current ${this._lock}`);
this._lock = 'seek';
let abs = this._pos;
switch (whence) {
case 0:
abs = offset;
break;
case 1:
abs += offset;
break;
case 2:
abs = this._sequence.numLeaves + offset;
break;
default:
throw new Error(`invalid whence ${whence}`);
}
invariant(abs >= 0, `cannot seek to negative position ${abs}`);
this._cursor = this._sequence.newCursorAt(abs);
// Wait for the seek to complete so that reads will be relative to the new position.
return this._cursor.then(() => {
this._pos = abs;
this._lock = '';
return abs;
});
}
}
export class BlobLeafSequence extends IndexedSequence<number> {
constructor(vr: ?ValueReader, items: Uint8Array) {
// $FlowIssue: The super class expects Array<T> but we sidestep that.
super(vr, blobType, items);
}
cumulativeNumberOfLeaves(idx: number): number {
return idx + 1;
}
getCompareFn(other: IndexedSequence): EqualsFn {
return (idx: number, otherIdx: number) =>
this.items[idx] === other.items[otherIdx];
}
}
function newBlobLeafChunkFn(vr: ?ValueReader): makeChunkFn {
return (items: Array<number>) => {
const blobLeaf = new BlobLeafSequence(vr, Bytes.fromValues(items));
const blob = Blob.fromSequence(blobLeaf);
const key = new OrderedKey(items.length);
return [blob, key, items.length];
};
}
function blobHashValueBytes(b: number, rv: RollingValueHasher) {
rv.hashByte(b);
}
type BlobWriterState = 'writable' | 'closed';
export class BlobWriter {
_state: BlobWriterState;
_blob: ?Promise<Blob>;
_chunker: SequenceChunker;
_vrw: ?ValueReadWriter;
constructor(vrw: ?ValueReadWriter) {
this._state = 'writable';
this._chunker = new SequenceChunker(null, vrw, newBlobLeafChunkFn(vrw),
newIndexedMetaSequenceChunkFn(Kind.Blob, vrw), blobHashValueBytes);
this._vrw = vrw;
}
write(chunk: Uint8Array) {
assert(this._state === 'writable');
for (let i = 0; i < chunk.length; i++) {
this._chunker.append(chunk[i]);
}
}
close() {
assert(this._state === 'writable');
this._blob = this._chunker.done(this._vrw).then(seq => Blob.fromSequence(seq));
this._state = 'closed';
}
get blob(): Promise<Blob> {
assert(this._state === 'closed');
invariant(this._blob);
return this._blob;
}
}
function assert(v: any) {
if (!v) {
throw new TypeError('Invalid usage of BlobWriter');
}
}