removing nil row at end

This commit is contained in:
James Cor
2022-03-07 11:24:33 -08:00
parent eb5650cd39
commit 06892bb6ce
5 changed files with 8 additions and 15 deletions

View File

@@ -57,7 +57,6 @@ func (e *DataMoverPipeline) Execute() error {
for {
row, err := e.rd.ReadSqlRow(e.ctx)
if err == io.EOF {
parsedRowChan <- nil
return nil
}

View File

@@ -124,9 +124,6 @@ func (jsonw *JSONWriter) WriteRow(ctx context.Context, r row.Row) error {
}
func (jsonw *JSONWriter) WriteSqlRow(ctx context.Context, row sql.Row) error {
if row == nil {
return nil
}
allCols := jsonw.sch.GetAllCols()
colValMap := make(map[string]interface{}, allCols.Size())
if err := allCols.Iter(func(tag uint64, col schema.Column) (stop bool, err error) {

View File

@@ -131,9 +131,6 @@ func (pwr *ParquetWriter) WriteRow(ctx context.Context, r row.Row) error {
}
func (pwr *ParquetWriter) WriteSqlRow(ctx context.Context, r sql.Row) error {
if r == nil {
return nil
}
colValStrs := make([]*string, pwr.sch.GetAllCols().Size())
for i, val := range r {

View File

@@ -105,9 +105,6 @@ func (csvw *CSVWriter) WriteRow(ctx context.Context, r row.Row) error {
}
func (csvw *CSVWriter) WriteSqlRow(ctx context.Context, r sql.Row) error {
if r == nil {
return nil
}
colValStrs := make([]*string, csvw.sch.GetAllCols().Size())
for i, val := range r {
if val == nil {

View File

@@ -141,11 +141,6 @@ func (w *BatchSqlExportWriter) WriteSqlRow(ctx context.Context, r sql.Row) error
return err
}
// Previous write was last insert
if w.numInserts > 0 && r == nil {
return iohelp.WriteLine(w.wr, ";")
}
// Reached max number of inserts on one line
if w.numInserts == batchSize {
// Reset count
@@ -219,6 +214,14 @@ func (w *BatchSqlExportWriter) Close(ctx context.Context) error {
return err
}
// if wrote at least 1 insert, write the semicolon
if w.numInserts > 0 {
err := iohelp.WriteLine(w.wr, ";")
if err != nil {
return err
}
}
if w.wr != nil {
return w.wr.Close()
}