mirror of
https://github.com/dolthub/dolt.git
synced 2026-03-13 11:09:10 -05:00
go/store/types: value_store.go: Change GC implementation to call TableFileStore.PruneTableFiles after the copy is complete.
This commit is contained in:
10
go/libraries/doltcore/env/actions/clone.go
vendored
10
go/libraries/doltcore/env/actions/clone.go
vendored
@@ -32,9 +32,9 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/utils/filesys"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/strhelp"
|
||||
"github.com/dolthub/dolt/go/store/chunks"
|
||||
"github.com/dolthub/dolt/go/store/datas"
|
||||
"github.com/dolthub/dolt/go/store/datas/pull"
|
||||
"github.com/dolthub/dolt/go/store/nbs"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
@@ -91,11 +91,11 @@ func EnvForClone(ctx context.Context, nbf *types.NomsBinFormat, r env.Remote, di
|
||||
|
||||
func cloneProg(eventCh <-chan pull.TableFileEvent) {
|
||||
var (
|
||||
chunks int64
|
||||
chunksC int64
|
||||
chunksDownloading int64
|
||||
chunksDownloaded int64
|
||||
currStats = make(map[string]iohelp.ReadStats)
|
||||
tableFiles = make(map[string]*nbs.TableFile)
|
||||
tableFiles = make(map[string]*chunks.TableFile)
|
||||
)
|
||||
|
||||
p := cli.NewEphemeralPrinter()
|
||||
@@ -109,7 +109,7 @@ func cloneProg(eventCh <-chan pull.TableFileEvent) {
|
||||
for _, tf := range tblFEvt.TableFiles {
|
||||
c := tf
|
||||
tableFiles[c.FileID()] = &c
|
||||
chunks += int64(tf.NumChunks())
|
||||
chunksC += int64(tf.NumChunks())
|
||||
}
|
||||
case pull.DownloadStart:
|
||||
for _, tf := range tblFEvt.TableFiles {
|
||||
@@ -134,7 +134,7 @@ func cloneProg(eventCh <-chan pull.TableFileEvent) {
|
||||
}
|
||||
|
||||
p.Printf("%s of %s chunks complete. %s chunks being downloaded currently.\n",
|
||||
strhelp.CommaIfy(chunksDownloaded), strhelp.CommaIfy(chunks), strhelp.CommaIfy(chunksDownloading))
|
||||
strhelp.CommaIfy(chunksDownloaded), strhelp.CommaIfy(chunksC), strhelp.CommaIfy(chunksDownloading))
|
||||
for _, fileId := range sortedKeys(currStats) {
|
||||
s := currStats[fileId]
|
||||
bps := float64(s.Read) / s.Elapsed.Seconds()
|
||||
|
||||
@@ -26,7 +26,7 @@ type DBCache interface {
|
||||
|
||||
type RemoteSrvStore interface {
|
||||
chunks.ChunkStore
|
||||
nbs.TableFileStore
|
||||
chunks.TableFileStore
|
||||
|
||||
Path() (string, bool)
|
||||
GetChunkLocationsWithPaths(hashes hash.HashSet) (map[string]map[hash.Hash]nbs.Range, error)
|
||||
|
||||
@@ -34,8 +34,8 @@ import (
|
||||
remotesapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/filesys"
|
||||
"github.com/dolthub/dolt/go/store/chunks"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/nbs"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
@@ -548,7 +548,7 @@ func getTableFileInfo(
|
||||
logger *logrus.Entry,
|
||||
md metadata.MD,
|
||||
rs *RemoteChunkStore,
|
||||
tableList []nbs.TableFile,
|
||||
tableList []chunks.TableFile,
|
||||
req *remotesapi.ListTableFilesRequest,
|
||||
cs RemoteSrvStore,
|
||||
) ([]*remotesapi.TableFileInfo, error) {
|
||||
|
||||
@@ -61,7 +61,7 @@ var ErrUploadFailed = errors.New("upload failed")
|
||||
|
||||
var globalHttpFetcher HTTPFetcher = &http.Client{}
|
||||
|
||||
var _ nbs.TableFileStore = (*DoltChunkStore)(nil)
|
||||
var _ chunks.TableFileStore = (*DoltChunkStore)(nil)
|
||||
var _ nbs.NBSCompressedChunkStore = (*DoltChunkStore)(nil)
|
||||
var _ chunks.ChunkStore = (*DoltChunkStore)(nil)
|
||||
var _ chunks.LoggingChunkStore = (*DoltChunkStore)(nil)
|
||||
@@ -1275,8 +1275,8 @@ func collapseBuffers(bufs [][]byte, length uint64) []byte {
|
||||
return res
|
||||
}
|
||||
|
||||
func (dcs *DoltChunkStore) SupportedOperations() nbs.TableFileStoreOps {
|
||||
return nbs.TableFileStoreOps{
|
||||
func (dcs *DoltChunkStore) SupportedOperations() chunks.TableFileStoreOps {
|
||||
return chunks.TableFileStoreOps{
|
||||
CanRead: true,
|
||||
CanWrite: true,
|
||||
CanPrune: false,
|
||||
@@ -1341,7 +1341,7 @@ func (dcs *DoltChunkStore) PruneTableFiles(ctx context.Context) error {
|
||||
|
||||
// Sources retrieves the current root hash, a list of all the table files (which may include appendix table files)
|
||||
// and a list of only appendix table files
|
||||
func (dcs *DoltChunkStore) Sources(ctx context.Context) (hash.Hash, []nbs.TableFile, []nbs.TableFile, error) {
|
||||
func (dcs *DoltChunkStore) Sources(ctx context.Context) (hash.Hash, []chunks.TableFile, []chunks.TableFile, error) {
|
||||
id, token := dcs.getRepoId()
|
||||
req := &remotesapi.ListTableFilesRequest{RepoId: id, RepoPath: dcs.repoPath, RepoToken: token}
|
||||
resp, err := dcs.csClient.ListTableFiles(ctx, req)
|
||||
@@ -1356,8 +1356,8 @@ func (dcs *DoltChunkStore) Sources(ctx context.Context) (hash.Hash, []nbs.TableF
|
||||
return hash.New(resp.RootHash), sourceFiles, appendixFiles, nil
|
||||
}
|
||||
|
||||
func getTableFiles(dcs *DoltChunkStore, infoList []*remotesapi.TableFileInfo) []nbs.TableFile {
|
||||
tableFiles := make([]nbs.TableFile, 0)
|
||||
func getTableFiles(dcs *DoltChunkStore, infoList []*remotesapi.TableFileInfo) []chunks.TableFile {
|
||||
tableFiles := make([]chunks.TableFile, 0)
|
||||
for _, nfo := range infoList {
|
||||
tableFiles = append(tableFiles, DoltRemoteTableFile{dcs, nfo})
|
||||
}
|
||||
|
||||
72
go/store/chunks/tablefilestore.go
Normal file
72
go/store/chunks/tablefilestore.go
Normal file
@@ -0,0 +1,72 @@
|
||||
// Copyright 2023 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
)
|
||||
|
||||
// TableFile is an interface for working with an existing table file
|
||||
type TableFile interface {
|
||||
// FileID gets the id of the file
|
||||
FileID() string
|
||||
|
||||
// NumChunks returns the number of chunks in a table file
|
||||
NumChunks() int
|
||||
|
||||
// Open returns an io.ReadCloser which can be used to read the bytes of a
|
||||
// table file. It also returns the content length of the table file.
|
||||
Open(ctx context.Context) (io.ReadCloser, uint64, error)
|
||||
}
|
||||
|
||||
// Describes what is possible to do with TableFiles in a TableFileStore.
|
||||
type TableFileStoreOps struct {
|
||||
// True is the TableFileStore supports reading table files.
|
||||
CanRead bool
|
||||
// True is the TableFileStore supports writing table files.
|
||||
CanWrite bool
|
||||
// True is the TableFileStore supports pruning unused table files.
|
||||
CanPrune bool
|
||||
// True is the TableFileStore supports garbage collecting chunks.
|
||||
CanGC bool
|
||||
}
|
||||
|
||||
// TableFileStore is an interface for interacting with table files directly
|
||||
type TableFileStore interface {
|
||||
// Sources retrieves the current root hash, a list of all the table files (which may include appendix table files),
|
||||
// and a second list containing only appendix table files.
|
||||
Sources(ctx context.Context) (hash.Hash, []TableFile, []TableFile, error)
|
||||
|
||||
// Size returns the total size, in bytes, of the table files in this Store.
|
||||
Size(ctx context.Context) (uint64, error)
|
||||
|
||||
// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore.
|
||||
WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error
|
||||
|
||||
// AddTableFilesToManifest adds table files to the manifest
|
||||
AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int) error
|
||||
|
||||
// PruneTableFiles deletes old table files that are no longer referenced in the manifest.
|
||||
PruneTableFiles(ctx context.Context) error
|
||||
|
||||
// SetRootChunk changes the root chunk hash from the previous value to the new root.
|
||||
SetRootChunk(ctx context.Context, root, previous hash.Hash) error
|
||||
|
||||
// SupportedOperations returns a description of the support TableFile operations. Some stores only support reading table files, not writing.
|
||||
SupportedOperations() TableFileStoreOps
|
||||
}
|
||||
@@ -28,7 +28,6 @@ import (
|
||||
|
||||
"github.com/dolthub/dolt/go/store/chunks"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/nbs"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
@@ -168,7 +167,7 @@ type GarbageCollector interface {
|
||||
// Databases support this yet.
|
||||
func CanUsePuller(db Database) bool {
|
||||
cs := db.chunkStore()
|
||||
if tfs, ok := cs.(nbs.TableFileStore); ok {
|
||||
if tfs, ok := cs.(chunks.TableFileStore); ok {
|
||||
ops := tfs.SupportedOperations()
|
||||
return ops.CanRead && ops.CanWrite
|
||||
}
|
||||
|
||||
@@ -18,11 +18,10 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/chunks"
|
||||
"github.com/dolthub/dolt/go/store/nbs"
|
||||
)
|
||||
|
||||
func PruneTableFiles(ctx context.Context, db Database) error {
|
||||
tfs, ok := db.chunkStore().(nbs.TableFileStore)
|
||||
tfs, ok := db.chunkStore().(chunks.TableFileStore)
|
||||
|
||||
if !ok {
|
||||
return chunks.ErrUnsupportedOperation
|
||||
|
||||
@@ -26,13 +26,12 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
|
||||
"github.com/dolthub/dolt/go/store/chunks"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/nbs"
|
||||
)
|
||||
|
||||
var ErrNoData = errors.New("no data")
|
||||
|
||||
func Clone(ctx context.Context, srcCS, sinkCS chunks.ChunkStore, eventCh chan<- TableFileEvent) error {
|
||||
srcTS, srcOK := srcCS.(nbs.TableFileStore)
|
||||
srcTS, srcOK := srcCS.(chunks.TableFileStore)
|
||||
|
||||
if !srcOK {
|
||||
return errors.New("src db is not a Table File Store")
|
||||
@@ -48,7 +47,7 @@ func Clone(ctx context.Context, srcCS, sinkCS chunks.ChunkStore, eventCh chan<-
|
||||
return ErrNoData
|
||||
}
|
||||
|
||||
sinkTS, sinkOK := sinkCS.(nbs.TableFileStore)
|
||||
sinkTS, sinkOK := sinkCS.(chunks.TableFileStore)
|
||||
|
||||
if !sinkOK {
|
||||
return errors.New("sink db is not a Table File Store")
|
||||
@@ -69,14 +68,14 @@ const (
|
||||
|
||||
type TableFileEvent struct {
|
||||
EventType CloneTableFileEvent
|
||||
TableFiles []nbs.TableFile
|
||||
TableFiles []chunks.TableFile
|
||||
Stats []iohelp.ReadStats
|
||||
}
|
||||
|
||||
// mapTableFiles returns the list of all fileIDs for the table files, and a map from fileID to nbs.TableFile
|
||||
func mapTableFiles(tblFiles []nbs.TableFile) ([]string, map[string]nbs.TableFile, map[string]int) {
|
||||
// mapTableFiles returns the list of all fileIDs for the table files, and a map from fileID to chunks.TableFile
|
||||
func mapTableFiles(tblFiles []chunks.TableFile) ([]string, map[string]chunks.TableFile, map[string]int) {
|
||||
fileIds := make([]string, len(tblFiles))
|
||||
fileIDtoTblFile := make(map[string]nbs.TableFile)
|
||||
fileIDtoTblFile := make(map[string]chunks.TableFile)
|
||||
fileIDtoNumChunks := make(map[string]int)
|
||||
|
||||
for i, tblFile := range tblFiles {
|
||||
@@ -90,7 +89,7 @@ func mapTableFiles(tblFiles []nbs.TableFile) ([]string, map[string]nbs.TableFile
|
||||
|
||||
const concurrentTableFileDownloads = 3
|
||||
|
||||
func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<- TableFileEvent) error {
|
||||
func clone(ctx context.Context, srcTS, sinkTS chunks.TableFileStore, eventCh chan<- TableFileEvent) error {
|
||||
root, sourceFiles, appendixFiles, err := srcTS.Sources(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -104,7 +103,7 @@ func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<-
|
||||
}
|
||||
|
||||
// Initializes the list of fileIDs we are going to download, and the map of fileIDToTF. If this clone takes a long
|
||||
// time some of the urls within the nbs.TableFiles will expire and fail to download. At that point we will retrieve
|
||||
// time some of the urls within the chunks.TableFiles will expire and fail to download. At that point we will retrieve
|
||||
// the sources again, and update the fileIDToTF map with updated info, but not change the files we are downloading.
|
||||
desiredFiles, fileIDToTF, fileIDToNumChunks := mapTableFiles(tblFiles)
|
||||
completed := make([]bool, len(desiredFiles))
|
||||
@@ -134,7 +133,7 @@ func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<-
|
||||
return backoff.Permanent(errors.New("table file not found. please try again"))
|
||||
}
|
||||
|
||||
report(TableFileEvent{EventType: DownloadStart, TableFiles: []nbs.TableFile{tblFile}})
|
||||
report(TableFileEvent{EventType: DownloadStart, TableFiles: []chunks.TableFile{tblFile}})
|
||||
err = sinkTS.WriteTableFile(ctx, tblFile.FileID(), tblFile.NumChunks(), nil, func() (io.ReadCloser, uint64, error) {
|
||||
rd, contentLength, err := tblFile.Open(ctx)
|
||||
if err != nil {
|
||||
@@ -145,7 +144,7 @@ func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<-
|
||||
rdStats.Start(func(s iohelp.ReadStats) {
|
||||
report(TableFileEvent{
|
||||
EventType: DownloadStats,
|
||||
TableFiles: []nbs.TableFile{tblFile},
|
||||
TableFiles: []chunks.TableFile{tblFile},
|
||||
Stats: []iohelp.ReadStats{s},
|
||||
})
|
||||
})
|
||||
@@ -153,11 +152,11 @@ func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<-
|
||||
return rdStats, contentLength, nil
|
||||
})
|
||||
if err != nil {
|
||||
report(TableFileEvent{EventType: DownloadFailed, TableFiles: []nbs.TableFile{tblFile}})
|
||||
report(TableFileEvent{EventType: DownloadFailed, TableFiles: []chunks.TableFile{tblFile}})
|
||||
return err
|
||||
}
|
||||
|
||||
report(TableFileEvent{EventType: DownloadSuccess, TableFiles: []nbs.TableFile{tblFile}})
|
||||
report(TableFileEvent{EventType: DownloadSuccess, TableFiles: []chunks.TableFile{tblFile}})
|
||||
completed[idx] = true
|
||||
return nil
|
||||
})
|
||||
@@ -213,11 +212,11 @@ func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<-
|
||||
return sinkTS.SetRootChunk(ctx, root, hash.Hash{})
|
||||
}
|
||||
|
||||
func filterAppendicesFromSourceFiles(appendixFiles []nbs.TableFile, sourceFiles []nbs.TableFile) []nbs.TableFile {
|
||||
func filterAppendicesFromSourceFiles(appendixFiles []chunks.TableFile, sourceFiles []chunks.TableFile) []chunks.TableFile {
|
||||
if len(appendixFiles) == 0 {
|
||||
return sourceFiles
|
||||
}
|
||||
tblFiles := make([]nbs.TableFile, 0)
|
||||
tblFiles := make([]chunks.TableFile, 0)
|
||||
_, appendixMap, _ := mapTableFiles(appendixFiles)
|
||||
for _, sf := range sourceFiles {
|
||||
if _, ok := appendixMap[sf.FileID()]; !ok {
|
||||
|
||||
@@ -314,7 +314,7 @@ func (p *Puller) uploadTempTableFile(ctx context.Context, tmpTblFile tempTblFile
|
||||
// we can add bytes on to our bufferedSendBytes when
|
||||
// we have to retry a table file write.
|
||||
var localUploaded uint64
|
||||
return p.sinkDBCS.(nbs.TableFileStore).WriteTableFile(ctx, tmpTblFile.id, tmpTblFile.numChunks, tmpTblFile.contentHash, func() (io.ReadCloser, uint64, error) {
|
||||
return p.sinkDBCS.(chunks.TableFileStore).WriteTableFile(ctx, tmpTblFile.id, tmpTblFile.numChunks, tmpTblFile.contentHash, func() (io.ReadCloser, uint64, error) {
|
||||
rc, err := tmpTblFile.read.Reader()
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
@@ -376,7 +376,7 @@ LOOP:
|
||||
}
|
||||
}
|
||||
|
||||
return p.sinkDBCS.(nbs.TableFileStore).AddTableFilesToManifest(ctx, fileIdToNumChunks)
|
||||
return p.sinkDBCS.(chunks.TableFileStore).AddTableFilesToManifest(ctx, fileIdToNumChunks)
|
||||
}
|
||||
|
||||
// Pull executes the sync operation
|
||||
|
||||
@@ -28,7 +28,7 @@ import (
|
||||
|
||||
var _ chunks.ChunkStore = (*GenerationalNBS)(nil)
|
||||
var _ chunks.GenerationalCS = (*GenerationalNBS)(nil)
|
||||
var _ TableFileStore = (*GenerationalNBS)(nil)
|
||||
var _ chunks.TableFileStore = (*GenerationalNBS)(nil)
|
||||
|
||||
type GenerationalNBS struct {
|
||||
oldGen *NomsBlockStore
|
||||
@@ -262,7 +262,7 @@ func (gcs *GenerationalNBS) copyToOldGen(ctx context.Context, hashes hash.HashSe
|
||||
}
|
||||
|
||||
type prefixedTableFile struct {
|
||||
TableFile
|
||||
chunks.TableFile
|
||||
prefix string
|
||||
}
|
||||
|
||||
@@ -272,7 +272,7 @@ func (p prefixedTableFile) FileID() string {
|
||||
|
||||
// Sources retrieves the current root hash, a list of all the table files (which may include appendix table files),
|
||||
// and a second list containing only appendix table files for both the old gen and new gen stores.
|
||||
func (gcs *GenerationalNBS) Sources(ctx context.Context) (hash.Hash, []TableFile, []TableFile, error) {
|
||||
func (gcs *GenerationalNBS) Sources(ctx context.Context) (hash.Hash, []chunks.TableFile, []chunks.TableFile, error) {
|
||||
root, tFiles, appFiles, err := gcs.newGen.Sources(ctx)
|
||||
if err != nil {
|
||||
return hash.Hash{}, nil, nil, err
|
||||
@@ -339,7 +339,7 @@ func (gcs *GenerationalNBS) SetRootChunk(ctx context.Context, root, previous has
|
||||
}
|
||||
|
||||
// SupportedOperations returns a description of the support TableFile operations. Some stores only support reading table files, not writing.
|
||||
func (gcs *GenerationalNBS) SupportedOperations() TableFileStoreOps {
|
||||
func (gcs *GenerationalNBS) SupportedOperations() chunks.TableFileStoreOps {
|
||||
return gcs.newGen.SupportedOperations()
|
||||
}
|
||||
|
||||
|
||||
@@ -38,12 +38,12 @@ func NewNBSMetricWrapper(nbs *NomsBlockStore) *NBSMetricWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
var _ TableFileStore = &NBSMetricWrapper{}
|
||||
var _ chunks.TableFileStore = &NBSMetricWrapper{}
|
||||
var _ chunks.ChunkStoreGarbageCollector = &NBSMetricWrapper{}
|
||||
|
||||
// Sources retrieves the current root hash, a list of all the table files,
|
||||
// and a list of the appendix table files.
|
||||
func (nbsMW *NBSMetricWrapper) Sources(ctx context.Context) (hash.Hash, []TableFile, []TableFile, error) {
|
||||
func (nbsMW *NBSMetricWrapper) Sources(ctx context.Context) (hash.Hash, []chunks.TableFile, []chunks.TableFile, error) {
|
||||
return nbsMW.nbs.Sources(ctx)
|
||||
}
|
||||
|
||||
@@ -67,7 +67,7 @@ func (nbsMW *NBSMetricWrapper) SetRootChunk(ctx context.Context, root, previous
|
||||
}
|
||||
|
||||
// Forwards SupportedOperations to wrapped block store.
|
||||
func (nbsMW *NBSMetricWrapper) SupportedOperations() TableFileStoreOps {
|
||||
func (nbsMW *NBSMetricWrapper) SupportedOperations() chunks.TableFileStoreOps {
|
||||
return nbsMW.nbs.SupportedOperations()
|
||||
}
|
||||
|
||||
|
||||
@@ -105,7 +105,7 @@ type NomsBlockStore struct {
|
||||
stats *Stats
|
||||
}
|
||||
|
||||
var _ TableFileStore = &NomsBlockStore{}
|
||||
var _ chunks.TableFileStore = &NomsBlockStore{}
|
||||
var _ chunks.ChunkStoreGarbageCollector = &NomsBlockStore{}
|
||||
|
||||
type Range struct {
|
||||
@@ -1245,7 +1245,7 @@ func (tf tableFile) Open(ctx context.Context) (io.ReadCloser, uint64, error) {
|
||||
|
||||
// Sources retrieves the current root hash, a list of all table files (which may include appendix tablefiles),
|
||||
// and a second list of only the appendix table files
|
||||
func (nbs *NomsBlockStore) Sources(ctx context.Context) (hash.Hash, []TableFile, []TableFile, error) {
|
||||
func (nbs *NomsBlockStore) Sources(ctx context.Context) (hash.Hash, []chunks.TableFile, []chunks.TableFile, error) {
|
||||
nbs.mu.Lock()
|
||||
defer nbs.mu.Unlock()
|
||||
|
||||
@@ -1281,8 +1281,8 @@ func (nbs *NomsBlockStore) Sources(ctx context.Context) (hash.Hash, []TableFile,
|
||||
return contents.GetRoot(), allTableFiles, appendixTableFiles, nil
|
||||
}
|
||||
|
||||
func getTableFiles(css map[addr]chunkSource, contents manifestContents, numSpecs int, specFunc func(mc manifestContents, idx int) tableSpec) ([]TableFile, error) {
|
||||
tableFiles := make([]TableFile, 0)
|
||||
func getTableFiles(css map[addr]chunkSource, contents manifestContents, numSpecs int, specFunc func(mc manifestContents, idx int) tableSpec) ([]chunks.TableFile, error) {
|
||||
tableFiles := make([]chunks.TableFile, 0)
|
||||
if numSpecs == 0 {
|
||||
return tableFiles, nil
|
||||
}
|
||||
@@ -1355,11 +1355,11 @@ func (nbs *NomsBlockStore) chunkSourcesByAddr() (map[addr]chunkSource, error) {
|
||||
|
||||
}
|
||||
|
||||
func (nbs *NomsBlockStore) SupportedOperations() TableFileStoreOps {
|
||||
func (nbs *NomsBlockStore) SupportedOperations() chunks.TableFileStoreOps {
|
||||
var ok bool
|
||||
_, ok = nbs.p.(tableFilePersister)
|
||||
|
||||
return TableFileStoreOps{
|
||||
return chunks.TableFileStoreOps{
|
||||
CanRead: true,
|
||||
CanWrite: ok,
|
||||
CanPrune: ok,
|
||||
@@ -1523,23 +1523,7 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has
|
||||
}
|
||||
|
||||
if destNBS == nbs {
|
||||
err = nbs.swapTables(ctx, specs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
currentContents := func() manifestContents {
|
||||
nbs.mu.RLock()
|
||||
defer nbs.mu.RUnlock()
|
||||
return nbs.upstream
|
||||
}()
|
||||
|
||||
t := time.Now()
|
||||
return nbs.p.PruneTableFiles(ctx, currentContents, t)
|
||||
return nbs.swapTables(ctx, specs)
|
||||
} else {
|
||||
fileIdToNumChunks := tableSpecsToMap(specs)
|
||||
err = destNBS.AddTableFilesToManifest(ctx, fileIdToNumChunks)
|
||||
|
||||
@@ -157,7 +157,7 @@ func makeChunk(i uint32) chunks.Chunk {
|
||||
return chunks.NewChunk(b)
|
||||
}
|
||||
|
||||
type tableFileSet map[string]TableFile
|
||||
type tableFileSet map[string]chunks.TableFile
|
||||
|
||||
func (s tableFileSet) contains(fileName string) (ok bool) {
|
||||
_, ok = s[fileName]
|
||||
@@ -174,7 +174,7 @@ func (s tableFileSet) findAbsent(ftd fileToData) (absent []string) {
|
||||
return absent
|
||||
}
|
||||
|
||||
func tableFileSetFromSources(sources []TableFile) (s tableFileSet) {
|
||||
func tableFileSetFromSources(sources []chunks.TableFile) (s tableFileSet) {
|
||||
s = make(tableFileSet, len(sources))
|
||||
for _, src := range sources {
|
||||
s[src.FileID()] = src
|
||||
|
||||
@@ -290,53 +290,3 @@ func copyChunkSourceSet(s chunkSourceSet) (cp chunkSourceSet) {
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// TableFile is an interface for working with an existing table file
|
||||
type TableFile interface {
|
||||
// FileID gets the id of the file
|
||||
FileID() string
|
||||
|
||||
// NumChunks returns the number of chunks in a table file
|
||||
NumChunks() int
|
||||
|
||||
// Open returns an io.ReadCloser which can be used to read the bytes of a
|
||||
// table file. It also returns the content length of the table file.
|
||||
Open(ctx context.Context) (io.ReadCloser, uint64, error)
|
||||
}
|
||||
|
||||
// Describes what is possible to do with TableFiles in a TableFileStore.
|
||||
type TableFileStoreOps struct {
|
||||
// True is the TableFileStore supports reading table files.
|
||||
CanRead bool
|
||||
// True is the TableFileStore supports writing table files.
|
||||
CanWrite bool
|
||||
// True is the TableFileStore supports pruning unused table files.
|
||||
CanPrune bool
|
||||
// True is the TableFileStore supports garbage collecting chunks.
|
||||
CanGC bool
|
||||
}
|
||||
|
||||
// TableFileStore is an interface for interacting with table files directly
|
||||
type TableFileStore interface {
|
||||
// Sources retrieves the current root hash, a list of all the table files (which may include appendix table files),
|
||||
// and a second list containing only appendix table files.
|
||||
Sources(ctx context.Context) (hash.Hash, []TableFile, []TableFile, error)
|
||||
|
||||
// Size returns the total size, in bytes, of the table files in this Store.
|
||||
Size(ctx context.Context) (uint64, error)
|
||||
|
||||
// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore.
|
||||
WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error
|
||||
|
||||
// AddTableFilesToManifest adds table files to the manifest
|
||||
AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int) error
|
||||
|
||||
// PruneTableFiles deletes old table files that are no longer referenced in the manifest.
|
||||
PruneTableFiles(ctx context.Context) error
|
||||
|
||||
// SetRootChunk changes the root chunk hash from the previous value to the new root.
|
||||
SetRootChunk(ctx context.Context, root, previous hash.Hash) error
|
||||
|
||||
// SupportedOperations returns a description of the support TableFile operations. Some stores only support reading table files, not writing.
|
||||
SupportedOperations() TableFileStoreOps
|
||||
}
|
||||
|
||||
@@ -733,16 +733,27 @@ func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashS
|
||||
return err
|
||||
}
|
||||
|
||||
return lvs.gc(ctx, root, newGenRefs, oldGen.HasMany, newGen, newGen)
|
||||
err = lvs.gc(ctx, root, newGenRefs, oldGen.HasMany, newGen, newGen)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else if collector, ok := lvs.cs.(chunks.ChunkStoreGarbageCollector); ok {
|
||||
if len(oldGenRefs) > 0 {
|
||||
newGenRefs.InsertAll(oldGenRefs)
|
||||
}
|
||||
|
||||
return lvs.gc(ctx, root, newGenRefs, unfilteredHashFunc, collector, collector)
|
||||
err = lvs.gc(ctx, root, newGenRefs, unfilteredHashFunc, collector, collector)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
return chunks.ErrUnsupportedOperation
|
||||
}
|
||||
|
||||
if tfs, ok := lvs.cs.(chunks.TableFileStore); ok {
|
||||
return tfs.PruneTableFiles(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lvs *ValueStore) gc(ctx context.Context, root hash.Hash, toVisit hash.HashSet, hashFilter HashFilterFunc, src, dest chunks.ChunkStoreGarbageCollector) error {
|
||||
|
||||
Reference in New Issue
Block a user