/go/store/{blobstore,nbs}: add nbs layer

This commit is contained in:
coffeegoddd☕️✨
2026-02-10 15:47:18 -08:00
parent 94fa0634bf
commit 7537acb0df
7 changed files with 291 additions and 25 deletions

View File

@@ -522,6 +522,12 @@ func (gbs *GitBlobstore) syncForRead(ctx context.Context) error {
// 1) Fetch remote ref into our remote-tracking ref.
if err := gbs.api.FetchRef(ctx, gbs.remoteName, gbs.remoteRef, gbs.remoteTrackingRef); err != nil {
// An absent remote ref is treated as an empty store. This is required for NBS open
// (manifest ParseIfExists) against a freshly-initialized remote.
var rnf *git.RefNotFoundError
if errors.As(err, &rnf) && rnf.Ref == gbs.remoteRef {
return nil
}
return err
}
@@ -554,29 +560,42 @@ func (gbs *GitBlobstore) remoteManagedWrite(ctx context.Context, key, msg string
var ver string
op := func() error {
// 1) Fetch remote state into local tracking ref.
remoteMissing := false
if err := gbs.api.FetchRef(ctx, gbs.remoteName, gbs.remoteRef, gbs.remoteTrackingRef); err != nil {
return err
}
remoteHead, okRemote, err := gbs.api.TryResolveRefCommit(ctx, gbs.remoteTrackingRef)
if err != nil {
return backoff.Permanent(err)
}
if !okRemote {
return backoff.Permanent(&git.RefNotFoundError{Ref: gbs.remoteTrackingRef})
// If the remote ref is missing, treat this as an empty store and bootstrap on write.
var rnf *git.RefNotFoundError
if errors.As(err, &rnf) && rnf.Ref == gbs.remoteRef {
remoteMissing = true
} else {
return err
}
}
// 2) Force-set owned local ref to remote head (remote is source-of-truth).
if err := gbs.api.UpdateRef(ctx, gbs.localRef, remoteHead, "gitblobstore: sync write"); err != nil {
return backoff.Permanent(err)
}
var remoteHead git.OID
var okRemote bool
if !remoteMissing {
var err error
remoteHead, okRemote, err = gbs.api.TryResolveRefCommit(ctx, gbs.remoteTrackingRef)
if err != nil {
return backoff.Permanent(err)
}
if !okRemote {
return backoff.Permanent(&git.RefNotFoundError{Ref: gbs.remoteTrackingRef})
}
// 2b) Merge cache to reflect fetched contents.
if err := gbs.mergeCacheFromHead(ctx, remoteHead); err != nil {
return backoff.Permanent(err)
// 2) Force-set owned local ref to remote head (remote is source-of-truth).
if err := gbs.api.UpdateRef(ctx, gbs.localRef, remoteHead, "gitblobstore: sync write"); err != nil {
return backoff.Permanent(err)
}
// 2b) Merge cache to reflect fetched contents.
if err := gbs.mergeCacheFromHead(ctx, remoteHead); err != nil {
return backoff.Permanent(err)
}
}
// 3) Apply this operation's changes on top of the remote head.
newCommit, err := build(remoteHead, true)
newCommit, err := build(remoteHead, okRemote)
if err != nil {
return backoff.Permanent(err)
}
@@ -614,15 +633,15 @@ func (gbs *GitBlobstore) remoteManagedWrite(ctx context.Context, key, msg string
}
func (gbs *GitBlobstore) putWithRemoteSync(ctx context.Context, key string, plan putPlan, msg string) (string, error) {
return gbs.remoteManagedWrite(ctx, key, msg, func(remoteHead git.OID, _ bool) (git.OID, error) {
return gbs.buildCommitForKeyWrite(ctx, remoteHead, true, key, plan, msg)
return gbs.remoteManagedWrite(ctx, key, msg, func(remoteHead git.OID, ok bool) (git.OID, error) {
return gbs.buildCommitForKeyWrite(ctx, remoteHead, ok, key, plan, msg)
})
}
func (gbs *GitBlobstore) checkAndPutWithRemoteSync(ctx context.Context, expectedVersion, key string, totalSize int64, reader io.Reader, msg string) (string, error) {
var cachedPlan *putPlan
return gbs.remoteManagedWrite(ctx, key, msg, func(remoteHead git.OID, _ bool) (git.OID, error) {
actualKeyVersion, err := gbs.currentKeyVersion(ctx, remoteHead, true, key)
return gbs.remoteManagedWrite(ctx, key, msg, func(remoteHead git.OID, ok bool) (git.OID, error) {
actualKeyVersion, err := gbs.currentKeyVersion(ctx, remoteHead, ok, key)
if err != nil {
return git.OID(""), err
}
@@ -636,7 +655,7 @@ func (gbs *GitBlobstore) checkAndPutWithRemoteSync(ctx context.Context, expected
}
cachedPlan = &plan
}
return gbs.buildCommitForKeyWrite(ctx, remoteHead, true, key, *cachedPlan, msg)
return gbs.buildCommitForKeyWrite(ctx, remoteHead, ok, key, *cachedPlan, msg)
})
}

View File

@@ -275,6 +275,42 @@ func TestGitBlobstore_RemoteManaged_PutPushesToRemote(t *testing.T) {
require.Equal(t, []byte("from local\n"), got)
}
// TestGitBlobstore_RemoteManaged_PutBootstrapsEmptyRemote writes one key through a
// GitBlobstore and then inspects the remote repository directly: DoltDataRef must
// resolve to a commit there, and that commit must hold the key as a blob with the
// exact bytes that were written.
func TestGitBlobstore_RemoteManaged_PutBootstrapsEmptyRemote(t *testing.T) {
	requireGitOnPath(t)

	const key = "k"
	payload := []byte("bootstrapped\n")

	ctx := context.Background()
	remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx)

	// refs/dolt/data is deliberately NOT seeded in the remote, so the remote is
	// truly empty from the blobstore's point of view.
	store, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity())
	require.NoError(t, err)

	version, err := store.Put(ctx, key, int64(len(payload)), bytes.NewReader(payload))
	require.NoError(t, err)
	require.NotEmpty(t, version)

	// Look at the remote through its own runner/API rather than through the blobstore.
	runner, err := git.NewRunner(remoteRepo.GitDir)
	require.NoError(t, err)
	api := git.NewGitAPIImpl(runner)

	// The data ref must now exist on the remote...
	head, err := api.ResolveRefCommit(ctx, DoltDataRef)
	require.NoError(t, err)
	require.NotEmpty(t, head)

	// ...and its tree must contain the key as a blob.
	blobOID, objType, err := api.ResolvePathObject(ctx, head, key)
	require.NoError(t, err)
	require.Equal(t, git.ObjectTypeBlob, objType)

	reader, err := api.BlobReader(ctx, blobOID)
	require.NoError(t, err)
	stored, readErr := io.ReadAll(reader)
	// Close before asserting so the reader is released even when the read failed.
	_ = reader.Close()
	require.NoError(t, readErr)
	require.Equal(t, payload, stored)
}
type hookPushGitAPI struct {
git.GitAPI
onFirstPush func(ctx context.Context)

View File

@@ -121,7 +121,8 @@ type GitAPI interface {
FetchRef(ctx context.Context, remote string, srcRef string, dstRef string) error
// PushRefWithLease pushes |srcRef| to |dstRef| on |remote|, but only if the remote's |dstRef|
// equals |expectedDstOID| (force-with-lease).
// equals |expectedDstOID| (force-with-lease). If |expectedDstOID| is empty, it enforces that
// the remote |dstRef| is missing (bootstrap / create-if-missing semantics).
// Equivalent plumbing: GIT_DIR=... git push --force-with-lease=<dstRef>:<expectedDstOID> <remote> <srcRef>:<dstRef>
PushRefWithLease(ctx context.Context, remote string, srcRef string, dstRef string, expectedDstOID OID) error
}

View File

@@ -321,6 +321,9 @@ func (a *GitAPIImpl) FetchRef(ctx context.Context, remote string, srcRef string,
srcRef = strings.TrimPrefix(srcRef, "+")
refspec := "+" + srcRef + ":" + dstRef
_, err := a.r.Run(ctx, RunOptions{}, "fetch", "--no-tags", remote, refspec)
if err != nil && isRemoteRefNotFoundErr(err) {
return &RefNotFoundError{Ref: srcRef}
}
return err
}
@@ -334,9 +337,6 @@ func (a *GitAPIImpl) PushRefWithLease(ctx context.Context, remote string, srcRef
if dstRef == "" {
return fmt.Errorf("git push: dst ref is required")
}
if expectedDstOID == "" {
return fmt.Errorf("git push: expected dst oid is required")
}
srcRef = strings.TrimPrefix(srcRef, "+")
refspec := srcRef + ":" + dstRef
lease := "--force-with-lease=" + dstRef + ":" + expectedDstOID.String()
@@ -360,6 +360,19 @@ func isRefNotFoundErr(err error) bool {
strings.Contains(msg, "not a valid object name")
}
// isRemoteRefNotFoundErr reports whether |err| is a *CmdError whose captured output
// contains, compared case-insensitively, one of the messages that indicate the
// requested ref does not exist on the remote. Any other error value, including one
// that merely wraps a *CmdError, yields false.
func isRemoteRefNotFoundErr(err error) bool {
	cmdErr, isCmdErr := err.(*CmdError)
	if !isCmdErr {
		return false
	}

	// Typical fetch failure when the remote ref doesn't exist:
	//   fatal: couldn't find remote ref refs/dolt/data
	output := strings.ToLower(string(cmdErr.Output))
	for _, marker := range []string{
		"couldn't find remote ref",
		"could not find remote ref",
		"remote ref does not exist",
	} {
		if strings.Contains(output, marker) {
			return true
		}
	}
	return false
}
func isPathNotFoundErr(err error) bool {
ce, ok := err.(*CmdError)
if !ok {

View File

@@ -830,6 +830,33 @@ func TestGitAPIImpl_FetchRef_ForcedUpdatesTrackingRef(t *testing.T) {
}
}
// TestGitAPIImpl_FetchRef_MissingRemoteRefReturnsRefNotFound fetches a ref from a
// remote test repository in which this test never creates that ref, and requires
// FetchRef to fail with a *RefNotFoundError naming the requested source ref.
func TestGitAPIImpl_FetchRef_MissingRemoteRefReturnsRefNotFound(t *testing.T) {
	t.Parallel()

	const (
		missingRef  = "refs/dolt/data"
		trackingRef = "refs/dolt/remotes/origin/data"
	)

	ctx := context.Background()
	remoteRepo, _, _ := newTestRepo(t, ctx)
	_, localRunner, localAPI := newTestRepo(t, ctx)

	// Wire the local repo to the remote under the name "origin".
	if _, err := localRunner.Run(ctx, RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir); err != nil {
		t.Fatal(err)
	}

	fetchErr := localAPI.FetchRef(ctx, "origin", missingRef, trackingRef)
	if fetchErr == nil {
		t.Fatalf("expected error")
	}

	// The failure must be the typed not-found error, carrying the source ref.
	var notFound *RefNotFoundError
	if matched := errors.As(fetchErr, &notFound); !matched {
		t.Fatalf("expected RefNotFoundError, got %T: %v", fetchErr, fetchErr)
	}
	if notFound.Ref != missingRef {
		t.Fatalf("expected missing ref %q, got %q", missingRef, notFound.Ref)
	}
}
func TestGitAPIImpl_PushRefWithLease_SucceedsThenRejectsStaleLease(t *testing.T) {
t.Parallel()
@@ -919,3 +946,48 @@ func TestGitAPIImpl_PushRefWithLease_SucceedsThenRejectsStaleLease(t *testing.T)
t.Fatalf("remote ref changed unexpectedly on stale lease: got %q, want %q", got, r2)
}
}
// TestGitAPIImpl_PushRefWithLease_CreatesWhenMissing builds a single commit in a local
// repo, points refs/dolt/data at it, and pushes that ref to a remote with an empty
// expected OID. The push must succeed and the remote ref must resolve to the same commit.
func TestGitAPIImpl_PushRefWithLease_CreatesWhenMissing(t *testing.T) {
	t.Parallel()

	const dataRef = "refs/dolt/data"

	// must aborts the test on any non-nil error.
	must := func(err error) {
		t.Helper()
		if err != nil {
			t.Fatal(err)
		}
	}

	ctx := context.Background()
	remoteRepo, _, remoteAPI := newTestRepo(t, ctx)
	_, localRunner, localAPI := newTestRepo(t, ctx)

	_, addErr := localRunner.Run(ctx, RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir)
	must(addErr)

	// Create a local commit from an empty tree and make it the source ref for the push.
	index := tempIndexFile(t)
	must(localAPI.ReadTreeEmpty(ctx, index))

	emptyTree, treeErr := localAPI.WriteTree(ctx, index)
	must(treeErr)

	localCommit, commitErr := localAPI.CommitTree(ctx, emptyTree, nil, "l1", testAuthor())
	must(commitErr)

	must(localAPI.UpdateRef(ctx, dataRef, localCommit, "set local src"))

	// Remote ref is missing. Pushing with an empty expected OID should create it.
	must(localAPI.PushRefWithLease(ctx, "origin", dataRef, dataRef, ""))

	remoteCommit, resolveErr := remoteAPI.ResolveRefCommit(ctx, dataRef)
	must(resolveErr)

	if remoteCommit != localCommit {
		t.Fatalf("remote ref mismatch after bootstrap push: got %q, want %q", remoteCommit, localCommit)
	}
}

View File

@@ -0,0 +1,113 @@
// Copyright 2026 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nbs
import (
"bytes"
"context"
"os/exec"
"testing"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/store/blobstore"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/testutils/gitrepo"
"github.com/dolthub/dolt/go/store/types"
)
// TestNBS_GitBlobstore_EmptyRemote_OpenReturnsEmptyManifest opens a git-backed store
// whose "origin" remote is a freshly initialized bare repository, and requires both
// that the open succeeds and that fetching the manifest reports it as not existing.
// The test is skipped when no git binary is on PATH.
func TestNBS_GitBlobstore_EmptyRemote_OpenReturnsEmptyManifest(t *testing.T) {
	if _, lookErr := exec.LookPath("git"); lookErr != nil {
		t.Skip("git not found on PATH")
	}

	ctx := context.Background()

	// The remote is left exactly as InitBare produced it: refs/dolt/data is never
	// seeded, which simulates a truly empty remote.
	remote, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git")
	require.NoError(t, err)

	local, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
	require.NoError(t, err)

	addRemote := exec.CommandContext(ctx, "git", "--git-dir", local.GitDir, "remote", "add", "origin", remote.GitDir)
	addOut, err := addRemote.CombinedOutput()
	require.NoError(t, err, "git remote add failed: %s", string(addOut))

	nbsStore, err := NewGitStore(ctx, types.Format_DOLT.VersionString(), local.GitDir, blobstore.DoltDataRef, blobstore.GitBlobstoreOptions{}, 0, NewUnlimitedMemQuotaProvider())
	require.NoError(t, err)
	defer nbsStore.Close()

	// Nothing has been written yet, so there must be no manifest to find.
	found, _, _, err := nbsStore.manifestMgr.Fetch(ctx, nbsStore.stats)
	require.NoError(t, err)
	require.False(t, found)
}
// TestNBS_GitBlobstore_EmptyRemote_FirstManifestUpdateBootstrapsRef writes a manifest
// through blobstoreManifest.Update against a freshly initialized bare remote, then
// verifies three things: the update returns the written lock and root, the remote now
// has a commit at blobstore.DoltDataRef, and a store re-opened on the same local repo
// fetches a manifest with the same root and lock. Skipped when git is not on PATH.
func TestNBS_GitBlobstore_EmptyRemote_FirstManifestUpdateBootstrapsRef(t *testing.T) {
	if _, lookErr := exec.LookPath("git"); lookErr != nil {
		t.Skip("git not found on PATH")
	}

	ctx := context.Background()

	// refs/dolt/data is never seeded in the remote: simulate a truly empty remote.
	remote, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git")
	require.NoError(t, err)

	local, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
	require.NoError(t, err)

	addRemote := exec.CommandContext(ctx, "git", "--git-dir", local.GitDir, "remote", "add", "origin", remote.GitDir)
	addOut, err := addRemote.CombinedOutput()
	require.NoError(t, err, "git remote add failed: %s", string(addOut))

	gitBS, err := blobstore.NewGitBlobstore(local.GitDir, blobstore.DoltDataRef)
	require.NoError(t, err)

	// Build the manifest to write: a root hash, a zero gc generation, and no table
	// specs or appendix (those fields are left at their nil zero values).
	expected := manifestContents{
		nbfVers: types.Format_DOLT.VersionString(),
		root:    hash.Of([]byte("root")),
		gcGen:   hash.Hash{},
	}
	expected.lock = generateLockHash(expected.root, expected.specs, expected.appendix, nil)

	// Push it into the empty remote via the blobstore manifest updater, using a zero
	// hash as the previous lock.
	updater := blobstoreManifest{bs: gitBS}
	updated, err := updater.Update(ctx, hash.Hash{}, expected, NewStats(), nil)
	require.NoError(t, err)
	require.Equal(t, expected.lock, updated.lock)
	require.Equal(t, expected.root, updated.root)

	// Remote ref should now exist and point at a commit.
	revParse := exec.CommandContext(ctx, "git", "--git-dir", remote.GitDir, "rev-parse", "--verify", "--quiet", blobstore.DoltDataRef+"^{commit}")
	revParseOut, err := revParse.CombinedOutput()
	require.NoError(t, err, "git rev-parse failed: %s", string(revParseOut))

	// Re-open via NBS and ensure the manifest is readable.
	reopened, err := NewGitStore(ctx, types.Format_DOLT.VersionString(), local.GitDir, blobstore.DoltDataRef, blobstore.GitBlobstoreOptions{}, 0, NewUnlimitedMemQuotaProvider())
	require.NoError(t, err)
	defer reopened.Close()

	found, fetched, _, err := reopened.manifestMgr.Fetch(ctx, reopened.stats)
	require.NoError(t, err)
	require.True(t, found)
	require.Equal(t, expected.root, fetched.root)
	require.Equal(t, expected.lock, fetched.lock)

	// Sanity: the fetched contents serialize to a non-empty manifest.
	var serialized bytes.Buffer
	require.NoError(t, writeManifest(&serialized, fetched))
	require.NotEmpty(t, serialized.Bytes())
}

View File

@@ -635,6 +635,18 @@ func NewGitStore(ctx context.Context, nbfVerStr string, gitDir string, ref strin
return NewBSStore(ctx, nbfVerStr, bs, memTableSize, q)
}
// NewNoConjoinGitStore returns an nbs implementation backed by a GitBlobstore with
// conjoin disabled, which can be useful for deployments where conjoin's table rewrite
// cost is undesirable.
//
// The blobstore is opened with blobstore.NewGitBlobstoreWithOptions(gitDir, ref, opts).
// If that fails, its error is returned unchanged with a nil store; otherwise the store
// is built by NewNoConjoinBSStore from |nbfVerStr|, |memTableSize| and |q|.
func NewNoConjoinGitStore(ctx context.Context, nbfVerStr string, gitDir string, ref string, opts blobstore.GitBlobstoreOptions, memTableSize uint64, q MemoryQuotaProvider) (*NomsBlockStore, error) {
	// Run makeGlobalCaches through cacheOnce before anything else touches the store.
	cacheOnce.Do(makeGlobalCaches)

	gitBS, openErr := blobstore.NewGitBlobstoreWithOptions(gitDir, ref, opts)
	if openErr != nil {
		return nil, openErr
	}

	return NewNoConjoinBSStore(ctx, nbfVerStr, gitBS, memTableSize, q)
}
// NewBSStore returns an nbs implementation backed by a Blobstore
func NewBSStore(ctx context.Context, nbfVerStr string, bs blobstore.Blobstore, memTableSize uint64, q MemoryQuotaProvider) (*NomsBlockStore, error) {
cacheOnce.Do(makeGlobalCaches)