From 54efef5c16e301ae90cfe08b5e8f007ab5e8dfb0 Mon Sep 17 00:00:00 2001 From: zachmu Date: Thu, 5 Aug 2021 19:54:01 +0000 Subject: [PATCH 1/2] [ga-bump-dep] Bump dependency in Dolt by zachmu --- go/go.mod | 4 ++-- go/go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go/go.mod b/go/go.mod index 12c2d06f6e..dad1657345 100644 --- a/go/go.mod +++ b/go/go.mod @@ -18,11 +18,11 @@ require ( github.com/denisbrodbeck/machineid v1.0.1 github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi v0.0.0-20201005193433-3ee972b1d078 github.com/dolthub/fslock v0.0.2 - github.com/dolthub/go-mysql-server v0.10.1-0.20210729205204-aca74186be10 + github.com/dolthub/go-mysql-server v0.10.1-0.20210805195243-614999856aff github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446 github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66 github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 - github.com/dolthub/vitess v0.0.0-20210720213737-d3d2404e7683 + github.com/dolthub/vitess v0.0.0-20210802203534-01de666843ce github.com/dustin/go-humanize v1.0.0 github.com/fatih/color v1.9.0 github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568 diff --git a/go/go.sum b/go/go.sum index 59b386f6d6..f3f3f3abee 100644 --- a/go/go.sum +++ b/go/go.sum @@ -142,16 +142,16 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dolthub/fslock v0.0.2 h1:8vUh47iKovgrtXNrXVIzsIoWLlspoXg+3nslhUzgKSw= github.com/dolthub/fslock v0.0.2/go.mod h1:0i7bsNkK+XHwFL3dIsSWeXSV7sykVzzVr6+jq8oeEo0= -github.com/dolthub/go-mysql-server v0.10.1-0.20210729205204-aca74186be10 h1:c/bRO3EW/HnJmYU+hrMXYTrlN/srxP+4iFVoAayJZq8= -github.com/dolthub/go-mysql-server v0.10.1-0.20210729205204-aca74186be10/go.mod h1:+GYveCPU+ONs9xEqvu2PDKpavKGMhqymkQ8cdrcJkYk= +github.com/dolthub/go-mysql-server v0.10.1-0.20210805195243-614999856aff h1:aA9UOgYWpu4oJhYIibxN052rOOr6vvdOkdpU6JliUEk= +github.com/dolthub/go-mysql-server v0.10.1-0.20210805195243-614999856aff/go.mod h1:cPg39xeFH8/+McnJxncb79SgUuREeIqR+eTvxE6OmXc= github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446 h1:0ol5pj+QlKUKAtqs1LiPM3ZJKs+rHPgLSsMXmhTrCAM= github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446/go.mod h1:dhGBqcCEfK5kuFmeO5+WOx3hqc1k3M29c1oS/R7N4ms= github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66 h1:WRPDbpJWEnPxPmiuOTndT+lUWUeGjx6eoNOK9O4tQQQ= github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66/go.mod h1:N5ZIbMGuDUpTpOFQ7HcsN6WSIpTGQjHP+Mz27AfmAgk= github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 h1:7/v8q9XGFa6q5Ap4Z/OhNkAMBaK5YeuEzwJt+NZdhiE= github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81/go.mod h1:siLfyv2c92W1eN/R4QqG/+RjjX5W2+gCTRjZxBjI3TY= -github.com/dolthub/vitess v0.0.0-20210720213737-d3d2404e7683 h1:OPX0jAwe68Ux5WBQSXKdtTSkm+CKIMumnkl0NgFF9TI= -github.com/dolthub/vitess v0.0.0-20210720213737-d3d2404e7683/go.mod h1:hUE8oSk2H5JZnvtlLBhJPYC8WZCA5AoSntdLTcBvdBM= +github.com/dolthub/vitess v0.0.0-20210802203534-01de666843ce h1:NNoKBTOCFRslRQyn0Zko/1Aq1A+bpuktdXVip/dY47w= +github.com/dolthub/vitess v0.0.0-20210802203534-01de666843ce/go.mod h1:hUE8oSk2H5JZnvtlLBhJPYC8WZCA5AoSntdLTcBvdBM= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= 
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= From 0a0ce2181c1654a6068d856533551fed3a8fc8d6 Mon Sep 17 00:00:00 2001 From: Brian Hendriks Date: Thu, 5 Aug 2021 18:20:19 -0700 Subject: [PATCH 2/2] Bh/gen cs (#1938) --- .github/workflows/ci-bats-unix.yaml | 1 + .github/workflows/ci-bats-windows.yaml | 1 + go/cmd/dolt/commands/tblcmds/export.go | 2 + go/cmd/dolt/commands/tblcmds/import.go | 2 + go/libraries/doltcore/dbfactory/file.go | 50 ++- go/libraries/doltcore/doltdb/doltdb.go | 65 ++-- go/libraries/doltcore/doltdb/gc_test.go | 88 ++++-- .../doltcore/mvdata/table_data_loc.go | 5 +- .../sqle/json/noms_json_value_test.go | 3 +- go/libraries/utils/strhelp/string_help.go | 22 +- go/store/chunks/chunk_store.go | 8 +- go/store/chunks/memory_store.go | 6 +- go/store/chunks/test_utils.go | 6 +- go/store/datas/database.go | 2 +- go/store/datas/database_common.go | 4 +- go/store/hash/hash.go | 18 ++ go/store/nbs/file_manifest.go | 1 + go/store/nbs/gc_copier.go | 22 +- go/store/nbs/generational_chunk_store.go | 297 ++++++++++++++++++ go/store/nbs/generational_chunk_store_test.go | 173 ++++++++++ go/store/nbs/manifest.go | 9 + go/store/nbs/nbs_metrics_wrapper.go | 4 +- go/store/nbs/store.go | 51 ++- go/store/nbs/store_test.go | 2 +- go/store/types/parallel_ref_walker.go | 22 ++ go/store/types/value_store.go | 187 +++++++---- go/store/types/value_store_test.go | 2 +- integration-tests/bats/branch.bats | 30 +- .../bats/garbage_collection.bats | 4 +- .../bats/import-create-tables.bats | 4 +- .../bats/import-update-tables.bats | 5 +- 31 files changed, 910 insertions(+), 186 deletions(-) create mode 100644 go/store/nbs/generational_chunk_store.go create mode 100644 go/store/nbs/generational_chunk_store_test.go diff --git a/.github/workflows/ci-bats-unix.yaml b/.github/workflows/ci-bats-unix.yaml index 53842ab715..6cc0b33b49 100644 --- a/.github/workflows/ci-bats-unix.yaml +++ b/.github/workflows/ci-bats-unix.yaml @@ -68,6 +68,7 @@ jobs: go build -mod=readonly -o ../.ci_bin/git-dolt ./cmd/git-dolt/. go build -mod=readonly -o ../.ci_bin/git-dolt-smudge ./cmd/git-dolt-smudge/. go build -mod=readonly -o ../.ci_bin/remotesrv ./utils/remotesrv/. + go build -mod=readonly -o ../.ci_bin/noms ./store/cmd/noms/. - name: Setup Dolt Config run: | dolt config --global --add user.name 'Dolthub Actions' diff --git a/.github/workflows/ci-bats-windows.yaml b/.github/workflows/ci-bats-windows.yaml index ed62558690..d196f0363f 100644 --- a/.github/workflows/ci-bats-windows.yaml +++ b/.github/workflows/ci-bats-windows.yaml @@ -72,6 +72,7 @@ jobs: go build -mod=readonly -o ../.ci_bin/git-dolt ./cmd/git-dolt/. go build -mod=readonly -o ../.ci_bin/git-dolt-smudge ./cmd/git-dolt-smudge/. go build -mod=readonly -o ../.ci_bin/remotesrv ./utils/remotesrv/. + go build -mod=readonly -o ../.ci_bin/noms ./store/cmd/noms/. 
- name: Setup Dolt Config run: | dolt config --global --add user.name 'Dolthub Actions' diff --git a/go/cmd/dolt/commands/tblcmds/export.go b/go/cmd/dolt/commands/tblcmds/export.go index 9e209ece0b..b9df553ebe 100644 --- a/go/cmd/dolt/commands/tblcmds/export.go +++ b/go/cmd/dolt/commands/tblcmds/export.go @@ -223,6 +223,8 @@ func (cmd ExportCmd) Exec(ctx context.Context, commandStr string, args []string, skipped, verr := mvdata.MoveData(ctx, dEnv, mover, exOpts) + cli.PrintErrln() + if skipped > 0 { cli.PrintErrln(color.YellowString("Lines skipped: %d", skipped)) } diff --git a/go/cmd/dolt/commands/tblcmds/import.go b/go/cmd/dolt/commands/tblcmds/import.go index 7b43d35875..98b9af46e1 100644 --- a/go/cmd/dolt/commands/tblcmds/import.go +++ b/go/cmd/dolt/commands/tblcmds/import.go @@ -382,6 +382,8 @@ func (cmd ImportCmd) Exec(ctx context.Context, commandStr string, args []string, skipped, verr := mvdata.MoveData(ctx, dEnv, mover, mvOpts) + cli.PrintErrln() + if skipped > 0 { cli.PrintErrln(color.YellowString("Lines skipped: %d", skipped)) } diff --git a/go/libraries/doltcore/dbfactory/file.go b/go/libraries/doltcore/dbfactory/file.go index de6b263e17..30b3c072e7 100644 --- a/go/libraries/doltcore/dbfactory/file.go +++ b/go/libraries/doltcore/dbfactory/file.go @@ -16,6 +16,7 @@ package dbfactory import ( "context" + "errors" "net/url" "os" "path/filepath" @@ -52,19 +53,50 @@ func (fact FileFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat, path = filepath.FromSlash(path) path = urlObj.Host + path + err = validateDir(path) + if err != nil { + return nil, err + } + + newGenSt, err := nbs.NewLocalStore(ctx, nbf.VersionString(), path, defaultMemTableSize) + + if err != nil { + return nil, err + } + + oldgenPath := filepath.Join(path, "oldgen") + err = validateDir(oldgenPath) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + return nil, err + } + + err = os.Mkdir(oldgenPath, os.ModePerm) + if err != nil && !errors.Is(err, os.ErrExist) { + return nil, err + } + } + + oldGenSt, err := nbs.NewLocalStore(ctx, nbf.VersionString(), oldgenPath, defaultMemTableSize) + + if err != nil { + return nil, err + } + + st := nbs.NewGenerationalCS(oldGenSt, newGenSt) + // metrics? + + return datas.NewDatabase(st), nil +} + +func validateDir(path string) error { info, err := os.Stat(path) if err != nil { - return nil, err + return err } else if !info.IsDir() { - return nil, filesys.ErrIsFile + return filesys.ErrIsFile } - st, err := nbs.NewLocalStore(ctx, nbf.VersionString(), path, defaultMemTableSize) - - if err != nil { - return nil, err - } - - return datas.NewDatabase(nbs.NewNBSMetricWrapper(st)), nil + return nil } diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index 4c8d0f0fee..7129968f10 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -18,7 +18,6 @@ import ( "context" "errors" "fmt" - "math/rand" "path/filepath" "strings" "time" @@ -1096,55 +1095,39 @@ func (ddb *DoltDB) GC(ctx context.Context, uncommitedVals ...hash.Hash) error { return err } - rand.Seed(time.Now().UnixNano()) - tmpDatasets := make([]datas.Dataset, len(uncommitedVals)) - for i, h := range uncommitedVals { - v, err := ddb.db.ReadValue(ctx, h) - if err != nil { - return err - } - if v == nil { - return fmt.Errorf("empty value for value hash %s", h.String()) + datasets, err := ddb.db.Datasets(ctx) + newGen := hash.NewHashSet(uncommitedVals...) 
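The file.go change above layers two local NomsBlockStores, with an "oldgen" store beneath the working one, and creates the oldgen directory on first use. Below is a minimal standard-library sketch of that idempotent ensure-directory pattern, assuming nothing about Dolt's API; names and paths are illustrative. Tolerating os.ErrExist keeps the setup race-safe when two processes open the same database concurrently.

package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// ensureSubdir stats the directory and creates it only when absent,
// tolerating a concurrent creator winning the race.
func ensureSubdir(root, name string) (string, error) {
	p := filepath.Join(root, name)
	info, err := os.Stat(p)
	if err == nil {
		if !info.IsDir() {
			return "", fmt.Errorf("%s exists but is not a directory", p)
		}
		return p, nil // already present
	}
	if !errors.Is(err, os.ErrNotExist) {
		return "", err // Stat failed for some other reason
	}
	if err := os.Mkdir(p, os.ModePerm); err != nil && !errors.Is(err, os.ErrExist) {
		return "", err
	}
	return p, nil
}

func main() {
	p, err := ensureSubdir(os.TempDir(), "oldgen")
	fmt.Println(p, err)
}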
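The GC rewrite that continues below partitions dataset heads by ref type: branch, remote, and internal refs are long-lived and routed to the old generation, while everything else, working sets in particular, stays in the new generation. Here is a toy sketch of that partition; the string prefixes stand in for ref.Parse and GetType and are assumptions, not Dolt's actual ref parsing. (The doltdb.go GC hunk resumes after this sketch.)

package main

import (
	"fmt"
	"strings"
)

type Hash string
type HashSet map[Hash]struct{}

// isLongLived stands in for the BranchRefType/RemoteRefType/
// InternalRefType check; the real code parses refs rather than
// prefix-matching strings.
func isLongLived(refKey string) bool {
	return strings.HasPrefix(refKey, "refs/heads/") ||
		strings.HasPrefix(refKey, "refs/remotes/") ||
		strings.HasPrefix(refKey, "refs/internal/")
}

func main() {
	datasets := map[string]Hash{
		"refs/heads/master":        "h1",
		"refs/remotes/origin/main": "h2",
		"workingSets/heads/master": "h3",
	}
	oldGen, newGen := make(HashSet), make(HashSet)
	for key, h := range datasets {
		if isLongLived(key) {
			oldGen[h] = struct{}{}
		} else {
			newGen[h] = struct{}{}
		}
	}
	fmt.Println(len(oldGen), len(newGen)) // 2 1
}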
+ oldGen := make(hash.HashSet) + err = datasets.IterAll(ctx, func(key, value types.Value) error { + keyStr := string(key.(types.String)) + h := value.(types.Ref).TargetHash() + + var isOldGen bool + switch { + case ref.IsRef(keyStr): + parsed, err := ref.Parse(keyStr) + if err != nil && !errors.Is(err, ref.ErrUnknownRefType) { + return err + } + + refType := parsed.GetType() + isOldGen = refType == ref.BranchRefType || refType == ref.RemoteRefType || refType == ref.InternalRefType } - ds, err := ddb.db.GetDataset(ctx, fmt.Sprintf("tmp/%d", rand.Int63())) - if err != nil { - return err + if isOldGen { + oldGen.Insert(h) + } else { + newGen.Insert(h) } - r, err := WriteValAndGetRef(ctx, ddb.db, v) - if err != nil { - return err - } + return nil + }) - ds, err = ddb.db.CommitValue(ctx, ds, r) - if err != nil { - return err - } - if !ds.HasHead() { - return fmt.Errorf("could not save value %s", h.String()) - } - - tmpDatasets[i] = ds - } - - err = collector.GC(ctx) if err != nil { return err } - for _, ds := range tmpDatasets { - ds, err = ddb.db.Delete(ctx, ds) - if err != nil { - return err - } - - if ds.HasHead() { - return fmt.Errorf("unsuccessful delete for dataset %s", ds.ID()) - } - } - - return nil + return collector.GC(ctx, oldGen, newGen) } func (ddb *DoltDB) pruneUnreferencedDatasets(ctx context.Context) error { diff --git a/go/libraries/doltcore/doltdb/gc_test.go b/go/libraries/doltcore/doltdb/gc_test.go index daa5e6fb2a..b376773ce9 100644 --- a/go/libraries/doltcore/doltdb/gc_test.go +++ b/go/libraries/doltcore/doltdb/gc_test.go @@ -23,31 +23,76 @@ import ( "github.com/stretchr/testify/require" "github.com/dolthub/dolt/go/cmd/dolt/commands" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils" + "github.com/dolthub/dolt/go/libraries/doltcore/ref" "github.com/dolthub/dolt/go/libraries/doltcore/sqle" - "github.com/dolthub/dolt/go/store/types" + "github.com/dolthub/dolt/go/store/hash" ) +type stage struct { + commands []testCommand + preStageFunc func(ctx context.Context, t *testing.T, ddb *doltdb.DoltDB, prevRes interface{}) interface{} +} + type gcTest struct { - name string - setup []testCommand - garbage types.Value - query string - expected []sql.Row + name string + stages []stage + query string + expected []sql.Row + postGCFunc func(ctx context.Context, t *testing.T, ddb *doltdb.DoltDB, prevRes interface{}) } var gcTests = []gcTest{ { name: "gc test", - setup: []testCommand{ - {commands.SqlCmd{}, []string{"-q", "INSERT INTO test VALUES (0),(1),(2);"}}, + stages: []stage{ + { + preStageFunc: func(ctx context.Context, t *testing.T, ddb *doltdb.DoltDB, i interface{}) interface{} { + return nil + }, + commands: []testCommand{ + {commands.CheckoutCmd{}, []string{"-b", "temp"}}, + {commands.SqlCmd{}, []string{"-q", "INSERT INTO test VALUES (0),(1),(2);"}}, + {commands.AddCmd{}, []string{"."}}, + {commands.CommitCmd{}, []string{"-m", "commit"}}, + }, + }, + { + preStageFunc: func(ctx context.Context, t *testing.T, ddb *doltdb.DoltDB, i interface{}) interface{} { + cm, err := ddb.ResolveCommitRef(ctx, ref.NewBranchRef("temp")) + require.NoError(t, err) + h, err := cm.HashOf() + require.NoError(t, err) + cs, err := doltdb.NewCommitSpec(h.String()) + require.NoError(t, err) + _, err = ddb.Resolve(ctx, cs, nil) + require.NoError(t, err) + return h + }, + commands: []testCommand{ + {commands.CheckoutCmd{}, []string{"master"}}, + {commands.BranchCmd{}, []string{"-D", "temp"}}, + {commands.SqlCmd{}, []string{"-q", "INSERT INTO 
test VALUES (4),(5),(6);"}}, + }, + }, + }, + query: "select * from test;", + expected: []sql.Row{{int32(4)}, {int32(5)}, {int32(6)}}, + postGCFunc: func(ctx context.Context, t *testing.T, ddb *doltdb.DoltDB, prevRes interface{}) { + h := prevRes.(hash.Hash) + cs, err := doltdb.NewCommitSpec(h.String()) + require.NoError(t, err) + _, err = ddb.Resolve(ctx, cs, nil) + require.Error(t, err) }, - garbage: types.String("supercalifragilisticexpialidocious"), }, } var gcSetupCommon = []testCommand{ {commands.SqlCmd{}, []string{"-q", "CREATE TABLE test (pk int PRIMARY KEY)"}}, + {commands.AddCmd{}, []string{"."}}, + {commands.CommitCmd{}, []string{"-m", "created test table"}}, } func TestGarbageCollection(t *testing.T) { @@ -59,7 +104,6 @@ func TestGarbageCollection(t *testing.T) { testGarbageCollection(t, gct) }) } - } func testGarbageCollection(t *testing.T, test gcTest) { @@ -70,24 +114,25 @@ func testGarbageCollection(t *testing.T, test gcTest) { exitCode := c.cmd.Exec(ctx, c.cmd.Name(), c.args, dEnv) require.Equal(t, 0, exitCode) } - for _, c := range test.setup { - exitCode := c.cmd.Exec(ctx, c.cmd.Name(), c.args, dEnv) - require.Equal(t, 0, exitCode) - } - garbageRef, err := dEnv.DoltDB.ValueReadWriter().WriteValue(ctx, test.garbage) - require.NoError(t, err) - val, err := dEnv.DoltDB.ValueReadWriter().ReadValue(ctx, garbageRef.TargetHash()) - require.NoError(t, err) - assert.NotNil(t, val) + var res interface{} + for _, stage := range test.stages { + res = stage.preStageFunc(ctx, t, dEnv.DoltDB, res) + for _, c := range stage.commands { + exitCode := c.cmd.Exec(ctx, c.cmd.Name(), c.args, dEnv) + require.Equal(t, 0, exitCode) + } + } working, err := dEnv.WorkingRoot(ctx) require.NoError(t, err) h, err := working.HashOf() require.NoError(t, err) // save working root during GC + err = dEnv.DoltDB.GC(ctx, h) require.NoError(t, err) + test.postGCFunc(ctx, t, dEnv.DoltDB, res) working, err = dEnv.WorkingRoot(ctx) require.NoError(t, err) @@ -95,9 +140,4 @@ func testGarbageCollection(t *testing.T, test gcTest) { actual, err := sqle.ExecuteSelect(t, dEnv, dEnv.DoltDB, working, test.query) require.NoError(t, err) assert.Equal(t, test.expected, actual) - - // assert that garbage was collected - val, err = dEnv.DoltDB.ValueReadWriter().ReadValue(ctx, garbageRef.TargetHash()) - require.NoError(t, err) - assert.Nil(t, val) } diff --git a/go/libraries/doltcore/mvdata/table_data_loc.go b/go/libraries/doltcore/mvdata/table_data_loc.go index 19f7d7f3a0..4d7ecccb8c 100644 --- a/go/libraries/doltcore/mvdata/table_data_loc.go +++ b/go/libraries/doltcore/mvdata/table_data_loc.go @@ -218,13 +218,14 @@ func (te *tableEditorWriteCloser) WriteRow(ctx context.Context, r row.Row) error te.statsCB(te.stats) } - if atomic.LoadInt64(&te.gcOps) >= tableWriterGCRate { + gcOps := atomic.AddInt64(&te.gcOps, 1) + + if gcOps%tableWriterGCRate == 0 { atomic.StoreInt64(&te.gcOps, 0) if err := te.GC(ctx); err != nil { return err } } - _ = atomic.AddInt64(&te.gcOps, 1) if te.insertOnly { err := te.tableEditor.InsertRow(ctx, r, nil) diff --git a/go/libraries/doltcore/sqle/json/noms_json_value_test.go b/go/libraries/doltcore/sqle/json/noms_json_value_test.go index cfc697fbf2..cbda5e31f5 100644 --- a/go/libraries/doltcore/sqle/json/noms_json_value_test.go +++ b/go/libraries/doltcore/sqle/json/noms_json_value_test.go @@ -27,6 +27,7 @@ import ( "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/datas" + "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/types" ) @@ -205,7 +206,7 @@ func 
TestJSONStructuralSharing(t *testing.T) { err = db.Flush(ctx) require.NoError(t, err) - err = db.(datas.GarbageCollector).GC(ctx) + err = db.(datas.GarbageCollector).GC(ctx, hash.HashSet{}, hash.HashSet{}) require.NoError(t, err) after := ts.Len() diff --git a/go/libraries/utils/strhelp/string_help.go b/go/libraries/utils/strhelp/string_help.go index 033cc9431b..d37646799e 100644 --- a/go/libraries/utils/strhelp/string_help.go +++ b/go/libraries/utils/strhelp/string_help.go @@ -14,7 +14,11 @@ package strhelp -import "strconv" +import ( + "fmt" + "strconv" + "strings" +) // NthToken returns the Nth token in s, delimited by delim. There is always at least one token: the zeroth token is the // input string if delim doesn't occur in s. The second return value will be false if there is no Nth token. @@ -63,3 +67,19 @@ func CommaIfy(n int64) string { return result } + +// LineStrBuilder is a utility class for building strings line by line +type LineStrBuilder []string + +// AppendLine works like append in that it returns an instance of a LineStrBuilder with the contents updated to contain +// the additional line. lsb = lsb.AppendLine("n: %d, s: %s", n, s) +func (lsb LineStrBuilder) AppendLine(strFmt string, args ...interface{}) LineStrBuilder { + updated := append(lsb, fmt.Sprintf(strFmt, args...)) + return updated +} + +// String returns the built string with all lines separated by newlines +func (lsb LineStrBuilder) String() string { + s := strings.Join(lsb, "\n") + return s +} diff --git a/go/store/chunks/chunk_store.go b/go/store/chunks/chunk_store.go index 7ebbe4492b..47cdc7c23a 100644 --- a/go/store/chunks/chunk_store.go +++ b/go/store/chunks/chunk_store.go @@ -108,7 +108,13 @@ type ChunkStoreGarbageCollector interface { // and MarkAndSweepChunks returns, the chunk store will only have the // chunks sent on |keepChunks| and will have removed all other content // from the ChunkStore. 
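A short usage sketch for the LineStrBuilder added in string_help.go above; the methods are adapted from the hunk so the example is self-contained. As the doc comment says, the receiver must be reassigned, just like the builtin append. (The chunk_store.go hunk continues below.)

package main

import (
	"fmt"
	"strings"
)

type LineStrBuilder []string

func (lsb LineStrBuilder) AppendLine(strFmt string, args ...interface{}) LineStrBuilder {
	return append(lsb, fmt.Sprintf(strFmt, args...))
}

func (lsb LineStrBuilder) String() string {
	return strings.Join(lsb, "\n")
}

func main() {
	var lsb LineStrBuilder
	lsb = lsb.AppendLine("n: %d, s: %s", 1, "a") // reassign, as with append
	lsb = lsb.AppendLine("n: %d, s: %s", 2, "b")
	fmt.Println(lsb.String())
	// n: 1, s: a
	// n: 2, s: b
}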
- MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash) error + MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest ChunkStore) error +} + +// GenerationalCS is an interface supporting access to the old gen and new gen chunk stores +type GenerationalCS interface { + NewGen() ChunkStoreGarbageCollector + OldGen() ChunkStoreGarbageCollector } // ChunkStoreVersionGetter is a ChunkStore that supports getting the manifest's diff --git a/go/store/chunks/memory_store.go b/go/store/chunks/memory_store.go index ed6ed7b816..4a108fd2c8 100644 --- a/go/store/chunks/memory_store.go +++ b/go/store/chunks/memory_store.go @@ -227,7 +227,11 @@ func (ms *MemoryStoreView) Commit(ctx context.Context, current, last hash.Hash) return success, nil } -func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash) error { +func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest ChunkStore) error { + if dest != ms { + panic("unsupported") + } + if last != ms.rootHash { return fmt.Errorf("last does not match ms.Root()") } diff --git a/go/store/chunks/test_utils.go b/go/store/chunks/test_utils.go index 99f541b00a..1f92acff07 100644 --- a/go/store/chunks/test_utils.go +++ b/go/store/chunks/test_utils.go @@ -86,13 +86,13 @@ func (s *TestStoreView) Put(ctx context.Context, c Chunk) error { return s.ChunkStore.Put(ctx, c) } -func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash) error { +func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest ChunkStore) error { collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector) - if !ok { + if !ok || dest != s { return ErrUnsupportedOperation } - return collector.MarkAndSweepChunks(ctx, last, keepChunks) + return collector.MarkAndSweepChunks(ctx, last, keepChunks, collector) } func (s *TestStoreView) Reads() int { diff --git a/go/store/datas/database.go b/go/store/datas/database.go index 7b6d870ef8..dcca5e7571 100644 --- a/go/store/datas/database.go +++ b/go/store/datas/database.go @@ -175,7 +175,7 @@ type GarbageCollector interface { // GC traverses the database starting at the Root and removes // all unreferenced data from persistent storage. - GC(ctx context.Context) error + GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet) error } // CanUsePuller returns true if a datas.Puller can be used to pull data from one Database into another. Not all diff --git a/go/store/datas/database_common.go b/go/store/datas/database_common.go index 7e0b33b591..e7e2bc4822 100644 --- a/go/store/datas/database_common.go +++ b/go/store/datas/database_common.go @@ -683,8 +683,8 @@ func (db *database) doDelete(ctx context.Context, datasetIDstr string) error { } // GC traverses the database starting at the Root and removes all unreferenced data from persistent storage.
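The dest parameter threaded through MarkAndSweepChunks above pairs with a simple channel protocol: a walker streams batches of reachable hashes on keepChunks, and the collector copies exactly those chunks into the destination store. A map-based sketch of that protocol with stand-in types follows; the real API deals in chunks.Chunk values and table files, not byte slices. (The database_common.go hunk resumes below.)

package main

import (
	"context"
	"fmt"
)

type Hash string
type Store map[Hash][]byte

// markAndSweep drains keepChunks, copying every referenced chunk from
// src into dest; anything never sent on the channel is implicitly
// collected.
func markAndSweep(ctx context.Context, src, dest Store, keepChunks <-chan []Hash) error {
	for batch := range keepChunks {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		for _, h := range batch {
			if c, ok := src[h]; ok {
				dest[h] = c
			}
		}
	}
	return nil
}

func main() {
	src := Store{"a": {1}, "b": {2}, "garbage": {3}}
	dest := Store{}
	keep := make(chan []Hash, 1)
	keep <- []Hash{"a", "b"}
	close(keep)
	if err := markAndSweep(context.Background(), src, dest, keep); err != nil {
		panic(err)
	}
	fmt.Println(len(dest)) // 2 -- "garbage" was never sent, so it is dropped
}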
-func (db *database) GC(ctx context.Context) error { - return db.ValueStore.GC(ctx) +func (db *database) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet) error { + return db.ValueStore.GC(ctx, oldGenRefs, newGenRefs) } func (db *database) tryCommitChunks(ctx context.Context, currentDatasets types.Map, currentRootHash hash.Hash) error { diff --git a/go/store/hash/hash.go b/go/store/hash/hash.go index b87a4f612d..60b65c5dc7 100644 --- a/go/store/hash/hash.go +++ b/go/store/hash/hash.go @@ -164,3 +164,21 @@ func (hs HashSet) Has(hash Hash) (has bool) { func (hs HashSet) Remove(hash Hash) { delete(hs, hash) } + +// Copy returns a copy of the hashset +func (hs HashSet) Copy() HashSet { + copyOf := make(HashSet, len(hs)) + + for k := range hs { + copyOf[k] = struct{}{} + } + + return copyOf +} + +// InsertAll inserts all elements of a HashSet into this HashSet +func (hs HashSet) InsertAll(other HashSet) { + for h, _ := range other { + hs[h] = struct{}{} + } +} diff --git a/go/store/nbs/file_manifest.go b/go/store/nbs/file_manifest.go index b64df47285..0101c3c1e9 100644 --- a/go/store/nbs/file_manifest.go +++ b/go/store/nbs/file_manifest.go @@ -234,6 +234,7 @@ func (fm5 fileManifestV5) UpdateGCGen(ctx context.Context, lastLock addr, newCon if contents.gcGen == upstream.gcGen { return errors.New("UpdateGCGen() must update the garbage collection generation") } + if contents.root != upstream.root { return errors.New("UpdateGCGen() cannot update the root") } diff --git a/go/store/nbs/gc_copier.go b/go/store/nbs/gc_copier.go index af7f642c45..2661ec9850 100644 --- a/go/store/nbs/gc_copier.go +++ b/go/store/nbs/gc_copier.go @@ -17,6 +17,7 @@ package nbs import ( "context" "fmt" + "os" "path" "strings" ) @@ -65,9 +66,9 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, destDir string) ([]tab } filepath := path.Join(destDir, filename) - err = gcc.writer.FlushToFile(filepath) - if err != nil { - return nil, err + + if gcc.writer.Size() == 0 { + return []tableSpec{}, nil } addr, err := parseAddr(filename) @@ -75,8 +76,21 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, destDir string) ([]tab return nil, err } + if info, err := os.Stat(filepath); err == nil { + // file already exists + if gcc.writer.ContentLength() != uint64(info.Size()) { + return nil, fmt.Errorf("'%s' already exists with different contents.", filepath) + } + } else { + // file does not exist or error determining if it existed. Try to create it. + err = gcc.writer.FlushToFile(filepath) + if err != nil { + return nil, err + } + } + return []tableSpec{ - tableSpec{ + { name: addr, chunkCount: gcc.writer.ChunkCount(), }, diff --git a/go/store/nbs/generational_chunk_store.go b/go/store/nbs/generational_chunk_store.go new file mode 100644 index 0000000000..2b1528451d --- /dev/null +++ b/go/store/nbs/generational_chunk_store.go @@ -0,0 +1,297 @@ +// Copyright 2021 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
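The HashSet helpers added to hash.go above are small but load-bearing: Copy lets GetMany-style calls whittle down a private "still missing" set without mutating the caller's set, and InsertAll merges ref sets before a single-store GC. A self-contained sketch with a string stand-in for the real 20-byte hash.Hash; the new generational_chunk_store.go continues below.

package main

import "fmt"

type Hash string
type HashSet map[Hash]struct{}

// Copy returns an independent set; mutating the copy leaves the
// original untouched.
func (hs HashSet) Copy() HashSet {
	copyOf := make(HashSet, len(hs))
	for k := range hs {
		copyOf[k] = struct{}{}
	}
	return copyOf
}

// InsertAll unions other into hs in place.
func (hs HashSet) InsertAll(other HashSet) {
	for h := range other {
		hs[h] = struct{}{}
	}
}

func main() {
	a := HashSet{"x": {}, "y": {}}
	b := a.Copy()
	b.InsertAll(HashSet{"z": {}})
	fmt.Println(len(a), len(b)) // 2 3
}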
+ +package nbs + +import ( + "context" + "io" + + "github.com/dolthub/dolt/go/store/chunks" + "github.com/dolthub/dolt/go/store/hash" +) + +var _ chunks.ChunkStore = (*GenerationalNBS)(nil) +var _ chunks.GenerationalCS = (*GenerationalNBS)(nil) +var _ TableFileStore = (*GenerationalNBS)(nil) + +type GenerationalNBS struct { + oldGen *NomsBlockStore + newGen *NomsBlockStore +} + +func NewGenerationalCS(oldGen, newGen *NomsBlockStore) *GenerationalNBS { + if oldGen.Version() != newGen.Version() { + panic("oldgen and newgen chunkstore versions vary") + } + + return &GenerationalNBS{ + oldGen: oldGen, + newGen: newGen, + } +} + +func (gcs *GenerationalNBS) NewGen() chunks.ChunkStoreGarbageCollector { + return gcs.newGen +} + +func (gcs *GenerationalNBS) OldGen() chunks.ChunkStoreGarbageCollector { + return gcs.oldGen +} + +// Get the Chunk for the value of the hash in the store. If the hash is absent from the store EmptyChunk is returned. +func (gcs *GenerationalNBS) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, error) { + c, err := gcs.oldGen.Get(ctx, h) + + if err != nil { + return chunks.EmptyChunk, err + } + + if c.IsEmpty() { + return gcs.newGen.Get(ctx, h) + } + + return c, nil +} + +// GetMany gets the Chunks with |hashes| from the store. On return, |foundChunks| will have been fully sent all chunks +// which have been found. Any non-present chunks will silently be ignored. +func (gcs *GenerationalNBS) GetMany(ctx context.Context, hashes hash.HashSet, found func(*chunks.Chunk)) error { + notInOldGen := hashes.Copy() + err := gcs.oldGen.GetMany(ctx, hashes, func(chunk *chunks.Chunk) { + delete(notInOldGen, chunk.Hash()) + found(chunk) + }) + + if err != nil { + return err + } + + if len(notInOldGen) == 0 { + return nil + } + + return gcs.newGen.GetMany(ctx, notInOldGen, found) +} + +func (gcs *GenerationalNBS) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(CompressedChunk)) error { + notInOldGen := hashes.Copy() + err := gcs.oldGen.GetManyCompressed(ctx, hashes, func(chunk CompressedChunk) { + delete(notInOldGen, chunk.Hash()) + found(chunk) + }) + + if err != nil { + return err + } + + if len(notInOldGen) == 0 { + return nil + } + + return gcs.newGen.GetManyCompressed(ctx, notInOldGen, found) +} + +// Returns true iff the value at the address |h| is contained in the +// store +func (gcs *GenerationalNBS) Has(ctx context.Context, h hash.Hash) (bool, error) { + has, err := gcs.oldGen.Has(ctx, h) + + if err != nil { + return false, err + } + + if has { + return true, nil + } + + return gcs.newGen.Has(ctx, h) +} + +// Returns a new HashSet containing any members of |hashes| that are +// absent from the store. +func (gcs *GenerationalNBS) HasMany(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error) { + notInOldGen, err := gcs.oldGen.HasMany(ctx, hashes) + + if err != nil { + return nil, err + } + + if len(notInOldGen) == 0 { + return notInOldGen, nil + } + + return gcs.newGen.HasMany(ctx, notInOldGen) +} + +// Put caches c in the ChunkSource. Upon return, c must be visible to +// subsequent Get and Has calls, but must not be persistent until a call +// to Flush(). Put may be called concurrently with other calls to Put(), +// Get(), GetMany(), Has() and HasMany(). +func (gcs *GenerationalNBS) Put(ctx context.Context, c chunks.Chunk) error { + return gcs.newGen.Put(ctx, c) +} + +// Returns the NomsVersion with which this ChunkSource is compatible. 
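The Get/Has methods above implement a read-through pair of stores: consult the old generation first and fall through to the new one on a miss. The same shape reduced to maps, with stand-in types; note the real Get signals a miss with chunks.EmptyChunk rather than a bool. (The store's remaining methods continue below.)

package main

import "fmt"

type Hash string
type tier map[Hash]string

type generational struct {
	oldGen, newGen tier
}

// get consults the old generation first and only falls through to the
// new generation on a miss, mirroring GenerationalNBS.Get above.
func (g generational) get(h Hash) (string, bool) {
	if c, ok := g.oldGen[h]; ok {
		return c, true
	}
	c, ok := g.newGen[h]
	return c, ok
}

func main() {
	g := generational{oldGen: tier{"a": "old"}, newGen: tier{"b": "new"}}
	fmt.Println(g.get("a")) // old true
	fmt.Println(g.get("b")) // new true
	fmt.Println(g.get("c")) // miss in both tiers: "" false
}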
+func (gcs *GenerationalNBS) Version() string { + return gcs.newGen.Version() +} + +// Rebase brings this ChunkStore into sync with the persistent storage's +// current root. +func (gcs *GenerationalNBS) Rebase(ctx context.Context) error { + oErr := gcs.oldGen.Rebase(ctx) + nErr := gcs.newGen.Rebase(ctx) + + if oErr != nil { + return oErr + } + + return nErr +} + +// Root returns the root of the database as of the time the ChunkStore +// was opened or the most recent call to Rebase. +func (gcs *GenerationalNBS) Root(ctx context.Context) (hash.Hash, error) { + return gcs.newGen.Root(ctx) +} + +// Commit atomically attempts to persist all novel Chunks and update the +// persisted root hash from last to current (or keeps it the same). +// If last doesn't match the root in persistent storage, returns false. +func (gcs *GenerationalNBS) Commit(ctx context.Context, current, last hash.Hash) (bool, error) { + return gcs.newGen.Commit(ctx, current, last) +} + +// Stats may return some kind of struct that reports statistics about the +// ChunkStore instance. The type is implementation-dependent, and impls +// may return nil +func (gcs *GenerationalNBS) Stats() interface{} { + return nil +} + +// StatsSummary may return a string containing summarized statistics for +// this ChunkStore. It must return "Unsupported" if this operation is not +// supported. +func (gcs *GenerationalNBS) StatsSummary() string { + return "" +} + +// Close tears down any resources in use by the implementation. After // Close(), the ChunkStore may not be used again. It is NOT SAFE to call +// Close() concurrently with any other ChunkStore method; behavior is +// undefined and probably crashy. +func (gcs *GenerationalNBS) Close() error { + oErr := gcs.oldGen.Close() + nErr := gcs.newGen.Close() + + if oErr != nil { + return oErr + } + + return nErr +} + +func (gcs *GenerationalNBS) copyToOldGen(ctx context.Context, hashes hash.HashSet) error { + notInOldGen, err := gcs.oldGen.HasMany(ctx, hashes) + + if err != nil { + return err + } + + var putErr error + err = gcs.newGen.GetMany(ctx, notInOldGen, func(chunk *chunks.Chunk) { + if putErr == nil { + putErr = gcs.oldGen.Put(ctx, *chunk) + } + }) + + if putErr != nil { + return putErr + } + + return err +} + +// Sources retrieves the current root hash, a list of all the table files (which may include appendix table files), +// and a second list containing only appendix table files for both the old gen and new gen stores. 
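copyToOldGen above first asks the old gen which hashes it is missing (HasMany), then streams only those chunks out of the new gen and Puts them into the old gen. The same copy-if-absent flow over maps, with stand-in types; the Sources method continues below.

package main

import "fmt"

type Hash string
type store map[Hash][]byte

// hasMany mimics ChunkStore.HasMany: it reports which of hs are absent.
func hasMany(s store, hs []Hash) []Hash {
	var absent []Hash
	for _, h := range hs {
		if _, ok := s[h]; !ok {
			absent = append(absent, h)
		}
	}
	return absent
}

// copyToOldGen moves only the chunks the old generation is missing.
func copyToOldGen(oldGen, newGen store, hs []Hash) {
	for _, h := range hasMany(oldGen, hs) {
		if c, ok := newGen[h]; ok {
			oldGen[h] = c
		}
	}
}

func main() {
	oldGen := store{"a": {1}}
	newGen := store{"a": {1}, "b": {2}}
	copyToOldGen(oldGen, newGen, []Hash{"a", "b"})
	fmt.Println(len(oldGen)) // 2
}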
+func (gcs *GenerationalNBS) Sources(ctx context.Context) (hash.Hash, []TableFile, []TableFile, error) { + _, tFiles, appFiles, err := gcs.oldGen.Sources(ctx) + + if err != nil { + return hash.Hash{}, nil, nil, err + } + + newRoot, newTFiles, newAppFiles, err := gcs.newGen.Sources(ctx) + + if err != nil { + return hash.Hash{}, nil, nil, err + } + + for _, tf := range newTFiles { + tFiles = append(tFiles, tf) + } + for _, tf := range newAppFiles { + appFiles = append(appFiles, tf) + } + + return newRoot, tFiles, appFiles, nil +} + +// Size returns the total size, in bytes, of the table files in the new and old gen stores combined +func (gcs *GenerationalNBS) Size(ctx context.Context) (uint64, error) { + oldSize, err := gcs.oldGen.Size(ctx) + + if err != nil { + return 0, err + } + + newSize, err := gcs.newGen.Size(ctx) + + if err != nil { + return 0, err + } + + return oldSize + newSize, nil +} + +// WriteTableFile will read a table file from the provided reader and write it to the new gen TableFileStore +func (gcs *GenerationalNBS) WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error { + return gcs.newGen.WriteTableFile(ctx, fileId, numChunks, rd, contentLength, contentHash) +} + +// AddTableFilesToManifest adds table files to the manifest of the newgen cs +func (gcs *GenerationalNBS) AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int) error { + return gcs.newGen.AddTableFilesToManifest(ctx, fileIdToNumChunks) +} + +// PruneTableFiles deletes old table files that are no longer referenced in the manifest of the new or old gen chunkstores +func (gcs *GenerationalNBS) PruneTableFiles(ctx context.Context) error { + err := gcs.oldGen.PruneTableFiles(ctx) + + if err != nil { + return err + } + + return gcs.newGen.PruneTableFiles(ctx) +} + +// SetRootChunk changes the root chunk hash from the previous value to the new root for the newgen cs +func (gcs *GenerationalNBS) SetRootChunk(ctx context.Context, root, previous hash.Hash) error { + return gcs.newGen.SetRootChunk(ctx, root, previous) +} + +// SupportedOperations returns a description of the support TableFile operations. Some stores only support reading table files, not writing. +func (gcs *GenerationalNBS) SupportedOperations() TableFileStoreOps { + return gcs.newGen.SupportedOperations() +} diff --git a/go/store/nbs/generational_chunk_store_test.go b/go/store/nbs/generational_chunk_store_test.go new file mode 100644 index 0000000000..f9a972708a --- /dev/null +++ b/go/store/nbs/generational_chunk_store_test.go @@ -0,0 +1,173 @@ +// Copyright 2021 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
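Minor style note on the Sources method above: the two element-by-element copy loops are equivalent to Go's variadic append, as this snippet with illustrative names shows. (The new test file continues below.)

package main

import "fmt"

func main() {
	tFiles := []string{"old1", "old2"}
	newTFiles := []string{"new1", "new2"}

	// Equivalent to the element-by-element loop in Sources above.
	tFiles = append(tFiles, newTFiles...)

	fmt.Println(tFiles) // [old1 old2 new1 new2]
}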
+ +package nbs + +import ( + "context" + "math/rand" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/dolthub/dolt/go/store/chunks" + "github.com/dolthub/dolt/go/store/hash" +) + +var randGen = rand.New(rand.NewSource(0)) + +func genChunks(t *testing.T, count int, max int) []chunks.Chunk { + chnks := make([]chunks.Chunk, count) + for i := 0; i < count; i++ { + bytes := make([]byte, randGen.Int()%max) + n, err := randGen.Read(bytes) + require.NoError(t, err) + chnks[i] = chunks.NewChunk(bytes[:n]) + } + + return chnks +} + +func mergeMaps(m1, m2 map[int]bool) map[int]bool { + m3 := make(map[int]bool) + for k := range m1 { + m3[k] = true + } + + for k := range m2 { + m3[k] = true + } + + return m3 +} + +func hashesForChunks(chunks []chunks.Chunk, indexes map[int]bool) hash.HashSet { + hashes := make(hash.HashSet) + for idx := range indexes { + hashes[chunks[idx].Hash()] = struct{}{} + } + + return hashes +} + +type foundHashes hash.HashSet + +func (fh foundHashes) found(chk *chunks.Chunk) { + fh[chk.Hash()] = struct{}{} +} + +func requireChunks(t *testing.T, ctx context.Context, chunks []chunks.Chunk, genCS *GenerationalNBS, inOld, inNew map[int]bool) { + // Has/Get Checks + for i, chk := range chunks { + has, err := genCS.oldGen.Has(ctx, chk.Hash()) + require.NoError(t, err) + require.Equal(t, inOld[i], has, "error for index: %d", i) + + retrieved, err := genCS.oldGen.Get(ctx, chk.Hash()) + require.NoError(t, err) + require.Equal(t, !inOld[i], retrieved.IsEmpty(), "error for index: %d", i) + + has, err = genCS.newGen.Has(ctx, chk.Hash()) + require.NoError(t, err) + require.Equal(t, inNew[i], has, "error for index: %d", i) + + retrieved, err = genCS.newGen.Get(ctx, chk.Hash()) + require.NoError(t, err) + require.Equal(t, !inNew[i], retrieved.IsEmpty(), "error for index: %d", i) + + has, err = genCS.Has(ctx, chk.Hash()) + require.NoError(t, err) + require.Equal(t, inOld[i] || inNew[i], has, "error for index: %d", i) + + retrieved, err = genCS.Get(ctx, chk.Hash()) + require.NoError(t, err) + require.Equal(t, !(inOld[i] || inNew[i]), retrieved.IsEmpty(), "error for index: %d", i) + } + + // HasMany Checks + absent, err := genCS.oldGen.HasMany(ctx, hashesForChunks(chunks, inOld)) + require.NoError(t, err) + require.Len(t, absent, 0) + + absent, err = genCS.newGen.HasMany(ctx, hashesForChunks(chunks, inNew)) + require.NoError(t, err) + require.Len(t, absent, 0) + + inUnion := mergeMaps(inOld, inNew) + absent, err = genCS.HasMany(ctx, hashesForChunks(chunks, inUnion)) + require.NoError(t, err) + require.Len(t, absent, 0) + + // GetMany Checks + expected := hashesForChunks(chunks, inOld) + received := foundHashes{} + err = genCS.oldGen.GetMany(ctx, expected, received.found) + require.NoError(t, err) + require.Equal(t, expected, hash.HashSet(received)) + + expected = hashesForChunks(chunks, inNew) + received = foundHashes{} + err = genCS.newGen.GetMany(ctx, expected, received.found) + require.NoError(t, err) + require.Equal(t, expected, hash.HashSet(received)) + + expected = hashesForChunks(chunks, inUnion) + received = foundHashes{} + err = genCS.GetMany(ctx, expected, received.found) + require.NoError(t, err) + require.Equal(t, expected, hash.HashSet(received)) +} + +func putChunks(t *testing.T, ctx context.Context, chunks []chunks.Chunk, cs chunks.ChunkStore, indexesIn map[int]bool, chunkIndexes ...int) { + for _, idx := range chunkIndexes { + err := cs.Put(ctx, chunks[idx]) + require.NoError(t, err) + indexesIn[idx] = true + } +} + +func TestGenerationalCS(t *testing.T) { + 
ctx := context.Background() + oldGen, _ := makeTestLocalStore(t, 64) + newGen, _ := makeTestLocalStore(t, 64) + inOld := make(map[int]bool) + inNew := make(map[int]bool) + chnks := genChunks(t, 100, 1000) + + putChunks(t, ctx, chnks, oldGen, inOld, 0, 1, 2, 3, 4) + + cs := NewGenerationalCS(oldGen, newGen) + requireChunks(t, ctx, chnks, cs, inOld, inNew) + + putChunks(t, ctx, chnks, cs, inNew, 6, 7, 8, 9) + requireChunks(t, ctx, chnks, cs, inOld, inNew) + + err := cs.copyToOldGen(ctx, hashesForChunks(chnks, inNew)) + require.NoError(t, err) + + inOld = mergeMaps(inOld, inNew) + requireChunks(t, ctx, chnks, cs, inOld, inNew) + + putChunks(t, ctx, chnks, cs, inNew, 10, 11, 12, 13, 14) + requireChunks(t, ctx, chnks, cs, inOld, inNew) + + err = cs.copyToOldGen(ctx, hashesForChunks(chnks, inNew)) + require.NoError(t, err) + + inOld = mergeMaps(inOld, inNew) + requireChunks(t, ctx, chnks, cs, inOld, inNew) + + putChunks(t, ctx, chnks, cs, inNew, 15, 16, 17, 18, 19) + requireChunks(t, ctx, chnks, cs, inOld, inNew) +} diff --git a/go/store/nbs/manifest.go b/go/store/nbs/manifest.go index d8e344973f..71af0d6e8f 100644 --- a/go/store/nbs/manifest.go +++ b/go/store/nbs/manifest.go @@ -464,6 +464,15 @@ func (ts tableSpec) GetChunkCount() uint32 { return ts.chunkCount } +func tableSpecsToMap(specs []tableSpec) map[string]int { + m := make(map[string]int) + for _, spec := range specs { + m[spec.name.String()] = int(spec.chunkCount) + } + + return m +} + func parseSpecs(tableInfo []string) ([]tableSpec, error) { specs := make([]tableSpec, len(tableInfo)/2) for i := range specs { diff --git a/go/store/nbs/nbs_metrics_wrapper.go b/go/store/nbs/nbs_metrics_wrapper.go index 1b9743c354..d240d3f828 100644 --- a/go/store/nbs/nbs_metrics_wrapper.go +++ b/go/store/nbs/nbs_metrics_wrapper.go @@ -71,8 +71,8 @@ func (nbsMW *NBSMetricWrapper) SupportedOperations() TableFileStoreOps { return nbsMW.nbs.SupportedOperations() } -func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash) error { - return nbsMW.nbs.MarkAndSweepChunks(ctx, last, keepChunks) +func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest chunks.ChunkStore) error { + return nbsMW.nbs.MarkAndSweepChunks(ctx, last, keepChunks, dest) } // PruneTableFiles deletes old table files that are no longer referenced in the manifest. 
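tableSpecsToMap above adapts the []tableSpec produced by a GC copy into the map[string]int that AddTableFilesToManifest consumes on the destination store. A sketch with simplified types; the real name field is an addr that gets stringified.

package main

import "fmt"

type tableSpec struct {
	name       string // simplified; the real field is an addr
	chunkCount uint32
}

func tableSpecsToMap(specs []tableSpec) map[string]int {
	m := make(map[string]int)
	for _, spec := range specs {
		m[spec.name] = int(spec.chunkCount)
	}
	return m
}

func main() {
	specs := []tableSpec{{"table1", 12}, {"table2", 3}}
	fmt.Println(tableSpecsToMap(specs)) // map[table1:12 table2:3]
}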
diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index fa34d87ae9..f76f66da7e 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1304,6 +1304,7 @@ func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileId string, nu // AddTableFilesToManifest adds table files to the manifest func (nbs *NomsBlockStore) AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int) error { + var totalChunks int fileIdHashToNumChunks := make(map[hash.Hash]uint32) for fileId, numChunks := range fileIdToNumChunks { fileIdHash, ok := hash.MaybeParse(fileId) @@ -1313,6 +1314,11 @@ func (nbs *NomsBlockStore) AddTableFilesToManifest(ctx context.Context, fileIdTo } fileIdHashToNumChunks[fileIdHash] = uint32(numChunks) + totalChunks += numChunks + } + + if totalChunks == 0 { + return nil } _, err := nbs.UpdateManifest(ctx, fileIdHashToNumChunks) @@ -1390,7 +1396,7 @@ func (nbs *NomsBlockStore) PruneTableFiles(ctx context.Context) (err error) { return nbs.p.PruneTableFiles(ctx, contents) } -func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash) error { +func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest chunks.ChunkStore) error { ops := nbs.SupportedOperations() if !ops.CanGC || !ops.CanPrune { return chunks.ErrUnsupportedOperation @@ -1406,7 +1412,17 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has return chunks.ErrNothingToCollect } - specs, err := nbs.copyMarkedChunks(ctx, keepChunks) + destNBS := nbs + if dest != nil { + switch typed := dest.(type) { + case *NomsBlockStore: + destNBS = typed + case NBSMetricWrapper: + destNBS = typed.nbs + } + } + + specs, err := nbs.copyMarkedChunks(ctx, keepChunks, destNBS) if err != nil { return err } @@ -1414,12 +1430,22 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has return ctx.Err() } - err = nbs.swapTables(ctx, specs) - if err != nil { - return err - } - if ctx.Err() != nil { - return ctx.Err() + if destNBS == nbs { + err = nbs.swapTables(ctx, specs) + if err != nil { + return err + } + + if ctx.Err() != nil { + return ctx.Err() + } + } else { + fileIdToNumChunks := tableSpecsToMap(specs) + err = destNBS.AddTableFilesToManifest(ctx, fileIdToNumChunks) + + if err != nil { + return err + } } ok, contents, err := nbs.mm.Fetch(ctx, &Stats{}) @@ -1436,7 +1462,7 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has return nbs.p.PruneTableFiles(ctx, contents) } -func (nbs *NomsBlockStore) copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash) ([]tableSpec, error) { +func (nbs *NomsBlockStore) copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash, dest *NomsBlockStore) ([]tableSpec, error) { gcc, err := newGarbageCollectionCopier() if err != nil { return nil, err @@ -1471,7 +1497,7 @@ LOOP: } } - nomsDir := nbs.p.(*fsTablePersister).dir + nomsDir := dest.p.(*fsTablePersister).dir return gcc.copyTablesToDir(ctx, nomsDir) } @@ -1503,6 +1529,11 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) er specs: specs, } + // nothing has changed. 
Bail early + if newContents.gcGen == nbs.upstream.gcGen { + return nil + } + var err error nbs.mm.LockForUpdate() defer func() { diff --git a/go/store/nbs/store_test.go b/go/store/nbs/store_test.go index 929861a61c..29dc25fe4c 100644 --- a/go/store/nbs/store_test.go +++ b/go/store/nbs/store_test.go @@ -263,7 +263,7 @@ func TestNBSCopyGC(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func() { - msErr = st.MarkAndSweepChunks(ctx, r, keepChan) + msErr = st.MarkAndSweepChunks(ctx, r, keepChan, nil) wg.Done() }() for h := range keepers { diff --git a/go/store/types/parallel_ref_walker.go b/go/store/types/parallel_ref_walker.go index 25ac5f29ec..08046b787b 100644 --- a/go/store/types/parallel_ref_walker.go +++ b/go/store/types/parallel_ref_walker.go @@ -116,6 +116,28 @@ func (w *parallelRefWalker) GetRefs(visited hash.HashSet, vals ValueSlice) ([]ha return res, nil } +func (w *parallelRefWalker) GetRefSet(visited hash.HashSet, vals ValueSlice) (hash.HashSet, error) { + res := make(hash.HashSet) + numSent, resCh, err := w.sendAllWork(vals) + if err != nil { + return nil, err + } + for i := 0; i < numSent; i++ { + select { + case b := <-resCh: + for _, r := range b { + if !visited.Has(r) { + res[r] = struct{}{} + visited.Insert(r) + } + } + case <-w.ctx.Done(): + return nil, w.ctx.Err() + } + } + return res, nil +} + func (w *parallelRefWalker) Close() error { close(w.work) return w.eg.Wait() diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index 84aee3fdce..6b9ec3c96e 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -35,6 +35,12 @@ import ( "github.com/dolthub/dolt/go/store/util/sizecache" ) +type HashFilterFunc func(context.Context, hash.HashSet) (hash.HashSet, error) + +func unfilteredHashFunc(_ context.Context, hs hash.HashSet) (hash.HashSet, error) { + return hs, nil +} + // ValueReader is an interface that knows how to read Noms Values, e.g. // datas/Database. Required to avoid import cycle between this package and the // package that implements Value reading. 
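The HashFilterFunc type introduced above is what lets one traversal serve both generations: when collecting the new gen, hashes the old gen already holds are filtered out through its HasMany, while unfilteredHashFunc is the identity used for single-store GC. A toy filter with stand-in types; the value_store.go GC hunks follow below.

package main

import (
	"context"
	"fmt"
)

type Hash string
type HashSet map[Hash]struct{}

type HashFilterFunc func(context.Context, HashSet) (HashSet, error)

// unfiltered is the identity filter used for single-store GC.
func unfiltered(_ context.Context, hs HashSet) (HashSet, error) {
	return hs, nil
}

// absentFrom mimics filtering through oldGen.HasMany: only hashes the
// store does not already contain survive.
func absentFrom(store HashSet) HashFilterFunc {
	return func(_ context.Context, hs HashSet) (HashSet, error) {
		out := make(HashSet)
		for h := range hs {
			if _, ok := store[h]; !ok {
				out[h] = struct{}{}
			}
		}
		return out, nil
	}
}

func main() {
	oldGen := HashSet{"a": {}}
	filter := absentFrom(oldGen)
	got, _ := filter(context.Background(), HashSet{"a": {}, "b": {}})
	fmt.Println(len(got)) // 1 -- only "b" still needs to be visited

	same, _ := unfiltered(context.Background(), HashSet{"a": {}})
	fmt.Println(len(same)) // 1
}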
@@ -551,12 +557,43 @@ func (lvs *ValueStore) Commit(ctx context.Context, current, last hash.Hash) (boo }() } -// GC traverses the ValueStore from the root and removes unreferenced chunks from the ChunkStore -func (lvs *ValueStore) GC(ctx context.Context) error { - collector, ok := lvs.cs.(chunks.ChunkStoreGarbageCollector) +func makeBatches(hss []hash.HashSet, count int) [][]hash.Hash { + const maxBatchSize = 16384 - if !ok { - return chunks.ErrUnsupportedOperation + buffer := make([]hash.Hash, count) + i := 0 + for _, hs := range hss { + for h := range hs { + buffer[i] = h + i++ + } + } + + numBatches := (count + (maxBatchSize - 1)) / maxBatchSize + batchSize := count / numBatches + + res := make([][]hash.Hash, numBatches) + for i := 0; i < numBatches; i++ { + if i != numBatches-1 { + res[i] = buffer[i*batchSize : (i+1)*batchSize] + } else { + res[i] = buffer[i*batchSize:] + } + } + + return res +} + +func (lvs *ValueStore) numBuffChunks() int { + lvs.bufferMu.RLock() + defer lvs.bufferMu.RUnlock() + return len(lvs.bufferedChunks) +} + +// GC traverses the ValueStore from the root and removes unreferenced chunks from the ChunkStore +func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet) error { + if lvs.numBuffChunks() > 0 { + return errors.New("invalid GC state; bufferedChunks must be empty.") } err := func() error { @@ -583,16 +620,39 @@ func (lvs *ValueStore) GC(ctx context.Context) error { if err != nil { return err } + if rootVal == nil { // empty root return nil } + newGenRefs.Insert(root) + if gcs, ok := lvs.cs.(chunks.GenerationalCS); ok { + oldGen := gcs.OldGen() + newGen := gcs.NewGen() + err = lvs.gc(ctx, root, oldGenRefs, oldGen.HasMany, newGen, oldGen) + if err != nil { + return err + } + + return lvs.gc(ctx, root, newGenRefs, oldGen.HasMany, newGen, newGen) + } else if collector, ok := lvs.cs.(chunks.ChunkStoreGarbageCollector); ok { + if len(oldGenRefs) > 0 { + newGenRefs.InsertAll(oldGenRefs) + } + + return lvs.gc(ctx, root, newGenRefs, unfilteredHashFunc, collector, collector) + } else { + return chunks.ErrUnsupportedOperation + } +} + +func (lvs *ValueStore) gc(ctx context.Context, root hash.Hash, toVisit hash.HashSet, hashFilter HashFilterFunc, src, dest chunks.ChunkStoreGarbageCollector) error { keepChunks := make(chan []hash.Hash, gcBuffSize) eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - return collector.MarkAndSweepChunks(ctx, root, keepChunks) + return src.MarkAndSweepChunks(ctx, root, keepChunks, dest) }) keepHashes := func(hs []hash.Hash) error { @@ -603,20 +663,6 @@ func (lvs *ValueStore) GC(ctx context.Context) error { return ctx.Err() } } - const batchSize = 16384 - batches := func(hss [][]hash.Hash) [][]hash.Hash { - var res [][]hash.Hash - for _, hs := range hss { - i := 0 - for ; i+batchSize < len(hs); i += batchSize { - res = append(res, hs[i:i+batchSize]) - } - if i < len(hs) { - res = append(res, hs[i:]) - } - } - return res - } concurrency := runtime.GOMAXPROCS(0) - 1 if concurrency < 1 { @@ -625,51 +671,74 @@ func (lvs *ValueStore) GC(ctx context.Context) error { walker := newParallelRefWalker(ctx, lvs.nbf, concurrency) eg.Go(func() error { - toVisitCount := 1 - toVisit := [][]hash.Hash{{root}} - visited := hash.NewHashSet(root) - for toVisitCount > 0 { - batches := batches(toVisit) - toVisit = make([][]hash.Hash, len(batches)) - toVisitCount = 0 - for i, batch := range batches { - if err := keepHashes(batch); err != nil { - return err - } - vals, err := lvs.ReadManyValues(ctx, batch) - if err != nil { - return 
err - } - if len(vals) != len(batch) { - return errors.New("dangling reference found in chunk store") - } - hashes, err := walker.GetRefs(visited, vals) - if err != nil { - return err - } - toVisit[i] = hashes - toVisitCount += len(hashes) - } - } - walker.Close() + defer func() { + close(keepChunks) + _ = walker.Close() + }() - lvs.bufferMu.Lock() - defer lvs.bufferMu.Unlock() - if len(lvs.bufferedChunks) > 0 { - return errors.New("invalid GC state; bufferedChunks started empty and was not empty at end of run.") - } - lvs.decodedChunks = sizecache.New(lvs.decodedChunks.Size()) - lvs.bufferedChunks = make(map[hash.Hash]chunks.Chunk, lvs.bufferedChunkSize) - lvs.bufferedChunkSize = 0 - lvs.withBufferedChildren = map[hash.Hash]uint64{} - - close(keepChunks) - return nil + visited := toVisit.Copy() + return lvs.gcProcessRefs(ctx, visited, []hash.HashSet{toVisit}, keepHashes, walker, hashFilter) }) + err := eg.Wait() + if err != nil { + return err + } + + if lvs.numBuffChunks() > 0 { + return errors.New("invalid GC state; bufferedChunks started empty and was not empty at end of run.") + } + + // purge the cache + lvs.decodedChunks = sizecache.New(lvs.decodedChunks.Size()) + lvs.bufferedChunks = make(map[hash.Hash]chunks.Chunk, lvs.bufferedChunkSize) + lvs.bufferedChunkSize = 0 + lvs.withBufferedChildren = map[hash.Hash]uint64{} + return eg.Wait() } +func (lvs *ValueStore) gcProcessRefs(ctx context.Context, visited hash.HashSet, toVisit []hash.HashSet, keepHashes func(hs []hash.Hash) error, walker *parallelRefWalker, hashFilter HashFilterFunc) error { + if len(toVisit) != 1 { + panic("Must be one initial hashset to visit") + } + + toVisitCount := len(toVisit[0]) + for toVisitCount > 0 { + batches := makeBatches(toVisit, toVisitCount) + toVisit = make([]hash.HashSet, len(batches)) + toVisitCount = 0 + for i, batch := range batches { + if err := keepHashes(batch); err != nil { + return err + } + + vals, err := lvs.ReadManyValues(ctx, batch) + if err != nil { + return err + } + if len(vals) != len(batch) { + return errors.New("dangling reference found in chunk store") + } + + hashes, err := walker.GetRefSet(visited, vals) + if err != nil { + return err + } + + // continue processing + hashes, err = hashFilter(ctx, hashes) + if err != nil { + return err + } + + toVisit[i] = hashes + toVisitCount += len(hashes) + } + } + return nil +} + // Close closes the underlying ChunkStore func (lvs *ValueStore) Close() error { return lvs.cs.Close() diff --git a/go/store/types/value_store_test.go b/go/store/types/value_store_test.go index d77944ff23..20192efdf9 100644 --- a/go/store/types/value_store_test.go +++ b/go/store/types/value_store_test.go @@ -379,7 +379,7 @@ func TestGC(t *testing.T) { require.NoError(t, err) assert.NotNil(v2) - err = vs.GC(ctx) + err = vs.GC(ctx, hash.HashSet{}, hash.HashSet{}) require.NoError(t, err) v1, err = vs.ReadValue(ctx, h1) // non-nil diff --git a/integration-tests/bats/branch.bats b/integration-tests/bats/branch.bats index fddc269355..913a0c1270 100644 --- a/integration-tests/bats/branch.bats +++ b/integration-tests/bats/branch.bats @@ -11,26 +11,22 @@ teardown() { } @test "branch: deleting a branch deletes its working set" { - dolt gc dolt checkout -b to_delete - dolt sql -q 'create table test (id int primary key);' - values="" - for i in `seq 0 1024`; do - values="$values""${values:+,}""($i)" - done - dolt sql -q 'insert into test values '"$values"';' - dolt add . 
- dolt commit -m 'making a new commit' - dolt gc - with_values_sz=`du -s | awk '{print $1}'` + + root=$(noms root .dolt/noms) + run noms show .dolt/noms::#$root + [[ "$show_tables" -eq 0 ]] || false + echo $output + [[ "$output" =~ "workingSets/heads/master" ]] || false + [[ "$output" =~ "workingSets/heads/to_delete" ]] || false + dolt checkout master dolt branch -d -f to_delete - num_branches=`dolt branch | wc -l` - [[ "$num_branches" -eq 1 ]] || fail "expected num_branches to be 1" - dolt gc - without_values_sz=`du -s | awk '{print $1}'` - echo "$sz $new_sz $post_delete_sz" - [[ "$without_values_sz" -lt "$with_values_sz" ]] || false + + root=$(noms root .dolt/noms) + run noms show .dolt/noms::#$root + [[ "$show_tables" -eq 0 ]] || false + [[ ! "$output" =~ "to_delete" ]] || false } @test "branch: moving current working branch takes its working set" { diff --git a/integration-tests/bats/garbage_collection.bats b/integration-tests/bats/garbage_collection.bats index 19f18b7ddb..1f63a3efd6 100644 --- a/integration-tests/bats/garbage_collection.bats +++ b/integration-tests/bats/garbage_collection.bats @@ -116,7 +116,7 @@ SQL # leave data in the working set dolt sql -q "INSERT INTO test VALUES (11),(12),(13),(14),(15);" - BEFORE=$(du .dolt/noms/ | sed 's/[^0-9]*//g') + BEFORE=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g') run dolt gc [ "$status" -eq 0 ] @@ -125,7 +125,7 @@ SQL [ "$status" -eq 0 ] [[ "$output" =~ "80" ]] || false - AFTER=$(du .dolt/noms/ | sed 's/[^0-9]*//g') + AFTER=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g') # assert space was reclaimed echo "$BEFORE" diff --git a/integration-tests/bats/import-create-tables.bats b/integration-tests/bats/import-create-tables.bats index 1f69342339..2580ab7f20 100755 --- a/integration-tests/bats/import-create-tables.bats +++ b/integration-tests/bats/import-create-tables.bats @@ -539,9 +539,9 @@ DELIM [ "$status" -eq 0 ] # assert that we already collected garbage - BEFORE=$(du .dolt/noms/ | sed 's/[^0-9]*//g') + BEFORE=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g') dolt gc - AFTER=$(du .dolt/noms/ | sed 's/[^0-9]*//g') + AFTER=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g') # less than 10% smaller [ "$BEFORE" -lt $(($AFTER * 11 / 10)) ] diff --git a/integration-tests/bats/import-update-tables.bats b/integration-tests/bats/import-update-tables.bats index bccb80a0bc..403162f3d1 100644 --- a/integration-tests/bats/import-update-tables.bats +++ b/integration-tests/bats/import-update-tables.bats @@ -229,8 +229,9 @@ DELIM # Output to a file from the error stderr dolt sql -q "DELETE FROM test WHERE pk = 1" - dolt table import -u --continue test 1pk5col-rpt-ints.csv 2> skipped.csv - run cat skipped.csv + run dolt table import -u --continue test 1pk5col-rpt-ints.csv + echo $output + [ "$status" -eq 0 ] [[ "$output" =~ "The following rows were skipped:" ]] || false [[ "$output" =~ "1,1,2,3,4,7" ]] || false [[ "$output" =~ "1,1,2,3,4,8" ]] || false
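Back in value_store.go, the makeBatches helper replaces the old inline batching: it flattens a slice of HashSets into roughly equal []hash.Hash slices capped near 16384 entries, so keepChunks sends and ReadManyValues calls stay bounded. A runnable sketch with a small cap and an int stand-in for hash.Hash; callers must pass a non-zero count equal to the total element count, which gcProcessRefs guarantees via its toVisitCount > 0 loop.

package main

import "fmt"

type Hash int
type HashSet map[Hash]struct{}

const maxBatchSize = 5 // the real constant is 16384

// makeBatches mirrors the helper above: count must equal the total
// number of hashes across hss and must be non-zero.
func makeBatches(hss []HashSet, count int) [][]Hash {
	buffer := make([]Hash, count)
	i := 0
	for _, hs := range hss {
		for h := range hs {
			buffer[i] = h
			i++
		}
	}

	numBatches := (count + (maxBatchSize - 1)) / maxBatchSize // ceiling division
	batchSize := count / numBatches

	res := make([][]Hash, numBatches)
	for j := 0; j < numBatches; j++ {
		if j != numBatches-1 {
			res[j] = buffer[j*batchSize : (j+1)*batchSize]
		} else {
			res[j] = buffer[j*batchSize:] // the last batch absorbs any remainder
		}
	}
	return res
}

func main() {
	hs := make(HashSet)
	for i := 0; i < 12; i++ {
		hs[Hash(i)] = struct{}{}
	}
	for _, b := range makeBatches([]HashSet{hs}, len(hs)) {
		fmt.Println(len(b)) // 4, 4, 4 -- even batches rather than 5+5+2
	}
}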