Merge remote-tracking branch 'origin/master' into aaron/store-nbs-manifest-update-lock-fixes

Aaron Son
2021-08-06 08:49:53 -07:00
33 changed files with 924 additions and 199 deletions

View File

@@ -68,6 +68,7 @@ jobs:
go build -mod=readonly -o ../.ci_bin/git-dolt ./cmd/git-dolt/.
go build -mod=readonly -o ../.ci_bin/git-dolt-smudge ./cmd/git-dolt-smudge/.
go build -mod=readonly -o ../.ci_bin/remotesrv ./utils/remotesrv/.
go build -mod=readonly -o ../.ci_bin/noms ./store/cmd/noms/.
- name: Setup Dolt Config
run: |
dolt config --global --add user.name 'Dolthub Actions'

View File

@@ -72,6 +72,7 @@ jobs:
go build -mod=readonly -o ../.ci_bin/git-dolt ./cmd/git-dolt/.
go build -mod=readonly -o ../.ci_bin/git-dolt-smudge ./cmd/git-dolt-smudge/.
go build -mod=readonly -o ../.ci_bin/remotesrv ./utils/remotesrv/.
go build -mod=readonly -o ../.ci_bin/noms ./store/cmd/noms/.
- name: Setup Dolt Config
run: |
dolt config --global --add user.name 'Dolthub Actions'

View File

@@ -223,6 +223,8 @@ func (cmd ExportCmd) Exec(ctx context.Context, commandStr string, args []string,
skipped, verr := mvdata.MoveData(ctx, dEnv, mover, exOpts)
cli.PrintErrln()
if skipped > 0 {
cli.PrintErrln(color.YellowString("Lines skipped: %d", skipped))
}

View File

@@ -382,6 +382,8 @@ func (cmd ImportCmd) Exec(ctx context.Context, commandStr string, args []string,
skipped, verr := mvdata.MoveData(ctx, dEnv, mover, mvOpts)
cli.PrintErrln()
if skipped > 0 {
cli.PrintErrln(color.YellowString("Lines skipped: %d", skipped))
}

View File

@@ -18,11 +18,11 @@ require (
github.com/denisbrodbeck/machineid v1.0.1
github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi v0.0.0-20201005193433-3ee972b1d078
github.com/dolthub/fslock v0.0.2
github.com/dolthub/go-mysql-server v0.10.1-0.20210729205204-aca74186be10
github.com/dolthub/go-mysql-server v0.10.1-0.20210805195243-614999856aff
github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446
github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81
github.com/dolthub/vitess v0.0.0-20210720213737-d3d2404e7683
github.com/dolthub/vitess v0.0.0-20210802203534-01de666843ce
github.com/dustin/go-humanize v1.0.0
github.com/fatih/color v1.9.0
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568

View File

@@ -142,16 +142,16 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dolthub/fslock v0.0.2 h1:8vUh47iKovgrtXNrXVIzsIoWLlspoXg+3nslhUzgKSw=
github.com/dolthub/fslock v0.0.2/go.mod h1:0i7bsNkK+XHwFL3dIsSWeXSV7sykVzzVr6+jq8oeEo0=
github.com/dolthub/go-mysql-server v0.10.1-0.20210729205204-aca74186be10 h1:c/bRO3EW/HnJmYU+hrMXYTrlN/srxP+4iFVoAayJZq8=
github.com/dolthub/go-mysql-server v0.10.1-0.20210729205204-aca74186be10/go.mod h1:+GYveCPU+ONs9xEqvu2PDKpavKGMhqymkQ8cdrcJkYk=
github.com/dolthub/go-mysql-server v0.10.1-0.20210805195243-614999856aff h1:aA9UOgYWpu4oJhYIibxN052rOOr6vvdOkdpU6JliUEk=
github.com/dolthub/go-mysql-server v0.10.1-0.20210805195243-614999856aff/go.mod h1:cPg39xeFH8/+McnJxncb79SgUuREeIqR+eTvxE6OmXc=
github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446 h1:0ol5pj+QlKUKAtqs1LiPM3ZJKs+rHPgLSsMXmhTrCAM=
github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446/go.mod h1:dhGBqcCEfK5kuFmeO5+WOx3hqc1k3M29c1oS/R7N4ms=
github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66 h1:WRPDbpJWEnPxPmiuOTndT+lUWUeGjx6eoNOK9O4tQQQ=
github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66/go.mod h1:N5ZIbMGuDUpTpOFQ7HcsN6WSIpTGQjHP+Mz27AfmAgk=
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 h1:7/v8q9XGFa6q5Ap4Z/OhNkAMBaK5YeuEzwJt+NZdhiE=
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81/go.mod h1:siLfyv2c92W1eN/R4QqG/+RjjX5W2+gCTRjZxBjI3TY=
github.com/dolthub/vitess v0.0.0-20210720213737-d3d2404e7683 h1:OPX0jAwe68Ux5WBQSXKdtTSkm+CKIMumnkl0NgFF9TI=
github.com/dolthub/vitess v0.0.0-20210720213737-d3d2404e7683/go.mod h1:hUE8oSk2H5JZnvtlLBhJPYC8WZCA5AoSntdLTcBvdBM=
github.com/dolthub/vitess v0.0.0-20210802203534-01de666843ce h1:NNoKBTOCFRslRQyn0Zko/1Aq1A+bpuktdXVip/dY47w=
github.com/dolthub/vitess v0.0.0-20210802203534-01de666843ce/go.mod h1:hUE8oSk2H5JZnvtlLBhJPYC8WZCA5AoSntdLTcBvdBM=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=

View File

@@ -16,6 +16,7 @@ package dbfactory
import (
"context"
"errors"
"net/url"
"os"
"path/filepath"
@@ -52,19 +53,50 @@ func (fact FileFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat,
path = filepath.FromSlash(path)
path = urlObj.Host + path
err = validateDir(path)
if err != nil {
return nil, err
}
newGenSt, err := nbs.NewLocalStore(ctx, nbf.VersionString(), path, defaultMemTableSize)
if err != nil {
return nil, err
}
oldgenPath := filepath.Join(path, "oldgen")
err = validateDir(oldgenPath)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
return nil, err
}
err = os.Mkdir(oldgenPath, os.ModePerm)
if err != nil && !errors.Is(err, os.ErrExist) {
return nil, err
}
}
oldGenSt, err := nbs.NewLocalStore(ctx, nbf.VersionString(), oldgenPath, defaultMemTableSize)
if err != nil {
return nil, err
}
st := nbs.NewGenerationalCS(oldGenSt, newGenSt)
// metrics?
return datas.NewDatabase(st), nil
}
func validateDir(path string) error {
info, err := os.Stat(path)
if err != nil {
return nil, err
return err
} else if !info.IsDir() {
return nil, filesys.ErrIsFile
return filesys.ErrIsFile
}
st, err := nbs.NewLocalStore(ctx, nbf.VersionString(), path, defaultMemTableSize)
if err != nil {
return nil, err
}
return datas.NewDatabase(nbs.NewNBSMetricWrapper(st)), nil
return nil
}
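A condensed sketch of the open path above, with error handling elided (see CreateDB for the real sequencing and the mkdir-on-first-open logic): the new gen store owns the database root directory, while the old gen store lives in the "oldgen" subdirectory.

newGenSt, _ := nbs.NewLocalStore(ctx, nbf.VersionString(), path, defaultMemTableSize)
oldgenPath := filepath.Join(path, "oldgen") // created above if absent
oldGenSt, _ := nbs.NewLocalStore(ctx, nbf.VersionString(), oldgenPath, defaultMemTableSize)
db := datas.NewDatabase(nbs.NewGenerationalCS(oldGenSt, newGenSt))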

View File

@@ -18,7 +18,6 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"path/filepath"
"strings"
"time"
@@ -1096,55 +1095,39 @@ func (ddb *DoltDB) GC(ctx context.Context, uncommitedVals ...hash.Hash) error {
return err
}
rand.Seed(time.Now().UnixNano())
tmpDatasets := make([]datas.Dataset, len(uncommitedVals))
for i, h := range uncommitedVals {
v, err := ddb.db.ReadValue(ctx, h)
if err != nil {
return err
}
if v == nil {
return fmt.Errorf("empty value for value hash %s", h.String())
datasets, err := ddb.db.Datasets(ctx)
newGen := hash.NewHashSet(uncommitedVals...)
oldGen := make(hash.HashSet)
err = datasets.IterAll(ctx, func(key, value types.Value) error {
keyStr := string(key.(types.String))
h := value.(types.Ref).TargetHash()
var isOldGen bool
switch {
case ref.IsRef(keyStr):
parsed, err := ref.Parse(keyStr)
if err != nil && !errors.Is(err, ref.ErrUnknownRefType) {
return err
}
refType := parsed.GetType()
isOldGen = refType == ref.BranchRefType || refType == ref.RemoteRefType || refType == ref.InternalRefType
}
ds, err := ddb.db.GetDataset(ctx, fmt.Sprintf("tmp/%d", rand.Int63()))
if err != nil {
return err
if isOldGen {
oldGen.Insert(h)
} else {
newGen.Insert(h)
}
r, err := WriteValAndGetRef(ctx, ddb.db, v)
if err != nil {
return err
}
return nil
})
ds, err = ddb.db.CommitValue(ctx, ds, r)
if err != nil {
return err
}
if !ds.HasHead() {
return fmt.Errorf("could not save value %s", h.String())
}
tmpDatasets[i] = ds
}
err = collector.GC(ctx)
if err != nil {
return err
}
for _, ds := range tmpDatasets {
ds, err = ddb.db.Delete(ctx, ds)
if err != nil {
return err
}
if ds.HasHead() {
return fmt.Errorf("unsuccessful delete for dataset %s", ds.ID())
}
}
return nil
return collector.GC(ctx, oldGen, newGen)
}
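In effect, GC now partitions dataset heads by ref type before delegating: heads of durable refs (branches, remotes, internal refs) seed the old gen set, and everything else plus the caller's uncommitted hashes seeds the new gen set. A rough sketch of the resulting call:

oldGen := make(hash.HashSet)                 // branch/remote/internal heads
newGen := hash.NewHashSet(uncommitedVals...) // working sets, in-flight values
// ... populate both sets via datasets.IterAll as above ...
return collector.GC(ctx, oldGen, newGen)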
func (ddb *DoltDB) pruneUnreferencedDatasets(ctx context.Context) error {

View File

@@ -23,31 +23,76 @@ import (
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/cmd/dolt/commands"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/dtestutils"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/hash"
)
type stage struct {
commands []testCommand
preStageFunc func(ctx context.Context, t *testing.T, ddb *doltdb.DoltDB, prevRes interface{}) interface{}
}
type gcTest struct {
name string
setup []testCommand
garbage types.Value
query string
expected []sql.Row
name string
stages []stage
query string
expected []sql.Row
postGCFunc func(ctx context.Context, t *testing.T, ddb *doltdb.DoltDB, prevRes interface{})
}
var gcTests = []gcTest{
{
name: "gc test",
setup: []testCommand{
{commands.SqlCmd{}, []string{"-q", "INSERT INTO test VALUES (0),(1),(2);"}},
stages: []stage{
{
preStageFunc: func(ctx context.Context, t *testing.T, ddb *doltdb.DoltDB, i interface{}) interface{} {
return nil
},
commands: []testCommand{
{commands.CheckoutCmd{}, []string{"-b", "temp"}},
{commands.SqlCmd{}, []string{"-q", "INSERT INTO test VALUES (0),(1),(2);"}},
{commands.AddCmd{}, []string{"."}},
{commands.CommitCmd{}, []string{"-m", "commit"}},
},
},
{
preStageFunc: func(ctx context.Context, t *testing.T, ddb *doltdb.DoltDB, i interface{}) interface{} {
cm, err := ddb.ResolveCommitRef(ctx, ref.NewBranchRef("temp"))
require.NoError(t, err)
h, err := cm.HashOf()
require.NoError(t, err)
cs, err := doltdb.NewCommitSpec(h.String())
require.NoError(t, err)
_, err = ddb.Resolve(ctx, cs, nil)
require.NoError(t, err)
return h
},
commands: []testCommand{
{commands.CheckoutCmd{}, []string{"master"}},
{commands.BranchCmd{}, []string{"-D", "temp"}},
{commands.SqlCmd{}, []string{"-q", "INSERT INTO test VALUES (4),(5),(6);"}},
},
},
},
query: "select * from test;",
expected: []sql.Row{{int32(4)}, {int32(5)}, {int32(6)}},
postGCFunc: func(ctx context.Context, t *testing.T, ddb *doltdb.DoltDB, prevRes interface{}) {
h := prevRes.(hash.Hash)
cs, err := doltdb.NewCommitSpec(h.String())
require.NoError(t, err)
_, err = ddb.Resolve(ctx, cs, nil)
require.Error(t, err)
},
garbage: types.String("supercalifragilisticexpialidocious"),
},
}
var gcSetupCommon = []testCommand{
{commands.SqlCmd{}, []string{"-q", "CREATE TABLE test (pk int PRIMARY KEY)"}},
{commands.AddCmd{}, []string{"."}},
{commands.CommitCmd{}, []string{"-m", "created test table"}},
}
func TestGarbageCollection(t *testing.T) {
@@ -59,7 +104,6 @@ func TestGarbageCollection(t *testing.T) {
testGarbageCollection(t, gct)
})
}
}
func testGarbageCollection(t *testing.T, test gcTest) {
@@ -70,24 +114,25 @@ func testGarbageCollection(t *testing.T, test gcTest) {
exitCode := c.cmd.Exec(ctx, c.cmd.Name(), c.args, dEnv)
require.Equal(t, 0, exitCode)
}
for _, c := range test.setup {
exitCode := c.cmd.Exec(ctx, c.cmd.Name(), c.args, dEnv)
require.Equal(t, 0, exitCode)
}
garbageRef, err := dEnv.DoltDB.ValueReadWriter().WriteValue(ctx, test.garbage)
require.NoError(t, err)
val, err := dEnv.DoltDB.ValueReadWriter().ReadValue(ctx, garbageRef.TargetHash())
require.NoError(t, err)
assert.NotNil(t, val)
var res interface{}
for _, stage := range test.stages {
res = stage.preStageFunc(ctx, t, dEnv.DoltDB, res)
for _, c := range stage.commands {
exitCode := c.cmd.Exec(ctx, c.cmd.Name(), c.args, dEnv)
require.Equal(t, 0, exitCode)
}
}
working, err := dEnv.WorkingRoot(ctx)
require.NoError(t, err)
h, err := working.HashOf()
require.NoError(t, err)
// save working root during GC
err = dEnv.DoltDB.GC(ctx, h)
require.NoError(t, err)
test.postGCFunc(ctx, t, dEnv.DoltDB, res)
working, err = dEnv.WorkingRoot(ctx)
require.NoError(t, err)
@@ -95,9 +140,4 @@ func testGarbageCollection(t *testing.T, test gcTest) {
actual, err := sqle.ExecuteSelect(t, dEnv, dEnv.DoltDB, working, test.query)
require.NoError(t, err)
assert.Equal(t, test.expected, actual)
// assert that garbage was collected
val, err = dEnv.DoltDB.ValueReadWriter().ReadValue(ctx, garbageRef.TargetHash())
require.NoError(t, err)
assert.Nil(t, val)
}

View File

@@ -218,13 +218,14 @@ func (te *tableEditorWriteCloser) WriteRow(ctx context.Context, r row.Row) error
te.statsCB(te.stats)
}
if atomic.LoadInt64(&te.gcOps) >= tableWriterGCRate {
gcOps := atomic.AddInt64(&te.gcOps, 1)
if gcOps%tableWriterGCRate == 0 {
atomic.StoreInt64(&te.gcOps, 0)
if err := te.GC(ctx); err != nil {
return err
}
}
_ = atomic.AddInt64(&te.gcOps, 1)
if te.insertOnly {
err := te.tableEditor.InsertRow(ctx, r, nil)
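The old load-then-store pattern could fire GC from several writers at once, or lose the counter reset under contention; the replacement leans on a single atomic increment. A minimal sketch of the pattern, with counter, rateN, and runGC as illustrative stand-ins:

n := atomic.AddInt64(&counter, 1) // each caller gets a unique value
if n%rateN == 0 {
    // exactly one caller in each window of rateN increments lands here
    if err := runGC(ctx); err != nil {
        return err
    }
}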

View File

@@ -27,6 +27,7 @@ import (
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/types"
)
@@ -205,7 +206,7 @@ func TestJSONStructuralSharing(t *testing.T) {
err = db.Flush(ctx)
require.NoError(t, err)
err = db.(datas.GarbageCollector).GC(ctx)
err = db.(datas.GarbageCollector).GC(ctx, hash.HashSet{}, hash.HashSet{})
require.NoError(t, err)
after := ts.Len()

View File

@@ -14,7 +14,11 @@
package strhelp
import "strconv"
import (
"fmt"
"strconv"
"strings"
)
// NthToken returns the Nth token in s, delimited by delim. There is always at least one token: the zeroth token is the
// input string if delim doesn't occur in s. The second return value will be false if there is no Nth token.
@@ -63,3 +67,19 @@ func CommaIfy(n int64) string {
return result
}
// LineStrBuilder is a utility type for building strings line by line
type LineStrBuilder []string
// AppendLine works like append in that it returns an instance of a LineStrBuilder with the contents updated to contain
// the additional line. lsb = lsb.AppendLine("n: %d, s: %s", n, s)
func (lsb LineStrBuilder) AppendLine(strFmt string, args ...interface{}) LineStrBuilder {
updated := append(lsb, fmt.Sprintf(strFmt, args...))
return updated
}
// String returns the built string with all lines separated by newlines
func (lsb LineStrBuilder) String() string {
s := strings.Join(lsb, "\n")
return s
}
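A small usage illustration (values invented):

var lsb LineStrBuilder
lsb = lsb.AppendLine("rows imported: %d", 10)
lsb = lsb.AppendLine("rows skipped: %d", 2)
fmt.Println(lsb.String())
// rows imported: 10
// rows skipped: 2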

View File

@@ -108,7 +108,13 @@ type ChunkStoreGarbageCollector interface {
// and MarkAndSweepChunks returns, the chunk store will only have the
// chunks sent on |keepChunks| and will have removed all other content
// from the ChunkStore.
MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash) error
MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest ChunkStore) error
}
// GenerationalCS is an interface for getting the old gen and new gen chunk stores
type GenerationalCS interface {
NewGen() ChunkStoreGarbageCollector
OldGen() ChunkStoreGarbageCollector
}
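A hedged sketch of a collector driving both generations through this interface, mirroring what the valuestore changes later in this diff do (they also rely on ChunkStoreGarbageCollector satisfying ChunkStore); root is the store root and keep carries the hash batches to retain:

gen, ok := cs.(chunks.GenerationalCS)
if !ok {
    return chunks.ErrUnsupportedOperation
}
// first pass: sweep chunks reachable from durable refs into the old gen;
// a second pass with dest = gen.NewGen() then compacts the new gen itself
return gen.NewGen().MarkAndSweepChunks(ctx, root, keep, gen.OldGen())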
// ChunkStoreVersionGetter is a ChunkStore that supports getting the manifest's

View File

@@ -227,7 +227,11 @@ func (ms *MemoryStoreView) Commit(ctx context.Context, current, last hash.Hash)
return success, nil
}
func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash) error {
func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest ChunkStore) error {
if dest != ms {
panic("unsupported")
}
if last != ms.rootHash {
return fmt.Errorf("last does not match ms.Root()")
}

View File

@@ -86,13 +86,13 @@ func (s *TestStoreView) Put(ctx context.Context, c Chunk) error {
return s.ChunkStore.Put(ctx, c)
}
func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash) error {
func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest ChunkStore) error {
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
if !ok {
if !ok || dest != s {
return ErrUnsupportedOperation
}
return collector.MarkAndSweepChunks(ctx, last, keepChunks)
return collector.MarkAndSweepChunks(ctx, last, keepChunks, collector)
}
func (s *TestStoreView) Reads() int {

View File

@@ -175,7 +175,7 @@ type GarbageCollector interface {
// GC traverses the database starting at the Root and removes
// all unreferenced data from persistent storage.
GC(ctx context.Context) error
GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet) error
}
// CanUsePuller returns true if a datas.Puller can be used to pull data from one Database into another. Not all

View File

@@ -683,8 +683,8 @@ func (db *database) doDelete(ctx context.Context, datasetIDstr string) error {
}
// GC traverses the database starting at the Root and removes all unreferenced data from persistent storage.
func (db *database) GC(ctx context.Context) error {
return db.ValueStore.GC(ctx)
func (db *database) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet) error {
return db.ValueStore.GC(ctx, oldGenRefs, newGenRefs)
}
func (db *database) tryCommitChunks(ctx context.Context, currentDatasets types.Map, currentRootHash hash.Hash) error {

View File

@@ -164,3 +164,21 @@ func (hs HashSet) Has(hash Hash) (has bool) {
func (hs HashSet) Remove(hash Hash) {
delete(hs, hash)
}
// Copy returns a copy of the hashset
func (hs HashSet) Copy() HashSet {
copyOf := make(HashSet, len(hs))
for k := range hs {
copyOf[k] = struct{}{}
}
return copyOf
}
// InsertAll inserts all elements of a HashSet into this HashSet
func (hs HashSet) InsertAll(other HashSet) {
for h := range other {
hs[h] = struct{}{}
}
}
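Both behave as the names suggest; a small illustration, with h1, h2, h3 standing for arbitrary hashes:

a := hash.NewHashSet(h1, h2)
b := a.Copy() // independent storage; inserts into b leave a untouched
b.InsertAll(hash.NewHashSet(h2, h3))
// a: {h1, h2}    b: {h1, h2, h3}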

View File

@@ -234,6 +234,7 @@ func (fm5 fileManifestV5) UpdateGCGen(ctx context.Context, lastLock addr, newCon
if contents.gcGen == upstream.gcGen {
return errors.New("UpdateGCGen() must update the garbage collection generation")
}
if contents.root != upstream.root {
return errors.New("UpdateGCGen() cannot update the root")
}

View File

@@ -17,6 +17,7 @@ package nbs
import (
"context"
"fmt"
"os"
"path"
"strings"
)
@@ -65,9 +66,9 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, destDir string) ([]tab
}
filepath := path.Join(destDir, filename)
err = gcc.writer.FlushToFile(filepath)
if err != nil {
return nil, err
if gcc.writer.Size() == 0 {
return []tableSpec{}, nil
}
addr, err := parseAddr(filename)
@@ -75,8 +76,21 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, destDir string) ([]tab
return nil, err
}
if info, err := os.Stat(filepath); err == nil {
// file already exists
if gcc.writer.ContentLength() != uint64(info.Size()) {
return nil, fmt.Errorf("'%s' already exists with different contents.", filepath)
}
} else {
// file does not exist or error determining if it existed. Try to create it.
err = gcc.writer.FlushToFile(filepath)
if err != nil {
return nil, err
}
}
return []tableSpec{
tableSpec{
{
name: addr,
chunkCount: gcc.writer.ChunkCount(),
},

View File

@@ -0,0 +1,297 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nbs
import (
"context"
"io"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)
var _ chunks.ChunkStore = (*GenerationalNBS)(nil)
var _ chunks.GenerationalCS = (*GenerationalNBS)(nil)
var _ TableFileStore = (*GenerationalNBS)(nil)
type GenerationalNBS struct {
oldGen *NomsBlockStore
newGen *NomsBlockStore
}
func NewGenerationalCS(oldGen, newGen *NomsBlockStore) *GenerationalNBS {
if oldGen.Version() != newGen.Version() {
panic("oldgen and newgen chunkstore versions vary")
}
return &GenerationalNBS{
oldGen: oldGen,
newGen: newGen,
}
}
func (gcs *GenerationalNBS) NewGen() chunks.ChunkStoreGarbageCollector {
return gcs.newGen
}
func (gcs *GenerationalNBS) OldGen() chunks.ChunkStoreGarbageCollector {
return gcs.oldGen
}
// Get the Chunk for the value of the hash in the store. If the hash is absent from the store EmptyChunk is returned.
func (gcs *GenerationalNBS) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, error) {
c, err := gcs.oldGen.Get(ctx, h)
if err != nil {
return chunks.EmptyChunk, err
}
if c.IsEmpty() {
return gcs.newGen.Get(ctx, h)
}
return c, nil
}
// GetMany gets the Chunks with |hashes| from the store. On return, |found| will have been called for
// every chunk that was found. Any non-present chunks are silently ignored.
func (gcs *GenerationalNBS) GetMany(ctx context.Context, hashes hash.HashSet, found func(*chunks.Chunk)) error {
notInOldGen := hashes.Copy()
err := gcs.oldGen.GetMany(ctx, hashes, func(chunk *chunks.Chunk) {
delete(notInOldGen, chunk.Hash())
found(chunk)
})
if err != nil {
return err
}
if len(notInOldGen) == 0 {
return nil
}
return gcs.newGen.GetMany(ctx, notInOldGen, found)
}
func (gcs *GenerationalNBS) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(CompressedChunk)) error {
notInOldGen := hashes.Copy()
err := gcs.oldGen.GetManyCompressed(ctx, hashes, func(chunk CompressedChunk) {
delete(notInOldGen, chunk.Hash())
found(chunk)
})
if err != nil {
return err
}
if len(notInOldGen) == 0 {
return nil
}
return gcs.newGen.GetManyCompressed(ctx, notInOldGen, found)
}
// Returns true iff the value at the address |h| is contained in the
// store
func (gcs *GenerationalNBS) Has(ctx context.Context, h hash.Hash) (bool, error) {
has, err := gcs.oldGen.Has(ctx, h)
if err != nil {
return false, err
}
if has {
return true, nil
}
return gcs.newGen.Has(ctx, h)
}
// Returns a new HashSet containing any members of |hashes| that are
// absent from the store.
func (gcs *GenerationalNBS) HasMany(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error) {
notInOldGen, err := gcs.oldGen.HasMany(ctx, hashes)
if err != nil {
return nil, err
}
if len(notInOldGen) == 0 {
return notInOldGen, nil
}
return gcs.newGen.HasMany(ctx, notInOldGen)
}
// Put caches c in the ChunkSource. Upon return, c must be visible to
// subsequent Get and Has calls, but must not be persistent until a call
// to Flush(). Put may be called concurrently with other calls to Put(),
// Get(), GetMany(), Has() and HasMany().
func (gcs *GenerationalNBS) Put(ctx context.Context, c chunks.Chunk) error {
return gcs.newGen.Put(ctx, c)
}
// Returns the NomsVersion with which this ChunkSource is compatible.
func (gcs *GenerationalNBS) Version() string {
return gcs.newGen.Version()
}
// Rebase brings this ChunkStore into sync with the persistent storage's
// current root.
func (gcs *GenerationalNBS) Rebase(ctx context.Context) error {
oErr := gcs.oldGen.Rebase(ctx)
nErr := gcs.newGen.Rebase(ctx)
if oErr != nil {
return oErr
}
return nErr
}
// Root returns the root of the database as of the time the ChunkStore
// was opened or the most recent call to Rebase.
func (gcs *GenerationalNBS) Root(ctx context.Context) (hash.Hash, error) {
return gcs.newGen.Root(ctx)
}
// Commit atomically attempts to persist all novel Chunks and update the
// persisted root hash from last to current (or keeps it the same).
// If last doesn't match the root in persistent storage, returns false.
func (gcs *GenerationalNBS) Commit(ctx context.Context, current, last hash.Hash) (bool, error) {
return gcs.newGen.Commit(ctx, current, last)
}
// Stats may return some kind of struct that reports statistics about the
// ChunkStore instance. The type is implementation-dependent, and impls
// may return nil
func (gcs *GenerationalNBS) Stats() interface{} {
return nil
}
// StatsSummary may return a string containing summarized statistics for
// this ChunkStore. It must return "Unsupported" if this operation is not
// supported.
func (gcs *GenerationalNBS) StatsSummary() string {
return ""
}
// Close tears down any resources in use by the implementation. After
// Close(), the ChunkStore may not be used again. It is NOT SAFE to call
// Close() concurrently with any other ChunkStore method; behavior is
// undefined and probably crashy.
func (gcs *GenerationalNBS) Close() error {
oErr := gcs.oldGen.Close()
nErr := gcs.newGen.Close()
if oErr != nil {
return oErr
}
return nErr
}
func (gcs *GenerationalNBS) copyToOldGen(ctx context.Context, hashes hash.HashSet) error {
notInOldGen, err := gcs.oldGen.HasMany(ctx, hashes)
if err != nil {
return err
}
var putErr error
err = gcs.newGen.GetMany(ctx, notInOldGen, func(chunk *chunks.Chunk) {
if putErr == nil {
putErr = gcs.oldGen.Put(ctx, *chunk)
}
})
if putErr != nil {
return putErr
}
return err
}
// Sources retrieves the current root hash, a list of all the table files (which may include appendix table files),
// and a second list containing only appendix table files for both the old gen and new gen stores.
func (gcs *GenerationalNBS) Sources(ctx context.Context) (hash.Hash, []TableFile, []TableFile, error) {
_, tFiles, appFiles, err := gcs.oldGen.Sources(ctx)
if err != nil {
return hash.Hash{}, nil, nil, err
}
newRoot, newTFiles, newAppFiles, err := gcs.newGen.Sources(ctx)
if err != nil {
return hash.Hash{}, nil, nil, err
}
for _, tf := range newTFiles {
tFiles = append(tFiles, tf)
}
for _, tf := range newAppFiles {
appFiles = append(appFiles, tf)
}
return newRoot, tFiles, appFiles, nil
}
// Size returns the total size, in bytes, of the table files in the new and old gen stores combined
func (gcs *GenerationalNBS) Size(ctx context.Context) (uint64, error) {
oldSize, err := gcs.oldGen.Size(ctx)
if err != nil {
return 0, err
}
newSize, err := gcs.newGen.Size(ctx)
if err != nil {
return 0, err
}
return oldSize + newSize, nil
}
// WriteTableFile will read a table file from the provided reader and write it to the new gen TableFileStore
func (gcs *GenerationalNBS) WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error {
return gcs.newGen.WriteTableFile(ctx, fileId, numChunks, rd, contentLength, contentHash)
}
// AddTableFilesToManifest adds table files to the manifest of the newgen cs
func (gcs *GenerationalNBS) AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int) error {
return gcs.newGen.AddTableFilesToManifest(ctx, fileIdToNumChunks)
}
// PruneTableFiles deletes old table files that are no longer referenced in the manifest of the new or old gen chunkstores
func (gcs *GenerationalNBS) PruneTableFiles(ctx context.Context) error {
err := gcs.oldGen.PruneTableFiles(ctx)
if err != nil {
return err
}
return gcs.newGen.PruneTableFiles(ctx)
}
// SetRootChunk changes the root chunk hash from the previous value to the new root for the newgen cs
func (gcs *GenerationalNBS) SetRootChunk(ctx context.Context, root, previous hash.Hash) error {
return gcs.newGen.SetRootChunk(ctx, root, previous)
}
// SupportedOperations returns a description of the supported TableFile operations. Some stores only support reading table files, not writing.
func (gcs *GenerationalNBS) SupportedOperations() TableFileStoreOps {
return gcs.newGen.SupportedOperations()
}

View File

@@ -0,0 +1,173 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nbs
import (
"context"
"math/rand"
"testing"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)
var randGen = rand.New(rand.NewSource(0))
func genChunks(t *testing.T, count int, max int) []chunks.Chunk {
chnks := make([]chunks.Chunk, count)
for i := 0; i < count; i++ {
bytes := make([]byte, randGen.Int()%max)
n, err := randGen.Read(bytes)
require.NoError(t, err)
chnks[i] = chunks.NewChunk(bytes[:n])
}
return chnks
}
func mergeMaps(m1, m2 map[int]bool) map[int]bool {
m3 := make(map[int]bool)
for k := range m1 {
m3[k] = true
}
for k := range m2 {
m3[k] = true
}
return m3
}
func hashesForChunks(chunks []chunks.Chunk, indexes map[int]bool) hash.HashSet {
hashes := make(hash.HashSet)
for idx := range indexes {
hashes[chunks[idx].Hash()] = struct{}{}
}
return hashes
}
type foundHashes hash.HashSet
func (fh foundHashes) found(chk *chunks.Chunk) {
fh[chk.Hash()] = struct{}{}
}
func requireChunks(t *testing.T, ctx context.Context, chunks []chunks.Chunk, genCS *GenerationalNBS, inOld, inNew map[int]bool) {
// Has/Get Checks
for i, chk := range chunks {
has, err := genCS.oldGen.Has(ctx, chk.Hash())
require.NoError(t, err)
require.Equal(t, inOld[i], has, "error for index: %d", i)
retrieved, err := genCS.oldGen.Get(ctx, chk.Hash())
require.NoError(t, err)
require.Equal(t, !inOld[i], retrieved.IsEmpty(), "error for index: %d", i)
has, err = genCS.newGen.Has(ctx, chk.Hash())
require.NoError(t, err)
require.Equal(t, inNew[i], has, "error for index: %d", i)
retrieved, err = genCS.newGen.Get(ctx, chk.Hash())
require.NoError(t, err)
require.Equal(t, !inNew[i], retrieved.IsEmpty(), "error for index: %d", i)
has, err = genCS.Has(ctx, chk.Hash())
require.NoError(t, err)
require.Equal(t, inOld[i] || inNew[i], has, "error for index: %d", i)
retrieved, err = genCS.Get(ctx, chk.Hash())
require.NoError(t, err)
require.Equal(t, !(inOld[i] || inNew[i]), retrieved.IsEmpty(), "error for index: %d", i)
}
// HasMany Checks
absent, err := genCS.oldGen.HasMany(ctx, hashesForChunks(chunks, inOld))
require.NoError(t, err)
require.Len(t, absent, 0)
absent, err = genCS.newGen.HasMany(ctx, hashesForChunks(chunks, inNew))
require.NoError(t, err)
require.Len(t, absent, 0)
inUnion := mergeMaps(inOld, inNew)
absent, err = genCS.HasMany(ctx, hashesForChunks(chunks, inUnion))
require.NoError(t, err)
require.Len(t, absent, 0)
// GetMany Checks
expected := hashesForChunks(chunks, inOld)
received := foundHashes{}
err = genCS.oldGen.GetMany(ctx, expected, received.found)
require.NoError(t, err)
require.Equal(t, expected, hash.HashSet(received))
expected = hashesForChunks(chunks, inNew)
received = foundHashes{}
err = genCS.newGen.GetMany(ctx, expected, received.found)
require.NoError(t, err)
require.Equal(t, expected, hash.HashSet(received))
expected = hashesForChunks(chunks, inUnion)
received = foundHashes{}
err = genCS.GetMany(ctx, expected, received.found)
require.NoError(t, err)
require.Equal(t, expected, hash.HashSet(received))
}
func putChunks(t *testing.T, ctx context.Context, chunks []chunks.Chunk, cs chunks.ChunkStore, indexesIn map[int]bool, chunkIndexes ...int) {
for _, idx := range chunkIndexes {
err := cs.Put(ctx, chunks[idx])
require.NoError(t, err)
indexesIn[idx] = true
}
}
func TestGenerationalCS(t *testing.T) {
ctx := context.Background()
oldGen, _ := makeTestLocalStore(t, 64)
newGen, _ := makeTestLocalStore(t, 64)
inOld := make(map[int]bool)
inNew := make(map[int]bool)
chnks := genChunks(t, 100, 1000)
putChunks(t, ctx, chnks, oldGen, inOld, 0, 1, 2, 3, 4)
cs := NewGenerationalCS(oldGen, newGen)
requireChunks(t, ctx, chnks, cs, inOld, inNew)
putChunks(t, ctx, chnks, cs, inNew, 6, 7, 8, 9)
requireChunks(t, ctx, chnks, cs, inOld, inNew)
err := cs.copyToOldGen(ctx, hashesForChunks(chnks, inNew))
require.NoError(t, err)
inOld = mergeMaps(inOld, inNew)
requireChunks(t, ctx, chnks, cs, inOld, inNew)
putChunks(t, ctx, chnks, cs, inNew, 10, 11, 12, 13, 14)
requireChunks(t, ctx, chnks, cs, inOld, inNew)
err = cs.copyToOldGen(ctx, hashesForChunks(chnks, inNew))
require.NoError(t, err)
inOld = mergeMaps(inOld, inNew)
requireChunks(t, ctx, chnks, cs, inOld, inNew)
putChunks(t, ctx, chnks, cs, inNew, 15, 16, 17, 18, 19)
requireChunks(t, ctx, chnks, cs, inOld, inNew)
}

View File

@@ -464,6 +464,15 @@ func (ts tableSpec) GetChunkCount() uint32 {
return ts.chunkCount
}
func tableSpecsToMap(specs []tableSpec) map[string]int {
m := make(map[string]int)
for _, spec := range specs {
m[spec.name.String()] = int(spec.chunkCount)
}
return m
}
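This helper bridges GC output into AddTableFilesToManifest, which keys on file-id strings. The MarkAndSweepChunks changes below use it roughly like so:

// table files were flushed into dest's directory during the sweep
err = destNBS.AddTableFilesToManifest(ctx, tableSpecsToMap(specs))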
func parseSpecs(tableInfo []string) ([]tableSpec, error) {
specs := make([]tableSpec, len(tableInfo)/2)
for i := range specs {

View File

@@ -71,8 +71,8 @@ func (nbsMW *NBSMetricWrapper) SupportedOperations() TableFileStoreOps {
return nbsMW.nbs.SupportedOperations()
}
func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash) error {
return nbsMW.nbs.MarkAndSweepChunks(ctx, last, keepChunks)
func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest chunks.ChunkStore) error {
return nbsMW.nbs.MarkAndSweepChunks(ctx, last, keepChunks, dest)
}
// PruneTableFiles deletes old table files that are no longer referenced in the manifest.

View File

@@ -1314,6 +1314,7 @@ func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileId string, nu
// AddTableFilesToManifest adds table files to the manifest
func (nbs *NomsBlockStore) AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int) error {
var totalChunks int
fileIdHashToNumChunks := make(map[hash.Hash]uint32)
for fileId, numChunks := range fileIdToNumChunks {
fileIdHash, ok := hash.MaybeParse(fileId)
@@ -1323,6 +1324,11 @@ func (nbs *NomsBlockStore) AddTableFilesToManifest(ctx context.Context, fileIdTo
}
fileIdHashToNumChunks[fileIdHash] = uint32(numChunks)
totalChunks += numChunks
}
if totalChunks == 0 {
return nil
}
_, err := nbs.UpdateManifest(ctx, fileIdHashToNumChunks)
@@ -1400,7 +1406,7 @@ func (nbs *NomsBlockStore) PruneTableFiles(ctx context.Context) (err error) {
return nbs.p.PruneTableFiles(ctx, contents)
}
func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash) error {
func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest chunks.ChunkStore) error {
ops := nbs.SupportedOperations()
if !ops.CanGC || !ops.CanPrune {
return chunks.ErrUnsupportedOperation
@@ -1427,7 +1433,17 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has
return err
}
specs, err := nbs.copyMarkedChunks(ctx, keepChunks)
destNBS := nbs
if dest != nil {
switch typed := dest.(type) {
case *NomsBlockStore:
destNBS = typed
case *NBSMetricWrapper:
destNBS = typed.nbs
}
}
specs, err := nbs.copyMarkedChunks(ctx, keepChunks, destNBS)
if err != nil {
return err
}
@@ -1435,24 +1451,35 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has
return ctx.Err()
}
err = nbs.swapTables(ctx, specs)
if err != nil {
return err
}
if ctx.Err() != nil {
return ctx.Err()
}
if destNBS == nbs {
err = nbs.swapTables(ctx, specs)
if err != nil {
return err
}
currentContents := func() manifestContents {
nbs.mu.RLock()
defer nbs.mu.RUnlock()
return nbs.upstream
}()
if ctx.Err() != nil {
return ctx.Err()
}
return nbs.p.PruneTableFiles(ctx, currentContents)
currentContents := func() manifestContents {
nbs.mu.RLock()
defer nbs.mu.RUnlock()
return nbs.upstream
}()
return nbs.p.PruneTableFiles(ctx, currentContents)
} else {
fileIdToNumChunks := tableSpecsToMap(specs)
err = destNBS.AddTableFilesToManifest(ctx, fileIdToNumChunks)
if err != nil {
return err
}
return nil
}
}
func (nbs *NomsBlockStore) copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash) ([]tableSpec, error) {
func (nbs *NomsBlockStore) copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash, dest *NomsBlockStore) ([]tableSpec, error) {
gcc, err := newGarbageCollectionCopier()
if err != nil {
return nil, err
@@ -1487,7 +1514,7 @@ LOOP:
}
}
nomsDir := nbs.p.(*fsTablePersister).dir
nomsDir := dest.p.(*fsTablePersister).dir
return gcc.copyTablesToDir(ctx, nomsDir)
}
@@ -1530,6 +1557,11 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (e
specs: specs,
}
// nothing has changed. Bail early
if newContents.gcGen == nbs.upstream.gcGen {
return nil
}
upstream, uerr := nbs.mm.UpdateGCGen(ctx, nbs.upstream.lock, newContents, nbs.stats, nil)
if uerr != nil {
return uerr

View File

@@ -263,7 +263,7 @@ func TestNBSCopyGC(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
msErr = st.MarkAndSweepChunks(ctx, r, keepChan)
msErr = st.MarkAndSweepChunks(ctx, r, keepChan, nil)
wg.Done()
}()
for h := range keepers {

View File

@@ -116,6 +116,28 @@ func (w *parallelRefWalker) GetRefs(visited hash.HashSet, vals ValueSlice) ([]ha
return res, nil
}
func (w *parallelRefWalker) GetRefSet(visited hash.HashSet, vals ValueSlice) (hash.HashSet, error) {
res := make(hash.HashSet)
numSent, resCh, err := w.sendAllWork(vals)
if err != nil {
return nil, err
}
for i := 0; i < numSent; i++ {
select {
case b := <-resCh:
for _, r := range b {
if !visited.Has(r) {
res[r] = struct{}{}
visited.Insert(r)
}
}
case <-w.ctx.Done():
return nil, w.ctx.Err()
}
}
return res, nil
}
func (w *parallelRefWalker) Close() error {
close(w.work)
return w.eg.Wait()

View File

@@ -35,6 +35,12 @@ import (
"github.com/dolthub/dolt/go/store/util/sizecache"
)
type HashFilterFunc func(context.Context, hash.HashSet) (hash.HashSet, error)
func unfilteredHashFunc(_ context.Context, hs hash.HashSet) (hash.HashSet, error) {
return hs, nil
}
// ValueReader is an interface that knows how to read Noms Values, e.g.
// datas/Database. Required to avoid import cycle between this package and the
// package that implements Value reading.
@@ -551,12 +557,43 @@ func (lvs *ValueStore) Commit(ctx context.Context, current, last hash.Hash) (boo
}()
}
// GC traverses the ValueStore from the root and removes unreferenced chunks from the ChunkStore
func (lvs *ValueStore) GC(ctx context.Context) error {
collector, ok := lvs.cs.(chunks.ChunkStoreGarbageCollector)
func makeBatches(hss []hash.HashSet, count int) [][]hash.Hash {
const maxBatchSize = 16384
if !ok {
return chunks.ErrUnsupportedOperation
buffer := make([]hash.Hash, count)
i := 0
for _, hs := range hss {
for h := range hs {
buffer[i] = h
i++
}
}
numBatches := (count + (maxBatchSize - 1)) / maxBatchSize
batchSize := count / numBatches
res := make([][]hash.Hash, numBatches)
for i := 0; i < numBatches; i++ {
if i != numBatches-1 {
res[i] = buffer[i*batchSize : (i+1)*batchSize]
} else {
res[i] = buffer[i*batchSize:]
}
}
return res
}
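A worked example of the sizing above: with count = 20,000 and maxBatchSize = 16,384, numBatches = (20000 + 16383) / 16384 = 2 and batchSize = 20000 / 2 = 10,000, so the work splits into two even batches of 10,000 (the last batch also absorbs any remainder) rather than one batch of 16,384 and a ragged tail of 3,616.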
func (lvs *ValueStore) numBuffChunks() int {
lvs.bufferMu.RLock()
defer lvs.bufferMu.RUnlock()
return len(lvs.bufferedChunks)
}
// GC traverses the ValueStore from the root and removes unreferenced chunks from the ChunkStore
func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet) error {
if lvs.numBuffChunks() > 0 {
return errors.New("invalid GC state; bufferedChunks must be empty.")
}
err := func() error {
@@ -583,16 +620,39 @@ func (lvs *ValueStore) GC(ctx context.Context) error {
if err != nil {
return err
}
if rootVal == nil {
// empty root
return nil
}
newGenRefs.Insert(root)
if gcs, ok := lvs.cs.(chunks.GenerationalCS); ok {
oldGen := gcs.OldGen()
newGen := gcs.NewGen()
err = lvs.gc(ctx, root, oldGenRefs, oldGen.HasMany, newGen, oldGen)
if err != nil {
return err
}
return lvs.gc(ctx, root, newGenRefs, oldGen.HasMany, newGen, newGen)
} else if collector, ok := lvs.cs.(chunks.ChunkStoreGarbageCollector); ok {
if len(oldGenRefs) > 0 {
newGenRefs.InsertAll(oldGenRefs)
}
return lvs.gc(ctx, root, newGenRefs, unfilteredHashFunc, collector, collector)
} else {
return chunks.ErrUnsupportedOperation
}
}
func (lvs *ValueStore) gc(ctx context.Context, root hash.Hash, toVisit hash.HashSet, hashFilter HashFilterFunc, src, dest chunks.ChunkStoreGarbageCollector) error {
keepChunks := make(chan []hash.Hash, gcBuffSize)
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
return collector.MarkAndSweepChunks(ctx, root, keepChunks)
return src.MarkAndSweepChunks(ctx, root, keepChunks, dest)
})
keepHashes := func(hs []hash.Hash) error {
@@ -603,20 +663,6 @@ func (lvs *ValueStore) GC(ctx context.Context) error {
return ctx.Err()
}
}
const batchSize = 16384
batches := func(hss [][]hash.Hash) [][]hash.Hash {
var res [][]hash.Hash
for _, hs := range hss {
i := 0
for ; i+batchSize < len(hs); i += batchSize {
res = append(res, hs[i:i+batchSize])
}
if i < len(hs) {
res = append(res, hs[i:])
}
}
return res
}
concurrency := runtime.GOMAXPROCS(0) - 1
if concurrency < 1 {
@@ -625,51 +671,74 @@ func (lvs *ValueStore) GC(ctx context.Context) error {
walker := newParallelRefWalker(ctx, lvs.nbf, concurrency)
eg.Go(func() error {
toVisitCount := 1
toVisit := [][]hash.Hash{{root}}
visited := hash.NewHashSet(root)
for toVisitCount > 0 {
batches := batches(toVisit)
toVisit = make([][]hash.Hash, len(batches))
toVisitCount = 0
for i, batch := range batches {
if err := keepHashes(batch); err != nil {
return err
}
vals, err := lvs.ReadManyValues(ctx, batch)
if err != nil {
return err
}
if len(vals) != len(batch) {
return errors.New("dangling reference found in chunk store")
}
hashes, err := walker.GetRefs(visited, vals)
if err != nil {
return err
}
toVisit[i] = hashes
toVisitCount += len(hashes)
}
}
walker.Close()
defer func() {
close(keepChunks)
_ = walker.Close()
}()
lvs.bufferMu.Lock()
defer lvs.bufferMu.Unlock()
if len(lvs.bufferedChunks) > 0 {
return errors.New("invalid GC state; bufferedChunks started empty and was not empty at end of run.")
}
lvs.decodedChunks = sizecache.New(lvs.decodedChunks.Size())
lvs.bufferedChunks = make(map[hash.Hash]chunks.Chunk, lvs.bufferedChunkSize)
lvs.bufferedChunkSize = 0
lvs.withBufferedChildren = map[hash.Hash]uint64{}
close(keepChunks)
return nil
visited := toVisit.Copy()
return lvs.gcProcessRefs(ctx, visited, []hash.HashSet{toVisit}, keepHashes, walker, hashFilter)
})
err := eg.Wait()
if err != nil {
return err
}
if lvs.numBuffChunks() > 0 {
return errors.New("invalid GC state; bufferedChunks started empty and was not empty at end of run.")
}
// purge the cache
lvs.decodedChunks = sizecache.New(lvs.decodedChunks.Size())
lvs.bufferedChunks = make(map[hash.Hash]chunks.Chunk, lvs.bufferedChunkSize)
lvs.bufferedChunkSize = 0
lvs.withBufferedChildren = map[hash.Hash]uint64{}
return eg.Wait()
}
func (lvs *ValueStore) gcProcessRefs(ctx context.Context, visited hash.HashSet, toVisit []hash.HashSet, keepHashes func(hs []hash.Hash) error, walker *parallelRefWalker, hashFilter HashFilterFunc) error {
if len(toVisit) != 1 {
panic("Must be one initial hashset to visit")
}
toVisitCount := len(toVisit[0])
for toVisitCount > 0 {
batches := makeBatches(toVisit, toVisitCount)
toVisit = make([]hash.HashSet, len(batches))
toVisitCount = 0
for i, batch := range batches {
if err := keepHashes(batch); err != nil {
return err
}
vals, err := lvs.ReadManyValues(ctx, batch)
if err != nil {
return err
}
if len(vals) != len(batch) {
return errors.New("dangling reference found in chunk store")
}
hashes, err := walker.GetRefSet(visited, vals)
if err != nil {
return err
}
// continue processing
hashes, err = hashFilter(ctx, hashes)
if err != nil {
return err
}
toVisit[i] = hashes
toVisitCount += len(hashes)
}
}
return nil
}
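For callers, the visible change is GC's signature; a minimal usage sketch, with workingRootHash standing in for an in-flight root the caller wants pinned:

// durable ref heads are routed to the old gen by the layers above;
// anything still needed but not yet committed goes in newGenRefs
err := db.GC(ctx, hash.HashSet{}, hash.NewHashSet(workingRootHash))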
// Close closes the underlying ChunkStore
func (lvs *ValueStore) Close() error {
return lvs.cs.Close()

View File

@@ -379,7 +379,7 @@ func TestGC(t *testing.T) {
require.NoError(t, err)
assert.NotNil(v2)
err = vs.GC(ctx)
err = vs.GC(ctx, hash.HashSet{}, hash.HashSet{})
require.NoError(t, err)
v1, err = vs.ReadValue(ctx, h1) // non-nil

View File

@@ -11,26 +11,22 @@ teardown() {
}
@test "branch: deleting a branch deletes its working set" {
dolt gc
dolt checkout -b to_delete
dolt sql -q 'create table test (id int primary key);'
values=""
for i in `seq 0 1024`; do
values="$values""${values:+,}""($i)"
done
dolt sql -q 'insert into test values '"$values"';'
dolt add .
dolt commit -m 'making a new commit'
dolt gc
with_values_sz=`du -s | awk '{print $1}'`
root=$(noms root .dolt/noms)
run noms show .dolt/noms::#$root
[[ "$show_tables" -eq 0 ]] || false
echo $output
[[ "$output" =~ "workingSets/heads/master" ]] || false
[[ "$output" =~ "workingSets/heads/to_delete" ]] || false
dolt checkout master
dolt branch -d -f to_delete
num_branches=`dolt branch | wc -l`
[[ "$num_branches" -eq 1 ]] || fail "expected num_branches to be 1"
dolt gc
without_values_sz=`du -s | awk '{print $1}'`
echo "$sz $new_sz $post_delete_sz"
[[ "$without_values_sz" -lt "$with_values_sz" ]] || false
root=$(noms root .dolt/noms)
run noms show .dolt/noms::#$root
[[ "$show_tables" -eq 0 ]] || false
[[ ! "$output" =~ "to_delete" ]] || false
}
@test "branch: moving current working branch takes its working set" {

View File

@@ -116,7 +116,7 @@ SQL
# leave data in the working set
dolt sql -q "INSERT INTO test VALUES (11),(12),(13),(14),(15);"
BEFORE=$(du .dolt/noms/ | sed 's/[^0-9]*//g')
BEFORE=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g')
run dolt gc
[ "$status" -eq 0 ]
@@ -125,7 +125,7 @@ SQL
[ "$status" -eq 0 ]
[[ "$output" =~ "80" ]] || false
AFTER=$(du .dolt/noms/ | sed 's/[^0-9]*//g')
AFTER=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g')
# assert space was reclaimed
echo "$BEFORE"

View File

@@ -539,9 +539,9 @@ DELIM
[ "$status" -eq 0 ]
# assert that we already collected garbage
BEFORE=$(du .dolt/noms/ | sed 's/[^0-9]*//g')
BEFORE=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g')
dolt gc
AFTER=$(du .dolt/noms/ | sed 's/[^0-9]*//g')
AFTER=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g')
# less than 10% smaller
[ "$BEFORE" -lt $(($AFTER * 11 / 10)) ]

View File

@@ -229,8 +229,9 @@ DELIM
# Output to a file from the error stderr
dolt sql -q "DELETE FROM test WHERE pk = 1"
dolt table import -u --continue test 1pk5col-rpt-ints.csv 2> skipped.csv
run cat skipped.csv
run dolt table import -u --continue test 1pk5col-rpt-ints.csv
echo $output
[ "$status" -eq 0 ]
[[ "$output" =~ "The following rows were skipped:" ]] || false
[[ "$output" =~ "1,1,2,3,4,7" ]] || false
[[ "$output" =~ "1,1,2,3,4,8" ]] || false