diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go
index c17a8d4a55..f57641f154 100644
--- a/go/libraries/doltcore/remotestorage/chunk_store.go
+++ b/go/libraries/doltcore/remotestorage/chunk_store.go
@@ -83,6 +83,8 @@ type DoltChunkStore struct {
 	httpFetcher HTTPFetcher
 }
 
+var _ nbs.TableFileStore = &DoltChunkStore{}
+
 func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, path, host string, csClient remotesapi.ChunkStoreServiceClient) (*DoltChunkStore, error) {
 	tokens := strings.Split(strings.Trim(path, "/"), "/")
 	if len(tokens) != 2 {
@@ -943,6 +945,11 @@ func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, nu
 	return nil
 }
 
+// PruneTableFiles deletes old table files that are no longer referenced in the manifest.
+func (dcs *DoltChunkStore) PruneTableFiles(ctx context.Context) error {
+	panic("Not Implemented")
+}
+
 // Sources retrieves the current root hash, and a list of all the table files
 func (dcs *DoltChunkStore) Sources(ctx context.Context) (hash.Hash, []nbs.TableFile, error) {
 	req := &remotesapi.ListTableFilesRequest{RepoId: dcs.getRepoId()}
diff --git a/go/store/nbs/aws_table_persister.go b/go/store/nbs/aws_table_persister.go
index 4d453a75de..bca5b94dd9 100644
--- a/go/store/nbs/aws_table_persister.go
+++ b/go/store/nbs/aws_table_persister.go
@@ -603,3 +603,7 @@ func (s3p awsTablePersister) uploadPart(ctx context.Context, data []byte, key, u
 	}
 	return
 }
+
+func (s3p awsTablePersister) PruneTableFiles(ctx context.Context, contents manifestContents) error {
+	panic("Not Implemented")
+}
diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go
index 18973df468..7178f532a7 100644
--- a/go/store/nbs/bs_persister.go
+++ b/go/store/nbs/bs_persister.go
@@ -149,3 +149,7 @@ func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, ch
 
 	return &chunkSourceAdapter{newTableReader(index, tra, s3BlockSize), name}, nil
 }
+
+func (bsp *blobstorePersister) PruneTableFiles(ctx context.Context, contents manifestContents) error {
+	panic("Not Implemented")
+}
diff --git a/go/store/nbs/file_manifest.go b/go/store/nbs/file_manifest.go
index 87a0d177ab..83368c95a8 100644
--- a/go/store/nbs/file_manifest.go
+++ b/go/store/nbs/file_manifest.go
@@ -197,7 +197,7 @@ func parseManifest(r io.Reader) (manifestContents, error) {
 		return manifestContents{}, err
 	}
 
-	ad, err := parseAddr([]byte(slices[2]))
+	ad, err := parseAddr(slices[2])
 
 	if err != nil {
 		return manifestContents{}, err
diff --git a/go/store/nbs/file_table_persister.go b/go/store/nbs/file_table_persister.go
index cf09b17f07..42cff005a0 100644
--- a/go/store/nbs/file_table_persister.go
+++ b/go/store/nbs/file_table_persister.go
@@ -26,8 +26,10 @@ import (
 	"context"
 	"errors"
 	"io"
+	"io/ioutil"
 	"os"
 	"path/filepath"
+	"strings"
 
 	"github.com/dolthub/dolt/go/store/util/tempfiles"
 
@@ -208,3 +210,48 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource
 
 	return ftp.Open(ctx, name, plan.chunkCount, stats)
 }
+
+// PruneTableFiles removes every regular file in ftp.dir whose name starts with tempTablePrefix, and
+// every table file (a 32-character name that parses as an addr) that |contents| does not reference.
+func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, contents manifestContents) error {
+	ss := contents.getSpecSet()
+
+	fileInfos, err := ioutil.ReadDir(ftp.dir)
+
+	if err != nil {
+		return err
+	}
+
+	for _, info := range fileInfos {
+		if !info.Mode().IsRegular() {
+			continue
+		}
+		if strings.HasPrefix(info.Name(), tempTablePrefix) {
+			err = os.Remove(filepath.Join(ftp.dir, info.Name()))
+			if err != nil {
+				return err
+			}
+			continue
+		}
+
+		if len(info.Name()) != 32 {
+			continue // not a table file
+		}
+
+		addy, err := parseAddr(info.Name())
+		if err != nil {
+			continue // not a table file
+		}
+
+		if _, ok := ss[addy]; ok {
+			continue // file is referenced in the manifest
+		}
+
+		err = os.Remove(filepath.Join(ftp.dir, info.Name()))
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/go/store/nbs/fs_table_cache.go b/go/store/nbs/fs_table_cache.go
index 45b6f07ae6..a3b8f5d4e4 100644
--- a/go/store/nbs/fs_table_cache.go
+++ b/go/store/nbs/fs_table_cache.go
@@ -95,7 +95,7 @@ func (ftc *fsTableCache) init(concurrency int) error {
 				return errors.New(path + " is not a table file; cache dir must contain only table files")
 			}
 
-			ad, err := parseAddr([]byte(info.Name()))
+			ad, err := parseAddr(info.Name())
 
 			if err != nil {
 				return err
diff --git a/go/store/nbs/manifest.go b/go/store/nbs/manifest.go
index c7d1bbfc29..7be2dd3f18 100644
--- a/go/store/nbs/manifest.go
+++ b/go/store/nbs/manifest.go
@@ -116,6 +116,15 @@ func (mc manifestContents) getSpec(i int) tableSpec {
 	return mc.specs[i]
 }
 
+// getSpecSet returns the names of all specs in |mc| as a set keyed by addr.
+func (mc manifestContents) getSpecSet() (ss map[addr]struct{}) {
+	ss = make(map[addr]struct{}, len(mc.specs))
+	for _, ts := range mc.specs {
+		ss[ts.name] = struct{}{}
+	}
+	return ss
+}
+
 func (mc manifestContents) size() (size uint64) {
 	size += uint64(len(mc.vers)) + addrSize + hash.ByteLen
 	for _, sp := range mc.specs {
@@ -317,7 +326,7 @@ func parseSpecs(tableInfo []string) ([]tableSpec, error) {
 	specs := make([]tableSpec, len(tableInfo)/2)
 	for i := range specs {
 		var err error
-		specs[i].name, err = parseAddr([]byte(tableInfo[2*i]))
+		specs[i].name, err = parseAddr(tableInfo[2*i])
 
 		if err != nil {
 			return nil, err
diff --git a/go/store/nbs/mem_table.go b/go/store/nbs/mem_table.go
index a94f2d6344..e4458a13ec 100644
--- a/go/store/nbs/mem_table.go
+++ b/go/store/nbs/mem_table.go
@@ -79,7 +79,7 @@ func newMemTable(memTableSize uint64) *memTable {
 
 func (mt *memTable) addChunk(h addr, data []byte) bool {
 	if len(data) == 0 {
-		panic("NBS blocks cannont be zero length")
+		panic("NBS blocks cannot be zero length")
 	}
 	if _, ok := mt.chunks[h]; ok {
 		return true
diff --git a/go/store/nbs/nbs_metrics_wrapper.go b/go/store/nbs/nbs_metrics_wrapper.go
index 40fd509e68..a66a5941b8 100644
--- a/go/store/nbs/nbs_metrics_wrapper.go
+++ b/go/store/nbs/nbs_metrics_wrapper.go
@@ -65,6 +65,11 @@ func (nbsMW *NBSMetricWrapper) SupportedOperations() TableFileStoreOps {
 	return nbsMW.nbs.SupportedOperations()
 }
 
+// PruneTableFiles deletes old table files that are no longer referenced in the manifest.
+func (nbsMW *NBSMetricWrapper) PruneTableFiles(ctx context.Context) error {
+	return nbsMW.nbs.PruneTableFiles(ctx)
+}
+
 // GetManyCompressed gets the compressed Chunks with |hashes| from the store. On return,
 // |foundChunks| will have been fully sent all chunks which have been
 // found. Any non-present chunks will silently be ignored.
diff --git a/go/store/nbs/root_tracker_test.go b/go/store/nbs/root_tracker_test.go
index fea3148e29..1ddb8c7e80 100644
--- a/go/store/nbs/root_tracker_test.go
+++ b/go/store/nbs/root_tracker_test.go
@@ -528,3 +528,8 @@ func (ftp fakeTablePersister) Open(ctx context.Context, name addr, chunkCount ui
 	defer ftp.mu.RUnlock()
 	return chunkSourceAdapter{ftp.sources[name], name}, nil
 }
+
+// PruneTableFiles is a no-op; it exists so fakeTablePersister satisfies tablePersister.
+func (ftp fakeTablePersister) PruneTableFiles(ctx context.Context, contents manifestContents) error {
+	return nil
+}
diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go
index 723a372922..43ff72802a 100644
--- a/go/store/nbs/store.go
+++ b/go/store/nbs/store.go
@@ -93,6 +93,8 @@ type NomsBlockStore struct {
 	stats *Stats
 }
 
+var _ TableFileStore = &NomsBlockStore{}
+
 type Range struct {
 	Offset uint64
 	Length uint32
@@ -204,10 +206,7 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
 		contents = manifestContents{vers: nbs.upstream.vers}
 	}
 
-	currSpecs := make(map[addr]bool)
-	for _, spec := range contents.specs {
-		currSpecs[spec.name] = true
-	}
+	currSpecs := contents.getSpecSet()
 
 	var addCount int
 	for h, count := range updates {
@@ -1095,6 +1094,42 @@ func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileId string, nu
 	return err
 }
 
+// PruneTableFiles deletes old table files that are no longer referenced in the manifest.
+func (nbs *NomsBlockStore) PruneTableFiles(ctx context.Context) (err error) {
+	nbs.mu.Lock()
+	defer nbs.mu.Unlock()
+
+	nbs.mm.LockForUpdate()
+	defer func() {
+		unlockErr := nbs.mm.UnlockForUpdate()
+
+		if err == nil {
+			err = unlockErr
+		}
+	}()
+
+	// no-op commit to persist tables and update manifest
+	// NOTE(review): nbs.mu and the manifest update lock are already held at this point; confirm
+	// that Commit does not try to acquire either one itself, otherwise this call self-deadlocks.
+	ok, err := nbs.Commit(ctx, nbs.upstream.root, nbs.upstream.root)
+	if err != nil {
+		return err
+	}
+	if !ok {
+		return fmt.Errorf("could not persist data before pruning table files")
+	}
+
+	ok, contents, err := nbs.mm.Fetch(ctx, &Stats{})
+	if err != nil {
+		return err
+	}
+	if !ok {
+		return nil // no manifest exists
+	}
+
+	return nbs.p.PruneTableFiles(ctx, contents)
+}
+
 // SetRootChunk changes the root chunk hash from the previous value to the new root.
 func (nbs *NomsBlockStore) SetRootChunk(ctx context.Context, root, previous hash.Hash) error {
 	for {
diff --git a/go/store/nbs/table.go b/go/store/nbs/table.go
index 9d4a5c5954..589efafc5c 100644
--- a/go/store/nbs/table.go
+++ b/go/store/nbs/table.go
@@ -169,9 +169,9 @@ func (a addr) Checksum() uint32 {
 	return binary.BigEndian.Uint32(a[addrSize-checksumSize:])
 }
 
-func parseAddr(b []byte) (addr, error) {
+func parseAddr(str string) (addr, error) {
 	var h addr
-	_, err := encoding.Decode(h[:], b)
+	_, err := encoding.Decode(h[:], []byte(str))
 	return h, err
 }
 
@@ -300,18 +300,21 @@ type TableFileStoreOps struct {
 
 // TableFileStore is an interface for interacting with table files directly
 type TableFileStore interface {
-	// Sources retrieves the current root hash, and a list of all the table files
+	// Sources retrieves the current root hash, and a list of all the table files.
 	Sources(ctx context.Context) (hash.Hash, []TableFile, error)
 
-	// Returns the total size, in bytes, of the table files in this Store.
+	// Size returns the total size, in bytes, of the table files in this Store.
 	Size(ctx context.Context) (uint64, error)
 
-	// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore
+	// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore.
 	WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error
 
+	// PruneTableFiles deletes old table files that are no longer referenced in the manifest.
+	PruneTableFiles(ctx context.Context) error
+
 	// SetRootChunk changes the root chunk hash from the previous value to the new root.
 	SetRootChunk(ctx context.Context, root, previous hash.Hash) error
 
-	// Returns a description of the support TableFile operations. Some stores only support reading table files, not writing.
+	// SupportedOperations returns a description of the supported TableFile operations. Some stores only support reading table files, not writing.
 	SupportedOperations() TableFileStoreOps
 }
diff --git a/go/store/nbs/table_persister.go b/go/store/nbs/table_persister.go
index 6fa768ab5f..ee3f65564b 100644
--- a/go/store/nbs/table_persister.go
+++ b/go/store/nbs/table_persister.go
@@ -50,6 +50,9 @@ type tablePersister interface {
 
 	// Open a table named |name|, containing |chunkCount| chunks.
 	Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error)
+
+	// PruneTableFiles deletes old table files that are no longer referenced in the manifest.
+	PruneTableFiles(ctx context.Context, contents manifestContents) error
 }
 
 // indexCache provides sized storage for table indices. While getting and/or