Merge pull request #10215 from dolthub/macneale4-claude/valuefile

[no-release-notes] Remove the 'valuefile' and other dead code
This commit is contained in:
Neil Macneale IV
2025-12-15 14:29:17 -08:00
committed by GitHub
8 changed files with 9 additions and 1327 deletions
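The few additions in this change replace calls to the removed iohelp.ReadNBytes helper with the standard library's io.ReadFull. A minimal sketch of that pattern, under the assumption of a generic io.Reader (the helper name readN is illustrative, not taken from the diff):

	package example // illustrative only

	import "io"

	// readN reads exactly n bytes or fails, roughly mirroring the old
	// iohelp.ReadNBytes contract: all n bytes, or no data and an error.
	// On a short read, io.ReadFull reports io.ErrUnexpectedEOF.
	func readN(rd io.Reader, n int) ([]byte, error) {
		buf := make([]byte, n)
		if _, err := io.ReadFull(rd, buf); err != nil {
			return nil, err
		}
		return buf, nil
	}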

View File

@@ -16,128 +16,9 @@ package iohelp
import (
"bufio"
"encoding/binary"
"errors"
"io"
"sync"
"sync/atomic"
"time"
)
// ErrPreservingReader is a utility type that provides methods to read from a reader where errors can be ignored and
// handled later. Once an error occurs, subsequent calls to Read are no-ops that don't pull data from the io.Reader,
// and the initial error can be retrieved from Err at any time. ErrPreservingReader implements the io.Reader interface
// itself so it can be used as any other Reader would be.
type ErrPreservingReader struct {
// R is the reader supplying the actual data.
R io.Reader
// Err is the first error that occurred, or nil
Err error
}
// NewErrPreservingReader creates a new instance of an ErrPreservingReader
func NewErrPreservingReader(r io.Reader) *ErrPreservingReader {
return &ErrPreservingReader{r, nil}
}
// Read reads data from the underlying io.Reader if no previous errors have occurred. If an error has already occurred
// then read will simply no-op and return 0 for the number of bytes read and the original error.
func (r *ErrPreservingReader) Read(p []byte) (int, error) {
n := 0
if r.Err == nil {
n, r.Err = r.R.Read(p)
}
return n, r.Err
}
// ReadUint32 reads a uint32 from the underlying reader using the given byte order, preserving any error that occurs.
func (r *ErrPreservingReader) ReadUint32(order binary.ByteOrder) (uint32, error) {
if r.Err != nil {
return 0, r.Err
}
bytes, err := ReadNBytes(r, 4)
if err != nil {
r.Err = err
return 0, r.Err
}
return order.Uint32(bytes), nil
}
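// Illustrative usage only (not part of this file): the point of ErrPreservingReader
// is to defer error handling — issue several reads, then check Err once at the end.
//
//	epr := NewErrPreservingReader(src)           // src: any io.Reader (assumed)
//	header, _ := ReadNBytes(epr, 8)              // errors are captured, not handled here
//	count, _ := epr.ReadUint32(binary.BigEndian) // no-op returning the saved error if a read already failed
//	if epr.Err != nil {
//		// handle the first error that occurred; header and count are not trustworthy
//	}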
// ReadNBytes will read n bytes from the given reader and return a new slice containing the data. ReadNBytes either
// returns all n bytes or returns no data and an error (so if you request 100 bytes and only 99 remain before the
// reader returns io.EOF, you receive none of the data, since failing to read all 100 bytes is treated as an error).
func ReadNBytes(r io.Reader, n int) ([]byte, error) {
bytes := make([]byte, n)
var err error
for totalRead := 0; totalRead < n; {
if err != nil {
return nil, err
}
read := 0
read, err = r.Read(bytes[totalRead:])
totalRead += read
}
return bytes, nil
}
// ReadLineNoBuf will read a line from an unbuffered io.Reader, where lines are considered to be separated by newlines
// (\n). It returns the line as a string with trailing \r\n characters removed, a bool indicating whether the end of
// the stream has been reached, and any error encountered (other than io.EOF, which is treated as the end of the
// final line). This isn't efficient, so prefer a buffered reader and the iohelp.ReadLine method when you can.
func ReadLineNoBuf(r io.Reader) (string, bool, error) {
var err error
var dest []byte
var oneByte [1]byte
for {
var n int
n, err = r.Read(oneByte[:])
if err != nil && err != io.EOF {
return "", true, err
}
if n == 1 {
c := oneByte[0]
if c == '\n' {
break
}
dest = append(dest, c)
}
if err == io.EOF {
break
}
}
crlfCount := 0
lineLen := len(dest)
for i := lineLen - 1; i >= 0; i-- {
ch := dest[i]
if ch == '\r' || ch == '\n' {
crlfCount++
} else {
break
}
}
return string(dest[:lineLen-crlfCount]), err != nil, nil
}
// ReadLine will read a line from an unbuffered io.Reader where it considers lines to be separated by newlines (\n).
// The data returned will be a string with \r\n characters removed from the end, a bool which says whether the end of
// the stream has been reached, and any errors that have been encountered (other than eof which is treated as the end of
@@ -164,166 +45,3 @@ func ReadLine(br *bufio.Reader) (line string, done bool, err error) {
return line[:lineLen-crlfCount], err != nil, nil
}
/*func ReadLineFromJSON(br *bufio.Reader) (line map[string]interface{}, done bool, err error) {
line, err = br.ReadMap()
}*/
// ErrThroughput is the error that is returned by ReadWithMinThroughput if the throughput drops below the threshold
var ErrThroughput = errors.New("throughput below minimum allowable")
// MinThroughputCheckParams defines the minimum throughput, how often it should be checked, and the size of the time
// window
type MinThroughputCheckParams struct {
// MinBytesPerSec is the minimum throughput. If ReadWithMinThroughput drops below this value for the most recent
// time window then it will fail.
MinBytesPerSec int64
// CheckInterval defines how often the throughput should be checked
CheckInterval time.Duration
// NumIntervals defines the number of intervals that should be considered when looking at the throughput.
// NumIntervals*CheckInterval defines the window size
NumIntervals int
}
type datapoint struct {
ts time.Time
val int64
}
type datapoints []datapoint
// getThroughput returns the throughput for the most recent time window
func (initialDps datapoints) getThroughput(duration time.Duration) (datapoints, int64) {
dps := initialDps
now := time.Now()
cutoff := now.Add(-duration)
// restrict datapoints to those within the time window
for len(dps) > 1 {
if cutoff.After(dps[0].ts) {
dps = dps[1:]
} else {
break
}
}
if len(dps) <= 1 {
return dps, 0
}
elapsed := now.Sub(dps[0].ts)
bytesRead := dps[len(dps)-1].val - dps[0].val
return dps, int64(float64(bytesRead) / elapsed.Seconds())
}
// safeClose closes the provided closer, recovering from any panic.
func safeClose(c io.Closer) {
defer func() {
recover()
}()
c.Close()
}
type readResults struct {
bytes []byte
err error
}
// ReadNWithProgress reads n bytes from reader r. As it reads it atomically updates the value pointed at by
// bytesRead. In order to cancel this read the reader should be closed.
func ReadNWithProgress(r io.Reader, n int64, bytesRead *int64) ([]byte, error) {
var totalRead int64
bytes := make([]byte, n)
var err error
for totalRead < n && err == nil {
var read int
read, err = r.Read(bytes[totalRead:])
if err != nil && err != io.EOF {
break
}
totalRead += int64(read)
if bytesRead != nil {
atomic.StoreInt64(bytesRead, totalRead)
}
if err == io.EOF {
err = nil
if totalRead != n {
err = io.ErrUnexpectedEOF
}
}
}
return bytes[:totalRead], err
}
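// Illustrative usage only (not part of this file): a caller can poll bytesRead from
// another goroutine to report progress, and close the reader to cancel the read.
// rc (io.ReadCloser) and total (int64) are assumed to exist.
//
//	var progress int64
//	var data []byte
//	var readErr error
//	done := make(chan struct{})
//	go func() {
//		data, readErr = ReadNWithProgress(rc, total, &progress)
//		close(done)
//	}()
//	// while waiting on done, report atomic.LoadInt64(&progress); calling rc.Close() cancels the read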
// ReadWithMinThroughput reads n bytes from reader r, returning an error if the throughput ever drops below the
// threshold defined by MinThroughputCheckParams.
func ReadWithMinThroughput(r io.ReadCloser, n int64, mtcParams MinThroughputCheckParams) ([]byte, error) {
resChan := make(chan readResults, 1)
defer close(resChan)
wg := &sync.WaitGroup{}
var bytesReadSync int64
wg.Add(1)
go func() {
defer wg.Done()
defer func() { recover() }()
bytes, err := ReadNWithProgress(r, n, &bytesReadSync)
res := readResults{bytes, err}
resChan <- res
}()
checkDuration := mtcParams.CheckInterval * time.Duration(mtcParams.NumIntervals)
ticker := time.NewTicker(mtcParams.CheckInterval)
defer ticker.Stop()
var points datapoints
var throughputErr bool
for !throughputErr {
select {
case res := <-resChan:
return res.bytes, res.err
case <-ticker.C:
}
read := atomic.LoadInt64(&bytesReadSync)
points = append(points, datapoint{time.Now(), read})
if len(points) >= mtcParams.NumIntervals {
var bps int64
points, bps = points.getThroughput(checkDuration)
if bps < mtcParams.MinBytesPerSec {
safeClose(r)
throughputErr = true
}
}
}
wg.Wait()
select {
case res := <-resChan:
err := res.err
if throughputErr {
err = ErrThroughput
}
return res.bytes, err
default:
panic("bug. Should never reach here.")
}
}
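// Illustrative call only (not part of this file); the parameter values are made up.
// This enforces roughly 1 MiB/s measured over a 5 second window (10 checks, 500ms apart)
// while reading 64 MiB from an io.ReadCloser rc (assumed to exist).
//
//	params := MinThroughputCheckParams{
//		MinBytesPerSec: 1 << 20,
//		CheckInterval:  500 * time.Millisecond,
//		NumIntervals:   10,
//	}
//	data, err := ReadWithMinThroughput(rc, 64<<20, params)
//	if err == ErrThroughput {
//		// the read fell below MinBytesPerSec over the window and rc was closed
//	}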

View File

@@ -17,54 +17,10 @@ package iohelp
import (
"bufio"
"bytes"
"errors"
"io"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/dolthub/dolt/go/libraries/utils/osutil"
"github.com/dolthub/dolt/go/libraries/utils/test"
)
func TestErrPreservingReader(t *testing.T) {
tr := test.NewTestReader(32, 16)
epr := NewErrPreservingReader(tr)
read1, noErr1 := ReadNBytes(epr, 8)
read2, noErr2 := ReadNBytes(epr, 8)
read3, firstErr := ReadNBytes(epr, 8)
read4, secondErr := ReadNBytes(epr, 8)
for i := 0; i < 8; i++ {
if read1[i] != byte(i) || read2[i] != byte(i)+8 {
t.Error("Unexpected values read.")
}
}
if read3 != nil || read4 != nil {
t.Error("Unexpected read values should be nil.")
}
if noErr1 != nil || noErr2 != nil {
t.Error("Unexpected error.")
}
if firstErr == nil || secondErr == nil || epr.Err == nil {
t.Error("Expected error not received.")
} else {
first := firstErr.(*test.TestError).ErrId
second := secondErr.(*test.TestError).ErrId
preservedErrID := epr.Err.(*test.TestError).ErrId
if preservedErrID != first || preservedErrID != second {
t.Error("Error not preserved properly.")
}
}
}
var rlTests = []struct {
inputStr string
expectedLines []string
@@ -77,10 +33,8 @@ var rlTests = []struct {
func TestReadReadLineFunctions(t *testing.T) {
for _, test := range rlTests {
bufferedTest := getTestReadLineClosure(test.inputStr)
unbufferedTest := getTestReadLineNoBufClosure(test.inputStr)
testReadLineFunctions(t, "buffered", test.expectedLines, bufferedTest)
testReadLineFunctions(t, "unbuffered", test.expectedLines, unbufferedTest)
}
}
@@ -93,14 +47,6 @@ func getTestReadLineClosure(inputStr string) func() (string, bool, error) {
}
}
func getTestReadLineNoBufClosure(inputStr string) func() (string, bool, error) {
r := bytes.NewReader([]byte(inputStr))
return func() (string, bool, error) {
return ReadLineNoBuf(r)
}
}
func testReadLineFunctions(t *testing.T, testType string, expected []string, rlFunc func() (string, bool, error)) {
var isDone bool
var line string
@@ -119,198 +65,3 @@ func testReadLineFunctions(t *testing.T, testType string, expected []string, rlF
t.Error("Received unexpected results.")
}
}
var ErrClosed = errors.New("")
type FixedRateDataGenerator struct {
BytesPerInterval int
Interval time.Duration
lastRead time.Time
closeChan chan struct{}
dataGenerated uint64
}
func NewFixedRateDataGenerator(bytesPerInterval int, interval time.Duration) *FixedRateDataGenerator {
return &FixedRateDataGenerator{
bytesPerInterval,
interval,
time.Now(),
make(chan struct{}),
0,
}
}
func (gen *FixedRateDataGenerator) Read(p []byte) (int, error) {
nextRead := gen.Interval - (time.Now().Sub(gen.lastRead))
select {
case <-gen.closeChan:
return 0, ErrClosed
case <-time.After(nextRead):
gen.dataGenerated += uint64(gen.BytesPerInterval)
gen.lastRead = time.Now()
return min(gen.BytesPerInterval, len(p)), nil
}
}
func (gen *FixedRateDataGenerator) Close() error {
close(gen.closeChan)
return nil
}
type ErroringReader struct {
Err error
}
func (er ErroringReader) Read(p []byte) (int, error) {
return 0, er.Err
}
func (er ErroringReader) Close() error {
return nil
}
type ReaderSizePair struct {
Reader io.ReadCloser
Size int
}
type ReaderCollection struct {
ReadersAndSizes []ReaderSizePair
currIdx int
currReaderRead int
}
func NewReaderCollection(readerSizePair ...ReaderSizePair) *ReaderCollection {
if len(readerSizePair) == 0 {
panic("no readers")
}
for _, rsp := range readerSizePair {
if rsp.Size <= 0 {
panic("invalid size")
}
if rsp.Reader == nil {
panic("invalid reader")
}
}
return &ReaderCollection{readerSizePair, 0, 0}
}
func (rc *ReaderCollection) Read(p []byte) (int, error) {
if rc.currIdx < len(rc.ReadersAndSizes) {
currReader := rc.ReadersAndSizes[rc.currIdx].Reader
currSize := rc.ReadersAndSizes[rc.currIdx].Size
remaining := currSize - rc.currReaderRead
n, err := currReader.Read(p)
if err != nil {
return 0, err
}
if n >= remaining {
n = remaining
rc.currIdx++
rc.currReaderRead = 0
} else {
rc.currReaderRead += n
}
return n, err
}
return 0, io.EOF
}
func (rc *ReaderCollection) Close() error {
for _, rsp := range rc.ReadersAndSizes {
err := rsp.Reader.Close()
if err != nil {
return err
}
}
return nil
}
func TestReadWithMinThroughput(t *testing.T) {
t.Skip("Skipping test in all cases as it is inconsistent on Unix")
if osutil.IsWindows {
t.Skip("Skipping test as it is too inconsistent on Windows and will randomly pass or fail")
}
tests := []struct {
name string
numBytes int64
reader io.ReadCloser
mtcp MinThroughputCheckParams
expErr bool
expThroughErr bool
}{
{
"10MB @ max(100MBps) > 50MBps",
10 * 1024 * 1024,
NewReaderCollection(
ReaderSizePair{NewFixedRateDataGenerator(100*1024, time.Millisecond), 10 * 1024 * 1024},
),
MinThroughputCheckParams{50 * 1024 * 1024, 5 * time.Millisecond, 10},
false,
false,
},
{
"5MB then error",
10 * 1024 * 1024,
NewReaderCollection(
ReaderSizePair{NewFixedRateDataGenerator(100*1024, time.Millisecond), 5 * 1024 * 1024},
ReaderSizePair{ErroringReader{errors.New("test err")}, 100 * 1024},
ReaderSizePair{NewFixedRateDataGenerator(100*1024, time.Millisecond), 5 * 1024 * 1024},
),
MinThroughputCheckParams{50 * 1024 * 1024, 5 * time.Millisecond, 10},
true,
false,
},
{
"5MB then slow < 50Mbps",
10 * 1024 * 1024,
NewReaderCollection(
ReaderSizePair{NewFixedRateDataGenerator(100*1024, time.Millisecond), 5 * 1024 * 1024},
ReaderSizePair{NewFixedRateDataGenerator(49*1024, time.Millisecond), 5 * 1024 * 1024},
),
MinThroughputCheckParams{50 * 1024 * 1024, 5 * time.Millisecond, 10},
false,
true,
},
{
"5MB then stops",
10 * 1024 * 1024,
NewReaderCollection(
ReaderSizePair{NewFixedRateDataGenerator(100*1024, time.Millisecond), 5 * 1024 * 1024},
ReaderSizePair{NewFixedRateDataGenerator(0, 100*time.Second), 5 * 1024 * 1024},
),
MinThroughputCheckParams{50 * 1024 * 1024, 5 * time.Millisecond, 10},
false,
true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
data, err := ReadWithMinThroughput(test.reader, test.numBytes, test.mtcp)
if test.expErr || test.expThroughErr {
if test.expThroughErr {
assert.Equal(t, err, ErrThroughput)
} else {
assert.Error(t, err)
assert.NotEqual(t, err, ErrThroughput)
}
} else {
assert.Equal(t, len(data), int(test.numBytes))
assert.NoError(t, err)
}
})
}
}

View File

@@ -25,7 +25,6 @@ import (
"runtime/debug"
"sync/atomic"
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
"github.com/dolthub/dolt/go/store/hash"
)
@@ -91,7 +90,8 @@ func ReadTableFooter(rd io.ReadSeeker) (chunkCount uint32, totalUncompressedData
return 0, 0, err
}
footer, err := iohelp.ReadNBytes(rd, int(footerSize))
footer := make([]byte, footerSize)
_, err = io.ReadFull(rd, footer)
if err != nil {
return 0, 0, err

View File

@@ -19,8 +19,6 @@ import (
"io"
"math"
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)
@@ -100,5 +98,7 @@ func readNFrom(rd io.ReadSeeker, offset uint64, length uint32) ([]byte, error) {
return nil, err
}
return iohelp.ReadNBytes(rd, int(length))
buf := make([]byte, length)
_, err = io.ReadFull(rd, buf)
return buf, err
}

View File

@@ -150,7 +150,8 @@ func NewTupleReader(nbf *NomsBinFormat, vrw ValueReadWriter, rd io.Reader) Tuple
// Read reads the next tuple from the TupleReader
func (trd *tupleReaderImpl) Read() (*Tuple, error) {
sizeBytes, err := iohelp.ReadNBytes(trd.rd, 4)
sizeBytes := make([]byte, 4)
_, err := io.ReadFull(trd.rd, sizeBytes)
if err != nil {
return nil, err
}
@@ -161,7 +162,8 @@ func (trd *tupleReaderImpl) Read() (*Tuple, error) {
return nil, nil
}
data, err := iohelp.ReadNBytes(trd.rd, int(size))
data := make([]byte, size)
_, err = io.ReadFull(trd.rd, data)
if err != nil {
if err == io.EOF {
return nil, errors.New("corrupt tuple stream")

View File

@@ -1,303 +0,0 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package valuefile
import (
"context"
"fmt"
"sort"
"sync"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/types"
)
var _ chunks.ChunkStore = (*FileValueStore)(nil)
var _ types.ValueReadWriter = (*FileValueStore)(nil)
// FileValueStore implements a trivial in memory chunks.ChunkStore and types.ValueReadWriter in order to allow easy
// serialization / deserialization of noms data to and from a file
type FileValueStore struct {
nbf *types.NomsBinFormat
valLock *sync.Mutex
values map[hash.Hash]types.Value
chunkLock *sync.Mutex
chunks map[hash.Hash][]byte
rootHash hash.Hash
}
// NewFileValueStore creates a new FileValueStore
func NewFileValueStore(nbf *types.NomsBinFormat) (*FileValueStore, error) {
return &FileValueStore{
nbf: nbf,
valLock: &sync.Mutex{},
values: make(map[hash.Hash]types.Value),
chunkLock: &sync.Mutex{},
chunks: make(map[hash.Hash][]byte),
}, nil
}
// Format returns the NomsBinFormat for the store
func (f *FileValueStore) Format() *types.NomsBinFormat {
return f.nbf
}
// ReadValue reads a value from the store
func (f *FileValueStore) ReadValue(ctx context.Context, h hash.Hash) (types.Value, error) {
f.valLock.Lock()
defer f.valLock.Unlock()
v := f.values[h]
return v, nil
}
// ReadManyValues reads and decodes the Values indicated by |hashes| from the store and returns the found Values in the same order.
// Any non-present Values will be represented by nil.
func (f *FileValueStore) ReadManyValues(ctx context.Context, hashes hash.HashSlice) (types.ValueSlice, error) {
f.valLock.Lock()
defer f.valLock.Unlock()
vals := make(types.ValueSlice, len(hashes))
for i, h := range hashes {
vals[i] = f.values[h]
}
return vals, nil
}
// WriteValue adds a value to the store
func (f *FileValueStore) WriteValue(ctx context.Context, v types.Value) (types.Ref, error) {
f.valLock.Lock()
defer f.valLock.Unlock()
h, err := v.Hash(f.nbf)
if err != nil {
return types.Ref{}, err
}
_, ok := f.values[h]
if !ok {
f.values[h] = v
c, err := types.EncodeValue(v, f.nbf)
if err != nil {
return types.Ref{}, err
}
err = f.Put(ctx, c, func(c chunks.Chunk) chunks.GetAddrsCb {
return func(ctx context.Context, addrs hash.HashSet, _ chunks.PendingRefExists) error {
return types.AddrsFromNomsValue(c, f.nbf, addrs)
}
})
if err != nil {
return types.Ref{}, err
}
}
return types.NewRef(v, f.nbf)
}
// Get gets a chunk by its hash
func (f *FileValueStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, error) {
f.chunkLock.Lock()
defer f.chunkLock.Unlock()
data, ok := f.chunks[h]
if !ok {
return chunks.EmptyChunk, nil
} else {
return chunks.NewChunkWithHash(h, data), nil
}
}
// GetMany gets chunks by their hashes. Chunks that are found are passed to the found callback.
func (f *FileValueStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *chunks.Chunk)) error {
f.chunkLock.Lock()
defer f.chunkLock.Unlock()
for h := range hashes {
data, ok := f.chunks[h]
if ok {
ch := chunks.NewChunkWithHash(h, data)
found(ctx, &ch)
}
}
return nil
}
// Has returns true if a chunk is present in the store, false if not
func (f *FileValueStore) Has(ctx context.Context, h hash.Hash) (bool, error) {
f.chunkLock.Lock()
defer f.chunkLock.Unlock()
_, ok := f.chunks[h]
return ok, nil
}
func (f *FileValueStore) CacheHas(h hash.Hash) bool {
_, ok := f.chunks[h]
return ok
}
func (f *FileValueStore) PurgeCaches() {
}
// HasMany returns the set of hashes that are absent from the store
func (f *FileValueStore) HasMany(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error) {
f.chunkLock.Lock()
defer f.chunkLock.Unlock()
absent = make(hash.HashSet, len(hashes))
for h := range hashes {
_, ok := f.chunks[h]
if !ok {
absent[h] = struct{}{}
}
}
return absent, nil
}
func (f *FileValueStore) errorIfDangling(ctx context.Context, addrs hash.HashSet) error {
absent, err := f.HasMany(ctx, addrs)
if err != nil {
return err
}
if len(absent) != 0 {
s := absent.String()
return fmt.Errorf("Found dangling references to %s", s)
}
return nil
}
// Put puts a chunk into the store
func (f *FileValueStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCurry) error {
addrs := hash.NewHashSet()
err := getAddrs(c)(ctx, addrs, f.CacheHas)
if err != nil {
return err
}
err = f.errorIfDangling(ctx, addrs)
if err != nil {
return err
}
f.chunkLock.Lock()
defer f.chunkLock.Unlock()
f.chunks[c.Hash()] = c.Data()
return nil
}
// Version returns the nbf version string
func (f *FileValueStore) Version() string {
return f.nbf.VersionString()
}
func (f *FileValueStore) AccessMode() chunks.ExclusiveAccessMode {
return chunks.ExclusiveAccessMode_Shared
}
// Rebase brings this ChunkStore into sync with the persistent storage's current root. Has no impact here
func (f *FileValueStore) Rebase(ctx context.Context) error {
f.chunkLock.Lock()
defer f.chunkLock.Unlock()
return nil
}
// Root returns the root hash
func (f *FileValueStore) Root(ctx context.Context) (hash.Hash, error) {
f.chunkLock.Lock()
defer f.chunkLock.Unlock()
return f.rootHash, nil
}
// Commit sets the root hash
func (f *FileValueStore) Commit(ctx context.Context, current, last hash.Hash) (bool, error) {
f.chunkLock.Lock()
defer f.chunkLock.Unlock()
if f.rootHash == last {
f.rootHash = current
return true, nil
}
return false, nil
}
// Stats doesn't do anything
func (f *FileValueStore) Stats() interface{} {
return nil
}
// StatsSummary doesn't do anything
func (f *FileValueStore) StatsSummary() string {
return ""
}
// Close doesn't do anything
func (f *FileValueStore) Close() error {
f.chunkLock.Lock()
defer f.chunkLock.Unlock()
return nil
}
func (f *FileValueStore) numChunks() int {
f.chunkLock.Lock()
defer f.chunkLock.Unlock()
return len(f.chunks)
}
func (f *FileValueStore) iterChunks(cb func(ch chunks.Chunk) error) error {
f.chunkLock.Lock()
defer f.chunkLock.Unlock()
hashes := make(hash.HashSlice, 0, len(f.chunks))
for h := range f.chunks {
hashes = append(hashes, h)
}
sort.Slice(hashes, func(i, j int) bool {
return hashes[i].Less(hashes[j])
})
for _, h := range hashes {
data := f.chunks[h]
err := cb(chunks.NewChunkWithHash(h, data))
if err != nil {
return err
}
}
return nil
}
func (f *FileValueStore) PersistGhostHashes(ctx context.Context, refs hash.HashSet) error {
// Currently unimplemented, but may be useful for testing someday.
panic("not implemented")
}
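// Illustrative usage only (not part of this file): FileValueStore is both a
// chunks.ChunkStore and a types.ValueReadWriter, so written values simply
// accumulate in memory until a value file writer serializes them.
// ctx is an assumed context.Context.
//
//	store, _ := NewFileValueStore(types.Format_Default)
//	ref, err := store.WriteValue(ctx, types.String("hello")) // stores the value and its chunk
//	// store.numChunks() is now 1 (assuming the value encodes to a single chunk)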

View File

@@ -1,316 +0,0 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package valuefile
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
)
// ErrCorruptNVF is the error used when the file being read is corrupt
var ErrCorruptNVF = errors.New("nvf file is corrupt")
// WritePrimitiveValueFile writes values to the filepath provided
func WritePrimitiveValueFile(ctx context.Context, filepath string, values ...types.Value) error {
for _, v := range values {
if !types.IsPrimitiveKind(v.Kind()) {
return errors.New("non-primitve value found")
}
}
nbf := types.Format_Default
store, err := NewFileValueStore(nbf)
if err != nil {
return err
}
return WriteValueFile(ctx, filepath, store, values...)
}
// WriteValueFile writes the values stored in the *FileValueStore to the filepath provided
func WriteValueFile(ctx context.Context, filepath string, store *FileValueStore, values ...types.Value) (err error) {
f, err := os.OpenFile(filepath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm)
defer func() {
closeErr := f.Close()
if err == nil {
err = closeErr
}
}()
if err != nil {
return err
}
return WriteToWriter(ctx, f, store, values...)
}
// WriteToWriter writes the values out to the provided writer in the value file format
func WriteToWriter(ctx context.Context, wr io.Writer, store *FileValueStore, values ...types.Value) error {
vrw := types.NewValueStore(store)
ns := tree.NewNodeStore(store)
db := datas.NewTypesDatabase(vrw, ns)
ds, err := db.GetDataset(ctx, env.DefaultInitBranch)
if err != nil {
return err
}
l, err := types.NewList(ctx, vrw, values...)
if err != nil {
return err
}
ds, err = datas.CommitValue(ctx, db, ds, l)
if err != nil {
return err
}
addr, _ := ds.MaybeHeadAddr()
err = write(wr, addr, store)
if err != nil {
return err
}
return nil
}
// write writes out:
// NomsBinFormat version string length
// NomsBinFormat version string
// root hash
// uint32 num chunks
//
// for each chunk:
//
// hash of chunk
// len of chunk
//
// for each chunk:
//
// chunk bytes
func write(wr io.Writer, h hash.Hash, store *FileValueStore) error {
// The Write*IfNoErr functions make the error handling code less annoying
err := iohelp.WritePrimIfNoErr(wr, uint32(len(store.nbf.VersionString())), nil)
err = iohelp.WriteIfNoErr(wr, []byte(store.nbf.VersionString()), err)
err = iohelp.WriteIfNoErr(wr, h[:], err)
err = iohelp.WritePrimIfNoErr(wr, uint32(store.numChunks()), err)
if err != nil {
return err
}
err = store.iterChunks(func(ch chunks.Chunk) error {
h := ch.Hash()
err = iohelp.WriteIfNoErr(wr, h[:], nil)
return iohelp.WritePrimIfNoErr(wr, uint32(len(ch.Data())), err)
})
if err != nil {
return err
}
err = store.iterChunks(func(ch chunks.Chunk) error {
return iohelp.WriteIfNoErr(wr, ch.Data(), nil)
})
return err
}
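// For illustration only (not from the original source), the resulting byte layout
// for a store holding k chunks is roughly:
//
//	uint32                      length of the NomsBinFormat version string
//	version string bytes
//	hash.ByteLen bytes          root hash
//	uint32                      chunk count (k)
//	k entries of (chunk hash, uint32 chunk length), in iteration order
//	k chunk payloads, raw chunk bytes in the same order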
// ValueFile is the in memory representation of a value file.
type ValueFile struct {
Ns tree.NodeStore
Vrw types.ValueReadWriter
Values []types.Value
}
// ReadValueFile reads from the provided file and returns the values stored in the file
func ReadValueFile(ctx context.Context, filepath string) (*ValueFile, error) {
f, err := os.Open(filepath)
if err != nil {
return nil, err
}
defer f.Close()
return ReadFromReader(ctx, f)
}
// ReadFromReader reads from the provided reader, which should provide access to data in the value file format, and
// returns the values
func ReadFromReader(ctx context.Context, rd io.Reader) (*ValueFile, error) {
h, store, err := read(ctx, rd)
if err != nil {
return nil, err
}
vrw := types.NewValueStore(store)
v, err := vrw.ReadValue(ctx, h)
if err != nil {
return nil, err
}
rootVal, err := datas.GetCommittedValue(ctx, vrw, v)
if err != nil {
return nil, err
}
l := rootVal.(types.List)
values := make([]types.Value, l.Len())
err = l.IterAll(ctx, func(v types.Value, index uint64) error {
values[index] = v
return nil
})
if err != nil {
return nil, err
}
ns := tree.NewNodeStore(store)
return &ValueFile{
Values: values,
Ns: ns,
Vrw: vrw,
}, nil
}
// read parses the value file format; see the write function above for the layout
func read(ctx context.Context, rd io.Reader) (hash.Hash, *FileValueStore, error) {
// ErrPreservingReader lets us ignore errors until we need to use the data
errRd := iohelp.NewErrPreservingReader(rd)
// read len of NBF version string and then read the version string and check it
fmtLen, err := errRd.ReadUint32(binary.BigEndian)
if err != nil {
if err == io.EOF {
err = fmt.Errorf("EOF read while trying to get nbf format len - %w", ErrCorruptNVF)
}
return hash.Hash{}, nil, err
}
data, err := iohelp.ReadNBytes(errRd, int(fmtLen))
if err != nil {
if err == io.EOF {
err = fmt.Errorf("EOF read while trying to get nbf format string - %w", ErrCorruptNVF)
}
return hash.Hash{}, nil, err
}
var nbf *types.NomsBinFormat
switch string(data) {
case types.Format_LD_1.VersionString():
nbf = types.Format_LD_1
case types.Format_DOLT.VersionString():
nbf = types.Format_DOLT
default:
return hash.Hash{}, nil, fmt.Errorf("unknown noms format: %s", string(data))
}
store, err := NewFileValueStore(nbf)
if err != nil {
return hash.Hash{}, nil, err
}
// read the root hash and the chunk count
hashBytes, _ := iohelp.ReadNBytes(errRd, hash.ByteLen)
numChunks, err := errRd.ReadUint32(binary.BigEndian)
if err != nil {
if err == io.EOF {
err = fmt.Errorf("EOF read while trying to read the root hash and chunk count - %w", ErrCorruptNVF)
}
return hash.Hash{}, nil, err
}
// read the hashes and sizes
type hashAndSize struct {
h hash.Hash
size uint32
}
hashesAndSizes := make([]hashAndSize, numChunks)
for i := uint32(0); i < numChunks; i++ {
chHashBytes, _ := iohelp.ReadNBytes(errRd, hash.ByteLen)
size, err := errRd.ReadUint32(binary.BigEndian)
if err != nil {
if err == io.EOF {
err = fmt.Errorf("EOF read the root hash and chunk count - %w", ErrCorruptNVF)
}
return hash.Hash{}, nil, err
}
hashesAndSizes[i] = hashAndSize{hash.New(chHashBytes), size}
}
// read the data and validate it against the expected hashes
for _, hashAndSize := range hashesAndSizes {
h := hashAndSize.h
size := hashAndSize.size
chBytes, err := iohelp.ReadNBytes(errRd, int(size))
if err != nil && err != io.EOF || err == io.EOF && uint32(len(chBytes)) != size {
if err == io.EOF {
err = fmt.Errorf("EOF read trying to read chunk - %w", ErrCorruptNVF)
}
return hash.Hash{}, nil, err
}
ch := chunks.NewChunk(chBytes)
if h != ch.Hash() {
return hash.Hash{}, nil, errors.New("data corrupted")
}
err = store.Put(ctx, ch, func(c chunks.Chunk) chunks.GetAddrsCb {
return func(_ context.Context, _ hash.HashSet, _ chunks.PendingRefExists) error { return nil }
})
if err != nil {
return hash.Hash{}, nil, err
}
}
return hash.New(hashBytes), store, nil
}
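// Illustrative round trip only (not part of this file); the path and values are
// made up for the example, and ctx is an assumed context.Context.
//
//	if err := WritePrimitiveValueFile(ctx, "/tmp/example.nvf", types.String("a"), types.Int(7)); err != nil {
//		// handle write error
//	}
//	vf, err := ReadValueFile(ctx, "/tmp/example.nvf")
//	// on success, vf.Values holds the same two primitive values, in order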

View File

@@ -1,170 +0,0 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package valuefile
import (
"os"
"path/filepath"
"testing"
"github.com/dolthub/go-mysql-server/sql"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/gen/fb/serial"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/message"
"github.com/dolthub/dolt/go/store/prolly/shim"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/val"
)
func TestReadWriteValueFile(t *testing.T) {
const numMaps = 1
const numMapValues = 1
ctx := sql.NewEmptyContext()
store, err := NewFileValueStore(types.Format_Default)
require.NoError(t, err)
var values []types.Value
for i := 0; i < numMaps; i++ {
var kvs []types.Value
for j := 0; j < numMapValues; j++ {
kvs = append(kvs, types.Int(j), types.String(uuid.New().String()))
}
m, err := types.NewMap(ctx, store, kvs...)
require.NoError(t, err)
values = append(values, m)
}
path := filepath.Join(os.TempDir(), "file.nvf")
err = WriteValueFile(ctx, path, store, values...)
require.NoError(t, err)
vf, err := ReadValueFile(ctx, path)
require.NoError(t, err)
require.NotNil(t, vf.Ns)
require.Equal(t, len(values), len(vf.Values))
for i := 0; i < len(values); i++ {
require.True(t, values[i].Equals(vf.Values[i]))
}
}
func TestRoundtripProllyMapIntoValueFile(t *testing.T) {
const numMaps = 5
const numMapEntries = 1000
ctx := sql.NewEmptyContext()
store, err := NewFileValueStore(types.Format_DOLT)
require.NoError(t, err)
oldNs := tree.NewNodeStore(store)
vrw := types.NewValueStore(store)
var values []types.Value
var expectedMaps []prolly.Map
for i := 0; i < numMaps; i++ {
m, _ := makeProllyMap(t, oldNs, numMapEntries)
expectedMaps = append(expectedMaps, m)
v := shim.ValueFromMap(m)
ref, err := vrw.WriteValue(ctx, v)
require.NoError(t, err)
values = append(values, ref)
}
path := filepath.Join(os.TempDir(), "file.nvf")
err = WriteValueFile(ctx, path, store, values...)
require.NoError(t, err)
vf, err := ReadValueFile(ctx, path)
require.NoError(t, err)
require.NotNil(t, vf.Ns)
require.Equal(t, len(values), len(vf.Values))
for i := 0; i < len(vf.Values); i++ {
ref := vf.Values[i].(types.Ref)
v, err := vrw.ReadValue(ctx, ref.TargetHash())
require.NoError(t, err)
rootNode, fileId, err := shim.NodeFromValue(v)
require.NoError(t, err)
require.Equal(t, fileId, serial.ProllyTreeNodeFileID)
m := prolly.NewMap(rootNode, vf.Ns, kd, vd)
assertProllyMapsEqual(t, expectedMaps[i], m)
}
}
func assertProllyMapsEqual(t *testing.T, expected, received prolly.Map) {
assert.Equal(t, expected.HashOf(), received.HashOf())
s, err := prolly.DebugFormat(sql.NewEmptyContext(), expected)
require.NoError(t, err)
s2, err := prolly.DebugFormat(sql.NewEmptyContext(), received)
require.NoError(t, err)
require.Equal(t, s, s2)
}
var kd = val.NewTupleDescriptor(
val.Type{Enc: val.Uint32Enc, Nullable: false},
)
var vd = val.NewTupleDescriptor(
val.Type{Enc: val.Uint32Enc, Nullable: true},
val.Type{Enc: val.Uint32Enc, Nullable: true},
val.Type{Enc: val.Uint32Enc, Nullable: true},
)
func makeProllyMap(t *testing.T, ns tree.NodeStore, count int) (prolly.Map, [][2]val.Tuple) {
ctx := sql.NewEmptyContext()
tuples, err := tree.RandomTuplePairs(ctx, count, kd, vd, ns)
require.NoError(t, err)
om := mustProllyMapFromTuples(t, kd, vd, ns, tuples)
for i := 0; i < len(tuples); i++ {
var found bool
err := om.Get(sql.NewEmptyContext(), tuples[i][0], func(k, v val.Tuple) error {
assert.Equal(t, tuples[i][0], k)
assert.Equal(t, tuples[i][1], v)
found = true
return nil
})
require.NoError(t, err)
assert.True(t, found)
}
return om, tuples
}
func mustProllyMapFromTuples(t *testing.T, kd, vd *val.TupleDesc, ns tree.NodeStore, tuples [][2]val.Tuple) prolly.Map {
ctx := sql.NewEmptyContext()
serializer := message.NewProllyMapSerializer(vd, ns.Pool())
chunker, err := tree.NewEmptyChunker(ctx, ns, serializer)
require.NoError(t, err)
for _, pair := range tuples {
err := chunker.AddPair(ctx, tree.Item(pair[0]), tree.Item(pair[1]))
require.NoError(t, err)
}
root, err := chunker.Done(ctx)
require.NoError(t, err)
return prolly.NewMap(root, ns, kd, vd)
}