From e0eb7bc2977d7abb741e7ad3a5f9f0d703e68ac2 Mon Sep 17 00:00:00 2001
From: cmasone-attic
Date: Mon, 16 May 2016 14:38:25 -0700
Subject: [PATCH] JS: back BatchStore's put-cache with tingodb (#1479)

* JS: back BatchStore's put-cache with tingodb

tingodb is a pure-JS, file-backed mongodb-api-compatible database.
Before this patch, BatchStore was caching all pending chunks in memory,
which seems unlikely to scale. Putting them in a tingodb spills them to
disk as needed, though we can still iterate them in put-order when it's
time to send them.

Fixes #1349
---
 README.md                       |   2 +-
 js/package.json                 |   5 +-
 js/src/batch-store-adaptor.js   |  10 +-
 js/src/batch-store.js           |  37 +++--
 js/src/chunk-serializer-test.js |  48 +++++-
 js/src/chunk-serializer.js      |  70 ++++----
 js/src/http-batch-store-test.js |  24 ---
 js/src/http-batch-store.js      |  23 ++-
 js/src/put-cache-test.js        |  84 ++++++++++
 js/src/put-cache.js             | 274 ++++++++++++++++++++++++++++++++
 js/src/value-store-test.js      |   1 -
 11 files changed, 478 insertions(+), 100 deletions(-)
 delete mode 100644 js/src/http-batch-store-test.js
 create mode 100644 js/src/put-cache-test.js
 create mode 100644 js/src/put-cache.js

diff --git a/README.md b/README.md
index 2c62b46406..61627a13fe 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@ This repository contains two reference implementations of the noms protocol - on
 
 # Prerequisites
 
-* [Go 1.4+](https://golang.org/dl/)
+* [Go 1.6+](https://golang.org/dl/)
 * [Python 2.7+](https://www.python.org/downloads/) (Note: Python 2.x only, not Python 3.x)
 * [Node.js 5.11+](https://nodejs.org/download/)
 
diff --git a/js/package.json b/js/package.json
index 8780c16932..c30ac31825 100644
--- a/js/package.json
+++ b/js/package.json
@@ -9,7 +9,8 @@
     "babel-regenerator-runtime": "6.5.0",
     "babel-runtime": "5.8.38",
     "rusha": "0.8.3",
-    "text-encoding-utf-8": "1.0.1"
+    "text-encoding-utf-8": "1.0.1",
+    "tingodb": "^0.4.2"
   },
   "devDependencies": {
     "@attic/eslintrc": "^1.0.0",
@@ -35,7 +36,7 @@
   },
   "scripts": {
     "pretest": "eslint src/ && flow src/",
-    "ts": "mocha --ui tdd --timeout=3000 --reporter spec --slow=50 --invert --grep='LONG:' --compilers js:babel-core/register src/*-test.js",
+    "ts": "mocha --ui tdd --timeout=4000 --reporter spec --slow=50 --invert --grep='LONG:' --compilers js:babel-core/register src/*-test.js",
     "test": "mocha --ui tdd --timeout=3000 --reporter dot --compilers js:babel-core/register src/*-test.js",
     "prepublish": "npm run compile && npm run copy-flow-files",
     "compile": "npm run compile-to-commonjs && npm run compile-to-es6",
diff --git a/js/src/batch-store-adaptor.js b/js/src/batch-store-adaptor.js
index fa3cf11bda..272e9a6dea 100644
--- a/js/src/batch-store-adaptor.js
+++ b/js/src/batch-store-adaptor.js
@@ -1,10 +1,12 @@
 // @flow
 
+import Chunk from './chunk.js';
 import Ref from './ref.js';
 import MemoryStore from './memory-store.js';
 import BatchStore from './batch-store.js';
 import type {ChunkStore} from './chunk-store.js';
-import type {UnsentReadMap, WriteRequest} from './batch-store.js';
+import type {UnsentReadMap} from './batch-store.js';
+import type {ChunkStream} from './chunk-serializer.js';
 
 export function makeTestingBatchStore(): BatchStore {
   return new BatchStore(3, new BatchStoreAdaptorDelegate(new MemoryStore()));
@@ -29,10 +31,8 @@ export class BatchStoreAdaptorDelegate {
     });
   }
 
-  async writeBatch(reqs: Array<WriteRequest>): Promise<void> {
-    reqs.forEach(req => {
-      this._cs.put(req.c);
-    });
+  async writeBatch(hints: Set<Ref>, chunkStream: ChunkStream): Promise<void> {
+    return chunkStream((chunk: Chunk) => this._cs.put(chunk));
   }
 
   async getRoot(): Promise<Ref> {
diff --git a/js/src/batch-store.js b/js/src/batch-store.js
index ddf5a318fd..e01db762f2 100644
--- a/js/src/batch-store.js
+++ b/js/src/batch-store.js
@@ -2,20 +2,21 @@
 
 import Chunk from './chunk.js';
 import Ref from './ref.js';
+import OrderedPutCache from './put-cache.js';
+import type {ChunkStream} from './chunk-serializer.js';
 import {notNull} from './assert.js';
 
 type PendingReadMap = { [key: string]: Promise<Chunk> };
 export type UnsentReadMap = { [key: string]: (c: Chunk) => void };
-type WriteMap = { [key: string]: Chunk };
 
 export type WriteRequest = {
-  c: Chunk;
+  hash: Ref;
   hints: Set<Ref>;
 }
 
 interface Delegate {
   readBatch(reqs: UnsentReadMap): Promise<void>;
-  writeBatch(reqs: Array<WriteRequest>): Promise<void>;
+  writeBatch(hints: Set<Ref>, chunkStream: ChunkStream): Promise<void>;
   getRoot(): Promise<Ref>;
   updateRoot(current: Ref, last: Ref): Promise<boolean>;
 }
@@ -27,7 +28,7 @@ export default class BatchStore {
   _activeReads: number;
   _maxReads: number;
 
-  _pendingWrites: WriteMap;
+  _pendingWrites: OrderedPutCache;
   _unsentWrites: ?Array<WriteRequest>;
   _delegate: Delegate;
 
@@ -38,7 +39,7 @@
     this._activeReads = 0;
     this._maxReads = maxReads;
 
-    this._pendingWrites = Object.create(null);
+    this._pendingWrites = new OrderedPutCache();
     this._unsentWrites = null;
     this._delegate = delegate;
   }
@@ -49,9 +50,9 @@
     if (p) {
       return p;
     }
-    p = this._pendingWrites[refStr];
+    p = this._pendingWrites.get(refStr);
    if (p) {
-      return Promise.resolve(p);
+      return p;
    }
 
    return this._pendingReads[refStr] = new Promise(resolve => {
@@ -92,16 +93,14 @@
   }
 
   schedulePut(c: Chunk, hints: Set<Ref>): void {
-    const refStr = c.ref.toString();
-    if (this._pendingWrites[refStr]) {
+    if (!this._pendingWrites.append(c)) {
       return; // Already in flight.
     }
-    this._pendingWrites[refStr] = c;
 
     if (!this._unsentWrites) {
       this._unsentWrites = [];
     }
-    this._unsentWrites.push({c: c, hints: hints});
+    this._unsentWrites.push({hash: c.ref, hints: hints});
   }
 
   async flush(): Promise<void> {
@@ -112,12 +111,18 @@
     const reqs = notNull(this._unsentWrites);
     this._unsentWrites = null;
 
-    await this._delegate.writeBatch(reqs); // TODO: Deal with backpressure
+    const first = reqs[0].hash;
+    let last = first;
+    const hints = new Set();
+    for (const req of reqs) {
+      req.hints.forEach(hint => hints.add(hint));
+      last = req.hash;
+    }
+    // TODO: Deal with backpressure
+    const chunkStream = await this._pendingWrites.extractChunks(first.toString(), last.toString());
+    await this._delegate.writeBatch(hints, chunkStream);
 
-    const self = this; // TODO: Remove this when babel bug is fixed.
-    reqs.forEach(req => {
-      delete self._pendingWrites[req.c.ref.toString()];
-    });
+    return this._pendingWrites.dropUntil(last.toString());
   }
 
   async getRoot(): Promise<Ref> {
diff --git a/js/src/chunk-serializer-test.js b/js/src/chunk-serializer-test.js
index 724306312c..37d2d6fddc 100644
--- a/js/src/chunk-serializer-test.js
+++ b/js/src/chunk-serializer-test.js
@@ -5,6 +5,7 @@ import {assert} from 'chai';
 import Chunk from './chunk.js';
 import Ref from './ref.js';
 import {deserialize, serialize} from './chunk-serializer.js';
+import type {ChunkStream} from './chunk-serializer.js';
 
 suite('ChunkSerializer', () => {
 
@@ -22,38 +23,42 @@
     }
   }
 
-  test('simple', () => {
+  test('simple', async () => {
     const expHints = [];
     const expChunks = [Chunk.fromString('abc'), Chunk.fromString('def'), Chunk.fromString('ghi'),
                        Chunk.fromString('wacka wack wack')];
 
-    const {hints, chunks} = deserialize(serialize(new Set(expHints), expChunks));
+    const pSerialized = serialize(new Set(expHints), createChunkStream(expChunks));
+    const {hints, chunks} = deserialize(await pSerialized);
 
     assertHints(expHints, hints);
     assertChunks(expChunks, chunks);
   });
 
-  test('leading & trailing empty', () => {
+  test('leading & trailing empty', async () => {
     const expHints = [];
     const expChunks = [Chunk.fromString(''), Chunk.fromString('A'), Chunk.fromString('')];
 
-    const {hints, chunks} = deserialize(serialize(new Set(expHints), expChunks));
+    const pSerialized = serialize(new Set(expHints), createChunkStream(expChunks));
+    const {hints, chunks} = deserialize(await pSerialized);
 
     assertHints(expHints, hints);
     assertChunks(expChunks, chunks);
   });
 
-  test('all empty', () => {
+  test('all empty', async () => {
     const expHints = [];
     const expChunks = [];
 
-    const {hints, chunks} = deserialize(serialize(new Set(expHints), expChunks));
+
+    const pSerialized = serialize(new Set(expHints), createChunkStream(expChunks));
+    const {hints, chunks} = deserialize(await pSerialized);
 
     assertHints(expHints, hints);
     assertChunks(expChunks, chunks);
   });
 
-  test('with hints', () => {
+  test('with hints', async () => {
     const expHints = [
       Chunk.fromString('123').ref,
       Chunk.fromString('456').ref,
...
     ];
     const expChunks = [Chunk.fromString('abc'), Chunk.fromString('def'), Chunk.fromString('ghi')];
 
-    const {hints, chunks} = deserialize(serialize(new Set(expHints), expChunks));
+    const pSerialized = serialize(new Set(expHints), createChunkStream(expChunks));
+    const {hints, chunks} = deserialize(await pSerialized);
+
+    assertHints(expHints, hints);
+    assertChunks(expChunks, chunks);
+  });
+
+  test('large chunk', async () => {
+    const expHints = [];
+    const expChunks = [
+      new Chunk(new Uint8Array(1024)),
+      Chunk.fromString('abc'),
+      Chunk.fromString('def'),
+      new Chunk(new Uint8Array(2048))];
+
+    const pSerialized = serialize(new Set(expHints), createChunkStream(expChunks));
+    const {hints, chunks} = deserialize(await pSerialized);
 
     assertHints(expHints, hints);
     assertChunks(expChunks, chunks);
   });
 });
+
+function createChunkStream(chunks: Array<Chunk>): ChunkStream {
+  return function(cb: (chunk: Chunk) => void): Promise<void> {
+    return new Promise(fulfill => {
+      for (const chunk of chunks) {
+        cb(chunk);
+      }
+      fulfill();
+    });
+  };
+}
diff --git a/js/src/chunk-serializer.js b/js/src/chunk-serializer.js
index 89521acffc..8d1a7c7b97 100644
--- a/js/src/chunk-serializer.js
+++ b/js/src/chunk-serializer.js
@@ -10,32 +10,48 @@
 const sha1Size = 20;
 const chunkLengthSize = 4; // uint32
 const chunkHeaderSize = sha1Size + chunkLengthSize;
 
-export function serialize(hints: Set<Ref>, chunks: Array<Chunk>): ArrayBuffer {
-  const buffer = new ArrayBuffer(serializedHintLength(hints) + serializedChunkLength(chunks));
+export type ChunkStream = (cb: (chunk: Chunk) => void) => Promise<void>
 
-  let offset = serializeHints(hints, buffer);
+export function serialize(hints: Set<Ref>, stream: ChunkStream): Promise<ArrayBuffer> {
+  let buf = new ArrayBuffer(1024);
+  const ensureCapacity = (needed: number) => {
+    let newLen = buf.byteLength;
+    for (; newLen < needed; newLen *= 2)
+      ;
+    const newBuf = new ArrayBuffer(newLen);
+    new Uint8Array(newBuf).set(new Uint8Array(buf));
+    buf = newBuf;
+  };
 
-  for (let i = 0; i < chunks.length; i++) {
-    const chunk = chunks[i];
-    invariant(buffer.byteLength - offset >= chunkHeaderSize + chunk.data.length,
-              'Invalid chunk buffer');
-
-    const refArray = new Uint8Array(buffer, offset, sha1Size);
-    refArray.set(chunk.ref.digest);
-    offset += sha1Size;
-
-    const chunkLength = chunk.data.length;
-    const view = new DataView(buffer, offset, chunkLengthSize);
-    view.setUint32(0, chunkLength | 0, bigEndian); // Coerce number to uint32
-    offset += chunkLengthSize;
-
-    const dataArray = new Uint8Array(buffer, offset, chunkLength);
-    dataArray.set(chunk.data);
-    offset += chunkLength;
+  const hintsLength = serializedHintsLength(hints);
+  if (buf.byteLength < hintsLength) {
+    buf = new ArrayBuffer(hintsLength * 2); // Leave space for some chunks.
   }
+  let offset = serializeHints(hints, buf);
+  return stream(chunk => {
+    const chunkLength = serializedChunkLength(chunk);
+    ensureCapacity(offset + chunkLength);
+    offset = serializeChunk(chunk, buf, offset);
+  }).then(() => buf.slice(0, offset));
+}
 
-  return buffer;
+function serializeChunk(chunk: Chunk, buffer: ArrayBuffer, offset: number): number {
+  invariant(buffer.byteLength - offset >= serializedChunkLength(chunk),
+            'Invalid chunk buffer');
+
+  const refArray = new Uint8Array(buffer, offset, sha1Size);
+  refArray.set(chunk.ref.digest);
+  offset += sha1Size;
+
+  const chunkLength = chunk.data.length;
+  const view = new DataView(buffer, offset, chunkLengthSize);
+  view.setUint32(0, chunkLength, bigEndian); // Coerce number to uint32
+  offset += chunkLengthSize;
+
+  const dataArray = new Uint8Array(buffer, offset, chunkLength);
+  dataArray.set(chunk.data);
+  offset += chunkLength;
+  return offset;
 }
 
 function serializeHints(hints: Set<Ref>, buffer: ArrayBuffer): number {
@@ -53,16 +69,12 @@
   return offset;
 }
 
-function serializedHintLength(hints: Set<Ref>): number {
+function serializedHintsLength(hints: Set<Ref>): number {
   return headerSize + sha1Size * hints.size;
 }
 
-function serializedChunkLength(chunks: Array<Chunk>): number {
-  let totalSize = 0;
-  for (let i = 0; i < chunks.length; i++) {
-    totalSize += chunkHeaderSize + chunks[i].data.length;
-  }
-  return totalSize;
+function serializedChunkLength(chunk: Chunk): number {
+  return chunkHeaderSize + chunk.data.length;
 }
 
 export function deserialize(buffer: ArrayBuffer): {hints: Array<Ref>, chunks: Array<Chunk>} {
diff --git a/js/src/http-batch-store-test.js b/js/src/http-batch-store-test.js
deleted file mode 100644
index b09a54e59a..0000000000
--- a/js/src/http-batch-store-test.js
+++ /dev/null
@@ -1,24 +0,0 @@
-// @flow
-
-import {suite, test} from 'mocha';
-import Chunk from './chunk.js';
-import {assert} from 'chai';
-import {Delegate} from './http-batch-store.js';
-import {deserialize} from './chunk-serializer.js';
-
-suite('HttpBatchStore', () => {
-  test('build write request', async () => {
-    const canned = new Set([Chunk.fromString('abc').ref, Chunk.fromString('def').ref]);
-    const reqs = [
-      {c: Chunk.fromString('ghi'), hints: new Set()},
-      {c: Chunk.fromString('wacka wack wack'), hints: canned},
-    ];
-    const d = new Delegate(
-      {getRefs: '', writeValue: '', root: ''},
-      {method: 'POST', headers: {}});
-    const body = d._buildWriteRequest(reqs);
-    const {hints, chunks} = deserialize(body);
-    assert.equal(2, chunks.length);
-    assert.equal(2, hints.length);
-  });
-});
diff --git a/js/src/http-batch-store.js b/js/src/http-batch-store.js
index d513c52526..51ff752d3f 100644
--- a/js/src/http-batch-store.js
+++ b/js/src/http-batch-store.js
@@ -2,9 +2,10 @@
 
 import Ref from './ref.js';
 import BatchStore from './batch-store.js';
-import type {UnsentReadMap, WriteRequest} from './batch-store.js';
+import type {UnsentReadMap} from './batch-store.js';
 import type {FetchOptions} from './fetch.js';
-import {deserializeChunks, serialize} from './chunk-serializer.js';
+import type {ChunkStream} from './chunk-serializer.js';
+import {serialize, deserializeChunks} from './chunk-serializer.js';
 import {emptyChunk} from './chunk.js';
 import {fetchArrayBuffer, fetchText} from './fetch.js';
 
@@ -47,11 +48,13 @@ export class Delegate {
   _rpc: RpcStrings;
   _readBatchOptions: FetchOptions;
   _rootOptions: FetchOptions;
+  _body: ArrayBuffer;
 
   constructor(rpc: RpcStrings, fetchOptions: FetchOptions) {
     this._rpc = rpc;
     this._rootOptions = fetchOptions;
     this._readBatchOptions = mergeOptions(readBatchOptions, fetchOptions);
+    this._body = new ArrayBuffer(0);
   }
 
   async readBatch(reqs: UnsentReadMap): Promise<void> {
@@ -73,18 +76,10 @@
     Object.keys(reqs).forEach(refStr => reqs[refStr](emptyChunk));
   }
 
-  async writeBatch(reqs: Array<WriteRequest>): Promise<void> {
-    const body = this._buildWriteRequest(reqs);
-    await fetchText(this._rpc.writeValue, {method: 'POST', body});
-  }
-
-  _buildWriteRequest(reqs: Array<WriteRequest>): ArrayBuffer {
-    let hints = new Set();
-    const chunks = reqs.map(writeReq => {
-      writeReq.hints.forEach(hint => { hints = hints.add(hint); });
-      return writeReq.c;
-    });
-    return serialize(hints, chunks);
+  writeBatch(hints: Set<Ref>, chunkStream: ChunkStream): Promise<void> {
+    return serialize(hints, chunkStream)
+      .then(body => fetchText(this._rpc.writeValue, {method: 'POST', body}))
+      .then(() => undefined);
   }
 
   async getRoot(): Promise<Ref> {
diff --git a/js/src/put-cache-test.js b/js/src/put-cache-test.js
new file mode 100644
index 0000000000..e86fec9d7d
--- /dev/null
+++ b/js/src/put-cache-test.js
@@ -0,0 +1,84 @@
+// @flow
+
+import {suite, test} from 'mocha';
+import {assert} from 'chai';
+import {notNull} from './assert.js';
+import OrderedPutCache from './put-cache.js';
+import Chunk from './chunk.js';
+
+suite('OrderedPutCache', () => {
+  test('append', async () => {
+    const canned = [Chunk.fromString('abc'), Chunk.fromString('def')];
+    const cache = new OrderedPutCache();
+    assert.isTrue(cache.append(canned[0]));
+    assert.isTrue(cache.append(canned[1]));
+    await cache.destroy();
+  });
+
+  test('repeated append returns false', async () => {
+    const canned = [Chunk.fromString('abc'), Chunk.fromString('def')];
+    const cache = new OrderedPutCache();
+    assert.isTrue(cache.append(canned[0]));
+    assert.isTrue(cache.append(canned[1]));
+    assert.isFalse(cache.append(canned[0]));
+    await cache.destroy();
+  });
+
+  test('get', async () => {
+    const canned = [Chunk.fromString('abc'), Chunk.fromString('def')];
+    const cache = new OrderedPutCache();
+    assert.isTrue(cache.append(canned[0]));
+
+    let p = cache.get(canned[1].ref.toString());
+    assert.isNull(p);
+
+    assert.isTrue(cache.append(canned[1]));
+    p = cache.get(canned[1].ref.toString());
+    assert.isNotNull(p);
+    const chunk = await notNull(p);
+    assert.isTrue(canned[1].ref.equals(chunk.ref));
+
+    await cache.destroy();
+  });
+
+  test('dropUntil', async () => {
+    const canned = [Chunk.fromString('abc'), Chunk.fromString('def'), Chunk.fromString('ghi')];
+    const cache = new OrderedPutCache();
+    for (const chunk of canned) {
+      assert.isTrue(cache.append(chunk));
+    }
+
+    await cache.dropUntil(canned[1].ref.toString());
+
+    let p = cache.get(canned[2].ref.toString());
+    assert.isNotNull(p);
+    const chunk = await notNull(p);
+    assert.isTrue(canned[2].ref.equals(chunk.ref));
+
+    p = cache.get(canned[0].ref.toString());
+    assert.isNull(p);
+    p = cache.get(canned[1].ref.toString());
+    assert.isNull(p);
+
+    await cache.destroy();
+  });
+
+  test('extractChunks', async () => {
+    const canned = [Chunk.fromString('abc'), Chunk.fromString('def'), Chunk.fromString('ghi')];
+    const cache = new OrderedPutCache();
+    for (const chunk of canned) {
+      assert.isTrue(cache.append(chunk));
+    }
+
+    const chunkStream = await cache.extractChunks(canned[0].ref.toString(),
+                                                  canned[2].ref.toString());
+    const chunks = [];
+    await chunkStream(chunk => { chunks.push(chunk); });
+
+    for (let i = 0; i < canned.length; i++) {
+      assert.isTrue(canned[i].ref.equals(chunks[i].ref));
+    }
+
+    await cache.destroy();
+  });
+});
diff --git a/js/src/put-cache.js b/js/src/put-cache.js
new file mode 100644
index 0000000000..584adc6e10
--- /dev/null
+++ b/js/src/put-cache.js
@@ -0,0 +1,274 @@
+// @flow
+
+import tingodb from 'tingodb';
+import type {tcoll as Collection} from 'tingodb';
+import fs from 'fs';
+import {default as Chunk, emptyChunk} from './chunk.js';
+import {invariant} from './assert.js';
+
+const __tingodb = tingodb();
+
+const Db = __tingodb.Db;
+const Binary = __tingodb.Binary;
+
+type ChunkStream = (cb: (chunk: Chunk) => void) => Promise<void>
+type ChunkItem = {hash: string, data: Uint8Array};
+type DbRecord = {hash: string, data: Binary};
+
+declare class CursorStream {
+  pause(): void;
+  resume(): void;
+  on(event: 'data', cb: (record: DbRecord) => void): void;
+  on(event: 'end', cb: () => void): void;
+}
+
+type ChunkIndex = Map<string, number>;
+
+export default class OrderedPutCache {
+  _chunkIndex: ChunkIndex;
+  _folder: string;
+  _coll: Promise<DbCollection>;
+  _appends: Set<Promise<void>>;
+
+  constructor() {
+    this._chunkIndex = new Map();
+    this._folder = '';
+    this._coll = this._init();
+    this._appends = new Set();
+  }
+
+  _init(): Promise<DbCollection> {
+    return makeTempDir().then((dir): Promise<DbCollection> => {
+      this._folder = dir;
+      const coll = new DbCollection(dir);
+      return coll.ensureIndex({hash: 1}, {unique: true}).then(() => coll);
+    });
+  }
+
+  append(c: Chunk): boolean {
+    const hash = c.ref.toString();
+    if (this._chunkIndex.has(hash)) {
+      return false;
+    }
+    this._chunkIndex.set(hash, -1);
+    const p = this._coll
+      .then(coll => coll.insert({hash: hash, data: c.data}))
+      .then(itemId => this._chunkIndex.set(hash, itemId))
+      .then(() => { this._appends.delete(p); });
+    this._appends.add(p);
+    return true;
+  }
+
+  get(hash: string): ?Promise<Chunk> {
+    if (!this._chunkIndex.has(hash)) {
+      return null;
+    }
+    //$FlowIssue
+    return Promise.all(this._appends)
+      .then(() => this._coll)
+      .then(coll => coll.findOne(hash))
+      .then(item => {
+        if (item) {
+          return new Chunk(item.data);
+        }
+        return emptyChunk;
+      });
+  }
+
+  dropUntil(limit: string): Promise<void> {
+    if (!this._chunkIndex.has(limit)) {
+      return Promise.reject(new Error('Tried to drop unknown chunk: ' + limit));
+    }
+    //$FlowIssue
+    return Promise.all(this._appends).then(() => this._coll).then(coll => {
+      let count = 0;
+      for (const [hash, dbKey] of this._chunkIndex) {
+        count++;
+        this._chunkIndex.delete(hash);
+        if (hash === limit) {
+          return coll.dropUntil(dbKey).then(dropped => invariant(dropped === count));
+        }
+      }
+    });
+  }
+
+  extractChunks(first: string, last: string): Promise<ChunkStream> {
+    //$FlowIssue
+    return Promise.all(this._appends)
+      .then(() => this._coll)
+      .then(coll => {
+        const firstDbKey = this._chunkIndex.get(first);
+        const lastDbKey = this._chunkIndex.get(last);
+        if (firstDbKey === undefined) {
+          throw new Error('Tried to range from unknown chunk: ' + first);
+        }
+        if (lastDbKey === undefined) {
+          throw new Error('Tried to range to unknown chunk: ' + last);
+        }
+        return coll.findRange(firstDbKey, lastDbKey);
+      });
+  }
+
+  destroy(): Promise<void> {
+    return this._coll.then(() => removeDir(this._folder));
+  }
+}
+
+function createChunkStream(stream: CursorStream): ChunkStream {
+  return function(cb: (chunk: Chunk) => void): Promise<void> {
+    return new Promise(fulfill => {
+      stream.on('data', (record: DbRecord) => {
+        const item = recordToItem(record);
+        cb(new Chunk(item.data));
+      });
+
+      stream.resume();
+      stream.on('end', fulfill);
+    });
+  };
+}
+
+class DbCollection {
+  _coll: Collection;
+
+  constructor(folder: string) {
+    const db = new Db(folder, {});
+    this._coll = db.collection('puts');
+  }
+
+  ensureIndex(obj: Object, options: Object = {}): Promise<void> {
+    return new Promise((resolve, reject) => {
+      options.w = 1;
+      this._coll.ensureIndex(obj, options, (err) => {
+        if (err) {
+          reject(err);
+        } else {
+          resolve();
+        }
+      });
+    });
+  }
+
+  insert(item: ChunkItem, options: Object = {}): Promise<number> {
+    return new Promise((resolve, reject) => {
+      options.w = 1;
+      //$FlowIssue
+      const data = new Binary(new Buffer(item.data.buffer));
+      this._coll.insert({hash: item.hash, data: data}, options, (err, result) => {
+        if (err) {
+          reject(err);
+        } else {
+          resolve(result[0]._id);
+        }
+      });
+    });
+  }
+
+  findOne(hash: string, options: Object = {}): Promise<ChunkItem> {
+    return new Promise((resolve, reject) => {
+      options.w = 1;
+      this._coll.findOne({hash: hash}, options, (err, record) => {
+        if (err) {
+          reject(err);
+        } else {
+          resolve(recordToItem(record));
+        }
+      });
+    });
+  }
+
+  findRange(first: number, last: number, options: Object = {}): ChunkStream {
+    options.w = 1;
+    options.hint = {_id: 1};
+    const stream = this._coll.find({_id: {$gte: first, $lte: last}}, options).stream();
+    stream.pause();
+    return createChunkStream(stream);
+  }
+
+  dropUntil(limit: number, options: Object = {}): Promise<number> {
+    return new Promise((resolve, reject) => {
+      options.w = 1;
+      this._coll.remove({_id: {$lte: limit}}, options, (err, numRemovedDocs) => {
+        if (err) {
+          reject(err);
+        } else {
+          resolve(numRemovedDocs);
+        }
+      });
+    });
+  }
+}
+
+function recordToItem(record: DbRecord): ChunkItem {
+  return {hash: record.hash, data: new Uint8Array(record.data.buffer)};
+}
+
+function makeTempDir(): Promise<string> {
+  return new Promise((resolve, reject) => {
+    //$FlowIssue
+    fs.mkdtemp('/tmp/put-cache-', (err, folder) => {
+      if (err) {
+        reject(err);
+      } else {
+        resolve(folder);
+      }
+    });
+  });
+}
+
+async function removeDir(dir: string): Promise<void> {
+  await access(dir);
+  const files = await readdir(dir);
+  for (const file of files) {
+    await unlink(dir + '/' + file);
+  }
+  return rmdir(dir);
+}
+
+function access(path: string, mode = fs.F_OK): Promise<void> {
+  return new Promise((resolve, reject) => {
+    fs.access(path, mode, (err) => {
+      if (err) {
+        reject(err);
+      } else {
+        resolve();
+      }
+    });
+  });
+}
+
+function readdir(path: string): Promise<Array<string>> {
+  return new Promise((resolve, reject) => {
+    fs.readdir(path, (err, files) => {
+      if (err) {
+        reject(err);
+      } else {
+        resolve(files);
+      }
+    });
+  });
+}
+
+function rmdir(path: string): Promise<void> {
+  return new Promise((resolve, reject) => {
+    fs.rmdir(path, (err) => {
+      if (err) {
+        reject(err);
+      } else {
+        resolve();
+      }
+    });
+  });
+}
+
+function unlink(path: string): Promise<void> {
+  return new Promise((resolve, reject) => {
+    fs.unlink(path, (err) => {
+      if (err) {
+        reject(err);
+      } else {
+        resolve();
+      }
+    });
+  });
+}
diff --git a/js/src/value-store-test.js b/js/src/value-store-test.js
index 9955405a78..fec0012281 100644
--- a/js/src/value-store-test.js
+++ b/js/src/value-store-test.js
@@ -116,7 +116,6 @@ suite('ValueStore', () => {
     const l = await newList([vs.writeValue(1), vs.writeValue(2)]);
     const r = vs.writeValue(l);
 
-    // await vs.flush();
     const v = await vs.readValue(r.targetRef);
     assert.isTrue(equals(l, v));
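
Reviewer note (not part of the patch): a minimal sketch of the new write path, showing how BatchStore.flush() drives OrderedPutCache. The chunks c1/c2, the hints set, and the delegate are illustrative stand-ins; the method names and their semantics are the ones introduced above.

  // Pending puts now spill to a tingodb collection instead of an in-memory map.
  const cache = new OrderedPutCache();
  cache.append(c1);  // returns false if c1's hash is already pending
  cache.append(c2);

  // extractChunks waits for outstanding inserts, then yields a ChunkStream
  // that iterates the range [c1, c2] in put-order.
  const stream = await cache.extractChunks(c1.ref.toString(), c2.ref.toString());
  await delegate.writeBatch(hints, stream);  // serialize() consumes the stream

  // Once the batch is on the wire, forget everything up to and including c2,
  // and eventually delete the backing temp directory.
  await cache.dropUntil(c2.ref.toString());
  await cache.destroy();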