/go/{libraries,store}: expose manifest version through GetManifestStorageVersion, update VisitMapBFS to accept a writer

This commit is contained in:
Dustin Brown
2021-04-02 09:54:23 -07:00
parent 2622f6842e
commit 71c1e6bc12
6 changed files with 90 additions and 8 deletions

View File

@@ -1039,3 +1039,7 @@ func (ddb *DoltDB) PullChunks(ctx context.Context, tempDir string, srcDB *DoltDB
func (ddb *DoltDB) Clone(ctx context.Context, destDB *DoltDB, eventCh chan<- datas.TableFileEvent) error {
return datas.Clone(ctx, ddb.db, destDB.db, eventCh)
}
func (ddb *DoltDB) GetStorageVersion(ctx context.Context) (string, error) {
return datas.GetManifestStorageVersion(ctx, ddb.db)
}

View File

@@ -100,6 +100,15 @@ type ChunkStoreGarbageCollector interface {
MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash) error
}
// ChunkStoreVersionGetter is a ChunkStore that supports getting the manifest's
// storage version
type ChunkStoreVersionGetter interface {
ChunkStore
// GetManifestStorageVersion returns the storage version of the Chunkstore's manifest
GetManifestStorageVersion(ctx context.Context) (string, error)
}
var ErrUnsupportedOperation = errors.New("operation not supported")
var ErrGCGenerationExpired = errors.New("garbage collection generation expired")

29
go/store/datas/version.go Normal file
View File

@@ -0,0 +1,29 @@
// Copyright 2020 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datas
import (
"context"
"github.com/dolthub/dolt/go/store/chunks"
)
func GetManifestStorageVersion(ctx context.Context, db Database) (string, error) {
store, ok := db.chunkStore().(chunks.ChunkStoreVersionGetter)
if !ok {
return "", chunks.ErrUnsupportedOperation
}
return store.GetManifestStorageVersion(ctx)
}

View File

@@ -81,3 +81,8 @@ func (nbsMW *NBSMetricWrapper) GetManyCompressed(ctx context.Context, hashes has
atomic.AddInt32(&nbsMW.TotalChunkGets, int32(len(hashes)))
return nbsMW.nbs.GetManyCompressed(ctx, hashes, found)
}
// GetManifestStorageVersion returns the storage version of the manifest.
func (nbsMW *NBSMetricWrapper) GetManifestStorageVersion(ctx context.Context) (string, error) {
return nbsMW.nbs.GetManifestStorageVersion(ctx)
}

View File

@@ -359,6 +359,26 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp
}
}
// GetManifestStorageVersion returns the manifest storage version or an error if the operation
// is not supported
func (nbs *NomsBlockStore) GetManifestStorageVersion(ctx context.Context) (version string, err error) {
info, ok := nbs.mm.m.(ManifestInfo)
if !ok {
return "", chunks.ErrUnsupportedOperation
}
// possibly unnecessary
nbs.mm.LockForUpdate()
defer func() {
err = nbs.mm.UnlockForUpdate()
}()
nbs.mu.Lock()
defer nbs.mu.Unlock()
return info.GetVersion(), nil
}
func NewAWSStoreWithMMapIndex(ctx context.Context, nbfVerStr string, table, ns, bucket string, s3 s3svc, ddb ddbsvc, memTableSize uint64) (*NomsBlockStore, error) {
cacheOnce.Do(makeGlobalCaches)
readRateLimiter := make(chan struct{}, 32)

View File

@@ -25,6 +25,7 @@ import (
"context"
"errors"
"fmt"
"io"
"golang.org/x/sync/errgroup"
@@ -618,28 +619,41 @@ func (m Map) HumanReadableString() string {
panic("unreachable")
}
// VisitMapLevelOrder returns a list hash strings for all map chunks in level order
func VisitMapLevelOrder(m Map) ([]string, error) {
// VisitMapLevelOrder writes hashes of internal node chunks to a writer
// delimited with a newline character and returns the total number of
// bytes written or an error if encountered
func VisitMapLevelOrder(w io.Writer, m Map) (total int, err error) {
total = 0
curLevel := []Map{m}
allhashes := []string{}
for len(curLevel) > 0 {
nextLevel := []Map{}
for _, m := range curLevel {
if metaSeq, ok := m.orderedSequence.(metaSequence); ok {
ts, err := metaSeq.tuples()
if err != nil {
return nil, err
return 0, err
}
for _, t := range ts {
r, err := t.ref()
if err != nil {
return nil, err
return 0, err
}
allhashes = append(allhashes, r.TargetHash().String())
p := []byte(r.TargetHash().String() + "\n")
n, err := w.Write(p)
if err != nil {
return 0, err
}
total += n
v, err := r.TargetValue(context.Background(), m.valueReadWriter())
if err != nil {
return nil, err
return 0, err
}
nextLevel = append(nextLevel, v.(Map))
}
} else if _, ok := m.orderedSequence.(mapLeafSequence); ok {
@@ -648,5 +662,6 @@ func VisitMapLevelOrder(m Map) ([]string, error) {
}
curLevel = nextLevel
}
return allhashes, nil
return total, nil
}