mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-27 23:51:59 -05:00
streaming map edits
This commit is contained in:
@@ -60,7 +60,7 @@ function export_tables() {
|
||||
places
|
||||
do
|
||||
dolt table export "$table" "$table$1.csv"
|
||||
dolt sql -r csv -q "select * from $table" > "$table$1.sql.csv"
|
||||
dolt sql -r csv -q "select * from $table" | sed 's/<NULL>//g' > "$table$1.sql.csv"
|
||||
done
|
||||
}
|
||||
|
||||
@@ -99,7 +99,7 @@ local_bin="`pwd`"/"$bin"
|
||||
PATH="$local_bin":"$PATH" dolt clone Liquidata/corona-virus
|
||||
pushd "corona-virus"
|
||||
PATH="$local_bin":"$PATH" export_tables "-pre"
|
||||
dolt migrate
|
||||
time dolt migrate
|
||||
export_tables "-post"
|
||||
diff_tables
|
||||
echo "success!"
|
||||
|
||||
@@ -17,6 +17,7 @@ package rebase
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/table/typed/noms"
|
||||
"time"
|
||||
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/diff"
|
||||
@@ -404,7 +405,7 @@ func replayCommitWithNewTag(ctx context.Context, root, parentRoot, rebasedParent
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rebasedRows, err := replayRowDiffs(ctx, rebasedSch, rows, parentRows, rebasedParentRows, tableMapping)
|
||||
rebasedRows, err := replayRowDiffs(ctx, rebasedParentRoot.VRW(), rebasedSch, rows, parentRows, rebasedParentRows, tableMapping)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -442,7 +443,7 @@ func replayCommitWithNewTag(ctx context.Context, root, parentRoot, rebasedParent
|
||||
return newRoot, nil
|
||||
}
|
||||
|
||||
func replayRowDiffs(ctx context.Context, rSch schema.Schema, rows, parentRows, rebasedParentRows types.Map, tagMapping map[uint64]uint64) (types.Map, error) {
|
||||
func replayRowDiffs(ctx context.Context, vrw types.ValueReadWriter, rSch schema.Schema, rows, parentRows, rebasedParentRows types.Map, tagMapping map[uint64]uint64) (types.Map, error) {
|
||||
|
||||
unmappedTags := set.NewUint64Set(rSch.GetAllCols().Tags)
|
||||
tm := make(map[uint64]uint64)
|
||||
@@ -454,8 +455,7 @@ func replayRowDiffs(ctx context.Context, rSch schema.Schema, rows, parentRows, r
|
||||
tm[t] = t
|
||||
}
|
||||
|
||||
// we will apply modified differences to the rebasedParent
|
||||
rebasedRowEditor := rebasedParentRows.Edit()
|
||||
nmu := noms.NewNomsMapUpdater(ctx, vrw, rebasedParentRows, rSch, func(stats types.AppliedEditStats) {})
|
||||
|
||||
ad := diff.NewAsyncDiffer(diffBufSize)
|
||||
// get all differences (including merges) between original commit and its parent
|
||||
@@ -486,16 +486,25 @@ func replayRowDiffs(ctx context.Context, rSch schema.Schema, rows, parentRows, r
|
||||
|
||||
switch d.ChangeType {
|
||||
case types.DiffChangeAdded:
|
||||
rebasedRowEditor.Set(key, newVal)
|
||||
err = nmu.WriteEdit(ctx, key, newVal)
|
||||
case types.DiffChangeRemoved:
|
||||
rebasedRowEditor.Remove(key)
|
||||
err = nmu.WriteEdit(ctx, key, nil)
|
||||
case types.DiffChangeModified:
|
||||
rebasedRowEditor.Set(key, newVal)
|
||||
err = nmu.WriteEdit(ctx, key, newVal)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return types.EmptyMap, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return rebasedRowEditor.Map(ctx)
|
||||
err := nmu.Close(ctx)
|
||||
if err != nil {
|
||||
return types.EmptyMap, err
|
||||
}
|
||||
|
||||
return *nmu.GetMap(), nil
|
||||
}
|
||||
|
||||
func dropValsForDeletedColumns(ctx context.Context, nbf *types.NomsBinFormat, rows types.Map, sch, parentSch schema.Schema) (types.Map, error) {
|
||||
|
||||
@@ -132,6 +132,42 @@ func (nmu *NomsMapUpdater) WriteRow(ctx context.Context, r row.Row) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
// WriteRow will write a row to a table
|
||||
func (nmu *NomsMapUpdater) WriteEdit(ctx context.Context, pk types.LesserValuable, fieldVals types.Valuable) error {
|
||||
if nmu.acc == nil {
|
||||
return errors.New("Attempting to write after closing.")
|
||||
}
|
||||
|
||||
if err := nmu.ae.Get(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err := func() error {
|
||||
nmu.acc.AddEdit(pk, fieldVals)
|
||||
nmu.count++
|
||||
|
||||
if nmu.count%maxEdits == 0 {
|
||||
edits, err := nmu.acc.FinishedEditing()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nmu.mapChan <- edits
|
||||
nmu.acc = types.CreateEditAccForMapEdits(nmu.vrw.Format())
|
||||
}
|
||||
|
||||
return nil
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close should flush all writes, release resources being held
|
||||
func (nmu *NomsMapUpdater) Close(ctx context.Context) error {
|
||||
if nmu.result != nil {
|
||||
|
||||
Reference in New Issue
Block a user