mirror of
https://github.com/dolthub/dolt.git
synced 2026-05-14 19:38:56 -05:00
Go: Make ValidatingBatchSink read Values during validation (#1646)
In order to ensure that pulling works without requiring extensive Hint gymnastics on the client, we're willing to relax our server-side requirement that every writeValue request contain all information needed to complete validation. Now, in the event that the validation code encounters a Ref that's not in its validation cache, it'll just go ReadValue() it and soldier on. Toward #1568
This commit is contained in:
@@ -82,6 +82,8 @@ func HandleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
|
||||
for c := range chunkChan {
|
||||
if bpe == nil {
|
||||
bpe = vbs.Enqueue(c)
|
||||
} else {
|
||||
bpe = append(bpe, c.Hash())
|
||||
}
|
||||
// If a previous Enqueue() errored, we still need to drain chunkChan
|
||||
// TODO: what about having DeserializeToChan take a 'done' channel to stop it?
|
||||
@@ -102,7 +104,7 @@ func HandleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("Error: %v\nSaw hashes %v", err, hashes), http.StatusBadRequest)
|
||||
http.Error(w, fmt.Sprintf("Error: %v\nChunks in payload: %v", err, hashes), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ func (vbs *ValidatingBatchingSink) Enqueue(c chunks.Chunk) chunks.BackpressureEr
|
||||
v := DecodeChunk(c, vbs.vs)
|
||||
d.Exp.NotNil(v, "Chunk with hash %s failed to decode", h)
|
||||
d.Exp.Equal(EnsureHash(&hash.Hash{}, v), h)
|
||||
vbs.vs.checkChunksInCache(v)
|
||||
vbs.vs.ensureChunksInCache(v)
|
||||
vbs.vs.set(h, hintedChunk{v.Type(), h})
|
||||
|
||||
vbs.batch[vbs.count] = c
|
||||
|
||||
+23
-12
@@ -72,7 +72,7 @@ func (lvs *ValueStore) WriteValue(v Value) Ref {
|
||||
if lvs.isPresent(hash) {
|
||||
return r
|
||||
}
|
||||
hints := lvs.checkChunksInCache(v)
|
||||
hints := lvs.chunkHintsFromCache(v)
|
||||
lvs.bs.SchedulePut(c, height, hints)
|
||||
lvs.set(hash, hintedChunk{v.Type(), hash})
|
||||
return r
|
||||
@@ -123,26 +123,37 @@ func (lvs *ValueStore) checkAndSet(r hash.Hash, entry chunkCacheEntry) {
|
||||
}
|
||||
}
|
||||
|
||||
func (lvs *ValueStore) checkChunksInCache(v Value) map[hash.Hash]struct{} {
|
||||
func (lvs *ValueStore) chunkHintsFromCache(v Value) Hints {
|
||||
return lvs.checkChunksInCache(v, false)
|
||||
}
|
||||
|
||||
func (lvs *ValueStore) ensureChunksInCache(v Value) {
|
||||
lvs.checkChunksInCache(v, true)
|
||||
}
|
||||
|
||||
func (lvs *ValueStore) checkChunksInCache(v Value, readValues bool) Hints {
|
||||
hints := map[hash.Hash]struct{}{}
|
||||
for _, reachable := range v.Chunks() {
|
||||
entry := lvs.check(reachable.TargetHash())
|
||||
// First, check the type cache to see if reachable is already known to be valid.
|
||||
targetHash := reachable.TargetHash()
|
||||
entry := lvs.check(targetHash)
|
||||
|
||||
// If it's not already in the cache, attempt to read the value directly, which will put it and its chunks into the cache.
|
||||
if entry == nil || !entry.Present() {
|
||||
d.Exp.Fail("Attempted to write Value containing Ref to non-existent object.", "%s\n, contains ref %s, which points to a non-existent Value.", EncodedValueWithTags(v), reachable.TargetHash())
|
||||
var reachableV Value
|
||||
if readValues {
|
||||
reachableV = lvs.ReadValue(targetHash)
|
||||
entry = lvs.check(targetHash)
|
||||
}
|
||||
if reachableV == nil {
|
||||
d.Exp.Fail("Attempted to write Value containing Ref to non-existent object.", "%s\n, contains ref %s, which points to a non-existent Value.", EncodedValueWithTags(v), reachable.TargetHash())
|
||||
}
|
||||
}
|
||||
if hint := entry.Hint(); !hint.IsEmpty() {
|
||||
hints[hint] = struct{}{}
|
||||
}
|
||||
|
||||
// BUG 1121
|
||||
// It's possible that entry.Type() will be simply 'Value', but that 'reachable' is actually a
|
||||
// properly-typed object -- that is, a Ref to some specific Type. The Exp below would fail,
|
||||
// though it's possible that the Type is actually correct. We wouldn't be able to verify
|
||||
// without reading it, though, so we'll dig into this later.
|
||||
targetType := getTargetType(reachable)
|
||||
if targetType.Equals(ValueType) {
|
||||
continue
|
||||
}
|
||||
d.Exp.True(entry.Type().Equals(targetType), "Value to write contains ref %s, which points to a value of a different type: %+v != %+v", reachable.TargetHash(), entry.Type(), targetType)
|
||||
}
|
||||
return hints
|
||||
|
||||
@@ -78,7 +78,58 @@ func TestCheckChunksInCache(t *testing.T) {
|
||||
cvs.set(b.Hash(), hintedChunk{b.Type(), b.Hash()})
|
||||
|
||||
bref := NewRef(b)
|
||||
assert.NotPanics(func() { cvs.checkChunksInCache(bref) })
|
||||
assert.NotPanics(func() { cvs.chunkHintsFromCache(bref) })
|
||||
}
|
||||
|
||||
func TestCheckChunksNotInCache(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
cs := chunks.NewTestStore()
|
||||
cvs := newLocalValueStore(cs)
|
||||
|
||||
b := NewEmptyBlob()
|
||||
cs.Put(EncodeValue(b, nil))
|
||||
|
||||
bref := NewRef(b)
|
||||
assert.Panics(func() { cvs.chunkHintsFromCache(bref) })
|
||||
}
|
||||
|
||||
func TestEnsureChunksInCache(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
cs := chunks.NewTestStore()
|
||||
cvs := newLocalValueStore(cs)
|
||||
|
||||
b := NewEmptyBlob()
|
||||
s := NewString("oy")
|
||||
bref := NewRef(b)
|
||||
sref := NewRef(s)
|
||||
l := NewList(bref, sref)
|
||||
|
||||
cs.Put(EncodeValue(b, nil))
|
||||
cs.Put(EncodeValue(s, nil))
|
||||
cs.Put(EncodeValue(l, nil))
|
||||
|
||||
assert.NotPanics(func() { cvs.ensureChunksInCache(bref) })
|
||||
assert.NotPanics(func() { cvs.ensureChunksInCache(l) })
|
||||
}
|
||||
|
||||
func TestEnsureChunksFails(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
cs := chunks.NewTestStore()
|
||||
cvs := newLocalValueStore(cs)
|
||||
|
||||
b := NewEmptyBlob()
|
||||
bref := NewRef(b)
|
||||
assert.Panics(func() { cvs.ensureChunksInCache(bref) })
|
||||
|
||||
s := NewString("oy")
|
||||
cs.Put(EncodeValue(b, nil))
|
||||
cs.Put(EncodeValue(s, nil))
|
||||
|
||||
badRef := constructRef(MakeRefType(MakePrimitiveType(BoolKind)), s.Hash(), 1)
|
||||
l := NewList(bref, badRef)
|
||||
|
||||
cs.Put(EncodeValue(l, nil))
|
||||
assert.Panics(func() { cvs.ensureChunksInCache(l) })
|
||||
}
|
||||
|
||||
func TestCacheOnReadValue(t *testing.T) {
|
||||
@@ -115,7 +166,7 @@ func TestHintsOnCache(t *testing.T) {
|
||||
bref := cvs.WriteValue(NewBlob(bytes.NewBufferString("g")))
|
||||
l = l.Insert(0, bref)
|
||||
|
||||
hints := cvs.checkChunksInCache(l)
|
||||
hints := cvs.chunkHintsFromCache(l)
|
||||
if assert.Len(hints, 2) {
|
||||
for _, hash := range []hash.Hash{v.Hash(), bref.TargetHash()} {
|
||||
_, present := hints[hash]
|
||||
|
||||
Reference in New Issue
Block a user