Merge pull request #4959 from dolthub/taylor/gc

First pass at online GC
Authored by Taylor Bantle, 2023-01-25 14:01:36 -08:00; committed via GitHub
26 changed files with 224 additions and 84 deletions
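For context on the title: "online" GC means the collection pass can run against a live database while other sessions keep reading and writing, rather than requiring exclusive offline access. The changes below thread that through three layers: a shared argument parser for the --shallow flag, a DOLT_GC() stored procedure that dispatches to a shallow or full pass, and a condition-variable gate in NomsBlockStore that parks writers while MarkAndSweepChunks runs. A minimal sketch of invoking it from a client, assuming a dolt sql-server listening locally and the go-sql-driver/mysql driver (the DSN and database name are placeholders):

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// Placeholder DSN; point it at a running dolt sql-server.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:3306)/mydb")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// The procedure returns a single status row; pass '--shallow'
	// as an argument for the faster, incomplete pass.
	var status int
	if err := db.QueryRow("CALL DOLT_GC()").Scan(&status); err != nil {
		log.Fatal(err)
	}
	fmt.Println("dolt_gc status:", status)
}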


@@ -112,6 +112,7 @@ const (
MinParentsFlag = "min-parents"
DecorateFlag = "decorate"
OneLineFlag = "oneline"
ShallowFlag = "shallow"
)
const (
@@ -306,6 +307,12 @@ func CreateLogArgParser() *argparser.ArgParser {
return ap
}
func CreateGCArgParser() *argparser.ArgParser {
ap := argparser.NewArgParser()
ap.SupportsFlag(ShallowFlag, "s", "perform a fast, but incomplete garbage collection pass")
return ap
}
var awsParams = []string{dbfactory.AWSRegionParam, dbfactory.AWSCredsTypeParam, dbfactory.AWSCredsFileParam, dbfactory.AWSCredsProfile}
var ossParams = []string{dbfactory.OSSCredsFileParam, dbfactory.OSSCredsProfile}
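Both the dolt gc CLI command and the DOLT_GC() stored procedure now build their parser through this helper, so the --shallow/-s flag is defined exactly once. A sketch of the dispatch a caller performs with it, using the names introduced above (imports from the dolt repo assumed, error handling trimmed):

// dispatchGC chooses a pass based on the shared flag definition.
func dispatchGC(args []string, shallow, full func() error) error {
	apr, err := cli.CreateGCArgParser().Parse(args)
	if err != nil {
		return err
	}
	if apr.Contains(cli.ShallowFlag) {
		return shallow() // fast pass: drop unreferenced table files
	}
	return full() // complete mark-and-sweep from the current root
}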


@@ -32,10 +32,6 @@ import (
"github.com/dolthub/dolt/go/store/nbs"
)
const (
gcShallowFlag = "shallow"
)
var gcDocs = cli.CommandDocumentationContent{
ShortDesc: "Cleans up unreferenced data from the repository.",
LongDesc: `Searches the repository for data that is no longer referenced and no longer needed.
@@ -75,9 +71,7 @@ func (cmd GarbageCollectionCmd) Docs() *cli.CommandDocumentation {
}
func (cmd GarbageCollectionCmd) ArgParser() *argparser.ArgParser {
ap := argparser.NewArgParser()
ap.SupportsFlag(gcShallowFlag, "s", "perform a fast, but incomplete garbage collection pass")
return ap
return cli.CreateGCArgParser()
}
// EventType returns the type of the event to log
@@ -99,7 +93,7 @@ func (cmd GarbageCollectionCmd) Exec(ctx context.Context, commandStr string, arg
}
var err error
if apr.Contains(gcShallowFlag) {
if apr.Contains(cli.ShallowFlag) {
err = dEnv.DoltDB.ShallowGC(ctx)
if err != nil {
if err == chunks.ErrUnsupportedOperation {


@@ -97,6 +97,7 @@ func (fact OSSFactory) newChunkStore(ctx context.Context, nbf *types.NomsBinForm
if err != nil {
return nil, errors.New("failed to initialize oss blob store")
}
q := nbs.NewUnlimitedMemQuotaProvider()
return nbs.NewBSStore(ctx, nbf.VersionString(), bs, defaultMemTableSize, q)
}
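NewBSStore now takes a memory quota provider like the other NBS constructors; the OSS factory opts out of any cap by passing the unlimited provider. As an illustration of the idea only — this is not the nbs interface — a bounded provider would look roughly like:

import "sync"

// illustrative sketch, not nbs.MemoryQuotaProvider
type memQuota struct {
	mu    sync.Mutex
	used  uint64
	limit uint64 // 0 means unlimited
}

func (q *memQuota) acquire(n uint64) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.limit != 0 && q.used+n > q.limit {
		return false // caller must back off or flush
	}
	q.used += n
	return true
}

func (q *memQuota) release(n uint64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.used -= n
}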


@@ -241,7 +241,7 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
if h.role == RolePrimary {
if err == nil {
h.currentError = nil
lgr.Tracef("cluster/commithook: successfully Commited chunks on destDB")
lgr.Tracef("cluster/commithook: successfully Committed chunks on destDB")
h.lastPushedHead = toPush
h.lastSuccess = incomingTime
h.nextPushAttempt = time.Time{}


@@ -929,7 +929,7 @@ func (p DoltDatabaseProvider) GetRevisionForRevisionDatabase(ctx *sql.Context, d
// IsRevisionDatabase returns true if the specified dbName represents a database that is tied to a specific
// branch or commit from a database (e.g. "dolt/branch1").
func (p DoltDatabaseProvider) IsRevisionDatabase(ctx *sql.Context, dbName string) (bool, error) {
dbName, revision, err := p.GetRevisionForRevisionDatabase(ctx, dbName)
_, revision, err := p.GetRevisionForRevisionDatabase(ctx, dbName)
if err != nil {
return false, err
}


@@ -19,6 +19,7 @@ import (
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
)
@@ -47,15 +48,31 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
return cmdFailure, err
}
apr, err := cli.CreateGCArgParser().Parse(args)
if err != nil {
return cmdFailure, err
}
if apr.NArg() != 0 {
return cmdFailure, InvalidArgErr
}
dSess := dsess.DSessFromSess(ctx.Session)
ddb, ok := dSess.GetDoltDB(ctx, dbName)
if !ok {
return cmdFailure, fmt.Errorf("Could not load database %s", dbName)
}
err := ddb.ShallowGC(ctx)
if err != nil {
return cmdFailure, err
if apr.Contains(cli.ShallowFlag) {
err = ddb.ShallowGC(ctx)
if err != nil {
return cmdFailure, err
}
} else {
err = ddb.GC(ctx)
if err != nil {
return cmdFailure, err
}
}
return cmdSuccess, nil
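The procedure now follows the same path as the CLI command: parse with the shared parser, reject positional arguments, resolve the session's DoltDB by database name, and dispatch to ShallowGC or the new full GC.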


@@ -1018,9 +1018,6 @@ func TestDoltReset(t *testing.T) {
}
func TestDoltGC(t *testing.T) {
// TODO: This does not work because `db.chunkStore().(nbs.TableFileStore)`
// returns not ok in PruneTableFiles
t.Skip()
for _, script := range DoltGC {
enginetest.TestScript(t, newDoltHarness(t), script)
}
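The skip and its TODO come out, presumably because the harness's chunk store now satisfies the nbs.TableFileStore assertion that PruneTableFiles performs, so the DoltGC scripts below can run under enginetest.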


@@ -1832,12 +1832,28 @@ func gcSetup() []string {
var DoltGC = []queries.ScriptTest{
{
Name: "base case: shallow gc",
Name: "base case: gc",
SetUpScript: gcSetup(),
Assertions: []queries.ScriptTestAssertion{
{
Query: "CALL DOLT_GC(null);",
ExpectedErrStr: "error: invalid usage",
},
{
Query: "CALL DOLT_GC('bad', '--shallow');",
ExpectedErrStr: "error: invalid usage",
},
{
Query: "CALL DOLT_GC('--shallow');",
Expected: []sql.Row{{1}},
},
{
Query: "CALL DOLT_GC();",
Expected: []sql.Row{{0}},
Expected: []sql.Row{{1}},
},
{
Query: "CALL DOLT_GC();",
ExpectedErrStr: "no changes since last gc",
},
},
},
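The revised expectations pin down the procedure's contract: positional arguments are rejected, both the shallow and full passes report a single status row, and a second collection with no intervening writes fails with "no changes since last gc" — the store-level gcGen check in MarkAndSweepChunks (see the NomsBlockStore diff below) surfacing as a user-visible error.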


@@ -1698,4 +1698,46 @@ var DoltConstraintViolationTransactionTests = []queries.TransactionTest{
},
},
},
{
Name: "Run GC concurrently with other transactions",
SetUpScript: gcSetup(),
Assertions: []queries.ScriptTestAssertion{
{
Query: "/* client a */ SELECT count(*) FROM t;",
Expected: []sql.Row{{250}},
},
{
Query: "/* client a */ START TRANSACTION",
Expected: []sql.Row{},
},
{
Query: "/* client b */ START TRANSACTION",
Expected: []sql.Row{},
},
{
Query: "/* client a */ CALL DOLT_GC();",
Expected: []sql.Row{{1}},
},
{
Query: "/* client b */ INSERT into t VALUES (300);",
Expected: []sql.Row{{types.NewOkResult(1)}},
},
{
Query: "/* client a */ COMMIT;",
Expected: []sql.Row{},
},
{
Query: "/* client b */ COMMIT;",
Expected: []sql.Row{},
},
{
Query: "/* client a */ SELECT count(*) FROM t;",
Expected: []sql.Row{{251}},
},
{
Query: "/* client b */ SELECT count(*) FROM t;",
Expected: []sql.Row{{251}},
},
},
},
}
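This transaction test is the heart of the "online" claim: a full GC on client a neither blocks nor invalidates client b's open transaction. b's INSERT lands after the collection, both COMMITs succeed, and the two sessions converge on 251 rows.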


@@ -42,6 +42,7 @@ type GCSBlobstore struct {
var _ Blobstore = &GCSBlobstore{}
// NewGCSBlobstore creates a new instance of a GCSBlobstore
func NewGCSBlobstore(gcs *storage.Client, bucketName, prefix string) *GCSBlobstore {
for len(prefix) > 0 && prefix[0] == '/' {
prefix = prefix[1:]


@@ -36,7 +36,7 @@ type ChunkStoreTestSuite struct {
Factory *memoryStoreFactory
}
func getAddrsCb(ctx context.Context, c Chunk) (hash.HashSet, error) {
func noopGetAddrs(ctx context.Context, c Chunk) (hash.HashSet, error) {
return nil, nil
}
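The rename is purely descriptive: Put takes a GetAddrsCb so the store can learn each chunk's outbound references (which GC needs for reachability), and these tests write leaf chunks with no references, so the callback is genuinely a no-op. For contrast, a non-trivial callback would look roughly like the sketch below, where walkRefs is hypothetical and stands in for decoding the chunk and visiting its refs:

// sketch only; walkRefs is a hypothetical decoder hook
func getAddrs(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
	addrs := hash.NewHashSet()
	err := walkRefs(c, func(h hash.Hash) {
		addrs.Insert(h)
	})
	return addrs, err
}
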
@@ -45,7 +45,7 @@ func (suite *ChunkStoreTestSuite) TestChunkStorePut() {
store := suite.Factory.CreateStore(ctx, "ns")
input := "abc"
c := NewChunk([]byte(input))
err := store.Put(ctx, c, getAddrsCb)
err := store.Put(ctx, c, noopGetAddrs)
suite.NoError(err)
h := c.Hash()
@@ -91,7 +91,7 @@ func (suite *ChunkStoreTestSuite) TestChunkStoreCommitPut() {
store := suite.Factory.CreateStore(context.Background(), name)
input := "abc"
c := NewChunk([]byte(input))
err := store.Put(context.Background(), c, getAddrsCb)
err := store.Put(context.Background(), c, noopGetAddrs)
suite.NoError(err)
h := c.Hash()
@@ -133,7 +133,7 @@ func (suite *ChunkStoreTestSuite) TestChunkStoreCommitUnchangedRoot() {
store1, store2 := suite.Factory.CreateStore(context.Background(), "ns"), suite.Factory.CreateStore(context.Background(), "ns")
input := "abc"
c := NewChunk([]byte(input))
err := store1.Put(context.Background(), c, getAddrsCb)
err := store1.Put(context.Background(), c, noopGetAddrs)
suite.NoError(err)
h := c.Hash()


@@ -220,6 +220,7 @@ func (ms *MemoryStoreView) Put(ctx context.Context, c Chunk, getAddrs GetAddrsCb
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.pending == nil {
ms.pending = map[hash.Hash]Chunk{}
}


@@ -62,6 +62,7 @@ type Database interface {
GetDataset(ctx context.Context, datasetID string) (Dataset, error)
GetDatasetsByRootHash(ctx context.Context, rootHash hash.Hash) (DatasetsMap, error)
// Commit updates the Commit that ds.ID() in this database points at. All
// Values that have been written to this Database are guaranteed to be
// persistent after Commit() returns successfully.


@@ -296,7 +296,7 @@ func (db *database) doSetHead(ctx context.Context, ds Dataset, addr hash.Hash) e
return err
}
if !iscommit {
return fmt.Errorf("SetHead failed: reffered to value is not a tag:")
return fmt.Errorf("SetHead failed: referred to value is not a tag:")
}
default:
return fmt.Errorf("Unrecognized dataset value: %s", headType)
@@ -855,7 +855,7 @@ func (db *database) validateRefAsCommit(ctx context.Context, r types.Ref) (types
return types.Struct{}, fmt.Errorf("validateRefAsCommit: unable to validate ref; %s not found", r.TargetHash().String())
}
if rHead.TypeName() != commitName {
return types.Struct{}, fmt.Errorf("validateRefAsCommit: referred valus is not a commit")
return types.Struct{}, fmt.Errorf("validateRefAsCommit: referred values is not a commit")
}
var v types.Value
@@ -868,7 +868,7 @@ func (db *database) validateRefAsCommit(ctx context.Context, r types.Ref) (types
}
if !is {
return types.Struct{}, fmt.Errorf("validateRefAsCommit: referred valus is not a commit")
return types.Struct{}, fmt.Errorf("validateRefAsCommit: referred values is not a commit")
}
return v.(types.Struct), nil


@@ -43,7 +43,7 @@ func benchmarkNovelWrite(refreshStore storeOpenFn, src *dataSource, t assert.Tes
return true
}
func getAddrsCb(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
func noopGetAddrs(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
return nil, nil
}
@@ -54,11 +54,11 @@ func writeToEmptyStore(store chunks.ChunkStore, src *dataSource, t assert.Testin
chunx := goReadChunks(src)
for c := range chunx {
err := store.Put(context.Background(), *c, getAddrsCb)
err := store.Put(context.Background(), *c, noopGetAddrs)
assert.NoError(t, err)
}
newRoot := chunks.NewChunk([]byte("root"))
err = store.Put(context.Background(), newRoot, getAddrsCb)
err = store.Put(context.Background(), newRoot, noopGetAddrs)
assert.NoError(t, err)
success, err := store.Commit(context.Background(), newRoot.Hash(), root)
assert.NoError(t, err)
@@ -82,7 +82,7 @@ func benchmarkNoRefreshWrite(openStore storeOpenFn, src *dataSource, t assert.Te
assert.NoError(t, err)
chunx := goReadChunks(src)
for c := range chunx {
err := store.Put(context.Background(), *c, getAddrsCb)
err := store.Put(context.Background(), *c, noopGetAddrs)
assert.NoError(t, err)
}
assert.NoError(t, store.Close())


@@ -115,14 +115,14 @@ func (suite *BlockStoreSuite) TestChunkStoreNotDir() {
suite.Error(err)
}
func getAddrsCb(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
func noopGetAddrs(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
return nil, nil
}
func (suite *BlockStoreSuite) TestChunkStorePut() {
input := []byte("abc")
c := chunks.NewChunk(input)
err := suite.store.Put(context.Background(), c, getAddrsCb)
err := suite.store.Put(context.Background(), c, noopGetAddrs)
suite.NoError(err)
h := c.Hash()
@@ -143,7 +143,7 @@ func (suite *BlockStoreSuite) TestChunkStorePut() {
// Re-writing the same data should cause a second put
c = chunks.NewChunk(input)
err = suite.store.Put(context.Background(), c, getAddrsCb)
err = suite.store.Put(context.Background(), c, noopGetAddrs)
suite.NoError(err)
suite.Equal(h, c.Hash())
assertInputInStore(input, h, suite.store, suite.Assert())
@@ -171,9 +171,9 @@ func (suite *BlockStoreSuite) TestChunkStorePut() {
func (suite *BlockStoreSuite) TestChunkStorePutMany() {
input1, input2 := []byte("abc"), []byte("def")
c1, c2 := chunks.NewChunk(input1), chunks.NewChunk(input2)
err := suite.store.Put(context.Background(), c1, getAddrsCb)
err := suite.store.Put(context.Background(), c1, noopGetAddrs)
suite.NoError(err)
err = suite.store.Put(context.Background(), c2, getAddrsCb)
err = suite.store.Put(context.Background(), c2, noopGetAddrs)
suite.NoError(err)
rt, err := suite.store.Root(context.Background())
@@ -193,9 +193,9 @@ func (suite *BlockStoreSuite) TestChunkStorePutMany() {
func (suite *BlockStoreSuite) TestChunkStoreStatsSummary() {
input1, input2 := []byte("abc"), []byte("def")
c1, c2 := chunks.NewChunk(input1), chunks.NewChunk(input2)
err := suite.store.Put(context.Background(), c1, getAddrsCb)
err := suite.store.Put(context.Background(), c1, noopGetAddrs)
suite.NoError(err)
err = suite.store.Put(context.Background(), c2, getAddrsCb)
err = suite.store.Put(context.Background(), c2, noopGetAddrs)
suite.NoError(err)
rt, err := suite.store.Root(context.Background())
@@ -216,9 +216,9 @@ func (suite *BlockStoreSuite) TestChunkStorePutMoreThanMemTable() {
_, err = rand.Read(input2)
suite.NoError(err)
c1, c2 := chunks.NewChunk(input1), chunks.NewChunk(input2)
err = suite.store.Put(context.Background(), c1, getAddrsCb)
err = suite.store.Put(context.Background(), c1, noopGetAddrs)
suite.NoError(err)
err = suite.store.Put(context.Background(), c2, getAddrsCb)
err = suite.store.Put(context.Background(), c2, noopGetAddrs)
suite.NoError(err)
rt, err := suite.store.Root(context.Background())
@@ -247,7 +247,7 @@ func (suite *BlockStoreSuite) TestChunkStoreGetMany() {
chnx := make([]chunks.Chunk, len(inputs))
for i, data := range inputs {
chnx[i] = chunks.NewChunk(data)
err = suite.store.Put(context.Background(), chnx[i], getAddrsCb)
err = suite.store.Put(context.Background(), chnx[i], noopGetAddrs)
suite.NoError(err)
}
@@ -287,7 +287,7 @@ func (suite *BlockStoreSuite) TestChunkStoreHasMany() {
chunks.NewChunk([]byte("def")),
}
for _, c := range chnx {
err := suite.store.Put(context.Background(), c, getAddrsCb)
err := suite.store.Put(context.Background(), c, noopGetAddrs)
suite.NoError(err)
}
@@ -320,7 +320,7 @@ func (suite *BlockStoreSuite) TestChunkStoreFlushOptimisticLockFail() {
interloper, err := suite.factory(context.Background(), suite.dir)
suite.NoError(err)
err = interloper.Put(context.Background(), c1, getAddrsCb)
err = interloper.Put(context.Background(), c1, noopGetAddrs)
suite.NoError(err)
h, err := interloper.Root(context.Background())
suite.NoError(err)
@@ -328,7 +328,7 @@ func (suite *BlockStoreSuite) TestChunkStoreFlushOptimisticLockFail() {
suite.NoError(err)
suite.True(success)
err = suite.store.Put(context.Background(), c2, getAddrsCb)
err = suite.store.Put(context.Background(), c2, noopGetAddrs)
suite.NoError(err)
h, err = suite.store.Root(context.Background())
suite.NoError(err)
@@ -369,7 +369,7 @@ func (suite *BlockStoreSuite) TestChunkStoreRebaseOnNoOpFlush() {
interloper, err := suite.factory(context.Background(), suite.dir)
suite.NoError(err)
err = interloper.Put(context.Background(), c1, getAddrsCb)
err = interloper.Put(context.Background(), c1, noopGetAddrs)
suite.NoError(err)
root, err := interloper.Root(context.Background())
suite.NoError(err)
@@ -408,7 +408,7 @@ func (suite *BlockStoreSuite) TestChunkStorePutWithRebase() {
interloper, err := suite.factory(context.Background(), suite.dir)
suite.NoError(err)
err = interloper.Put(context.Background(), c1, getAddrsCb)
err = interloper.Put(context.Background(), c1, noopGetAddrs)
suite.NoError(err)
h, err := interloper.Root(context.Background())
suite.NoError(err)
@@ -416,7 +416,7 @@ func (suite *BlockStoreSuite) TestChunkStorePutWithRebase() {
suite.NoError(err)
suite.True(success)
err = suite.store.Put(context.Background(), c2, getAddrsCb)
err = suite.store.Put(context.Background(), c2, noopGetAddrs)
suite.NoError(err)
// Reading c2 via the API should work pre-rebase
@@ -515,7 +515,7 @@ func testBlockStoreConjoinOnCommit(t *testing.T, factory func(t *testing.T) tabl
root, err := smallTableStore.Root(context.Background())
require.NoError(t, err)
err = smallTableStore.Put(context.Background(), newChunk, getAddrsCb)
err = smallTableStore.Put(context.Background(), newChunk, noopGetAddrs)
require.NoError(t, err)
success, err := smallTableStore.Commit(context.Background(), newChunk.Hash(), root)
require.NoError(t, err)
@@ -547,7 +547,7 @@ func testBlockStoreConjoinOnCommit(t *testing.T, factory func(t *testing.T) tabl
root, err := smallTableStore.Root(context.Background())
require.NoError(t, err)
err = smallTableStore.Put(context.Background(), newChunk, getAddrsCb)
err = smallTableStore.Put(context.Background(), newChunk, noopGetAddrs)
require.NoError(t, err)
success, err := smallTableStore.Commit(context.Background(), newChunk.Hash(), root)
require.NoError(t, err)
@@ -584,7 +584,7 @@ func testBlockStoreConjoinOnCommit(t *testing.T, factory func(t *testing.T) tabl
root, err := smallTableStore.Root(context.Background())
require.NoError(t, err)
err = smallTableStore.Put(context.Background(), newChunk, getAddrsCb)
err = smallTableStore.Put(context.Background(), newChunk, noopGetAddrs)
require.NoError(t, err)
success, err := smallTableStore.Commit(context.Background(), newChunk.Hash(), root)
require.NoError(t, err)


@@ -158,7 +158,7 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
}
}
// No appendix change occured, so we remove the appendix
// No appendix change occurred, so we remove the appendix
// on the "latest" upstream which will be added back
// before the conjoin completes
upstream, appendixSpecs = upstream.removeAppendixSpecs()


@@ -131,7 +131,7 @@ func requireChunks(t *testing.T, ctx context.Context, chunks []chunks.Chunk, gen
func putChunks(t *testing.T, ctx context.Context, chunks []chunks.Chunk, cs chunks.ChunkStore, indexesIn map[int]bool, chunkIndexes ...int) {
for _, idx := range chunkIndexes {
err := cs.Put(ctx, chunks[idx], getAddrsCb)
err := cs.Put(ctx, chunks[idx], noopGetAddrs)
require.NoError(t, err)
indexesIn[idx] = true
}


@@ -113,7 +113,7 @@ func TestChunkStoreCommit(t *testing.T) {
newRootChunk := chunks.NewChunk([]byte("new root"))
newRoot := newRootChunk.Hash()
err = store.Put(context.Background(), newRootChunk, getAddrsCb)
err = store.Put(context.Background(), newRootChunk, noopGetAddrs)
require.NoError(t, err)
success, err := store.Commit(context.Background(), newRoot, hash.Hash{})
require.NoError(t, err)
@@ -128,7 +128,7 @@ func TestChunkStoreCommit(t *testing.T) {
secondRootChunk := chunks.NewChunk([]byte("newer root"))
secondRoot := secondRootChunk.Hash()
err = store.Put(context.Background(), secondRootChunk, getAddrsCb)
err = store.Put(context.Background(), secondRootChunk, noopGetAddrs)
require.NoError(t, err)
success, err = store.Commit(context.Background(), secondRoot, newRoot)
require.NoError(t, err)
@@ -241,13 +241,13 @@ func TestChunkStoreManifestPreemptiveOptimisticLockFail(t *testing.T) {
}()
chunk := chunks.NewChunk([]byte("hello"))
err = interloper.Put(context.Background(), chunk, getAddrsCb)
err = interloper.Put(context.Background(), chunk, noopGetAddrs)
require.NoError(t, err)
assert.True(interloper.Commit(context.Background(), chunk.Hash(), hash.Hash{}))
// Try to land a new chunk in store, which should fail AND not persist the contents of store.mt
chunk = chunks.NewChunk([]byte("goodbye"))
err = store.Put(context.Background(), chunk, getAddrsCb)
err = store.Put(context.Background(), chunk, noopGetAddrs)
require.NoError(t, err)
assert.NotNil(store.mt)
assert.False(store.Commit(context.Background(), chunk.Hash(), hash.Hash{}))
@@ -296,7 +296,7 @@ func TestChunkStoreCommitLocksOutFetch(t *testing.T) {
}
rootChunk := chunks.NewChunk([]byte("new root"))
err = store.Put(context.Background(), rootChunk, getAddrsCb)
err = store.Put(context.Background(), rootChunk, noopGetAddrs)
require.NoError(t, err)
h, err := store.Root(context.Background())
require.NoError(t, err)
@@ -352,7 +352,7 @@ func TestChunkStoreSerializeCommits(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err := interloper.Put(context.Background(), interloperChunk, getAddrsCb)
err := interloper.Put(context.Background(), interloperChunk, noopGetAddrs)
require.NoError(t, err)
h, err := interloper.Root(context.Background())
require.NoError(t, err)
@@ -364,7 +364,7 @@ func TestChunkStoreSerializeCommits(t *testing.T) {
updateCount++
}
err = store.Put(context.Background(), storeChunk, getAddrsCb)
err = store.Put(context.Background(), storeChunk, noopGetAddrs)
require.NoError(t, err)
h, err := store.Root(context.Background())
require.NoError(t, err)


@@ -57,11 +57,11 @@ func TestStats(t *testing.T) {
c1, c2, c3, c4, c5 := chunks.NewChunk(i1), chunks.NewChunk(i2), chunks.NewChunk(i3), chunks.NewChunk(i4), chunks.NewChunk(i5)
// These just go to mem table, only operation stats
err = store.Put(context.Background(), c1, getAddrsCb)
err = store.Put(context.Background(), c1, noopGetAddrs)
require.NoError(t, err)
err = store.Put(context.Background(), c2, getAddrsCb)
err = store.Put(context.Background(), c2, noopGetAddrs)
require.NoError(t, err)
err = store.Put(context.Background(), c3, getAddrsCb)
err = store.Put(context.Background(), c3, noopGetAddrs)
require.NoError(t, err)
assert.Equal(uint64(3), stats(store).PutLatency.Samples())
assert.Equal(uint64(0), stats(store).PersistLatency.Samples())
@@ -131,14 +131,14 @@ func TestStats(t *testing.T) {
// Force a conjoin
store.c = inlineConjoiner{2}
err = store.Put(context.Background(), c4, getAddrsCb)
err = store.Put(context.Background(), c4, noopGetAddrs)
require.NoError(t, err)
h, err = store.Root(context.Background())
require.NoError(t, err)
_, err = store.Commit(context.Background(), h, h)
require.NoError(t, err)
err = store.Put(context.Background(), c5, getAddrsCb)
err = store.Put(context.Background(), c5, noopGetAddrs)
require.NoError(t, err)
h, err = store.Root(context.Background())
require.NoError(t, err)


@@ -96,6 +96,9 @@ type NomsBlockStore struct {
tables tableSet
upstream manifestContents
cond *sync.Cond
gcInProgress atomic.Bool
mtSize uint64
putCount uint64
@@ -161,6 +164,10 @@ func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash
}
func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (mi ManifestInfo, err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()
nbs.waitForGC()
nbs.mm.LockForUpdate()
defer func() {
unlockErr := nbs.mm.UnlockForUpdate()
@@ -170,9 +177,6 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
}
}()
nbs.mu.Lock()
defer nbs.mu.Unlock()
var updatedContents manifestContents
for {
ok, contents, _, ferr := nbs.mm.Fetch(ctx, nbs.stats)
@@ -242,6 +246,10 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
}
func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updates map[hash.Hash]uint32, option ManifestAppendixOption) (mi ManifestInfo, err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()
nbs.waitForGC()
nbs.mm.LockForUpdate()
defer func() {
unlockErr := nbs.mm.UnlockForUpdate()
@@ -251,9 +259,6 @@ func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updat
}
}()
nbs.mu.Lock()
defer nbs.mu.Unlock()
var updatedContents manifestContents
for {
ok, contents, _, ferr := nbs.mm.Fetch(ctx, nbs.stats)
@@ -527,6 +532,7 @@ func newNomsBlockStore(ctx context.Context, nbfVerStr string, mm manifestManager
mtSize: memTableSize,
stats: NewStats(),
}
nbs.cond = sync.NewCond(&nbs.mu)
t1 := time.Now()
defer nbs.stats.OpenLatency.SampleTimeSince(t1)
@@ -575,6 +581,13 @@ func (nbs *NomsBlockStore) WithoutConjoiner() *NomsBlockStore {
}
}
// Wait for GC to complete to continue with writes
func (nbs *NomsBlockStore) waitForGC() {
for nbs.gcInProgress.Load() {
nbs.cond.Wait()
}
}
func (nbs *NomsBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCb) error {
t1 := time.Now()
addrs, err := getAddrs(ctx, c)
@@ -598,6 +611,7 @@ func (nbs *NomsBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chu
func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs hash.HashSet) (bool, error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()
nbs.waitForGC()
if nbs.mt == nil {
nbs.mt = newMemTable(nbs.mtSize)
}
@@ -921,6 +935,7 @@ func toHasRecords(hashes hash.HashSet) []hasRecord {
func (nbs *NomsBlockStore) Rebase(ctx context.Context) error {
nbs.mu.Lock()
defer nbs.mu.Unlock()
nbs.waitForGC()
exists, contents, _, err := nbs.mm.Fetch(ctx, nbs.stats)
if err != nil {
return err
@@ -996,6 +1011,7 @@ func (nbs *NomsBlockStore) commit(ctx context.Context, current, last hash.Hash,
// all other tables are persisted in updateManifest()
nbs.mu.Lock()
defer nbs.mu.Unlock()
nbs.waitForGC()
if nbs.mt != nil {
cnt, err := nbs.mt.count()
@@ -1020,6 +1036,10 @@ func (nbs *NomsBlockStore) commit(ctx context.Context, current, last hash.Hash,
return false, err
}
nbs.mu.Lock()
defer nbs.mu.Unlock()
nbs.waitForGC()
nbs.mm.LockForUpdate()
defer func() {
unlockErr := nbs.mm.UnlockForUpdate()
@@ -1029,8 +1049,6 @@ func (nbs *NomsBlockStore) commit(ctx context.Context, current, last hash.Hash,
}
}()
nbs.mu.Lock()
defer nbs.mu.Unlock()
for {
if err := nbs.updateManifest(ctx, current, last); err == nil {
return true, nil
@@ -1403,6 +1421,7 @@ func (nbs *NomsBlockStore) AddTableFilesToManifest(ctx context.Context, fileIdTo
func (nbs *NomsBlockStore) PruneTableFiles(ctx context.Context) (err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()
nbs.waitForGC()
nbs.mm.LockForUpdate()
defer func() {
@@ -1440,7 +1459,26 @@ func (nbs *NomsBlockStore) PruneTableFiles(ctx context.Context) (err error) {
return nbs.p.PruneTableFiles(ctx, contents, t)
}
func (nbs *NomsBlockStore) setGCInProgress(inProgress bool) bool {
nbs.cond.L.Lock()
defer nbs.cond.L.Unlock()
swapped := nbs.gcInProgress.CompareAndSwap(!inProgress, inProgress)
if swapped {
nbs.cond.Broadcast()
return true
}
return false
}
func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest chunks.ChunkStore) error {
swapped := nbs.setGCInProgress(true)
if !swapped {
return errors.New("gc already in progress")
}
defer nbs.setGCInProgress(false)
ops := nbs.SupportedOperations()
if !ops.CanGC || !ops.CanPrune {
return chunks.ErrUnsupportedOperation
@@ -1454,7 +1492,7 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has
return errLastRootMismatch
}
// check to see if the specs have changed since last gc. If they haven't bail early.
// check to see if the specs have changed since last gc. If they haven't bail early.
gcGenCheck := generateLockHash(last, nbs.upstream.specs, nbs.upstream.appendix)
if nbs.upstream.gcGen == gcGenCheck {
return chunks.ErrNothingToCollect
@@ -1594,7 +1632,7 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (e
specs: specs,
}
// nothing has changed. Bail early
// nothing has changed. Bail early
if newContents.gcGen == nbs.upstream.gcGen {
return nil
}
@@ -1631,6 +1669,7 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (e
func (nbs *NomsBlockStore) SetRootChunk(ctx context.Context, root, previous hash.Hash) error {
nbs.mu.Lock()
defer nbs.mu.Unlock()
nbs.waitForGC()
for {
err := nbs.updateManifest(ctx, root, previous)
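Taken together, this file implements the write gate: every mutation path (addChunk, UpdateManifest, Rebase, commit, PruneTableFiles, SetRootChunk) acquires nbs.mu and then blocks in waitForGC, while MarkAndSweepChunks flips gcInProgress on entry and clears it on exit. Because cond was built over nbs.mu, cond.Wait atomically releases the store lock while a writer is parked and reacquires it on wake-up, and setGCInProgress's CompareAndSwap doubles as the "only one GC at a time" check. A minimal standalone sketch of the pattern, independent of the NBS types:

package main

import (
	"fmt"
	"sync"
	"time"
)

// gate parks writers while a single maintenance pass is in flight.
type gate struct {
	mu   sync.Mutex
	cond *sync.Cond
	busy bool
}

func newGate() *gate {
	g := &gate{}
	g.cond = sync.NewCond(&g.mu) // cond shares the state mutex
	return g
}

// enter returns holding mu once no pass is running; Wait atomically
// drops mu while parked and retakes it before returning.
func (g *gate) enter() {
	g.mu.Lock()
	for g.busy {
		g.cond.Wait()
	}
}

func (g *gate) exit() { g.mu.Unlock() }

// setBusy flips the flag under the lock and wakes all parked writers.
func (g *gate) setBusy(b bool) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.busy = b
	g.cond.Broadcast()
}

func main() {
	g := newGate()
	g.setBusy(true) // a GC pass begins
	go func() {
		time.Sleep(50 * time.Millisecond)
		g.setBusy(false) // pass ends; writers resume
	}()
	g.enter() // parks until busy clears
	fmt.Println("write proceeds after gc")
	g.exit()
}

Note also the lock-ordering cleanup in the same hunks: the nbs.mu acquisition moves above mm.LockForUpdate in UpdateManifest, UpdateManifestWithAppendix, and commit, so the gate is checked before the manifest lock is taken.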


@@ -133,7 +133,7 @@ func TestConcurrentPuts(t *testing.T) {
c := makeChunk(uint32(i))
hashes[i] = c.Hash()
errgrp.Go(func() error {
err := st.Put(ctx, c, getAddrsCb)
err := st.Put(ctx, c, noopGetAddrs)
require.NoError(t, err)
return nil
})
@@ -277,7 +277,7 @@ func TestNBSCopyGC(t *testing.T) {
tossers := makeChunkSet(64, 64)
for _, c := range keepers {
err := st.Put(ctx, c, getAddrsCb)
err := st.Put(ctx, c, noopGetAddrs)
require.NoError(t, err)
}
for h, c := range keepers {
@@ -293,7 +293,7 @@ func TestNBSCopyGC(t *testing.T) {
assert.Equal(t, chunks.Chunk{}, c)
}
for _, c := range tossers {
err := st.Put(ctx, c, getAddrsCb)
err := st.Put(ctx, c, noopGetAddrs)
require.NoError(t, err)
}
for h, c := range tossers {
@@ -363,7 +363,7 @@ func prepStore(ctx context.Context, t *testing.T, assert *assert.Assertions) (*f
rootChunk := chunks.NewChunk([]byte("root"))
rootHash := rootChunk.Hash()
err = store.Put(ctx, rootChunk, getAddrsCb)
err = store.Put(ctx, rootChunk, noopGetAddrs)
require.NoError(t, err)
success, err := store.Commit(ctx, rootHash, hash.Hash{})
require.NoError(t, err)
@@ -562,7 +562,7 @@ func TestNBSCommitRetainsAppendix(t *testing.T) {
// Make second Commit
secondRootChunk := chunks.NewChunk([]byte("newer root"))
secondRoot := secondRootChunk.Hash()
err = store.Put(ctx, secondRootChunk, getAddrsCb)
err = store.Put(ctx, secondRootChunk, noopGetAddrs)
require.NoError(t, err)
success, err := store.Commit(ctx, secondRoot, rootChunk.Hash())
require.NoError(t, err)


@@ -468,7 +468,7 @@ func (t *testProtocol) NewDatabase(sp Spec) (datas.Database, error) {
return datas.NewDatabase(cs), nil
}
func getAddrsCb(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
func noopGetAddrs(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
return nil, nil
}
@@ -485,7 +485,7 @@ func TestExternalProtocol(t *testing.T) {
cs := sp.NewChunkStore(context.Background())
assert.Equal("foo", tp.name)
c := chunks.NewChunk([]byte("hi!"))
err = cs.Put(context.Background(), c, getAddrsCb)
err = cs.Put(context.Background(), c, noopGetAddrs)
assert.NoError(err)
ok, err := cs.Has(context.Background(), c.Hash())
assert.NoError(err)


@@ -146,7 +146,7 @@ func (w *parallelRefWalker) Close() error {
// |parallelRefWalker| provides a way to walk the |Ref|s in a |ValueSlice|
// using background worker threads to exploit hardware parallelism in cases
// where walking the merkle-DAG can become CPU bound. Construct a
// |parllelRefWalker| with a configured level of |concurrency| and then call
// |parallelRefWalker| with a configured level of |concurrency| and then call
// |GetRefs(hash.HashSet, ValueSlice)| with the |ValueSlice| to get back a
// slice of |hash.Hash| for all the |Ref|s which appear in the values of
// |ValueSlice|. |GetRefs| will not return any |Ref|s which already appear in


@@ -524,8 +524,6 @@ func (lvs *ValueStore) bufferChunk(ctx context.Context, v Value, c chunks.Chunk,
}
err := putChildren(tallest)
// TODO: fix panics
if err != nil {
return err
}
@@ -783,7 +781,7 @@ func (lvs *ValueStore) gc(ctx context.Context, root hash.Hash, toVisit hash.Hash
// closes, it signals to NBSStore.MarkAndSweepChunks that we
// are done walking the references. If gcProcessRefs returns an
// error, we did not successfully walk all references and we do
// not want MarkAndSweepChunks finishing its work, swaping
// not want MarkAndSweepChunks finishing its work, swapping
// table files, etc. It would be racing with returning an error
// here. Instead, we have returned the error above and that
// will force it to fail when the errgroup ctx fails.


@@ -320,6 +320,32 @@ skip_if_chunk_journal() {
[ "$BEFORE" -gt "$AFTER" ]
}
@test "garbage_collection: online gc" {
dolt sql <<SQL
CREATE TABLE test (pk int PRIMARY KEY);
INSERT INTO test VALUES (1),(2),(3),(4),(5);
CALL DOLT_COMMIT('-Am', 'added values 1-5');
INSERT INTO test VALUES (6),(7),(8);
CALL DOLT_RESET('--hard');
INSERT INTO test VALUES (11),(12),(13),(14),(15);
SQL
BEFORE=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g')
run dolt sql -q "call dolt_gc();"
[ "$status" -eq 0 ]
run dolt sql -q "SELECT sum(pk) FROM test;"
[ "$status" -eq 0 ]
[[ "$output" =~ "80" ]] || false
AFTER=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g')
# assert space was reclaimed
echo "$BEFORE"
echo "$AFTER"
[ "$BEFORE" -gt "$AFTER" ]
}
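A quick check on the expected sum: rows 6-8 were discarded by DOLT_RESET('--hard'), so the survivors are 1-5 and 11-15, and 15 + 65 = 80. The size comparison then asserts that the full pass actually reclaimed the dead chunks on disk.
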
@test "garbage_collection: online shallow gc" {
skip_if_chunk_journal
create_many_commits
@@ -328,7 +354,7 @@ skip_if_chunk_journal() {
dolt sql -q "INSERT INTO test VALUES ($(($NUM_COMMITS+1))),($(($NUM_COMMITS+2))),($(($NUM_COMMITS+3)));"
BEFORE=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g')
run dolt sql -q "call dolt_gc();"
run dolt sql -q "call dolt_gc('--shallow');"
[ "$status" -eq 0 ]
run dolt sql -q "select count(*) from test"