/go/store/blobstore: wip implementing concatenate

This commit is contained in:
coffeegoddd☕️✨
2026-02-04 16:11:25 -08:00
parent 30d58283f0
commit 337aee528f
2 changed files with 312 additions and 7 deletions

View File

@@ -15,6 +15,7 @@
package blobstore
import (
"bytes"
"context"
"errors"
"fmt"
@@ -25,6 +26,7 @@ import (
"time"
"github.com/cenkalti/backoff/v4"
"golang.org/x/sync/errgroup"
git "github.com/dolthub/dolt/go/store/blobstore/internal/git"
)
@@ -33,9 +35,8 @@ import (
// database (bare repo or .git directory). It stores keys as paths within the tree
// of the commit referenced by a git ref (e.g. refs/dolt/data).
//
// This implementation is being developed in phases. Read paths are implemented first,
// then write paths are added incrementally. At the moment, Put is implemented, while
// CheckAndPut and Concatenate are still unimplemented.
// This implementation is being developed in phases. Read paths were implemented first,
// then write paths were added incrementally.
type GitBlobstore struct {
gitDir string
ref string
@@ -408,15 +409,162 @@ func (gbs *GitBlobstore) CheckAndPut(ctx context.Context, expectedVersion, key s
}
func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources []string) (string, error) {
if _, err := normalizeGitTreePath(key); err != nil {
key, err := normalizeGitTreePath(key)
if err != nil {
return "", err
}
for _, src := range sources {
if _, err := normalizeGitTreePath(src); err != nil {
normSources := make([]string, len(sources))
for i, src := range sources {
src, err := normalizeGitTreePath(src)
if err != nil {
return "", err
}
normSources[i] = src
}
return "", fmt.Errorf("%w: GitBlobstore.Concatenate", git.ErrUnimplemented)
// Snapshot the current head for reading sources so we don't depend on the ref staying
// stable while we stream the concatenated contents into a new blob object.
snapshot, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
if err != nil {
return "", err
}
if !ok && len(normSources) > 0 {
// If the ref doesn't exist, the store is missing/corrupt (there is no commit to
// resolve source paths against).
return "", &git.RefNotFoundError{Ref: gbs.ref}
}
blobOID, err := gbs.hashConcatenation(ctx, snapshot, ok, normSources)
if err != nil {
return "", err
}
const maxRetries = 31 // 32 total attempts (initial + retries)
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = 5 * time.Millisecond
bo.Multiplier = 2
bo.MaxInterval = 320 * time.Millisecond
bo.RandomizationFactor = 0 // deterministic; can add jitter later if needed
bo.Reset()
policy := backoff.WithContext(backoff.WithMaxRetries(bo, maxRetries), ctx)
var ver string
op := func() error {
parent, hasParent, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
if err != nil {
return backoff.Permanent(err)
}
newCommit, msg, err := gbs.buildConcatenateCommit(ctx, parent, hasParent, key, blobOID, len(normSources))
if err != nil {
return backoff.Permanent(err)
}
if !hasParent {
// Best-effort ref creation. If a concurrent writer created the ref first, retry.
if err := gbs.api.UpdateRef(ctx, gbs.ref, newCommit, msg); err != nil {
if gbs.refAdvanced(ctx, parent) {
return err
}
return backoff.Permanent(err)
}
ver = newCommit.String()
return nil
}
err = gbs.api.UpdateRefCAS(ctx, gbs.ref, newCommit, parent, msg)
if err == nil {
ver = newCommit.String()
return nil
}
if gbs.refAdvanced(ctx, parent) {
return err
}
return backoff.Permanent(err)
}
if err := backoff.Retry(op, policy); err != nil {
if ctx.Err() != nil {
return "", ctx.Err()
}
return "", err
}
return ver, nil
}
func (gbs *GitBlobstore) buildConcatenateCommit(ctx context.Context, parent git.OID, hasParent bool, key string, blobOID git.OID, nSources int) (git.OID, string, error) {
msg := fmt.Sprintf("gitblobstore: concatenate %s (%d sources)", key, nSources)
commitOID, err := gbs.buildCommitWithMessage(ctx, parent, hasParent, key, blobOID, msg)
if err != nil {
return "", "", err
}
return commitOID, msg, nil
}
func (gbs *GitBlobstore) hashConcatenation(ctx context.Context, commit git.OID, hasCommit bool, sources []string) (git.OID, error) {
if len(sources) == 0 {
return gbs.api.HashObject(ctx, bytes.NewReader(nil))
}
if !hasCommit {
return "", &git.RefNotFoundError{Ref: gbs.ref}
}
pr, pw := io.Pipe()
eg, ectx := errgroup.WithContext(ctx)
eg.Go(func() error {
defer func() {
_ = pw.Close()
}()
for _, src := range sources {
blobOID, err := gbs.api.ResolvePathBlob(ectx, commit, src)
if err != nil {
if git.IsPathNotFound(err) {
_ = pw.CloseWithError(NotFound{Key: src})
return NotFound{Key: src}
}
_ = pw.CloseWithError(err)
return err
}
rc, err := gbs.api.BlobReader(ectx, blobOID)
if err != nil {
_ = pw.CloseWithError(err)
return err
}
_, err = io.Copy(pw, rc)
cerr := rc.Close()
if err == nil {
err = cerr
}
if err != nil {
_ = pw.CloseWithError(err)
return err
}
}
return nil
})
oid, err := gbs.api.HashObject(ectx, pr)
if err != nil {
_ = pr.CloseWithError(err)
if werr := eg.Wait(); werr != nil {
return "", werr
}
if ctx.Err() != nil {
return "", ctx.Err()
}
return "", err
}
_ = pr.Close()
if err := eg.Wait(); err != nil {
return "", err
}
return oid, nil
}
// normalizeGitTreePath normalizes and validates a blobstore key for use as a git tree path.

View File

@@ -215,6 +215,12 @@ func TestGitBlobstore_InvalidKeysError(t *testing.T) {
_, err = bs.Put(ctx, k, 1, bytes.NewReader([]byte("x")))
require.Error(t, err, "expected error for key %q", k)
_, err = bs.Concatenate(ctx, k, []string{"ok"})
require.Error(t, err, "expected error for key %q", k)
_, err = bs.Concatenate(ctx, "ok2", []string{k})
require.Error(t, err, "expected error for source key %q", k)
}
}
@@ -268,6 +274,157 @@ func TestGitBlobstore_Put_Overwrite(t *testing.T) {
require.Equal(t, []byte("v2\n"), got)
}
func TestGitBlobstore_Concatenate_RoundTripAndRanges(t *testing.T) {
requireGitOnPath(t)
ctx := context.Background()
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
require.NoError(t, err)
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
require.NoError(t, err)
a := []byte("aaaaa")
b := []byte("bbb")
c := []byte("cccccccc")
_, err = PutBytes(ctx, bs, "a", a)
require.NoError(t, err)
_, err = PutBytes(ctx, bs, "b", b)
require.NoError(t, err)
_, err = PutBytes(ctx, bs, "c", c)
require.NoError(t, err)
ver, err := bs.Concatenate(ctx, "composite", []string{"a", "b", "c"})
require.NoError(t, err)
require.NotEmpty(t, ver)
// Full object.
got, ver2, err := GetBytes(ctx, bs, "composite", AllRange)
require.NoError(t, err)
require.Equal(t, ver, ver2)
require.Equal(t, append(append(append([]byte(nil), a...), b...), c...), got)
// Range verification across boundaries.
var off int64
rc, sz, ver3, err := bs.Get(ctx, "composite", BlobRange{offset: off, length: int64(len(a))})
require.NoError(t, err)
require.Equal(t, ver, ver3)
require.Equal(t, uint64(len(a)+len(b)+len(c)), sz)
buf, err := io.ReadAll(rc)
_ = rc.Close()
require.NoError(t, err)
require.Equal(t, a, buf)
off += int64(len(a))
rc, sz, ver3, err = bs.Get(ctx, "composite", BlobRange{offset: off, length: int64(len(b))})
require.NoError(t, err)
require.Equal(t, ver, ver3)
require.Equal(t, uint64(len(a)+len(b)+len(c)), sz)
buf, err = io.ReadAll(rc)
_ = rc.Close()
require.NoError(t, err)
require.Equal(t, b, buf)
off += int64(len(b))
rc, sz, ver3, err = bs.Get(ctx, "composite", BlobRange{offset: off, length: int64(len(c))})
require.NoError(t, err)
require.Equal(t, ver, ver3)
require.Equal(t, uint64(len(a)+len(b)+len(c)), sz)
buf, err = io.ReadAll(rc)
_ = rc.Close()
require.NoError(t, err)
require.Equal(t, c, buf)
}
func TestGitBlobstore_Concatenate_EmptySourcesCreatesEmptyBlob(t *testing.T) {
requireGitOnPath(t)
ctx := context.Background()
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
require.NoError(t, err)
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
require.NoError(t, err)
ver, err := bs.Concatenate(ctx, "empty", nil)
require.NoError(t, err)
require.NotEmpty(t, ver)
rc, sz, ver2, err := bs.Get(ctx, "empty", AllRange)
require.NoError(t, err)
require.Equal(t, ver, ver2)
require.Equal(t, uint64(0), sz)
data, err := io.ReadAll(rc)
_ = rc.Close()
require.NoError(t, err)
require.Empty(t, data)
}
func TestGitBlobstore_Concatenate_MissingSourceIsNotFound(t *testing.T) {
requireGitOnPath(t)
ctx := context.Background()
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
require.NoError(t, err)
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
require.NoError(t, err)
_, err = PutBytes(ctx, bs, "present", []byte("x"))
require.NoError(t, err)
_, err = bs.Concatenate(ctx, "composite", []string{"present", "missing"})
require.Error(t, err)
require.True(t, IsNotFoundError(err))
ok, err := bs.Exists(ctx, "composite")
require.NoError(t, err)
require.False(t, ok)
}
func TestGitBlobstore_Concatenate_ContentionRetryPreservesOtherKey(t *testing.T) {
requireGitOnPath(t)
ctx := context.Background()
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
require.NoError(t, err)
// Seed the ref so Concatenate takes the CAS path.
_, err = repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
"a": []byte("A"),
"b": []byte("B"),
}, "seed")
require.NoError(t, err)
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
require.NoError(t, err)
origAPI := bs.api
h := &hookGitAPI{GitAPI: origAPI, ref: DoltDataRef}
h.onFirstCAS = func(ctx context.Context, old git.OID) {
// Advance the ref to simulate another writer committing concurrently.
_, _ = writeKeyToRef(ctx, origAPI, DoltDataRef, "external", []byte("external\n"), testIdentity())
}
bs.api = h
ver, err := bs.Concatenate(ctx, "composite", []string{"a", "b"})
require.NoError(t, err)
require.NotEmpty(t, ver)
got, ver2, err := GetBytes(ctx, bs, "composite", AllRange)
require.NoError(t, err)
require.Equal(t, ver, ver2)
require.Equal(t, []byte("AB"), got)
got, _, err = GetBytes(ctx, bs, "external", AllRange)
require.NoError(t, err)
require.Equal(t, []byte("external\n"), got)
got, _, err = GetBytes(ctx, bs, "a", AllRange)
require.NoError(t, err)
require.Equal(t, []byte("A"), got)
}
type hookGitAPI struct {
git.GitAPI