Merge remote-tracking branch 'origin/main' into aaron/remotesrv-fix-logging-verbosity

This commit is contained in:
Aaron Son
2023-01-10 12:37:36 -08:00
18 changed files with 90 additions and 49 deletions
+1 -1
View File
@@ -58,7 +58,7 @@ require (
github.com/cenkalti/backoff/v4 v4.1.3
github.com/cespare/xxhash v1.1.0
github.com/creasty/defaults v1.6.0
github.com/dolthub/go-mysql-server v0.14.1-0.20230106225759-dc24b2e2e3e0
github.com/dolthub/go-mysql-server v0.14.1-0.20230109224253-74f8047bb890
github.com/google/flatbuffers v2.0.6+incompatible
github.com/kch42/buzhash v0.0.0-20160816060738-9bdec3dec7c6
github.com/mitchellh/go-ps v1.0.0
+2 -2
View File
@@ -161,8 +161,8 @@ github.com/dolthub/flatbuffers v1.13.0-dh.1 h1:OWJdaPep22N52O/0xsUevxJ6Qfw1M2txC
github.com/dolthub/flatbuffers v1.13.0-dh.1/go.mod h1:CorYGaDmXjHz1Z7i50PYXG1Ricn31GcA2wNOTFIQAKE=
github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
github.com/dolthub/go-mysql-server v0.14.1-0.20230106225759-dc24b2e2e3e0 h1:c3nboWDBY3LjHGTdryhNlUWn1zuVdNuJWTom+iBhiQE=
github.com/dolthub/go-mysql-server v0.14.1-0.20230106225759-dc24b2e2e3e0/go.mod h1:2ZHPn64+LPJWSfj/GvlaI/6yLSeVnbHTC3ih3ZBhtWg=
github.com/dolthub/go-mysql-server v0.14.1-0.20230109224253-74f8047bb890 h1:odCCXP1goWPFM2zVwXdH+0mUzg54hKDIo14T+GEiiU0=
github.com/dolthub/go-mysql-server v0.14.1-0.20230109224253-74f8047bb890/go.mod h1:2ZHPn64+LPJWSfj/GvlaI/6yLSeVnbHTC3ih3ZBhtWg=
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488 h1:0HHu0GWJH0N6a6keStrHhUAK5/o9LVfkh44pvsV4514=
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488/go.mod h1:ehexgi1mPxRTk0Mok/pADALuHbvATulTh6gzr7NzZto=
github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474 h1:xTrR+l5l+1Lfq0NvhiEsctylXinUMFhhsqaEcl414p8=
+7 -1
View File
@@ -19,6 +19,7 @@ import (
"net/url"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
@@ -38,9 +39,14 @@ func (fact MemFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat, u
var db datas.Database
storage := &chunks.MemoryStorage{}
cs := storage.NewViewWithFormat(nbf.VersionString())
//bs := blobstore.NewInMemoryBlobstore(uuid.New().String())
//q := nbs.NewUnlimitedMemQuotaProvider()
//cs, err := nbs.NewBSStore(ctx, nbf.VersionString(), bs, defaultMemTableSize, q)
//if err != nil {
// return nil, nil, nil, err
//}
vrw := types.NewValueStore(cs)
ns := tree.NewNodeStore(cs)
db = datas.NewTypesDatabase(vrw, ns)
return db, vrw, ns, nil
}
+1 -13
View File
@@ -407,17 +407,11 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
cs, err := rs.getStore(logger, repoPath)
_, err := rs.getStore(logger, repoPath)
if err != nil {
return nil, err
}
err = cs.Rebase(ctx)
if err != nil {
logger.WithError(err).Error("error rebasing chunk store")
return nil, status.Error(codes.Internal, "error calling Rebase on chunk store")
}
return &remotesapi.RebaseResponse{}, nil
}
@@ -492,12 +486,6 @@ func (rs *RemoteChunkStore) GetRepoMetadata(ctx context.Context, req *remotesapi
return nil, err
}
err = cs.Rebase(ctx)
if err != nil {
logger.WithError(err).Error("error calling Rebase")
return nil, err
}
size, err := cs.Size(ctx)
if err != nil {
logger.WithError(err).Error("error calling Size")
@@ -110,6 +110,7 @@ type DoltChunkStore struct {
repoPath string
repoToken *atomic.Value // string
host string
root hash.Hash
csClient remotesapi.ChunkStoreServiceClient
cache ChunkCache
metadata *remotesapi.GetRepoMetadataResponse
@@ -146,10 +147,15 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa
return nil, err
}
repoToken := new(atomic.Value)
if metadata.RepoToken != "" {
repoToken.Store(metadata.RepoToken)
}
cs := &DoltChunkStore{
repoId: repoId,
repoPath: path,
repoToken: new(atomic.Value),
repoToken: repoToken,
host: host,
csClient: csClient,
cache: newMapChunkCache(),
@@ -158,6 +164,10 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa
httpFetcher: globalHttpFetcher,
concurrency: defaultConcurrency,
}
err = cs.loadRoot(ctx)
if err != nil {
return nil, err
}
return cs, nil
}
@@ -167,6 +177,7 @@ func (dcs *DoltChunkStore) WithHTTPFetcher(fetcher HTTPFetcher) *DoltChunkStore
repoPath: dcs.repoPath,
repoToken: new(atomic.Value),
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
cache: dcs.cache,
metadata: dcs.metadata,
@@ -183,6 +194,7 @@ func (dcs *DoltChunkStore) WithNoopChunkCache() *DoltChunkStore {
repoPath: dcs.repoPath,
repoToken: new(atomic.Value),
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
cache: noopChunkCache,
metadata: dcs.metadata,
@@ -200,6 +212,7 @@ func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore {
repoPath: dcs.repoPath,
repoToken: new(atomic.Value),
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
cache: cache,
metadata: dcs.metadata,
@@ -217,6 +230,7 @@ func (dcs *DoltChunkStore) WithDownloadConcurrency(concurrency ConcurrencyParams
repoPath: dcs.repoPath,
repoToken: new(atomic.Value),
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
cache: dcs.cache,
metadata: dcs.metadata,
@@ -796,17 +810,10 @@ func (dcs *DoltChunkStore) Version() string {
// Rebase brings this ChunkStore into sync with the persistent storage's
// current root.
func (dcs *DoltChunkStore) Rebase(ctx context.Context) error {
id, token := dcs.getRepoId()
req := &remotesapi.RebaseRequest{RepoId: id, RepoToken: token, RepoPath: dcs.repoPath}
resp, err := dcs.csClient.Rebase(ctx, req)
err := dcs.loadRoot(ctx)
if err != nil {
return NewRpcError(err, "Rebase", dcs.host, req)
return err
}
if resp.RepoToken != "" {
dcs.repoToken.Store(token)
}
return dcs.refreshRepoMetadata(ctx)
}
@@ -833,18 +840,21 @@ func (dcs *DoltChunkStore) refreshRepoMetadata(ctx context.Context) error {
// Root returns the root of the database as of the time the ChunkStore
// was opened or the most recent call to Rebase.
func (dcs *DoltChunkStore) Root(ctx context.Context) (hash.Hash, error) {
return dcs.root, nil
}
func (dcs *DoltChunkStore) loadRoot(ctx context.Context) error {
id, token := dcs.getRepoId()
req := &remotesapi.RootRequest{RepoId: id, RepoToken: token, RepoPath: dcs.repoPath}
resp, err := dcs.csClient.Root(ctx, req)
if err != nil {
return hash.Hash{}, NewRpcError(err, "Root", dcs.host, req)
return NewRpcError(err, "Root", dcs.host, req)
}
if resp.RepoToken != "" {
dcs.repoToken.Store(resp.RepoToken)
}
return hash.New(resp.RootHash), nil
dcs.root = hash.New(resp.RootHash)
return nil
}
// Commit atomically attempts to persist all novel Chunks and update the
@@ -878,6 +888,10 @@ func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash)
if err != nil {
return false, NewRpcError(err, "Commit", dcs.host, req)
}
err = dcs.loadRoot(ctx)
if err != nil {
return false, NewRpcError(err, "Commit", dcs.host, req)
}
return resp.Success, dcs.refreshRepoMetadata(ctx)
}
@@ -226,11 +226,13 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
datasDB := doltdb.HackDatasDatabaseFromDoltDB(destDB)
cs := datas.ChunkStoreFromDatabase(datasDB)
var curRootHash hash.Hash
if curRootHash, err = cs.Root(ctx); err == nil {
var ok bool
ok, err = cs.Commit(ctx, toPush, curRootHash)
if err == nil && !ok {
err = errDestDBRootHashMoved
if err = cs.Rebase(ctx); err == nil {
if curRootHash, err = cs.Root(ctx); err == nil {
var ok bool
ok, err = cs.Commit(ctx, toPush, curRootHash)
if err == nil && !ok {
err = errDestDBRootHashMoved
}
}
}
}
+3
View File
@@ -22,6 +22,9 @@ import (
// Blobstore is an interface for storing and retrieving blobs of data by key
type Blobstore interface {
// Path returns this blobstore's path.
Path() (path string)
// Exists returns true if a blob keyed by |key| exists.
Exists(ctx context.Context, key string) (ok bool, err error)
+1 -1
View File
@@ -88,7 +88,7 @@ func appendLocalTest(tests []BlobstoreTest) []BlobstoreTest {
func newBlobStoreTests() []BlobstoreTest {
var tests []BlobstoreTest
tests = append(tests, BlobstoreTest{"inmem", NewInMemoryBlobstore(), 10, 20})
tests = append(tests, BlobstoreTest{"inmem", NewInMemoryBlobstore(""), 10, 20})
tests = appendLocalTest(tests)
tests = appendGCSTest(tests)
+4
View File
@@ -52,6 +52,10 @@ func NewGCSBlobstore(gcs *storage.Client, bucketName, prefix string) *GCSBlobsto
return &GCSBlobstore{bucket, bucketName, prefix}
}
// Path returns this blobstore's path: the GCS bucket name joined with the
// object-key prefix.
func (bs *GCSBlobstore) Path() string {
return path.Join(bs.bucketName, bs.prefix)
}
// Exists returns true if a blob exists for the given key, and false if it does not.
// For InMemoryBlobstore instances error should never be returned (though other
// implementations of this interface can)
+11 -2
View File
@@ -38,6 +38,7 @@ func newByteSliceReadCloser(data []byte) *byteSliceReadCloser {
// InMemoryBlobstore provides an in memory implementation of the Blobstore interface
type InMemoryBlobstore struct {
path string
mutex sync.RWMutex
blobs map[string][]byte
versions map[string]string
@@ -46,8 +47,16 @@ type InMemoryBlobstore struct {
var _ Blobstore = &InMemoryBlobstore{}
// NewInMemoryBlobstore creates an instance of an InMemoryBlobstore
func NewInMemoryBlobstore() *InMemoryBlobstore {
return &InMemoryBlobstore{blobs: make(map[string][]byte), versions: make(map[string]string)}
func NewInMemoryBlobstore(path string) *InMemoryBlobstore {
return &InMemoryBlobstore{
path: path,
blobs: make(map[string][]byte),
versions: make(map[string]string),
}
}
// Path returns the path this InMemoryBlobstore was constructed with.
func (bs *InMemoryBlobstore) Path() string {
return bs.path
}
// Get retrieves an io.reader for the portion of a blob specified by br along with
+4
View File
@@ -75,6 +75,10 @@ func NewLocalBlobstore(dir string) *LocalBlobstore {
return &LocalBlobstore{dir}
}
// Path returns this blobstore's path: its root directory on local disk.
func (bs *LocalBlobstore) Path() string {
return bs.RootDir
}
// Get retrieves an io.reader for the portion of a blob specified by br along with
// its version
func (bs *LocalBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, string, error) {
+4
View File
@@ -59,6 +59,10 @@ func NewOSSBlobstore(ossClient *oss.Client, bucketName, prefix string) (*OSSBlob
}, nil
}
// Path returns this blobstore's path: the OSS bucket name joined with the
// object-key prefix.
func (ob *OSSBlobstore) Path() string {
return path.Join(ob.bucketName, ob.prefix)
}
func (ob *OSSBlobstore) Exists(_ context.Context, key string) (bool, error) {
return ob.bucket.IsObjectExist(ob.absKey(key))
}
-4
View File
@@ -136,18 +136,14 @@ func pull(ctx context.Context, srcCS, sinkCS chunks.ChunkStore, walkAddrs WalkAd
}
func persistChunks(ctx context.Context, cs chunks.ChunkStore) error {
// todo: there is no call to rebase on an unsuccessful Commit()
// will this loop forever?
var success bool
for !success {
r, err := cs.Root(ctx)
if err != nil {
return err
}
success, err = cs.Commit(ctx, r, r)
if err != nil {
return err
}
+12
View File
@@ -187,6 +187,18 @@ func (hs HashSet) InsertAll(other HashSet) {
}
}
// Equals reports whether hs and other contain exactly the same hashes:
// the two sets must have equal size and every hash in hs must be present
// in other.
func (hs HashSet) Equals(other HashSet) bool {
if hs.Size() != other.Size() {
return false
}
for h := range hs {
if !other.Has(h) {
return false
}
}
return true
}
func (hs HashSet) Empty() {
for h := range hs {
delete(hs, h)
+1 -1
View File
@@ -456,7 +456,7 @@ func TestBlockStoreConjoinOnCommit(t *testing.T) {
t.Run("in memory blobstore persister", func(t *testing.T) {
testBlockStoreConjoinOnCommit(t, func(t *testing.T) tablePersister {
return &blobstorePersister{
bs: blobstore.NewInMemoryBlobstore(),
bs: blobstore.NewInMemoryBlobstore(""),
blockSize: 4096,
q: &UnlimitedQuotaProvider{},
}
+2 -3
View File
@@ -26,12 +26,11 @@ const (
)
type blobstoreManifest struct {
name string
bs blobstore.Blobstore
bs blobstore.Blobstore
}
func (bsm blobstoreManifest) Name() string {
return bsm.name
return bsm.bs.Path()
}
func manifestVersionAndContents(ctx context.Context, bs blobstore.Blobstore) (string, manifestContents, error) {
+1 -1
View File
@@ -91,7 +91,7 @@ func TestConjoin(t *testing.T) {
t.Run("in-memory blobstore persister", func(t *testing.T) {
testConjoin(t, func(*testing.T) tablePersister {
return &blobstorePersister{
bs: blobstore.NewInMemoryBlobstore(),
bs: blobstore.NewInMemoryBlobstore(""),
blockSize: 4096,
q: &UnlimitedQuotaProvider{},
}
+1 -1
View File
@@ -455,7 +455,7 @@ func NewGCSStore(ctx context.Context, nbfVerStr string, bucketName, path string,
func NewBSStore(ctx context.Context, nbfVerStr string, bs blobstore.Blobstore, memTableSize uint64, q MemoryQuotaProvider) (*NomsBlockStore, error) {
cacheOnce.Do(makeGlobalCaches)
mm := makeManifestManager(blobstoreManifest{"manifest", bs})
mm := makeManifestManager(blobstoreManifest{bs})
p := &blobstorePersister{bs, s3BlockSize, q}
return newNomsBlockStore(ctx, nbfVerStr, mm, p, q, inlineConjoiner{defaultMaxTables}, memTableSize)