Use the same cursor when initializing and finalizing the chunker (#2729)

Previously we would clone them from the original cursor, to (a) not
modify the original cursor, and (b) have initialization and finalization
not interfere with each other.

However, this isn't necessary and it just creates unnecessary churn. For
example, when we read-ahead, it would be wasteful to re-read the
read-ahead chunks from initialization.
This commit is contained in:
Ben Kalman
2016-10-20 16:04:03 -07:00
committed by GitHub
parent d94fb97788
commit 007ba18987
2 changed files with 26 additions and 29 deletions

View File

@@ -29,11 +29,12 @@ func newEmptySequenceChunker(vr ValueReader, vw ValueWriter, makeChunk, parentMa
}
func newSequenceChunker(cur *sequenceCursor, vr ValueReader, vw ValueWriter, makeChunk, parentMakeChunk makeChunkFn, hashValueBytes hashValueBytesFn) *sequenceChunker {
// |cur| will be nil if this is a new sequence, implying this is a new tree, or the tree has grown in height relative to its original chunked form.
d.PanicIfFalse(makeChunk != nil)
d.PanicIfFalse(parentMakeChunk != nil)
d.PanicIfFalse(hashValueBytes != nil)
// |cur| will be nil if this is a new sequence, implying this is a new tree, or the tree has grown in height relative to its original chunked form.
sc := &sequenceChunker{
cur,
vr,
@@ -62,13 +63,12 @@ func (sc *sequenceChunker) resume() {
// Number of previous items' value bytes which must be hashed into the boundary checker.
primeHashBytes := int64(sc.rv.window)
retreater := sc.cur.clone()
appendCount := 0
primeHashCount := 0
// If the cursor is beyond the final position in the sequence, then we can't tell the difference between it having been an explicit and implicit boundary. Since the caller may be about to append another value, we need to know whether the existing final item is an explicit chunk boundary.
cursorBeyondFinal := sc.cur.idx == sc.cur.length()
if cursorBeyondFinal && retreater.retreatMaybeAllowBeforeStart(false) {
if cursorBeyondFinal && sc.cur.retreatMaybeAllowBeforeStart(false) {
// In that case, we prime enough items *prior* to the final item to be correct.
appendCount++
primeHashCount++
@@ -76,28 +76,28 @@ func (sc *sequenceChunker) resume() {
// Walk backwards to the start of the existing chunk.
sc.rv.lengthOnly = true
for retreater.indexInChunk() > 0 && retreater.retreatMaybeAllowBeforeStart(false) {
for sc.cur.indexInChunk() > 0 && sc.cur.retreatMaybeAllowBeforeStart(false) {
appendCount++
if primeHashBytes > 0 {
primeHashCount++
sc.rv.ClearLastBoundary()
sc.hashValueBytes(retreater.current(), sc.rv)
sc.hashValueBytes(sc.cur.current(), sc.rv)
primeHashBytes -= int64(sc.rv.bytesHashed)
}
}
// If the hash window won't be filled by the preceding items in the current chunk, walk further back until they will.
for primeHashBytes > 0 && retreater.retreatMaybeAllowBeforeStart(false) {
for primeHashBytes > 0 && sc.cur.retreatMaybeAllowBeforeStart(false) {
primeHashCount++
sc.rv.ClearLastBoundary()
sc.hashValueBytes(retreater.current(), sc.rv)
sc.hashValueBytes(sc.cur.current(), sc.rv)
primeHashBytes -= int64(sc.rv.bytesHashed)
}
sc.rv.lengthOnly = false
for primeHashCount > 0 || appendCount > 0 {
item := retreater.current()
retreater.advance()
item := sc.cur.current()
sc.cur.advance()
if primeHashCount > appendCount {
// Before the start of the current chunk: just hash value bytes into window
@@ -255,22 +255,21 @@ func (sc *sequenceChunker) finalizeCursor() {
// Append the rest of the values in the sequence, up to the window size, plus the rest of that chunk. It needs to be the full window size because anything that was appended/skipped between chunker construction and finalization will have changed the hash state.
hashWindow := int64(sc.rv.window)
fzr := sc.cur.clone()
isBoundary := len(sc.current) == 0
// We can terminate when: (1) we hit the end of the input in this sequence or (2) we process beyond the hash window and encounter an item which is a boundary in both the old and new state of the sequence.
for i := 0; fzr.valid() && (hashWindow > 0 || fzr.indexInChunk() > 0 || !isBoundary); i++ {
if i == 0 || fzr.indexInChunk() == 0 {
for i := 0; sc.cur.valid() && (hashWindow > 0 || sc.cur.indexInChunk() > 0 || !isBoundary); i++ {
if i == 0 || sc.cur.indexInChunk() == 0 {
// Every time we step into a chunk from the original sequence, that chunk will no longer exist in the new sequence. The parent must be instructed to skip it.
sc.skipParentIfExists()
}
item := fzr.current()
item := sc.cur.current()
sc.current = append(sc.current, item)
isBoundary = false
fzr.advance()
sc.cur.advance()
if hashWindow > 0 {
// While we are within the hash window, we need to continue to hash items into the rolling hash and explicitly check for resulting boundaries.
@@ -278,7 +277,7 @@ func (sc *sequenceChunker) finalizeCursor() {
sc.hashValueBytes(item, sc.rv)
hashWindow -= int64(sc.rv.bytesHashed)
isBoundary = sc.rv.crossedBoundary
} else if fzr.indexInChunk() == 0 {
} else if sc.cur.indexInChunk() == 0 {
// Once we are beyond the hash window, we know that boundaries can only occur in the same place they did within the existing sequence.
isBoundary = true
}

View File

@@ -100,7 +100,6 @@ export default class SequenceChunker<T, S: Sequence<T>> {
// Number of previous items which must be hashed into the boundary checker.
let primeHashBytes = this._rv.window;
const retreater = cursor.clone();
let appendCount = 0;
let primeHashCount = 0;
@@ -109,7 +108,7 @@ export default class SequenceChunker<T, S: Sequence<T>> {
// append another value, we need to know whether the existing final item is an explicit chunk
// boundary.
const cursorBeyondFinal = cursor.idx === cursor.length;
if (cursorBeyondFinal && await retreater._retreatMaybeAllowBeforeStart(false)) {
if (cursorBeyondFinal && await cursor._retreatMaybeAllowBeforeStart(false)) {
// In that case, we prime enough items *prior* to the final item to be correct.
appendCount++;
primeHashCount++;
@@ -117,29 +116,29 @@ export default class SequenceChunker<T, S: Sequence<T>> {
// Walk backwards to the start of the existing chunk.
this._rv.lengthOnly = true;
while (retreater.indexInChunk > 0 && await retreater._retreatMaybeAllowBeforeStart(false)) {
while (cursor.indexInChunk > 0 && await cursor._retreatMaybeAllowBeforeStart(false)) {
appendCount++;
if (primeHashBytes > 0) {
primeHashCount++;
this._rv.clearLastBoundary();
this._hashValueBytes(retreater.getCurrent(), this._rv);
this._hashValueBytes(cursor.getCurrent(), this._rv);
primeHashBytes -= this._rv.bytesHashed;
}
}
// If the hash window won't be filled by the preceding items in the current chunk, walk
// further back until they will.
while (primeHashBytes > 0 && await retreater._retreatMaybeAllowBeforeStart(false)) {
while (primeHashBytes > 0 && await cursor._retreatMaybeAllowBeforeStart(false)) {
primeHashCount++;
this._rv.clearLastBoundary();
this._hashValueBytes(retreater.getCurrent(), this._rv);
this._hashValueBytes(cursor.getCurrent(), this._rv);
primeHashBytes -= this._rv.bytesHashed;
}
this._rv.lengthOnly = false;
while (primeHashCount > 0 || appendCount > 0) {
const item = retreater.getCurrent();
await retreater.advance();
const item = cursor.getCurrent();
await cursor.advance();
if (primeHashCount > appendCount) {
// Before the start of the current chunk: just hash value bytes into window.
@@ -356,7 +355,6 @@ export default class SequenceChunker<T, S: Sequence<T>> {
// chunk. It needs to be the full window size because anything that was appended/skipped
// between chunker construction and finalization will have changed the hash state.
let hashWindow = this._rv.window;
const fzr = cursor.clone();
let isBoundary = this._current.length === 0;
@@ -364,18 +362,18 @@ export default class SequenceChunker<T, S: Sequence<T>> {
// the hash window and encounter an item which is boundary in both the old and new state of the
// sequence.
let i = 0;
for (; fzr.valid && (hashWindow > 0 || fzr.indexInChunk > 0 || !isBoundary); i++) {
if (i === 0 || fzr.indexInChunk === 0) {
for (; cursor.valid && (hashWindow > 0 || cursor.indexInChunk > 0 || !isBoundary); i++) {
if (i === 0 || cursor.indexInChunk === 0) {
// Every time we step into a chunk from the original sequence, that chunk will no longer
// exist in the new sequence. The parent must be instructed to skip it.
await this.skipParentIfExists();
}
const item = fzr.getCurrent();
const item = cursor.getCurrent();
this._current.push(item);
isBoundary = false;
await fzr.advance();
await cursor.advance();
if (hashWindow > 0) {
// While we are within the hash window, append items (which explicitly checks the hash value
@@ -384,7 +382,7 @@ export default class SequenceChunker<T, S: Sequence<T>> {
this._hashValueBytes(item, this._rv);
isBoundary = this._rv.crossedBoundary;
hashWindow -= this._rv.bytesHashed;
} else if (fzr.indexInChunk === 0) {
} else if (cursor.indexInChunk === 0) {
// Once we are beyond the hash window, we know that boundaries can only occur in the same
// place they did within the existing sequence.
isBoundary = true;