Fix auto fwt transform to not drop a row on flush (#2490)

This commit is contained in:
Vinai Rachakonda
2021-12-11 12:35:32 -08:00
committed by GitHub
parent ac7389b5b3
commit 57ab97e743
2 changed files with 59 additions and 23 deletions
@@ -74,35 +74,47 @@ RowLoop:
}
func (asTr *AutoSizingFWTTransformer) handleRow(r pipeline.RowWithProps, outChan chan<- pipeline.RowWithProps, badRowChan chan<- *pipeline.TransformRowFailure, stopChan <-chan struct{}) {
var err error
if asTr.rowBuffer == nil {
asTr.processRow(r, outChan, badRowChan)
} else if asTr.numSamples <= 0 || len(asTr.rowBuffer) < asTr.numSamples {
_, err := r.Row.IterSchema(asTr.sch, func(tag uint64, val types.Value) (stop bool, err error) {
if !types.IsNull(val) {
strVal := val.(types.String)
printWidth := StringWidth(string(strVal))
numRunes := len([]rune(string(strVal)))
if printWidth > asTr.printWidths[tag] {
asTr.printWidths[tag] = printWidth
}
if numRunes > asTr.maxRunes[tag] {
asTr.maxRunes[tag] = numRunes
}
}
return false, nil
})
if err != nil {
badRowChan <- &pipeline.TransformRowFailure{Row: r.Row, TransformName: "fwt", Details: err.Error()}
return
}
asTr.rowBuffer = append(asTr.rowBuffer, r)
err = asTr.formatAndAddToBuffer(r)
} else {
asTr.flush(outChan, badRowChan, stopChan)
err = asTr.formatAndAddToBuffer(r)
}
if err != nil {
badRowChan <- &pipeline.TransformRowFailure{Row: r.Row, TransformName: "fwt", Details: err.Error()}
return
}
}
func (asTr *AutoSizingFWTTransformer) formatAndAddToBuffer(r pipeline.RowWithProps) error {
_, err := r.Row.IterSchema(asTr.sch, func(tag uint64, val types.Value) (stop bool, err error) {
if !types.IsNull(val) {
strVal := val.(types.String)
printWidth := StringWidth(string(strVal))
numRunes := len([]rune(string(strVal)))
if printWidth > asTr.printWidths[tag] {
asTr.printWidths[tag] = printWidth
}
if numRunes > asTr.maxRunes[tag] {
asTr.maxRunes[tag] = numRunes
}
}
return false, nil
})
if err != nil {
return err
}
asTr.rowBuffer = append(asTr.rowBuffer, r)
return nil
}
func (asTr *AutoSizingFWTTransformer) flush(outChan chan<- pipeline.RowWithProps, badRowChan chan<- *pipeline.TransformRowFailure, stopChan <-chan struct{}) {
+24
View File
@@ -626,3 +626,27 @@ SQL
[ $status -eq 0 ]
[ "${lines[0]}" = 'ALTER TABLE `t` RENAME COLUMN `pk` TO `pk`;' ]
}
@test "diff: large diff does not drop rows" {
dolt sql -q "create table t(pk int primary key, val int)"
VALUES=""
for i in {1..1000}
do
if [ $i -eq 1 ]
then
VALUES="${VALUES}($i,$i)"
else
VALUES="${VALUES},($i,$i)"
fi
done
dolt sql -q "INSERT INTO t values $VALUES"
dolt commit -am "Add the initial rows"
dolt sql -q "UPDATE t set val = val + 1 WHERE pk < 10000"
dolt commit -am "make a bulk update creating a large diff"
run dolt diff HEAD~1
[ "${#lines[@]}" -eq 2007 ] # 2000 diffs + 6 for top rows before data + 1 for bottom row of table
}