Merge pull request #10417 from dolthub/db/gitblobstore-next

GitBlobstore: implement `Put` with CAS retries + configurable identity; add `Put` tests
This commit is contained in:
Dustin Brown
2026-02-04 16:10:14 -08:00
committed by GitHub
3 changed files with 508 additions and 21 deletions

View File

@@ -16,9 +16,15 @@ package blobstore
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"github.com/cenkalti/backoff/v4"
git "github.com/dolthub/dolt/go/store/blobstore/internal/git"
)
@@ -27,26 +33,37 @@ import (
// database (bare repo or .git directory). It stores keys as paths within the tree
// of the commit referenced by a git ref (e.g. refs/dolt/data).
//
// This implementation is being developed in phases. Read paths were implemented first,
// and write paths are being added incrementally. At the moment, Put is implemented, while
// CheckAndPut and Concatenate still return an explicit unimplemented error.
type GitBlobstore struct {
// gitDir is the git directory this blobstore was constructed with (a bare repo
// directory or a .git directory, per the constructor docs).
gitDir string
// ref is the git ref whose commit tree holds the keys; Put resolves and updates it.
ref string
// runner is the git.Runner the constructor creates for gitDir.
runner *git.Runner
// api is the git.GitAPI used by Put and its helpers for object, index, and ref operations.
api git.GitAPI
// identity, when non-nil, is used as the author/committer identity for new commits.
// When nil, we prefer whatever identity git derives from env/config, falling back
// to a deterministic default only if git reports the identity is missing.
identity *git.Identity
}
var _ Blobstore = (*GitBlobstore)(nil)
// NewGitBlobstore creates a new GitBlobstore rooted at |gitDir| and |ref|.
// |gitDir| should point at a bare repo directory or a .git directory. Put is implemented,
// while CheckAndPut and Concatenate are still unimplemented (see type-level docs).
//
// No author/committer identity is forced: this delegates to NewGitBlobstoreWithIdentity
// with a nil identity.
func NewGitBlobstore(gitDir, ref string) (*GitBlobstore, error) {
return NewGitBlobstoreWithIdentity(gitDir, ref, nil)
}
// NewGitBlobstoreWithIdentity creates a GitBlobstore rooted at |gitDir| and |ref|, optionally
// forcing an author/committer identity for write paths.
//
// |identity| may be nil, in which case commits use whatever identity git derives on its own
// (NewGitBlobstore is the nil-identity shorthand). An error is returned only when the
// git.Runner for |gitDir| cannot be created.
func NewGitBlobstoreWithIdentity(gitDir, ref string, identity *git.Identity) (*GitBlobstore, error) {
	r, err := git.NewRunner(gitDir)
	if err != nil {
		return nil, err
	}
	// The identity must be stored on the blobstore; returning a value without it would
	// silently ignore the caller's requested author/committer.
	return &GitBlobstore{
		gitDir:   gitDir,
		ref:      ref,
		runner:   r,
		api:      git.NewGitAPIImpl(r),
		identity: identity,
	}, nil
}
func (gbs *GitBlobstore) Path() string {
@@ -157,10 +174,175 @@ func (l *limitReadCloser) Read(p []byte) (int, error) { return l.r.Read(p) }
// Close closes the wrapped closer |c| and returns whatever error it reports.
func (l *limitReadCloser) Close() error { return l.c.Close() }
func (gbs *GitBlobstore) Put(ctx context.Context, key string, totalSize int64, reader io.Reader) (string, error) {
if _, err := normalizeGitTreePath(key); err != nil {
key, err := normalizeGitTreePath(key)
if err != nil {
return "", err
}
return "", fmt.Errorf("%w: GitBlobstore.Put", git.ErrUnimplemented)
// Hash the contents once. If we need to retry due to concurrent updates to |gbs.ref|,
// we can reuse the blob OID without re-reading |reader|.
blobOID, err := gbs.api.HashObject(ctx, reader)
if err != nil {
return "", err
}
// Make Put resilient to concurrent writers updating unrelated keys by using a CAS loop
// under the hood. This matches typical object-store semantics more closely than an
// unconditional ref update (which could clobber other keys).
const maxRetries = 31 // 32 total attempts (initial + retries)
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = 5 * time.Millisecond
bo.Multiplier = 2
bo.MaxInterval = 320 * time.Millisecond
bo.RandomizationFactor = 0 // deterministic; can add jitter later if needed
bo.Reset()
policy := backoff.WithContext(backoff.WithMaxRetries(bo, maxRetries), ctx)
var ver string
op := func() error {
parent, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
if err != nil {
return backoff.Permanent(err)
}
newCommit, msg, err := gbs.buildPutCommit(ctx, parent, ok, key, blobOID)
if err != nil {
return backoff.Permanent(err)
}
if !ok {
// Best-effort ref creation. If a concurrent writer created the ref first, retry
// and take the normal CAS path on the new head.
if err := gbs.api.UpdateRef(ctx, gbs.ref, newCommit, msg); err != nil {
if gbs.refAdvanced(ctx, parent) {
return err
}
return backoff.Permanent(err)
}
ver = newCommit.String()
return nil
}
err = gbs.api.UpdateRefCAS(ctx, gbs.ref, newCommit, parent, msg)
if err == nil {
ver = newCommit.String()
return nil
}
// If the ref changed since we read |parent|, retry on the new head. Otherwise
// surface the error (e.g. permissions, corruption).
if gbs.refAdvanced(ctx, parent) {
return err
}
return backoff.Permanent(err)
}
if err := backoff.Retry(op, policy); err != nil {
if ctx.Err() != nil {
return "", ctx.Err()
}
return "", err
}
return ver, nil
}
// buildPutCommit creates (but does not publish) a commit whose tree is |parent|'s tree with
// |key| pointing at |blobOID|. When |hasParent| is false the tree starts out empty. It returns
// the new commit OID together with the commit message that was used. No ref is touched here.
func (gbs *GitBlobstore) buildPutCommit(ctx context.Context, parent git.OID, hasParent bool, key string, blobOID git.OID) (git.OID, string, error) {
	_, scratchIndex, removeScratch, err := newTempIndex()
	if err != nil {
		return "", "", err
	}
	defer removeScratch()

	// Seed the scratch index from the parent commit, or start from an empty one.
	var seedErr error
	if hasParent {
		seedErr = gbs.api.ReadTree(ctx, parent, scratchIndex)
	} else {
		seedErr = gbs.api.ReadTreeEmpty(ctx, scratchIndex)
	}
	if seedErr != nil {
		return "", "", seedErr
	}

	// TODO(gitblobstore): Decide on a policy for file-vs-directory prefix conflicts when staging keys.
	// For example, staging "a" when "a/b" already exists in the tree/index (or vice-versa) can fail
	// with a git index error (path appears as both a file and directory). Today our NBS keyspace is
	// flat (e.g. "manifest", "<tableid>", "<tableid>.records"), so this should not occur. If we ever
	// namespace keys into directories, consider proactively removing conflicting paths from the index
	// before UpdateIndexCacheInfo so Put/CheckAndPut remain robust.
	err = gbs.api.UpdateIndexCacheInfo(ctx, scratchIndex, "100644", blobOID, key)
	if err != nil {
		return "", "", err
	}

	treeOID, err := gbs.api.WriteTree(ctx, scratchIndex)
	if err != nil {
		return "", "", err
	}

	// A parent is only attached when one was resolved and it is non-empty.
	var parentRef *git.OID
	if hasParent && parent != "" {
		parentCopy := parent
		parentRef = &parentCopy
	}

	commitMsg := fmt.Sprintf("gitblobstore: put %s", key)

	// Prefer git's default identity from env/config when not explicitly configured.
	commitOID, err := gbs.api.CommitTree(ctx, treeOID, parentRef, commitMsg, gbs.identity)
	if err == nil {
		return commitOID, commitMsg, nil
	}
	// Only fall back to the built-in identity when none was configured on the blobstore
	// and git itself complained that it could not determine one.
	if gbs.identity != nil || !isMissingGitIdentityErr(err) {
		return "", "", err
	}
	commitOID, err = gbs.api.CommitTree(ctx, treeOID, parentRef, commitMsg, defaultGitBlobstoreIdentity())
	if err != nil {
		return "", "", err
	}
	return commitOID, commitMsg, nil
}
// defaultGitBlobstoreIdentity returns the fixed author/committer identity used when no
// identity was configured and git cannot determine one itself. Each call returns a
// freshly allocated value.
func defaultGitBlobstoreIdentity() *git.Identity {
	fallback := git.Identity{
		Name:  "dolt gitblobstore",
		Email: "gitblobstore@dolt.invalid",
	}
	return &fallback
}
// isMissingGitIdentityErr reports whether |err| wraps a *git.CmdError whose output
// contains (case-insensitively) one of git's "no identity configured" messages.
func isMissingGitIdentityErr(err error) bool {
	var cmdErr *git.CmdError
	if !errors.As(err, &cmdErr) {
		return false
	}

	output := strings.ToLower(string(cmdErr.Output))
	// Lower-cased fragments of the common git messages:
	//   - "Author identity unknown"
	//   - "fatal: unable to auto-detect email address"
	//   - "fatal: empty ident name"
	markers := []string{
		"author identity unknown",
		"unable to auto-detect email address",
		"empty ident name",
	}
	for _, marker := range markers {
		if strings.Contains(output, marker) {
			return true
		}
	}
	return false
}
// newTempIndex reserves a uniquely named scratch index file in the default temp directory.
// It returns the containing directory, the file path, and a cleanup func that removes the
// file and its sibling "<index>.lock" (removal errors are ignored).
//
// The file is intentionally *not* placed under GIT_DIR:
//   - some git dirs may be read-only or otherwise unsuitable for scratch files
//   - we don't want to leave temp files inside the repo on crashes
//
// Note: git will also create a sibling lock file (<index>.lock) during index writes.
func newTempIndex() (dir, indexFile string, cleanup func(), err error) {
	tmp, createErr := os.CreateTemp("", "dolt-gitblobstore-index-")
	if createErr != nil {
		return "", "", nil, createErr
	}
	path := tmp.Name()
	// Callers only use the path, so the handle is closed right away; a close failure
	// on the freshly created file is deliberately ignored.
	_ = tmp.Close()

	remove := func() {
		_ = os.Remove(path)
		_ = os.Remove(path + ".lock")
	}
	return filepath.Dir(path), path, remove, nil
}
// refAdvanced reports whether |gbs.ref| currently resolves to a commit other than |old|.
// It returns false when |ctx| is already done, when the ref cannot be resolved, or when
// the ref does not exist.
func (gbs *GitBlobstore) refAdvanced(ctx context.Context, old git.OID) bool {
	if ctx.Err() != nil {
		return false
	}
	head, exists, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
	if err != nil || !exists {
		return false
	}
	return head != old
}
func (gbs *GitBlobstore) CheckAndPut(ctx context.Context, expectedVersion, key string, totalSize int64, reader io.Reader) (string, error) {

View File

@@ -15,9 +15,14 @@
package blobstore
import (
"bytes"
"context"
"errors"
"io"
"os"
"os/exec"
"path/filepath"
"sync/atomic"
"testing"
"github.com/stretchr/testify/require"
@@ -26,10 +31,19 @@ import (
"github.com/dolthub/dolt/go/store/testutils/gitrepo"
)
func TestGitBlobstore_RefMissingIsNotFound(t *testing.T) {
// requireGitOnPath skips the calling test when no "git" executable can be found on PATH.
func requireGitOnPath(t *testing.T) {
	t.Helper()
	_, lookErr := exec.LookPath("git")
	if lookErr == nil {
		return
	}
	t.Skip("git not found on PATH")
}
// testIdentity returns the fixed author/committer identity used by the Put tests.
// Each call returns a freshly allocated value.
func testIdentity() *git.Identity {
	id := git.Identity{
		Name:  "gitblobstore test",
		Email: "gitblobstore@test.invalid",
	}
	return &id
}
func TestGitBlobstore_RefMissingIsNotFound(t *testing.T) {
requireGitOnPath(t)
ctx := context.Background()
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
@@ -55,9 +69,7 @@ func TestGitBlobstore_RefMissingIsNotFound(t *testing.T) {
}
func TestGitBlobstore_ExistsAndGet_AllRange(t *testing.T) {
if _, err := exec.LookPath("git"); err != nil {
t.Skip("git not found on PATH")
}
requireGitOnPath(t)
ctx := context.Background()
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
@@ -100,9 +112,7 @@ func TestGitBlobstore_ExistsAndGet_AllRange(t *testing.T) {
}
func TestGitBlobstore_Get_NotFoundMissingKey(t *testing.T) {
if _, err := exec.LookPath("git"); err != nil {
t.Skip("git not found on PATH")
}
requireGitOnPath(t)
ctx := context.Background()
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
@@ -122,9 +132,7 @@ func TestGitBlobstore_Get_NotFoundMissingKey(t *testing.T) {
}
func TestGitBlobstore_BlobRangeSemantics(t *testing.T) {
if _, err := exec.LookPath("git"); err != nil {
t.Skip("git not found on PATH")
}
requireGitOnPath(t)
ctx := context.Background()
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
@@ -173,9 +181,7 @@ func TestGitBlobstore_BlobRangeSemantics(t *testing.T) {
}
func TestGitBlobstore_InvalidKeysError(t *testing.T) {
if _, err := exec.LookPath("git"); err != nil {
t.Skip("git not found on PATH")
}
requireGitOnPath(t)
ctx := context.Background()
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
@@ -206,5 +212,176 @@ func TestGitBlobstore_InvalidKeysError(t *testing.T) {
_, _, _, err = bs.Get(ctx, k, AllRange)
require.Error(t, err, "expected error for key %q", k)
_, err = bs.Put(ctx, k, 1, bytes.NewReader([]byte("x")))
require.Error(t, err, "expected error for key %q", k)
}
}
// TestGitBlobstore_Put_RoundTripAndVersion writes one key into a fresh bare repo and checks
// that it exists afterwards and reads back with the same bytes and the version Put returned.
func TestGitBlobstore_Put_RoundTripAndVersion(t *testing.T) {
	requireGitOnPath(t)

	ctx := context.Background()
	bareRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
	require.NoError(t, err)

	store, err := NewGitBlobstoreWithIdentity(bareRepo.GitDir, DoltDataRef, testIdentity())
	require.NoError(t, err)

	payload := []byte("hello put\n")
	putVersion, err := PutBytes(ctx, store, "k", payload)
	require.NoError(t, err)
	require.NotEmpty(t, putVersion)

	// The key must be visible through Exists...
	found, err := store.Exists(ctx, "k")
	require.NoError(t, err)
	require.True(t, found)

	// ...and read back unchanged, at the version reported by Put.
	readBack, readVersion, err := GetBytes(ctx, store, "k", AllRange)
	require.NoError(t, err)
	require.Equal(t, putVersion, readVersion)
	require.Equal(t, payload, readBack)
}
// TestGitBlobstore_Put_Overwrite writes the same key twice and checks that each Put yields
// a distinct, non-empty version and that reads observe the second write.
func TestGitBlobstore_Put_Overwrite(t *testing.T) {
	requireGitOnPath(t)

	ctx := context.Background()
	bareRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
	require.NoError(t, err)

	store, err := NewGitBlobstoreWithIdentity(bareRepo.GitDir, DoltDataRef, testIdentity())
	require.NoError(t, err)

	firstVersion, err := PutBytes(ctx, store, "k", []byte("v1\n"))
	require.NoError(t, err)
	require.NotEmpty(t, firstVersion)

	secondVersion, err := PutBytes(ctx, store, "k", []byte("v2\n"))
	require.NoError(t, err)
	require.NotEmpty(t, secondVersion)
	require.NotEqual(t, firstVersion, secondVersion)

	// Reads must see the latest contents at the latest version.
	current, currentVersion, err := GetBytes(ctx, store, "k", AllRange)
	require.NoError(t, err)
	require.Equal(t, secondVersion, currentVersion)
	require.Equal(t, []byte("v2\n"), current)
}
// hookGitAPI wraps a git.GitAPI and lets a test run a callback just before an
// UpdateRefCAS call is forwarded to the wrapped implementation.
type hookGitAPI struct {
// GitAPI is the wrapped implementation; every method not overridden here is promoted from it.
git.GitAPI
// ref is the ref name the hook is interested in.
ref string
// if set, called once before the first UpdateRefCAS executes.
onFirstCAS func(ctx context.Context, old git.OID)
// did records that the one-shot hook has already been consumed.
did atomic.Bool
}
// UpdateRefCAS runs onFirstCAS (if set) once, immediately before the first compare-and-swap
// that targets h.ref, passing the expected old OID, and then forwards the call unchanged to
// the wrapped GitAPI.
func (h *hookGitAPI) UpdateRefCAS(ctx context.Context, ref string, newOID git.OID, oldOID git.OID, msg string) error {
	// Check the ref before consuming the one-shot flag; otherwise a CAS on an unrelated ref
	// would burn the hook and it would never fire for the ref under test.
	if h.onFirstCAS != nil && ref == h.ref && !h.did.Swap(true) {
		h.onFirstCAS(ctx, oldOID)
	}
	return h.GitAPI.UpdateRefCAS(ctx, ref, newOID, oldOID, msg)
}
// writeKeyToRef acts as an independent writer: it commits |data| under |key| on top of
// whatever |ref| currently points at (or on an empty tree if the ref does not exist) and then
// moves |ref| to the new commit with an unconditional UpdateRef. It returns the new commit OID.
func writeKeyToRef(ctx context.Context, api git.GitAPI, ref string, key string, data []byte, author *git.Identity) (git.OID, error) {
	head, hasHead, err := api.TryResolveRefCommit(ctx, ref)
	if err != nil {
		return "", err
	}

	// Use a private scratch directory for the index so nothing leaks between calls.
	scratchDir, err := os.MkdirTemp("", "gitblobstore-test-index-")
	if err != nil {
		return "", err
	}
	defer func() { _ = os.RemoveAll(scratchDir) }()
	index := filepath.Join(scratchDir, "index")

	// Seed the index from the current head, or start empty.
	var seedErr error
	if hasHead {
		seedErr = api.ReadTree(ctx, head, index)
	} else {
		seedErr = api.ReadTreeEmpty(ctx, index)
	}
	if seedErr != nil {
		return "", seedErr
	}

	blob, err := api.HashObject(ctx, bytes.NewReader(data))
	if err != nil {
		return "", err
	}
	err = api.UpdateIndexCacheInfo(ctx, index, "100644", blob, key)
	if err != nil {
		return "", err
	}

	tree, err := api.WriteTree(ctx, index)
	if err != nil {
		return "", err
	}

	// Only attach a parent when a non-empty head was resolved.
	var parentRef *git.OID
	if hasHead && head != "" {
		headCopy := head
		parentRef = &headCopy
	}

	const commitMsg = "test external writer"
	commit, err := api.CommitTree(ctx, tree, parentRef, commitMsg, author)
	if err != nil {
		return "", err
	}
	err = api.UpdateRef(ctx, ref, commit, commitMsg)
	if err != nil {
		return "", err
	}
	return commit, nil
}
// TestGitBlobstore_Put_ContentionRetryPreservesOtherKey simulates a concurrent writer that
// advances the ref between Put reading the head and Put's first CAS. Put must retry on the
// new head so that its own key, the concurrently written key, and the seeded key all survive.
func TestGitBlobstore_Put_ContentionRetryPreservesOtherKey(t *testing.T) {
	requireGitOnPath(t)

	ctx := context.Background()
	repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
	require.NoError(t, err)

	// Seed the ref so Put takes the CAS path.
	_, err = repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
		"base": []byte("base\n"),
	}, "seed")
	require.NoError(t, err)

	bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
	require.NoError(t, err)

	origAPI := bs.api
	h := &hookGitAPI{GitAPI: origAPI, ref: DoltDataRef}
	h.onFirstCAS = func(ctx context.Context, old git.OID) {
		// Advance the ref to simulate another writer committing concurrently. If this
		// write fails the scenario is not set up, so fail here instead of later with a
		// misleading "key not found".
		_, werr := writeKeyToRef(ctx, origAPI, DoltDataRef, "external", []byte("external\n"), testIdentity())
		require.NoError(t, werr)
	}
	bs.api = h

	ver, err := PutBytes(ctx, bs, "k", []byte("mine\n"))
	require.NoError(t, err)
	require.NotEmpty(t, ver)
	// Make sure the contention hook actually ran, i.e. the CAS path was exercised.
	require.True(t, h.did.Load(), "expected Put to go through UpdateRefCAS")

	got, ver2, err := GetBytes(ctx, bs, "k", AllRange)
	require.NoError(t, err)
	require.Equal(t, ver, ver2)
	require.Equal(t, []byte("mine\n"), got)

	got, _, err = GetBytes(ctx, bs, "external", AllRange)
	require.NoError(t, err)
	require.Equal(t, []byte("external\n"), got)

	got, _, err = GetBytes(ctx, bs, "base", AllRange)
	require.NoError(t, err)
	require.Equal(t, []byte("base\n"), got)

	// Sanity: BlobReader path still works for the new commit. Check the streamed bytes
	// and the errors so this actually asserts something.
	rc, _, _, err := bs.Get(ctx, "k", AllRange)
	require.NoError(t, err)
	streamed, err := io.ReadAll(rc)
	require.NoError(t, err)
	require.NoError(t, rc.Close())
	require.Equal(t, []byte("mine\n"), streamed)
}

View File

@@ -238,6 +238,134 @@ func TestGitAPIImpl_WriteTree_FromEmptyIndex(t *testing.T) {
}
}
// TestGitAPIImpl_UpdateIndexCacheInfo_ReplacesExistingEntry stages the same path twice with
// different blobs and verifies that the committed tree holds the second blob's contents.
func TestGitAPIImpl_UpdateIndexCacheInfo_ReplacesExistingEntry(t *testing.T) {
	t.Parallel()

	// failOn aborts the test on any non-nil error.
	failOn := func(err error) {
		t.Helper()
		if err != nil {
			t.Fatal(err)
		}
	}

	ctx := context.Background()
	repo, err := gitrepo.InitBareTemp(ctx, "")
	failOn(err)
	runner, err := NewRunner(repo.GitDir)
	failOn(err)
	api := NewGitAPIImpl(runner)

	index := tempIndexFile(t)
	failOn(api.ReadTreeEmpty(ctx, index))

	path := "same.txt"
	firstBlob, err := api.HashObject(ctx, bytes.NewReader([]byte("one\n")))
	failOn(err)
	failOn(api.UpdateIndexCacheInfo(ctx, index, "100644", firstBlob, path))

	// Update the same path again; this should succeed and replace the index entry.
	secondBlob, err := api.HashObject(ctx, bytes.NewReader([]byte("two\n")))
	failOn(err)
	failOn(api.UpdateIndexCacheInfo(ctx, index, "100644", secondBlob, path))

	tree, err := api.WriteTree(ctx, index)
	failOn(err)
	commit, err := api.CommitTree(ctx, tree, nil, "replace entry", testAuthor())
	failOn(err)

	// Read the path back out of the commit and compare contents.
	resolved, err := api.ResolvePathBlob(ctx, commit, path)
	failOn(err)
	rc, err := api.BlobReader(ctx, resolved)
	failOn(err)
	defer rc.Close()

	contents, err := io.ReadAll(rc)
	failOn(err)
	if !bytes.Equal(contents, []byte("two\n")) {
		t.Fatalf("expected replacement contents %q, got %q", "two\n", string(contents))
	}
}
// TestGitAPIImpl_UpdateIndexCacheInfo_FileDirectoryConflictErrors checks that staging a path
// fails when it collides with an already staged path as file-vs-directory, in both directions.
func TestGitAPIImpl_UpdateIndexCacheInfo_FileDirectoryConflictErrors(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	repo, err := gitrepo.InitBareTemp(ctx, "")
	if err != nil {
		t.Fatal(err)
	}
	runner, err := NewRunner(repo.GitDir)
	if err != nil {
		t.Fatal(err)
	}
	api := NewGitAPIImpl(runner)

	// Each scenario runs against its own fresh, empty index.
	scenarios := []struct {
		stagedPath, stagedData     string
		conflictPath, conflictData string
	}{
		// "a/b.txt" exists; staging "a" as a file must be rejected.
		{stagedPath: "a/b.txt", stagedData: "child\n", conflictPath: "a", conflictData: "a\n"},
		// Inverse conflict: "x" exists as a file; staging "x/y.txt" must be rejected.
		{stagedPath: "x", stagedData: "x\n", conflictPath: "x/y.txt", conflictData: "xy\n"},
	}

	for _, sc := range scenarios {
		index := tempIndexFile(t)
		if err := api.ReadTreeEmpty(ctx, index); err != nil {
			t.Fatal(err)
		}

		stagedOID, err := api.HashObject(ctx, bytes.NewReader([]byte(sc.stagedData)))
		if err != nil {
			t.Fatal(err)
		}
		if err := api.UpdateIndexCacheInfo(ctx, index, "100644", stagedOID, sc.stagedPath); err != nil {
			t.Fatal(err)
		}

		conflictOID, err := api.HashObject(ctx, bytes.NewReader([]byte(sc.conflictData)))
		if err != nil {
			t.Fatal(err)
		}
		err = api.UpdateIndexCacheInfo(ctx, index, "100644", conflictOID, sc.conflictPath)
		if err == nil {
			t.Fatalf("expected conflict error staging %q when %q exists", sc.conflictPath, sc.stagedPath)
		}
	}
}
func TestGitAPIImpl_ReadTree_PreservesExistingPaths(t *testing.T) {
t.Parallel()