mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-25 00:54:51 -06:00
Merge pull request #10458 from dolthub/db/two-phase
GitBlobstore: remote-managed fetch/merge/push sync
This commit is contained in:
@@ -21,19 +21,19 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/google/uuid"
|
||||
|
||||
git "github.com/dolthub/dolt/go/store/blobstore/internal/git"
|
||||
)
|
||||
|
||||
const gitblobstorePartNameWidth = 8 // "00000001"
|
||||
const gitblobstorePartNameWidth = 4 // "0001"
|
||||
|
||||
type chunkPartRef struct {
|
||||
oidHex string
|
||||
@@ -53,7 +53,7 @@ type treeWrite struct {
|
||||
|
||||
type putPlan struct {
|
||||
writes []treeWrite
|
||||
// If true, the key should be represented as a tree (chunked parts under key/NNNNNNNN).
|
||||
// If true, the key should be represented as a tree (chunked parts under key/NNNN).
|
||||
chunked bool
|
||||
}
|
||||
|
||||
@@ -254,14 +254,15 @@ func (c *concatReadCloser) Close() error {
|
||||
// GitBlobstore is a Blobstore implementation backed by a git repository's object
|
||||
// database (bare repo or .git directory). It stores keys as paths within the tree
|
||||
// of the commit referenced by a git ref (e.g. refs/dolt/data).
|
||||
//
|
||||
// This implementation is being developed in phases. Read paths were implemented first,
|
||||
// then write paths were added incrementally.
|
||||
type GitBlobstore struct {
|
||||
gitDir string
|
||||
ref string
|
||||
runner *git.Runner
|
||||
api git.GitAPI
|
||||
gitDir string
|
||||
remoteRef string
|
||||
localRef string
|
||||
runner *git.Runner
|
||||
api git.GitAPI
|
||||
remoteName string
|
||||
remoteTrackingRef string
|
||||
mu sync.Mutex
|
||||
// identity, when non-nil, is used as the author/committer identity for new commits.
|
||||
// When nil, we prefer whatever identity git derives from env/config, falling back
|
||||
// to a deterministic default only if git reports the identity is missing.
|
||||
@@ -277,8 +278,7 @@ type GitBlobstore struct {
|
||||
var _ Blobstore = (*GitBlobstore)(nil)
|
||||
|
||||
// NewGitBlobstore creates a new GitBlobstore rooted at |gitDir| and |ref|.
|
||||
// |gitDir| should point at a bare repo directory or a .git directory. Put is implemented,
|
||||
// while CheckAndPut and Concatenate are still unimplemented (see type-level docs).
|
||||
// |gitDir| should point at a bare repo directory or a .git directory.
|
||||
func NewGitBlobstore(gitDir, ref string) (*GitBlobstore, error) {
|
||||
return NewGitBlobstoreWithOptions(gitDir, ref, GitBlobstoreOptions{})
|
||||
}
|
||||
@@ -296,6 +296,9 @@ type GitBlobstoreOptions struct {
|
||||
// MaxPartSize enables chunked-object writes when non-zero.
|
||||
// Read paths always support chunked objects if encountered.
|
||||
MaxPartSize uint64
|
||||
// RemoteName is the git remote name to use for remote-managed mode (e.g. "origin").
|
||||
// If empty, it defaults to "origin".
|
||||
RemoteName string
|
||||
}
|
||||
|
||||
// NewGitBlobstoreWithOptions creates a GitBlobstore rooted at |gitDir| and |ref|.
|
||||
@@ -304,18 +307,165 @@ func NewGitBlobstoreWithOptions(gitDir, ref string, opts GitBlobstoreOptions) (*
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
remoteName := opts.RemoteName
|
||||
if remoteName == "" {
|
||||
remoteName = "origin"
|
||||
}
|
||||
remoteRef := ref
|
||||
remoteTrackingRef := RemoteTrackingRef(remoteName, remoteRef)
|
||||
localRef := OwnedLocalRef(remoteName, remoteRef, uuid.NewString())
|
||||
|
||||
return &GitBlobstore{
|
||||
gitDir: gitDir,
|
||||
ref: ref,
|
||||
runner: r,
|
||||
api: git.NewGitAPIImpl(r),
|
||||
identity: opts.Identity,
|
||||
maxPartSize: opts.MaxPartSize,
|
||||
gitDir: gitDir,
|
||||
remoteRef: remoteRef,
|
||||
localRef: localRef,
|
||||
runner: r,
|
||||
api: git.NewGitAPIImpl(r),
|
||||
remoteName: remoteName,
|
||||
remoteTrackingRef: remoteTrackingRef,
|
||||
identity: opts.Identity,
|
||||
maxPartSize: opts.MaxPartSize,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) Path() string {
|
||||
return fmt.Sprintf("%s@%s", gbs.gitDir, gbs.ref)
|
||||
return fmt.Sprintf("%s@%s", gbs.gitDir, gbs.remoteRef)
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) validateRemoteManaged() error {
|
||||
if gbs.remoteName == "" || gbs.remoteRef == "" || gbs.remoteTrackingRef == "" || gbs.localRef == "" {
|
||||
return fmt.Errorf("gitblobstore: remote-managed mode misconfigured (remoteName=%q remoteRef=%q trackingRef=%q localRef=%q)", gbs.remoteName, gbs.remoteRef, gbs.remoteTrackingRef, gbs.localRef)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CleanupOwnedLocalRef best-effort deletes this blobstore instance's UUID-owned local ref.
|
||||
//
|
||||
// This is an optional hygiene API: by default, UUID-owned local refs may accumulate in the
|
||||
// repo over time. Callers that care about cleanup (e.g. tests) may invoke this explicitly.
|
||||
func (gbs *GitBlobstore) CleanupOwnedLocalRef(ctx context.Context) error {
|
||||
gbs.mu.Lock()
|
||||
defer gbs.mu.Unlock()
|
||||
|
||||
_, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.localRef)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
_, err = gbs.runner.Run(ctx, git.RunOptions{}, "update-ref", "-d", gbs.localRef)
|
||||
return err
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) syncForRead(ctx context.Context) error {
|
||||
if err := gbs.validateRemoteManaged(); err != nil {
|
||||
return err
|
||||
}
|
||||
gbs.mu.Lock()
|
||||
defer gbs.mu.Unlock()
|
||||
|
||||
// 1) Fetch remote ref into our remote-tracking ref.
|
||||
if err := gbs.api.FetchRef(ctx, gbs.remoteName, gbs.remoteRef, gbs.remoteTrackingRef); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
remoteHead, okRemote, err := gbs.api.TryResolveRefCommit(ctx, gbs.remoteTrackingRef)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !okRemote {
|
||||
return &git.RefNotFoundError{Ref: gbs.remoteTrackingRef}
|
||||
}
|
||||
|
||||
// 2) Force-set owned local ref to remote head (no merge; remote is source-of-truth).
|
||||
return gbs.api.UpdateRef(ctx, gbs.localRef, remoteHead, "gitblobstore: sync read")
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) remoteManagedWrite(ctx context.Context, key, msg string, build func(parent git.OID, ok bool) (git.OID, error)) (string, error) {
|
||||
if err := gbs.validateRemoteManaged(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
gbs.mu.Lock()
|
||||
defer gbs.mu.Unlock()
|
||||
|
||||
policy := gbs.casRetryPolicy(ctx)
|
||||
|
||||
var ver string
|
||||
op := func() error {
|
||||
// 1) Fetch remote state into local tracking ref.
|
||||
if err := gbs.api.FetchRef(ctx, gbs.remoteName, gbs.remoteRef, gbs.remoteTrackingRef); err != nil {
|
||||
return err
|
||||
}
|
||||
remoteHead, okRemote, err := gbs.api.TryResolveRefCommit(ctx, gbs.remoteTrackingRef)
|
||||
if err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
if !okRemote {
|
||||
return backoff.Permanent(&git.RefNotFoundError{Ref: gbs.remoteTrackingRef})
|
||||
}
|
||||
|
||||
// 2) Force-set owned local ref to remote head (remote is source-of-truth).
|
||||
if err := gbs.api.UpdateRef(ctx, gbs.localRef, remoteHead, "gitblobstore: sync write"); err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
// 3) Apply this operation's changes on top of the remote head.
|
||||
newCommit, err := build(remoteHead, true)
|
||||
if err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
if err := gbs.api.UpdateRef(ctx, gbs.localRef, newCommit, msg); err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
// 4) Push local ref to remote with lease.
|
||||
if err := gbs.api.PushRefWithLease(ctx, gbs.remoteName, gbs.localRef, gbs.remoteRef, remoteHead); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ver, err = gbs.resolveKeyVersionAtCommit(ctx, newCommit, key)
|
||||
if err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := backoff.Retry(op, policy); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
return "", ctx.Err()
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
return ver, nil
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) putWithRemoteSync(ctx context.Context, key string, plan putPlan, msg string) (string, error) {
|
||||
return gbs.remoteManagedWrite(ctx, key, msg, func(remoteHead git.OID, _ bool) (git.OID, error) {
|
||||
return gbs.buildCommitForKeyWrite(ctx, remoteHead, true, key, plan, msg)
|
||||
})
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) checkAndPutWithRemoteSync(ctx context.Context, expectedVersion, key string, totalSize int64, reader io.Reader, msg string) (string, error) {
|
||||
var cachedPlan *putPlan
|
||||
return gbs.remoteManagedWrite(ctx, key, msg, func(remoteHead git.OID, _ bool) (git.OID, error) {
|
||||
actualKeyVersion, err := gbs.currentKeyVersion(ctx, remoteHead, true, key)
|
||||
if err != nil {
|
||||
return git.OID(""), err
|
||||
}
|
||||
if expectedVersion != actualKeyVersion {
|
||||
return git.OID(""), CheckAndPutError{Key: key, ExpectedVersion: expectedVersion, ActualVersion: actualKeyVersion}
|
||||
}
|
||||
if cachedPlan == nil {
|
||||
plan, err := gbs.planPutWrites(ctx, key, totalSize, reader)
|
||||
if err != nil {
|
||||
return git.OID(""), err
|
||||
}
|
||||
cachedPlan = &plan
|
||||
}
|
||||
return gbs.buildCommitForKeyWrite(ctx, remoteHead, true, key, *cachedPlan, msg)
|
||||
})
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) Exists(ctx context.Context, key string) (bool, error) {
|
||||
@@ -323,7 +473,10 @@ func (gbs *GitBlobstore) Exists(ctx context.Context, key string) (bool, error) {
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
|
||||
if err := gbs.syncForRead(ctx); err != nil {
|
||||
return false, err
|
||||
}
|
||||
commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.localRef)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@@ -345,6 +498,9 @@ func (gbs *GitBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.
|
||||
if err != nil {
|
||||
return nil, 0, "", err
|
||||
}
|
||||
if err := gbs.syncForRead(ctx); err != nil {
|
||||
return nil, 0, "", err
|
||||
}
|
||||
commit, err := gbs.resolveCommitForGet(ctx, key)
|
||||
if err != nil {
|
||||
return nil, 0, "", err
|
||||
@@ -414,10 +570,9 @@ func (gbs *GitBlobstore) validateAndSizeChunkedParts(ctx context.Context, entrie
|
||||
return nil, 0, fmt.Errorf("gitblobstore: chunked tree has no parts")
|
||||
}
|
||||
|
||||
width := len(entries[0].Name)
|
||||
// First pass: validate names + types, and determine width.
|
||||
if width < 4 {
|
||||
return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected at least 4 digits)", entries[0].Name)
|
||||
// GitBlobstore chunked trees use fixed-width 4-digit part names: 0001, 0002, ...
|
||||
if len(entries[0].Name) != gitblobstorePartNameWidth {
|
||||
return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected width %d)", entries[0].Name, gitblobstorePartNameWidth)
|
||||
}
|
||||
|
||||
parts := make([]chunkPartRef, 0, len(entries))
|
||||
@@ -426,18 +581,18 @@ func (gbs *GitBlobstore) validateAndSizeChunkedParts(ctx context.Context, entrie
|
||||
if e.Type != git.ObjectTypeBlob {
|
||||
return nil, 0, fmt.Errorf("gitblobstore: invalid part %q: expected blob, got %q", e.Name, e.Type)
|
||||
}
|
||||
if len(e.Name) != width {
|
||||
return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected width %d)", e.Name, width)
|
||||
if len(e.Name) != gitblobstorePartNameWidth {
|
||||
return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected width %d)", e.Name, gitblobstorePartNameWidth)
|
||||
}
|
||||
n, err := strconv.Atoi(e.Name)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected digits): %w", e.Name, err)
|
||||
}
|
||||
if n != i+1 {
|
||||
want := fmt.Sprintf("%0*d", width, i+1)
|
||||
want := fmt.Sprintf("%0*d", gitblobstorePartNameWidth, i+1)
|
||||
return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected %q)", e.Name, want)
|
||||
}
|
||||
if want := fmt.Sprintf("%0*d", width, n); want != e.Name {
|
||||
if want := fmt.Sprintf("%0*d", gitblobstorePartNameWidth, n); want != e.Name {
|
||||
return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected %q)", e.Name, want)
|
||||
}
|
||||
|
||||
@@ -458,7 +613,7 @@ func (gbs *GitBlobstore) validateAndSizeChunkedParts(ctx context.Context, entrie
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) resolveCommitForGet(ctx context.Context, key string) (commit git.OID, err error) {
|
||||
commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
|
||||
commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.localRef)
|
||||
if err != nil {
|
||||
return git.OID(""), err
|
||||
}
|
||||
@@ -471,7 +626,7 @@ func (gbs *GitBlobstore) resolveCommitForGet(ctx context.Context, key string) (c
|
||||
if key == "manifest" {
|
||||
return git.OID(""), NotFound{Key: key}
|
||||
}
|
||||
return git.OID(""), &git.RefNotFoundError{Ref: gbs.ref}
|
||||
return git.OID(""), &git.RefNotFoundError{Ref: gbs.localRef}
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) resolveObjectForGet(ctx context.Context, commit git.OID, key string) (oid git.OID, typ git.ObjectType, err error) {
|
||||
@@ -499,6 +654,11 @@ func (gbs *GitBlobstore) Put(ctx context.Context, key string, totalSize int64, r
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Ensure the idempotent "key exists" fast-path observes remote state.
|
||||
if err := gbs.syncForRead(ctx); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Many NBS/table-file writes are content-addressed: if the key already exists, callers
|
||||
// assume it refers to the same bytes and treat the operation as idempotent.
|
||||
//
|
||||
@@ -515,13 +675,13 @@ func (gbs *GitBlobstore) Put(ctx context.Context, key string, totalSize int64, r
|
||||
|
||||
msg := fmt.Sprintf("gitblobstore: put %s", key)
|
||||
|
||||
// Hash the contents once. If we need to retry due to concurrent updates to |gbs.ref|,
|
||||
// Hash the contents once. If we need to retry due to a concurrent remote advance (lease failure),
|
||||
// we can reuse the resulting object OIDs without re-reading |reader|.
|
||||
plan, err := gbs.planPutWrites(ctx, key, totalSize, reader)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return gbs.putWithCASRetries(ctx, key, plan, msg)
|
||||
return gbs.putWithRemoteSync(ctx, key, plan, msg)
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) planPutWrites(ctx context.Context, key string, totalSize int64, reader io.Reader) (putPlan, error) {
|
||||
@@ -596,44 +756,6 @@ func (gbs *GitBlobstore) hashParts(ctx context.Context, reader io.Reader) (parts
|
||||
return parts, partOIDs, total, nil
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) putWithCASRetries(ctx context.Context, key string, plan putPlan, msg string) (string, error) {
|
||||
// Make Put resilient to concurrent writers updating unrelated keys by using a CAS loop
|
||||
// under the hood. This matches typical object-store semantics more closely than an
|
||||
// unconditional ref update (which could clobber other keys).
|
||||
policy := gbs.casRetryPolicy(ctx)
|
||||
|
||||
var ver string
|
||||
op := func() error {
|
||||
parent, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
|
||||
if err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
newCommit, err := gbs.buildCommitForKeyWrite(ctx, parent, ok, key, plan, msg)
|
||||
if err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
if err := gbs.updateRefCASForWrite(ctx, parent, ok, newCommit, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ver, err = gbs.resolveKeyVersionAtCommit(ctx, newCommit, key)
|
||||
if err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := backoff.Retry(op, policy); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
return "", ctx.Err()
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
return ver, nil
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) casRetryPolicy(ctx context.Context) backoff.BackOff {
|
||||
const maxRetries = 31 // 32 total attempts (initial + retries)
|
||||
bo := backoff.NewExponentialBackOff()
|
||||
@@ -646,7 +768,7 @@ func (gbs *GitBlobstore) casRetryPolicy(ctx context.Context) backoff.BackOff {
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) buildCommitForKeyWrite(ctx context.Context, parent git.OID, hasParent bool, key string, plan putPlan, msg string) (git.OID, error) {
|
||||
_, indexFile, cleanup, err := newTempIndex()
|
||||
_, indexFile, cleanup, err := git.NewTempIndex()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -733,39 +855,6 @@ func (gbs *GitBlobstore) removeKeyConflictsFromIndex(ctx context.Context, parent
|
||||
}
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) updateRefCASForWrite(ctx context.Context, parent git.OID, haveParent bool, newCommit git.OID, msg string) error {
|
||||
if !haveParent {
|
||||
// Create-only CAS: oldOID=all-zero requires the ref to not exist. This avoids
|
||||
// losing concurrent writes when multiple goroutines create the ref at once.
|
||||
const zeroOID = git.OID("0000000000000000000000000000000000000000")
|
||||
if err := gbs.api.UpdateRefCAS(ctx, gbs.ref, newCommit, zeroOID, msg); err != nil {
|
||||
if gbs.refAdvanced(ctx, parent) {
|
||||
return err
|
||||
}
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := gbs.api.UpdateRefCAS(ctx, gbs.ref, newCommit, parent, msg); err != nil {
|
||||
// If the ref changed since we read |parent|, retry on the new head. Otherwise
|
||||
// surface the error (e.g. permissions, corruption).
|
||||
if gbs.refAdvanced(ctx, parent) {
|
||||
return err
|
||||
}
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) refAdvanced(ctx context.Context, old git.OID) bool {
|
||||
if ctx.Err() != nil {
|
||||
return false
|
||||
}
|
||||
cur, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
|
||||
return err == nil && ok && cur != old
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) resolveKeyVersionAtCommit(ctx context.Context, commit git.OID, key string) (string, error) {
|
||||
oid, _, err := gbs.api.ResolvePathObject(ctx, commit, key)
|
||||
if err != nil {
|
||||
@@ -779,7 +868,7 @@ func (gbs *GitBlobstore) tryFastSucceedPutIfKeyExists(ctx context.Context, key s
|
||||
return "", false, nil
|
||||
}
|
||||
|
||||
commit, haveCommit, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
|
||||
commit, haveCommit, err := gbs.api.TryResolveRefCommit(ctx, gbs.localRef)
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
@@ -805,61 +894,7 @@ func (gbs *GitBlobstore) CheckAndPut(ctx context.Context, expectedVersion, key s
|
||||
}
|
||||
|
||||
msg := fmt.Sprintf("gitblobstore: checkandput %s", key)
|
||||
|
||||
policy := gbs.casRetryPolicy(ctx)
|
||||
|
||||
var newKeyVersion string
|
||||
var cachedPlan *putPlan
|
||||
op := func() error {
|
||||
parent, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
|
||||
if err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
actualKeyVersion, err := gbs.currentKeyVersion(ctx, parent, ok, key)
|
||||
if err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
if expectedVersion != actualKeyVersion {
|
||||
return backoff.Permanent(CheckAndPutError{Key: key, ExpectedVersion: expectedVersion, ActualVersion: actualKeyVersion})
|
||||
}
|
||||
|
||||
// Only hash/consume the reader once we know the expectedVersion matches.
|
||||
// If we need to retry due to unrelated ref advances, reuse the cached plan so we
|
||||
// don't re-read |reader| (which may not be rewindable).
|
||||
if cachedPlan == nil {
|
||||
plan, err := gbs.planPutWrites(ctx, key, totalSize, reader)
|
||||
if err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
cachedPlan = &plan
|
||||
}
|
||||
|
||||
newCommit, err := gbs.buildCommitForKeyWrite(ctx, parent, ok, key, *cachedPlan, msg)
|
||||
if err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
if err := gbs.updateRefCASForWrite(ctx, parent, ok, newCommit, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ver, err := gbs.resolveKeyVersionAtCommit(ctx, newCommit, key)
|
||||
if err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
newKeyVersion = ver
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := backoff.Retry(op, policy); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
return "", ctx.Err()
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
|
||||
return newKeyVersion, nil
|
||||
return gbs.checkAndPutWithRemoteSync(ctx, expectedVersion, key, totalSize, reader, msg)
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) currentKeyVersion(ctx context.Context, commit git.OID, haveCommit bool, key string) (string, error) {
|
||||
@@ -884,6 +919,9 @@ func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources []
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if err := gbs.syncForRead(ctx); err != nil {
|
||||
return "", err
|
||||
}
|
||||
if len(sources) == 0 {
|
||||
return "", fmt.Errorf("gitblobstore: concatenate requires at least one source")
|
||||
}
|
||||
@@ -905,7 +943,7 @@ func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources []
|
||||
}
|
||||
|
||||
// Resolve a snapshot commit for the sources.
|
||||
commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
|
||||
commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.localRef)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -914,7 +952,7 @@ func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources []
|
||||
if key == "manifest" {
|
||||
return "", NotFound{Key: key}
|
||||
}
|
||||
return "", &git.RefNotFoundError{Ref: gbs.ref}
|
||||
return "", &git.RefNotFoundError{Ref: gbs.localRef}
|
||||
}
|
||||
|
||||
totalSz, err := gbs.totalSizeAtCommit(ctx, commit, sources)
|
||||
@@ -937,7 +975,7 @@ func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources []
|
||||
}
|
||||
|
||||
msg := fmt.Sprintf("gitblobstore: concatenate %s", key)
|
||||
return gbs.putWithCASRetries(ctx, key, plan, msg)
|
||||
return gbs.putWithRemoteSync(ctx, key, plan, msg)
|
||||
}
|
||||
|
||||
func (gbs *GitBlobstore) openReaderAtCommit(ctx context.Context, commit git.OID, key string) (io.ReadCloser, error) {
|
||||
@@ -1179,26 +1217,6 @@ func isMissingGitIdentityErr(err error) bool {
|
||||
strings.Contains(msg, "empty ident name")
|
||||
}
|
||||
|
||||
func newTempIndex() (dir, indexFile string, cleanup func(), err error) {
|
||||
// Create a unique temp index file. This is intentionally *not* placed under GIT_DIR:
|
||||
// - some git dirs may be read-only or otherwise unsuitable for scratch files
|
||||
// - we don't want to leave temp files inside the repo on crashes
|
||||
//
|
||||
// Note: git will also create a sibling lock file (<index>.lock) during index writes.
|
||||
f, err := os.CreateTemp("", "dolt-gitblobstore-index-")
|
||||
if err != nil {
|
||||
return "", "", nil, err
|
||||
}
|
||||
indexFile = f.Name()
|
||||
_ = f.Close()
|
||||
dir = filepath.Dir(indexFile)
|
||||
cleanup = func() {
|
||||
_ = os.Remove(indexFile)
|
||||
_ = os.Remove(indexFile + ".lock")
|
||||
}
|
||||
return dir, indexFile, cleanup, nil
|
||||
}
|
||||
|
||||
// normalizeGitTreePath normalizes and validates a blobstore key for use as a git tree path.
|
||||
//
|
||||
// Rules:
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
git "github.com/dolthub/dolt/go/store/blobstore/internal/git"
|
||||
"github.com/dolthub/dolt/go/store/testutils/gitrepo"
|
||||
)
|
||||
|
||||
@@ -30,10 +31,19 @@ func TestGitBlobstore_CheckAndPut_ChunkedRoundTrip_CreateOnly(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git")
|
||||
require.NoError(t, err)
|
||||
_, err = remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
|
||||
require.NoError(t, err)
|
||||
localRunner, err := git.NewRunner(localRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
_, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
Identity: testIdentity(),
|
||||
MaxPartSize: 3,
|
||||
})
|
||||
@@ -60,16 +70,25 @@ func TestGitBlobstore_CheckAndPut_MismatchDoesNotConsumeReader_WithChunkingEnabl
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git")
|
||||
require.NoError(t, err)
|
||||
_, err = remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty")
|
||||
require.NoError(t, err)
|
||||
|
||||
localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
|
||||
require.NoError(t, err)
|
||||
localRunner, err := git.NewRunner(localRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
_, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Seed any commit so actualVersion != "".
|
||||
bs0, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{Identity: testIdentity()})
|
||||
bs0, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{Identity: testIdentity()})
|
||||
require.NoError(t, err)
|
||||
_, err = bs0.Put(ctx, "x", 1, bytes.NewReader([]byte("x")))
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
Identity: testIdentity(),
|
||||
MaxPartSize: 3,
|
||||
})
|
||||
|
||||
@@ -31,24 +31,31 @@ func TestGitBlobstore_Get_ChunkedTree_AllAndRanges(t *testing.T) {
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
part1 := []byte("abc")
|
||||
part2 := []byte("defgh")
|
||||
commitOID, err := repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
commitOID, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
"chunked/0001": part1,
|
||||
"chunked/0002": part2,
|
||||
}, "seed chunked tree")
|
||||
require.NoError(t, err)
|
||||
|
||||
runner, err := git.NewRunner(repo.GitDir)
|
||||
remoteRunner, err := git.NewRunner(remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
api := git.NewGitAPIImpl(remoteRunner)
|
||||
treeOID, _, err := api.ResolvePathObject(ctx, git.OID(commitOID), "chunked")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef)
|
||||
localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
|
||||
require.NoError(t, err)
|
||||
localRunner, err := git.NewRunner(localRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
_, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstore(localRepo.GitDir, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
|
||||
wantAll := append(append([]byte(nil), part1...), part2...)
|
||||
@@ -82,17 +89,24 @@ func TestGitBlobstore_Get_ChunkedTree_InvalidPartsError(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Gap: 0001, 0003
|
||||
_, err = repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
_, err = remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
"chunked/0001": []byte("a"),
|
||||
"chunked/0003": []byte("b"),
|
||||
}, "seed invalid chunked tree")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef)
|
||||
localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
|
||||
require.NoError(t, err)
|
||||
localRunner, err := git.NewRunner(localRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
_, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstore(localRepo.GitDir, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, _, err = GetBytes(ctx, bs, "chunked", AllRange)
|
||||
|
||||
@@ -29,10 +29,19 @@ func TestGitBlobstore_Put_ChunkedWritesTreeParts(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git")
|
||||
require.NoError(t, err)
|
||||
_, err = remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
|
||||
require.NoError(t, err)
|
||||
localRunner, err := git.NewRunner(localRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
_, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
Identity: testIdentity(),
|
||||
MaxPartSize: 3,
|
||||
})
|
||||
@@ -47,7 +56,7 @@ func TestGitBlobstore_Put_ChunkedWritesTreeParts(t *testing.T) {
|
||||
require.Equal(t, ver, ver2)
|
||||
require.Equal(t, want, got)
|
||||
|
||||
runner, err := git.NewRunner(repo.GitDir)
|
||||
runner, err := git.NewRunner(remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
|
||||
@@ -62,24 +71,33 @@ func TestGitBlobstore_Put_ChunkedWritesTreeParts(t *testing.T) {
|
||||
entries, err := api.ListTree(ctx, head, "big")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, entries, 4)
|
||||
require.Equal(t, "00000001", entries[0].Name)
|
||||
require.Equal(t, "00000004", entries[3].Name)
|
||||
require.Equal(t, "0001", entries[0].Name)
|
||||
require.Equal(t, "0004", entries[3].Name)
|
||||
}
|
||||
|
||||
func TestGitBlobstore_Put_IdempotentDoesNotChangeExistingRepresentation(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git")
|
||||
require.NoError(t, err)
|
||||
_, err = remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
|
||||
require.NoError(t, err)
|
||||
localRunner, err := git.NewRunner(localRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
_, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
Identity: testIdentity(),
|
||||
MaxPartSize: 3,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
runner, err := git.NewRunner(repo.GitDir)
|
||||
runner, err := git.NewRunner(remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
|
||||
|
||||
@@ -33,6 +33,8 @@ type fakeGitAPI struct {
|
||||
listTree func(ctx context.Context, commit git.OID, treePath string) ([]git.TreeEntry, error)
|
||||
blobSize func(ctx context.Context, oid git.OID) (int64, error)
|
||||
blobReader func(ctx context.Context, oid git.OID) (io.ReadCloser, error)
|
||||
fetchRef func(ctx context.Context, remote string, srcRef string, dstRef string) error
|
||||
pushRefWithLease func(ctx context.Context, remote string, srcRef string, dstRef string, expectedDstOID git.OID) error
|
||||
}
|
||||
|
||||
func (f fakeGitAPI) TryResolveRefCommit(ctx context.Context, ref string) (git.OID, bool, error) {
|
||||
@@ -86,6 +88,18 @@ func (f fakeGitAPI) UpdateRefCAS(ctx context.Context, ref string, newOID git.OID
|
||||
func (f fakeGitAPI) UpdateRef(ctx context.Context, ref string, newOID git.OID, msg string) error {
|
||||
panic("unexpected call")
|
||||
}
|
||||
func (f fakeGitAPI) FetchRef(ctx context.Context, remote string, srcRef string, dstRef string) error {
|
||||
if f.fetchRef == nil {
|
||||
panic("unexpected call")
|
||||
}
|
||||
return f.fetchRef(ctx, remote, srcRef, dstRef)
|
||||
}
|
||||
func (f fakeGitAPI) PushRefWithLease(ctx context.Context, remote string, srcRef string, dstRef string, expectedDstOID git.OID) error {
|
||||
if f.pushRefWithLease == nil {
|
||||
panic("unexpected call")
|
||||
}
|
||||
return f.pushRefWithLease(ctx, remote, srcRef, dstRef, expectedDstOID)
|
||||
}
|
||||
|
||||
func TestGitBlobstoreHelpers_resolveCommitForGet(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
@@ -97,7 +111,7 @@ func TestGitBlobstoreHelpers_resolveCommitForGet(t *testing.T) {
|
||||
return git.OID("0123456789abcdef0123456789abcdef01234567"), true, nil
|
||||
},
|
||||
}
|
||||
gbs := &GitBlobstore{ref: DoltDataRef, api: api}
|
||||
gbs := &GitBlobstore{localRef: DoltDataRef, api: api}
|
||||
|
||||
commit, err := gbs.resolveCommitForGet(ctx, "k")
|
||||
require.NoError(t, err)
|
||||
@@ -110,7 +124,7 @@ func TestGitBlobstoreHelpers_resolveCommitForGet(t *testing.T) {
|
||||
return git.OID(""), false, nil
|
||||
},
|
||||
}
|
||||
gbs := &GitBlobstore{ref: DoltDataRef, api: api}
|
||||
gbs := &GitBlobstore{localRef: DoltDataRef, api: api}
|
||||
|
||||
_, err := gbs.resolveCommitForGet(ctx, "manifest")
|
||||
var nf NotFound
|
||||
@@ -124,7 +138,7 @@ func TestGitBlobstoreHelpers_resolveCommitForGet(t *testing.T) {
|
||||
return git.OID(""), false, nil
|
||||
},
|
||||
}
|
||||
gbs := &GitBlobstore{ref: DoltDataRef, api: api}
|
||||
gbs := &GitBlobstore{localRef: DoltDataRef, api: api}
|
||||
|
||||
_, err := gbs.resolveCommitForGet(ctx, "somekey")
|
||||
var rnf *git.RefNotFoundError
|
||||
@@ -139,7 +153,7 @@ func TestGitBlobstoreHelpers_resolveCommitForGet(t *testing.T) {
|
||||
return git.OID(""), false, sentinel
|
||||
},
|
||||
}
|
||||
gbs := &GitBlobstore{ref: DoltDataRef, api: api}
|
||||
gbs := &GitBlobstore{localRef: DoltDataRef, api: api}
|
||||
|
||||
_, err := gbs.resolveCommitForGet(ctx, "k")
|
||||
require.ErrorIs(t, err, sentinel)
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
@@ -42,14 +43,30 @@ func testIdentity() *git.Identity {
|
||||
return &git.Identity{Name: "gitblobstore test", Email: "gitblobstore@test.invalid"}
|
||||
}
|
||||
|
||||
func TestGitBlobstore_RefMissingIsNotFound(t *testing.T) {
|
||||
func newRemoteAndLocalRepos(t *testing.T, ctx context.Context) (remoteRepo *gitrepo.Repo, localRepo *gitrepo.Repo, localRunner *git.Runner) {
|
||||
t.Helper()
|
||||
|
||||
remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
localRepo, err = gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
|
||||
require.NoError(t, err)
|
||||
localRunner, err = git.NewRunner(localRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
_, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
return remoteRepo, localRepo, localRunner
|
||||
}
|
||||
|
||||
func TestGitBlobstore_MissingKeysAreNotFound(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx)
|
||||
_, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef)
|
||||
bs, err := NewGitBlobstore(localRepo.GitDir, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
|
||||
ok, err := bs.Exists(ctx, "manifest")
|
||||
@@ -60,35 +77,31 @@ func TestGitBlobstore_RefMissingIsNotFound(t *testing.T) {
|
||||
require.Error(t, err)
|
||||
require.True(t, IsNotFoundError(err))
|
||||
|
||||
// For non-manifest keys, missing the ref is a hard error.
|
||||
_, _, _, err = bs.Get(ctx, "table", AllRange)
|
||||
require.Error(t, err)
|
||||
require.False(t, IsNotFoundError(err))
|
||||
var rnf *git.RefNotFoundError
|
||||
require.True(t, errors.As(err, &rnf))
|
||||
require.True(t, IsNotFoundError(err))
|
||||
}
|
||||
|
||||
func TestGitBlobstore_ExistsAndGet_AllRange(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx)
|
||||
|
||||
want := []byte("hello manifest\n")
|
||||
commit, err := repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
commit, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
"manifest": want,
|
||||
"dir/file": []byte("abc"),
|
||||
}, "seed")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef)
|
||||
bs, err := NewGitBlobstore(localRepo.GitDir, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
|
||||
runner, err := git.NewRunner(repo.GitDir)
|
||||
remoteRunner, err := git.NewRunner(remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
manifestOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "manifest")
|
||||
remoteAPI := git.NewGitAPIImpl(remoteRunner)
|
||||
manifestOID, _, err := remoteAPI.ResolvePathObject(ctx, git.OID(commit), "manifest")
|
||||
require.NoError(t, err)
|
||||
|
||||
ok, err := bs.Exists(ctx, "manifest")
|
||||
@@ -117,19 +130,421 @@ func TestGitBlobstore_ExistsAndGet_AllRange(t *testing.T) {
|
||||
_ = rc.Close()
|
||||
}
|
||||
|
||||
func TestGitBlobstore_RemoteManaged_ExistsFetchesAndAligns(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git")
|
||||
require.NoError(t, err)
|
||||
remoteCommit, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
"manifest": []byte("from remote\n"),
|
||||
}, "seed remote")
|
||||
require.NoError(t, err)
|
||||
|
||||
localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
|
||||
require.NoError(t, err)
|
||||
localRunner, err := git.NewRunner(localRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
_, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
RemoteName: "origin",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
ok, err := bs.Exists(ctx, "manifest")
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
|
||||
require.Equal(t, DoltDataRef, bs.remoteRef)
|
||||
require.Equal(t, RemoteTrackingRef("origin", DoltDataRef), bs.remoteTrackingRef)
|
||||
require.NotEqual(t, bs.remoteRef, bs.localRef)
|
||||
require.NotEqual(t, bs.remoteTrackingRef, bs.localRef)
|
||||
require.True(t, strings.HasPrefix(bs.localRef, "refs/dolt/blobstore/origin/dolt/data/"))
|
||||
|
||||
localAPI := git.NewGitAPIImpl(localRunner)
|
||||
gotLocal, err := localAPI.ResolveRefCommit(ctx, bs.localRef)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, git.OID(remoteCommit), gotLocal)
|
||||
|
||||
gotTracking, err := localAPI.ResolveRefCommit(ctx, bs.remoteTrackingRef)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, git.OID(remoteCommit), gotTracking)
|
||||
}
|
||||
|
||||
func TestGitBlobstore_RemoteAndLocalRefNaming_ConfigurableRemoteRef(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
const remoteRef = "refs/heads/alt"
|
||||
bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, remoteRef, GitBlobstoreOptions{
|
||||
RemoteName: "origin",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, remoteRef, bs.remoteRef)
|
||||
require.Equal(t, RemoteTrackingRef("origin", remoteRef), bs.remoteTrackingRef)
|
||||
require.NotEmpty(t, bs.localRef)
|
||||
require.NotEqual(t, bs.remoteRef, bs.localRef)
|
||||
require.NotEqual(t, bs.remoteTrackingRef, bs.localRef)
|
||||
require.True(t, strings.HasPrefix(bs.localRef, "refs/dolt/blobstore/origin/heads/alt/"))
|
||||
}
|
||||
|
||||
func TestGitBlobstore_CleanupOwnedLocalRef_DeletesRef(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
RemoteName: "origin",
|
||||
Identity: testIdentity(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = localRepo.SetRefToTree(ctx, bs.localRef, map[string][]byte{
|
||||
"manifest": []byte("x"),
|
||||
}, "seed localRef")
|
||||
require.NoError(t, err)
|
||||
|
||||
runner, err := git.NewRunner(localRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
|
||||
_, err = api.ResolveRefCommit(ctx, bs.localRef)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, bs.CleanupOwnedLocalRef(ctx))
|
||||
|
||||
_, err = api.ResolveRefCommit(ctx, bs.localRef)
|
||||
var rnf *git.RefNotFoundError
|
||||
require.ErrorAs(t, err, &rnf)
|
||||
require.Equal(t, bs.localRef, rnf.Ref)
|
||||
}
|
||||
|
||||
func TestGitBlobstore_RemoteManaged_PutPushesToRemote(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git")
|
||||
require.NoError(t, err)
|
||||
_, err = remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
"base": []byte("base\n"),
|
||||
}, "seed remote")
|
||||
require.NoError(t, err)
|
||||
|
||||
localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
|
||||
require.NoError(t, err)
|
||||
localRunner, err := git.NewRunner(localRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
_, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
RemoteName: "origin",
|
||||
Identity: testIdentity(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
ver, err := PutBytes(ctx, bs, "k", []byte("from local\n"))
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, ver)
|
||||
|
||||
remoteRunner, err := git.NewRunner(remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
remoteAPI := git.NewGitAPIImpl(remoteRunner)
|
||||
|
||||
remoteHead, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
oid, typ, err := remoteAPI.ResolvePathObject(ctx, remoteHead, "k")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, git.ObjectTypeBlob, typ)
|
||||
|
||||
rc, err := remoteAPI.BlobReader(ctx, oid)
|
||||
require.NoError(t, err)
|
||||
got, err := io.ReadAll(rc)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, rc.Close())
|
||||
require.Equal(t, []byte("from local\n"), got)
|
||||
}
|
||||
|
||||
type hookPushGitAPI struct {
|
||||
git.GitAPI
|
||||
onFirstPush func(ctx context.Context)
|
||||
did atomic.Bool
|
||||
}
|
||||
|
||||
func (h *hookPushGitAPI) PushRefWithLease(ctx context.Context, remote string, srcRef string, dstRef string, expectedDstOID git.OID) error {
|
||||
if h.onFirstPush != nil && !h.did.Swap(true) {
|
||||
h.onFirstPush(ctx)
|
||||
}
|
||||
return h.GitAPI.PushRefWithLease(ctx, remote, srcRef, dstRef, expectedDstOID)
|
||||
}
|
||||
|
||||
func TestGitBlobstore_RemoteManaged_PutRetriesOnLeaseFailure(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
remoteRunner, err := git.NewRunner(remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
remoteAPI := git.NewGitAPIImpl(remoteRunner)
|
||||
|
||||
// Seed remote so it has a head for the lease.
|
||||
_, err = writeKeyToRef(ctx, remoteAPI, DoltDataRef, "base", []byte("base\n"), testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
|
||||
require.NoError(t, err)
|
||||
localRunner, err := git.NewRunner(localRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
_, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
RemoteName: "origin",
|
||||
Identity: testIdentity(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
var externalHead atomic.Value // git.OID
|
||||
|
||||
// Advance the remote right before the first push to force a lease failure and trigger a retry.
|
||||
bs.api = &hookPushGitAPI{
|
||||
GitAPI: bs.api,
|
||||
onFirstPush: func(ctx context.Context) {
|
||||
oid, _ := writeKeyToRef(ctx, remoteAPI, DoltDataRef, "external", []byte("external\n"), testIdentity())
|
||||
if oid != "" {
|
||||
externalHead.Store(oid)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
ver, err := PutBytes(ctx, bs, "k", []byte("after retry\n"))
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, ver)
|
||||
|
||||
remoteHead, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify we rebuilt on top of the advanced remote head (i.e. parent is externalHead).
|
||||
if v := externalHead.Load(); v != nil {
|
||||
wantParent := v.(git.OID)
|
||||
out, err := remoteRunner.Run(ctx, git.RunOptions{}, "rev-parse", remoteHead.String()+"^")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, wantParent.String(), string(bytes.TrimSpace(out)))
|
||||
_, err = remoteRunner.Run(ctx, git.RunOptions{}, "rev-parse", remoteHead.String()+"^2")
|
||||
require.Error(t, err) // not a merge commit
|
||||
}
|
||||
|
||||
oid, typ, err := remoteAPI.ResolvePathObject(ctx, remoteHead, "k")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, git.ObjectTypeBlob, typ)
|
||||
|
||||
rc, err := remoteAPI.BlobReader(ctx, oid)
|
||||
require.NoError(t, err)
|
||||
got, err := io.ReadAll(rc)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, rc.Close())
|
||||
require.Equal(t, []byte("after retry\n"), got)
|
||||
}
|
||||
|
||||
func TestGitBlobstore_RemoteManaged_CheckAndPut_RemoteHeadTruth(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git")
|
||||
require.NoError(t, err)
|
||||
remoteRunner, err := git.NewRunner(remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
remoteAPI := git.NewGitAPIImpl(remoteRunner)
|
||||
|
||||
// Base manifest
|
||||
base, err := writeKeyToRef(ctx, remoteAPI, DoltDataRef, "manifest", []byte("base\n"), testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
|
||||
require.NoError(t, err)
|
||||
localRunner, err := git.NewRunner(localRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
_, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
localAPI := git.NewGitAPIImpl(localRunner)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
RemoteName: "origin",
|
||||
Identity: testIdentity(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Fetch remote so local has the base object, then create a conflicting local commit.
|
||||
require.NoError(t, localAPI.FetchRef(ctx, "origin", DoltDataRef, bs.remoteTrackingRef))
|
||||
require.NoError(t, localAPI.UpdateRef(ctx, bs.localRef, base, "set local to base"))
|
||||
_, err = writeKeyToRef(ctx, localAPI, bs.localRef, "manifest", []byte("local\n"), testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Advance remote independently so we have a conflict on "manifest".
|
||||
_, err = writeKeyToRef(ctx, remoteAPI, DoltDataRef, "manifest", []byte("remote\n"), testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
remoteHead, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
remoteManifestOID, _, err := remoteAPI.ResolvePathObject(ctx, remoteHead, "manifest")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Remote is truth: CheckAndPut validates against remoteHead and applies changes on top of it.
|
||||
newBytes := []byte("replayed\n")
|
||||
ver, err := bs.CheckAndPut(ctx, remoteManifestOID.String(), "manifest", int64(len(newBytes)), bytes.NewReader(newBytes))
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, ver)
|
||||
|
||||
remoteHead, err = remoteAPI.ResolveRefCommit(ctx, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
oid, _, err := remoteAPI.ResolvePathObject(ctx, remoteHead, "manifest")
|
||||
require.NoError(t, err)
|
||||
rc, err := remoteAPI.BlobReader(ctx, oid)
|
||||
require.NoError(t, err)
|
||||
got, err := io.ReadAll(rc)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, rc.Close())
|
||||
require.Equal(t, newBytes, got)
|
||||
}
|
||||
|
||||
func TestGitBlobstore_RemoteManaged_CheckAndPut_ExpectedMatchesLocalButNotRemoteFails(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git")
|
||||
require.NoError(t, err)
|
||||
remoteRunner, err := git.NewRunner(remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
remoteAPI := git.NewGitAPIImpl(remoteRunner)
|
||||
|
||||
// Base manifest
|
||||
base, err := writeKeyToRef(ctx, remoteAPI, DoltDataRef, "manifest", []byte("base\n"), testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
|
||||
require.NoError(t, err)
|
||||
localRunner, err := git.NewRunner(localRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
_, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
localAPI := git.NewGitAPIImpl(localRunner)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
RemoteName: "origin",
|
||||
Identity: testIdentity(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create a local-only manifest version.
|
||||
require.NoError(t, localAPI.FetchRef(ctx, "origin", DoltDataRef, bs.remoteTrackingRef))
|
||||
require.NoError(t, localAPI.UpdateRef(ctx, bs.localRef, base, "set local to base"))
|
||||
_, err = writeKeyToRef(ctx, localAPI, bs.localRef, "manifest", []byte("local\n"), testIdentity())
|
||||
require.NoError(t, err)
|
||||
localHead, err := localAPI.ResolveRefCommit(ctx, bs.localRef)
|
||||
require.NoError(t, err)
|
||||
localManifestOID, _, err := localAPI.ResolvePathObject(ctx, localHead, "manifest")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Advance remote independently.
|
||||
_, err = writeKeyToRef(ctx, remoteAPI, DoltDataRef, "manifest", []byte("remote\n"), testIdentity())
|
||||
require.NoError(t, err)
|
||||
remoteHead, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
remoteManifestOID, _, err := remoteAPI.ResolvePathObject(ctx, remoteHead, "manifest")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Expected version matches local, but remote is truth, so this should fail.
|
||||
_, err = bs.CheckAndPut(ctx, localManifestOID.String(), "manifest", int64(len("new\n")), bytes.NewReader([]byte("new\n")))
|
||||
var capErr CheckAndPutError
|
||||
require.ErrorAs(t, err, &capErr)
|
||||
require.Equal(t, "manifest", capErr.Key)
|
||||
require.Equal(t, localManifestOID.String(), capErr.ExpectedVersion)
|
||||
require.Equal(t, remoteManifestOID.String(), capErr.ActualVersion)
|
||||
}
|
||||
|
||||
func TestGitBlobstore_RemoteManaged_PutOverwritesDivergedLocalRef_NoMergeCommit(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git")
|
||||
require.NoError(t, err)
|
||||
remoteRunner, err := git.NewRunner(remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
remoteAPI := git.NewGitAPIImpl(remoteRunner)
|
||||
|
||||
// Seed + advance remote.
|
||||
_, err = writeKeyToRef(ctx, remoteAPI, DoltDataRef, "base", []byte("base\n"), testIdentity())
|
||||
require.NoError(t, err)
|
||||
_, err = writeKeyToRef(ctx, remoteAPI, DoltDataRef, "remote", []byte("remote\n"), testIdentity())
|
||||
require.NoError(t, err)
|
||||
remoteHeadBefore, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
|
||||
localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
|
||||
require.NoError(t, err)
|
||||
localRunner, err := git.NewRunner(localRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
_, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
localAPI := git.NewGitAPIImpl(localRunner)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
RemoteName: "origin",
|
||||
Identity: testIdentity(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Make local diverge from remote.
|
||||
require.NoError(t, localAPI.FetchRef(ctx, "origin", DoltDataRef, bs.remoteTrackingRef))
|
||||
require.NoError(t, localAPI.UpdateRef(ctx, bs.localRef, remoteHeadBefore, "set local to remote head"))
|
||||
_, err = writeKeyToRef(ctx, localAPI, bs.localRef, "local", []byte("local\n"), testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = PutBytes(ctx, bs, "k", []byte("from local\n"))
|
||||
require.NoError(t, err)
|
||||
|
||||
remoteHeadAfter, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
|
||||
// New remote head is a normal (non-merge) commit built on remoteHeadBefore.
|
||||
out, err := remoteRunner.Run(ctx, git.RunOptions{}, "rev-parse", remoteHeadAfter.String()+"^")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, remoteHeadBefore.String(), string(bytes.TrimSpace(out)))
|
||||
_, err = remoteRunner.Run(ctx, git.RunOptions{}, "rev-parse", remoteHeadAfter.String()+"^2")
|
||||
require.Error(t, err)
|
||||
|
||||
// Local-only divergence should not be present on remote.
|
||||
_, _, err = remoteAPI.ResolvePathObject(ctx, remoteHeadAfter, "local")
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestGitBlobstore_Get_NotFoundMissingKey(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx)
|
||||
_, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
"present": []byte("x"),
|
||||
}, "seed")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef)
|
||||
bs, err := NewGitBlobstore(localRepo.GitDir, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, _, err = GetBytes(ctx, bs, "missing", AllRange)
|
||||
@@ -141,21 +556,20 @@ func TestGitBlobstore_BlobRangeSemantics(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx)
|
||||
|
||||
maxValue := int64(16 * 1024)
|
||||
testData := rangeData(0, maxValue)
|
||||
|
||||
commit, err := repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
commit, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
"range": testData,
|
||||
}, "range fixture")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef)
|
||||
bs, err := NewGitBlobstore(localRepo.GitDir, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
|
||||
runner, err := git.NewRunner(repo.GitDir)
|
||||
runner, err := git.NewRunner(remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
rangeOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "range")
|
||||
@@ -234,10 +648,11 @@ func TestGitBlobstore_Put_RoundTripAndVersion(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx)
|
||||
_, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
want := []byte("hello put\n")
|
||||
@@ -259,10 +674,11 @@ func TestGitBlobstore_Concatenate_Basic(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx)
|
||||
_, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = PutBytes(ctx, bs, "a", []byte("hi "))
|
||||
@@ -284,10 +700,11 @@ func TestGitBlobstore_Concatenate_ChunkedResult(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx)
|
||||
_, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
Identity: testIdentity(),
|
||||
MaxPartSize: 1024,
|
||||
})
|
||||
@@ -307,7 +724,7 @@ func TestGitBlobstore_Concatenate_ChunkedResult(t *testing.T) {
|
||||
require.NotEmpty(t, ver)
|
||||
|
||||
// Verify the resulting key is stored as a chunked tree (not a single blob).
|
||||
head, ok, err := bs.api.TryResolveRefCommit(ctx, DoltDataRef)
|
||||
head, ok, err := bs.api.TryResolveRefCommit(ctx, bs.localRef)
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
oid, typ, err := bs.api.ResolvePathObject(ctx, head, "c")
|
||||
@@ -318,7 +735,7 @@ func TestGitBlobstore_Concatenate_ChunkedResult(t *testing.T) {
|
||||
parts, err := bs.api.ListTree(ctx, head, "c")
|
||||
require.NoError(t, err)
|
||||
require.GreaterOrEqual(t, len(parts), 2)
|
||||
require.Equal(t, "00000001", parts[0].Name)
|
||||
require.Equal(t, "0001", parts[0].Name)
|
||||
|
||||
got, ver2, err := GetBytes(ctx, bs, "c", AllRange)
|
||||
require.NoError(t, err)
|
||||
@@ -330,10 +747,11 @@ func TestGitBlobstore_Concatenate_KeyExistsFastSucceeds(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx)
|
||||
_, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
ver1, err := PutBytes(ctx, bs, "c", []byte("original"))
|
||||
@@ -359,10 +777,13 @@ func TestGitBlobstore_Concatenate_MissingSourceIsNotFound(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx)
|
||||
_, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
"present": []byte("x"),
|
||||
}, "seed")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = PutBytes(ctx, bs, "present", []byte("x"))
|
||||
@@ -380,10 +801,11 @@ func TestGitBlobstore_Concatenate_EmptySourcesErrors(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx)
|
||||
_, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = bs.Concatenate(ctx, "c", nil)
|
||||
@@ -400,10 +822,11 @@ func TestGitBlobstore_Put_IdempotentIfKeyExists(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx)
|
||||
_, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
ver1, err := PutBytes(ctx, bs, "k", []byte("v1\n"))
|
||||
@@ -420,22 +843,6 @@ func TestGitBlobstore_Put_IdempotentIfKeyExists(t *testing.T) {
|
||||
require.Equal(t, []byte("v1\n"), got)
|
||||
}
|
||||
|
||||
type hookGitAPI struct {
|
||||
git.GitAPI
|
||||
|
||||
ref string
|
||||
// if set, called once before the first UpdateRefCAS executes.
|
||||
onFirstCAS func(ctx context.Context, old git.OID)
|
||||
did atomic.Bool
|
||||
}
|
||||
|
||||
func (h *hookGitAPI) UpdateRefCAS(ctx context.Context, ref string, newOID git.OID, oldOID git.OID, msg string) error {
|
||||
if h.onFirstCAS != nil && !h.did.Swap(true) && ref == h.ref {
|
||||
h.onFirstCAS(ctx, oldOID)
|
||||
}
|
||||
return h.GitAPI.UpdateRefCAS(ctx, ref, newOID, oldOID, msg)
|
||||
}
|
||||
|
||||
func writeKeyToRef(ctx context.Context, api git.GitAPI, ref string, key string, data []byte, author *git.Identity) (git.OID, error) {
|
||||
parent, ok, err := api.TryResolveRefCommit(ctx, ref)
|
||||
if err != nil {
|
||||
@@ -490,54 +897,6 @@ func writeKeyToRef(ctx context.Context, api git.GitAPI, ref string, key string,
|
||||
return commitOID, nil
|
||||
}
|
||||
|
||||
func TestGitBlobstore_Put_ContentionRetryPreservesOtherKey(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Seed the ref so Put takes the CAS path.
|
||||
_, err = repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
"base": []byte("base\n"),
|
||||
}, "seed")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
origAPI := bs.api
|
||||
h := &hookGitAPI{GitAPI: origAPI, ref: DoltDataRef}
|
||||
h.onFirstCAS = func(ctx context.Context, old git.OID) {
|
||||
// Advance the ref to simulate another writer committing concurrently.
|
||||
_, _ = writeKeyToRef(ctx, origAPI, DoltDataRef, "external", []byte("external\n"), testIdentity())
|
||||
}
|
||||
bs.api = h
|
||||
|
||||
ver, err := PutBytes(ctx, bs, "k", []byte("mine\n"))
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, ver)
|
||||
|
||||
got, ver2, err := GetBytes(ctx, bs, "k", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ver, ver2)
|
||||
require.Equal(t, []byte("mine\n"), got)
|
||||
|
||||
got, _, err = GetBytes(ctx, bs, "external", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte("external\n"), got)
|
||||
|
||||
got, _, err = GetBytes(ctx, bs, "base", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte("base\n"), got)
|
||||
|
||||
// Sanity: BlobReader path still works for the new commit.
|
||||
rc, _, _, err := bs.Get(ctx, "k", AllRange)
|
||||
require.NoError(t, err)
|
||||
_, _ = io.ReadAll(rc)
|
||||
_ = rc.Close()
|
||||
}
|
||||
|
||||
type failReader struct {
|
||||
called atomic.Bool
|
||||
}
|
||||
@@ -551,10 +910,11 @@ func TestGitBlobstore_CheckAndPut_CreateOnly(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx)
|
||||
_, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
want := []byte("created\n")
|
||||
@@ -572,18 +932,17 @@ func TestGitBlobstore_CheckAndPut_MismatchDoesNotRead(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx)
|
||||
|
||||
commit, err := repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
commit, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
"k": []byte("base\n"),
|
||||
}, "seed")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
runner, err := git.NewRunner(repo.GitDir)
|
||||
runner, err := git.NewRunner(remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
keyOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "k")
|
||||
@@ -600,19 +959,18 @@ func TestGitBlobstore_CheckAndPut_UpdateSuccess(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx)
|
||||
|
||||
commit, err := repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
commit, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
"k": []byte("base\n"),
|
||||
"keep": []byte("keep\n"),
|
||||
}, "seed")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
runner, err := git.NewRunner(repo.GitDir)
|
||||
runner, err := git.NewRunner(remoteRepo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
keyOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "k")
|
||||
@@ -633,47 +991,3 @@ func TestGitBlobstore_CheckAndPut_UpdateSuccess(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte("keep\n"), got)
|
||||
}
|
||||
|
||||
func TestGitBlobstore_CheckAndPut_ConcurrentUnrelatedUpdateStillSucceeds(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
commit, err := repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
"k": []byte("base\n"),
|
||||
}, "seed")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
runner, err := git.NewRunner(repo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
keyOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "k")
|
||||
require.NoError(t, err)
|
||||
|
||||
origAPI := bs.api
|
||||
h := &hookGitAPI{GitAPI: origAPI, ref: DoltDataRef}
|
||||
h.onFirstCAS = func(ctx context.Context, old git.OID) {
|
||||
// Advance the ref (without touching "k") to make UpdateRefCAS fail.
|
||||
_, _ = writeKeyToRef(ctx, origAPI, DoltDataRef, "external", []byte("external\n"), testIdentity())
|
||||
}
|
||||
bs.api = h
|
||||
|
||||
ver2, err := bs.CheckAndPut(ctx, keyOID.String(), "k", 0, bytes.NewReader([]byte("mine\n")))
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, ver2)
|
||||
require.NotEqual(t, keyOID.String(), ver2)
|
||||
|
||||
got, ver3, err := GetBytes(ctx, bs, "k", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ver2, ver3)
|
||||
require.Equal(t, []byte("mine\n"), got)
|
||||
|
||||
got, _, err = GetBytes(ctx, bs, "external", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte("external\n"), got)
|
||||
}
|
||||
|
||||
@@ -14,14 +14,29 @@
|
||||
|
||||
package blobstore
|
||||
|
||||
import "fmt"
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// DoltDataRef is the local writable ref backing a git-object-db dolt blobstore.
|
||||
// It is the state that local operations mutate and (eventually) attempt to push.
|
||||
// DoltDataRef is the default remote ref backing a git-object-db dolt blobstore.
|
||||
const DoltDataRef = "refs/dolt/data"
|
||||
|
||||
// DoltRemoteTrackingDataRef returns the remote-tracking ref for a named remote.
|
||||
// This ref represents the remote's DoltDataRef as of the last fetch.
|
||||
func DoltRemoteTrackingDataRef(remote string) string {
|
||||
return fmt.Sprintf("refs/dolt/remotes/%s/data", remote)
|
||||
func trimRefsPrefix(ref string) string {
|
||||
return strings.TrimPrefix(ref, "refs/")
|
||||
}
|
||||
|
||||
// RemoteTrackingRef returns the remote-tracking ref for a named remote and remote ref.
|
||||
// This ref represents the remote's |remoteRef| as of the last fetch.
|
||||
func RemoteTrackingRef(remoteName, remoteRef string) string {
|
||||
return fmt.Sprintf("refs/dolt/remotes/%s/%s", remoteName, trimRefsPrefix(remoteRef))
|
||||
}
|
||||
|
||||
// OwnedLocalRef returns a UUID-owned local ref for a GitBlobstore instance.
|
||||
//
|
||||
// Note: these UUID refs can accumulate in the local repo over time. This is
|
||||
// intentional for now; callers that want best-effort cleanup can use
|
||||
// (*GitBlobstore).CleanupOwnedLocalRef.
|
||||
func OwnedLocalRef(remoteName, remoteRef, uuid string) string {
|
||||
return fmt.Sprintf("refs/dolt/blobstore/%s/%s/%s", remoteName, trimRefsPrefix(remoteRef), uuid)
|
||||
}
|
||||
|
||||
@@ -108,6 +108,17 @@ type GitAPI interface {
|
||||
// Equivalent plumbing:
|
||||
// GIT_DIR=... git update-ref -m <msg> <ref> <new>
|
||||
UpdateRef(ctx context.Context, ref string, newOID OID, msg string) error
|
||||
|
||||
// FetchRef fetches |srcRef| from |remote| and updates |dstRef| in the local repo.
|
||||
// It is expected to force-update (tracking refs follow remote truth).
|
||||
// Equivalent plumbing:
|
||||
// GIT_DIR=... git fetch <remote> +<srcRef>:<dstRef>
|
||||
FetchRef(ctx context.Context, remote string, srcRef string, dstRef string) error
|
||||
|
||||
// PushRefWithLease pushes |srcRef| to |dstRef| on |remote|, but only if the remote's |dstRef|
|
||||
// equals |expectedDstOID| (force-with-lease).
|
||||
// Equivalent plumbing: GIT_DIR=... git push --force-with-lease=<dstRef>:<expectedDstOID> <remote> <srcRef>:<dstRef>
|
||||
PushRefWithLease(ctx context.Context, remote string, srcRef string, dstRef string, expectedDstOID OID) error
|
||||
}
|
||||
|
||||
// TreeEntry describes one entry in a git tree listing.
|
||||
|
||||
@@ -282,6 +282,43 @@ func (a *GitAPIImpl) UpdateRef(ctx context.Context, ref string, newOID OID, msg
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *GitAPIImpl) FetchRef(ctx context.Context, remote string, srcRef string, dstRef string) error {
|
||||
if remote == "" {
|
||||
return fmt.Errorf("git fetch: remote is required")
|
||||
}
|
||||
if srcRef == "" {
|
||||
return fmt.Errorf("git fetch: src ref is required")
|
||||
}
|
||||
if dstRef == "" {
|
||||
return fmt.Errorf("git fetch: dst ref is required")
|
||||
}
|
||||
// Forced refspec to keep tracking refs in sync with remote truth.
|
||||
srcRef = strings.TrimPrefix(srcRef, "+")
|
||||
refspec := "+" + srcRef + ":" + dstRef
|
||||
_, err := a.r.Run(ctx, RunOptions{}, "fetch", "--no-tags", remote, refspec)
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *GitAPIImpl) PushRefWithLease(ctx context.Context, remote string, srcRef string, dstRef string, expectedDstOID OID) error {
|
||||
if remote == "" {
|
||||
return fmt.Errorf("git push: remote is required")
|
||||
}
|
||||
if srcRef == "" {
|
||||
return fmt.Errorf("git push: src ref is required")
|
||||
}
|
||||
if dstRef == "" {
|
||||
return fmt.Errorf("git push: dst ref is required")
|
||||
}
|
||||
if expectedDstOID == "" {
|
||||
return fmt.Errorf("git push: expected dst oid is required")
|
||||
}
|
||||
srcRef = strings.TrimPrefix(srcRef, "+")
|
||||
refspec := srcRef + ":" + dstRef
|
||||
lease := "--force-with-lease=" + dstRef + ":" + expectedDstOID.String()
|
||||
_, err := a.r.Run(ctx, RunOptions{}, "push", "--porcelain", lease, remote, refspec)
|
||||
return err
|
||||
}
|
||||
|
||||
func isRefNotFoundErr(err error) bool {
|
||||
ce, ok := err.(*CmdError)
|
||||
if !ok {
|
||||
|
||||
@@ -35,20 +35,26 @@ func tempIndexFile(t *testing.T) string {
|
||||
return filepath.Join(dir, "index")
|
||||
}
|
||||
|
||||
func TestGitAPIImpl_HashObject_RoundTrip(t *testing.T) {
|
||||
t.Parallel()
|
||||
func newTestRepo(t *testing.T, ctx context.Context) (*gitrepo.Repo, *Runner, GitAPI) {
|
||||
t.Helper()
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBareTemp(ctx, "")
|
||||
repoDir := filepath.Join(t.TempDir(), "repo.git")
|
||||
repo, err := gitrepo.InitBare(ctx, repoDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err := NewRunner(repo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
api := NewGitAPIImpl(r)
|
||||
return repo, r, NewGitAPIImpl(r)
|
||||
}
|
||||
|
||||
func TestGitAPIImpl_HashObject_RoundTrip(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
_, _, api := newTestRepo(t, ctx)
|
||||
|
||||
want := []byte("hello dolt\n")
|
||||
oid, err := api.HashObject(ctx, bytes.NewReader(want))
|
||||
@@ -86,16 +92,7 @@ func TestGitAPIImpl_HashObject_Empty(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBareTemp(ctx, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err := NewRunner(repo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
api := NewGitAPIImpl(r)
|
||||
_, _, api := newTestRepo(t, ctx)
|
||||
|
||||
oid, err := api.HashObject(ctx, bytes.NewReader(nil))
|
||||
if err != nil {
|
||||
@@ -118,18 +115,9 @@ func TestGitAPIImpl_ResolveRefCommit_Missing(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBareTemp(ctx, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, _, api := newTestRepo(t, ctx)
|
||||
|
||||
r, err := NewRunner(repo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
api := NewGitAPIImpl(r)
|
||||
|
||||
_, err = api.ResolveRefCommit(ctx, "refs/does/not/exist")
|
||||
_, err := api.ResolveRefCommit(ctx, "refs/does/not/exist")
|
||||
if err == nil {
|
||||
t.Fatalf("expected error")
|
||||
}
|
||||
@@ -143,16 +131,7 @@ func TestGitAPIImpl_ResolveRefCommit_Exists(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBareTemp(ctx, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err := NewRunner(repo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
api := NewGitAPIImpl(r)
|
||||
_, _, api := newTestRepo(t, ctx)
|
||||
|
||||
indexFile := tempIndexFile(t)
|
||||
if err := api.ReadTreeEmpty(ctx, indexFile); err != nil {
|
||||
@@ -184,16 +163,7 @@ func TestGitAPIImpl_WriteTree_FromEmptyIndex(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBareTemp(ctx, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err := NewRunner(repo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
api := NewGitAPIImpl(r)
|
||||
_, _, api := newTestRepo(t, ctx)
|
||||
|
||||
indexFile := tempIndexFile(t)
|
||||
if err := api.ReadTreeEmpty(ctx, indexFile); err != nil {
|
||||
@@ -242,16 +212,7 @@ func TestGitAPIImpl_UpdateIndexCacheInfo_ReplacesExistingEntry(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBareTemp(ctx, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err := NewRunner(repo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
api := NewGitAPIImpl(r)
|
||||
_, _, api := newTestRepo(t, ctx)
|
||||
|
||||
indexFile := tempIndexFile(t)
|
||||
if err := api.ReadTreeEmpty(ctx, indexFile); err != nil {
|
||||
@@ -308,16 +269,7 @@ func TestGitAPIImpl_UpdateIndexCacheInfo_FileDirectoryConflictErrors(t *testing.
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBareTemp(ctx, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err := NewRunner(repo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
api := NewGitAPIImpl(r)
|
||||
_, _, api := newTestRepo(t, ctx)
|
||||
|
||||
indexFile := tempIndexFile(t)
|
||||
if err := api.ReadTreeEmpty(ctx, indexFile); err != nil {
|
||||
@@ -370,16 +322,7 @@ func TestGitAPIImpl_ResolvePathObject_BlobAndTree(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBareTemp(ctx, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err := NewRunner(repo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
api := NewGitAPIImpl(r)
|
||||
_, _, api := newTestRepo(t, ctx)
|
||||
|
||||
indexFile := tempIndexFile(t)
|
||||
if err := api.ReadTreeEmpty(ctx, indexFile); err != nil {
|
||||
@@ -426,16 +369,7 @@ func TestGitAPIImpl_ListTree_NonRecursive(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBareTemp(ctx, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err := NewRunner(repo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
api := NewGitAPIImpl(r)
|
||||
_, _, api := newTestRepo(t, ctx)
|
||||
|
||||
indexFile := tempIndexFile(t)
|
||||
if err := api.ReadTreeEmpty(ctx, indexFile); err != nil {
|
||||
@@ -513,16 +447,7 @@ func TestGitAPIImpl_RemoveIndexPaths_RemovesFromIndex(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBareTemp(ctx, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err := NewRunner(repo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
api := NewGitAPIImpl(r)
|
||||
_, _, api := newTestRepo(t, ctx)
|
||||
|
||||
indexFile := tempIndexFile(t)
|
||||
if err := api.ReadTreeEmpty(ctx, indexFile); err != nil {
|
||||
@@ -580,16 +505,7 @@ func TestGitAPIImpl_ReadTree_PreservesExistingPaths(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBareTemp(ctx, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err := NewRunner(repo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
api := NewGitAPIImpl(r)
|
||||
_, r, api := newTestRepo(t, ctx)
|
||||
|
||||
// Base commit with one file.
|
||||
baseIndex := tempIndexFile(t)
|
||||
@@ -692,16 +608,7 @@ func TestGitAPIImpl_UpdateRef_And_CAS(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBareTemp(ctx, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err := NewRunner(repo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
api := NewGitAPIImpl(r)
|
||||
_, _, api := newTestRepo(t, ctx)
|
||||
|
||||
// Create two commits on the same tree.
|
||||
indexFile := tempIndexFile(t)
|
||||
@@ -773,3 +680,165 @@ func TestGitAPIImpl_UpdateRef_And_CAS(t *testing.T) {
|
||||
t.Fatalf("ref changed unexpectedly: ok=%v got=%q want=%q", ok, got, c2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGitAPIImpl_FetchRef_ForcedUpdatesTrackingRef(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
remoteRepo, _, remoteAPI := newTestRepo(t, ctx)
|
||||
|
||||
// Create two commits on the same tree in the remote.
|
||||
indexFile := tempIndexFile(t)
|
||||
if err := remoteAPI.ReadTreeEmpty(ctx, indexFile); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
treeOID, err := remoteAPI.WriteTree(ctx, indexFile)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
c1, err := remoteAPI.CommitTree(ctx, treeOID, nil, "c1", testAuthor())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
c2, err := remoteAPI.CommitTree(ctx, treeOID, nil, "c2", testAuthor())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if c1 == c2 {
|
||||
c2, err = remoteAPI.CommitTree(ctx, treeOID, nil, "c2b", testAuthor())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if c1 == c2 {
|
||||
t.Fatalf("expected distinct commit oids")
|
||||
}
|
||||
}
|
||||
|
||||
remoteDataRef := "refs/dolt/data"
|
||||
if err := remoteAPI.UpdateRef(ctx, remoteDataRef, c2, "seed remote"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, localRunner, localAPI := newTestRepo(t, ctx)
|
||||
_, err = localRunner.Run(ctx, RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
dstRef := "refs/dolt/remotes/origin/data"
|
||||
if err := localAPI.FetchRef(ctx, "origin", remoteDataRef, dstRef); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
got, err := localAPI.ResolveRefCommit(ctx, dstRef)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got != c2 {
|
||||
t.Fatalf("tracking ref mismatch: got %q, want %q", got, c2)
|
||||
}
|
||||
|
||||
// Rewind the remote ref to c1 and ensure a subsequent fetch forces the tracking ref backwards.
|
||||
if err := remoteAPI.UpdateRef(ctx, remoteDataRef, c1, "rewind remote"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := localAPI.FetchRef(ctx, "origin", remoteDataRef, dstRef); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
got, err = localAPI.ResolveRefCommit(ctx, dstRef)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got != c1 {
|
||||
t.Fatalf("tracking ref mismatch after rewind: got %q, want %q", got, c1)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGitAPIImpl_PushRefWithLease_SucceedsThenRejectsStaleLease(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
remoteRepo, _, remoteAPI := newTestRepo(t, ctx)
|
||||
|
||||
// Seed remote ref with r1, then later advance to r2.
|
||||
indexFile := tempIndexFile(t)
|
||||
if err := remoteAPI.ReadTreeEmpty(ctx, indexFile); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
treeOID, err := remoteAPI.WriteTree(ctx, indexFile)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r1, err := remoteAPI.CommitTree(ctx, treeOID, nil, "r1", testAuthor())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r2, err := remoteAPI.CommitTree(ctx, treeOID, nil, "r2", testAuthor())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if r1 == r2 {
|
||||
r2, err = remoteAPI.CommitTree(ctx, treeOID, nil, "r2b", testAuthor())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if r1 == r2 {
|
||||
t.Fatalf("expected distinct commit oids")
|
||||
}
|
||||
}
|
||||
|
||||
remoteDataRef := "refs/dolt/data"
|
||||
if err := remoteAPI.UpdateRef(ctx, remoteDataRef, r1, "seed remote"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, localRunner, localAPI := newTestRepo(t, ctx)
|
||||
_, err = localRunner.Run(ctx, RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Create a local commit l1 and set local refs/dolt/data to it (src ref for push).
|
||||
localIndex := tempIndexFile(t)
|
||||
if err := localAPI.ReadTreeEmpty(ctx, localIndex); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
localTree, err := localAPI.WriteTree(ctx, localIndex)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
l1, err := localAPI.CommitTree(ctx, localTree, nil, "l1", testAuthor())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := localAPI.UpdateRef(ctx, remoteDataRef, l1, "set local src"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Lease matches remote (r1) -> push should succeed and overwrite remoteDataRef to l1.
|
||||
if err := localAPI.PushRefWithLease(ctx, "origin", remoteDataRef, remoteDataRef, r1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
got, err := remoteAPI.ResolveRefCommit(ctx, remoteDataRef)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got != l1 {
|
||||
t.Fatalf("remote ref mismatch after push: got %q, want %q", got, l1)
|
||||
}
|
||||
|
||||
// Advance remote to r2, then attempt a stale-lease push expecting r1 -> should fail and not clobber r2.
|
||||
if err := remoteAPI.UpdateRef(ctx, remoteDataRef, r2, "advance remote"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = localAPI.PushRefWithLease(ctx, "origin", remoteDataRef, remoteDataRef, r1)
|
||||
if err == nil {
|
||||
t.Fatalf("expected stale lease push to fail")
|
||||
}
|
||||
got, err = remoteAPI.ResolveRefCommit(ctx, remoteDataRef)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got != r2 {
|
||||
t.Fatalf("remote ref changed unexpectedly on stale lease: got %q, want %q", got, r2)
|
||||
}
|
||||
}
|
||||
|
||||
40
go/store/blobstore/internal/git/temp_index.go
Normal file
40
go/store/blobstore/internal/git/temp_index.go
Normal file
@@ -0,0 +1,40 @@
|
||||
// Copyright 2026 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package git
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
// NewTempIndex creates a unique temporary git index file (for use as GIT_INDEX_FILE).
|
||||
// The index is created outside of any repo's GIT_DIR to avoid read-only repos and to
|
||||
// avoid leaving scratch files in the repo on crashes.
|
||||
//
|
||||
// Note: git may also create a sibling lock file (<index>.lock) during index writes.
|
||||
func NewTempIndex() (dir, indexFile string, cleanup func(), err error) {
|
||||
f, err := os.CreateTemp("", "dolt-git-index-")
|
||||
if err != nil {
|
||||
return "", "", nil, err
|
||||
}
|
||||
indexFile = f.Name()
|
||||
_ = f.Close()
|
||||
dir = filepath.Dir(indexFile)
|
||||
cleanup = func() {
|
||||
_ = os.Remove(indexFile)
|
||||
_ = os.Remove(indexFile + ".lock")
|
||||
}
|
||||
return dir, indexFile, cleanup, nil
|
||||
}
|
||||
@@ -35,7 +35,7 @@ func TestGitBlobstoreReadSmoke_ManifestAndTableAccessPatterns(t *testing.T) {
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Seed a valid v5 manifest with no tables. This should allow NBS to open
|
||||
@@ -58,14 +58,20 @@ func TestGitBlobstoreReadSmoke_ManifestAndTableAccessPatterns(t *testing.T) {
|
||||
table[i] = byte(i % 251)
|
||||
}
|
||||
|
||||
commit, err := repo.SetRefToTree(ctx, blobstore.DoltDataRef, map[string][]byte{
|
||||
commit, err := remoteRepo.SetRefToTree(ctx, blobstore.DoltDataRef, map[string][]byte{
|
||||
"manifest": buf.Bytes(),
|
||||
"table": table,
|
||||
}, "seed refs/dolt/data for smoke test")
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, commit)
|
||||
|
||||
bs, err := blobstore.NewGitBlobstore(repo.GitDir, blobstore.DoltDataRef)
|
||||
localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git")
|
||||
require.NoError(t, err)
|
||||
cmd := exec.CommandContext(ctx, "git", "--git-dir", localRepo.GitDir, "remote", "add", "origin", remoteRepo.GitDir)
|
||||
remoteAddOut, err := cmd.CombinedOutput()
|
||||
require.NoError(t, err, "git remote add failed: %s", string(remoteAddOut))
|
||||
|
||||
bs, err := blobstore.NewGitBlobstore(localRepo.GitDir, blobstore.DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
|
||||
// 1) Manifest read path via blobstoreManifest.ParseIfExists.
|
||||
|
||||
Reference in New Issue
Block a user