Make sure that set/map diff has completed before returning from noms log (#2249)

This fixes https://github.com/attic-labs/noms/issues/2235 where set/map
diff was still running when noms log has finished, and already closed
its database. Commonly this would crash.

I've removed panic-based control flow to make error handling explicit,
and added a "DiffLeftRight" method to set/map so that noms log completes
ASAP. See the issue for details.
This commit is contained in:
Ben Kalman
2016-08-02 18:14:09 -07:00
committed by GitHub
parent c6d99f2fab
commit 7c10334472
4 changed files with 163 additions and 157 deletions

View File

@@ -7,9 +7,14 @@ package diff
import (
"io"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/types"
"github.com/dustin/go-humanize"
humanize "github.com/dustin/go-humanize"
)
type (
diffFunc func(changeChan chan<- types.ValueChanged, stopChan <-chan struct{})
lineFunc func(w io.Writer, op prefixOp, key, val types.Value) error
valueFunc func(k types.Value) types.Value
)
func shouldDescend(v1, v2 types.Value) bool {
@@ -18,231 +23,218 @@ func shouldDescend(v1, v2 types.Value) bool {
}
func Diff(w io.Writer, v1, v2 types.Value) error {
return d.Try(func() {
diff(w, types.NewPath(), nil, v1, v2)
})
return diff(w, types.NewPath(), nil, v1, v2)
}
func diff(w io.Writer, p types.Path, key, v1, v2 types.Value) {
if !v1.Equals(v2) {
if shouldDescend(v1, v2) {
switch v1.Type().Kind() {
case types.ListKind:
diffLists(w, p, v1.(types.List), v2.(types.List))
case types.MapKind:
diffMaps(w, p, v1.(types.Map), v2.(types.Map))
case types.SetKind:
diffSets(w, p, v1.(types.Set), v2.(types.Set))
case types.StructKind:
diffStructs(w, p, v1.(types.Struct), v2.(types.Struct))
default:
panic("Unrecognized type in diff function")
}
} else {
line(w, DEL, key, v1)
line(w, ADD, key, v2)
func diff(w io.Writer, p types.Path, key, v1, v2 types.Value) error {
if v1.Equals(v2) {
return nil
}
if shouldDescend(v1, v2) {
switch v1.Type().Kind() {
case types.ListKind:
return diffLists(w, p, v1.(types.List), v2.(types.List))
case types.MapKind:
return diffMaps(w, p, v1.(types.Map), v2.(types.Map))
case types.SetKind:
return diffSets(w, p, v1.(types.Set), v2.(types.Set))
case types.StructKind:
return diffStructs(w, p, v1.(types.Struct), v2.(types.Struct))
default:
panic("Unrecognized type in diff function")
}
}
line(w, DEL, key, v1)
return line(w, ADD, key, v2)
}
func diffLists(w io.Writer, p types.Path, v1, v2 types.List) {
func diffLists(w io.Writer, p types.Path, v1, v2 types.List) (err error) {
spliceChan := make(chan types.Splice)
stopChan := make(chan struct{}, 1) // buffer size of 1, so this won't block if diff already finished
defer stop(stopChan)
go func() {
v2.Diff(v1, spliceChan, stopChan)
close(spliceChan)
}()
wroteHeader := false
wroteHdr := false
for splice := range spliceChan {
if err != nil {
break
}
if splice.SpRemoved == splice.SpAdded {
// Heuristic: list only has modifications.
for i := uint64(0); i < splice.SpRemoved; i++ {
lastEl := v1.Get(splice.SpAt + i)
newEl := v2.Get(splice.SpFrom + i)
if shouldDescend(lastEl, newEl) {
idx := types.Number(splice.SpAt + i)
diff(w, p.AddIndex(idx), idx, lastEl, newEl)
writeFooter(w, &wroteHdr)
err = diff(w, p.AddIndex(idx), idx, lastEl, newEl)
} else {
wroteHeader = writeHeader(w, wroteHeader, p)
writeHeader(w, p, &wroteHdr)
line(w, DEL, nil, v1.Get(splice.SpAt+i))
line(w, ADD, nil, v2.Get(splice.SpFrom+i))
err = line(w, ADD, nil, v2.Get(splice.SpFrom+i))
}
}
} else {
for i := uint64(0); i < splice.SpRemoved; i++ {
wroteHeader = writeHeader(w, wroteHeader, p)
line(w, DEL, nil, v1.Get(splice.SpAt+i))
}
for i := uint64(0); i < splice.SpAdded; i++ {
wroteHeader = writeHeader(w, wroteHeader, p)
line(w, ADD, nil, v2.Get(splice.SpFrom+i))
}
continue
}
// Heuristic: list only has additions/removals.
for i := uint64(0); i < splice.SpRemoved && err == nil; i++ {
writeHeader(w, p, &wroteHdr)
err = line(w, DEL, nil, v1.Get(splice.SpAt+i))
}
for i := uint64(0); i < splice.SpAdded && err == nil; i++ {
writeHeader(w, p, &wroteHdr)
err = line(w, ADD, nil, v2.Get(splice.SpFrom+i))
}
}
writeFooter(w, wroteHeader)
err = writeFooter(w, &wroteHdr)
if err != nil {
stopChan <- struct{}{}
// Wait for diff to stop.
for range spliceChan {
}
}
return
}
func diffMaps(w io.Writer, p types.Path, v1, v2 types.Map) {
func diffMaps(w io.Writer, p types.Path, v1, v2 types.Map) error {
return diffOrdered(w, p, line, func(cc chan<- types.ValueChanged, sc <-chan struct{}) {
v2.DiffLeftRight(v1, cc, sc)
},
func(k types.Value) types.Value { return k },
func(k types.Value) types.Value { return v1.Get(k) },
func(k types.Value) types.Value { return v2.Get(k) },
)
}
func diffStructs(w io.Writer, p types.Path, v1, v2 types.Struct) error {
return diffOrdered(w, p, field, func(cc chan<- types.ValueChanged, sc <-chan struct{}) {
v2.Diff(v1, cc, sc)
},
func(k types.Value) types.Value { return k },
func(k types.Value) types.Value { return v1.Get(string(k.(types.String))) },
func(k types.Value) types.Value { return v2.Get(string(k.(types.String))) },
)
}
func diffSets(w io.Writer, p types.Path, v1, v2 types.Set) error {
return diffOrdered(w, p, line, func(cc chan<- types.ValueChanged, sc <-chan struct{}) {
v2.DiffLeftRight(v1, cc, sc)
},
func(k types.Value) types.Value { return nil },
func(k types.Value) types.Value { return k },
func(k types.Value) types.Value { return k },
)
}
func diffOrdered(w io.Writer, p types.Path, lf lineFunc, df diffFunc, kf, v1, v2 valueFunc) (err error) {
changeChan := make(chan types.ValueChanged)
stopChan := make(chan struct{}, 1) // buffer size of 1, so this won't block if diff already finished
defer stop(stopChan)
go func() {
v2.Diff(v1, changeChan, stopChan)
df(changeChan, stopChan)
close(changeChan)
}()
wroteHeader := false
wroteHdr := false
for change := range changeChan {
if err != nil {
break
}
k := kf(change.V)
switch change.ChangeType {
case types.DiffChangeAdded:
wroteHeader = writeHeader(w, wroteHeader, p)
line(w, ADD, change.V, v2.Get(change.V))
writeHeader(w, p, &wroteHdr)
err = lf(w, ADD, k, v2(change.V))
case types.DiffChangeRemoved:
wroteHeader = writeHeader(w, wroteHeader, p)
line(w, DEL, change.V, v1.Get(change.V))
writeHeader(w, p, &wroteHdr)
err = lf(w, DEL, k, v1(change.V))
case types.DiffChangeModified:
c1, c2 := v1.Get(change.V), v2.Get(change.V)
c1, c2 := v1(change.V), v2(change.V)
if shouldDescend(c1, c2) {
wroteHeader = writeFooter(w, wroteHeader)
diff(w, p.AddIndex(change.V), change.V, c1, c2)
writeFooter(w, &wroteHdr)
err = diff(w, p.AddIndex(k), change.V, c1, c2)
} else {
wroteHeader = writeHeader(w, wroteHeader, p)
line(w, DEL, change.V, c1)
line(w, ADD, change.V, c2)
writeHeader(w, p, &wroteHdr)
lf(w, DEL, k, c1)
err = lf(w, ADD, k, c2)
}
default:
panic("unknown change type")
}
}
writeFooter(w, wroteHeader)
}
writeFooter(w, &wroteHdr)
func diffStructs(w io.Writer, p types.Path, v1, v2 types.Struct) {
changeChan := make(chan types.ValueChanged)
stopChan := make(chan struct{}, 1) // buffer size of 1, so this won't block if diff already finished
defer stop(stopChan)
go func() {
v2.Diff(v1, changeChan, stopChan)
close(changeChan)
}()
wroteHeader := false
for change := range changeChan {
fn := string(change.V.(types.String))
switch change.ChangeType {
case types.DiffChangeAdded:
wroteHeader = writeHeader(w, wroteHeader, p)
field(w, ADD, change.V, v2.Get(fn))
case types.DiffChangeRemoved:
wroteHeader = writeHeader(w, wroteHeader, p)
field(w, DEL, change.V, v1.Get(fn))
case types.DiffChangeModified:
f1 := v1.Get(fn)
f2 := v2.Get(fn)
if shouldDescend(f1, f2) {
diff(w, p.AddField(fn), types.String(fn), f1, f2)
} else {
wroteHeader = writeHeader(w, wroteHeader, p)
field(w, DEL, change.V, f1)
field(w, ADD, change.V, f2)
}
if err != nil {
stopChan <- struct{}{}
// Wait for diff to stop.
for range changeChan {
}
}
writeFooter(w, wroteHeader)
return
}
func diffSets(w io.Writer, p types.Path, v1, v2 types.Set) {
changeChan := make(chan types.ValueChanged)
stopChan := make(chan struct{}, 1) // buffer size of 1, so this won't block if diff already finished
defer stop(stopChan)
go func() {
v2.Diff(v1, changeChan, stopChan)
close(changeChan)
}()
wroteHeader := false
for change := range changeChan {
wroteHeader = writeHeader(w, wroteHeader, p)
switch change.ChangeType {
case types.DiffChangeAdded:
line(w, ADD, nil, change.V)
case types.DiffChangeRemoved:
line(w, DEL, nil, change.V)
default:
panic("unknown change type")
}
func writeHeader(w io.Writer, p types.Path, wroteHdr *bool) error {
if *wroteHdr {
return nil
}
writeFooter(w, wroteHeader)
*wroteHdr = true
hdr := "(root)"
if len(p) > 0 {
hdr = p.String()
}
return write(w, []byte(hdr+" {\n"))
}
func line(w io.Writer, op prefixOp, key, val types.Value) {
func writeFooter(w io.Writer, wroteHdr *bool) error {
if !*wroteHdr {
return nil
}
*wroteHdr = false
return write(w, []byte(" }\n"))
}
func line(w io.Writer, op prefixOp, key, val types.Value) error {
pw := newPrefixWriter(w, op)
if key != nil {
writeEncodedValue(pw, key)
write(w, []byte(": "))
}
writeEncodedValue(pw, val)
write(w, []byte("\n"))
return write(w, []byte("\n"))
}
func field(w io.Writer, op prefixOp, name, val types.Value) {
func field(w io.Writer, op prefixOp, name, val types.Value) error {
pw := newPrefixWriter(w, op)
write(pw, []byte(name.(types.String)))
write(w, []byte(": "))
writeEncodedValue(pw, val)
write(w, []byte("\n"))
return write(w, []byte("\n"))
}
func writeHeader(w io.Writer, wroteHeader bool, p types.Path) bool {
if !wroteHeader {
if len(p) == 0 {
write(w, []byte("(root)"))
} else {
write(w, []byte(p.String()))
}
write(w, []byte(" {\n"))
func writeEncodedValue(w io.Writer, v types.Value) error {
if v.Type().Kind() != types.BlobKind {
return types.WriteEncodedValue(w, v)
}
return true
write(w, []byte("Blob ("))
write(w, []byte(humanize.Bytes(v.(types.Blob).Len())))
return write(w, []byte(")"))
}
func writeFooter(w io.Writer, wroteHeader bool) bool {
if wroteHeader {
write(w, []byte(" }\n"))
}
return false
}
func write(w io.Writer, b []byte) {
func write(w io.Writer, b []byte) error {
_, err := w.Write(b)
d.PanicIfError(err)
}
func writeEncodedValue(w io.Writer, v types.Value) {
if v.Type().Kind() == types.BlobKind {
w.Write([]byte("Blob ("))
w.Write([]byte(humanize.Bytes(v.(types.Blob).Len())))
w.Write([]byte(")"))
} else {
d.PanicIfError(types.WriteEncodedValue(w, v))
}
}
func writeEncodedValueWithTags(w io.Writer, v types.Value) {
d.PanicIfError(types.WriteEncodedValueWithTags(w, v))
}
func stop(ch chan<- struct{}) {
ch <- struct{}{}
return err
}

View File

@@ -6,7 +6,7 @@ import (
"github.com/attic-labs/noms/go/datas"
"github.com/attic-labs/noms/go/types"
"github.com/attic-labs/noms/go/util/status"
"github.com/dustin/go-humanize"
humanize "github.com/dustin/go-humanize"
)
// Summary prints a summary of the diff between two values to stdout.
@@ -31,7 +31,7 @@ func Summary(value1, value2 types.Value) {
plural = "values"
}
}
// waitChan := make(chan struct{})
ch := make(chan diffSummaryProgress)
go func() {
diffSummary(ch, value1, value2)
@@ -84,7 +84,6 @@ func diffSummaryList(ch chan<- diffSummaryProgress, v1, v2 types.List) {
spliceChan := make(chan types.Splice)
stopChan := make(chan struct{}, 1) // buffer size of 1, so this won't block if diff already finished
defer stop(stopChan)
go func() {
v2.Diff(v1, spliceChan, stopChan)
close(spliceChan)
@@ -119,15 +118,12 @@ func diffSummaryStructs(ch chan<- diffSummaryProgress, v1, v2 types.Struct) {
})
}
type diffFunc func(changeChan chan<- types.ValueChanged, stopChan <-chan struct{})
func diffSummaryValueChanged(ch chan<- diffSummaryProgress, oldSize, newSize uint64, f diffFunc) {
ch <- diffSummaryProgress{OldSize: oldSize, NewSize: newSize}
changeChan := make(chan types.ValueChanged)
stopChan := make(chan struct{}, 1) // buffer size of 1, so this won't block if diff already finished
defer stop(stopChan)
go func() {
f(changeChan, stopChan)
close(changeChan)

View File

@@ -59,6 +59,7 @@ func NewStreamingMap(vrw ValueReadWriter, kvs <-chan Value) <-chan Map {
return outChan
}
// Computes the diff from |last| to |m| using "best" algorithm, which balances returning results early vs completing quickly.
func (m Map) Diff(last Map, changes chan<- ValueChanged, closeChan <-chan struct{}) {
if m.Equals(last) {
return
@@ -66,6 +67,14 @@ func (m Map) Diff(last Map, changes chan<- ValueChanged, closeChan <-chan struct
orderedSequenceDiffBest(last.seq, m.seq, changes, closeChan)
}
// Like Diff() but uses a left-to-right streaming approach, optimised for returning results early, but not completing quickly.
func (m Map) DiffLeftRight(last Map, changes chan<- ValueChanged, closeChan <-chan struct{}) {
if m.Equals(last) {
return
}
orderedSequenceDiffLeftRight(last.seq, m.seq, changes, closeChan)
}
// Collection interface
func (m Map) Len() uint64 {
return m.seq.numLeaves()

View File

@@ -30,6 +30,7 @@ func NewSet(v ...Value) Set {
return newSet(seq.Done(nil).(orderedSequence))
}
// Computes the diff from |last| to |s| using "best" algorithm, which balances returning results early vs completing quickly.
func (s Set) Diff(last Set, changes chan<- ValueChanged, closeChan <-chan struct{}) {
if s.Equals(last) {
return
@@ -37,6 +38,14 @@ func (s Set) Diff(last Set, changes chan<- ValueChanged, closeChan <-chan struct
orderedSequenceDiffBest(last.seq, s.seq, changes, closeChan)
}
// Like Diff() but uses a left-to-right streaming approach, optimised for returning results early, but not completing quickly.
func (s Set) DiffLeftRight(last Set, changes chan<- ValueChanged, closeChan <-chan struct{}) {
if s.Equals(last) {
return
}
orderedSequenceDiffLeftRight(last.seq, s.seq, changes, closeChan)
}
// Collection interface
func (s Set) Len() uint64 {
return s.seq.numLeaves()