/go/store/blobstore/{git_blobstore,internal}: implement put

This commit is contained in:
coffeegoddd☕️✨
2026-02-04 13:35:25 -08:00
parent d25cbfe02b
commit 917ea5d950
+171 -6
View File
@@ -16,9 +16,15 @@ package blobstore
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"github.com/cenkalti/backoff/v4"
git "github.com/dolthub/dolt/go/store/blobstore/internal/git"
)
@@ -27,14 +33,18 @@ import (
// database (bare repo or .git directory). It stores keys as paths within the tree
// of the commit referenced by a git ref (e.g. refs/dolt/data).
//
// This implementation is being developed in phases. Read paths were implemented first,
// and write paths are being added incrementally. At the moment Put is implemented, while
// CheckAndPut and Concatenate still return an explicit unimplemented error.
type GitBlobstore struct {
// gitDir is the path handed to git.NewRunner: a bare repo directory or a .git directory.
gitDir string
// ref is the git ref whose commit tree holds the stored keys (e.g. refs/dolt/data).
ref string
// runner is the git.Runner created for gitDir by the constructor.
runner *git.Runner
// api is the git.GitAPI built on top of runner; the read and write paths go through it.
api git.GitAPI
// identity, when non-nil, is used as the author/committer identity for new commits.
// When nil, we prefer whatever identity git derives from env/config, falling back
// to a deterministic default only if git reports the identity is missing.
identity *git.Identity
}
var _ Blobstore = (*GitBlobstore)(nil)
@@ -42,11 +52,17 @@ var _ Blobstore = (*GitBlobstore)(nil)
// NewGitBlobstore creates a new GitBlobstore rooted at |gitDir| and |ref|.
// |gitDir| should point at a bare repo directory or a .git directory.
//
// No author/committer identity is forced: it delegates to
// NewGitBlobstoreWithIdentity with a nil identity, so commits created by write
// paths use whatever identity git derives from env/config.
func NewGitBlobstore(gitDir, ref string) (*GitBlobstore, error) {
	return NewGitBlobstoreWithIdentity(gitDir, ref, nil)
}
// NewGitBlobstoreWithIdentity creates a GitBlobstore rooted at |gitDir| and |ref|, optionally
// forcing an author/committer identity for write paths.
//
// |identity| may be nil, in which case commits use git's own identity resolution.
// An error is returned if a git.Runner cannot be created for |gitDir|.
func NewGitBlobstoreWithIdentity(gitDir, ref string, identity *git.Identity) (*GitBlobstore, error) {
	r, err := git.NewRunner(gitDir)
	if err != nil {
		return nil, err
	}
	// The identity must be stored on the blobstore; otherwise a caller-supplied
	// identity would be silently ignored by the write paths.
	return &GitBlobstore{gitDir: gitDir, ref: ref, runner: r, api: git.NewGitAPIImpl(r), identity: identity}, nil
}
func (gbs *GitBlobstore) Path() string {
@@ -157,10 +173,159 @@ func (l *limitReadCloser) Read(p []byte) (int, error) { return l.r.Read(p) }
// Close closes the wrapped closer |l.c| and returns its error unchanged.
func (l *limitReadCloser) Close() error {
	return l.c.Close()
}
func (gbs *GitBlobstore) Put(ctx context.Context, key string, totalSize int64, reader io.Reader) (string, error) {
if _, err := normalizeGitTreePath(key); err != nil {
key, err := normalizeGitTreePath(key)
if err != nil {
return "", err
}
return "", fmt.Errorf("%w: GitBlobstore.Put", git.ErrUnimplemented)
// Hash the contents once. If we need to retry due to concurrent updates to |gbs.ref|,
// we can reuse the blob OID without re-reading |reader|.
blobOID, err := gbs.api.HashObject(ctx, reader)
if err != nil {
return "", err
}
// Make Put resilient to concurrent writers updating unrelated keys by using a CAS loop
// under the hood. This matches typical object-store semantics more closely than an
// unconditional ref update (which could clobber other keys).
const maxRetries = 31 // 32 total attempts (initial + retries)
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = 5 * time.Millisecond
bo.Multiplier = 2
bo.MaxInterval = 320 * time.Millisecond
bo.RandomizationFactor = 0 // deterministic; can add jitter later if needed
bo.Reset()
policy := backoff.WithContext(backoff.WithMaxRetries(bo, maxRetries), ctx)
var ver string
op := func() error {
parent, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
if err != nil {
return backoff.Permanent(err)
}
newCommit, msg, err := gbs.buildPutCommit(ctx, parent, ok, key, blobOID)
if err != nil {
return backoff.Permanent(err)
}
if !ok {
// Best-effort ref creation. If a concurrent writer created the ref first, retry
// and take the normal CAS path on the new head.
if err := gbs.api.UpdateRef(ctx, gbs.ref, newCommit, msg); err != nil {
if gbs.refAdvanced(ctx, parent) {
return err
}
return backoff.Permanent(err)
}
ver = newCommit.String()
return nil
}
err = gbs.api.UpdateRefCAS(ctx, gbs.ref, newCommit, parent, msg)
if err == nil {
ver = newCommit.String()
return nil
}
// If the ref changed since we read |parent|, retry on the new head. Otherwise
// surface the error (e.g. permissions, corruption).
if gbs.refAdvanced(ctx, parent) {
return err
}
return backoff.Permanent(err)
}
if err := backoff.Retry(op, policy); err != nil {
if ctx.Err() != nil {
return "", ctx.Err()
}
return "", err
}
return ver, nil
}
// buildPutCommit creates a commit whose tree is |parent|'s tree (or an empty tree when
// |hasParent| is false) with |key| pointing at |blobOID| as a regular file (mode 100644).
// It returns the new commit OID and the commit message used. No ref is updated here.
//
// The tree is assembled in a throwaway index file that is removed before returning.
func (gbs *GitBlobstore) buildPutCommit(ctx context.Context, parent git.OID, hasParent bool, key string, blobOID git.OID) (git.OID, string, error) {
	_, idx, removeIdx, err := newTempIndex()
	if err != nil {
		return "", "", err
	}
	defer removeIdx()

	// Seed the scratch index from the parent commit, or start from an empty tree.
	if !hasParent {
		err = gbs.api.ReadTreeEmpty(ctx, idx)
	} else {
		err = gbs.api.ReadTree(ctx, parent, idx)
	}
	if err != nil {
		return "", "", err
	}

	if err = gbs.api.UpdateIndexCacheInfo(ctx, idx, "100644", blobOID, key); err != nil {
		return "", "", err
	}

	tree, err := gbs.api.WriteTree(ctx, idx)
	if err != nil {
		return "", "", err
	}

	// A nil parent pointer yields a root commit; |parent| is a by-value copy, so taking
	// its address does not alias caller state.
	var parentRef *git.OID
	if hasParent && parent != "" {
		parentRef = &parent
	}

	message := fmt.Sprintf("gitblobstore: put %s", key)

	// Prefer git's default identity from env/config when not explicitly configured.
	commit, err := gbs.api.CommitTree(ctx, tree, parentRef, message, gbs.identity)
	if err != nil {
		// Only fall back to the built-in identity when none was configured on the
		// blobstore and git reported that it could not determine one.
		if gbs.identity != nil || !isMissingGitIdentityErr(err) {
			return "", "", err
		}
		commit, err = gbs.api.CommitTree(ctx, tree, parentRef, message, defaultGitBlobstoreIdentity())
		if err != nil {
			return "", "", err
		}
	}
	return commit, message, nil
}
// defaultGitBlobstoreIdentity returns a freshly allocated, fixed identity used as a
// fallback for environments without git identity configured.
func defaultGitBlobstoreIdentity() *git.Identity {
	fallback := git.Identity{
		Name:  "dolt gitblobstore",
		Email: "gitblobstore@dolt.invalid",
	}
	return &fallback
}
// isMissingGitIdentityErr reports whether |err| wraps a *git.CmdError whose output
// contains one of git's "identity not configured" messages. The match is
// case-insensitive. Errors that are not a *git.CmdError never match.
func isMissingGitIdentityErr(err error) bool {
	var cmdErr *git.CmdError
	if !errors.As(err, &cmdErr) {
		return false
	}

	output := strings.ToLower(string(cmdErr.Output))

	// Common git messages (compared in lowercase):
	//   - "Author identity unknown"
	//   - "fatal: unable to auto-detect email address"
	//   - "fatal: empty ident name"
	markers := []string{
		"author identity unknown",
		"unable to auto-detect email address",
		"empty ident name",
	}
	for _, marker := range markers {
		if strings.Contains(output, marker) {
			return true
		}
	}
	return false
}
// newTempIndex creates a fresh temporary directory and returns it together with the
// path of an "index" file inside it and a cleanup func that removes the directory.
// The index file itself is not created. On error, cleanup is nil.
func newTempIndex() (dir, indexFile string, cleanup func(), err error) {
	tmpDir, mkErr := os.MkdirTemp("", "dolt-gitblobstore-index-")
	if mkErr != nil {
		return "", "", nil, mkErr
	}

	// Removal is best-effort: a leftover temp dir is not worth failing the caller for.
	remove := func() { _ = os.RemoveAll(tmpDir) }
	return tmpDir, filepath.Join(tmpDir, "index"), remove, nil
}
// refAdvanced reports whether |gbs.ref| currently resolves to a commit different from
// |old|. It returns false when |ctx| is already done, when the ref cannot be resolved,
// or when the ref does not exist.
func (gbs *GitBlobstore) refAdvanced(ctx context.Context, old git.OID) bool {
	if ctx.Err() != nil {
		return false
	}
	head, exists, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
	if err != nil || !exists {
		return false
	}
	return head != old
}
func (gbs *GitBlobstore) CheckAndPut(ctx context.Context, expectedVersion, key string, totalSize int64, reader io.Reader) (string, error) {