added chunkJournal, journalWriter, and journalChunkSource

This commit is contained in:
Andy Arthur
2022-11-28 14:54:11 -08:00
parent b66f5b1600
commit edcc243c13
17 changed files with 1416 additions and 35 deletions

View File

@@ -24,7 +24,6 @@ import (
"os"
"os/exec"
"strconv"
"sync"
"time"
"github.com/fatih/color"
@@ -390,11 +389,16 @@ func runMain() int {
}
start := time.Now()
var wg sync.WaitGroup
ctx, stop := context.WithCancel(ctx)
res := doltCommand.Exec(ctx, "dolt", args, dEnv)
stop()
wg.Wait()
if err = dbfactory.CloseAllLocalDatabases(); err != nil {
cli.PrintErrln(err)
if res == 0 {
res = 1
}
}
if csMetrics && dEnv.DoltDB != nil {
metricsSummary := dEnv.DoltDB.CSMetricsSummary()

View File

@@ -17,9 +17,11 @@ package dbfactory
import (
"context"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
"sync"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/store/datas"
@@ -43,6 +45,26 @@ var DoltDataDir = filepath.Join(DoltDir, DataDir)
type FileFactory struct {
}
type singletonDB struct {
ddb datas.Database
vrw types.ValueReadWriter
ns tree.NodeStore
}
var singletonLock = new(sync.Mutex)
var singletons = make(map[string]singletonDB)
// CloseAllLocalDatabases closes every cached singleton database. Closing
// continues past individual failures; an error is returned if any close failed.
func CloseAllLocalDatabases() (err error) {
	singletonLock.Lock()
	defer singletonLock.Unlock()
	for name, s := range singletons {
		if cerr := s.ddb.Close(); cerr != nil {
			// NOTE(review): earlier close errors are overwritten; only the
			// last failure is reported to the caller
			err = fmt.Errorf("error closing DB %s (%s)", name, cerr)
		}
	}
	// NOTE(review): closed entries remain in |singletons|, so a subsequent
	// CreateDB for the same URL would return a closed database — presumably
	// this is only called at process shutdown; confirm
	return
}
// PrepareDB creates the directory for the DB if it doesn't exist, and returns an error if a file or symlink is at the
// path given
func (fact FileFactory) PrepareDB(ctx context.Context, nbf *types.NomsBinFormat, u *url.URL, params map[string]interface{}) error {
@@ -71,6 +93,13 @@ func (fact FileFactory) PrepareDB(ctx context.Context, nbf *types.NomsBinFormat,
// CreateDB creates a local filesys backed database
func (fact FileFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat, urlObj *url.URL, params map[string]interface{}) (datas.Database, types.ValueReadWriter, tree.NodeStore, error) {
singletonLock.Lock()
defer singletonLock.Unlock()
if s, ok := singletons[urlObj.String()]; ok {
return s.ddb, s.vrw, s.ns, nil
}
path, err := url.PathUnescape(urlObj.Path)
if err != nil {
@@ -115,8 +144,15 @@ func (fact FileFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat,
vrw := types.NewValueStore(st)
ns := tree.NewNodeStore(st)
ddb := datas.NewTypesDatabase(vrw, ns)
return datas.NewTypesDatabase(vrw, ns), vrw, ns, nil
singletons[urlObj.String()] = singletonDB{
ddb: ddb,
vrw: vrw,
ns: ns,
}
return ddb, vrw, ns, nil
}
func validateDir(path string) error {

View File

@@ -35,6 +35,9 @@ do
--row2) export ENABLE_ROW_ITER_2=true
;;
--journal) export DOLT_ENABLE_CHUNK_JOURNAL=true
;;
# specify sysbench benchmark
*) SYSBENCH_TEST="$1"
;;
@@ -123,6 +126,7 @@ sysbench \
--db-ps-mode=disable \
"$SYSBENCH_TEST" run
unset DOLT_ENABLE_CHUNK_JOURNAL
unset DOLT_DEFAULT_BIN_FORMAT
unset ENABLE_ROW_ITER_2
unset SINGLE_THREAD_FEATURE_FLAG

View File

@@ -578,3 +578,7 @@ func (s3p awsTablePersister) uploadPart(ctx context.Context, data []byte, key, u
func (s3p awsTablePersister) PruneTableFiles(ctx context.Context, contents manifestContents, t time.Time) error {
return chunks.ErrUnsupportedOperation
}
func (s3p awsTablePersister) Close() error {
return nil
}

View File

@@ -46,21 +46,32 @@ import (
const testMemTableSize = 1 << 8
func TestBlockStoreSuite(t *testing.T) {
suite.Run(t, &BlockStoreSuite{})
fn := func(ctx context.Context, dir string) (*NomsBlockStore, error) {
nbf := constants.FormatDefaultString
qp := NewUnlimitedMemQuotaProvider()
return NewLocalStore(ctx, nbf, dir, testMemTableSize, qp)
}
suite.Run(t, &BlockStoreSuite{factory: fn})
}
type BlockStoreSuite struct {
suite.Suite
dir string
store *NomsBlockStore
factory nbsFactory
putCountFn func() int
// if true, skip interloper tests
skipInterloper bool
}
type nbsFactory func(ctx context.Context, dir string) (*NomsBlockStore, error)
func (suite *BlockStoreSuite) SetupTest() {
var err error
suite.dir, err = os.MkdirTemp("", "")
suite.NoError(err)
suite.store, err = NewLocalStore(context.Background(), constants.FormatDefaultString, suite.dir, testMemTableSize, NewUnlimitedMemQuotaProvider())
suite.dir = suite.T().TempDir()
ctx := context.Background()
suite.store, err = suite.factory(ctx, suite.dir)
suite.NoError(err)
suite.putCountFn = func() int {
return int(suite.store.putCount)
@@ -194,9 +205,9 @@ func (suite *BlockStoreSuite) TestChunkStorePutMoreThanMemTable() {
if suite.putCountFn != nil {
suite.Equal(2, suite.putCountFn())
}
specs, err := suite.store.tables.toSpecs()
sz, err := suite.store.tables.physicalLen()
suite.NoError(err)
suite.Len(specs, 2)
suite.True(sz > testMemTableSize)
}
func (suite *BlockStoreSuite) TestChunkStoreGetMany() {
@@ -271,6 +282,9 @@ func (suite *BlockStoreSuite) TestChunkStoreHasMany() {
}
func (suite *BlockStoreSuite) TestChunkStoreFlushOptimisticLockFail() {
if suite.skipInterloper {
suite.T().Skip()
}
input1, input2 := []byte("abc"), []byte("def")
c1, c2 := chunks.NewChunk(input1), chunks.NewChunk(input2)
root, err := suite.store.Root(context.Background())
@@ -319,6 +333,9 @@ func (suite *BlockStoreSuite) TestChunkStoreFlushOptimisticLockFail() {
}
func (suite *BlockStoreSuite) TestChunkStoreRebaseOnNoOpFlush() {
if suite.skipInterloper {
suite.T().Skip()
}
input1 := []byte("abc")
c1 := chunks.NewChunk(input1)
@@ -353,6 +370,9 @@ func (suite *BlockStoreSuite) TestChunkStoreRebaseOnNoOpFlush() {
}
func (suite *BlockStoreSuite) TestChunkStorePutWithRebase() {
if suite.skipInterloper {
suite.T().Skip()
}
input1, input2 := []byte("abc"), []byte("def")
c1, c2 := chunks.NewChunk(input1), chunks.NewChunk(input2)
root, err := suite.store.Root(context.Background())

View File

@@ -131,3 +131,7 @@ func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, ch
func (bsp *blobstorePersister) PruneTableFiles(ctx context.Context, contents manifestContents, t time.Time) error {
return chunks.ErrUnsupportedOperation
}
func (bsp *blobstorePersister) Close() error {
return nil
}

View File

@@ -0,0 +1,361 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nbs
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"time"
"golang.org/x/sync/errgroup"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)
// chunkJournalFeatureFlag gates the experimental chunk journal; it is
// enabled by setting the DOLT_ENABLE_CHUNK_JOURNAL environment variable.
var chunkJournalFeatureFlag = false

func init() {
	if os.Getenv("DOLT_ENABLE_CHUNK_JOURNAL") != "" {
		chunkJournalFeatureFlag = true
	}
}

const (
	// chunkJournalName is the journal's file name within a database directory.
	chunkJournalName = "nbs_chunk_journal"
)

// chunkJournal is a tablePersister and manifest implementation backed by a
// single append-only journal file: chunk writes and root-hash updates are
// appended to the journal, and the |backing| manifest is only reconciled
// when the journal is opened or closed.
type chunkJournal struct {
	journal  *journalWriter     // buffered writer over the journal file
	source   journalChunkSource // in-memory index over journaled chunks
	contents manifestContents   // latest in-memory manifest contents
	backing  manifest           // persisted manifest, trued-up on open/close
}

var _ tablePersister = &chunkJournal{}
var _ manifest = &chunkJournal{}
var _ io.Closer = &chunkJournal{}

// journalChunkSource provides a chunkSource view over the journal file.
type journalChunkSource struct {
	address      addr                   // well-known address of the journal source
	journal      io.ReaderAt            // shared handle for reading journal records
	lookups      map[addr]jrecordLookup // chunk address -> record location
	compressedSz uint64                 // total compressed bytes indexed
}

var _ chunkSource = journalChunkSource{}

// jrecordLookup locates a single record within the journal file.
type jrecordLookup struct {
	offset int64  // byte offset of the record
	length uint32 // total record length in bytes
}
// newChunkJournal opens (or creates) the chunk journal file within |dir| and
// reconciles its recovered root hash with the persisted manifest |m|. The
// journal file, not the manifest, is the source of truth for the root hash:
// an existing manifest has its root overwritten with the journal's root. An
// error is returned if the journal holds a root hash but no manifest exists
// (a partially initialized or corrupted state).
func newChunkJournal(ctx context.Context, dir string, m manifest) (*chunkJournal, error) {
	path, err := filepath.Abs(filepath.Join(dir, chunkJournalName))
	if err != nil {
		return nil, err
	}
	wr, err := openJournalWriter(ctx, path)
	if err != nil {
		return nil, err
	}
	// replay the journal to build the in-memory chunk index and recover
	// the most recent root hash
	root, source, err := wr.bootstrapJournal(ctx)
	if err != nil {
		return nil, err
	}
	ok, contents, err := m.ParseIfExists(ctx, &Stats{}, nil)
	if err != nil {
		return nil, err
	}
	if ok {
		// the journal file is the source of truth for the root hash, true-up persisted manifest
		contents.root = root
		if contents, err = m.Update(ctx, contents.lock, contents, &Stats{}, nil); err != nil {
			return nil, err
		}
	} else if !emptyAddr(addr(root)) {
		// journal file contains root hash, but manifest is missing
		return nil, fmt.Errorf("missing manifest while initializing chunk journal")
	}
	return &chunkJournal{
		journal:  wr,
		source:   source,
		contents: contents,
		backing:  m,
	}, nil
}
// Persist implements tablePersister. Rather than writing |mt| to a new table
// file, each novel chunk is appended to the journal file and indexed in the
// shared journalChunkSource, which is returned as the resulting chunkSource.
func (j *chunkJournal) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) {
	if haver != nil {
		sort.Sort(hasRecordByPrefix(mt.order)) // hasMany() requires addresses to be sorted.
		if _, err := haver.hasMany(mt.order); err != nil {
			return nil, err
		}
		sort.Sort(hasRecordByOrder(mt.order)) // restore "insertion" order for write
	}
	for _, record := range mt.order {
		if record.has {
			// skip chunks that |haver| already contains
			continue
		}
		c := chunks.NewChunkWithHash(hash.Hash(*record.a), mt.chunks[*record.a])
		cc := ChunkToCompressedChunk(c)
		lookup, err := j.journal.writeChunk(cc)
		if err != nil {
			return nil, err
		}
		// index the new chunk so subsequent reads can find it in the journal
		j.source.lookups[*record.a] = lookup
		j.source.compressedSz += uint64(cc.CompressedSize())
	}
	return j.source, nil
}
// ConjoinAll implements tablePersister.
func (j *chunkJournal) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, error) {
	// the journal is a single file; conjoining table files does not apply
	panic("unimplemented")
}

// Open implements tablePersister. The only chunkSource the journal can open
// is its own; any other |name| is an error.
func (j *chunkJournal) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
	if name == j.source.address {
		return j.source, nil
	}
	return nil, fmt.Errorf("unknown chunk source %s", name.String())
}

// Exists implements tablePersister.
func (j *chunkJournal) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
	panic("unimplemented")
}

// PruneTableFiles implements tablePersister.
func (j *chunkJournal) PruneTableFiles(ctx context.Context, contents manifestContents, mtime time.Time) error {
	panic("unimplemented")
}

// Name implements manifest; returns the absolute path of the journal file.
func (j *chunkJournal) Name() string {
	return j.journal.filepath()
}
// Update implements manifest. The new root hash is made durable by appending
// a root-hash record to the journal file; the backing manifest file is only
// updated when the chunkJournal is closed (see flushManifest).
func (j *chunkJournal) Update(ctx context.Context, lastLock addr, next manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
	if j.contents.gcGen != next.gcGen {
		// a GC-generation change would require rewriting the journal
		panic("chunkJournal cannot update GC generation")
	} else if j.contents.lock != lastLock {
		return j.contents, nil // |next| is stale
	}
	if writeHook != nil {
		if err := writeHook(); err != nil {
			return manifestContents{}, err
		}
	}
	if emptyAddr(addr(next.root)) {
		panic(next) // refuse to persist an empty root hash
	}
	if err := j.journal.writeRootHash(next.root); err != nil {
		return manifestContents{}, err
	}
	j.contents = next
	return j.contents, nil
}
// ParseIfExists implements manifest. Once the chunkJournal is initialized
// (|j.contents.lock| is non-empty) the cached in-memory contents are
// returned directly; otherwise the backing manifest is consulted and cached.
func (j *chunkJournal) ParseIfExists(ctx context.Context, stats *Stats, readHook func() error) (ok bool, mc manifestContents, err error) {
	if emptyAddr(j.contents.lock) {
		// not yet initialized: delegate to the backing manifest
		ok, mc, err = j.backing.ParseIfExists(ctx, stats, readHook)
		if err != nil || !ok {
			return false, manifestContents{}, err
		}
		j.contents = mc
		return
	}
	if readHook != nil {
		if err = readHook(); err != nil {
			return false, manifestContents{}, err
		}
	}
	ok, mc = true, j.contents
	return
}
// flushManifest persists the in-memory manifest contents to the backing
// manifest file, using the backing manifest's current lock for the
// compare-and-swap. A no-op if the journal was never initialized.
func (j *chunkJournal) flushManifest() error {
	ctx, s := context.Background(), &Stats{}
	_, last, err := j.backing.ParseIfExists(ctx, s, nil)
	if err != nil {
		return err
	}
	if !emptyAddr(j.contents.lock) {
		_, err = j.backing.Update(ctx, last.lock, j.contents, s, nil)
	}
	return err
}
// Close implements io.Closer. The latest manifest contents are flushed to
// the backing manifest before the journal file is closed. Both operations
// are always attempted; the first error encountered is returned (previously
// a journal-close failure could mask the manifest-flush error).
func (j *chunkJournal) Close() (err error) {
	err = j.flushManifest()
	if cerr := j.journal.Close(); err == nil {
		err = cerr
	}
	return
}
// has reports whether the journal contains a chunk addressed by |h|.
func (s journalChunkSource) has(h addr) (bool, error) {
	_, ok := s.lookups[h]
	return ok, nil
}
// hasMany marks each record in |addrs| that is present in the journal and
// reports whether any requested address was absent.
func (s journalChunkSource) hasMany(addrs []hasRecord) (missing bool, err error) {
	for i := range addrs {
		if _, ok := s.lookups[*addrs[i].a]; !ok {
			missing = true
			continue
		}
		addrs[i].has = true
	}
	return
}
// getCompressed reads the chunk record for |h| from the journal file and
// returns it as a CompressedChunk. A zero-value CompressedChunk with a nil
// error indicates the chunk is not present in this source.
func (s journalChunkSource) getCompressed(_ context.Context, h addr, _ *Stats) (cc CompressedChunk, err error) {
	l, ok := s.lookups[h]
	if !ok {
		return CompressedChunk{}, nil // not an error: chunk simply absent
	}
	buf := make([]byte, l.length)
	if _, err = s.journal.ReadAt(buf, l.offset); err != nil {
		// propagate the I/O error rather than masking it as a missing chunk
		// (previously this returned a nil error, silently dropping data)
		return CompressedChunk{}, err
	}
	rec := readJournalRecord(buf)
	if h != rec.address {
		err = fmt.Errorf("bad chunk get (%s != %s)", h.String(), rec.address.String())
		return
	}
	return NewCompressedChunk(hash.Hash(h), rec.payload)
}
// get returns the decompressed bytes of chunk |h|, or nil if the journal
// does not contain it.
func (s journalChunkSource) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) {
	cc, err := s.getCompressed(ctx, h, stats)
	switch {
	case err != nil:
		return nil, err
	case cc.IsEmpty():
		return nil, nil
	}
	ch, err := cc.ToChunk()
	if err != nil {
		return nil, err
	}
	return ch.Data(), nil
}
// getMany implements chunkSource. Each requested chunk found in the journal
// is passed to |found|; the returned bool reports whether any were missing.
func (s journalChunkSource) getMany(ctx context.Context, _ *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) {
	var remaining bool
	// todo: read planning
	for i := range reqs {
		data, err := s.get(ctx, *reqs[i].a, stats)
		if err != nil {
			return false, err
		} else if data != nil {
			ch := chunks.NewChunkWithHash(hash.Hash(*reqs[i].a), data)
			found(ctx, &ch)
		} else {
			remaining = true
		}
	}
	return remaining, nil
}

// getManyCompressed implements chunkSource; like getMany, but passes chunks
// to |found| without decompressing them.
func (s journalChunkSource) getManyCompressed(ctx context.Context, _ *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
	var remaining bool
	// todo: read planning
	for i := range reqs {
		cc, err := s.getCompressed(ctx, *reqs[i].a, stats)
		if err != nil {
			return false, err
		} else if cc.IsEmpty() {
			remaining = true
		} else {
			found(ctx, cc)
		}
	}
	return remaining, nil
}
// count returns the number of chunks indexed by this source.
func (s journalChunkSource) count() (uint32, error) {
	return uint32(len(s.lookups)), nil
}

// uncompressedLen implements chunkSource.
func (s journalChunkSource) uncompressedLen() (uint64, error) {
	// todo(andy)
	// NOTE(review): returns the compressed size as a stand-in for the
	// uncompressed length — confirm callers tolerate the approximation
	return s.compressedSz, nil
}

// hash implements chunkSource; the journal uses a single well-known address.
func (s journalChunkSource) hash() addr {
	return s.address
}

// reader implements chunkSource.
func (s journalChunkSource) reader(context.Context) (io.Reader, error) {
	// todo(andy): |reader()| belongs to the chunkSource interface and exists
	// due to the duality between chunkSources & table files. chunkJournal
	// seeks to create many chunkSources that depend on a single file.
	// |reader()| in particular is relevant to conjoin implementations.
	panic("unimplemented")
}

// size implements chunkSource.
// size returns the total size of the chunkSource: chunks, index, and footer
func (s journalChunkSource) size() (uint64, error) {
	return s.compressedSz, nil // todo(andy)
}

// index implements chunkSource.
func (s journalChunkSource) index() (tableIndex, error) {
	panic("unimplemented")
}

// clone implements chunkSource; the source shares its lookup map and
// underlying file handle, so a shallow copy suffices.
func (s journalChunkSource) clone() (chunkSource, error) {
	return s, nil
}

// close implements chunkSource; the underlying journal file is owned and
// closed by the chunkJournal, not by this source.
func (s journalChunkSource) close() error {
	return nil
}
// emptyAddr reports whether |a| is the zero-value address.
func emptyAddr(a addr) bool {
	return a == addr{}
}

View File

@@ -0,0 +1,234 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nbs
import (
"bytes"
"context"
"math/rand"
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/constants"
"github.com/dolthub/dolt/go/store/d"
"github.com/dolthub/dolt/go/store/hash"
)
func TestChunkJournalBlockStoreSuite(t *testing.T) {
cacheOnce.Do(makeGlobalCaches)
fn := func(ctx context.Context, dir string) (*NomsBlockStore, error) {
m, err := getFileManifest(ctx, dir)
if err != nil {
return nil, err
}
j, err := newChunkJournal(ctx, dir, m)
if err != nil {
return nil, err
}
nbf := constants.FormatDefaultString
mm := makeManifestManager(j)
q := NewUnlimitedMemQuotaProvider()
c := inlineConjoiner{defaultMaxTables}
return newNomsBlockStore(ctx, nbf, mm, j, q, c, testMemTableSize)
}
suite.Run(t, &BlockStoreSuite{
factory: fn,
skipInterloper: true,
})
}
func TestChunkJournalPersist(t *testing.T) {
ctx := context.Background()
dir, err := os.MkdirTemp("", "")
require.NoError(t, err)
m, err := getFileManifest(ctx, dir)
require.NoError(t, err)
j, err := newChunkJournal(ctx, dir, m)
require.NoError(t, err)
const iters = 64
stats := &Stats{}
haver := emptyChunkSource{}
for i := 0; i < iters; i++ {
memTbl, chunkMap := randomMemTable(16)
source, err := j.Persist(ctx, memTbl, haver, stats)
assert.NoError(t, err)
for h, ch := range chunkMap {
ok, err := source.has(h)
assert.NoError(t, err)
assert.True(t, ok)
data, err := source.get(ctx, h, stats)
assert.NoError(t, err)
assert.Equal(t, ch.Data(), data)
}
cs, err := j.Open(ctx, source.hash(), 16, stats)
assert.NotNil(t, cs)
assert.NoError(t, err)
}
}
func TestRoundTripRecords(t *testing.T) {
t.Run("chunk record", func(t *testing.T) {
for i := 0; i < 64; i++ {
rec, buf := makeChunkRecord()
assert.Equal(t, rec.length, uint32(len(buf)))
b := make([]byte, rec.length)
n := writeChunkRecord(b, mustCompressedChunk(rec))
assert.Equal(t, n, rec.length)
assert.Equal(t, buf, b)
r := readJournalRecord(buf)
assert.Equal(t, rec, r)
}
})
t.Run("root hash record", func(t *testing.T) {
for i := 0; i < 64; i++ {
rec, buf := makeRootHashRecord()
assert.Equal(t, rec.length, uint32(len(buf)))
b := make([]byte, rec.length)
n := writeRootHashRecord(b, rec.address)
assert.Equal(t, n, rec.length)
assert.Equal(t, buf, b)
r := readJournalRecord(buf)
assert.Equal(t, rec, r)
}
})
}
func TestProcessRecords(t *testing.T) {
const cnt = 1024
ctx := context.Background()
records := make([]jrecord, cnt)
buffers := make([][]byte, cnt)
journal := make([]byte, cnt*1024)
var off uint32
for i := range records {
var r jrecord
var b []byte
if i%8 == 0 {
r, b = makeRootHashRecord()
off += writeRootHashRecord(journal[off:], r.address)
} else {
r, b = makeChunkRecord()
off += writeChunkRecord(journal[off:], mustCompressedChunk(r))
}
records[i], buffers[i] = r, b
}
var i, sum int
check := func(o int64, r jrecord) (_ error) {
require.True(t, i < cnt)
assert.Equal(t, records[i], r)
assert.Equal(t, sum, int(o))
sum += len(buffers[i])
i++
return
}
n, err := processRecords(ctx, bytes.NewReader(journal), check)
assert.Equal(t, cnt, i)
assert.Equal(t, int(off), int(n))
require.NoError(t, err)
i, sum = 0, 0
// write a bogus record to the end and process again
writeCorruptRecord(journal[off:])
n, err = processRecords(ctx, bytes.NewReader(journal), check)
assert.Equal(t, cnt, i)
assert.Equal(t, int(off), int(n))
require.NoError(t, err)
}
func randomMemTable(cnt int) (*memTable, map[addr]chunks.Chunk) {
chnx := make(map[addr]chunks.Chunk, cnt)
for i := 0; i < cnt; i++ {
ch := chunks.NewChunk(randBuf(100))
chnx[addr(ch.Hash())] = ch
}
mt := newMemTable(uint64(cnt) * 256)
for a, ch := range chnx {
mt.addChunk(a, ch.Data())
}
return mt, chnx
}
func makeChunkRecord() (jrecord, []byte) {
ch := chunks.NewChunk(randBuf(100))
cc := ChunkToCompressedChunk(ch)
payload := cc.FullCompressedChunk
b := make([]byte, recMinSz+len(payload))
writeUint(b, uint32(len(b)))
b[recLenSz] = byte(chunkKind)
copy(b[recLenSz+recKindSz:], cc.H[:])
copy(b[recLenSz+recKindSz+addrSize:], payload)
c := crc(b[:len(b)-checksumSize])
writeUint(b[len(b)-checksumSize:], c)
r := jrecord{
length: uint32(len(b)),
kind: chunkKind,
address: addr(cc.H),
payload: payload,
checksum: c,
}
return r, b
}
func makeRootHashRecord() (jrecord, []byte) {
a := addr(hash.Of(randBuf(8)))
b := make([]byte, recMinSz)
writeUint(b, uint32(len(b)))
b[recLenSz] = byte(rootHashKind)
copy(b[recLenSz+recKindSz:], a[:])
c := crc(b[:len(b)-checksumSize])
writeUint(b[len(b)-checksumSize:], c)
r := jrecord{
length: uint32(len(b)),
kind: rootHashKind,
payload: b[len(b):],
address: a,
checksum: c,
}
return r, b
}
func writeCorruptRecord(buf []byte) (n uint32) {
// fill with random data
rand.Read(buf[:recMinSz])
// write a valid size, kind
writeUint(buf, recMinSz)
buf[recLenSz] = byte(rootHashKind)
return recMinSz
}
func mustCompressedChunk(rec jrecord) CompressedChunk {
d.PanicIfFalse(rec.kind == chunkKind)
cc, err := NewCompressedChunk(hash.Hash(rec.address), rec.payload)
d.PanicIfError(err)
return cc
}
// randBuf returns a buffer of |n| pseudo-random bytes.
func randBuf(n int) []byte {
	buf := make([]byte, n)
	rand.Read(buf)
	return buf
}

View File

@@ -257,3 +257,7 @@ func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, contents manif
return nil
}
func (ftp *fsTablePersister) Close() error {
return nil
}

View File

@@ -0,0 +1,396 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nbs
import (
"bufio"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"github.com/dolthub/dolt/go/store/d"
"github.com/dolthub/dolt/go/store/hash"
)
const (
	// chunkJournalFileSize is the size the journal file is zero-filled to
	// when first created.
	chunkJournalFileSize = 256 * 1024 * 1024

	// todo(andy): buffer must be able to hold an entire record,
	// but we don't have a hard limit on record size right now
	journalWriterBuffSize = 1024 * 1024

	// chunkJournalAddr is the well-known address of the journal chunk source.
	chunkJournalAddr = "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv"
)

var (
	// openJournals registers journal paths opened in-process so a journal
	// file cannot be opened twice concurrently.
	openJournals = new(sync.Map)

	journalAddr = addr(hash.Parse(chunkJournalAddr))
)
// openJournalWriter opens the chunk journal file at |path|, creating and
// zero-filling it to |chunkJournalFileSize| if it does not exist. Only one
// journalWriter per path may be open in-process at a time; the registration
// is released by (*journalWriter).Close — and, as of this fix, also on any
// failure here, so a failed open no longer permanently blocks reopening the
// journal (or leaks the file handle).
func openJournalWriter(ctx context.Context, path string) (wr *journalWriter, err error) {
	var f *os.File
	if path, err = filepath.Abs(path); err != nil {
		return nil, err
	}
	if _, ok := openJournals.Load(path); ok {
		return nil, fmt.Errorf("journal (%s) already opened in-process", path)
	}
	openJournals.Store(path, true)
	defer func() {
		if err != nil {
			// on failure, release the registration and close any file we
			// opened so a subsequent open attempt can succeed
			openJournals.Delete(path)
			if f != nil {
				_ = f.Close()
			}
		}
	}()
	var create bool
	info, err := os.Stat(path)
	if errors.Is(err, os.ErrNotExist) {
		create, err = true, nil
	} else if err != nil {
		return nil, err
	} else if info.IsDir() {
		return nil, fmt.Errorf("expected file %s found directory", chunkJournalName)
	}
	if create {
		if f, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666); err != nil {
			return nil, err
		}
		const batch = 1024 * 1024
		b := make([]byte, batch)
		for i := 0; i < chunkJournalFileSize; i += batch {
			if _, err = f.Write(b); err != nil { // zero fill |f|
				return nil, err
			}
		}
		if err = f.Sync(); err != nil {
			return nil, err
		}
		var o int64
		if o, err = f.Seek(0, io.SeekStart); err != nil {
			return nil, err
		} else if o != 0 {
			err = fmt.Errorf("expected file offset 0, got %d", o)
			return nil, err
		}
	} else {
		if f, err = os.OpenFile(path, os.O_RDWR, 0666); err != nil {
			return nil, err
		}
	}
	return &journalWriter{
		buf:  make([]byte, 0, journalWriterBuffSize),
		file: f,
		path: path,
	}, nil
}
// journalWriter is a buffered writer over the journal file that also
// supports reads (io.ReaderAt) spanning both flushed and still-buffered bytes.
type journalWriter struct {
	buf  []byte   // staged bytes not yet flushed to |file|
	file *os.File // journal file handle
	off  int64    // file offset at which |buf| will be flushed
	path string   // absolute path of the journal file
}

var _ io.ReaderAt = &journalWriter{}
var _ io.WriteCloser = &journalWriter{}

// filepath returns the absolute path of the journal file.
func (wr *journalWriter) filepath() string {
	return wr.path
}
// ReadAt implements io.ReaderAt. Offsets below |wr.off| are served from
// |wr.file|; offsets at or beyond it are served from the in-memory buffer
// |wr.buf|. A single read may straddle both regions.
func (wr *journalWriter) ReadAt(p []byte, off int64) (n int, err error) {
	var bp []byte
	if off < wr.off {
		// fill some or all of |p| from |wr.file|
		fread := int(wr.off - off)
		if len(p) > fread {
			// straddled read: the tail of |p| comes from |wr.buf|
			bp = p[fread:]
			p = p[:fread]
		}
		if n, err = wr.file.ReadAt(p, off); err != nil {
			return 0, err
		}
		off = 0 // remaining bytes start at the head of |wr.buf|
	} else {
		// fill all of |p| from |wr.buf|
		bp = p
		off -= wr.off // translate to a buffer-relative offset
	}
	n += copy(bp, wr.buf[off:])
	return
}
// Write implements io.Writer. Writes larger than the currently buffered byte
// count bypass |wr.buf| (after flushing it, to preserve ordering) and go
// directly to |wr.file|; otherwise bytes are staged via getBytes.
func (wr *journalWriter) Write(p []byte) (n int, err error) {
	// NOTE(review): this compares len(p) against len(wr.buf) — the bytes
	// currently buffered — not cap(wr.buf); a small write into an empty
	// buffer therefore takes the direct-write path. Confirm this is intended.
	if len(p) > len(wr.buf) {
		// write directly to |wr.file|
		if err = wr.flush(); err != nil {
			return 0, err
		}
		n, err = wr.file.WriteAt(p, wr.off)
		wr.off += int64(n)
		return
	}
	var buf []byte
	if buf, err = wr.getBytes(len(p)); err != nil {
		return 0, err
	}
	n = copy(buf, p)
	return
}
// bootstrapJournal replays the records in |wr.file|, building a
// journalChunkSource that indexes every chunk record, and returns the root
// hash from the latest root-hash record seen (the zero hash if none exists).
// On return, |wr.off| is positioned just past the last valid record.
func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash, cs journalChunkSource, err error) {
	// bootstrap chunk journal from |wr.file|
	src := journalChunkSource{
		journal: wr,
		address: journalAddr,
		lookups: make(map[addr]jrecordLookup),
	}
	wr.off, err = processRecords(ctx, wr.file, func(o int64, r jrecord) error {
		switch r.kind {
		case chunkKind:
			src.lookups[r.address] = jrecordLookup{offset: o, length: r.length}
			src.compressedSz += uint64(r.length)
			// todo(andy): uncompressed size
		case rootHashKind:
			// later records win: |last| ends as the most recent root hash
			last = hash.Hash(r.address)
		default:
			return fmt.Errorf("unknown journal record kind (%d)", r.kind)
		}
		return nil
	})
	if err != nil {
		return hash.Hash{}, journalChunkSource{}, err
	}
	cs = src
	return
}
// writeChunk stages a chunk record for |cc| in the journal buffer and
// returns a lookup describing where the record will reside in the file.
// The record is not durable until a subsequent flush and sync.
func (wr *journalWriter) writeChunk(cc CompressedChunk) (jrecordLookup, error) {
	rec := jrecordLookup{
		offset: wr.offset(), // logical position: file offset + buffered bytes
		length: chunkRecordSize(cc),
	}
	buf, err := wr.getBytes(int(rec.length))
	if err != nil {
		return jrecordLookup{}, err
	}
	_ = writeChunkRecord(buf, cc)
	return rec, nil
}
// writeRootHash appends a root-hash record for |root| to the journal, then
// flushes the buffer and fsyncs the file so the commit is durable.
func (wr *journalWriter) writeRootHash(root hash.Hash) error {
	buf, err := wr.getBytes(rootHashRecordSize)
	if err != nil {
		return err
	}
	_ = writeRootHashRecord(buf, addr(root))
	if err = wr.flush(); err != nil {
		return err
	}
	return wr.file.Sync()
}
// offset returns the logical end of the journal: the flushed file offset
// plus any bytes still buffered in memory.
func (wr *journalWriter) offset() int64 {
	return wr.off + int64(len(wr.buf))
}
// getBytes returns a slice of |n| unused bytes at the tail of |wr.buf|,
// flushing buffered bytes to disk first if the buffer lacks room. Callers
// write record data into the returned slice, which is only valid until the
// next buffer operation. Requests larger than the buffer capacity error.
func (wr *journalWriter) getBytes(n int) (buf []byte, err error) {
	c, l := cap(wr.buf), len(wr.buf)
	if n > c {
		err = fmt.Errorf("requested bytes (%d) exceeds capacity (%d)", n, c)
		return
	} else if n > c-l {
		// make room by flushing buffered bytes to |wr.file|
		if err = wr.flush(); err != nil {
			return
		}
	}
	l = len(wr.buf)
	wr.buf = wr.buf[:l+n]
	buf = wr.buf[l : l+n]
	return
}
// flush writes buffered bytes to |wr.file| at |wr.off|, advances the offset,
// and resets the buffer while preserving its capacity. A no-op write when
// the buffer is empty.
func (wr *journalWriter) flush() (err error) {
	if _, err = wr.file.WriteAt(wr.buf, wr.off); err != nil {
		return err
	}
	wr.off += int64(len(wr.buf))
	wr.buf = wr.buf[:0]
	return
}
// Close implements io.Closer. Buffered bytes are flushed and the file is
// fsynced before closing. The file handle is always closed and the in-process
// registration always released — previously a flush failure returned early,
// leaking both — and the first error encountered is returned (later errors
// no longer mask earlier ones).
func (wr *journalWriter) Close() (err error) {
	defer openJournals.Delete(wr.path)
	if err = wr.flush(); err == nil {
		err = wr.file.Sync()
	}
	if cerr := wr.file.Close(); err == nil {
		err = cerr
	}
	return
}
// todo(andy): extensible record format

// jrecord is a parsed journal record. The on-disk layout is:
//
//	length (4 bytes, big-endian) | kind (1 byte) | address (addrSize bytes) |
//	payload (length - recMinSz bytes) | checksum (4 bytes, crc over the rest)
type jrecord struct {
	length   uint32      // total record length, including the length field itself
	kind     jrecordKind // record discriminator
	address  addr        // chunk address, or the root hash for rootHashKind
	payload  []byte      // compressed chunk bytes (empty for root-hash records)
	checksum uint32      // crc over all preceding record bytes
}

// jrecordKind discriminates journal record types.
type jrecordKind uint8

const (
	unknownKind  jrecordKind = 0
	rootHashKind jrecordKind = 1
	chunkKind    jrecordKind = 2

	recKindSz = 1
	recLenSz  = uint32Size
	recMinSz  = recLenSz + recKindSz + addrSize + checksumSize
	recMaxSz  = 128 * 1024 // todo(andy): less arbitrary

	rootHashRecordSize = recMinSz
)

// chunkRecordSize returns the full on-disk record size for chunk |c|.
func chunkRecordSize(c CompressedChunk) uint32 {
	return uint32(len(c.FullCompressedChunk)) + recMinSz
}
// writeChunkRecord serializes a chunk record for |c| into |buf| and returns
// the number of bytes written. |buf| must be at least chunkRecordSize(c).
func writeChunkRecord(buf []byte, c CompressedChunk) (n uint32) {
	l := chunkRecordSize(c)
	writeUint(buf[:recLenSz], l)
	n += recLenSz
	buf[n] = byte(chunkKind)
	n += recKindSz
	copy(buf[n:], c.H[:])
	n += addrSize
	copy(buf[n:], c.FullCompressedChunk)
	n += uint32(len(c.FullCompressedChunk))
	// the checksum covers every byte written so far
	writeUint(buf[n:], crc(buf[:n]))
	n += checksumSize
	d.PanicIfFalse(l == n)
	return
}
// writeRootHashRecord serializes a root-hash record for |root| into |buf|
// and returns the number of bytes written (always rootHashRecordSize).
func writeRootHashRecord(buf []byte, root addr) (n uint32) {
	writeUint(buf[:recLenSz], rootHashRecordSize)
	n = recLenSz
	buf[n] = byte(rootHashKind)
	n += recKindSz
	n += uint32(copy(buf[n:], root[:]))
	// checksum covers every byte written so far
	writeUint(buf[n:], crc(buf[:n]))
	return n + checksumSize
}
// readJournalRecord parses the record serialized in |buf|. The caller must
// have validated the record (see safeReadJournalRecord).
func readJournalRecord(buf []byte) (rec jrecord) {
	rec.length = readUint(buf)
	pos := recLenSz
	rec.kind = jrecordKind(buf[pos])
	pos += recKindSz
	pos += copy(rec.address[:], buf[pos:])
	rec.payload = buf[pos : len(buf)-checksumSize]
	rec.checksum = readUint(buf[len(buf)-checksumSize:])
	return
}
// safeReadJournalRecord parses the record in |buf|, returning ok=false if
// the CRC does not match, the kind is unknown, or (for chunk records) the
// payload fails to parse as a CompressedChunk for the record's address.
func safeReadJournalRecord(buf []byte) (jrecord, bool) {
	o := len(buf) - checksumSize
	if crc(buf[:o]) != readUint(buf[o:]) {
		return jrecord{}, false
	}
	rec := readJournalRecord(buf)
	switch rec.kind {
	case rootHashKind:
		return rec, true
	case chunkKind:
		// verify the payload round-trips as a compressed chunk
		_, err := NewCompressedChunk(hash.Hash(rec.address), rec.payload)
		if err != nil {
			return jrecord{}, false
		}
		return rec, true
	default:
		return jrecord{}, false
	}
}
// processRecords scans |r| from the beginning, invoking |cb| with the byte
// offset and parsed contents of each journal record. Scanning stops at the
// first record that fails validation (implausible length or CRC mismatch),
// which truncates any partially written tail left by a crash. On return,
// |r| is positioned just past the last valid record and that offset is
// returned.
func processRecords(ctx context.Context, r io.ReadSeeker, cb func(o int64, r jrecord) error) (int64, error) {
	var (
		buf []byte
		off int64
		err error
	)
	rdr := bufio.NewReaderSize(r, journalWriterBuffSize)
	for {
		// peek to read next record size
		if buf, err = rdr.Peek(uint32Size); err != nil {
			break
		}
		l := readUint(buf)
		if l < recMinSz || l > recMaxSz {
			break // implausible length: treat as end of valid data
		} else if buf, err = rdr.Peek(int(l)); err != nil {
			break
		}
		rec, ok := safeReadJournalRecord(buf)
		if !ok {
			break // stop if we can't validate |rec|
		}
		if err = cb(off, rec); err != nil {
			break
		}
		// advance |rdr| state by |l| bytes
		if _, err = io.ReadFull(rdr, buf); err != nil {
			break
		}
		off += int64(len(buf))
	}
	if err != nil && err != io.EOF {
		return 0, err
	}
	// reset the file pointer to end of the last
	// successfully processed journal record
	if _, err = r.Seek(off, 0); err != nil {
		return 0, err
	}
	return off, nil
}
// readUint decodes the leading four bytes of |buf| as a big-endian uint32.
func readUint(buf []byte) uint32 {
	return binary.BigEndian.Uint32(buf[:4])
}
// writeUint encodes |u| into the leading four bytes of |buf| in big-endian order.
func writeUint(buf []byte, u uint32) {
	binary.BigEndian.PutUint32(buf[:4], u)
}

View File

@@ -0,0 +1,289 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nbs
import (
"context"
"fmt"
"math/rand"
"path/filepath"
"testing"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type operation struct {
kind opKind
buf []byte
readAt int64
}
type opKind byte
const (
readOp opKind = iota
writeOp
flushOp
)
func TestJournalWriter(t *testing.T) {
tests := []struct {
name string
size int
ops []operation
}{
{
name: "smoke test",
size: 16,
},
{
name: "write to empty file",
size: 16,
ops: []operation{
{kind: writeOp, buf: []byte("lorem")},
{kind: writeOp, buf: []byte("ipsum")},
},
},
{
name: "read from non-empty file",
size: 16,
ops: []operation{
{kind: writeOp, buf: []byte("loremipsum")},
{kind: flushOp},
{kind: readOp, buf: []byte("lorem"), readAt: 0},
{kind: readOp, buf: []byte("ipsum"), readAt: 5},
{kind: readOp, buf: []byte("loremipsum"), readAt: 0},
},
},
{
name: "read new writes",
size: 16,
ops: []operation{
{kind: writeOp, buf: []byte("lorem")},
{kind: readOp, buf: []byte("lorem"), readAt: 0},
{kind: writeOp, buf: []byte("ipsum")},
{kind: readOp, buf: []byte("lorem"), readAt: 0},
{kind: readOp, buf: []byte("ipsum"), readAt: 5},
},
},
{
name: "read flushed writes",
size: 16,
ops: []operation{
{kind: writeOp, buf: []byte("lorem")},
{kind: flushOp},
{kind: readOp, buf: []byte("lorem"), readAt: 0},
{kind: writeOp, buf: []byte("ipsum")},
{kind: readOp, buf: []byte("ipsum"), readAt: 5},
{kind: readOp, buf: []byte("lorem"), readAt: 0},
{kind: flushOp},
},
},
{
name: "read partially flushed writes",
size: 16,
ops: []operation{
{kind: writeOp, buf: []byte("lorem")},
{kind: flushOp},
{kind: writeOp, buf: []byte("ipsum")},
{kind: readOp, buf: []byte("loremipsum"), readAt: 0},
},
},
{
name: "successive writes trigger buffer flush ",
size: 16,
ops: []operation{
{kind: writeOp, buf: []byte("lorem")},
{kind: readOp, buf: []byte("lorem"), readAt: 0},
{kind: writeOp, buf: []byte("ipsum")},
{kind: readOp, buf: []byte("ipsum"), readAt: 5},
{kind: writeOp, buf: []byte("dolor")},
{kind: readOp, buf: []byte("dolor"), readAt: 10},
{kind: writeOp, buf: []byte("sit")}, // triggers a flush
{kind: readOp, buf: []byte("sit"), readAt: 15},
{kind: readOp, buf: []byte("loremipsumdolorsit"), readAt: 0},
{kind: writeOp, buf: []byte("amet")},
{kind: readOp, buf: []byte("amet"), readAt: 18},
{kind: readOp, buf: []byte("loremipsumdolorsitamet"), readAt: 0},
},
},
{
name: "write larger that buffer",
size: 8,
ops: []operation{
{kind: writeOp, buf: []byte("loremipsum")},
{kind: flushOp},
{kind: writeOp, buf: []byte("dolorsitamet")},
{kind: readOp, buf: []byte("dolorsitamet"), readAt: 10},
{kind: readOp, buf: []byte("loremipsumdolorsitamet"), readAt: 0},
},
},
{
name: "flush empty buffer",
size: 16,
ops: []operation{
{kind: writeOp, buf: []byte("loremipsum")},
{kind: flushOp},
},
},
{
name: "double flush write",
size: 16,
ops: []operation{
{kind: writeOp, buf: []byte("loremipsum")},
{kind: flushOp},
{kind: writeOp, buf: []byte("dolor")},
{kind: flushOp},
{kind: flushOp},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
j, err := openJournalWriter(ctx, newTestFilePath(t))
require.NoError(t, err)
var off int64
for i, op := range test.ops {
switch op.kind {
case readOp:
act := make([]byte, len(op.buf))
n, err := j.ReadAt(act, op.readAt)
assert.NoError(t, err, "operation %d errored", i)
assert.Equal(t, len(op.buf), n, "operation %d failed", i)
assert.Equal(t, op.buf, act, "operation %d failed", i)
case writeOp:
n, err := j.Write(op.buf)
assert.NoError(t, err, "operation %d errored", i)
assert.Equal(t, len(op.buf), n, "operation %d failed", i)
off += int64(n)
case flushOp:
err = j.flush()
assert.NoError(t, err, "operation %d errored", i)
default:
t.Fatal("unknown opKind")
}
assert.Equal(t, off, j.offset())
}
assert.NoError(t, j.Close())
})
}
}
func TestJournalWriterWriteChunk(t *testing.T) {
	ctx := context.Background()
	wr, err := openJournalWriter(ctx, newTestFilePath(t))
	require.NoError(t, err)

	chunkData := randomCompressedChunks()
	recorded := make(map[addr]jrecordLookup, len(chunkData))
	// write each chunk and validate its lookup immediately after the write
	for h, cc := range chunkData {
		lookup, werr := wr.writeChunk(cc)
		require.NoError(t, werr)
		recorded[h] = lookup
		validateLookup(t, wr, lookup, cc)
	}
	// re-validate every lookup once all chunks have been written
	for h, lookup := range recorded {
		validateLookup(t, wr, lookup, chunkData[h])
	}
	require.NoError(t, wr.Close())
}
func TestJournalWriterBootstrap(t *testing.T) {
	ctx := context.Background()
	path := newTestFilePath(t)

	// populate a journal file, then close it
	wr, err := openJournalWriter(ctx, path)
	require.NoError(t, err)
	chunkData := randomCompressedChunks()
	recorded := make(map[addr]jrecordLookup, len(chunkData))
	for h, cc := range chunkData {
		lookup, werr := wr.writeChunk(cc)
		require.NoError(t, werr)
		recorded[h] = lookup
	}
	assert.NoError(t, wr.Close())

	// reopen the same file and bootstrap from the persisted records
	wr, err = openJournalWriter(ctx, path)
	require.NoError(t, err)
	_, src, err := wr.bootstrapJournal(ctx)
	require.NoError(t, err)

	// lookups recorded before the restart must still resolve
	for h, lookup := range recorded {
		validateLookup(t, wr, lookup, chunkData[h])
	}
	// the bootstrapped chunk source must serve every chunk's data
	for h, cc := range chunkData {
		got, gerr := src.get(ctx, h, nil)
		require.NoError(t, gerr)
		chk, cerr := cc.ToChunk()
		require.NoError(t, cerr)
		assert.Equal(t, chk.Data(), got)
	}
	require.NoError(t, wr.Close())
}
// validateLookup reads |l.length| bytes at |l.offset| from the journal and
// asserts that the record found there round-trips to the chunk |cc|.
func validateLookup(t *testing.T, j *journalWriter, l jrecordLookup, cc CompressedChunk) {
	buf := make([]byte, l.length)
	n, err := j.ReadAt(buf, l.offset)
	require.NoError(t, err)
	assert.Equal(t, int(l.length), n)

	rec := readJournalRecord(buf)
	assert.Equal(t, hash.Hash(rec.address), cc.Hash())
	assert.Equal(t, rec.payload, cc.FullCompressedChunk)
}
func TestJournalWriterSyncClose(t *testing.T) {
	ctx := context.Background()
	wr, err := openJournalWriter(ctx, newTestFilePath(t))
	require.NoError(t, err)
	_, _, err = wr.bootstrapJournal(ctx)
	require.NoError(t, err)

	// Close() flushes any buffered bytes: after closing, the write
	// buffer is empty and the file offset reflects the write.
	n, err := wr.Write([]byte("sit"))
	require.NoError(t, err)
	assert.Equal(t, 3, n)
	require.NoError(t, wr.Close())
	assert.Equal(t, 0, len(wr.buf))
	assert.Equal(t, 3, int(wr.off))
}
// newTestFilePath returns a path for a fresh journal file inside a
// test-scoped temp directory (cleaned up automatically by the testing
// package); the random suffix distinguishes journals within one test.
func newTestFilePath(t *testing.T) string {
	return filepath.Join(t.TempDir(), fmt.Sprintf("journal%d.log", rand.Intn(65536)))
}
// randomCompressedChunks carves ~1MB of random bytes into chunks of
// random size in [50, 100] and returns them compressed, keyed by hash.
func randomCompressedChunks() (compressed map[addr]CompressedChunk) {
	data := make([]byte, 1024*1024)
	rand.Read(data)

	compressed = make(map[addr]CompressedChunk)
	for {
		sz := 50 + rand.Intn(51)
		if sz >= len(data) {
			// not enough bytes left for another chunk
			return
		}
		ch := chunks.NewChunk(data[:sz])
		data = data[sz:]
		compressed[addr(ch.Hash())] = ChunkToCompressedChunk(ch)
	}
}

View File

@@ -635,6 +635,10 @@ func (ftp fakeTablePersister) PruneTableFiles(_ context.Context, _ manifestConte
return chunks.ErrUnsupportedOperation
}
func (ftp fakeTablePersister) Close() error {
return nil
}
func extractAllChunks(ctx context.Context, src chunkSource, cb func(rec extractRecord)) (err error) {
var index tableIndex
if index, err = src.index(); err != nil {

View File

@@ -524,26 +524,29 @@ func NewLocalStore(ctx context.Context, nbfVerStr string, dir string, memTableSi
func newLocalStore(ctx context.Context, nbfVerStr string, dir string, memTableSize uint64, maxTables int, q MemoryQuotaProvider) (*NomsBlockStore, error) {
cacheOnce.Do(makeGlobalCaches)
err := checkDir(dir)
if err != nil {
if err := checkDir(dir); err != nil {
return nil, err
}
m, err := getFileManifest(ctx, dir)
if err != nil {
return nil, err
}
mm := makeManifestManager(m)
p := newFSTablePersister(dir, globalFDCache, q)
nbs, err := newNomsBlockStore(ctx, nbfVerStr, mm, p, q, inlineConjoiner{maxTables}, memTableSize)
if chunkJournalFeatureFlag {
j, err := newChunkJournal(ctx, dir, m)
if err != nil {
return nil, err
}
m, p = j, j
}
mm := makeManifestManager(m)
nbs, err := newNomsBlockStore(ctx, nbfVerStr, mm, p, q, inlineConjoiner{maxTables}, memTableSize)
if err != nil {
return nil, err
}
return nbs, nil
}
@@ -1163,8 +1166,14 @@ func (nbs *NomsBlockStore) Version() string {
return nbs.upstream.nbfVers
}
func (nbs *NomsBlockStore) Close() error {
return nbs.tables.close()
func (nbs *NomsBlockStore) Close() (err error) {
if cerr := nbs.p.Close(); cerr != nil {
err = cerr
}
if cerr := nbs.tables.close(); cerr != nil {
err = cerr
}
return
}
func (nbs *NomsBlockStore) Stats() interface{} {
@@ -1300,11 +1309,11 @@ func (nbs *NomsBlockStore) Size(ctx context.Context) (uint64, error) {
if !ok {
return uint64(0), errors.New("manifest referenced table file for which there is no chunkSource.")
}
ti, err := cs.index()
sz, err := cs.size()
if err != nil {
return uint64(0), fmt.Errorf("error getting table file index for chunkSource. %w", err)
}
size += ti.tableFileSize()
size += sz
}
return size, nil
}

View File

@@ -41,6 +41,10 @@ import (
)
func makeTestLocalStore(t *testing.T, maxTableFiles int) (st *NomsBlockStore, nomsDir string, q MemoryQuotaProvider) {
if chunkJournalFeatureFlag {
t.Skip()
}
ctx := context.Background()
nomsDir = filepath.Join(tempfiles.MovableTempFileProvider.GetTempDir(), "noms_"+uuid.New().String()[:8])
err := os.MkdirAll(nomsDir, os.ModePerm)

View File

@@ -27,6 +27,7 @@ import (
"crypto/sha512"
"encoding/binary"
"errors"
"io"
"sort"
"time"
)
@@ -55,6 +56,8 @@ type tablePersister interface {
// PruneTableFiles deletes old table files that are no longer referenced in the manifest.
PruneTableFiles(ctx context.Context, contents manifestContents, mtime time.Time) error
io.Closer
}
type chunkSourcesByAscendingCount struct {

View File

@@ -79,6 +79,9 @@ func (cmp CompressedChunk) ToChunk() (chunks.Chunk, error) {
func ChunkToCompressedChunk(chunk chunks.Chunk) CompressedChunk {
compressed := snappy.Encode(nil, chunk.Data())
length := len(compressed)
// todo: this append allocates a new buffer and copies |compressed|.
// This is costly, but maybe better, as it allows us to reclaim the
// extra space allocated in snappy.Encode (see snappy.MaxEncodedLen).
compressed = append(compressed, []byte{0, 0, 0, 0}...)
binary.BigEndian.PutUint32(compressed[length:], crc(compressed[:length]))
return CompressedChunk{H: chunk.Hash(), FullCompressedChunk: compressed, CompressedData: compressed[:length]}
@@ -94,6 +97,11 @@ func (cmp CompressedChunk) IsEmpty() bool {
return len(cmp.CompressedData) == 0 || (len(cmp.CompressedData) == 1 && cmp.CompressedData[0] == 0)
}
// CompressedSize returns the size of this CompressedChunk.
func (cmp CompressedChunk) CompressedSize() int {
return len(cmp.CompressedData)
}
var EmptyCompressedChunk CompressedChunk
func init() {

View File

@@ -231,11 +231,11 @@ func (ts tableSet) uncompressedLen() (uint64, error) {
func (ts tableSet) physicalLen() (uint64, error) {
f := func(css chunkSourceSet) (data uint64, err error) {
for _, haver := range css {
index, err := haver.index()
sz, err := haver.size()
if err != nil {
return 0, err
}
data += index.tableFileSize()
data += sz
}
return
}
@@ -417,29 +417,26 @@ func (ts tableSet) rebase(ctx context.Context, specs []tableSpec, stats *Stats)
func (ts tableSet) toSpecs() ([]tableSpec, error) {
tableSpecs := make([]tableSpec, 0, ts.Size())
for _, src := range ts.novel {
cnt, err := src.count()
if err != nil {
return nil, err
for a, src := range ts.novel {
if _, ok := ts.upstream[a]; ok {
continue
}
if cnt > 0 {
cnt, err := src.count()
if err != nil {
return nil, err
} else if cnt > 0 {
h := src.hash()
tableSpecs = append(tableSpecs, tableSpec{h, cnt})
}
}
for _, src := range ts.upstream {
cnt, err := src.count()
if err != nil {
return nil, err
}
if cnt <= 0 {
} else if cnt <= 0 {
return nil, errors.New("no upstream chunks")
}
h := src.hash()
tableSpecs = append(tableSpecs, tableSpec{h, cnt})
}