mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-24 00:59:41 -06:00
restoring applied edit stats, still used in table import
This commit is contained in:
@@ -31,6 +31,7 @@ import (
|
||||
"github.com/dolthub/vitess/go/sqltypes"
|
||||
"github.com/fatih/color"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/text/message"
|
||||
"gopkg.in/src-d/go-errors.v1"
|
||||
|
||||
"github.com/dolthub/dolt/go/cmd/dolt/cli"
|
||||
@@ -50,6 +51,7 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/utils/filesys"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/funcitr"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
eventsapi "github.com/dolthub/eventsapi_schema/dolt/services/eventsapi/v1alpha1"
|
||||
)
|
||||
|
||||
@@ -553,6 +555,16 @@ func (cmd ImportCmd) Exec(ctx context.Context, commandStr string, args []string,
|
||||
return 0
|
||||
}
|
||||
|
||||
var displayStrLen int
|
||||
|
||||
func importStatsCB(stats types.AppliedEditStats) {
|
||||
noEffect := stats.NonExistentDeletes + stats.SameVal
|
||||
total := noEffect + stats.Modifications + stats.Additions
|
||||
p := message.NewPrinter(message.MatchLanguage("en")) // adds commas
|
||||
displayStr := p.Sprintf("Rows Processed: %d, Additions: %d, Modifications: %d, Had No Effect: %d", total, stats.Additions, stats.Modifications, noEffect)
|
||||
displayStrLen = cli.DeleteAndPrint(displayStrLen, displayStr)
|
||||
}
|
||||
|
||||
func newImportDataReader(ctx context.Context, root doltdb.RootValue, dEnv *env.DoltEnv, impOpts *importOptions) (table.SqlRowReader, *mvdata.DataMoverCreationError) {
|
||||
var err error
|
||||
|
||||
@@ -631,7 +643,7 @@ func newImportSqlEngineMover(ctx *sql.Context, root doltdb.RootValue, dEnv *env.
|
||||
}
|
||||
}
|
||||
|
||||
mv, err := mvdata.NewSqlEngineTableWriter(ctx, engine, tableSchema, rowOperationSchema, moveOps)
|
||||
mv, err := mvdata.NewSqlEngineTableWriter(ctx, engine, tableSchema, rowOperationSchema, moveOps, importStatsCB)
|
||||
if err != nil {
|
||||
return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.CreateWriterErr, Cause: err}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,8 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/overrides"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -49,6 +51,8 @@ type SqlEngineTableWriter struct {
|
||||
force bool
|
||||
disableFks bool
|
||||
|
||||
statsCB noms.StatsCB
|
||||
stats types.AppliedEditStats
|
||||
statOps int32
|
||||
|
||||
importOption TableImportOp
|
||||
@@ -56,12 +60,7 @@ type SqlEngineTableWriter struct {
|
||||
rowOperationSchema sql.PrimaryKeySchema
|
||||
}
|
||||
|
||||
func NewSqlEngineTableWriter(
|
||||
ctx *sql.Context,
|
||||
engine *sqle.Engine,
|
||||
createTableSchema, rowOperationSchema schema.Schema,
|
||||
options *MoverOptions,
|
||||
) (*SqlEngineTableWriter, error) {
|
||||
func NewSqlEngineTableWriter(ctx *sql.Context, engine *sqle.Engine, createTableSchema, rowOperationSchema schema.Schema, options *MoverOptions, statsCB noms.StatsCB) (*SqlEngineTableWriter, error) {
|
||||
if engine.IsReadOnly() {
|
||||
// SqlEngineTableWriter does not respect read only mode
|
||||
return nil, analyzererrors.ErrReadOnlyDatabase.New(ctx.GetCurrentDatabase())
|
||||
@@ -87,6 +86,8 @@ func NewSqlEngineTableWriter(
|
||||
database: ctx.GetCurrentDatabase(),
|
||||
tableName: options.TableToWriteTo,
|
||||
|
||||
statsCB: statsCB,
|
||||
|
||||
importOption: options.Operation,
|
||||
tableSchema: doltCreateTableSchema,
|
||||
rowOperationSchema: doltRowOperationSchema,
|
||||
@@ -116,6 +117,28 @@ func (s *SqlEngineTableWriter) WriteRows(ctx context.Context, inputChannel chan
|
||||
return err
|
||||
}
|
||||
|
||||
updateStats := func(row sql.Row) {
|
||||
if row == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// If the length of the row does not match the schema then we have an update operation.
|
||||
if len(row) != len(s.tableSchema.Schema) {
|
||||
oldRow := row[:len(row)/2]
|
||||
newRow := row[len(row)/2:]
|
||||
|
||||
if ok, err := oldRow.Equals(s.sqlCtx, newRow, s.tableSchema.Schema); err == nil {
|
||||
if ok {
|
||||
s.stats.SameVal++
|
||||
} else {
|
||||
s.stats.Modifications++
|
||||
}
|
||||
}
|
||||
} else {
|
||||
s.stats.Additions++
|
||||
}
|
||||
}
|
||||
|
||||
insertOrUpdateOperation, err := s.getInsertNode(inputChannel, false)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -153,15 +176,24 @@ func (s *SqlEngineTableWriter) WriteRows(ctx context.Context, inputChannel chan
|
||||
|
||||
line := 1
|
||||
for {
|
||||
_, err := iter.Next(s.sqlCtx)
|
||||
if s.statsCB != nil && atomic.LoadInt32(&s.statOps) >= tableWriterStatUpdateRate {
|
||||
atomic.StoreInt32(&s.statOps, 0)
|
||||
s.statsCB(s.stats)
|
||||
}
|
||||
|
||||
row, err := iter.Next(s.sqlCtx)
|
||||
line += 1
|
||||
|
||||
// All other errors are handled by the errorHandler
|
||||
if err == nil {
|
||||
_ = atomic.AddInt32(&s.statOps, 1)
|
||||
updateStats(row)
|
||||
} else if err == io.EOF {
|
||||
atomic.LoadInt32(&s.statOps)
|
||||
atomic.StoreInt32(&s.statOps, 0)
|
||||
if s.statsCB != nil {
|
||||
s.statsCB(s.stats)
|
||||
}
|
||||
|
||||
return err
|
||||
} else {
|
||||
|
||||
@@ -79,6 +79,26 @@ const (
|
||||
batchSizeMax = 5000
|
||||
)
|
||||
|
||||
// AppliedEditStats contains statistics on what edits were applied in types.ApplyEdits
|
||||
type AppliedEditStats struct {
|
||||
// Additions counts the number of elements added to the map
|
||||
Additions int64
|
||||
|
||||
// Modifications counts the number of map entries that were modified
|
||||
Modifications int64
|
||||
|
||||
// SamVal counts the number of edits that had no impact because a value was set to the same value that is already
|
||||
// stored in the map
|
||||
SameVal int64
|
||||
|
||||
// Deletions counts the number of items deleted from the map
|
||||
Deletions int64
|
||||
|
||||
// NonexistentDeletes counts the number of items where a deletion was attempted, but the key didn't exist in the map
|
||||
// so there was no impact
|
||||
NonExistentDeletes int64
|
||||
}
|
||||
|
||||
// ApplyEdits applies all the edits to a given Map and returns the resulting map, and some statistics about the edits
|
||||
// that were applied.
|
||||
func ApplyEdits(ctx context.Context, edits EditProvider, m Map) (Map, error) {
|
||||
|
||||
Reference in New Issue
Block a user