/go/store/blobstore: disable Concatenate on chunked branch

Create db/gitblobstore-next-2a by leaving GitBlobstore.Concatenate unimplemented and removing the concatenate-focused tests, while keeping the chunked Get/Put/CheckAndPut work intact.

Co-authored-by: Cursor <cursoragent@cursor.com>
coffeegoddd☕️✨
2026-02-05 12:04:07 -08:00
parent 361a5ff747
commit 4a09c8ec53
3 changed files with 8 additions and 578 deletions
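
What this means for callers, sketched below: chunked Put/Get (and CheckAndPut) keep working, while Concatenate on a valid key now fails with git.ErrUnimplemented; invalid keys still fail key validation first. The test-style sketch is illustrative only, not part of this commit, and assumes the package's existing test helpers (requireGitOnPath, testIdentity) and the APIs shown in the diffs below.

package blobstore

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"

	git "github.com/dolthub/dolt/go/store/blobstore/internal/git"
	"github.com/dolthub/dolt/go/store/testutils/gitrepo"
)

// Illustrative only; not part of this commit. Assumes the package's existing
// test helpers (requireGitOnPath, testIdentity) remain available.
func TestGitBlobstore_ConcatenateDisabled_Sketch(t *testing.T) {
	requireGitOnPath(t)
	ctx := context.Background()

	repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
	require.NoError(t, err)

	// Chunked Put/Get still work with a small max part size.
	bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{
		Identity:    testIdentity(),
		MaxPartSize: 3,
	})
	require.NoError(t, err)

	_, err = PutBytes(ctx, bs, "a", []byte("abcdefghij"))
	require.NoError(t, err)
	got, _, err := GetBytes(ctx, bs, "a", AllRange)
	require.NoError(t, err)
	require.Equal(t, []byte("abcdefghij"), got)

	// Concatenate is disabled on this branch: valid keys are still validated,
	// then the call fails with git.ErrUnimplemented.
	_, err = bs.Concatenate(ctx, "out", []string{"a"})
	require.ErrorIs(t, err, git.ErrUnimplemented)
}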

View File

@@ -27,7 +27,6 @@ import (
"time"
"github.com/cenkalti/backoff/v4"
"golang.org/x/sync/errgroup"
git "github.com/dolthub/dolt/go/store/blobstore/internal/git"
gitbs "github.com/dolthub/dolt/go/store/blobstore/internal/gitbs"
@@ -701,344 +700,20 @@ func (gbs *GitBlobstore) CheckAndPut(ctx context.Context, expectedVersion, key s
}
func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources []string) (string, error) {
key, err := normalizeGitTreePath(key)
// Chunked-object support is landing in phases. Concatenate is the final piece
// needed for NBS conjoin and is intentionally left unimplemented on this branch.
//
// Keep key validation for consistent error behavior.
_, err := normalizeGitTreePath(key)
if err != nil {
return "", err
}
normSources := make([]string, len(sources))
for i, src := range sources {
src, err := normalizeGitTreePath(src)
if err != nil {
return "", err
}
normSources[i] = src
}
// Snapshot the current head for reading sources so we don't depend on the ref staying
// stable while we stream the concatenated contents into a new blob object.
snapshot, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
if err != nil {
return "", err
}
if !ok && len(normSources) > 0 {
// If the ref doesn't exist, the store is missing/corrupt (there is no commit to
// resolve source paths against).
return "", &git.RefNotFoundError{Ref: gbs.ref}
}
msg := fmt.Sprintf("gitblobstore: concatenate %s (%d sources)", key, len(normSources))
var writes []treeWrite
if gbs.maxPartSize == 0 {
blobOID, err := gbs.hashConcatenation(ctx, snapshot, ok, normSources)
if err != nil {
return "", err
}
writes = []treeWrite{{path: key, oid: blobOID}}
} else {
writes, err = gbs.planConcatenateWritesChunked(ctx, snapshot, ok, key, normSources)
if err != nil {
return "", err
}
}
const maxRetries = 31 // 32 total attempts (initial + retries)
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = 5 * time.Millisecond
bo.Multiplier = 2
bo.MaxInterval = 320 * time.Millisecond
bo.RandomizationFactor = 0 // deterministic; can add jitter later if needed
bo.Reset()
policy := backoff.WithContext(backoff.WithMaxRetries(bo, maxRetries), ctx)
var ver string
op := func() error {
parent, hasParent, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
if err != nil {
return backoff.Permanent(err)
}
newCommit, err := gbs.buildCommitWithWrites(ctx, parent, hasParent, writes, msg)
if err != nil {
return backoff.Permanent(err)
}
if !hasParent {
// Create-only CAS: oldOID=all-zero requires the ref to not exist. This avoids
// losing concurrent writes when multiple goroutines create the ref at once.
const zeroOID = git.OID("0000000000000000000000000000000000000000")
if err := gbs.api.UpdateRefCAS(ctx, gbs.ref, newCommit, zeroOID, msg); err != nil {
if gbs.refAdvanced(ctx, parent) {
return err
}
return backoff.Permanent(err)
}
ver = newCommit.String()
return nil
}
err = gbs.api.UpdateRefCAS(ctx, gbs.ref, newCommit, parent, msg)
if err == nil {
ver = newCommit.String()
return nil
}
if gbs.refAdvanced(ctx, parent) {
return err
}
return backoff.Permanent(err)
}
if err := backoff.Retry(op, policy); err != nil {
if ctx.Err() != nil {
return "", ctx.Err()
}
return "", err
}
return ver, nil
}
func (gbs *GitBlobstore) hashConcatenation(ctx context.Context, commit git.OID, hasCommit bool, sources []string) (git.OID, error) {
if len(sources) == 0 {
return gbs.api.HashObject(ctx, bytes.NewReader(nil))
}
if !hasCommit {
return "", &git.RefNotFoundError{Ref: gbs.ref}
}
pr, pw := io.Pipe()
eg, ectx := errgroup.WithContext(ctx)
eg.Go(func() error {
defer func() {
_ = pw.Close()
}()
for _, src := range sources {
blobOID, err := gbs.api.ResolvePathBlob(ectx, commit, src)
if err != nil {
if git.IsPathNotFound(err) {
_ = pw.CloseWithError(NotFound{Key: src})
return NotFound{Key: src}
}
_ = pw.CloseWithError(err)
return err
}
rc, err := gbs.api.BlobReader(ectx, blobOID)
if err != nil {
_ = pw.CloseWithError(err)
return err
}
_, err = io.Copy(pw, rc)
cerr := rc.Close()
if err == nil {
err = cerr
}
if err != nil {
_ = pw.CloseWithError(err)
return err
}
}
return nil
})
oid, err := gbs.api.HashObject(ectx, pr)
if err != nil {
_ = pr.CloseWithError(err)
if werr := eg.Wait(); werr != nil {
return "", werr
}
if ctx.Err() != nil {
return "", ctx.Err()
}
return "", err
}
_ = pr.Close()
if err := eg.Wait(); err != nil {
return "", err
}
return oid, nil
}
type resolvedConcatSource struct {
inlineOID git.OID
inlineSize int64
desc *gitbs.Descriptor
}
func (gbs *GitBlobstore) resolveConcatSource(ctx context.Context, commit git.OID, path string) (resolvedConcatSource, error) {
blobOID, err := gbs.api.ResolvePathBlob(ctx, commit, path)
if err != nil {
if git.IsPathNotFound(err) {
return resolvedConcatSource{}, NotFound{Key: path}
}
return resolvedConcatSource{}, err
}
sz, err := gbs.api.BlobSize(ctx, blobOID)
if err != nil {
return resolvedConcatSource{}, err
}
// Peek enough bytes to detect descriptor prefix conservatively.
rc, err := gbs.api.BlobReader(ctx, blobOID)
if err != nil {
return resolvedConcatSource{}, err
}
defer rc.Close()
peek := make([]byte, 0, 64)
buf := make([]byte, 64)
for len(peek) < cap(peek) {
n, rerr := rc.Read(buf[:min(cap(peek)-len(peek), len(buf))])
if n > 0 {
peek = append(peek, buf[:n]...)
}
if rerr != nil {
if errors.Is(rerr, io.EOF) {
break
}
return resolvedConcatSource{}, rerr
}
}
if !gitbs.IsDescriptorPrefix(peek) {
return resolvedConcatSource{inlineOID: blobOID, inlineSize: sz}, nil
}
// Descriptor: re-read whole descriptor blob (bounded).
// TODO(gitblobstore): configurable MaxDescriptorSize.
const maxDesc = int64(64 * 1024)
if sz > maxDesc {
return resolvedConcatSource{}, fmt.Errorf("gitblobstore: descriptor too large (%d bytes, cap %d)", sz, maxDesc)
}
_ = rc.Close()
rc2, err := gbs.api.BlobReader(ctx, blobOID)
if err != nil {
return resolvedConcatSource{}, err
}
defer rc2.Close()
descBytes, err := io.ReadAll(rc2)
if err != nil {
return resolvedConcatSource{}, err
}
desc, err := gitbs.ParseDescriptor(descBytes)
if err != nil {
return resolvedConcatSource{}, err
}
return resolvedConcatSource{desc: &desc}, nil
}
func (gbs *GitBlobstore) planConcatenateWritesChunked(ctx context.Context, snapshot git.OID, hasSnapshot bool, key string, sources []string) ([]treeWrite, error) {
if len(sources) == 0 {
// Empty concatenation => empty object. Store inline.
oid, err := gbs.api.HashObject(ctx, bytes.NewReader(nil))
if err != nil {
return nil, err
}
return []treeWrite{{path: key, oid: oid}}, nil
}
if !hasSnapshot {
return nil, &git.RefNotFoundError{Ref: gbs.ref}
}
var (
allParts []gitbs.PartRef
allPartOID = make(map[git.OID]struct{})
total uint64
)
for _, src := range sources {
rs, err := gbs.resolveConcatSource(ctx, snapshot, src)
if err != nil {
return nil, err
}
var parts []gitbs.PartRef
var oids []git.OID
if rs.desc != nil {
parts = rs.desc.Parts
for _, p := range parts {
oid := git.OID(p.OIDHex)
if p.Size > gbs.maxPartSize {
// Re-chunk oversized part.
rc, err := gbs.api.BlobReader(ctx, oid)
if err != nil {
return nil, err
}
newParts, newOIDs, _, err := gbs.hashParts(ctx, rc)
_ = rc.Close()
if err != nil {
return nil, err
}
allParts = append(allParts, newParts...)
for _, no := range newOIDs {
allPartOID[no] = struct{}{}
}
for _, np := range newParts {
total += np.Size
}
continue
}
oids = append(oids, oid)
}
} else {
// Inline.
if rs.inlineSize < 0 {
return nil, fmt.Errorf("gitblobstore: invalid inline size %d", rs.inlineSize)
}
if uint64(rs.inlineSize) > gbs.maxPartSize {
// Re-chunk oversized inline blob.
rc, err := gbs.api.BlobReader(ctx, rs.inlineOID)
if err != nil {
return nil, err
}
newParts, newOIDs, _, err := gbs.hashParts(ctx, rc)
_ = rc.Close()
if err != nil {
return nil, err
}
parts = newParts
oids = newOIDs
} else {
parts = []gitbs.PartRef{{OIDHex: rs.inlineOID.String(), Size: uint64(rs.inlineSize)}}
oids = []git.OID{rs.inlineOID}
}
}
allParts = append(allParts, parts...)
for _, o := range oids {
allPartOID[o] = struct{}{}
}
for _, p := range parts {
total += p.Size
}
}
descBytes, err := gitbs.EncodeDescriptor(gitbs.Descriptor{TotalSize: total, Parts: allParts})
if err != nil {
return nil, err
}
descOID, err := gbs.api.HashObject(ctx, bytes.NewReader(descBytes))
if err != nil {
return nil, err
}
writes := make([]treeWrite, 0, 1+len(allPartOID))
writes = append(writes, treeWrite{path: key, oid: descOID})
for oid := range allPartOID {
ppath, err := gitbs.PartPath(oid.String())
if err != nil {
return nil, err
}
writes = append(writes, treeWrite{path: ppath, oid: oid})
}
return writes, nil
return "", git.ErrUnimplemented
}
// normalizeGitTreePath normalizes and validates a blobstore key for use as a git tree path.
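
For context on the chunked layout the removed planner produced: a chunked value is stored as a small descriptor blob at the key's own tree path, listing part OIDs and sizes, with each part blob addressed under the parts namespace via gitbs.PartPath. The sketch below is reconstructed from the removed code above for reference only; the helper name is hypothetical, it is not part of the new code path, and it assumes the same package and imports as the file above.

// Reconstructed from the removed planConcatenateWritesChunked for reference;
// this helper is hypothetical and not part of the new code path.
func sketchChunkedWrites(ctx context.Context, gbs *GitBlobstore, key string, parts []gitbs.PartRef) ([]treeWrite, error) {
	var total uint64
	for _, p := range parts {
		total += p.Size
	}

	// One descriptor blob at the key's own tree path...
	descBytes, err := gitbs.EncodeDescriptor(gitbs.Descriptor{TotalSize: total, Parts: parts})
	if err != nil {
		return nil, err
	}
	descOID, err := gbs.api.HashObject(ctx, bytes.NewReader(descBytes))
	if err != nil {
		return nil, err
	}
	writes := []treeWrite{{path: key, oid: descOID}}

	// ...plus one tree entry per part blob under the parts namespace.
	for _, p := range parts {
		ppath, err := gitbs.PartPath(p.OIDHex)
		if err != nil {
			return nil, err
		}
		writes = append(writes, treeWrite{path: ppath, oid: git.OID(p.OIDHex)})
	}
	return writes, nil
}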

View File

@@ -1,88 +0,0 @@
// Copyright 2026 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package blobstore
import (
"bytes"
"context"
"io"
"testing"
"github.com/stretchr/testify/require"
git "github.com/dolthub/dolt/go/store/blobstore/internal/git"
gitbs "github.com/dolthub/dolt/go/store/blobstore/internal/gitbs"
"github.com/dolthub/dolt/go/store/testutils/gitrepo"
)
func TestGitBlobstore_Concatenate_ChunkedStructuralAndRechunksOversizedInline(t *testing.T) {
requireGitOnPath(t)
ctx := context.Background()
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
require.NoError(t, err)
// Seed an oversized inline blob (chunking disabled) so we exercise re-chunking during Concatenate.
seed, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{
Identity: testIdentity(),
MaxPartSize: 0,
})
require.NoError(t, err)
inline := []byte("abcdefghij") // 10 bytes
_, err = seed.Put(ctx, "a", int64(len(inline)), bytes.NewReader(inline))
require.NoError(t, err)
// Now concatenate in chunked mode with a small max part size.
bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{
Identity: testIdentity(),
MaxPartSize: 3,
})
require.NoError(t, err)
ver, err := bs.Concatenate(ctx, "out", []string{"a"})
require.NoError(t, err)
require.NotEmpty(t, ver)
got, ver2, err := GetBytes(ctx, bs, "out", AllRange)
require.NoError(t, err)
require.Equal(t, ver, ver2)
require.Equal(t, inline, got)
// Verify "out" is a descriptor and all parts are <= 3 and reachable under parts namespace.
runner, err := git.NewRunner(repo.GitDir)
require.NoError(t, err)
api := git.NewGitAPIImpl(runner)
commit := git.OID(ver)
outOID, err := api.ResolvePathBlob(ctx, commit, "out")
require.NoError(t, err)
rc, err := api.BlobReader(ctx, outOID)
require.NoError(t, err)
descBytes, err := io.ReadAll(rc)
require.NoError(t, err)
require.NoError(t, rc.Close())
desc, err := gitbs.ParseDescriptor(descBytes)
require.NoError(t, err)
require.Equal(t, uint64(len(inline)), desc.TotalSize)
for _, p := range desc.Parts {
require.LessOrEqual(t, p.Size, uint64(3))
ppath, err := gitbs.PartPath(p.OIDHex)
require.NoError(t, err)
_, err = api.ResolvePathBlob(ctx, commit, ppath)
require.NoError(t, err)
}
}
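
For the scenario exercised by the removed test above (a 10-byte inline blob re-chunked with MaxPartSize 3), the resulting descriptor would be shaped roughly as below. Exact part boundaries depend on how hashParts splits the stream; the test only asserted that each part is at most MaxPartSize bytes and that the sizes sum to TotalSize. The part OIDs here are placeholders.

// Hypothetical shape only; assumes hashParts splits "abcdefghij" greedily
// into max-size parts. The OIDHex values are placeholders, not real hashes.
_ = gitbs.Descriptor{
	TotalSize: 10,
	Parts: []gitbs.PartRef{
		{OIDHex: "<oid of part 1>", Size: 3},
		{OIDHex: "<oid of part 2>", Size: 3},
		{OIDHex: "<oid of part 3>", Size: 3},
		{OIDHex: "<oid of part 4>", Size: 1},
	},
}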

View File

@@ -215,12 +215,6 @@ func TestGitBlobstore_InvalidKeysError(t *testing.T) {
_, err = bs.Put(ctx, k, 1, bytes.NewReader([]byte("x")))
require.Error(t, err, "expected error for key %q", k)
_, err = bs.Concatenate(ctx, k, []string{"ok"})
require.Error(t, err, "expected error for key %q", k)
_, err = bs.Concatenate(ctx, "ok2", []string{k})
require.Error(t, err, "expected error for source key %q", k)
}
}
@@ -274,157 +268,6 @@ func TestGitBlobstore_Put_Overwrite(t *testing.T) {
require.Equal(t, []byte("v2\n"), got)
}
func TestGitBlobstore_Concatenate_RoundTripAndRanges(t *testing.T) {
requireGitOnPath(t)
ctx := context.Background()
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
require.NoError(t, err)
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
require.NoError(t, err)
a := []byte("aaaaa")
b := []byte("bbb")
c := []byte("cccccccc")
_, err = PutBytes(ctx, bs, "a", a)
require.NoError(t, err)
_, err = PutBytes(ctx, bs, "b", b)
require.NoError(t, err)
_, err = PutBytes(ctx, bs, "c", c)
require.NoError(t, err)
ver, err := bs.Concatenate(ctx, "composite", []string{"a", "b", "c"})
require.NoError(t, err)
require.NotEmpty(t, ver)
// Full object.
got, ver2, err := GetBytes(ctx, bs, "composite", AllRange)
require.NoError(t, err)
require.Equal(t, ver, ver2)
require.Equal(t, append(append(append([]byte(nil), a...), b...), c...), got)
// Range verification across boundaries.
var off int64
rc, sz, ver3, err := bs.Get(ctx, "composite", BlobRange{offset: off, length: int64(len(a))})
require.NoError(t, err)
require.Equal(t, ver, ver3)
require.Equal(t, uint64(len(a)+len(b)+len(c)), sz)
buf, err := io.ReadAll(rc)
_ = rc.Close()
require.NoError(t, err)
require.Equal(t, a, buf)
off += int64(len(a))
rc, sz, ver3, err = bs.Get(ctx, "composite", BlobRange{offset: off, length: int64(len(b))})
require.NoError(t, err)
require.Equal(t, ver, ver3)
require.Equal(t, uint64(len(a)+len(b)+len(c)), sz)
buf, err = io.ReadAll(rc)
_ = rc.Close()
require.NoError(t, err)
require.Equal(t, b, buf)
off += int64(len(b))
rc, sz, ver3, err = bs.Get(ctx, "composite", BlobRange{offset: off, length: int64(len(c))})
require.NoError(t, err)
require.Equal(t, ver, ver3)
require.Equal(t, uint64(len(a)+len(b)+len(c)), sz)
buf, err = io.ReadAll(rc)
_ = rc.Close()
require.NoError(t, err)
require.Equal(t, c, buf)
}
func TestGitBlobstore_Concatenate_EmptySourcesCreatesEmptyBlob(t *testing.T) {
requireGitOnPath(t)
ctx := context.Background()
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
require.NoError(t, err)
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
require.NoError(t, err)
ver, err := bs.Concatenate(ctx, "empty", nil)
require.NoError(t, err)
require.NotEmpty(t, ver)
rc, sz, ver2, err := bs.Get(ctx, "empty", AllRange)
require.NoError(t, err)
require.Equal(t, ver, ver2)
require.Equal(t, uint64(0), sz)
data, err := io.ReadAll(rc)
_ = rc.Close()
require.NoError(t, err)
require.Empty(t, data)
}
func TestGitBlobstore_Concatenate_MissingSourceIsNotFound(t *testing.T) {
requireGitOnPath(t)
ctx := context.Background()
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
require.NoError(t, err)
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
require.NoError(t, err)
_, err = PutBytes(ctx, bs, "present", []byte("x"))
require.NoError(t, err)
_, err = bs.Concatenate(ctx, "composite", []string{"present", "missing"})
require.Error(t, err)
require.True(t, IsNotFoundError(err))
ok, err := bs.Exists(ctx, "composite")
require.NoError(t, err)
require.False(t, ok)
}
func TestGitBlobstore_Concatenate_ContentionRetryPreservesOtherKey(t *testing.T) {
requireGitOnPath(t)
ctx := context.Background()
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
require.NoError(t, err)
// Seed the ref so Concatenate takes the CAS path.
_, err = repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
"a": []byte("A"),
"b": []byte("B"),
}, "seed")
require.NoError(t, err)
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
require.NoError(t, err)
origAPI := bs.api
h := &hookGitAPI{GitAPI: origAPI, ref: DoltDataRef}
h.onFirstCAS = func(ctx context.Context, old git.OID) {
// Advance the ref to simulate another writer committing concurrently.
_, _ = writeKeyToRef(ctx, origAPI, DoltDataRef, "external", []byte("external\n"), testIdentity())
}
bs.api = h
ver, err := bs.Concatenate(ctx, "composite", []string{"a", "b"})
require.NoError(t, err)
require.NotEmpty(t, ver)
got, ver2, err := GetBytes(ctx, bs, "composite", AllRange)
require.NoError(t, err)
require.Equal(t, ver, ver2)
require.Equal(t, []byte("AB"), got)
got, _, err = GetBytes(ctx, bs, "external", AllRange)
require.NoError(t, err)
require.Equal(t, []byte("external\n"), got)
got, _, err = GetBytes(ctx, bs, "a", AllRange)
require.NoError(t, err)
require.Equal(t, []byte("A"), got)
}
type hookGitAPI struct {
git.GitAPI