mirror of
https://github.com/dolthub/dolt.git
synced 2026-01-24 03:09:22 -06:00
Make sure that set/map diff has completed before returning from noms log (#2249)
This fixes https://github.com/attic-labs/noms/issues/2235 where set/map diff was still running when noms log has finished, and already closed its database. Commonly this would crash. I've removed panic-based control flow to make error handling explicit, and added a "DiffLeftRight" method to set/map so that noms log completes ASAP. See the issue for details.
This commit is contained in:
@@ -7,9 +7,14 @@ package diff
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
"github.com/attic-labs/noms/go/types"
|
||||
"github.com/dustin/go-humanize"
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
)
|
||||
|
||||
type (
|
||||
diffFunc func(changeChan chan<- types.ValueChanged, stopChan <-chan struct{})
|
||||
lineFunc func(w io.Writer, op prefixOp, key, val types.Value) error
|
||||
valueFunc func(k types.Value) types.Value
|
||||
)
|
||||
|
||||
func shouldDescend(v1, v2 types.Value) bool {
|
||||
@@ -18,231 +23,218 @@ func shouldDescend(v1, v2 types.Value) bool {
|
||||
}
|
||||
|
||||
func Diff(w io.Writer, v1, v2 types.Value) error {
|
||||
return d.Try(func() {
|
||||
diff(w, types.NewPath(), nil, v1, v2)
|
||||
})
|
||||
return diff(w, types.NewPath(), nil, v1, v2)
|
||||
}
|
||||
|
||||
func diff(w io.Writer, p types.Path, key, v1, v2 types.Value) {
|
||||
if !v1.Equals(v2) {
|
||||
if shouldDescend(v1, v2) {
|
||||
switch v1.Type().Kind() {
|
||||
case types.ListKind:
|
||||
diffLists(w, p, v1.(types.List), v2.(types.List))
|
||||
case types.MapKind:
|
||||
diffMaps(w, p, v1.(types.Map), v2.(types.Map))
|
||||
case types.SetKind:
|
||||
diffSets(w, p, v1.(types.Set), v2.(types.Set))
|
||||
case types.StructKind:
|
||||
diffStructs(w, p, v1.(types.Struct), v2.(types.Struct))
|
||||
default:
|
||||
panic("Unrecognized type in diff function")
|
||||
}
|
||||
} else {
|
||||
line(w, DEL, key, v1)
|
||||
line(w, ADD, key, v2)
|
||||
func diff(w io.Writer, p types.Path, key, v1, v2 types.Value) error {
|
||||
if v1.Equals(v2) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if shouldDescend(v1, v2) {
|
||||
switch v1.Type().Kind() {
|
||||
case types.ListKind:
|
||||
return diffLists(w, p, v1.(types.List), v2.(types.List))
|
||||
case types.MapKind:
|
||||
return diffMaps(w, p, v1.(types.Map), v2.(types.Map))
|
||||
case types.SetKind:
|
||||
return diffSets(w, p, v1.(types.Set), v2.(types.Set))
|
||||
case types.StructKind:
|
||||
return diffStructs(w, p, v1.(types.Struct), v2.(types.Struct))
|
||||
default:
|
||||
panic("Unrecognized type in diff function")
|
||||
}
|
||||
}
|
||||
|
||||
line(w, DEL, key, v1)
|
||||
return line(w, ADD, key, v2)
|
||||
}
|
||||
|
||||
func diffLists(w io.Writer, p types.Path, v1, v2 types.List) {
|
||||
func diffLists(w io.Writer, p types.Path, v1, v2 types.List) (err error) {
|
||||
spliceChan := make(chan types.Splice)
|
||||
stopChan := make(chan struct{}, 1) // buffer size of 1, so this won't block if diff already finished
|
||||
|
||||
defer stop(stopChan)
|
||||
go func() {
|
||||
v2.Diff(v1, spliceChan, stopChan)
|
||||
close(spliceChan)
|
||||
}()
|
||||
|
||||
wroteHeader := false
|
||||
wroteHdr := false
|
||||
|
||||
for splice := range spliceChan {
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
if splice.SpRemoved == splice.SpAdded {
|
||||
// Heuristic: list only has modifications.
|
||||
for i := uint64(0); i < splice.SpRemoved; i++ {
|
||||
lastEl := v1.Get(splice.SpAt + i)
|
||||
newEl := v2.Get(splice.SpFrom + i)
|
||||
if shouldDescend(lastEl, newEl) {
|
||||
idx := types.Number(splice.SpAt + i)
|
||||
diff(w, p.AddIndex(idx), idx, lastEl, newEl)
|
||||
writeFooter(w, &wroteHdr)
|
||||
err = diff(w, p.AddIndex(idx), idx, lastEl, newEl)
|
||||
} else {
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
writeHeader(w, p, &wroteHdr)
|
||||
line(w, DEL, nil, v1.Get(splice.SpAt+i))
|
||||
line(w, ADD, nil, v2.Get(splice.SpFrom+i))
|
||||
err = line(w, ADD, nil, v2.Get(splice.SpFrom+i))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for i := uint64(0); i < splice.SpRemoved; i++ {
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
line(w, DEL, nil, v1.Get(splice.SpAt+i))
|
||||
}
|
||||
for i := uint64(0); i < splice.SpAdded; i++ {
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
line(w, ADD, nil, v2.Get(splice.SpFrom+i))
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Heuristic: list only has additions/removals.
|
||||
for i := uint64(0); i < splice.SpRemoved && err == nil; i++ {
|
||||
writeHeader(w, p, &wroteHdr)
|
||||
err = line(w, DEL, nil, v1.Get(splice.SpAt+i))
|
||||
}
|
||||
for i := uint64(0); i < splice.SpAdded && err == nil; i++ {
|
||||
writeHeader(w, p, &wroteHdr)
|
||||
err = line(w, ADD, nil, v2.Get(splice.SpFrom+i))
|
||||
}
|
||||
}
|
||||
|
||||
writeFooter(w, wroteHeader)
|
||||
err = writeFooter(w, &wroteHdr)
|
||||
|
||||
if err != nil {
|
||||
stopChan <- struct{}{}
|
||||
// Wait for diff to stop.
|
||||
for range spliceChan {
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func diffMaps(w io.Writer, p types.Path, v1, v2 types.Map) {
|
||||
func diffMaps(w io.Writer, p types.Path, v1, v2 types.Map) error {
|
||||
return diffOrdered(w, p, line, func(cc chan<- types.ValueChanged, sc <-chan struct{}) {
|
||||
v2.DiffLeftRight(v1, cc, sc)
|
||||
},
|
||||
func(k types.Value) types.Value { return k },
|
||||
func(k types.Value) types.Value { return v1.Get(k) },
|
||||
func(k types.Value) types.Value { return v2.Get(k) },
|
||||
)
|
||||
}
|
||||
|
||||
func diffStructs(w io.Writer, p types.Path, v1, v2 types.Struct) error {
|
||||
return diffOrdered(w, p, field, func(cc chan<- types.ValueChanged, sc <-chan struct{}) {
|
||||
v2.Diff(v1, cc, sc)
|
||||
},
|
||||
func(k types.Value) types.Value { return k },
|
||||
func(k types.Value) types.Value { return v1.Get(string(k.(types.String))) },
|
||||
func(k types.Value) types.Value { return v2.Get(string(k.(types.String))) },
|
||||
)
|
||||
}
|
||||
|
||||
func diffSets(w io.Writer, p types.Path, v1, v2 types.Set) error {
|
||||
return diffOrdered(w, p, line, func(cc chan<- types.ValueChanged, sc <-chan struct{}) {
|
||||
v2.DiffLeftRight(v1, cc, sc)
|
||||
},
|
||||
func(k types.Value) types.Value { return nil },
|
||||
func(k types.Value) types.Value { return k },
|
||||
func(k types.Value) types.Value { return k },
|
||||
)
|
||||
}
|
||||
|
||||
func diffOrdered(w io.Writer, p types.Path, lf lineFunc, df diffFunc, kf, v1, v2 valueFunc) (err error) {
|
||||
changeChan := make(chan types.ValueChanged)
|
||||
stopChan := make(chan struct{}, 1) // buffer size of 1, so this won't block if diff already finished
|
||||
|
||||
defer stop(stopChan)
|
||||
go func() {
|
||||
v2.Diff(v1, changeChan, stopChan)
|
||||
df(changeChan, stopChan)
|
||||
close(changeChan)
|
||||
}()
|
||||
|
||||
wroteHeader := false
|
||||
wroteHdr := false
|
||||
|
||||
for change := range changeChan {
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
k := kf(change.V)
|
||||
|
||||
switch change.ChangeType {
|
||||
case types.DiffChangeAdded:
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
line(w, ADD, change.V, v2.Get(change.V))
|
||||
writeHeader(w, p, &wroteHdr)
|
||||
err = lf(w, ADD, k, v2(change.V))
|
||||
case types.DiffChangeRemoved:
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
line(w, DEL, change.V, v1.Get(change.V))
|
||||
writeHeader(w, p, &wroteHdr)
|
||||
err = lf(w, DEL, k, v1(change.V))
|
||||
case types.DiffChangeModified:
|
||||
c1, c2 := v1.Get(change.V), v2.Get(change.V)
|
||||
c1, c2 := v1(change.V), v2(change.V)
|
||||
if shouldDescend(c1, c2) {
|
||||
wroteHeader = writeFooter(w, wroteHeader)
|
||||
diff(w, p.AddIndex(change.V), change.V, c1, c2)
|
||||
writeFooter(w, &wroteHdr)
|
||||
err = diff(w, p.AddIndex(k), change.V, c1, c2)
|
||||
} else {
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
line(w, DEL, change.V, c1)
|
||||
line(w, ADD, change.V, c2)
|
||||
writeHeader(w, p, &wroteHdr)
|
||||
lf(w, DEL, k, c1)
|
||||
err = lf(w, ADD, k, c2)
|
||||
}
|
||||
default:
|
||||
panic("unknown change type")
|
||||
}
|
||||
}
|
||||
|
||||
writeFooter(w, wroteHeader)
|
||||
}
|
||||
writeFooter(w, &wroteHdr)
|
||||
|
||||
func diffStructs(w io.Writer, p types.Path, v1, v2 types.Struct) {
|
||||
changeChan := make(chan types.ValueChanged)
|
||||
stopChan := make(chan struct{}, 1) // buffer size of 1, so this won't block if diff already finished
|
||||
|
||||
defer stop(stopChan)
|
||||
go func() {
|
||||
v2.Diff(v1, changeChan, stopChan)
|
||||
close(changeChan)
|
||||
}()
|
||||
|
||||
wroteHeader := false
|
||||
|
||||
for change := range changeChan {
|
||||
fn := string(change.V.(types.String))
|
||||
switch change.ChangeType {
|
||||
case types.DiffChangeAdded:
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
field(w, ADD, change.V, v2.Get(fn))
|
||||
case types.DiffChangeRemoved:
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
field(w, DEL, change.V, v1.Get(fn))
|
||||
case types.DiffChangeModified:
|
||||
f1 := v1.Get(fn)
|
||||
f2 := v2.Get(fn)
|
||||
if shouldDescend(f1, f2) {
|
||||
diff(w, p.AddField(fn), types.String(fn), f1, f2)
|
||||
} else {
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
field(w, DEL, change.V, f1)
|
||||
field(w, ADD, change.V, f2)
|
||||
}
|
||||
if err != nil {
|
||||
stopChan <- struct{}{}
|
||||
// Wait for diff to stop.
|
||||
for range changeChan {
|
||||
}
|
||||
}
|
||||
writeFooter(w, wroteHeader)
|
||||
return
|
||||
}
|
||||
|
||||
func diffSets(w io.Writer, p types.Path, v1, v2 types.Set) {
|
||||
changeChan := make(chan types.ValueChanged)
|
||||
stopChan := make(chan struct{}, 1) // buffer size of 1, so this won't block if diff already finished
|
||||
|
||||
defer stop(stopChan)
|
||||
go func() {
|
||||
v2.Diff(v1, changeChan, stopChan)
|
||||
close(changeChan)
|
||||
}()
|
||||
|
||||
wroteHeader := false
|
||||
|
||||
for change := range changeChan {
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
switch change.ChangeType {
|
||||
case types.DiffChangeAdded:
|
||||
line(w, ADD, nil, change.V)
|
||||
case types.DiffChangeRemoved:
|
||||
line(w, DEL, nil, change.V)
|
||||
default:
|
||||
panic("unknown change type")
|
||||
}
|
||||
func writeHeader(w io.Writer, p types.Path, wroteHdr *bool) error {
|
||||
if *wroteHdr {
|
||||
return nil
|
||||
}
|
||||
|
||||
writeFooter(w, wroteHeader)
|
||||
*wroteHdr = true
|
||||
hdr := "(root)"
|
||||
if len(p) > 0 {
|
||||
hdr = p.String()
|
||||
}
|
||||
return write(w, []byte(hdr+" {\n"))
|
||||
}
|
||||
|
||||
func line(w io.Writer, op prefixOp, key, val types.Value) {
|
||||
func writeFooter(w io.Writer, wroteHdr *bool) error {
|
||||
if !*wroteHdr {
|
||||
return nil
|
||||
}
|
||||
*wroteHdr = false
|
||||
return write(w, []byte(" }\n"))
|
||||
}
|
||||
|
||||
func line(w io.Writer, op prefixOp, key, val types.Value) error {
|
||||
pw := newPrefixWriter(w, op)
|
||||
if key != nil {
|
||||
writeEncodedValue(pw, key)
|
||||
write(w, []byte(": "))
|
||||
}
|
||||
writeEncodedValue(pw, val)
|
||||
write(w, []byte("\n"))
|
||||
return write(w, []byte("\n"))
|
||||
}
|
||||
|
||||
func field(w io.Writer, op prefixOp, name, val types.Value) {
|
||||
func field(w io.Writer, op prefixOp, name, val types.Value) error {
|
||||
pw := newPrefixWriter(w, op)
|
||||
write(pw, []byte(name.(types.String)))
|
||||
write(w, []byte(": "))
|
||||
writeEncodedValue(pw, val)
|
||||
write(w, []byte("\n"))
|
||||
return write(w, []byte("\n"))
|
||||
}
|
||||
|
||||
func writeHeader(w io.Writer, wroteHeader bool, p types.Path) bool {
|
||||
if !wroteHeader {
|
||||
if len(p) == 0 {
|
||||
write(w, []byte("(root)"))
|
||||
} else {
|
||||
write(w, []byte(p.String()))
|
||||
}
|
||||
write(w, []byte(" {\n"))
|
||||
func writeEncodedValue(w io.Writer, v types.Value) error {
|
||||
if v.Type().Kind() != types.BlobKind {
|
||||
return types.WriteEncodedValue(w, v)
|
||||
}
|
||||
return true
|
||||
write(w, []byte("Blob ("))
|
||||
write(w, []byte(humanize.Bytes(v.(types.Blob).Len())))
|
||||
return write(w, []byte(")"))
|
||||
}
|
||||
|
||||
func writeFooter(w io.Writer, wroteHeader bool) bool {
|
||||
if wroteHeader {
|
||||
write(w, []byte(" }\n"))
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func write(w io.Writer, b []byte) {
|
||||
func write(w io.Writer, b []byte) error {
|
||||
_, err := w.Write(b)
|
||||
d.PanicIfError(err)
|
||||
}
|
||||
|
||||
func writeEncodedValue(w io.Writer, v types.Value) {
|
||||
if v.Type().Kind() == types.BlobKind {
|
||||
w.Write([]byte("Blob ("))
|
||||
w.Write([]byte(humanize.Bytes(v.(types.Blob).Len())))
|
||||
w.Write([]byte(")"))
|
||||
} else {
|
||||
d.PanicIfError(types.WriteEncodedValue(w, v))
|
||||
}
|
||||
}
|
||||
|
||||
func writeEncodedValueWithTags(w io.Writer, v types.Value) {
|
||||
d.PanicIfError(types.WriteEncodedValueWithTags(w, v))
|
||||
}
|
||||
|
||||
func stop(ch chan<- struct{}) {
|
||||
ch <- struct{}{}
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"github.com/attic-labs/noms/go/datas"
|
||||
"github.com/attic-labs/noms/go/types"
|
||||
"github.com/attic-labs/noms/go/util/status"
|
||||
"github.com/dustin/go-humanize"
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
)
|
||||
|
||||
// Summary prints a summary of the diff between two values to stdout.
|
||||
@@ -31,7 +31,7 @@ func Summary(value1, value2 types.Value) {
|
||||
plural = "values"
|
||||
}
|
||||
}
|
||||
// waitChan := make(chan struct{})
|
||||
|
||||
ch := make(chan diffSummaryProgress)
|
||||
go func() {
|
||||
diffSummary(ch, value1, value2)
|
||||
@@ -84,7 +84,6 @@ func diffSummaryList(ch chan<- diffSummaryProgress, v1, v2 types.List) {
|
||||
spliceChan := make(chan types.Splice)
|
||||
stopChan := make(chan struct{}, 1) // buffer size of 1, so this won't block if diff already finished
|
||||
|
||||
defer stop(stopChan)
|
||||
go func() {
|
||||
v2.Diff(v1, spliceChan, stopChan)
|
||||
close(spliceChan)
|
||||
@@ -119,15 +118,12 @@ func diffSummaryStructs(ch chan<- diffSummaryProgress, v1, v2 types.Struct) {
|
||||
})
|
||||
}
|
||||
|
||||
type diffFunc func(changeChan chan<- types.ValueChanged, stopChan <-chan struct{})
|
||||
|
||||
func diffSummaryValueChanged(ch chan<- diffSummaryProgress, oldSize, newSize uint64, f diffFunc) {
|
||||
ch <- diffSummaryProgress{OldSize: oldSize, NewSize: newSize}
|
||||
|
||||
changeChan := make(chan types.ValueChanged)
|
||||
stopChan := make(chan struct{}, 1) // buffer size of 1, so this won't block if diff already finished
|
||||
|
||||
defer stop(stopChan)
|
||||
go func() {
|
||||
f(changeChan, stopChan)
|
||||
close(changeChan)
|
||||
|
||||
@@ -59,6 +59,7 @@ func NewStreamingMap(vrw ValueReadWriter, kvs <-chan Value) <-chan Map {
|
||||
return outChan
|
||||
}
|
||||
|
||||
// Computes the diff from |last| to |m| using "best" algorithm, which balances returning results early vs completing quickly.
|
||||
func (m Map) Diff(last Map, changes chan<- ValueChanged, closeChan <-chan struct{}) {
|
||||
if m.Equals(last) {
|
||||
return
|
||||
@@ -66,6 +67,14 @@ func (m Map) Diff(last Map, changes chan<- ValueChanged, closeChan <-chan struct
|
||||
orderedSequenceDiffBest(last.seq, m.seq, changes, closeChan)
|
||||
}
|
||||
|
||||
// Like Diff() but uses a left-to-right streaming approach, optimised for returning results early, but not completing quickly.
|
||||
func (m Map) DiffLeftRight(last Map, changes chan<- ValueChanged, closeChan <-chan struct{}) {
|
||||
if m.Equals(last) {
|
||||
return
|
||||
}
|
||||
orderedSequenceDiffLeftRight(last.seq, m.seq, changes, closeChan)
|
||||
}
|
||||
|
||||
// Collection interface
|
||||
func (m Map) Len() uint64 {
|
||||
return m.seq.numLeaves()
|
||||
|
||||
@@ -30,6 +30,7 @@ func NewSet(v ...Value) Set {
|
||||
return newSet(seq.Done(nil).(orderedSequence))
|
||||
}
|
||||
|
||||
// Computes the diff from |last| to |s| using "best" algorithm, which balances returning results early vs completing quickly.
|
||||
func (s Set) Diff(last Set, changes chan<- ValueChanged, closeChan <-chan struct{}) {
|
||||
if s.Equals(last) {
|
||||
return
|
||||
@@ -37,6 +38,14 @@ func (s Set) Diff(last Set, changes chan<- ValueChanged, closeChan <-chan struct
|
||||
orderedSequenceDiffBest(last.seq, s.seq, changes, closeChan)
|
||||
}
|
||||
|
||||
// Like Diff() but uses a left-to-right streaming approach, optimised for returning results early, but not completing quickly.
|
||||
func (s Set) DiffLeftRight(last Set, changes chan<- ValueChanged, closeChan <-chan struct{}) {
|
||||
if s.Equals(last) {
|
||||
return
|
||||
}
|
||||
orderedSequenceDiffLeftRight(last.seq, s.seq, changes, closeChan)
|
||||
}
|
||||
|
||||
// Collection interface
|
||||
func (s Set) Len() uint64 {
|
||||
return s.seq.numLeaves()
|
||||
|
||||
Reference in New Issue
Block a user