diff --git a/go/cmd/dolt/dolt.go b/go/cmd/dolt/dolt.go index 0bfdf2fc76..f0087210f7 100644 --- a/go/cmd/dolt/dolt.go +++ b/go/cmd/dolt/dolt.go @@ -24,7 +24,6 @@ import ( "os" "os/exec" "strconv" - "sync" "time" "github.com/fatih/color" @@ -390,11 +389,16 @@ func runMain() int { } start := time.Now() - var wg sync.WaitGroup ctx, stop := context.WithCancel(ctx) res := doltCommand.Exec(ctx, "dolt", args, dEnv) stop() - wg.Wait() + + if err = dbfactory.CloseAllLocalDatabases(); err != nil { + cli.PrintErrln(err) + if res == 0 { + res = 1 + } + } if csMetrics && dEnv.DoltDB != nil { metricsSummary := dEnv.DoltDB.CSMetricsSummary() diff --git a/go/libraries/doltcore/dbfactory/file.go b/go/libraries/doltcore/dbfactory/file.go index e2a180faea..7dcb68a1b7 100644 --- a/go/libraries/doltcore/dbfactory/file.go +++ b/go/libraries/doltcore/dbfactory/file.go @@ -17,9 +17,11 @@ package dbfactory import ( "context" "errors" + "fmt" "net/url" "os" "path/filepath" + "sync" "github.com/dolthub/dolt/go/libraries/utils/filesys" "github.com/dolthub/dolt/go/store/datas" @@ -43,6 +45,26 @@ var DoltDataDir = filepath.Join(DoltDir, DataDir) type FileFactory struct { } +type singletonDB struct { + ddb datas.Database + vrw types.ValueReadWriter + ns tree.NodeStore +} + +var singletonLock = new(sync.Mutex) +var singletons = make(map[string]singletonDB) + +func CloseAllLocalDatabases() (err error) { + singletonLock.Lock() + defer singletonLock.Unlock() + for name, s := range singletons { + if cerr := s.ddb.Close(); cerr != nil { + err = fmt.Errorf("error closing DB %s (%s)", name, cerr) + } + } + return +} + // PrepareDB creates the directory for the DB if it doesn't exist, and returns an error if a file or symlink is at the // path given func (fact FileFactory) PrepareDB(ctx context.Context, nbf *types.NomsBinFormat, u *url.URL, params map[string]interface{}) error { @@ -71,6 +93,13 @@ func (fact FileFactory) PrepareDB(ctx context.Context, nbf *types.NomsBinFormat, // CreateDB creates a local 
filesys backed database func (fact FileFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat, urlObj *url.URL, params map[string]interface{}) (datas.Database, types.ValueReadWriter, tree.NodeStore, error) { + singletonLock.Lock() + defer singletonLock.Unlock() + + if s, ok := singletons[urlObj.String()]; ok { + return s.ddb, s.vrw, s.ns, nil + } + path, err := url.PathUnescape(urlObj.Path) if err != nil { @@ -115,8 +144,15 @@ func (fact FileFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat, vrw := types.NewValueStore(st) ns := tree.NewNodeStore(st) + ddb := datas.NewTypesDatabase(vrw, ns) - return datas.NewTypesDatabase(vrw, ns), vrw, ns, nil + singletons[urlObj.String()] = singletonDB{ + ddb: ddb, + vrw: vrw, + ns: ns, + } + + return ddb, vrw, ns, nil } func validateDir(path string) error { diff --git a/go/performance/scripts/local_sysbench.sh b/go/performance/scripts/local_sysbench.sh index 8ce5bdb029..8592138583 100755 --- a/go/performance/scripts/local_sysbench.sh +++ b/go/performance/scripts/local_sysbench.sh @@ -35,6 +35,9 @@ do --row2) export ENABLE_ROW_ITER_2=true ;; + --journal) export DOLT_ENABLE_CHUNK_JOURNAL=true + ;; + # specify sysbench benchmark *) SYSBENCH_TEST="$1" ;; @@ -123,6 +126,7 @@ sysbench \ --db-ps-mode=disable \ "$SYSBENCH_TEST" run +unset DOLT_ENABLE_CHUNK_JOURNAL unset DOLT_DEFAULT_BIN_FORMAT unset ENABLE_ROW_ITER_2 unset SINGLE_THREAD_FEATURE_FLAG diff --git a/go/store/nbs/aws_table_persister.go b/go/store/nbs/aws_table_persister.go index 962237a956..a546e5b0b0 100644 --- a/go/store/nbs/aws_table_persister.go +++ b/go/store/nbs/aws_table_persister.go @@ -578,3 +578,7 @@ func (s3p awsTablePersister) uploadPart(ctx context.Context, data []byte, key, u func (s3p awsTablePersister) PruneTableFiles(ctx context.Context, contents manifestContents, t time.Time) error { return chunks.ErrUnsupportedOperation } + +func (s3p awsTablePersister) Close() error { + return nil +} diff --git a/go/store/nbs/block_store_test.go 
b/go/store/nbs/block_store_test.go index 55d4c8e5b4..e3b0667fd0 100644 --- a/go/store/nbs/block_store_test.go +++ b/go/store/nbs/block_store_test.go @@ -46,21 +46,32 @@ import ( const testMemTableSize = 1 << 8 func TestBlockStoreSuite(t *testing.T) { - suite.Run(t, &BlockStoreSuite{}) + fn := func(ctx context.Context, dir string) (*NomsBlockStore, error) { + nbf := constants.FormatDefaultString + qp := NewUnlimitedMemQuotaProvider() + return NewLocalStore(ctx, nbf, dir, testMemTableSize, qp) + } + suite.Run(t, &BlockStoreSuite{factory: fn}) } type BlockStoreSuite struct { suite.Suite dir string store *NomsBlockStore + factory nbsFactory putCountFn func() int + + // if true, skip interloper tests + skipInterloper bool } +type nbsFactory func(ctx context.Context, dir string) (*NomsBlockStore, error) + func (suite *BlockStoreSuite) SetupTest() { var err error - suite.dir, err = os.MkdirTemp("", "") - suite.NoError(err) - suite.store, err = NewLocalStore(context.Background(), constants.FormatDefaultString, suite.dir, testMemTableSize, NewUnlimitedMemQuotaProvider()) + suite.dir = suite.T().TempDir() + ctx := context.Background() + suite.store, err = suite.factory(ctx, suite.dir) suite.NoError(err) suite.putCountFn = func() int { return int(suite.store.putCount) @@ -194,9 +205,9 @@ func (suite *BlockStoreSuite) TestChunkStorePutMoreThanMemTable() { if suite.putCountFn != nil { suite.Equal(2, suite.putCountFn()) } - specs, err := suite.store.tables.toSpecs() + sz, err := suite.store.tables.physicalLen() suite.NoError(err) - suite.Len(specs, 2) + suite.True(sz > testMemTableSize) } func (suite *BlockStoreSuite) TestChunkStoreGetMany() { @@ -271,6 +282,9 @@ func (suite *BlockStoreSuite) TestChunkStoreHasMany() { } func (suite *BlockStoreSuite) TestChunkStoreFlushOptimisticLockFail() { + if suite.skipInterloper { + suite.T().Skip() + } input1, input2 := []byte("abc"), []byte("def") c1, c2 := chunks.NewChunk(input1), chunks.NewChunk(input2) root, err := 
suite.store.Root(context.Background()) @@ -319,6 +333,9 @@ func (suite *BlockStoreSuite) TestChunkStoreFlushOptimisticLockFail() { } func (suite *BlockStoreSuite) TestChunkStoreRebaseOnNoOpFlush() { + if suite.skipInterloper { + suite.T().Skip() + } input1 := []byte("abc") c1 := chunks.NewChunk(input1) @@ -353,6 +370,9 @@ func (suite *BlockStoreSuite) TestChunkStoreRebaseOnNoOpFlush() { } func (suite *BlockStoreSuite) TestChunkStorePutWithRebase() { + if suite.skipInterloper { + suite.T().Skip() + } input1, input2 := []byte("abc"), []byte("def") c1, c2 := chunks.NewChunk(input1), chunks.NewChunk(input2) root, err := suite.store.Root(context.Background()) diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index a161cec877..9a0d2dbea1 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -131,3 +131,7 @@ func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, ch func (bsp *blobstorePersister) PruneTableFiles(ctx context.Context, contents manifestContents, t time.Time) error { return chunks.ErrUnsupportedOperation } + +func (bsp *blobstorePersister) Close() error { + return nil +} diff --git a/go/store/nbs/chunk_journal.go b/go/store/nbs/chunk_journal.go new file mode 100644 index 0000000000..a549ecc01f --- /dev/null +++ b/go/store/nbs/chunk_journal.go @@ -0,0 +1,361 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package nbs + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "time" + + "golang.org/x/sync/errgroup" + + "github.com/dolthub/dolt/go/store/chunks" + "github.com/dolthub/dolt/go/store/hash" +) + +var chunkJournalFeatureFlag = false + +func init() { + if os.Getenv("DOLT_ENABLE_CHUNK_JOURNAL") != "" { + chunkJournalFeatureFlag = true + } +} + +const ( + chunkJournalName = "nbs_chunk_journal" +) + +type chunkJournal struct { + journal *journalWriter + + source journalChunkSource + + contents manifestContents + backing manifest +} + +var _ tablePersister = &chunkJournal{} +var _ manifest = &chunkJournal{} +var _ io.Closer = &chunkJournal{} + +type journalChunkSource struct { + address addr + journal io.ReaderAt + lookups map[addr]jrecordLookup + compressedSz uint64 +} + +var _ chunkSource = journalChunkSource{} + +type jrecordLookup struct { + offset int64 + length uint32 +} + +func newChunkJournal(ctx context.Context, dir string, m manifest) (*chunkJournal, error) { + path, err := filepath.Abs(filepath.Join(dir, chunkJournalName)) + if err != nil { + return nil, err + } + + wr, err := openJournalWriter(ctx, path) + if err != nil { + return nil, err + } + + root, source, err := wr.bootstrapJournal(ctx) + if err != nil { + return nil, err + } + + ok, contents, err := m.ParseIfExists(ctx, &Stats{}, nil) + if err != nil { + return nil, err + } + + if ok { + // the journal file is the source of truth for the root hash, true-up persisted manifest + contents.root = root + if contents, err = m.Update(ctx, contents.lock, contents, &Stats{}, nil); err != nil { + return nil, err + } + } else if !emptyAddr(addr(root)) { + // journal file contains root hash, but manifest is missing + return nil, fmt.Errorf("missing manifest while initializing chunk journal") + } + + return &chunkJournal{ + journal: wr, + source: source, + contents: contents, + backing: m, + }, nil +} + +// Persist implements tablePersister. 
+func (j *chunkJournal) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) { + if haver != nil { + sort.Sort(hasRecordByPrefix(mt.order)) // hasMany() requires addresses to be sorted. + if _, err := haver.hasMany(mt.order); err != nil { + return nil, err + } + sort.Sort(hasRecordByOrder(mt.order)) // restore "insertion" order for write + } + + for _, record := range mt.order { + if record.has { + continue + } + c := chunks.NewChunkWithHash(hash.Hash(*record.a), mt.chunks[*record.a]) + cc := ChunkToCompressedChunk(c) + lookup, err := j.journal.writeChunk(cc) + if err != nil { + return nil, err + } + j.source.lookups[*record.a] = lookup + j.source.compressedSz += uint64(cc.CompressedSize()) + } + return j.source, nil +} + +// ConjoinAll implements tablePersister. +func (j *chunkJournal) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, error) { + panic("unimplemented") +} + +// Open implements tablePersister. +func (j *chunkJournal) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) { + if name == j.source.address { + return j.source, nil + } + return nil, fmt.Errorf("unknown chunk source %s", name.String()) +} + +// Exists implements tablePersister. +func (j *chunkJournal) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) { + panic("unimplemented") +} + +// PruneTableFiles implements tablePersister. +func (j *chunkJournal) PruneTableFiles(ctx context.Context, contents manifestContents, mtime time.Time) error { + panic("unimplemented") +} + +// Name implements manifest. +func (j *chunkJournal) Name() string { + return j.journal.filepath() +} + +// Update implements manifest. 
+func (j *chunkJournal) Update(ctx context.Context, lastLock addr, next manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) { + if j.contents.gcGen != next.gcGen { + panic("chunkJournal cannot update GC generation") + } else if j.contents.lock != lastLock { + return j.contents, nil // |next| is stale + } + + if writeHook != nil { + if err := writeHook(); err != nil { + return manifestContents{}, err + } + } + + if emptyAddr(addr(next.root)) { + panic(next) + } + + if err := j.journal.writeRootHash(next.root); err != nil { + return manifestContents{}, err + } + j.contents = next + + return j.contents, nil +} + +// ParseIfExists implements manifest. +func (j *chunkJournal) ParseIfExists(ctx context.Context, stats *Stats, readHook func() error) (ok bool, mc manifestContents, err error) { + if emptyAddr(j.contents.lock) { + ok, mc, err = j.backing.ParseIfExists(ctx, stats, readHook) + if err != nil || !ok { + return false, manifestContents{}, err + } + j.contents = mc + return + } + if readHook != nil { + if err = readHook(); err != nil { + return false, manifestContents{}, err + } + } + ok, mc = true, j.contents + return +} + +func (j *chunkJournal) flushManifest() error { + ctx, s := context.Background(), &Stats{} + _, last, err := j.backing.ParseIfExists(ctx, s, nil) + if err != nil { + return err + } + if !emptyAddr(j.contents.lock) { + _, err = j.backing.Update(ctx, last.lock, j.contents, s, nil) + } + return err +} + +// Close implements io.Closer +func (j *chunkJournal) Close() (err error) { + if cerr := j.flushManifest(); cerr != nil { + err = cerr + } + if cerr := j.journal.Close(); cerr != nil { + err = cerr + } + return +} + +func (s journalChunkSource) has(h addr) (bool, error) { + _, ok := s.lookups[h] + return ok, nil +} + +func (s journalChunkSource) hasMany(addrs []hasRecord) (missing bool, err error) { + for i := range addrs { + a := addrs[i].a + if _, ok := s.lookups[*a]; ok { + addrs[i].has = true + } else { + missing = 
true + } + } + return +} + +func (s journalChunkSource) getCompressed(_ context.Context, h addr, _ *Stats) (cc CompressedChunk, err error) { + l, ok := s.lookups[h] + if !ok { + return CompressedChunk{}, nil + } + + buf := make([]byte, l.length) + if _, err = s.journal.ReadAt(buf, l.offset); err != nil { + return CompressedChunk{}, nil + } + + rec := readJournalRecord(buf) + if h != rec.address { + err = fmt.Errorf("bad chunk get (%s != %s)", h.String(), rec.address.String()) + return + } + + return NewCompressedChunk(hash.Hash(h), rec.payload) +} + +func (s journalChunkSource) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) { + cc, err := s.getCompressed(ctx, h, stats) + if err != nil { + return nil, err + } else if cc.IsEmpty() { + return nil, nil + } + ch, err := cc.ToChunk() + if err != nil { + return nil, err + } + return ch.Data(), nil +} + +func (s journalChunkSource) getMany(ctx context.Context, _ *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) { + var remaining bool + // todo: read planning + for i := range reqs { + data, err := s.get(ctx, *reqs[i].a, stats) + if err != nil { + return false, err + } else if data != nil { + ch := chunks.NewChunkWithHash(hash.Hash(*reqs[i].a), data) + found(ctx, &ch) + } else { + remaining = true + } + } + return remaining, nil +} + +func (s journalChunkSource) getManyCompressed(ctx context.Context, _ *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) { + var remaining bool + // todo: read planning + for i := range reqs { + cc, err := s.getCompressed(ctx, *reqs[i].a, stats) + if err != nil { + return false, err + } else if cc.IsEmpty() { + remaining = true + } else { + found(ctx, cc) + } + } + return remaining, nil +} + +func (s journalChunkSource) count() (uint32, error) { + return uint32(len(s.lookups)), nil +} + +func (s journalChunkSource) uncompressedLen() (uint64, error) { + // todo(andy) 
+ return s.compressedSz, nil +} + +func (s journalChunkSource) hash() addr { + return s.address +} + +// reader implements chunkSource. +func (s journalChunkSource) reader(context.Context) (io.Reader, error) { + // todo(andy): |reader()| belongs to the chunkSource interface and exists + // due to the duality between chunkSources & table files. chunkJournal + // seeks to create many chunkSources that depend on a single file. + // |reader()| in particular is relevant to conjoin implementations. + panic("unimplemented") +} + +// size implements chunkSource. +// size returns the total size of the chunkSource: chunks, index, and footer +func (s journalChunkSource) size() (uint64, error) { + return s.compressedSz, nil // todo(andy) +} + +// index implements chunkSource. +func (s journalChunkSource) index() (tableIndex, error) { + panic("unimplemented") +} + +func (s journalChunkSource) clone() (chunkSource, error) { + return s, nil +} + +func (s journalChunkSource) close() error { + return nil +} + +func emptyAddr(a addr) bool { + var b addr + return a == b +} diff --git a/go/store/nbs/chunk_journal_test.go b/go/store/nbs/chunk_journal_test.go new file mode 100644 index 0000000000..ff6f8e125f --- /dev/null +++ b/go/store/nbs/chunk_journal_test.go @@ -0,0 +1,234 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package nbs + +import ( + "bytes" + "context" + "math/rand" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/dolthub/dolt/go/store/chunks" + "github.com/dolthub/dolt/go/store/constants" + "github.com/dolthub/dolt/go/store/d" + "github.com/dolthub/dolt/go/store/hash" +) + +func TestChunkJournalBlockStoreSuite(t *testing.T) { + cacheOnce.Do(makeGlobalCaches) + fn := func(ctx context.Context, dir string) (*NomsBlockStore, error) { + m, err := getFileManifest(ctx, dir) + if err != nil { + return nil, err + } + j, err := newChunkJournal(ctx, dir, m) + if err != nil { + return nil, err + } + nbf := constants.FormatDefaultString + mm := makeManifestManager(j) + q := NewUnlimitedMemQuotaProvider() + c := inlineConjoiner{defaultMaxTables} + return newNomsBlockStore(ctx, nbf, mm, j, q, c, testMemTableSize) + } + suite.Run(t, &BlockStoreSuite{ + factory: fn, + skipInterloper: true, + }) +} + +func TestChunkJournalPersist(t *testing.T) { + ctx := context.Background() + dir, err := os.MkdirTemp("", "") + require.NoError(t, err) + m, err := getFileManifest(ctx, dir) + require.NoError(t, err) + j, err := newChunkJournal(ctx, dir, m) + require.NoError(t, err) + + const iters = 64 + stats := &Stats{} + haver := emptyChunkSource{} + for i := 0; i < iters; i++ { + memTbl, chunkMap := randomMemTable(16) + source, err := j.Persist(ctx, memTbl, haver, stats) + assert.NoError(t, err) + + for h, ch := range chunkMap { + ok, err := source.has(h) + assert.NoError(t, err) + assert.True(t, ok) + data, err := source.get(ctx, h, stats) + assert.NoError(t, err) + assert.Equal(t, ch.Data(), data) + } + + cs, err := j.Open(ctx, source.hash(), 16, stats) + assert.NotNil(t, cs) + assert.NoError(t, err) + } +} + +func TestRoundTripRecords(t *testing.T) { + t.Run("chunk record", func(t *testing.T) { + for i := 0; i < 64; i++ { + rec, buf := makeChunkRecord() + assert.Equal(t, rec.length, 
uint32(len(buf))) + b := make([]byte, rec.length) + n := writeChunkRecord(b, mustCompressedChunk(rec)) + assert.Equal(t, n, rec.length) + assert.Equal(t, buf, b) + r := readJournalRecord(buf) + assert.Equal(t, rec, r) + } + }) + t.Run("root hash record", func(t *testing.T) { + for i := 0; i < 64; i++ { + rec, buf := makeRootHashRecord() + assert.Equal(t, rec.length, uint32(len(buf))) + b := make([]byte, rec.length) + n := writeRootHashRecord(b, rec.address) + assert.Equal(t, n, rec.length) + assert.Equal(t, buf, b) + r := readJournalRecord(buf) + assert.Equal(t, rec, r) + } + }) +} + +func TestProcessRecords(t *testing.T) { + const cnt = 1024 + ctx := context.Background() + records := make([]jrecord, cnt) + buffers := make([][]byte, cnt) + journal := make([]byte, cnt*1024) + + var off uint32 + for i := range records { + var r jrecord + var b []byte + if i%8 == 0 { + r, b = makeRootHashRecord() + off += writeRootHashRecord(journal[off:], r.address) + } else { + r, b = makeChunkRecord() + off += writeChunkRecord(journal[off:], mustCompressedChunk(r)) + } + records[i], buffers[i] = r, b + } + + var i, sum int + check := func(o int64, r jrecord) (_ error) { + require.True(t, i < cnt) + assert.Equal(t, records[i], r) + assert.Equal(t, sum, int(o)) + sum += len(buffers[i]) + i++ + return + } + + n, err := processRecords(ctx, bytes.NewReader(journal), check) + assert.Equal(t, cnt, i) + assert.Equal(t, int(off), int(n)) + require.NoError(t, err) + + i, sum = 0, 0 + // write a bogus record to the end and process again + writeCorruptRecord(journal[off:]) + n, err = processRecords(ctx, bytes.NewReader(journal), check) + assert.Equal(t, cnt, i) + assert.Equal(t, int(off), int(n)) + require.NoError(t, err) +} + +func randomMemTable(cnt int) (*memTable, map[addr]chunks.Chunk) { + chnx := make(map[addr]chunks.Chunk, cnt) + for i := 0; i < cnt; i++ { + ch := chunks.NewChunk(randBuf(100)) + chnx[addr(ch.Hash())] = ch + } + mt := newMemTable(uint64(cnt) * 256) + for a, ch := range 
chnx { + mt.addChunk(a, ch.Data()) + } + return mt, chnx +} + +func makeChunkRecord() (jrecord, []byte) { + ch := chunks.NewChunk(randBuf(100)) + cc := ChunkToCompressedChunk(ch) + payload := cc.FullCompressedChunk + + b := make([]byte, recMinSz+len(payload)) + writeUint(b, uint32(len(b))) + b[recLenSz] = byte(chunkKind) + copy(b[recLenSz+recKindSz:], cc.H[:]) + copy(b[recLenSz+recKindSz+addrSize:], payload) + c := crc(b[:len(b)-checksumSize]) + writeUint(b[len(b)-checksumSize:], c) + r := jrecord{ + length: uint32(len(b)), + kind: chunkKind, + address: addr(cc.H), + payload: payload, + checksum: c, + } + return r, b +} + +func makeRootHashRecord() (jrecord, []byte) { + a := addr(hash.Of(randBuf(8))) + b := make([]byte, recMinSz) + writeUint(b, uint32(len(b))) + b[recLenSz] = byte(rootHashKind) + copy(b[recLenSz+recKindSz:], a[:]) + c := crc(b[:len(b)-checksumSize]) + writeUint(b[len(b)-checksumSize:], c) + r := jrecord{ + length: uint32(len(b)), + kind: rootHashKind, + payload: b[len(b):], + address: a, + checksum: c, + } + return r, b +} + +func writeCorruptRecord(buf []byte) (n uint32) { + // fill with random data + rand.Read(buf[:recMinSz]) + // write a valid size, kind + writeUint(buf, recMinSz) + buf[recLenSz] = byte(rootHashKind) + return recMinSz +} + +func mustCompressedChunk(rec jrecord) CompressedChunk { + d.PanicIfFalse(rec.kind == chunkKind) + cc, err := NewCompressedChunk(hash.Hash(rec.address), rec.payload) + d.PanicIfError(err) + return cc +} + +func randBuf(n int) (b []byte) { + b = make([]byte, n) + rand.Read(b) + return +} diff --git a/go/store/nbs/file_table_persister.go b/go/store/nbs/file_table_persister.go index f1cb4b1670..99e417243a 100644 --- a/go/store/nbs/file_table_persister.go +++ b/go/store/nbs/file_table_persister.go @@ -257,3 +257,7 @@ func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, contents manif return nil } + +func (ftp *fsTablePersister) Close() error { + return nil +} diff --git 
a/go/store/nbs/journal_writer.go b/go/store/nbs/journal_writer.go new file mode 100644 index 0000000000..462415728f --- /dev/null +++ b/go/store/nbs/journal_writer.go @@ -0,0 +1,396 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nbs + +import ( + "bufio" + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "sync" + + "github.com/dolthub/dolt/go/store/d" + "github.com/dolthub/dolt/go/store/hash" +) + +const ( + chunkJournalFileSize = 256 * 1024 * 1024 + + // todo(andy): buffer must be able to hold an entire record, + // but we don't have a hard limit on record size right now + journalWriterBuffSize = 1024 * 1024 + + chunkJournalAddr = "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv" +) + +var ( + openJournals = new(sync.Map) + journalAddr = addr(hash.Parse(chunkJournalAddr)) +) + +func openJournalWriter(ctx context.Context, path string) (wr *journalWriter, err error) { + var f *os.File + + if path, err = filepath.Abs(path); err != nil { + return nil, err + } + + if _, ok := openJournals.Load(path); ok { + return nil, fmt.Errorf("journal (%s) already opened in-process", path) + } + openJournals.Store(path, true) + + var create bool + info, err := os.Stat(path) + if errors.Is(err, os.ErrNotExist) { + create = true + } else if err != nil { + return nil, err + } else if info.IsDir() { + return nil, fmt.Errorf("expected file %s found directory", chunkJournalName) + } + + if create { + if f, err = 
os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666); err != nil { + return nil, err + } + const batch = 1024 * 1024 + b := make([]byte, batch) + for i := 0; i < chunkJournalFileSize; i += batch { + if _, err = f.Write(b); err != nil { // zero fill |f| + return nil, err + } + } + if err = f.Sync(); err != nil { + return nil, err + } + if o, err := f.Seek(0, io.SeekStart); err != nil { + return nil, err + } else if o != 0 { + return nil, fmt.Errorf("expected file offset 0, got %d", o) + } + } else { + if f, err = os.OpenFile(path, os.O_RDWR, 0666); err != nil { + return nil, err + } + } + + return &journalWriter{ + buf: make([]byte, 0, journalWriterBuffSize), + file: f, + path: path, + }, nil +} + +type journalWriter struct { + buf []byte + file *os.File + off int64 + path string +} + +var _ io.ReaderAt = &journalWriter{} +var _ io.WriteCloser = &journalWriter{} + +func (wr *journalWriter) filepath() string { + return wr.path +} + +func (wr *journalWriter) ReadAt(p []byte, off int64) (n int, err error) { + var bp []byte + if off < wr.off { + // fill some or all of |p| from |wr.file| + fread := int(wr.off - off) + if len(p) > fread { + // straddled read + bp = p[fread:] + p = p[:fread] + } + if n, err = wr.file.ReadAt(p, off); err != nil { + return 0, err + } + off = 0 + } else { + // fill all of |p| from |wr.buf| + bp = p + off -= wr.off + } + n += copy(bp, wr.buf[off:]) + return +} + +func (wr *journalWriter) Write(p []byte) (n int, err error) { + if len(p) > len(wr.buf) { + // write directly to |wr.file| + if err = wr.flush(); err != nil { + return 0, err + } + n, err = wr.file.WriteAt(p, wr.off) + wr.off += int64(n) + return + } + var buf []byte + if buf, err = wr.getBytes(len(p)); err != nil { + return 0, err + } + n = copy(buf, p) + return +} + +func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash, cs journalChunkSource, err error) { + // bootstrap chunk journal from |wr.file| + src := journalChunkSource{ + journal: wr, + address: 
journalAddr, + lookups: make(map[addr]jrecordLookup), + } + wr.off, err = processRecords(ctx, wr.file, func(o int64, r jrecord) error { + switch r.kind { + case chunkKind: + src.lookups[r.address] = jrecordLookup{offset: o, length: r.length} + src.compressedSz += uint64(r.length) + // todo(andy): uncompressed size + case rootHashKind: + last = hash.Hash(r.address) + default: + return fmt.Errorf("unknown journal record kind (%d)", r.kind) + } + return nil + }) + if err != nil { + return hash.Hash{}, journalChunkSource{}, err + } + cs = src + return +} + +func (wr *journalWriter) writeChunk(cc CompressedChunk) (jrecordLookup, error) { + rec := jrecordLookup{ + offset: wr.offset(), + length: chunkRecordSize(cc), + } + buf, err := wr.getBytes(int(rec.length)) + if err != nil { + return jrecordLookup{}, err + } + _ = writeChunkRecord(buf, cc) + return rec, nil +} + +func (wr *journalWriter) writeRootHash(root hash.Hash) error { + buf, err := wr.getBytes(rootHashRecordSize) + if err != nil { + return err + } + _ = writeRootHashRecord(buf, addr(root)) + + if err = wr.flush(); err != nil { + return err + } + return wr.file.Sync() +} + +func (wr *journalWriter) offset() int64 { + return wr.off + int64(len(wr.buf)) +} + +func (wr *journalWriter) getBytes(n int) (buf []byte, err error) { + c, l := cap(wr.buf), len(wr.buf) + if n > c { + err = fmt.Errorf("requested bytes (%d) exceeds capacity (%d)", n, c) + return + } else if n > c-l { + if err = wr.flush(); err != nil { + return + } + } + l = len(wr.buf) + wr.buf = wr.buf[:l+n] + buf = wr.buf[l : l+n] + return +} + +func (wr *journalWriter) flush() (err error) { + if _, err = wr.file.WriteAt(wr.buf, wr.off); err != nil { + return err + } + wr.off += int64(len(wr.buf)) + wr.buf = wr.buf[:0] + return +} + +func (wr *journalWriter) Close() (err error) { + if err = wr.flush(); err != nil { + return err + } + if cerr := wr.file.Sync(); cerr != nil { + err = cerr + } + if cerr := wr.file.Close(); cerr != nil { + err = cerr + } + 
openJournals.Delete(wr.path) + return +} + +// todo(andy): extensible record format +type jrecord struct { + length uint32 + kind jrecordKind + address addr + payload []byte + checksum uint32 +} + +type jrecordKind uint8 + +const ( + unknownKind jrecordKind = 0 + rootHashKind jrecordKind = 1 + chunkKind jrecordKind = 2 + + recKindSz = 1 + recLenSz = uint32Size + recMinSz = recLenSz + recKindSz + addrSize + checksumSize + recMaxSz = 128 * 1024 // todo(andy): less arbitrary + + rootHashRecordSize = recMinSz +) + +func chunkRecordSize(c CompressedChunk) uint32 { + return uint32(len(c.FullCompressedChunk)) + recMinSz +} + +func writeChunkRecord(buf []byte, c CompressedChunk) (n uint32) { + l := chunkRecordSize(c) + writeUint(buf[:recLenSz], l) + n += recLenSz + buf[n] = byte(chunkKind) + n += recKindSz + copy(buf[n:], c.H[:]) + n += addrSize + copy(buf[n:], c.FullCompressedChunk) + n += uint32(len(c.FullCompressedChunk)) + writeUint(buf[n:], crc(buf[:n])) + n += checksumSize + d.PanicIfFalse(l == n) + return +} + +func writeRootHashRecord(buf []byte, root addr) (n uint32) { + writeUint(buf[:recLenSz], rootHashRecordSize) + n += recLenSz + buf[n] = byte(rootHashKind) + n += recKindSz + copy(buf[n:], root[:]) + n += addrSize + writeUint(buf[n:], crc(buf[:n])) + n += checksumSize + return +} + +func readJournalRecord(buf []byte) (rec jrecord) { + rec.length = readUint(buf) + buf = buf[recLenSz:] + rec.kind = jrecordKind(buf[0]) + buf = buf[recKindSz:] + copy(rec.address[:], buf) + buf = buf[addrSize:] + rec.payload = buf[:len(buf)-checksumSize] + rec.checksum = readUint(buf[len(buf)-checksumSize:]) + return +} + +func safeReadJournalRecord(buf []byte) (jrecord, bool) { + o := len(buf) - checksumSize + if crc(buf[:o]) != readUint(buf[o:]) { + return jrecord{}, false + } + + rec := readJournalRecord(buf) + switch rec.kind { + case rootHashKind: + return rec, true + + case chunkKind: + _, err := NewCompressedChunk(hash.Hash(rec.address), rec.payload) + if err != nil { + 
return jrecord{}, false + } + return rec, true + + default: + return jrecord{}, false + } +} + +func processRecords(ctx context.Context, r io.ReadSeeker, cb func(o int64, r jrecord) error) (int64, error) { + var ( + buf []byte + off int64 + err error + ) + + rdr := bufio.NewReaderSize(r, journalWriterBuffSize) + for { + // peek to read next record size + if buf, err = rdr.Peek(uint32Size); err != nil { + break + } + + l := readUint(buf) + if l < recMinSz || l > recMaxSz { + break + } else if buf, err = rdr.Peek(int(l)); err != nil { + break + } + + rec, ok := safeReadJournalRecord(buf) + if !ok { + break // stop if we can't validate |rec| + } + + if err = cb(off, rec); err != nil { + break + } + + // advance |rdr| state by |l| bytes + if _, err = io.ReadFull(rdr, buf); err != nil { + break + } + off += int64(len(buf)) + } + if err != nil && err != io.EOF { + return 0, err + } + // reset the file pointer to end of the last + // successfully processed journal record + if _, err = r.Seek(off, 0); err != nil { + return 0, err + } + return off, nil +} + +func readUint(buf []byte) uint32 { + return binary.BigEndian.Uint32(buf) +} + +func writeUint(buf []byte, u uint32) { + binary.BigEndian.PutUint32(buf, u) +} diff --git a/go/store/nbs/journal_writer_test.go b/go/store/nbs/journal_writer_test.go new file mode 100644 index 0000000000..4ae65643f0 --- /dev/null +++ b/go/store/nbs/journal_writer_test.go @@ -0,0 +1,289 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
// See the License for the specific language governing permissions and
// limitations under the License.

package nbs

import (
	"context"
	"fmt"
	"math/rand"
	"path/filepath"
	"testing"

	"github.com/dolthub/dolt/go/store/chunks"
	"github.com/dolthub/dolt/go/store/hash"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// operation is one scripted step applied to a journalWriter under test.
type operation struct {
	kind   opKind
	buf    []byte // bytes to write (writeOp) or bytes expected (readOp)
	readAt int64  // file offset to read from (readOp only)
}

// opKind discriminates the scripted step types below.
type opKind byte

const (
	readOp opKind = iota
	writeOp
	flushOp
)

// TestJournalWriter drives a journalWriter through scripted sequences of
// reads, writes, and flushes, verifying that reads observe both buffered
// and flushed bytes and that offset() always equals total bytes written.
func TestJournalWriter(t *testing.T) {
	tests := []struct {
		name string
		// size is a buffer size in bytes.
		// NOTE(review): size is never passed to openJournalWriter below,
		// so all cases run with the writer's default buffer — confirm intent.
		size int
		ops  []operation
	}{
		{
			name: "smoke test",
			size: 16,
		},
		{
			name: "write to empty file",
			size: 16,
			ops: []operation{
				{kind: writeOp, buf: []byte("lorem")},
				{kind: writeOp, buf: []byte("ipsum")},
			},
		},
		{
			name: "read from non-empty file",
			size: 16,
			ops: []operation{
				{kind: writeOp, buf: []byte("loremipsum")},
				{kind: flushOp},
				{kind: readOp, buf: []byte("lorem"), readAt: 0},
				{kind: readOp, buf: []byte("ipsum"), readAt: 5},
				{kind: readOp, buf: []byte("loremipsum"), readAt: 0},
			},
		},
		{
			name: "read new writes",
			size: 16,
			ops: []operation{
				{kind: writeOp, buf: []byte("lorem")},
				{kind: readOp, buf: []byte("lorem"), readAt: 0},
				{kind: writeOp, buf: []byte("ipsum")},
				{kind: readOp, buf: []byte("lorem"), readAt: 0},
				{kind: readOp, buf: []byte("ipsum"), readAt: 5},
			},
		},
		{
			name: "read flushed writes",
			size: 16,
			ops: []operation{
				{kind: writeOp, buf: []byte("lorem")},
				{kind: flushOp},
				{kind: readOp, buf: []byte("lorem"), readAt: 0},
				{kind: writeOp, buf: []byte("ipsum")},
				{kind: readOp, buf: []byte("ipsum"), readAt: 5},
				{kind: readOp, buf: []byte("lorem"), readAt: 0},
				{kind: flushOp},
			},
		},
		{
			name: "read partially flushed writes",
			size: 16,
			ops: []operation{
				{kind: writeOp, buf: []byte("lorem")},
				{kind: flushOp},
				{kind: writeOp, buf: []byte("ipsum")},
				// read spans both flushed ("lorem") and buffered ("ipsum") bytes
				{kind: readOp, buf: []byte("loremipsum"), readAt: 0},
			},
		},
		{
			name: "successive writes trigger buffer flush ",
			size: 16,
			ops: []operation{
				{kind: writeOp, buf: []byte("lorem")},
				{kind: readOp, buf: []byte("lorem"), readAt: 0},
				{kind: writeOp, buf: []byte("ipsum")},
				{kind: readOp, buf: []byte("ipsum"), readAt: 5},
				{kind: writeOp, buf: []byte("dolor")},
				{kind: readOp, buf: []byte("dolor"), readAt: 10},
				{kind: writeOp, buf: []byte("sit")}, // triggers a flush
				{kind: readOp, buf: []byte("sit"), readAt: 15},
				{kind: readOp, buf: []byte("loremipsumdolorsit"), readAt: 0},
				{kind: writeOp, buf: []byte("amet")},
				{kind: readOp, buf: []byte("amet"), readAt: 18},
				{kind: readOp, buf: []byte("loremipsumdolorsitamet"), readAt: 0},
			},
		},
		{
			name: "write larger that buffer",
			size: 8,
			ops: []operation{
				{kind: writeOp, buf: []byte("loremipsum")},
				{kind: flushOp},
				{kind: writeOp, buf: []byte("dolorsitamet")},
				{kind: readOp, buf: []byte("dolorsitamet"), readAt: 10},
				{kind: readOp, buf: []byte("loremipsumdolorsitamet"), readAt: 0},
			},
		},
		{
			name: "flush empty buffer",
			size: 16,
			ops: []operation{
				{kind: writeOp, buf: []byte("loremipsum")},
				{kind: flushOp},
			},
		},
		{
			name: "double flush write",
			size: 16,
			ops: []operation{
				{kind: writeOp, buf: []byte("loremipsum")},
				{kind: flushOp},
				{kind: writeOp, buf: []byte("dolor")},
				{kind: flushOp},
				{kind: flushOp}, // second flush with an empty buffer must be a no-op
			},
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			ctx := context.Background()
			j, err := openJournalWriter(ctx, newTestFilePath(t))
			require.NoError(t, err)

			// off tracks the total bytes written so far; after every
			// step it must agree with j.offset().
			var off int64
			for i, op := range test.ops {
				switch op.kind {
				case readOp:
					act := make([]byte, len(op.buf))
					n, err := j.ReadAt(act, op.readAt)
					assert.NoError(t, err, "operation %d errored", i)
					assert.Equal(t, len(op.buf), n, "operation %d failed", i)
					assert.Equal(t, op.buf, act, "operation %d failed", i)
				case writeOp:
					n, err := j.Write(op.buf)
					assert.NoError(t, err, "operation %d errored", i)
					assert.Equal(t, len(op.buf), n, "operation %d failed", i)
					off += int64(n)
				case flushOp:
					err = j.flush()
					assert.NoError(t, err, "operation %d errored", i)
				default:
					t.Fatal("unknown opKind")
				}
				assert.Equal(t, off, j.offset())
			}
			assert.NoError(t, j.Close())
		})
	}
}

// TestJournalWriterWriteChunk writes random compressed chunks to a
// journal and verifies that each returned lookup resolves to its chunk,
// both immediately after the write and again after all writes complete.
func TestJournalWriterWriteChunk(t *testing.T) {
	ctx := context.Background()
	j, err := openJournalWriter(ctx, newTestFilePath(t))
	require.NoError(t, err)

	data := randomCompressedChunks()
	lookups := make(map[addr]jrecordLookup)

	for a, cc := range data {
		l, err := j.writeChunk(cc)
		require.NoError(t, err)
		lookups[a] = l
		validateLookup(t, j, l, cc)
	}
	for a, l := range lookups {
		validateLookup(t, j, l, data[a])
	}
	require.NoError(t, j.Close())
}

// TestJournalWriterBootstrap writes chunks, closes the journal, then
// re-opens and bootstraps it from disk, verifying that the old lookups
// still resolve and that the bootstrapped chunk source serves every
// chunk's uncompressed data.
func TestJournalWriterBootstrap(t *testing.T) {
	ctx := context.Background()
	path := newTestFilePath(t)
	j, err := openJournalWriter(ctx, path)
	require.NoError(t, err)

	data := randomCompressedChunks()
	lookups := make(map[addr]jrecordLookup)
	for a, cc := range data {
		l, err := j.writeChunk(cc)
		require.NoError(t, err)
		lookups[a] = l
	}
	assert.NoError(t, j.Close())

	// re-open the same journal file and rebuild state from its records
	j, err = openJournalWriter(ctx, path)
	require.NoError(t, err)
	_, source, err := j.bootstrapJournal(ctx)
	require.NoError(t, err)

	for a, l := range lookups {
		validateLookup(t, j, l, data[a])
	}
	for a, cc := range data {
		buf, err := source.get(ctx, a, nil)
		require.NoError(t, err)
		ch, err := cc.ToChunk()
		require.NoError(t, err)
		assert.Equal(t, ch.Data(), buf)
	}
	require.NoError(t, j.Close())
}

// validateLookup reads the journal record described by |l| and asserts
// that it holds |cc|'s address and compressed payload.
func validateLookup(t *testing.T, j *journalWriter, l jrecordLookup, cc CompressedChunk) {
	b := make([]byte, l.length)
	n, err := j.ReadAt(b, l.offset)
	require.NoError(t, err)
	assert.Equal(t, int(l.length), n)
	rec := readJournalRecord(b)
	assert.Equal(t, hash.Hash(rec.address), cc.Hash())
	assert.Equal(t, rec.payload, cc.FullCompressedChunk)
}
+ +func TestJournalWriterSyncClose(t *testing.T) { + ctx := context.Background() + j, err := openJournalWriter(ctx, newTestFilePath(t)) + require.NoError(t, err) + _, _, err = j.bootstrapJournal(ctx) + require.NoError(t, err) + + // close triggers flush + n, err := j.Write([]byte("sit")) + require.NoError(t, err) + assert.Equal(t, 3, n) + err = j.Close() + require.NoError(t, err) + assert.Equal(t, 0, len(j.buf)) + assert.Equal(t, 3, int(j.off)) +} + +func newTestFilePath(t *testing.T) string { + name := fmt.Sprintf("journal%d.log", rand.Intn(65536)) + return filepath.Join(t.TempDir(), name) +} + +func randomCompressedChunks() (compressed map[addr]CompressedChunk) { + buf := make([]byte, 1024*1024) + rand.Read(buf) + + compressed = make(map[addr]CompressedChunk) + for { + k := rand.Intn(51) + 50 + if k >= len(buf) { + return + } + c := chunks.NewChunk(buf[:k]) + buf = buf[k:] + compressed[addr(c.Hash())] = ChunkToCompressedChunk(c) + } +} diff --git a/go/store/nbs/root_tracker_test.go b/go/store/nbs/root_tracker_test.go index c27a8515d4..7ae44f6059 100644 --- a/go/store/nbs/root_tracker_test.go +++ b/go/store/nbs/root_tracker_test.go @@ -635,6 +635,10 @@ func (ftp fakeTablePersister) PruneTableFiles(_ context.Context, _ manifestConte return chunks.ErrUnsupportedOperation } +func (ftp fakeTablePersister) Close() error { + return nil +} + func extractAllChunks(ctx context.Context, src chunkSource, cb func(rec extractRecord)) (err error) { var index tableIndex if index, err = src.index(); err != nil { diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 4584943135..b71da1b85c 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -524,26 +524,29 @@ func NewLocalStore(ctx context.Context, nbfVerStr string, dir string, memTableSi func newLocalStore(ctx context.Context, nbfVerStr string, dir string, memTableSize uint64, maxTables int, q MemoryQuotaProvider) (*NomsBlockStore, error) { cacheOnce.Do(makeGlobalCaches) - err := checkDir(dir) - - if err 
!= nil { + if err := checkDir(dir); err != nil { return nil, err } m, err := getFileManifest(ctx, dir) - if err != nil { return nil, err } - - mm := makeManifestManager(m) p := newFSTablePersister(dir, globalFDCache, q) - nbs, err := newNomsBlockStore(ctx, nbfVerStr, mm, p, q, inlineConjoiner{maxTables}, memTableSize) + if chunkJournalFeatureFlag { + j, err := newChunkJournal(ctx, dir, m) + if err != nil { + return nil, err + } + m, p = j, j + } + mm := makeManifestManager(m) + + nbs, err := newNomsBlockStore(ctx, nbfVerStr, mm, p, q, inlineConjoiner{maxTables}, memTableSize) if err != nil { return nil, err } - return nbs, nil } @@ -1163,8 +1166,14 @@ func (nbs *NomsBlockStore) Version() string { return nbs.upstream.nbfVers } -func (nbs *NomsBlockStore) Close() error { - return nbs.tables.close() +func (nbs *NomsBlockStore) Close() (err error) { + if cerr := nbs.p.Close(); cerr != nil { + err = cerr + } + if cerr := nbs.tables.close(); cerr != nil { + err = cerr + } + return } func (nbs *NomsBlockStore) Stats() interface{} { @@ -1300,11 +1309,11 @@ func (nbs *NomsBlockStore) Size(ctx context.Context) (uint64, error) { if !ok { return uint64(0), errors.New("manifest referenced table file for which there is no chunkSource.") } - ti, err := cs.index() + sz, err := cs.size() if err != nil { return uint64(0), fmt.Errorf("error getting table file index for chunkSource. 
%w", err) } - size += ti.tableFileSize() + size += sz } return size, nil } diff --git a/go/store/nbs/store_test.go b/go/store/nbs/store_test.go index 6c353603df..86adf161ce 100644 --- a/go/store/nbs/store_test.go +++ b/go/store/nbs/store_test.go @@ -41,6 +41,10 @@ import ( ) func makeTestLocalStore(t *testing.T, maxTableFiles int) (st *NomsBlockStore, nomsDir string, q MemoryQuotaProvider) { + if chunkJournalFeatureFlag { + t.Skip() + } + ctx := context.Background() nomsDir = filepath.Join(tempfiles.MovableTempFileProvider.GetTempDir(), "noms_"+uuid.New().String()[:8]) err := os.MkdirAll(nomsDir, os.ModePerm) diff --git a/go/store/nbs/table_persister.go b/go/store/nbs/table_persister.go index ef7e70c217..8d7c62596a 100644 --- a/go/store/nbs/table_persister.go +++ b/go/store/nbs/table_persister.go @@ -27,6 +27,7 @@ import ( "crypto/sha512" "encoding/binary" "errors" + "io" "sort" "time" ) @@ -55,6 +56,8 @@ type tablePersister interface { // PruneTableFiles deletes old table files that are no longer referenced in the manifest. PruneTableFiles(ctx context.Context, contents manifestContents, mtime time.Time) error + + io.Closer } type chunkSourcesByAscendingCount struct { diff --git a/go/store/nbs/table_reader.go b/go/store/nbs/table_reader.go index fe6f850927..b622c18976 100644 --- a/go/store/nbs/table_reader.go +++ b/go/store/nbs/table_reader.go @@ -79,6 +79,9 @@ func (cmp CompressedChunk) ToChunk() (chunks.Chunk, error) { func ChunkToCompressedChunk(chunk chunks.Chunk) CompressedChunk { compressed := snappy.Encode(nil, chunk.Data()) length := len(compressed) + // todo: this append allocates a new buffer and copies |compressed|. + // This is costly, but maybe better, as it allows us to reclaim the + // extra space allocated in snappy.Encode (see snappy.MaxEncodedLen). compressed = append(compressed, []byte{0, 0, 0, 0}...) 
binary.BigEndian.PutUint32(compressed[length:], crc(compressed[:length])) return CompressedChunk{H: chunk.Hash(), FullCompressedChunk: compressed, CompressedData: compressed[:length]} @@ -94,6 +97,11 @@ func (cmp CompressedChunk) IsEmpty() bool { return len(cmp.CompressedData) == 0 || (len(cmp.CompressedData) == 1 && cmp.CompressedData[0] == 0) } +// CompressedSize returns the size of this CompressedChunk. +func (cmp CompressedChunk) CompressedSize() int { + return len(cmp.CompressedData) +} + var EmptyCompressedChunk CompressedChunk func init() { diff --git a/go/store/nbs/table_set.go b/go/store/nbs/table_set.go index 76418f1497..e29749c097 100644 --- a/go/store/nbs/table_set.go +++ b/go/store/nbs/table_set.go @@ -231,11 +231,11 @@ func (ts tableSet) uncompressedLen() (uint64, error) { func (ts tableSet) physicalLen() (uint64, error) { f := func(css chunkSourceSet) (data uint64, err error) { for _, haver := range css { - index, err := haver.index() + sz, err := haver.size() if err != nil { return 0, err } - data += index.tableFileSize() + data += sz } return } @@ -417,29 +417,26 @@ func (ts tableSet) rebase(ctx context.Context, specs []tableSpec, stats *Stats) func (ts tableSet) toSpecs() ([]tableSpec, error) { tableSpecs := make([]tableSpec, 0, ts.Size()) - for _, src := range ts.novel { - cnt, err := src.count() - - if err != nil { - return nil, err + for a, src := range ts.novel { + if _, ok := ts.upstream[a]; ok { + continue } - if cnt > 0 { + cnt, err := src.count() + if err != nil { + return nil, err + } else if cnt > 0 { h := src.hash() tableSpecs = append(tableSpecs, tableSpec{h, cnt}) } } for _, src := range ts.upstream { cnt, err := src.count() - if err != nil { return nil, err - } - - if cnt <= 0 { + } else if cnt <= 0 { return nil, errors.New("no upstream chunks") } - h := src.hash() tableSpecs = append(tableSpecs, tableSpec{h, cnt}) }