Carnage continues

This commit is contained in:
Zach Musgrave
2022-09-12 15:24:15 -07:00
parent 660338507f
commit 57611e92f5
7 changed files with 0 additions and 1310 deletions
-124
View File
@@ -1,124 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package commands
import (
"context"
"errors"
"strings"
"github.com/dolthub/dolt/go/libraries/doltcore/schema/typeinfo"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/table/pipeline"
)
// FilterFn is a predicate applied to a row, returning true when the row matches the filter.
type FilterFn = func(r row.Row) (matchesFilter bool)
// ParseWhere parses a simple equality clause of the form "key=value" into a FilterFn.
// An empty clause yields a filter that matches every row. When the bare column name is
// not in the schema, the clause falls back to matching the "to_"/"from_" prefixed column
// pair (as produced by diff output). Returns an error if the clause is malformed or
// names an unknown column.
func ParseWhere(sch schema.Schema, whereClause string) (FilterFn, error) {
	if whereClause == "" {
		// No clause: everything matches.
		return func(r row.Row) bool {
			return true
		}, nil
	}

	tokens := strings.Split(whereClause, "=")
	if len(tokens) != 2 {
		return nil, errors.New("'" + whereClause + "' is not in the format key=value")
	}

	key := tokens[0]
	valStr := tokens[1]

	// Resolve the column by name, falling back to the to_/from_ column pair.
	var cols []schema.Column
	if col, ok := sch.GetAllCols().GetByName(key); ok {
		cols = []schema.Column{col}
	} else {
		toCol, toOk := sch.GetAllCols().GetByName("to_" + key)
		fromCol, fromOk := sch.GetAllCols().GetByName("from_" + key)

		if !(toOk && fromOk) {
			return nil, errors.New("where clause is invalid. '" + key + "' is not a known column.")
		}

		if fromCol.Kind != toCol.Kind {
			panic("to col and from col are different types.")
		}

		cols = []schema.Column{toCol, fromCol}
	}

	tags := make([]uint64, len(cols))
	for i, curr := range cols {
		tags[i] = curr.Tag
	}

	// Convert the value string to the column's type so equality comparison is well-typed.
	var val types.Value
	if typeinfo.IsStringType(cols[0].TypeInfo) {
		val = types.String(valStr)
	} else {
		var err error
		vrw := types.NewMemoryValueStore() // We don't want to persist anything, so we use an internal VRW
		val, err = typeinfo.StringDefaultType.ConvertToType(context.Background(), vrw, cols[0].TypeInfo, types.String(valStr))

		if err != nil {
			// BUG FIX: this previously read col.TypeInfo, which is the zero-value Column
			// (nil TypeInfo) when the to_/from_ fallback path was taken, causing a nil
			// dereference instead of a useful error message.
			return nil, errors.New("unable to convert '" + valStr + "' to " + cols[0].TypeInfo.String())
		}
	}

	// A row matches when any of the resolved columns holds a value equal to the parsed one.
	return func(r row.Row) bool {
		for _, tag := range tags {
			rowVal, ok := r.GetColVal(tag)
			if !ok {
				continue
			}
			if val.Equals(rowVal) {
				return true
			}
		}
		return false
	}, nil
}
// SelectTransform is a pipeline transform that filters rows with a FilterFn and enforces
// an optional row limit.
type SelectTransform struct {
	// Pipeline is the pipeline this transform participates in; used to signal NoMore once the limit is hit.
	Pipeline *pipeline.Pipeline
	// filter decides whether a row passes through.
	filter FilterFn
	// limit is the maximum number of rows to emit; <= 0 means unlimited.
	limit int
	// count is the number of rows emitted so far.
	count int
}
// NewSelTrans creates a SelectTransform with the given filter predicate and row limit.
func NewSelTrans(filter FilterFn, limit int) *SelectTransform {
	st := &SelectTransform{}
	st.filter = filter
	st.limit = limit
	return st
}
// LimitAndFilter is a pipeline transform func that passes through rows matching the filter,
// up to the configured limit (<= 0 means unlimited). When the limit is reached it signals
// the pipeline that no more input is needed.
func (st *SelectTransform) LimitAndFilter(inRow row.Row, props pipeline.ReadableMap) ([]*pipeline.TransformedRowResult, string) {
	if st.limit <= 0 || st.count < st.limit {
		if st.filter(inRow) {
			st.count++
			return []*pipeline.TransformedRowResult{{RowData: inRow, PropertyUpdates: nil}}, ""
		}
	} else if st.count == st.limit {
		// Limit reached: tell the pipeline to stop sending input.
		st.Pipeline.NoMore()
	}

	return nil, ""
}
@@ -25,7 +25,6 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/libraries/doltcore/table"
"github.com/dolthub/dolt/go/libraries/doltcore/table/pipeline"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/libraries/utils/set"
)
@@ -64,18 +63,6 @@ type DataMoverOptions interface {
DestName() string
}
// DataMoverCloser is a TableWriteCloser that can additionally flush its written data,
// producing a new root value.
type DataMoverCloser interface {
	table.TableWriteCloser
	Flush(context.Context) (*doltdb.RootValue, error)
}

// DataMover reads rows from Rd, applies Transforms, and writes the results to Wr.
type DataMover struct {
	Rd         table.ReadCloser
	Transforms *pipeline.TransformCollection
	Wr         table.TableWriteCloser
	// ContOnErr controls whether the move continues after a bad-row error.
	ContOnErr bool
}
type DataMoverCreationErrType string
const (
@@ -1,353 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipeline
import (
"sync"
"sync/atomic"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
)
// Buffer size of processing channels created by the pipeline.
const channelSize = 1024

// InFunc is a pipeline input function that reads row data from a source and puts it in a channel.
type InFunc func(p *Pipeline, ch chan<- RowWithProps, badRowChan chan<- *TransformRowFailure, noMoreChan <-chan struct{})

// OutFunc is a pipeline output function that takes the data the pipeline has processed off of the channel.
type OutFunc func(p *Pipeline, ch <-chan RowWithProps, badRowChan chan<- *TransformRowFailure)

// BadRowCallback is a callback function that is called when a bad row is encountered. Returning true from this
// function when called will quit the entire pipeline.
type BadRowCallback func(*TransformRowFailure) (quit bool)
// Pipeline is a struct that manages the operation of a row processing pipeline, where data is read from some source
// and written to a channel by the InFunc. An optional series of transformation functions read from this output as their
// input, passing output to the next stage, ultimately to the OutFunc. Each transform has a name, and is referred to as
// a stage in the pipeline.
//
// Pipelines can be constructed in phases, with different call sites adding transformations or even redirecting output
// as required. Once a pipeline is started with Start(), all configuration methods will panic.
//
// Pipelines can be supplied with callbacks to run after they complete, which happens when output has finished writing,
// or when Abort() or StopWithError() is called.
//
// Pipelines must be cleaned up by a call to either Wait, Abort, or StopWithError, all of which run any deferred
// functions registered with the pipeline via calls to RunAfter (e.g. closing readers and writers).
//
// Ironically, not even a little thread safe.
type Pipeline struct {
	// A wait group that will block until the pipeline is done.
	wg *sync.WaitGroup
	// A channel that will receive a message when the pipeline stops.
	stopChan chan struct{}
	// A channel for consumers to write to when there are no more input rows to process.
	noMoreChan chan struct{}
	// A channel for consumers to read from to handle bad rows.
	badRowChan chan *TransformRowFailure
	// A function to run on rows that cannot be transformed (may be nil).
	badRowCB BadRowCallback
	// An error in the pipeline's operation, accessible after it finishes.
	atomicErr atomic.Value
	// The input function for the pipeline.
	inFunc InFunc
	// The output function for the pipeline.
	outFunc OutFunc
	// The series of transformations to apply, each of which has a name called the "stage" of the pipeline.
	stages *TransformCollection
	// A map of stage name to input channel.
	inputChansByStageName map[string]chan RowWithProps
	// A collection of synthetic rows to insert into the pipeline at a particular stage, before any other pipelined
	// input arrives to that stage.
	syntheticRowsByStageName map[string][]RowWithProps
	// A slice of cleanup functions to run when the pipeline finishes.
	runAfterFuncs []func()
	// A helper to run cleanup funcs exactly once; replaced by runOnce(runAfterFuncs) at Start().
	runAfter func()
	// Whether the pipeline is currently running.
	isRunning bool
}
// NewAsyncPipeline creates a Pipeline from a given InFunc, OutFunc, TransformCollection, and a BadRowCallback.
func NewAsyncPipeline(inFunc InFunc, outFunc OutFunc, stages *TransformCollection, badRowCB BadRowCallback) *Pipeline {
	p := &Pipeline{
		wg:                       &sync.WaitGroup{},
		stopChan:                 make(chan struct{}),
		noMoreChan:               make(chan struct{}),
		badRowChan:               make(chan *TransformRowFailure, channelSize),
		badRowCB:                 badRowCB,
		inFunc:                   inFunc,
		outFunc:                  outFunc,
		stages:                   stages,
		inputChansByStageName:    make(map[string]chan RowWithProps),
		syntheticRowsByStageName: make(map[string][]RowWithProps),
		runAfter:                 func() {}, // no-op until Start installs the real cleanup runner
	}
	return p
}
// NewPartialPipeline creates a pipeline stub that doesn't have an output func set on it yet. An OutFunc must be
// applied via a call to SetOutput before calling Start().
func NewPartialPipeline(inFunc InFunc) *Pipeline {
	return NewAsyncPipeline(inFunc, nil, &TransformCollection{}, nil)
}
// AddStage appends the given named transform to the pipeline's stages.
// Panics if the pipeline has already been started.
func (p *Pipeline) AddStage(stage NamedTransform) {
	if !p.isRunning {
		p.stages.AppendTransforms(stage)
		return
	}
	panic("cannot add stages to a running pipeline")
}
// SetOutput sets the pipeline's output function to the one given.
// Panics if the pipeline has already been started.
func (p *Pipeline) SetOutput(outFunc OutFunc) {
	if !p.isRunning {
		p.outFunc = outFunc
		return
	}
	panic("cannot set output on a running pipeline")
}
// SetBadRowCallback sets the callback invoked when a bad row is encountered.
// Panics if the pipeline has already been started.
func (p *Pipeline) SetBadRowCallback(callback BadRowCallback) {
	if !p.isRunning {
		p.badRowCB = callback
		return
	}
	panic("cannot set bad row callback on a running pipeline")
}
// InjectRow injects a row at a particular stage in the pipeline. The row will be processed before other pipeline input
// arrives.
func (p *Pipeline) InjectRow(stageName string, r row.Row) {
	p.InjectRowWithProps(stageName, r, nil)
}
// InjectRowWithProps registers a synthetic row (with optional properties) to be fed into the
// named stage before any other input reaches it. Panics if the pipeline is already running or
// the stage name is unknown.
func (p *Pipeline) InjectRowWithProps(stageName string, r row.Row, props map[string]interface{}) {
	if p.isRunning {
		panic("cannot inject rows into a running pipeline")
	}

	known := false
	for _, t := range p.stages.Transforms {
		if t.Name == stageName {
			known = true
			break
		}
	}
	if !known {
		panic("unknown stage name " + stageName)
	}

	// append handles the missing-key case: appending to the nil slice allocates it.
	p.syntheticRowsByStageName[stageName] = append(p.syntheticRowsByStageName[stageName], NewRowWithProps(r, props))
}
// RunAfter schedules the given function to run after the pipeline completes.
// Panics if the pipeline has already been started.
func (p *Pipeline) RunAfter(f func()) {
	if !p.isRunning {
		p.runAfterFuncs = append(p.runAfterFuncs, f)
		return
	}
	panic("cannot add a RunAfter function to a running pipeline")
}
// NoMore signals that the pipeline has no more input to process. Must be called exactly once by the consumer when there
// are no more input rows to process.
func (p *Pipeline) NoMore() {
	defer func() {
		// TODO zachmu: there is a bug in pipeline execution where a limit of 1 causes NoMore to be called more than
		// once. This should be an error we don't recover from.
		// The recover swallows the "close of closed channel" panic from a second call.
		recover()
	}()

	close(p.noMoreChan)
}
// Starts the pipeline processing. Panics if the pipeline hasn't been set up completely yet.
func (p *Pipeline) Start() {
	if p.isRunning {
		panic("pipeline already started")
	}
	if p.inFunc == nil || p.outFunc == nil {
		panic("pipeline started without input or output func")
	}

	in := make(chan RowWithProps, channelSize)
	p.stopChan = make(chan struct{})

	// Start all the transform stages, chaining the output of one to the input of the next.
	// transformAsync launches each stage's goroutine immediately, so every stage's input
	// channel already has a consumer by the time the loop finishes.
	curr := in
	if p.stages != nil {
		for i := 0; i < p.stages.NumTransforms(); i++ {
			stage := p.stages.TransformAt(i)
			p.inputChansByStageName[stage.Name] = curr
			curr = transformAsync(stage.Func, p.wg, curr, p.badRowChan, p.stopChan)
		}
	}

	// Inject all synthetic rows requested into their appropriate input channels, ahead of any real input.
	for stageName, injectedRows := range p.syntheticRowsByStageName {
		ch := p.inputChansByStageName[stageName]
		for _, rowWithProps := range injectedRows {
			ch <- rowWithProps
		}
	}

	// Wrap the cleanup funcs so they run exactly once no matter how the pipeline ends.
	p.runAfter = runOnce(p.runAfterFuncs)

	// Start all the async processing: the sink, the error handlers, then the source.
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		p.processBadRows()
	}()

	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		p.outFunc(p, curr, p.badRowChan)
		// Once the sink returns, no more bad rows can be produced: close the channel and run cleanup.
		close(p.badRowChan)
		p.runAfter()
	}()

	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		p.inFunc(p, in, p.badRowChan, p.noMoreChan)
	}()

	p.isRunning = true
}
// runOnce returns a function that runs each of the given funcs exactly once: invoking the
// returned function again after the first call is a no-op.
func runOnce(funcs []func()) func() {
	var (
		mu   sync.Mutex
		done bool
	)

	return func() {
		mu.Lock()
		defer mu.Unlock()

		if done {
			return
		}
		for _, fn := range funcs {
			fn()
		}
		done = true
	}
}
// Wait blocks until the pipeline finishes and returns any error recorded during its execution.
// Panics if the pipeline was never started.
func (p *Pipeline) Wait() error {
	if !p.isRunning {
		panic("cannot Wait() on a pipeline before a call to Start()")
	}

	p.wg.Wait()
	p.isRunning = false

	if stored := p.atomicErr.Load(); stored != nil {
		return stored.(error)
	}
	return nil
}
// Abort signals the pipeline to stop processing.
func (p *Pipeline) Abort() {
	// Deferred funcs run in reverse order: recover() first (absorbing a double-close panic),
	// then the cleanup funcs, then the running flag is cleared.
	defer func() {
		p.isRunning = false
	}()

	defer p.runAfter()

	defer func() {
		recover() // ignore multiple calls to close channels
	}()

	close(p.stopChan)
}
// StopWithErr provides a method by which the pipeline can be stopped when an error is encountered. This would
// typically be done in InFuncs and OutFuncs. The error is stored and surfaced by Wait().
func (p *Pipeline) StopWithErr(err error) {
	p.atomicErr.Store(err)
	p.Abort()
}
// IsStopping reports whether the pipeline is currently shutting down (i.e. its stop
// channel has been closed).
func (p *Pipeline) IsStopping() bool {
	select {
	case <-p.stopChan:
		return true
	default:
		return false
	}
}
// Processes all the errors that occur during the pipeline by feeding each bad row to the
// registered callback, aborting the pipeline if the callback requests it.
//
// NOTE(review): when badRowCB is nil this returns immediately and badRowChan is never
// drained; producers writing to a full badRowChan would then block. Confirm callers
// without a callback cannot generate bad rows.
func (p *Pipeline) processBadRows() {
	if p.badRowCB != nil {
		for {
			select {
			case bRow, ok := <-p.badRowChan:
				if !ok {
					// Channel closed by the sink goroutine: no more bad rows are coming.
					return
				}

				quit := p.badRowCB(bRow)

				if quit {
					p.Abort()
					return
				}
			case <-p.stopChan:
				return
			}
		}
	}
}
// Runs the async transform function given with the input channel given and returns its output channel.
// The goroutine is tracked by wg and closes its output channel when the transformer returns.
func transformAsync(transformer TransformFunc, wg *sync.WaitGroup, inChan <-chan RowWithProps, badRowChan chan<- *TransformRowFailure, stopChan <-chan struct{}) chan RowWithProps {
	outChan := make(chan RowWithProps, channelSize)

	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(outChan)
		transformer(inChan, outChan, badRowChan, stopChan)
	}()

	return outChan
}
@@ -1,187 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipeline
import (
"context"
"io"
"time"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/table"
)
// SourceFunc is a function that will return a new row for each successive call until all its rows are exhausted, at
// which point io.EOF should be returned.
type SourceFunc func() (row.Row, ImmutableProperties, error)
// ProcFuncForSourceFunc is a helper method that creates an InFunc for a given SourceFunc. It takes care of channel
// processing, stop conditions, and error handling: bad rows are reported on badRowChan, io.EOF ends the input, and
// any other error stops the whole pipeline.
func ProcFuncForSourceFunc(sourceFunc SourceFunc) InFunc {
	return func(p *Pipeline, ch chan<- RowWithProps, badRowChan chan<- *TransformRowFailure, noMoreChan <-chan struct{}) {
		defer close(ch)

		for !p.IsStopping() {
			// Stop reading as soon as the consumer signals there is no more input.
			// (Removed a redundant `break` in the default case: inside a select,
			// `break` only exits the select, which `default` already does.)
			select {
			case <-noMoreChan:
				return
			default:
			}

			r, props, err := sourceFunc()

			// process read errors
			if err != nil {
				if err == io.EOF {
					if r == nil {
						return
					}
				} else if table.IsBadRow(err) {
					badRowChan <- &TransformRowFailure{table.GetBadRowRow(err), nil, "reader", err.Error()}
				} else {
					p.StopWithErr(err)
					return
				}
			} else if r == nil {
				panic("Readers should not be returning nil without error. io.EOF should be used when done.")
			}

			if r != nil {
				select {
				case ch <- RowWithProps{r, props}:
				case <-p.stopChan:
					return
				}
			}
		}
	}
}
// ProcFuncForReader adapts a standard TableReader to work as an InFunc for a pipeline.
func ProcFuncForReader(ctx context.Context, rd table.Reader) InFunc {
	src := func() (row.Row, ImmutableProperties, error) {
		r, err := rd.ReadRow(ctx)
		return r, NoProps, err
	}
	return ProcFuncForSourceFunc(src)
}
// SinkFunc is a function that will process the final transformed rows from a pipeline. This function will be called
// once for every row that makes it through the pipeline.
type SinkFunc func(row.Row, ReadableMap) error
// ProcFuncForSinkFunc is a helper method that creates an OutFunc for a given SinkFunc. It takes care of channel
// processing, stop conditions, and error handling.
func ProcFuncForSinkFunc(sinkFunc SinkFunc) OutFunc {
	return func(p *Pipeline, ch <-chan RowWithProps, badRowChan chan<- *TransformRowFailure) {
		for {
			if p.IsStopping() {
				return
			}

			select {
			case r, ok := <-ch:
				if ok {
					err := sinkFunc(r.Row, r.Props)

					if err != nil {
						// Bad rows and key-constraint violations are recoverable: report
						// them and keep writing. Anything else stops the whole pipeline.
						if table.IsBadRow(err) ||
							sql.ErrPrimaryKeyViolation.Is(err) ||
							sql.ErrUniqueKeyViolation.Is(err) {
							badRowChan <- &TransformRowFailure{r.Row, nil, "writer", err.Error()}
						} else {
							p.StopWithErr(err)
							return
						}
					}
				} else {
					// Input channel closed: every row has been written.
					return
				}
			case <-time.After(100 * time.Millisecond):
				// wake up and check stop condition
				// NOTE(review): time.After allocates a fresh timer every iteration; a
				// reused Timer would avoid that if this loop is hot — confirm before changing.
			}
		}
	}
}
// SourceFuncForRows returns a source func that yields the rows given in order. Suitable for very small result sets
// that are statically defined or otherwise fit easily into memory.
func SourceFuncForRows(rows []row.Row) SourceFunc {
	remaining := rows
	return func() (row.Row, ImmutableProperties, error) {
		if len(remaining) == 0 {
			return nil, NoProps, io.EOF
		}
		next := remaining[0]
		remaining = remaining[1:]
		return next, NoProps, nil
	}
}
// ProcFuncForWriter adapts a standard TableWriter to work as an OutFunc for a pipeline.
func ProcFuncForWriter(ctx context.Context, wr table.RowWriter) OutFunc {
	sink := func(r row.Row, props ReadableMap) error {
		return wr.WriteRow(ctx, r)
	}
	return ProcFuncForSinkFunc(sink)
}
// InFuncForChannel returns an InFunc that reads off the channel given.
func InFuncForChannel(rowChan <-chan row.Row) InFunc {
	return func(p *Pipeline, ch chan<- RowWithProps, badRowChan chan<- *TransformRowFailure, noMoreChan <-chan struct{}) {
		defer close(ch)

		// Phase 1: forward rows until either the producer closes rowChan or the
		// pipeline signals that no more input is wanted.
		more := true
		for more {
			if p.IsStopping() {
				return
			}

			select {
			case <-noMoreChan:
				more = false
			case r, ok := <-rowChan:
				if ok {
					ch <- RowWithProps{Row: r, Props: NoProps}
				} else {
					return
				}
			}
		}

		// no more data will be written to rowChan, but still need to make sure what was written is drained.
		// Phase 2: non-blocking reads until the channel is empty.
		if !more {
			for {
				if p.IsStopping() {
					return
				}

				select {
				case r, ok := <-rowChan:
					if ok {
						ch <- RowWithProps{Row: r, Props: NoProps}
					} else {
						return
					}
				default:
					return
				}
			}
		}
	}
}
@@ -1,360 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipeline
import (
"bytes"
"context"
"fmt"
"io"
"strconv"
"strings"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/table/untyped"
"github.com/dolthub/dolt/go/libraries/doltcore/table/untyped/csv"
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
"github.com/dolthub/dolt/go/store/types"
)
// inCSV is the raw input fed to each test pipeline.
var inCSV = `first,last,film or show,year
Tim,Allen,The Santa Clause,1994
Tim,Allen,The Santa Clause 2,2002
Tim,Allen,The Santa Clause 3: The Escape Clause,2006
Ed,Asner,Elf,2003
Ed,Asner,Christmas on the Bayou,2013
Ed,Asner,Elf: Buddy's Musical Christmas,2014
Fred,Astaire,The Man in the Santa Claus Suit,1979
Richard,Attenborough,Miracle on 34th Street,1994
Steve,Bacic,Deck the Halls,2005
Alec,Baldwin,Rise of the Guardians,2012
Don,Beddoe,Bewitched (episode Humbug Not to Be Spoken Here - Season 4),1967
`

// outCSV is the expected output after the identity/label/dupe/append stages run:
// each input row appears twice with pre2000 and index columns appended.
var outCSV = `first,last,film or show,year,pre2000,index
Tim,Allen,The Santa Clause,1994,true,0
Tim,Allen,The Santa Clause,1994,true,1
Tim,Allen,The Santa Clause 2,2002,false,0
Tim,Allen,The Santa Clause 2,2002,false,1
Tim,Allen,The Santa Clause 3: The Escape Clause,2006,false,0
Tim,Allen,The Santa Clause 3: The Escape Clause,2006,false,1
Ed,Asner,Elf,2003,false,0
Ed,Asner,Elf,2003,false,1
Ed,Asner,Christmas on the Bayou,2013,false,0
Ed,Asner,Christmas on the Bayou,2013,false,1
Ed,Asner,Elf: Buddy's Musical Christmas,2014,false,0
Ed,Asner,Elf: Buddy's Musical Christmas,2014,false,1
Fred,Astaire,The Man in the Santa Claus Suit,1979,true,0
Fred,Astaire,The Man in the Santa Claus Suit,1979,true,1
Richard,Attenborough,Miracle on 34th Street,1994,true,0
Richard,Attenborough,Miracle on 34th Street,1994,true,1
Steve,Bacic,Deck the Halls,2005,false,0
Steve,Bacic,Deck the Halls,2005,false,1
Alec,Baldwin,Rise of the Guardians,2012,false,0
Alec,Baldwin,Rise of the Guardians,2012,false,1
Don,Beddoe,Bewitched (episode Humbug Not to Be Spoken Here - Season 4),1967,true,0
Don,Beddoe,Bewitched (episode Humbug Not to Be Spoken Here - Season 4),1967,true,1`

// schIn is the untyped schema matching the columns of inCSV.
var _, schIn = untyped.NewUntypedSchema("first", "last", "film or show", "year")

// schOut adds the columns produced by the transforms; nameToTag maps column names to tags.
var nameToTag, schOut = untyped.NewUntypedSchema("first", "last", "film or show", "year", "pre2000", "index")
// TestPipeline runs a four-stage pipeline over inCSV and verifies both the transformed
// output and that RunAfter cleanup functions fire when the pipeline finishes.
func TestPipeline(t *testing.T) {
	buf := bytes.NewBuffer([]byte(inCSV))
	outBuf := bytes.NewBuffer([]byte{})

	afterFinishCalled := false
	afterFinishFunc := func() {
		afterFinishCalled = true
	}

	func() {
		csvInfo := &csv.CSVFileInfo{Delim: ",", HasHeaderLine: true, Columns: nil, EscapeQuotes: true}
		rd, _ := csv.NewCSVReader(types.Format_Default, io.NopCloser(buf), csvInfo)
		wr, _ := csv.NewCSVWriter(iohelp.NopWrCloser(outBuf), schOut, csvInfo)

		tc := NewTransformCollection(
			NewNamedTransform("identity", identityTransFunc),
			NewNamedTransform("label", labelTransFunc),
			NewNamedTransform("dupe", dupeTransFunc),
			NewNamedTransform("append", appendColumnPre2000TransFunc),
		)

		inProcFunc := ProcFuncForReader(context.Background(), rd)
		outProcFunc := ProcFuncForWriter(context.Background(), wr)
		p := NewAsyncPipeline(inProcFunc, outProcFunc, tc, nil)
		// Cleanup funcs close the reader/writer and flag completion.
		p.RunAfter(func() { rd.Close(context.Background()) })
		p.RunAfter(func() { wr.Close(context.Background()) })
		p.RunAfter(afterFinishFunc)

		p.Start()
		p.Wait()
	}()

	assert.True(t, afterFinishCalled, "afterFinish func not called when pipeline ended")
	assert.Equal(t, strings.TrimSpace(outCSV), strings.TrimSpace(outBuf.String()), "output doesn't match expectation")
}
// TestAddingStages verifies that stages added incrementally via AddStage behave the same
// as stages supplied up front in the TransformCollection.
func TestAddingStages(t *testing.T) {
	buf := bytes.NewBuffer([]byte(inCSV))
	outBuf := bytes.NewBuffer([]byte{})

	afterFinishCalled := false
	afterFinishFunc := func() {
		afterFinishCalled = true
	}

	func() {
		csvInfo := &csv.CSVFileInfo{Delim: ",", HasHeaderLine: true, Columns: nil, EscapeQuotes: true}
		rd, _ := csv.NewCSVReader(types.Format_Default, io.NopCloser(buf), csvInfo)
		wr, _ := csv.NewCSVWriter(iohelp.NopWrCloser(outBuf), schOut, csvInfo)

		// First two stages supplied at construction time...
		tc := NewTransformCollection(
			NewNamedTransform("identity", identityTransFunc),
			NewNamedTransform("label", labelTransFunc),
		)

		// ...and the rest appended afterwards with AddStage.
		addedStages := []NamedTransform{
			NewNamedTransform("dupe", dupeTransFunc),
			NewNamedTransform("append", appendColumnPre2000TransFunc),
		}

		inProcFunc := ProcFuncForReader(context.Background(), rd)
		outProcFunc := ProcFuncForWriter(context.Background(), wr)
		p := NewAsyncPipeline(inProcFunc, outProcFunc, tc, nil)

		for _, stage := range addedStages {
			p.AddStage(stage)
		}

		p.RunAfter(func() { rd.Close(context.Background()) })
		p.RunAfter(func() { wr.Close(context.Background()) })
		p.RunAfter(afterFinishFunc)

		p.Start()
		p.Wait()
	}()

	assert.True(t, afterFinishCalled, "afterFinish func not called when pipeline ended")
	assert.Equal(t, strings.TrimSpace(outCSV), strings.TrimSpace(outBuf.String()), "output doesn't match expectation")
}
// TestPartialPipeline exercises NewPartialPipeline: building a pipeline in phases, injecting
// synthetic rows at a named stage, and verifying that configuration calls panic once started.
func TestPartialPipeline(t *testing.T) {
	buf := bytes.NewBuffer([]byte(inCSV))
	outBuf := bytes.NewBuffer([]byte{})

	afterFinishCalled := false
	afterFinishFunc := func() {
		afterFinishCalled = true
	}

	// Expected output: the two injected rows first, then the transformed input rows.
	var newOutCsv = `first,last,film or show,year,pre2000,index
New,Row,InAppendStage,2999,true,0
AnotherNew,Row,InAppendStage,3000,true,1
Tim,Allen,The Santa Clause,1994,true,0
Tim,Allen,The Santa Clause,1994,true,1
Tim,Allen,The Santa Clause 2,2002,false,0
Tim,Allen,The Santa Clause 2,2002,false,1
Tim,Allen,The Santa Clause 3: The Escape Clause,2006,false,0
Tim,Allen,The Santa Clause 3: The Escape Clause,2006,false,1
Ed,Asner,Elf,2003,false,0
Ed,Asner,Elf,2003,false,1
Ed,Asner,Christmas on the Bayou,2013,false,0
Ed,Asner,Christmas on the Bayou,2013,false,1
Ed,Asner,Elf: Buddy's Musical Christmas,2014,false,0
Ed,Asner,Elf: Buddy's Musical Christmas,2014,false,1
Fred,Astaire,The Man in the Santa Claus Suit,1979,true,0
Fred,Astaire,The Man in the Santa Claus Suit,1979,true,1
Richard,Attenborough,Miracle on 34th Street,1994,true,0
Richard,Attenborough,Miracle on 34th Street,1994,true,1
Steve,Bacic,Deck the Halls,2005,false,0
Steve,Bacic,Deck the Halls,2005,false,1
Alec,Baldwin,Rise of the Guardians,2012,false,0
Alec,Baldwin,Rise of the Guardians,2012,false,1
Don,Beddoe,Bewitched (episode Humbug Not to Be Spoken Here - Season 4),1967,true,0
Don,Beddoe,Bewitched (episode Humbug Not to Be Spoken Here - Season 4),1967,true,1`

	func() {
		csvInfo := &csv.CSVFileInfo{Delim: ",", HasHeaderLine: true, Columns: nil, EscapeQuotes: true}
		rd, _ := csv.NewCSVReader(types.Format_Default, io.NopCloser(buf), csvInfo)
		wr, _ := csv.NewCSVWriter(iohelp.NopWrCloser(outBuf), schOut, csvInfo)

		addedStages := []NamedTransform{
			NewNamedTransform("identity", identityTransFunc),
			NewNamedTransform("label", labelTransFunc),
			NewNamedTransform("dupe", dupeTransFunc),
			NewNamedTransform("append", appendColumnPre2000TransFunc),
		}

		inProcFunc := ProcFuncForReader(context.Background(), rd)
		outProcFunc := ProcFuncForWriter(context.Background(), wr)
		p := NewPartialPipeline(inProcFunc)

		for _, stage := range addedStages {
			p.AddStage(stage)
		}

		// Can't start the pipeline until setting a sink
		assert.Panics(t, func() {
			p.Start()
		})

		p.SetOutput(outProcFunc)

		//New,Row,InAppendStage,2999,true,0
		var injectedColumns = map[uint64]string{
			0: "New",
			1: "Row",
			2: "InAppendStage",
			3: "2999",
			4: "true",
			5: "0",
		}
		injectedRow, err := untyped.NewRowFromTaggedStrings(types.Format_Default, schOut, injectedColumns)
		assert.NoError(t, err)
		p.InjectRow("append", injectedRow)

		//AnotherNew,Row,InAppendStage,3000,true,1
		injectedColumns = map[uint64]string{
			0: "AnotherNew",
			1: "Row",
			2: "InAppendStage",
			3: "3000",
			4: "true",
			5: "1",
		}
		injectedRow, err = untyped.NewRowFromTaggedStrings(types.Format_Default, schOut, injectedColumns)
		assert.NoError(t, err)
		p.InjectRow("append", injectedRow)

		p.RunAfter(func() { rd.Close(context.Background()) })
		p.RunAfter(func() { wr.Close(context.Background()) })
		p.RunAfter(afterFinishFunc)

		p.Start()

		// Now that the pipeline is started, other calls to set it up should panic
		assert.Panics(t, func() {
			p.SetOutput(func(p *Pipeline, ch <-chan RowWithProps, badRowChan chan<- *TransformRowFailure) {
			})
		})
		assert.Panics(t, func() {
			p.AddStage(NewNamedTransform("identity2", identityTransFunc))
		})
		assert.Panics(t, func() {
			p.InjectRow("identity", injectedRow)
		})

		p.Wait()
	}()

	assert.True(t, afterFinishCalled, "afterFinish func not called when pipeline ended")
	assert.Equal(t, strings.TrimSpace(newOutCsv), strings.TrimSpace(outBuf.String()), "output does not match expectation")
}
// TestAbort verifies that Abort() on a pipeline stuck in a hanging transform still runs
// the RunAfter cleanup functions.
func TestAbort(t *testing.T) {
	buf := bytes.NewBuffer([]byte(inCSV))
	outBuf := bytes.NewBuffer([]byte{})

	afterFinishCalled := false
	afterFinishFunc := func() {
		afterFinishCalled = true
	}

	func() {
		csvInfo := &csv.CSVFileInfo{Delim: ",", HasHeaderLine: true, Columns: nil, EscapeQuotes: true}
		rd, _ := csv.NewCSVReader(types.Format_Default, io.NopCloser(buf), csvInfo)
		wr, _ := csv.NewCSVWriter(iohelp.NopWrCloser(outBuf), schOut, csvInfo)

		var wg = sync.WaitGroup{}
		// The "dies" stage hangs forever after signalling wg, so the pipeline can never drain.
		tc := NewTransformCollection(
			NewNamedTransform("identity", identityTransFunc),
			NewNamedTransform("dies", hangs(&wg)),
		)

		inProcFunc := ProcFuncForReader(context.Background(), rd)
		outProcFunc := ProcFuncForWriter(context.Background(), wr)
		p := NewAsyncPipeline(inProcFunc, outProcFunc, tc, nil)
		p.RunAfter(func() { rd.Close(context.Background()) })
		p.RunAfter(func() { wr.Close(context.Background()) })
		p.RunAfter(afterFinishFunc)

		p.Start()
		// Wait until the hanging stage has definitely started, then abort.
		wg.Wait()
		p.Abort()
	}()

	assert.True(t, afterFinishCalled, "afterFinish func not called when pipeline ended")
}
// Returns a function that hangs right after signalling the given WaitGroup that it's done.
// The infinite spin is intentional: it forces the pipeline to be stopped via Abort().
func hangs(wg *sync.WaitGroup) func(inRow row.Row, props ReadableMap) ([]*TransformedRowResult, string) {
	wg.Add(1)
	return func(inRow row.Row, props ReadableMap) (results []*TransformedRowResult, s string) {
		i := 0
		fmt.Println("about to call done()")
		wg.Done()
		for {
			i++
		}
	}
}
// identityTransFunc passes each row through unchanged, with no property updates.
func identityTransFunc(inRow row.Row, props ReadableMap) ([]*TransformedRowResult, string) {
	passthrough := &TransformedRowResult{RowData: inRow, PropertyUpdates: nil}
	return []*TransformedRowResult{passthrough}, ""
}
// labelTransFunc attaches a "pre2000" property to each row based on its year column.
func labelTransFunc(inRow row.Row, props ReadableMap) ([]*TransformedRowResult, string) {
	val, _ := inRow.GetColVal(nameToTag["year"])
	// Parse errors are ignored here; a non-numeric year parses as 0 and labels as pre-2000.
	year, _ := strconv.ParseInt(string(val.(types.String)), 10, 32)
	return []*TransformedRowResult{
		{inRow, map[string]interface{}{"pre2000": year < 2000}},
	}, ""
}
// dupeTransFunc emits each input row twice, with the "index" column set to "0" and "1"
// respectively and a matching "dupe_index" property.
func dupeTransFunc(inRow row.Row, props ReadableMap) ([]*TransformedRowResult, string) {
	r1, _ := inRow.SetColVal(nameToTag["index"], types.String("0"), schOut)
	r2, _ := inRow.SetColVal(nameToTag["index"], types.String("1"), schOut)
	return []*TransformedRowResult{
		{r1, map[string]interface{}{"dupe_index": 1}},
		{r2, map[string]interface{}{"dupe_index": 2}},
	}, ""
}
// appendColumnPre2000TransFunc materializes the "pre2000" property (set by labelTransFunc)
// into the pre2000 column, unless the row already carries a value for that column.
func appendColumnPre2000TransFunc(inRow row.Row, props ReadableMap) (rowData []*TransformedRowResult, badRowDetails string) {
	labelval, _ := props.Get("pre2000")
	isPre2000Str := "false"
	if boolVal, ok := labelval.(bool); ok && boolVal {
		isPre2000Str = "true"
	}

	// Update the column value if it's not already present
	var r1 row.Row = inRow
	if _, ok := inRow.GetColVal(nameToTag["pre2000"]); !ok {
		r1, _ = inRow.SetColVal(nameToTag["pre2000"], types.String(isPre2000Str), schOut)
	}

	return []*TransformedRowResult{
		{r1, nil},
	}, ""
}
@@ -1,162 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fwt
import (
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/table/pipeline"
"github.com/dolthub/dolt/go/store/types"
)
// AutoSizingFWTTransformer samples rows to automatically determine maximum column widths to provide to FWTTransformer.
type AutoSizingFWTTransformer struct {
	// The number of rows to sample to determine column widths; <= 0 samples without bound.
	numSamples int
	// A map of column tag to max print width seen so far.
	printWidths map[uint64]int
	// A map of column tag to max number of runes seen so far.
	maxRunes map[uint64]int
	// A buffer of rows awaiting processing while widths are being sampled.
	rowBuffer []pipeline.RowWithProps
	// The schema being examined.
	sch schema.Schema
	// The behavior to use for a value that's too long to print.
	tooLngBhv TooLongBehavior
	// The underlying fixed width transformer being assembled by row sampling.
	fwtTr *FWTTransformer
}
// NewAutoSizingFWTTransformer returns an AutoSizingFWTTransformer that samples
// up to numSamples rows of sch to size columns, handling overlong values
// according to tooLngBhv.
func NewAutoSizingFWTTransformer(sch schema.Schema, tooLngBhv TooLongBehavior, numSamples int) *AutoSizingFWTTransformer {
	numCols := sch.GetAllCols().Size()
	tr := &AutoSizingFWTTransformer{}
	tr.numSamples = numSamples
	tr.printWidths = make(map[uint64]int, numCols)
	tr.maxRunes = make(map[uint64]int, numCols)
	tr.rowBuffer = make([]pipeline.RowWithProps, 0, 128)
	tr.sch = sch
	tr.tooLngBhv = tooLngBhv
	return tr
}
// TransformToFWT reads rows from inChan until it is closed or stopChan fires,
// routing each row through handleRow (which samples, buffers, or emits it on
// outChan / badRowChan), then flushes any rows still buffered once the input
// is exhausted. Returns immediately without flushing when stopChan fires.
func (asTr *AutoSizingFWTTransformer) TransformToFWT(inChan <-chan pipeline.RowWithProps, outChan chan<- pipeline.RowWithProps, badRowChan chan<- *pipeline.TransformRowFailure, stopChan <-chan struct{}) {
RowLoop:
	for {
		// Non-blocking check first so cancellation wins over pending input.
		select {
		case <-stopChan:
			return
		default:
		}
		select {
		case r, ok := <-inChan:
			if ok {
				asTr.handleRow(r, outChan, badRowChan, stopChan)
			} else {
				// Input channel closed: stop reading and flush the buffer.
				break RowLoop
			}
		case <-stopChan:
			return
		}
	}
	asTr.flush(outChan, badRowChan, stopChan)
}
// handleRow routes a single row: once sampling has finished (nil buffer) the
// row is formatted and emitted immediately; otherwise its column widths are
// recorded and it is buffered, flushing the full buffer first when needed.
// A row whose values can't be inspected is reported on badRowChan.
func (asTr *AutoSizingFWTTransformer) handleRow(r pipeline.RowWithProps, outChan chan<- pipeline.RowWithProps, badRowChan chan<- *pipeline.TransformRowFailure, stopChan <-chan struct{}) {
	if asTr.rowBuffer == nil {
		// Widths are already fixed; pass straight through the sized transformer.
		asTr.processRow(r, outChan, badRowChan)
		return
	}
	if asTr.numSamples > 0 && len(asTr.rowBuffer) >= asTr.numSamples {
		// Sample buffer is full: emit everything gathered so far before buffering more.
		asTr.flush(outChan, badRowChan, stopChan)
	}
	if err := asTr.formatAndAddToBuffer(r); err != nil {
		badRowChan <- &pipeline.TransformRowFailure{Row: r.Row, TransformName: "fwt", Details: err.Error()}
	}
}
// formatAndAddToBuffer records the display width and rune count of every
// non-null string value in r, updating the running per-column maximums, then
// appends r to the sample buffer. Returns any error from iterating the row.
func (asTr *AutoSizingFWTTransformer) formatAndAddToBuffer(r pipeline.RowWithProps) error {
	if _, err := r.Row.IterSchema(asTr.sch, func(tag uint64, val types.Value) (bool, error) {
		if types.IsNull(val) {
			return false, nil
		}
		s := string(val.(types.String))
		if w := StringWidth(s); w > asTr.printWidths[tag] {
			asTr.printWidths[tag] = w
		}
		if n := len([]rune(s)); n > asTr.maxRunes[tag] {
			asTr.maxRunes[tag] = n
		}
		return false, nil
	}); err != nil {
		return err
	}
	asTr.rowBuffer = append(asTr.rowBuffer, r)
	return nil
}
// flush lazily builds the underlying FWTTransformer from the sampled widths
// (first call only), runs every buffered row through it, and then discards the
// buffer so subsequent rows bypass sampling. Cancellation via stopChan is
// checked every 100 rows; an early return leaves the buffer intact.
func (asTr *AutoSizingFWTTransformer) flush(outChan chan<- pipeline.RowWithProps, badRowChan chan<- *pipeline.TransformRowFailure, stopChan <-chan struct{}) {
	if asTr.fwtTr == nil {
		formatter := FixedWidthFormatterForSchema(asTr.sch, asTr.tooLngBhv, asTr.printWidths, asTr.maxRunes)
		asTr.fwtTr = NewFWTTransformer(asTr.sch, formatter)
	}
	for i := range asTr.rowBuffer {
		asTr.processRow(asTr.rowBuffer[i], outChan, badRowChan)
		// Poll for cancellation periodically rather than on every row.
		if i%100 == 0 {
			select {
			case <-stopChan:
				return
			default:
			}
		}
	}
	asTr.rowBuffer = nil
}
// processRow runs one row through the fixed-width transformer, sending the
// single result (with any property updates applied) to outChan, or reporting a
// transform failure on badRowChan. Results other than exactly one row are
// silently dropped, matching the transformer's single-row contract.
func (asTr *AutoSizingFWTTransformer) processRow(rowWithProps pipeline.RowWithProps, outChan chan<- pipeline.RowWithProps, badRowChan chan<- *pipeline.TransformRowFailure) {
	results, errMsg := asTr.fwtTr.Transform(rowWithProps.Row, rowWithProps.Props)
	if errMsg != "" {
		badRowChan <- &pipeline.TransformRowFailure{
			Row:           rowWithProps.Row,
			TransformName: "Auto Sizing Fixed Width Transform",
			Details:       errMsg,
		}
		return
	}
	if len(results) != 1 {
		return
	}
	outProps := rowWithProps.Props
	if updates := results[0].PropertyUpdates; len(updates) > 0 {
		outProps = outProps.Set(updates)
	}
	outChan <- pipeline.RowWithProps{Row: results[0].RowData, Props: outProps}
}
@@ -1,111 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fwt
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/table/pipeline"
"github.com/dolthub/dolt/go/store/types"
)
// TestHandleRow feeds rows through the auto-sizing transformer and verifies
// the fixed-width output. badRowChan is drained concurrently: in the original
// version it was unbuffered and never read, so any transform failure would
// block the producer goroutine on the send, outChan would never be closed, and
// the test would deadlock instead of failing.
func TestHandleRow(t *testing.T) {
	tests := []struct {
		name         string
		inputRows    []pipeline.RowWithProps
		expectedRows []pipeline.RowWithProps
	}{
		{
			name: "already fixed width",
			inputRows: rs(
				testRow(t, "12345", "12345"),
				testRow(t, "12345", "12345"),
			),
			expectedRows: rs(
				testRow(t, "12345", "12345"),
				testRow(t, "12345", "12345"),
			),
		},
		{
			name: "pad right",
			inputRows: rs(
				testRow(t, "a", "a"),
				testRow(t, "12345", "12345"),
			),
			expectedRows: rs(
				testRow(t, "a    ", "a    "),
				testRow(t, "12345", "12345"),
			),
		},
		// This could be a lot better, but it's exactly as broken as the MySQL shell so we're leaving it as is.
		{
			name: "embedded newlines",
			inputRows: rs(
				testRow(t, "aaaaa\naaaaa", "a"),
				testRow(t, "12345", "12345\n12345"),
			),
			expectedRows: rs(
				testRow(t, "aaaaa\naaaaa", "a          "),
				testRow(t, "12345      ", "12345\n12345"),
			),
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			transformer := NewAutoSizingFWTTransformer(testSchema(), PrintAllWhenTooLong, 100)
			outChan := make(chan pipeline.RowWithProps)
			badRowChan := make(chan *pipeline.TransformRowFailure)
			stopChan := make(chan struct{})

			// Drain badRowChan so a failed transform surfaces as an assertion
			// failure below rather than deadlocking the producer goroutine.
			var badRows []*pipeline.TransformRowFailure
			badRowsDone := make(chan struct{})
			go func() {
				defer close(badRowsDone)
				for f := range badRowChan {
					badRows = append(badRows, f)
				}
			}()

			go func() {
				for _, r := range tt.inputRows {
					transformer.handleRow(r, outChan, badRowChan, stopChan)
				}
				transformer.flush(outChan, badRowChan, stopChan)
				close(outChan)
				close(badRowChan)
			}()

			var outputRows []pipeline.RowWithProps
			for r := range outChan {
				outputRows = append(outputRows, r)
			}
			<-badRowsDone

			assert.Empty(t, badRows)
			assert.Equal(t, tt.expectedRows, outputRows)
		})
	}
}
// testSchema returns an unkeyed schema with two string columns: "col1" (tag 0)
// and "col2" (tag 1).
func testSchema() schema.Schema {
	cols := schema.NewColCollection(
		schema.NewColumn("col1", 0, types.StringKind, false),
		schema.NewColumn("col2", 1, types.StringKind, false),
	)
	return schema.UnkeyedSchemaFromCols(cols)
}
// testRow builds a RowWithProps (with no properties) whose two string columns
// hold col1 and col2, failing the test if row construction errors.
func testRow(t *testing.T, col1, col2 string) pipeline.RowWithProps {
	r, err := row.New(types.Format_Default, testSchema(), row.TaggedValues{
		0: types.String(col1),
		1: types.String(col2),
	})
	assert.NoError(t, err)
	return pipeline.RowWithProps{Row: r, Props: pipeline.NoProps}
}
// rs gathers its arguments into a slice; shorthand for building row fixtures.
func rs(rows ...pipeline.RowWithProps) []pipeline.RowWithProps {
	return rows
}