From ccd8a1354845905f06fcfbc5fcfecb962ec2bdcc Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 1 Sep 2022 15:31:52 -0700 Subject: [PATCH] go/utils/remotesrv: Iterate on supporting generational chunk stores. --- go/store/nbs/generational_chunk_store.go | 25 +++++++++++++++++++++++ go/store/nbs/store.go | 20 ++++++++++++------ go/utils/remotesrv/cscache.go | 26 +++++++++++++++++++----- go/utils/remotesrv/grpc.go | 22 ++++++++++---------- go/utils/remotesrv/http.go | 8 ++++---- 5 files changed, 75 insertions(+), 26 deletions(-) diff --git a/go/store/nbs/generational_chunk_store.go b/go/store/nbs/generational_chunk_store.go index 464213f8ff..2f2734425f 100644 --- a/go/store/nbs/generational_chunk_store.go +++ b/go/store/nbs/generational_chunk_store.go @@ -308,3 +308,28 @@ func (gcs *GenerationalNBS) SetRootChunk(ctx context.Context, root, previous has func (gcs *GenerationalNBS) SupportedOperations() TableFileStoreOps { return gcs.newGen.SupportedOperations() } + +func (gcs *GenerationalNBS) GetChunkLocations(hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) { + res, err := gcs.newGen.GetChunkLocations(hashes) + if err != nil { + return nil, err + } + if len(hashes) > 0 { + toadd, err := gcs.oldGen.GetChunkLocations(hashes) + if err != nil { + return nil, err + } + for k, v := range toadd { + res["oldgen/" + k] = v + } + } + return res, nil +} + +func (gcs *GenerationalNBS) Path() (string, bool) { + return gcs.newGen.Path() +} + +func (gcs *GenerationalNBS) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (mi ManifestInfo, err error) { + return gcs.newGen.UpdateManifest(ctx, updates) +} diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index b9346a163f..3d3ccd24d8 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -116,10 +116,10 @@ type Range struct { Length uint32 } -func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash]map[hash.Hash]Range, error) { +func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) { gr := toGetRecords(hashes) - ranges := make(map[hash.Hash]map[hash.Hash]Range) + ranges := make(map[string]map[hash.Hash]Range) f := func(css chunkSources) error { for _, cs := range css { switch tr := cs.(type) { @@ -129,7 +129,7 @@ func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash return err } if len(offsetRecSlice) > 0 { - y, ok := ranges[hash.Hash(tr.h)] + y, ok := ranges[hash.Hash(tr.h).String()] if !ok { y = make(map[hash.Hash]Range) @@ -146,10 +146,10 @@ func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash gr = toGetRecords(hashes) } - ranges[hash.Hash(tr.h)] = y + ranges[hash.Hash(tr.h).String()] = y } case *chunkSourceAdapter: - y, ok := ranges[hash.Hash(tr.h)] + y, ok := ranges[hash.Hash(tr.h).String()] if !ok { y = make(map[hash.Hash]Range) @@ -174,7 +174,7 @@ func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash } } - ranges[hash.Hash(tr.h)] = y + ranges[hash.Hash(tr.h).String()] = y for _, h := range foundHashes { delete(hashes, h) @@ -1332,6 +1332,14 @@ func (nbs *NomsBlockStore) SupportedOperations() TableFileStoreOps { } } +func (nbs *NomsBlockStore) Path() (string, bool) { + fsPersister, ok := nbs.p.(*fsTablePersister) + if !ok { + return "", false + } + return fsPersister.dir, true +} + // WriteTableFile will read a table file from the provided reader and write it to the TableFileStore func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error { fsPersister, ok := nbs.p.(*fsTablePersister) diff --git a/go/utils/remotesrv/cscache.go b/go/utils/remotesrv/cscache.go index b72e1a2276..1f623552a9 100644 --- a/go/utils/remotesrv/cscache.go +++ b/go/utils/remotesrv/cscache.go @@ -21,15 +21,28 @@ import ( "github.com/dolthub/dolt/go/libraries/utils/filesys" "github.com/dolthub/dolt/go/store/nbs" + "github.com/dolthub/dolt/go/store/hash" + "github.com/dolthub/dolt/go/store/chunks" ) const ( defaultMemTableSize = 128 * 1024 * 1024 ) +type store interface { + chunks.ChunkStore + nbs.TableFileStore + + Path() (string, bool) + GetChunkLocations(hashes hash.HashSet) (map[string]map[hash.Hash]nbs.Range, error) +} + +var _ store = &nbs.NomsBlockStore{} +var _ store = &nbs.GenerationalNBS{} + type DBCache struct { mu *sync.Mutex - dbs map[string]*nbs.NomsBlockStore + dbs map[string]store fs filesys.Filesys } @@ -37,12 +50,12 @@ type DBCache struct { func NewLocalCSCache(filesys filesys.Filesys) *DBCache { return &DBCache{ &sync.Mutex{}, - make(map[string]*nbs.NomsBlockStore), + make(map[string]store), filesys, } } -func (cache *DBCache) Get(org, repo, nbfVerStr string) (*nbs.NomsBlockStore, error) { +func (cache *DBCache) Get(org, repo, nbfVerStr string) (store, error) { cache.mu.Lock() defer cache.mu.Unlock() @@ -55,12 +68,15 @@ func (cache *DBCache) Get(org, repo, nbfVerStr string) (*nbs.NomsBlockStore, err var newCS *nbs.NomsBlockStore if cache.fs != nil { err := cache.fs.MkDirs(id) - + if err != nil { + return nil, err + } + path, err := cache.fs.Abs(id) if err != nil { return nil, err } - newCS, err = nbs.NewLocalStore(context.TODO(), nbfVerStr, id, defaultMemTableSize, nbs.NewUnlimitedMemQuotaProvider()) + newCS, err = nbs.NewLocalStore(context.TODO(), nbfVerStr, path, defaultMemTableSize, nbs.NewUnlimitedMemQuotaProvider()) if err != nil { return nil, err diff --git a/go/utils/remotesrv/grpc.go b/go/utils/remotesrv/grpc.go index 0b6a468001..5fb8dbebe2 100644 --- a/go/utils/remotesrv/grpc.go +++ b/go/utils/remotesrv/grpc.go @@ -115,7 +115,7 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot ranges = append(ranges, &remotesapi.RangeChunk{Hash: hCpy[:], Offset: r.Offset, Length: r.Length}) } - url, err := rs.getDownloadUrl(logger, org, repoName, loc.String()) + url, err := rs.getDownloadUrl(logger, org, repoName, loc) if err != nil { log.Println("Failed to sign request", err) return nil, err @@ -135,7 +135,7 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore defer func() { logger("finished") }() var repoID *remotesapi.RepoId - var cs *nbs.NomsBlockStore + var cs store for { req, err := stream.Recv() if err != nil { @@ -170,7 +170,7 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore ranges = append(ranges, &remotesapi.RangeChunk{Hash: hCpy[:], Offset: r.Offset, Length: r.Length}) } - url, err := rs.getDownloadUrl(logger, org, repoName, loc.String()) + url, err := rs.getDownloadUrl(logger, org, repoName, loc) if err != nil { log.Println("Failed to sign request", err) return err @@ -306,12 +306,12 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe logger(fmt.Sprintf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName)) //should validate - updates := make(map[hash.Hash]uint32) + updates := make(map[string]int) for _, cti := range req.ChunkTableInfo { - updates[hash.New(cti.Hash)] = cti.ChunkCount + updates[hash.New(cti.Hash).String()] = int(cti.ChunkCount) } - _, err := cs.UpdateManifest(ctx, updates) + err := cs.AddTableFilesToManifest(ctx, updates) if err != nil { logger(fmt.Sprintf("error occurred updating the manifest: %s", err.Error())) @@ -422,12 +422,12 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A logger(fmt.Sprintf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName)) // should validate - updates := make(map[hash.Hash]uint32) + updates := make(map[string]int) for _, cti := range req.ChunkTableInfo { - updates[hash.New(cti.Hash)] = cti.ChunkCount + updates[hash.New(cti.Hash).String()] = int(cti.ChunkCount) } - _, err := cs.UpdateManifest(ctx, updates) + err := cs.AddTableFilesToManifest(ctx, updates) if err != nil { logger(fmt.Sprintf("error occurred updating the manifest: %s", err.Error())) @@ -437,11 +437,11 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A return &remotesapi.AddTableFilesResponse{Success: true}, nil } -func (rs *RemoteChunkStore) getStore(repoId *remotesapi.RepoId, rpcName string) *nbs.NomsBlockStore { +func (rs *RemoteChunkStore) getStore(repoId *remotesapi.RepoId, rpcName string) store { return rs.getOrCreateStore(repoId, rpcName, types.Format_Default.VersionString()) } -func (rs *RemoteChunkStore) getOrCreateStore(repoId *remotesapi.RepoId, rpcName, nbfVerStr string) *nbs.NomsBlockStore { +func (rs *RemoteChunkStore) getOrCreateStore(repoId *remotesapi.RepoId, rpcName, nbfVerStr string) store { org := repoId.Org repoName := repoId.RepoName diff --git a/go/utils/remotesrv/http.go b/go/utils/remotesrv/http.go index b06db3f709..b982f3e0a0 100644 --- a/go/utils/remotesrv/http.go +++ b/go/utils/remotesrv/http.go @@ -70,7 +70,7 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) { defer func() { logger("finished") }() path := strings.TrimLeft(req.URL.Path, "/") - tokens := strings.Split(path, "/") + tokens := strings.SplitN(path, "/", 3) if len(tokens) != 3 { logger(fmt.Sprintf("response to: %v method: %v http response code: %v", req.RequestURI, req.Method, http.StatusNotFound)) @@ -79,15 +79,15 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) { org := tokens[0] repo := tokens[1] - hashStr := tokens[2] + file := tokens[2] statusCode := http.StatusMethodNotAllowed switch req.Method { case http.MethodGet: - statusCode = readTableFile(logger, org, repo, hashStr, respWr, req) + statusCode = readTableFile(logger, org, repo, file, respWr, req) case http.MethodPost, http.MethodPut: - statusCode = writeTableFile(req.Context(), logger, fh.dbCache, fh.expectedFiles, org, repo, hashStr, req) + statusCode = writeTableFile(req.Context(), logger, fh.dbCache, fh.expectedFiles, org, repo, file, req) } if statusCode != -1 {