Drop nbs.addr type in favor of hash.Hash type

Neil Macneale IV
2024-03-11 18:15:17 -07:00
parent 737c0efba4
commit 3535a4908e
32 changed files with 317 additions and 304 deletions
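At a glance (a rough sketch of the migration, not the verbatim upstream code): the nbs package previously defined its own 20-byte addr type with local Prefix/Suffix/parseAddr helpers; this commit removes that type and has callers use hash.Hash directly, moving the Prefix/Suffix accessors into the hash package.

// Before (nbs-local address type, approximate):
type addr [20]byte
func (a addr) Prefix() uint64 { return binary.BigEndian.Uint64(a[:8]) }
func (a addr) Suffix() []byte { return a[8:] }

// After: hash.Hash gains the same accessors (see the hash package hunk below),
// and parseAddr call sites switch to hash.MaybeParse / hash.Parse.
func (h Hash) Prefix() uint64 { return binary.BigEndian.Uint64(h[:PrefixLen]) }
func (h Hash) Suffix() []byte { return h[PrefixLen:] }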

View File

@@ -48,6 +48,7 @@ package hash
import (
"bytes"
"crypto/sha512"
"encoding/binary"
"fmt"
"regexp"
"strconv"
@@ -60,6 +61,12 @@ const (
// ByteLen is the number of bytes used to represent the Hash.
ByteLen = 20
// PrefixLen is the number of bytes used to represent the Prefix of the Hash.
PrefixLen = 8 // uint64
// SuffixLen is the number of bytes which come after the Prefix.
SuffixLen = ByteLen - PrefixLen
// StringLen is the number of characters needed to represent the Hash using Base32.
StringLen = 32 // 20 * 8 / log2(32)
)
@@ -123,6 +130,16 @@ func Parse(s string) Hash {
return r
}
// Prefix returns the first 8 bytes of the hash as a uint64. Used for chunk indexing
func (h Hash) Prefix() uint64 {
return binary.BigEndian.Uint64(h[:PrefixLen])
}
// Suffix returns the last 12 bytes of the hash. Used for chunk indexing
func (h Hash) Suffix() []byte {
return h[PrefixLen:]
}
// Less compares two hashes returning whether this Hash is less than other.
func (h Hash) Less(other Hash) bool {
return h.Compare(other) < 0
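
A minimal usage sketch of the new accessors (hypothetical helper, not part of this commit), mirroring how the table index matches an address: binary-search on the 8-byte prefix, then confirm the remaining 12-byte suffix.

// suffixMatches reports whether a stored 12-byte suffix belongs to |h|,
// assuming the 8-byte prefixes have already been matched.
func suffixMatches(h hash.Hash, storedSuffix []byte) bool {
	return bytes.Equal(h.Suffix(), storedSuffix)
}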

View File

@@ -26,9 +26,11 @@ import (
"context"
"errors"
"time"
"github.com/dolthub/dolt/go/store/hash"
)
func tableExistsInChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (bool, error) {
func tableExistsInChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name hash.Hash, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (bool, error) {
magic := make([]byte, magicNumberSize)
n, _, err := s3.ReadFromEnd(ctx, name, magic, stats)
if err != nil {
@@ -40,7 +42,7 @@ func tableExistsInChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLim
return bytes.Equal(magic, []byte(magicNumber)), nil
}
func newAWSChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
func newAWSChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name hash.Hash, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
var tra tableReaderAt
index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error {
n, _, err := s3.ReadFromEnd(ctx, name, p, stats)

View File

@@ -36,6 +36,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/atomicerr"
"github.com/dolthub/dolt/go/store/chunks"
@@ -66,7 +67,7 @@ type awsLimits struct {
partTarget, partMin, partMax uint64
}
func (s3p awsTablePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
func (s3p awsTablePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
return newAWSChunkSource(
ctx,
&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns},
@@ -78,7 +79,7 @@ func (s3p awsTablePersister) Open(ctx context.Context, name addr, chunkCount uin
)
}
func (s3p awsTablePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
func (s3p awsTablePersister) Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) {
return tableExistsInChunkSource(
ctx,
&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns},
@@ -482,7 +483,7 @@ func (s3p awsTablePersister) uploadPart(ctx context.Context, data []byte, key, u
return
}
func (s3p awsTablePersister) PruneTableFiles(ctx context.Context, keeper func() []addr, t time.Time) error {
func (s3p awsTablePersister) PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, t time.Time) error {
return chunks.ErrUnsupportedOperation
}

View File

@@ -21,6 +21,7 @@ import (
"github.com/dolthub/dolt/go/store/blobstore"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)
const (
@@ -74,7 +75,7 @@ func (bsm blobstoreManifest) ParseIfExists(ctx context.Context, stats *Stats, re
}
// Update updates the contents of the manifest in the blobstore
func (bsm blobstoreManifest) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
func (bsm blobstoreManifest) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
checker := func(upstream, contents manifestContents) error {
if contents.gcGen != upstream.gcGen {
return chunks.ErrGCGenerationExpired
@@ -85,7 +86,7 @@ func (bsm blobstoreManifest) Update(ctx context.Context, lastLock addr, newConte
return updateBSWithChecker(ctx, bsm.bs, checker, lastLock, newContents, writeHook)
}
func (bsm blobstoreManifest) UpdateGCGen(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
func (bsm blobstoreManifest) UpdateGCGen(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
checker := func(upstream, contents manifestContents) error {
if contents.gcGen == upstream.gcGen {
return errors.New("UpdateGCGen() must update the garbage collection generation")
@@ -100,7 +101,7 @@ func (bsm blobstoreManifest) UpdateGCGen(ctx context.Context, lastLock addr, new
return updateBSWithChecker(ctx, bsm.bs, checker, lastLock, newContents, writeHook)
}
func updateBSWithChecker(ctx context.Context, bs blobstore.Blobstore, validate manifestChecker, lastLock addr, newContents manifestContents, writeHook func() error) (mc manifestContents, err error) {
func updateBSWithChecker(ctx context.Context, bs blobstore.Blobstore, validate manifestChecker, lastLock hash.Hash, newContents manifestContents, writeHook func() error) (mc manifestContents, err error) {
if writeHook != nil {
panic("Write hooks not supported")
}

View File

@@ -22,6 +22,7 @@ import (
"io"
"time"
"github.com/dolthub/dolt/go/store/hash"
"golang.org/x/sync/errgroup"
"github.com/dolthub/dolt/go/store/blobstore"
@@ -160,15 +161,15 @@ func (bsp *blobstorePersister) getRecordsSubObject(ctx context.Context, cs chunk
}
// Open a table named |name|, containing |chunkCount| chunks.
func (bsp *blobstorePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
func (bsp *blobstorePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
return newBSChunkSource(ctx, bsp.bs, name, chunkCount, bsp.q, stats)
}
func (bsp *blobstorePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
func (bsp *blobstorePersister) Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) {
return bsp.bs.Exists(ctx, name.String())
}
func (bsp *blobstorePersister) PruneTableFiles(ctx context.Context, keeper func() []addr, t time.Time) error {
func (bsp *blobstorePersister) PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, t time.Time) error {
return nil
}
@@ -275,7 +276,7 @@ func (bsTRA *bsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off
return totalRead, nil
}
func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name hash.Hash, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error {
rc, _, err := bs.Get(ctx, name.String(), blobstore.NewBlobRange(-int64(len(p)), 0))
if err != nil {

View File

@@ -14,18 +14,22 @@
package nbs
import "context"
import (
"context"
"github.com/dolthub/dolt/go/store/hash"
)
type chunkSourceAdapter struct {
tableReader
h addr
h hash.Hash
}
func (csa chunkSourceAdapter) hash() addr {
func (csa chunkSourceAdapter) hash() hash.Hash {
return csa.h
}
func newReaderFromIndexData(ctx context.Context, q MemoryQuotaProvider, idxData []byte, name addr, tra tableReaderAt, blockSize uint64) (cs chunkSource, err error) {
func newReaderFromIndexData(ctx context.Context, q MemoryQuotaProvider, idxData []byte, name hash.Hash, tra tableReaderAt, blockSize uint64) (cs chunkSource, err error) {
index, err := parseTableIndexByCopy(ctx, idxData, q)
if err != nil {
return nil, err

View File

@@ -18,11 +18,12 @@ import (
"crypto/sha512"
"encoding/binary"
"errors"
"hash"
gohash "hash"
"io"
"os"
"sort"
"github.com/dolthub/dolt/go/store/hash"
"github.com/golang/snappy"
)
@@ -44,7 +45,7 @@ type CmpChunkTableWriter struct {
totalCompressedData uint64
totalUncompressedData uint64
prefixes prefixIndexSlice
blockAddr *addr
blockAddr *hash.Hash
path string
}
@@ -96,7 +97,7 @@ func (tw *CmpChunkTableWriter) AddCmpChunk(c CompressedChunk) error {
// Stored in insertion order
tw.prefixes = append(tw.prefixes, prefixIndexRec{
addr(c.H),
c.H,
uint32(len(tw.prefixes)),
uint32(fullLen),
})
@@ -124,9 +125,7 @@ func (tw *CmpChunkTableWriter) Finish() (string, error) {
var h []byte
h = blockHash.Sum(h)
var blockAddr addr
copy(blockAddr[:], h)
blockAddr := hash.New(h[:hash.ByteLen])
tw.blockAddr = &blockAddr
return tw.blockAddr.String(), nil
@@ -189,7 +188,7 @@ func containsDuplicates(prefixes prefixIndexSlice) bool {
return false
}
func (tw *CmpChunkTableWriter) writeIndex() (hash.Hash, error) {
func (tw *CmpChunkTableWriter) writeIndex() (gohash.Hash, error) {
sort.Sort(tw.prefixes)
// We do a sanity check here to assert that we are never writing duplicate chunks into
@@ -198,13 +197,13 @@ func (tw *CmpChunkTableWriter) writeIndex() (hash.Hash, error) {
return nil, ErrDuplicateChunkWritten
}
pfxScratch := [addrPrefixSize]byte{}
pfxScratch := [hash.PrefixLen]byte{}
blockHash := sha512.New()
numRecords := uint32(len(tw.prefixes))
lengthsOffset := lengthsOffset(numRecords) // skip prefix and ordinal for each record
suffixesOffset := suffixesOffset(numRecords) // skip size for each record
suffixesLen := uint64(numRecords) * addrSuffixSize
suffixesLen := uint64(numRecords) * hash.SuffixLen
buff := make([]byte, suffixesLen+suffixesOffset)
var pos uint64
@@ -213,7 +212,7 @@ func (tw *CmpChunkTableWriter) writeIndex() (hash.Hash, error) {
// hash prefix
n := uint64(copy(buff[pos:], pfxScratch[:]))
if n != addrPrefixSize {
if n != hash.PrefixLen {
return nil, errors.New("failed to copy all data")
}
@@ -228,10 +227,10 @@ func (tw *CmpChunkTableWriter) writeIndex() (hash.Hash, error) {
binary.BigEndian.PutUint32(buff[offset:], pi.size)
// hash suffix
offset = suffixesOffset + uint64(pi.order)*addrSuffixSize
offset = suffixesOffset + uint64(pi.order)*hash.SuffixLen
n = uint64(copy(buff[offset:], pi.addr.Suffix()))
if n != addrSuffixSize {
if n != hash.SuffixLen {
return nil, errors.New("failed to copy all bytes")
}
}
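
For context, the index written above stores, for N chunks, N prefix/ordinal tuples followed by N uint32 lengths and N address suffixes. A sketch of the resulting size in terms of the new hash constants (hypothetical helper; the real code uses lengthsOffset/suffixesOffset and excludes the footer):

// indexSize approximates the byte size of a table index for chunkCount chunks.
func indexSize(chunkCount uint64) uint64 {
	prefixTuples := chunkCount * (hash.PrefixLen + 4) // 8-byte prefix + 4-byte ordinal
	lengths := chunkCount * 4                         // one uint32 length per chunk
	suffixes := chunkCount * hash.SuffixLen           // remaining 12 bytes of each address
	return prefixTuples + lengths + suffixes
}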

View File

@@ -27,6 +27,7 @@ import (
"sort"
"time"
"github.com/dolthub/dolt/go/store/hash"
"golang.org/x/sync/errgroup"
)
@@ -171,8 +172,8 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
upstream, appendixSpecs = upstream.removeAppendixSpecs()
}
conjoineeSet := map[addr]struct{}{}
upstreamNames := map[addr]struct{}{}
conjoineeSet := map[hash.Hash]struct{}{}
upstreamNames := map[hash.Hash]struct{}{}
for _, spec := range upstream.specs {
upstreamNames[spec.name] = struct{}{}
}

View File

@@ -146,7 +146,7 @@ func validateManifest(item map[string]*dynamodb.AttributeValue) (valid, hasSpecs
return false, false, false
}
func (dm dynamoManifest) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
func (dm dynamoManifest) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
t1 := time.Now()
defer func() { stats.WriteManifestLatency.SampleTimeSince(t1) }()
@@ -174,7 +174,7 @@ func (dm dynamoManifest) Update(ctx context.Context, lastLock addr, newContents
}
expr := valueEqualsExpression
if lastLock == (addr{}) {
if lastLock.IsEmpty() {
expr = valueNotExistsOrEqualsExpression
}

View File

@@ -34,7 +34,7 @@ import (
type emptyChunkSource struct{}
func (ecs emptyChunkSource) has(h addr) (bool, error) {
func (ecs emptyChunkSource) has(h hash.Hash) (bool, error) {
return false, nil
}
@@ -42,7 +42,7 @@ func (ecs emptyChunkSource) hasMany(addrs []hasRecord) (bool, error) {
return true, nil
}
func (ecs emptyChunkSource) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) {
func (ecs emptyChunkSource) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) {
return nil, nil
}
@@ -62,8 +62,8 @@ func (ecs emptyChunkSource) uncompressedLen() (uint64, error) {
return 0, nil
}
func (ecs emptyChunkSource) hash() addr {
return addr{}
func (ecs emptyChunkSource) hash() hash.Hash {
return hash.Hash{}
}
func (ecs emptyChunkSource) index() (tableIndex, error) {

View File

@@ -78,8 +78,7 @@ func MaybeMigrateFileManifest(ctx context.Context, dir string) (bool, error) {
}
check := func(_, contents manifestContents) error {
var empty addr
if contents.gcGen != empty {
if !contents.gcGen.IsEmpty() {
return errors.New("migrating from v4 to v5 should result in a manifest with a 0 gcGen")
}
@@ -170,7 +169,7 @@ func (fm fileManifest) ParseIfExists(
return parseIfExists(ctx, fm.dir, readHook)
}
func (fm fileManifest) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
func (fm fileManifest) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
t1 := time.Now()
defer func() { stats.WriteManifestLatency.SampleTimeSince(t1) }()
@@ -194,7 +193,7 @@ func (fm fileManifest) Update(ctx context.Context, lastLock addr, newContents ma
return updateWithChecker(ctx, fm.dir, fm.mode, checker, lastLock, newContents, writeHook)
}
func (fm fileManifest) UpdateGCGen(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
func (fm fileManifest) UpdateGCGen(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
t1 := time.Now()
defer func() { stats.WriteManifestLatency.SampleTimeSince(t1) }()
@@ -245,14 +244,14 @@ func parseV5Manifest(r io.Reader) (manifestContents, error) {
return manifestContents{}, err
}
lock, err := parseAddr(slices[1])
if err != nil {
return manifestContents{}, err
lock, ok := hash.MaybeParse(slices[1])
if !ok {
return manifestContents{}, fmt.Errorf("Could not parse lock hash: %s", slices[1])
}
gcGen, err := parseAddr(slices[3])
if err != nil {
return manifestContents{}, err
gcGen, ok := hash.MaybeParse(slices[3])
if !ok {
return manifestContents{}, fmt.Errorf("Could not parse GC generation hash: %s", slices[3])
}
return manifestContents{
@@ -329,10 +328,9 @@ func parseV4Manifest(r io.Reader) (manifestContents, error) {
return manifestContents{}, err
}
ad, err := parseAddr(slices[1])
if err != nil {
return manifestContents{}, err
ad, ok := hash.MaybeParse(slices[1])
if !ok {
return manifestContents{}, fmt.Errorf("Could not parse lock hash: %s", slices[1])
}
return manifestContents{
@@ -373,7 +371,7 @@ func parseIfExists(_ context.Context, dir string, readHook func() error) (exists
}
// updateWithChecker updates the manifest if |validate| is satisfied, callers must hold the file lock.
func updateWithChecker(_ context.Context, dir string, mode updateMode, validate manifestChecker, lastLock addr, newContents manifestContents, writeHook func() error) (mc manifestContents, err error) {
func updateWithChecker(_ context.Context, dir string, mode updateMode, validate manifestChecker, lastLock hash.Hash, newContents manifestContents, writeHook func() error) (mc manifestContents, err error) {
var tempManifestPath string
// Write a temporary manifest file, to be renamed over manifestFileName upon success.
@@ -450,7 +448,7 @@ func updateWithChecker(_ context.Context, dir string, mode updateMode, validate
return manifestContents{}, ferr
}
if lastLock != (addr{}) {
if !lastLock.IsEmpty() {
return manifestContents{}, errors.New("new manifest created with non 0 lock")
}

View File

@@ -36,6 +36,7 @@ import (
"github.com/dolthub/dolt/go/libraries/utils/file"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/util/tempfiles"
)
@@ -67,11 +68,11 @@ type fsTablePersister struct {
var _ tablePersister = &fsTablePersister{}
var _ tableFilePersister = &fsTablePersister{}
func (ftp *fsTablePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
func (ftp *fsTablePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
return newFileTableReader(ctx, ftp.dir, name, chunkCount, ftp.q)
}
func (ftp *fsTablePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
func (ftp *fsTablePersister) Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) {
ftp.removeMu.Lock()
defer ftp.removeMu.Unlock()
if ftp.toKeep != nil {
@@ -157,7 +158,7 @@ func (ftp *fsTablePersister) TryMoveCmpChunkTableWriter(ctx context.Context, fil
return w.FlushToFile(path)
}
func (ftp *fsTablePersister) persistTable(ctx context.Context, name addr, data []byte, chunkCount uint32, stats *Stats) (cs chunkSource, err error) {
func (ftp *fsTablePersister) persistTable(ctx context.Context, name hash.Hash, data []byte, chunkCount uint32, stats *Stats) (cs chunkSource, err error) {
if chunkCount == 0 {
return emptyChunkSource{}, nil
}
@@ -316,7 +317,7 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource
}, nil
}
func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, keeper func() []addr, mtime time.Time) error {
func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, mtime time.Time) error {
ftp.removeMu.Lock()
if ftp.toKeep != nil {
ftp.removeMu.Unlock()
@@ -368,8 +369,7 @@ func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, keeper func()
continue // not a table file
}
_, err := parseAddr(info.Name())
if err != nil {
if _, ok := hash.MaybeParse(info.Name()); !ok {
continue // not a table file
}

View File

@@ -29,18 +29,20 @@ import (
"os"
"path/filepath"
"time"
"github.com/dolthub/dolt/go/store/hash"
)
type fileTableReader struct {
tableReader
h addr
h hash.Hash
}
const (
fileBlockSize = 1 << 12
)
func tableFileExists(ctx context.Context, dir string, h addr) (bool, error) {
func tableFileExists(ctx context.Context, dir string, h hash.Hash) (bool, error) {
path := filepath.Join(dir, h.String())
_, err := os.Stat(path)
@@ -51,7 +53,7 @@ func tableFileExists(ctx context.Context, dir string, h addr) (bool, error) {
return err == nil, err
}
func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint32, q MemoryQuotaProvider) (cs chunkSource, err error) {
func newFileTableReader(ctx context.Context, dir string, h hash.Hash, chunkCount uint32, q MemoryQuotaProvider) (cs chunkSource, err error) {
path := filepath.Join(dir, h.String())
var f *os.File
@@ -134,7 +136,7 @@ func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint
}, nil
}
func (ftr *fileTableReader) hash() addr {
func (ftr *fileTableReader) hash() hash.Hash {
return ftr.h
}

View File

@@ -18,6 +18,8 @@ import (
"context"
"fmt"
"strings"
"github.com/dolthub/dolt/go/store/hash"
)
type gcErrAccum map[string]error
@@ -72,10 +74,9 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, tfp tableFilePersister
_ = gcc.writer.Remove()
}()
var addr addr
addr, err = parseAddr(filename)
if err != nil {
return nil, err
addr, ok := hash.MaybeParse(filename)
if !ok {
return nil, fmt.Errorf("invalid filename: %s", filename)
}
exists, err := tfp.Exists(ctx, addr, uint32(gcc.writer.ChunkCount()), nil)

View File

@@ -18,6 +18,8 @@ import (
"encoding/binary"
"errors"
"io"
"github.com/dolthub/dolt/go/store/hash"
)
var (
@@ -27,7 +29,7 @@ var (
func NewIndexTransformer(src io.Reader, chunkCount int) io.Reader {
tuplesSize := chunkCount * prefixTupleSize
lengthsSize := chunkCount * lengthSize
suffixesSize := chunkCount * addrSuffixSize
suffixesSize := chunkCount * hash.SuffixLen
tupleReader := io.LimitReader(src, int64(tuplesSize))
lengthsReader := io.LimitReader(src, int64(lengthsSize))

View File

@@ -276,7 +276,7 @@ func (j *ChunkJournal) ConjoinAll(ctx context.Context, sources chunkSources, sta
}
// Open implements tablePersister.
func (j *ChunkJournal) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
func (j *ChunkJournal) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
if name == journalAddr {
if err := j.maybeInit(ctx); err != nil {
return nil, err
@@ -287,12 +287,12 @@ func (j *ChunkJournal) Open(ctx context.Context, name addr, chunkCount uint32, s
}
// Exists implements tablePersister.
func (j *ChunkJournal) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
func (j *ChunkJournal) Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) {
return j.persister.Exists(ctx, name, chunkCount, stats)
}
// PruneTableFiles implements tablePersister.
func (j *ChunkJournal) PruneTableFiles(ctx context.Context, keeper func() []addr, mtime time.Time) error {
func (j *ChunkJournal) PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, mtime time.Time) error {
if j.backing.readOnly() {
return errReadOnlyManifest
}
@@ -326,7 +326,7 @@ func (j *ChunkJournal) Name() string {
}
// Update implements manifest.
func (j *ChunkJournal) Update(ctx context.Context, lastLock addr, next manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
func (j *ChunkJournal) Update(ctx context.Context, lastLock hash.Hash, next manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
if j.backing.readOnly() {
return j.contents, errReadOnlyManifest
}
@@ -372,7 +372,7 @@ func (j *ChunkJournal) Update(ctx context.Context, lastLock addr, next manifestC
}
// UpdateGCGen implements manifestGCGenUpdater.
func (j *ChunkJournal) UpdateGCGen(ctx context.Context, lastLock addr, next manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
func (j *ChunkJournal) UpdateGCGen(ctx context.Context, lastLock hash.Hash, next manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
if j.backing.readOnly() {
return j.contents, errReadOnlyManifest
} else if j.wr == nil {
@@ -589,7 +589,7 @@ func (jm *journalManifest) ParseIfExists(ctx context.Context, stats *Stats, read
}
// Update implements manifest.
func (jm *journalManifest) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
func (jm *journalManifest) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
if jm.readOnly() {
_, mc, err = jm.ParseIfExists(ctx, stats, nil)
if err != nil {
@@ -611,7 +611,7 @@ func (jm *journalManifest) Update(ctx context.Context, lastLock addr, newContent
}
// UpdateGCGen implements manifest.
func (jm *journalManifest) UpdateGCGen(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
func (jm *journalManifest) UpdateGCGen(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
if jm.readOnly() {
_, mc, err = jm.ParseIfExists(ctx, stats, nil)
if err != nil {

View File

@@ -35,7 +35,7 @@ type journalChunkSource struct {
var _ chunkSource = journalChunkSource{}
func (s journalChunkSource) has(h addr) (bool, error) {
func (s journalChunkSource) has(h hash.Hash) (bool, error) {
return s.journal.hasAddr(h), nil
}
@@ -51,11 +51,11 @@ func (s journalChunkSource) hasMany(addrs []hasRecord) (missing bool, err error)
return
}
func (s journalChunkSource) getCompressed(_ context.Context, h addr, _ *Stats) (CompressedChunk, error) {
func (s journalChunkSource) getCompressed(_ context.Context, h hash.Hash, _ *Stats) (CompressedChunk, error) {
return s.journal.getCompressedChunk(h)
}
func (s journalChunkSource) get(_ context.Context, h addr, _ *Stats) ([]byte, error) {
func (s journalChunkSource) get(_ context.Context, h hash.Hash, _ *Stats) ([]byte, error) {
cc, err := s.journal.getCompressedChunk(h)
if err != nil {
return nil, err
@@ -118,7 +118,7 @@ func (s journalChunkSource) uncompressedLen() (uint64, error) {
return s.journal.uncompressedSize(), nil
}
func (s journalChunkSource) hash() addr {
func (s journalChunkSource) hash() hash.Hash {
return journalAddr
}
@@ -170,7 +170,7 @@ func equalSpecs(left, right []tableSpec) bool {
if len(left) != len(right) {
return false
}
l := make(map[addr]struct{}, len(left))
l := make(map[hash.Hash]struct{}, len(left))
for _, s := range left {
l[s.name] = struct{}{}
}
@@ -181,8 +181,3 @@ func equalSpecs(left, right []tableSpec) bool {
}
return true
}
func emptyAddr(a addr) bool {
var b addr
return a == b
}

View File

@@ -234,11 +234,11 @@ func processIndexRecords(ctx context.Context, r io.ReadSeeker, sz int64, cb func
}
type lookup struct {
a addr
a hash.Hash
r Range
}
const lookupSize = addrSize + offsetSize + lengthSize
const lookupSize = hash.ByteLen + offsetSize + lengthSize
// serializeLookups serializes |lookups| using the table file chunk index format.
func serializeLookups(lookups []lookup) (index []byte) {
@@ -249,7 +249,7 @@ func serializeLookups(lookups []lookup) (index []byte) {
buf := index
for _, l := range lookups {
copy(buf, l.a[:])
buf = buf[addrSize:]
buf = buf[hash.ByteLen:]
binary.BigEndian.PutUint64(buf, l.r.Offset)
buf = buf[offsetSize:]
binary.BigEndian.PutUint32(buf, l.r.Length)
@@ -262,7 +262,7 @@ func deserializeLookups(index []byte) (lookups []lookup) {
lookups = make([]lookup, len(index)/lookupSize)
for i := range lookups {
copy(lookups[i].a[:], index)
index = index[addrSize:]
index = index[hash.ByteLen:]
lookups[i].r.Offset = binary.BigEndian.Uint64(index)
index = index[offsetSize:]
lookups[i].r.Length = binary.BigEndian.Uint32(index)

View File

@@ -47,7 +47,7 @@ import (
type journalRec struct {
length uint32
kind journalRecKind
address addr
address hash.Hash
payload []byte
timestamp time.Time
checksum uint32
@@ -150,7 +150,7 @@ func writeChunkRecord(buf []byte, c CompressedChunk) (n uint32) {
return
}
func writeRootHashRecord(buf []byte, root addr) (n uint32) {
func writeRootHashRecord(buf []byte, root hash.Hash) (n uint32) {
// length
l := rootHashRecordSize()
writeUint32(buf[:journalRecLenSz], uint32(l))

View File

@@ -52,11 +52,11 @@ const (
)
var (
journalAddr = addr(hash.Parse(chunkJournalAddr))
journalAddr = hash.Parse(chunkJournalAddr)
)
func isJournalAddr(a addr) bool {
return a == journalAddr
func isJournalAddr(h hash.Hash) bool {
return h == journalAddr
}
func fileExists(path string) (bool, error) {
@@ -315,7 +315,7 @@ func (wr *journalWriter) corruptIndexRecovery(ctx context.Context) (err error) {
}
// hasAddr returns true if the journal contains a chunk with addr |h|.
func (wr *journalWriter) hasAddr(h addr) (ok bool) {
func (wr *journalWriter) hasAddr(h hash.Hash) (ok bool) {
wr.lock.RLock()
defer wr.lock.RUnlock()
_, ok = wr.ranges.get(h)
@@ -323,7 +323,7 @@ func (wr *journalWriter) hasAddr(h addr) (ok bool) {
}
// getCompressedChunk reads the CompressedChunks with addr |h|.
func (wr *journalWriter) getCompressedChunk(h addr) (CompressedChunk, error) {
func (wr *journalWriter) getCompressedChunk(h hash.Hash) (CompressedChunk, error) {
wr.lock.RLock()
defer wr.lock.RUnlock()
r, ok := wr.ranges.get(h)
@@ -338,7 +338,7 @@ func (wr *journalWriter) getCompressedChunk(h addr) (CompressedChunk, error) {
}
// getRange returns a Range for the chunk with addr |h|.
func (wr *journalWriter) getRange(h addr) (rng Range, ok bool, err error) {
func (wr *journalWriter) getRange(h hash.Hash) (rng Range, ok bool, err error) {
// callers will use |rng| to read directly from the
// journal file, so we must flush here
if err = wr.maybeFlush(); err != nil {
@@ -365,7 +365,7 @@ func (wr *journalWriter) writeCompressedChunk(cc CompressedChunk) error {
}
wr.unsyncd += uint64(recordLen)
_ = writeChunkRecord(buf, cc)
wr.ranges.put(addr(cc.H), rng)
wr.ranges.put(cc.H, rng)
// To fulfill our durability guarantees, we technically only need to
// file.Sync() the journal when we commit a new root chunk. However,
@@ -402,7 +402,7 @@ func (wr *journalWriter) commitRootHashUnlocked(root hash.Hash) error {
return err
}
wr.currentRoot = root
n := writeRootHashRecord(buf, addr(root))
n := writeRootHashRecord(buf, root)
if err = wr.flush(); err != nil {
return err
}
@@ -582,7 +582,7 @@ type rangeIndex struct {
// novel Ranges represent most recent chunks written to
// the journal. These Ranges have not yet been written to
// a journal index record.
novel *swiss.Map[addr, Range]
novel *swiss.Map[hash.Hash, Range]
// cached Ranges are bootstrapped from an out-of-band journal
// index file. To save memory, these Ranges are keyed by a 16-byte
@@ -592,14 +592,14 @@ type rangeIndex struct {
type addr16 [16]byte
func toAddr16(full addr) (prefix addr16) {
func toAddr16(full hash.Hash) (prefix addr16) {
copy(prefix[:], full[:])
return
}
func newRangeIndex() rangeIndex {
return rangeIndex{
novel: swiss.NewMap[addr, Range](journalIndexDefaultMaxNovel),
novel: swiss.NewMap[hash.Hash, Range](journalIndexDefaultMaxNovel),
cached: swiss.NewMap[addr16, Range](0),
}
}
@@ -608,20 +608,20 @@ func estimateRangeCount(info os.FileInfo) uint32 {
return uint32(info.Size()/32) + journalIndexDefaultMaxNovel
}
func (idx rangeIndex) get(a addr) (rng Range, ok bool) {
rng, ok = idx.novel.Get(a)
func (idx rangeIndex) get(h hash.Hash) (rng Range, ok bool) {
rng, ok = idx.novel.Get(h)
if !ok {
rng, ok = idx.cached.Get(toAddr16(a))
rng, ok = idx.cached.Get(toAddr16(h))
}
return
}
func (idx rangeIndex) put(a addr, rng Range) {
idx.novel.Put(a, rng)
func (idx rangeIndex) put(h hash.Hash, rng Range) {
idx.novel.Put(h, rng)
}
func (idx rangeIndex) putCached(a addr, rng Range) {
idx.cached.Put(toAddr16(a), rng)
func (idx rangeIndex) putCached(h hash.Hash, rng Range) {
idx.cached.Put(toAddr16(h), rng)
}
func (idx rangeIndex) count() uint32 {
@@ -634,7 +634,7 @@ func (idx rangeIndex) novelCount() int {
func (idx rangeIndex) novelLookups() (lookups []lookup) {
lookups = make([]lookup, 0, idx.novel.Count())
idx.novel.Iter(func(a addr, r Range) (stop bool) {
idx.novel.Iter(func(a hash.Hash, r Range) (stop bool) {
lookups = append(lookups, lookup{a: a, r: r})
return
})
@@ -642,10 +642,10 @@ func (idx rangeIndex) novelLookups() (lookups []lookup) {
}
func (idx rangeIndex) flatten() rangeIndex {
idx.novel.Iter(func(a addr, r Range) (stop bool) {
idx.novel.Iter(func(a hash.Hash, r Range) (stop bool) {
idx.cached.Put(toAddr16(a), r)
return
})
idx.novel = swiss.NewMap[addr, Range](journalIndexDefaultMaxNovel)
idx.novel = swiss.NewMap[hash.Hash, Range](journalIndexDefaultMaxNovel)
return idx
}

View File

@@ -25,6 +25,7 @@ import (
"context"
"crypto/sha512"
"errors"
"fmt"
"strconv"
"sync"
"time"
@@ -74,7 +75,7 @@ type manifestUpdater interface {
// If writeHook is non-nil, it will be invoked while the implementation is
// guaranteeing exclusive access to the manifest. This allows for testing
// of race conditions.
Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error)
Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error)
}
type manifestGCGenUpdater interface {
@@ -87,7 +88,7 @@ type manifestGCGenUpdater interface {
// If writeHook is non-nil, it will be invoked while the implementation is
// guaranteeing exclusive access to the manifest. This allows for testing
// of race conditions.
UpdateGCGen(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error)
UpdateGCGen(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error)
}
// ManifestInfo is an interface for retrieving data from a manifest outside of this package
@@ -113,9 +114,9 @@ const (
type manifestContents struct {
manifestVers string
nbfVers string
lock addr
lock hash.Hash
root hash.Hash
gcGen addr
gcGen hash.Hash
specs []tableSpec
// An appendix is a list of |tableSpecs| that tracks an auxiliary collection of
@@ -192,16 +193,16 @@ func (mc manifestContents) removeAppendixSpecs() (manifestContents, []tableSpec)
}, removed
}
func (mc manifestContents) getSpecSet() (ss map[addr]struct{}) {
func (mc manifestContents) getSpecSet() (ss map[hash.Hash]struct{}) {
return toSpecSet(mc.specs)
}
func (mc manifestContents) getAppendixSet() (ss map[addr]struct{}) {
func (mc manifestContents) getAppendixSet() (ss map[hash.Hash]struct{}) {
return toSpecSet(mc.appendix)
}
func toSpecSet(specs []tableSpec) (ss map[addr]struct{}) {
ss = make(map[addr]struct{}, len(specs))
func toSpecSet(specs []tableSpec) (ss map[hash.Hash]struct{}) {
ss = make(map[hash.Hash]struct{}, len(specs))
for _, ts := range specs {
ss[ts.name] = struct{}{}
}
@@ -209,7 +210,7 @@ func toSpecSet(specs []tableSpec) (ss map[addr]struct{}) {
}
func (mc manifestContents) size() (size uint64) {
size += uint64(len(mc.nbfVers)) + addrSize + hash.ByteLen
size += uint64(len(mc.nbfVers)) + hash.ByteLen + hash.ByteLen
for _, sp := range mc.specs {
size += uint64(len(sp.name)) + uint32Size // for sp.chunkCount
}
@@ -292,7 +293,7 @@ func (mm manifestManager) UnlockForUpdate() error {
return mm.locks.unlockForUpdate(mm.Name())
}
func (mm manifestManager) updateWillFail(lastLock addr) (cached manifestContents, doomed bool) {
func (mm manifestManager) updateWillFail(lastLock hash.Hash) (cached manifestContents, doomed bool) {
if upstream, _, hit := mm.cache.Get(mm.Name()); hit {
if lastLock != upstream.lock {
doomed, cached = true, upstream
@@ -346,7 +347,7 @@ func (mm manifestManager) Fetch(ctx context.Context, stats *Stats) (exists bool,
// Callers MUST protect uses of Update with Lock/UnlockForUpdate.
// Update does not call Lock/UnlockForUpdate() on its own because it is
// intended to be used in a larger critical section along with updateWillFail.
func (mm manifestManager) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (contents manifestContents, err error) {
func (mm manifestManager) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (contents manifestContents, err error) {
if upstream, _, hit := mm.cache.Get(mm.Name()); hit {
if lastLock != upstream.lock {
return upstream, nil
@@ -385,7 +386,7 @@ func (mm manifestManager) Update(ctx context.Context, lastLock addr, newContents
// UpdateGCGen will update the manifest with a new garbage collection generation.
// Callers MUST protect uses of UpdateGCGen with Lock/UnlockForUpdate.
func (mm manifestManager) UpdateGCGen(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (contents manifestContents, err error) {
func (mm manifestManager) UpdateGCGen(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (contents manifestContents, err error) {
updater, ok := mm.m.(manifestGCGenUpdater)
if !ok {
return manifestContents{}, errors.New("manifest does not support updating gc gen")
@@ -443,7 +444,7 @@ type TableSpecInfo interface {
}
type tableSpec struct {
name addr
name hash.Hash
chunkCount uint32
}
@@ -468,10 +469,10 @@ func parseSpecs(tableInfo []string) ([]tableSpec, error) {
specs := make([]tableSpec, len(tableInfo)/2)
for i := range specs {
var err error
specs[i].name, err = parseAddr(tableInfo[2*i])
if err != nil {
return nil, err
var ok bool
specs[i].name, ok = hash.MaybeParse(tableInfo[2*i])
if !ok {
return nil, fmt.Errorf("invalid table file name: %s", tableInfo[2*i])
}
c, err := strconv.ParseUint(tableInfo[2*i+1], 10, 32)
@@ -500,7 +501,7 @@ func formatSpecs(specs []tableSpec, tableInfo []string) {
// persisted manifest against the lock hash it saw last time it loaded the
// contents of a manifest. If they do not match, the client must not update
// the persisted manifest.
func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec) (lock addr) {
func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec) (lock hash.Hash) {
blockHash := sha512.New()
blockHash.Write(root[:])
for _, spec := range appendix {
@@ -512,6 +513,5 @@ func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec) (
}
var h []byte
h = blockHash.Sum(h) // Appends hash to h
copy(lock[:], h)
return
return hash.New(h[:hash.ByteLen])
}

View File

@@ -54,7 +54,7 @@ func WriteChunks(chunks []chunks.Chunk) (string, []byte, error) {
func writeChunksToMT(mt *memTable, chunks []chunks.Chunk) (string, []byte, error) {
for _, chunk := range chunks {
res := mt.addChunk(addr(chunk.Hash()), chunk.Data())
res := mt.addChunk(chunk.Hash(), chunk.Data())
if res == chunkNotAdded {
return "", nil, errors.New("didn't create this memory table with enough space to add all the chunks")
}
@@ -75,7 +75,7 @@ func writeChunksToMT(mt *memTable, chunks []chunks.Chunk) (string, []byte, error
}
type memTable struct {
chunks map[addr][]byte
chunks map[hash.Hash][]byte
order []hasRecord // Must maintain the invariant that these are sorted by rec.order
pendingRefs []hasRecord
maxData, totalData uint64
@@ -84,10 +84,10 @@ type memTable struct {
}
func newMemTable(memTableSize uint64) *memTable {
return &memTable{chunks: map[addr][]byte{}, maxData: memTableSize}
return &memTable{chunks: map[hash.Hash][]byte{}, maxData: memTableSize}
}
func (mt *memTable) addChunk(h addr, data []byte) addChunkResult {
func (mt *memTable) addChunk(h hash.Hash, data []byte) addChunkResult {
if len(data) == 0 {
panic("NBS blocks cannot be zero length")
}
@@ -113,10 +113,9 @@ func (mt *memTable) addChunk(h addr, data []byte) addChunkResult {
func (mt *memTable) addChildRefs(addrs hash.HashSet) {
for h := range addrs {
a := addr(h)
mt.pendingRefs = append(mt.pendingRefs, hasRecord{
a: &a,
prefix: a.Prefix(),
a: &h,
prefix: h.Prefix(),
order: len(mt.pendingRefs),
})
}
@@ -130,7 +129,7 @@ func (mt *memTable) uncompressedLen() (uint64, error) {
return mt.totalData, nil
}
func (mt *memTable) has(h addr) (bool, error) {
func (mt *memTable) has(h hash.Hash) (bool, error) {
_, has := mt.chunks[h]
return has, nil
}
@@ -157,7 +156,7 @@ func (mt *memTable) hasMany(addrs []hasRecord) (bool, error) {
return remaining, nil
}
func (mt *memTable) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) {
func (mt *memTable) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) {
return mt.chunks[h], nil
}
@@ -200,10 +199,10 @@ func (mt *memTable) extract(ctx context.Context, chunks chan<- extractRecord) er
return nil
}
func (mt *memTable) write(haver chunkReader, stats *Stats) (name addr, data []byte, count uint32, err error) {
func (mt *memTable) write(haver chunkReader, stats *Stats) (name hash.Hash, data []byte, count uint32, err error) {
numChunks := uint64(len(mt.order))
if numChunks == 0 {
return addr{}, nil, 0, fmt.Errorf("mem table cannot write with zero chunks")
return hash.Hash{}, nil, 0, fmt.Errorf("mem table cannot write with zero chunks")
}
maxSize := maxTableSize(uint64(len(mt.order)), mt.totalData)
// todo: memory quota
@@ -215,7 +214,7 @@ func (mt *memTable) write(haver chunkReader, stats *Stats) (name addr, data []by
_, err := haver.hasMany(mt.order)
if err != nil {
return addr{}, nil, 0, err
return hash.Hash{}, nil, 0, err
}
sort.Sort(hasRecordByOrder(mt.order)) // restore "insertion" order for write
@@ -231,7 +230,7 @@ func (mt *memTable) write(haver chunkReader, stats *Stats) (name addr, data []by
tableSize, name, err := tw.finish()
if err != nil {
return addr{}, nil, 0, err
return hash.Hash{}, nil, 0, err
}
if count > 0 {

View File

@@ -21,6 +21,7 @@ import (
"io"
"time"
"github.com/dolthub/dolt/go/store/hash"
"github.com/fatih/color"
"golang.org/x/sync/errgroup"
@@ -68,15 +69,15 @@ func (bsp *noConjoinBlobstorePersister) ConjoinAll(ctx context.Context, sources
}
// Open a table named |name|, containing |chunkCount| chunks.
func (bsp *noConjoinBlobstorePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
func (bsp *noConjoinBlobstorePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
return newBSChunkSource(ctx, bsp.bs, name, chunkCount, bsp.q, stats)
}
func (bsp *noConjoinBlobstorePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
func (bsp *noConjoinBlobstorePersister) Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) {
return bsp.bs.Exists(ctx, name.String())
}
func (bsp *noConjoinBlobstorePersister) PruneTableFiles(ctx context.Context, keeper func() []addr, t time.Time) error {
func (bsp *noConjoinBlobstorePersister) PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, t time.Time) error {
return nil
}

View File

@@ -36,6 +36,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/dolthub/dolt/go/store/hash"
"github.com/jpillora/backoff"
"golang.org/x/sync/errgroup"
)
@@ -47,7 +48,7 @@ const (
type s3TableReaderAt struct {
s3 *s3ObjectReader
h addr
h hash.Hash
}
func (s3tra *s3TableReaderAt) Close() error {
@@ -81,11 +82,11 @@ func (s3or *s3ObjectReader) key(k string) string {
return k
}
func (s3or *s3ObjectReader) Reader(ctx context.Context, name addr) (io.ReadCloser, error) {
func (s3or *s3ObjectReader) Reader(ctx context.Context, name hash.Hash) (io.ReadCloser, error) {
return s3or.reader(ctx, name)
}
func (s3or *s3ObjectReader) ReadAt(ctx context.Context, name addr, p []byte, off int64, stats *Stats) (n int, err error) {
func (s3or *s3ObjectReader) ReadAt(ctx context.Context, name hash.Hash, p []byte, off int64, stats *Stats) (n int, err error) {
t1 := time.Now()
defer func() {
@@ -105,7 +106,7 @@ func s3RangeHeader(off, length int64) string {
const maxS3ReadFromEndReqSize = 256 * 1024 * 1024 // 256MB
const preferredS3ReadFromEndReqSize = 128 * 1024 * 1024 // 128MB
func (s3or *s3ObjectReader) ReadFromEnd(ctx context.Context, name addr, p []byte, stats *Stats) (n int, sz uint64, err error) {
func (s3or *s3ObjectReader) ReadFromEnd(ctx context.Context, name hash.Hash, p []byte, stats *Stats) (n int, sz uint64, err error) {
defer func(t1 time.Time) {
stats.S3BytesPerRead.Sample(uint64(len(p)))
stats.S3ReadLatency.SampleTimeSince(t1)
@@ -149,7 +150,7 @@ func (s3or *s3ObjectReader) ReadFromEnd(ctx context.Context, name addr, p []byte
return s3or.readRange(ctx, name, p, fmt.Sprintf("%s=-%d", s3RangePrefix, len(p)))
}
func (s3or *s3ObjectReader) reader(ctx context.Context, name addr) (io.ReadCloser, error) {
func (s3or *s3ObjectReader) reader(ctx context.Context, name hash.Hash) (io.ReadCloser, error) {
input := &s3.GetObjectInput{
Bucket: aws.String(s3or.bucket),
Key: aws.String(s3or.key(name.String())),
@@ -161,7 +162,7 @@ func (s3or *s3ObjectReader) reader(ctx context.Context, name addr) (io.ReadClose
return result.Body, nil
}
func (s3or *s3ObjectReader) readRange(ctx context.Context, name addr, p []byte, rangeHeader string) (n int, sz uint64, err error) {
func (s3or *s3ObjectReader) readRange(ctx context.Context, name hash.Hash, p []byte, rangeHeader string) (n int, sz uint64, err error) {
read := func() (int, uint64, error) {
if s3or.readRl != nil {
s3or.readRl <- struct{}{}

View File

@@ -105,7 +105,7 @@ type NomsBlockStore struct {
mtSize uint64
putCount uint64
hasCache *lru.TwoQueueCache[addr, struct{}]
hasCache *lru.TwoQueueCache[hash.Hash, struct{}]
stats *Stats
}
@@ -254,11 +254,9 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
var addCount int
for h, count := range updates {
a := addr(h)
if _, ok := currSpecs[a]; !ok {
if _, ok := currSpecs[h]; !ok {
addCount++
contents.specs = append(contents.specs, tableSpec{a, count})
contents.specs = append(contents.specs, tableSpec{h, count})
}
}
@@ -346,14 +344,12 @@ func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updat
appendixSpecs := make([]tableSpec, 0)
var addCount int
for h, count := range updates {
a := addr(h)
if option == ManifestAppendixOption_Set {
appendixSpecs = append(appendixSpecs, tableSpec{a, count})
appendixSpecs = append(appendixSpecs, tableSpec{h, count})
} else {
if _, ok := currAppendixSpecs[a]; !ok {
if _, ok := currAppendixSpecs[h]; !ok {
addCount++
appendixSpecs = append(appendixSpecs, tableSpec{a, count})
appendixSpecs = append(appendixSpecs, tableSpec{h, count})
}
}
}
@@ -399,13 +395,12 @@ func (nbs *NomsBlockStore) checkAllManifestUpdatesExist(ctx context.Context, upd
h := h
c := c
eg.Go(func() error {
a := addr(h)
ok, err := nbs.p.Exists(ctx, a, c, nbs.stats)
ok, err := nbs.p.Exists(ctx, h, c, nbs.stats)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("missing table file referenced in UpdateManifest call: %v", a)
return fmt.Errorf("missing table file referenced in UpdateManifest call: %v", h)
}
return nil
})
@@ -475,12 +470,12 @@ func OverwriteStoreManifest(ctx context.Context, store *NomsBlockStore, root has
}
// Appendix table files should come first in specs
for h, c := range appendixTableFiles {
s := tableSpec{name: addr(h), chunkCount: c}
s := tableSpec{name: h, chunkCount: c}
contents.appendix = append(contents.appendix, s)
contents.specs = append(contents.specs, s)
}
for h, c := range tableFiles {
s := tableSpec{name: addr(h), chunkCount: c}
s := tableSpec{name: h, chunkCount: c}
contents.specs = append(contents.specs, s)
}
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
@@ -640,7 +635,7 @@ func newNomsBlockStore(ctx context.Context, nbfVerStr string, mm manifestManager
memTableSize = defaultMemTableSize
}
hasCache, err := lru.New2Q[addr, struct{}](hasCacheSize)
hasCache, err := lru.New2Q[hash.Hash, struct{}](hasCacheSize)
if err != nil {
return nil, err
}
@@ -790,9 +785,8 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs
if nbs.mt == nil {
nbs.mt = newMemTable(nbs.mtSize)
}
a := addr(ch.Hash())
addChunkRes = nbs.mt.addChunk(a, ch.Data())
addChunkRes = nbs.mt.addChunk(ch.Hash(), ch.Data())
if addChunkRes == chunkNotAdded {
ts, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.hasCache, nbs.stats)
if err != nil {
@@ -802,7 +796,7 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs
nbs.addPendingRefsToHasCache()
nbs.tables = ts
nbs.mt = newMemTable(nbs.mtSize)
addChunkRes = nbs.mt.addChunk(a, ch.Data())
addChunkRes = nbs.mt.addChunk(ch.Hash(), ch.Data())
}
if addChunkRes == chunkAdded || addChunkRes == chunkExists {
if nbs.keeperFunc != nil && nbs.keeperFunc(ch.Hash()) {
@@ -826,18 +820,17 @@ type refCheck func(reqs []hasRecord) (hash.HashSet, error)
func (nbs *NomsBlockStore) errorIfDangling(root hash.Hash, checker refCheck) error {
if !root.IsEmpty() {
a := addr(root)
if _, ok := nbs.hasCache.Get(a); !ok {
if _, ok := nbs.hasCache.Get(root); !ok {
var hr [1]hasRecord
hr[0].a = &a
hr[0].prefix = a.Prefix()
hr[0].a = &root
hr[0].prefix = root.Prefix()
absent, err := checker(hr[:])
if err != nil {
return err
} else if absent.Size() > 0 {
return fmt.Errorf("%w: found dangling references to %s", ErrDanglingRef, absent.String())
}
nbs.hasCache.Add(a, struct{}{})
nbs.hasCache.Add(root, struct{}{})
}
}
return nil
@@ -853,14 +846,13 @@ func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk,
nbs.stats.ChunksPerGet.Sample(1)
}()
a := addr(h)
data, tables, err := func() ([]byte, chunkReader, error) {
var data []byte
nbs.mu.RLock()
defer nbs.mu.RUnlock()
if nbs.mt != nil {
var err error
data, err = nbs.mt.get(ctx, a, nbs.stats)
data, err = nbs.mt.get(ctx, h, nbs.stats)
if err != nil {
return nil, nil, err
@@ -877,7 +869,7 @@ func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk,
return chunks.NewChunkWithHash(h, data), nil
}
data, err = tables.get(ctx, a, nbs.stats)
data, err = tables.get(ctx, h, nbs.stats)
if err != nil {
return chunks.EmptyChunk, err
@@ -954,10 +946,10 @@ func toGetRecords(hashes hash.HashSet) []getRecord {
reqs := make([]getRecord, len(hashes))
idx := 0
for h := range hashes {
a := addr(h)
h := h
reqs[idx] = getRecord{
a: &a,
prefix: a.Prefix(),
a: &h,
prefix: h.Prefix(),
}
idx++
}
@@ -1001,13 +993,12 @@ func (nbs *NomsBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) {
nbs.stats.AddressesPerHas.Sample(1)
}()
a := addr(h)
has, tables, err := func() (bool, chunkReader, error) {
nbs.mu.RLock()
defer nbs.mu.RUnlock()
if nbs.mt != nil {
has, err := nbs.mt.has(a)
has, err := nbs.mt.has(h)
if err != nil {
return false, nil, err
@@ -1024,7 +1015,7 @@ func (nbs *NomsBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) {
}
if !has {
has, err = tables.has(a)
has, err = tables.has(h)
if err != nil {
return false, err
@@ -1079,7 +1070,7 @@ func (nbs *NomsBlockStore) hasMany(reqs []hasRecord) (hash.HashSet, error) {
absent := hash.HashSet{}
for _, r := range reqs {
if !r.has {
absent.Insert(hash.New(r.a[:]))
absent.Insert(*r.a)
}
}
return absent, nil
@@ -1089,10 +1080,10 @@ func toHasRecords(hashes hash.HashSet) []hasRecord {
reqs := make([]hasRecord, len(hashes))
idx := 0
for h := range hashes {
a := addr(h)
h := h
reqs[idx] = hasRecord{
a: &a,
prefix: a.Prefix(),
a: &h,
prefix: h.Prefix(),
order: idx,
}
idx++
@@ -1417,7 +1408,7 @@ func (nbs *NomsBlockStore) Sources(ctx context.Context) (hash.Hash, []chunks.Tab
return contents.GetRoot(), allTableFiles, appendixTableFiles, nil
}
func getTableFiles(css map[addr]chunkSource, contents manifestContents, numSpecs int, specFunc func(mc manifestContents, idx int) tableSpec) ([]chunks.TableFile, error) {
func getTableFiles(css map[hash.Hash]chunkSource, contents manifestContents, numSpecs int, specFunc func(mc manifestContents, idx int) tableSpec) ([]chunks.TableFile, error) {
tableFiles := make([]chunks.TableFile, 0)
if numSpecs == 0 {
return tableFiles, nil
@@ -1461,8 +1452,8 @@ func (nbs *NomsBlockStore) Size(ctx context.Context) (uint64, error) {
return size, nil
}
func (nbs *NomsBlockStore) chunkSourcesByAddr() (map[addr]chunkSource, error) {
css := make(map[addr]chunkSource, len(nbs.tables.upstream)+len(nbs.tables.novel))
func (nbs *NomsBlockStore) chunkSourcesByAddr() (map[hash.Hash]chunkSource, error) {
css := make(map[hash.Hash]chunkSource, len(nbs.tables.upstream)+len(nbs.tables.novel))
for _, cs := range nbs.tables.upstream {
css[cs.hash()] = cs
}
@@ -1543,10 +1534,10 @@ func (nbs *NomsBlockStore) PruneTableFiles(ctx context.Context) (err error) {
func (nbs *NomsBlockStore) pruneTableFiles(ctx context.Context, checker refCheck) (err error) {
mtime := time.Now()
return nbs.p.PruneTableFiles(ctx, func() []addr {
return nbs.p.PruneTableFiles(ctx, func() []hash.Hash {
nbs.mu.Lock()
defer nbs.mu.Unlock()
keepers := make([]addr, 0, len(nbs.tables.novel)+len(nbs.tables.upstream))
keepers := make([]hash.Hash, 0, len(nbs.tables.novel)+len(nbs.tables.upstream))
for a, _ := range nbs.tables.novel {
keepers = append(keepers, a)
}

View File

@@ -26,7 +26,6 @@ import (
"context"
"crypto/sha512"
"encoding/base32"
"encoding/binary"
"hash/crc32"
"io"
@@ -123,9 +122,8 @@ import (
*/
const (
addrSize = 20
addrPrefixSize = 8
addrSuffixSize = addrSize - addrPrefixSize
// addrPrefixSize = 8
// addrSuffixSize = hash.ByteLen - addrPrefixSize
uint64Size = 8
uint32Size = 4
ordinalSize = uint32Size
@@ -134,7 +132,7 @@ const (
magicNumber = "\xff\xb5\xd8\xc2\x24\x63\xee\x50"
magicNumberSize = 8 //len(magicNumber)
footerSize = uint32Size + uint64Size + magicNumberSize
prefixTupleSize = addrPrefixSize + ordinalSize
prefixTupleSize = hash.PrefixLen + ordinalSize
checksumSize = uint32Size
maxChunkSize = 0xffffffff // Snappy won't compress slices bigger than this
)
@@ -145,50 +143,45 @@ func crc(b []byte) uint32 {
return crc32.Update(0, crcTable, b)
}
func computeAddrDefault(data []byte) addr {
// NM4 - change name?
func computeAddrDefault(data []byte) hash.Hash {
r := sha512.Sum512(data)
h := addr{}
copy(h[:], r[:addrSize])
return h
return hash.New(r[:hash.ByteLen])
}
var computeAddr = computeAddrDefault
type addr [addrSize]byte
// type addr [addrSize]byte
var encoding = base32.NewEncoding("0123456789abcdefghijklmnopqrstuv")
func (a addr) String() string {
return encoding.EncodeToString(a[:])
}
/*
NM4
func (a addr) Prefix() uint64 {
return binary.BigEndian.Uint64(a[:])
}
func (a addr) String() string {
return encoding.EncodeToString(a[:])
}
func (a addr) Suffix() []byte {
return a[addrPrefixSize:]
}
func parseAddr(str string) (addr, error) {
var h addr
_, err := encoding.Decode(h[:], []byte(str))
return h, err
}
func parseAddr(str string) (addr, error) {
var h addr
_, err := encoding.Decode(h[:], []byte(str))
return h, err
}
func ValidateAddr(s string) bool {
_, err := encoding.DecodeString(s)
return err == nil
}
*/
func ValidateAddr(s string) bool {
_, err := encoding.DecodeString(s)
return err == nil
}
type addrSlice []addr
type addrSlice []hash.Hash
func (hs addrSlice) Len() int { return len(hs) }
func (hs addrSlice) Less(i, j int) bool { return bytes.Compare(hs[i][:], hs[j][:]) < 0 }
func (hs addrSlice) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] }
type hasRecord struct {
a *addr
a *hash.Hash
prefix uint64
order int
has bool
@@ -207,7 +200,7 @@ func (hs hasRecordByOrder) Less(i, j int) bool { return hs[i].order < hs[j].orde
func (hs hasRecordByOrder) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] }
type getRecord struct {
a *addr
a *hash.Hash
prefix uint64
found bool
}
@@ -219,21 +212,21 @@ func (hs getRecordByPrefix) Less(i, j int) bool { return hs[i].prefix < hs[j].pr
func (hs getRecordByPrefix) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] }
type extractRecord struct {
a addr
a hash.Hash
data []byte
err error
}
type chunkReader interface {
// has returns true if a chunk with addr |h| is present.
has(h addr) (bool, error)
has(h hash.Hash) (bool, error)
// hasMany sets hasRecord.has to true for each present hasRecord query, it returns
// true if any hasRecord query was not found in this chunkReader.
hasMany(addrs []hasRecord) (bool, error)
// get returns the chunk data for a chunk with addr |h| if present, and nil otherwise.
get(ctx context.Context, h addr, stats *Stats) ([]byte, error)
get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error)
// getMany sets getRecord.found to true, and calls |found| for each present getRecord query.
// It returns true if any getRecord query was not found in this chunkReader.
@@ -257,7 +250,7 @@ type chunkSource interface {
chunkReader
// hash returns the hash address of this chunkSource.
hash() addr
hash() hash.Hash
// opens a Reader to the first byte of the chunkData segment of this table.
reader(context.Context) (io.ReadCloser, uint64, error)
@@ -281,7 +274,7 @@ type chunkSource interface {
type chunkSources []chunkSource
type chunkSourceSet map[addr]chunkSource
type chunkSourceSet map[hash.Hash]chunkSource
func copyChunkSourceSet(s chunkSourceSet) (cp chunkSourceSet) {
cp = make(chunkSourceSet, len(s))

View File

@@ -44,16 +44,16 @@ type tableIndex interface {
// entrySuffixMatches returns true if the entry at index |idx| matches
// the suffix of the address |h|. Used by |lookup| after finding
// matching indexes based on |Prefixes|.
entrySuffixMatches(idx uint32, h *addr) (bool, error)
entrySuffixMatches(idx uint32, h *hash.Hash) (bool, error)
// indexEntry returns the |indexEntry| at |idx|. Optionally puts the
// full address of that entry in |a| if |a| is not |nil|.
indexEntry(idx uint32, a *addr) (indexEntry, error)
indexEntry(idx uint32, a *hash.Hash) (indexEntry, error)
// lookup returns an |indexEntry| for the chunk corresponding to the
// provided address |h|. The second return is |true| if an entry exists
// and |false| otherwise.
lookup(h *addr) (indexEntry, bool, error)
lookup(h *hash.Hash) (indexEntry, bool, error)
// Ordinals returns a slice of indexes which maps the |i|th chunk in
// the indexed file to its corresponding entry in index. The |i|th
@@ -192,11 +192,11 @@ func readTableIndexByCopy(ctx context.Context, rd io.ReadSeeker, q MemoryQuotaPr
func hashSetFromTableIndex(idx tableIndex) (hash.HashSet, error) {
set := hash.NewHashSet()
for i := uint32(0); i < idx.chunkCount(); i++ {
var a addr
if _, err := idx.indexEntry(i, &a); err != nil {
var h hash.Hash
if _, err := idx.indexEntry(i, &h); err != nil {
return nil, err
}
set.Insert(hash.Hash(a))
set.Insert(h)
}
return set, nil
}
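A hedged usage sketch for the helper above, assuming it is called from inside the nbs package with an already-loaded tableIndex; |idx| and |someHash| are placeholders, and hash.HashSet membership is assumed to be checked with Has:

	set, err := hashSetFromTableIndex(idx)
	if err != nil {
		return err
	}
	if set.Has(someHash) {
		// the full 20-byte hash is present in this table file
	}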
@@ -289,22 +289,22 @@ func newOnHeapTableIndex(indexBuff []byte, offsetsBuff1 []byte, count uint32, to
}, nil
}
func (ti onHeapTableIndex) entrySuffixMatches(idx uint32, h *addr) (bool, error) {
func (ti onHeapTableIndex) entrySuffixMatches(idx uint32, h *hash.Hash) (bool, error) {
ord := ti.ordinalAt(idx)
o := ord * addrSuffixSize
b := ti.suffixes[o : o+addrSuffixSize]
return bytes.Equal(h[addrPrefixSize:], b), nil
o := ord * hash.SuffixLen
b := ti.suffixes[o : o+hash.SuffixLen]
return bytes.Equal(h[hash.PrefixLen:], b), nil
}
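A self-contained sketch of the comparison above, using only the hash package constants; the stored suffix here is the zero suffix, so the match succeeds for the zero hash:

package main

import (
	"bytes"
	"fmt"

	"github.com/dolthub/dolt/go/store/hash"
)

func main() {
	var h hash.Hash                        // zero hash, for illustration
	stored := make([]byte, hash.SuffixLen) // suffix bytes as kept in ti.suffixes
	// Only the trailing 12 bytes are compared; the 8-byte prefix was already
	// matched by the prefix lookup.
	fmt.Println(bytes.Equal(h[hash.PrefixLen:], stored)) // true
}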
func (ti onHeapTableIndex) indexEntry(idx uint32, a *addr) (entry indexEntry, err error) {
func (ti onHeapTableIndex) indexEntry(idx uint32, a *hash.Hash) (entry indexEntry, err error) {
prefix, ord := ti.tupleAt(idx)
if a != nil {
binary.BigEndian.PutUint64(a[:], prefix)
o := int64(addrSuffixSize * ord)
b := ti.suffixes[o : o+addrSuffixSize]
copy(a[addrPrefixSize:], b)
o := int64(hash.SuffixLen * ord)
b := ti.suffixes[o : o+hash.SuffixLen]
copy(a[hash.PrefixLen:], b)
}
return ti.getIndexEntry(ord), nil
@@ -325,7 +325,7 @@ func (ti onHeapTableIndex) getIndexEntry(ord uint32) indexEntry {
}
}
func (ti onHeapTableIndex) lookup(h *addr) (indexEntry, bool, error) {
func (ti onHeapTableIndex) lookup(h *hash.Hash) (indexEntry, bool, error) {
ord, err := ti.lookupOrdinal(h)
if err != nil {
return indexResult{}, false, err
@@ -338,7 +338,7 @@ func (ti onHeapTableIndex) lookup(h *addr) (indexEntry, bool, error) {
// lookupOrdinal returns the ordinal of |h| if present. Returns |ti.count|
// if absent.
func (ti onHeapTableIndex) lookupOrdinal(h *addr) (uint32, error) {
func (ti onHeapTableIndex) lookupOrdinal(h *hash.Hash) (uint32, error) {
prefix := h.Prefix()
for idx := ti.findPrefix(prefix); idx < ti.count && ti.prefixAt(idx) == prefix; idx++ {
@@ -364,7 +364,7 @@ func (ti onHeapTableIndex) findPrefix(prefix uint64) (idx uint32) {
h := idx + (j-idx)/2 // avoid overflow when computing h
// i ≤ h < j
o := int64(prefixTupleSize * h)
tmp := binary.BigEndian.Uint64(ti.prefixTuples[o : o+addrPrefixSize])
tmp := binary.BigEndian.Uint64(ti.prefixTuples[o : o+hash.PrefixLen])
if tmp < prefix {
idx = h + 1 // preserves f(i-1) == false
} else {
@@ -379,18 +379,18 @@ func (ti onHeapTableIndex) tupleAt(idx uint32) (prefix uint64, ord uint32) {
b := ti.prefixTuples[off : off+prefixTupleSize]
prefix = binary.BigEndian.Uint64(b[:])
ord = binary.BigEndian.Uint32(b[addrPrefixSize:])
ord = binary.BigEndian.Uint32(b[hash.PrefixLen:])
return prefix, ord
}
func (ti onHeapTableIndex) prefixAt(idx uint32) uint64 {
off := int64(prefixTupleSize * idx)
b := ti.prefixTuples[off : off+addrPrefixSize]
b := ti.prefixTuples[off : off+hash.PrefixLen]
return binary.BigEndian.Uint64(b)
}
func (ti onHeapTableIndex) ordinalAt(idx uint32) uint32 {
off := int64(prefixTupleSize*idx) + addrPrefixSize
off := int64(prefixTupleSize*idx) + hash.PrefixLen
b := ti.prefixTuples[off : off+ordinalSize]
return binary.BigEndian.Uint32(b)
}
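For reference, a sketch of the 12-byte prefix tuple these accessors decode, assuming the ordinal is a 4-byte uint32 (so prefixTupleSize = hash.PrefixLen + 4); the prefix and ordinal values are arbitrary:

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/dolthub/dolt/go/store/hash"
)

func main() {
	const ordinalSize = 4 // assumption: uint32 ordinal
	tuple := make([]byte, hash.PrefixLen+ordinalSize)
	binary.BigEndian.PutUint64(tuple[:hash.PrefixLen], 0x0102030405060708) // prefix
	binary.BigEndian.PutUint32(tuple[hash.PrefixLen:], 7)                  // ordinal
	fmt.Printf("prefix=%x ordinal=%d\n",
		binary.BigEndian.Uint64(tuple[:hash.PrefixLen]),
		binary.BigEndian.Uint32(tuple[hash.PrefixLen:]))
}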
@@ -413,7 +413,7 @@ func (ti onHeapTableIndex) ordinals() ([]uint32, error) {
// todo: |o| is not accounted for in the memory quota
o := make([]uint32, ti.count)
for i, off := uint32(0), 0; i < ti.count; i, off = i+1, off+prefixTupleSize {
b := ti.prefixTuples[off+addrPrefixSize : off+prefixTupleSize]
b := ti.prefixTuples[off+hash.PrefixLen : off+prefixTupleSize]
o[i] = binary.BigEndian.Uint32(b)
}
return o, nil
@@ -423,7 +423,7 @@ func (ti onHeapTableIndex) prefixes() ([]uint64, error) {
// todo: |p| is not accounted for in the memory quota
p := make([]uint64, ti.count)
for i, off := uint32(0), 0; i < ti.count; i, off = i+1, off+prefixTupleSize {
b := ti.prefixTuples[off : off+addrPrefixSize]
b := ti.prefixTuples[off : off+hash.PrefixLen]
p[i] = binary.BigEndian.Uint64(b)
}
return p, nil
@@ -435,14 +435,14 @@ func (ti onHeapTableIndex) hashAt(idx uint32) hash.Hash {
tuple := ti.prefixTuples[off : off+prefixTupleSize]
// Get prefix, ordinal, and suffix
prefix := tuple[:addrPrefixSize]
ord := binary.BigEndian.Uint32(tuple[addrPrefixSize:]) * addrSuffixSize
suffix := ti.suffixes[ord : ord+addrSuffixSize] // suffix is 12 bytes
prefix := tuple[:hash.PrefixLen]
ord := binary.BigEndian.Uint32(tuple[hash.PrefixLen:]) * hash.SuffixLen
suffix := ti.suffixes[ord : ord+hash.SuffixLen] // suffix is 12 bytes
// Combine prefix and suffix to get hash
buf := [hash.ByteLen]byte{}
copy(buf[:addrPrefixSize], prefix)
copy(buf[addrPrefixSize:], suffix)
copy(buf[:hash.PrefixLen], prefix)
copy(buf[hash.PrefixLen:], suffix)
return buf
}
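The reassembly hashAt performs can be sketched in isolation using only the hash package constants; the prefix and suffix values are illustrative:

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/dolthub/dolt/go/store/hash"
)

func main() {
	prefix := uint64(0x0102030405060708)   // taken from a prefix tuple
	suffix := make([]byte, hash.SuffixLen) // taken from the suffixes region

	var h hash.Hash
	binary.BigEndian.PutUint64(h[:hash.PrefixLen], prefix)
	copy(h[hash.PrefixLen:], suffix)
	fmt.Println(h.Prefix() == prefix) // true: prefix + suffix round-trips losslessly
}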

View File

@@ -32,6 +32,7 @@ import (
"time"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)
var errCacheMiss = errors.New("index cache miss")
@@ -55,15 +56,15 @@ type tablePersister interface {
ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error)
// Open a table named |name|, containing |chunkCount| chunks.
Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error)
Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error)
// Exists checks if a table named |name| exists.
Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error)
Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error)
// PruneTableFiles deletes table files which the persister would normally be responsible for and
// which are not in the included |keeper| set and have not be written or modified more recently
// than the provided |mtime|.
PruneTableFiles(ctx context.Context, keeper func() []addr, mtime time.Time) error
PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, mtime time.Time) error
AccessMode() chunks.ExclusiveAccessMode
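A hedged caller-side sketch of the updated signatures, assuming it lives in the nbs package; |p|, |tableName|, |chunkCount|, and |stats| are placeholders, not identifiers from the commit:

func openIfPresent(ctx context.Context, p tablePersister, tableName hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
	ok, err := p.Exists(ctx, tableName, chunkCount, stats)
	if err != nil || !ok {
		return nil, err
	}
	return p.Open(ctx, tableName, chunkCount, stats)
}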
@@ -118,7 +119,7 @@ type compactionPlan struct {
func (cp compactionPlan) suffixes() []byte {
suffixesStart := uint64(cp.chunkCount) * (prefixTupleSize + lengthSize)
return cp.mergedIndex[suffixesStart : suffixesStart+uint64(cp.chunkCount)*addrSuffixSize]
return cp.mergedIndex[suffixesStart : suffixesStart+uint64(cp.chunkCount)*hash.SuffixLen]
}
// planRangeCopyConjoin computes a conjoin plan for tablePersisters that can conjoin
@@ -221,19 +222,19 @@ func planConjoin(sources []sourceWithSize, stats *Stats) (plan compactionPlan, e
suffixesPos += uint64(n)
} else {
// Build up the index one entry at a time.
var a addr
var h hash.Hash
for i := 0; i < len(ordinals); i++ {
e, err := index.indexEntry(uint32(i), &a)
e, err := index.indexEntry(uint32(i), &h)
if err != nil {
return compactionPlan{}, err
}
li := lengthsPos + lengthSize*uint64(ordinals[i])
si := suffixesPos + addrSuffixSize*uint64(ordinals[i])
si := suffixesPos + hash.SuffixLen*uint64(ordinals[i])
binary.BigEndian.PutUint32(plan.mergedIndex[li:], e.Length())
copy(plan.mergedIndex[si:], a[addrPrefixSize:])
copy(plan.mergedIndex[si:], h[hash.PrefixLen:])
}
lengthsPos += lengthSize * uint64(len(ordinals))
suffixesPos += addrSuffixSize * uint64(len(ordinals))
suffixesPos += hash.SuffixLen * uint64(len(ordinals))
}
}
@@ -242,7 +243,7 @@ func planConjoin(sources []sourceWithSize, stats *Stats) (plan compactionPlan, e
var pfxPos uint64
for _, pi := range prefixIndexRecs {
binary.BigEndian.PutUint64(plan.mergedIndex[pfxPos:], pi.addr.Prefix())
pfxPos += addrPrefixSize
pfxPos += hash.PrefixLen
binary.BigEndian.PutUint32(plan.mergedIndex[pfxPos:], pi.order)
pfxPos += ordinalSize
}
@@ -253,12 +254,11 @@ func planConjoin(sources []sourceWithSize, stats *Stats) (plan compactionPlan, e
return plan, nil
}
func nameFromSuffixes(suffixes []byte) (name addr) {
func nameFromSuffixes(suffixes []byte) (name hash.Hash) {
sha := sha512.New()
sha.Write(suffixes)
var h []byte
h = sha.Sum(h) // Appends hash to h
copy(name[:], h)
return
return hash.New(h[:hash.ByteLen])
}
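A hedged usage sketch, assuming it runs inside the nbs package: the base32 form of this truncated SHA-512 becomes the conjoined table's name (the input bytes are arbitrary):

	suffixes := []byte("illustrative suffix bytes")
	name := nameFromSuffixes(suffixes)
	fmt.Println(name.String()) // 32-character base32 string used as the table file name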

View File

@@ -233,14 +233,14 @@ func (tr tableReader) index() (tableIndex, error) {
}
// returns true iff |h| can be found in this table.
func (tr tableReader) has(h addr) (bool, error) {
func (tr tableReader) has(h hash.Hash) (bool, error) {
_, ok, err := tr.idx.lookup(&h)
return ok, err
}
// returns the storage associated with |h|, iff present. Returns nil if absent. On success,
// the returned byte slice directly references the underlying storage.
func (tr tableReader) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) {
func (tr tableReader) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) {
e, found, err := tr.idx.lookup(&h)
if err != nil {
return nil, err
@@ -283,7 +283,7 @@ func (tr tableReader) get(ctx context.Context, h addr, stats *Stats) ([]byte, er
}
type offsetRec struct {
a *addr
a *hash.Hash
offset uint64
length uint32
}
@@ -639,12 +639,12 @@ func (tr tableReader) extract(ctx context.Context, chunks chan<- extractRecord)
var ors offsetRecSlice
for i := uint32(0); i < tr.idx.chunkCount(); i++ {
a := new(addr)
e, err := tr.idx.indexEntry(i, a)
h := new(hash.Hash)
e, err := tr.idx.indexEntry(i, h)
if err != nil {
return err
}
ors = append(ors, offsetRec{a, e.Offset(), e.Length()})
ors = append(ors, offsetRec{h, e.Offset(), e.Length()})
}
sort.Sort(ors)
for _, or := range ors {

View File

@@ -29,6 +29,7 @@ import (
"sort"
"sync"
"github.com/dolthub/dolt/go/store/hash"
lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/sync/errgroup"
@@ -57,7 +58,7 @@ type tableSet struct {
rl chan struct{}
}
func (ts tableSet) has(h addr) (bool, error) {
func (ts tableSet) has(h hash.Hash) (bool, error) {
f := func(css chunkSourceSet) (bool, error) {
for _, haver := range css {
has, err := haver.has(h)
@@ -114,7 +115,7 @@ func (ts tableSet) hasMany(addrs []hasRecord) (bool, error) {
return f(ts.upstream)
}
func (ts tableSet) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) {
func (ts tableSet) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
@@ -287,7 +288,7 @@ func (ts tableSet) Size() int {
// append adds a memTable to an existing tableSet, compacting |mt| and
// returning a new tableSet with the newly compacted table added.
func (ts tableSet) append(ctx context.Context, mt *memTable, checker refCheck, hasCache *lru.TwoQueueCache[addr, struct{}], stats *Stats) (tableSet, error) {
func (ts tableSet) append(ctx context.Context, mt *memTable, checker refCheck, hasCache *lru.TwoQueueCache[hash.Hash, struct{}], stats *Stats) (tableSet, error) {
for i := range mt.pendingRefs {
if !mt.pendingRefs[i].has && hasCache.Contains(*mt.pendingRefs[i].a) {
mt.pendingRefs[i].has = true
@@ -345,7 +346,7 @@ func (ts tableSet) rebase(ctx context.Context, specs []tableSpec, stats *Stats)
// deduplicate |specs|
orig := specs
specs = make([]tableSpec, 0, len(orig))
seen := map[addr]struct{}{}
seen := map[hash.Hash]struct{}{}
for _, spec := range orig {
if _, ok := seen[spec.name]; ok {
continue

View File

@@ -26,12 +26,13 @@ import (
"encoding/binary"
"errors"
"fmt"
"hash"
gohash "hash"
"sort"
"github.com/golang/snappy"
"github.com/dolthub/dolt/go/store/d"
"github.com/dolthub/dolt/go/store/hash"
)
// tableWriter encodes a collection of byte stream chunks into a nbs table. NOT goroutine safe.
@@ -41,7 +42,7 @@ type tableWriter struct {
totalCompressedData uint64
totalUncompressedData uint64
prefixes prefixIndexSlice
blockHash hash.Hash
blockHash gohash.Hash
snapper snappyEncoder
}
@@ -61,11 +62,11 @@ func maxTableSize(numChunks, totalData uint64) uint64 {
d.Chk.True(avgChunkSize < maxChunkSize)
maxSnappySize := snappy.MaxEncodedLen(int(avgChunkSize))
d.Chk.True(maxSnappySize > 0)
return numChunks*(prefixTupleSize+lengthSize+addrSuffixSize+checksumSize+uint64(maxSnappySize)) + footerSize
return numChunks*(prefixTupleSize+lengthSize+hash.SuffixLen+checksumSize+uint64(maxSnappySize)) + footerSize
}
func indexSize(numChunks uint32) uint64 {
return uint64(numChunks) * (addrSuffixSize + lengthSize + prefixTupleSize)
return uint64(numChunks) * (hash.SuffixLen + lengthSize + prefixTupleSize)
}
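A worked example of that formula, under the assumption that prefixTupleSize is 12 bytes (8-byte prefix + 4-byte ordinal) and lengthSize is 4 bytes: each chunk costs 12 + 4 + 12 = 28 index bytes, so a 1,000-chunk table carries a 28,000-byte index.

	// Sketch with assumed constants; only hash.SuffixLen comes from the hash package.
	const prefixTupleSize, lengthSize = 12, 4
	fmt.Println(uint64(1000) * (hash.SuffixLen + lengthSize + prefixTupleSize)) // 28000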
func lengthsOffset(numChunks uint32) uint64 {
@@ -88,7 +89,7 @@ func newTableWriter(buff []byte, snapper snappyEncoder) *tableWriter {
}
}
func (tw *tableWriter) addChunk(h addr, data []byte) bool {
func (tw *tableWriter) addChunk(h hash.Hash, data []byte) bool {
if len(data) == 0 {
panic("NBS blocks cannont be zero length")
}
@@ -123,11 +124,11 @@ func (tw *tableWriter) addChunk(h addr, data []byte) bool {
return true
}
func (tw *tableWriter) finish() (uncompressedLength uint64, blockAddr addr, err error) {
func (tw *tableWriter) finish() (uncompressedLength uint64, blockAddr hash.Hash, err error) {
err = tw.writeIndex()
if err != nil {
return 0, addr{}, err
return 0, hash.Hash{}, err
}
tw.writeFooter()
@@ -140,20 +141,22 @@ func (tw *tableWriter) finish() (uncompressedLength uint64, blockAddr addr, err
}
type prefixIndexRec struct {
addr addr
addr hash.Hash
order, size uint32
}
type prefixIndexSlice []prefixIndexRec
func (hs prefixIndexSlice) Len() int { return len(hs) }
func (hs prefixIndexSlice) Less(i, j int) bool { return hs[i].addr.Prefix() < hs[j].addr.Prefix() }
func (hs prefixIndexSlice) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] }
func (hs prefixIndexSlice) Len() int { return len(hs) }
func (hs prefixIndexSlice) Less(i, j int) bool {
return hs[i].addr.Prefix() < hs[j].addr.Prefix()
}
func (hs prefixIndexSlice) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] }
func (tw *tableWriter) writeIndex() error {
sort.Sort(tw.prefixes)
pfxScratch := [addrPrefixSize]byte{}
pfxScratch := [hash.PrefixLen]byte{}
numRecords := uint32(len(tw.prefixes))
lengthsOffset := tw.pos + lengthsOffset(numRecords) // skip prefix and ordinal for each record
@@ -163,7 +166,7 @@ func (tw *tableWriter) writeIndex() error {
// hash prefix
n := uint64(copy(tw.buff[tw.pos:], pfxScratch[:]))
if n != addrPrefixSize {
if n != hash.PrefixLen {
return errors.New("failed to copy all data")
}
@@ -178,14 +181,14 @@ func (tw *tableWriter) writeIndex() error {
binary.BigEndian.PutUint32(tw.buff[offset:], pi.size)
// hash suffix
offset = suffixesOffset + uint64(pi.order)*addrSuffixSize
offset = suffixesOffset + uint64(pi.order)*hash.SuffixLen
n = uint64(copy(tw.buff[offset:], pi.addr.Suffix()))
if n != addrSuffixSize {
if n != hash.SuffixLen {
return errors.New("failed to copy all bytes")
}
}
suffixesLen := uint64(numRecords) * addrSuffixSize
suffixesLen := uint64(numRecords) * hash.SuffixLen
tw.blockHash.Write(tw.buff[suffixesOffset : suffixesOffset+suffixesLen])
tw.pos = suffixesOffset + suffixesLen
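A hedged sketch of the region layout writeIndex fills, under the same size assumptions as above (12-byte prefix tuples, 4-byte lengths, 12-byte suffixes): prefix tuples, chunk lengths, and hash suffixes are written back to back, and blockHash covers only the suffix region.

func indexOffsets(n uint64) (lengthsOff, suffixesOff, end uint64) {
	const prefixTupleSize, lengthSize, suffixLen = 12, 4, 12 // assumed sizes, as noted above
	lengthsOff = n * prefixTupleSize
	suffixesOff = lengthsOff + n*lengthSize
	end = suffixesOff + n*suffixLen
	return
}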

View File

@@ -33,21 +33,21 @@ func IterChunks(ctx context.Context, rd io.ReadSeeker, cb func(chunk chunks.Chun
defer idx.Close()
seen := make(map[addr]bool)
seen := make(map[hash.Hash]bool)
for i := uint32(0); i < idx.chunkCount(); i++ {
var a addr
ie, err := idx.indexEntry(i, &a)
var h hash.Hash
ie, err := idx.indexEntry(i, &h)
if err != nil {
return err
}
if _, ok := seen[a]; !ok {
seen[a] = true
if _, ok := seen[h]; !ok {
seen[h] = true
chunkBytes, err := readNFrom(rd, ie.Offset(), ie.Length())
if err != nil {
return err
}
cmpChnk, err := NewCompressedChunk(hash.Hash(a), chunkBytes)
cmpChnk, err := NewCompressedChunk(h, chunkBytes)
if err != nil {
return err
}