Merge pull request #687 from liquidata-inc/aaron/chunkstore-size

go/{store/nbs,libraries/doltcore/remotestorage}: Expose repository size as Size() on TableFileSource.
This commit is contained in:
Aaron Son
2020-05-20 09:33:10 -07:00
committed by GitHub
6 changed files with 86 additions and 6 deletions
@@ -45,6 +45,8 @@ var ErrInvalidDoltSpecPath = errors.New("invalid dolt spec path")
var globalHttpFetcher HTTPFetcher = &http.Client{}
var _ nbs.TableFileStore = (*DoltChunkStore)(nil)
// We may need this to be configurable for users with really bad internet
var downThroughputCheck = iohelp.MinThroughputCheckParams{
MinBytesPerSec: 1024,
@@ -461,6 +463,25 @@ func (dcs *DoltChunkStore) Rebase(ctx context.Context) error {
return NewRpcError(err, "Rebase", dcs.host, req)
}
return dcs.refreshRepoMetadata(ctx)
}
// refreshRepoMetadata re-fetches repository metadata from the remote chunk
// store service and caches the response on dcs for later use (e.g. by Size).
func (dcs *DoltChunkStore) refreshRepoMetadata(ctx context.Context) error {
	req := &remotesapi.GetRepoMetadataRequest{
		RepoId: &remotesapi.RepoId{
			Org:      dcs.org,
			RepoName: dcs.repoName,
		},
		ClientRepoFormat: &remotesapi.ClientRepoFormat{
			NbfVersion: dcs.nbf.VersionString(),
			NbsVersion: nbs.StorageVersion,
		},
	}

	md, err := dcs.csClient.GetRepoMetadata(ctx, req)
	if err != nil {
		return NewRpcError(err, "GetRepoMetadata", dcs.host, req)
	}

	dcs.metadata = md
	return nil
}
@@ -503,13 +524,11 @@ func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash)
},
}
resp, err := dcs.csClient.Commit(ctx, req)
if err != nil {
return false, NewRpcError(err, "Commit", dcs.host, req)
}
return resp.Success, nil
return resp.Success, dcs.refreshRepoMetadata(ctx)
}
// Stats may return some kind of struct that reports statistics about the
@@ -941,6 +960,10 @@ func (dcs *DoltChunkStore) Sources(ctx context.Context) (hash.Hash, []nbs.TableF
return hash.New(resp.RootHash), tblFiles, nil
}
// Size returns the storage size, in bytes, reported by the most recently
// fetched repository metadata. It performs no remote call itself.
func (dcs *DoltChunkStore) Size(ctx context.Context) (uint64, error) {
	sz := dcs.metadata.StorageSize
	return sz, nil
}
// SetRootChunk changes the root chunk hash from the previous value to the new root.
func (dcs *DoltChunkStore) SetRootChunk(ctx context.Context, root, previous hash.Hash) error {
panic("Not Implemented")
+11 -3
View File
@@ -409,7 +409,7 @@ func (ttfWr *TestTableFileWriter) Close(ctx context.Context) error {
type TestTableFileStore struct {
root hash.Hash
tableFiles map[string]nbs.TableFile
tableFiles map[string]*TestTableFile
}
func (ttfs *TestTableFileStore) Sources(ctx context.Context) (hash.Hash, []nbs.TableFile, error) {
@@ -421,6 +421,14 @@ func (ttfs *TestTableFileStore) Sources(ctx context.Context) (hash.Hash, []nbs.T
return ttfs.root, tblFiles, nil
}
// Size returns the total size, in bytes, of all table files held by this
// in-memory test store.
func (ttfs *TestTableFileStore) Size(ctx context.Context) (uint64, error) {
	var total uint64
	for _, f := range ttfs.tableFiles {
		total += uint64(len(f.data))
	}
	return total, nil
}
func (ttfs *TestTableFileStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error {
tblFile := &TestTableFileWriter{fileId, numChunks, bytes.NewBuffer(nil), ttfs}
_, err := io.Copy(tblFile, rd)
@@ -448,7 +456,7 @@ func TestClone(t *testing.T) {
hashBytes := [hash.ByteLen]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13}
src := &TestTableFileStore{
root: hash.Of(hashBytes[:]),
tableFiles: map[string]nbs.TableFile{
tableFiles: map[string]*TestTableFile{
"file1": &TestTableFile{
fileID: "file1",
numChunks: 1,
@@ -479,7 +487,7 @@ func TestClone(t *testing.T) {
dest := &TestTableFileStore{
root: hash.Hash{},
tableFiles: map[string]nbs.TableFile{},
tableFiles: map[string]*TestTableFile{},
}
ctx := context.Background()
+4
View File
@@ -46,6 +46,10 @@ func (nbsMW *NBSMetricWrapper) Sources(ctx context.Context) (hash.Hash, []TableF
return nbsMW.nbs.Sources(ctx)
}
// Size delegates to the wrapped NomsBlockStore, reporting the total size,
// in bytes, of its table files.
func (nbsMW *NBSMetricWrapper) Size(ctx context.Context) (uint64, error) {
	sz, err := nbsMW.nbs.Size(ctx)
	return sz, err
}
// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore
func (nbsMW *NBSMetricWrapper) WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error {
return nbsMW.nbs.WriteTableFile(ctx, fileId, numChunks, rd, contentLength, contentHash)
+38
View File
@@ -932,6 +932,44 @@ func (nbs *NomsBlockStore) Sources(ctx context.Context) (hash.Hash, []TableFile,
return contents.GetRoot(), tableFiles, nil
}
// Size returns the total size, in bytes, of the table files referenced by
// the current manifest. It re-parses the manifest under the store lock so
// the answer reflects the on-disk state at the time of the call.
func (nbs *NomsBlockStore) Size(ctx context.Context) (uint64, error) {
	nbs.mu.Lock()
	defer nbs.mu.Unlock()

	stats := &Stats{}
	exists, contents, err := nbs.mm.m.ParseIfExists(ctx, stats, nil)
	if err != nil {
		return 0, err
	}
	if !exists {
		// No manifest yet: an empty store has zero size.
		return 0, nil
	}

	css, err := nbs.chunkSourcesByAddr()
	if err != nil {
		return 0, err
	}

	size := uint64(0)
	numSpecs := contents.NumTableSpecs()
	for i := 0; i < numSpecs; i++ {
		info := contents.getSpec(i)
		cs, ok := css[info.name]
		if !ok {
			// Every spec in the manifest should have a backing chunkSource;
			// a miss indicates an inconsistent store.
			return 0, errors.New("manifest referenced table file for which there is no chunkSource")
		}
		ti, err := cs.index()
		if err != nil {
			return 0, fmt.Errorf("getting table file index for chunkSource: %w", err)
		}
		size += ti.tableFileSize()
	}
	return size, nil
}
func (nbs *NomsBlockStore) chunkSourcesByAddr() (map[addr]chunkSource, error) {
css := make(map[addr]chunkSource, len(nbs.tables.upstream)+len(nbs.tables.novel))
for _, cs := range nbs.tables.upstream {
+4
View File
@@ -76,4 +76,8 @@ func TestNBSAsTableFileStore(t *testing.T) {
assert.Equal(t, expected, data)
}
size, err := st.Size(ctx)
require.NoError(t, err)
require.Greater(t, size, uint64(0))
}
+3
View File
@@ -293,6 +293,9 @@ type TableFileStore interface {
// Sources retrieves the current root hash, and a list of all the table files
Sources(ctx context.Context) (hash.Hash, []TableFile, error)
// Returns the total size, in bytes, of the table files in this Store.
Size(ctx context.Context) (uint64, error)
// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore
WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error