remove dead, deprecated code from table packages

This commit is contained in:
Andy Arthur
2022-10-17 11:50:45 -07:00
parent e72052eadf
commit 0d4b3cce3f
25 changed files with 75 additions and 1028 deletions
+1 -2
View File
@@ -187,9 +187,8 @@ func secondsSince(start time.Time) float64 {
// nullWriter is a no-op SqlRowWriter implementation
type nullWriter struct{}
func (n nullWriter) WriteRow(ctx context.Context, r row.Row) error { return nil }
func (n nullWriter) Close(ctx context.Context) error { return nil }
func (n nullWriter) WriteSqlRow(ctx context.Context, r sql.Row) error { return nil }
func (n nullWriter) Close(ctx context.Context) error { return nil }
func printEmptySetResult(start time.Time) {
seconds := secondsSince(start)
-26
View File
@@ -379,32 +379,6 @@ func setConflicts(ctx context.Context, cons durable.ConflictIndex, tbl, mergeTbl
return tableToUpdate, nil
}
func getTableInfoFromRoot(ctx context.Context, tblName string, root *doltdb.RootValue) (
ok bool,
table *doltdb.Table,
sch schema.Schema,
h hash.Hash,
err error,
) {
table, ok, err = root.GetTable(ctx, tblName)
if err != nil {
return false, nil, nil, hash.Hash{}, err
}
if ok {
h, err = table.HashOf()
if err != nil {
return false, nil, nil, hash.Hash{}, err
}
sch, err = table.GetSchema(ctx)
if err != nil {
return false, nil, nil, hash.Hash{}, err
}
}
return ok, table, sch, h, nil
}
func calcTableMergeStats(ctx context.Context, tbl *doltdb.Table, mergeTbl *doltdb.Table) (MergeStats, error) {
ms := MergeStats{Operation: TableModified}
@@ -289,10 +289,6 @@ func createCVsForPartialKeyMatches(
return createdViolation, nil
}
func makePartialDescriptor(desc val.TupleDesc, n int) val.TupleDesc {
return val.NewTupleDescriptor(desc.Types[:n]...)
}
func makePartialKey(kb *val.TupleBuilder, idxSch schema.Index, tblSch schema.Schema, k, v val.Tuple, pool pool.BuffPool) (val.Tuple, bool) {
for i, tag := range idxSch.IndexedColumnTags() {
if j, ok := tblSch.GetPKCols().TagToIdx[tag]; ok {
@@ -1,92 +0,0 @@
// Copyright 2020 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package table
import (
"context"
"io"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
)
// CompositeTableReader is a TableReader implementation which will concatenate the results
// of multiple TableReader instances into a single set of results.
type CompositeTableReader struct {
sch schema.Schema
readers []ReadCloser
idx int
}
// NewCompositeTableReader creates a new CompositeTableReader instance from a slice of TableReadClosers.
func NewCompositeTableReader(readers []ReadCloser) (*CompositeTableReader, error) {
if len(readers) == 0 {
panic("nothing to iterate")
}
sch := readers[0].GetSchema()
for i := 1; i < len(readers); i++ {
otherSch := readers[i].GetSchema()
if !schema.SchemasAreEqual(sch, otherSch) {
panic("readers must have the same schema")
}
}
return &CompositeTableReader{sch: sch, readers: readers, idx: 0}, nil
}
// GetSchema gets the schema of the rows that this reader will return
func (rd *CompositeTableReader) GetSchema() schema.Schema {
return rd.sch
}
// ReadRow reads a row from a table. If there is a bad row the returned error will be non nil, and calling
// IsBadRow(err) will be return true. This is a potentially non-fatal error and callers can decide if they want to
// continue on a bad row, or fail.
func (rd *CompositeTableReader) ReadRow(ctx context.Context) (row.Row, error) {
for rd.idx < len(rd.readers) {
r, err := rd.readers[rd.idx].ReadRow(ctx)
if err == io.EOF {
rd.idx++
continue
} else if err != nil {
return nil, err
}
return r, nil
}
return nil, io.EOF
}
// VerifySchema checks that the incoming schema matches the schema from the existing table
func (rd *CompositeTableReader) VerifySchema(outSch schema.Schema) (bool, error) {
return schema.VerifyInSchema(rd.sch, outSch)
}
// Close should release resources being held
func (rd *CompositeTableReader) Close(ctx context.Context) error {
var firstErr error
for _, rdr := range rd.readers {
err := rdr.Close(ctx)
if firstErr == nil {
firstErr = err
}
}
return firstErr
}
@@ -1,93 +0,0 @@
// Copyright 2020 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package table
import (
"context"
"io"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/store/types"
)
func TestCompositeTableReader(t *testing.T) {
const (
numReaders = 7
numItemsPerReader = 7
)
ctx := context.Background()
coll := schema.NewColCollection(
schema.NewColumn("id", 0, types.UintKind, true, schema.NotNullConstraint{}),
schema.NewColumn("val", 1, types.IntKind, false),
)
sch, err := schema.SchemaFromCols(coll)
require.NoError(t, err)
var readers []ReadCloser
var expectedKeys []uint64
var expectedVals []int64
for i := 0; i < numReaders; i++ {
var rows []row.Row
for j := 0; j < numItemsPerReader; j++ {
idx := j + (i * numItemsPerReader)
expectedKeys = append(expectedKeys, uint64(idx))
expectedVals = append(expectedVals, int64(idx))
r, err := row.New(types.Format_Default, sch, row.TaggedValues{
0: types.Uint(uint64(idx)),
1: types.Int(idx),
})
require.NoError(t, err)
rows = append(rows, r)
}
imt := NewInMemTableWithData(sch, rows)
rd := NewInMemTableReader(imt)
readers = append(readers, rd)
}
compositeRd, err := NewCompositeTableReader(readers)
require.NoError(t, err)
var keys []uint64
var vals []int64
for {
r, err := compositeRd.ReadRow(ctx)
if err == io.EOF {
break
}
assert.NoError(t, err)
val0, ok := r.GetColVal(0)
assert.True(t, ok)
val1, ok := r.GetColVal(1)
assert.True(t, ok)
keys = append(keys, uint64(val0.(types.Uint)))
vals = append(vals, int64(val1.(types.Int)))
}
assert.Equal(t, expectedKeys, keys)
assert.Equal(t, expectedVals, vals)
err = compositeRd.Close(ctx)
assert.NoError(t, err)
}
@@ -25,7 +25,6 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/table"
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/types"
@@ -437,14 +436,14 @@ func (kte *keylessTableEditor) flush(ctx context.Context) error {
}
kte.eg.Go(func() (err error) {
kte.tbl, err = applyEdits(ctx, tbl, acc, kte.indexEds, nil)
kte.tbl, err = applyEdits(ctx, tbl, acc, kte.indexEds)
return err
})
return nil
}
func applyEdits(ctx context.Context, tbl *doltdb.Table, acc keylessEditAcc, indexEds []*IndexEditor, errFunc PKDuplicateCb) (_ *doltdb.Table, retErr error) {
func applyEdits(ctx context.Context, tbl *doltdb.Table, acc keylessEditAcc, indexEds []*IndexEditor) (_ *doltdb.Table, retErr error) {
rowData, err := tbl.GetNomsRowData(ctx)
if err != nil {
return nil, err
@@ -463,7 +462,7 @@ func applyEdits(ctx context.Context, tbl *doltdb.Table, acc keylessEditAcc, inde
}
ed := rowData.Edit()
iter := table.NewMapPointReader(rowData, keys...)
iter := newMapPointReader(rowData, keys)
var ok bool
for {
@@ -568,6 +567,44 @@ func applyEdits(ctx context.Context, tbl *doltdb.Table, acc keylessEditAcc, inde
return tbl.UpdateNomsRows(ctx, rowData)
}
type pointReader struct {
m types.Map
keys []types.Tuple
emptyTuple types.Tuple
idx int
}
func newMapPointReader(m types.Map, keys []types.Tuple) *pointReader {
return &pointReader{
m: m,
keys: keys,
emptyTuple: types.EmptyTuple(m.Format()),
}
}
// NextTuple implements types.MapIterator.
func (pr *pointReader) NextTuple(ctx context.Context) (k, v types.Tuple, err error) {
if pr.idx >= len(pr.keys) {
return types.Tuple{}, types.Tuple{}, io.EOF
}
k = pr.keys[pr.idx]
v = pr.emptyTuple
var ok bool
// todo: optimize by implementing MapIterator.Seek()
v, ok, err = pr.m.MaybeGetTuple(ctx, k)
pr.idx++
if err != nil {
return types.Tuple{}, types.Tuple{}, err
} else if !ok {
return k, pr.emptyTuple, nil
}
return k, v, nil
}
// for deletes (cardinality < 1): |ok| is set false
func initializeCardinality(val types.Tuple, card int64) (v types.Tuple, ok bool, err error) {
if card < 1 {
@@ -162,28 +162,3 @@ func (rd *InMemTableReader) GetSchema() schema.Schema {
func (rd *InMemTableReader) VerifySchema(outSch schema.Schema) (bool, error) {
return schema.VerifyInSchema(rd.tt.sch, outSch)
}
// InMemTableWriter is an implementation of a RowWriter for an InMemTable
type InMemTableWriter struct {
tt *InMemTable
}
// NewInMemTableWriter creates an instance of a RowWriter from an InMemTable
func NewInMemTableWriter(imt *InMemTable) *InMemTableWriter {
return &InMemTableWriter{imt}
}
// WriteRow will write a row to a table
func (w *InMemTableWriter) WriteRow(ctx context.Context, r row.Row) error {
return w.tt.AppendRow(r)
}
// Close should flush all writes, release resources being held
func (w *InMemTableWriter) Close(ctx context.Context) error {
return nil
}
// GetSchema gets the schema of the rows that this writer writes
func (w *InMemTableWriter) GetSchema() schema.Schema {
return w.tt.sch
}
@@ -76,12 +76,8 @@ func TestInMemTable(t *testing.T) {
imt := NewInMemTable(rowSch)
func() {
var wr TableWriteCloser
wr = NewInMemTableWriter(imt)
defer wr.Close(context.Background())
for _, row := range rows {
err := wr.WriteRow(context.Background(), row)
for _, r := range rows {
err := imt.AppendRow(r)
if err != nil {
t.Fatal("Failed to write row")
@@ -112,41 +108,6 @@ func TestInMemTable(t *testing.T) {
}()
}
func TestPipeRows(t *testing.T) {
imt := NewInMemTableWithData(rowSch, rows)
imtt2 := NewInMemTable(rowSch)
var err error
func() {
rd := NewInMemTableReader(imt)
defer rd.Close(context.Background())
wr := NewInMemTableWriter(imtt2)
defer wr.Close(context.Background())
_, _, err = PipeRows(context.Background(), rd, wr, false)
}()
if err != nil {
t.Error("Error piping rows from reader to writer", err)
}
if imt.NumRows() != imtt2.NumRows() {
t.Error("Row counts should match")
}
for i := 0; i < imt.NumRows(); i++ {
r1, err1 := imt.GetRow(i)
r2, err2 := imtt2.GetRow(i)
if err1 != nil || err2 != nil {
t.Fatal("Couldn't Get row.")
}
if !row.AreEqual(r1, r2, rowSch) {
t.Error("Rows should be the same.", row.Fmt(context.Background(), r1, rowSch), "!=", row.Fmt(context.Background(), r2, rowSch))
}
}
}
func TestReadAllRows(t *testing.T) {
imt := NewInMemTableWithData(rowSch, rows)
+3 -3
View File
@@ -29,10 +29,10 @@ import (
//
// Returns a tuple: (number of rows written, number of errors ignored, error). In the case that err is non-nil, the
// row counter fields in the tuple will be set to -1.
func PipeRows(ctx context.Context, rd Reader, wr RowWriter, contOnBadRow bool) (int, int, error) {
func PipeRows(ctx context.Context, rd SqlRowReader, wr SqlRowWriter, contOnBadRow bool) (int, int, error) {
var numBad, numGood int
for {
r, err := rd.ReadRow(ctx)
r, err := rd.ReadSqlRow(ctx)
if err != nil && err != io.EOF {
if IsBadRow(err) && contOnBadRow {
@@ -48,7 +48,7 @@ func PipeRows(ctx context.Context, rd Reader, wr RowWriter, contOnBadRow bool) (
return -1, -1, errors.New("reader returned nil row with err==nil")
}
err = wr.WriteRow(ctx, r)
err = wr.WriteSqlRow(ctx, r)
if err != nil {
return -1, -1, err
@@ -1,157 +0,0 @@
// Copyright 2020 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package table
import (
"context"
"io"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/store/types"
)
type keylessTableReader struct {
iter types.MapIterator
sch schema.Schema
row row.Row
remainingCopies uint64
bounded bool
remainingEntries uint64
}
var _ SqlTableReader = &keylessTableReader{}
var _ ReadCloser = &keylessTableReader{}
// GetSchema implements the TableReader interface.
func (rdr *keylessTableReader) GetSchema() schema.Schema {
return rdr.sch
}
// ReadSqlRow implements the SqlTableReader interface.
func (rdr *keylessTableReader) ReadRow(ctx context.Context) (row.Row, error) {
if rdr.remainingCopies <= 0 {
if rdr.bounded && rdr.remainingEntries == 0 {
return nil, io.EOF
}
key, val, err := rdr.iter.Next(ctx)
if err != nil {
return nil, err
} else if key == nil {
return nil, io.EOF
}
rdr.row, rdr.remainingCopies, err = row.KeylessRowsFromTuples(key.(types.Tuple), val.(types.Tuple))
if err != nil {
return nil, err
}
if rdr.remainingEntries > 0 {
rdr.remainingEntries -= 1
}
if rdr.remainingCopies == 0 {
return nil, row.ErrZeroCardinality
}
}
rdr.remainingCopies -= 1
return rdr.row, nil
}
// ReadSqlRow implements the SqlTableReader interface.
func (rdr *keylessTableReader) ReadSqlRow(ctx context.Context) (sql.Row, error) {
r, err := rdr.ReadRow(ctx)
if err != nil {
return nil, err
}
return sqlutil.DoltRowToSqlRow(r, rdr.sch)
}
// Close implements the TableReadCloser interface.
func (rdr *keylessTableReader) Close(_ context.Context) error {
return nil
}
func newKeylessTableReader(ctx context.Context, tbl *doltdb.Table, sch schema.Schema, buffered bool) (*keylessTableReader, error) {
rows, err := tbl.GetNomsRowData(ctx)
if err != nil {
return nil, err
}
return newKeylessTableReaderForRows(ctx, rows, sch, buffered)
}
func newKeylessTableReaderForRows(ctx context.Context, rows types.Map, sch schema.Schema, buffered bool) (*keylessTableReader, error) {
var err error
var iter types.MapIterator
if buffered {
iter, err = rows.Iterator(ctx)
} else {
iter, err = rows.BufferedIterator(ctx)
}
if err != nil {
return nil, err
}
return &keylessTableReader{
iter: iter,
sch: sch,
}, nil
}
func newKeylessTableReaderForPartition(ctx context.Context, tbl *doltdb.Table, sch schema.Schema, start, end uint64) (SqlTableReader, error) {
rows, err := tbl.GetNomsRowData(ctx)
if err != nil {
return nil, err
}
iter, err := rows.BufferedIteratorAt(ctx, start)
if err != nil {
return nil, err
}
return &keylessTableReader{
iter: iter,
sch: sch,
remainingEntries: end - start,
bounded: true,
}, nil
}
func newKeylessTableReaderFrom(ctx context.Context, tbl *doltdb.Table, sch schema.Schema, val types.Value) (SqlTableReader, error) {
rows, err := tbl.GetNomsRowData(ctx)
if err != nil {
return nil, err
}
iter, err := rows.IteratorFrom(ctx, val)
if err != nil {
return nil, err
}
return &keylessTableReader{
iter: iter,
sch: sch,
}, nil
}
@@ -1,82 +0,0 @@
// Copyright 2020 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package table
import (
"context"
"io"
"github.com/dolthub/dolt/go/store/types"
)
type PointReader struct {
m types.Map
emptyTuple types.Tuple
keys []types.Tuple
idx int
}
var _ types.MapIterator = &PointReader{}
// read the map values for a set of map keys
func NewMapPointReader(m types.Map, keys ...types.Tuple) types.MapIterator {
return &PointReader{
m: m,
emptyTuple: types.EmptyTuple(m.Format()),
keys: keys,
}
}
// Next implements types.MapIterator.
func (pr *PointReader) Next(ctx context.Context) (k, v types.Value, err error) {
kt, vt, err := pr.NextTuple(ctx)
if err != nil {
return nil, nil, err
}
if !kt.Empty() {
k = kt
}
if !vt.Empty() {
v = vt
}
return k, v, nil
}
// NextTuple implements types.MapIterator.
func (pr *PointReader) NextTuple(ctx context.Context) (k, v types.Tuple, err error) {
if pr.idx >= len(pr.keys) {
return types.Tuple{}, types.Tuple{}, io.EOF
}
k = pr.keys[pr.idx]
v = pr.emptyTuple
var ok bool
// todo: optimize by implementing MapIterator.Seek()
v, ok, err = pr.m.MaybeGetTuple(ctx, k)
pr.idx++
if err != nil {
return types.Tuple{}, types.Tuple{}, err
} else if !ok {
return k, pr.emptyTuple, nil
}
return k, v, nil
}
-153
View File
@@ -1,153 +0,0 @@
// Copyright 2020 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package table
import (
"context"
"io"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
"github.com/dolthub/dolt/go/store/types"
)
type pkTableReader struct {
iter types.MapIterator
sch schema.Schema
}
var _ SqlTableReader = pkTableReader{}
var _ ReadCloser = pkTableReader{}
// GetSchema implements the TableReader interface.
func (rdr pkTableReader) GetSchema() schema.Schema {
return rdr.sch
}
// ReadRow implements the TableReader interface.
func (rdr pkTableReader) ReadRow(ctx context.Context) (row.Row, error) {
key, val, err := rdr.iter.Next(ctx)
if err != nil {
return nil, err
} else if key == nil {
return nil, io.EOF
}
return row.FromNoms(rdr.sch, key.(types.Tuple), val.(types.Tuple))
}
// ReadSqlRow implements the SqlTableReader interface.
func (rdr pkTableReader) ReadSqlRow(ctx context.Context) (sql.Row, error) {
key, val, err := rdr.iter.Next(ctx)
if err != nil {
return nil, err
} else if key == nil {
return nil, io.EOF
}
return noms.SqlRowFromTuples(rdr.sch, key.(types.Tuple), val.(types.Tuple))
}
// Close implements the TableReadCloser interface.
func (rdr pkTableReader) Close(_ context.Context) error {
return nil
}
func newPkTableReader(ctx context.Context, tbl *doltdb.Table, sch schema.Schema, buffered bool) (pkTableReader, error) {
rows, err := tbl.GetNomsRowData(ctx)
if err != nil {
return pkTableReader{}, err
}
return newPkTableReaderForRows(ctx, rows, sch, buffered)
}
func newPkTableReaderForRows(ctx context.Context, rows types.Map, sch schema.Schema, buffered bool) (pkTableReader, error) {
var err error
var iter types.MapIterator
if buffered {
iter, err = rows.Iterator(ctx)
} else {
iter, err = rows.BufferedIterator(ctx)
}
if err != nil {
return pkTableReader{}, err
}
return pkTableReader{
iter: iter,
sch: sch,
}, nil
}
func newPkTableReaderFrom(ctx context.Context, tbl *doltdb.Table, sch schema.Schema, val types.Value) (SqlTableReader, error) {
rows, err := tbl.GetNomsRowData(ctx)
if err != nil {
return nil, err
}
iter, err := rows.IteratorFrom(ctx, val)
if err != nil {
return nil, err
}
return pkTableReader{
iter: iter,
sch: sch,
}, nil
}
type partitionTableReader struct {
SqlTableReader
remaining uint64
}
var _ SqlTableReader = &partitionTableReader{}
func newPkTableReaderForPartition(ctx context.Context, tbl *doltdb.Table, sch schema.Schema, start, end uint64) (SqlTableReader, error) {
rows, err := tbl.GetNomsRowData(ctx)
if err != nil {
return nil, err
}
iter, err := rows.BufferedIteratorAt(ctx, start)
if err != nil {
return nil, err
}
return &partitionTableReader{
SqlTableReader: pkTableReader{
iter: iter,
sch: sch,
},
remaining: end - start,
}, nil
}
// ReadSqlRow implements the SqlTableReader interface.
func (rdr *partitionTableReader) ReadSqlRow(ctx context.Context) (sql.Row, error) {
if rdr.remaining == 0 {
return nil, io.EOF
}
rdr.remaining -= 1
return rdr.SqlTableReader.ReadSqlRow(ctx)
}
@@ -1,71 +0,0 @@
// Copyright 2020 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package table
import (
"context"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/utils/async"
)
var _ ReadCloser = (*AsyncReadAheadTableReader)(nil)
// AsyncReadAheadTableReader is a TableReadCloser implementation that spins up a go routine to keep reading data into
// a buffered channel so that it is ready when the caller wants it.
type AsyncReadAheadTableReader struct {
backingReader ReadCloser
reader *async.AsyncReader
}
// NewAsyncReadAheadTableReader creates a new AsyncReadAheadTableReader
func NewAsyncReadAheadTableReader(tr ReadCloser, bufferSize int) *AsyncReadAheadTableReader {
read := func(ctx context.Context) (interface{}, error) {
return tr.ReadRow(ctx)
}
reader := async.NewAsyncReader(read, bufferSize)
return &AsyncReadAheadTableReader{tr, reader}
}
// Start the worker routine reading rows to the channel
func (tr *AsyncReadAheadTableReader) Start(ctx context.Context) error {
return tr.reader.Start(ctx)
}
// GetSchema gets the schema of the rows that this reader will return
func (tr *AsyncReadAheadTableReader) GetSchema() schema.Schema {
return tr.backingReader.GetSchema()
}
// ReadRow reads a row from a table. If there is a bad row the returned error will be non nil, and calling
// IsBadRow(err) will be return true. This is a potentially non-fatal error and callers can decide if they want to
// continue on a bad row, or fail.
func (tr *AsyncReadAheadTableReader) ReadRow(ctx context.Context) (row.Row, error) {
obj, err := tr.reader.Read()
if err != nil {
return nil, err
}
return obj.(row.Row), err
}
// Close releases resources being held
func (tr *AsyncReadAheadTableReader) Close(ctx context.Context) error {
_ = tr.reader.Close()
return tr.backingReader.Close(ctx)
}
+1 -15
View File
@@ -18,23 +18,9 @@ import (
"context"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
)
// RowWriter knows how to write table rows to some destination
type RowWriter interface {
// WriteRow writes a row to the destination of this writer
WriteRow(ctx context.Context, r row.Row) error
}
// TableWriteCloser is an interface for writing rows to a table, that can be closed
type TableWriteCloser interface {
RowWriter
Closer
}
type SqlRowWriter interface {
TableWriteCloser
Closer
WriteSqlRow(ctx context.Context, r sql.Row) error
}
@@ -24,12 +24,10 @@ import (
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/vitess/go/sqltypes"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/schema/typeinfo"
"github.com/dolthub/dolt/go/libraries/doltcore/table"
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
"github.com/dolthub/dolt/go/store/types"
)
const jsonHeader = `{"rows": [`
@@ -72,77 +70,6 @@ func (j *RowWriter) GetSchema() schema.Schema {
return j.sch
}
// WriteRow encodes the row given into JSON format and writes it, returning any error
func (j *RowWriter) WriteRow(ctx context.Context, r row.Row) error {
if j.rowsWritten == 0 {
err := iohelp.WriteAll(j.bWr, []byte(j.header))
if err != nil {
return err
}
}
allCols := j.sch.GetAllCols()
colValMap := make(map[string]interface{}, allCols.Size())
if err := allCols.Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
val, ok := r.GetColVal(tag)
if !ok || types.IsNull(val) {
return false, nil
}
switch col.TypeInfo.GetTypeIdentifier() {
case typeinfo.DatetimeTypeIdentifier,
typeinfo.DecimalTypeIdentifier,
typeinfo.EnumTypeIdentifier,
typeinfo.InlineBlobTypeIdentifier,
typeinfo.SetTypeIdentifier,
typeinfo.TimeTypeIdentifier,
typeinfo.TupleTypeIdentifier,
typeinfo.UuidTypeIdentifier,
typeinfo.VarBinaryTypeIdentifier,
typeinfo.YearTypeIdentifier:
v, err := col.TypeInfo.FormatValue(val)
if err != nil {
return true, err
}
val = types.String(*v)
case typeinfo.BitTypeIdentifier,
typeinfo.BoolTypeIdentifier,
typeinfo.VarStringTypeIdentifier,
typeinfo.UintTypeIdentifier,
typeinfo.IntTypeIdentifier,
typeinfo.FloatTypeIdentifier:
// use primitive type
}
colValMap[col.Name] = val
return false, nil
}); err != nil {
return err
}
data, err := marshalToJson(colValMap)
if err != nil {
return errors.New("marshaling did not work")
}
if j.rowsWritten != 0 {
_, err := j.bWr.WriteString(j.separator)
if err != nil {
return err
}
}
newErr := iohelp.WriteAll(j.bWr, data)
if newErr != nil {
return newErr
}
j.rowsWritten++
return nil
}
func (j *RowWriter) WriteSqlRow(ctx context.Context, row sql.Row) error {
// The Type.SQL() call takes in a SQL context to determine the output character set for types that use a collation.
// The context given is not a SQL context, so we force the `utf8mb4` character set to be used, as it is the most
@@ -24,7 +24,6 @@ import (
"github.com/xitongsys/parquet-go/source"
"github.com/xitongsys/parquet-go/writer"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/schema/typeinfo"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
@@ -96,15 +95,6 @@ func (pwr *ParquetWriter) GetSchema() schema.Schema {
return pwr.sch
}
// WriteRow will write a row to a table
func (pwr *ParquetWriter) WriteRow(ctx context.Context, r row.Row) error {
sqlRow, err := sqlutil.DoltRowToSqlRow(r, pwr.GetSchema())
if err != nil {
return err
}
return pwr.WriteSqlRow(ctx, sqlRow)
}
func (pwr *ParquetWriter) WriteSqlRow(ctx context.Context, r sql.Row) error {
colValStrs := make([]*string, pwr.sch.GetAllCols().Size())
@@ -21,11 +21,11 @@ import (
"os"
"testing"
"github.com/dolthub/go-mysql-server/sql"
"github.com/stretchr/testify/assert"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/reader"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/schema/typeinfo"
"github.com/dolthub/dolt/go/store/types"
@@ -54,41 +54,21 @@ type Person struct {
Title string `parquet:"name=title, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
}
func mustRow(r row.Row, err error) row.Row {
if err != nil {
panic(err)
}
return r
}
func getSampleRows() (rows []row.Row) {
return []row.Row{
mustRow(row.New(types.Format_Default, rowSch, row.TaggedValues{
nameColTag: types.String("Bill Billerson"),
ageColTag: types.Uint(32),
titleColTag: types.String("Senior Dufus")})),
mustRow(row.New(types.Format_Default, rowSch, row.TaggedValues{
nameColTag: types.String("Rob Robertson"),
ageColTag: types.Uint(25),
titleColTag: types.String("Dufus")})),
mustRow(row.New(types.Format_Default, rowSch, row.TaggedValues{
nameColTag: types.String("John Johnson"),
ageColTag: types.Uint(21),
titleColTag: types.String("")})),
mustRow(row.New(types.Format_Default, rowSch, row.TaggedValues{
nameColTag: types.String("Andy Anderson"),
ageColTag: types.Uint(27),
/* title = NULL */})),
func getSampleRows() []sql.Row {
return []sql.Row{
{"Bill Billerson", 32, "Senior Dufus"},
{"Rob Robertson", 25, "Dufus"},
{"John Johnson", 21, ""},
{"Andy Anderson", 27, nil},
}
}
func writeToParquet(pWr *ParquetWriter, rows []row.Row, t *testing.T) {
func writeToParquet(pWr *ParquetWriter, rows []sql.Row, t *testing.T) {
func() {
defer pWr.Close(context.Background())
for _, row := range rows {
err := pWr.WriteRow(context.Background(), row)
err := pWr.WriteSqlRow(context.Background(), row)
if err != nil {
t.Fatal("Failed to write row")
}
@@ -20,10 +20,6 @@ import (
"strings"
)
func csvSplitLineRuneDelim(str string, delim rune, escapedQuotes bool) ([]*string, error) {
return csvSplitLine(str, string(delim), escapedQuotes)
}
func csvSplitLine(str string, delim string, escapedQuotes bool) ([]*string, error) {
if strings.IndexRune(delim, '"') != -1 {
panic("delims cannot contain quotes")
@@ -26,7 +26,6 @@ import (
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/libraries/doltcore/table"
@@ -81,31 +80,6 @@ func NewCSVWriter(wr io.WriteCloser, outSch schema.Schema, info *CSVFileInfo) (*
return csvw, nil
}
// WriteRow will write a row to a table
func (csvw *CSVWriter) WriteRow(ctx context.Context, r row.Row) error {
allCols := csvw.sch.GetAllCols()
colValStrs := make([]*string, allCols.Size())
sqlRow, err := sqlutil.DoltRowToSqlRow(r, csvw.sch)
if err != nil {
return err
}
for i, val := range sqlRow {
if val == nil {
colValStrs[i] = nil
} else {
v, err := sqlutil.SqlColToStr(csvw.sch.GetAllCols().GetByIndex(i).TypeInfo.ToSqlType(), val)
if err != nil {
return err
}
colValStrs[i] = &v
}
}
return csvw.write(colValStrs)
}
func (csvw *CSVWriter) WriteSqlRow(ctx context.Context, r sql.Row) error {
colValStrs := make([]*string, csvw.sch.GetAllCols().Size())
for i, val := range r {
@@ -19,7 +19,8 @@ import (
"os"
"testing"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/schema/typeinfo"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
@@ -43,33 +44,21 @@ var inCols = []schema.Column{
var colColl = schema.NewColCollection(inCols...)
var rowSch = schema.MustSchemaFromCols(colColl)
func getSampleRows() (rows []row.Row) {
return []row.Row{
mustRow(row.New(types.Format_Default, rowSch, row.TaggedValues{
nameColTag: types.String("Bill Billerson"),
ageColTag: types.Uint(32),
titleColTag: types.String("Senior Dufus")})),
mustRow(row.New(types.Format_Default, rowSch, row.TaggedValues{
nameColTag: types.String("Rob Robertson"),
ageColTag: types.Uint(25),
titleColTag: types.String("Dufus")})),
mustRow(row.New(types.Format_Default, rowSch, row.TaggedValues{
nameColTag: types.String("John Johnson"),
ageColTag: types.Uint(21),
titleColTag: types.String("")})),
mustRow(row.New(types.Format_Default, rowSch, row.TaggedValues{
nameColTag: types.String("Andy Anderson"),
ageColTag: types.Uint(27),
/* title = NULL */})),
func getSampleRows() []sql.Row {
return []sql.Row{
{"Bill Billerson", 32, "Senior Dufus"},
{"Rob Robertson", 25, "Dufus"},
{"John Johnson", 21, ""},
{"Andy Anderson", 27, nil},
}
}
func writeToCSV(csvWr *CSVWriter, rows []row.Row, t *testing.T) {
func writeToCSV(csvWr *CSVWriter, rows []sql.Row, t *testing.T) {
func() {
defer csvWr.Close(context.Background())
for _, row := range rows {
err := csvWr.WriteRow(context.Background(), row)
for _, r := range rows {
err := csvWr.WriteSqlRow(context.Background(), r)
if err != nil {
t.Fatal("Failed to write row")
@@ -23,7 +23,6 @@ import (
"github.com/dolthub/dolt/go/cmd/dolt/errhand"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
dsqle "github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlfmt"
@@ -81,64 +80,6 @@ func (w *BatchSqlExportWriter) GetSchema() schema.Schema {
return w.sch
}
// WriteRow will write a row to a table
func (w *BatchSqlExportWriter) WriteRow(ctx context.Context, r row.Row) error {
if err := w.maybeWriteDropCreate(ctx); err != nil {
return err
}
// Previous write was last insert
if w.numInserts > 0 && r == nil {
return iohelp.WriteLine(w.wr, ";")
}
// Reached max number of inserts on one line
if w.numInserts == batchSize {
// Reset count
w.numInserts = 0
// End line
err := iohelp.WriteLine(w.wr, ";")
if err != nil {
return err
}
}
// Append insert values as tuples
var stmt string
if w.numInserts == 0 {
// Get insert prefix string
prefix, err := sqlfmt.InsertStatementPrefix(w.tableName, w.sch)
if err != nil {
return nil
}
// Write prefix
err = iohelp.WriteWithoutNewLine(w.wr, prefix)
if err != nil {
return nil
}
} else {
stmt = ", "
}
// Get insert tuple string
tuple, err := sqlfmt.RowAsTupleString(r, w.sch)
if err != nil {
return err
}
// Write insert tuple
err = iohelp.WriteWithoutNewLine(w.wr, stmt+tuple)
if err != nil {
return nil
}
// Increase count of inserts written on this line
w.numInserts++
return err
}
func (w *BatchSqlExportWriter) WriteSqlRow(ctx context.Context, r sql.Row) error {
if err := w.maybeWriteDropCreate(ctx); err != nil {
return err
@@ -23,7 +23,6 @@ import (
"github.com/dolthub/dolt/go/cmd/dolt/errhand"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
dsqle "github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlfmt"
@@ -77,21 +76,6 @@ func (w *SqlExportWriter) GetSchema() schema.Schema {
return w.sch
}
// WriteRow will write a row to a table
func (w *SqlExportWriter) WriteRow(ctx context.Context, r row.Row) error {
if err := w.maybeWriteDropCreate(ctx); err != nil {
return err
}
stmt, err := sqlfmt.RowAsInsertStmt(r, w.tableName, w.sch)
if err != nil {
return err
}
return iohelp.WriteLine(w.wr, stmt)
}
func (w *SqlExportWriter) WriteSqlRow(ctx context.Context, r sql.Row) error {
if r == nil {
return nil
@@ -19,7 +19,7 @@ import (
"strings"
"testing"
"github.com/google/uuid"
"github.com/dolthub/go-mysql-server/sql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -40,9 +40,7 @@ func (*StringBuilderCloser) Close() error {
}
func TestEndToEnd(t *testing.T) {
id := uuid.MustParse("00000000-0000-0000-0000-000000000000")
tableName := "people"
dropCreateStatement := sqlfmt.DropTableIfExistsStmt(tableName) + "\n" +
"CREATE TABLE `people` (\n" +
" `id` varchar(16383) NOT NULL,\n" +
@@ -57,7 +55,7 @@ func TestEndToEnd(t *testing.T) {
type test struct {
name string
rows []row.Row
rows []sql.Row
sch schema.Schema
expectedOutput string
}
@@ -65,9 +63,10 @@ func TestEndToEnd(t *testing.T) {
tests := []test{
{
name: "two rows",
rows: rs(
dtestutils.NewTypedRow(id, "some guy", 100, false, strPointer("normie")),
dtestutils.NewTypedRow(id, "guy personson", 0, true, strPointer("officially a person"))),
rows: []sql.Row{
{"00000000-0000-0000-0000-000000000000", "some guy", 100, 0, "normie"},
{"00000000-0000-0000-0000-000000000000", "guy personson", 0, 1, "officially a person"},
},
sch: dtestutils.TypedSchema,
expectedOutput: dropCreateStatement + "\n" +
"INSERT INTO `people` (`id`,`name`,`age`,`is_married`,`title`) " +
@@ -112,7 +111,7 @@ func TestEndToEnd(t *testing.T) {
}
for _, r := range tt.rows {
assert.NoError(t, w.WriteRow(ctx, r))
assert.NoError(t, w.WriteSqlRow(ctx, r))
}
assert.NoError(t, w.Close(ctx))
@@ -25,7 +25,6 @@ import (
"github.com/fatih/color"
"github.com/dolthub/dolt/go/libraries/doltcore/diff"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/libraries/doltcore/table"
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
@@ -168,10 +167,6 @@ func (w *FixedWidthTableWriter) WriteColoredRow(ctx context.Context, r sql.Row,
return nil
}
func (w *FixedWidthTableWriter) WriteRow(ctx context.Context, r row.Row) error {
panic("unimplemented")
}
func (w *FixedWidthTableWriter) sampleRow(r sql.Row, colors []*color.Color) (tableRow, error) {
row := tableRow{
columns: make([]string, len(r)),
-8
View File
@@ -16,7 +16,6 @@ package edits
import (
"context"
"os"
"github.com/dolthub/dolt/go/store/types"
)
@@ -111,10 +110,3 @@ func (dbea *DiskBackedEditAcc) Close(ctx context.Context) error {
return nil
}
// best effort deletion ignores errors
func tryDeleteFiles(paths []string) {
for _, path := range paths {
_ = os.Remove(path)
}
}