From d024ea8ea6d4dae5df695e1dd4f7a3c13cd5604f Mon Sep 17 00:00:00 2001
From: Aaron Son
Date: Mon, 20 Mar 2023 16:39:50 -0700
Subject: [PATCH] go/store/nbs: store.go: Conjoin when needed as we add new
 table files to a manifest.

This allows us to avoid huge manifests with lots of table files when we run
multiple full GCs over a period of time.
---
 go/store/nbs/store.go                              | 61 ++++++++----
 .../gc_oldgen_conjoin_test.go                      | 98 ++++++++++++++++++++
 2 files changed, 140 insertions(+), 19 deletions(-)
 create mode 100644 integration-tests/go-sql-server-driver/gc_oldgen_conjoin_test.go

diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go
index 2b9aec277c..0a4add62f8 100644
--- a/go/store/nbs/store.go
+++ b/go/store/nbs/store.go
@@ -162,6 +162,32 @@ func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash
 	return ranges, nil
 }
 
+func (nbs *NomsBlockStore) conjoinIfRequired(ctx context.Context) (bool, error) {
+	if nbs.c.conjoinRequired(nbs.tables) {
+		newUpstream, cleanup, err := conjoin(ctx, nbs.c, nbs.upstream, nbs.mm, nbs.p, nbs.stats)
+		if err != nil {
+			return false, err
+		}
+
+		newTables, err := nbs.tables.rebase(ctx, newUpstream.specs, nbs.stats)
+		if err != nil {
+			return false, err
+		}
+
+		nbs.upstream = newUpstream
+		oldTables := nbs.tables
+		nbs.tables = newTables
+		err = oldTables.close()
+		if err != nil {
+			return true, err
+		}
+		cleanup()
+		return true, nil
+	} else {
+		return false, nil
+	}
+}
+
 func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (mi ManifestInfo, err error) {
 	nbs.mu.Lock()
 	defer nbs.mu.Unlock()
@@ -181,6 +207,11 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
 		}
 	}()
 
+	_, err = nbs.conjoinIfRequired(ctx)
+	if err != nil {
+		return manifestContents{}, err
+	}
+
 	var updatedContents manifestContents
 	for {
 		ok, contents, _, ferr := nbs.mm.Fetch(ctx, nbs.stats)
@@ -263,6 +294,12 @@ func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updat
 		}
 	}()
 
+	_, err = nbs.conjoinIfRequired(ctx)
+	if err != nil {
+		return manifestContents{}, err
+	}
+
+
 	var updatedContents manifestContents
 	for {
 		ok, contents, _, ferr := nbs.mm.Fetch(ctx, nbs.stats)
@@ -1172,25 +1209,11 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
 		}
 	}
 
-	if nbs.c.conjoinRequired(nbs.tables) {
-		newUpstream, cleanup, err := conjoin(ctx, nbs.c, nbs.upstream, nbs.mm, nbs.p, nbs.stats)
-		if err != nil {
-			return err
-		}
-
-		newTables, err := nbs.tables.rebase(ctx, newUpstream.specs, nbs.stats)
-		if err != nil {
-			return err
-		}
-
-		nbs.upstream = newUpstream
-		oldTables := nbs.tables
-		nbs.tables = newTables
-		err = oldTables.close()
-		if err != nil {
-			return err
-		}
-		cleanup()
+	didConjoin, err := nbs.conjoinIfRequired(ctx)
+	if err != nil {
+		return err
+	}
+	if didConjoin {
 		return errOptimisticLockFailedTables
 	}
 
diff --git a/integration-tests/go-sql-server-driver/gc_oldgen_conjoin_test.go b/integration-tests/go-sql-server-driver/gc_oldgen_conjoin_test.go
new file mode 100644
index 0000000000..b527f0f7ec
--- /dev/null
+++ b/integration-tests/go-sql-server-driver/gc_oldgen_conjoin_test.go
@@ -0,0 +1,98 @@
+// Copyright 2023 Dolthub, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"context"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"testing"
+	sqldriver "database/sql/driver"
+
+	"github.com/stretchr/testify/require"
+
+	driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver"
+)
+
+func TestGCConjoinsOldgen(t *testing.T) {
+	u, err := driver.NewDoltUser()
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		u.Cleanup()
+	})
+
+	rs, err := u.MakeRepoStore()
+	require.NoError(t, err)
+
+	repo, err := rs.MakeRepo("concurrent_gc_test")
+	require.NoError(t, err)
+
+	server := MakeServer(t, repo, &driver.Server{})
+	server.DBName = "concurrent_gc_test"
+
+	db, err := server.DB(driver.Connection{User: "root"})
+	require.NoError(t, err)
+	defer db.Close()
+
+	ctx := context.Background()
+
+	// Create the database...
+	func() {
+		conn, err := db.Conn(ctx)
+		require.NoError(t, err)
+		defer conn.Close()
+		_, err = conn.ExecContext(ctx, "create table vals (id bigint primary key, val bigint)")
+		require.NoError(t, err)
+		_, err = conn.ExecContext(ctx, "call dolt_commit('-Am', 'create vals table')")
+		require.NoError(t, err)
+	}()
+
+	// Each batch of 1024 leaf entries is approximately one chunk.
+	// We create 512 commits, each of which adds the next 1024 rows sequentially.
+	// After we create each commit, we call dolt_gc(), which moves the new chunks
+	// to the oldgen. At the end of the test, we assert that there are not
+	// too many table files in the oldgen.
+	for i := 0; i < 512; i++ {
+		var vals []string
+		for j := i*1024; j < (i+1)*1024; j++ {
+			vals = append(vals, "(" + strconv.Itoa(j) + ",0)")
+		}
+		func() {
+			conn, err := db.Conn(ctx)
+			require.NoError(t, err)
+			defer func() {
+				// After calling dolt_gc, the connection is bad. Remove it from the connection pool.
+				conn.Raw(func(_ any) error {
+					return sqldriver.ErrBadConn
+				})
+			}()
+
+			_, err = conn.ExecContext(ctx, "insert into vals values " + strings.Join(vals, ","))
+			require.NoError(t, err)
+			_, err = conn.ExecContext(ctx, "call dolt_commit('-am', 'insert from " + strconv.Itoa(i*1024) + "')")
+			require.NoError(t, err)
+			_, err = conn.ExecContext(ctx, "call dolt_gc()")
+			require.NoError(t, err)
+		}()
+	}
+
+	entries, err := os.ReadDir(filepath.Join(repo.Dir, ".dolt/noms/oldgen"))
+	require.NoError(t, err)
+	require.Greater(t, len(entries), 2)
+	// defaultMaxTables == 256, plus a few extra files like |manifest| and |LOCK|.
+	require.Less(t, len(entries), 272)
+}
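
The policy this patch enforces can be pictured with a small, self-contained Go sketch. This is a minimal illustration only, not Dolt's implementation: the names toyManifest, maxTables, and addTableFile are hypothetical, and only the threshold idea and the 256 figure from the test comment above are taken from the patch. Whenever appending a table file would leave the manifest referencing more than a maximum number of table files, the files are conjoined into one, so repeated full GCs cannot grow the manifest without bound.

package main

import "fmt"

const maxTables = 256 // the integration test above assumes defaultMaxTables == 256

// toyManifest is a stand-in for a manifest that tracks which table files a
// store references. It is not Dolt's manifest type.
type toyManifest struct {
	tables []string
}

// addTableFile appends a new table file and, if the manifest now references
// more than maxTables files, "conjoins" them all into a single replacement
// file, the way conjoinIfRequired collapses table files on a manifest update.
func (m *toyManifest) addTableFile(name string) {
	m.tables = append(m.tables, name)
	if len(m.tables) > maxTables {
		m.tables = []string{fmt.Sprintf("conjoin-of-%d-files", len(m.tables))}
	}
}

func main() {
	var m toyManifest
	// Simulate many GC cycles, each landing one new table file in the manifest.
	for i := 0; i < 512; i++ {
		m.addTableFile(fmt.Sprintf("table-%d", i))
	}
	// Prints 256: the count stays bounded instead of growing to 512.
	fmt.Println("table files in manifest:", len(m.tables))
}

The same shape of bound is what the integration test asserts on .dolt/noms/oldgen: fewer table files than defaultMaxTables plus a few bookkeeping files such as the manifest and LOCK.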