Merge pull request #10503 from dolthub/zachmu/kill-noms-4

[no-release-notes] more dead code deletion
This commit is contained in:
Zach Musgrave
2026-02-13 15:45:36 -08:00
committed by GitHub
47 changed files with 15 additions and 2887 deletions
@@ -31,7 +31,6 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/overrides"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
"github.com/dolthub/dolt/go/store/types"
)
@@ -40,6 +39,9 @@ const (
tableWriterStatUpdateRate = 64 * 1024
)
// StatsCb is a callback for reporting stats about the rows that have been processed so far.
// Implementations receive a snapshot of the accumulated edit statistics; the call rate is
// controlled by the caller (see tableWriterStatUpdateRate above).
type StatsCb func(types.AppliedEditStats)
// SqlEngineTableWriter is a utility for importing a set of rows through the sql engine.
type SqlEngineTableWriter struct {
se *sqle.Engine
@@ -51,7 +53,7 @@ type SqlEngineTableWriter struct {
force bool
disableFks bool
statsCB noms.StatsCB
statsCB StatsCb
stats types.AppliedEditStats
statOps int32
@@ -60,7 +62,7 @@ type SqlEngineTableWriter struct {
rowOperationSchema sql.PrimaryKeySchema
}
func NewSqlEngineTableWriter(ctx *sql.Context, engine *sqle.Engine, createTableSchema, rowOperationSchema schema.Schema, options *MoverOptions, statsCB noms.StatsCB) (*SqlEngineTableWriter, error) {
func NewSqlEngineTableWriter(ctx *sql.Context, engine *sqle.Engine, createTableSchema, rowOperationSchema schema.Schema, options *MoverOptions, statsCB StatsCb) (*SqlEngineTableWriter, error) {
if engine.IsReadOnly() {
// SqlEngineTableWriter does not respect read only mode
return nil, analyzererrors.ErrReadOnlyDatabase.New(ctx.GetCurrentDatabase())
@@ -26,7 +26,6 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
@@ -174,7 +173,6 @@ func (itr *rangePartitionIter) nextProllyPartition() (sql.Partition, error) {
}
type rangePartition struct {
nomsRange *noms.ReadRange
key []byte
prollyRange prolly.Range
isReverse bool
@@ -15,10 +15,6 @@
package editor
import (
"context"
"fmt"
"strings"
"github.com/dolthub/dolt/go/store/types"
)
@@ -38,33 +34,3 @@ type Options struct {
// TestEditorOptions returns a zero-value Options suitable for use in tests.
// The vrw parameter is accepted for call-site compatibility but is unused.
func TestEditorOptions(vrw types.ValueReadWriter) Options {
	var opts Options
	return opts
}
// formatKey returns a bracketed, comma-separated string representation of the
// key given. Noms tuples store alternating (tag, value) pairs, so only the odd
// iterator positions — the values — are rendered.
//
// Returns an error if key is not a types.Tuple, or if tuple iteration or value
// encoding fails.
func formatKey(ctx context.Context, key types.Value) (string, error) {
	tuple, ok := key.(types.Tuple)
	if !ok {
		// Lowercased per Go error-string convention (staticcheck ST1005);
		// original read "Expected types.Tuple but got %T".
		return "", fmt.Errorf("expected types.Tuple but got %T", key)
	}

	iter, err := tuple.Iterator()
	if err != nil {
		return "", err
	}

	var vals []string
	for iter.HasMore() {
		i, val, err := iter.Next()
		if err != nil {
			return "", err
		}

		// Odd indices hold the values; even indices hold the tags.
		if i%2 == 1 {
			str, err := types.EncodedValue(ctx, val)
			if err != nil {
				return "", err
			}
			vals = append(vals, str)
		}
	}

	return fmt.Sprintf("[%s]", strings.Join(vals, ",")), nil
}
@@ -1,444 +0,0 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package editor
import (
"context"
"io"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/table"
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
"github.com/dolthub/dolt/go/libraries/utils/set"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/types/edits"
)
// var for testing
var indexFlushThreshold int64 = 256 * 1024
// IndexEditAccumulator collects pending edits to an index's row data, supporting
// commit/rollback semantics before the edits are materialized into a types.Map.
type IndexEditAccumulator interface {
	// Delete adds a row to be deleted when these edits are eventually applied.
	Delete(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, value types.Tuple) error

	// Insert adds a row to be inserted when these edits are eventually applied.
	Insert(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, value types.Tuple) error

	// Has returns true if the current IndexEditAccumulator contains the given key, or it exists in the row data.
	Has(ctx context.Context, keyHash hash.Hash, key types.Tuple) (bool, error)

	// HasPartial returns the full-key tuples matching the given partialKey, drawn from
	// both the materialized row data and any pending in-memory edits.
	HasPartial(ctx context.Context, idxSch schema.Schema, partialKeyHash hash.Hash, partialKey types.Tuple) ([]hashedTuple, error)

	// Commit applies the in memory edits to the list of committed in memory edits
	Commit(ctx context.Context, nbf *types.NomsBinFormat) error

	// Rollback discards uncommitted in memory edits, restoring the last committed state.
	Rollback(ctx context.Context) error

	// MaterializeEdits commits and applies the in memory edits to the row data
	MaterializeEdits(ctx context.Context, nbf *types.NomsBinFormat) (types.Map, error)
}

// hashedTuple is a tuple accompanied by a hash. The representing value of the hash is dependent on the function
// it is obtained from (e.g. a partial-key hash in the add/delete maps, a full-key hash in HasPartial results).
type hashedTuple struct {
	key   types.Tuple
	value types.Tuple
	hash  hash.Hash
}

// inMemIndexEdits represent row adds and deletes that have not been written to the underlying storage and only exist in memory
type inMemIndexEdits struct {
	// addedPartialKeys is a map of partial keys to a map of full keys that match the partial key
	partialAdds map[hash.Hash]map[hash.Hash]types.Tuple
	// These hashes represent the hash of the full key, with the hashedTuple's hash being the partial key's hash
	deletes map[hash.Hash]*hashedTuple
	// These hashes represent the hash of the full key, with the hashedTuple's hash being the partial key's hash
	adds map[hash.Hash]*hashedTuple
	// ops counts the total number of operations recorded (adds + deletes + cancellations)
	ops int64
}
// newInMemIndexEdits allocates an empty edit set with every internal map
// initialized and ready for writes.
func newInMemIndexEdits() *inMemIndexEdits {
	e := &inMemIndexEdits{}
	e.partialAdds = make(map[hash.Hash]map[hash.Hash]types.Tuple)
	e.deletes = make(map[hash.Hash]*hashedTuple)
	e.adds = make(map[hash.Hash]*hashedTuple)
	return e
}
// MergeIn merges changes from another inMemIndexEdits object into this instance
func (edits *inMemIndexEdits) MergeIn(other *inMemIndexEdits) {
for keyHash, ht := range other.deletes {
delete(edits.adds, keyHash)
edits.deletes[keyHash] = ht
}
for keyHash, ht := range other.adds {
delete(edits.deletes, keyHash)
edits.adds[keyHash] = ht
}
for partialKeyHash, keyHashToPartialKey := range other.partialAdds {
if dest, ok := edits.partialAdds[partialKeyHash]; !ok {
edits.partialAdds[partialKeyHash] = keyHashToPartialKey
} else {
for keyHash, partialKey := range keyHashToPartialKey {
dest[keyHash] = partialKey
}
}
}
edits.ops += other.ops
}
// Has returns whether a key hash has been added as an insert, or a delete in this inMemIndexEdits object
func (edits *inMemIndexEdits) Has(keyHash hash.Hash) (added, deleted bool) {
if _, ok := edits.adds[keyHash]; ok {
return true, false
}
if _, ok := edits.deletes[keyHash]; ok {
return false, true
}
return false, false
}
// indexEditAccumulatorImpl is the index equivalent of the tableEditAccumulatorImpl.
//
// indexEditAccumulatorImpl accumulates edits that need to be applied to the index row data. It needs to be able to
// support rollback and commit without having to materialize the types.Map. To do this it tracks committed and uncommitted
// modifications in memory. When a commit occurs the list of uncommitted changes are added to the list of committed changes.
// When a rollback occurs uncommitted changes are dropped.
//
// In addition to the in memory edits, the changes are applied to committedEA when a commit occurs. It is possible
// for the uncommitted changes to become so large that they need to be flushed to disk. At this point we change modes to write all edits
// to a separate map edit accumulator as they occur until the next commit occurs.
// indexEditAccumulatorImpl is the index equivalent of the tableEditAccumulatorImpl.
//
// indexEditAccumulatorImpl accumulates edits that need to be applied to the index row data. It needs to be able to
// support rollback and commit without having to materialize the types.Map. To do this it tracks committed and uncommitted
// modifications in memory. When a commit occurs the list of uncommitted changes are added to the list of committed changes.
// When a rollback occurs uncommitted changes are dropped.
//
// In addition to the in memory edits, the changes are applied to committedEA when a commit occurs. It is possible
// for the uncommitted changes to become so large that they need to be flushed to disk. At this point we change modes to write all edits
// to a separate map edit accumulator as they occur until the next commit occurs.
type indexEditAccumulatorImpl struct {
	// vr reads values out of underlying storage when edits are applied
	vr types.ValueReader

	// state of the index last time edits were applied
	rowData types.Map

	// in memory changes which will be applied to the rowData when the map is materialized
	committed   *inMemIndexEdits
	uncommitted *inMemIndexEdits

	// accumulatorIdx defines the order in which types.EditAccumulators will be applied
	accumulatorIdx uint64

	// flusher manages flushing of the types.EditAccumulators to disk when needed
	flusher *edits.DiskEditFlusher

	// committedEaIds tracks ids of edit accumulators which have changes that have been committed
	committedEaIds *set.Uint64Set
	// uncommittedEAIds tracks ids of edit accumulators which have not been committed yet.
	uncommittedEaIds *set.Uint64Set

	// commitEA is the types.EditAccumulator containing the committed changes that are being accumulated currently
	commitEA types.EditAccumulator
	// commitEAId is the id used for ordering the commitEA with other types.EditAccumulators that will be applied when
	// materializing all changes.
	commitEAId uint64

	// flushingUncommitted is a flag that tracks whether we are in a state where we write uncommitted map edits to uncommittedEA
	flushingUncommitted bool
	// lastFlush is the number of uncommitted ops that had occurred at the time of the last flush
	lastFlush int64
	// uncommittedEA is a types.EditAccumulator that we write to as uncommitted edits come in when the number of uncommitted
	// edits becomes large
	uncommittedEA types.EditAccumulator
	// uncommittedEAId is the id used for ordering the uncommittedEA with other types.EditAccumulators that will be applied
	// when materializing all changes
	uncommittedEAId uint64
}

// Compile-time assertion that the implementation satisfies the interface.
var _ IndexEditAccumulator = (*indexEditAccumulatorImpl)(nil)
// flushUncommitted spills the accumulated uncommitted edits to a disk-backed
// types.EditAccumulator. On the first call it transitions into the
// "flushingUncommitted" mode (seeding a fresh accumulator with the in-memory
// edits); on every call it flushes the current uncommittedEA and starts a new
// one. Caller is expected to hold whatever synchronization the editor uses.
func (iea *indexEditAccumulatorImpl) flushUncommitted() {
	// if we are not already actively writing edits to the uncommittedEA then change the state and push all in mem edits
	// to a types.EditAccumulator
	if !iea.flushingUncommitted {
		iea.flushingUncommitted = true

		if iea.commitEA != nil && iea.commitEA.EditsAdded() > 0 {
			// if there are uncommitted flushed changes we need to flush the committed changes first
			// so they can be applied before the uncommitted flushed changes and future changes can be applied after
			iea.committedEaIds.Add(iea.commitEAId)
			iea.flusher.Flush(iea.commitEA, iea.commitEAId)
			iea.commitEA = nil
			iea.commitEAId = invalidEaId
		}

		iea.uncommittedEA = edits.NewAsyncSortedEditsWithDefaults(iea.vr)
		iea.uncommittedEAId = iea.accumulatorIdx
		iea.accumulatorIdx++

		// Seed the new accumulator with the edits currently held only in memory.
		for _, ht := range iea.uncommitted.adds {
			iea.uncommittedEA.AddEdit(ht.key, ht.value)
		}

		for _, ht := range iea.uncommitted.deletes {
			// nil value marks a deletion when the edits are applied.
			iea.uncommittedEA.AddEdit(ht.key, nil)
		}
	}

	// flush uncommitted
	iea.lastFlush = iea.uncommitted.ops
	iea.uncommittedEaIds.Add(iea.uncommittedEAId)
	iea.flusher.Flush(iea.uncommittedEA, iea.uncommittedEAId)

	// initialize a new types.EditAccumulator for additional uncommitted edits to be written to.
	iea.uncommittedEA = edits.NewAsyncSortedEditsWithDefaults(iea.vr)
	iea.uncommittedEAId = iea.accumulatorIdx
	iea.accumulatorIdx++
}
// Insert adds a row to be inserted when these edits are eventually applied.
// keyHash and partialKeyHash must be the hashes of key and the partial key
// respectively; callers compute them and this is not re-verified here.
func (iea *indexEditAccumulatorImpl) Insert(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, value types.Tuple) error {
	if _, ok := iea.uncommitted.deletes[keyHash]; ok {
		// An insert following an uncommitted delete of the same key simply cancels the delete.
		delete(iea.uncommitted.deletes, keyHash)
	} else {
		iea.uncommitted.adds[keyHash] = &hashedTuple{key, value, partialKeyHash}
		// Index the full key under its partial key so HasPartial can find it.
		if matchingMap, ok := iea.uncommitted.partialAdds[partialKeyHash]; ok {
			matchingMap[keyHash] = key
		} else {
			iea.uncommitted.partialAdds[partialKeyHash] = map[hash.Hash]types.Tuple{keyHash: key}
		}
	}
	iea.uncommitted.ops++

	if iea.flushingUncommitted {
		// Already spilling to disk: mirror the edit into the disk-backed accumulator.
		iea.uncommittedEA.AddEdit(key, value)

		if iea.uncommitted.ops-iea.lastFlush > indexFlushThreshold {
			iea.flushUncommitted()
		}
	} else if iea.uncommitted.ops > indexFlushThreshold {
		// Too many in-memory edits; switch to disk-backed accumulation.
		iea.flushUncommitted()
	}

	return nil
}
// Delete adds a row to be deleted when these edits are eventually applied.
// keyHash and partialKeyHash must be the hashes of key and the partial key
// respectively; callers compute them and this is not re-verified here.
func (iea *indexEditAccumulatorImpl) Delete(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, value types.Tuple) error {
	if _, ok := iea.uncommitted.adds[keyHash]; ok {
		// A delete following an uncommitted insert of the same key cancels the insert,
		// including its partial-key bookkeeping.
		delete(iea.uncommitted.adds, keyHash)
		delete(iea.uncommitted.partialAdds[partialKeyHash], keyHash)
	} else {
		iea.uncommitted.deletes[keyHash] = &hashedTuple{key, value, partialKeyHash}
	}
	iea.uncommitted.ops++

	if iea.flushingUncommitted {
		// nil value marks a deletion in the disk-backed accumulator.
		iea.uncommittedEA.AddEdit(key, nil)

		if iea.uncommitted.ops-iea.lastFlush > indexFlushThreshold {
			iea.flushUncommitted()
		}
	} else if iea.uncommitted.ops > indexFlushThreshold {
		// Too many in-memory edits; switch to disk-backed accumulation.
		iea.flushUncommitted()
	}

	return nil
}
// Has reports whether this accumulator contains the given key, assuming keyHash
// is the hash of key. Pending edits are consulted newest-first (uncommitted,
// then committed); the first pending add or delete found decides the answer.
// Only when no pending edit mentions the key is the materialized row data read.
func (iea *indexEditAccumulatorImpl) Has(ctx context.Context, keyHash hash.Hash, key types.Tuple) (bool, error) {
	for _, mods := range []*inMemIndexEdits{iea.uncommitted, iea.committed} {
		added, deleted := mods.Has(keyHash)
		switch {
		case added:
			return true, nil
		case deleted:
			return false, nil
		}
	}

	// No pending edit for this key; fall back to what is materialized.
	_, found, err := iea.rowData.MaybeGetTuple(ctx, key)
	return found, err
}
// HasPartial returns the full-key tuples in this accumulator matching the given partial key. This assumes that the
// given hash is for the given key. The hashes returned represent the hash of the returned tuple.
// Matches are read from the materialized row data first, then reconciled with the
// committed and uncommitted in-memory edits (in that order).
func (iea *indexEditAccumulatorImpl) HasPartial(ctx context.Context, idxSch schema.Schema, partialKeyHash hash.Hash, partialKey types.Tuple) ([]hashedTuple, error) {
	if hasNulls, err := partialKey.Contains(types.NullValue); err != nil {
		return nil, err
	} else if hasNulls { // rows with NULL are considered distinct, and therefore we do not match on them
		return nil, nil
	}

	var err error
	var matches []hashedTuple
	// Range-scan the materialized index data for rows whose key starts with partialKey.
	var mapIter table.ReadCloser = noms.NewNomsRangeReader(iea.vr, idxSch, iea.rowData, []*noms.ReadRange{
		{Start: partialKey, Inclusive: true, Reverse: false, Check: noms.InRangeCheckPartial(partialKey)}})
	defer mapIter.Close(ctx)

	var r row.Row
	for r, err = mapIter.ReadRow(ctx); err == nil; r, err = mapIter.ReadRow(ctx) {
		tplKeyVal, err := r.NomsMapKey(idxSch).Value(ctx)
		if err != nil {
			return nil, err
		}
		key := tplKeyVal.(types.Tuple)

		tplValVal, err := r.NomsMapValue(idxSch).Value(ctx)
		if err != nil {
			return nil, err
		}
		val := tplValVal.(types.Tuple)

		keyHash, err := key.Hash(key.Format())
		if err != nil {
			return nil, err
		}

		matches = append(matches, hashedTuple{key, val, keyHash})
	}

	// io.EOF signals normal end-of-range; anything else is a real error.
	if err != io.EOF {
		return nil, err
	}

	// reapply partial key edits in order
	orderedMods := []*inMemIndexEdits{iea.committed, iea.uncommitted}
	for _, mods := range orderedMods {
		for i := len(matches) - 1; i >= 0; i-- {
			// If we've removed a key that's present here, remove it from the slice
			// (swap-with-last removal; result order is not significant).
			if _, ok := mods.deletes[matches[i].hash]; ok {
				matches[i] = matches[len(matches)-1]
				matches = matches[:len(matches)-1]
			}
		}

		// Pending adds under this partial key join the result set with empty values,
		// mirroring how index rows store all columns in the key.
		for addedHash, addedTpl := range mods.partialAdds[partialKeyHash] {
			matches = append(matches, hashedTuple{addedTpl, types.EmptyTuple(addedTpl.Format()), addedHash})
		}
	}

	return matches, nil
}
// Commit applies the in memory edits to the list of committed in memory edits
// and to the committed edit accumulator, then resets the uncommitted state.
// No-op when there are no uncommitted operations.
func (iea *indexEditAccumulatorImpl) Commit(ctx context.Context, nbf *types.NomsBinFormat) error {
	if iea.uncommitted.ops > 0 {
		if !iea.flushingUncommitted {
			// if there are uncommitted changes add them to the committed list of map edits
			for _, ht := range iea.uncommitted.adds {
				iea.commitEA.AddEdit(ht.key, ht.value)
			}

			for _, ht := range iea.uncommitted.deletes {
				// nil value marks a deletion when the edits are applied.
				iea.commitEA.AddEdit(ht.key, nil)
			}
		} else {
			// if we were flushing to the uncommittedEA make the current uncommittedEA the active committedEA and add
			// any uncommittedEA IDs that we already flushed
			iea.commitEA = iea.uncommittedEA
			iea.commitEAId = iea.uncommittedEAId
			iea.committedEaIds.Add(iea.uncommittedEaIds.AsSlice()...)

			// reset state to not be flushing uncommitted
			iea.uncommittedEA = nil
			iea.uncommittedEAId = invalidEaId
			iea.uncommittedEaIds = set.NewUint64Set(nil)
			iea.lastFlush = 0
			iea.flushingUncommitted = false
		}

		// apply in memory uncommitted changes to the committed in memory edits
		iea.committed.MergeIn(iea.uncommitted)

		// initialize uncommitted to future in memory edits
		iea.uncommitted = newInMemIndexEdits()
	}

	return nil
}
// Rollback discards all uncommitted in-memory edits, restoring the state as of
// the last Commit. If edits were being spilled to a disk-backed accumulator,
// that accumulator is closed and the flushing state is reset.
func (iea *indexEditAccumulatorImpl) Rollback(ctx context.Context) error {
	// drop uncommitted ea IDs
	iea.uncommittedEaIds = set.NewUint64Set(nil)

	if iea.uncommitted.ops == 0 {
		return nil
	}

	iea.uncommitted = newInMemIndexEdits()

	if iea.flushingUncommitted {
		// Close error is intentionally ignored; the accumulator is being discarded.
		_ = iea.uncommittedEA.Close(ctx)
		iea.uncommittedEA = nil
		iea.uncommittedEAId = invalidEaId
		iea.uncommittedEaIds = set.NewUint64Set(nil)
		iea.lastFlush = 0
		iea.flushingUncommitted = false
	}

	return nil
}
// MaterializeEdits commits any pending edits, merges every flushed and
// in-memory edit provider in accumulator order, applies them to the row data,
// and returns the resulting types.Map. The accumulator is reset afterwards so
// it can keep accepting edits against the new row data.
func (iea *indexEditAccumulatorImpl) MaterializeEdits(ctx context.Context, nbf *types.NomsBinFormat) (m types.Map, err error) {
	err = iea.Commit(ctx, nbf)
	if err != nil {
		return types.EmptyMap, err
	}

	// Nothing committed means the existing row data is already current.
	if iea.committed.ops == 0 {
		return iea.rowData, nil
	}

	committedEP, err := iea.commitEA.FinishedEditing(ctx)
	iea.commitEA = nil
	if err != nil {
		return types.EmptyMap, err
	}

	// Gather the providers for every accumulator previously flushed to disk.
	flushedEPs, err := iea.flusher.WaitForIDs(ctx, iea.committedEaIds)
	if err != nil {
		return types.EmptyMap, err
	}

	eps := make([]types.EditProvider, 0, len(flushedEPs)+1)
	for i := 0; i < len(flushedEPs); i++ {
		eps = append(eps, flushedEPs[i].Edits)
	}
	eps = append(eps, committedEP)

	defer func() {
		// Close errors are intentionally ignored; the providers are spent.
		for _, ep := range eps {
			_ = ep.Close(ctx)
		}
	}()

	// Merge all providers into one ordered edit stream.
	accEdits, err := edits.NewEPMerger(ctx, iea.vr, eps)
	if err != nil {
		return types.EmptyMap, err
	}

	// We are guaranteed that rowData is valid, as we process ieas sequentially.
	updatedMap, _, err := types.ApplyEdits(ctx, accEdits, iea.rowData)
	if err != nil {
		return types.EmptyMap, err
	}

	// Reset accumulator state against the freshly materialized map.
	iea.rowData = updatedMap
	iea.committed = newInMemIndexEdits()
	iea.commitEAId = iea.accumulatorIdx
	iea.accumulatorIdx++
	iea.commitEA = edits.NewAsyncSortedEditsWithDefaults(iea.vr)
	iea.committedEaIds = set.NewUint64Set(nil)
	iea.uncommittedEaIds = set.NewUint64Set(nil)

	return updatedMap, nil
}
@@ -1,254 +0,0 @@
// Copyright 2020-2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package editor
import (
"context"
"fmt"
"sync"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/store/types"
)
// rebuildIndexFlushInterval bounds how much work accumulates before a flush
// during index rebuilds. NOTE(review): not referenced in this view; presumably
// used elsewhere in the package — verify before removing.
const rebuildIndexFlushInterval = 1 << 25

// Compile-time assertion that *uniqueKeyErr satisfies error.
var _ error = (*uniqueKeyErr)(nil)

// uniqueKeyErr is an error that is returned when a unique constraint has been violated. It contains the index key
// (which is the full row).
type uniqueKeyErr struct {
	TableTuple types.Tuple
	IndexTuple types.Tuple
	IndexName  string
}

// Error implements the error interface.
func (u *uniqueKeyErr) Error() string {
	// Formatting failure is deliberately swallowed; keyStr is "" in that case.
	keyStr, _ := formatKey(context.Background(), u.IndexTuple)
	return fmt.Sprintf("duplicate unique key given: %s", keyStr)
}
// NOTE: Regarding partial keys and full keys. For this example, let's say that our table has a primary key W, with
// non-pk columns X, Y, and Z. You then declare an index over X and Y (in that order). In the table map containing all of
// the rows for the table, each row is composed of two tuples: the first tuple is called the key, the second tuple is
// called the value. The key is the entire primary key, which in this case is Tuple<W> (tags are ignored for this
// example). The value is the remaining three columns: Tuple<X,Y,Z>. Therefore, a row in the table map is
// Row(Tuple<W>,Tuple<X,Y,Z>).
//
// The index map containing all of the rows for the index also follows this format of key and value tuples. However,
// indexes store all of the columns in the key, and have an empty value tuple. An index key contains the indexed columns
// in the order they were defined, along with any primary keys that were not defined in the index. Thus, our example key
// looks like Tuple<X,Y,W>. We refer to this key as the full key in the index context, as with the full key you can
// construct an index row, as it's simply adding an empty tuple to the value, i.e. Row(Tuple<X,Y,W>,Tuple<>). Also with
// a full key, you can find the table row that matches this index row, as the entire primary key (just W) is in the full
// key.
//
// In both the table and index maps, keys are sorted. This means that given X and Y values for the index, we can
// construct a tuple with just those values, Tuple<X,Y>, and find all of the rows in the table with those two values by
// the appended primary key(s). We refer to this prefix of the full key as a partial key. It's easy to think of partial
// keys as just the indexed columns (Tuple<X,Y>), and the full key as the partial key along with the referring primary
// key (Tuple<X,Y> + W = Tuple<X,Y,W>).
// IndexEditor takes in changes to an index map and returns the updated map if changes have been made.
// This type is thread-safe, and may be used in a multi-threaded environment.
type IndexEditor struct {
	// nbf is the noms binary format used for hashing and materializing
	nbf *types.NomsBinFormat
	// idxSch is the schema of the index itself; tblSch is the owning table's schema
	idxSch schema.Schema
	tblSch schema.Schema
	idx    schema.Index
	// iea accumulates pending edits until Map materializes them
	iea IndexEditAccumulator
	// stack is the bounded operation history consumed by Undo
	stack indexOperationStack
	permanentErr error // If this is set then we should always return this error as the IndexEditor is no longer usable

	// This mutex blocks on each operation, so that map reads and updates are serialized
	writeMutex *sync.Mutex
}
// InsertRow adds the given row to the index. If the row already exists and the index is unique, then an error is returned.
// Otherwise, it is a no-op.
func (ie *IndexEditor) InsertRow(ctx context.Context, key, partialKey types.Tuple, value types.Tuple) error {
	return ie.InsertRowWithDupCb(ctx, key, partialKey, value, func(ctx context.Context, uke *uniqueKeyErr) error {
		// BUG FIX: use the callback's ctx rather than context.Background() so
		// cancellation and deadlines propagate into key formatting.
		msg, err := formatKey(ctx, uke.IndexTuple)
		if err != nil {
			return err
		}

		// The only secondary index that can throw unique key errors is a unique index
		return sql.NewUniqueKeyErr(msg, !ie.idx.IsUnique(), nil)
	})
}
// InsertRowWithDupCb adds the given row to the index. If the row already exists and the
// index is unique, then a uniqueKeyErr is passed to |cb|. If |cb| returns a non-nil
// error then the insert is aborted. Otherwise, the insert proceeds.
func (ie *IndexEditor) InsertRowWithDupCb(ctx context.Context, key, partialKey types.Tuple, value types.Tuple, cb func(ctx context.Context, uke *uniqueKeyErr) error) error {
	keyHash, err := key.Hash(key.Format())
	if err != nil {
		return err
	}
	partialKeyHash, err := partialKey.Hash(partialKey.Format())
	if err != nil {
		return err
	}

	// Serialize all reads/updates of the underlying accumulator.
	ie.writeMutex.Lock()
	defer ie.writeMutex.Unlock()

	// A prior unrecoverable failure poisons every subsequent call.
	if ie.permanentErr != nil {
		return ie.permanentErr
	}

	if ie.idx.IsUnique() {
		// For unique indexes any partial-key match is a potential constraint
		// violation; the callback decides whether to abort.
		if matches, err := ie.iea.HasPartial(ctx, ie.idxSch, partialKeyHash, partialKey); err != nil {
			return err
		} else if len(matches) > 0 {
			tableTuple, err := ie.idx.ToTableTuple(ctx, matches[0].key, ie.nbf)
			if err != nil {
				return err
			}
			cause := &uniqueKeyErr{tableTuple, partialKey, ie.idx.Name()}
			err = cb(ctx, cause)
			if err != nil {
				return err
			}
		}
	} else {
		if rowExists, err := ie.iea.Has(ctx, keyHash, key); err != nil {
			return err
		} else if rowExists && value.Empty() {
			// No-op insert: push empty tuples so Undo still has an entry to pop.
			ie.stack.Push(true, types.EmptyTuple(key.Format()), types.EmptyTuple(key.Format()), types.EmptyTuple(value.Format()))
			return nil
		}
	}

	err = ie.iea.Insert(ctx, keyHash, partialKeyHash, key, value)
	if err != nil {
		return err
	}

	ie.stack.Push(true, key, partialKey, value)
	return nil
}
// DeleteRow removes the given row from the index.
func (ie *IndexEditor) DeleteRow(ctx context.Context, key, partialKey, value types.Tuple) error {
	// CONSISTENCY FIX: hash with key.Format() like InsertRowWithDupCb and
	// HasPartial do, instead of ie.nbf (these should be the same format, but
	// the mixed usage was an accident waiting to happen).
	keyHash, err := key.Hash(key.Format())
	if err != nil {
		return err
	}
	partialKeyHash, err := partialKey.Hash(partialKey.Format())
	if err != nil {
		return err
	}

	// Serialize all reads/updates of the underlying accumulator.
	ie.writeMutex.Lock()
	defer ie.writeMutex.Unlock()

	// A prior unrecoverable failure poisons every subsequent call.
	if ie.permanentErr != nil {
		return ie.permanentErr
	}

	err = ie.iea.Delete(ctx, keyHash, partialKeyHash, key, value)
	if err != nil {
		return err
	}

	ie.stack.Push(false, key, partialKey, value)
	return nil
}
// HasPartial reports whether the index currently contains any row matching the
// given partial key, consulting both pending edits and materialized data.
func (ie *IndexEditor) HasPartial(ctx context.Context, partialKey types.Tuple) (bool, error) {
	pkHash, err := partialKey.Hash(partialKey.Format())
	if err != nil {
		return false, err
	}

	// Serialize with all other editor operations.
	ie.writeMutex.Lock()
	defer ie.writeMutex.Unlock()

	if permErr := ie.permanentErr; permErr != nil {
		return false, permErr
	}

	matches, err := ie.iea.HasPartial(ctx, ie.idxSch, pkHash, partialKey)
	if err != nil {
		return false, err
	}
	return len(matches) > 0, nil
}
// Undo will cause the index editor to undo the last operation at the top of the stack. As Insert and Delete are called,
// they are added onto a limited-size stack, and Undo pops an operation off the top and undoes it. So if there was an
// Insert on a key, it will use Delete on that same key. The stack size is very small, therefore too many consecutive
// calls will cause the stack to empty. This should only be called in the event that an operation was performed that
// has failed for other reasons, such as an INSERT on the parent table failing on a separate index editor. In the event
// that Undo is called and there are no operations to undo OR the reverse operation fails (such as memory capacity
// reached), then we set a permanent error as the index editor is in an invalid state that cannot be corrected.
//
// We don't return an error here as Undo will only be called when there's an error in a different editor. We allow the
// user to handle that initial error, as ALL further calls to this IndexEditor will return the error set here.
func (ie *IndexEditor) Undo(ctx context.Context) {
	// Already poisoned; nothing further can be undone.
	if ie.permanentErr != nil {
		return
	}

	indexOp, ok := ie.stack.Pop()
	if !ok {
		// Undo with no recorded operation is a programmer error, hence panic.
		panic(fmt.Sprintf("attempted to undo the last operation on index '%s' but failed due to an empty stack", ie.idx.Name()))
	}
	// If an operation succeeds and does not do anything, then an empty tuple is pushed onto the stack.
	if indexOp.fullKey.Empty() {
		return
	}
	if indexOp.isInsert {
		// Reverse an insert with a delete of the same keys.
		err := ie.DeleteRow(ctx, indexOp.fullKey, indexOp.partialKey, indexOp.value)
		if err != nil {
			ie.permanentErr = fmt.Errorf("index '%s' is in an invalid and unrecoverable state: "+
				"attempted to undo previous insertion but encountered the following error: %v",
				ie.idx.Name(), err)
			return
		}
	} else {
		// Reverse a delete with an insert of the same keys.
		err := ie.InsertRow(ctx, indexOp.fullKey, indexOp.partialKey, indexOp.value)
		if err != nil {
			ie.permanentErr = fmt.Errorf("index '%s' is in an invalid and unrecoverable state: "+
				"attempted to undo previous deletion but encountered the following error: %v",
				ie.idx.Name(), err)
			return
		}
	}
}
// Map materializes every pending edit and returns the resulting index map.
// Returns the recorded permanent error, if any, without materializing.
func (ie *IndexEditor) Map(ctx context.Context) (types.Map, error) {
	// Serialize with all other editor operations.
	ie.writeMutex.Lock()
	defer ie.writeMutex.Unlock()

	if permErr := ie.permanentErr; permErr != nil {
		return types.EmptyMap, permErr
	}
	return ie.iea.MaterializeEdits(ctx, ie.nbf)
}
// Close is a no-op for an IndexEditor. It surfaces any previously recorded
// permanent error so callers still observe the failed state on shutdown.
func (ie *IndexEditor) Close() error {
	return ie.permanentErr
}
@@ -1,61 +0,0 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package editor
import "github.com/dolthub/dolt/go/store/types"
// indexOperationStack is a limited-size stack, intended for usage with the index editor and its undo functionality.
// As operations are added, the internal array is filled up. Once it is full, new operations replace the oldest ones.
// This reduces memory usage compared to a traditional stack with an unbounded size, as undo should always come
// immediately after an operation is added.
type indexOperationStack struct {
	// entries has a length of 4 as an UPDATE on a table is a Delete & Insert on the index, so we double it for safety.
	entries [4]indexOperation
	// currentIndex is the ring position of the next item we are adding. Add at this index, then increment (mod len).
	currentIndex uint64
	// numOfItems is the number of live items, clamped to len(entries) by Push.
	numOfItems uint64
}

// indexOperation is an operation performed by the index editor, along with the key used.
type indexOperation struct {
	// isInsert is true for an Insert operation, false for a Delete
	isInsert bool
	// fullKey is the indexed columns plus any referring primary key columns
	fullKey types.Tuple
	// partialKey is the indexed columns only
	partialKey types.Tuple
	value      types.Tuple
}
// Push adds the given keys to the top of the stack.
func (ios *indexOperationStack) Push(isInsert bool, fullKey, partialKey, value types.Tuple) {
ios.entries[ios.currentIndex].isInsert = isInsert
ios.entries[ios.currentIndex].fullKey = fullKey
ios.entries[ios.currentIndex].partialKey = partialKey
ios.entries[ios.currentIndex].value = value
ios.currentIndex = (ios.currentIndex + 1) % uint64(len(ios.entries))
ios.numOfItems++
if ios.numOfItems > uint64(len(ios.entries)) {
ios.numOfItems = uint64(len(ios.entries))
}
}
// Pop removes and returns the keys from the top of the stack. Returns false if the stack is empty.
func (ios *indexOperationStack) Pop() (indexOperation, bool) {
if ios.numOfItems == 0 {
return indexOperation{}, false
}
ios.numOfItems--
ios.currentIndex = (ios.currentIndex - 1) % uint64(len(ios.entries))
return ios.entries[ios.currentIndex], true
}
@@ -1,96 +0,0 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package editor
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/store/types"
)
// TestIndexOperationStack exercises push/pop ordering, the empty-stack case,
// and the ring's wrap-around (oldest entries overwritten) behavior.
//
// BUG FIXES vs. the original: iosTupleComp's boolean result was discarded at
// every call site, so the test asserted nothing; one call compared
// entry.partialKey a second time where entry.value was intended; and the
// wrap-around phase computed expected values with an incorrect formula
// (((len*2)+1)-i yields 6..9 where the pops actually return 8..5).
func TestIndexOperationStack(t *testing.T) {
	ios := &indexOperationStack{}
	require.True(t, len(ios.entries) >= 2) // Entries should always at least have a length of 2

	// Single push/pop round trip.
	ios.Push(true, iosTuple(t, 100, 100), iosTuple(t, 100), iosTuple(t, 0))
	entry, ok := ios.Pop()
	require.True(t, ok)
	require.True(t, iosTupleComp(t, entry.fullKey, 100, 100))
	require.True(t, iosTupleComp(t, entry.partialKey, 100))
	require.True(t, iosTupleComp(t, entry.value, 0))
	require.True(t, entry.isInsert)
	_, ok = ios.Pop()
	require.False(t, ok)

	// Fill exactly to capacity and pop back in LIFO order.
	for i := 0; i < len(ios.entries); i++ {
		ios.Push(false, iosTuple(t, i, i), iosTuple(t, i), iosTuple(t, i*2))
	}
	for i := len(ios.entries) - 1; i >= 0; i-- {
		entry, ok = ios.Pop()
		require.True(t, ok)
		require.True(t, iosTupleComp(t, entry.fullKey, i, i))
		require.True(t, iosTupleComp(t, entry.partialKey, i))
		require.True(t, iosTupleComp(t, entry.value, i*2))
		require.False(t, entry.isInsert)
	}
	_, ok = ios.Pop()
	require.False(t, ok)

	// Overfill the ring: only the most recent len(entries) operations survive,
	// and they pop newest-first: total-1, total-2, ..., total-len(entries).
	total := (len(ios.entries) * 2) + 1
	for i := 0; i < total; i++ {
		ios.Push(true, iosTuple(t, i, i), iosTuple(t, i), iosTuple(t, i*2))
	}
	for i := len(ios.entries) - 1; i >= 0; i-- {
		entry, ok = ios.Pop()
		require.True(t, ok)
		val := total - (len(ios.entries) - i)
		require.True(t, iosTupleComp(t, entry.fullKey, val, val))
		require.True(t, iosTupleComp(t, entry.partialKey, val))
		require.True(t, iosTupleComp(t, entry.value, val*2))
		require.True(t, entry.isInsert)
	}
	_, ok = ios.Pop()
	require.False(t, ok)
}
func iosTuple(t *testing.T, vals ...int) types.Tuple {
typeVals := make([]types.Value, len(vals))
for i, val := range vals {
typeVals[i] = types.Int(val)
}
tpl, err := types.NewTuple(types.Format_Default, typeVals...)
if err != nil {
require.NoError(t, err)
}
return tpl
}
func iosTupleComp(t *testing.T, tpl types.Tuple, vals ...int) bool {
if tpl.Len() != uint64(len(vals)) {
return false
}
iter, err := tpl.Iterator()
require.NoError(t, err)
var i uint64
var val types.Value
for i, val, err = iter.Next(); i < uint64(len(vals)) && err == nil; i, val, err = iter.Next() {
if !types.Int(vals[i]).Equals(val) {
return false
}
}
require.NoError(t, err)
return true
}
@@ -1,16 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package nbf provides TableReadCloser and TableWriteCloser implementations for working with dolt tables in noms.
package noms
@@ -1,333 +0,0 @@
// Copyright 2020 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package noms
import (
"context"
"errors"
"fmt"
"io"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/store/types"
)
// InRangeCheck evaluates tuples to determine whether they are valid and/or should be skipped.
type InRangeCheck interface {
// Check is a call made as the reader reads through values to check that the next value either being read is valid
// and whether it should be skipped or returned.
Check(ctx context.Context, vr types.ValueReader, tuple types.Tuple) (valid bool, skip bool, err error)
}
// InRangeCheckAlways will always return that the given tuple is valid and not to be skipped.
type InRangeCheckAlways struct{}
func (InRangeCheckAlways) Check(context.Context, types.ValueReader, types.Tuple) (valid bool, skip bool, err error) {
return true, false, nil
}
func (InRangeCheckAlways) String() string {
return "Always"
}
// InRangeCheckNever will always return that the given tuple is not valid.
type InRangeCheckNever struct{}
func (InRangeCheckNever) Check(context.Context, types.ValueReader, types.Tuple) (valid bool, skip bool, err error) {
return false, false, nil
}
func (InRangeCheckNever) String() string {
return "Never"
}
// InRangeCheckPartial will check if the given tuple contains the aliased tuple as a partial key.
type InRangeCheckPartial types.Tuple
func (ircp InRangeCheckPartial) Check(_ context.Context, vr types.ValueReader, t types.Tuple) (valid bool, skip bool, err error) {
return t.StartsWith(types.Tuple(ircp)), false, nil
}
func (ircp InRangeCheckPartial) String() string {
return fmt.Sprintf("StartsWith(%v)", types.Tuple(ircp).HumanReadableString())
}
// ReadRange represents a range of values to be read
type ReadRange struct {
// Start is a Dolt map key which is the starting point (or ending point if Reverse is true)
Start types.Tuple
// Inclusive says whether the Start key should be included in the range.
Inclusive bool
// Reverse says if the range should be read in reverse (from high to low) instead of the default (low to high)
Reverse bool
// Check is a callb made as the reader reads through values to check that the next value being read is in the range.
Check InRangeCheck
}
func (rr *ReadRange) String() string {
return fmt.Sprintf("ReadRange[Start: %v, Inclusive: %t, Reverse %t, Check: %v]", rr.Start.HumanReadableString(), rr.Inclusive, rr.Reverse, rr.Check)
}
// NewRangeEndingAt creates a range with a starting key which will be iterated in reverse
func NewRangeEndingAt(key types.Tuple, inRangeCheck InRangeCheck) *ReadRange {
return &ReadRange{
Start: key,
Inclusive: true,
Reverse: true,
Check: inRangeCheck,
}
}
// NewRangeEndingBefore creates a range starting before the provided key iterating in reverse
func NewRangeEndingBefore(key types.Tuple, inRangeCheck InRangeCheck) *ReadRange {
return &ReadRange{
Start: key,
Inclusive: false,
Reverse: true,
Check: inRangeCheck,
}
}
// NewRangeStartingAt creates a range with a starting key
func NewRangeStartingAt(key types.Tuple, inRangeCheck InRangeCheck) *ReadRange {
return &ReadRange{
Start: key,
Inclusive: true,
Reverse: false,
Check: inRangeCheck,
}
}
// NewRangeStartingAfter creates a range starting after the provided key
func NewRangeStartingAfter(key types.Tuple, inRangeCheck InRangeCheck) *ReadRange {
return &ReadRange{
Start: key,
Inclusive: false,
Reverse: false,
Check: inRangeCheck,
}
}
// NomsRangeReader reads values in one or more ranges from a map
type NomsRangeReader struct {
vr types.ValueReader
sch schema.Schema
m types.Map
ranges []*ReadRange
idx int
itr types.MapIterator
currCheck InRangeCheck
cardCounter *CardinalityCounter
}
// NewNomsRangeReader creates a NomsRangeReader
func NewNomsRangeReader(vr types.ValueReader, sch schema.Schema, m types.Map, ranges []*ReadRange) *NomsRangeReader {
return &NomsRangeReader{
vr,
sch,
m,
ranges,
0,
nil,
nil,
NewCardinalityCounter(),
}
}
// GetSchema gets the schema of the rows being read.
func (nrr *NomsRangeReader) GetSchema() schema.Schema {
return nrr.sch
}
// ReadRow reads a row from a table. If there is a bad row the returned error will be non nil, and calling
// IsBadRow(err) will be return true. This is a potentially non-fatal error and callers can decide if they want to
// continue on a bad row, or fail.
func (nrr *NomsRangeReader) ReadRow(ctx context.Context) (row.Row, error) {
k, v, err := nrr.ReadKV(ctx)
if err != nil {
return nil, err
}
return row.FromNoms(nrr.sch, k, v)
}
func (nrr *NomsRangeReader) ReadKey(ctx context.Context) (types.Tuple, error) {
k, _, err := nrr.ReadKV(ctx)
return k, err
}
func (nrr *NomsRangeReader) ReadKV(ctx context.Context) (types.Tuple, types.Tuple, error) {
var err error
var k types.Tuple
var v types.Tuple
for nrr.itr != nil || nrr.idx < len(nrr.ranges) {
if !nrr.cardCounter.empty() {
if nrr.cardCounter.done() {
nrr.cardCounter.reset()
} else {
return nrr.cardCounter.next()
}
}
if nrr.itr == nil {
r := nrr.ranges[nrr.idx]
nrr.idx++
if r.Reverse {
nrr.itr, err = nrr.m.IteratorBackFrom(ctx, r.Start)
} else {
nrr.itr, err = nrr.m.IteratorFrom(ctx, r.Start)
}
if err != nil {
return types.Tuple{}, types.Tuple{}, err
}
nrr.currCheck = r.Check
k, v, err = nrr.itr.NextTuple(ctx)
if err == nil && !r.Inclusive {
var res int
res, err = r.Start.Compare(ctx, nrr.vr.Format(), k)
if err == nil && res == 0 {
k, v, err = nrr.itr.NextTuple(ctx)
}
}
} else {
k, v, err = nrr.itr.NextTuple(ctx)
}
if err != nil && err != io.EOF {
return types.Tuple{}, types.Tuple{}, err
}
if err != io.EOF {
valid, skip, err := nrr.currCheck.Check(ctx, nrr.vr, k)
if err != nil {
return types.Tuple{}, types.Tuple{}, err
}
if valid {
if skip {
continue
}
if !v.Empty() {
nrr.cardCounter.updateWithKV(k, v)
if !nrr.cardCounter.empty() && !nrr.cardCounter.done() {
return nrr.cardCounter.next()
}
}
return k, v, nil
}
}
nrr.itr = nil
nrr.currCheck = nil
}
return types.Tuple{}, types.Tuple{}, io.EOF
}
// VerifySchema checks that the incoming schema matches the schema from the existing table
func (nrr *NomsRangeReader) VerifySchema(outSch schema.Schema) (bool, error) {
return schema.VerifyInSchema(nrr.sch, outSch)
}
// Close should release resources being held
func (nrr *NomsRangeReader) Close(ctx context.Context) error {
return nil
}
type CardinalityCounter struct {
key *types.Tuple
value *types.Tuple
card int
idx int
}
func NewCardinalityCounter() *CardinalityCounter {
return &CardinalityCounter{
nil,
nil,
-1,
-1,
}
}
func (cc *CardinalityCounter) updateWithKV(k, v types.Tuple) error {
if !v.Empty() {
cardTagVal, err := v.Get(0)
if err != nil {
return err
}
cardTag, ok := cardTagVal.(types.Uint)
if !ok {
return errors.New("index cardinality invalid tag type")
}
if uint64(cardTag) != schema.KeylessRowCardinalityTag {
return errors.New("index cardinality tag invalid")
}
cardVal, err := v.Get(1)
if err != nil {
return err
}
card, ok := cardVal.(types.Uint)
if !ok {
return errors.New("index cardinality value invalid type")
}
if int(card) > 1 {
cc.card = int(card)
cc.idx = 0
cc.key = &k
cc.value = &v
return nil
} else {
cc.card = -1
cc.idx = -1
cc.key = nil
cc.value = nil
}
}
return nil
}
func (cc *CardinalityCounter) empty() bool {
return cc.key == nil || cc.value == nil
}
func (cc *CardinalityCounter) done() bool {
return cc.card < 1 || cc.idx >= cc.card
}
func (cc *CardinalityCounter) next() (types.Tuple, types.Tuple, error) {
if cc.key == nil || cc.value == nil {
return types.Tuple{}, types.Tuple{}, errors.New("cannot increment empty cardinality counter")
}
cc.idx++
return *cc.key, *cc.value, nil
}
func (cc *CardinalityCounter) reset() {
cc.card = -1
cc.idx = -1
cc.key = nil
cc.value = nil
}
@@ -1,204 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package noms
import (
"context"
"io"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/types"
)
var rangeReaderTests = []struct {
name string
ranges []*ReadRange
expectKeys []int64
}{
{
"test range ending at",
[]*ReadRange{NewRangeEndingAt(mustTuple(10), greaterThanCheck(2))},
[]int64{10, 8, 6, 4},
},
{
"test range ending before",
[]*ReadRange{NewRangeEndingBefore(mustTuple(10), greaterThanCheck(2))},
[]int64{8, 6, 4},
},
{
"test range starting at",
[]*ReadRange{NewRangeStartingAt(mustTuple(10), lessThanCheck(20))},
[]int64{10, 12, 14, 16, 18},
},
{
"test range starting after",
[]*ReadRange{NewRangeStartingAfter(mustTuple(10), lessThanCheck(20))},
[]int64{12, 14, 16, 18},
},
{
"test range iterating to the end",
[]*ReadRange{NewRangeStartingAt(mustTuple(100), lessThanCheck(200))},
[]int64{100},
},
{
"test multiple ranges",
[]*ReadRange{
NewRangeEndingBefore(mustTuple(10), greaterThanCheck(2)),
NewRangeStartingAt(mustTuple(10), lessThanCheck(20)),
},
[]int64{8, 6, 4, 10, 12, 14, 16, 18},
},
{
"test empty range starting after",
[]*ReadRange{NewRangeStartingAfter(mustTuple(100), lessThanCheck(200))},
[]int64(nil),
},
{
"test empty range starting at",
[]*ReadRange{NewRangeStartingAt(mustTuple(101), lessThanCheck(200))},
[]int64(nil),
},
{
"test empty range ending before",
[]*ReadRange{NewRangeEndingBefore(mustTuple(0), greaterThanCheck(-100))},
[]int64(nil),
},
{
"test empty range ending at",
[]*ReadRange{NewRangeEndingAt(mustTuple(-1), greaterThanCheck(-100))},
[]int64(nil),
},
}
func mustTuple(id int64) types.Tuple {
t, err := types.NewTuple(types.Format_Default, types.Uint(pkTag), types.Int(id))
if err != nil {
panic(err)
}
return t
}
func TestRangeReader(t *testing.T) {
ctx := context.Background()
colColl := schema.NewColCollection(
schema.NewColumn("id", pkTag, types.IntKind, true),
schema.NewColumn("val", valTag, types.IntKind, false))
sch, err := schema.SchemaFromCols(colColl)
require.NoError(t, err)
storage := &chunks.MemoryStorage{}
vrw := types.NewValueStore(storage.NewView())
m, err := types.NewMap(ctx, vrw)
assert.NoError(t, err)
me := m.Edit()
for i := 0; i <= 100; i += 2 {
k, err := types.NewTuple(vrw.Format(), types.Uint(pkTag), types.Int(i))
require.NoError(t, err)
v, err := types.NewTuple(vrw.Format(), types.Uint(valTag), types.Int(100-i))
require.NoError(t, err)
me.Set(k, v)
}
m, err = me.Map(ctx)
assert.NoError(t, err)
for _, test := range rangeReaderTests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
rd := NewNomsRangeReader(vrw, sch, m, test.ranges)
var keys []int64
for {
r, err := rd.ReadRow(ctx)
if err == io.EOF {
break
}
assert.NoError(t, err)
col0, ok := r.GetColVal(0)
assert.True(t, ok)
keys = append(keys, int64(col0.(types.Int)))
}
err = rd.Close(ctx)
assert.NoError(t, err)
assert.Equal(t, test.expectKeys, keys)
})
}
}
func TestRangeReaderOnEmptyMap(t *testing.T) {
ctx := context.Background()
colColl := schema.NewColCollection(
schema.NewColumn("id", pkTag, types.IntKind, true),
schema.NewColumn("val", valTag, types.IntKind, false))
sch, err := schema.SchemaFromCols(colColl)
require.NoError(t, err)
storage := &chunks.MemoryStorage{}
vrw := types.NewValueStore(storage.NewView())
m, err := types.NewMap(ctx, vrw)
assert.NoError(t, err)
for _, test := range rangeReaderTests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
rd := NewNomsRangeReader(vrw, sch, m, test.ranges)
r, err := rd.ReadRow(ctx)
assert.Equal(t, io.EOF, err)
assert.Nil(t, r)
})
}
}
type greaterThanCheck int64
func (n greaterThanCheck) Check(ctx context.Context, _ types.ValueReader, k types.Tuple) (valid bool, skip bool, err error) {
col0, err := k.Get(1)
if err != nil {
panic(err)
}
return int64(col0.(types.Int)) > int64(n), false, nil
}
type lessThanCheck int64
func (n lessThanCheck) Check(ctx context.Context, _ types.ValueReader, k types.Tuple) (valid bool, skip bool, err error) {
col0, err := k.Get(1)
if err != nil {
panic(err)
}
return int64(col0.(types.Int)) < int64(n), false, nil
}
@@ -1,74 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package noms
import (
"context"
"io"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/store/types"
)
type StatsCB func(stats types.AppliedEditStats)
// NomsMapReader is a TableReader that reads rows from a noms table which is stored in a types.Map where the key is
// a types.Value and the value is a types.Tuple of field values.
type NomsMapReader struct {
sch schema.Schema
itr types.MapIterator
}
// NewNomsMapReader creates a NomsMapReader for a given noms types.Map
func NewNomsMapReader(ctx context.Context, m types.Map, sch schema.Schema) (*NomsMapReader, error) {
itr, err := m.Iterator(ctx)
if err != nil {
return nil, err
}
return &NomsMapReader{sch, itr}, nil
}
// GetSchema gets the schema of the rows that this reader will return
func (nmr *NomsMapReader) GetSchema() schema.Schema {
return nmr.sch
}
// ReadRow reads a row from a table. If there is a bad row the returned error will be non nil, and callin IsBadRow(err)
// will be return true. This is a potentially non-fatal error and callers can decide if they want to continue on a bad row, or fail.
func (nmr *NomsMapReader) ReadRow(ctx context.Context) (row.Row, error) {
key, val, err := nmr.itr.Next(ctx)
if err != nil {
return nil, err
} else if key == nil {
return nil, io.EOF
}
return row.FromNoms(nmr.sch, key.(types.Tuple), val.(types.Tuple))
}
// Close should release resources being held
func (nmr *NomsMapReader) Close(ctx context.Context) error {
nmr.itr = nil
return nil
}
// VerifySchema checks that the incoming schema matches the schema from the existing table
func (nmr *NomsMapReader) VerifySchema(outSch schema.Schema) (bool, error) {
return schema.VerifyInSchema(nmr.sch, outSch)
}
@@ -1,108 +0,0 @@
// Copyright 2020 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package noms
import (
"context"
"io"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/store/types"
)
// KeyIterator is an interface for iterating through a collection of keys
type KeyIterator interface {
// Next returns the next key in the collection. When all keys are exhausted nil, io.EOF must be returned.
Next() (types.Value, error)
}
// SliceOfKeysIterator is a KeyIterator implementation backed by a slice of keys which are iterated in order
type SliceOfKeysIterator struct {
keys []types.Tuple
idx int
}
// Next returns the next key in the slice. When all keys are exhausted nil, io.EOF is be returned.
func (sokItr *SliceOfKeysIterator) Next() (types.Value, error) {
if sokItr.idx < len(sokItr.keys) {
k := sokItr.keys[sokItr.idx]
sokItr.idx++
return k, nil
}
return nil, io.EOF
}
// NomsMapReaderForKeys implements TableReadCloser
type NomsMapReaderForKeys struct {
sch schema.Schema
m types.Map
keyItr KeyIterator
}
// NewNomsMapReaderForKeys creates a NomsMapReaderForKeys for a given noms types.Map, and a list of keys
func NewNomsMapReaderForKeys(m types.Map, sch schema.Schema, keys []types.Tuple) *NomsMapReaderForKeys {
return NewNomsMapReaderForKeyItr(m, sch, &SliceOfKeysIterator{keys, 0})
}
// NewNomsMapReaderForKeyItr creates a NomsMapReaderForKeys for a given noms types.Map, and a list of keys
func NewNomsMapReaderForKeyItr(m types.Map, sch schema.Schema, keyItr KeyIterator) *NomsMapReaderForKeys {
return &NomsMapReaderForKeys{sch, m, keyItr}
}
// GetSchema gets the schema of the rows being read.
func (nmr *NomsMapReaderForKeys) GetSchema() schema.Schema {
return nmr.sch
}
// ReadRow reads a row from a table. If there is a bad row the returned error will be non nil, and calling
// IsBadRow(err) will be return true. This is a potentially non-fatal error and callers can decide if they want to
// continue on a bad row, or fail.
func (nmr *NomsMapReaderForKeys) ReadRow(ctx context.Context) (row.Row, error) {
var key types.Value
var value types.Value
var err error
for value == nil {
key, err = nmr.keyItr.Next()
if err != nil {
return nil, err
}
v, ok, err := nmr.m.MaybeGet(ctx, key)
if err != nil {
return nil, err
}
if ok {
value = v
}
}
return row.FromNoms(nmr.sch, key.(types.Tuple), value.(types.Tuple))
}
// VerifySchema checks that the incoming schema matches the schema from the existing table
func (nmr *NomsMapReaderForKeys) VerifySchema(outSch schema.Schema) (bool, error) {
return schema.VerifyInSchema(nmr.sch, outSch)
}
// Close should release resources being held
func (nmr *NomsMapReaderForKeys) Close(ctx context.Context) error {
return nil
}
@@ -1,144 +0,0 @@
// Copyright 2020 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package noms
import (
"context"
"io"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/types"
)
const (
pkTag uint64 = iota
valTag
)
func TestReaderForKeys(t *testing.T) {
ctx := context.Background()
colColl := schema.NewColCollection(
schema.NewColumn("id", pkTag, types.IntKind, true),
schema.NewColumn("val", valTag, types.IntKind, false))
sch, err := schema.SchemaFromCols(colColl)
require.NoError(t, err)
storage := &chunks.MemoryStorage{}
vrw := types.NewValueStore(storage.NewView())
m, err := types.NewMap(ctx, vrw)
assert.NoError(t, err)
me := m.Edit()
for i := 0; i <= 100; i += 2 {
k, err := types.NewTuple(vrw.Format(), types.Uint(pkTag), types.Int(i))
require.NoError(t, err)
v, err := types.NewTuple(vrw.Format(), types.Uint(valTag), types.Int(100-i))
require.NoError(t, err)
me.Set(k, v)
}
m, err = me.Map(ctx)
assert.NoError(t, err)
tests := []struct {
name string
keys []int
expected []int
}{
{
name: "tens",
keys: []int{10, 20, 30, 40, 50, 60, 70, 80, 90},
expected: []int{10, 20, 30, 40, 50, 60, 70, 80, 90},
},
{
name: "fives",
keys: []int{5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95},
expected: []int{10, 20, 30, 40, 50, 60, 70, 80, 90},
},
{
name: "empty",
keys: []int{},
expected: []int{},
},
{
name: "no keys that are in the map",
keys: []int{-5, -3, -1, 1, 3, 5, 102, 104, 106},
expected: []int{},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
rd := NewNomsMapReaderForKeys(m, sch, intKeysToTupleKeys(t, vrw.Format(), test.keys))
var rows []row.Row
for {
r, err := rd.ReadRow(ctx)
if err == io.EOF {
break
}
assert.NoError(t, err)
rows = append(rows, r)
}
testAgainstExpected(t, rows, test.expected)
rd.Close(ctx)
})
}
}
func intKeysToTupleKeys(t *testing.T, nbf *types.NomsBinFormat, keys []int) []types.Tuple {
tupleKeys := make([]types.Tuple, len(keys))
for i, key := range keys {
tuple, err := types.NewTuple(nbf, types.Uint(pkTag), types.Int(key))
require.NoError(t, err)
tupleKeys[i] = tuple
}
return tupleKeys
}
func testAgainstExpected(t *testing.T, rows []row.Row, expected []int) {
assert.Equal(t, len(expected), len(rows))
for i, r := range rows {
k, ok := r.GetColVal(pkTag)
require.True(t, ok)
v, ok := r.GetColVal(valTag)
require.True(t, ok)
kn := int(k.(types.Int))
vn := int(v.(types.Int))
expectedK := expected[i]
expectedV := 100 - expectedK
assert.Equal(t, expectedK, kn)
assert.Equal(t, expectedV, vn)
}
}
@@ -27,21 +27,6 @@ import (
var ErrTableNameMatchSheetName = errors.New("table name must match excel sheet name.")
func UnmarshalFromXLSX(path string) ([][][]string, error) {
data, err := openFile(path)
if err != nil {
return nil, err
}
dataSlice, err := data.ToSlice()
if err != nil {
return nil, err
}
return dataSlice, nil
}
func openFile(path string) (*xlsx.File, error) {
data, err := xlsx.OpenFile(path)
@@ -41,35 +41,6 @@ type XLSXReader struct {
vrw types.ValueReadWriter
}
func OpenXLSXReaderFromBinary(ctx context.Context, vrw types.ValueReadWriter, r io.ReadCloser, info *XLSXFileInfo) (*XLSXReader, error) {
br := bufio.NewReaderSize(r, ReadBufSize)
contents, err := io.ReadAll(r)
if err != nil {
return nil, err
}
colStrs, err := getColHeadersFromBinary(contents, info.SheetName)
if err != nil {
return nil, err
}
data, err := getXlsxRowsFromBinary(contents, info.SheetName)
if err != nil {
return nil, err
}
_, sch := untyped.NewUntypedSchema(colStrs...)
decodedRows, err := decodeXLSXRows(data, sch)
if err != nil {
r.Close()
return nil, err
}
return &XLSXReader{r, br, info, sch, 0, decodedRows, vrw}, nil
}
func OpenXLSXReader(ctx context.Context, vrw types.ValueReadWriter, path string, fs filesys.ReadableFS, info *XLSXFileInfo) (*XLSXReader, error) {
r, err := fs.OpenForRead(path)
@@ -110,16 +81,6 @@ func getColHeadersFromPath(path string, sheetName string) ([]string, error) {
return colHeaders, nil
}
func getColHeadersFromBinary(content []byte, sheetName string) ([]string, error) {
data, err := getXlsxRowsFromBinary(content, sheetName)
if err != nil {
return nil, err
}
colHeaders := data[0][0]
return colHeaders, nil
}
// GetSchema gets the schema of the rows that this reader will return
func (xlsxr *XLSXReader) GetSchema() schema.Schema {
return xlsxr.sch
-148
View File
@@ -1,148 +0,0 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
import (
"io"
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
)
type DynamicBuffer struct {
blocks [][]byte
blockSize int
}
func New(blockSize int) *DynamicBuffer {
return &DynamicBuffer{blockSize: blockSize}
}
func (buf *DynamicBuffer) Append(bytes []byte) {
blockIdx := len(buf.blocks) - 1
var space int
var pos int
if blockIdx >= 0 {
currBlock := buf.blocks[blockIdx]
pos = len(currBlock)
space = cap(currBlock) - pos
}
for len(bytes) > 0 {
if space == 0 {
for len(bytes) >= buf.blockSize {
buf.blocks = append(buf.blocks, bytes[:buf.blockSize])
bytes = bytes[buf.blockSize:]
blockIdx++
}
if len(bytes) == 0 {
return
}
buf.blocks = append(buf.blocks, make([]byte, 0, buf.blockSize))
pos = 0
space = buf.blockSize
blockIdx++
}
n := len(bytes)
if n > space {
n = space
}
buf.blocks[blockIdx] = buf.blocks[blockIdx][:pos+n]
copy(buf.blocks[blockIdx][pos:], bytes[:n])
bytes = bytes[n:]
space -= n
pos += n
}
}
func (buf *DynamicBuffer) Close() *BufferIterator {
itr := &BufferIterator{blocks: buf.blocks}
buf.blocks = nil
return itr
}
type BufferIterator struct {
blocks [][]byte
i int
}
func (itr *BufferIterator) Next() ([]byte, error) {
if itr.i >= len(itr.blocks) {
return nil, io.EOF
}
next := itr.blocks[itr.i]
itr.i++
return next, nil
}
func (itr *BufferIterator) NumBlocks() int {
return len(itr.blocks)
}
func (itr *BufferIterator) FlushTo(wr io.Writer) error {
for {
data, err := itr.Next()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
err = iohelp.WriteAll(wr, data)
if err != nil {
return err
}
}
}
func (itr *BufferIterator) AsReader() io.Reader {
return &bufferIteratorReader{
itr: itr,
}
}
type bufferIteratorReader struct {
itr *BufferIterator
currBuff []byte
}
func (b *bufferIteratorReader) Read(dest []byte) (n int, err error) {
if len(b.currBuff) == 0 {
b.currBuff, err = b.itr.Next()
if err != nil {
return 0, err
}
}
destSize := len(dest)
toCopy := b.currBuff
if len(b.currBuff) > destSize {
toCopy = b.currBuff[:destSize]
}
n = copy(dest, toCopy)
b.currBuff = b.currBuff[n:]
return n, err
}
-49
View File
@@ -1,49 +0,0 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
import (
"bytes"
"math/rand"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestDynamicBuffer(t *testing.T) {
const blockSize = 53
rand := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < 100; i++ {
n := 1000 + rand.Int63()%10000
t.Run(strconv.FormatInt(n, 10), func(t *testing.T) {
data := make([]byte, n)
read, err := rand.Read(data)
require.NoError(t, err)
require.Equal(t, int(n), read)
buf := New(blockSize)
buf.Append(data)
itr := buf.Close()
reassembled := bytes.NewBuffer(nil)
err = itr.FlushTo(reassembled)
require.NoError(t, err)
require.Equal(t, data, reassembled.Bytes())
})
}
}
-18
View File
@@ -22,7 +22,6 @@ import (
"io"
"math"
"net/http"
"os"
"path"
"github.com/oracle/oci-go-sdk/v65/common"
@@ -44,23 +43,6 @@ type toUpload struct {
type uploadFunc func(ctx context.Context, objectName, uploadID string, partNumber int, contentLength int64, reader io.Reader) (objectstorage.CommitMultipartUploadPartDetails, error)
type tempLocalObject struct {
f *os.File
path string
}
var _ io.ReadCloser = &tempLocalObject{}
func (t *tempLocalObject) Read(p []byte) (int, error) {
return t.f.Read(p)
}
func (t *tempLocalObject) Close() error {
err := t.f.Close()
os.Remove(t.path)
return err
}
// OCIBlobstore provides an OCI implementation of the Blobstore interface
type OCIBlobstore struct {
provider common.ConfigurationProvider
-17
View File
@@ -26,7 +26,6 @@ import (
"fmt"
"strings"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/spec"
@@ -140,16 +139,6 @@ func (r *Resolver) GetDatabase(ctx context.Context, str string) (datas.Database,
return sp.GetDatabase(ctx), sp.GetVRW(ctx), sp.GetNodeStore(ctx), nil
}
// Resolve string to a chunkstore. Like ResolveDatabase, but returns the underlying ChunkStore
func (r *Resolver) GetChunkStore(ctx context.Context, str string) (chunks.ChunkStore, error) {
dbc := r.DbConfigForDbSpec(str)
sp, err := spec.ForDatabaseOpts(r.verbose(ctx, str, dbc.Url), specOptsForConfig(r.config, dbc))
if err != nil {
return nil, err
}
return sp.NewChunkStore(ctx), nil
}
// Resolve string to a dataset. If a config is present,
// - if no db prefix is present, assume the default db
// - if the db prefix is an alias, replace it
@@ -178,9 +167,3 @@ func (r *Resolver) GetPath(ctx context.Context, str string) (datas.Database, typ
return sp.GetDatabase(ctx), sp.GetVRW(ctx), value, nil
}
// GetDatabaseSpecForPath returns the database and a VRW for the path given, but does not attempt to load a value
func (r *Resolver) GetDatabaseSpecForPath(ctx context.Context, str string) (spec.Spec, error) {
specStr, dbc := r.ResolvePathSpecAndGetDbConfig(str)
return spec.ForPathOpts(r.verbose(ctx, str, specStr), specOptsForConfig(r.config, dbc))
}
-8
View File
@@ -31,7 +31,6 @@ import (
"github.com/dolthub/dolt/go/gen/fb/serial"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/d"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/nomdl"
"github.com/dolthub/dolt/go/store/prolly/tree"
@@ -741,13 +740,6 @@ func makeCommitStructType(metaType, parentsType, parentsListType, parentsClosure
}
}
func getRefElementType(t *types.Type) *types.Type {
// precondition checks
d.PanicIfFalse(t.TargetKind() == types.RefKind)
return t.Desc.(types.CompoundDesc).ElemTypes[0]
}
func firstError(l, r error) error {
if l != nil {
return l
-36
View File
@@ -47,13 +47,6 @@ func mustHead(ds Dataset) types.Value {
return s
}
func mustHeight(ds Dataset) uint64 {
h, ok, err := ds.MaybeHeight()
d.PanicIfError(err)
d.PanicIfFalse(ok)
return h
}
func mustHeadValue(ds Dataset) types.Value {
val, ok, err := ds.MaybeHeadValue()
if err != nil {
@@ -70,11 +63,6 @@ func mustString(str string, err error) string {
return str
}
func mustStruct(st types.Struct, err error) types.Struct {
d.PanicIfError(err)
return st
}
func mustSet(s types.Set, err error) types.Set {
d.PanicIfError(err)
return s
@@ -85,11 +73,6 @@ func mustList(l types.List, err error) types.List {
return l
}
func mustMap(m types.Map, err error) types.Map {
d.PanicIfError(err)
return m
}
func mustParentsClosure(t *testing.T, exists bool) func(types.Ref, bool, error) types.Ref {
return func(r types.Ref, got bool, err error) types.Ref {
t.Helper()
@@ -114,11 +97,6 @@ func mustValue(val types.Value, err error) types.Value {
return val
}
func mustTuple(val types.Tuple, err error) types.Tuple {
d.PanicIfError(err)
return val
}
func TestNewCommit(t *testing.T) {
assert := assert.New(t)
@@ -279,20 +257,6 @@ func mustCommitToTargetHashes(vrw types.ValueReadWriter, commits ...types.Value)
return ret
}
// Convert list of Struct's to List<Ref>
func toRefList(vrw types.ValueReadWriter, commits ...types.Struct) (types.List, error) {
l, err := types.NewList(context.Background(), vrw)
if err != nil {
return types.EmptyList, err
}
le := l.Edit()
for _, p := range commits {
le = le.Append(mustRef(types.NewRef(p, vrw.Format())))
}
return le.List(context.Background())
}
func commonAncWithSetClosure(ctx context.Context, c1, c2 *Commit, vr1, vr2 types.ValueReader, ns1, ns2 tree.NodeStore) (a hash.Hash, ok bool, err error) {
closure, err := NewSetCommitClosure(ctx, vr1, c1)
if err != nil {
-64
View File
@@ -531,74 +531,10 @@ func newStatisticHead(sm types.SerialMessage, addr hash.Hash) serialStashListHea
return serialStashListHead{sm, addr}
}
type statisticsHead struct {
msg types.SerialMessage
addr hash.Hash
}
var _ dsHead = statisticsHead{}
// TypeName implements dsHead
func (s statisticsHead) TypeName() string {
return "Statistics"
}
// Addr implements dsHead
func (s statisticsHead) Addr() hash.Hash {
return s.addr
}
// HeadTag implements dsHead
func (s statisticsHead) HeadTag() (*TagMeta, hash.Hash, error) {
return nil, hash.Hash{}, errors.New("HeadTag called on statistic")
}
// HeadWorkingSet implements dsHead
func (s statisticsHead) HeadWorkingSet() (*WorkingSetHead, error) {
return nil, errors.New("HeadWorkingSet called on statistic")
}
// value implements dsHead
func (s statisticsHead) value() types.Value {
return s.msg
}
func newTupleHead(sm types.SerialMessage, addr hash.Hash) serialStashListHead {
return serialStashListHead{sm, addr}
}
type tupleHead struct {
msg types.SerialMessage
addr hash.Hash
}
var _ dsHead = tupleHead{}
// TypeName implements dsHead
func (s tupleHead) TypeName() string {
return "Tuple"
}
// Addr implements dsHead
func (s tupleHead) Addr() hash.Hash {
return s.addr
}
// HeadTag implements dsHead
func (s tupleHead) HeadTag() (*TagMeta, hash.Hash, error) {
return nil, hash.Hash{}, errors.New("HeadTag called on tuple")
}
// HeadWorkingSet implements dsHead
func (s tupleHead) HeadWorkingSet() (*WorkingSetHead, error) {
return nil, errors.New("HeadWorkingSet called on statistic")
}
// value implements dsHead
func (s tupleHead) value() types.Value {
return s.msg
}
// Dataset is a named value within a Database. Different head values may be stored in a dataset. Most commonly, this is
// a commit, but other values are also supported in some cases.
type Dataset struct {
-12
View File
@@ -142,18 +142,6 @@ func (s *StashList) getStashAtIdx(ctx context.Context, idx int) (hash.Hash, erro
return stash.addr, nil
}
// IsStashList determines whether the types.Value is a stash list object.
func IsStashList(v types.Value) (bool, error) {
if _, ok := v.(types.Struct); ok {
// this should not return true as stash is not supported for old format
return false, nil
} else if sm, ok := v.(types.SerialMessage); ok {
return serial.GetFileID(sm) == serial.StashListFileID, nil
} else {
return false, nil
}
}
// GetStashAtIdx returns hash address of stash at given index in the stash list.
func GetStashAtIdx(ctx context.Context, ns tree.NodeStore, val types.Value, idx int) (hash.Hash, error) {
stashList, err := getExistingStashList(ctx, ns, val)
-12
View File
@@ -49,18 +49,6 @@ func (s *Statistics) Count() (int, error) {
return s.m.Count()
}
// IsStatistic determines whether the types.Value is a stash list object.
func IsStatistic(v types.Value) (bool, error) {
if _, ok := v.(types.Struct); ok {
// this should not return true as stash is not supported for old format
return false, nil
} else if sm, ok := v.(types.SerialMessage); ok {
return serial.GetFileID(sm) == serial.StatisticFileID, nil
} else {
return false, nil
}
}
// LoadStatistics attempts to dereference a database's statistics Dataset into a typed Statistics object.
func LoadStatistics(ctx context.Context, nbf *types.NomsBinFormat, ns tree.NodeStore, vr types.ValueReader, ds Dataset) (*Statistics, error) {
if !nbf.UsesFlatbuffers() {
-12
View File
@@ -35,18 +35,6 @@ func (t Tuple) Bytes() []byte {
return t.val
}
// IsTuple determines whether the types.Value is a tuple
func IsTuple(v types.Value) (bool, error) {
if _, ok := v.(types.Struct); ok {
// this should not return true as stash is not supported for old format
return false, nil
} else if sm, ok := v.(types.SerialMessage); ok {
return serial.GetFileID(sm) == serial.StatisticFileID, nil
} else {
return false, nil
}
}
// LoadTuple attempts to dereference a database's Tuple Dataset into a typed Tuple object.
func LoadTuple(ctx context.Context, nbf *types.NomsBinFormat, ns tree.NodeStore, vr types.ValueReader, ds Dataset) (*Tuple, error) {
if !nbf.UsesFlatbuffers() {
-25
View File
@@ -34,7 +34,6 @@ import (
"github.com/stretchr/testify/require"
dherrors "github.com/dolthub/dolt/go/libraries/utils/errors"
"github.com/dolthub/dolt/go/store/hash"
)
func randomChunks(t *testing.T, r *rand.Rand, sz int) [][]byte {
@@ -159,30 +158,6 @@ func TestAWSTablePersisterPersist(t *testing.T) {
})
}
type waitOnStoreTableCache struct {
readers map[hash.Hash]io.ReaderAt
mu sync.RWMutex
storeWG sync.WaitGroup
}
func (mtc *waitOnStoreTableCache) checkout(h hash.Hash) (io.ReaderAt, error) {
mtc.mu.RLock()
defer mtc.mu.RUnlock()
return mtc.readers[h], nil
}
func (mtc *waitOnStoreTableCache) checkin(h hash.Hash) error {
return nil
}
func (mtc *waitOnStoreTableCache) store(h hash.Hash, data io.Reader, size uint64) error {
defer mtc.storeWG.Done()
mtc.mu.Lock()
defer mtc.mu.Unlock()
mtc.readers[h] = data.(io.ReaderAt)
return nil
}
type failingFakeS3 struct {
*fakeS3
mu sync.Mutex
-87
View File
@@ -18,11 +18,9 @@ import (
"bufio"
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io"
"github.com/dolthub/dolt/go/store/d"
"github.com/dolthub/dolt/go/store/hash"
)
@@ -93,91 +91,6 @@ func journalIndexRecordSize(idx []byte) (recordSz uint32) {
return
}
func writeJournalIndexRecord(buf []byte, root hash.Hash, start, end uint64, idx []byte) (n uint32) {
//defer trace.StartRegion(ctx, "writeJournalIndexRecord").End()
// length
l := journalIndexRecordSize(idx)
writeUint32(buf[:indexRecLenSz], l)
n += indexRecLenSz
// last root
buf[n] = byte(lastRootIndexRecTag)
n += indexRecTagSz
copy(buf[n:], root[:])
n += indexRecLastRootSz
// start offset
buf[n] = byte(startOffsetIndexRecTag)
n += indexRecTagSz
writeUint64(buf[n:], start)
n += indexRecOffsetSz
// end offset
buf[n] = byte(endOffsetIndexRecTag)
n += indexRecTagSz
writeUint64(buf[n:], end)
n += indexRecOffsetSz
// kind
buf[n] = byte(kindIndexRecTag)
n += indexRecTagSz
buf[n] = byte(tableIndexRecKind)
n += indexRecKindSz
// payload
buf[n] = byte(payloadIndexRecTag)
n += indexRecTagSz
copy(buf[n:], idx)
n += uint32(len(idx))
// checksum
writeUint32(buf[n:], crc(buf[:n]))
n += indexRecChecksumSz
d.PanicIfFalse(l == n)
return
}
func readJournalIndexRecord(buf []byte) (rec indexRec, err error) {
rec.length = readUint32(buf)
buf = buf[indexRecLenSz:]
for len(buf) > indexRecChecksumSz {
tag := indexRecTag(buf[0])
buf = buf[indexRecTagSz:]
switch tag {
case lastRootIndexRecTag:
copy(rec.lastRoot[:], buf)
buf = buf[indexRecLastRootSz:]
case startOffsetIndexRecTag:
rec.start = readUint64(buf)
buf = buf[indexRecOffsetSz:]
case endOffsetIndexRecTag:
rec.end = readUint64(buf)
buf = buf[indexRecOffsetSz:]
case kindIndexRecTag:
rec.kind = indexRecKind(buf[0])
buf = buf[indexRecKindSz:]
case payloadIndexRecTag:
sz := len(buf) - indexRecChecksumSz
rec.payload = buf[:sz]
buf = buf[sz:]
case unknownIndexRecTag:
fallthrough
default:
err = fmt.Errorf("unknown record field tag: %d", tag)
return
}
}
rec.checksum = readUint32(buf[:indexRecChecksumSz])
return
}
func validateIndexRecord(buf []byte) bool {
if len(buf) < (indexRecLenSz + indexRecChecksumSz) {
return false
}
off := readUint32(buf)
if int(off) > len(buf) {
return false
}
off -= indexRecChecksumSz
return crc(buf[:off]) == readUint32(buf[off:])
}
type lookupMeta struct {
batchStart int64
batchEnd int64
-16
View File
@@ -115,19 +115,3 @@ func TestRoundTripIndexLookupsMeta(t *testing.T) {
// do a bunch of iters
// use processIndexRecords2 to read back, make sure roots/checksums are consistent, counts, etc
}
func makeLookups(cnt int) (lookups []lookup) {
lookups = make([]lookup, cnt)
buf := make([]byte, cnt*hash.ByteLen)
rand.Read(buf)
var off uint64
for i := range lookups {
copy(lookups[i].a[:], buf)
buf = buf[hash.ByteLen:]
lookups[i].r.Offset = off
l := rand.Uint32() % 1024
lookups[i].r.Length = l
off += uint64(l)
}
return
}
-8
View File
@@ -699,14 +699,6 @@ func (idx rangeIndex) novelCount() int {
return len(idx.novel)
}
func (idx rangeIndex) novelLookups() (lookups []lookup) {
lookups = make([]lookup, 0, len(idx.novel))
for a, r := range idx.novel {
lookups = append(lookups, lookup{a: toAddr16(a), r: r})
}
return
}
func (idx rangeIndex) flatten(ctx context.Context) rangeIndex {
defer trace.StartRegion(ctx, "flatten journal index").End()
trace.Logf(ctx, "map index cached count", "%d", len(idx.cached))
-8
View File
@@ -16,7 +16,6 @@ package nbs
import (
"context"
"encoding/base32"
"math/rand"
"os"
"path/filepath"
@@ -405,13 +404,6 @@ func TestJournalIndexBootstrap(t *testing.T) {
}
}
var encoding = base32.NewEncoding("0123456789abcdefghijklmnopqrstuv")
// encode returns the base32 encoding in the Dolt alphabet.
func encode(data []byte) string {
return encoding.EncodeToString(data)
}
func randomCompressedChunks(cnt int) (compressed map[hash.Hash]CompressedChunk) {
compressed = make(map[hash.Hash]CompressedChunk)
var buf []byte
-8
View File
@@ -212,14 +212,6 @@ func (mt *memTable) getManyCompressed(ctx context.Context, eg *errgroup.Group, r
return remaining, gcBehavior_Continue, nil
}
func (mt *memTable) extract(ctx context.Context, chunks chan<- extractRecord) error {
for _, hrec := range mt.order {
chunks <- extractRecord{a: *hrec.a, data: mt.chunks[*hrec.a], err: nil}
}
return nil
}
func (mt *memTable) write(haver chunkReader, keeper keeperF, stats *Stats) (name hash.Hash, data []byte, splitOffset uint64, chunkCount uint32, gcb gcBehavior, err error) {
gcb = gcBehavior_Continue
numChunks := uint64(len(mt.order))
-7
View File
@@ -122,13 +122,6 @@ func (c CommitClosure) ContainsKey(ctx context.Context, h hash.Hash, height uint
return c.closure.Has(ctx, k)
}
func DecodeCommitClosureKey(key []byte) (height uint64, addr hash.Hash) {
height = binary.LittleEndian.Uint64(key)
addr = hash.New(key[prefixWidth:])
return
}
func (c CommitClosure) AsHashSet(ctx context.Context) (hash.HashSet, error) {
closureIter, err := c.IterAllReverse(ctx)
if err != nil {
-21
View File
@@ -35,7 +35,6 @@ type rangeIter[K, V ~[]byte] interface {
var _ rangeIter[val.Tuple, val.Tuple] = &tree.OrderedTreeIter[val.Tuple, val.Tuple]{}
var _ rangeIter[val.Tuple, val.Tuple] = &memRangeIter{}
var _ rangeIter[val.Tuple, val.Tuple] = emptyIter{}
// mutableMapIter iterates over a Range of Tuples.
type mutableMapIter[K, V ~[]byte, O tree.Ordering[K]] struct {
@@ -88,16 +87,6 @@ func (it mutableMapIter[K, V, O]) Next(ctx context.Context) (key K, value V, err
}
}
func (it mutableMapIter[K, V, O]) currentKeys() (memKey, proKey K) {
if it.memory != nil {
memKey, _ = it.memory.Current()
}
if it.prolly != nil {
proKey, _ = it.prolly.Current()
}
return
}
func (it mutableMapIter[K, V, O]) compareKeys(ctx context.Context, memKey, proKey K) int {
if memKey == nil {
return 1
@@ -179,16 +168,6 @@ func (it *memRangeIter) Iterate(ctx context.Context) (err error) {
}
}
type emptyIter struct{}
func (e emptyIter) Next(context.Context) (val.Tuple, val.Tuple, error) {
return nil, nil, io.EOF
}
func (e emptyIter) Iterate(ctx context.Context) (err error) { return }
func (e emptyIter) Current() (key, value val.Tuple) { return }
type filteredIter struct {
iter MapIter
rng Range
+2 -34
View File
@@ -95,8 +95,8 @@ func testIterRange(t *testing.T, om testMap, tuples [][2]val.Tuple) {
}
for _, test := range tests {
//s := fmt.Sprintf(test.testRange.format())
//fmt.Println(s)
// s := fmt.Sprintf(test.testRange.format())
// fmt.Println(s)
iter, err := om.IterRange(ctx, test.testRange)
require.NoError(t, err)
@@ -500,38 +500,6 @@ func testIterOrdinalRangeWithBounds(t *testing.T, om Map, tuples [][2]val.Tuple,
})
}
func testIterKeyRange(t *testing.T, m Map, tuples [][2]val.Tuple) {
ctx := context.Background()
t.Run("RandomKeyRange", func(t *testing.T) {
bounds := generateInserts(t, m, m.keyDesc, m.valDesc, 2)
start, stop := bounds[0][0], bounds[1][0]
if m.keyDesc.Compare(ctx, start, stop) > 0 {
start, stop = stop, start
}
kR := keyRange{kd: m.keyDesc, start: start, stop: stop}
var expectedKeys []val.Tuple
for _, kv := range tuples {
if kR.includes(kv[0]) {
expectedKeys = append(expectedKeys, kv[0])
}
}
itr, err := m.IterKeyRange(ctx, start, stop)
require.NoError(t, err)
for _, eK := range expectedKeys {
k, _, err := itr.Next(ctx)
require.NoError(t, err)
assert.Equal(t, eK, k)
}
_, _, err = itr.Next(ctx)
require.Equal(t, io.EOF, err)
})
}
func iterOrdinalRange(t *testing.T, ctx context.Context, iter MapIter) (actual [][2]val.Tuple) {
for {
k, v, err := iter.Next(ctx)
-5
View File
@@ -23,7 +23,6 @@ package spec
import (
"github.com/dolthub/dolt/go/store/d"
"github.com/dolthub/dolt/go/store/hash"
)
func CreateDatabaseSpecString(protocol, db string) string {
@@ -35,7 +34,3 @@ func CreateValueSpecString(protocol, db, path string) string {
d.Chk.NoError(err)
return Spec{Protocol: protocol, DatabaseName: db, Path: p}.String()
}
func CreateHashSpecString(protocol, db string, h hash.Hash) string {
return Spec{Protocol: protocol, DatabaseName: db, Path: AbsolutePath{Hash: h}}.String()
}
+7 -26
View File
@@ -99,28 +99,15 @@ type AppliedEditStats struct {
NonExistentDeletes int64
}
// Add adds two AppliedEditStats structures member by member.
func (stats AppliedEditStats) Add(other AppliedEditStats) AppliedEditStats {
return AppliedEditStats{
Additions: stats.Additions + other.Additions,
Modifications: stats.Modifications + other.Modifications,
SameVal: stats.SameVal + other.SameVal,
Deletions: stats.Deletions + other.Deletions,
NonExistentDeletes: stats.NonExistentDeletes + other.NonExistentDeletes,
}
}
// ApplyEdits applies all the edits to a given Map and returns the resulting map, and some statistics about the edits
// that were applied.
func ApplyEdits(ctx context.Context, edits EditProvider, m Map) (Map, AppliedEditStats, error) {
func ApplyEdits(ctx context.Context, edits EditProvider, m Map) (Map, error) {
return ApplyNEdits(ctx, edits, m, -1)
}
func ApplyNEdits(ctx context.Context, edits EditProvider, m Map, numEdits int64) (Map, AppliedEditStats, error) {
var stats AppliedEditStats
func ApplyNEdits(ctx context.Context, edits EditProvider, m Map, numEdits int64) (Map, error) {
if edits.ReachedEOF() {
return m, stats, nil // no edits
return m, nil // no edits
}
var seq sequence = m.orderedSequence
@@ -168,12 +155,10 @@ func ApplyNEdits(ctx context.Context, edits EditProvider, m Map, numEdits int64)
}
if existingValue == nil && kv.value == nil {
stats.NonExistentDeletes++
continue // already non-present
}
if existingValue != nil && kv.value != nil && existingValue.Equals(kv.value) {
stats.SameVal++
continue // same value
}
@@ -193,15 +178,11 @@ func ApplyNEdits(ctx context.Context, edits EditProvider, m Map, numEdits int64)
}
if existingValue != nil {
stats.Modifications++
err := ch.Skip(ctx)
if ae.SetIfError(err) {
continue
}
} else {
stats.Additions++
}
if kv.value != nil {
@@ -219,20 +200,20 @@ func ApplyNEdits(ctx context.Context, edits EditProvider, m Map, numEdits int64)
}
if ae.IsSet() {
return EmptyMap, AppliedEditStats{}, ae.Get()
return EmptyMap, ae.Get()
}
if ch == nil {
return m, stats, nil // no edits required application
return m, nil // no edits required application
}
seq, err := ch.Done(ctx)
if err != nil {
return EmptyMap, AppliedEditStats{}, err
return EmptyMap, err
}
return newMap(seq.(orderedSequence)), stats, nil
return newMap(seq.(orderedSequence)), nil
}
// prepWorker will wait for work to be read from a channel, then iterate over all of the edits finding the appropriate
-51
View File
@@ -22,7 +22,6 @@
package types
import (
"fmt"
"math"
)
@@ -34,53 +33,3 @@ func Round(v Value) Value {
return val
}
}
func Increment(v Value) Value {
switch val := v.(type) {
case Int:
return Int(int64(val) + 1)
case Uint:
return Uint(uint64(val) + 1)
case Float:
return Float(float64(val) + 1)
default:
return val
}
}
func float64IsInt(f float64) bool {
return math.Trunc(f) == f
}
// convert float64 to int64 where f == i * 2^exp
func float64ToIntExp(f float64) (int64, int) {
if math.IsNaN(f) || math.IsInf(f, 0) {
panic(fmt.Errorf("%v is not a supported number", f))
}
if f == 0 {
return 0, 0
}
isNegative := math.Signbit(f)
f = math.Abs(f)
frac, exp := math.Frexp(f)
// frac is [.5, 1)
// Move frac up until it is an integer.
for !float64IsInt(frac) {
frac *= 2
exp--
}
if isNegative {
frac *= -1
}
return int64(frac), exp
}
// fracExpToFloat returns frac * 2 ** exp
func fracExpToFloat(frac int64, exp int) float64 {
return float64(frac) * math.Pow(2, float64(exp))
}
-8
View File
@@ -149,11 +149,3 @@ func (v Geometry) skip(nbf *NomsBinFormat, b *binaryNomsReader) {
func (v Geometry) HumanReadableString() string {
return v.Inner.HumanReadableString()
}
func EncodeGeometryWKB(v Geometry) ([]byte, error) {
wr := &binaryNomsWriter{make([]byte, 128), 0}
if err := v.writeTo(wr, nil); err != nil {
return nil, err
}
return wr.data()[1:], nil // trim NomsKind
}
-87
View File
@@ -94,90 +94,3 @@ func TestIncrementalLoadList(t *testing.T) {
assert.Equal(expectedCount+chunkReads[i], cs.Reads())
}
}
func SkipTestIncrementalLoadSet(t *testing.T) {
assert := assert.New(t)
ts := &chunks.TestStorage{}
cs := ts.NewView()
vs := NewValueStore(cs)
expected, err := NewSet(context.Background(), vs, getTestVals(vs)...)
require.NoError(t, err)
ref, err := vs.WriteValue(context.Background(), expected)
require.NoError(t, err)
refHash := ref.TargetHash()
actualVar, err := vs.ReadValue(context.Background(), refHash)
require.NoError(t, err)
actual := actualVar.(Set)
expectedCount := cs.Reads()
assert.Equal(1, expectedCount)
err = actual.Iter(context.Background(), func(v Value) (bool, error) {
expectedCount += isEncodedOutOfLine(v)
assert.Equal(expectedCount, cs.Reads())
return false, nil
})
require.NoError(t, err)
}
func SkipTestIncrementalLoadMap(t *testing.T) {
assert := assert.New(t)
ts := &chunks.TestStorage{}
cs := ts.NewView()
vs := NewValueStore(cs)
expected, err := NewMap(context.Background(), vs, getTestVals(vs)...)
require.NoError(t, err)
ref, err := vs.WriteValue(context.Background(), expected)
require.NoError(t, err)
refHash := ref.TargetHash()
actualVar, err := vs.ReadValue(context.Background(), refHash)
require.NoError(t, err)
actual := actualVar.(Map)
expectedCount := cs.Reads()
assert.Equal(1, expectedCount)
err = actual.Iter(context.Background(), func(k, v Value) (bool, error) {
expectedCount += isEncodedOutOfLine(k)
expectedCount += isEncodedOutOfLine(v)
assert.Equal(expectedCount, cs.Reads())
return false, nil
})
require.NoError(t, err)
}
func SkipTestIncrementalAddRef(t *testing.T) {
assert := assert.New(t)
ts := &chunks.TestStorage{}
cs := ts.NewView()
vs := NewValueStore(cs)
expectedItem := Float(42)
ref, err := vs.WriteValue(context.Background(), expectedItem)
require.NoError(t, err)
expected, err := NewList(context.Background(), vs, ref)
require.NoError(t, err)
ref, err = vs.WriteValue(context.Background(), expected)
require.NoError(t, err)
actualVar, err := vs.ReadValue(context.Background(), ref.TargetHash())
require.NoError(t, err)
assert.Equal(1, cs.Reads())
assert.True(expected.Equals(actualVar))
actual := actualVar.(List)
actualItem, err := actual.Get(context.Background(), 0)
require.NoError(t, err)
assert.Equal(2, cs.Reads())
assert.True(expectedItem.Equals(actualItem))
// do it again to make sure caching works.
actualItem, err = actual.Get(context.Background(), 0)
require.NoError(t, err)
assert.Equal(2, cs.Reads())
assert.True(expectedItem.Equals(actualItem))
}
-16
View File
@@ -42,16 +42,6 @@ func NewJSONDoc(nbf *NomsBinFormat, vrw ValueReadWriter, value Value) (JSON, err
return JSON{valueImpl{vrw, nbf, w.data(), nil}}, nil
}
func NewTestJSONDoc(nbf *NomsBinFormat, vrw ValueReadWriter, buf []byte) (JSON, error) {
w := newBinaryNomsWriter()
if err := JSONKind.writeTo(&w, nbf); err != nil {
return emptyJSONDoc(nbf), err
}
w.writeString(string(buf))
return JSON{valueImpl{vrw, nbf, w.data(), nil}}, nil
}
// emptyJSONDoc creates and empty JSON value.
func emptyJSONDoc(nbf *NomsBinFormat) JSON {
w := newBinaryNomsWriter()
@@ -148,12 +138,6 @@ func (t JSON) Kind() NomsKind {
return JSONKind
}
func (t JSON) decoderSkipToFields() (valueDecoder, uint64) {
dec := t.decoder()
dec.skipKind()
return dec, uint64(1)
}
// Len implements the Value interface.
func (t JSON) Len() uint64 {
// TODO(andy): is this ever 0?
-94
View File
@@ -26,8 +26,6 @@ import (
"errors"
"fmt"
"golang.org/x/sync/errgroup"
"github.com/dolthub/dolt/go/store/d"
"github.com/dolthub/dolt/go/store/hash"
)
@@ -98,98 +96,6 @@ func NewMap(ctx context.Context, vrw ValueReadWriter, kv ...Value) (Map, error)
return newMap(seq.(orderedSequence)), nil
}
// NewStreamingMap takes an input channel of values and returns a value that
// will produce a finished Map when |.Wait()| is called. Values sent to the
// input channel must be alternating keys and values. (e.g. k1, v1, k2,
// v2...). Moreover keys need to be added to the channel in Noms sortorder,
// adding key values to the input channel out of order will result in an error.
// Once the input channel is closed by the caller, a finished Map will be
// available from the |Wait| call.
//
// See graph_builder.go for building collections with values that are not in
// order.
func NewStreamingMap(ctx context.Context, vrw ValueReadWriter, kvs <-chan Value) *StreamingMap {
d.PanicIfTrue(vrw == nil)
sm := &StreamingMap{}
sm.eg, sm.egCtx = errgroup.WithContext(ctx)
sm.eg.Go(func() error {
m, err := readMapInput(sm.egCtx, vrw, kvs)
sm.m = m
return err
})
return sm
}
type StreamingMap struct {
eg *errgroup.Group
egCtx context.Context
m Map
}
func (sm *StreamingMap) Wait() (Map, error) {
err := sm.eg.Wait()
return sm.m, err
}
// Done returns a signal channel which is closed once the StreamingMap is no
// longer reading from the key/values channel. A send to the key/value channel
// should be in a select with a read from this channel to ensure that the send
// does not deadlock.
func (sm *StreamingMap) Done() <-chan struct{} {
return sm.egCtx.Done()
}
func readMapInput(ctx context.Context, vrw ValueReadWriter, kvs <-chan Value) (Map, error) {
ch, err := newEmptyMapSequenceChunker(ctx, vrw)
if err != nil {
return EmptyMap, err
}
var lastK Value
nextIsKey := true
var k Value
LOOP:
for {
select {
case v, ok := <-kvs:
if !ok {
break LOOP
}
if nextIsKey {
k = v
if lastK != nil {
isLess, err := lastK.Less(ctx, vrw.Format(), k)
if err != nil {
return EmptyMap, err
}
if !isLess {
return EmptyMap, ErrKeysNotOrdered
}
}
lastK = k
nextIsKey = false
} else {
_, err := ch.Append(ctx, mapEntry{key: k, value: v})
if err != nil {
return EmptyMap, err
}
nextIsKey = true
}
case <-ctx.Done():
return EmptyMap, ctx.Err()
}
}
seq, err := ch.Done(ctx)
if err != nil {
return EmptyMap, err
}
return newMap(seq.(orderedSequence)), nil
}
// Diff computes the diff from |last| to |m| using the top-down algorithm,
// which completes as fast as possible while taking longer to return early
// results than left-to-right.
+1 -2
View File
@@ -76,8 +76,7 @@ func (med *MapEditor) Map(ctx context.Context) (Map, error) {
return EmptyMap, err
}
m, _, err := ApplyEdits(ctx, edits, med.m)
return m, err
return ApplyEdits(ctx, edits, med.m)
}
// Set adds an edit
-56
View File
@@ -39,16 +39,6 @@ type MapIterator interface {
Next(ctx context.Context) (k, v Value, err error)
}
type EmptyMapIterator struct{}
func (mtItr EmptyMapIterator) Next(ctx context.Context) (k, v Value, err error) {
return nil, nil, nil
}
func (mtItr EmptyMapIterator) NextTuple(ctx context.Context) (k, v Tuple, err error) {
return Tuple{}, Tuple{}, io.EOF
}
// mapIterator can efficiently iterate through a Noms Map.
type mapIterator struct {
sequenceIter sequenceIterator
@@ -136,49 +126,3 @@ func (m Map) RangeIterator(ctx context.Context, startIdx, endIdx uint64) (MapTup
return &mapRangeIter{collItr: collItr}, nil
}
// LimitingMapIterator iterates |iter| only returning up to |limit| results.
type LimitingMapIterator struct {
iter MapIterator
limit uint64
cnt uint64
}
var _ MapIterator = (*LimitingMapIterator)(nil)
// NewLimitingMapIterator returns a *LimitingMapIterator.
func NewLimitingMapIterator(iter MapIterator, limit uint64) *LimitingMapIterator {
return &LimitingMapIterator{
iter: iter,
limit: limit,
}
}
// Next implements MapIterator.
func (l *LimitingMapIterator) Next(ctx context.Context) (k, v Value, err error) {
if l.cnt == l.limit {
return nil, nil, nil
}
k, v, err = l.iter.Next(ctx)
if err != nil {
return nil, nil, err
}
if k == nil {
return nil, nil, nil
}
l.cnt++
return
}
// NextTuple implements MapIterator.
func (l *LimitingMapIterator) NextTuple(ctx context.Context) (k, v Tuple, err error) {
if l.cnt == l.limit {
return Tuple{}, Tuple{}, io.EOF
}
k, v, err = l.iter.NextTuple(ctx)
if err != nil {
return Tuple{}, Tuple{}, err
}
l.cnt++
return
}
-62
View File
@@ -27,7 +27,6 @@ import (
"fmt"
"math/rand"
"sort"
"sync"
"testing"
"time"
@@ -322,67 +321,6 @@ func newMapTestSuite(size uint, expectChunkCount int, expectPrependChunkDiff int
}
}
func (suite *mapTestSuite) createStreamingMap(vs *ValueStore) Map {
kvChan := make(chan Value)
streamingMap := NewStreamingMap(context.Background(), vs, kvChan)
for _, entry := range suite.elems.entries.entries {
kvChan <- entry.key
kvChan <- entry.value
}
close(kvChan)
m, err := streamingMap.Wait()
suite.NoError(err)
return m
}
func (suite *mapTestSuite) TestStreamingMap() {
vs := newTestValueStore()
defer vs.Close()
m := suite.createStreamingMap(vs)
suite.True(suite.validate(m), "map not valid")
}
func (suite *mapTestSuite) TestStreamingMapOrder() {
vs := newTestValueStore()
defer vs.Close()
entries := mapEntrySlice{make([]mapEntry, len(suite.elems.entries.entries))}
copy(entries.entries, suite.elems.entries.entries)
entries.entries[0], entries.entries[1] = entries.entries[1], entries.entries[0]
kvChan := make(chan Value, len(entries.entries)*2)
for _, e := range entries.entries {
kvChan <- e.key
kvChan <- e.value
}
close(kvChan)
sm := NewStreamingMap(context.Background(), vs, kvChan)
_, err := sm.Wait()
suite.Assert().EqualError(err, ErrKeysNotOrdered.Error())
}
func (suite *mapTestSuite) TestStreamingMap2() {
wg := sync.WaitGroup{}
vs := newTestValueStore()
defer vs.Close()
wg.Add(2)
var m1, m2 Map
go func() {
m1 = suite.createStreamingMap(vs)
wg.Done()
}()
go func() {
m2 = suite.createStreamingMap(vs)
wg.Done()
}()
wg.Wait()
suite.True(suite.validate(m1), "map 'm1' not valid")
suite.True(suite.validate(m2), "map 'm2' not valid")
}
func TestMapSuite4K(t *testing.T) {
suite.Run(t, newMapTestSuite(12, 5, 2, 2, newNumber))
}
-4
View File
@@ -200,10 +200,6 @@ func DeserializeEWKBHeader(buf []byte) (uint32, bool, uint32, error) {
return types.DeserializeEWKBHeader(buf)
}
func DeserializeWKBHeader(buf []byte) (bool, uint32, error) {
return types.DeserializeWKBHeader(buf)
}
func DeserializePoint(buf []byte, isBig bool, srid uint32) types.Point {
p, _, err := types.DeserializePoint(buf, isBig, srid)
if err != nil {
-54
View File
@@ -171,60 +171,6 @@ type valueReadWriter interface {
valueReadWriter() ValueReadWriter
}
type TupleSlice []Tuple
func (vs TupleSlice) Equals(other TupleSlice) bool {
if len(vs) != len(other) {
return false
}
for i, v := range vs {
if !v.Equals(other[i]) {
return false
}
}
return true
}
func (vs TupleSlice) Contains(nbf *NomsBinFormat, v Tuple) bool {
for _, v := range vs {
if v.Equals(v) {
return true
}
}
return false
}
type TupleSort struct {
Tuples []Tuple
}
func (vs TupleSort) Len() int {
return len(vs.Tuples)
}
func (vs TupleSort) Swap(i, j int) {
vs.Tuples[i], vs.Tuples[j] = vs.Tuples[j], vs.Tuples[i]
}
func (vs TupleSort) Less(ctx context.Context, nbf *NomsBinFormat, i, j int) (bool, error) {
res, err := vs.Tuples[i].TupleCompare(ctx, nbf, vs.Tuples[j])
if err != nil {
return false, err
}
return res < 0, nil
}
func (vs TupleSort) Equals(other TupleSort) bool {
return TupleSlice(vs.Tuples).Equals(other.Tuples)
}
func (vs TupleSort) Contains(nbf *NomsBinFormat, v Tuple) bool {
return TupleSlice(vs.Tuples).Contains(nbf, v)
}
type valueImpl struct {
vrw ValueReadWriter
nbf *NomsBinFormat
-5
View File
@@ -130,11 +130,6 @@ func newTestValueStore() *ValueStore {
return NewValueStore(ts.NewViewWithDefaultFormat())
}
func newTestValueStore_LD_1() *ValueStore {
ts := &chunks.TestStorage{}
return NewValueStore(ts.NewView())
}
// NewMemoryValueStore creates a simple struct that satisfies ValueReadWriter
// and is backed by a chunks.TestStore. Used for dolt operations outside of noms.
func NewMemoryValueStore() *ValueStore {