first pass at table files pruning

Andy Arthur
2020-09-17 23:14:08 -07:00
parent 97be86529a
commit 27c47cd553
14 changed files with 133 additions and 16 deletions

View File

@@ -150,7 +150,7 @@ func runMain() int {
restoreIO := cli.InitIO()
defer restoreIO()
- warnIfMaxFilesTooLow()
+ //warnIfMaxFilesTooLow()
ctx := context.Background()
dEnv := env.Load(ctx, env.GetCurrentUserHomeDir, filesys.LocalFS, doltdb.LocalDirDoltDB, Version)
@@ -199,7 +199,7 @@ func runMain() int {
return 1
}
- err = reconfigIfTempFileMoveFails(dEnv)
+ //err = reconfigIfTempFileMoveFails(dEnv)
if err != nil {
cli.PrintErrln(color.RedString("Failed to setup the temporary directory. %v`", err))

View File

@@ -83,6 +83,8 @@ type DoltChunkStore struct {
httpFetcher HTTPFetcher
}
var _ nbs.TableFileStore = &DoltChunkStore{}
func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, path, host string, csClient remotesapi.ChunkStoreServiceClient) (*DoltChunkStore, error) {
tokens := strings.Split(strings.Trim(path, "/"), "/")
if len(tokens) != 2 {
@@ -943,6 +945,11 @@ func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, nu
return nil
}
// PruneTableFiles deletes old table files that are no longer referenced in the manifest.
func (dcs *DoltChunkStore) PruneTableFiles(ctx context.Context) error {
panic("Not Implemented")
}
// Sources retrieves the current root hash, and a list of all the table files
func (dcs *DoltChunkStore) Sources(ctx context.Context) (hash.Hash, []nbs.TableFile, error) {
req := &remotesapi.ListTableFilesRequest{RepoId: dcs.getRepoId()}

View File

@@ -603,3 +603,7 @@ func (s3p awsTablePersister) uploadPart(ctx context.Context, data []byte, key, u
}
return
}
func (s3p awsTablePersister) PruneTableFiles(ctx context.Context, contents manifestContents) error {
panic("Not Implemented")
}

View File

@@ -149,3 +149,7 @@ func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, ch
return &chunkSourceAdapter{newTableReader(index, tra, s3BlockSize), name}, nil
}
func (bsp *blobstorePersister) PruneTableFiles(ctx context.Context, contents manifestContents) error {
panic("Not Implemented")
}

View File

@@ -197,7 +197,7 @@ func parseManifest(r io.Reader) (manifestContents, error) {
return manifestContents{}, err
}
- ad, err := parseAddr([]byte(slices[2]))
+ ad, err := parseAddr(slices[2])
if err != nil {
return manifestContents{}, err

View File

@@ -26,8 +26,11 @@ import (
"context"
"errors"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
"github.com/dolthub/dolt/go/store/util/tempfiles"
@@ -208,3 +211,46 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource
return ftp.Open(ctx, name, plan.chunkCount, stats)
}
func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, contents manifestContents) error {
ss := contents.getSpecSet()
fileInfos, err := ioutil.ReadDir(ftp.dir)
if err != nil {
return err
}
for _, info := range fileInfos {
if !info.Mode().IsRegular() {
continue
}
if strings.HasPrefix(info.Name(), tempTablePrefix) {
err = os.Remove(path.Join(ftp.dir, info.Name()))
if err != nil {
return err
}
continue
}
if len(info.Name()) != 32 {
continue // not a table file
}
addy, err := parseAddr(info.Name())
if err != nil {
continue // not a table file
}
if _, ok := ss[addy]; ok {
continue // file is referenced in the manifest
}
err = os.Remove(path.Join(ftp.dir, info.Name()))
if err != nil {
return err
}
}
return nil
}
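
For illustration, a minimal, self-contained sketch of the keep-set pattern used above: scan a directory and remove regular files whose names are not in a caller-supplied set. The names here (pruneUnreferenced, keep) are hypothetical, and the sketch omits the addr parsing and temp-file prefix handling; it is not part of the commit.

package main

import (
    "io/ioutil"
    "log"
    "os"
    "path/filepath"
)

// pruneUnreferenced removes regular files in dir whose names are not in keep.
// It mirrors the fsTablePersister sweep above, minus addr parsing and the
// temp-file prefix handling.
func pruneUnreferenced(dir string, keep map[string]struct{}) error {
    infos, err := ioutil.ReadDir(dir)
    if err != nil {
        return err
    }
    for _, info := range infos {
        if !info.Mode().IsRegular() {
            continue // skip directories, symlinks, etc.
        }
        if _, ok := keep[info.Name()]; ok {
            continue // still referenced; keep it
        }
        if err := os.Remove(filepath.Join(dir, info.Name())); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    // Hypothetical usage: keep only files named "aaaa" and "bbbb" in ./tables.
    if err := pruneUnreferenced("./tables", map[string]struct{}{"aaaa": {}, "bbbb": {}}); err != nil {
        log.Fatal(err)
    }
}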

View File

@@ -95,7 +95,7 @@ func (ftc *fsTableCache) init(concurrency int) error {
return errors.New(path + " is not a table file; cache dir must contain only table files")
}
- ad, err := parseAddr([]byte(info.Name()))
+ ad, err := parseAddr(info.Name())
if err != nil {
return err

View File

@@ -116,6 +116,14 @@ func (mc manifestContents) getSpec(i int) tableSpec {
return mc.specs[i]
}
func (mc manifestContents) getSpecSet() (ss map[addr]struct{}) {
ss = make(map[addr]struct{}, len(mc.specs))
for _, ts := range mc.specs {
ss[ts.name] = struct{}{}
}
return ss
}
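
The struct{}-valued map returned by getSpecSet is a plain set: keys only, no per-entry payload, which is what lets it replace the hand-built map[addr]bool in UpdateManifest later in this commit. A tiny standalone sketch of the idiom, with string keys standing in for the addr type:

package main

import "fmt"

func main() {
    // Keys only; the struct{} value carries no payload per entry.
    specSet := map[string]struct{}{
        "table-a": {},
        "table-b": {},
    }
    if _, ok := specSet["table-a"]; ok {
        fmt.Println("table-a is still referenced by the manifest")
    }
}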
func (mc manifestContents) size() (size uint64) {
size += uint64(len(mc.vers)) + addrSize + hash.ByteLen
for _, sp := range mc.specs {
@@ -317,7 +325,7 @@ func parseSpecs(tableInfo []string) ([]tableSpec, error) {
specs := make([]tableSpec, len(tableInfo)/2)
for i := range specs {
var err error
- specs[i].name, err = parseAddr([]byte(tableInfo[2*i]))
+ specs[i].name, err = parseAddr(tableInfo[2*i])
if err != nil {
return nil, err

View File

@@ -79,7 +79,7 @@ func newMemTable(memTableSize uint64) *memTable {
func (mt *memTable) addChunk(h addr, data []byte) bool {
if len(data) == 0 {
panic("NBS blocks cannont be zero length")
panic("NBS blocks cannot be zero length")
}
if _, ok := mt.chunks[h]; ok {
return true

View File

@@ -65,6 +65,11 @@ func (nbsMW *NBSMetricWrapper) SupportedOperations() TableFileStoreOps {
return nbsMW.nbs.SupportedOperations()
}
// PruneTableFiles deletes old table files that are no longer referenced in the manifest.
func (nbsMW *NBSMetricWrapper) PruneTableFiles(ctx context.Context) error {
return nbsMW.nbs.PruneTableFiles(ctx)
}
// GetManyCompressed gets the compressed Chunks with |hashes| from the store. On return,
// |foundChunks| will have been fully sent all chunks which have been
// found. Any non-present chunks will silently be ignored.

View File

@@ -528,3 +528,7 @@ func (ftp fakeTablePersister) Open(ctx context.Context, name addr, chunkCount ui
defer ftp.mu.RUnlock()
return chunkSourceAdapter{ftp.sources[name], name}, nil
}
func (ftp fakeTablePersister) PruneTableFiles(ctx context.Context, contents manifestContents) error {
return nil
}

View File

@@ -93,6 +93,8 @@ type NomsBlockStore struct {
stats *Stats
}
var _ TableFileStore = &NomsBlockStore{}
type Range struct {
Offset uint64
Length uint32
@@ -204,10 +206,7 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
contents = manifestContents{vers: nbs.upstream.vers}
}
- currSpecs := make(map[addr]bool)
- for _, spec := range contents.specs {
- currSpecs[spec.name] = true
- }
+ currSpecs := contents.getSpecSet()
var addCount int
for h, count := range updates {
@@ -1095,6 +1094,40 @@ func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileId string, nu
return err
}
// PruneTableFiles deletes old table files that are no longer referenced in the manifest.
func (nbs *NomsBlockStore) PruneTableFiles(ctx context.Context) (err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()
nbs.mm.LockForUpdate()
defer func() {
unlockErr := nbs.mm.UnlockForUpdate()
if err == nil {
err = unlockErr
}
}()
// no-op commit to persist tables and update manifest
ok, err := nbs.Commit(ctx, nbs.upstream.root, nbs.upstream.root)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("could not persist data before pruning table files")
}
ok, contents, err := nbs.mm.Fetch(ctx, &Stats{})
if err != nil {
return err
}
if !ok {
return nil // no manifest exists
}
return nbs.p.PruneTableFiles(ctx, contents)
}
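
Two details worth noting in PruneTableFiles above: the no-op Commit flushes in-memory tables and the manifest before anything is deleted, and the deferred UnlockForUpdate folds its error into the named return value. A stripped-down sketch of that deferred-error pattern, using a made-up lock type rather than the Dolt manifest manager:

package main

import (
    "errors"
    "fmt"
)

type fakeLock struct{}

func (fakeLock) Lock()         {}
func (fakeLock) Unlock() error { return errors.New("unlock failed") }

// doWork shows how a deferred cleanup error is surfaced through the named
// return value when the body itself succeeded.
func doWork(l fakeLock) (err error) {
    l.Lock()
    defer func() {
        unlockErr := l.Unlock()
        if err == nil {
            err = unlockErr // only report the cleanup failure if nothing failed earlier
        }
    }()
    return nil // the deferred func can still overwrite this nil
}

func main() {
    fmt.Println(doWork(fakeLock{})) // prints: unlock failed
}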
// SetRootChunk changes the root chunk hash from the previous value to the new root.
func (nbs *NomsBlockStore) SetRootChunk(ctx context.Context, root, previous hash.Hash) error {
for {

View File

@@ -169,9 +169,9 @@ func (a addr) Checksum() uint32 {
return binary.BigEndian.Uint32(a[addrSize-checksumSize:])
}
- func parseAddr(b []byte) (addr, error) {
+ func parseAddr(str string) (addr, error) {
var h addr
- _, err := encoding.Decode(h[:], b)
+ _, err := encoding.Decode(h[:], []byte(str))
return h, err
}
@@ -300,18 +300,21 @@ type TableFileStoreOps struct {
// TableFileStore is an interface for interacting with table files directly
type TableFileStore interface {
- // Sources retrieves the current root hash, and a list of all the table files
+ // Sources retrieves the current root hash, and a list of all the table files.
Sources(ctx context.Context) (hash.Hash, []TableFile, error)
- // Returns the total size, in bytes, of the table files in this Store.
+ // Size returns the total size, in bytes, of the table files in this Store.
Size(ctx context.Context) (uint64, error)
- // WriteTableFile will read a table file from the provided reader and write it to the TableFileStore
+ // WriteTableFile will read a table file from the provided reader and write it to the TableFileStore.
WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error
+ // PruneTableFiles deletes old table files that are no longer referenced in the manifest.
+ PruneTableFiles(ctx context.Context) error
// SetRootChunk changes the root chunk hash from the previous value to the new root.
SetRootChunk(ctx context.Context, root, previous hash.Hash) error
- // Returns a description of the support TableFile operations. Some stores only support reading table files, not writing.
+ // SupportedOperations returns a description of the support TableFile operations. Some stores only support reading table files, not writing.
SupportedOperations() TableFileStoreOps
}
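
A minimal caller against the extended interface, using only Size and PruneTableFiles. This is a sketch, not code from the commit: pruneStore and noopStore are hypothetical, and several implementations in this commit (the remote chunk store, AWS, and blobstore persisters) still panic with "Not Implemented", so a real caller would have to account for that.

package main

import (
    "context"
    "fmt"
)

// pruner is the subset of TableFileStore this sketch needs.
type pruner interface {
    Size(ctx context.Context) (uint64, error)
    PruneTableFiles(ctx context.Context) error
}

// pruneStore prunes a store and reports roughly how many bytes were reclaimed,
// assuming the store does not grow while pruning.
func pruneStore(ctx context.Context, tfs pruner) (uint64, error) {
    before, err := tfs.Size(ctx)
    if err != nil {
        return 0, err
    }
    if err := tfs.PruneTableFiles(ctx); err != nil {
        return 0, err
    }
    after, err := tfs.Size(ctx)
    if err != nil {
        return 0, err
    }
    return before - after, nil
}

// noopStore is a stand-in implementation so the sketch compiles on its own.
type noopStore struct{ size uint64 }

func (s *noopStore) Size(context.Context) (uint64, error)   { return s.size, nil }
func (s *noopStore) PruneTableFiles(context.Context) error  { s.size /= 2; return nil }

func main() {
    reclaimed, err := pruneStore(context.Background(), &noopStore{size: 1024})
    fmt.Println(reclaimed, err) // 512 <nil>
}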

View File

@@ -50,6 +50,9 @@ type tablePersister interface {
// Open a table named |name|, containing |chunkCount| chunks.
Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error)
// PruneTableFiles deletes old table files that are no longer referenced in the manifest.
PruneTableFiles(ctx context.Context, contents manifestContents) error
}
// indexCache provides sized storage for table indices. While getting and/or