diff --git a/go/store/hash/hash.go b/go/store/hash/hash.go index bf0f2ba19c..61ead00bb6 100644 --- a/go/store/hash/hash.go +++ b/go/store/hash/hash.go @@ -48,6 +48,7 @@ package hash import ( "bytes" "crypto/sha512" + "encoding/binary" "fmt" "regexp" "strconv" @@ -60,6 +61,12 @@ const ( // ByteLen is the number of bytes used to represent the Hash. ByteLen = 20 + // PrefixLen is the number of bytes used to represent the Prefix of the Hash. + PrefixLen = 8 // uint64 + + // SuffixLen is the number of bytes which come after the Prefix. + SuffixLen = ByteLen - PrefixLen + // StringLen is the number of characters need to represent the Hash using Base32. StringLen = 32 // 20 * 8 / log2(32) ) @@ -123,6 +130,16 @@ func Parse(s string) Hash { return r } +// Prefix returns the first 8 bytes of the hash as a uint64. Used for chunk indexing. +func (h Hash) Prefix() uint64 { + return binary.BigEndian.Uint64(h[:PrefixLen]) +} + +// Suffix returns the last 12 bytes of the hash. Used for chunk indexing. +func (h Hash) Suffix() []byte { + return h[PrefixLen:] +} + // Less compares two hashes returning whether this Hash is less than other. func (h Hash) Less(other Hash) bool { return h.Compare(other) < 0 diff --git a/go/store/nbs/aws_chunk_source.go b/go/store/nbs/aws_chunk_source.go index 47fb773c31..1733aa7a88 100644 --- a/go/store/nbs/aws_chunk_source.go +++ b/go/store/nbs/aws_chunk_source.go @@ -26,9 +26,11 @@ import ( "context" "errors" "time" + + "github.com/dolthub/dolt/go/store/hash" ) -func tableExistsInChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (bool, error) { +func tableExistsInChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name hash.Hash, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (bool, error) { magic := make([]byte, magicNumberSize) n, _, err := s3.ReadFromEnd(ctx, name, magic, stats) if err != nil { @@ -40,7 +42,7 @@ func tableExistsInChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLim return bytes.Equal(magic, []byte(magicNumber)), nil } -func newAWSChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) { +func newAWSChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name hash.Hash, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) { var tra tableReaderAt index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error { n, _, err := s3.ReadFromEnd(ctx, name, p, stats) diff --git a/go/store/nbs/aws_table_persister.go b/go/store/nbs/aws_table_persister.go index 8d76044a48..8c9f19b2b7 100644 --- a/go/store/nbs/aws_table_persister.go +++ b/go/store/nbs/aws_table_persister.go @@ -36,6 +36,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/atomicerr" "github.com/dolthub/dolt/go/store/chunks" @@ -66,7 +67,7 @@ type awsLimits struct { partTarget, partMin, partMax uint64 } -func (s3p awsTablePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) { +func (s3p awsTablePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) { return newAWSChunkSource( ctx, &s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns}, @@ -78,7 
+79,7 @@ func (s3p awsTablePersister) Open(ctx context.Context, name addr, chunkCount uin ) } -func (s3p awsTablePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) { +func (s3p awsTablePersister) Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) { return tableExistsInChunkSource( ctx, &s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns}, @@ -482,7 +483,7 @@ func (s3p awsTablePersister) uploadPart(ctx context.Context, data []byte, key, u return } -func (s3p awsTablePersister) PruneTableFiles(ctx context.Context, keeper func() []addr, t time.Time) error { +func (s3p awsTablePersister) PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, t time.Time) error { return chunks.ErrUnsupportedOperation } diff --git a/go/store/nbs/bs_manifest.go b/go/store/nbs/bs_manifest.go index 46fe64771c..40ba9e4c77 100644 --- a/go/store/nbs/bs_manifest.go +++ b/go/store/nbs/bs_manifest.go @@ -21,6 +21,7 @@ import ( "github.com/dolthub/dolt/go/store/blobstore" "github.com/dolthub/dolt/go/store/chunks" + "github.com/dolthub/dolt/go/store/hash" ) const ( @@ -74,7 +75,7 @@ func (bsm blobstoreManifest) ParseIfExists(ctx context.Context, stats *Stats, re } // Update updates the contents of the manifest in the blobstore -func (bsm blobstoreManifest) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) { +func (bsm blobstoreManifest) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) { checker := func(upstream, contents manifestContents) error { if contents.gcGen != upstream.gcGen { return chunks.ErrGCGenerationExpired @@ -85,7 +86,7 @@ func (bsm blobstoreManifest) Update(ctx context.Context, lastLock addr, newConte return updateBSWithChecker(ctx, bsm.bs, checker, lastLock, newContents, writeHook) } -func (bsm blobstoreManifest) UpdateGCGen(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) { +func (bsm blobstoreManifest) UpdateGCGen(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) { checker := func(upstream, contents manifestContents) error { if contents.gcGen == upstream.gcGen { return errors.New("UpdateGCGen() must update the garbage collection generation") @@ -100,7 +101,7 @@ func (bsm blobstoreManifest) UpdateGCGen(ctx context.Context, lastLock addr, new return updateBSWithChecker(ctx, bsm.bs, checker, lastLock, newContents, writeHook) } -func updateBSWithChecker(ctx context.Context, bs blobstore.Blobstore, validate manifestChecker, lastLock addr, newContents manifestContents, writeHook func() error) (mc manifestContents, err error) { +func updateBSWithChecker(ctx context.Context, bs blobstore.Blobstore, validate manifestChecker, lastLock hash.Hash, newContents manifestContents, writeHook func() error) (mc manifestContents, err error) { if writeHook != nil { panic("Write hooks not supported") } diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index 037c0eab96..6eae8b978b 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -22,6 +22,7 @@ import ( "io" "time" + "github.com/dolthub/dolt/go/store/hash" "golang.org/x/sync/errgroup" "github.com/dolthub/dolt/go/store/blobstore" @@ -160,15 +161,15 @@ func (bsp 
*blobstorePersister) getRecordsSubObject(ctx context.Context, cs chunk } // Open a table named |name|, containing |chunkCount| chunks. -func (bsp *blobstorePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) { +func (bsp *blobstorePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) { return newBSChunkSource(ctx, bsp.bs, name, chunkCount, bsp.q, stats) } -func (bsp *blobstorePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) { +func (bsp *blobstorePersister) Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) { return bsp.bs.Exists(ctx, name.String()) } -func (bsp *blobstorePersister) PruneTableFiles(ctx context.Context, keeper func() []addr, t time.Time) error { +func (bsp *blobstorePersister) PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, t time.Time) error { return nil } @@ -275,7 +276,7 @@ func (bsTRA *bsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off return totalRead, nil } -func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) { +func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name hash.Hash, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) { index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error { rc, _, err := bs.Get(ctx, name.String(), blobstore.NewBlobRange(-int64(len(p)), 0)) if err != nil { diff --git a/go/store/nbs/chunk_source_adapter.go b/go/store/nbs/chunk_source_adapter.go index cefb521509..f86d7527b4 100644 --- a/go/store/nbs/chunk_source_adapter.go +++ b/go/store/nbs/chunk_source_adapter.go @@ -14,18 +14,22 @@ package nbs -import "context" +import ( + "context" + + "github.com/dolthub/dolt/go/store/hash" +) type chunkSourceAdapter struct { tableReader - h addr + h hash.Hash } -func (csa chunkSourceAdapter) hash() addr { +func (csa chunkSourceAdapter) hash() hash.Hash { return csa.h } -func newReaderFromIndexData(ctx context.Context, q MemoryQuotaProvider, idxData []byte, name addr, tra tableReaderAt, blockSize uint64) (cs chunkSource, err error) { +func newReaderFromIndexData(ctx context.Context, q MemoryQuotaProvider, idxData []byte, name hash.Hash, tra tableReaderAt, blockSize uint64) (cs chunkSource, err error) { index, err := parseTableIndexByCopy(ctx, idxData, q) if err != nil { return nil, err diff --git a/go/store/nbs/cmp_chunk_table_writer.go b/go/store/nbs/cmp_chunk_table_writer.go index 430d04fa9f..4588744efe 100644 --- a/go/store/nbs/cmp_chunk_table_writer.go +++ b/go/store/nbs/cmp_chunk_table_writer.go @@ -18,11 +18,12 @@ import ( "crypto/sha512" "encoding/binary" "errors" - "hash" + gohash "hash" "io" "os" "sort" + "github.com/dolthub/dolt/go/store/hash" "github.com/golang/snappy" ) @@ -44,7 +45,7 @@ type CmpChunkTableWriter struct { totalCompressedData uint64 totalUncompressedData uint64 prefixes prefixIndexSlice - blockAddr *addr + blockAddr *hash.Hash path string } @@ -96,7 +97,7 @@ func (tw *CmpChunkTableWriter) AddCmpChunk(c CompressedChunk) error { // Stored in insertion order tw.prefixes = append(tw.prefixes, prefixIndexRec{ - addr(c.H), + c.H, uint32(len(tw.prefixes)), uint32(fullLen), }) @@ -124,9 +125,7 @@ func (tw *CmpChunkTableWriter) Finish() (string, error) { var h []byte h = blockHash.Sum(h) - - var blockAddr addr - copy(blockAddr[:], h) + 
blockAddr := hash.New(h[:hash.ByteLen]) tw.blockAddr = &blockAddr return tw.blockAddr.String(), nil @@ -189,7 +188,7 @@ func containsDuplicates(prefixes prefixIndexSlice) bool { return false } -func (tw *CmpChunkTableWriter) writeIndex() (hash.Hash, error) { +func (tw *CmpChunkTableWriter) writeIndex() (gohash.Hash, error) { sort.Sort(tw.prefixes) // We do a sanity check here to assert that we are never writing duplicate chunks into @@ -198,13 +197,13 @@ func (tw *CmpChunkTableWriter) writeIndex() (hash.Hash, error) { return nil, ErrDuplicateChunkWritten } - pfxScratch := [addrPrefixSize]byte{} + pfxScratch := [hash.PrefixLen]byte{} blockHash := sha512.New() numRecords := uint32(len(tw.prefixes)) lengthsOffset := lengthsOffset(numRecords) // skip prefix and ordinal for each record suffixesOffset := suffixesOffset(numRecords) // skip size for each record - suffixesLen := uint64(numRecords) * addrSuffixSize + suffixesLen := uint64(numRecords) * hash.SuffixLen buff := make([]byte, suffixesLen+suffixesOffset) var pos uint64 @@ -213,7 +212,7 @@ func (tw *CmpChunkTableWriter) writeIndex() (hash.Hash, error) { // hash prefix n := uint64(copy(buff[pos:], pfxScratch[:])) - if n != addrPrefixSize { + if n != hash.PrefixLen { return nil, errors.New("failed to copy all data") } @@ -228,10 +227,10 @@ func (tw *CmpChunkTableWriter) writeIndex() (hash.Hash, error) { binary.BigEndian.PutUint32(buff[offset:], pi.size) // hash suffix - offset = suffixesOffset + uint64(pi.order)*addrSuffixSize + offset = suffixesOffset + uint64(pi.order)*hash.SuffixLen n = uint64(copy(buff[offset:], pi.addr.Suffix())) - if n != addrSuffixSize { + if n != hash.SuffixLen { return nil, errors.New("failed to copy all bytes") } } diff --git a/go/store/nbs/conjoiner.go b/go/store/nbs/conjoiner.go index 3c23e77f69..422f334ea3 100644 --- a/go/store/nbs/conjoiner.go +++ b/go/store/nbs/conjoiner.go @@ -27,6 +27,7 @@ import ( "sort" "time" + "github.com/dolthub/dolt/go/store/hash" "golang.org/x/sync/errgroup" ) @@ -171,8 +172,8 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, upstream, appendixSpecs = upstream.removeAppendixSpecs() } - conjoineeSet := map[addr]struct{}{} - upstreamNames := map[addr]struct{}{} + conjoineeSet := map[hash.Hash]struct{}{} + upstreamNames := map[hash.Hash]struct{}{} for _, spec := range upstream.specs { upstreamNames[spec.name] = struct{}{} } diff --git a/go/store/nbs/dynamo_manifest.go b/go/store/nbs/dynamo_manifest.go index 858772c916..883a7ab72e 100644 --- a/go/store/nbs/dynamo_manifest.go +++ b/go/store/nbs/dynamo_manifest.go @@ -146,7 +146,7 @@ func validateManifest(item map[string]*dynamodb.AttributeValue) (valid, hasSpecs return false, false, false } -func (dm dynamoManifest) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) { +func (dm dynamoManifest) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) { t1 := time.Now() defer func() { stats.WriteManifestLatency.SampleTimeSince(t1) }() @@ -174,7 +174,7 @@ func (dm dynamoManifest) Update(ctx context.Context, lastLock addr, newContents } expr := valueEqualsExpression - if lastLock == (addr{}) { + if lastLock.IsEmpty() { expr = valueNotExistsOrEqualsExpression } diff --git a/go/store/nbs/empty_chunk_source.go b/go/store/nbs/empty_chunk_source.go index ea2f082dc0..719b8539e5 100644 --- a/go/store/nbs/empty_chunk_source.go +++ 
b/go/store/nbs/empty_chunk_source.go @@ -34,7 +34,7 @@ import ( type emptyChunkSource struct{} -func (ecs emptyChunkSource) has(h addr) (bool, error) { +func (ecs emptyChunkSource) has(h hash.Hash) (bool, error) { return false, nil } @@ -42,7 +42,7 @@ func (ecs emptyChunkSource) hasMany(addrs []hasRecord) (bool, error) { return true, nil } -func (ecs emptyChunkSource) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) { +func (ecs emptyChunkSource) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) { return nil, nil } @@ -62,8 +62,8 @@ func (ecs emptyChunkSource) uncompressedLen() (uint64, error) { return 0, nil } -func (ecs emptyChunkSource) hash() addr { - return addr{} +func (ecs emptyChunkSource) hash() hash.Hash { + return hash.Hash{} } func (ecs emptyChunkSource) index() (tableIndex, error) { diff --git a/go/store/nbs/file_manifest.go b/go/store/nbs/file_manifest.go index 32b4958204..970e7dcdfe 100644 --- a/go/store/nbs/file_manifest.go +++ b/go/store/nbs/file_manifest.go @@ -78,8 +78,7 @@ func MaybeMigrateFileManifest(ctx context.Context, dir string) (bool, error) { } check := func(_, contents manifestContents) error { - var empty addr - if contents.gcGen != empty { + if !contents.gcGen.IsEmpty() { return errors.New("migrating from v4 to v5 should result in a manifest with a 0 gcGen") } @@ -170,7 +169,7 @@ func (fm fileManifest) ParseIfExists( return parseIfExists(ctx, fm.dir, readHook) } -func (fm fileManifest) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) { +func (fm fileManifest) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) { t1 := time.Now() defer func() { stats.WriteManifestLatency.SampleTimeSince(t1) }() @@ -194,7 +193,7 @@ func (fm fileManifest) Update(ctx context.Context, lastLock addr, newContents ma return updateWithChecker(ctx, fm.dir, fm.mode, checker, lastLock, newContents, writeHook) } -func (fm fileManifest) UpdateGCGen(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) { +func (fm fileManifest) UpdateGCGen(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) { t1 := time.Now() defer func() { stats.WriteManifestLatency.SampleTimeSince(t1) }() @@ -245,14 +244,14 @@ func parseV5Manifest(r io.Reader) (manifestContents, error) { return manifestContents{}, err } - lock, err := parseAddr(slices[1]) - if err != nil { - return manifestContents{}, err + lock, ok := hash.MaybeParse(slices[1]) + if !ok { + return manifestContents{}, fmt.Errorf("Could not parse lock hash: %s", slices[1]) } - gcGen, err := parseAddr(slices[3]) - if err != nil { - return manifestContents{}, err + gcGen, ok := hash.MaybeParse(slices[3]) + if !ok { + return manifestContents{}, fmt.Errorf("Could not parse GC generation hash: %s", slices[3]) } return manifestContents{ @@ -329,10 +328,9 @@ func parseV4Manifest(r io.Reader) (manifestContents, error) { return manifestContents{}, err } - ad, err := parseAddr(slices[1]) - - if err != nil { - return manifestContents{}, err + ad, ok := hash.MaybeParse(slices[1]) + if !ok { + return manifestContents{}, fmt.Errorf("Could not parse lock hash: %s", slices[1]) } return manifestContents{ @@ -373,7 +371,7 @@ func parseIfExists(_ context.Context, dir string, readHook 
func() error) (exists } // updateWithChecker updates the manifest if |validate| is satisfied, callers must hold the file lock. -func updateWithChecker(_ context.Context, dir string, mode updateMode, validate manifestChecker, lastLock addr, newContents manifestContents, writeHook func() error) (mc manifestContents, err error) { +func updateWithChecker(_ context.Context, dir string, mode updateMode, validate manifestChecker, lastLock hash.Hash, newContents manifestContents, writeHook func() error) (mc manifestContents, err error) { var tempManifestPath string // Write a temporary manifest file, to be renamed over manifestFileName upon success. @@ -450,7 +448,7 @@ func updateWithChecker(_ context.Context, dir string, mode updateMode, validate return manifestContents{}, ferr } - if lastLock != (addr{}) { + if !lastLock.IsEmpty() { return manifestContents{}, errors.New("new manifest created with non 0 lock") } diff --git a/go/store/nbs/file_table_persister.go b/go/store/nbs/file_table_persister.go index bbfd1ead51..caec36b9bf 100644 --- a/go/store/nbs/file_table_persister.go +++ b/go/store/nbs/file_table_persister.go @@ -36,6 +36,7 @@ import ( "github.com/dolthub/dolt/go/libraries/utils/file" "github.com/dolthub/dolt/go/store/chunks" + "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/util/tempfiles" ) @@ -67,11 +68,11 @@ type fsTablePersister struct { var _ tablePersister = &fsTablePersister{} var _ tableFilePersister = &fsTablePersister{} -func (ftp *fsTablePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) { +func (ftp *fsTablePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) { return newFileTableReader(ctx, ftp.dir, name, chunkCount, ftp.q) } -func (ftp *fsTablePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) { +func (ftp *fsTablePersister) Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) { ftp.removeMu.Lock() defer ftp.removeMu.Unlock() if ftp.toKeep != nil { @@ -157,7 +158,7 @@ func (ftp *fsTablePersister) TryMoveCmpChunkTableWriter(ctx context.Context, fil return w.FlushToFile(path) } -func (ftp *fsTablePersister) persistTable(ctx context.Context, name addr, data []byte, chunkCount uint32, stats *Stats) (cs chunkSource, err error) { +func (ftp *fsTablePersister) persistTable(ctx context.Context, name hash.Hash, data []byte, chunkCount uint32, stats *Stats) (cs chunkSource, err error) { if chunkCount == 0 { return emptyChunkSource{}, nil } @@ -316,7 +317,7 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource }, nil } -func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, keeper func() []addr, mtime time.Time) error { +func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, mtime time.Time) error { ftp.removeMu.Lock() if ftp.toKeep != nil { ftp.removeMu.Unlock() @@ -368,8 +369,7 @@ func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, keeper func() continue // not a table file } - _, err := parseAddr(info.Name()) - if err != nil { + if _, ok := hash.MaybeParse(info.Name()); !ok { continue // not a table file } diff --git a/go/store/nbs/file_table_reader.go b/go/store/nbs/file_table_reader.go index 3dfdf367ef..e6bc119f0d 100644 --- a/go/store/nbs/file_table_reader.go +++ b/go/store/nbs/file_table_reader.go @@ -29,18 +29,20 @@ import ( "os" "path/filepath" "time" + + 
"github.com/dolthub/dolt/go/store/hash" ) type fileTableReader struct { tableReader - h addr + h hash.Hash } const ( fileBlockSize = 1 << 12 ) -func tableFileExists(ctx context.Context, dir string, h addr) (bool, error) { +func tableFileExists(ctx context.Context, dir string, h hash.Hash) (bool, error) { path := filepath.Join(dir, h.String()) _, err := os.Stat(path) @@ -51,7 +53,7 @@ func tableFileExists(ctx context.Context, dir string, h addr) (bool, error) { return err == nil, err } -func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint32, q MemoryQuotaProvider) (cs chunkSource, err error) { +func newFileTableReader(ctx context.Context, dir string, h hash.Hash, chunkCount uint32, q MemoryQuotaProvider) (cs chunkSource, err error) { path := filepath.Join(dir, h.String()) var f *os.File @@ -134,7 +136,7 @@ func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint }, nil } -func (ftr *fileTableReader) hash() addr { +func (ftr *fileTableReader) hash() hash.Hash { return ftr.h } diff --git a/go/store/nbs/gc_copier.go b/go/store/nbs/gc_copier.go index fc735e404a..5092a5deea 100644 --- a/go/store/nbs/gc_copier.go +++ b/go/store/nbs/gc_copier.go @@ -18,6 +18,8 @@ import ( "context" "fmt" "strings" + + "github.com/dolthub/dolt/go/store/hash" ) type gcErrAccum map[string]error @@ -72,10 +74,9 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, tfp tableFilePersister _ = gcc.writer.Remove() }() - var addr addr - addr, err = parseAddr(filename) - if err != nil { - return nil, err + addr, ok := hash.MaybeParse(filename) + if !ok { + return nil, fmt.Errorf("invalid filename: %s", filename) } exists, err := tfp.Exists(ctx, addr, uint32(gcc.writer.ChunkCount()), nil) diff --git a/go/store/nbs/index_transformer.go b/go/store/nbs/index_transformer.go index 59410b7a75..712b5f020a 100644 --- a/go/store/nbs/index_transformer.go +++ b/go/store/nbs/index_transformer.go @@ -18,6 +18,8 @@ import ( "encoding/binary" "errors" "io" + + "github.com/dolthub/dolt/go/store/hash" ) var ( @@ -27,7 +29,7 @@ var ( func NewIndexTransformer(src io.Reader, chunkCount int) io.Reader { tuplesSize := chunkCount * prefixTupleSize lengthsSize := chunkCount * lengthSize - suffixesSize := chunkCount * addrSuffixSize + suffixesSize := chunkCount * hash.SuffixLen tupleReader := io.LimitReader(src, int64(tuplesSize)) lengthsReader := io.LimitReader(src, int64(lengthsSize)) diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go index 2020186761..3e92fe2960 100644 --- a/go/store/nbs/journal.go +++ b/go/store/nbs/journal.go @@ -276,7 +276,7 @@ func (j *ChunkJournal) ConjoinAll(ctx context.Context, sources chunkSources, sta } // Open implements tablePersister. -func (j *ChunkJournal) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) { +func (j *ChunkJournal) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) { if name == journalAddr { if err := j.maybeInit(ctx); err != nil { return nil, err @@ -287,12 +287,12 @@ func (j *ChunkJournal) Open(ctx context.Context, name addr, chunkCount uint32, s } // Exists implements tablePersister. -func (j *ChunkJournal) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) { +func (j *ChunkJournal) Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) { return j.persister.Exists(ctx, name, chunkCount, stats) } // PruneTableFiles implements tablePersister. 
-func (j *ChunkJournal) PruneTableFiles(ctx context.Context, keeper func() []addr, mtime time.Time) error { +func (j *ChunkJournal) PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, mtime time.Time) error { if j.backing.readOnly() { return errReadOnlyManifest } @@ -326,7 +326,7 @@ func (j *ChunkJournal) Name() string { } // Update implements manifest. -func (j *ChunkJournal) Update(ctx context.Context, lastLock addr, next manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) { +func (j *ChunkJournal) Update(ctx context.Context, lastLock hash.Hash, next manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) { if j.backing.readOnly() { return j.contents, errReadOnlyManifest } @@ -372,7 +372,7 @@ func (j *ChunkJournal) Update(ctx context.Context, lastLock addr, next manifestC } // UpdateGCGen implements manifestGCGenUpdater. -func (j *ChunkJournal) UpdateGCGen(ctx context.Context, lastLock addr, next manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) { +func (j *ChunkJournal) UpdateGCGen(ctx context.Context, lastLock hash.Hash, next manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) { if j.backing.readOnly() { return j.contents, errReadOnlyManifest } else if j.wr == nil { @@ -589,7 +589,7 @@ func (jm *journalManifest) ParseIfExists(ctx context.Context, stats *Stats, read } // Update implements manifest. -func (jm *journalManifest) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) { +func (jm *journalManifest) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) { if jm.readOnly() { _, mc, err = jm.ParseIfExists(ctx, stats, nil) if err != nil { @@ -611,7 +611,7 @@ func (jm *journalManifest) Update(ctx context.Context, lastLock addr, newContent } // UpdateGCGen implements manifest. 
-func (jm *journalManifest) UpdateGCGen(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) { +func (jm *journalManifest) UpdateGCGen(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) { if jm.readOnly() { _, mc, err = jm.ParseIfExists(ctx, stats, nil) if err != nil { diff --git a/go/store/nbs/journal_chunk_source.go b/go/store/nbs/journal_chunk_source.go index 36afbb646f..5751740da0 100644 --- a/go/store/nbs/journal_chunk_source.go +++ b/go/store/nbs/journal_chunk_source.go @@ -35,7 +35,7 @@ type journalChunkSource struct { var _ chunkSource = journalChunkSource{} -func (s journalChunkSource) has(h addr) (bool, error) { +func (s journalChunkSource) has(h hash.Hash) (bool, error) { return s.journal.hasAddr(h), nil } @@ -51,11 +51,11 @@ func (s journalChunkSource) hasMany(addrs []hasRecord) (missing bool, err error) return } -func (s journalChunkSource) getCompressed(_ context.Context, h addr, _ *Stats) (CompressedChunk, error) { +func (s journalChunkSource) getCompressed(_ context.Context, h hash.Hash, _ *Stats) (CompressedChunk, error) { return s.journal.getCompressedChunk(h) } -func (s journalChunkSource) get(_ context.Context, h addr, _ *Stats) ([]byte, error) { +func (s journalChunkSource) get(_ context.Context, h hash.Hash, _ *Stats) ([]byte, error) { cc, err := s.journal.getCompressedChunk(h) if err != nil { return nil, err @@ -118,7 +118,7 @@ func (s journalChunkSource) uncompressedLen() (uint64, error) { return s.journal.uncompressedSize(), nil } -func (s journalChunkSource) hash() addr { +func (s journalChunkSource) hash() hash.Hash { return journalAddr } @@ -170,7 +170,7 @@ func equalSpecs(left, right []tableSpec) bool { if len(left) != len(right) { return false } - l := make(map[addr]struct{}, len(left)) + l := make(map[hash.Hash]struct{}, len(left)) for _, s := range left { l[s.name] = struct{}{} } @@ -181,8 +181,3 @@ func equalSpecs(left, right []tableSpec) bool { } return true } - -func emptyAddr(a addr) bool { - var b addr - return a == b -} diff --git a/go/store/nbs/journal_index_record.go b/go/store/nbs/journal_index_record.go index c1acbfdca7..aa56d06f13 100644 --- a/go/store/nbs/journal_index_record.go +++ b/go/store/nbs/journal_index_record.go @@ -234,11 +234,11 @@ func processIndexRecords(ctx context.Context, r io.ReadSeeker, sz int64, cb func } type lookup struct { - a addr + a hash.Hash r Range } -const lookupSize = addrSize + offsetSize + lengthSize +const lookupSize = hash.ByteLen + offsetSize + lengthSize // serializeLookups serializes |lookups| using the table file chunk index format. 
func serializeLookups(lookups []lookup) (index []byte) { @@ -249,7 +249,7 @@ func serializeLookups(lookups []lookup) (index []byte) { buf := index for _, l := range lookups { copy(buf, l.a[:]) - buf = buf[addrSize:] + buf = buf[hash.ByteLen:] binary.BigEndian.PutUint64(buf, l.r.Offset) buf = buf[offsetSize:] binary.BigEndian.PutUint32(buf, l.r.Length) @@ -262,7 +262,7 @@ func deserializeLookups(index []byte) (lookups []lookup) { lookups = make([]lookup, len(index)/lookupSize) for i := range lookups { copy(lookups[i].a[:], index) - index = index[addrSize:] + index = index[hash.ByteLen:] lookups[i].r.Offset = binary.BigEndian.Uint64(index) index = index[offsetSize:] lookups[i].r.Length = binary.BigEndian.Uint32(index) diff --git a/go/store/nbs/journal_record.go b/go/store/nbs/journal_record.go index a469fd044f..7891224932 100644 --- a/go/store/nbs/journal_record.go +++ b/go/store/nbs/journal_record.go @@ -47,7 +47,7 @@ import ( type journalRec struct { length uint32 kind journalRecKind - address addr + address hash.Hash payload []byte timestamp time.Time checksum uint32 @@ -150,7 +150,7 @@ func writeChunkRecord(buf []byte, c CompressedChunk) (n uint32) { return } -func writeRootHashRecord(buf []byte, root addr) (n uint32) { +func writeRootHashRecord(buf []byte, root hash.Hash) (n uint32) { // length l := rootHashRecordSize() writeUint32(buf[:journalRecLenSz], uint32(l)) diff --git a/go/store/nbs/journal_writer.go b/go/store/nbs/journal_writer.go index a0d0a3b01b..68a074d305 100644 --- a/go/store/nbs/journal_writer.go +++ b/go/store/nbs/journal_writer.go @@ -52,11 +52,11 @@ const ( ) var ( - journalAddr = addr(hash.Parse(chunkJournalAddr)) + journalAddr = hash.Parse(chunkJournalAddr) ) -func isJournalAddr(a addr) bool { - return a == journalAddr +func isJournalAddr(h hash.Hash) bool { + return h == journalAddr } func fileExists(path string) (bool, error) { @@ -315,7 +315,7 @@ func (wr *journalWriter) corruptIndexRecovery(ctx context.Context) (err error) { } // hasAddr returns true if the journal contains a chunk with addr |h|. -func (wr *journalWriter) hasAddr(h addr) (ok bool) { +func (wr *journalWriter) hasAddr(h hash.Hash) (ok bool) { wr.lock.RLock() defer wr.lock.RUnlock() _, ok = wr.ranges.get(h) @@ -323,7 +323,7 @@ func (wr *journalWriter) hasAddr(h addr) (ok bool) { } // getCompressedChunk reads the CompressedChunks with addr |h|. -func (wr *journalWriter) getCompressedChunk(h addr) (CompressedChunk, error) { +func (wr *journalWriter) getCompressedChunk(h hash.Hash) (CompressedChunk, error) { wr.lock.RLock() defer wr.lock.RUnlock() r, ok := wr.ranges.get(h) @@ -338,7 +338,7 @@ func (wr *journalWriter) getCompressedChunk(h addr) (CompressedChunk, error) { } // getRange returns a Range for the chunk with addr |h|. -func (wr *journalWriter) getRange(h addr) (rng Range, ok bool, err error) { +func (wr *journalWriter) getRange(h hash.Hash) (rng Range, ok bool, err error) { // callers will use |rng| to read directly from the // journal file, so we must flush here if err = wr.maybeFlush(); err != nil { @@ -365,7 +365,7 @@ func (wr *journalWriter) writeCompressedChunk(cc CompressedChunk) error { } wr.unsyncd += uint64(recordLen) _ = writeChunkRecord(buf, cc) - wr.ranges.put(addr(cc.H), rng) + wr.ranges.put(cc.H, rng) // To fulfill our durability guarantees, we technically only need to // file.Sync() the journal when we commit a new root chunk. 
However, @@ -402,7 +402,7 @@ func (wr *journalWriter) commitRootHashUnlocked(root hash.Hash) error { return err } wr.currentRoot = root - n := writeRootHashRecord(buf, addr(root)) + n := writeRootHashRecord(buf, root) if err = wr.flush(); err != nil { return err } @@ -582,7 +582,7 @@ type rangeIndex struct { // novel Ranges represent most recent chunks written to // the journal. These Ranges have not yet been written to // a journal index record. - novel *swiss.Map[addr, Range] + novel *swiss.Map[hash.Hash, Range] // cached Ranges are bootstrapped from an out-of-band journal // index file. To save memory, these Ranges are keyed by a 16-byte @@ -592,14 +592,14 @@ type rangeIndex struct { type addr16 [16]byte -func toAddr16(full addr) (prefix addr16) { +func toAddr16(full hash.Hash) (prefix addr16) { copy(prefix[:], full[:]) return } func newRangeIndex() rangeIndex { return rangeIndex{ - novel: swiss.NewMap[addr, Range](journalIndexDefaultMaxNovel), + novel: swiss.NewMap[hash.Hash, Range](journalIndexDefaultMaxNovel), cached: swiss.NewMap[addr16, Range](0), } } @@ -608,20 +608,20 @@ func estimateRangeCount(info os.FileInfo) uint32 { return uint32(info.Size()/32) + journalIndexDefaultMaxNovel } -func (idx rangeIndex) get(a addr) (rng Range, ok bool) { - rng, ok = idx.novel.Get(a) +func (idx rangeIndex) get(h hash.Hash) (rng Range, ok bool) { + rng, ok = idx.novel.Get(h) if !ok { - rng, ok = idx.cached.Get(toAddr16(a)) + rng, ok = idx.cached.Get(toAddr16(h)) } return } -func (idx rangeIndex) put(a addr, rng Range) { - idx.novel.Put(a, rng) +func (idx rangeIndex) put(h hash.Hash, rng Range) { + idx.novel.Put(h, rng) } -func (idx rangeIndex) putCached(a addr, rng Range) { - idx.cached.Put(toAddr16(a), rng) +func (idx rangeIndex) putCached(h hash.Hash, rng Range) { + idx.cached.Put(toAddr16(h), rng) } func (idx rangeIndex) count() uint32 { @@ -634,7 +634,7 @@ func (idx rangeIndex) novelCount() int { func (idx rangeIndex) novelLookups() (lookups []lookup) { lookups = make([]lookup, 0, idx.novel.Count()) - idx.novel.Iter(func(a addr, r Range) (stop bool) { + idx.novel.Iter(func(a hash.Hash, r Range) (stop bool) { lookups = append(lookups, lookup{a: a, r: r}) return }) @@ -642,10 +642,10 @@ func (idx rangeIndex) novelLookups() (lookups []lookup) { } func (idx rangeIndex) flatten() rangeIndex { - idx.novel.Iter(func(a addr, r Range) (stop bool) { + idx.novel.Iter(func(a hash.Hash, r Range) (stop bool) { idx.cached.Put(toAddr16(a), r) return }) - idx.novel = swiss.NewMap[addr, Range](journalIndexDefaultMaxNovel) + idx.novel = swiss.NewMap[hash.Hash, Range](journalIndexDefaultMaxNovel) return idx } diff --git a/go/store/nbs/manifest.go b/go/store/nbs/manifest.go index b524207e84..773163b55d 100644 --- a/go/store/nbs/manifest.go +++ b/go/store/nbs/manifest.go @@ -25,6 +25,7 @@ import ( "context" "crypto/sha512" "errors" + "fmt" "strconv" "sync" "time" @@ -74,7 +75,7 @@ type manifestUpdater interface { // If writeHook is non-nil, it will be invoked while the implementation is // guaranteeing exclusive access to the manifest. This allows for testing // of race conditions. 
- Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) + Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) } type manifestGCGenUpdater interface { @@ -87,7 +88,7 @@ type manifestGCGenUpdater interface { // If writeHook is non-nil, it will be invoked while the implementation is // guaranteeing exclusive access to the manifest. This allows for testing // of race conditions. - UpdateGCGen(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) + UpdateGCGen(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) } // ManifestInfo is an interface for retrieving data from a manifest outside of this package @@ -113,9 +114,9 @@ const ( type manifestContents struct { manifestVers string nbfVers string - lock addr + lock hash.Hash root hash.Hash - gcGen addr + gcGen hash.Hash specs []tableSpec // An appendix is a list of |tableSpecs| that track an auxillary collection of @@ -192,16 +193,16 @@ func (mc manifestContents) removeAppendixSpecs() (manifestContents, []tableSpec) }, removed } -func (mc manifestContents) getSpecSet() (ss map[addr]struct{}) { +func (mc manifestContents) getSpecSet() (ss map[hash.Hash]struct{}) { return toSpecSet(mc.specs) } -func (mc manifestContents) getAppendixSet() (ss map[addr]struct{}) { +func (mc manifestContents) getAppendixSet() (ss map[hash.Hash]struct{}) { return toSpecSet(mc.appendix) } -func toSpecSet(specs []tableSpec) (ss map[addr]struct{}) { - ss = make(map[addr]struct{}, len(specs)) +func toSpecSet(specs []tableSpec) (ss map[hash.Hash]struct{}) { + ss = make(map[hash.Hash]struct{}, len(specs)) for _, ts := range specs { ss[ts.name] = struct{}{} } @@ -209,7 +210,7 @@ func toSpecSet(specs []tableSpec) (ss map[addr]struct{}) { } func (mc manifestContents) size() (size uint64) { - size += uint64(len(mc.nbfVers)) + addrSize + hash.ByteLen + size += uint64(len(mc.nbfVers)) + hash.ByteLen + hash.ByteLen for _, sp := range mc.specs { size += uint64(len(sp.name)) + uint32Size // for sp.chunkCount } @@ -292,7 +293,7 @@ func (mm manifestManager) UnlockForUpdate() error { return mm.locks.unlockForUpdate(mm.Name()) } -func (mm manifestManager) updateWillFail(lastLock addr) (cached manifestContents, doomed bool) { +func (mm manifestManager) updateWillFail(lastLock hash.Hash) (cached manifestContents, doomed bool) { if upstream, _, hit := mm.cache.Get(mm.Name()); hit { if lastLock != upstream.lock { doomed, cached = true, upstream @@ -346,7 +347,7 @@ func (mm manifestManager) Fetch(ctx context.Context, stats *Stats) (exists bool, // Callers MUST protect uses of Update with Lock/UnlockForUpdate. // Update does not call Lock/UnlockForUpdate() on its own because it is // intended to be used in a larger critical section along with updateWillFail. 
-func (mm manifestManager) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (contents manifestContents, err error) { +func (mm manifestManager) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (contents manifestContents, err error) { if upstream, _, hit := mm.cache.Get(mm.Name()); hit { if lastLock != upstream.lock { return upstream, nil @@ -385,7 +386,7 @@ func (mm manifestManager) Update(ctx context.Context, lastLock addr, newContents // UpdateGCGen will update the manifest with a new garbage collection generation. // Callers MUST protect uses of UpdateGCGen with Lock/UnlockForUpdate. -func (mm manifestManager) UpdateGCGen(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (contents manifestContents, err error) { +func (mm manifestManager) UpdateGCGen(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (contents manifestContents, err error) { updater, ok := mm.m.(manifestGCGenUpdater) if !ok { return manifestContents{}, errors.New("manifest does not support updating gc gen") @@ -443,7 +444,7 @@ type TableSpecInfo interface { } type tableSpec struct { - name addr + name hash.Hash chunkCount uint32 } @@ -468,10 +469,10 @@ func parseSpecs(tableInfo []string) ([]tableSpec, error) { specs := make([]tableSpec, len(tableInfo)/2) for i := range specs { var err error - specs[i].name, err = parseAddr(tableInfo[2*i]) - - if err != nil { - return nil, err + var ok bool + specs[i].name, ok = hash.MaybeParse(tableInfo[2*i]) + if !ok { + return nil, fmt.Errorf("invalid table file name: %s", tableInfo[2*i]) } c, err := strconv.ParseUint(tableInfo[2*i+1], 10, 32) @@ -500,7 +501,7 @@ func formatSpecs(specs []tableSpec, tableInfo []string) { // persisted manifest against the lock hash it saw last time it loaded the // contents of a manifest. If they do not match, the client must not update // the persisted manifest. 
-func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec) (lock addr) { +func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec) (lock hash.Hash) { blockHash := sha512.New() blockHash.Write(root[:]) for _, spec := range appendix { @@ -512,6 +513,5 @@ func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec) ( } var h []byte h = blockHash.Sum(h) // Appends hash to h - copy(lock[:], h) - return + return hash.New(h[:hash.ByteLen]) } diff --git a/go/store/nbs/mem_table.go b/go/store/nbs/mem_table.go index 4476979ec8..131da46df4 100644 --- a/go/store/nbs/mem_table.go +++ b/go/store/nbs/mem_table.go @@ -54,7 +54,7 @@ func WriteChunks(chunks []chunks.Chunk) (string, []byte, error) { func writeChunksToMT(mt *memTable, chunks []chunks.Chunk) (string, []byte, error) { for _, chunk := range chunks { - res := mt.addChunk(addr(chunk.Hash()), chunk.Data()) + res := mt.addChunk(chunk.Hash(), chunk.Data()) if res == chunkNotAdded { return "", nil, errors.New("didn't create this memory table with enough space to add all the chunks") } @@ -75,7 +75,7 @@ func writeChunksToMT(mt *memTable, chunks []chunks.Chunk) (string, []byte, error } type memTable struct { - chunks map[addr][]byte + chunks map[hash.Hash][]byte order []hasRecord // Must maintain the invariant that these are sorted by rec.order pendingRefs []hasRecord maxData, totalData uint64 @@ -84,10 +84,10 @@ type memTable struct { } func newMemTable(memTableSize uint64) *memTable { - return &memTable{chunks: map[addr][]byte{}, maxData: memTableSize} + return &memTable{chunks: map[hash.Hash][]byte{}, maxData: memTableSize} } -func (mt *memTable) addChunk(h addr, data []byte) addChunkResult { +func (mt *memTable) addChunk(h hash.Hash, data []byte) addChunkResult { if len(data) == 0 { panic("NBS blocks cannot be zero length") } @@ -113,10 +113,9 @@ func (mt *memTable) addChunk(h addr, data []byte) addChunkResult { func (mt *memTable) addChildRefs(addrs hash.HashSet) { for h := range addrs { - a := addr(h) mt.pendingRefs = append(mt.pendingRefs, hasRecord{ - a: &a, - prefix: a.Prefix(), + a: &h, + prefix: h.Prefix(), order: len(mt.pendingRefs), }) } @@ -130,7 +129,7 @@ func (mt *memTable) uncompressedLen() (uint64, error) { return mt.totalData, nil } -func (mt *memTable) has(h addr) (bool, error) { +func (mt *memTable) has(h hash.Hash) (bool, error) { _, has := mt.chunks[h] return has, nil } @@ -157,7 +156,7 @@ func (mt *memTable) hasMany(addrs []hasRecord) (bool, error) { return remaining, nil } -func (mt *memTable) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) { +func (mt *memTable) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) { return mt.chunks[h], nil } @@ -200,10 +199,10 @@ func (mt *memTable) extract(ctx context.Context, chunks chan<- extractRecord) er return nil } -func (mt *memTable) write(haver chunkReader, stats *Stats) (name addr, data []byte, count uint32, err error) { +func (mt *memTable) write(haver chunkReader, stats *Stats) (name hash.Hash, data []byte, count uint32, err error) { numChunks := uint64(len(mt.order)) if numChunks == 0 { - return addr{}, nil, 0, fmt.Errorf("mem table cannot write with zero chunks") + return hash.Hash{}, nil, 0, fmt.Errorf("mem table cannot write with zero chunks") } maxSize := maxTableSize(uint64(len(mt.order)), mt.totalData) // todo: memory quota @@ -215,7 +214,7 @@ func (mt *memTable) write(haver chunkReader, stats *Stats) (name addr, data []by _, err := haver.hasMany(mt.order) if err != nil { - return 
addr{}, nil, 0, err + return hash.Hash{}, nil, 0, err } sort.Sort(hasRecordByOrder(mt.order)) // restore "insertion" order for write @@ -231,7 +230,7 @@ func (mt *memTable) write(haver chunkReader, stats *Stats) (name addr, data []by tableSize, name, err := tw.finish() if err != nil { - return addr{}, nil, 0, err + return hash.Hash{}, nil, 0, err } if count > 0 { diff --git a/go/store/nbs/no_conjoin_bs_persister.go b/go/store/nbs/no_conjoin_bs_persister.go index baf031c443..6557d61d29 100644 --- a/go/store/nbs/no_conjoin_bs_persister.go +++ b/go/store/nbs/no_conjoin_bs_persister.go @@ -21,6 +21,7 @@ import ( "io" "time" + "github.com/dolthub/dolt/go/store/hash" "github.com/fatih/color" "golang.org/x/sync/errgroup" @@ -68,15 +69,15 @@ func (bsp *noConjoinBlobstorePersister) ConjoinAll(ctx context.Context, sources } // Open a table named |name|, containing |chunkCount| chunks. -func (bsp *noConjoinBlobstorePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) { +func (bsp *noConjoinBlobstorePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) { return newBSChunkSource(ctx, bsp.bs, name, chunkCount, bsp.q, stats) } -func (bsp *noConjoinBlobstorePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) { +func (bsp *noConjoinBlobstorePersister) Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) { return bsp.bs.Exists(ctx, name.String()) } -func (bsp *noConjoinBlobstorePersister) PruneTableFiles(ctx context.Context, keeper func() []addr, t time.Time) error { +func (bsp *noConjoinBlobstorePersister) PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, t time.Time) error { return nil } diff --git a/go/store/nbs/s3_table_reader.go b/go/store/nbs/s3_table_reader.go index 05db80deb0..9931064f9a 100644 --- a/go/store/nbs/s3_table_reader.go +++ b/go/store/nbs/s3_table_reader.go @@ -36,6 +36,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/dolthub/dolt/go/store/hash" "github.com/jpillora/backoff" "golang.org/x/sync/errgroup" ) @@ -47,7 +48,7 @@ const ( type s3TableReaderAt struct { s3 *s3ObjectReader - h addr + h hash.Hash } func (s3tra *s3TableReaderAt) Close() error { @@ -81,11 +82,11 @@ func (s3or *s3ObjectReader) key(k string) string { return k } -func (s3or *s3ObjectReader) Reader(ctx context.Context, name addr) (io.ReadCloser, error) { +func (s3or *s3ObjectReader) Reader(ctx context.Context, name hash.Hash) (io.ReadCloser, error) { return s3or.reader(ctx, name) } -func (s3or *s3ObjectReader) ReadAt(ctx context.Context, name addr, p []byte, off int64, stats *Stats) (n int, err error) { +func (s3or *s3ObjectReader) ReadAt(ctx context.Context, name hash.Hash, p []byte, off int64, stats *Stats) (n int, err error) { t1 := time.Now() defer func() { @@ -105,7 +106,7 @@ func s3RangeHeader(off, length int64) string { const maxS3ReadFromEndReqSize = 256 * 1024 * 1024 // 256MB const preferredS3ReadFromEndReqSize = 128 * 1024 * 1024 // 128MB -func (s3or *s3ObjectReader) ReadFromEnd(ctx context.Context, name addr, p []byte, stats *Stats) (n int, sz uint64, err error) { +func (s3or *s3ObjectReader) ReadFromEnd(ctx context.Context, name hash.Hash, p []byte, stats *Stats) (n int, sz uint64, err error) { defer func(t1 time.Time) { stats.S3BytesPerRead.Sample(uint64(len(p))) stats.S3ReadLatency.SampleTimeSince(t1) @@ -149,7 +150,7 
@@ func (s3or *s3ObjectReader) ReadFromEnd(ctx context.Context, name addr, p []byte return s3or.readRange(ctx, name, p, fmt.Sprintf("%s=-%d", s3RangePrefix, len(p))) } -func (s3or *s3ObjectReader) reader(ctx context.Context, name addr) (io.ReadCloser, error) { +func (s3or *s3ObjectReader) reader(ctx context.Context, name hash.Hash) (io.ReadCloser, error) { input := &s3.GetObjectInput{ Bucket: aws.String(s3or.bucket), Key: aws.String(s3or.key(name.String())), @@ -161,7 +162,7 @@ func (s3or *s3ObjectReader) reader(ctx context.Context, name addr) (io.ReadClose return result.Body, nil } -func (s3or *s3ObjectReader) readRange(ctx context.Context, name addr, p []byte, rangeHeader string) (n int, sz uint64, err error) { +func (s3or *s3ObjectReader) readRange(ctx context.Context, name hash.Hash, p []byte, rangeHeader string) (n int, sz uint64, err error) { read := func() (int, uint64, error) { if s3or.readRl != nil { s3or.readRl <- struct{}{} diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index adce49963f..d77fafb6f6 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -105,7 +105,7 @@ type NomsBlockStore struct { mtSize uint64 putCount uint64 - hasCache *lru.TwoQueueCache[addr, struct{}] + hasCache *lru.TwoQueueCache[hash.Hash, struct{}] stats *Stats } @@ -254,11 +254,9 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash. var addCount int for h, count := range updates { - a := addr(h) - - if _, ok := currSpecs[a]; !ok { + if _, ok := currSpecs[h]; !ok { addCount++ - contents.specs = append(contents.specs, tableSpec{a, count}) + contents.specs = append(contents.specs, tableSpec{h, count}) } } @@ -346,14 +344,12 @@ func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updat appendixSpecs := make([]tableSpec, 0) var addCount int for h, count := range updates { - a := addr(h) - if option == ManifestAppendixOption_Set { - appendixSpecs = append(appendixSpecs, tableSpec{a, count}) + appendixSpecs = append(appendixSpecs, tableSpec{h, count}) } else { - if _, ok := currAppendixSpecs[a]; !ok { + if _, ok := currAppendixSpecs[h]; !ok { addCount++ - appendixSpecs = append(appendixSpecs, tableSpec{a, count}) + appendixSpecs = append(appendixSpecs, tableSpec{h, count}) } } } @@ -399,13 +395,12 @@ func (nbs *NomsBlockStore) checkAllManifestUpdatesExist(ctx context.Context, upd h := h c := c eg.Go(func() error { - a := addr(h) - ok, err := nbs.p.Exists(ctx, a, c, nbs.stats) + ok, err := nbs.p.Exists(ctx, h, c, nbs.stats) if err != nil { return err } if !ok { - return fmt.Errorf("missing table file referenced in UpdateManifest call: %v", a) + return fmt.Errorf("missing table file referenced in UpdateManifest call: %v", h) } return nil }) @@ -475,12 +470,12 @@ func OverwriteStoreManifest(ctx context.Context, store *NomsBlockStore, root has } // Appendix table files should come first in specs for h, c := range appendixTableFiles { - s := tableSpec{name: addr(h), chunkCount: c} + s := tableSpec{name: h, chunkCount: c} contents.appendix = append(contents.appendix, s) contents.specs = append(contents.specs, s) } for h, c := range tableFiles { - s := tableSpec{name: addr(h), chunkCount: c} + s := tableSpec{name: h, chunkCount: c} contents.specs = append(contents.specs, s) } contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix) @@ -640,7 +635,7 @@ func newNomsBlockStore(ctx context.Context, nbfVerStr string, mm manifestManager memTableSize = defaultMemTableSize } - hasCache, err := lru.New2Q[addr, 
struct{}](hasCacheSize) + hasCache, err := lru.New2Q[hash.Hash, struct{}](hasCacheSize) if err != nil { return nil, err } @@ -790,9 +785,8 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs if nbs.mt == nil { nbs.mt = newMemTable(nbs.mtSize) } - a := addr(ch.Hash()) - addChunkRes = nbs.mt.addChunk(a, ch.Data()) + addChunkRes = nbs.mt.addChunk(ch.Hash(), ch.Data()) if addChunkRes == chunkNotAdded { ts, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.hasCache, nbs.stats) if err != nil { @@ -802,7 +796,7 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs nbs.addPendingRefsToHasCache() nbs.tables = ts nbs.mt = newMemTable(nbs.mtSize) - addChunkRes = nbs.mt.addChunk(a, ch.Data()) + addChunkRes = nbs.mt.addChunk(ch.Hash(), ch.Data()) } if addChunkRes == chunkAdded || addChunkRes == chunkExists { if nbs.keeperFunc != nil && nbs.keeperFunc(ch.Hash()) { @@ -826,18 +820,17 @@ type refCheck func(reqs []hasRecord) (hash.HashSet, error) func (nbs *NomsBlockStore) errorIfDangling(root hash.Hash, checker refCheck) error { if !root.IsEmpty() { - a := addr(root) - if _, ok := nbs.hasCache.Get(a); !ok { + if _, ok := nbs.hasCache.Get(root); !ok { var hr [1]hasRecord - hr[0].a = &a - hr[0].prefix = a.Prefix() + hr[0].a = &root + hr[0].prefix = root.Prefix() absent, err := checker(hr[:]) if err != nil { return err } else if absent.Size() > 0 { return fmt.Errorf("%w: found dangling references to %s", ErrDanglingRef, absent.String()) } - nbs.hasCache.Add(a, struct{}{}) + nbs.hasCache.Add(root, struct{}{}) } } return nil @@ -853,14 +846,13 @@ func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, nbs.stats.ChunksPerGet.Sample(1) }() - a := addr(h) data, tables, err := func() ([]byte, chunkReader, error) { var data []byte nbs.mu.RLock() defer nbs.mu.RUnlock() if nbs.mt != nil { var err error - data, err = nbs.mt.get(ctx, a, nbs.stats) + data, err = nbs.mt.get(ctx, h, nbs.stats) if err != nil { return nil, nil, err @@ -877,7 +869,7 @@ func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, return chunks.NewChunkWithHash(h, data), nil } - data, err = tables.get(ctx, a, nbs.stats) + data, err = tables.get(ctx, h, nbs.stats) if err != nil { return chunks.EmptyChunk, err @@ -954,10 +946,10 @@ func toGetRecords(hashes hash.HashSet) []getRecord { reqs := make([]getRecord, len(hashes)) idx := 0 for h := range hashes { - a := addr(h) + h := h reqs[idx] = getRecord{ - a: &a, - prefix: a.Prefix(), + a: &h, + prefix: h.Prefix(), } idx++ } @@ -1001,13 +993,12 @@ func (nbs *NomsBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) { nbs.stats.AddressesPerHas.Sample(1) }() - a := addr(h) has, tables, err := func() (bool, chunkReader, error) { nbs.mu.RLock() defer nbs.mu.RUnlock() if nbs.mt != nil { - has, err := nbs.mt.has(a) + has, err := nbs.mt.has(h) if err != nil { return false, nil, err @@ -1024,7 +1015,7 @@ func (nbs *NomsBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) { } if !has { - has, err = tables.has(a) + has, err = tables.has(h) if err != nil { return false, err @@ -1079,7 +1070,7 @@ func (nbs *NomsBlockStore) hasMany(reqs []hasRecord) (hash.HashSet, error) { absent := hash.HashSet{} for _, r := range reqs { if !r.has { - absent.Insert(hash.New(r.a[:])) + absent.Insert(*r.a) } } return absent, nil @@ -1089,10 +1080,10 @@ func toHasRecords(hashes hash.HashSet) []hasRecord { reqs := make([]hasRecord, len(hashes)) idx := 0 for h := range hashes { - a := addr(h) + h := h 
reqs[idx] = hasRecord{ - a: &a, - prefix: a.Prefix(), + a: &h, + prefix: h.Prefix(), order: idx, } idx++ @@ -1417,7 +1408,7 @@ func (nbs *NomsBlockStore) Sources(ctx context.Context) (hash.Hash, []chunks.Tab return contents.GetRoot(), allTableFiles, appendixTableFiles, nil } -func getTableFiles(css map[addr]chunkSource, contents manifestContents, numSpecs int, specFunc func(mc manifestContents, idx int) tableSpec) ([]chunks.TableFile, error) { +func getTableFiles(css map[hash.Hash]chunkSource, contents manifestContents, numSpecs int, specFunc func(mc manifestContents, idx int) tableSpec) ([]chunks.TableFile, error) { tableFiles := make([]chunks.TableFile, 0) if numSpecs == 0 { return tableFiles, nil @@ -1461,8 +1452,8 @@ func (nbs *NomsBlockStore) Size(ctx context.Context) (uint64, error) { return size, nil } -func (nbs *NomsBlockStore) chunkSourcesByAddr() (map[addr]chunkSource, error) { - css := make(map[addr]chunkSource, len(nbs.tables.upstream)+len(nbs.tables.novel)) +func (nbs *NomsBlockStore) chunkSourcesByAddr() (map[hash.Hash]chunkSource, error) { + css := make(map[hash.Hash]chunkSource, len(nbs.tables.upstream)+len(nbs.tables.novel)) for _, cs := range nbs.tables.upstream { css[cs.hash()] = cs } @@ -1543,10 +1534,10 @@ func (nbs *NomsBlockStore) PruneTableFiles(ctx context.Context) (err error) { func (nbs *NomsBlockStore) pruneTableFiles(ctx context.Context, checker refCheck) (err error) { mtime := time.Now() - return nbs.p.PruneTableFiles(ctx, func() []addr { + return nbs.p.PruneTableFiles(ctx, func() []hash.Hash { nbs.mu.Lock() defer nbs.mu.Unlock() - keepers := make([]addr, 0, len(nbs.tables.novel)+len(nbs.tables.upstream)) + keepers := make([]hash.Hash, 0, len(nbs.tables.novel)+len(nbs.tables.upstream)) for a, _ := range nbs.tables.novel { keepers = append(keepers, a) } diff --git a/go/store/nbs/table.go b/go/store/nbs/table.go index 39a8133c9e..7e7e25c825 100644 --- a/go/store/nbs/table.go +++ b/go/store/nbs/table.go @@ -26,7 +26,6 @@ import ( "context" "crypto/sha512" "encoding/base32" - "encoding/binary" "hash/crc32" "io" @@ -123,9 +122,8 @@ import ( */ const ( - addrSize = 20 - addrPrefixSize = 8 - addrSuffixSize = addrSize - addrPrefixSize + // addrPrefixSize = 8 + // addrSuffixSize = hash.ByteLen - addrPrefixSize uint64Size = 8 uint32Size = 4 ordinalSize = uint32Size @@ -134,7 +132,7 @@ const ( magicNumber = "\xff\xb5\xd8\xc2\x24\x63\xee\x50" magicNumberSize = 8 //len(magicNumber) footerSize = uint32Size + uint64Size + magicNumberSize - prefixTupleSize = addrPrefixSize + ordinalSize + prefixTupleSize = hash.PrefixLen + ordinalSize checksumSize = uint32Size maxChunkSize = 0xffffffff // Snappy won't compress slices bigger than this ) @@ -145,50 +143,45 @@ func crc(b []byte) uint32 { return crc32.Update(0, crcTable, b) } -func computeAddrDefault(data []byte) addr { +// NM4 - change name? 
+func computeAddrDefault(data []byte) hash.Hash { r := sha512.Sum512(data) - h := addr{} - copy(h[:], r[:addrSize]) - return h + return hash.New(r[:hash.ByteLen]) } var computeAddr = computeAddrDefault -type addr [addrSize]byte +// type addr [addrSize]byte var encoding = base32.NewEncoding("0123456789abcdefghijklmnopqrstuv") -func (a addr) String() string { - return encoding.EncodeToString(a[:]) -} +/* + NM4 -func (a addr) Prefix() uint64 { - return binary.BigEndian.Uint64(a[:]) -} + func (a addr) String() string { + return encoding.EncodeToString(a[:]) + } -func (a addr) Suffix() []byte { - return a[addrPrefixSize:] -} + func parseAddr(str string) (addr, error) { + var h addr + _, err := encoding.Decode(h[:], []byte(str)) + return h, err + } -func parseAddr(str string) (addr, error) { - var h addr - _, err := encoding.Decode(h[:], []byte(str)) - return h, err -} + func ValidateAddr(s string) bool { + _, err := encoding.DecodeString(s) + return err == nil + } +*/ -func ValidateAddr(s string) bool { - _, err := encoding.DecodeString(s) - return err == nil -} - -type addrSlice []addr +type addrSlice []hash.Hash func (hs addrSlice) Len() int { return len(hs) } func (hs addrSlice) Less(i, j int) bool { return bytes.Compare(hs[i][:], hs[j][:]) < 0 } func (hs addrSlice) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] } type hasRecord struct { - a *addr + a *hash.Hash prefix uint64 order int has bool @@ -207,7 +200,7 @@ func (hs hasRecordByOrder) Less(i, j int) bool { return hs[i].order < hs[j].orde func (hs hasRecordByOrder) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] } type getRecord struct { - a *addr + a *hash.Hash prefix uint64 found bool } @@ -219,21 +212,21 @@ func (hs getRecordByPrefix) Less(i, j int) bool { return hs[i].prefix < hs[j].pr func (hs getRecordByPrefix) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] } type extractRecord struct { - a addr + a hash.Hash data []byte err error } type chunkReader interface { // has returns true if a chunk with addr |h| is present. - has(h addr) (bool, error) + has(h hash.Hash) (bool, error) // hasMany sets hasRecord.has to true for each present hasRecord query, it returns // true if any hasRecord query was not found in this chunkReader. hasMany(addrs []hasRecord) (bool, error) // get returns the chunk data for a chunk with addr |h| if present, and nil otherwise. - get(ctx context.Context, h addr, stats *Stats) ([]byte, error) + get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) // getMany sets getRecord.found to true, and calls |found| for each present getRecord query. // It returns true if any getRecord query was not found in this chunkReader. @@ -257,7 +250,7 @@ type chunkSource interface { chunkReader // hash returns the hash address of this chunkSource. - hash() addr + hash() hash.Hash // opens a Reader to the first byte of the chunkData segment of this table. reader(context.Context) (io.ReadCloser, uint64, error) @@ -281,7 +274,7 @@ type chunkSource interface { type chunkSources []chunkSource -type chunkSourceSet map[addr]chunkSource +type chunkSourceSet map[hash.Hash]chunkSource func copyChunkSourceSet(s chunkSourceSet) (cp chunkSourceSet) { cp = make(chunkSourceSet, len(s)) diff --git a/go/store/nbs/table_index.go b/go/store/nbs/table_index.go index fa72a56a12..3b06065971 100644 --- a/go/store/nbs/table_index.go +++ b/go/store/nbs/table_index.go @@ -44,16 +44,16 @@ type tableIndex interface { // entrySuffixMatches returns true if the entry at index |idx| matches // the suffix of the address |h|. 
Used by |lookup| after finding // matching indexes based on |Prefixes|. - entrySuffixMatches(idx uint32, h *addr) (bool, error) + entrySuffixMatches(idx uint32, h *hash.Hash) (bool, error) // indexEntry returns the |indexEntry| at |idx|. Optionally puts the // full address of that entry in |a| if |a| is not |nil|. - indexEntry(idx uint32, a *addr) (indexEntry, error) + indexEntry(idx uint32, a *hash.Hash) (indexEntry, error) // lookup returns an |indexEntry| for the chunk corresponding to the // provided address |h|. Second returns is |true| if an entry exists // and |false| otherwise. - lookup(h *addr) (indexEntry, bool, error) + lookup(h *hash.Hash) (indexEntry, bool, error) // Ordinals returns a slice of indexes which maps the |i|th chunk in // the indexed file to its corresponding entry in index. The |i|th @@ -192,11 +192,11 @@ func readTableIndexByCopy(ctx context.Context, rd io.ReadSeeker, q MemoryQuotaPr func hashSetFromTableIndex(idx tableIndex) (hash.HashSet, error) { set := hash.NewHashSet() for i := uint32(0); i < idx.chunkCount(); i++ { - var a addr - if _, err := idx.indexEntry(i, &a); err != nil { + var h hash.Hash + if _, err := idx.indexEntry(i, &h); err != nil { return nil, err } - set.Insert(hash.Hash(a)) + set.Insert(h) } return set, nil } @@ -289,22 +289,22 @@ func newOnHeapTableIndex(indexBuff []byte, offsetsBuff1 []byte, count uint32, to }, nil } -func (ti onHeapTableIndex) entrySuffixMatches(idx uint32, h *addr) (bool, error) { +func (ti onHeapTableIndex) entrySuffixMatches(idx uint32, h *hash.Hash) (bool, error) { ord := ti.ordinalAt(idx) - o := ord * addrSuffixSize - b := ti.suffixes[o : o+addrSuffixSize] - return bytes.Equal(h[addrPrefixSize:], b), nil + o := ord * hash.SuffixLen + b := ti.suffixes[o : o+hash.SuffixLen] + return bytes.Equal(h[hash.PrefixLen:], b), nil } -func (ti onHeapTableIndex) indexEntry(idx uint32, a *addr) (entry indexEntry, err error) { +func (ti onHeapTableIndex) indexEntry(idx uint32, a *hash.Hash) (entry indexEntry, err error) { prefix, ord := ti.tupleAt(idx) if a != nil { binary.BigEndian.PutUint64(a[:], prefix) - o := int64(addrSuffixSize * ord) - b := ti.suffixes[o : o+addrSuffixSize] - copy(a[addrPrefixSize:], b) + o := int64(hash.SuffixLen * ord) + b := ti.suffixes[o : o+hash.SuffixLen] + copy(a[hash.PrefixLen:], b) } return ti.getIndexEntry(ord), nil @@ -325,7 +325,7 @@ func (ti onHeapTableIndex) getIndexEntry(ord uint32) indexEntry { } } -func (ti onHeapTableIndex) lookup(h *addr) (indexEntry, bool, error) { +func (ti onHeapTableIndex) lookup(h *hash.Hash) (indexEntry, bool, error) { ord, err := ti.lookupOrdinal(h) if err != nil { return indexResult{}, false, err @@ -338,7 +338,7 @@ func (ti onHeapTableIndex) lookup(h *addr) (indexEntry, bool, error) { // lookupOrdinal returns the ordinal of |h| if present. Returns |ti.count| // if absent. 
-func (ti onHeapTableIndex) lookupOrdinal(h *addr) (uint32, error) { +func (ti onHeapTableIndex) lookupOrdinal(h *hash.Hash) (uint32, error) { prefix := h.Prefix() for idx := ti.findPrefix(prefix); idx < ti.count && ti.prefixAt(idx) == prefix; idx++ { @@ -364,7 +364,7 @@ func (ti onHeapTableIndex) findPrefix(prefix uint64) (idx uint32) { h := idx + (j-idx)/2 // avoid overflow when computing h // i ≤ h < j o := int64(prefixTupleSize * h) - tmp := binary.BigEndian.Uint64(ti.prefixTuples[o : o+addrPrefixSize]) + tmp := binary.BigEndian.Uint64(ti.prefixTuples[o : o+hash.PrefixLen]) if tmp < prefix { idx = h + 1 // preserves f(i-1) == false } else { @@ -379,18 +379,18 @@ func (ti onHeapTableIndex) tupleAt(idx uint32) (prefix uint64, ord uint32) { b := ti.prefixTuples[off : off+prefixTupleSize] prefix = binary.BigEndian.Uint64(b[:]) - ord = binary.BigEndian.Uint32(b[addrPrefixSize:]) + ord = binary.BigEndian.Uint32(b[hash.PrefixLen:]) return prefix, ord } func (ti onHeapTableIndex) prefixAt(idx uint32) uint64 { off := int64(prefixTupleSize * idx) - b := ti.prefixTuples[off : off+addrPrefixSize] + b := ti.prefixTuples[off : off+hash.PrefixLen] return binary.BigEndian.Uint64(b) } func (ti onHeapTableIndex) ordinalAt(idx uint32) uint32 { - off := int64(prefixTupleSize*idx) + addrPrefixSize + off := int64(prefixTupleSize*idx) + hash.PrefixLen b := ti.prefixTuples[off : off+ordinalSize] return binary.BigEndian.Uint32(b) } @@ -413,7 +413,7 @@ func (ti onHeapTableIndex) ordinals() ([]uint32, error) { // todo: |o| is not accounted for in the memory quota o := make([]uint32, ti.count) for i, off := uint32(0), 0; i < ti.count; i, off = i+1, off+prefixTupleSize { - b := ti.prefixTuples[off+addrPrefixSize : off+prefixTupleSize] + b := ti.prefixTuples[off+hash.PrefixLen : off+prefixTupleSize] o[i] = binary.BigEndian.Uint32(b) } return o, nil @@ -423,7 +423,7 @@ func (ti onHeapTableIndex) prefixes() ([]uint64, error) { // todo: |p| is not accounted for in the memory quota p := make([]uint64, ti.count) for i, off := uint32(0), 0; i < ti.count; i, off = i+1, off+prefixTupleSize { - b := ti.prefixTuples[off : off+addrPrefixSize] + b := ti.prefixTuples[off : off+hash.PrefixLen] p[i] = binary.BigEndian.Uint64(b) } return p, nil @@ -435,14 +435,14 @@ func (ti onHeapTableIndex) hashAt(idx uint32) hash.Hash { tuple := ti.prefixTuples[off : off+prefixTupleSize] // Get prefix, ordinal, and suffix - prefix := tuple[:addrPrefixSize] - ord := binary.BigEndian.Uint32(tuple[addrPrefixSize:]) * addrSuffixSize - suffix := ti.suffixes[ord : ord+addrSuffixSize] // suffix is 12 bytes + prefix := tuple[:hash.PrefixLen] + ord := binary.BigEndian.Uint32(tuple[hash.PrefixLen:]) * hash.SuffixLen + suffix := ti.suffixes[ord : ord+hash.SuffixLen] // suffix is 12 bytes // Combine prefix and suffix to get hash buf := [hash.ByteLen]byte{} - copy(buf[:addrPrefixSize], prefix) - copy(buf[addrPrefixSize:], suffix) + copy(buf[:hash.PrefixLen], prefix) + copy(buf[hash.PrefixLen:], suffix) return buf } diff --git a/go/store/nbs/table_persister.go b/go/store/nbs/table_persister.go index 1b0a995682..5c230daa09 100644 --- a/go/store/nbs/table_persister.go +++ b/go/store/nbs/table_persister.go @@ -32,6 +32,7 @@ import ( "time" "github.com/dolthub/dolt/go/store/chunks" + "github.com/dolthub/dolt/go/store/hash" ) var errCacheMiss = errors.New("index cache miss") @@ -55,15 +56,15 @@ type tablePersister interface { ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error) // Open a table named |name|, 
containing |chunkCount| chunks. - Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) + Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) // Exists checks if a table named |name| exists. - Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) + Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) // PruneTableFiles deletes table files which the persister would normally be responsible for and // which are not in the included |keeper| set and have not be written or modified more recently // than the provided |mtime|. - PruneTableFiles(ctx context.Context, keeper func() []addr, mtime time.Time) error + PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, mtime time.Time) error AccessMode() chunks.ExclusiveAccessMode @@ -118,7 +119,7 @@ type compactionPlan struct { func (cp compactionPlan) suffixes() []byte { suffixesStart := uint64(cp.chunkCount) * (prefixTupleSize + lengthSize) - return cp.mergedIndex[suffixesStart : suffixesStart+uint64(cp.chunkCount)*addrSuffixSize] + return cp.mergedIndex[suffixesStart : suffixesStart+uint64(cp.chunkCount)*hash.SuffixLen] } // planRangeCopyConjoin computes a conjoin plan for tablePersisters that can conjoin @@ -221,19 +222,19 @@ func planConjoin(sources []sourceWithSize, stats *Stats) (plan compactionPlan, e suffixesPos += uint64(n) } else { // Build up the index one entry at a time. - var a addr + var h hash.Hash for i := 0; i < len(ordinals); i++ { - e, err := index.indexEntry(uint32(i), &a) + e, err := index.indexEntry(uint32(i), &h) if err != nil { return compactionPlan{}, err } li := lengthsPos + lengthSize*uint64(ordinals[i]) - si := suffixesPos + addrSuffixSize*uint64(ordinals[i]) + si := suffixesPos + hash.SuffixLen*uint64(ordinals[i]) binary.BigEndian.PutUint32(plan.mergedIndex[li:], e.Length()) - copy(plan.mergedIndex[si:], a[addrPrefixSize:]) + copy(plan.mergedIndex[si:], h[hash.PrefixLen:]) } lengthsPos += lengthSize * uint64(len(ordinals)) - suffixesPos += addrSuffixSize * uint64(len(ordinals)) + suffixesPos += hash.SuffixLen * uint64(len(ordinals)) } } @@ -242,7 +243,7 @@ func planConjoin(sources []sourceWithSize, stats *Stats) (plan compactionPlan, e var pfxPos uint64 for _, pi := range prefixIndexRecs { binary.BigEndian.PutUint64(plan.mergedIndex[pfxPos:], pi.addr.Prefix()) - pfxPos += addrPrefixSize + pfxPos += hash.PrefixLen binary.BigEndian.PutUint32(plan.mergedIndex[pfxPos:], pi.order) pfxPos += ordinalSize } @@ -253,12 +254,11 @@ func planConjoin(sources []sourceWithSize, stats *Stats) (plan compactionPlan, e return plan, nil } -func nameFromSuffixes(suffixes []byte) (name addr) { +func nameFromSuffixes(suffixes []byte) (name hash.Hash) { sha := sha512.New() sha.Write(suffixes) var h []byte h = sha.Sum(h) // Appends hash to h - copy(name[:], h) - return + return hash.New(h[:hash.ByteLen]) } diff --git a/go/store/nbs/table_reader.go b/go/store/nbs/table_reader.go index d8091c4814..3bb7246a1e 100644 --- a/go/store/nbs/table_reader.go +++ b/go/store/nbs/table_reader.go @@ -233,14 +233,14 @@ func (tr tableReader) index() (tableIndex, error) { } // returns true iff |h| can be found in this table. -func (tr tableReader) has(h addr) (bool, error) { +func (tr tableReader) has(h hash.Hash) (bool, error) { _, ok, err := tr.idx.lookup(&h) return ok, err } // returns the storage associated with |h|, iff present. Returns nil if absent. 
On success, // the returned byte slice directly references the underlying storage. -func (tr tableReader) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) { +func (tr tableReader) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) { e, found, err := tr.idx.lookup(&h) if err != nil { return nil, err @@ -283,7 +283,7 @@ func (tr tableReader) get(ctx context.Context, h addr, stats *Stats) ([]byte, er } type offsetRec struct { - a *addr + a *hash.Hash offset uint64 length uint32 } @@ -639,12 +639,12 @@ func (tr tableReader) extract(ctx context.Context, chunks chan<- extractRecord) var ors offsetRecSlice for i := uint32(0); i < tr.idx.chunkCount(); i++ { - a := new(addr) - e, err := tr.idx.indexEntry(i, a) + h := new(hash.Hash) + e, err := tr.idx.indexEntry(i, h) if err != nil { return err } - ors = append(ors, offsetRec{a, e.Offset(), e.Length()}) + ors = append(ors, offsetRec{h, e.Offset(), e.Length()}) } sort.Sort(ors) for _, or := range ors { diff --git a/go/store/nbs/table_set.go b/go/store/nbs/table_set.go index 46e2441ec4..bce3aa0cf0 100644 --- a/go/store/nbs/table_set.go +++ b/go/store/nbs/table_set.go @@ -29,6 +29,7 @@ import ( "sort" "sync" + "github.com/dolthub/dolt/go/store/hash" lru "github.com/hashicorp/golang-lru/v2" "golang.org/x/sync/errgroup" @@ -57,7 +58,7 @@ type tableSet struct { rl chan struct{} } -func (ts tableSet) has(h addr) (bool, error) { +func (ts tableSet) has(h hash.Hash) (bool, error) { f := func(css chunkSourceSet) (bool, error) { for _, haver := range css { has, err := haver.has(h) @@ -114,7 +115,7 @@ func (ts tableSet) hasMany(addrs []hasRecord) (bool, error) { return f(ts.upstream) } -func (ts tableSet) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) { +func (ts tableSet) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) { if err := ctx.Err(); err != nil { return nil, err } @@ -287,7 +288,7 @@ func (ts tableSet) Size() int { // append adds a memTable to an existing tableSet, compacting |mt| and // returning a new tableSet with newly compacted table added. -func (ts tableSet) append(ctx context.Context, mt *memTable, checker refCheck, hasCache *lru.TwoQueueCache[addr, struct{}], stats *Stats) (tableSet, error) { +func (ts tableSet) append(ctx context.Context, mt *memTable, checker refCheck, hasCache *lru.TwoQueueCache[hash.Hash, struct{}], stats *Stats) (tableSet, error) { for i := range mt.pendingRefs { if !mt.pendingRefs[i].has && hasCache.Contains(*mt.pendingRefs[i].a) { mt.pendingRefs[i].has = true @@ -345,7 +346,7 @@ func (ts tableSet) rebase(ctx context.Context, specs []tableSpec, stats *Stats) // deduplicate |specs| orig := specs specs = make([]tableSpec, 0, len(orig)) - seen := map[addr]struct{}{} + seen := map[hash.Hash]struct{}{} for _, spec := range orig { if _, ok := seen[spec.name]; ok { continue diff --git a/go/store/nbs/table_writer.go b/go/store/nbs/table_writer.go index eaf820e9b8..4dd9c1569a 100644 --- a/go/store/nbs/table_writer.go +++ b/go/store/nbs/table_writer.go @@ -26,12 +26,13 @@ import ( "encoding/binary" "errors" "fmt" - "hash" + gohash "hash" "sort" "github.com/golang/snappy" "github.com/dolthub/dolt/go/store/d" + "github.com/dolthub/dolt/go/store/hash" ) // tableWriter encodes a collection of byte stream chunks into a nbs table. NOT goroutine safe. 
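(Aside on the has-cache hunks above: tableSet.append and errorIfDangling now key the lru.TwoQueueCache directly by hash.Hash instead of converting to the table-local addr type first. Below is a minimal, self-contained sketch of that pattern, not code from this patch; the checkRefs helper, the cache size, and the sample chunk data are illustrative assumptions.)

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"

	"github.com/dolthub/dolt/go/store/hash"
)

// checkRefs stands in for the store's refCheck callback: it returns the subset
// of |candidates| that cannot be found in |present|. (Illustrative only -- the
// real check walks the table indexes.)
func checkRefs(candidates []hash.Hash, present hash.HashSet) hash.HashSet {
	absent := hash.NewHashSet()
	for _, h := range candidates {
		if !present.Has(h) {
			absent.Insert(h)
		}
	}
	return absent
}

func main() {
	// 2Q cache keyed directly by hash.Hash; struct{} values mean only
	// membership is stored.
	hasCache, err := lru.New2Q[hash.Hash, struct{}](64 * 1024)
	if err != nil {
		panic(err)
	}

	present := hash.NewHashSet()
	present.Insert(hash.Of([]byte("chunk-1")))
	root := hash.Of([]byte("chunk-1"))

	// First sighting: miss the cache, fall back to the expensive ref check,
	// then remember the address so later checks can skip the index lookup.
	if _, ok := hasCache.Get(root); !ok {
		if absent := checkRefs([]hash.Hash{root}, present); absent.Size() > 0 {
			panic(fmt.Sprintf("dangling ref to %s", absent.String()))
		}
		hasCache.Add(root, struct{}{})
	}

	fmt.Println("cached:", hasCache.Contains(root)) // cached: true
}

Using struct{} as the value type keeps each cache entry to little more than the 20-byte key, since only membership matters for the ref check.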
@@ -41,7 +42,7 @@ type tableWriter struct { totalCompressedData uint64 totalUncompressedData uint64 prefixes prefixIndexSlice - blockHash hash.Hash + blockHash gohash.Hash snapper snappyEncoder } @@ -61,11 +62,11 @@ func maxTableSize(numChunks, totalData uint64) uint64 { d.Chk.True(avgChunkSize < maxChunkSize) maxSnappySize := snappy.MaxEncodedLen(int(avgChunkSize)) d.Chk.True(maxSnappySize > 0) - return numChunks*(prefixTupleSize+lengthSize+addrSuffixSize+checksumSize+uint64(maxSnappySize)) + footerSize + return numChunks*(prefixTupleSize+lengthSize+hash.SuffixLen+checksumSize+uint64(maxSnappySize)) + footerSize } func indexSize(numChunks uint32) uint64 { - return uint64(numChunks) * (addrSuffixSize + lengthSize + prefixTupleSize) + return uint64(numChunks) * (hash.SuffixLen + lengthSize + prefixTupleSize) } func lengthsOffset(numChunks uint32) uint64 { @@ -88,7 +89,7 @@ func newTableWriter(buff []byte, snapper snappyEncoder) *tableWriter { } } -func (tw *tableWriter) addChunk(h addr, data []byte) bool { +func (tw *tableWriter) addChunk(h hash.Hash, data []byte) bool { if len(data) == 0 { panic("NBS blocks cannont be zero length") } @@ -123,11 +124,11 @@ func (tw *tableWriter) addChunk(h addr, data []byte) bool { return true } -func (tw *tableWriter) finish() (uncompressedLength uint64, blockAddr addr, err error) { +func (tw *tableWriter) finish() (uncompressedLength uint64, blockAddr hash.Hash, err error) { err = tw.writeIndex() if err != nil { - return 0, addr{}, err + return 0, hash.Hash{}, err } tw.writeFooter() @@ -140,20 +141,22 @@ func (tw *tableWriter) finish() (uncompressedLength uint64, blockAddr addr, err } type prefixIndexRec struct { - addr addr + addr hash.Hash order, size uint32 } type prefixIndexSlice []prefixIndexRec -func (hs prefixIndexSlice) Len() int { return len(hs) } -func (hs prefixIndexSlice) Less(i, j int) bool { return hs[i].addr.Prefix() < hs[j].addr.Prefix() } -func (hs prefixIndexSlice) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] } +func (hs prefixIndexSlice) Len() int { return len(hs) } +func (hs prefixIndexSlice) Less(i, j int) bool { + return hs[i].addr.Prefix() < hs[j].addr.Prefix() +} +func (hs prefixIndexSlice) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] } func (tw *tableWriter) writeIndex() error { sort.Sort(tw.prefixes) - pfxScratch := [addrPrefixSize]byte{} + pfxScratch := [hash.PrefixLen]byte{} numRecords := uint32(len(tw.prefixes)) lengthsOffset := tw.pos + lengthsOffset(numRecords) // skip prefix and ordinal for each record @@ -163,7 +166,7 @@ func (tw *tableWriter) writeIndex() error { // hash prefix n := uint64(copy(tw.buff[tw.pos:], pfxScratch[:])) - if n != addrPrefixSize { + if n != hash.PrefixLen { return errors.New("failed to copy all data") } @@ -178,14 +181,14 @@ func (tw *tableWriter) writeIndex() error { binary.BigEndian.PutUint32(tw.buff[offset:], pi.size) // hash suffix - offset = suffixesOffset + uint64(pi.order)*addrSuffixSize + offset = suffixesOffset + uint64(pi.order)*hash.SuffixLen n = uint64(copy(tw.buff[offset:], pi.addr.Suffix())) - if n != addrSuffixSize { + if n != hash.SuffixLen { return errors.New("failed to copy all bytes") } } - suffixesLen := uint64(numRecords) * addrSuffixSize + suffixesLen := uint64(numRecords) * hash.SuffixLen tw.blockHash.Write(tw.buff[suffixesOffset : suffixesOffset+suffixesLen]) tw.pos = suffixesOffset + suffixesLen diff --git a/go/store/nbs/util.go b/go/store/nbs/util.go index 0d3dcb8982..1fff409155 100644 --- a/go/store/nbs/util.go +++ b/go/store/nbs/util.go @@ -33,21 +33,21 @@ func 
IterChunks(ctx context.Context, rd io.ReadSeeker, cb func(chunk chunks.Chun
 	defer idx.Close()
-	seen := make(map[addr]bool)
+	seen := make(map[hash.Hash]bool)
 	for i := uint32(0); i < idx.chunkCount(); i++ {
-		var a addr
-		ie, err := idx.indexEntry(i, &a)
+		var h hash.Hash
+		ie, err := idx.indexEntry(i, &h)
 		if err != nil {
 			return err
 		}
-		if _, ok := seen[a]; !ok {
-			seen[a] = true
+		if _, ok := seen[h]; !ok {
+			seen[h] = true
 			chunkBytes, err := readNFrom(rd, ie.Offset(), ie.Length())
 			if err != nil {
 				return err
 			}
-			cmpChnk, err := NewCompressedChunk(hash.Hash(a), chunkBytes)
+			cmpChnk, err := NewCompressedChunk(h, chunkBytes)
 			if err != nil {
 				return err
 			}
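(Taken together, these hunks retire the table-local addr type in favor of hash.Hash plus the new hash.PrefixLen / hash.SuffixLen split. The sketch below is a hedged, self-contained illustration of how a chunk lookup works against a prefix-sorted index: compute the address as the first 20 bytes of the SHA-512 digest, binary-search on the 8-byte big-endian prefix (as findPrefix does), then confirm the 12-byte suffix (as entrySuffixMatches does). The entry/index types and the buildIndex/lookup helpers are illustrative stand-ins for the on-disk prefixTuples and suffixes buffers, not the real tableIndex implementation.)

package main

import (
	"bytes"
	"crypto/sha512"
	"fmt"
	"sort"

	"github.com/dolthub/dolt/go/store/hash"
)

// computeAddr mirrors the table code: a chunk's address is the first
// hash.ByteLen (20) bytes of its SHA-512 digest.
func computeAddr(data []byte) hash.Hash {
	r := sha512.Sum512(data)
	return hash.New(r[:hash.ByteLen])
}

// entry is a simplified stand-in for one prefix tuple plus its suffix: the
// on-disk index stores (prefix, ordinal) tuples sorted by prefix, with the
// 12-byte suffixes in a parallel buffer.
type entry struct {
	prefix  uint64
	suffix  [hash.SuffixLen]byte
	ordinal int
}

type index []entry // sorted by prefix

func buildIndex(chunks [][]byte) index {
	idx := make(index, len(chunks))
	for i, data := range chunks {
		h := computeAddr(data)
		idx[i].prefix = h.Prefix()
		copy(idx[i].suffix[:], h.Suffix())
		idx[i].ordinal = i
	}
	sort.Slice(idx, func(i, j int) bool { return idx[i].prefix < idx[j].prefix })
	return idx
}

// lookup finds the ordinal for |h|: binary-search the first entry whose prefix
// is >= h.Prefix(), then compare suffixes until one matches.
func (idx index) lookup(h hash.Hash) (int, bool) {
	prefix := h.Prefix()
	i := sort.Search(len(idx), func(i int) bool { return idx[i].prefix >= prefix })
	for ; i < len(idx) && idx[i].prefix == prefix; i++ {
		if bytes.Equal(idx[i].suffix[:], h.Suffix()) {
			return idx[i].ordinal, true
		}
	}
	return 0, false
}

func main() {
	chunks := [][]byte{[]byte("alpha"), []byte("beta"), []byte("gamma")}
	idx := buildIndex(chunks)

	ord, ok := idx.lookup(computeAddr([]byte("beta")))
	fmt.Println(ord, ok) // 1 true

	_, ok = idx.lookup(computeAddr([]byte("delta")))
	fmt.Println(ok) // false
}

Splitting the address this way keeps the hot binary-search loop on fixed-width 12-byte tuples (8-byte prefix + 4-byte ordinal, i.e. prefixTupleSize), and the 12-byte suffixes are consulted only to disambiguate prefix collisions.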