From bb6b7e0a7e19d917b8273a34f7ad032f748ad994 Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Mon, 15 Dec 2025 20:14:47 +0000 Subject: [PATCH 1/5] Remove the 'valuefile' dead code --- go/store/valuefile/file_value_store.go | 303 ------------------------ go/store/valuefile/value_file.go | 316 ------------------------- go/store/valuefile/value_file_test.go | 170 ------------- 3 files changed, 789 deletions(-) delete mode 100644 go/store/valuefile/file_value_store.go delete mode 100644 go/store/valuefile/value_file.go delete mode 100644 go/store/valuefile/value_file_test.go diff --git a/go/store/valuefile/file_value_store.go b/go/store/valuefile/file_value_store.go deleted file mode 100644 index d81e71a47e..0000000000 --- a/go/store/valuefile/file_value_store.go +++ /dev/null @@ -1,303 +0,0 @@ -// Copyright 2021 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package valuefile - -import ( - "context" - "fmt" - "sort" - "sync" - - "github.com/dolthub/dolt/go/store/chunks" - "github.com/dolthub/dolt/go/store/hash" - "github.com/dolthub/dolt/go/store/types" -) - -var _ chunks.ChunkStore = (*FileValueStore)(nil) -var _ types.ValueReadWriter = (*FileValueStore)(nil) - -// FileValueStore implements a trivial in memory chunks.ChunkStore and types.ValueReadWriter in order to allow easy -// serialization / deserialization of noms data to and from a file -type FileValueStore struct { - nbf *types.NomsBinFormat - valLock *sync.Mutex - values map[hash.Hash]types.Value - chunkLock *sync.Mutex - chunks map[hash.Hash][]byte - rootHash hash.Hash -} - -// NewFileValueStore creates a new FileValueStore -func NewFileValueStore(nbf *types.NomsBinFormat) (*FileValueStore, error) { - return &FileValueStore{ - nbf: nbf, - valLock: &sync.Mutex{}, - values: make(map[hash.Hash]types.Value), - chunkLock: &sync.Mutex{}, - chunks: make(map[hash.Hash][]byte), - }, nil -} - -// Gets the NomsBinaryFormat for the Store -func (f *FileValueStore) Format() *types.NomsBinFormat { - return f.nbf -} - -// ReadValue reads a value from the store -func (f *FileValueStore) ReadValue(ctx context.Context, h hash.Hash) (types.Value, error) { - f.valLock.Lock() - defer f.valLock.Unlock() - - v := f.values[h] - return v, nil -} - -// ReadManyValues reads and decodes Values indicated by |hashes| from lvs and returns the found Values in the same order. -// Any non-present Values will be represented by nil. 
-func (f *FileValueStore) ReadManyValues(ctx context.Context, hashes hash.HashSlice) (types.ValueSlice, error) { - f.valLock.Lock() - defer f.valLock.Unlock() - - vals := make(types.ValueSlice, len(hashes)) - for i, h := range hashes { - vals[i] = f.values[h] - } - - return vals, nil -} - -// WriteValue adds a value to the store -func (f *FileValueStore) WriteValue(ctx context.Context, v types.Value) (types.Ref, error) { - f.valLock.Lock() - defer f.valLock.Unlock() - - h, err := v.Hash(f.nbf) - - if err != nil { - return types.Ref{}, err - } - - _, ok := f.values[h] - - if !ok { - f.values[h] = v - - c, err := types.EncodeValue(v, f.nbf) - - if err != nil { - return types.Ref{}, err - } - - err = f.Put(ctx, c, func(c chunks.Chunk) chunks.GetAddrsCb { - return func(ctx context.Context, addrs hash.HashSet, _ chunks.PendingRefExists) error { - return types.AddrsFromNomsValue(c, f.nbf, addrs) - } - }) - - if err != nil { - return types.Ref{}, err - } - } - - return types.NewRef(v, f.nbf) -} - -// Get gets a chunk by it's hash -func (f *FileValueStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, error) { - f.chunkLock.Lock() - defer f.chunkLock.Unlock() - - data, ok := f.chunks[h] - - if !ok { - return chunks.EmptyChunk, nil - } else { - return chunks.NewChunkWithHash(h, data), nil - } -} - -// GetMany gets chunks by their hashes. Chunks that are found are written to the channel. -func (f *FileValueStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *chunks.Chunk)) error { - f.chunkLock.Lock() - defer f.chunkLock.Unlock() - - for h := range hashes { - data, ok := f.chunks[h] - - if ok { - ch := chunks.NewChunkWithHash(h, data) - found(ctx, &ch) - } - } - - return nil -} - -// Has returns true if a chunk is present in the store, false if not -func (f *FileValueStore) Has(ctx context.Context, h hash.Hash) (bool, error) { - f.chunkLock.Lock() - defer f.chunkLock.Unlock() - - _, ok := f.chunks[h] - return ok, nil -} - -func (f *FileValueStore) CacheHas(h hash.Hash) bool { - _, ok := f.chunks[h] - return ok -} - -func (f *FileValueStore) PurgeCaches() { -} - -// HasMany returns the set of hashes that are absent from the store -func (f *FileValueStore) HasMany(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error) { - f.chunkLock.Lock() - defer f.chunkLock.Unlock() - - absent = make(hash.HashSet, len(hashes)) - for h := range hashes { - _, ok := f.chunks[h] - - if !ok { - absent[h] = struct{}{} - } - } - - return absent, nil -} - -func (f *FileValueStore) errorIfDangling(ctx context.Context, addrs hash.HashSet) error { - absent, err := f.HasMany(ctx, addrs) - if err != nil { - return err - } - if len(absent) != 0 { - s := absent.String() - return fmt.Errorf("Found dangling references to %s", s) - } - return nil -} - -// Put puts a chunk into the store -func (f *FileValueStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCurry) error { - addrs := hash.NewHashSet() - err := getAddrs(c)(ctx, addrs, f.CacheHas) - if err != nil { - return err - } - - err = f.errorIfDangling(ctx, addrs) - if err != nil { - return err - } - - f.chunkLock.Lock() - defer f.chunkLock.Unlock() - - f.chunks[c.Hash()] = c.Data() - return nil -} - -// Version returns the nbf version string -func (f *FileValueStore) Version() string { - return f.nbf.VersionString() -} - -func (f *FileValueStore) AccessMode() chunks.ExclusiveAccessMode { - return chunks.ExclusiveAccessMode_Shared -} - -// Rebase brings this ChunkStore into sync with the persistent 
storage's current root. Has no impact here -func (f *FileValueStore) Rebase(ctx context.Context) error { - f.chunkLock.Lock() - defer f.chunkLock.Unlock() - return nil -} - -// Root returns the root hash -func (f *FileValueStore) Root(ctx context.Context) (hash.Hash, error) { - f.chunkLock.Lock() - defer f.chunkLock.Unlock() - return f.rootHash, nil -} - -// Commit sets the root hash -func (f *FileValueStore) Commit(ctx context.Context, current, last hash.Hash) (bool, error) { - f.chunkLock.Lock() - defer f.chunkLock.Unlock() - - if f.rootHash == last { - f.rootHash = current - return true, nil - } - - return false, nil -} - -// Stats doesn't do anything -func (f *FileValueStore) Stats() interface{} { - return nil -} - -// StatsSummary doesn't do anything -func (f *FileValueStore) StatsSummary() string { - return "" -} - -// Close doesn't do anything -func (f *FileValueStore) Close() error { - f.chunkLock.Lock() - defer f.chunkLock.Unlock() - - return nil -} - -func (f *FileValueStore) numChunks() int { - f.chunkLock.Lock() - defer f.chunkLock.Unlock() - - return len(f.chunks) -} - -func (f *FileValueStore) iterChunks(cb func(ch chunks.Chunk) error) error { - f.chunkLock.Lock() - defer f.chunkLock.Unlock() - - hashes := make(hash.HashSlice, 0, len(f.chunks)) - for h := range f.chunks { - hashes = append(hashes, h) - } - - sort.Slice(hashes, func(i, j int) bool { - return hashes[i].Less(hashes[j]) - }) - - for _, h := range hashes { - data := f.chunks[h] - err := cb(chunks.NewChunkWithHash(h, data)) - - if err != nil { - return err - } - } - - return nil -} - -func (f *FileValueStore) PersistGhostHashes(ctx context.Context, refs hash.HashSet) error { - // Current unimplemented, but may be useful for testing someday. - panic("not implemented") -} diff --git a/go/store/valuefile/value_file.go b/go/store/valuefile/value_file.go deleted file mode 100644 index de18c00f6b..0000000000 --- a/go/store/valuefile/value_file.go +++ /dev/null @@ -1,316 +0,0 @@ -// Copyright 2021 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package valuefile - -import ( - "context" - "encoding/binary" - "errors" - "fmt" - "io" - "os" - - "github.com/dolthub/dolt/go/libraries/doltcore/env" - - "github.com/dolthub/dolt/go/libraries/utils/iohelp" - "github.com/dolthub/dolt/go/store/chunks" - "github.com/dolthub/dolt/go/store/datas" - "github.com/dolthub/dolt/go/store/hash" - "github.com/dolthub/dolt/go/store/prolly/tree" - "github.com/dolthub/dolt/go/store/types" -) - -// ErrCorruptNVF is the error used when the file being read is corrupt -var ErrCorruptNVF = errors.New("nvf file is corrupt") - -// WritePrimitiveValueFile writes values to the filepath provided -func WritePrimitiveValueFile(ctx context.Context, filepath string, values ...types.Value) error { - for _, v := range values { - if !types.IsPrimitiveKind(v.Kind()) { - return errors.New("non-primitve value found") - } - } - - nbf := types.Format_Default - store, err := NewFileValueStore(nbf) - - if err != nil { - return err - } - - return WriteValueFile(ctx, filepath, store, values...) -} - -// WriteValueFile writes the values stored in the *FileValueStore to the filepath provided -func WriteValueFile(ctx context.Context, filepath string, store *FileValueStore, values ...types.Value) (err error) { - - f, err := os.OpenFile(filepath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm) - defer func() { - closeErr := f.Close() - if err == nil { - err = closeErr - } - }() - - if err != nil { - return err - } - - return WriteToWriter(ctx, f, store, values...) -} - -// WriteToWriter writes the values out to the provided writer in the value file format -func WriteToWriter(ctx context.Context, wr io.Writer, store *FileValueStore, values ...types.Value) error { - vrw := types.NewValueStore(store) - ns := tree.NewNodeStore(store) - db := datas.NewTypesDatabase(vrw, ns) - ds, err := db.GetDataset(ctx, env.DefaultInitBranch) - - if err != nil { - return err - } - - l, err := types.NewList(ctx, vrw, values...) - - if err != nil { - return err - } - - ds, err = datas.CommitValue(ctx, db, ds, l) - - if err != nil { - return err - } - - addr, _ := ds.MaybeHeadAddr() - - err = write(wr, addr, store) - - if err != nil { - return err - } - - return nil -} - -// write writes out: -// NomsBinFormat version string length -// NomsBinFormat version String -// Root Hash -// uint32 num chunks -// -// for each chunk: -// -// hash of chunk -// len of chunk -// -// for each chunk -// -// chunk bytes -func write(wr io.Writer, h hash.Hash, store *FileValueStore) error { - // The Write*IfNoErr functions makes the error handling code less annoying - err := iohelp.WritePrimIfNoErr(wr, uint32(len(store.nbf.VersionString())), nil) - err = iohelp.WriteIfNoErr(wr, []byte(store.nbf.VersionString()), err) - err = iohelp.WriteIfNoErr(wr, h[:], err) - err = iohelp.WritePrimIfNoErr(wr, uint32(store.numChunks()), err) - - if err != nil { - return err - } - - err = store.iterChunks(func(ch chunks.Chunk) error { - h := ch.Hash() - err = iohelp.WriteIfNoErr(wr, h[:], nil) - return iohelp.WritePrimIfNoErr(wr, uint32(len(ch.Data())), err) - }) - if err != nil { - return err - } - - err = store.iterChunks(func(ch chunks.Chunk) error { - return iohelp.WriteIfNoErr(wr, ch.Data(), nil) - }) - - return err -} - -// ValueFile is the in memory representation of a value file. 
-type ValueFile struct { - Ns tree.NodeStore - Vrw types.ValueReadWriter - Values []types.Value -} - -// ReadValueFile reads from the provided file and returns the values stored in the file -func ReadValueFile(ctx context.Context, filepath string) (*ValueFile, error) { - f, err := os.Open(filepath) - - if err != nil { - return nil, err - } - - defer f.Close() - - return ReadFromReader(ctx, f) -} - -// ReadFromReader reads from the provided reader which should provided access to data in the value file format and returns -// the values -func ReadFromReader(ctx context.Context, rd io.Reader) (*ValueFile, error) { - h, store, err := read(ctx, rd) - - if err != nil { - return nil, err - } - - vrw := types.NewValueStore(store) - - v, err := vrw.ReadValue(ctx, h) - - if err != nil { - return nil, err - } - - rootVal, err := datas.GetCommittedValue(ctx, vrw, v) - if err != nil { - return nil, err - } - - l := rootVal.(types.List) - values := make([]types.Value, l.Len()) - err = l.IterAll(ctx, func(v types.Value, index uint64) error { - values[index] = v - return nil - }) - - if err != nil { - return nil, err - } - - ns := tree.NewNodeStore(store) - - return &ValueFile{ - Values: values, - Ns: ns, - Vrw: vrw, - }, nil -} - -// see the write section to see the value file -func read(ctx context.Context, rd io.Reader) (hash.Hash, *FileValueStore, error) { - // ErrPreservingReader allows me to ignore errors until I need to use the data - errRd := iohelp.NewErrPreservingReader(rd) - - // read len of NBF version string and then read the version string and check it - fmtLen, err := errRd.ReadUint32(binary.BigEndian) - - if err != nil { - if err == io.EOF { - err = fmt.Errorf("EOF read while trying to get nbf format len - %w", ErrCorruptNVF) - } - - return hash.Hash{}, nil, err - } - - data, err := iohelp.ReadNBytes(errRd, int(fmtLen)) - - if err != nil { - if err == io.EOF { - err = fmt.Errorf("EOF read while trying to get nbf format string - %w", ErrCorruptNVF) - } - - return hash.Hash{}, nil, err - } - - var nbf *types.NomsBinFormat - switch string(data) { - case types.Format_LD_1.VersionString(): - nbf = types.Format_LD_1 - case types.Format_DOLT.VersionString(): - nbf = types.Format_DOLT - default: - return hash.Hash{}, nil, fmt.Errorf("unknown noms format: %s", string(data)) - } - - store, err := NewFileValueStore(nbf) - - if err != nil { - return hash.Hash{}, nil, err - } - - // read the root hash and the chunk count - hashBytes, _ := iohelp.ReadNBytes(errRd, hash.ByteLen) - numChunks, err := errRd.ReadUint32(binary.BigEndian) - - if err != nil { - if err == io.EOF { - err = fmt.Errorf("EOF read while trying to read the root hash and chunk count - %w", ErrCorruptNVF) - } - - return hash.Hash{}, nil, err - } - - // read the hashes and sizes - type hashAndSize struct { - h hash.Hash - size uint32 - } - hashesAndSizes := make([]hashAndSize, numChunks) - for i := uint32(0); i < numChunks; i++ { - chHashBytes, _ := iohelp.ReadNBytes(errRd, hash.ByteLen) - size, err := errRd.ReadUint32(binary.BigEndian) - - if err != nil { - if err == io.EOF { - err = fmt.Errorf("EOF read the root hash and chunk count - %w", ErrCorruptNVF) - } - - return hash.Hash{}, nil, err - } - - hashesAndSizes[i] = hashAndSize{hash.New(chHashBytes), size} - } - - // read the data and validate it against the expected hashes - for _, hashAndSize := range hashesAndSizes { - h := hashAndSize.h - size := hashAndSize.size - chBytes, err := iohelp.ReadNBytes(errRd, int(size)) - - if err != nil && err != io.EOF || err == io.EOF && 
uint32(len(chBytes)) != size { - if err == io.EOF { - err = fmt.Errorf("EOF read trying to read chunk - %w", ErrCorruptNVF) - } - - return hash.Hash{}, nil, err - } - - ch := chunks.NewChunk(chBytes) - - if h != ch.Hash() { - return hash.Hash{}, nil, errors.New("data corrupted") - } - - err = store.Put(ctx, ch, func(c chunks.Chunk) chunks.GetAddrsCb { - return func(_ context.Context, _ hash.HashSet, _ chunks.PendingRefExists) error { return nil } - }) - - if err != nil { - return hash.Hash{}, nil, err - } - } - - return hash.New(hashBytes), store, nil -} diff --git a/go/store/valuefile/value_file_test.go b/go/store/valuefile/value_file_test.go deleted file mode 100644 index d1926b749f..0000000000 --- a/go/store/valuefile/value_file_test.go +++ /dev/null @@ -1,170 +0,0 @@ -// Copyright 2021 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package valuefile - -import ( - "os" - "path/filepath" - "testing" - - "github.com/dolthub/go-mysql-server/sql" - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/dolthub/dolt/go/gen/fb/serial" - "github.com/dolthub/dolt/go/store/prolly" - "github.com/dolthub/dolt/go/store/prolly/message" - "github.com/dolthub/dolt/go/store/prolly/shim" - "github.com/dolthub/dolt/go/store/prolly/tree" - "github.com/dolthub/dolt/go/store/types" - "github.com/dolthub/dolt/go/store/val" -) - -func TestReadWriteValueFile(t *testing.T) { - const numMaps = 1 - const numMapValues = 1 - - ctx := sql.NewEmptyContext() - store, err := NewFileValueStore(types.Format_Default) - require.NoError(t, err) - - var values []types.Value - for i := 0; i < numMaps; i++ { - var kvs []types.Value - for j := 0; j < numMapValues; j++ { - kvs = append(kvs, types.Int(j), types.String(uuid.New().String())) - } - m, err := types.NewMap(ctx, store, kvs...) - require.NoError(t, err) - - values = append(values, m) - } - - path := filepath.Join(os.TempDir(), "file.nvf") - err = WriteValueFile(ctx, path, store, values...) - require.NoError(t, err) - - vf, err := ReadValueFile(ctx, path) - require.NoError(t, err) - require.NotNil(t, vf.Ns) - require.Equal(t, len(values), len(vf.Values)) - - for i := 0; i < len(values); i++ { - require.True(t, values[i].Equals(vf.Values[i])) - } -} - -func TestRoundtripProllyMapIntoValueFile(t *testing.T) { - const numMaps = 5 - const numMapEntries = 1000 - - ctx := sql.NewEmptyContext() - store, err := NewFileValueStore(types.Format_DOLT) - require.NoError(t, err) - oldNs := tree.NewNodeStore(store) - vrw := types.NewValueStore(store) - - var values []types.Value - var expectedMaps []prolly.Map - - for i := 0; i < numMaps; i++ { - m, _ := makeProllyMap(t, oldNs, numMapEntries) - expectedMaps = append(expectedMaps, m) - v := shim.ValueFromMap(m) - - ref, err := vrw.WriteValue(ctx, v) - require.NoError(t, err) - - values = append(values, ref) - } - - path := filepath.Join(os.TempDir(), "file.nvf") - err = WriteValueFile(ctx, path, store, values...) 
- require.NoError(t, err) - - vf, err := ReadValueFile(ctx, path) - require.NoError(t, err) - require.NotNil(t, vf.Ns) - require.Equal(t, len(values), len(vf.Values)) - - for i := 0; i < len(vf.Values); i++ { - ref := vf.Values[i].(types.Ref) - v, err := vrw.ReadValue(ctx, ref.TargetHash()) - require.NoError(t, err) - rootNode, fileId, err := shim.NodeFromValue(v) - require.NoError(t, err) - require.Equal(t, fileId, serial.ProllyTreeNodeFileID) - m := prolly.NewMap(rootNode, vf.Ns, kd, vd) - assertProllyMapsEqual(t, expectedMaps[i], m) - } -} - -func assertProllyMapsEqual(t *testing.T, expected, received prolly.Map) { - assert.Equal(t, expected.HashOf(), received.HashOf()) - - s, err := prolly.DebugFormat(sql.NewEmptyContext(), expected) - require.NoError(t, err) - s2, err := prolly.DebugFormat(sql.NewEmptyContext(), received) - require.NoError(t, err) - require.Equal(t, s, s2) -} - -var kd = val.NewTupleDescriptor( - val.Type{Enc: val.Uint32Enc, Nullable: false}, -) -var vd = val.NewTupleDescriptor( - val.Type{Enc: val.Uint32Enc, Nullable: true}, - val.Type{Enc: val.Uint32Enc, Nullable: true}, - val.Type{Enc: val.Uint32Enc, Nullable: true}, -) - -func makeProllyMap(t *testing.T, ns tree.NodeStore, count int) (prolly.Map, [][2]val.Tuple) { - ctx := sql.NewEmptyContext() - tuples, err := tree.RandomTuplePairs(ctx, count, kd, vd, ns) - require.NoError(t, err) - om := mustProllyMapFromTuples(t, kd, vd, ns, tuples) - - for i := 0; i < len(tuples); i++ { - var found bool - err := om.Get(sql.NewEmptyContext(), tuples[i][0], func(k, v val.Tuple) error { - assert.Equal(t, tuples[i][0], k) - assert.Equal(t, tuples[i][1], v) - found = true - return nil - }) - require.NoError(t, err) - assert.True(t, found) - } - - return om, tuples -} - -func mustProllyMapFromTuples(t *testing.T, kd, vd *val.TupleDesc, ns tree.NodeStore, tuples [][2]val.Tuple) prolly.Map { - ctx := sql.NewEmptyContext() - - serializer := message.NewProllyMapSerializer(vd, ns.Pool()) - chunker, err := tree.NewEmptyChunker(ctx, ns, serializer) - require.NoError(t, err) - - for _, pair := range tuples { - err := chunker.AddPair(ctx, tree.Item(pair[0]), tree.Item(pair[1])) - require.NoError(t, err) - } - root, err := chunker.Done(ctx) - require.NoError(t, err) - - return prolly.NewMap(root, ns, kd, vd) -} From f637baa3af9d25ffe6811843918053b6fd6d40cc Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Mon, 15 Dec 2025 20:56:07 +0000 Subject: [PATCH 2/5] Remove dead iohelp code --- go/libraries/utils/iohelp/read.go | 229 ------------------------- go/libraries/utils/iohelp/read_test.go | 208 ---------------------- 2 files changed, 437 deletions(-) diff --git a/go/libraries/utils/iohelp/read.go b/go/libraries/utils/iohelp/read.go index 1f50a3457d..11e6e9d720 100644 --- a/go/libraries/utils/iohelp/read.go +++ b/go/libraries/utils/iohelp/read.go @@ -16,12 +16,7 @@ package iohelp import ( "bufio" - "encoding/binary" - "errors" "io" - "sync" - "sync/atomic" - "time" ) // ErrPreservingReader is a utility class that provides methods to read from a reader where errors can be ignored and @@ -53,22 +48,6 @@ func (r *ErrPreservingReader) Read(p []byte) (int, error) { return n, r.Err } -// Read -func (r *ErrPreservingReader) ReadUint32(order binary.ByteOrder) (uint32, error) { - if r.Err != nil { - return 0, r.Err - } - - bytes, err := ReadNBytes(r, 4) - - if err != nil { - r.Err = err - return 0, r.Err - } - - return order.Uint32(bytes), nil -} - // ReadNBytes will read n bytes from the given reader and return a new slice containing the data. 
ReadNBytes will always // return n bytes, or it will return no data and an error (So if you request 100 bytes and there are only 99 left before // the reader returns io.EOF you won't receive any of the data as this is considered an error as it can't read 100 bytes). @@ -90,53 +69,6 @@ func ReadNBytes(r io.Reader, n int) ([]byte, error) { return bytes, nil } -// ReadLineNoBuf will read a line from an unbuffered io.Reader where it considers lines to be separated by newlines (\n). -// The data returned will be a string with \r\n characters removed from the end, a bool which says whether the end of -// the stream has been reached, and any errors that have been encountered (other than eof which is treated as the end of -// the final line). This isn't efficient, so you shouldn't do this if you can use a buffered reader and the -// iohelp.ReadLine method. -func ReadLineNoBuf(r io.Reader) (string, bool, error) { - var err error - var dest []byte - var oneByte [1]byte - - for { - var n int - n, err = r.Read(oneByte[:]) - - if err != nil && err != io.EOF { - return "", true, err - } - - if n == 1 { - c := oneByte[0] - - if c == '\n' { - break - } - - dest = append(dest, c) - } - - if err == io.EOF { - break - } - } - - crlfCount := 0 - lineLen := len(dest) - for i := lineLen - 1; i >= 0; i-- { - ch := dest[i] - - if ch == '\r' || ch == '\n' { - crlfCount++ - } else { - break - } - } - - return string(dest[:lineLen-crlfCount]), err != nil, nil -} // ReadLine will read a line from an unbuffered io.Reader where it considers lines to be separated by newlines (\n). // The data returned will be a string with \r\n characters removed from the end, a bool which says whether the end of @@ -165,165 +97,4 @@ func ReadLine(br *bufio.Reader) (line string, done bool, err error) { return line[:lineLen-crlfCount], err != nil, nil } -/*func ReadLineFromJSON(br *bufio.Reader) (line map[string]interface{}, done bool, err error) { - line, err = br.ReadMap() -}*/ -// ErrThroughput is the error that is returned by ReadWithMinThroughput if the throughput drops below the threshold -var ErrThroughput = errors.New("throughput below minimum allowable") - -// MinThroughputCheckParams defines the miminimum throughput, how often it should be checked, and what the time window -// size is -type MinThroughputCheckParams struct { - // MinBytesPerSec is the minimum throughput. If ReadWithMinThroughput drops below this value for the most recent - // time window then it will fail. - MinBytesPerSec int64 - - // CheckInterval how often should the throughput be checked - CheckInterval time.Duration - - // NumIntervals defines the number of intervals that should be considered when looking at the throughput. - // NumIntervals*CheckInterval defines the window size - NumIntervals int -} - -type datapoint struct { - ts time.Time - val int64 -} - -type datapoints []datapoint - -// getThroughput returns the throughput for the most recent time window -func (initialDps datapoints) getThroughput(duration time.Duration) (datapoints, int64) { - dps := initialDps - now := time.Now() - cutoff := now.Add(-duration) - - // restrict datapoints to datapoints within the time window - for len(dps) > 1 { - if cutoff.After(dps[0].ts) { - dps = dps[1:] - } else { - break - } - } - - if len(dps) <= 1 { - return dps, 0 - } - - elapsed := now.Sub(dps[0].ts) - bytesRead := dps[len(dps)-1].val - dps[0].val - - return dps, int64(float64(bytesRead) / elapsed.Seconds()) -} - -// safeClose closes the provided closer recovering from any errors. 
-func safeClose(c io.Closer) { - defer func() { - recover() - }() - - c.Close() -} - -type readResults struct { - bytes []byte - err error -} - -// ReadNWithProgress reads n bytes from reader r. As it reads it atomically updates the value pointed at by -// bytesRead. In order to cancel this read the reader should be closed. -func ReadNWithProgress(r io.Reader, n int64, bytesRead *int64) ([]byte, error) { - var totalRead int64 - bytes := make([]byte, n) - - var err error - for totalRead < n && err == nil { - var read int - read, err = r.Read(bytes[totalRead:]) - - if err != nil && err != io.EOF { - break - } - - totalRead += int64(read) - - if bytesRead != nil { - atomic.StoreInt64(bytesRead, totalRead) - } - - if err == io.EOF { - err = nil - if totalRead != n { - err = io.ErrUnexpectedEOF - } - } - } - - return bytes[:totalRead], err -} - -// ReadWithMinThroughput reads n bytes from reader r erroring if the throughput ever drops below the threshold -// defined by MinThroughputCheckParams. -func ReadWithMinThroughput(r io.ReadCloser, n int64, mtcParams MinThroughputCheckParams) ([]byte, error) { - resChan := make(chan readResults, 1) - defer close(resChan) - - wg := &sync.WaitGroup{} - - var bytesReadSync int64 - - wg.Add(1) - go func() { - defer wg.Done() - defer func() { recover() }() - - bytes, err := ReadNWithProgress(r, n, &bytesReadSync) - res := readResults{bytes, err} - resChan <- res - }() - - checkDuration := mtcParams.CheckInterval * time.Duration(mtcParams.NumIntervals) - ticker := time.NewTicker(mtcParams.CheckInterval) - defer ticker.Stop() - - var points datapoints - var throughputErr bool - for !throughputErr { - select { - case res := <-resChan: - return res.bytes, res.err - case <-ticker.C: - } - - read := atomic.LoadInt64(&bytesReadSync) - points = append(points, datapoint{time.Now(), read}) - - if len(points) >= mtcParams.NumIntervals { - var bps int64 - points, bps = points.getThroughput(checkDuration) - - if bps < mtcParams.MinBytesPerSec { - safeClose(r) - throughputErr = true - } - } - } - - wg.Wait() - - select { - case res := <-resChan: - err := res.err - - if throughputErr { - err = ErrThroughput - } - - return res.bytes, err - default: - panic("bug. 
Should never reach here.") - } -} diff --git a/go/libraries/utils/iohelp/read_test.go b/go/libraries/utils/iohelp/read_test.go index 5947e1743b..f1ce642674 100644 --- a/go/libraries/utils/iohelp/read_test.go +++ b/go/libraries/utils/iohelp/read_test.go @@ -17,15 +17,9 @@ package iohelp import ( "bufio" "bytes" - "errors" - "io" "reflect" "testing" - "time" - "github.com/stretchr/testify/assert" - - "github.com/dolthub/dolt/go/libraries/utils/osutil" "github.com/dolthub/dolt/go/libraries/utils/test" ) @@ -77,10 +71,8 @@ var rlTests = []struct { func TestReadReadLineFunctions(t *testing.T) { for _, test := range rlTests { bufferedTest := getTestReadLineClosure(test.inputStr) - unbufferedTest := getTestReadLineNoBufClosure(test.inputStr) testReadLineFunctions(t, "buffered", test.expectedLines, bufferedTest) - testReadLineFunctions(t, "unbuffered", test.expectedLines, unbufferedTest) } } @@ -93,13 +85,6 @@ func getTestReadLineClosure(inputStr string) func() (string, bool, error) { } } -func getTestReadLineNoBufClosure(inputStr string) func() (string, bool, error) { - r := bytes.NewReader([]byte(inputStr)) - - return func() (string, bool, error) { - return ReadLineNoBuf(r) - } -} func testReadLineFunctions(t *testing.T, testType string, expected []string, rlFunc func() (string, bool, error)) { var isDone bool @@ -120,197 +105,4 @@ func testReadLineFunctions(t *testing.T, testType string, expected []string, rlF } } -var ErrClosed = errors.New("") -type FixedRateDataGenerator struct { - BytesPerInterval int - Interval time.Duration - lastRead time.Time - closeChan chan struct{} - dataGenerated uint64 -} - -func NewFixedRateDataGenerator(bytesPerInterval int, interval time.Duration) *FixedRateDataGenerator { - return &FixedRateDataGenerator{ - bytesPerInterval, - interval, - time.Now(), - make(chan struct{}), - 0, - } -} - -func (gen *FixedRateDataGenerator) Read(p []byte) (int, error) { - nextRead := gen.Interval - (time.Now().Sub(gen.lastRead)) - - select { - case <-gen.closeChan: - return 0, ErrClosed - case <-time.After(nextRead): - gen.dataGenerated += uint64(gen.BytesPerInterval) - gen.lastRead = time.Now() - return min(gen.BytesPerInterval, len(p)), nil - } -} - -func (gen *FixedRateDataGenerator) Close() error { - close(gen.closeChan) - return nil -} - -type ErroringReader struct { - Err error -} - -func (er ErroringReader) Read(p []byte) (int, error) { - return 0, er.Err -} - -func (er ErroringReader) Close() error { - return nil -} - -type ReaderSizePair struct { - Reader io.ReadCloser - Size int -} - -type ReaderCollection struct { - ReadersAndSizes []ReaderSizePair - currIdx int - currReaderRead int -} - -func NewReaderCollection(readerSizePair ...ReaderSizePair) *ReaderCollection { - if len(readerSizePair) == 0 { - panic("no readers") - } - - for _, rsp := range readerSizePair { - if rsp.Size <= 0 { - panic("invalid size") - } - - if rsp.Reader == nil { - panic("invalid reader") - } - } - - return &ReaderCollection{readerSizePair, 0, 0} -} - -func (rc *ReaderCollection) Read(p []byte) (int, error) { - if rc.currIdx < len(rc.ReadersAndSizes) { - currReader := rc.ReadersAndSizes[rc.currIdx].Reader - currSize := rc.ReadersAndSizes[rc.currIdx].Size - remaining := currSize - rc.currReaderRead - - n, err := currReader.Read(p) - - if err != nil { - return 0, err - } - - if n >= remaining { - n = remaining - rc.currIdx++ - rc.currReaderRead = 0 - } else { - rc.currReaderRead += n - } - - return n, err - } - - return 0, io.EOF -} - -func (rc *ReaderCollection) Close() error { - for _, rsp := 
range rc.ReadersAndSizes { - err := rsp.Reader.Close() - - if err != nil { - return err - } - } - - return nil -} - -func TestReadWithMinThroughput(t *testing.T) { - t.Skip("Skipping test in all cases as it is inconsistent on Unix") - if osutil.IsWindows { - t.Skip("Skipping test as it is too inconsistent on Windows and will randomly pass or fail") - } - tests := []struct { - name string - numBytes int64 - reader io.ReadCloser - mtcp MinThroughputCheckParams - expErr bool - expThroughErr bool - }{ - { - "10MB @ max(100MBps) > 50MBps", - 10 * 1024 * 1024, - NewReaderCollection( - ReaderSizePair{NewFixedRateDataGenerator(100*1024, time.Millisecond), 10 * 1024 * 1024}, - ), - MinThroughputCheckParams{50 * 1024 * 1024, 5 * time.Millisecond, 10}, - false, - false, - }, - { - "5MB then error", - 10 * 1024 * 1024, - NewReaderCollection( - ReaderSizePair{NewFixedRateDataGenerator(100*1024, time.Millisecond), 5 * 1024 * 1024}, - ReaderSizePair{ErroringReader{errors.New("test err")}, 100 * 1024}, - ReaderSizePair{NewFixedRateDataGenerator(100*1024, time.Millisecond), 5 * 1024 * 1024}, - ), - MinThroughputCheckParams{50 * 1024 * 1024, 5 * time.Millisecond, 10}, - true, - false, - }, - { - "5MB then slow < 50Mbps", - 10 * 1024 * 1024, - NewReaderCollection( - ReaderSizePair{NewFixedRateDataGenerator(100*1024, time.Millisecond), 5 * 1024 * 1024}, - ReaderSizePair{NewFixedRateDataGenerator(49*1024, time.Millisecond), 5 * 1024 * 1024}, - ), - MinThroughputCheckParams{50 * 1024 * 1024, 5 * time.Millisecond, 10}, - false, - true, - }, - { - "5MB then stops", - 10 * 1024 * 1024, - NewReaderCollection( - ReaderSizePair{NewFixedRateDataGenerator(100*1024, time.Millisecond), 5 * 1024 * 1024}, - ReaderSizePair{NewFixedRateDataGenerator(0, 100*time.Second), 5 * 1024 * 1024}, - ), - MinThroughputCheckParams{50 * 1024 * 1024, 5 * time.Millisecond, 10}, - false, - true, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - data, err := ReadWithMinThroughput(test.reader, test.numBytes, test.mtcp) - - if test.expErr || test.expThroughErr { - if test.expThroughErr { - assert.Equal(t, err, ErrThroughput) - } else { - assert.Error(t, err) - assert.NotEqual(t, err, ErrThroughput) - } - } else { - assert.Equal(t, len(data), int(test.numBytes)) - assert.NoError(t, err) - } - }) - } -} From 66f6b271834794cce4ccfa00ac4389c81fc6b483 Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Mon, 15 Dec 2025 21:11:33 +0000 Subject: [PATCH 3/5] Remove the iohelp.ReadNBytes which is better done with ReadFull --- go/libraries/utils/iohelp/read.go | 20 -------------------- go/libraries/utils/iohelp/read_test.go | 18 ++++++++++++------ go/store/nbs/table_index.go | 4 ++-- go/store/nbs/util.go | 6 +++--- go/store/types/tuple_stream.go | 6 ++++-- 5 files changed, 21 insertions(+), 33 deletions(-) diff --git a/go/libraries/utils/iohelp/read.go b/go/libraries/utils/iohelp/read.go index 11e6e9d720..10ec592d7e 100644 --- a/go/libraries/utils/iohelp/read.go +++ b/go/libraries/utils/iohelp/read.go @@ -48,26 +48,6 @@ func (r *ErrPreservingReader) Read(p []byte) (int, error) { return n, r.Err } -// ReadNBytes will read n bytes from the given reader and return a new slice containing the data. ReadNBytes will always -// return n bytes, or it will return no data and an error (So if you request 100 bytes and there are only 99 left before -// the reader returns io.EOF you won't receive any of the data as this is considered an error as it can't read 100 bytes). 
-func ReadNBytes(r io.Reader, n int) ([]byte, error) { - bytes := make([]byte, n) - - var err error - for totalRead := 0; totalRead < n; { - if err != nil { - return nil, err - } - - read := 0 - read, err = r.Read(bytes[totalRead:]) - - totalRead += read - } - - return bytes, nil -} // ReadLine will read a line from an unbuffered io.Reader where it considers lines to be separated by newlines (\n). diff --git a/go/libraries/utils/iohelp/read_test.go b/go/libraries/utils/iohelp/read_test.go index f1ce642674..12ebd0ff3f 100644 --- a/go/libraries/utils/iohelp/read_test.go +++ b/go/libraries/utils/iohelp/read_test.go @@ -17,6 +17,7 @@ package iohelp import ( "bufio" "bytes" + "io" "reflect" "testing" @@ -27,10 +28,14 @@ func TestErrPreservingReader(t *testing.T) { tr := test.NewTestReader(32, 16) epr := NewErrPreservingReader(tr) - read1, noErr1 := ReadNBytes(epr, 8) - read2, noErr2 := ReadNBytes(epr, 8) - read3, firstErr := ReadNBytes(epr, 8) - read4, secondErr := ReadNBytes(epr, 8) + read1 := make([]byte, 8) + _, noErr1 := io.ReadFull(epr, read1) + read2 := make([]byte, 8) + _, noErr2 := io.ReadFull(epr, read2) + read3 := make([]byte, 8) + _, firstErr := io.ReadFull(epr, read3) + read4 := make([]byte, 8) + _, secondErr := io.ReadFull(epr, read4) for i := 0; i < 8; i++ { if read1[i] != byte(i) || read2[i] != byte(i)+8 { @@ -38,8 +43,9 @@ func TestErrPreservingReader(t *testing.T) { } } - if read3 != nil || read4 != nil { - t.Error("Unexpected read values should be nil.") + // With io.ReadFull, we expect the buffers to exist but error should be set + if len(read3) == 0 || len(read4) == 0 { + t.Error("Expected read buffers to exist.") } if noErr1 != nil || noErr2 != nil { diff --git a/go/store/nbs/table_index.go b/go/store/nbs/table_index.go index 40ad646210..0ec6e6767c 100644 --- a/go/store/nbs/table_index.go +++ b/go/store/nbs/table_index.go @@ -25,7 +25,6 @@ import ( "runtime/debug" "sync/atomic" - "github.com/dolthub/dolt/go/libraries/utils/iohelp" "github.com/dolthub/dolt/go/store/hash" ) @@ -91,7 +90,8 @@ func ReadTableFooter(rd io.ReadSeeker) (chunkCount uint32, totalUncompressedData return 0, 0, err } - footer, err := iohelp.ReadNBytes(rd, int(footerSize)) + footer := make([]byte, footerSize) + _, err = io.ReadFull(rd, footer) if err != nil { return 0, 0, err diff --git a/go/store/nbs/util.go b/go/store/nbs/util.go index 6ee16b033c..c1db74f186 100644 --- a/go/store/nbs/util.go +++ b/go/store/nbs/util.go @@ -19,8 +19,6 @@ import ( "io" "math" - "github.com/dolthub/dolt/go/libraries/utils/iohelp" - "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/hash" ) @@ -100,5 +98,7 @@ func readNFrom(rd io.ReadSeeker, offset uint64, length uint32) ([]byte, error) { return nil, err } - return iohelp.ReadNBytes(rd, int(length)) + buf := make([]byte, length) + _, err = io.ReadFull(rd, buf) + return buf, err } diff --git a/go/store/types/tuple_stream.go b/go/store/types/tuple_stream.go index 0160b336c6..6ca6bc6441 100644 --- a/go/store/types/tuple_stream.go +++ b/go/store/types/tuple_stream.go @@ -150,7 +150,8 @@ func NewTupleReader(nbf *NomsBinFormat, vrw ValueReadWriter, rd io.Reader) Tuple // Read reades the next tuple from the TupleReader func (trd *tupleReaderImpl) Read() (*Tuple, error) { - sizeBytes, err := iohelp.ReadNBytes(trd.rd, 4) + sizeBytes := make([]byte, 4) + _, err := io.ReadFull(trd.rd, sizeBytes) if err != nil { return nil, err } @@ -161,7 +162,8 @@ func (trd *tupleReaderImpl) Read() (*Tuple, error) { return nil, nil } - data, err := iohelp.ReadNBytes(trd.rd, 
int(size)) + data := make([]byte, size) + _, err = io.ReadFull(trd.rd, data) if err != nil { if err == io.EOF { return nil, errors.New("corrupt tuple stream") From 338d835f6f626a5dcbf7731184e1d0e05cc23466 Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Mon, 15 Dec 2025 21:14:50 +0000 Subject: [PATCH 4/5] Remove ErrPreservingReader --- go/libraries/utils/iohelp/read.go | 33 -------------------- go/libraries/utils/iohelp/read_test.go | 43 -------------------------- 2 files changed, 76 deletions(-) diff --git a/go/libraries/utils/iohelp/read.go b/go/libraries/utils/iohelp/read.go index 10ec592d7e..952559c013 100644 --- a/go/libraries/utils/iohelp/read.go +++ b/go/libraries/utils/iohelp/read.go @@ -19,37 +19,6 @@ import ( "io" ) -// ErrPreservingReader is a utility class that provides methods to read from a reader where errors can be ignored and -// handled later. Once an error occurs subsequent calls to read won't pull data from the io.Reader, will be a noop, and -// the initial error can be retrieved from Err at any time. ErrPreservingReader implements the io.Reader interface -// itself so it can be used as any other Reader would be. -type ErrPreservingReader struct { - // R is the reader supplying the actual data. - R io.Reader - - // Err is the first error that occurred, or nil - Err error -} - -// NewErrPreservingReader creates a new instance of an ErrPreservingReader -func NewErrPreservingReader(r io.Reader) *ErrPreservingReader { - return &ErrPreservingReader{r, nil} -} - -// Read reads data from the underlying io.Reader if no previous errors have occurred. If an error has already occurred -// then read will simply no-op and return 0 for the number of bytes read and the original error. -func (r *ErrPreservingReader) Read(p []byte) (int, error) { - n := 0 - - if r.Err == nil { - n, r.Err = r.R.Read(p) - } - - return n, r.Err -} - - - // ReadLine will read a line from an unbuffered io.Reader where it considers lines to be separated by newlines (\n). 
// The data returned will be a string with \r\n characters removed from the end, a bool which says whether the end of // the stream has been reached, and any errors that have been encountered (other than eof which is treated as the end of @@ -76,5 +45,3 @@ func ReadLine(br *bufio.Reader) (line string, done bool, err error) { return line[:lineLen-crlfCount], err != nil, nil } - - diff --git a/go/libraries/utils/iohelp/read_test.go b/go/libraries/utils/iohelp/read_test.go index 12ebd0ff3f..40e4559fb5 100644 --- a/go/libraries/utils/iohelp/read_test.go +++ b/go/libraries/utils/iohelp/read_test.go @@ -17,53 +17,10 @@ package iohelp import ( "bufio" "bytes" - "io" "reflect" "testing" - - "github.com/dolthub/dolt/go/libraries/utils/test" ) -func TestErrPreservingReader(t *testing.T) { - tr := test.NewTestReader(32, 16) - epr := NewErrPreservingReader(tr) - - read1 := make([]byte, 8) - _, noErr1 := io.ReadFull(epr, read1) - read2 := make([]byte, 8) - _, noErr2 := io.ReadFull(epr, read2) - read3 := make([]byte, 8) - _, firstErr := io.ReadFull(epr, read3) - read4 := make([]byte, 8) - _, secondErr := io.ReadFull(epr, read4) - - for i := 0; i < 8; i++ { - if read1[i] != byte(i) || read2[i] != byte(i)+8 { - t.Error("Unexpected values read.") - } - } - - // With io.ReadFull, we expect the buffers to exist but error should be set - if len(read3) == 0 || len(read4) == 0 { - t.Error("Expected read buffers to exist.") - } - - if noErr1 != nil || noErr2 != nil { - t.Error("Unexpected error.") - } - - if firstErr == nil || secondErr == nil || epr.Err == nil { - t.Error("Expected error not received.") - } else { - first := firstErr.(*test.TestError).ErrId - second := secondErr.(*test.TestError).ErrId - preservedErrID := epr.Err.(*test.TestError).ErrId - - if preservedErrID != first || preservedErrID != second { - t.Error("Error not preserved properly.") - } - } -} var rlTests = []struct { inputStr string From e49f1992db76752eaff8dd58c920d3f726cea332 Mon Sep 17 00:00:00 2001 From: macneale4 Date: Mon, 15 Dec 2025 21:26:35 +0000 Subject: [PATCH 5/5] [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh --- go/libraries/utils/iohelp/read_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/go/libraries/utils/iohelp/read_test.go b/go/libraries/utils/iohelp/read_test.go index 40e4559fb5..e08bd2f1bf 100644 --- a/go/libraries/utils/iohelp/read_test.go +++ b/go/libraries/utils/iohelp/read_test.go @@ -21,7 +21,6 @@ import ( "testing" ) - var rlTests = []struct { inputStr string expectedLines []string @@ -48,7 +47,6 @@ func getTestReadLineClosure(inputStr string) func() (string, bool, error) { } } - func testReadLineFunctions(t *testing.T, testType string, expected []string, rlFunc func() (string, bool, error)) { var isDone bool var line string @@ -67,5 +65,3 @@ func testReadLineFunctions(t *testing.T, testType string, expected []string, rlF t.Error("Received unexpected results.") } } - -
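
Series note: the replacement pattern PATCH 3 applies at each former iohelp.ReadNBytes call site is the standard-library idiom sketched below — allocate the buffer at the call site and let io.ReadFull drive the read loop. This is a minimal, self-contained illustration, not code from the series; readExactly is a hypothetical name used only for this sketch. One behavioral difference worth noting (also visible in the test comment added in PATCH 3): the removed ReadNBytes returned nil data on any short read, whereas io.ReadFull leaves whatever partial data it read in the caller's buffer and signals the short read with io.ErrUnexpectedEOF.

package main

import (
	"bytes"
	"fmt"
	"io"
)

// readExactly mirrors the rewritten call sites: make a buffer of the wanted
// size, then use io.ReadFull to read exactly that many bytes or fail.
func readExactly(r io.Reader, n int) ([]byte, error) {
	buf := make([]byte, n)
	// io.ReadFull returns io.EOF if it read nothing at all, and
	// io.ErrUnexpectedEOF if it read some bytes but fewer than n.
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	return buf, nil
}

func main() {
	r := bytes.NewReader([]byte("abcde"))

	b, err := readExactly(r, 4)
	fmt.Printf("%q %v\n", b, err) // "abcd" <nil>

	_, err = readExactly(r, 4) // only one byte remains
	fmt.Println(err)           // unexpected EOF
}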