Merge pull request #1063 from rafael-atticlabs/datastoreCommit

DataStore.commit + tests
This commit is contained in:
Rafael Weinstein
2016-03-10 20:10:05 -08:00
2 changed files with 170 additions and 14 deletions

View File

@@ -4,12 +4,17 @@ import Chunk from './chunk.js';
import Ref from './ref.js';
import Struct from './struct.js';
import type {ChunkStore} from './chunk_store.js';
import type {NomsMap} from './map.js';
import type {NomsSet} from './set.js';
import type {valueOrPrimitive} from './value.js';
import {Field, makeCompoundType, makePrimitiveType, makeStructType, makeType,
Type} from './type.js';
import {Kind} from './noms_kind.js';
import {newMap, NomsMap} from './map.js';
import {newMap} from './map.js';
import {newSet} from './set.js';
import {Package, registerPackage} from './package.js';
import {readValue} from './read_value.js';
import {writeValue} from './encode.js';
type DatasTypes = {
commitTypeDef: Type,
@@ -67,7 +72,7 @@ export class DataStore {
constructor(cs: ChunkStore) {
this._cs = cs;
this._datasets = this._datasetsFromRootRef();
this._datasets = this._datasetsFromRootRef(this.getRoot());
}
getRoot(): Promise<Ref> {
@@ -92,8 +97,8 @@ export class DataStore {
close() {}
_datasetsFromRootRef(): Promise<NomsMap<string, Ref>> {
return this._cs.getRoot().then(rootRef => {
_datasetsFromRootRef(rootRef: Promise<Ref>): Promise<NomsMap<string, Ref>> {
return rootRef.then(rootRef => {
if (rootRef.isEmpty()) {
return getEmptyCommitMap();
}
@@ -111,4 +116,58 @@ export class DataStore {
datasets(): Promise<NomsMap<string, Ref>> {
return this._datasets;
}
async _descendsFrom(commit: Struct, currentHeadRef: Ref): Promise<boolean> {
let ancestors = commit.get('parents');
while (!(await ancestors.has(currentHeadRef))) {
if (ancestors.isEmpty()) {
return false;
}
ancestors = await getAncestors(ancestors, this);
}
return true;
}
async commit(datasetId: string, commit: Struct): Promise<DataStore> {
const currentRootRefP = this.getRoot();
let currentDatasets = await this._datasetsFromRootRef(currentRootRefP);
const currentRootRef = await currentRootRefP;
const commitRef = writeValue(commit, commit.type, this);
if (!currentRootRef.isEmpty()) {
const currentHeadRef = await currentDatasets.get(datasetId);
if (currentHeadRef) {
if (commitRef.equals(currentHeadRef)) {
return this;
}
if (!await this._descendsFrom(commit, currentHeadRef)) {
throw new Error('Merge needed');
}
}
}
currentDatasets = await currentDatasets.set(datasetId, commitRef);
const newRootRef = writeValue(currentDatasets, currentDatasets.type, this);
if (await this.updateRoot(newRootRef, currentRootRef)) {
return new DataStore(this._cs);
}
throw new Error('Optimistic lock failed');
}
}
async function getAncestors(commits: NomsSet<Ref>, store: ChunkStore): Promise<NomsSet<Ref>> {
let ancestors = await newSet(getDatasTypes().commitSetType, []);
await commits.map(async (commitRef) => {
const commit = await readValue(commitRef, store);
await commit.get('parents').map(async (ref) => ancestors = await ancestors.insert(ref));
});
return ancestors;
}
export function newCommit(value: valueOrPrimitive, parents: Array<Ref> = []):
Promise<Struct> {
const types = getDatasTypes();
return newSet(types.commitSetType, parents).then(parents =>
new Struct(types.commitType, types.commitTypeDef, {value,parents}));
}

View File

@@ -5,12 +5,10 @@ import {suite, test} from 'mocha';
import Chunk from './chunk.js';
import MemoryStore from './memory_store.js';
import Ref from './ref.js';
import Struct from './struct.js';
import {assert} from 'chai';
import {DataStore, getDatasTypes} from './datastore.js';
import {invariant} from './assert.js';
import {DataStore, getDatasTypes, newCommit} from './datastore.js';
import {invariant, notNull} from './assert.js';
import {newMap} from './map.js';
import {newSet} from './set.js';
import {writeValue} from './encode.js';
suite('DataStore', () => {
@@ -34,22 +32,121 @@ suite('DataStore', () => {
assert.isTrue(has);
});
test('empty datasets', async() => {
test('commit', async () => {
const ms = new MemoryStore();
let ds = new DataStore(ms);
const datasetID = 'ds1';
const datasets = await ds.datasets();
assert.isTrue(datasets.isEmpty());
// |a|
const aCommit = await newCommit('a');
const ds2 = await ds.commit(datasetID, aCommit);
// The old datastore still still has no head.
assert.isNull(await ds.head(datasetID));
// The new datastore has |a|.
const aCommit1 = notNull(await ds2.head(datasetID));
assert.strictEqual('a', aCommit1.get('value'));
ds = ds2;
// |a| <- |b|
const bCommit = await newCommit('b', [aCommit.ref]);
ds = await ds.commit(datasetID, bCommit);
assert.strictEqual('b', notNull(await ds.head(datasetID)).get('value'));
// |a| <- |b|
// \----|c|
// Should be disallowed.
const cCommit = await newCommit('c');
let message = '';
try {
await ds.commit(datasetID, cCommit);
throw new Error('not reached');
} catch (ex) {
message = ex.message;
}
assert.strictEqual('Merge needed', message);
assert.strictEqual('b', notNull(await ds.head(datasetID)).get('value'));
// |a| <- |b| <- |d|
const dCommit = await newCommit('d', [bCommit.ref]);
ds = await ds.commit(datasetID, dCommit);
assert.strictEqual('d', notNull(await ds.head(datasetID)).get('value'));
// Attempt to recommit |b| with |a| as parent.
// Should be disallowed.
try {
await ds.commit(datasetID, bCommit);
throw new Error('not reached');
} catch (ex) {
message = ex.message;
}
// assert.strictEqual('Merge needed', message);
assert.strictEqual('d', notNull(await ds.head(datasetID)).get('value'));
// Add a commit to a different datasetId
ds = await ds.commit('otherDs', aCommit);
assert.strictEqual('a', notNull(await ds.head('otherDs')).get('value'));
// Get a fresh datastore, and verify that both datasets are present
const newDs = new DataStore(ms);
assert.strictEqual('d', notNull(await newDs.head(datasetID)).get('value'));
assert.strictEqual('a', notNull(await newDs.head('otherDs')).get('value'));
});
test('concurrency', async () => {
const ms = new MemoryStore();
let ds = new DataStore(ms);
const datasetID = 'ds1';
// |a|
const aCommit = await newCommit('a');
ds = await ds.commit(datasetID, aCommit);
const bCommit = await newCommit('b', [aCommit.ref]);
ds = await ds.commit(datasetID, bCommit);
assert.strictEqual('b', notNull(await ds.head(datasetID)).get('value'));
// Important to create this here.
let ds2 = new DataStore(ms);
// Change 1:
// |a| <- |b| <- |c|
const cCommit = await newCommit('c', [bCommit.ref]);
ds = await ds.commit(datasetID, cCommit);
assert.strictEqual('c', notNull(await ds.head(datasetID)).get('value'));
// Change 2:
// |a| <- |b| <- |e|
// Should be disallowed, DataStore returned by Commit() should have |c| as Head.
const eCommit = await newCommit('e', [bCommit.ref]);
let message = '';
try {
ds2 = await ds2.commit(datasetID, eCommit);
throw new Error('not reached');
} catch (ex) {
message = ex.message;
}
assert.strictEqual('Merge needed', message);
assert.strictEqual('c', notNull(await ds.head(datasetID)).get('value'));
});
test('empty datasets', async () => {
const ms = new MemoryStore();
const ds = new DataStore(ms);
const datasets = await ds.datasets();
assert.strictEqual(0, datasets.size);
});
test('head', async() => {
test('head', async () => {
const ms = new MemoryStore();
let ds = new DataStore(ms);
const types = getDatasTypes();
const commit = new Struct(types.commitType, types.commitTypeDef, {
value: 'foo',
parents: await newSet(types.commitSetType, []),
});
const commit = await newCommit('foo', []);
const commitRef = writeValue(commit, commit.type, ms);
const datasets = await newMap(types.commitMapType, ['foo', commitRef]);