diff --git a/go/libraries/doltcore/mvdata/engine_table_writer.go b/go/libraries/doltcore/mvdata/engine_table_writer.go index af7abce5eb..7ee5752d01 100644 --- a/go/libraries/doltcore/mvdata/engine_table_writer.go +++ b/go/libraries/doltcore/mvdata/engine_table_writer.go @@ -31,7 +31,6 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/schema" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/overrides" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil" - "github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms" "github.com/dolthub/dolt/go/store/types" ) @@ -40,6 +39,9 @@ const ( tableWriterStatUpdateRate = 64 * 1024 ) +// StatsCb is a callback for reporting stats about the rows that have been processed so far +type StatsCb func(types.AppliedEditStats) + // SqlEngineTableWriter is a utility for importing a set of rows through the sql engine. type SqlEngineTableWriter struct { se *sqle.Engine @@ -51,7 +53,7 @@ type SqlEngineTableWriter struct { force bool disableFks bool - statsCB noms.StatsCB + statsCB StatsCb stats types.AppliedEditStats statOps int32 @@ -60,7 +62,7 @@ type SqlEngineTableWriter struct { rowOperationSchema sql.PrimaryKeySchema } -func NewSqlEngineTableWriter(ctx *sql.Context, engine *sqle.Engine, createTableSchema, rowOperationSchema schema.Schema, options *MoverOptions, statsCB noms.StatsCB) (*SqlEngineTableWriter, error) { +func NewSqlEngineTableWriter(ctx *sql.Context, engine *sqle.Engine, createTableSchema, rowOperationSchema schema.Schema, options *MoverOptions, statsCB StatsCb) (*SqlEngineTableWriter, error) { if engine.IsReadOnly() { // SqlEngineTableWriter does not respect read only mode return nil, analyzererrors.ErrReadOnlyDatabase.New(ctx.GetCurrentDatabase()) diff --git a/go/libraries/doltcore/sqle/index/index_reader.go b/go/libraries/doltcore/sqle/index/index_reader.go index 30a34e07c1..3cee7e9348 100644 --- a/go/libraries/doltcore/sqle/index/index_reader.go +++ b/go/libraries/doltcore/sqle/index/index_reader.go @@ -26,7 +26,6 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable" "github.com/dolthub/dolt/go/libraries/doltcore/row" "github.com/dolthub/dolt/go/libraries/doltcore/schema" - "github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms" "github.com/dolthub/dolt/go/store/prolly" "github.com/dolthub/dolt/go/store/prolly/tree" "github.com/dolthub/dolt/go/store/types" @@ -174,7 +173,6 @@ func (itr *rangePartitionIter) nextProllyPartition() (sql.Partition, error) { } type rangePartition struct { - nomsRange *noms.ReadRange key []byte prollyRange prolly.Range isReverse bool diff --git a/go/libraries/doltcore/table/editor/editor_options.go b/go/libraries/doltcore/table/editor/editor_options.go index b2f705400e..d332723a5a 100644 --- a/go/libraries/doltcore/table/editor/editor_options.go +++ b/go/libraries/doltcore/table/editor/editor_options.go @@ -15,10 +15,6 @@ package editor import ( - "context" - "fmt" - "strings" - "github.com/dolthub/dolt/go/store/types" ) @@ -38,33 +34,3 @@ type Options struct { func TestEditorOptions(vrw types.ValueReadWriter) Options { return Options{} } - -// formatKey returns a comma-separated string representation of the key given. -func formatKey(ctx context.Context, key types.Value) (string, error) { - tuple, ok := key.(types.Tuple) - if !ok { - return "", fmt.Errorf("Expected types.Tuple but got %T", key) - } - - var vals []string - iter, err := tuple.Iterator() - if err != nil { - return "", err - } - - for iter.HasMore() { - i, val, err := iter.Next() - if err != nil { - return "", err - } - if i%2 == 1 { - str, err := types.EncodedValue(ctx, val) - if err != nil { - return "", err - } - vals = append(vals, str) - } - } - - return fmt.Sprintf("[%s]", strings.Join(vals, ",")), nil -} diff --git a/go/libraries/doltcore/table/editor/index_edit_accumulator.go b/go/libraries/doltcore/table/editor/index_edit_accumulator.go deleted file mode 100644 index 453102c56b..0000000000 --- a/go/libraries/doltcore/table/editor/index_edit_accumulator.go +++ /dev/null @@ -1,444 +0,0 @@ -// Copyright 2021 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package editor - -import ( - "context" - "io" - - "github.com/dolthub/dolt/go/libraries/doltcore/row" - "github.com/dolthub/dolt/go/libraries/doltcore/schema" - "github.com/dolthub/dolt/go/libraries/doltcore/table" - "github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms" - "github.com/dolthub/dolt/go/libraries/utils/set" - "github.com/dolthub/dolt/go/store/hash" - "github.com/dolthub/dolt/go/store/types" - "github.com/dolthub/dolt/go/store/types/edits" -) - -// var for testing -var indexFlushThreshold int64 = 256 * 1024 - -type IndexEditAccumulator interface { - // Delete adds a row to be deleted when these edits are eventually applied. - Delete(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, value types.Tuple) error - - // Insert adds a row to be inserted when these edits are eventually applied. - Insert(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, value types.Tuple) error - - // Has returns true if the current TableEditAccumulator contains the given key, or it exists in the row data. - Has(ctx context.Context, keyHash hash.Hash, key types.Tuple) (bool, error) - - // HasPartial returns true if the current TableEditAccumulator contains the given partialKey - HasPartial(ctx context.Context, idxSch schema.Schema, partialKeyHash hash.Hash, partialKey types.Tuple) ([]hashedTuple, error) - - // Commit applies the in memory edits to the list of committed in memory edits - Commit(ctx context.Context, nbf *types.NomsBinFormat) error - - // Rollback rolls back in memory edits until it reaches the state represented by the savedTea - Rollback(ctx context.Context) error - - // MaterializeEdits commits and applies the in memory edits to the row data - MaterializeEdits(ctx context.Context, nbf *types.NomsBinFormat) (types.Map, error) -} - -// hashedTuple is a tuple accompanied by a hash. The representing value of the hash is dependent on the function -// it is obtained from. -type hashedTuple struct { - key types.Tuple - value types.Tuple - hash hash.Hash -} - -// inMemIndexEdits represent row adds and deletes that have not been written to the underlying storage and only exist in memory -type inMemIndexEdits struct { - // addedPartialKeys is a map of partial keys to a map of full keys that match the partial key - partialAdds map[hash.Hash]map[hash.Hash]types.Tuple - // These hashes represent the hash of the partial key, with the tuple being the full key - deletes map[hash.Hash]*hashedTuple - // These hashes represent the hash of the partial key, with the tuple being the full key - adds map[hash.Hash]*hashedTuple - ops int64 -} - -func newInMemIndexEdits() *inMemIndexEdits { - return &inMemIndexEdits{ - partialAdds: make(map[hash.Hash]map[hash.Hash]types.Tuple), - deletes: make(map[hash.Hash]*hashedTuple), - adds: make(map[hash.Hash]*hashedTuple), - } -} - -// MergeIn merges changes from another inMemIndexEdits object into this instance -func (edits *inMemIndexEdits) MergeIn(other *inMemIndexEdits) { - for keyHash, ht := range other.deletes { - delete(edits.adds, keyHash) - edits.deletes[keyHash] = ht - } - - for keyHash, ht := range other.adds { - delete(edits.deletes, keyHash) - edits.adds[keyHash] = ht - } - - for partialKeyHash, keyHashToPartialKey := range other.partialAdds { - if dest, ok := edits.partialAdds[partialKeyHash]; !ok { - edits.partialAdds[partialKeyHash] = keyHashToPartialKey - } else { - for keyHash, partialKey := range keyHashToPartialKey { - dest[keyHash] = partialKey - } - } - } - - edits.ops += other.ops -} - -// Has returns whether a key hash has been added as an insert, or a delete in this inMemIndexEdits object -func (edits *inMemIndexEdits) Has(keyHash hash.Hash) (added, deleted bool) { - if _, ok := edits.adds[keyHash]; ok { - return true, false - } - if _, ok := edits.deletes[keyHash]; ok { - return false, true - } - return false, false -} - -// indexEditAccumulatorImpl is the index equivalent of the tableEditAccumulatorImpl. -// -// indexEditAccumulatorImpl accumulates edits that need to be applied to the index row data. It needs to be able to -// support rollback and commit without having to materialize the types.Map. To do this it tracks committed and uncommitted -// modifications in memory. When a commit occurs the list of uncommitted changes are added to the list of committed changes. -// When a rollback occurs uncommitted changes are dropped. -// -// In addition to the in memory edits, the changes are applied to committedEA when a commit occurs. It is possible -// for the uncommitted changes to become so large that they need to be flushed to disk. At this point we change modes to write all edits -// to a separate map edit accumulator as they occur until the next commit occurs. -type indexEditAccumulatorImpl struct { - vr types.ValueReader - - // state of the index last time edits were applied - rowData types.Map - - // in memory changes which will be applied to the rowData when the map is materialized - committed *inMemIndexEdits - uncommitted *inMemIndexEdits - - // accumulatorIdx defines the order in which types.EditAccumulators will be applied - accumulatorIdx uint64 - - // flusher manages flushing of the types.EditAccumulators to disk when needed - flusher *edits.DiskEditFlusher - - // committedEaIds tracks ids of edit accumulators which have changes that have been committed - committedEaIds *set.Uint64Set - // uncommittedEAIds tracks ids of edit accumulators which have not been committed yet. - uncommittedEaIds *set.Uint64Set - - // commitEA is the types.EditAccumulator containing the committed changes that are being accumulated currently - commitEA types.EditAccumulator - // commitEAId is the id used for ordering the commitEA with other types.EditAccumulators that will be applied when - // materializing all changes. - commitEAId uint64 - - // flushingUncommitted is a flag that tracks whether we are in a state where we write uncommitted map edits to uncommittedEA - flushingUncommitted bool - // lastFlush is the number of uncommitted ops that had occurred at the time of the last flush - lastFlush int64 - // uncommittedEA is a types.EditAccumulator that we write to as uncommitted edits come in when the number of uncommitted - // edits becomes large - uncommittedEA types.EditAccumulator - // uncommittedEAId is the id used for ordering the uncommittedEA with other types.EditAccumulators that will be applied - // when materializing all changes - uncommittedEAId uint64 -} - -var _ IndexEditAccumulator = (*indexEditAccumulatorImpl)(nil) - -func (iea *indexEditAccumulatorImpl) flushUncommitted() { - // if we are not already actively writing edits to the uncommittedEA then change the state and push all in mem edits - // to a types.EditAccumulator - if !iea.flushingUncommitted { - iea.flushingUncommitted = true - - if iea.commitEA != nil && iea.commitEA.EditsAdded() > 0 { - // if there are uncommitted flushed changes we need to flush the committed changes first - // so they can be applied before the uncommitted flushed changes and future changes can be applied after - iea.committedEaIds.Add(iea.commitEAId) - iea.flusher.Flush(iea.commitEA, iea.commitEAId) - - iea.commitEA = nil - iea.commitEAId = invalidEaId - } - - iea.uncommittedEA = edits.NewAsyncSortedEditsWithDefaults(iea.vr) - iea.uncommittedEAId = iea.accumulatorIdx - iea.accumulatorIdx++ - - for _, ht := range iea.uncommitted.adds { - iea.uncommittedEA.AddEdit(ht.key, ht.value) - } - - for _, ht := range iea.uncommitted.deletes { - iea.uncommittedEA.AddEdit(ht.key, nil) - } - } - - // flush uncommitted - iea.lastFlush = iea.uncommitted.ops - iea.uncommittedEaIds.Add(iea.uncommittedEAId) - iea.flusher.Flush(iea.uncommittedEA, iea.uncommittedEAId) - - // initialize a new types.EditAccumulator for additional uncommitted edits to be written to. - iea.uncommittedEA = edits.NewAsyncSortedEditsWithDefaults(iea.vr) - iea.uncommittedEAId = iea.accumulatorIdx - iea.accumulatorIdx++ -} - -// Insert adds a row to be inserted when these edits are eventually applied. -func (iea *indexEditAccumulatorImpl) Insert(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, value types.Tuple) error { - if _, ok := iea.uncommitted.deletes[keyHash]; ok { - delete(iea.uncommitted.deletes, keyHash) - } else { - iea.uncommitted.adds[keyHash] = &hashedTuple{key, value, partialKeyHash} - if matchingMap, ok := iea.uncommitted.partialAdds[partialKeyHash]; ok { - matchingMap[keyHash] = key - } else { - iea.uncommitted.partialAdds[partialKeyHash] = map[hash.Hash]types.Tuple{keyHash: key} - } - } - - iea.uncommitted.ops++ - if iea.flushingUncommitted { - iea.uncommittedEA.AddEdit(key, value) - - if iea.uncommitted.ops-iea.lastFlush > indexFlushThreshold { - iea.flushUncommitted() - } - } else if iea.uncommitted.ops > indexFlushThreshold { - iea.flushUncommitted() - } - return nil -} - -// Delete adds a row to be deleted when these edits are eventually applied. -func (iea *indexEditAccumulatorImpl) Delete(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, value types.Tuple) error { - if _, ok := iea.uncommitted.adds[keyHash]; ok { - delete(iea.uncommitted.adds, keyHash) - delete(iea.uncommitted.partialAdds[partialKeyHash], keyHash) - } else { - iea.uncommitted.deletes[keyHash] = &hashedTuple{key, value, partialKeyHash} - } - - iea.uncommitted.ops++ - if iea.flushingUncommitted { - iea.uncommittedEA.AddEdit(key, nil) - - if iea.uncommitted.ops-iea.lastFlush > indexFlushThreshold { - iea.flushUncommitted() - } - } else if iea.uncommitted.ops > indexFlushThreshold { - iea.flushUncommitted() - } - return nil -} - -// Has returns whether the current indexEditAccumulatorImpl contains the given key. This assumes that the given hash is for -// the given key. -func (iea *indexEditAccumulatorImpl) Has(ctx context.Context, keyHash hash.Hash, key types.Tuple) (bool, error) { - // in order of most recent changes to least recent falling back to whats in the materialized row data - orderedMods := []*inMemIndexEdits{iea.uncommitted, iea.committed} - for _, mods := range orderedMods { - added, deleted := mods.Has(keyHash) - - if added { - return true, nil - } else if deleted { - return false, nil - } - } - - _, ok, err := iea.rowData.MaybeGetTuple(ctx, key) - return ok, err -} - -// HasPartial returns whether the current indexEditAccumulatorImpl contains the given partial key. This assumes that the -// given hash is for the given key. The hashes returned represent the hash of the returned tuple. -func (iea *indexEditAccumulatorImpl) HasPartial(ctx context.Context, idxSch schema.Schema, partialKeyHash hash.Hash, partialKey types.Tuple) ([]hashedTuple, error) { - if hasNulls, err := partialKey.Contains(types.NullValue); err != nil { - return nil, err - } else if hasNulls { // rows with NULL are considered distinct, and therefore we do not match on them - return nil, nil - } - - var err error - var matches []hashedTuple - var mapIter table.ReadCloser = noms.NewNomsRangeReader(iea.vr, idxSch, iea.rowData, []*noms.ReadRange{ - {Start: partialKey, Inclusive: true, Reverse: false, Check: noms.InRangeCheckPartial(partialKey)}}) - defer mapIter.Close(ctx) - var r row.Row - for r, err = mapIter.ReadRow(ctx); err == nil; r, err = mapIter.ReadRow(ctx) { - tplKeyVal, err := r.NomsMapKey(idxSch).Value(ctx) - if err != nil { - return nil, err - } - key := tplKeyVal.(types.Tuple) - tplValVal, err := r.NomsMapValue(idxSch).Value(ctx) - if err != nil { - return nil, err - } - val := tplValVal.(types.Tuple) - keyHash, err := key.Hash(key.Format()) - if err != nil { - return nil, err - } - matches = append(matches, hashedTuple{key, val, keyHash}) - } - - if err != io.EOF { - return nil, err - } - - // reapply partial key edits in order - orderedMods := []*inMemIndexEdits{iea.committed, iea.uncommitted} - for _, mods := range orderedMods { - for i := len(matches) - 1; i >= 0; i-- { - // If we've removed a key that's present here, remove it from the slice - if _, ok := mods.deletes[matches[i].hash]; ok { - matches[i] = matches[len(matches)-1] - matches = matches[:len(matches)-1] - } - } - for addedHash, addedTpl := range mods.partialAdds[partialKeyHash] { - matches = append(matches, hashedTuple{addedTpl, types.EmptyTuple(addedTpl.Format()), addedHash}) - } - } - return matches, nil -} - -// Commit applies the in memory edits to the list of committed in memory edits -func (iea *indexEditAccumulatorImpl) Commit(ctx context.Context, nbf *types.NomsBinFormat) error { - if iea.uncommitted.ops > 0 { - if !iea.flushingUncommitted { - // if there are uncommitted changes add them to the committed list of map edits - for _, ht := range iea.uncommitted.adds { - iea.commitEA.AddEdit(ht.key, ht.value) - } - - for _, ht := range iea.uncommitted.deletes { - iea.commitEA.AddEdit(ht.key, nil) - } - } else { - // if we were flushing to the uncommittedEA make the current uncommittedEA the active committedEA and add - // any uncommittedEA IDs that we already flushed - iea.commitEA = iea.uncommittedEA - iea.commitEAId = iea.uncommittedEAId - iea.committedEaIds.Add(iea.uncommittedEaIds.AsSlice()...) - - // reset state to not be flushing uncommitted - iea.uncommittedEA = nil - iea.uncommittedEAId = invalidEaId - iea.uncommittedEaIds = set.NewUint64Set(nil) - iea.lastFlush = 0 - iea.flushingUncommitted = false - } - - // apply in memory uncommitted changes to the committed in memory edits - iea.committed.MergeIn(iea.uncommitted) - - // initialize uncommitted to future in memory edits - iea.uncommitted = newInMemIndexEdits() - } - - return nil -} - -// Rollback rolls back in memory edits until it reaches the state represented by the savedTea -func (iea *indexEditAccumulatorImpl) Rollback(ctx context.Context) error { - // drop uncommitted ea IDs - iea.uncommittedEaIds = set.NewUint64Set(nil) - - if iea.uncommitted.ops > 0 { - iea.uncommitted = newInMemIndexEdits() - - if iea.flushingUncommitted { - _ = iea.uncommittedEA.Close(ctx) - iea.uncommittedEA = nil - iea.uncommittedEAId = invalidEaId - iea.uncommittedEaIds = set.NewUint64Set(nil) - iea.lastFlush = 0 - iea.flushingUncommitted = false - } - } - - return nil -} - -// MaterializeEdits applies the in memory edits to the row data and returns types.Map -func (iea *indexEditAccumulatorImpl) MaterializeEdits(ctx context.Context, nbf *types.NomsBinFormat) (m types.Map, err error) { - err = iea.Commit(ctx, nbf) - if err != nil { - return types.EmptyMap, err - } - - if iea.committed.ops == 0 { - return iea.rowData, nil - } - - committedEP, err := iea.commitEA.FinishedEditing(ctx) - iea.commitEA = nil - if err != nil { - return types.EmptyMap, err - } - - flushedEPs, err := iea.flusher.WaitForIDs(ctx, iea.committedEaIds) - if err != nil { - return types.EmptyMap, err - } - - eps := make([]types.EditProvider, 0, len(flushedEPs)+1) - for i := 0; i < len(flushedEPs); i++ { - eps = append(eps, flushedEPs[i].Edits) - } - eps = append(eps, committedEP) - - defer func() { - for _, ep := range eps { - _ = ep.Close(ctx) - } - }() - - accEdits, err := edits.NewEPMerger(ctx, iea.vr, eps) - if err != nil { - return types.EmptyMap, err - } - - // We are guaranteed that rowData is valid, as we process ieas sequentially. - updatedMap, _, err := types.ApplyEdits(ctx, accEdits, iea.rowData) - if err != nil { - return types.EmptyMap, err - } - - iea.rowData = updatedMap - iea.committed = newInMemIndexEdits() - iea.commitEAId = iea.accumulatorIdx - iea.accumulatorIdx++ - iea.commitEA = edits.NewAsyncSortedEditsWithDefaults(iea.vr) - iea.committedEaIds = set.NewUint64Set(nil) - iea.uncommittedEaIds = set.NewUint64Set(nil) - - return updatedMap, nil -} diff --git a/go/libraries/doltcore/table/editor/index_editor.go b/go/libraries/doltcore/table/editor/index_editor.go deleted file mode 100644 index 3a6960b20d..0000000000 --- a/go/libraries/doltcore/table/editor/index_editor.go +++ /dev/null @@ -1,254 +0,0 @@ -// Copyright 2020-2021 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package editor - -import ( - "context" - "fmt" - "sync" - - "github.com/dolthub/go-mysql-server/sql" - - "github.com/dolthub/dolt/go/libraries/doltcore/schema" - "github.com/dolthub/dolt/go/store/types" -) - -const rebuildIndexFlushInterval = 1 << 25 - -var _ error = (*uniqueKeyErr)(nil) - -// uniqueKeyErr is an error that is returned when a unique constraint has been violated. It contains the index key -// (which is the full row). -type uniqueKeyErr struct { - TableTuple types.Tuple - IndexTuple types.Tuple - IndexName string -} - -// Error implements the error interface. -func (u *uniqueKeyErr) Error() string { - keyStr, _ := formatKey(context.Background(), u.IndexTuple) - return fmt.Sprintf("duplicate unique key given: %s", keyStr) -} - -// NOTE: Regarding partial keys and full keys. For this example, let's say that our table has a primary key W, with -// non-pk columns X, Y, and Z. You then declare an index over X and Y (in that order). In the table map containing all of -// the rows for the table, each row is composed of two tuples: the first tuple is called the key, the second tuple is -// called the value. The key is the entire primary key, which in this case is Tuple (tags are ignored for this -// example). The value is the remaining three columns: Tuple. Therefore, a row in the table map is -// Row(Tuple,Tuple). -// -// The index map containing all of the rows for the index also follows this format of key and value tuples. However, -// indexes store all of the columns in the key, and have an empty value tuple. An index key contains the indexed columns -// in the order they were defined, along with any primary keys that were not defined in the index. Thus, our example key -// looks like Tuple. We refer to this key as the full key in the index context, as with the full key you can -// construct an index row, as it's simply adding an empty tuple to the value, i.e. Row(Tuple,Tuple<>). Also with -// a full key, you can find the table row that matches this index row, as the entire primary key (just W) is in the full -// key. -// -// In both the table and index maps, keys are sorted. This means that given X and Y values for the index, we can -// construct a tuple with just those values, Tuple, and find all of the rows in the table with those two values by -// the appended primary key(s). We refer to this prefix of the full key as a partial key. It's easy to think of partial -// keys as just the indexed columns (Tuple), and the full key as the partial key along with the referring primary -// key (Tuple + W = Tuple). - -// IndexEditor takes in changes to an index map and returns the updated map if changes have been made. -// This type is thread-safe, and may be used in a multi-threaded environment. -type IndexEditor struct { - nbf *types.NomsBinFormat - - idxSch schema.Schema - tblSch schema.Schema - idx schema.Index - iea IndexEditAccumulator - stack indexOperationStack - permanentErr error // If this is set then we should always return this error as the IndexEditor is no longer usable - - // This mutex blocks on each operation, so that map reads and updates are serialized - writeMutex *sync.Mutex -} - -// InsertRow adds the given row to the index. If the row already exists and the index is unique, then an error is returned. -// Otherwise, it is a no-op. -func (ie *IndexEditor) InsertRow(ctx context.Context, key, partialKey types.Tuple, value types.Tuple) error { - return ie.InsertRowWithDupCb(ctx, key, partialKey, value, func(ctx context.Context, uke *uniqueKeyErr) error { - msg, err := formatKey(context.Background(), uke.IndexTuple) - if err != nil { - return err - } - // The only secondary index that can throw unique key errors is a unique index - return sql.NewUniqueKeyErr(msg, !ie.idx.IsUnique(), nil) - }) -} - -// InsertRowWithDupCb adds the given row to the index. If the row already exists and the -// index is unique, then a uniqueKeyErr is passed to |cb|. If |cb| returns a non-nil -// error then the insert is aborted. Otherwise, the insert proceeds. -func (ie *IndexEditor) InsertRowWithDupCb(ctx context.Context, key, partialKey types.Tuple, value types.Tuple, cb func(ctx context.Context, uke *uniqueKeyErr) error) error { - keyHash, err := key.Hash(key.Format()) - if err != nil { - return err - } - partialKeyHash, err := partialKey.Hash(partialKey.Format()) - if err != nil { - return err - } - - ie.writeMutex.Lock() - defer ie.writeMutex.Unlock() - - if ie.permanentErr != nil { - return ie.permanentErr - } - - if ie.idx.IsUnique() { - if matches, err := ie.iea.HasPartial(ctx, ie.idxSch, partialKeyHash, partialKey); err != nil { - return err - } else if len(matches) > 0 { - tableTuple, err := ie.idx.ToTableTuple(ctx, matches[0].key, ie.nbf) - if err != nil { - return err - } - cause := &uniqueKeyErr{tableTuple, partialKey, ie.idx.Name()} - err = cb(ctx, cause) - if err != nil { - return err - } - } - } else { - if rowExists, err := ie.iea.Has(ctx, keyHash, key); err != nil { - return err - } else if rowExists && value.Empty() { - ie.stack.Push(true, types.EmptyTuple(key.Format()), types.EmptyTuple(key.Format()), types.EmptyTuple(value.Format())) - return nil - } - } - - err = ie.iea.Insert(ctx, keyHash, partialKeyHash, key, value) - if err != nil { - return err - } - - ie.stack.Push(true, key, partialKey, value) - return nil -} - -// DeleteRow removes the given row from the index. -func (ie *IndexEditor) DeleteRow(ctx context.Context, key, partialKey, value types.Tuple) error { - keyHash, err := key.Hash(ie.nbf) - if err != nil { - return err - } - partialKeyHash, err := partialKey.Hash(partialKey.Format()) - if err != nil { - return err - } - - ie.writeMutex.Lock() - defer ie.writeMutex.Unlock() - - if ie.permanentErr != nil { - return ie.permanentErr - } - - err = ie.iea.Delete(ctx, keyHash, partialKeyHash, key, value) - if err != nil { - return err - } - - ie.stack.Push(false, key, partialKey, value) - return nil -} - -// HasPartial returns whether the index editor has the given partial key. -func (ie *IndexEditor) HasPartial(ctx context.Context, partialKey types.Tuple) (bool, error) { - partialKeyHash, err := partialKey.Hash(partialKey.Format()) - if err != nil { - return false, err - } - - ie.writeMutex.Lock() - defer ie.writeMutex.Unlock() - - if ie.permanentErr != nil { - return false, ie.permanentErr - } - - tpls, err := ie.iea.HasPartial(ctx, ie.idxSch, partialKeyHash, partialKey) - if err != nil { - return false, err - } - return len(tpls) > 0, nil -} - -// Undo will cause the index editor to undo the last operation at the top of the stack. As Insert and Delete are called, -// they are added onto a limited-size stack, and Undo pops an operation off the top and undoes it. So if there was an -// Insert on a key, it will use Delete on that same key. The stack size is very small, therefore too many consecutive -// calls will cause the stack to empty. This should only be called in the event that an operation was performed that -// has failed for other reasons, such as an INSERT on the parent table failing on a separate index editor. In the event -// that Undo is called and there are no operations to undo OR the reverse operation fails (such as memory capacity -// reached), then we set a permanent error as the index editor is in an invalid state that cannot be corrected. -// -// We don't return an error here as Undo will only be called when there's an error in a different editor. We allow the -// user to handle that initial error, as ALL further calls to this IndexEditor will return the error set here. -func (ie *IndexEditor) Undo(ctx context.Context) { - if ie.permanentErr != nil { - return - } - - indexOp, ok := ie.stack.Pop() - if !ok { - panic(fmt.Sprintf("attempted to undo the last operation on index '%s' but failed due to an empty stack", ie.idx.Name())) - } - // If an operation succeeds and does not do anything, then an empty tuple is pushed onto the stack. - if indexOp.fullKey.Empty() { - return - } - - if indexOp.isInsert { - err := ie.DeleteRow(ctx, indexOp.fullKey, indexOp.partialKey, indexOp.value) - if err != nil { - ie.permanentErr = fmt.Errorf("index '%s' is in an invalid and unrecoverable state: "+ - "attempted to undo previous insertion but encountered the following error: %v", - ie.idx.Name(), err) - return - } - } else { - err := ie.InsertRow(ctx, indexOp.fullKey, indexOp.partialKey, indexOp.value) - if err != nil { - ie.permanentErr = fmt.Errorf("index '%s' is in an invalid and unrecoverable state: "+ - "attempted to undo previous deletion but encountered the following error: %v", - ie.idx.Name(), err) - return - } - } -} - -// Map returns a map based on the edits given, if any. -func (ie *IndexEditor) Map(ctx context.Context) (types.Map, error) { - ie.writeMutex.Lock() - defer ie.writeMutex.Unlock() - - if ie.permanentErr != nil { - return types.EmptyMap, ie.permanentErr - } - - return ie.iea.MaterializeEdits(ctx, ie.nbf) -} - -// Close is a no-op for an IndexEditor. -func (ie *IndexEditor) Close() error { - return ie.permanentErr -} diff --git a/go/libraries/doltcore/table/editor/index_operation_stack.go b/go/libraries/doltcore/table/editor/index_operation_stack.go deleted file mode 100644 index 5e942251c0..0000000000 --- a/go/libraries/doltcore/table/editor/index_operation_stack.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2021 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package editor - -import "github.com/dolthub/dolt/go/store/types" - -// indexOperationStack is a limited-size stack, intended for usage with the index editor and its undo functionality. -// As operations are added, the internal array is filled up. Once it is full, new operations replace the oldest ones. -// This reduces memory usage compared to a traditional stack with an unbounded size, as undo should always come -// immediately after an operation is added. -type indexOperationStack struct { - // entries has a length of 4 as an UPDATE on a table is a Delete & Insert on the index, so we double it for safety. - entries [4]indexOperation - // This is the index of the next item we are adding. Add at this index, then increment. - currentIndex uint64 - // Represents the number of items relative to the "stack size". - numOfItems uint64 -} - -// indexOperation is an operation performed by the index editor, along with the key used. -type indexOperation struct { - isInsert bool - fullKey types.Tuple - partialKey types.Tuple - value types.Tuple -} - -// Push adds the given keys to the top of the stack. -func (ios *indexOperationStack) Push(isInsert bool, fullKey, partialKey, value types.Tuple) { - ios.entries[ios.currentIndex].isInsert = isInsert - ios.entries[ios.currentIndex].fullKey = fullKey - ios.entries[ios.currentIndex].partialKey = partialKey - ios.entries[ios.currentIndex].value = value - ios.currentIndex = (ios.currentIndex + 1) % uint64(len(ios.entries)) - ios.numOfItems++ - if ios.numOfItems > uint64(len(ios.entries)) { - ios.numOfItems = uint64(len(ios.entries)) - } -} - -// Pop removes and returns the keys from the top of the stack. Returns false if the stack is empty. -func (ios *indexOperationStack) Pop() (indexOperation, bool) { - if ios.numOfItems == 0 { - return indexOperation{}, false - } - ios.numOfItems-- - ios.currentIndex = (ios.currentIndex - 1) % uint64(len(ios.entries)) - return ios.entries[ios.currentIndex], true -} diff --git a/go/libraries/doltcore/table/editor/index_operation_stack_test.go b/go/libraries/doltcore/table/editor/index_operation_stack_test.go deleted file mode 100644 index 7810240c9d..0000000000 --- a/go/libraries/doltcore/table/editor/index_operation_stack_test.go +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright 2021 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package editor - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "github.com/dolthub/dolt/go/store/types" -) - -func TestIndexOperationStack(t *testing.T) { - ios := &indexOperationStack{} - require.True(t, len(ios.entries) >= 2) // Entries should always at least have a length of 2 - - ios.Push(true, iosTuple(t, 100, 100), iosTuple(t, 100), iosTuple(t, 0)) - entry, ok := ios.Pop() - require.True(t, ok) - iosTupleComp(t, entry.fullKey, 100, 100) - iosTupleComp(t, entry.partialKey, 100) - iosTupleComp(t, entry.value, 0) - require.True(t, entry.isInsert) - _, ok = ios.Pop() - require.False(t, ok) - - for i := 0; i < len(ios.entries); i++ { - ios.Push(false, iosTuple(t, i, i), iosTuple(t, i), iosTuple(t, i*2)) - } - for i := len(ios.entries) - 1; i >= 0; i-- { - entry, ok = ios.Pop() - require.True(t, ok) - iosTupleComp(t, entry.fullKey, i, i) - iosTupleComp(t, entry.partialKey, i) - iosTupleComp(t, entry.partialKey, i*2) - require.False(t, entry.isInsert) - } - _, ok = ios.Pop() - require.False(t, ok) - - for i := 0; i < (len(ios.entries)*2)+1; i++ { - ios.Push(true, iosTuple(t, i, i), iosTuple(t, i), iosTuple(t, i*2)) - } - for i := len(ios.entries) - 1; i >= 0; i-- { - entry, ok = ios.Pop() - require.True(t, ok) - val := ((len(ios.entries) * 2) + 1) - i - iosTupleComp(t, entry.fullKey, val, val) - iosTupleComp(t, entry.partialKey, val) - iosTupleComp(t, entry.value, val*2) - require.True(t, entry.isInsert) - } - _, ok = ios.Pop() - require.False(t, ok) -} - -func iosTuple(t *testing.T, vals ...int) types.Tuple { - typeVals := make([]types.Value, len(vals)) - for i, val := range vals { - typeVals[i] = types.Int(val) - } - tpl, err := types.NewTuple(types.Format_Default, typeVals...) - if err != nil { - require.NoError(t, err) - } - return tpl -} - -func iosTupleComp(t *testing.T, tpl types.Tuple, vals ...int) bool { - if tpl.Len() != uint64(len(vals)) { - return false - } - iter, err := tpl.Iterator() - require.NoError(t, err) - var i uint64 - var val types.Value - for i, val, err = iter.Next(); i < uint64(len(vals)) && err == nil; i, val, err = iter.Next() { - if !types.Int(vals[i]).Equals(val) { - return false - } - } - require.NoError(t, err) - return true -} diff --git a/go/libraries/doltcore/table/typed/noms/doc.go b/go/libraries/doltcore/table/typed/noms/doc.go deleted file mode 100644 index bcc3bbaa85..0000000000 --- a/go/libraries/doltcore/table/typed/noms/doc.go +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2019 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package nbf provides TableReadCloser and TableWriteCloser implementations for working with dolt tables in noms. -package noms diff --git a/go/libraries/doltcore/table/typed/noms/range_reader.go b/go/libraries/doltcore/table/typed/noms/range_reader.go deleted file mode 100644 index f6dccb0e83..0000000000 --- a/go/libraries/doltcore/table/typed/noms/range_reader.go +++ /dev/null @@ -1,333 +0,0 @@ -// Copyright 2020 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package noms - -import ( - "context" - "errors" - "fmt" - "io" - - "github.com/dolthub/dolt/go/libraries/doltcore/row" - "github.com/dolthub/dolt/go/libraries/doltcore/schema" - "github.com/dolthub/dolt/go/store/types" -) - -// InRangeCheck evaluates tuples to determine whether they are valid and/or should be skipped. -type InRangeCheck interface { - // Check is a call made as the reader reads through values to check that the next value either being read is valid - // and whether it should be skipped or returned. - Check(ctx context.Context, vr types.ValueReader, tuple types.Tuple) (valid bool, skip bool, err error) -} - -// InRangeCheckAlways will always return that the given tuple is valid and not to be skipped. -type InRangeCheckAlways struct{} - -func (InRangeCheckAlways) Check(context.Context, types.ValueReader, types.Tuple) (valid bool, skip bool, err error) { - return true, false, nil -} - -func (InRangeCheckAlways) String() string { - return "Always" -} - -// InRangeCheckNever will always return that the given tuple is not valid. -type InRangeCheckNever struct{} - -func (InRangeCheckNever) Check(context.Context, types.ValueReader, types.Tuple) (valid bool, skip bool, err error) { - return false, false, nil -} - -func (InRangeCheckNever) String() string { - return "Never" -} - -// InRangeCheckPartial will check if the given tuple contains the aliased tuple as a partial key. -type InRangeCheckPartial types.Tuple - -func (ircp InRangeCheckPartial) Check(_ context.Context, vr types.ValueReader, t types.Tuple) (valid bool, skip bool, err error) { - return t.StartsWith(types.Tuple(ircp)), false, nil -} - -func (ircp InRangeCheckPartial) String() string { - return fmt.Sprintf("StartsWith(%v)", types.Tuple(ircp).HumanReadableString()) -} - -// ReadRange represents a range of values to be read -type ReadRange struct { - // Start is a Dolt map key which is the starting point (or ending point if Reverse is true) - Start types.Tuple - // Inclusive says whether the Start key should be included in the range. - Inclusive bool - // Reverse says if the range should be read in reverse (from high to low) instead of the default (low to high) - Reverse bool - // Check is a callb made as the reader reads through values to check that the next value being read is in the range. - Check InRangeCheck -} - -func (rr *ReadRange) String() string { - return fmt.Sprintf("ReadRange[Start: %v, Inclusive: %t, Reverse %t, Check: %v]", rr.Start.HumanReadableString(), rr.Inclusive, rr.Reverse, rr.Check) -} - -// NewRangeEndingAt creates a range with a starting key which will be iterated in reverse -func NewRangeEndingAt(key types.Tuple, inRangeCheck InRangeCheck) *ReadRange { - return &ReadRange{ - Start: key, - Inclusive: true, - Reverse: true, - Check: inRangeCheck, - } -} - -// NewRangeEndingBefore creates a range starting before the provided key iterating in reverse -func NewRangeEndingBefore(key types.Tuple, inRangeCheck InRangeCheck) *ReadRange { - return &ReadRange{ - Start: key, - Inclusive: false, - Reverse: true, - Check: inRangeCheck, - } -} - -// NewRangeStartingAt creates a range with a starting key -func NewRangeStartingAt(key types.Tuple, inRangeCheck InRangeCheck) *ReadRange { - return &ReadRange{ - Start: key, - Inclusive: true, - Reverse: false, - Check: inRangeCheck, - } -} - -// NewRangeStartingAfter creates a range starting after the provided key -func NewRangeStartingAfter(key types.Tuple, inRangeCheck InRangeCheck) *ReadRange { - return &ReadRange{ - Start: key, - Inclusive: false, - Reverse: false, - Check: inRangeCheck, - } -} - -// NomsRangeReader reads values in one or more ranges from a map -type NomsRangeReader struct { - vr types.ValueReader - sch schema.Schema - m types.Map - ranges []*ReadRange - idx int - itr types.MapIterator - currCheck InRangeCheck - cardCounter *CardinalityCounter -} - -// NewNomsRangeReader creates a NomsRangeReader -func NewNomsRangeReader(vr types.ValueReader, sch schema.Schema, m types.Map, ranges []*ReadRange) *NomsRangeReader { - return &NomsRangeReader{ - vr, - sch, - m, - ranges, - 0, - nil, - nil, - NewCardinalityCounter(), - } -} - -// GetSchema gets the schema of the rows being read. -func (nrr *NomsRangeReader) GetSchema() schema.Schema { - return nrr.sch -} - -// ReadRow reads a row from a table. If there is a bad row the returned error will be non nil, and calling -// IsBadRow(err) will be return true. This is a potentially non-fatal error and callers can decide if they want to -// continue on a bad row, or fail. -func (nrr *NomsRangeReader) ReadRow(ctx context.Context) (row.Row, error) { - k, v, err := nrr.ReadKV(ctx) - - if err != nil { - return nil, err - } - - return row.FromNoms(nrr.sch, k, v) -} - -func (nrr *NomsRangeReader) ReadKey(ctx context.Context) (types.Tuple, error) { - k, _, err := nrr.ReadKV(ctx) - - return k, err -} - -func (nrr *NomsRangeReader) ReadKV(ctx context.Context) (types.Tuple, types.Tuple, error) { - var err error - var k types.Tuple - var v types.Tuple - for nrr.itr != nil || nrr.idx < len(nrr.ranges) { - if !nrr.cardCounter.empty() { - if nrr.cardCounter.done() { - nrr.cardCounter.reset() - } else { - return nrr.cardCounter.next() - } - } - - if nrr.itr == nil { - r := nrr.ranges[nrr.idx] - nrr.idx++ - - if r.Reverse { - nrr.itr, err = nrr.m.IteratorBackFrom(ctx, r.Start) - } else { - nrr.itr, err = nrr.m.IteratorFrom(ctx, r.Start) - } - if err != nil { - return types.Tuple{}, types.Tuple{}, err - } - - nrr.currCheck = r.Check - - k, v, err = nrr.itr.NextTuple(ctx) - - if err == nil && !r.Inclusive { - var res int - res, err = r.Start.Compare(ctx, nrr.vr.Format(), k) - if err == nil && res == 0 { - k, v, err = nrr.itr.NextTuple(ctx) - } - } - } else { - k, v, err = nrr.itr.NextTuple(ctx) - } - - if err != nil && err != io.EOF { - return types.Tuple{}, types.Tuple{}, err - } - - if err != io.EOF { - valid, skip, err := nrr.currCheck.Check(ctx, nrr.vr, k) - if err != nil { - return types.Tuple{}, types.Tuple{}, err - } - - if valid { - if skip { - continue - } - if !v.Empty() { - nrr.cardCounter.updateWithKV(k, v) - if !nrr.cardCounter.empty() && !nrr.cardCounter.done() { - return nrr.cardCounter.next() - } - } - return k, v, nil - } - } - - nrr.itr = nil - nrr.currCheck = nil - } - - return types.Tuple{}, types.Tuple{}, io.EOF -} - -// VerifySchema checks that the incoming schema matches the schema from the existing table -func (nrr *NomsRangeReader) VerifySchema(outSch schema.Schema) (bool, error) { - return schema.VerifyInSchema(nrr.sch, outSch) -} - -// Close should release resources being held -func (nrr *NomsRangeReader) Close(ctx context.Context) error { - return nil -} - -type CardinalityCounter struct { - key *types.Tuple - value *types.Tuple - card int - idx int -} - -func NewCardinalityCounter() *CardinalityCounter { - return &CardinalityCounter{ - nil, - nil, - -1, - -1, - } -} - -func (cc *CardinalityCounter) updateWithKV(k, v types.Tuple) error { - if !v.Empty() { - cardTagVal, err := v.Get(0) - if err != nil { - return err - } - cardTag, ok := cardTagVal.(types.Uint) - if !ok { - return errors.New("index cardinality invalid tag type") - } - - if uint64(cardTag) != schema.KeylessRowCardinalityTag { - return errors.New("index cardinality tag invalid") - } - - cardVal, err := v.Get(1) - if err != nil { - return err - } - card, ok := cardVal.(types.Uint) - if !ok { - return errors.New("index cardinality value invalid type") - } - if int(card) > 1 { - cc.card = int(card) - cc.idx = 0 - cc.key = &k - cc.value = &v - return nil - } else { - cc.card = -1 - cc.idx = -1 - cc.key = nil - cc.value = nil - } - } - return nil -} - -func (cc *CardinalityCounter) empty() bool { - return cc.key == nil || cc.value == nil -} - -func (cc *CardinalityCounter) done() bool { - return cc.card < 1 || cc.idx >= cc.card -} - -func (cc *CardinalityCounter) next() (types.Tuple, types.Tuple, error) { - if cc.key == nil || cc.value == nil { - return types.Tuple{}, types.Tuple{}, errors.New("cannot increment empty cardinality counter") - } - cc.idx++ - return *cc.key, *cc.value, nil - -} - -func (cc *CardinalityCounter) reset() { - cc.card = -1 - cc.idx = -1 - cc.key = nil - cc.value = nil -} diff --git a/go/libraries/doltcore/table/typed/noms/range_reader_test.go b/go/libraries/doltcore/table/typed/noms/range_reader_test.go deleted file mode 100644 index 1b09dbcc54..0000000000 --- a/go/libraries/doltcore/table/typed/noms/range_reader_test.go +++ /dev/null @@ -1,204 +0,0 @@ -// Copyright 2019 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package noms - -import ( - "context" - "io" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/dolthub/dolt/go/libraries/doltcore/schema" - "github.com/dolthub/dolt/go/store/chunks" - "github.com/dolthub/dolt/go/store/types" -) - -var rangeReaderTests = []struct { - name string - ranges []*ReadRange - expectKeys []int64 -}{ - { - "test range ending at", - []*ReadRange{NewRangeEndingAt(mustTuple(10), greaterThanCheck(2))}, - []int64{10, 8, 6, 4}, - }, - { - "test range ending before", - []*ReadRange{NewRangeEndingBefore(mustTuple(10), greaterThanCheck(2))}, - []int64{8, 6, 4}, - }, - { - "test range starting at", - []*ReadRange{NewRangeStartingAt(mustTuple(10), lessThanCheck(20))}, - []int64{10, 12, 14, 16, 18}, - }, - { - "test range starting after", - []*ReadRange{NewRangeStartingAfter(mustTuple(10), lessThanCheck(20))}, - []int64{12, 14, 16, 18}, - }, - { - "test range iterating to the end", - []*ReadRange{NewRangeStartingAt(mustTuple(100), lessThanCheck(200))}, - []int64{100}, - }, - { - "test multiple ranges", - []*ReadRange{ - NewRangeEndingBefore(mustTuple(10), greaterThanCheck(2)), - NewRangeStartingAt(mustTuple(10), lessThanCheck(20)), - }, - []int64{8, 6, 4, 10, 12, 14, 16, 18}, - }, - { - "test empty range starting after", - []*ReadRange{NewRangeStartingAfter(mustTuple(100), lessThanCheck(200))}, - []int64(nil), - }, - { - "test empty range starting at", - []*ReadRange{NewRangeStartingAt(mustTuple(101), lessThanCheck(200))}, - []int64(nil), - }, - { - "test empty range ending before", - []*ReadRange{NewRangeEndingBefore(mustTuple(0), greaterThanCheck(-100))}, - []int64(nil), - }, - { - "test empty range ending at", - []*ReadRange{NewRangeEndingAt(mustTuple(-1), greaterThanCheck(-100))}, - []int64(nil), - }, -} - -func mustTuple(id int64) types.Tuple { - t, err := types.NewTuple(types.Format_Default, types.Uint(pkTag), types.Int(id)) - - if err != nil { - panic(err) - } - - return t -} - -func TestRangeReader(t *testing.T) { - ctx := context.Background() - colColl := schema.NewColCollection( - schema.NewColumn("id", pkTag, types.IntKind, true), - schema.NewColumn("val", valTag, types.IntKind, false)) - - sch, err := schema.SchemaFromCols(colColl) - require.NoError(t, err) - - storage := &chunks.MemoryStorage{} - vrw := types.NewValueStore(storage.NewView()) - m, err := types.NewMap(ctx, vrw) - assert.NoError(t, err) - - me := m.Edit() - for i := 0; i <= 100; i += 2 { - k, err := types.NewTuple(vrw.Format(), types.Uint(pkTag), types.Int(i)) - require.NoError(t, err) - - v, err := types.NewTuple(vrw.Format(), types.Uint(valTag), types.Int(100-i)) - require.NoError(t, err) - - me.Set(k, v) - } - - m, err = me.Map(ctx) - assert.NoError(t, err) - - for _, test := range rangeReaderTests { - t.Run(test.name, func(t *testing.T) { - ctx := context.Background() - rd := NewNomsRangeReader(vrw, sch, m, test.ranges) - - var keys []int64 - for { - r, err := rd.ReadRow(ctx) - - if err == io.EOF { - break - } - - assert.NoError(t, err) - col0, ok := r.GetColVal(0) - assert.True(t, ok) - - keys = append(keys, int64(col0.(types.Int))) - } - - err = rd.Close(ctx) - assert.NoError(t, err) - - assert.Equal(t, test.expectKeys, keys) - }) - } -} - -func TestRangeReaderOnEmptyMap(t *testing.T) { - ctx := context.Background() - colColl := schema.NewColCollection( - schema.NewColumn("id", pkTag, types.IntKind, true), - schema.NewColumn("val", valTag, types.IntKind, false)) - - sch, err := schema.SchemaFromCols(colColl) - require.NoError(t, err) - - storage := &chunks.MemoryStorage{} - vrw := types.NewValueStore(storage.NewView()) - m, err := types.NewMap(ctx, vrw) - assert.NoError(t, err) - - for _, test := range rangeReaderTests { - t.Run(test.name, func(t *testing.T) { - ctx := context.Background() - rd := NewNomsRangeReader(vrw, sch, m, test.ranges) - - r, err := rd.ReadRow(ctx) - assert.Equal(t, io.EOF, err) - assert.Nil(t, r) - }) - } -} - -type greaterThanCheck int64 - -func (n greaterThanCheck) Check(ctx context.Context, _ types.ValueReader, k types.Tuple) (valid bool, skip bool, err error) { - col0, err := k.Get(1) - - if err != nil { - panic(err) - } - - return int64(col0.(types.Int)) > int64(n), false, nil -} - -type lessThanCheck int64 - -func (n lessThanCheck) Check(ctx context.Context, _ types.ValueReader, k types.Tuple) (valid bool, skip bool, err error) { - col0, err := k.Get(1) - - if err != nil { - panic(err) - } - - return int64(col0.(types.Int)) < int64(n), false, nil -} diff --git a/go/libraries/doltcore/table/typed/noms/reader.go b/go/libraries/doltcore/table/typed/noms/reader.go deleted file mode 100644 index 674cc76948..0000000000 --- a/go/libraries/doltcore/table/typed/noms/reader.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2019 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package noms - -import ( - "context" - "io" - - "github.com/dolthub/dolt/go/libraries/doltcore/row" - "github.com/dolthub/dolt/go/libraries/doltcore/schema" - "github.com/dolthub/dolt/go/store/types" -) - -type StatsCB func(stats types.AppliedEditStats) - -// NomsMapReader is a TableReader that reads rows from a noms table which is stored in a types.Map where the key is -// a types.Value and the value is a types.Tuple of field values. -type NomsMapReader struct { - sch schema.Schema - itr types.MapIterator -} - -// NewNomsMapReader creates a NomsMapReader for a given noms types.Map -func NewNomsMapReader(ctx context.Context, m types.Map, sch schema.Schema) (*NomsMapReader, error) { - itr, err := m.Iterator(ctx) - - if err != nil { - return nil, err - } - - return &NomsMapReader{sch, itr}, nil -} - -// GetSchema gets the schema of the rows that this reader will return -func (nmr *NomsMapReader) GetSchema() schema.Schema { - return nmr.sch -} - -// ReadRow reads a row from a table. If there is a bad row the returned error will be non nil, and callin IsBadRow(err) -// will be return true. This is a potentially non-fatal error and callers can decide if they want to continue on a bad row, or fail. -func (nmr *NomsMapReader) ReadRow(ctx context.Context) (row.Row, error) { - key, val, err := nmr.itr.Next(ctx) - - if err != nil { - return nil, err - } else if key == nil { - return nil, io.EOF - } - - return row.FromNoms(nmr.sch, key.(types.Tuple), val.(types.Tuple)) -} - -// Close should release resources being held -func (nmr *NomsMapReader) Close(ctx context.Context) error { - nmr.itr = nil - return nil -} - -// VerifySchema checks that the incoming schema matches the schema from the existing table -func (nmr *NomsMapReader) VerifySchema(outSch schema.Schema) (bool, error) { - return schema.VerifyInSchema(nmr.sch, outSch) -} diff --git a/go/libraries/doltcore/table/typed/noms/reader_for_keys.go b/go/libraries/doltcore/table/typed/noms/reader_for_keys.go deleted file mode 100644 index f4ec11c550..0000000000 --- a/go/libraries/doltcore/table/typed/noms/reader_for_keys.go +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright 2020 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package noms - -import ( - "context" - "io" - - "github.com/dolthub/dolt/go/libraries/doltcore/row" - "github.com/dolthub/dolt/go/libraries/doltcore/schema" - "github.com/dolthub/dolt/go/store/types" -) - -// KeyIterator is an interface for iterating through a collection of keys -type KeyIterator interface { - // Next returns the next key in the collection. When all keys are exhausted nil, io.EOF must be returned. - Next() (types.Value, error) -} - -// SliceOfKeysIterator is a KeyIterator implementation backed by a slice of keys which are iterated in order -type SliceOfKeysIterator struct { - keys []types.Tuple - idx int -} - -// Next returns the next key in the slice. When all keys are exhausted nil, io.EOF is be returned. -func (sokItr *SliceOfKeysIterator) Next() (types.Value, error) { - if sokItr.idx < len(sokItr.keys) { - k := sokItr.keys[sokItr.idx] - sokItr.idx++ - - return k, nil - } - - return nil, io.EOF -} - -// NomsMapReaderForKeys implements TableReadCloser -type NomsMapReaderForKeys struct { - sch schema.Schema - m types.Map - keyItr KeyIterator -} - -// NewNomsMapReaderForKeys creates a NomsMapReaderForKeys for a given noms types.Map, and a list of keys -func NewNomsMapReaderForKeys(m types.Map, sch schema.Schema, keys []types.Tuple) *NomsMapReaderForKeys { - return NewNomsMapReaderForKeyItr(m, sch, &SliceOfKeysIterator{keys, 0}) -} - -// NewNomsMapReaderForKeyItr creates a NomsMapReaderForKeys for a given noms types.Map, and a list of keys -func NewNomsMapReaderForKeyItr(m types.Map, sch schema.Schema, keyItr KeyIterator) *NomsMapReaderForKeys { - return &NomsMapReaderForKeys{sch, m, keyItr} -} - -// GetSchema gets the schema of the rows being read. -func (nmr *NomsMapReaderForKeys) GetSchema() schema.Schema { - return nmr.sch -} - -// ReadRow reads a row from a table. If there is a bad row the returned error will be non nil, and calling -// IsBadRow(err) will be return true. This is a potentially non-fatal error and callers can decide if they want to -// continue on a bad row, or fail. -func (nmr *NomsMapReaderForKeys) ReadRow(ctx context.Context) (row.Row, error) { - var key types.Value - var value types.Value - var err error - for value == nil { - key, err = nmr.keyItr.Next() - - if err != nil { - return nil, err - } - - v, ok, err := nmr.m.MaybeGet(ctx, key) - - if err != nil { - return nil, err - } - - if ok { - value = v - } - } - - return row.FromNoms(nmr.sch, key.(types.Tuple), value.(types.Tuple)) -} - -// VerifySchema checks that the incoming schema matches the schema from the existing table -func (nmr *NomsMapReaderForKeys) VerifySchema(outSch schema.Schema) (bool, error) { - return schema.VerifyInSchema(nmr.sch, outSch) -} - -// Close should release resources being held -func (nmr *NomsMapReaderForKeys) Close(ctx context.Context) error { - return nil -} diff --git a/go/libraries/doltcore/table/typed/noms/reader_for_keys_test.go b/go/libraries/doltcore/table/typed/noms/reader_for_keys_test.go deleted file mode 100644 index 85e1f687a7..0000000000 --- a/go/libraries/doltcore/table/typed/noms/reader_for_keys_test.go +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright 2020 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package noms - -import ( - "context" - "io" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/dolthub/dolt/go/libraries/doltcore/row" - "github.com/dolthub/dolt/go/libraries/doltcore/schema" - "github.com/dolthub/dolt/go/store/chunks" - "github.com/dolthub/dolt/go/store/types" -) - -const ( - pkTag uint64 = iota - valTag -) - -func TestReaderForKeys(t *testing.T) { - ctx := context.Background() - colColl := schema.NewColCollection( - schema.NewColumn("id", pkTag, types.IntKind, true), - schema.NewColumn("val", valTag, types.IntKind, false)) - - sch, err := schema.SchemaFromCols(colColl) - require.NoError(t, err) - - storage := &chunks.MemoryStorage{} - vrw := types.NewValueStore(storage.NewView()) - m, err := types.NewMap(ctx, vrw) - assert.NoError(t, err) - - me := m.Edit() - for i := 0; i <= 100; i += 2 { - k, err := types.NewTuple(vrw.Format(), types.Uint(pkTag), types.Int(i)) - require.NoError(t, err) - - v, err := types.NewTuple(vrw.Format(), types.Uint(valTag), types.Int(100-i)) - require.NoError(t, err) - - me.Set(k, v) - } - - m, err = me.Map(ctx) - assert.NoError(t, err) - - tests := []struct { - name string - keys []int - expected []int - }{ - { - name: "tens", - keys: []int{10, 20, 30, 40, 50, 60, 70, 80, 90}, - expected: []int{10, 20, 30, 40, 50, 60, 70, 80, 90}, - }, - { - name: "fives", - keys: []int{5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95}, - expected: []int{10, 20, 30, 40, 50, 60, 70, 80, 90}, - }, - { - name: "empty", - keys: []int{}, - expected: []int{}, - }, - { - name: "no keys that are in the map", - keys: []int{-5, -3, -1, 1, 3, 5, 102, 104, 106}, - expected: []int{}, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - ctx := context.Background() - rd := NewNomsMapReaderForKeys(m, sch, intKeysToTupleKeys(t, vrw.Format(), test.keys)) - - var rows []row.Row - for { - r, err := rd.ReadRow(ctx) - - if err == io.EOF { - break - } - - assert.NoError(t, err) - rows = append(rows, r) - } - - testAgainstExpected(t, rows, test.expected) - rd.Close(ctx) - }) - } -} - -func intKeysToTupleKeys(t *testing.T, nbf *types.NomsBinFormat, keys []int) []types.Tuple { - tupleKeys := make([]types.Tuple, len(keys)) - - for i, key := range keys { - tuple, err := types.NewTuple(nbf, types.Uint(pkTag), types.Int(key)) - require.NoError(t, err) - - tupleKeys[i] = tuple - } - - return tupleKeys -} - -func testAgainstExpected(t *testing.T, rows []row.Row, expected []int) { - assert.Equal(t, len(expected), len(rows)) - for i, r := range rows { - k, ok := r.GetColVal(pkTag) - require.True(t, ok) - v, ok := r.GetColVal(valTag) - require.True(t, ok) - - kn := int(k.(types.Int)) - vn := int(v.(types.Int)) - - expectedK := expected[i] - expectedV := 100 - expectedK - - assert.Equal(t, expectedK, kn) - assert.Equal(t, expectedV, vn) - } -} diff --git a/go/libraries/doltcore/table/untyped/xlsx/marshaling.go b/go/libraries/doltcore/table/untyped/xlsx/marshaling.go index 3f9d03ab20..f8b151aa94 100644 --- a/go/libraries/doltcore/table/untyped/xlsx/marshaling.go +++ b/go/libraries/doltcore/table/untyped/xlsx/marshaling.go @@ -27,21 +27,6 @@ import ( var ErrTableNameMatchSheetName = errors.New("table name must match excel sheet name.") -func UnmarshalFromXLSX(path string) ([][][]string, error) { - data, err := openFile(path) - - if err != nil { - return nil, err - } - - dataSlice, err := data.ToSlice() - if err != nil { - return nil, err - } - - return dataSlice, nil -} - func openFile(path string) (*xlsx.File, error) { data, err := xlsx.OpenFile(path) diff --git a/go/libraries/doltcore/table/untyped/xlsx/reader.go b/go/libraries/doltcore/table/untyped/xlsx/reader.go index 1460a47876..16dab2815e 100644 --- a/go/libraries/doltcore/table/untyped/xlsx/reader.go +++ b/go/libraries/doltcore/table/untyped/xlsx/reader.go @@ -41,35 +41,6 @@ type XLSXReader struct { vrw types.ValueReadWriter } -func OpenXLSXReaderFromBinary(ctx context.Context, vrw types.ValueReadWriter, r io.ReadCloser, info *XLSXFileInfo) (*XLSXReader, error) { - br := bufio.NewReaderSize(r, ReadBufSize) - - contents, err := io.ReadAll(r) - if err != nil { - return nil, err - } - - colStrs, err := getColHeadersFromBinary(contents, info.SheetName) - if err != nil { - return nil, err - } - - data, err := getXlsxRowsFromBinary(contents, info.SheetName) - if err != nil { - return nil, err - } - - _, sch := untyped.NewUntypedSchema(colStrs...) - - decodedRows, err := decodeXLSXRows(data, sch) - if err != nil { - r.Close() - return nil, err - } - - return &XLSXReader{r, br, info, sch, 0, decodedRows, vrw}, nil -} - func OpenXLSXReader(ctx context.Context, vrw types.ValueReadWriter, path string, fs filesys.ReadableFS, info *XLSXFileInfo) (*XLSXReader, error) { r, err := fs.OpenForRead(path) @@ -110,16 +81,6 @@ func getColHeadersFromPath(path string, sheetName string) ([]string, error) { return colHeaders, nil } -func getColHeadersFromBinary(content []byte, sheetName string) ([]string, error) { - data, err := getXlsxRowsFromBinary(content, sheetName) - if err != nil { - return nil, err - } - - colHeaders := data[0][0] - return colHeaders, nil -} - // GetSchema gets the schema of the rows that this reader will return func (xlsxr *XLSXReader) GetSchema() schema.Schema { return xlsxr.sch diff --git a/go/libraries/utils/buffer/buffer.go b/go/libraries/utils/buffer/buffer.go deleted file mode 100644 index 9fb3216016..0000000000 --- a/go/libraries/utils/buffer/buffer.go +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright 2021 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package buffer - -import ( - "io" - - "github.com/dolthub/dolt/go/libraries/utils/iohelp" -) - -type DynamicBuffer struct { - blocks [][]byte - blockSize int -} - -func New(blockSize int) *DynamicBuffer { - return &DynamicBuffer{blockSize: blockSize} -} - -func (buf *DynamicBuffer) Append(bytes []byte) { - blockIdx := len(buf.blocks) - 1 - - var space int - var pos int - if blockIdx >= 0 { - currBlock := buf.blocks[blockIdx] - pos = len(currBlock) - space = cap(currBlock) - pos - } - - for len(bytes) > 0 { - if space == 0 { - for len(bytes) >= buf.blockSize { - buf.blocks = append(buf.blocks, bytes[:buf.blockSize]) - bytes = bytes[buf.blockSize:] - blockIdx++ - } - - if len(bytes) == 0 { - return - } - - buf.blocks = append(buf.blocks, make([]byte, 0, buf.blockSize)) - pos = 0 - space = buf.blockSize - blockIdx++ - } - - n := len(bytes) - if n > space { - n = space - } - - buf.blocks[blockIdx] = buf.blocks[blockIdx][:pos+n] - copy(buf.blocks[blockIdx][pos:], bytes[:n]) - bytes = bytes[n:] - space -= n - pos += n - } -} - -func (buf *DynamicBuffer) Close() *BufferIterator { - itr := &BufferIterator{blocks: buf.blocks} - buf.blocks = nil - - return itr -} - -type BufferIterator struct { - blocks [][]byte - i int -} - -func (itr *BufferIterator) Next() ([]byte, error) { - if itr.i >= len(itr.blocks) { - return nil, io.EOF - } - next := itr.blocks[itr.i] - itr.i++ - - return next, nil -} - -func (itr *BufferIterator) NumBlocks() int { - return len(itr.blocks) -} - -func (itr *BufferIterator) FlushTo(wr io.Writer) error { - for { - data, err := itr.Next() - - if err == io.EOF { - return nil - } else if err != nil { - return err - } - - err = iohelp.WriteAll(wr, data) - - if err != nil { - return err - } - } -} - -func (itr *BufferIterator) AsReader() io.Reader { - return &bufferIteratorReader{ - itr: itr, - } -} - -type bufferIteratorReader struct { - itr *BufferIterator - currBuff []byte -} - -func (b *bufferIteratorReader) Read(dest []byte) (n int, err error) { - if len(b.currBuff) == 0 { - b.currBuff, err = b.itr.Next() - - if err != nil { - return 0, err - } - } - - destSize := len(dest) - toCopy := b.currBuff - if len(b.currBuff) > destSize { - toCopy = b.currBuff[:destSize] - } - - n = copy(dest, toCopy) - b.currBuff = b.currBuff[n:] - - return n, err -} diff --git a/go/libraries/utils/buffer/buffer_test.go b/go/libraries/utils/buffer/buffer_test.go deleted file mode 100644 index 003501da98..0000000000 --- a/go/libraries/utils/buffer/buffer_test.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2021 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package buffer - -import ( - "bytes" - "math/rand" - "strconv" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestDynamicBuffer(t *testing.T) { - const blockSize = 53 - - rand := rand.New(rand.NewSource(time.Now().UnixNano())) - for i := 0; i < 100; i++ { - n := 1000 + rand.Int63()%10000 - t.Run(strconv.FormatInt(n, 10), func(t *testing.T) { - data := make([]byte, n) - read, err := rand.Read(data) - require.NoError(t, err) - require.Equal(t, int(n), read) - - buf := New(blockSize) - buf.Append(data) - itr := buf.Close() - - reassembled := bytes.NewBuffer(nil) - err = itr.FlushTo(reassembled) - require.NoError(t, err) - require.Equal(t, data, reassembled.Bytes()) - }) - } -} diff --git a/go/store/blobstore/oci.go b/go/store/blobstore/oci.go index c9a030d0a2..6e0749fedc 100644 --- a/go/store/blobstore/oci.go +++ b/go/store/blobstore/oci.go @@ -22,7 +22,6 @@ import ( "io" "math" "net/http" - "os" "path" "github.com/oracle/oci-go-sdk/v65/common" @@ -44,23 +43,6 @@ type toUpload struct { type uploadFunc func(ctx context.Context, objectName, uploadID string, partNumber int, contentLength int64, reader io.Reader) (objectstorage.CommitMultipartUploadPartDetails, error) -type tempLocalObject struct { - f *os.File - path string -} - -var _ io.ReadCloser = &tempLocalObject{} - -func (t *tempLocalObject) Read(p []byte) (int, error) { - return t.f.Read(p) -} - -func (t *tempLocalObject) Close() error { - err := t.f.Close() - os.Remove(t.path) - return err -} - // OCIBlobstore provides an OCI implementation of the Blobstore interface type OCIBlobstore struct { provider common.ConfigurationProvider diff --git a/go/store/config/resolver.go b/go/store/config/resolver.go index 7a733e862e..914cbdfd07 100644 --- a/go/store/config/resolver.go +++ b/go/store/config/resolver.go @@ -26,7 +26,6 @@ import ( "fmt" "strings" - "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/datas" "github.com/dolthub/dolt/go/store/prolly/tree" "github.com/dolthub/dolt/go/store/spec" @@ -140,16 +139,6 @@ func (r *Resolver) GetDatabase(ctx context.Context, str string) (datas.Database, return sp.GetDatabase(ctx), sp.GetVRW(ctx), sp.GetNodeStore(ctx), nil } -// Resolve string to a chunkstore. Like ResolveDatabase, but returns the underlying ChunkStore -func (r *Resolver) GetChunkStore(ctx context.Context, str string) (chunks.ChunkStore, error) { - dbc := r.DbConfigForDbSpec(str) - sp, err := spec.ForDatabaseOpts(r.verbose(ctx, str, dbc.Url), specOptsForConfig(r.config, dbc)) - if err != nil { - return nil, err - } - return sp.NewChunkStore(ctx), nil -} - // Resolve string to a dataset. If a config is present, // - if no db prefix is present, assume the default db // - if the db prefix is an alias, replace it @@ -178,9 +167,3 @@ func (r *Resolver) GetPath(ctx context.Context, str string) (datas.Database, typ return sp.GetDatabase(ctx), sp.GetVRW(ctx), value, nil } - -// GetDatabaseSpecForPath returns the database and a VRW for the path given, but does not attempt to load a value -func (r *Resolver) GetDatabaseSpecForPath(ctx context.Context, str string) (spec.Spec, error) { - specStr, dbc := r.ResolvePathSpecAndGetDbConfig(str) - return spec.ForPathOpts(r.verbose(ctx, str, specStr), specOptsForConfig(r.config, dbc)) -} diff --git a/go/store/datas/commit.go b/go/store/datas/commit.go index 02e71885e7..4d32b474bd 100644 --- a/go/store/datas/commit.go +++ b/go/store/datas/commit.go @@ -31,7 +31,6 @@ import ( "github.com/dolthub/dolt/go/gen/fb/serial" "github.com/dolthub/dolt/go/store/chunks" - "github.com/dolthub/dolt/go/store/d" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/nomdl" "github.com/dolthub/dolt/go/store/prolly/tree" @@ -741,13 +740,6 @@ func makeCommitStructType(metaType, parentsType, parentsListType, parentsClosure } } -func getRefElementType(t *types.Type) *types.Type { - // precondition checks - d.PanicIfFalse(t.TargetKind() == types.RefKind) - - return t.Desc.(types.CompoundDesc).ElemTypes[0] -} - func firstError(l, r error) error { if l != nil { return l diff --git a/go/store/datas/commit_test.go b/go/store/datas/commit_test.go index 5369a03ab2..a12036e508 100644 --- a/go/store/datas/commit_test.go +++ b/go/store/datas/commit_test.go @@ -47,13 +47,6 @@ func mustHead(ds Dataset) types.Value { return s } -func mustHeight(ds Dataset) uint64 { - h, ok, err := ds.MaybeHeight() - d.PanicIfError(err) - d.PanicIfFalse(ok) - return h -} - func mustHeadValue(ds Dataset) types.Value { val, ok, err := ds.MaybeHeadValue() if err != nil { @@ -70,11 +63,6 @@ func mustString(str string, err error) string { return str } -func mustStruct(st types.Struct, err error) types.Struct { - d.PanicIfError(err) - return st -} - func mustSet(s types.Set, err error) types.Set { d.PanicIfError(err) return s @@ -85,11 +73,6 @@ func mustList(l types.List, err error) types.List { return l } -func mustMap(m types.Map, err error) types.Map { - d.PanicIfError(err) - return m -} - func mustParentsClosure(t *testing.T, exists bool) func(types.Ref, bool, error) types.Ref { return func(r types.Ref, got bool, err error) types.Ref { t.Helper() @@ -114,11 +97,6 @@ func mustValue(val types.Value, err error) types.Value { return val } -func mustTuple(val types.Tuple, err error) types.Tuple { - d.PanicIfError(err) - return val -} - func TestNewCommit(t *testing.T) { assert := assert.New(t) @@ -279,20 +257,6 @@ func mustCommitToTargetHashes(vrw types.ValueReadWriter, commits ...types.Value) return ret } -// Convert list of Struct's to List -func toRefList(vrw types.ValueReadWriter, commits ...types.Struct) (types.List, error) { - l, err := types.NewList(context.Background(), vrw) - if err != nil { - return types.EmptyList, err - } - - le := l.Edit() - for _, p := range commits { - le = le.Append(mustRef(types.NewRef(p, vrw.Format()))) - } - return le.List(context.Background()) -} - func commonAncWithSetClosure(ctx context.Context, c1, c2 *Commit, vr1, vr2 types.ValueReader, ns1, ns2 tree.NodeStore) (a hash.Hash, ok bool, err error) { closure, err := NewSetCommitClosure(ctx, vr1, c1) if err != nil { diff --git a/go/store/datas/dataset.go b/go/store/datas/dataset.go index 0ac191b9a7..bebb2a1d8f 100644 --- a/go/store/datas/dataset.go +++ b/go/store/datas/dataset.go @@ -531,74 +531,10 @@ func newStatisticHead(sm types.SerialMessage, addr hash.Hash) serialStashListHea return serialStashListHead{sm, addr} } -type statisticsHead struct { - msg types.SerialMessage - addr hash.Hash -} - -var _ dsHead = statisticsHead{} - -// TypeName implements dsHead -func (s statisticsHead) TypeName() string { - return "Statistics" -} - -// Addr implements dsHead -func (s statisticsHead) Addr() hash.Hash { - return s.addr -} - -// HeadTag implements dsHead -func (s statisticsHead) HeadTag() (*TagMeta, hash.Hash, error) { - return nil, hash.Hash{}, errors.New("HeadTag called on statistic") -} - -// HeadWorkingSet implements dsHead -func (s statisticsHead) HeadWorkingSet() (*WorkingSetHead, error) { - return nil, errors.New("HeadWorkingSet called on statistic") -} - -// value implements dsHead -func (s statisticsHead) value() types.Value { - return s.msg -} - func newTupleHead(sm types.SerialMessage, addr hash.Hash) serialStashListHead { return serialStashListHead{sm, addr} } -type tupleHead struct { - msg types.SerialMessage - addr hash.Hash -} - -var _ dsHead = tupleHead{} - -// TypeName implements dsHead -func (s tupleHead) TypeName() string { - return "Tuple" -} - -// Addr implements dsHead -func (s tupleHead) Addr() hash.Hash { - return s.addr -} - -// HeadTag implements dsHead -func (s tupleHead) HeadTag() (*TagMeta, hash.Hash, error) { - return nil, hash.Hash{}, errors.New("HeadTag called on tuple") -} - -// HeadWorkingSet implements dsHead -func (s tupleHead) HeadWorkingSet() (*WorkingSetHead, error) { - return nil, errors.New("HeadWorkingSet called on statistic") -} - -// value implements dsHead -func (s tupleHead) value() types.Value { - return s.msg -} - // Dataset is a named value within a Database. Different head values may be stored in a dataset. Most commonly, this is // a commit, but other values are also supported in some cases. type Dataset struct { diff --git a/go/store/datas/stashlist.go b/go/store/datas/stashlist.go index 7418ea7d7f..3654b5af1a 100644 --- a/go/store/datas/stashlist.go +++ b/go/store/datas/stashlist.go @@ -142,18 +142,6 @@ func (s *StashList) getStashAtIdx(ctx context.Context, idx int) (hash.Hash, erro return stash.addr, nil } -// IsStashList determines whether the types.Value is a stash list object. -func IsStashList(v types.Value) (bool, error) { - if _, ok := v.(types.Struct); ok { - // this should not return true as stash is not supported for old format - return false, nil - } else if sm, ok := v.(types.SerialMessage); ok { - return serial.GetFileID(sm) == serial.StashListFileID, nil - } else { - return false, nil - } -} - // GetStashAtIdx returns hash address of stash at given index in the stash list. func GetStashAtIdx(ctx context.Context, ns tree.NodeStore, val types.Value, idx int) (hash.Hash, error) { stashList, err := getExistingStashList(ctx, ns, val) diff --git a/go/store/datas/statistics.go b/go/store/datas/statistics.go index e5de7cfe78..4bd4e443ef 100644 --- a/go/store/datas/statistics.go +++ b/go/store/datas/statistics.go @@ -49,18 +49,6 @@ func (s *Statistics) Count() (int, error) { return s.m.Count() } -// IsStatistic determines whether the types.Value is a stash list object. -func IsStatistic(v types.Value) (bool, error) { - if _, ok := v.(types.Struct); ok { - // this should not return true as stash is not supported for old format - return false, nil - } else if sm, ok := v.(types.SerialMessage); ok { - return serial.GetFileID(sm) == serial.StatisticFileID, nil - } else { - return false, nil - } -} - // LoadStatistics attempts to dereference a database's statistics Dataset into a typed Statistics object. func LoadStatistics(ctx context.Context, nbf *types.NomsBinFormat, ns tree.NodeStore, vr types.ValueReader, ds Dataset) (*Statistics, error) { if !nbf.UsesFlatbuffers() { diff --git a/go/store/datas/tuple.go b/go/store/datas/tuple.go index 947c7a0ea0..2016eb2ec3 100644 --- a/go/store/datas/tuple.go +++ b/go/store/datas/tuple.go @@ -35,18 +35,6 @@ func (t Tuple) Bytes() []byte { return t.val } -// IsTuple determines whether the types.Value is a tuple -func IsTuple(v types.Value) (bool, error) { - if _, ok := v.(types.Struct); ok { - // this should not return true as stash is not supported for old format - return false, nil - } else if sm, ok := v.(types.SerialMessage); ok { - return serial.GetFileID(sm) == serial.StatisticFileID, nil - } else { - return false, nil - } -} - // LoadTuple attempts to dereference a database's Tuple Dataset into a typed Tuple object. func LoadTuple(ctx context.Context, nbf *types.NomsBinFormat, ns tree.NodeStore, vr types.ValueReader, ds Dataset) (*Tuple, error) { if !nbf.UsesFlatbuffers() { diff --git a/go/store/nbs/aws_table_persister_test.go b/go/store/nbs/aws_table_persister_test.go index 71ceaf724e..17ceb398a6 100644 --- a/go/store/nbs/aws_table_persister_test.go +++ b/go/store/nbs/aws_table_persister_test.go @@ -34,7 +34,6 @@ import ( "github.com/stretchr/testify/require" dherrors "github.com/dolthub/dolt/go/libraries/utils/errors" - "github.com/dolthub/dolt/go/store/hash" ) func randomChunks(t *testing.T, r *rand.Rand, sz int) [][]byte { @@ -159,30 +158,6 @@ func TestAWSTablePersisterPersist(t *testing.T) { }) } -type waitOnStoreTableCache struct { - readers map[hash.Hash]io.ReaderAt - mu sync.RWMutex - storeWG sync.WaitGroup -} - -func (mtc *waitOnStoreTableCache) checkout(h hash.Hash) (io.ReaderAt, error) { - mtc.mu.RLock() - defer mtc.mu.RUnlock() - return mtc.readers[h], nil -} - -func (mtc *waitOnStoreTableCache) checkin(h hash.Hash) error { - return nil -} - -func (mtc *waitOnStoreTableCache) store(h hash.Hash, data io.Reader, size uint64) error { - defer mtc.storeWG.Done() - mtc.mu.Lock() - defer mtc.mu.Unlock() - mtc.readers[h] = data.(io.ReaderAt) - return nil -} - type failingFakeS3 struct { *fakeS3 mu sync.Mutex diff --git a/go/store/nbs/journal_index_record.go b/go/store/nbs/journal_index_record.go index 6fbb9557e4..19509465c1 100644 --- a/go/store/nbs/journal_index_record.go +++ b/go/store/nbs/journal_index_record.go @@ -18,11 +18,9 @@ import ( "bufio" "encoding/binary" "errors" - "fmt" "hash/crc32" "io" - "github.com/dolthub/dolt/go/store/d" "github.com/dolthub/dolt/go/store/hash" ) @@ -93,91 +91,6 @@ func journalIndexRecordSize(idx []byte) (recordSz uint32) { return } -func writeJournalIndexRecord(buf []byte, root hash.Hash, start, end uint64, idx []byte) (n uint32) { - //defer trace.StartRegion(ctx, "writeJournalIndexRecord").End() - - // length - l := journalIndexRecordSize(idx) - writeUint32(buf[:indexRecLenSz], l) - n += indexRecLenSz - // last root - buf[n] = byte(lastRootIndexRecTag) - n += indexRecTagSz - copy(buf[n:], root[:]) - n += indexRecLastRootSz - // start offset - buf[n] = byte(startOffsetIndexRecTag) - n += indexRecTagSz - writeUint64(buf[n:], start) - n += indexRecOffsetSz - // end offset - buf[n] = byte(endOffsetIndexRecTag) - n += indexRecTagSz - writeUint64(buf[n:], end) - n += indexRecOffsetSz - // kind - buf[n] = byte(kindIndexRecTag) - n += indexRecTagSz - buf[n] = byte(tableIndexRecKind) - n += indexRecKindSz - // payload - buf[n] = byte(payloadIndexRecTag) - n += indexRecTagSz - copy(buf[n:], idx) - n += uint32(len(idx)) - // checksum - writeUint32(buf[n:], crc(buf[:n])) - n += indexRecChecksumSz - d.PanicIfFalse(l == n) - return -} - -func readJournalIndexRecord(buf []byte) (rec indexRec, err error) { - rec.length = readUint32(buf) - buf = buf[indexRecLenSz:] - for len(buf) > indexRecChecksumSz { - tag := indexRecTag(buf[0]) - buf = buf[indexRecTagSz:] - switch tag { - case lastRootIndexRecTag: - copy(rec.lastRoot[:], buf) - buf = buf[indexRecLastRootSz:] - case startOffsetIndexRecTag: - rec.start = readUint64(buf) - buf = buf[indexRecOffsetSz:] - case endOffsetIndexRecTag: - rec.end = readUint64(buf) - buf = buf[indexRecOffsetSz:] - case kindIndexRecTag: - rec.kind = indexRecKind(buf[0]) - buf = buf[indexRecKindSz:] - case payloadIndexRecTag: - sz := len(buf) - indexRecChecksumSz - rec.payload = buf[:sz] - buf = buf[sz:] - case unknownIndexRecTag: - fallthrough - default: - err = fmt.Errorf("unknown record field tag: %d", tag) - return - } - } - rec.checksum = readUint32(buf[:indexRecChecksumSz]) - return -} - -func validateIndexRecord(buf []byte) bool { - if len(buf) < (indexRecLenSz + indexRecChecksumSz) { - return false - } - off := readUint32(buf) - if int(off) > len(buf) { - return false - } - off -= indexRecChecksumSz - return crc(buf[:off]) == readUint32(buf[off:]) -} - type lookupMeta struct { batchStart int64 batchEnd int64 diff --git a/go/store/nbs/journal_index_record_test.go b/go/store/nbs/journal_index_record_test.go index 15007d4c88..34c3ebbce5 100644 --- a/go/store/nbs/journal_index_record_test.go +++ b/go/store/nbs/journal_index_record_test.go @@ -115,19 +115,3 @@ func TestRoundTripIndexLookupsMeta(t *testing.T) { // do a bunch of iters // use processIndexRecords2 to read back, make sure roots/checksums are consistent, counts, etc } - -func makeLookups(cnt int) (lookups []lookup) { - lookups = make([]lookup, cnt) - buf := make([]byte, cnt*hash.ByteLen) - rand.Read(buf) - var off uint64 - for i := range lookups { - copy(lookups[i].a[:], buf) - buf = buf[hash.ByteLen:] - lookups[i].r.Offset = off - l := rand.Uint32() % 1024 - lookups[i].r.Length = l - off += uint64(l) - } - return -} diff --git a/go/store/nbs/journal_writer.go b/go/store/nbs/journal_writer.go index 3cd5b45e5e..8478acca44 100644 --- a/go/store/nbs/journal_writer.go +++ b/go/store/nbs/journal_writer.go @@ -699,14 +699,6 @@ func (idx rangeIndex) novelCount() int { return len(idx.novel) } -func (idx rangeIndex) novelLookups() (lookups []lookup) { - lookups = make([]lookup, 0, len(idx.novel)) - for a, r := range idx.novel { - lookups = append(lookups, lookup{a: toAddr16(a), r: r}) - } - return -} - func (idx rangeIndex) flatten(ctx context.Context) rangeIndex { defer trace.StartRegion(ctx, "flatten journal index").End() trace.Logf(ctx, "map index cached count", "%d", len(idx.cached)) diff --git a/go/store/nbs/journal_writer_test.go b/go/store/nbs/journal_writer_test.go index 546c3f0526..b8e7897c88 100644 --- a/go/store/nbs/journal_writer_test.go +++ b/go/store/nbs/journal_writer_test.go @@ -16,7 +16,6 @@ package nbs import ( "context" - "encoding/base32" "math/rand" "os" "path/filepath" @@ -405,13 +404,6 @@ func TestJournalIndexBootstrap(t *testing.T) { } } -var encoding = base32.NewEncoding("0123456789abcdefghijklmnopqrstuv") - -// encode returns the base32 encoding in the Dolt alphabet. -func encode(data []byte) string { - return encoding.EncodeToString(data) -} - func randomCompressedChunks(cnt int) (compressed map[hash.Hash]CompressedChunk) { compressed = make(map[hash.Hash]CompressedChunk) var buf []byte diff --git a/go/store/nbs/mem_table.go b/go/store/nbs/mem_table.go index 473583d7ba..87c4eb3428 100644 --- a/go/store/nbs/mem_table.go +++ b/go/store/nbs/mem_table.go @@ -212,14 +212,6 @@ func (mt *memTable) getManyCompressed(ctx context.Context, eg *errgroup.Group, r return remaining, gcBehavior_Continue, nil } -func (mt *memTable) extract(ctx context.Context, chunks chan<- extractRecord) error { - for _, hrec := range mt.order { - chunks <- extractRecord{a: *hrec.a, data: mt.chunks[*hrec.a], err: nil} - } - - return nil -} - func (mt *memTable) write(haver chunkReader, keeper keeperF, stats *Stats) (name hash.Hash, data []byte, splitOffset uint64, chunkCount uint32, gcb gcBehavior, err error) { gcb = gcBehavior_Continue numChunks := uint64(len(mt.order)) diff --git a/go/store/prolly/commit_closure.go b/go/store/prolly/commit_closure.go index 8345d22b31..9f9c98a69c 100644 --- a/go/store/prolly/commit_closure.go +++ b/go/store/prolly/commit_closure.go @@ -122,13 +122,6 @@ func (c CommitClosure) ContainsKey(ctx context.Context, h hash.Hash, height uint return c.closure.Has(ctx, k) } -func DecodeCommitClosureKey(key []byte) (height uint64, addr hash.Hash) { - height = binary.LittleEndian.Uint64(key) - addr = hash.New(key[prefixWidth:]) - - return -} - func (c CommitClosure) AsHashSet(ctx context.Context) (hash.HashSet, error) { closureIter, err := c.IterAllReverse(ctx) if err != nil { diff --git a/go/store/prolly/tuple_range_iter.go b/go/store/prolly/tuple_range_iter.go index 71323de13f..4f843bb7a8 100644 --- a/go/store/prolly/tuple_range_iter.go +++ b/go/store/prolly/tuple_range_iter.go @@ -35,7 +35,6 @@ type rangeIter[K, V ~[]byte] interface { var _ rangeIter[val.Tuple, val.Tuple] = &tree.OrderedTreeIter[val.Tuple, val.Tuple]{} var _ rangeIter[val.Tuple, val.Tuple] = &memRangeIter{} -var _ rangeIter[val.Tuple, val.Tuple] = emptyIter{} // mutableMapIter iterates over a Range of Tuples. type mutableMapIter[K, V ~[]byte, O tree.Ordering[K]] struct { @@ -88,16 +87,6 @@ func (it mutableMapIter[K, V, O]) Next(ctx context.Context) (key K, value V, err } } -func (it mutableMapIter[K, V, O]) currentKeys() (memKey, proKey K) { - if it.memory != nil { - memKey, _ = it.memory.Current() - } - if it.prolly != nil { - proKey, _ = it.prolly.Current() - } - return -} - func (it mutableMapIter[K, V, O]) compareKeys(ctx context.Context, memKey, proKey K) int { if memKey == nil { return 1 @@ -179,16 +168,6 @@ func (it *memRangeIter) Iterate(ctx context.Context) (err error) { } } -type emptyIter struct{} - -func (e emptyIter) Next(context.Context) (val.Tuple, val.Tuple, error) { - return nil, nil, io.EOF -} - -func (e emptyIter) Iterate(ctx context.Context) (err error) { return } - -func (e emptyIter) Current() (key, value val.Tuple) { return } - type filteredIter struct { iter MapIter rng Range diff --git a/go/store/prolly/tuple_range_iter_test.go b/go/store/prolly/tuple_range_iter_test.go index 7b03987ff3..8c3bb53f67 100644 --- a/go/store/prolly/tuple_range_iter_test.go +++ b/go/store/prolly/tuple_range_iter_test.go @@ -95,8 +95,8 @@ func testIterRange(t *testing.T, om testMap, tuples [][2]val.Tuple) { } for _, test := range tests { - //s := fmt.Sprintf(test.testRange.format()) - //fmt.Println(s) + // s := fmt.Sprintf(test.testRange.format()) + // fmt.Println(s) iter, err := om.IterRange(ctx, test.testRange) require.NoError(t, err) @@ -500,38 +500,6 @@ func testIterOrdinalRangeWithBounds(t *testing.T, om Map, tuples [][2]val.Tuple, }) } -func testIterKeyRange(t *testing.T, m Map, tuples [][2]val.Tuple) { - ctx := context.Background() - - t.Run("RandomKeyRange", func(t *testing.T) { - bounds := generateInserts(t, m, m.keyDesc, m.valDesc, 2) - start, stop := bounds[0][0], bounds[1][0] - if m.keyDesc.Compare(ctx, start, stop) > 0 { - start, stop = stop, start - } - kR := keyRange{kd: m.keyDesc, start: start, stop: stop} - - var expectedKeys []val.Tuple - for _, kv := range tuples { - if kR.includes(kv[0]) { - expectedKeys = append(expectedKeys, kv[0]) - } - } - - itr, err := m.IterKeyRange(ctx, start, stop) - require.NoError(t, err) - - for _, eK := range expectedKeys { - k, _, err := itr.Next(ctx) - require.NoError(t, err) - assert.Equal(t, eK, k) - } - - _, _, err = itr.Next(ctx) - require.Equal(t, io.EOF, err) - }) -} - func iterOrdinalRange(t *testing.T, ctx context.Context, iter MapIter) (actual [][2]val.Tuple) { for { k, v, err := iter.Next(ctx) diff --git a/go/store/spec/util.go b/go/store/spec/util.go index 7c224983bc..05d65c80de 100644 --- a/go/store/spec/util.go +++ b/go/store/spec/util.go @@ -23,7 +23,6 @@ package spec import ( "github.com/dolthub/dolt/go/store/d" - "github.com/dolthub/dolt/go/store/hash" ) func CreateDatabaseSpecString(protocol, db string) string { @@ -35,7 +34,3 @@ func CreateValueSpecString(protocol, db, path string) string { d.Chk.NoError(err) return Spec{Protocol: protocol, DatabaseName: db, Path: p}.String() } - -func CreateHashSpecString(protocol, db string, h hash.Hash) string { - return Spec{Protocol: protocol, DatabaseName: db, Path: AbsolutePath{Hash: h}}.String() -} diff --git a/go/store/types/apply_map_edits.go b/go/store/types/apply_map_edits.go index f9e7d7b1b8..60d5eea11a 100644 --- a/go/store/types/apply_map_edits.go +++ b/go/store/types/apply_map_edits.go @@ -99,28 +99,15 @@ type AppliedEditStats struct { NonExistentDeletes int64 } -// Add adds two AppliedEditStats structures member by member. -func (stats AppliedEditStats) Add(other AppliedEditStats) AppliedEditStats { - return AppliedEditStats{ - Additions: stats.Additions + other.Additions, - Modifications: stats.Modifications + other.Modifications, - SameVal: stats.SameVal + other.SameVal, - Deletions: stats.Deletions + other.Deletions, - NonExistentDeletes: stats.NonExistentDeletes + other.NonExistentDeletes, - } -} - // ApplyEdits applies all the edits to a given Map and returns the resulting map, and some statistics about the edits // that were applied. -func ApplyEdits(ctx context.Context, edits EditProvider, m Map) (Map, AppliedEditStats, error) { +func ApplyEdits(ctx context.Context, edits EditProvider, m Map) (Map, error) { return ApplyNEdits(ctx, edits, m, -1) } -func ApplyNEdits(ctx context.Context, edits EditProvider, m Map, numEdits int64) (Map, AppliedEditStats, error) { - var stats AppliedEditStats - +func ApplyNEdits(ctx context.Context, edits EditProvider, m Map, numEdits int64) (Map, error) { if edits.ReachedEOF() { - return m, stats, nil // no edits + return m, nil // no edits } var seq sequence = m.orderedSequence @@ -168,12 +155,10 @@ func ApplyNEdits(ctx context.Context, edits EditProvider, m Map, numEdits int64) } if existingValue == nil && kv.value == nil { - stats.NonExistentDeletes++ continue // already non-present } if existingValue != nil && kv.value != nil && existingValue.Equals(kv.value) { - stats.SameVal++ continue // same value } @@ -193,15 +178,11 @@ func ApplyNEdits(ctx context.Context, edits EditProvider, m Map, numEdits int64) } if existingValue != nil { - stats.Modifications++ err := ch.Skip(ctx) if ae.SetIfError(err) { continue } - - } else { - stats.Additions++ } if kv.value != nil { @@ -219,20 +200,20 @@ func ApplyNEdits(ctx context.Context, edits EditProvider, m Map, numEdits int64) } if ae.IsSet() { - return EmptyMap, AppliedEditStats{}, ae.Get() + return EmptyMap, ae.Get() } if ch == nil { - return m, stats, nil // no edits required application + return m, nil // no edits required application } seq, err := ch.Done(ctx) if err != nil { - return EmptyMap, AppliedEditStats{}, err + return EmptyMap, err } - return newMap(seq.(orderedSequence)), stats, nil + return newMap(seq.(orderedSequence)), nil } // prepWorker will wait for work to be read from a channel, then iterate over all of the edits finding the appropriate diff --git a/go/store/types/float_util.go b/go/store/types/float_util.go index 2799ada5b7..793cbf906b 100644 --- a/go/store/types/float_util.go +++ b/go/store/types/float_util.go @@ -22,7 +22,6 @@ package types import ( - "fmt" "math" ) @@ -34,53 +33,3 @@ func Round(v Value) Value { return val } } - -func Increment(v Value) Value { - switch val := v.(type) { - case Int: - return Int(int64(val) + 1) - case Uint: - return Uint(uint64(val) + 1) - case Float: - return Float(float64(val) + 1) - default: - return val - } -} - -func float64IsInt(f float64) bool { - return math.Trunc(f) == f -} - -// convert float64 to int64 where f == i * 2^exp -func float64ToIntExp(f float64) (int64, int) { - if math.IsNaN(f) || math.IsInf(f, 0) { - panic(fmt.Errorf("%v is not a supported number", f)) - } - - if f == 0 { - return 0, 0 - } - - isNegative := math.Signbit(f) - f = math.Abs(f) - - frac, exp := math.Frexp(f) - // frac is [.5, 1) - // Move frac up until it is an integer. - for !float64IsInt(frac) { - frac *= 2 - exp-- - } - - if isNegative { - frac *= -1 - } - - return int64(frac), exp -} - -// fracExpToFloat returns frac * 2 ** exp -func fracExpToFloat(frac int64, exp int) float64 { - return float64(frac) * math.Pow(2, float64(exp)) -} diff --git a/go/store/types/geometry.go b/go/store/types/geometry.go index c2c5f2d09b..905674bbc1 100644 --- a/go/store/types/geometry.go +++ b/go/store/types/geometry.go @@ -149,11 +149,3 @@ func (v Geometry) skip(nbf *NomsBinFormat, b *binaryNomsReader) { func (v Geometry) HumanReadableString() string { return v.Inner.HumanReadableString() } - -func EncodeGeometryWKB(v Geometry) ([]byte, error) { - wr := &binaryNomsWriter{make([]byte, 128), 0} - if err := v.writeTo(wr, nil); err != nil { - return nil, err - } - return wr.data()[1:], nil // trim NomsKind -} diff --git a/go/store/types/incremental_test.go b/go/store/types/incremental_test.go index 6203807631..6a5831c525 100644 --- a/go/store/types/incremental_test.go +++ b/go/store/types/incremental_test.go @@ -94,90 +94,3 @@ func TestIncrementalLoadList(t *testing.T) { assert.Equal(expectedCount+chunkReads[i], cs.Reads()) } } - -func SkipTestIncrementalLoadSet(t *testing.T) { - assert := assert.New(t) - ts := &chunks.TestStorage{} - cs := ts.NewView() - vs := NewValueStore(cs) - - expected, err := NewSet(context.Background(), vs, getTestVals(vs)...) - require.NoError(t, err) - ref, err := vs.WriteValue(context.Background(), expected) - require.NoError(t, err) - refHash := ref.TargetHash() - - actualVar, err := vs.ReadValue(context.Background(), refHash) - require.NoError(t, err) - actual := actualVar.(Set) - - expectedCount := cs.Reads() - assert.Equal(1, expectedCount) - err = actual.Iter(context.Background(), func(v Value) (bool, error) { - expectedCount += isEncodedOutOfLine(v) - assert.Equal(expectedCount, cs.Reads()) - return false, nil - }) - - require.NoError(t, err) -} - -func SkipTestIncrementalLoadMap(t *testing.T) { - assert := assert.New(t) - ts := &chunks.TestStorage{} - cs := ts.NewView() - vs := NewValueStore(cs) - - expected, err := NewMap(context.Background(), vs, getTestVals(vs)...) - require.NoError(t, err) - ref, err := vs.WriteValue(context.Background(), expected) - require.NoError(t, err) - refHash := ref.TargetHash() - - actualVar, err := vs.ReadValue(context.Background(), refHash) - require.NoError(t, err) - actual := actualVar.(Map) - - expectedCount := cs.Reads() - assert.Equal(1, expectedCount) - err = actual.Iter(context.Background(), func(k, v Value) (bool, error) { - expectedCount += isEncodedOutOfLine(k) - expectedCount += isEncodedOutOfLine(v) - assert.Equal(expectedCount, cs.Reads()) - return false, nil - }) - require.NoError(t, err) -} - -func SkipTestIncrementalAddRef(t *testing.T) { - assert := assert.New(t) - ts := &chunks.TestStorage{} - cs := ts.NewView() - vs := NewValueStore(cs) - - expectedItem := Float(42) - ref, err := vs.WriteValue(context.Background(), expectedItem) - require.NoError(t, err) - - expected, err := NewList(context.Background(), vs, ref) - require.NoError(t, err) - ref, err = vs.WriteValue(context.Background(), expected) - require.NoError(t, err) - actualVar, err := vs.ReadValue(context.Background(), ref.TargetHash()) - require.NoError(t, err) - - assert.Equal(1, cs.Reads()) - assert.True(expected.Equals(actualVar)) - - actual := actualVar.(List) - actualItem, err := actual.Get(context.Background(), 0) - require.NoError(t, err) - assert.Equal(2, cs.Reads()) - assert.True(expectedItem.Equals(actualItem)) - - // do it again to make sure caching works. - actualItem, err = actual.Get(context.Background(), 0) - require.NoError(t, err) - assert.Equal(2, cs.Reads()) - assert.True(expectedItem.Equals(actualItem)) -} diff --git a/go/store/types/json.go b/go/store/types/json.go index 85a308c838..c62e15cd85 100644 --- a/go/store/types/json.go +++ b/go/store/types/json.go @@ -42,16 +42,6 @@ func NewJSONDoc(nbf *NomsBinFormat, vrw ValueReadWriter, value Value) (JSON, err return JSON{valueImpl{vrw, nbf, w.data(), nil}}, nil } -func NewTestJSONDoc(nbf *NomsBinFormat, vrw ValueReadWriter, buf []byte) (JSON, error) { - w := newBinaryNomsWriter() - if err := JSONKind.writeTo(&w, nbf); err != nil { - return emptyJSONDoc(nbf), err - } - - w.writeString(string(buf)) - return JSON{valueImpl{vrw, nbf, w.data(), nil}}, nil -} - // emptyJSONDoc creates and empty JSON value. func emptyJSONDoc(nbf *NomsBinFormat) JSON { w := newBinaryNomsWriter() @@ -148,12 +138,6 @@ func (t JSON) Kind() NomsKind { return JSONKind } -func (t JSON) decoderSkipToFields() (valueDecoder, uint64) { - dec := t.decoder() - dec.skipKind() - return dec, uint64(1) -} - // Len implements the Value interface. func (t JSON) Len() uint64 { // TODO(andy): is this ever 0? diff --git a/go/store/types/map.go b/go/store/types/map.go index 17469fd6b6..50c50ad026 100644 --- a/go/store/types/map.go +++ b/go/store/types/map.go @@ -26,8 +26,6 @@ import ( "errors" "fmt" - "golang.org/x/sync/errgroup" - "github.com/dolthub/dolt/go/store/d" "github.com/dolthub/dolt/go/store/hash" ) @@ -98,98 +96,6 @@ func NewMap(ctx context.Context, vrw ValueReadWriter, kv ...Value) (Map, error) return newMap(seq.(orderedSequence)), nil } -// NewStreamingMap takes an input channel of values and returns a value that -// will produce a finished Map when |.Wait()| is called. Values sent to the -// input channel must be alternating keys and values. (e.g. k1, v1, k2, -// v2...). Moreover keys need to be added to the channel in Noms sortorder, -// adding key values to the input channel out of order will result in an error. -// Once the input channel is closed by the caller, a finished Map will be -// available from the |Wait| call. -// -// See graph_builder.go for building collections with values that are not in -// order. -func NewStreamingMap(ctx context.Context, vrw ValueReadWriter, kvs <-chan Value) *StreamingMap { - d.PanicIfTrue(vrw == nil) - sm := &StreamingMap{} - sm.eg, sm.egCtx = errgroup.WithContext(ctx) - sm.eg.Go(func() error { - m, err := readMapInput(sm.egCtx, vrw, kvs) - sm.m = m - return err - }) - return sm -} - -type StreamingMap struct { - eg *errgroup.Group - egCtx context.Context - m Map -} - -func (sm *StreamingMap) Wait() (Map, error) { - err := sm.eg.Wait() - return sm.m, err -} - -// Done returns a signal channel which is closed once the StreamingMap is no -// longer reading from the key/values channel. A send to the key/value channel -// should be in a select with a read from this channel to ensure that the send -// does not deadlock. -func (sm *StreamingMap) Done() <-chan struct{} { - return sm.egCtx.Done() -} - -func readMapInput(ctx context.Context, vrw ValueReadWriter, kvs <-chan Value) (Map, error) { - ch, err := newEmptyMapSequenceChunker(ctx, vrw) - if err != nil { - return EmptyMap, err - } - - var lastK Value - nextIsKey := true - var k Value -LOOP: - for { - select { - case v, ok := <-kvs: - if !ok { - break LOOP - } - if nextIsKey { - k = v - - if lastK != nil { - isLess, err := lastK.Less(ctx, vrw.Format(), k) - if err != nil { - return EmptyMap, err - } - if !isLess { - return EmptyMap, ErrKeysNotOrdered - } - } - lastK = k - nextIsKey = false - } else { - _, err := ch.Append(ctx, mapEntry{key: k, value: v}) - if err != nil { - return EmptyMap, err - } - - nextIsKey = true - } - case <-ctx.Done(): - return EmptyMap, ctx.Err() - } - } - - seq, err := ch.Done(ctx) - if err != nil { - return EmptyMap, err - } - - return newMap(seq.(orderedSequence)), nil -} - // Diff computes the diff from |last| to |m| using the top-down algorithm, // which completes as fast as possible while taking longer to return early // results than left-to-right. diff --git a/go/store/types/map_editor.go b/go/store/types/map_editor.go index 0c25a0cdb6..0c23f77963 100644 --- a/go/store/types/map_editor.go +++ b/go/store/types/map_editor.go @@ -76,8 +76,7 @@ func (med *MapEditor) Map(ctx context.Context) (Map, error) { return EmptyMap, err } - m, _, err := ApplyEdits(ctx, edits, med.m) - return m, err + return ApplyEdits(ctx, edits, med.m) } // Set adds an edit diff --git a/go/store/types/map_iterator.go b/go/store/types/map_iterator.go index 0bef6c0451..1bb8ec035f 100644 --- a/go/store/types/map_iterator.go +++ b/go/store/types/map_iterator.go @@ -39,16 +39,6 @@ type MapIterator interface { Next(ctx context.Context) (k, v Value, err error) } -type EmptyMapIterator struct{} - -func (mtItr EmptyMapIterator) Next(ctx context.Context) (k, v Value, err error) { - return nil, nil, nil -} - -func (mtItr EmptyMapIterator) NextTuple(ctx context.Context) (k, v Tuple, err error) { - return Tuple{}, Tuple{}, io.EOF -} - // mapIterator can efficiently iterate through a Noms Map. type mapIterator struct { sequenceIter sequenceIterator @@ -136,49 +126,3 @@ func (m Map) RangeIterator(ctx context.Context, startIdx, endIdx uint64) (MapTup return &mapRangeIter{collItr: collItr}, nil } - -// LimitingMapIterator iterates |iter| only returning up to |limit| results. -type LimitingMapIterator struct { - iter MapIterator - limit uint64 - cnt uint64 -} - -var _ MapIterator = (*LimitingMapIterator)(nil) - -// NewLimitingMapIterator returns a *LimitingMapIterator. -func NewLimitingMapIterator(iter MapIterator, limit uint64) *LimitingMapIterator { - return &LimitingMapIterator{ - iter: iter, - limit: limit, - } -} - -// Next implements MapIterator. -func (l *LimitingMapIterator) Next(ctx context.Context) (k, v Value, err error) { - if l.cnt == l.limit { - return nil, nil, nil - } - k, v, err = l.iter.Next(ctx) - if err != nil { - return nil, nil, err - } - if k == nil { - return nil, nil, nil - } - l.cnt++ - return -} - -// NextTuple implements MapIterator. -func (l *LimitingMapIterator) NextTuple(ctx context.Context) (k, v Tuple, err error) { - if l.cnt == l.limit { - return Tuple{}, Tuple{}, io.EOF - } - k, v, err = l.iter.NextTuple(ctx) - if err != nil { - return Tuple{}, Tuple{}, err - } - l.cnt++ - return -} diff --git a/go/store/types/map_test.go b/go/store/types/map_test.go index 19f2d8ecb0..cfedc11f4f 100644 --- a/go/store/types/map_test.go +++ b/go/store/types/map_test.go @@ -27,7 +27,6 @@ import ( "fmt" "math/rand" "sort" - "sync" "testing" "time" @@ -322,67 +321,6 @@ func newMapTestSuite(size uint, expectChunkCount int, expectPrependChunkDiff int } } -func (suite *mapTestSuite) createStreamingMap(vs *ValueStore) Map { - kvChan := make(chan Value) - streamingMap := NewStreamingMap(context.Background(), vs, kvChan) - for _, entry := range suite.elems.entries.entries { - kvChan <- entry.key - kvChan <- entry.value - } - close(kvChan) - m, err := streamingMap.Wait() - suite.NoError(err) - return m -} - -func (suite *mapTestSuite) TestStreamingMap() { - vs := newTestValueStore() - defer vs.Close() - m := suite.createStreamingMap(vs) - suite.True(suite.validate(m), "map not valid") -} - -func (suite *mapTestSuite) TestStreamingMapOrder() { - vs := newTestValueStore() - defer vs.Close() - - entries := mapEntrySlice{make([]mapEntry, len(suite.elems.entries.entries))} - copy(entries.entries, suite.elems.entries.entries) - entries.entries[0], entries.entries[1] = entries.entries[1], entries.entries[0] - - kvChan := make(chan Value, len(entries.entries)*2) - for _, e := range entries.entries { - kvChan <- e.key - kvChan <- e.value - } - close(kvChan) - - sm := NewStreamingMap(context.Background(), vs, kvChan) - _, err := sm.Wait() - - suite.Assert().EqualError(err, ErrKeysNotOrdered.Error()) -} - -func (suite *mapTestSuite) TestStreamingMap2() { - wg := sync.WaitGroup{} - vs := newTestValueStore() - defer vs.Close() - - wg.Add(2) - var m1, m2 Map - go func() { - m1 = suite.createStreamingMap(vs) - wg.Done() - }() - go func() { - m2 = suite.createStreamingMap(vs) - wg.Done() - }() - wg.Wait() - suite.True(suite.validate(m1), "map 'm1' not valid") - suite.True(suite.validate(m2), "map 'm2' not valid") -} - func TestMapSuite4K(t *testing.T) { suite.Run(t, newMapTestSuite(12, 5, 2, 2, newNumber)) } diff --git a/go/store/types/read_geometry.go b/go/store/types/read_geometry.go index d3b23a014f..d6e075eacc 100644 --- a/go/store/types/read_geometry.go +++ b/go/store/types/read_geometry.go @@ -200,10 +200,6 @@ func DeserializeEWKBHeader(buf []byte) (uint32, bool, uint32, error) { return types.DeserializeEWKBHeader(buf) } -func DeserializeWKBHeader(buf []byte) (bool, uint32, error) { - return types.DeserializeWKBHeader(buf) -} - func DeserializePoint(buf []byte, isBig bool, srid uint32) types.Point { p, _, err := types.DeserializePoint(buf, isBig, srid) if err != nil { diff --git a/go/store/types/value.go b/go/store/types/value.go index 7d48301f65..18148bf001 100644 --- a/go/store/types/value.go +++ b/go/store/types/value.go @@ -171,60 +171,6 @@ type valueReadWriter interface { valueReadWriter() ValueReadWriter } -type TupleSlice []Tuple - -func (vs TupleSlice) Equals(other TupleSlice) bool { - if len(vs) != len(other) { - return false - } - - for i, v := range vs { - if !v.Equals(other[i]) { - return false - } - } - - return true -} - -func (vs TupleSlice) Contains(nbf *NomsBinFormat, v Tuple) bool { - for _, v := range vs { - if v.Equals(v) { - return true - } - } - return false -} - -type TupleSort struct { - Tuples []Tuple -} - -func (vs TupleSort) Len() int { - return len(vs.Tuples) -} - -func (vs TupleSort) Swap(i, j int) { - vs.Tuples[i], vs.Tuples[j] = vs.Tuples[j], vs.Tuples[i] -} - -func (vs TupleSort) Less(ctx context.Context, nbf *NomsBinFormat, i, j int) (bool, error) { - res, err := vs.Tuples[i].TupleCompare(ctx, nbf, vs.Tuples[j]) - if err != nil { - return false, err - } - - return res < 0, nil -} - -func (vs TupleSort) Equals(other TupleSort) bool { - return TupleSlice(vs.Tuples).Equals(other.Tuples) -} - -func (vs TupleSort) Contains(nbf *NomsBinFormat, v Tuple) bool { - return TupleSlice(vs.Tuples).Contains(nbf, v) -} - type valueImpl struct { vrw ValueReadWriter nbf *NomsBinFormat diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index 214e1ebb1d..c92772d114 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -130,11 +130,6 @@ func newTestValueStore() *ValueStore { return NewValueStore(ts.NewViewWithDefaultFormat()) } -func newTestValueStore_LD_1() *ValueStore { - ts := &chunks.TestStorage{} - return NewValueStore(ts.NewView()) -} - // NewMemoryValueStore creates a simple struct that satisfies ValueReadWriter // and is backed by a chunks.TestStore. Used for dolt operations outside of noms. func NewMemoryValueStore() *ValueStore {