[no release notes] Merge pull request #7609 from dolthub/macneale4/nbs-addr

Drop the nbs.addr type in favor of hash.Hash type
This commit is contained in:
Neil Macneale IV
2024-03-14 09:29:25 -07:00
committed by GitHub
51 changed files with 435 additions and 449 deletions
+2
View File
@@ -25,10 +25,12 @@ import "encoding/base32"
var encoding = base32.NewEncoding("0123456789abcdefghijklmnopqrstuv")
// encode returns the base32 encoding in the Dolt alphabet.
func encode(data []byte) string {
return encoding.EncodeToString(data)
}
// decode returns the bytes represented by the Base32 string using the Dolt alphabet.
func decode(s string) []byte {
slice, _ := encoding.DecodeString(s)
return slice
+17
View File
@@ -48,6 +48,7 @@ package hash
import (
"bytes"
"crypto/sha512"
"encoding/binary"
"fmt"
"regexp"
"strconv"
@@ -60,6 +61,12 @@ const (
// ByteLen is the number of bytes used to represent the Hash.
ByteLen = 20
// PrefixLen is the number of bytes used to represent the Prefix of the Hash.
PrefixLen = 8 // uint64
// SuffixLen is the number of bytes which come after the Prefix.
SuffixLen = ByteLen - PrefixLen
// StringLen is the number of characters need to represent the Hash using Base32.
StringLen = 32 // 20 * 8 / log2(32)
)
@@ -123,6 +130,16 @@ func Parse(s string) Hash {
return r
}
// Prefix returns the first 8 bytes of the hash as a unit64. Used for chunk indexing
func (h Hash) Prefix() uint64 {
return binary.BigEndian.Uint64(h[:PrefixLen])
}
// Suffix returns the last 12 bytes of the hash. Used for chunk indexing
func (h Hash) Suffix() []byte {
return h[PrefixLen:]
}
// Less compares two hashes returning whether this Hash is less than other.
func (h Hash) Less(other Hash) bool {
return h.Compare(other) < 0
+4 -2
View File
@@ -26,9 +26,11 @@ import (
"context"
"errors"
"time"
"github.com/dolthub/dolt/go/store/hash"
)
func tableExistsInChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (bool, error) {
func tableExistsInChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name hash.Hash, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (bool, error) {
magic := make([]byte, magicNumberSize)
n, _, err := s3.ReadFromEnd(ctx, name, magic, stats)
if err != nil {
@@ -40,7 +42,7 @@ func tableExistsInChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLim
return bytes.Equal(magic, []byte(magicNumber)), nil
}
func newAWSChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
func newAWSChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name hash.Hash, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
var tra tableReaderAt
index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error {
n, _, err := s3.ReadFromEnd(ctx, name, p, stats)
+4 -3
View File
@@ -39,6 +39,7 @@ import (
"github.com/dolthub/dolt/go/store/atomicerr"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/util/verbose"
)
@@ -66,7 +67,7 @@ type awsLimits struct {
partTarget, partMin, partMax uint64
}
func (s3p awsTablePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
func (s3p awsTablePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
return newAWSChunkSource(
ctx,
&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns},
@@ -78,7 +79,7 @@ func (s3p awsTablePersister) Open(ctx context.Context, name addr, chunkCount uin
)
}
func (s3p awsTablePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
func (s3p awsTablePersister) Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) {
return tableExistsInChunkSource(
ctx,
&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns},
@@ -482,7 +483,7 @@ func (s3p awsTablePersister) uploadPart(ctx context.Context, data []byte, key, u
return
}
func (s3p awsTablePersister) PruneTableFiles(ctx context.Context, keeper func() []addr, t time.Time) error {
func (s3p awsTablePersister) PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, t time.Time) error {
return chunks.ErrUnsupportedOperation
}
+6 -4
View File
@@ -35,6 +35,8 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/store/hash"
)
func randomChunks(t *testing.T, r *rand.Rand, sz int) [][]byte {
@@ -160,22 +162,22 @@ func TestAWSTablePersisterPersist(t *testing.T) {
}
type waitOnStoreTableCache struct {
readers map[addr]io.ReaderAt
readers map[hash.Hash]io.ReaderAt
mu sync.RWMutex
storeWG sync.WaitGroup
}
func (mtc *waitOnStoreTableCache) checkout(h addr) (io.ReaderAt, error) {
func (mtc *waitOnStoreTableCache) checkout(h hash.Hash) (io.ReaderAt, error) {
mtc.mu.RLock()
defer mtc.mu.RUnlock()
return mtc.readers[h], nil
}
func (mtc *waitOnStoreTableCache) checkin(h addr) error {
func (mtc *waitOnStoreTableCache) checkin(h hash.Hash) error {
return nil
}
func (mtc *waitOnStoreTableCache) store(h addr, data io.Reader, size uint64) error {
func (mtc *waitOnStoreTableCache) store(h hash.Hash, data io.Reader, size uint64) error {
defer mtc.storeWG.Done()
mtc.mu.Lock()
defer mtc.mu.Unlock()
+4 -3
View File
@@ -21,6 +21,7 @@ import (
"github.com/dolthub/dolt/go/store/blobstore"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)
const (
@@ -74,7 +75,7 @@ func (bsm blobstoreManifest) ParseIfExists(ctx context.Context, stats *Stats, re
}
// Update updates the contents of the manifest in the blobstore
func (bsm blobstoreManifest) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
func (bsm blobstoreManifest) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
checker := func(upstream, contents manifestContents) error {
if contents.gcGen != upstream.gcGen {
return chunks.ErrGCGenerationExpired
@@ -85,7 +86,7 @@ func (bsm blobstoreManifest) Update(ctx context.Context, lastLock addr, newConte
return updateBSWithChecker(ctx, bsm.bs, checker, lastLock, newContents, writeHook)
}
func (bsm blobstoreManifest) UpdateGCGen(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
func (bsm blobstoreManifest) UpdateGCGen(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
checker := func(upstream, contents manifestContents) error {
if contents.gcGen == upstream.gcGen {
return errors.New("UpdateGCGen() must update the garbage collection generation")
@@ -100,7 +101,7 @@ func (bsm blobstoreManifest) UpdateGCGen(ctx context.Context, lastLock addr, new
return updateBSWithChecker(ctx, bsm.bs, checker, lastLock, newContents, writeHook)
}
func updateBSWithChecker(ctx context.Context, bs blobstore.Blobstore, validate manifestChecker, lastLock addr, newContents manifestContents, writeHook func() error) (mc manifestContents, err error) {
func updateBSWithChecker(ctx context.Context, bs blobstore.Blobstore, validate manifestChecker, lastLock hash.Hash, newContents manifestContents, writeHook func() error) (mc manifestContents, err error) {
if writeHook != nil {
panic("Write hooks not supported")
}
+5 -4
View File
@@ -26,6 +26,7 @@ import (
"github.com/dolthub/dolt/go/store/blobstore"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)
const (
@@ -160,15 +161,15 @@ func (bsp *blobstorePersister) getRecordsSubObject(ctx context.Context, cs chunk
}
// Open a table named |name|, containing |chunkCount| chunks.
func (bsp *blobstorePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
func (bsp *blobstorePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
return newBSChunkSource(ctx, bsp.bs, name, chunkCount, bsp.q, stats)
}
func (bsp *blobstorePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
func (bsp *blobstorePersister) Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) {
return bsp.bs.Exists(ctx, name.String())
}
func (bsp *blobstorePersister) PruneTableFiles(ctx context.Context, keeper func() []addr, t time.Time) error {
func (bsp *blobstorePersister) PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, t time.Time) error {
return nil
}
@@ -275,7 +276,7 @@ func (bsTRA *bsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off
return totalRead, nil
}
func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name hash.Hash, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error {
rc, _, err := bs.Get(ctx, name.String(), blobstore.NewBlobRange(-int64(len(p)), 0))
if err != nil {
+8 -4
View File
@@ -14,18 +14,22 @@
package nbs
import "context"
import (
"context"
"github.com/dolthub/dolt/go/store/hash"
)
type chunkSourceAdapter struct {
tableReader
h addr
h hash.Hash
}
func (csa chunkSourceAdapter) hash() addr {
func (csa chunkSourceAdapter) hash() hash.Hash {
return csa.h
}
func newReaderFromIndexData(ctx context.Context, q MemoryQuotaProvider, idxData []byte, name addr, tra tableReaderAt, blockSize uint64) (cs chunkSource, err error) {
func newReaderFromIndexData(ctx context.Context, q MemoryQuotaProvider, idxData []byte, name hash.Hash, tra tableReaderAt, blockSize uint64) (cs chunkSource, err error) {
index, err := parseTableIndexByCopy(ctx, idxData, q)
if err != nil {
return nil, err
+12 -12
View File
@@ -18,12 +18,14 @@ import (
"crypto/sha512"
"encoding/binary"
"errors"
"hash"
gohash "hash"
"io"
"os"
"sort"
"github.com/golang/snappy"
"github.com/dolthub/dolt/go/store/hash"
)
const defaultTableSinkBlockSize = 2 * 1024 * 1024
@@ -44,7 +46,7 @@ type CmpChunkTableWriter struct {
totalCompressedData uint64
totalUncompressedData uint64
prefixes prefixIndexSlice
blockAddr *addr
blockAddr *hash.Hash
path string
}
@@ -96,7 +98,7 @@ func (tw *CmpChunkTableWriter) AddCmpChunk(c CompressedChunk) error {
// Stored in insertion order
tw.prefixes = append(tw.prefixes, prefixIndexRec{
addr(c.H),
c.H,
uint32(len(tw.prefixes)),
uint32(fullLen),
})
@@ -124,9 +126,7 @@ func (tw *CmpChunkTableWriter) Finish() (string, error) {
var h []byte
h = blockHash.Sum(h)
var blockAddr addr
copy(blockAddr[:], h)
blockAddr := hash.New(h[:hash.ByteLen])
tw.blockAddr = &blockAddr
return tw.blockAddr.String(), nil
@@ -189,7 +189,7 @@ func containsDuplicates(prefixes prefixIndexSlice) bool {
return false
}
func (tw *CmpChunkTableWriter) writeIndex() (hash.Hash, error) {
func (tw *CmpChunkTableWriter) writeIndex() (gohash.Hash, error) {
sort.Sort(tw.prefixes)
// We do a sanity check here to assert that we are never writing duplicate chunks into
@@ -198,13 +198,13 @@ func (tw *CmpChunkTableWriter) writeIndex() (hash.Hash, error) {
return nil, ErrDuplicateChunkWritten
}
pfxScratch := [addrPrefixSize]byte{}
pfxScratch := [hash.PrefixLen]byte{}
blockHash := sha512.New()
numRecords := uint32(len(tw.prefixes))
lengthsOffset := lengthsOffset(numRecords) // skip prefix and ordinal for each record
suffixesOffset := suffixesOffset(numRecords) // skip size for each record
suffixesLen := uint64(numRecords) * addrSuffixSize
suffixesLen := uint64(numRecords) * hash.SuffixLen
buff := make([]byte, suffixesLen+suffixesOffset)
var pos uint64
@@ -213,7 +213,7 @@ func (tw *CmpChunkTableWriter) writeIndex() (hash.Hash, error) {
// hash prefix
n := uint64(copy(buff[pos:], pfxScratch[:]))
if n != addrPrefixSize {
if n != hash.PrefixLen {
return nil, errors.New("failed to copy all data")
}
@@ -228,10 +228,10 @@ func (tw *CmpChunkTableWriter) writeIndex() (hash.Hash, error) {
binary.BigEndian.PutUint32(buff[offset:], pi.size)
// hash suffix
offset = suffixesOffset + uint64(pi.order)*addrSuffixSize
offset = suffixesOffset + uint64(pi.order)*hash.SuffixLen
n = uint64(copy(buff[offset:], pi.addr.Suffix()))
if n != addrSuffixSize {
if n != hash.SuffixLen {
return nil, errors.New("failed to copy all bytes")
}
}
+4 -2
View File
@@ -28,6 +28,8 @@ import (
"time"
"golang.org/x/sync/errgroup"
"github.com/dolthub/dolt/go/store/hash"
)
type conjoinStrategy interface {
@@ -171,8 +173,8 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
upstream, appendixSpecs = upstream.removeAppendixSpecs()
}
conjoineeSet := map[addr]struct{}{}
upstreamNames := map[addr]struct{}{}
conjoineeSet := map[hash.Hash]struct{}{}
upstreamNames := map[hash.Hash]struct{}{}
for _, spec := range upstream.specs {
upstreamNames[spec.name] = struct{}{}
}
+3 -3
View File
@@ -111,7 +111,7 @@ func TestConjoin(t *testing.T) {
func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
stats := &Stats{}
setup := func(lock addr, root hash.Hash, sizes []uint32) (fm *fakeManifest, p tablePersister, upstream manifestContents) {
setup := func(lock hash.Hash, root hash.Hash, sizes []uint32) (fm *fakeManifest, p tablePersister, upstream manifestContents) {
p = factory(t)
fm = &fakeManifest{}
fm.set(constants.FormatLD1String, lock, root, makeTestTableSpecs(t, sizes, p), nil)
@@ -257,7 +257,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
}
})
setupAppendix := func(lock addr, root hash.Hash, specSizes, appendixSizes []uint32) (fm *fakeManifest, p tablePersister, upstream manifestContents) {
setupAppendix := func(lock hash.Hash, root hash.Hash, specSizes, appendixSizes []uint32) (fm *fakeManifest, p tablePersister, upstream manifestContents) {
p = newFakeTablePersister(&UnlimitedQuotaProvider{})
fm = &fakeManifest{}
fm.set(constants.FormatLD1String, lock, root, makeTestTableSpecs(t, specSizes, p), makeTestTableSpecs(t, appendixSizes, p))
@@ -403,7 +403,7 @@ type updatePreemptManifest struct {
preUpdate func()
}
func (u updatePreemptManifest) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
func (u updatePreemptManifest) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
if u.preUpdate != nil {
u.preUpdate()
}
+2 -2
View File
@@ -146,7 +146,7 @@ func validateManifest(item map[string]*dynamodb.AttributeValue) (valid, hasSpecs
return false, false, false
}
func (dm dynamoManifest) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
func (dm dynamoManifest) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
t1 := time.Now()
defer func() { stats.WriteManifestLatency.SampleTimeSince(t1) }()
@@ -174,7 +174,7 @@ func (dm dynamoManifest) Update(ctx context.Context, lastLock addr, newContents
}
expr := valueEqualsExpression
if lastLock == (addr{}) {
if lastLock.IsEmpty() {
expr = valueNotExistsOrEqualsExpression
}
+6 -6
View File
@@ -110,7 +110,7 @@ func TestDynamoManifestUpdate(t *testing.T) {
// First, test winning the race against another process.
contents := makeContents("locker", "nuroot", []tableSpec{{computeAddr([]byte("a")), 3}}, nil)
upstream, err := mm.Update(context.Background(), addr{}, contents, stats, func() error {
upstream, err := mm.Update(context.Background(), hash.Hash{}, contents, stats, func() error {
// This should fail to get the lock, and therefore _not_ clobber the manifest. So the Update should succeed.
lock := computeAddr([]byte("nolock"))
newRoot2 := hash.Of([]byte("noroot"))
@@ -124,7 +124,7 @@ func TestDynamoManifestUpdate(t *testing.T) {
// Now, test the case where the optimistic lock fails, and someone else updated the root since last we checked.
rejected := makeContents("locker 2", "new root 2", nil, nil)
upstream, err = mm.Update(context.Background(), addr{}, rejected, stats, nil)
upstream, err = mm.Update(context.Background(), hash.Hash{}, rejected, stats, nil)
require.NoError(t, err)
assert.Equal(contents.lock, upstream.lock)
assert.Equal(contents.root, upstream.root)
@@ -162,7 +162,7 @@ func TestDynamoManifestUpdateAppendix(t *testing.T) {
app := []tableSpec{{computeAddr([]byte("app-a")), 3}}
contents := makeContents("locker", "nuroot", specs, app)
upstream, err := mm.Update(context.Background(), addr{}, contents, stats, func() error {
upstream, err := mm.Update(context.Background(), hash.Hash{}, contents, stats, func() error {
// This should fail to get the lock, and therefore _not_ clobber the manifest. So the Update should succeed.
lock := computeAddr([]byte("nolock"))
newRoot2 := hash.Of([]byte("noroot"))
@@ -177,7 +177,7 @@ func TestDynamoManifestUpdateAppendix(t *testing.T) {
// Now, test the case where the optimistic lock fails, and someone else updated the root since last we checked.
rejected := makeContents("locker 2", "new root 2", nil, nil)
upstream, err = mm.Update(context.Background(), addr{}, rejected, stats, nil)
upstream, err = mm.Update(context.Background(), hash.Hash{}, rejected, stats, nil)
require.NoError(t, err)
assert.Equal(contents.lock, upstream.lock)
assert.Equal(contents.root, upstream.root)
@@ -232,7 +232,7 @@ func TestDynamoManifestCaching(t *testing.T) {
// When failing the optimistic lock, we should hit persistent storage.
reads = ddb.NumGets()
contents := makeContents("lock2", "nuroot", []tableSpec{{computeAddr([]byte("a")), 3}}, nil)
upstream, err := mm.Update(context.Background(), addr{}, contents, stats, nil)
upstream, err := mm.Update(context.Background(), hash.Hash{}, contents, stats, nil)
require.NoError(t, err)
assert.NotEqual(contents.lock, upstream.lock)
assert.Equal(reads+1, ddb.NumGets())
@@ -251,7 +251,7 @@ func TestDynamoManifestUpdateEmpty(t *testing.T) {
stats := &Stats{}
contents := manifestContents{nbfVers: constants.FormatLD1String, lock: computeAddr([]byte{0x01})}
upstream, err := mm.Update(context.Background(), addr{}, contents, stats, nil)
upstream, err := mm.Update(context.Background(), hash.Hash{}, contents, stats, nil)
require.NoError(t, err)
assert.Equal(contents.lock, upstream.lock)
assert.True(upstream.root.IsEmpty())
+4 -4
View File
@@ -34,7 +34,7 @@ import (
type emptyChunkSource struct{}
func (ecs emptyChunkSource) has(h addr) (bool, error) {
func (ecs emptyChunkSource) has(h hash.Hash) (bool, error) {
return false, nil
}
@@ -42,7 +42,7 @@ func (ecs emptyChunkSource) hasMany(addrs []hasRecord) (bool, error) {
return true, nil
}
func (ecs emptyChunkSource) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) {
func (ecs emptyChunkSource) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) {
return nil, nil
}
@@ -62,8 +62,8 @@ func (ecs emptyChunkSource) uncompressedLen() (uint64, error) {
return 0, nil
}
func (ecs emptyChunkSource) hash() addr {
return addr{}
func (ecs emptyChunkSource) hash() hash.Hash {
return hash.Hash{}
}
func (ecs emptyChunkSource) index() (tableIndex, error) {
+14 -16
View File
@@ -78,8 +78,7 @@ func MaybeMigrateFileManifest(ctx context.Context, dir string) (bool, error) {
}
check := func(_, contents manifestContents) error {
var empty addr
if contents.gcGen != empty {
if !contents.gcGen.IsEmpty() {
return errors.New("migrating from v4 to v5 should result in a manifest with a 0 gcGen")
}
@@ -170,7 +169,7 @@ func (fm fileManifest) ParseIfExists(
return parseIfExists(ctx, fm.dir, readHook)
}
func (fm fileManifest) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
func (fm fileManifest) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
t1 := time.Now()
defer func() { stats.WriteManifestLatency.SampleTimeSince(t1) }()
@@ -194,7 +193,7 @@ func (fm fileManifest) Update(ctx context.Context, lastLock addr, newContents ma
return updateWithChecker(ctx, fm.dir, fm.mode, checker, lastLock, newContents, writeHook)
}
func (fm fileManifest) UpdateGCGen(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
func (fm fileManifest) UpdateGCGen(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
t1 := time.Now()
defer func() { stats.WriteManifestLatency.SampleTimeSince(t1) }()
@@ -245,14 +244,14 @@ func parseV5Manifest(r io.Reader) (manifestContents, error) {
return manifestContents{}, err
}
lock, err := parseAddr(slices[1])
if err != nil {
return manifestContents{}, err
lock, ok := hash.MaybeParse(slices[1])
if !ok {
return manifestContents{}, fmt.Errorf("Could not parse lock hash: %s", slices[1])
}
gcGen, err := parseAddr(slices[3])
if err != nil {
return manifestContents{}, err
gcGen, ok := hash.MaybeParse(slices[3])
if !ok {
return manifestContents{}, fmt.Errorf("Could not parse GC generation hash: %s", slices[3])
}
return manifestContents{
@@ -329,10 +328,9 @@ func parseV4Manifest(r io.Reader) (manifestContents, error) {
return manifestContents{}, err
}
ad, err := parseAddr(slices[1])
if err != nil {
return manifestContents{}, err
ad, ok := hash.MaybeParse(slices[1])
if !ok {
return manifestContents{}, fmt.Errorf("Could not parse lock hash: %s", slices[1])
}
return manifestContents{
@@ -373,7 +371,7 @@ func parseIfExists(_ context.Context, dir string, readHook func() error) (exists
}
// updateWithChecker updates the manifest if |validate| is satisfied, callers must hold the file lock.
func updateWithChecker(_ context.Context, dir string, mode updateMode, validate manifestChecker, lastLock addr, newContents manifestContents, writeHook func() error) (mc manifestContents, err error) {
func updateWithChecker(_ context.Context, dir string, mode updateMode, validate manifestChecker, lastLock hash.Hash, newContents manifestContents, writeHook func() error) (mc manifestContents, err error) {
var tempManifestPath string
// Write a temporary manifest file, to be renamed over manifestFileName upon success.
@@ -450,7 +448,7 @@ func updateWithChecker(_ context.Context, dir string, mode updateMode, validate
return manifestContents{}, ferr
}
if lastLock != (addr{}) {
if !lastLock.IsEmpty() {
return manifestContents{}, errors.New("new manifest created with non 0 lock")
}
+8 -8
View File
@@ -60,7 +60,7 @@ func TestFileManifestLoadIfExists(t *testing.T) {
jerk := computeAddr([]byte("jerk"))
newRoot := hash.Of([]byte("new root"))
tableName := hash.Of([]byte("table1"))
gcGen := addr{}
gcGen := hash.Hash{}
m := strings.Join([]string{StorageVersion, "0", jerk.String(), newRoot.String(), gcGen.String(), tableName.String(), "0"}, ":")
err = clobberManifest(fm.dir, m)
require.NoError(t, err)
@@ -85,11 +85,11 @@ func TestFileManifestUpdateWontClobberOldVersion(t *testing.T) {
stats := &Stats{}
// Simulate another process having already put old Noms data in dir/.
m := strings.Join([]string{StorageVersion, "0", addr{}.String(), hash.Hash{}.String(), addr{}.String()}, ":")
m := strings.Join([]string{StorageVersion, "0", hash.Hash{}.String(), hash.Hash{}.String(), hash.Hash{}.String()}, ":")
err := clobberManifest(fm.dir, m)
require.NoError(t, err)
_, err = fm.Update(context.Background(), addr{}, manifestContents{}, stats, nil)
_, err = fm.Update(context.Background(), hash.Hash{}, manifestContents{}, stats, nil)
assert.Error(err)
}
@@ -100,7 +100,7 @@ func TestFileManifestUpdateEmpty(t *testing.T) {
stats := &Stats{}
l := computeAddr([]byte{0x01})
upstream, err := fm.Update(context.Background(), addr{}, manifestContents{nbfVers: constants.FormatLD1String, lock: l}, stats, nil)
upstream, err := fm.Update(context.Background(), hash.Hash{}, manifestContents{nbfVers: constants.FormatLD1String, lock: l}, stats, nil)
require.NoError(t, err)
assert.Equal(l, upstream.lock)
assert.True(upstream.root.IsEmpty())
@@ -136,11 +136,11 @@ func TestFileManifestUpdate(t *testing.T) {
root: hash.Of([]byte("new root")),
specs: []tableSpec{{computeAddr([]byte("a")), 3}},
}
upstream, err := fm.Update(context.Background(), addr{}, contents, stats, func() error {
upstream, err := fm.Update(context.Background(), hash.Hash{}, contents, stats, func() error {
// This should fail to get the lock, and therefore _not_ clobber the manifest. So the Update should succeed.
lock := computeAddr([]byte("nolock"))
newRoot2 := hash.Of([]byte("noroot"))
gcGen := addr{}
gcGen := hash.Hash{}
m := strings.Join([]string{StorageVersion, constants.FormatLD1String, lock.String(), newRoot2.String(), gcGen.String()}, ":")
b, err := tryClobberManifest(fm.dir, m)
require.NoError(t, err, string(b))
@@ -153,7 +153,7 @@ func TestFileManifestUpdate(t *testing.T) {
// Now, test the case where the optimistic lock fails, and someone else updated the root since last we checked.
contents2 := manifestContents{lock: computeAddr([]byte("locker 2")), root: hash.Of([]byte("new root 2")), nbfVers: constants.FormatLD1String}
upstream, err = fm.Update(context.Background(), addr{}, contents2, stats, nil)
upstream, err = fm.Update(context.Background(), hash.Hash{}, contents2, stats, nil)
require.NoError(t, err)
assert.Equal(contents.lock, upstream.lock)
assert.Equal(contents.root, upstream.root)
@@ -167,7 +167,7 @@ func TestFileManifestUpdate(t *testing.T) {
// Now, test the case where the optimistic lock fails because someone else updated only the tables since last we checked
jerkLock := computeAddr([]byte("jerk"))
tableName := computeAddr([]byte("table1"))
gcGen := addr{}
gcGen := hash.Hash{}
m := strings.Join([]string{StorageVersion, constants.FormatLD1String, jerkLock.String(), contents2.root.String(), gcGen.String(), tableName.String(), "1"}, ":")
err = clobberManifest(fm.dir, m)
require.NoError(t, err)
+6 -6
View File
@@ -36,6 +36,7 @@ import (
"github.com/dolthub/dolt/go/libraries/utils/file"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/util/tempfiles"
)
@@ -67,11 +68,11 @@ type fsTablePersister struct {
var _ tablePersister = &fsTablePersister{}
var _ tableFilePersister = &fsTablePersister{}
func (ftp *fsTablePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
func (ftp *fsTablePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
return newFileTableReader(ctx, ftp.dir, name, chunkCount, ftp.q)
}
func (ftp *fsTablePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
func (ftp *fsTablePersister) Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) {
ftp.removeMu.Lock()
defer ftp.removeMu.Unlock()
if ftp.toKeep != nil {
@@ -157,7 +158,7 @@ func (ftp *fsTablePersister) TryMoveCmpChunkTableWriter(ctx context.Context, fil
return w.FlushToFile(path)
}
func (ftp *fsTablePersister) persistTable(ctx context.Context, name addr, data []byte, chunkCount uint32, stats *Stats) (cs chunkSource, err error) {
func (ftp *fsTablePersister) persistTable(ctx context.Context, name hash.Hash, data []byte, chunkCount uint32, stats *Stats) (cs chunkSource, err error) {
if chunkCount == 0 {
return emptyChunkSource{}, nil
}
@@ -316,7 +317,7 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource
}, nil
}
func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, keeper func() []addr, mtime time.Time) error {
func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, mtime time.Time) error {
ftp.removeMu.Lock()
if ftp.toKeep != nil {
ftp.removeMu.Unlock()
@@ -368,8 +369,7 @@ func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, keeper func()
continue // not a table file
}
_, err := parseAddr(info.Name())
if err != nil {
if _, ok := hash.MaybeParse(info.Name()); !ok {
continue // not a table file
}
+5 -4
View File
@@ -30,6 +30,7 @@ import (
"testing"
"github.com/dolthub/dolt/go/libraries/utils/file"
"github.com/dolthub/dolt/go/store/hash"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -41,23 +42,23 @@ func makeTempDir(t *testing.T) string {
return dir
}
func writeTableData(dir string, chunx ...[]byte) (addr, error) {
func writeTableData(dir string, chunx ...[]byte) (hash.Hash, error) {
tableData, name, err := buildTable(chunx)
if err != nil {
return addr{}, err
return hash.Hash{}, err
}
err = os.WriteFile(filepath.Join(dir, name.String()), tableData, 0666)
if err != nil {
return addr{}, err
return hash.Hash{}, err
}
return name, nil
}
func removeTables(dir string, names ...addr) error {
func removeTables(dir string, names ...hash.Hash) error {
for _, name := range names {
if err := file.Remove(filepath.Join(dir, name.String())); err != nil {
return err
+6 -4
View File
@@ -29,18 +29,20 @@ import (
"os"
"path/filepath"
"time"
"github.com/dolthub/dolt/go/store/hash"
)
type fileTableReader struct {
tableReader
h addr
h hash.Hash
}
const (
fileBlockSize = 1 << 12
)
func tableFileExists(ctx context.Context, dir string, h addr) (bool, error) {
func tableFileExists(ctx context.Context, dir string, h hash.Hash) (bool, error) {
path := filepath.Join(dir, h.String())
_, err := os.Stat(path)
@@ -51,7 +53,7 @@ func tableFileExists(ctx context.Context, dir string, h addr) (bool, error) {
return err == nil, err
}
func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint32, q MemoryQuotaProvider) (cs chunkSource, err error) {
func newFileTableReader(ctx context.Context, dir string, h hash.Hash, chunkCount uint32, q MemoryQuotaProvider) (cs chunkSource, err error) {
path := filepath.Join(dir, h.String())
var f *os.File
@@ -134,7 +136,7 @@ func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint
}, nil
}
func (ftr *fileTableReader) hash() addr {
func (ftr *fileTableReader) hash() hash.Hash {
return ftr.h
}
+5 -4
View File
@@ -18,6 +18,8 @@ import (
"context"
"fmt"
"strings"
"github.com/dolthub/dolt/go/store/hash"
)
type gcErrAccum map[string]error
@@ -72,10 +74,9 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, tfp tableFilePersister
_ = gcc.writer.Remove()
}()
var addr addr
addr, err = parseAddr(filename)
if err != nil {
return nil, err
addr, ok := hash.MaybeParse(filename)
if !ok {
return nil, fmt.Errorf("invalid filename: %s", filename)
}
exists, err := tfp.Exists(ctx, addr, uint32(gcc.writer.ChunkCount()), nil)
+3 -1
View File
@@ -18,6 +18,8 @@ import (
"encoding/binary"
"errors"
"io"
"github.com/dolthub/dolt/go/store/hash"
)
var (
@@ -27,7 +29,7 @@ var (
func NewIndexTransformer(src io.Reader, chunkCount int) io.Reader {
tuplesSize := chunkCount * prefixTupleSize
lengthsSize := chunkCount * lengthSize
suffixesSize := chunkCount * addrSuffixSize
suffixesSize := chunkCount * hash.SuffixLen
tupleReader := io.LimitReader(src, int64(tuplesSize))
lengthsReader := io.LimitReader(src, int64(lengthsSize))
+2 -1
View File
@@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/libraries/utils/test"
"github.com/dolthub/dolt/go/store/hash"
)
// minByteReader is a copy of smallerByteReader from testing/iotest
@@ -166,7 +167,7 @@ func TestIndexTransformer(t *testing.T) {
offsetBytes := get64Bytes(offsets)
tupleBytes := test.RandomData(chunkCount * prefixTupleSize)
suffixBytes := test.RandomData(chunkCount * addrSuffixSize)
suffixBytes := test.RandomData(chunkCount * hash.SuffixLen)
var inBytes []byte
inBytes = append(inBytes, tupleBytes...)
+7 -7
View File
@@ -276,7 +276,7 @@ func (j *ChunkJournal) ConjoinAll(ctx context.Context, sources chunkSources, sta
}
// Open implements tablePersister.
func (j *ChunkJournal) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
func (j *ChunkJournal) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
if name == journalAddr {
if err := j.maybeInit(ctx); err != nil {
return nil, err
@@ -287,12 +287,12 @@ func (j *ChunkJournal) Open(ctx context.Context, name addr, chunkCount uint32, s
}
// Exists implements tablePersister.
func (j *ChunkJournal) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
func (j *ChunkJournal) Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) {
return j.persister.Exists(ctx, name, chunkCount, stats)
}
// PruneTableFiles implements tablePersister.
func (j *ChunkJournal) PruneTableFiles(ctx context.Context, keeper func() []addr, mtime time.Time) error {
func (j *ChunkJournal) PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, mtime time.Time) error {
if j.backing.readOnly() {
return errReadOnlyManifest
}
@@ -326,7 +326,7 @@ func (j *ChunkJournal) Name() string {
}
// Update implements manifest.
func (j *ChunkJournal) Update(ctx context.Context, lastLock addr, next manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
func (j *ChunkJournal) Update(ctx context.Context, lastLock hash.Hash, next manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
if j.backing.readOnly() {
return j.contents, errReadOnlyManifest
}
@@ -372,7 +372,7 @@ func (j *ChunkJournal) Update(ctx context.Context, lastLock addr, next manifestC
}
// UpdateGCGen implements manifestGCGenUpdater.
func (j *ChunkJournal) UpdateGCGen(ctx context.Context, lastLock addr, next manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
func (j *ChunkJournal) UpdateGCGen(ctx context.Context, lastLock hash.Hash, next manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
if j.backing.readOnly() {
return j.contents, errReadOnlyManifest
} else if j.wr == nil {
@@ -589,7 +589,7 @@ func (jm *journalManifest) ParseIfExists(ctx context.Context, stats *Stats, read
}
// Update implements manifest.
func (jm *journalManifest) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
func (jm *journalManifest) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
if jm.readOnly() {
_, mc, err = jm.ParseIfExists(ctx, stats, nil)
if err != nil {
@@ -611,7 +611,7 @@ func (jm *journalManifest) Update(ctx context.Context, lastLock addr, newContent
}
// UpdateGCGen implements manifest.
func (jm *journalManifest) UpdateGCGen(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
func (jm *journalManifest) UpdateGCGen(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
if jm.readOnly() {
_, mc, err = jm.ParseIfExists(ctx, stats, nil)
if err != nil {
+5 -10
View File
@@ -35,7 +35,7 @@ type journalChunkSource struct {
var _ chunkSource = journalChunkSource{}
func (s journalChunkSource) has(h addr) (bool, error) {
func (s journalChunkSource) has(h hash.Hash) (bool, error) {
return s.journal.hasAddr(h), nil
}
@@ -51,11 +51,11 @@ func (s journalChunkSource) hasMany(addrs []hasRecord) (missing bool, err error)
return
}
func (s journalChunkSource) getCompressed(_ context.Context, h addr, _ *Stats) (CompressedChunk, error) {
func (s journalChunkSource) getCompressed(_ context.Context, h hash.Hash, _ *Stats) (CompressedChunk, error) {
return s.journal.getCompressedChunk(h)
}
func (s journalChunkSource) get(_ context.Context, h addr, _ *Stats) ([]byte, error) {
func (s journalChunkSource) get(_ context.Context, h hash.Hash, _ *Stats) ([]byte, error) {
cc, err := s.journal.getCompressedChunk(h)
if err != nil {
return nil, err
@@ -118,7 +118,7 @@ func (s journalChunkSource) uncompressedLen() (uint64, error) {
return s.journal.uncompressedSize(), nil
}
func (s journalChunkSource) hash() addr {
func (s journalChunkSource) hash() hash.Hash {
return journalAddr
}
@@ -170,7 +170,7 @@ func equalSpecs(left, right []tableSpec) bool {
if len(left) != len(right) {
return false
}
l := make(map[addr]struct{}, len(left))
l := make(map[hash.Hash]struct{}, len(left))
for _, s := range left {
l[s.name] = struct{}{}
}
@@ -181,8 +181,3 @@ func equalSpecs(left, right []tableSpec) bool {
}
return true
}
func emptyAddr(a addr) bool {
var b addr
return a == b
}
+4 -4
View File
@@ -234,11 +234,11 @@ func processIndexRecords(ctx context.Context, r io.ReadSeeker, sz int64, cb func
}
type lookup struct {
a addr
a hash.Hash
r Range
}
const lookupSize = addrSize + offsetSize + lengthSize
const lookupSize = hash.ByteLen + offsetSize + lengthSize
// serializeLookups serializes |lookups| using the table file chunk index format.
func serializeLookups(lookups []lookup) (index []byte) {
@@ -249,7 +249,7 @@ func serializeLookups(lookups []lookup) (index []byte) {
buf := index
for _, l := range lookups {
copy(buf, l.a[:])
buf = buf[addrSize:]
buf = buf[hash.ByteLen:]
binary.BigEndian.PutUint64(buf, l.r.Offset)
buf = buf[offsetSize:]
binary.BigEndian.PutUint32(buf, l.r.Length)
@@ -262,7 +262,7 @@ func deserializeLookups(index []byte) (lookups []lookup) {
lookups = make([]lookup, len(index)/lookupSize)
for i := range lookups {
copy(lookups[i].a[:], index)
index = index[addrSize:]
index = index[hash.ByteLen:]
lookups[i].r.Offset = binary.BigEndian.Uint64(index)
index = index[offsetSize:]
lookups[i].r.Length = binary.BigEndian.Uint32(index)
+2 -2
View File
@@ -190,12 +190,12 @@ func mustPayload(rec indexRec) []byte {
func makeLookups(cnt int) (lookups []lookup) {
lookups = make([]lookup, cnt)
buf := make([]byte, cnt*addrSize)
buf := make([]byte, cnt*hash.ByteLen)
rand.Read(buf)
var off uint64
for i := range lookups {
copy(lookups[i].a[:], buf)
buf = buf[addrSize:]
buf = buf[hash.ByteLen:]
lookups[i].r.Offset = off
l := rand.Uint32() % 1024
lookups[i].r.Length = l
+2 -2
View File
@@ -47,7 +47,7 @@ import (
type journalRec struct {
length uint32
kind journalRecKind
address addr
address hash.Hash
payload []byte
timestamp time.Time
checksum uint32
@@ -150,7 +150,7 @@ func writeChunkRecord(buf []byte, c CompressedChunk) (n uint32) {
return
}
func writeRootHashRecord(buf []byte, root addr) (n uint32) {
func writeRootHashRecord(buf []byte, root hash.Hash) (n uint32) {
// length
l := rootHashRecordSize()
writeUint32(buf[:journalRecLenSz], uint32(l))
+5 -5
View File
@@ -127,11 +127,11 @@ func TestProcessJournalRecords(t *testing.T) {
require.NoError(t, err)
}
func randomMemTable(cnt int) (*memTable, map[addr]chunks.Chunk) {
chnx := make(map[addr]chunks.Chunk, cnt)
func randomMemTable(cnt int) (*memTable, map[hash.Hash]chunks.Chunk) {
chnx := make(map[hash.Hash]chunks.Chunk, cnt)
for i := 0; i < cnt; i++ {
ch := chunks.NewChunk(randBuf(100))
chnx[addr(ch.Hash())] = ch
chnx[ch.Hash()] = ch
}
mt := newMemTable(uint64(cnt) * 256)
for a, ch := range chnx {
@@ -173,7 +173,7 @@ func makeChunkRecord() (journalRec, []byte) {
r := journalRec{
length: uint32(len(buf)),
kind: chunkJournalRecKind,
address: addr(cc.H),
address: cc.H,
payload: payload,
checksum: c,
}
@@ -181,7 +181,7 @@ func makeChunkRecord() (journalRec, []byte) {
}
func makeRootHashRecord() (journalRec, []byte) {
a := addr(hash.Of(randBuf(8)))
a := hash.Of(randBuf(8))
var n int
buf := make([]byte, rootHashRecordSize())
// length
+3 -3
View File
@@ -112,17 +112,17 @@ func TestReadRecordRanges(t *testing.T) {
require.NoError(t, err)
for h, rng := range ranges {
b, err := jcs.get(ctx, addr(h), &Stats{})
b, err := jcs.get(ctx, h, &Stats{})
assert.NoError(t, err)
ch1 := chunks.NewChunkWithHash(h, b)
assert.Equal(t, data[addr(h)], ch1)
assert.Equal(t, data[h], ch1)
start, stop := rng.Offset, uint32(rng.Offset)+rng.Length
cc2, err := NewCompressedChunk(h, buf[start:stop])
assert.NoError(t, err)
ch2, err := cc2.ToChunk()
assert.NoError(t, err)
assert.Equal(t, data[addr(h)], ch2)
assert.Equal(t, data[h], ch2)
}
}
+21 -21
View File
@@ -52,11 +52,11 @@ const (
)
var (
journalAddr = addr(hash.Parse(chunkJournalAddr))
journalAddr = hash.Parse(chunkJournalAddr)
)
func isJournalAddr(a addr) bool {
return a == journalAddr
func isJournalAddr(h hash.Hash) bool {
return h == journalAddr
}
func fileExists(path string) (bool, error) {
@@ -315,7 +315,7 @@ func (wr *journalWriter) corruptIndexRecovery(ctx context.Context) (err error) {
}
// hasAddr returns true if the journal contains a chunk with addr |h|.
func (wr *journalWriter) hasAddr(h addr) (ok bool) {
func (wr *journalWriter) hasAddr(h hash.Hash) (ok bool) {
wr.lock.RLock()
defer wr.lock.RUnlock()
_, ok = wr.ranges.get(h)
@@ -323,7 +323,7 @@ func (wr *journalWriter) hasAddr(h addr) (ok bool) {
}
// getCompressedChunk reads the CompressedChunks with addr |h|.
func (wr *journalWriter) getCompressedChunk(h addr) (CompressedChunk, error) {
func (wr *journalWriter) getCompressedChunk(h hash.Hash) (CompressedChunk, error) {
wr.lock.RLock()
defer wr.lock.RUnlock()
r, ok := wr.ranges.get(h)
@@ -338,7 +338,7 @@ func (wr *journalWriter) getCompressedChunk(h addr) (CompressedChunk, error) {
}
// getRange returns a Range for the chunk with addr |h|.
func (wr *journalWriter) getRange(h addr) (rng Range, ok bool, err error) {
func (wr *journalWriter) getRange(h hash.Hash) (rng Range, ok bool, err error) {
// callers will use |rng| to read directly from the
// journal file, so we must flush here
if err = wr.maybeFlush(); err != nil {
@@ -365,7 +365,7 @@ func (wr *journalWriter) writeCompressedChunk(cc CompressedChunk) error {
}
wr.unsyncd += uint64(recordLen)
_ = writeChunkRecord(buf, cc)
wr.ranges.put(addr(cc.H), rng)
wr.ranges.put(cc.H, rng)
// To fulfill our durability guarantees, we technically only need to
// file.Sync() the journal when we commit a new root chunk. However,
@@ -402,7 +402,7 @@ func (wr *journalWriter) commitRootHashUnlocked(root hash.Hash) error {
return err
}
wr.currentRoot = root
n := writeRootHashRecord(buf, addr(root))
n := writeRootHashRecord(buf, root)
if err = wr.flush(); err != nil {
return err
}
@@ -582,7 +582,7 @@ type rangeIndex struct {
// novel Ranges represent most recent chunks written to
// the journal. These Ranges have not yet been written to
// a journal index record.
novel *swiss.Map[addr, Range]
novel *swiss.Map[hash.Hash, Range]
// cached Ranges are bootstrapped from an out-of-band journal
// index file. To save memory, these Ranges are keyed by a 16-byte
@@ -592,14 +592,14 @@ type rangeIndex struct {
type addr16 [16]byte
func toAddr16(full addr) (prefix addr16) {
func toAddr16(full hash.Hash) (prefix addr16) {
copy(prefix[:], full[:])
return
}
func newRangeIndex() rangeIndex {
return rangeIndex{
novel: swiss.NewMap[addr, Range](journalIndexDefaultMaxNovel),
novel: swiss.NewMap[hash.Hash, Range](journalIndexDefaultMaxNovel),
cached: swiss.NewMap[addr16, Range](0),
}
}
@@ -608,20 +608,20 @@ func estimateRangeCount(info os.FileInfo) uint32 {
return uint32(info.Size()/32) + journalIndexDefaultMaxNovel
}
func (idx rangeIndex) get(a addr) (rng Range, ok bool) {
rng, ok = idx.novel.Get(a)
func (idx rangeIndex) get(h hash.Hash) (rng Range, ok bool) {
rng, ok = idx.novel.Get(h)
if !ok {
rng, ok = idx.cached.Get(toAddr16(a))
rng, ok = idx.cached.Get(toAddr16(h))
}
return
}
func (idx rangeIndex) put(a addr, rng Range) {
idx.novel.Put(a, rng)
func (idx rangeIndex) put(h hash.Hash, rng Range) {
idx.novel.Put(h, rng)
}
func (idx rangeIndex) putCached(a addr, rng Range) {
idx.cached.Put(toAddr16(a), rng)
func (idx rangeIndex) putCached(h hash.Hash, rng Range) {
idx.cached.Put(toAddr16(h), rng)
}
func (idx rangeIndex) count() uint32 {
@@ -634,7 +634,7 @@ func (idx rangeIndex) novelCount() int {
func (idx rangeIndex) novelLookups() (lookups []lookup) {
lookups = make([]lookup, 0, idx.novel.Count())
idx.novel.Iter(func(a addr, r Range) (stop bool) {
idx.novel.Iter(func(a hash.Hash, r Range) (stop bool) {
lookups = append(lookups, lookup{a: a, r: r})
return
})
@@ -642,10 +642,10 @@ func (idx rangeIndex) novelLookups() (lookups []lookup) {
}
func (idx rangeIndex) flatten() rangeIndex {
idx.novel.Iter(func(a addr, r Range) (stop bool) {
idx.novel.Iter(func(a hash.Hash, r Range) (stop bool) {
idx.cached.Put(toAddr16(a), r)
return
})
idx.novel = swiss.NewMap[addr, Range](journalIndexDefaultMaxNovel)
idx.novel = swiss.NewMap[hash.Hash, Range](journalIndexDefaultMaxNovel)
return idx
}
+8 -8
View File
@@ -234,7 +234,7 @@ func TestJournalWriterBootstrap(t *testing.T) {
}
}
func validateAllLookups(t *testing.T, j *journalWriter, data map[addr]CompressedChunk) {
func validateAllLookups(t *testing.T, j *journalWriter, data map[hash.Hash]CompressedChunk) {
// move |data| to addr16-keyed map
prefixMap := make(map[addr16]CompressedChunk, len(data))
var prefix addr16
@@ -249,7 +249,7 @@ func validateAllLookups(t *testing.T, j *journalWriter, data map[addr]Compressed
}
func iterRangeIndex(idx rangeIndex, cb func(addr16, Range) (stop bool)) {
idx.novel.Iter(func(a addr, r Range) (stop bool) {
idx.novel.Iter(func(a hash.Hash, r Range) (stop bool) {
return cb(toAddr16(a), r)
})
idx.cached.Iter(cb)
@@ -285,7 +285,7 @@ func newTestFilePath(t *testing.T) string {
func TestJournalIndexBootstrap(t *testing.T) {
// potentially indexed region of a journal
type epoch struct {
records map[addr]CompressedChunk
records map[hash.Hash]CompressedChunk
last hash.Hash
}
@@ -396,8 +396,8 @@ func TestJournalIndexBootstrap(t *testing.T) {
}
}
func randomCompressedChunks(cnt int) (compressed map[addr]CompressedChunk) {
compressed = make(map[addr]CompressedChunk)
func randomCompressedChunks(cnt int) (compressed map[hash.Hash]CompressedChunk) {
compressed = make(map[hash.Hash]CompressedChunk)
var buf []byte
for i := 0; i < cnt; i++ {
k := rand.Intn(51) + 50
@@ -407,7 +407,7 @@ func randomCompressedChunks(cnt int) (compressed map[addr]CompressedChunk) {
}
c := chunks.NewChunk(buf[:k])
buf = buf[k:]
compressed[addr(c.Hash())] = ChunkToCompressedChunk(c)
compressed[c.Hash()] = ChunkToCompressedChunk(c)
}
return
}
@@ -427,10 +427,10 @@ func TestRangeIndex(t *testing.T) {
data := randomCompressedChunks(1024)
idx := newRangeIndex()
for _, c := range data {
idx.put(addr(c.Hash()), Range{})
idx.put(c.Hash(), Range{})
}
for _, c := range data {
_, ok := idx.get(addr(c.Hash()))
_, ok := idx.get(c.Hash())
assert.True(t, ok)
}
assert.Equal(t, len(data), idx.novelCount())
+20 -20
View File
@@ -25,6 +25,7 @@ import (
"context"
"crypto/sha512"
"errors"
"fmt"
"strconv"
"sync"
"time"
@@ -74,7 +75,7 @@ type manifestUpdater interface {
// If writeHook is non-nil, it will be invoked while the implementation is
// guaranteeing exclusive access to the manifest. This allows for testing
// of race conditions.
Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error)
Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error)
}
type manifestGCGenUpdater interface {
@@ -87,7 +88,7 @@ type manifestGCGenUpdater interface {
// If writeHook is non-nil, it will be invoked while the implementation is
// guaranteeing exclusive access to the manifest. This allows for testing
// of race conditions.
UpdateGCGen(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error)
UpdateGCGen(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error)
}
// ManifestInfo is an interface for retrieving data from a manifest outside of this package
@@ -113,9 +114,9 @@ const (
type manifestContents struct {
manifestVers string
nbfVers string
lock addr
lock hash.Hash
root hash.Hash
gcGen addr
gcGen hash.Hash
specs []tableSpec
// An appendix is a list of |tableSpecs| that track an auxillary collection of
@@ -192,16 +193,16 @@ func (mc manifestContents) removeAppendixSpecs() (manifestContents, []tableSpec)
}, removed
}
func (mc manifestContents) getSpecSet() (ss map[addr]struct{}) {
func (mc manifestContents) getSpecSet() (ss map[hash.Hash]struct{}) {
return toSpecSet(mc.specs)
}
func (mc manifestContents) getAppendixSet() (ss map[addr]struct{}) {
func (mc manifestContents) getAppendixSet() (ss map[hash.Hash]struct{}) {
return toSpecSet(mc.appendix)
}
func toSpecSet(specs []tableSpec) (ss map[addr]struct{}) {
ss = make(map[addr]struct{}, len(specs))
func toSpecSet(specs []tableSpec) (ss map[hash.Hash]struct{}) {
ss = make(map[hash.Hash]struct{}, len(specs))
for _, ts := range specs {
ss[ts.name] = struct{}{}
}
@@ -209,7 +210,7 @@ func toSpecSet(specs []tableSpec) (ss map[addr]struct{}) {
}
func (mc manifestContents) size() (size uint64) {
size += uint64(len(mc.nbfVers)) + addrSize + hash.ByteLen
size += uint64(len(mc.nbfVers)) + hash.ByteLen + hash.ByteLen
for _, sp := range mc.specs {
size += uint64(len(sp.name)) + uint32Size // for sp.chunkCount
}
@@ -292,7 +293,7 @@ func (mm manifestManager) UnlockForUpdate() error {
return mm.locks.unlockForUpdate(mm.Name())
}
func (mm manifestManager) updateWillFail(lastLock addr) (cached manifestContents, doomed bool) {
func (mm manifestManager) updateWillFail(lastLock hash.Hash) (cached manifestContents, doomed bool) {
if upstream, _, hit := mm.cache.Get(mm.Name()); hit {
if lastLock != upstream.lock {
doomed, cached = true, upstream
@@ -346,7 +347,7 @@ func (mm manifestManager) Fetch(ctx context.Context, stats *Stats) (exists bool,
// Callers MUST protect uses of Update with Lock/UnlockForUpdate.
// Update does not call Lock/UnlockForUpdate() on its own because it is
// intended to be used in a larger critical section along with updateWillFail.
func (mm manifestManager) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (contents manifestContents, err error) {
func (mm manifestManager) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (contents manifestContents, err error) {
if upstream, _, hit := mm.cache.Get(mm.Name()); hit {
if lastLock != upstream.lock {
return upstream, nil
@@ -385,7 +386,7 @@ func (mm manifestManager) Update(ctx context.Context, lastLock addr, newContents
// UpdateGCGen will update the manifest with a new garbage collection generation.
// Callers MUST protect uses of UpdateGCGen with Lock/UnlockForUpdate.
func (mm manifestManager) UpdateGCGen(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (contents manifestContents, err error) {
func (mm manifestManager) UpdateGCGen(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (contents manifestContents, err error) {
updater, ok := mm.m.(manifestGCGenUpdater)
if !ok {
return manifestContents{}, errors.New("manifest does not support updating gc gen")
@@ -443,7 +444,7 @@ type TableSpecInfo interface {
}
type tableSpec struct {
name addr
name hash.Hash
chunkCount uint32
}
@@ -468,10 +469,10 @@ func parseSpecs(tableInfo []string) ([]tableSpec, error) {
specs := make([]tableSpec, len(tableInfo)/2)
for i := range specs {
var err error
specs[i].name, err = parseAddr(tableInfo[2*i])
if err != nil {
return nil, err
var ok bool
specs[i].name, ok = hash.MaybeParse(tableInfo[2*i])
if !ok {
return nil, fmt.Errorf("invalid table file name: %s", tableInfo[2*i])
}
c, err := strconv.ParseUint(tableInfo[2*i+1], 10, 32)
@@ -500,7 +501,7 @@ func formatSpecs(specs []tableSpec, tableInfo []string) {
// persisted manifest against the lock hash it saw last time it loaded the
// contents of a manifest. If they do not match, the client must not update
// the persisted manifest.
func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec) (lock addr) {
func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec) hash.Hash {
blockHash := sha512.New()
blockHash.Write(root[:])
for _, spec := range appendix {
@@ -512,6 +513,5 @@ func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec) (
}
var h []byte
h = blockHash.Sum(h) // Appends hash to h
copy(lock[:], h)
return
return hash.New(h[:hash.ByteLen])
}
+13 -13
View File
@@ -54,7 +54,7 @@ func WriteChunks(chunks []chunks.Chunk) (string, []byte, error) {
func writeChunksToMT(mt *memTable, chunks []chunks.Chunk) (string, []byte, error) {
for _, chunk := range chunks {
res := mt.addChunk(addr(chunk.Hash()), chunk.Data())
res := mt.addChunk(chunk.Hash(), chunk.Data())
if res == chunkNotAdded {
return "", nil, errors.New("didn't create this memory table with enough space to add all the chunks")
}
@@ -75,7 +75,7 @@ func writeChunksToMT(mt *memTable, chunks []chunks.Chunk) (string, []byte, error
}
type memTable struct {
chunks map[addr][]byte
chunks map[hash.Hash][]byte
order []hasRecord // Must maintain the invariant that these are sorted by rec.order
pendingRefs []hasRecord
maxData, totalData uint64
@@ -84,10 +84,10 @@ type memTable struct {
}
func newMemTable(memTableSize uint64) *memTable {
return &memTable{chunks: map[addr][]byte{}, maxData: memTableSize}
return &memTable{chunks: map[hash.Hash][]byte{}, maxData: memTableSize}
}
func (mt *memTable) addChunk(h addr, data []byte) addChunkResult {
func (mt *memTable) addChunk(h hash.Hash, data []byte) addChunkResult {
if len(data) == 0 {
panic("NBS blocks cannot be zero length")
}
@@ -113,10 +113,10 @@ func (mt *memTable) addChunk(h addr, data []byte) addChunkResult {
func (mt *memTable) addChildRefs(addrs hash.HashSet) {
for h := range addrs {
a := addr(h)
h := h
mt.pendingRefs = append(mt.pendingRefs, hasRecord{
a: &a,
prefix: a.Prefix(),
a: &h,
prefix: h.Prefix(),
order: len(mt.pendingRefs),
})
}
@@ -130,7 +130,7 @@ func (mt *memTable) uncompressedLen() (uint64, error) {
return mt.totalData, nil
}
func (mt *memTable) has(h addr) (bool, error) {
func (mt *memTable) has(h hash.Hash) (bool, error) {
_, has := mt.chunks[h]
return has, nil
}
@@ -157,7 +157,7 @@ func (mt *memTable) hasMany(addrs []hasRecord) (bool, error) {
return remaining, nil
}
func (mt *memTable) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) {
func (mt *memTable) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) {
return mt.chunks[h], nil
}
@@ -200,10 +200,10 @@ func (mt *memTable) extract(ctx context.Context, chunks chan<- extractRecord) er
return nil
}
func (mt *memTable) write(haver chunkReader, stats *Stats) (name addr, data []byte, count uint32, err error) {
func (mt *memTable) write(haver chunkReader, stats *Stats) (name hash.Hash, data []byte, count uint32, err error) {
numChunks := uint64(len(mt.order))
if numChunks == 0 {
return addr{}, nil, 0, fmt.Errorf("mem table cannot write with zero chunks")
return hash.Hash{}, nil, 0, fmt.Errorf("mem table cannot write with zero chunks")
}
maxSize := maxTableSize(uint64(len(mt.order)), mt.totalData)
// todo: memory quota
@@ -215,7 +215,7 @@ func (mt *memTable) write(haver chunkReader, stats *Stats) (name addr, data []by
_, err := haver.hasMany(mt.order)
if err != nil {
return addr{}, nil, 0, err
return hash.Hash{}, nil, 0, err
}
sort.Sort(hasRecordByOrder(mt.order)) // restore "insertion" order for write
@@ -231,7 +231,7 @@ func (mt *memTable) write(haver chunkReader, stats *Stats) (name addr, data []by
tableSize, name, err := tw.finish()
if err != nil {
return addr{}, nil, 0, err
return hash.Hash{}, nil, 0, err
}
if count > 0 {
+3 -2
View File
@@ -35,6 +35,7 @@ import (
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/d"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/types"
)
@@ -243,7 +244,7 @@ func (o *outOfLineSnappy) Encode(dst, src []byte) []byte {
type chunkReaderGroup []chunkReader
func (crg chunkReaderGroup) has(h addr) (bool, error) {
func (crg chunkReaderGroup) has(h hash.Hash) (bool, error) {
for _, haver := range crg {
ok, err := haver.has(h)
@@ -258,7 +259,7 @@ func (crg chunkReaderGroup) has(h addr) (bool, error) {
return false, nil
}
func (crg chunkReaderGroup) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) {
func (crg chunkReaderGroup) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) {
for _, haver := range crg {
if data, err := haver.get(ctx, h, stats); err != nil {
return nil, err
+3 -6
View File
@@ -14,7 +14,9 @@
package nbs
import "github.com/dolthub/dolt/go/store/d"
import (
"github.com/dolthub/dolt/go/store/d"
)
func mustUint32(val uint32, err error) uint32 {
d.PanicIfError(err)
@@ -25,8 +27,3 @@ func mustUint64(val uint64, err error) uint64 {
d.PanicIfError(err)
return val
}
func mustAddr(h addr, err error) addr {
d.PanicIfError(err)
return h
}
+4 -3
View File
@@ -26,6 +26,7 @@ import (
"github.com/dolthub/dolt/go/store/blobstore"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)
type noConjoinBlobstorePersister struct {
@@ -68,15 +69,15 @@ func (bsp *noConjoinBlobstorePersister) ConjoinAll(ctx context.Context, sources
}
// Open a table named |name|, containing |chunkCount| chunks.
func (bsp *noConjoinBlobstorePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
func (bsp *noConjoinBlobstorePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
return newBSChunkSource(ctx, bsp.bs, name, chunkCount, bsp.q, stats)
}
func (bsp *noConjoinBlobstorePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
func (bsp *noConjoinBlobstorePersister) Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) {
return bsp.bs.Exists(ctx, name.String())
}
func (bsp *noConjoinBlobstorePersister) PruneTableFiles(ctx context.Context, keeper func() []addr, t time.Time) error {
func (bsp *noConjoinBlobstorePersister) PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, t time.Time) error {
return nil
}
+19 -19
View File
@@ -446,7 +446,7 @@ func (fm *fakeManifest) Name() string { return fm.name }
func (fm *fakeManifest) ParseIfExists(ctx context.Context, stats *Stats, readHook func() error) (bool, manifestContents, error) {
fm.mu.RLock()
defer fm.mu.RUnlock()
if fm.contents.lock != (addr{}) {
if !fm.contents.lock.IsEmpty() {
return true, fm.contents, nil
}
@@ -458,7 +458,7 @@ func (fm *fakeManifest) ParseIfExists(ctx context.Context, stats *Stats, readHoo
// to |newLock|, |fm.root| is set to |newRoot|, and the contents of |specs|
// replace |fm.tableSpecs|. If |lastLock| != |fm.lock|, then the update
// fails. Regardless of success or failure, the current state is returned.
func (fm *fakeManifest) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
func (fm *fakeManifest) Update(ctx context.Context, lastLock hash.Hash, newContents manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
fm.mu.Lock()
defer fm.mu.Unlock()
if fm.contents.lock == lastLock {
@@ -467,7 +467,7 @@ func (fm *fakeManifest) Update(ctx context.Context, lastLock addr, newContents m
nbfVers: newContents.nbfVers,
lock: newContents.lock,
root: newContents.root,
gcGen: addr(hash.Hash{}),
gcGen: hash.Hash{},
}
fm.contents.specs = make([]tableSpec, len(newContents.specs))
copy(fm.contents.specs, newContents.specs)
@@ -479,13 +479,13 @@ func (fm *fakeManifest) Update(ctx context.Context, lastLock addr, newContents m
return fm.contents, nil
}
func (fm *fakeManifest) set(version string, lock addr, root hash.Hash, specs, appendix []tableSpec) {
func (fm *fakeManifest) set(version string, lock hash.Hash, root hash.Hash, specs, appendix []tableSpec) {
fm.contents = manifestContents{
manifestVers: StorageVersion,
nbfVers: version,
lock: lock,
root: root,
gcGen: addr(hash.Hash{}),
gcGen: hash.Hash{},
specs: specs,
appendix: appendix,
}
@@ -496,14 +496,14 @@ func newFakeTableSet(q MemoryQuotaProvider) tableSet {
}
func newFakeTablePersister(q MemoryQuotaProvider) fakeTablePersister {
return fakeTablePersister{q, map[addr][]byte{}, map[addr]bool{}, map[addr]bool{}, &sync.RWMutex{}}
return fakeTablePersister{q, map[hash.Hash][]byte{}, map[hash.Hash]bool{}, map[hash.Hash]bool{}, &sync.RWMutex{}}
}
type fakeTablePersister struct {
q MemoryQuotaProvider
sources map[addr][]byte
sourcesToFail map[addr]bool
opened map[addr]bool
sources map[hash.Hash][]byte
sourcesToFail map[hash.Hash]bool
opened map[hash.Hash]bool
mu *sync.RWMutex
}
@@ -561,7 +561,7 @@ func (ftp fakeTablePersister) ConjoinAll(ctx context.Context, sources chunkSourc
return chunkSourceAdapter{cs, name}, func() {}, nil
}
func compactSourcesToBuffer(sources chunkSources) (name addr, data []byte, chunkCount uint32, err error) {
func compactSourcesToBuffer(sources chunkSources) (name hash.Hash, data []byte, chunkCount uint32, err error) {
totalData := uint64(0)
for _, src := range sources {
chunkCount += mustUint32(src.count())
@@ -599,19 +599,19 @@ func compactSourcesToBuffer(sources chunkSources) (name addr, data []byte, chunk
}
if errString != "" {
return addr{}, nil, 0, fmt.Errorf(errString)
return hash.Hash{}, nil, 0, fmt.Errorf(errString)
}
tableSize, name, err := tw.finish()
if err != nil {
return addr{}, nil, 0, err
return hash.Hash{}, nil, 0, err
}
return name, buff[:tableSize], chunkCount, nil
}
func (ftp fakeTablePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
func (ftp fakeTablePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
ftp.mu.Lock()
defer ftp.mu.Unlock()
@@ -633,14 +633,14 @@ func (ftp fakeTablePersister) Open(ctx context.Context, name addr, chunkCount ui
return chunkSourceAdapter{cs, name}, nil
}
func (ftp fakeTablePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
func (ftp fakeTablePersister) Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error) {
if _, ok := ftp.sourcesToFail[name]; ok {
return false, errors.New("intentional failure")
}
return true, nil
}
func (ftp fakeTablePersister) PruneTableFiles(_ context.Context, _ func() []addr, _ time.Time) error {
func (ftp fakeTablePersister) PruneTableFiles(_ context.Context, _ func() []hash.Hash, _ time.Time) error {
return chunks.ErrUnsupportedOperation
}
@@ -658,18 +658,18 @@ func extractAllChunks(ctx context.Context, src chunkSource, cb func(rec extractR
return err
}
var a addr
for i := uint32(0); i < index.chunkCount(); i++ {
_, err = index.indexEntry(i, &a)
var h hash.Hash
_, err = index.indexEntry(i, &h)
if err != nil {
return err
}
data, err := src.get(ctx, a, nil)
data, err := src.get(ctx, h, nil)
if err != nil {
return err
}
cb(extractRecord{a: a, data: data})
cb(extractRecord{a: h, data: data})
}
return
}
+2 -2
View File
@@ -77,7 +77,7 @@ type fakeS3Multipart struct {
etags []string
}
func (m *fakeS3) readerForTable(ctx context.Context, name addr) (chunkReader, error) {
func (m *fakeS3) readerForTable(ctx context.Context, name hash.Hash) (chunkReader, error) {
m.mu.Lock()
defer m.mu.Unlock()
if buff, present := m.data[name.String()]; present {
@@ -95,7 +95,7 @@ func (m *fakeS3) readerForTable(ctx context.Context, name addr) (chunkReader, er
return nil, nil
}
func (m *fakeS3) readerForTableWithNamespace(ctx context.Context, ns string, name addr) (chunkReader, error) {
func (m *fakeS3) readerForTableWithNamespace(ctx context.Context, ns string, name hash.Hash) (chunkReader, error) {
m.mu.Lock()
defer m.mu.Unlock()
key := name.String()
+8 -6
View File
@@ -38,6 +38,8 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/jpillora/backoff"
"golang.org/x/sync/errgroup"
"github.com/dolthub/dolt/go/store/hash"
)
const (
@@ -47,7 +49,7 @@ const (
type s3TableReaderAt struct {
s3 *s3ObjectReader
h addr
h hash.Hash
}
func (s3tra *s3TableReaderAt) Close() error {
@@ -81,11 +83,11 @@ func (s3or *s3ObjectReader) key(k string) string {
return k
}
func (s3or *s3ObjectReader) Reader(ctx context.Context, name addr) (io.ReadCloser, error) {
func (s3or *s3ObjectReader) Reader(ctx context.Context, name hash.Hash) (io.ReadCloser, error) {
return s3or.reader(ctx, name)
}
func (s3or *s3ObjectReader) ReadAt(ctx context.Context, name addr, p []byte, off int64, stats *Stats) (n int, err error) {
func (s3or *s3ObjectReader) ReadAt(ctx context.Context, name hash.Hash, p []byte, off int64, stats *Stats) (n int, err error) {
t1 := time.Now()
defer func() {
@@ -105,7 +107,7 @@ func s3RangeHeader(off, length int64) string {
const maxS3ReadFromEndReqSize = 256 * 1024 * 1024 // 256MB
const preferredS3ReadFromEndReqSize = 128 * 1024 * 1024 // 128MB
func (s3or *s3ObjectReader) ReadFromEnd(ctx context.Context, name addr, p []byte, stats *Stats) (n int, sz uint64, err error) {
func (s3or *s3ObjectReader) ReadFromEnd(ctx context.Context, name hash.Hash, p []byte, stats *Stats) (n int, sz uint64, err error) {
defer func(t1 time.Time) {
stats.S3BytesPerRead.Sample(uint64(len(p)))
stats.S3ReadLatency.SampleTimeSince(t1)
@@ -149,7 +151,7 @@ func (s3or *s3ObjectReader) ReadFromEnd(ctx context.Context, name addr, p []byte
return s3or.readRange(ctx, name, p, fmt.Sprintf("%s=-%d", s3RangePrefix, len(p)))
}
func (s3or *s3ObjectReader) reader(ctx context.Context, name addr) (io.ReadCloser, error) {
func (s3or *s3ObjectReader) reader(ctx context.Context, name hash.Hash) (io.ReadCloser, error) {
input := &s3.GetObjectInput{
Bucket: aws.String(s3or.bucket),
Key: aws.String(s3or.key(name.String())),
@@ -161,7 +163,7 @@ func (s3or *s3ObjectReader) reader(ctx context.Context, name addr) (io.ReadClose
return result.Body, nil
}
func (s3or *s3ObjectReader) readRange(ctx context.Context, name addr, p []byte, rangeHeader string) (n int, sz uint64, err error) {
func (s3or *s3ObjectReader) readRange(ctx context.Context, name hash.Hash, p []byte, rangeHeader string) (n int, sz uint64, err error) {
read := func() (int, uint64, error) {
if s3or.readRl != nil {
s3or.readRl <- struct{}{}
+33 -42
View File
@@ -105,7 +105,7 @@ type NomsBlockStore struct {
mtSize uint64
putCount uint64
hasCache *lru.TwoQueueCache[addr, struct{}]
hasCache *lru.TwoQueueCache[hash.Hash, struct{}]
stats *Stats
}
@@ -254,11 +254,9 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
var addCount int
for h, count := range updates {
a := addr(h)
if _, ok := currSpecs[a]; !ok {
if _, ok := currSpecs[h]; !ok {
addCount++
contents.specs = append(contents.specs, tableSpec{a, count})
contents.specs = append(contents.specs, tableSpec{h, count})
}
}
@@ -346,14 +344,12 @@ func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updat
appendixSpecs := make([]tableSpec, 0)
var addCount int
for h, count := range updates {
a := addr(h)
if option == ManifestAppendixOption_Set {
appendixSpecs = append(appendixSpecs, tableSpec{a, count})
appendixSpecs = append(appendixSpecs, tableSpec{h, count})
} else {
if _, ok := currAppendixSpecs[a]; !ok {
if _, ok := currAppendixSpecs[h]; !ok {
addCount++
appendixSpecs = append(appendixSpecs, tableSpec{a, count})
appendixSpecs = append(appendixSpecs, tableSpec{h, count})
}
}
}
@@ -399,13 +395,12 @@ func (nbs *NomsBlockStore) checkAllManifestUpdatesExist(ctx context.Context, upd
h := h
c := c
eg.Go(func() error {
a := addr(h)
ok, err := nbs.p.Exists(ctx, a, c, nbs.stats)
ok, err := nbs.p.Exists(ctx, h, c, nbs.stats)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("missing table file referenced in UpdateManifest call: %v", a)
return fmt.Errorf("missing table file referenced in UpdateManifest call: %v", h)
}
return nil
})
@@ -475,12 +470,12 @@ func OverwriteStoreManifest(ctx context.Context, store *NomsBlockStore, root has
}
// Appendix table files should come first in specs
for h, c := range appendixTableFiles {
s := tableSpec{name: addr(h), chunkCount: c}
s := tableSpec{name: h, chunkCount: c}
contents.appendix = append(contents.appendix, s)
contents.specs = append(contents.specs, s)
}
for h, c := range tableFiles {
s := tableSpec{name: addr(h), chunkCount: c}
s := tableSpec{name: h, chunkCount: c}
contents.specs = append(contents.specs, s)
}
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
@@ -640,7 +635,7 @@ func newNomsBlockStore(ctx context.Context, nbfVerStr string, mm manifestManager
memTableSize = defaultMemTableSize
}
hasCache, err := lru.New2Q[addr, struct{}](hasCacheSize)
hasCache, err := lru.New2Q[hash.Hash, struct{}](hasCacheSize)
if err != nil {
return nil, err
}
@@ -790,9 +785,8 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs
if nbs.mt == nil {
nbs.mt = newMemTable(nbs.mtSize)
}
a := addr(ch.Hash())
addChunkRes = nbs.mt.addChunk(a, ch.Data())
addChunkRes = nbs.mt.addChunk(ch.Hash(), ch.Data())
if addChunkRes == chunkNotAdded {
ts, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.hasCache, nbs.stats)
if err != nil {
@@ -802,7 +796,7 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs
nbs.addPendingRefsToHasCache()
nbs.tables = ts
nbs.mt = newMemTable(nbs.mtSize)
addChunkRes = nbs.mt.addChunk(a, ch.Data())
addChunkRes = nbs.mt.addChunk(ch.Hash(), ch.Data())
}
if addChunkRes == chunkAdded || addChunkRes == chunkExists {
if nbs.keeperFunc != nil && nbs.keeperFunc(ch.Hash()) {
@@ -826,18 +820,17 @@ type refCheck func(reqs []hasRecord) (hash.HashSet, error)
func (nbs *NomsBlockStore) errorIfDangling(root hash.Hash, checker refCheck) error {
if !root.IsEmpty() {
a := addr(root)
if _, ok := nbs.hasCache.Get(a); !ok {
if _, ok := nbs.hasCache.Get(root); !ok {
var hr [1]hasRecord
hr[0].a = &a
hr[0].prefix = a.Prefix()
hr[0].a = &root
hr[0].prefix = root.Prefix()
absent, err := checker(hr[:])
if err != nil {
return err
} else if absent.Size() > 0 {
return fmt.Errorf("%w: found dangling references to %s", ErrDanglingRef, absent.String())
}
nbs.hasCache.Add(a, struct{}{})
nbs.hasCache.Add(root, struct{}{})
}
}
return nil
@@ -853,14 +846,13 @@ func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk,
nbs.stats.ChunksPerGet.Sample(1)
}()
a := addr(h)
data, tables, err := func() ([]byte, chunkReader, error) {
var data []byte
nbs.mu.RLock()
defer nbs.mu.RUnlock()
if nbs.mt != nil {
var err error
data, err = nbs.mt.get(ctx, a, nbs.stats)
data, err = nbs.mt.get(ctx, h, nbs.stats)
if err != nil {
return nil, nil, err
@@ -877,7 +869,7 @@ func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk,
return chunks.NewChunkWithHash(h, data), nil
}
data, err = tables.get(ctx, a, nbs.stats)
data, err = tables.get(ctx, h, nbs.stats)
if err != nil {
return chunks.EmptyChunk, err
@@ -954,10 +946,10 @@ func toGetRecords(hashes hash.HashSet) []getRecord {
reqs := make([]getRecord, len(hashes))
idx := 0
for h := range hashes {
a := addr(h)
h := h
reqs[idx] = getRecord{
a: &a,
prefix: a.Prefix(),
a: &h,
prefix: h.Prefix(),
}
idx++
}
@@ -1001,13 +993,12 @@ func (nbs *NomsBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) {
nbs.stats.AddressesPerHas.Sample(1)
}()
a := addr(h)
has, tables, err := func() (bool, chunkReader, error) {
nbs.mu.RLock()
defer nbs.mu.RUnlock()
if nbs.mt != nil {
has, err := nbs.mt.has(a)
has, err := nbs.mt.has(h)
if err != nil {
return false, nil, err
@@ -1024,7 +1015,7 @@ func (nbs *NomsBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) {
}
if !has {
has, err = tables.has(a)
has, err = tables.has(h)
if err != nil {
return false, err
@@ -1079,7 +1070,7 @@ func (nbs *NomsBlockStore) hasMany(reqs []hasRecord) (hash.HashSet, error) {
absent := hash.HashSet{}
for _, r := range reqs {
if !r.has {
absent.Insert(hash.New(r.a[:]))
absent.Insert(*r.a)
}
}
return absent, nil
@@ -1089,10 +1080,10 @@ func toHasRecords(hashes hash.HashSet) []hasRecord {
reqs := make([]hasRecord, len(hashes))
idx := 0
for h := range hashes {
a := addr(h)
h := h
reqs[idx] = hasRecord{
a: &a,
prefix: a.Prefix(),
a: &h,
prefix: h.Prefix(),
order: idx,
}
idx++
@@ -1417,7 +1408,7 @@ func (nbs *NomsBlockStore) Sources(ctx context.Context) (hash.Hash, []chunks.Tab
return contents.GetRoot(), allTableFiles, appendixTableFiles, nil
}
func getTableFiles(css map[addr]chunkSource, contents manifestContents, numSpecs int, specFunc func(mc manifestContents, idx int) tableSpec) ([]chunks.TableFile, error) {
func getTableFiles(css map[hash.Hash]chunkSource, contents manifestContents, numSpecs int, specFunc func(mc manifestContents, idx int) tableSpec) ([]chunks.TableFile, error) {
tableFiles := make([]chunks.TableFile, 0)
if numSpecs == 0 {
return tableFiles, nil
@@ -1461,8 +1452,8 @@ func (nbs *NomsBlockStore) Size(ctx context.Context) (uint64, error) {
return size, nil
}
func (nbs *NomsBlockStore) chunkSourcesByAddr() (map[addr]chunkSource, error) {
css := make(map[addr]chunkSource, len(nbs.tables.upstream)+len(nbs.tables.novel))
func (nbs *NomsBlockStore) chunkSourcesByAddr() (map[hash.Hash]chunkSource, error) {
css := make(map[hash.Hash]chunkSource, len(nbs.tables.upstream)+len(nbs.tables.novel))
for _, cs := range nbs.tables.upstream {
css[cs.hash()] = cs
}
@@ -1543,10 +1534,10 @@ func (nbs *NomsBlockStore) PruneTableFiles(ctx context.Context) (err error) {
func (nbs *NomsBlockStore) pruneTableFiles(ctx context.Context, checker refCheck) (err error) {
mtime := time.Now()
return nbs.p.PruneTableFiles(ctx, func() []addr {
return nbs.p.PruneTableFiles(ctx, func() []hash.Hash {
nbs.mu.Lock()
defer nbs.mu.Unlock()
keepers := make([]addr, 0, len(nbs.tables.novel)+len(nbs.tables.upstream))
keepers := make([]hash.Hash, 0, len(nbs.tables.novel)+len(nbs.tables.upstream))
for a, _ := range nbs.tables.novel {
keepers = append(keepers, a)
}
+1 -1
View File
@@ -49,7 +49,7 @@ func makeTestLocalStore(t *testing.T, maxTableFiles int) (st *NomsBlockStore, no
// create a v5 manifest
fm, err := getFileManifest(ctx, nomsDir, asyncFlush)
require.NoError(t, err)
_, err = fm.Update(ctx, addr{}, manifestContents{}, &Stats{}, nil)
_, err = fm.Update(ctx, hash.Hash{}, manifestContents{}, &Stats{}, nil)
require.NoError(t, err)
q = NewUnlimitedMemQuotaProvider()
+11 -52
View File
@@ -22,11 +22,8 @@
package nbs
import (
"bytes"
"context"
"crypto/sha512"
"encoding/base32"
"encoding/binary"
"hash/crc32"
"io"
@@ -123,9 +120,6 @@ import (
*/
const (
addrSize = 20
addrPrefixSize = 8
addrSuffixSize = addrSize - addrPrefixSize
uint64Size = 8
uint32Size = 4
ordinalSize = uint32Size
@@ -134,7 +128,7 @@ const (
magicNumber = "\xff\xb5\xd8\xc2\x24\x63\xee\x50"
magicNumberSize = 8 //len(magicNumber)
footerSize = uint32Size + uint64Size + magicNumberSize
prefixTupleSize = addrPrefixSize + ordinalSize
prefixTupleSize = hash.PrefixLen + ordinalSize
checksumSize = uint32Size
maxChunkSize = 0xffffffff // Snappy won't compress slices bigger than this
)
@@ -145,50 +139,15 @@ func crc(b []byte) uint32 {
return crc32.Update(0, crcTable, b)
}
func computeAddrDefault(data []byte) addr {
func computeHashDefault(data []byte) hash.Hash {
r := sha512.Sum512(data)
h := addr{}
copy(h[:], r[:addrSize])
return h
return hash.New(r[:hash.ByteLen])
}
var computeAddr = computeAddrDefault
type addr [addrSize]byte
var encoding = base32.NewEncoding("0123456789abcdefghijklmnopqrstuv")
func (a addr) String() string {
return encoding.EncodeToString(a[:])
}
func (a addr) Prefix() uint64 {
return binary.BigEndian.Uint64(a[:])
}
func (a addr) Suffix() []byte {
return a[addrPrefixSize:]
}
func parseAddr(str string) (addr, error) {
var h addr
_, err := encoding.Decode(h[:], []byte(str))
return h, err
}
func ValidateAddr(s string) bool {
_, err := encoding.DecodeString(s)
return err == nil
}
type addrSlice []addr
func (hs addrSlice) Len() int { return len(hs) }
func (hs addrSlice) Less(i, j int) bool { return bytes.Compare(hs[i][:], hs[j][:]) < 0 }
func (hs addrSlice) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] }
var computeAddr = computeHashDefault
type hasRecord struct {
a *addr
a *hash.Hash
prefix uint64
order int
has bool
@@ -207,7 +166,7 @@ func (hs hasRecordByOrder) Less(i, j int) bool { return hs[i].order < hs[j].orde
func (hs hasRecordByOrder) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] }
type getRecord struct {
a *addr
a *hash.Hash
prefix uint64
found bool
}
@@ -219,21 +178,21 @@ func (hs getRecordByPrefix) Less(i, j int) bool { return hs[i].prefix < hs[j].pr
func (hs getRecordByPrefix) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] }
type extractRecord struct {
a addr
a hash.Hash
data []byte
err error
}
type chunkReader interface {
// has returns true if a chunk with addr |h| is present.
has(h addr) (bool, error)
has(h hash.Hash) (bool, error)
// hasMany sets hasRecord.has to true for each present hasRecord query, it returns
// true if any hasRecord query was not found in this chunkReader.
hasMany(addrs []hasRecord) (bool, error)
// get returns the chunk data for a chunk with addr |h| if present, and nil otherwise.
get(ctx context.Context, h addr, stats *Stats) ([]byte, error)
get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error)
// getMany sets getRecord.found to true, and calls |found| for each present getRecord query.
// It returns true if any getRecord query was not found in this chunkReader.
@@ -257,7 +216,7 @@ type chunkSource interface {
chunkReader
// hash returns the hash address of this chunkSource.
hash() addr
hash() hash.Hash
// opens a Reader to the first byte of the chunkData segment of this table.
reader(context.Context) (io.ReadCloser, uint64, error)
@@ -281,7 +240,7 @@ type chunkSource interface {
type chunkSources []chunkSource
type chunkSourceSet map[addr]chunkSource
type chunkSourceSet map[hash.Hash]chunkSource
func copyChunkSourceSet(s chunkSourceSet) (cp chunkSourceSet) {
cp = make(chunkSourceSet, len(s))
+38 -37
View File
@@ -44,16 +44,16 @@ type tableIndex interface {
// entrySuffixMatches returns true if the entry at index |idx| matches
// the suffix of the address |h|. Used by |lookup| after finding
// matching indexes based on |Prefixes|.
entrySuffixMatches(idx uint32, h *addr) (bool, error)
entrySuffixMatches(idx uint32, h *hash.Hash) (bool, error)
// indexEntry returns the |indexEntry| at |idx|. Optionally puts the
// full address of that entry in |a| if |a| is not |nil|.
indexEntry(idx uint32, a *addr) (indexEntry, error)
indexEntry(idx uint32, a *hash.Hash) (indexEntry, error)
// lookup returns an |indexEntry| for the chunk corresponding to the
// provided address |h|. Second returns is |true| if an entry exists
// and |false| otherwise.
lookup(h *addr) (indexEntry, bool, error)
lookup(h *hash.Hash) (indexEntry, bool, error)
// Ordinals returns a slice of indexes which maps the |i|th chunk in
// the indexed file to its corresponding entry in index. The |i|th
@@ -192,11 +192,11 @@ func readTableIndexByCopy(ctx context.Context, rd io.ReadSeeker, q MemoryQuotaPr
func hashSetFromTableIndex(idx tableIndex) (hash.HashSet, error) {
set := hash.NewHashSet()
for i := uint32(0); i < idx.chunkCount(); i++ {
var a addr
if _, err := idx.indexEntry(i, &a); err != nil {
var h hash.Hash
if _, err := idx.indexEntry(i, &h); err != nil {
return nil, err
}
set.Insert(hash.Hash(a))
set.Insert(h)
}
return set, nil
}
@@ -289,22 +289,22 @@ func newOnHeapTableIndex(indexBuff []byte, offsetsBuff1 []byte, count uint32, to
}, nil
}
func (ti onHeapTableIndex) entrySuffixMatches(idx uint32, h *addr) (bool, error) {
func (ti onHeapTableIndex) entrySuffixMatches(idx uint32, h *hash.Hash) (bool, error) {
ord := ti.ordinalAt(idx)
o := ord * addrSuffixSize
b := ti.suffixes[o : o+addrSuffixSize]
return bytes.Equal(h[addrPrefixSize:], b), nil
o := ord * hash.SuffixLen
b := ti.suffixes[o : o+hash.SuffixLen]
return bytes.Equal(h[hash.PrefixLen:], b), nil
}
func (ti onHeapTableIndex) indexEntry(idx uint32, a *addr) (entry indexEntry, err error) {
func (ti onHeapTableIndex) indexEntry(idx uint32, a *hash.Hash) (entry indexEntry, err error) {
prefix, ord := ti.tupleAt(idx)
if a != nil {
binary.BigEndian.PutUint64(a[:], prefix)
o := int64(addrSuffixSize * ord)
b := ti.suffixes[o : o+addrSuffixSize]
copy(a[addrPrefixSize:], b)
o := int64(hash.SuffixLen * ord)
b := ti.suffixes[o : o+hash.SuffixLen]
copy(a[hash.PrefixLen:], b)
}
return ti.getIndexEntry(ord), nil
@@ -325,7 +325,7 @@ func (ti onHeapTableIndex) getIndexEntry(ord uint32) indexEntry {
}
}
func (ti onHeapTableIndex) lookup(h *addr) (indexEntry, bool, error) {
func (ti onHeapTableIndex) lookup(h *hash.Hash) (indexEntry, bool, error) {
ord, err := ti.lookupOrdinal(h)
if err != nil {
return indexResult{}, false, err
@@ -338,7 +338,7 @@ func (ti onHeapTableIndex) lookup(h *addr) (indexEntry, bool, error) {
// lookupOrdinal returns the ordinal of |h| if present. Returns |ti.count|
// if absent.
func (ti onHeapTableIndex) lookupOrdinal(h *addr) (uint32, error) {
func (ti onHeapTableIndex) lookupOrdinal(h *hash.Hash) (uint32, error) {
prefix := h.Prefix()
for idx := ti.findPrefix(prefix); idx < ti.count && ti.prefixAt(idx) == prefix; idx++ {
@@ -364,7 +364,7 @@ func (ti onHeapTableIndex) findPrefix(prefix uint64) (idx uint32) {
h := idx + (j-idx)/2 // avoid overflow when computing h
// i ≤ h < j
o := int64(prefixTupleSize * h)
tmp := binary.BigEndian.Uint64(ti.prefixTuples[o : o+addrPrefixSize])
tmp := binary.BigEndian.Uint64(ti.prefixTuples[o : o+hash.PrefixLen])
if tmp < prefix {
idx = h + 1 // preserves f(i-1) == false
} else {
@@ -379,18 +379,18 @@ func (ti onHeapTableIndex) tupleAt(idx uint32) (prefix uint64, ord uint32) {
b := ti.prefixTuples[off : off+prefixTupleSize]
prefix = binary.BigEndian.Uint64(b[:])
ord = binary.BigEndian.Uint32(b[addrPrefixSize:])
ord = binary.BigEndian.Uint32(b[hash.PrefixLen:])
return prefix, ord
}
func (ti onHeapTableIndex) prefixAt(idx uint32) uint64 {
off := int64(prefixTupleSize * idx)
b := ti.prefixTuples[off : off+addrPrefixSize]
b := ti.prefixTuples[off : off+hash.PrefixLen]
return binary.BigEndian.Uint64(b)
}
func (ti onHeapTableIndex) ordinalAt(idx uint32) uint32 {
off := int64(prefixTupleSize*idx) + addrPrefixSize
off := int64(prefixTupleSize*idx) + hash.PrefixLen
b := ti.prefixTuples[off : off+ordinalSize]
return binary.BigEndian.Uint32(b)
}
@@ -413,7 +413,7 @@ func (ti onHeapTableIndex) ordinals() ([]uint32, error) {
// todo: |o| is not accounted for in the memory quota
o := make([]uint32, ti.count)
for i, off := uint32(0), 0; i < ti.count; i, off = i+1, off+prefixTupleSize {
b := ti.prefixTuples[off+addrPrefixSize : off+prefixTupleSize]
b := ti.prefixTuples[off+hash.PrefixLen : off+prefixTupleSize]
o[i] = binary.BigEndian.Uint32(b)
}
return o, nil
@@ -423,7 +423,7 @@ func (ti onHeapTableIndex) prefixes() ([]uint64, error) {
// todo: |p| is not accounted for in the memory quota
p := make([]uint64, ti.count)
for i, off := uint32(0), 0; i < ti.count; i, off = i+1, off+prefixTupleSize {
b := ti.prefixTuples[off : off+addrPrefixSize]
b := ti.prefixTuples[off : off+hash.PrefixLen]
p[i] = binary.BigEndian.Uint64(b)
}
return p, nil
@@ -435,14 +435,14 @@ func (ti onHeapTableIndex) hashAt(idx uint32) hash.Hash {
tuple := ti.prefixTuples[off : off+prefixTupleSize]
// Get prefix, ordinal, and suffix
prefix := tuple[:addrPrefixSize]
ord := binary.BigEndian.Uint32(tuple[addrPrefixSize:]) * addrSuffixSize
suffix := ti.suffixes[ord : ord+addrSuffixSize] // suffix is 12 bytes
prefix := tuple[:hash.PrefixLen]
ord := binary.BigEndian.Uint32(tuple[hash.PrefixLen:]) * hash.SuffixLen
suffix := ti.suffixes[ord : ord+hash.SuffixLen] // suffix is 12 bytes
// Combine prefix and suffix to get hash
buf := [hash.ByteLen]byte{}
copy(buf[:addrPrefixSize], prefix)
copy(buf[addrPrefixSize:], suffix)
copy(buf[:hash.PrefixLen], prefix)
copy(buf[hash.PrefixLen:], suffix)
return buf
}
@@ -484,20 +484,21 @@ func (ti onHeapTableIndex) prefixIdxUBound(prefix uint64) (idx uint32) {
}
func (ti onHeapTableIndex) padStringAndDecode(s string, p string) uint64 {
// Pad string
if p == "0" {
for i := len(s); i < 16; i++ {
s = s + p
}
} else {
for i := len(s); i < 16; i++ {
s = p + s
if len(p) != 1 {
panic("pad string must be of length 1") // This is a programmer error that should never get out of PR.
}
for len(s) < hash.StringLen {
if p == "0" {
s = s + p // Pad on the right side.
} else {
s = p + s // pad on the left side.
}
}
// Decode
h, _ := encoding.DecodeString(s)
return binary.BigEndian.Uint64(h)
h := hash.Parse(s)
return binary.BigEndian.Uint64(h[:])
}
func (ti onHeapTableIndex) chunkCount() uint32 {
+15 -13
View File
@@ -23,6 +23,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/store/hash"
)
func TestParseTableIndex(t *testing.T) {
@@ -36,9 +38,9 @@ func TestParseTableIndex(t *testing.T) {
require.NoError(t, err)
defer idx.Close()
assert.Equal(t, uint32(596), idx.chunkCount())
seen := make(map[addr]bool)
seen := make(map[hash.Hash]bool)
for i := uint32(0); i < idx.chunkCount(); i++ {
var onheapaddr addr
var onheapaddr hash.Hash
e, err := idx.indexEntry(i, &onheapaddr)
require.NoError(t, err)
if _, ok := seen[onheapaddr]; !ok {
@@ -229,25 +231,25 @@ func TestAmbiguousShortHash(t *testing.T) {
// fakeChunk is chunk with a faked address
type fakeChunk struct {
address addr
address hash.Hash
data []byte
}
var fakeData = []byte("supercalifragilisticexpialidocious")
func addrFromPrefix(prefix string) (a addr) {
func addrFromPrefix(prefix string) hash.Hash {
// create a full length addr from a prefix
for i := 0; i < addrSize; i++ {
prefix += "0"
for {
if len(prefix) < hash.StringLen {
prefix += "0"
} else {
break
}
}
// base32 decode string
h, _ := encoding.DecodeString(prefix)
copy(a[:], h)
return
return hash.Parse(prefix)
}
func buildFakeChunkTable(chunks []fakeChunk) ([]byte, addr, error) {
func buildFakeChunkTable(chunks []fakeChunk) ([]byte, hash.Hash, error) {
totalData := uint64(0)
for _, chunk := range chunks {
totalData += uint64(len(chunk.data))
@@ -265,7 +267,7 @@ func buildFakeChunkTable(chunks []fakeChunk) ([]byte, addr, error) {
length, blockHash, err := tw.finish()
if err != nil {
return nil, addr{}, err
return nil, hash.Hash{}, err
}
return buff[:length], blockHash, nil
+13 -13
View File
@@ -32,6 +32,7 @@ import (
"time"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)
var errCacheMiss = errors.New("index cache miss")
@@ -55,15 +56,15 @@ type tablePersister interface {
ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error)
// Open a table named |name|, containing |chunkCount| chunks.
Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error)
Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error)
// Exists checks if a table named |name| exists.
Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error)
Exists(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (bool, error)
// PruneTableFiles deletes table files which the persister would normally be responsible for and
// which are not in the included |keeper| set and have not be written or modified more recently
// than the provided |mtime|.
PruneTableFiles(ctx context.Context, keeper func() []addr, mtime time.Time) error
PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, mtime time.Time) error
AccessMode() chunks.ExclusiveAccessMode
@@ -118,7 +119,7 @@ type compactionPlan struct {
func (cp compactionPlan) suffixes() []byte {
suffixesStart := uint64(cp.chunkCount) * (prefixTupleSize + lengthSize)
return cp.mergedIndex[suffixesStart : suffixesStart+uint64(cp.chunkCount)*addrSuffixSize]
return cp.mergedIndex[suffixesStart : suffixesStart+uint64(cp.chunkCount)*hash.SuffixLen]
}
// planRangeCopyConjoin computes a conjoin plan for tablePersisters that can conjoin
@@ -221,19 +222,19 @@ func planConjoin(sources []sourceWithSize, stats *Stats) (plan compactionPlan, e
suffixesPos += uint64(n)
} else {
// Build up the index one entry at a time.
var a addr
var h hash.Hash
for i := 0; i < len(ordinals); i++ {
e, err := index.indexEntry(uint32(i), &a)
e, err := index.indexEntry(uint32(i), &h)
if err != nil {
return compactionPlan{}, err
}
li := lengthsPos + lengthSize*uint64(ordinals[i])
si := suffixesPos + addrSuffixSize*uint64(ordinals[i])
si := suffixesPos + hash.SuffixLen*uint64(ordinals[i])
binary.BigEndian.PutUint32(plan.mergedIndex[li:], e.Length())
copy(plan.mergedIndex[si:], a[addrPrefixSize:])
copy(plan.mergedIndex[si:], h[hash.PrefixLen:])
}
lengthsPos += lengthSize * uint64(len(ordinals))
suffixesPos += addrSuffixSize * uint64(len(ordinals))
suffixesPos += hash.SuffixLen * uint64(len(ordinals))
}
}
@@ -242,7 +243,7 @@ func planConjoin(sources []sourceWithSize, stats *Stats) (plan compactionPlan, e
var pfxPos uint64
for _, pi := range prefixIndexRecs {
binary.BigEndian.PutUint64(plan.mergedIndex[pfxPos:], pi.addr.Prefix())
pfxPos += addrPrefixSize
pfxPos += hash.PrefixLen
binary.BigEndian.PutUint32(plan.mergedIndex[pfxPos:], pi.order)
pfxPos += ordinalSize
}
@@ -253,12 +254,11 @@ func planConjoin(sources []sourceWithSize, stats *Stats) (plan compactionPlan, e
return plan, nil
}
func nameFromSuffixes(suffixes []byte) (name addr) {
func nameFromSuffixes(suffixes []byte) (name hash.Hash) {
sha := sha512.New()
sha.Write(suffixes)
var h []byte
h = sha.Sum(h) // Appends hash to h
copy(name[:], h)
return
return hash.New(h[:hash.ByteLen])
}
+6 -6
View File
@@ -233,14 +233,14 @@ func (tr tableReader) index() (tableIndex, error) {
}
// returns true iff |h| can be found in this table.
func (tr tableReader) has(h addr) (bool, error) {
func (tr tableReader) has(h hash.Hash) (bool, error) {
_, ok, err := tr.idx.lookup(&h)
return ok, err
}
// returns the storage associated with |h|, iff present. Returns nil if absent. On success,
// the returned byte slice directly references the underlying storage.
func (tr tableReader) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) {
func (tr tableReader) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) {
e, found, err := tr.idx.lookup(&h)
if err != nil {
return nil, err
@@ -283,7 +283,7 @@ func (tr tableReader) get(ctx context.Context, h addr, stats *Stats) ([]byte, er
}
type offsetRec struct {
a *addr
a *hash.Hash
offset uint64
length uint32
}
@@ -639,12 +639,12 @@ func (tr tableReader) extract(ctx context.Context, chunks chan<- extractRecord)
var ors offsetRecSlice
for i := uint32(0); i < tr.idx.chunkCount(); i++ {
a := new(addr)
e, err := tr.idx.indexEntry(i, a)
h := new(hash.Hash)
e, err := tr.idx.indexEntry(i, h)
if err != nil {
return err
}
ors = append(ors, offsetRec{a, e.Offset(), e.Length()})
ors = append(ors, offsetRec{h, e.Offset(), e.Length()})
}
sort.Sort(ors)
for _, or := range ors {
+5 -4
View File
@@ -33,6 +33,7 @@ import (
"golang.org/x/sync/errgroup"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)
// Returned when a chunk with a reference to a non-existence chunk is
@@ -57,7 +58,7 @@ type tableSet struct {
rl chan struct{}
}
func (ts tableSet) has(h addr) (bool, error) {
func (ts tableSet) has(h hash.Hash) (bool, error) {
f := func(css chunkSourceSet) (bool, error) {
for _, haver := range css {
has, err := haver.has(h)
@@ -114,7 +115,7 @@ func (ts tableSet) hasMany(addrs []hasRecord) (bool, error) {
return f(ts.upstream)
}
func (ts tableSet) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) {
func (ts tableSet) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
@@ -287,7 +288,7 @@ func (ts tableSet) Size() int {
// append adds a memTable to an existing tableSet, compacting |mt| and
// returning a new tableSet with newly compacted table added.
func (ts tableSet) append(ctx context.Context, mt *memTable, checker refCheck, hasCache *lru.TwoQueueCache[addr, struct{}], stats *Stats) (tableSet, error) {
func (ts tableSet) append(ctx context.Context, mt *memTable, checker refCheck, hasCache *lru.TwoQueueCache[hash.Hash, struct{}], stats *Stats) (tableSet, error) {
for i := range mt.pendingRefs {
if !mt.pendingRefs[i].has && hasCache.Contains(*mt.pendingRefs[i].a) {
mt.pendingRefs[i].has = true
@@ -345,7 +346,7 @@ func (ts tableSet) rebase(ctx context.Context, specs []tableSpec, stats *Stats)
// deduplicate |specs|
orig := specs
specs = make([]tableSpec, 0, len(orig))
seen := map[addr]struct{}{}
seen := map[hash.Hash]struct{}{}
for _, spec := range orig {
if _, ok := seen[spec.name]; ok {
continue
+6 -6
View File
@@ -39,7 +39,7 @@ var hasManyHasAll = func([]hasRecord) (hash.HashSet, error) {
}
func TestTableSetPrependEmpty(t *testing.T) {
hasCache, err := lru.New2Q[addr, struct{}](1024)
hasCache, err := lru.New2Q[hash.Hash, struct{}](1024)
require.NoError(t, err)
ts, err := newFakeTableSet(&UnlimitedQuotaProvider{}).append(context.Background(), newMemTable(testMemTableSize), hasManyHasAll, hasCache, &Stats{})
require.NoError(t, err)
@@ -59,7 +59,7 @@ func TestTableSetPrepend(t *testing.T) {
assert.Empty(specs)
mt := newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
hasCache, err := lru.New2Q[addr, struct{}](1024)
hasCache, err := lru.New2Q[hash.Hash, struct{}](1024)
require.NoError(t, err)
ts, err = ts.append(context.Background(), mt, hasManyHasAll, hasCache, &Stats{})
require.NoError(t, err)
@@ -91,7 +91,7 @@ func TestTableSetToSpecsExcludesEmptyTable(t *testing.T) {
assert.Empty(specs)
mt := newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
hasCache, err := lru.New2Q[addr, struct{}](1024)
hasCache, err := lru.New2Q[hash.Hash, struct{}](1024)
require.NoError(t, err)
ts, err = ts.append(context.Background(), mt, hasManyHasAll, hasCache, &Stats{})
require.NoError(t, err)
@@ -122,7 +122,7 @@ func TestTableSetFlattenExcludesEmptyTable(t *testing.T) {
assert.Empty(specs)
mt := newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
hasCache, err := lru.New2Q[addr, struct{}](1024)
hasCache, err := lru.New2Q[hash.Hash, struct{}](1024)
require.NoError(t, err)
ts, err = ts.append(context.Background(), mt, hasManyHasAll, hasCache, &Stats{})
require.NoError(t, err)
@@ -156,7 +156,7 @@ func TestTableSetRebase(t *testing.T) {
assert := assert.New(t)
q := NewUnlimitedMemQuotaProvider()
persister := newFakeTablePersister(q)
hasCache, err := lru.New2Q[addr, struct{}](1024)
hasCache, err := lru.New2Q[hash.Hash, struct{}](1024)
require.NoError(t, err)
insert := func(ts tableSet, chunks ...[]byte) tableSet {
@@ -211,7 +211,7 @@ func TestTableSetPhysicalLen(t *testing.T) {
assert.Empty(specs)
mt := newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
hasCache, err := lru.New2Q[addr, struct{}](1024)
hasCache, err := lru.New2Q[hash.Hash, struct{}](1024)
require.NoError(t, err)
ts, err = ts.append(context.Background(), mt, hasManyHasAll, hasCache, &Stats{})
require.NoError(t, err)
+23 -27
View File
@@ -37,7 +37,7 @@ import (
"github.com/dolthub/dolt/go/store/hash"
)
func buildTable(chunks [][]byte) ([]byte, addr, error) {
func buildTable(chunks [][]byte) ([]byte, hash.Hash, error) {
totalData := uint64(0)
for _, chunk := range chunks {
totalData += uint64(len(chunk))
@@ -55,7 +55,7 @@ func buildTable(chunks [][]byte) ([]byte, addr, error) {
length, blockHash, err := tw.finish()
if err != nil {
return nil, addr{}, err
return nil, hash.Hash{}, err
}
return buff[:length], blockHash, nil
@@ -134,11 +134,11 @@ func TestHasMany(t *testing.T) {
require.NoError(t, err)
defer tr.close()
addrs := addrSlice{computeAddr(chunks[0]), computeAddr(chunks[1]), computeAddr(chunks[2])}
addrs := hash.HashSlice{computeAddr(chunks[0]), computeAddr(chunks[1]), computeAddr(chunks[2])}
hasAddrs := []hasRecord{
{&addrs[0], binary.BigEndian.Uint64(addrs[0][:addrPrefixSize]), 0, false},
{&addrs[1], binary.BigEndian.Uint64(addrs[1][:addrPrefixSize]), 1, false},
{&addrs[2], binary.BigEndian.Uint64(addrs[2][:addrPrefixSize]), 2, false},
{&addrs[0], binary.BigEndian.Uint64(addrs[0][:hash.PrefixLen]), 0, false},
{&addrs[1], binary.BigEndian.Uint64(addrs[1][:hash.PrefixLen]), 1, false},
{&addrs[2], binary.BigEndian.Uint64(addrs[2][:hash.PrefixLen]), 2, false},
}
sort.Sort(hasRecordByPrefix(hasAddrs))
@@ -161,9 +161,9 @@ func TestHasManySequentialPrefix(t *testing.T) {
"0rfgadopg6h3fk7d253ivbjsij4qo9nv",
}
addrs := make([]addr, len(addrStrings))
addrs := make([]hash.Hash, len(addrStrings))
for i, s := range addrStrings {
addrs[i] = addr(hash.Parse(s))
addrs[i] = hash.Parse(s)
}
bogusData := []byte("bogus") // doesn't matter what this is. hasMany() won't check chunkRecords
@@ -203,7 +203,7 @@ func TestHasManySequentialPrefix(t *testing.T) {
func BenchmarkHasMany(b *testing.B) {
const cnt = 64 * 1024
chnks := make([][]byte, cnt)
addrs := make(addrSlice, cnt)
addrs := make(hash.HashSlice, cnt)
hrecs := make([]hasRecord, cnt)
sparse := make([]hasRecord, cnt/1024)
@@ -218,7 +218,7 @@ func BenchmarkHasMany(b *testing.B) {
for i := range hrecs {
hrecs[i] = hasRecord{
a: &addrs[i],
prefix: prefixOf(addrs[i]),
prefix: addrs[i].Prefix(),
order: i,
}
}
@@ -226,7 +226,7 @@ func BenchmarkHasMany(b *testing.B) {
j := i * 64
hrecs[i] = hasRecord{
a: &addrs[j],
prefix: prefixOf(addrs[j]),
prefix: addrs[j].Prefix(),
order: j,
}
}
@@ -261,10 +261,6 @@ func BenchmarkHasMany(b *testing.B) {
})
}
func prefixOf(a addr) uint64 {
return binary.BigEndian.Uint64(a[:addrPrefixSize])
}
func TestGetMany(t *testing.T) {
ctx := context.Background()
assert := assert.New(t)
@@ -283,11 +279,11 @@ func TestGetMany(t *testing.T) {
require.NoError(t, err)
defer tr.close()
addrs := addrSlice{computeAddr(data[0]), computeAddr(data[1]), computeAddr(data[2])}
addrs := hash.HashSlice{computeAddr(data[0]), computeAddr(data[1]), computeAddr(data[2])}
getBatch := []getRecord{
{&addrs[0], binary.BigEndian.Uint64(addrs[0][:addrPrefixSize]), false},
{&addrs[1], binary.BigEndian.Uint64(addrs[1][:addrPrefixSize]), false},
{&addrs[2], binary.BigEndian.Uint64(addrs[2][:addrPrefixSize]), false},
{&addrs[0], binary.BigEndian.Uint64(addrs[0][:hash.PrefixLen]), false},
{&addrs[1], binary.BigEndian.Uint64(addrs[1][:hash.PrefixLen]), false},
{&addrs[2], binary.BigEndian.Uint64(addrs[2][:hash.PrefixLen]), false},
}
sort.Sort(getRecordByPrefix(getBatch))
@@ -318,11 +314,11 @@ func TestCalcReads(t *testing.T) {
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), 0)
require.NoError(t, err)
defer tr.close()
addrs := addrSlice{computeAddr(chunks[0]), computeAddr(chunks[1]), computeAddr(chunks[2])}
addrs := hash.HashSlice{computeAddr(chunks[0]), computeAddr(chunks[1]), computeAddr(chunks[2])}
getBatch := []getRecord{
{&addrs[0], binary.BigEndian.Uint64(addrs[0][:addrPrefixSize]), false},
{&addrs[1], binary.BigEndian.Uint64(addrs[1][:addrPrefixSize]), false},
{&addrs[2], binary.BigEndian.Uint64(addrs[2][:addrPrefixSize]), false},
{&addrs[0], binary.BigEndian.Uint64(addrs[0][:hash.PrefixLen]), false},
{&addrs[1], binary.BigEndian.Uint64(addrs[1][:hash.PrefixLen]), false},
{&addrs[2], binary.BigEndian.Uint64(addrs[2][:hash.PrefixLen]), false},
}
gb2 := []getRecord{getBatch[0], getBatch[2]}
@@ -358,7 +354,7 @@ func TestExtract(t *testing.T) {
require.NoError(t, err)
defer tr.close()
addrs := addrSlice{computeAddr(chunks[0]), computeAddr(chunks[1]), computeAddr(chunks[2])}
addrs := hash.HashSlice{computeAddr(chunks[0]), computeAddr(chunks[1]), computeAddr(chunks[2])}
chunkChan := make(chan extractRecord)
go func() {
@@ -420,8 +416,8 @@ func Test65k(t *testing.T) {
// Ensure all addresses share the first 7 bytes. Useful for easily generating tests which have
// "prefix" collisions.
func computeAddrCommonPrefix(data []byte) addr {
a := computeAddrDefault(data)
func computeAddrCommonPrefix(data []byte) hash.Hash {
a := computeHashDefault(data)
a[0] = 0x01
a[1] = 0x23
a[2] = 0x45
@@ -479,7 +475,7 @@ func Test65kGetMany(t *testing.T) {
func Test2kGetManyCommonPrefix(t *testing.T) {
computeAddr = computeAddrCommonPrefix
defer func() {
computeAddr = computeAddrDefault
computeAddr = computeHashDefault
}()
doTestNGetMany(t, 1<<11)
+19 -16
View File
@@ -26,12 +26,13 @@ import (
"encoding/binary"
"errors"
"fmt"
"hash"
gohash "hash"
"sort"
"github.com/golang/snappy"
"github.com/dolthub/dolt/go/store/d"
"github.com/dolthub/dolt/go/store/hash"
)
// tableWriter encodes a collection of byte stream chunks into a nbs table. NOT goroutine safe.
@@ -41,7 +42,7 @@ type tableWriter struct {
totalCompressedData uint64
totalUncompressedData uint64
prefixes prefixIndexSlice
blockHash hash.Hash
blockHash gohash.Hash
snapper snappyEncoder
}
@@ -61,11 +62,11 @@ func maxTableSize(numChunks, totalData uint64) uint64 {
d.Chk.True(avgChunkSize < maxChunkSize)
maxSnappySize := snappy.MaxEncodedLen(int(avgChunkSize))
d.Chk.True(maxSnappySize > 0)
return numChunks*(prefixTupleSize+lengthSize+addrSuffixSize+checksumSize+uint64(maxSnappySize)) + footerSize
return numChunks*(prefixTupleSize+lengthSize+hash.SuffixLen+checksumSize+uint64(maxSnappySize)) + footerSize
}
func indexSize(numChunks uint32) uint64 {
return uint64(numChunks) * (addrSuffixSize + lengthSize + prefixTupleSize)
return uint64(numChunks) * (hash.SuffixLen + lengthSize + prefixTupleSize)
}
func lengthsOffset(numChunks uint32) uint64 {
@@ -88,7 +89,7 @@ func newTableWriter(buff []byte, snapper snappyEncoder) *tableWriter {
}
}
func (tw *tableWriter) addChunk(h addr, data []byte) bool {
func (tw *tableWriter) addChunk(h hash.Hash, data []byte) bool {
if len(data) == 0 {
panic("NBS blocks cannont be zero length")
}
@@ -123,11 +124,11 @@ func (tw *tableWriter) addChunk(h addr, data []byte) bool {
return true
}
func (tw *tableWriter) finish() (uncompressedLength uint64, blockAddr addr, err error) {
func (tw *tableWriter) finish() (uncompressedLength uint64, blockAddr hash.Hash, err error) {
err = tw.writeIndex()
if err != nil {
return 0, addr{}, err
return 0, hash.Hash{}, err
}
tw.writeFooter()
@@ -140,20 +141,22 @@ func (tw *tableWriter) finish() (uncompressedLength uint64, blockAddr addr, err
}
type prefixIndexRec struct {
addr addr
addr hash.Hash
order, size uint32
}
type prefixIndexSlice []prefixIndexRec
func (hs prefixIndexSlice) Len() int { return len(hs) }
func (hs prefixIndexSlice) Less(i, j int) bool { return hs[i].addr.Prefix() < hs[j].addr.Prefix() }
func (hs prefixIndexSlice) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] }
func (hs prefixIndexSlice) Len() int { return len(hs) }
func (hs prefixIndexSlice) Less(i, j int) bool {
return hs[i].addr.Prefix() < hs[j].addr.Prefix()
}
func (hs prefixIndexSlice) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] }
func (tw *tableWriter) writeIndex() error {
sort.Sort(tw.prefixes)
pfxScratch := [addrPrefixSize]byte{}
pfxScratch := [hash.PrefixLen]byte{}
numRecords := uint32(len(tw.prefixes))
lengthsOffset := tw.pos + lengthsOffset(numRecords) // skip prefix and ordinal for each record
@@ -163,7 +166,7 @@ func (tw *tableWriter) writeIndex() error {
// hash prefix
n := uint64(copy(tw.buff[tw.pos:], pfxScratch[:]))
if n != addrPrefixSize {
if n != hash.PrefixLen {
return errors.New("failed to copy all data")
}
@@ -178,14 +181,14 @@ func (tw *tableWriter) writeIndex() error {
binary.BigEndian.PutUint32(tw.buff[offset:], pi.size)
// hash suffix
offset = suffixesOffset + uint64(pi.order)*addrSuffixSize
offset = suffixesOffset + uint64(pi.order)*hash.SuffixLen
n = uint64(copy(tw.buff[offset:], pi.addr.Suffix()))
if n != addrSuffixSize {
if n != hash.SuffixLen {
return errors.New("failed to copy all bytes")
}
}
suffixesLen := uint64(numRecords) * addrSuffixSize
suffixesLen := uint64(numRecords) * hash.SuffixLen
tw.blockHash.Write(tw.buff[suffixesOffset : suffixesOffset+suffixesLen])
tw.pos = suffixesOffset + suffixesLen
+6 -6
View File
@@ -33,21 +33,21 @@ func IterChunks(ctx context.Context, rd io.ReadSeeker, cb func(chunk chunks.Chun
defer idx.Close()
seen := make(map[addr]bool)
seen := make(map[hash.Hash]bool)
for i := uint32(0); i < idx.chunkCount(); i++ {
var a addr
ie, err := idx.indexEntry(i, &a)
var h hash.Hash
ie, err := idx.indexEntry(i, &h)
if err != nil {
return err
}
if _, ok := seen[a]; !ok {
seen[a] = true
if _, ok := seen[h]; !ok {
seen[h] = true
chunkBytes, err := readNFrom(rd, ie.Offset(), ie.Length())
if err != nil {
return err
}
cmpChnk, err := NewCompressedChunk(hash.Hash(a), chunkBytes)
cmpChnk, err := NewCompressedChunk(h, chunkBytes)
if err != nil {
return err
}