mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-29 03:06:35 -05:00
Merge pull request #892 from dolthub/andy/gc-table-files
Andy/gc table files
This commit is contained in:
@@ -0,0 +1,28 @@
|
||||
#!/usr/bin/env bats

load $BATS_TEST_DIRNAME/helper/common.bash

setup() {
    setup_common
}

teardown() {
    teardown_common
}

# Smoke test for `dolt gc`: it should exit 0 and leave the repository in a
# usable state (checked via `dolt status`), both on a fresh empty repo and
# again after some table data has been written.
@test "dolt gc smoke test" {
    run dolt gc
    [ "$status" -eq "0" ]
    run dolt status
    [ "$status" -eq "0" ]

    dolt sql <<SQL
CREATE TABLE test (pk int PRIMARY KEY);
INSERT INTO test VALUES
    (1),(2),(3),(4),(5);
SQL

    run dolt gc
    [ "$status" -eq "0" ]
    run dolt status
    [ "$status" -eq "0" ]
}
|
||||
@@ -0,0 +1,88 @@
|
||||
// Copyright 2019 Liquidata, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package commands
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/dolthub/dolt/go/cmd/dolt/cli"
|
||||
"github.com/dolthub/dolt/go/cmd/dolt/errhand"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/argparser"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/filesys"
|
||||
"github.com/dolthub/dolt/go/store/datas"
|
||||
)
|
||||
|
||||
// gcDocs holds the CLI help text for `dolt gc`. The descriptions are
// deliberately empty for now; the command is hidden from the help output.
var gcDocs = cli.CommandDocumentationContent{
	ShortDesc: "",
	LongDesc:  ``,
	Synopsis:  []string{},
}
|
||||
|
||||
// GarbageCollectionCmd implements the `dolt gc` command, which removes
// unreferenced table files from the local database.
type GarbageCollectionCmd struct{}

// Name returns the name of the Dolt cli command. This is what is used on the command line to invoke the command.
func (cmd GarbageCollectionCmd) Name() string {
	return "gc"
}

// Description returns a description of the command.
func (cmd GarbageCollectionCmd) Description() string {
	return "Cleans up unreferenced data from the database."
}

// Hidden should return true if this command should be hidden from the help text.
// NOTE(review): gc is hidden from help — presumably because it is still
// experimental; confirm before exposing it.
func (cmd GarbageCollectionCmd) Hidden() bool {
	return true
}

// RequiresRepo should return false if this interface is implemented, and the command does not have the requirement
// that it be run from within a data repository directory. gc operates on the
// local repository's storage, so it must be run inside a repo.
func (cmd GarbageCollectionCmd) RequiresRepo() bool {
	return true
}

// CreateMarkdown creates a markdown file containing the helptext for the command at the given path.
func (cmd GarbageCollectionCmd) CreateMarkdown(fs filesys.Filesys, path, commandStr string) error {
	ap := cmd.createArgParser()
	return CreateMarkdown(fs, path, cli.GetCommandDocumentation(commandStr, gcDocs, ap))
}

// createArgParser builds the argument parser for gc; the command currently
// accepts no flags or arguments.
func (cmd GarbageCollectionCmd) createArgParser() *argparser.ArgParser {
	ap := argparser.NewArgParser()
	return ap
}
|
||||
|
||||
// Version displays the version of the running dolt client
|
||||
// Exec executes the command
|
||||
func (cmd GarbageCollectionCmd) Exec(ctx context.Context, commandStr string, args []string, dEnv *env.DoltEnv) int {
|
||||
ap := cmd.createArgParser()
|
||||
_, usage := cli.HelpAndUsagePrinters(cli.GetCommandDocumentation(commandStr, lsDocs, ap))
|
||||
|
||||
var verr errhand.VerboseError
|
||||
|
||||
db, ok := dEnv.DoltDB.ValueReadWriter().(datas.Database)
|
||||
if !ok {
|
||||
verr = errhand.BuildDError("this database does not support garbage collection").Build()
|
||||
}
|
||||
|
||||
err := datas.PruneTableFiles(ctx, db)
|
||||
|
||||
if err != nil {
|
||||
verr = errhand.BuildDError("an error occurred during garbage collection").AddCause(err).Build()
|
||||
}
|
||||
|
||||
return HandleVErrAndExitCode(verr, usage)
|
||||
}
|
||||
@@ -79,6 +79,7 @@ var doltCommand = cli.NewSubCommandHandler("dolt", "it's git for data", []cli.Co
|
||||
commands.MigrateCmd{},
|
||||
indexcmds.Commands,
|
||||
commands.ReadTablesCmd{},
|
||||
commands.GarbageCollectionCmd{},
|
||||
})
|
||||
|
||||
func init() {
|
||||
|
||||
@@ -83,6 +83,8 @@ type DoltChunkStore struct {
|
||||
httpFetcher HTTPFetcher
|
||||
}
|
||||
|
||||
var _ nbs.TableFileStore = &DoltChunkStore{}
|
||||
|
||||
func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, path, host string, csClient remotesapi.ChunkStoreServiceClient) (*DoltChunkStore, error) {
|
||||
tokens := strings.Split(strings.Trim(path, "/"), "/")
|
||||
if len(tokens) != 2 {
|
||||
@@ -876,6 +878,7 @@ func (dcs *DoltChunkStore) SupportedOperations() nbs.TableFileStoreOps {
|
||||
return nbs.TableFileStoreOps{
|
||||
CanRead: true,
|
||||
CanWrite: true,
|
||||
CanPrune: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -943,6 +946,11 @@ func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, nu
|
||||
return nil
|
||||
}
|
||||
|
||||
// PruneTableFiles deletes old table files that are no longer referenced in the manifest.
// Pruning is not supported over the remote chunk store API (SupportedOperations
// reports CanPrune: false), so this always returns nbs.ErrUnsupportedOperation.
func (dcs *DoltChunkStore) PruneTableFiles(ctx context.Context) error {
	return nbs.ErrUnsupportedOperation
}
|
||||
|
||||
// Sources retrieves the current root hash, and a list of all the table files
|
||||
func (dcs *DoltChunkStore) Sources(ctx context.Context) (hash.Hash, []nbs.TableFile, error) {
|
||||
req := &remotesapi.ListTableFilesRequest{RepoId: dcs.getRepoId()}
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
// Copyright 2020 Liquidata, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package datas
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/nbs"
|
||||
)
|
||||
|
||||
func PruneTableFiles(ctx context.Context, db Database) error {
|
||||
tfs, ok := db.chunkStore().(nbs.TableFileStore)
|
||||
|
||||
if !ok {
|
||||
return nbs.ErrUnsupportedOperation
|
||||
}
|
||||
|
||||
return tfs.PruneTableFiles(ctx)
|
||||
}
|
||||
@@ -412,6 +412,8 @@ type TestTableFileStore struct {
|
||||
tableFiles map[string]*TestTableFile
|
||||
}
|
||||
|
||||
var _ nbs.TableFileStore = &TestTableFileStore{}
|
||||
|
||||
func (ttfs *TestTableFileStore) Sources(ctx context.Context) (hash.Hash, []nbs.TableFile, error) {
|
||||
var tblFiles []nbs.TableFile
|
||||
for _, tblFile := range ttfs.tableFiles {
|
||||
@@ -452,6 +454,10 @@ func (ttfs *TestTableFileStore) SupportedOperations() nbs.TableFileStoreOps {
|
||||
}
|
||||
}
|
||||
|
||||
// PruneTableFiles is unsupported on the in-memory test store; it always
// returns nbs.ErrUnsupportedOperation.
func (ttfs *TestTableFileStore) PruneTableFiles(ctx context.Context) error {
	return nbs.ErrUnsupportedOperation
}
|
||||
|
||||
func TestClone(t *testing.T) {
|
||||
hashBytes := [hash.ByteLen]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13}
|
||||
src := &TestTableFileStore{
|
||||
|
||||
@@ -603,3 +603,7 @@ func (s3p awsTablePersister) uploadPart(ctx context.Context, data []byte, key, u
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// PruneTableFiles is not supported for the AWS (S3-backed) persister; it
// always returns ErrUnsupportedOperation.
func (s3p awsTablePersister) PruneTableFiles(ctx context.Context, contents manifestContents) error {
	return ErrUnsupportedOperation
}
|
||||
|
||||
@@ -149,3 +149,7 @@ func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, ch
|
||||
|
||||
return &chunkSourceAdapter{newTableReader(index, tra, s3BlockSize), name}, nil
|
||||
}
|
||||
|
||||
// PruneTableFiles is not supported for the blobstore-backed persister; it
// always returns ErrUnsupportedOperation.
func (bsp *blobstorePersister) PruneTableFiles(ctx context.Context, contents manifestContents) error {
	return ErrUnsupportedOperation
}
|
||||
|
||||
@@ -197,7 +197,7 @@ func parseManifest(r io.Reader) (manifestContents, error) {
|
||||
return manifestContents{}, err
|
||||
}
|
||||
|
||||
ad, err := parseAddr([]byte(slices[2]))
|
||||
ad, err := parseAddr(slices[2])
|
||||
|
||||
if err != nil {
|
||||
return manifestContents{}, err
|
||||
|
||||
@@ -25,9 +25,13 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/util/tempfiles"
|
||||
|
||||
@@ -36,6 +40,27 @@ import (
|
||||
|
||||
const tempTablePrefix = "nbs_table_"
|
||||
|
||||
type gcErrAccum map[string]error
|
||||
|
||||
var _ error = gcErrAccum{}
|
||||
|
||||
func (ea gcErrAccum) add(path string, err error) {
|
||||
ea[path] = err
|
||||
}
|
||||
|
||||
func (ea gcErrAccum) isEmpty() bool {
|
||||
return len(ea) == 0
|
||||
}
|
||||
|
||||
func (ea gcErrAccum) Error() string {
|
||||
var sb strings.Builder
|
||||
sb.WriteString("error garbage collecting the following files:")
|
||||
for filePath, err := range ea {
|
||||
sb.WriteString(fmt.Sprintf("\t%s: %s", filePath, err.Error()))
|
||||
}
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
func newFSTablePersister(dir string, fc *fdCache, indexCache *indexCache) tablePersister {
|
||||
d.PanicIfTrue(fc == nil)
|
||||
return &fsTablePersister{dir, fc, indexCache}
|
||||
@@ -208,3 +233,60 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource
|
||||
|
||||
return ftp.Open(ctx, name, plan.chunkCount, stats)
|
||||
}
|
||||
|
||||
// PruneTableFiles removes, from the persister's directory, table files that
// are not referenced by |contents| (the current manifest contents), along with
// any leftover temp files from interrupted table writes. Errors removing
// individual files are accumulated into a gcErrAccum and returned together;
// the directory scan continues past them.
func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, contents manifestContents) error {
	ss := contents.getSpecSet()

	fileInfos, err := ioutil.ReadDir(ftp.dir)

	if err != nil {
		return err
	}

	// Shrink the fd cache before deleting; presumably this closes cached
	// descriptors so the files can actually be removed — TODO confirm.
	err = ftp.fc.ShrinkCache()

	if err != nil {
		return err
	}

	ea := make(gcErrAccum)
	for _, info := range fileInfos {
		if info.IsDir() {
			continue
		}

		filePath := path.Join(ftp.dir, info.Name())

		// Abandoned temp files from interrupted writes are always garbage.
		if strings.HasPrefix(info.Name(), tempTablePrefix) {
			err = os.Remove(filePath)
			if err != nil {
				ea.add(filePath, err)
			}
			continue
		}

		// Table files are named by their 32-character addr string; anything
		// of a different length (manifest, lock file, ...) is left alone.
		if len(info.Name()) != 32 {
			continue // not a table file
		}

		addy, err := parseAddr(info.Name())
		if err != nil {
			continue // not a table file
		}

		if _, ok := ss[addy]; ok {
			continue // file is referenced in the manifest
		}

		err = os.Remove(filePath)
		if err != nil {
			ea.add(filePath, err)
		}
	}

	if !ea.isEmpty() {
		return ea
	}

	return nil
}
|
||||
|
||||
@@ -95,7 +95,7 @@ func (ftc *fsTableCache) init(concurrency int) error {
|
||||
return errors.New(path + " is not a table file; cache dir must contain only table files")
|
||||
}
|
||||
|
||||
ad, err := parseAddr([]byte(info.Name()))
|
||||
ad, err := parseAddr(info.Name())
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -116,6 +116,14 @@ func (mc manifestContents) getSpec(i int) tableSpec {
|
||||
return mc.specs[i]
|
||||
}
|
||||
|
||||
func (mc manifestContents) getSpecSet() (ss map[addr]struct{}) {
|
||||
ss = make(map[addr]struct{}, len(mc.specs))
|
||||
for _, ts := range mc.specs {
|
||||
ss[ts.name] = struct{}{}
|
||||
}
|
||||
return ss
|
||||
}
|
||||
|
||||
func (mc manifestContents) size() (size uint64) {
|
||||
size += uint64(len(mc.vers)) + addrSize + hash.ByteLen
|
||||
for _, sp := range mc.specs {
|
||||
@@ -317,7 +325,7 @@ func parseSpecs(tableInfo []string) ([]tableSpec, error) {
|
||||
specs := make([]tableSpec, len(tableInfo)/2)
|
||||
for i := range specs {
|
||||
var err error
|
||||
specs[i].name, err = parseAddr([]byte(tableInfo[2*i]))
|
||||
specs[i].name, err = parseAddr(tableInfo[2*i])
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -79,7 +79,7 @@ func newMemTable(memTableSize uint64) *memTable {
|
||||
|
||||
func (mt *memTable) addChunk(h addr, data []byte) bool {
|
||||
if len(data) == 0 {
|
||||
panic("NBS blocks cannont be zero length")
|
||||
panic("NBS blocks cannot be zero length")
|
||||
}
|
||||
if _, ok := mt.chunks[h]; ok {
|
||||
return true
|
||||
|
||||
@@ -65,6 +65,11 @@ func (nbsMW *NBSMetricWrapper) SupportedOperations() TableFileStoreOps {
|
||||
return nbsMW.nbs.SupportedOperations()
|
||||
}
|
||||
|
||||
// PruneTableFiles deletes old table files that are no longer referenced in the manifest,
// delegating directly to the wrapped NomsBlockStore.
func (nbsMW *NBSMetricWrapper) PruneTableFiles(ctx context.Context) error {
	return nbsMW.nbs.PruneTableFiles(ctx)
}
|
||||
|
||||
// GetManyCompressed gets the compressed Chunks with |hashes| from the store. On return,
|
||||
// |foundChunks| will have been fully sent all chunks which have been
|
||||
// found. Any non-present chunks will silently be ignored.
|
||||
|
||||
@@ -429,6 +429,8 @@ type fakeTablePersister struct {
|
||||
mu *sync.RWMutex
|
||||
}
|
||||
|
||||
var _ tablePersister = fakeTablePersister{}
|
||||
|
||||
func (ftp fakeTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) {
|
||||
if mustUint32(mt.count()) > 0 {
|
||||
name, data, chunkCount, err := mt.write(haver, stats)
|
||||
@@ -528,3 +530,7 @@ func (ftp fakeTablePersister) Open(ctx context.Context, name addr, chunkCount ui
|
||||
defer ftp.mu.RUnlock()
|
||||
return chunkSourceAdapter{ftp.sources[name], name}, nil
|
||||
}
|
||||
|
||||
// PruneTableFiles is not supported by the in-memory fake persister used in
// tests; it always returns ErrUnsupportedOperation.
func (ftp fakeTablePersister) PruneTableFiles(_ context.Context, _ manifestContents) error {
	return ErrUnsupportedOperation
}
|
||||
|
||||
+58
-9
@@ -93,6 +93,8 @@ type NomsBlockStore struct {
|
||||
stats *Stats
|
||||
}
|
||||
|
||||
var _ TableFileStore = &NomsBlockStore{}
|
||||
|
||||
type Range struct {
|
||||
Offset uint64
|
||||
Length uint32
|
||||
@@ -204,10 +206,7 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
|
||||
contents = manifestContents{vers: nbs.upstream.vers}
|
||||
}
|
||||
|
||||
currSpecs := make(map[addr]bool)
|
||||
for _, spec := range contents.specs {
|
||||
currSpecs[spec.name] = true
|
||||
}
|
||||
currSpecs := contents.getSpecSet()
|
||||
|
||||
var addCount int
|
||||
for h, count := range updates {
|
||||
@@ -304,6 +303,10 @@ func NewGCSStore(ctx context.Context, nbfVerStr string, bucketName, path string,
|
||||
}
|
||||
|
||||
func NewLocalStore(ctx context.Context, nbfVerStr string, dir string, memTableSize uint64) (*NomsBlockStore, error) {
|
||||
return newLocalStore(ctx, nbfVerStr, dir, memTableSize, defaultMaxTables)
|
||||
}
|
||||
|
||||
func newLocalStore(ctx context.Context, nbfVerStr string, dir string, memTableSize uint64, maxTables int) (*NomsBlockStore, error) {
|
||||
cacheOnce.Do(makeGlobalCaches)
|
||||
err := checkDir(dir)
|
||||
|
||||
@@ -313,7 +316,7 @@ func NewLocalStore(ctx context.Context, nbfVerStr string, dir string, memTableSi
|
||||
|
||||
mm := makeManifestManager(fileManifest{dir})
|
||||
p := newFSTablePersister(dir, globalFDCache, globalIndexCache)
|
||||
nbs, err := newNomsBlockStore(ctx, nbfVerStr, mm, p, inlineConjoiner{defaultMaxTables}, memTableSize)
|
||||
nbs, err := newNomsBlockStore(ctx, nbfVerStr, mm, p, inlineConjoiner{maxTables}, memTableSize)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -735,8 +738,9 @@ func (nbs *NomsBlockStore) Commit(ctx context.Context, current, last hash.Hash)
|
||||
// so that we avoid writing a bunch of unreachable small tables which result
|
||||
// from optismistic lock failures. However, this means that the time to
|
||||
// write tables is included in "commit" time and if all commits are
|
||||
// serialized, it means alot more waiting. Allow "non-trivial" tables to be
|
||||
// persisted outside of the commit-lock.
|
||||
// serialized, it means alot more waiting.
|
||||
// "non-trivial" tables are persisted here, outside of the commit-lock.
|
||||
// all other tables are persisted in updateManifest()
|
||||
nbs.mu.Lock()
|
||||
defer nbs.mu.Unlock()
|
||||
|
||||
@@ -769,6 +773,8 @@ func (nbs *NomsBlockStore) Commit(ctx context.Context, current, last hash.Hash)
|
||||
}
|
||||
}()
|
||||
|
||||
nbs.mu.Lock()
|
||||
defer nbs.mu.Unlock()
|
||||
for {
|
||||
if err := nbs.updateManifest(ctx, current, last); err == nil {
|
||||
return true, nil
|
||||
@@ -788,9 +794,8 @@ var (
|
||||
errOptimisticLockFailedTables = fmt.Errorf("tables changed")
|
||||
)
|
||||
|
||||
// callers must acquire lock |nbs.mu|
|
||||
func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last hash.Hash) error {
|
||||
nbs.mu.Lock()
|
||||
defer nbs.mu.Unlock()
|
||||
if nbs.upstream.root != last {
|
||||
return errLastRootMismatch
|
||||
}
|
||||
@@ -1046,6 +1051,7 @@ func (nbs *NomsBlockStore) SupportedOperations() TableFileStoreOps {
|
||||
return TableFileStoreOps{
|
||||
CanRead: true,
|
||||
CanWrite: canwrite,
|
||||
CanPrune: canwrite,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1095,8 +1101,51 @@ func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileId string, nu
|
||||
return err
|
||||
}
|
||||
|
||||
// PruneTableFiles deletes old table files that are no longer referenced in the manifest.
// It first flushes in-memory tables by committing the current root to itself
// (retrying indefinitely on optimistic-lock table churn, same as Commit), then
// fetches the fresh manifest and asks the persister to delete any table files
// it does not reference. |nbs.mu| and the manifest update lock are held for
// the duration; the named return lets the deferred unlock surface its error
// when nothing else failed first.
func (nbs *NomsBlockStore) PruneTableFiles(ctx context.Context) (err error) {
	nbs.mu.Lock()
	defer nbs.mu.Unlock()

	nbs.mm.LockForUpdate()
	defer func() {
		unlockErr := nbs.mm.UnlockForUpdate()

		// Only report the unlock error if the prune itself succeeded.
		if err == nil {
			err = unlockErr
		}
	}()

	for {
		// flush all tables and update manifest
		err = nbs.updateManifest(ctx, nbs.upstream.root, nbs.upstream.root)

		if err == nil {
			break
		} else if err == errOptimisticLockFailedTables {
			continue
		} else {
			return err
		}

		// Same behavior as Commit:
		// infinitely retries without backoff in the case of errOptimisticLockFailedTables
	}

	ok, contents, err := nbs.mm.Fetch(ctx, &Stats{})
	if err != nil {
		return err
	}
	if !ok {
		return nil // no manifest exists
	}

	return nbs.p.PruneTableFiles(ctx, contents)
}
|
||||
|
||||
// SetRootChunk changes the root chunk hash from the previous value to the new root.
|
||||
func (nbs *NomsBlockStore) SetRootChunk(ctx context.Context, root, previous hash.Hash) error {
|
||||
nbs.mu.Lock()
|
||||
defer nbs.mu.Unlock()
|
||||
for {
|
||||
err := nbs.updateManifest(ctx, root, previous)
|
||||
|
||||
|
||||
+117
-7
@@ -27,33 +27,49 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/utils/set"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
func TestNBSAsTableFileStore(t *testing.T) {
|
||||
func makeTestLocalStore(t *testing.T, maxTableFiles int) (st *NomsBlockStore, nomsDir string) {
|
||||
ctx := context.Background()
|
||||
testDir := filepath.Join(os.TempDir(), uuid.New().String())
|
||||
nomsDir = filepath.Join(os.TempDir(), uuid.New().String())
|
||||
|
||||
err := os.MkdirAll(testDir, os.ModePerm)
|
||||
err := os.MkdirAll(nomsDir, os.ModePerm)
|
||||
require.NoError(t, err)
|
||||
|
||||
numTableFiles := 128
|
||||
|
||||
st, err := NewLocalStore(ctx, types.Format_Default.VersionString(), testDir, defaultMemTableSize)
|
||||
st, err = newLocalStore(ctx, types.Format_Default.VersionString(), nomsDir, defaultMemTableSize, maxTableFiles)
|
||||
require.NoError(t, err)
|
||||
return st, nomsDir
|
||||
}
|
||||
|
||||
fileToData := make(map[string][]byte, numTableFiles)
|
||||
type fileToData map[string][]byte
|
||||
|
||||
func populateLocalStore(t *testing.T, st *NomsBlockStore, numTableFiles int) fileToData {
|
||||
ctx := context.Background()
|
||||
fileToData := make(fileToData, numTableFiles)
|
||||
for i := 0; i < numTableFiles; i++ {
|
||||
var chunkData [][]byte
|
||||
for j := 0; j < i+1; j++ {
|
||||
chunkData = append(chunkData, []byte(fmt.Sprintf("%d:%d", i, j)))
|
||||
}
|
||||
data, addr, err := buildTable(chunkData)
|
||||
require.NoError(t, err)
|
||||
fileID := addr.String()
|
||||
fileToData[fileID] = data
|
||||
err = st.WriteTableFile(ctx, fileID, i+1, bytes.NewReader(data), 0, nil)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
return fileToData
|
||||
}
|
||||
|
||||
func TestNBSAsTableFileStore(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
numTableFiles := 128
|
||||
assert.Greater(t, defaultMaxTables, numTableFiles)
|
||||
st, _ := makeTestLocalStore(t, defaultMaxTables)
|
||||
fileToData := populateLocalStore(t, st, numTableFiles)
|
||||
|
||||
_, sources, err := st.Sources(ctx)
|
||||
require.NoError(t, err)
|
||||
@@ -81,3 +97,97 @@ func TestNBSAsTableFileStore(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Greater(t, size, uint64(0))
|
||||
}
|
||||
|
||||
type tableFileSet map[string]TableFile
|
||||
|
||||
func (s tableFileSet) contains(fileName string) (ok bool) {
|
||||
_, ok = s[fileName]
|
||||
return ok
|
||||
}
|
||||
|
||||
// findAbsent returns the table file names in |ftd| that don't exist in |s|
|
||||
func (s tableFileSet) findAbsent(ftd fileToData) (absent []string) {
|
||||
for fileID := range ftd {
|
||||
if !s.contains(fileID) {
|
||||
absent = append(absent, fileID)
|
||||
}
|
||||
}
|
||||
return absent
|
||||
}
|
||||
|
||||
func tableFileSetFromSources(sources []TableFile) (s tableFileSet) {
|
||||
s = make(tableFileSet, len(sources))
|
||||
for _, src := range sources {
|
||||
s[src.FileID()] = src
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func TestNBSPruneTableFiles(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// over populate table files
|
||||
numTableFiles := 64
|
||||
maxTableFiles := 16
|
||||
st, nomsDir := makeTestLocalStore(t, maxTableFiles)
|
||||
fileToData := populateLocalStore(t, st, numTableFiles)
|
||||
|
||||
// add a chunk and flush to trigger a conjoin
|
||||
c := []byte("it's a boy!")
|
||||
ok := st.addChunk(ctx, computeAddr(c), c)
|
||||
require.True(t, ok)
|
||||
ok, err := st.Commit(ctx, st.upstream.root, st.upstream.root)
|
||||
require.True(t, ok)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, sources, err := st.Sources(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.Greater(t, numTableFiles, len(sources))
|
||||
|
||||
// find which input table files were conjoined
|
||||
tfSet := tableFileSetFromSources(sources)
|
||||
absent := tfSet.findAbsent(fileToData)
|
||||
// assert some input table files were conjoined
|
||||
assert.NotEmpty(t, absent)
|
||||
|
||||
currTableFiles := func(dirName string) *set.StrSet {
|
||||
infos, err := ioutil.ReadDir(dirName)
|
||||
require.NoError(t, err)
|
||||
curr := set.NewStrSet(nil)
|
||||
for _, fi := range infos {
|
||||
if fi.Name() != manifestFileName && fi.Name() != lockFileName {
|
||||
curr.Add(fi.Name())
|
||||
}
|
||||
}
|
||||
return curr
|
||||
}
|
||||
|
||||
preGC := currTableFiles(nomsDir)
|
||||
for _, tf := range sources {
|
||||
assert.True(t, preGC.Contains(tf.FileID()))
|
||||
}
|
||||
for _, fileName := range absent {
|
||||
assert.True(t, preGC.Contains(fileName))
|
||||
}
|
||||
|
||||
err = st.PruneTableFiles(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
postGC := currTableFiles(nomsDir)
|
||||
for _, tf := range sources {
|
||||
assert.True(t, preGC.Contains(tf.FileID()))
|
||||
}
|
||||
for _, fileName := range absent {
|
||||
assert.False(t, postGC.Contains(fileName))
|
||||
}
|
||||
infos, err := ioutil.ReadDir(nomsDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
// assert that we only have files for current sources,
|
||||
// the manifest, and the lock file
|
||||
assert.Equal(t, len(sources)+2, len(infos))
|
||||
|
||||
size, err := st.Size(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Greater(t, size, uint64(0))
|
||||
}
|
||||
|
||||
+14
-6
@@ -27,6 +27,7 @@ import (
|
||||
"crypto/sha512"
|
||||
"encoding/base32"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"sync"
|
||||
@@ -169,9 +170,9 @@ func (a addr) Checksum() uint32 {
|
||||
return binary.BigEndian.Uint32(a[addrSize-checksumSize:])
|
||||
}
|
||||
|
||||
func parseAddr(b []byte) (addr, error) {
|
||||
func parseAddr(str string) (addr, error) {
|
||||
var h addr
|
||||
_, err := encoding.Decode(h[:], b)
|
||||
_, err := encoding.Decode(h[:], []byte(str))
|
||||
return h, err
|
||||
}
|
||||
|
||||
@@ -290,28 +291,35 @@ type TableFile interface {
|
||||
Open(ctx context.Context) (io.ReadCloser, error)
|
||||
}
|
||||
|
||||
var ErrUnsupportedOperation = errors.New("operation not supported")
|
||||
|
||||
// Describes what is possible to do with TableFiles in a TableFileStore.
|
||||
type TableFileStoreOps struct {
|
||||
// True is the TableFileStore supports reading table files.
|
||||
CanRead bool
|
||||
// True is the TableFileStore supports writing table files.
|
||||
CanWrite bool
|
||||
// True is the TableFileStore supports pruning unused table files.
|
||||
CanPrune bool
|
||||
}
|
||||
|
||||
// TableFileStore is an interface for interacting with table files directly
|
||||
type TableFileStore interface {
|
||||
// Sources retrieves the current root hash, and a list of all the table files
|
||||
// Sources retrieves the current root hash, and a list of all the table files.
|
||||
Sources(ctx context.Context) (hash.Hash, []TableFile, error)
|
||||
|
||||
// Returns the total size, in bytes, of the table files in this Store.
|
||||
// Size returns the total size, in bytes, of the table files in this Store.
|
||||
Size(ctx context.Context) (uint64, error)
|
||||
|
||||
// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore
|
||||
// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore.
|
||||
WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error
|
||||
|
||||
// PruneTableFiles deletes old table files that are no longer referenced in the manifest.
|
||||
PruneTableFiles(ctx context.Context) error
|
||||
|
||||
// SetRootChunk changes the root chunk hash from the previous value to the new root.
|
||||
SetRootChunk(ctx context.Context, root, previous hash.Hash) error
|
||||
|
||||
// Returns a description of the support TableFile operations. Some stores only support reading table files, not writing.
|
||||
// SupportedOperations returns a description of the support TableFile operations. Some stores only support reading table files, not writing.
|
||||
SupportedOperations() TableFileStoreOps
|
||||
}
|
||||
|
||||
@@ -50,6 +50,9 @@ type tablePersister interface {
|
||||
|
||||
// Open a table named |name|, containing |chunkCount| chunks.
|
||||
Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error)
|
||||
|
||||
// PruneTableFiles deletes old table files that are no longer referenced in the manifest.
|
||||
PruneTableFiles(ctx context.Context, contents manifestContents) error
|
||||
}
|
||||
|
||||
// indexCache provides sized storage for table indices. While getting and/or
|
||||
|
||||
Reference in New Issue
Block a user