mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-10 18:49:02 -06:00
JS: back BatchStore's put-cache with tingodb (#1479)
* JS: back BatchStore's put-cache with tingodb

tingodb is a pure-JS, file-backed, MongoDB-API-compatible database. Before this patch, BatchStore cached all pending chunks in memory, which seems unlikely to scale. Putting them in a tingodb spills them to disk as needed, while still letting us iterate them in put-order when it's time to send them.

Fixes #1349
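For readers unfamiliar with tingodb: it is an embedded store that keeps its data in plain files under a folder you pick, behind a subset of the MongoDB collection API. The snippet below is an illustrative sketch, not part of the patch; it uses only the calls that the new put-cache.js below relies on (Db, collection, insert, findOne), and the folder path is a made-up example.

import tingodb from 'tingodb';

const {Db} = tingodb();
const db = new Db('/tmp/tingodb-example', {});   // data is spilled to files under this folder
const coll = db.collection('puts');

coll.insert({hash: 'abc', data: 'payload'}, {w: 1}, (err, result) => {
  if (err) throw err;
  // tingodb hands back a numeric _id that grows with insertion order, which is
  // what lets the new put-cache replay chunks in put-order later.
  console.log('inserted with _id', result[0]._id);
  coll.findOne({hash: 'abc'}, {w: 1}, (err2, record) => {
    if (err2) throw err2;
    console.log('found', record.hash);
  });
});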
@@ -8,7 +8,7 @@ This repository contains two reference implementations of the noms protocol - on
 # Prerequisites

-* [Go 1.4+](https://golang.org/dl/)
+* [Go 1.6+](https://golang.org/dl/)
 * [Python 2.7+](https://www.python.org/downloads/) (Note: Python 2.x only, not Python 3.x)
 * [Node.js 5.11+](https://nodejs.org/download/)
@@ -9,7 +9,8 @@
     "babel-regenerator-runtime": "6.5.0",
     "babel-runtime": "5.8.38",
     "rusha": "0.8.3",
-    "text-encoding-utf-8": "1.0.1"
+    "text-encoding-utf-8": "1.0.1",
+    "tingodb": "^0.4.2"
   },
   "devDependencies": {
     "@attic/eslintrc": "^1.0.0",
@@ -35,7 +36,7 @@
   },
   "scripts": {
     "pretest": "eslint src/ && flow src/",
-    "ts": "mocha --ui tdd --timeout=3000 --reporter spec --slow=50 --invert --grep='LONG:' --compilers js:babel-core/register src/*-test.js",
+    "ts": "mocha --ui tdd --timeout=4000 --reporter spec --slow=50 --invert --grep='LONG:' --compilers js:babel-core/register src/*-test.js",
     "test": "mocha --ui tdd --timeout=3000 --reporter dot --compilers js:babel-core/register src/*-test.js",
     "prepublish": "npm run compile && npm run copy-flow-files",
     "compile": "npm run compile-to-commonjs && npm run compile-to-es6",
@@ -1,10 +1,12 @@
 // @flow

+import Chunk from './chunk.js';
 import Ref from './ref.js';
 import MemoryStore from './memory-store.js';
 import BatchStore from './batch-store.js';
 import type {ChunkStore} from './chunk-store.js';
-import type {UnsentReadMap, WriteRequest} from './batch-store.js';
+import type {UnsentReadMap} from './batch-store.js';
+import type {ChunkStream} from './chunk-serializer.js';

 export function makeTestingBatchStore(): BatchStore {
   return new BatchStore(3, new BatchStoreAdaptorDelegate(new MemoryStore()));
@@ -29,10 +31,8 @@ export class BatchStoreAdaptorDelegate {
     });
   }

-  async writeBatch(reqs: Array<WriteRequest>): Promise<void> {
-    reqs.forEach(req => {
-      this._cs.put(req.c);
-    });
+  async writeBatch(hints: Set<Ref>, chunkStream: ChunkStream): Promise<void> {
+    return chunkStream((chunk: Chunk) => this._cs.put(chunk));
   }

   async getRoot(): Promise<Ref> {
@@ -2,20 +2,21 @@

 import Chunk from './chunk.js';
 import Ref from './ref.js';
+import OrderedPutCache from './put-cache.js';
+import type {ChunkStream} from './chunk-serializer.js';
 import {notNull} from './assert.js';

 type PendingReadMap = { [key: string]: Promise<Chunk> };
 export type UnsentReadMap = { [key: string]: (c: Chunk) => void };

-type WriteMap = { [key: string]: Chunk };
 export type WriteRequest = {
-  c: Chunk;
+  hash: Ref;
   hints: Set<Ref>;
 }

 interface Delegate {
   readBatch(reqs: UnsentReadMap): Promise<void>;
-  writeBatch(reqs: Array<WriteRequest>): Promise<void>;
+  writeBatch(hints: Set<Ref>, chunkStream: ChunkStream): Promise<void>;
   getRoot(): Promise<Ref>;
   updateRoot(current: Ref, last: Ref): Promise<boolean>;
 }
@@ -27,7 +28,7 @@ export default class BatchStore {
   _activeReads: number;
   _maxReads: number;

-  _pendingWrites: WriteMap;
+  _pendingWrites: OrderedPutCache;
   _unsentWrites: ?Array<WriteRequest>;
   _delegate: Delegate;
@@ -38,7 +39,7 @@ export default class BatchStore {
     this._activeReads = 0;
     this._maxReads = maxReads;

-    this._pendingWrites = Object.create(null);
+    this._pendingWrites = new OrderedPutCache();
    this._unsentWrites = null;
    this._delegate = delegate;
  }
@@ -49,9 +50,9 @@ export default class BatchStore {
     if (p) {
       return p;
     }
-    p = this._pendingWrites[refStr];
+    p = this._pendingWrites.get(refStr);
     if (p) {
-      return Promise.resolve(p);
+      return p;
     }

     return this._pendingReads[refStr] = new Promise(resolve => {
@@ -92,16 +93,14 @@ export default class BatchStore {
   }

   schedulePut(c: Chunk, hints: Set<Ref>): void {
-    const refStr = c.ref.toString();
-    if (this._pendingWrites[refStr]) {
+    if (!this._pendingWrites.append(c)) {
       return; // Already in flight.
     }
-    this._pendingWrites[refStr] = c;

     if (!this._unsentWrites) {
       this._unsentWrites = [];
     }
-    this._unsentWrites.push({c: c, hints: hints});
+    this._unsentWrites.push({hash: c.ref, hints: hints});
   }

   async flush(): Promise<void> {
@@ -112,12 +111,18 @@ export default class BatchStore {
     const reqs = notNull(this._unsentWrites);
     this._unsentWrites = null;

-    await this._delegate.writeBatch(reqs); // TODO: Deal with backpressure
+    const first = reqs[0].hash;
+    let last = first;
+    const hints = new Set();
+    for (const req of reqs) {
+      req.hints.forEach(hint => hints.add(hint));
+      last = req.hash;
+    }
+    // TODO: Deal with backpressure
+    const chunkStream = await this._pendingWrites.extractChunks(first.toString(), last.toString());
+    await this._delegate.writeBatch(hints, chunkStream);

-    const self = this; // TODO: Remove this when babel bug is fixed.
-    reqs.forEach(req => {
-      delete self._pendingWrites[req.c.ref.toString()];
-    });
+    return this._pendingWrites.dropUntil(last.toString());
   }

   async getRoot(): Promise<Ref> {
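Putting the hunks above together, the write path now looks roughly like this. A hedged usage sketch, not code from the patch; it only calls functions that appear in the diffs above (makeTestingBatchStore, schedulePut, flush).

import Chunk from './chunk.js';
import {makeTestingBatchStore} from './batch-store-adaptor.js';

async function demo() {
  const store = makeTestingBatchStore();                    // BatchStore over an in-memory ChunkStore
  store.schedulePut(Chunk.fromString('hello'), new Set());  // append()ed to the OrderedPutCache, spilled to disk
  store.schedulePut(Chunk.fromString('world'), new Set());  // only {hash, hints} stay in memory
  // flush(): extractChunks(first, last) streams the chunks back in put-order,
  // the delegate serializes them to the server, then dropUntil(last) forgets them.
  await store.flush();
}

demo();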
@@ -5,6 +5,7 @@ import {assert} from 'chai';
 import Chunk from './chunk.js';
 import Ref from './ref.js';
 import {deserialize, serialize} from './chunk-serializer.js';
+import type {ChunkStream} from './chunk-serializer.js';

 suite('ChunkSerializer', () => {
@@ -22,38 +23,42 @@ suite('ChunkSerializer', () => {
     }
   }

-  test('simple', () => {
+  test('simple', async () => {
     const expHints = [];
     const expChunks = [Chunk.fromString('abc'), Chunk.fromString('def'), Chunk.fromString('ghi'),
       Chunk.fromString('wacka wack wack')];

-    const {hints, chunks} = deserialize(serialize(new Set(expHints), expChunks));
+    const pSerialized = serialize(new Set(expHints), createChunkStream(expChunks));
+    const {hints, chunks} = deserialize(await pSerialized);

     assertHints(expHints, hints);
     assertChunks(expChunks, chunks);
   });

-  test('leading & trailing empty', () => {
+  test('leading & trailing empty', async () => {
     const expHints = [];
     const expChunks = [Chunk.fromString(''), Chunk.fromString('A'), Chunk.fromString('')];

-    const {hints, chunks} = deserialize(serialize(new Set(expHints), expChunks));
+    const pSerialized = serialize(new Set(expHints), createChunkStream(expChunks));
+    const {hints, chunks} = deserialize(await pSerialized);

     assertHints(expHints, hints);
     assertChunks(expChunks, chunks);
   });

-  test('all empty', () => {
+  test('all empty', async () => {
     const expHints = [];
     const expChunks = [];

-    const {hints, chunks} = deserialize(serialize(new Set(expHints), expChunks));
+    const pSerialized = serialize(new Set(expHints), createChunkStream(expChunks));
+    const {hints, chunks} = deserialize(await pSerialized);

     assertHints(expHints, hints);
     assertChunks(expChunks, chunks);
   });

-  test('with hints', () => {
+  test('with hints', async () => {
     const expHints = [
       Chunk.fromString('123').ref,
       Chunk.fromString('456').ref,
@@ -62,9 +67,36 @@ suite('ChunkSerializer', () => {
     ];
     const expChunks = [Chunk.fromString('abc'), Chunk.fromString('def'), Chunk.fromString('ghi')];

-    const {hints, chunks} = deserialize(serialize(new Set(expHints), expChunks));
+    const pSerialized = serialize(new Set(expHints), createChunkStream(expChunks));
+    const {hints, chunks} = deserialize(await pSerialized);

     assertHints(expHints, hints);
     assertChunks(expChunks, chunks);
   });
+
+  test('large chunk', async () => {
+    const expHints = [];
+    const expChunks = [
+      new Chunk(new Uint8Array(1024)),
+      Chunk.fromString('abc'),
+      Chunk.fromString('def'),
+      new Chunk(new Uint8Array(2048))];
+
+    const pSerialized = serialize(new Set(expHints), createChunkStream(expChunks));
+    const {hints, chunks} = deserialize(await pSerialized);
+
+    assertHints(expHints, hints);
+    assertChunks(expChunks, chunks);
+  });
 });
+
+function createChunkStream(chunks: Array<Chunk>): ChunkStream {
+  return function(cb: (chunk: Chunk) => void): Promise<void> {
+    return new Promise(fulfill => {
+      for (const chunk of chunks) {
+        cb(chunk);
+      }
+      fulfill();
+    });
+  };
+}
@@ -10,32 +10,48 @@ const sha1Size = 20;
 const chunkLengthSize = 4; // uint32
 const chunkHeaderSize = sha1Size + chunkLengthSize;

-export function serialize(hints: Set<Ref>, chunks: Array<Chunk>): ArrayBuffer {
-  const buffer = new ArrayBuffer(serializedHintLength(hints) + serializedChunkLength(chunks));
-
-  let offset = serializeHints(hints, buffer);
-
-  for (let i = 0; i < chunks.length; i++) {
-    const chunk = chunks[i];
-    invariant(buffer.byteLength - offset >= chunkHeaderSize + chunk.data.length,
-      'Invalid chunk buffer');
-
-    const refArray = new Uint8Array(buffer, offset, sha1Size);
-    refArray.set(chunk.ref.digest);
-    offset += sha1Size;
-
-    const chunkLength = chunk.data.length;
-    const view = new DataView(buffer, offset, chunkLengthSize);
-    view.setUint32(0, chunkLength | 0, bigEndian); // Coerce number to uint32
-    offset += chunkLengthSize;
-
-    const dataArray = new Uint8Array(buffer, offset, chunkLength);
-    dataArray.set(chunk.data);
-    offset += chunkLength;
-  }
-
-  return buffer;
-}
+export type ChunkStream = (cb: (chunk: Chunk) => void) => Promise<void>
+
+export function serialize(hints: Set<Ref>, stream: ChunkStream): Promise<ArrayBuffer> {
+  let buf = new ArrayBuffer(1024);
+  const ensureCapacity = (needed: number) => {
+    let newLen = buf.byteLength;
+    for (; newLen < needed; newLen *= 2)
+      ;
+    const newBuf = new ArrayBuffer(newLen);
+    new Uint8Array(newBuf).set(new Uint8Array(buf));
+    buf = newBuf;
+  };
+
+  const hintsLength = serializedHintsLength(hints);
+  if (buf.byteLength < hintsLength) {
+    buf = new ArrayBuffer(hintsLength * 2); // Leave space for some chunks.
+  }
+  let offset = serializeHints(hints, buf);
+  return stream(chunk => {
+    const chunkLength = serializedChunkLength(chunk);
+    ensureCapacity(offset + chunkLength);
+    offset = serializeChunk(chunk, buf, offset);
+  }).then(() => buf.slice(0, offset));
+}
+
+function serializeChunk(chunk: Chunk, buffer: ArrayBuffer, offset: number): number {
+  invariant(buffer.byteLength - offset >= serializedChunkLength(chunk),
+    'Invalid chunk buffer');
+
+  const refArray = new Uint8Array(buffer, offset, sha1Size);
+  refArray.set(chunk.ref.digest);
+  offset += sha1Size;
+
+  const chunkLength = chunk.data.length;
+  const view = new DataView(buffer, offset, chunkLengthSize);
+  view.setUint32(0, chunkLength, bigEndian); // Coerce number to uint32
+  offset += chunkLengthSize;
+
+  const dataArray = new Uint8Array(buffer, offset, chunkLength);
+  dataArray.set(chunk.data);
+  offset += chunkLength;
+  return offset;
+}

 function serializeHints(hints: Set<Ref>, buffer: ArrayBuffer): number {
@@ -53,16 +69,12 @@ function serializeHints(hints: Set<Ref>, buffer: ArrayBuffer): number {
   return offset;
 }

-function serializedHintLength(hints: Set<Ref>): number {
+function serializedHintsLength(hints: Set<Ref>): number {
   return headerSize + sha1Size * hints.size;
 }

-function serializedChunkLength(chunks: Array<Chunk>): number {
-  let totalSize = 0;
-  for (let i = 0; i < chunks.length; i++) {
-    totalSize += chunkHeaderSize + chunks[i].data.length;
-  }
-  return totalSize;
+function serializedChunkLength(chunk: Chunk): number {
+  return chunkHeaderSize + chunk.data.length;
 }

 export function deserialize(buffer: ArrayBuffer): {hints: Array<Ref>, chunks: Array<Chunk>} {
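For reference, the per-chunk framing written by serializeChunk above works out as follows. This is a worked example using only the constants from the hunk (sha1Size, chunkLengthSize), not code from the patch.

const sha1Size = 20;                                 // sha1 digest of the chunk
const chunkLengthSize = 4;                           // uint32 byte count
const chunkHeaderSize = sha1Size + chunkLengthSize;  // 24 bytes of framing per chunk

// serializedChunkLength(chunk) is chunkHeaderSize + chunk.data.length, so the
// 1024-byte chunk in the 'large chunk' test occupies 24 + 1024 = 1048 bytes, and
// serialize() grows its initial 1024-byte buffer by doubling (ensureCapacity) to fit it.
console.log(chunkHeaderSize + 1024); // 1048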
@@ -1,24 +0,0 @@
-// @flow
-
-import {suite, test} from 'mocha';
-import Chunk from './chunk.js';
-import {assert} from 'chai';
-import {Delegate} from './http-batch-store.js';
-import {deserialize} from './chunk-serializer.js';
-
-suite('HttpBatchStore', () => {
-  test('build write request', async () => {
-    const canned = new Set([Chunk.fromString('abc').ref, Chunk.fromString('def').ref]);
-    const reqs = [
-      {c: Chunk.fromString('ghi'), hints: new Set()},
-      {c: Chunk.fromString('wacka wack wack'), hints: canned},
-    ];
-    const d = new Delegate(
-      {getRefs: '', writeValue: '', root: ''},
-      {method: 'POST', headers: {}});
-    const body = d._buildWriteRequest(reqs);
-    const {hints, chunks} = deserialize(body);
-    assert.equal(2, chunks.length);
-    assert.equal(2, hints.length);
-  });
-});
@@ -2,9 +2,10 @@

 import Ref from './ref.js';
 import BatchStore from './batch-store.js';
-import type {UnsentReadMap, WriteRequest} from './batch-store.js';
+import type {UnsentReadMap} from './batch-store.js';
 import type {FetchOptions} from './fetch.js';
-import {deserializeChunks, serialize} from './chunk-serializer.js';
+import type {ChunkStream} from './chunk-serializer.js';
+import {serialize, deserializeChunks} from './chunk-serializer.js';
 import {emptyChunk} from './chunk.js';
 import {fetchArrayBuffer, fetchText} from './fetch.js';
@@ -47,11 +48,13 @@ export class Delegate {
   _rpc: RpcStrings;
   _readBatchOptions: FetchOptions;
   _rootOptions: FetchOptions;
+  _body: ArrayBuffer;

   constructor(rpc: RpcStrings, fetchOptions: FetchOptions) {
     this._rpc = rpc;
     this._rootOptions = fetchOptions;
     this._readBatchOptions = mergeOptions(readBatchOptions, fetchOptions);
+    this._body = new ArrayBuffer(0);
   }

   async readBatch(reqs: UnsentReadMap): Promise<void> {
@@ -73,18 +76,10 @@ export class Delegate {
     Object.keys(reqs).forEach(refStr => reqs[refStr](emptyChunk));
   }

-  async writeBatch(reqs: Array<WriteRequest>): Promise<void> {
-    const body = this._buildWriteRequest(reqs);
-    await fetchText(this._rpc.writeValue, {method: 'POST', body});
-  }
-
-  _buildWriteRequest(reqs: Array<WriteRequest>): ArrayBuffer {
-    let hints = new Set();
-    const chunks = reqs.map(writeReq => {
-      writeReq.hints.forEach(hint => { hints = hints.add(hint); });
-      return writeReq.c;
-    });
-    return serialize(hints, chunks);
+  writeBatch(hints: Set<Ref>, chunkStream: ChunkStream): Promise<void> {
+    return serialize(hints, chunkStream)
+      .then(body => fetchText(this._rpc.writeValue, {method: 'POST', body}))
+      .then(() => undefined);
   }

   async getRoot(): Promise<Ref> {
js/src/put-cache-test.js (new file, 84 lines)
@@ -0,0 +1,84 @@
// @flow

import {suite, test} from 'mocha';
import {assert} from 'chai';
import {notNull} from './assert.js';
import OrderedPutCache from './put-cache.js';
import Chunk from './chunk.js';

suite('OrderedPutCache', () => {
  test('append', async () => {
    const canned = [Chunk.fromString('abc'), Chunk.fromString('def')];
    const cache = new OrderedPutCache();
    assert.isTrue(cache.append(canned[0]));
    assert.isTrue(cache.append(canned[1]));
    await cache.destroy();
  });

  test('repeated append returns false', async () => {
    const canned = [Chunk.fromString('abc'), Chunk.fromString('def')];
    const cache = new OrderedPutCache();
    assert.isTrue(cache.append(canned[0]));
    assert.isTrue(cache.append(canned[1]));
    assert.isFalse(cache.append(canned[0]));
    await cache.destroy();
  });

  test('get', async () => {
    const canned = [Chunk.fromString('abc'), Chunk.fromString('def')];
    const cache = new OrderedPutCache();
    assert.isTrue(cache.append(canned[0]));

    let p = cache.get(canned[1].ref.toString());
    assert.isNull(p);

    assert.isTrue(cache.append(canned[1]));
    p = cache.get(canned[1].ref.toString());
    assert.isNotNull(p);
    const chunk = await notNull(p);
    assert.isTrue(canned[1].ref.equals(chunk.ref));

    await cache.destroy();
  });

  test('dropUntil', async () => {
    const canned = [Chunk.fromString('abc'), Chunk.fromString('def'), Chunk.fromString('ghi')];
    const cache = new OrderedPutCache();
    for (const chunk of canned) {
      assert.isTrue(cache.append(chunk));
    }

    await cache.dropUntil(canned[1].ref.toString());

    let p = cache.get(canned[2].ref.toString());
    assert.isNotNull(p);
    const chunk = await notNull(p);
    assert.isTrue(canned[2].ref.equals(chunk.ref));

    p = cache.get(canned[0].ref.toString());
    assert.isNull(p);
    p = cache.get(canned[1].ref.toString());
    assert.isNull(p);

    await cache.destroy();
  });

  test('extractChunks', async () => {
    const canned = [Chunk.fromString('abc'), Chunk.fromString('def'), Chunk.fromString('ghi')];
    const cache = new OrderedPutCache();
    for (const chunk of canned) {
      assert.isTrue(cache.append(chunk));
    }

    const chunkStream = await cache.extractChunks(canned[0].ref.toString(),
      canned[2].ref.toString());
    const chunks = [];
    await chunkStream(chunk => { chunks.push(chunk); });

    for (let i = 0; i < canned.length; i++) {
      assert.isTrue(canned[i].ref.equals(chunks[i].ref));
    }

    await cache.destroy();
  });
});
js/src/put-cache.js (new file, 274 lines)
@@ -0,0 +1,274 @@
// @flow

import tingodb from 'tingodb';
import type {tcoll as Collection} from 'tingodb';
import fs from 'fs';
import {default as Chunk, emptyChunk} from './chunk.js';
import {invariant} from './assert.js';

const __tingodb = tingodb();
const Db = __tingodb.Db;
const Binary = __tingodb.Binary;

type ChunkStream = (cb: (chunk: Chunk) => void) => Promise<void>
type ChunkItem = {hash: string, data: Uint8Array};
type DbRecord = {hash: string, data: Binary};

declare class CursorStream {
  pause(): void;
  resume(): void;
  on(event: 'data', cb: (record: DbRecord) => void): void;
  on(event: 'end', cb: () => void): void;
}

type ChunkIndex = Map<string, number>;

export default class OrderedPutCache {
  _chunkIndex: ChunkIndex;
  _folder: string;
  _coll: Promise<DbCollection>;
  _appends: Set<Promise<void>>;

  constructor() {
    this._chunkIndex = new Map();
    this._folder = '';
    this._coll = this._init();
    this._appends = new Set();
  }

  _init(): Promise<DbCollection> {
    return makeTempDir().then((dir): Promise<DbCollection> => {
      this._folder = dir;
      const coll = new DbCollection(dir);
      return coll.ensureIndex({hash: 1}, {unique: true}).then(() => coll);
    });
  }

  append(c: Chunk): boolean {
    const hash = c.ref.toString();
    if (this._chunkIndex.has(hash)) {
      return false;
    }
    this._chunkIndex.set(hash, -1);
    const p = this._coll
      .then(coll => coll.insert({hash: hash, data: c.data}))
      .then(itemId => this._chunkIndex.set(hash, itemId))
      .then(() => { this._appends.delete(p); });
    this._appends.add(p);
    return true;
  }

  get(hash: string): ?Promise<Chunk> {
    if (!this._chunkIndex.has(hash)) {
      return null;
    }
    //$FlowIssue
    return Promise.all(this._appends)
      .then(() => this._coll)
      .then(coll => coll.findOne(hash))
      .then(item => {
        if (item) {
          return new Chunk(item.data);
        }
        return emptyChunk;
      });
  }

  dropUntil(limit: string): Promise<void> {
    if (!this._chunkIndex.has(limit)) {
      return Promise.reject(new Error('Tried to drop unknown chunk: ' + limit));
    }
    //$FlowIssue
    return Promise.all(this._appends).then(() => this._coll).then(coll => {
      let count = 0;
      for (const [hash, dbKey] of this._chunkIndex) {
        count++;
        this._chunkIndex.delete(hash);
        if (hash === limit) {
          return coll.dropUntil(dbKey).then(dropped => invariant(dropped === count));
        }
      }
    });
  }

  extractChunks(first: string, last: string): Promise<ChunkStream> {
    //$FlowIssue
    return Promise.all(this._appends)
      .then(() => this._coll)
      .then(coll => {
        const firstDbKey = this._chunkIndex.get(first);
        const lastDbKey = this._chunkIndex.get(last);
        if (firstDbKey === undefined) {
          throw new Error('Tried to range from unknown chunk: ' + first);
        }
        if (lastDbKey === undefined) {
          throw new Error('Tried to range to unknown chunk: ' + last);
        }
        return coll.findRange(firstDbKey, lastDbKey);
      });
  }

  destroy(): Promise<void> {
    return this._coll.then(() => removeDir(this._folder));
  }
}

function createChunkStream(stream: CursorStream): ChunkStream {
  return function(cb: (chunk: Chunk) => void): Promise<void> {
    return new Promise(fulfill => {
      stream.on('data', (record: DbRecord) => {
        const item = recordToItem(record);
        cb(new Chunk(item.data));
      });

      stream.resume();
      stream.on('end', fulfill);
    });
  };
}

class DbCollection {
  _coll: Collection;

  constructor(folder: string) {
    const db = new Db(folder, {});
    this._coll = db.collection('puts');
  }

  ensureIndex(obj: Object, options: Object = {}): Promise<void> {
    return new Promise((resolve, reject) => {
      options.w = 1;
      this._coll.ensureIndex(obj, options, (err) => {
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      });
    });
  }

  insert(item: ChunkItem, options: Object = {}): Promise<number> {
    return new Promise((resolve, reject) => {
      options.w = 1;
      //$FlowIssue
      const data = new Binary(new Buffer(item.data.buffer));
      this._coll.insert({hash: item.hash, data: data}, options, (err, result) => {
        if (err) {
          reject(err);
        } else {
          resolve(result[0]._id);
        }
      });
    });
  }

  findOne(hash: string, options: Object = {}): Promise<ChunkItem> {
    return new Promise((resolve, reject) => {
      options.w = 1;
      this._coll.findOne({hash: hash}, options, (err, record) => {
        if (err) {
          reject(err);
        } else {
          resolve(recordToItem(record));
        }
      });
    });
  }

  findRange(first: number, last: number, options: Object = {}): ChunkStream {
    options.w = 1;
    options.hint = {_id: 1};
    const stream = this._coll.find({_id: {$gte: first, $lte: last}}, options).stream();
    stream.pause();
    return createChunkStream(stream);
  }

  dropUntil(limit: number, options: Object = {}): Promise<number> {
    return new Promise((resolve, reject) => {
      options.w = 1;
      this._coll.remove({_id: {$lte: limit}}, options, (err, numRemovedDocs) => {
        if (err) {
          reject(err);
        } else {
          resolve(numRemovedDocs);
        }
      });
    });
  }
}

function recordToItem(record: DbRecord): ChunkItem {
  return {hash: record.hash, data: new Uint8Array(record.data.buffer)};
}

function makeTempDir(): Promise<string> {
  return new Promise((resolve, reject) => {
    //$FlowIssue
    fs.mkdtemp('/tmp/put-cache-', (err, folder) => {
      if (err) {
        reject(err);
      } else {
        resolve(folder);
      }
    });
  });
}

async function removeDir(dir: string): Promise<void> {
  await access(dir);
  const files = await readdir(dir);
  for (const file of files) {
    await unlink(dir + '/' + file);
  }
  return rmdir(dir);
}

function access(path: string, mode = fs.F_OK): Promise<void> {
  return new Promise((resolve, reject) => {
    fs.access(path, mode, (err) => {
      if (err) {
        reject(err);
      } else {
        resolve();
      }
    });
  });
}

function readdir(path: string): Promise<Array<string>> {
  return new Promise((resolve, reject) => {
    fs.readdir(path, (err, files) => {
      if (err) {
        reject(err);
      } else {
        resolve(files);
      }
    });
  });
}

function rmdir(path: string): Promise<void> {
  return new Promise((resolve, reject) => {
    fs.rmdir(path, (err) => {
      if (err) {
        reject(err);
      } else {
        resolve();
      }
    });
  });
}

function unlink(path: string): Promise<void> {
  return new Promise((resolve, reject) => {
    fs.unlink(path, (err) => {
      if (err) {
        reject(err);
      } else {
        resolve();
      }
    });
  });
}
@@ -116,7 +116,6 @@ suite('ValueStore', () => {
     const l = await newList([vs.writeValue(1), vs.writeValue(2)]);
     const r = vs.writeValue(l);
-    // await vs.flush();

     const v = await vs.readValue(r.targetRef);
     assert.isTrue(equals(l, v));