/go/store/blobstore: wip, simplifying

This commit is contained in:
coffeegoddd☕️✨
2026-02-05 16:54:17 -08:00
parent f95f4f0e30
commit c2ea3de6f2
9 changed files with 481 additions and 384 deletions
+70 -102
View File
@@ -20,9 +20,11 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
@@ -111,7 +113,7 @@ func (gbs *GitBlobstore) Exists(ctx context.Context, key string) (bool, error) {
if !ok {
return false, nil
}
_, err = gbs.api.ResolvePathBlob(ctx, commit, key)
_, _, err = gbs.api.ResolvePathObject(ctx, commit, key)
if err != nil {
if git.IsPathNotFound(err) {
return false, nil
@@ -131,17 +133,29 @@ func (gbs *GitBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.
return nil, 0, ver, err
}
blobOID, ver, err := gbs.resolveBlobForGet(ctx, commit, key)
oid, typ, ver, err := gbs.resolveObjectForGet(ctx, commit, key)
if err != nil {
return nil, 0, ver, err
}
sz, ver, err := gbs.resolveBlobSizeForGet(ctx, commit, blobOID)
if err != nil {
return nil, 0, ver, err
}
switch typ {
case "blob":
sz, ver, err := gbs.resolveBlobSizeForGet(ctx, commit, oid)
if err != nil {
return nil, 0, ver, err
}
rc, err := gbs.api.BlobReader(ctx, oid)
if err != nil {
return nil, 0, ver, err
}
return sliceInlineBlob(rc, sz, br, ver)
return gbs.openBlobOrDescriptorRange(ctx, commit, blobOID, sz, br)
case "tree":
return gbs.openChunkedTreeRange(ctx, commit, key, oid, br)
default:
return nil, 0, ver, fmt.Errorf("gitblobstore: unsupported object type %q for key %q", typ, key)
}
}
func (gbs *GitBlobstore) resolveCommitForGet(ctx context.Context, key string) (commit git.OID, ver string, err error) {
@@ -161,15 +175,15 @@ func (gbs *GitBlobstore) resolveCommitForGet(ctx context.Context, key string) (c
return git.OID(""), "", &git.RefNotFoundError{Ref: gbs.ref}
}
func (gbs *GitBlobstore) resolveBlobForGet(ctx context.Context, commit git.OID, key string) (oid git.OID, ver string, err error) {
oid, err = gbs.api.ResolvePathBlob(ctx, commit, key)
func (gbs *GitBlobstore) resolveObjectForGet(ctx context.Context, commit git.OID, key string) (oid git.OID, typ string, ver string, err error) {
oid, typ, err = gbs.api.ResolvePathObject(ctx, commit, key)
if err != nil {
if git.IsPathNotFound(err) {
return git.OID(""), commit.String(), NotFound{Key: key}
return git.OID(""), "", commit.String(), NotFound{Key: key}
}
return git.OID(""), commit.String(), err
return git.OID(""), "", commit.String(), err
}
return oid, commit.String(), nil
return oid, typ, commit.String(), nil
}
func (gbs *GitBlobstore) resolveBlobSizeForGet(ctx context.Context, commit git.OID, oid git.OID) (sz int64, ver string, err error) {
@@ -188,56 +202,27 @@ type limitReadCloser struct {
func (l *limitReadCloser) Read(p []byte) (int, error) { return l.r.Read(p) }
func (l *limitReadCloser) Close() error { return l.c.Close() }
func (gbs *GitBlobstore) openBlobOrDescriptorRange(ctx context.Context, commit git.OID, blobOID git.OID, blobSize int64, br BlobRange) (io.ReadCloser, uint64, string, error) {
func (gbs *GitBlobstore) openChunkedTreeRange(ctx context.Context, commit git.OID, key string, treeOID git.OID, br BlobRange) (io.ReadCloser, uint64, string, error) {
ver := commit.String()
rc, err := gbs.api.BlobReader(ctx, blobOID)
_ = treeOID // treeOID is informational; ListTree resolves by path.
entries, err := gbs.api.ListTree(ctx, commit, key)
if err != nil {
return nil, 0, ver, err
}
defer func() {
if rc != nil {
_ = rc.Close()
}
}()
const peekN = 64 * 1024
peek, err := readAtMost(rc, 256)
parts, totalSize, err := gbs.validateAndSizeChunkedParts(ctx, entries)
if err != nil {
return nil, 0, ver, err
}
// Not a descriptor: stream inline blob with BlobRange slicing.
if !gitbs.IsDescriptorPrefix(peek) {
inlineRC, err := gbs.reopenInlineBlobReader(ctx, rc, blobOID)
if err != nil {
return nil, 0, ver, err
}
rc = nil // ownership transferred / already closed
return sliceInlineBlob(inlineRC, blobSize, br, ver)
}
// It's probably a descriptor. Read the full contents (bounded defensively).
// TODO(gitblobstore): add a MaxDescriptorSize config; for now cap at 64KiB.
descBytes, err := readFullBlobBounded(rc, peek, blobSize, peekN)
if err != nil {
return nil, 0, ver, err
}
desc, err := gitbs.ParseDescriptor(descBytes)
if err != nil {
// Treat malformed descriptors as corruption (hard error).
return nil, 0, ver, err
}
total := int64(desc.TotalSize)
total := int64(totalSize)
start, end, err := gitbs.NormalizeRange(total, br.offset, br.length)
if err != nil {
return nil, uint64(desc.TotalSize), ver, err
return nil, totalSize, ver, err
}
slices, err := gitbs.SliceParts(desc.Parts, start, end)
slices, err := gitbs.SliceParts(parts, start, end)
if err != nil {
return nil, uint64(desc.TotalSize), ver, err
return nil, totalSize, ver, err
}
// Stream across part blobs.
@@ -246,72 +231,55 @@ func (gbs *GitBlobstore) openBlobOrDescriptorRange(ctx context.Context, commit g
api: gbs.api,
slices: slices,
}
// Close descriptor blob reader (not used past this point).
_ = rc.Close()
rc = nil
return streamRC, uint64(desc.TotalSize), ver, nil
return streamRC, totalSize, ver, nil
}
func (gbs *GitBlobstore) reopenInlineBlobReader(ctx context.Context, rc io.ReadCloser, blobOID git.OID) (io.ReadCloser, error) {
// Re-open for streaming the full inline blob. (Simpler than splicing peek+rest.)
if rc != nil {
_ = rc.Close()
func (gbs *GitBlobstore) validateAndSizeChunkedParts(ctx context.Context, entries []git.TreeEntry) ([]gitbs.PartRef, uint64, error) {
if len(entries) == 0 {
return nil, 0, fmt.Errorf("gitblobstore: chunked tree has no parts")
}
return gbs.api.BlobReader(ctx, blobOID)
}
func readAtMost(r io.Reader, n int) ([]byte, error) {
if n <= 0 {
return nil, nil
width := len(entries[0].Name)
// First pass: validate names + types, and determine width.
if width < 4 {
return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected at least 4 digits)", entries[0].Name)
}
out := make([]byte, 0, n)
buf := make([]byte, min(256, n))
for len(out) < n {
toRead := min(n-len(out), len(buf))
rd, err := r.Read(buf[:toRead])
if rd > 0 {
out = append(out, buf[:rd]...)
parts := make([]gitbs.PartRef, 0, len(entries))
var total uint64
for i, e := range entries {
if e.Type != "blob" {
return nil, 0, fmt.Errorf("gitblobstore: invalid part %q: expected blob, got %q", e.Name, e.Type)
}
if len(e.Name) != width {
return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected width %d)", e.Name, width)
}
n, err := strconv.Atoi(e.Name)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, err
return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected digits): %w", e.Name, err)
}
}
return out, nil
}
func readFullBlobBounded(r io.Reader, already []byte, blobSize int64, max int) ([]byte, error) {
if blobSize < 0 {
return nil, fmt.Errorf("gitblobstore: invalid blob size %d", blobSize)
}
if int64(len(already)) > blobSize {
// Defensive: callers should pass a prefix read from this same blob reader.
return nil, io.ErrUnexpectedEOF
}
descBytes := append([]byte(nil), already...)
buf := make([]byte, 256)
for int64(len(descBytes)) < blobSize && len(descBytes) < max {
n, err := r.Read(buf)
if n > 0 {
descBytes = append(descBytes, buf[:n]...)
if n != i+1 {
want := fmt.Sprintf("%0*d", width, i+1)
return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected %q)", e.Name, want)
}
if want := fmt.Sprintf("%0*d", width, n); want != e.Name {
return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected %q)", e.Name, want)
}
sz, err := gbs.api.BlobSize(ctx, e.OID)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, err
return nil, 0, err
}
}
if int64(len(descBytes)) < blobSize {
if blobSize > int64(max) {
return nil, fmt.Errorf("gitblobstore: descriptor too large (%d bytes, cap %d)", blobSize, max)
if sz < 0 {
return nil, 0, fmt.Errorf("gitblobstore: invalid part size %d for %q", sz, e.Name)
}
return nil, io.ErrUnexpectedEOF
if uint64(sz) > math.MaxUint64-total {
return nil, 0, fmt.Errorf("gitblobstore: total size overflow")
}
total += uint64(sz)
parts = append(parts, gitbs.PartRef{OIDHex: e.OID.String(), Size: uint64(sz)})
}
return descBytes, nil
return parts, total, nil
}
func sliceInlineBlob(rc io.ReadCloser, sz int64, br BlobRange, ver string) (io.ReadCloser, uint64, string, error) {
@@ -1,82 +0,0 @@
// Copyright 2026 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package blobstore
import (
"bytes"
"context"
"errors"
"io"
"testing"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/store/testutils/gitrepo"
)
// TestGitBlobstore_CheckAndPut_ChunkedRoundTrip_CreateOnly writes a value large
// enough to be chunked (10 bytes with MaxPartSize=3) via CheckAndPut with an
// empty expected version (create-only), then reads it back and verifies both
// the bytes and the returned version round-trip.
func TestGitBlobstore_CheckAndPut_ChunkedRoundTrip_CreateOnly(t *testing.T) {
	requireGitOnPath(t)
	ctx := context.Background()
	// Fresh bare repo per test; the blobstore drives it via git plumbing.
	repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
	require.NoError(t, err)
	// MaxPartSize=3 forces the 10-byte payload below to take the chunked path.
	bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{
		Identity:    testIdentity(),
		MaxPartSize: 3,
	})
	require.NoError(t, err)
	want := []byte("abcdefghij") // 10 bytes -> chunked
	// Empty expectedVersion means "create only": succeeds because the key is new.
	ver, err := bs.CheckAndPut(ctx, "", "big", int64(len(want)), bytes.NewReader(want))
	require.NoError(t, err)
	require.NotEmpty(t, ver)
	// Read back: version must match the one CheckAndPut reported.
	got, ver2, err := GetBytes(ctx, bs, "big", AllRange)
	require.NoError(t, err)
	require.Equal(t, ver, ver2)
	require.Equal(t, want, got)
}
// failReadSeeker is a stub reader whose Read always errors. Tests pass it to
// code paths that must NOT consume their input (e.g. CheckAndPut on version
// mismatch) and assert no read occurs.
type failReadSeeker struct{}

// Read always returns an error; any call indicates the code under test
// incorrectly consumed the reader.
func (f failReadSeeker) Read(p []byte) (int, error) {
	return 0, errors.New("read should not be called")
}
// TestGitBlobstore_CheckAndPut_MismatchDoesNotConsumeReader verifies that a
// CheckAndPut with a wrong expectedVersion fails fast with a CheckAndPut error
// and never reads from the supplied reader (failReadSeeker errors on any Read).
func TestGitBlobstore_CheckAndPut_MismatchDoesNotConsumeReader(t *testing.T) {
	requireGitOnPath(t)
	ctx := context.Background()
	repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
	require.NoError(t, err)
	// Seed any commit so actualVersion != "".
	bs0, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{Identity: testIdentity()})
	require.NoError(t, err)
	_, err = bs0.Put(ctx, "x", 1, bytes.NewReader([]byte("x")))
	require.NoError(t, err)
	bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{
		Identity:    testIdentity(),
		MaxPartSize: 3,
	})
	require.NoError(t, err)
	// Provide a wrong expectedVersion; should fail without reading.
	_, err = bs.CheckAndPut(ctx, "definitely-wrong", "y", 1, io.Reader(failReadSeeker{}))
	require.Error(t, err)
	require.True(t, IsCheckAndPutError(err))
}
@@ -15,19 +15,16 @@
package blobstore
import (
"bytes"
"context"
"os/exec"
"testing"
"github.com/stretchr/testify/require"
git "github.com/dolthub/dolt/go/store/blobstore/internal/git"
gitbs "github.com/dolthub/dolt/go/store/blobstore/internal/gitbs"
"github.com/dolthub/dolt/go/store/testutils/gitrepo"
)
func TestGitBlobstore_Get_ChunkedDescriptor_AllAndRanges(t *testing.T) {
func TestGitBlobstore_Get_ChunkedTree_AllAndRanges(t *testing.T) {
if _, err := exec.LookPath("git"); err != nil {
t.Skip("git not found on PATH")
}
@@ -36,49 +33,13 @@ func TestGitBlobstore_Get_ChunkedDescriptor_AllAndRanges(t *testing.T) {
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
require.NoError(t, err)
runner, err := git.NewRunner(repo.GitDir)
require.NoError(t, err)
api := git.NewGitAPIImpl(runner)
// Create two part blobs.
part1 := []byte("abc")
part2 := []byte("defgh")
oid1, err := api.HashObject(ctx, bytes.NewReader(part1))
commitOID, err := repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
"chunked/0001": part1,
"chunked/0002": part2,
}, "seed chunked tree")
require.NoError(t, err)
oid2, err := api.HashObject(ctx, bytes.NewReader(part2))
require.NoError(t, err)
desc := gitbs.Descriptor{
TotalSize: uint64(len(part1) + len(part2)),
Parts: []gitbs.PartRef{
{OIDHex: oid1.String(), Size: uint64(len(part1))},
{OIDHex: oid2.String(), Size: uint64(len(part2))},
},
}
descBytes, err := gitbs.EncodeDescriptor(desc)
require.NoError(t, err)
descOID, err := api.HashObject(ctx, bytes.NewReader(descBytes))
require.NoError(t, err)
// Build a commit whose tree contains:
// - key "chunked" -> descriptor blob
// - parts staged under reserved parts namespace (reachability)
_, indexFile, cleanup, err := newTempIndex()
require.NoError(t, err)
defer cleanup()
require.NoError(t, api.ReadTreeEmpty(ctx, indexFile))
require.NoError(t, api.UpdateIndexCacheInfo(ctx, indexFile, "100644", descOID, "chunked"))
_, err = stagePartReachable(ctx, api, indexFile, oid1)
require.NoError(t, err)
_, err = stagePartReachable(ctx, api, indexFile, oid2)
require.NoError(t, err)
treeOID, err := api.WriteTree(ctx, indexFile)
require.NoError(t, err)
commitOID, err := api.CommitTree(ctx, treeOID, nil, "seed chunked descriptor", &git.Identity{Name: "t", Email: "t@t"})
require.NoError(t, err)
require.NoError(t, api.UpdateRef(ctx, DoltDataRef, commitOID, "seed"))
bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef)
require.NoError(t, err)
@@ -87,40 +48,47 @@ func TestGitBlobstore_Get_ChunkedDescriptor_AllAndRanges(t *testing.T) {
got, ver, err := GetBytes(ctx, bs, "chunked", AllRange)
require.NoError(t, err)
require.Equal(t, commitOID.String(), ver)
require.Equal(t, commitOID, ver)
require.Equal(t, wantAll, got)
// Range spanning boundary: offset 2 length 4 => "cdef"
got, ver, err = GetBytes(ctx, bs, "chunked", NewBlobRange(2, 4))
require.NoError(t, err)
require.Equal(t, commitOID.String(), ver)
require.Equal(t, commitOID, ver)
require.Equal(t, []byte("cdef"), got)
// Tail read last 3 bytes => "fgh"
got, ver, err = GetBytes(ctx, bs, "chunked", NewBlobRange(-3, 0))
require.NoError(t, err)
require.Equal(t, commitOID.String(), ver)
require.Equal(t, commitOID, ver)
require.Equal(t, []byte("fgh"), got)
// Validate size returned is logical size, not descriptor size.
// Validate size returned is logical size.
rc, sz, ver2, err := bs.Get(ctx, "chunked", NewBlobRange(0, 1))
require.NoError(t, err)
require.Equal(t, uint64(len(wantAll)), sz)
require.Equal(t, commitOID.String(), ver2)
require.Equal(t, commitOID, ver2)
_ = rc.Close()
// Also verify "inline blob that happens to start with magic" is treated as inline
// if it doesn't match the descriptor prefix (magic + size line).
inline := "DOLTBS1\nthis is not a descriptor\n"
inlineCommit, err := repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
"inline": []byte(inline),
}, "seed inline magic")
require.NoError(t, err)
bs2, err := NewGitBlobstore(repo.GitDir, DoltDataRef)
require.NoError(t, err)
got2, ver3, err := GetBytes(ctx, bs2, "inline", AllRange)
require.NoError(t, err)
require.Equal(t, inlineCommit, ver3)
require.Equal(t, []byte(inline), got2)
}
// TestGitBlobstore_Get_ChunkedTree_InvalidPartsError seeds a chunked tree with
// a gap in its part numbering (0001, 0003) and verifies Get reports a hard
// error — corruption, not NotFound.
func TestGitBlobstore_Get_ChunkedTree_InvalidPartsError(t *testing.T) {
	requireGitOnPath(t)
	ctx := context.Background()
	repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
	require.NoError(t, err)
	// Gap: 0001, 0003
	_, err = repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
		"chunked/0001": []byte("a"),
		"chunked/0003": []byte("b"),
	}, "seed invalid chunked tree")
	require.NoError(t, err)
	bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef)
	require.NoError(t, err)
	_, _, err = GetBytes(ctx, bs, "chunked", AllRange)
	require.Error(t, err)
	// The key exists; the failure is a validation error, not NotFound.
	require.False(t, IsNotFoundError(err))
}
@@ -1,84 +0,0 @@
// Copyright 2026 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package blobstore
import (
"bytes"
"context"
"io"
"testing"
"github.com/stretchr/testify/require"
git "github.com/dolthub/dolt/go/store/blobstore/internal/git"
gitbs "github.com/dolthub/dolt/go/store/blobstore/internal/gitbs"
"github.com/dolthub/dolt/go/store/testutils/gitrepo"
)
func TestGitBlobstore_Put_ChunkedUnderMaxPartSize(t *testing.T) {
requireGitOnPath(t)
ctx := context.Background()
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
require.NoError(t, err)
bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{
Identity: testIdentity(),
MaxPartSize: 3,
})
require.NoError(t, err)
want := []byte("abcdefghij") // 10 bytes -> 3,3,3,1
ver, err := bs.Put(ctx, "big", int64(len(want)), bytes.NewReader(want))
require.NoError(t, err)
got, ver2, err := GetBytes(ctx, bs, "big", AllRange)
require.NoError(t, err)
require.Equal(t, ver, ver2)
require.Equal(t, want, got)
runner, err := git.NewRunner(repo.GitDir)
require.NoError(t, err)
api := git.NewGitAPIImpl(runner)
commit := git.OID(ver)
keyOID, err := api.ResolvePathBlob(ctx, commit, "big")
require.NoError(t, err)
rc, err := api.BlobReader(ctx, keyOID)
require.NoError(t, err)
descBytes, err := io.ReadAll(rc)
require.NoError(t, err)
require.NoError(t, rc.Close())
desc, err := gitbs.ParseDescriptor(descBytes)
require.NoError(t, err)
require.Equal(t, uint64(len(want)), desc.TotalSize)
require.GreaterOrEqual(t, len(desc.Parts), 2)
for _, p := range desc.Parts {
require.LessOrEqual(t, p.Size, uint64(3))
ppath, err := gitbs.PartPath(p.OIDHex)
require.NoError(t, err)
gotOID, err := api.ResolvePathBlob(ctx, commit, ppath)
require.NoError(t, err)
require.Equal(t, git.OID(p.OIDHex), gotOID)
}
// Range spanning boundary (offset 2, length 4) => "cdef"
got, _, err = GetBytes(ctx, bs, "big", NewBlobRange(2, 4))
require.NoError(t, err)
require.Equal(t, []byte("cdef"), got)
}
@@ -15,7 +15,6 @@
package blobstore
import (
"bytes"
"context"
"errors"
"io"
@@ -29,6 +28,7 @@ import (
type fakeGitAPI struct {
tryResolveRefCommit func(ctx context.Context, ref string) (git.OID, bool, error)
resolvePathBlob func(ctx context.Context, commit git.OID, path string) (git.OID, error)
resolvePathObject func(ctx context.Context, commit git.OID, path string) (git.OID, string, error)
blobSize func(ctx context.Context, oid git.OID) (int64, error)
blobReader func(ctx context.Context, oid git.OID) (io.ReadCloser, error)
}
@@ -42,6 +42,12 @@ func (f fakeGitAPI) ResolveRefCommit(ctx context.Context, ref string) (git.OID,
func (f fakeGitAPI) ResolvePathBlob(ctx context.Context, commit git.OID, path string) (git.OID, error) {
return f.resolvePathBlob(ctx, commit, path)
}
// ResolvePathObject delegates to the test-provided stub function.
func (f fakeGitAPI) ResolvePathObject(ctx context.Context, commit git.OID, path string) (git.OID, string, error) {
	return f.resolvePathObject(ctx, commit, path)
}
// ListTree is not expected to be called by these helper tests.
func (f fakeGitAPI) ListTree(ctx context.Context, commit git.OID, treePath string) ([]git.TreeEntry, error) {
	panic("unexpected call")
}
// CatFileType is not expected to be called by these helper tests.
func (f fakeGitAPI) CatFileType(ctx context.Context, oid git.OID) (string, error) {
	panic("unexpected call")
}
@@ -63,6 +69,9 @@ func (f fakeGitAPI) ReadTreeEmpty(ctx context.Context, indexFile string) error {
func (f fakeGitAPI) UpdateIndexCacheInfo(ctx context.Context, indexFile string, mode string, oid git.OID, path string) error {
panic("unexpected call")
}
// RemoveIndexPaths is not expected to be called by these helper tests.
func (f fakeGitAPI) RemoveIndexPaths(ctx context.Context, indexFile string, paths []string) error {
	panic("unexpected call")
}
func (f fakeGitAPI) WriteTree(ctx context.Context, indexFile string) (git.OID, error) {
panic("unexpected call")
}
@@ -76,16 +85,6 @@ func (f fakeGitAPI) UpdateRef(ctx context.Context, ref string, newOID git.OID, m
panic("unexpected call")
}
type trackingReadCloser struct {
io.Reader
closed bool
}
func (t *trackingReadCloser) Close() error {
t.closed = true
return nil
}
func TestGitBlobstoreHelpers_resolveCommitForGet(t *testing.T) {
ctx := context.Background()
@@ -146,35 +145,36 @@ func TestGitBlobstoreHelpers_resolveCommitForGet(t *testing.T) {
})
}
func TestGitBlobstoreHelpers_resolveBlobForGet(t *testing.T) {
func TestGitBlobstoreHelpers_resolveObjectForGet(t *testing.T) {
ctx := context.Background()
commit := git.OID("0123456789abcdef0123456789abcdef01234567")
t.Run("ok", func(t *testing.T) {
api := fakeGitAPI{
resolvePathBlob: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, error) {
resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, string, error) {
require.Equal(t, commit, gotCommit)
require.Equal(t, "k", path)
return git.OID("89abcdef0123456789abcdef0123456789abcdef"), nil
return git.OID("89abcdef0123456789abcdef0123456789abcdef"), "blob", nil
},
}
gbs := &GitBlobstore{api: api}
oid, ver, err := gbs.resolveBlobForGet(ctx, commit, "k")
oid, typ, ver, err := gbs.resolveObjectForGet(ctx, commit, "k")
require.NoError(t, err)
require.Equal(t, "0123456789abcdef0123456789abcdef01234567", ver)
require.Equal(t, "blob", typ)
require.Equal(t, git.OID("89abcdef0123456789abcdef0123456789abcdef"), oid)
})
t.Run("pathNotFoundMapsToNotFound", func(t *testing.T) {
api := fakeGitAPI{
resolvePathBlob: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, error) {
return git.OID(""), &git.PathNotFoundError{Commit: gotCommit.String(), Path: path}
resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, string, error) {
return git.OID(""), "", &git.PathNotFoundError{Commit: gotCommit.String(), Path: path}
},
}
gbs := &GitBlobstore{api: api}
_, ver, err := gbs.resolveBlobForGet(ctx, commit, "k")
_, _, ver, err := gbs.resolveObjectForGet(ctx, commit, "k")
require.Equal(t, commit.String(), ver)
var nf NotFound
require.ErrorAs(t, err, &nf)
@@ -203,48 +203,33 @@ func TestGitBlobstoreHelpers_resolveBlobSizeForGet(t *testing.T) {
})
}
func TestGitBlobstoreHelpers_reopenInlineBlobReaderClosesOriginal(t *testing.T) {
func TestGitBlobstoreHelpers_validateAndSizeChunkedParts(t *testing.T) {
ctx := context.Background()
blobOID := git.OID("0123456789abcdef0123456789abcdef01234567")
orig := &trackingReadCloser{Reader: bytes.NewReader([]byte("x"))}
api := fakeGitAPI{
blobReader: func(ctx context.Context, gotOID git.OID) (io.ReadCloser, error) {
require.Equal(t, blobOID, gotOID)
return io.NopCloser(bytes.NewReader([]byte("y"))), nil
blobSize: func(ctx context.Context, oid git.OID) (int64, error) {
switch oid {
case "0123456789abcdef0123456789abcdef01234567":
return 3, nil
case "89abcdef0123456789abcdef0123456789abcdef":
return 5, nil
default:
return 0, errors.New("unexpected oid")
}
},
}
gbs := &GitBlobstore{api: api}
rc, err := gbs.reopenInlineBlobReader(ctx, orig, blobOID)
parts, total, err := gbs.validateAndSizeChunkedParts(ctx, []git.TreeEntry{
{Name: "0001", Type: "blob", OID: "0123456789abcdef0123456789abcdef01234567"},
{Name: "0002", Type: "blob", OID: "89abcdef0123456789abcdef0123456789abcdef"},
})
require.NoError(t, err)
require.True(t, orig.closed)
require.NotNil(t, rc)
_ = rc.Close()
}
require.Equal(t, uint64(8), total)
require.Len(t, parts, 2)
require.Equal(t, "0123456789abcdef0123456789abcdef01234567", parts[0].OIDHex)
require.Equal(t, uint64(3), parts[0].Size)
func TestReadAtMost(t *testing.T) {
out, err := readAtMost(bytes.NewReader([]byte("hello")), 3)
require.NoError(t, err)
require.Equal(t, []byte("hel"), out)
out, err = readAtMost(bytes.NewReader([]byte("hi")), 3)
require.NoError(t, err)
require.Equal(t, []byte("hi"), out)
}
func TestReadFullBlobBounded(t *testing.T) {
// Reads through to blobSize when within max.
// Note: |already| is expected to be a prefix read from |r|, so |r| must represent the
// remaining stream after the prefix has been consumed.
r := bytes.NewReader([]byte("cdef"))
got, err := readFullBlobBounded(r, []byte("ab"), 6, 64)
require.NoError(t, err)
require.Equal(t, []byte("abcdef"), got)
// Errors if blobSize exceeds max and we hit the cap.
r = bytes.NewReader(bytes.Repeat([]byte("x"), 100))
_, err = readFullBlobBounded(r, bytes.Repeat([]byte("x"), 10), 100000, 10)
_, _, err = gbs.validateAndSizeChunkedParts(ctx, []git.TreeEntry{{Name: "1", Type: "blob", OID: "0123456789abcdef0123456789abcdef01234567"}})
require.Error(t, err)
require.Contains(t, err.Error(), "descriptor too large")
}
@@ -27,6 +27,16 @@ import (
gitbs "github.com/dolthub/dolt/go/store/blobstore/internal/gitbs"
)
// trackingReadCloser wraps an io.Reader and records whether Close was called,
// so tests can assert that readers are properly released.
type trackingReadCloser struct {
	io.Reader
	closed bool // set to true by Close
}

// Close marks the wrapper as closed; it never fails.
func (t *trackingReadCloser) Close() error {
	t.closed = true
	return nil
}
func TestMultiPartReadCloser_ReadConcatenatesAcrossPartsWithOffsets(t *testing.T) {
ctx := context.Background()
+25
View File
@@ -34,6 +34,18 @@ type GitAPI interface {
// resolves to a non-blob object.
ResolvePathBlob(ctx context.Context, commit OID, path string) (OID, error)
// ResolvePathObject resolves |path| within |commit| to an object OID and type.
// It returns PathNotFoundError if the path does not exist.
//
// Typical types are "blob" and "tree".
ResolvePathObject(ctx context.Context, commit OID, path string) (oid OID, typ string, err error)
// ListTree lists the entries of the tree at |treePath| within |commit|.
// The listing is non-recursive: it returns only immediate children.
//
// It returns PathNotFoundError if |treePath| does not exist.
ListTree(ctx context.Context, commit OID, treePath string) ([]TreeEntry, error)
// CatFileType returns the git object type for |oid| (e.g. "blob", "tree", "commit").
CatFileType(ctx context.Context, oid OID) (string, error)
@@ -63,6 +75,11 @@ type GitAPI interface {
// GIT_DIR=... GIT_INDEX_FILE=<indexFile> git update-index --add --cacheinfo <mode> <oid> <path>
UpdateIndexCacheInfo(ctx context.Context, indexFile string, mode string, oid OID, path string) error
// RemoveIndexPaths removes |paths| from |indexFile| if present.
// Equivalent plumbing:
// GIT_DIR=... GIT_INDEX_FILE=<indexFile> git update-index --remove -z --stdin
RemoveIndexPaths(ctx context.Context, indexFile string, paths []string) error
// WriteTree writes a tree object from the contents of |indexFile| and returns its oid.
// Equivalent plumbing:
// GIT_DIR=... GIT_INDEX_FILE=<indexFile> git write-tree
@@ -84,6 +101,14 @@ type GitAPI interface {
UpdateRef(ctx context.Context, ref string, newOID OID, msg string) error
}
// TreeEntry describes one entry in a git tree listing, as produced by
// `git ls-tree` (one immediate child of the listed tree).
type TreeEntry struct {
	Mode string // file mode string as printed by ls-tree, e.g. "100644"
	Type string // object type as reported by ls-tree, e.g. "blob" or "tree"
	OID  OID    // object id of the entry
	Name string // entry name relative to the listed tree (no parent path)
}
// Identity represents git author/committer metadata. A future implementation may set
// this via environment variables (GIT_AUTHOR_NAME, etc.).
type Identity struct {
+97
View File
@@ -88,6 +88,62 @@ func (a *GitAPIImpl) ResolvePathBlob(ctx context.Context, commit OID, path strin
return OID(oid), nil
}
// ResolvePathObject resolves |path| within |commit| to its object OID and git
// object type (typically "blob" or "tree"). A missing path is reported as
// PathNotFoundError.
func (a *GitAPIImpl) ResolvePathObject(ctx context.Context, commit OID, path string) (oid OID, typ string, err error) {
	revSpec := commit.String() + ":" + path
	raw, runErr := a.r.Run(ctx, RunOptions{}, "rev-parse", "--verify", revSpec)
	if runErr != nil {
		if isPathNotFoundErr(runErr) {
			return "", "", &PathNotFoundError{Commit: commit.String(), Path: path}
		}
		return "", "", runErr
	}
	resolved := strings.TrimSpace(string(raw))
	if resolved == "" {
		return "", "", fmt.Errorf("git rev-parse returned empty oid for %q", revSpec)
	}
	// A second round-trip determines the object's type.
	objType, typeErr := a.CatFileType(ctx, OID(resolved))
	if typeErr != nil {
		return "", "", typeErr
	}
	return OID(resolved), objType, nil
}
// ListTree returns the immediate children of the tree at |treePath| within
// |commit| (non-recursive). An empty |treePath| lists the commit's root tree.
// A missing path is reported as PathNotFoundError.
func (a *GitAPIImpl) ListTree(ctx context.Context, commit OID, treePath string) ([]TreeEntry, error) {
	// "<commit>:<path>" is itself a tree-ish, so ls-tree can list a subtree
	// without pre-resolving its OID; the root tree is "<commit>^{tree}".
	var spec string
	if treePath == "" {
		spec = commit.String() + "^{tree}"
	} else {
		spec = commit.String() + ":" + treePath
	}
	out, err := a.r.Run(ctx, RunOptions{}, "ls-tree", spec)
	if err != nil {
		if treePath != "" && isPathNotFoundErr(err) {
			return nil, &PathNotFoundError{Commit: commit.String(), Path: treePath}
		}
		return nil, err
	}
	var entries []TreeEntry
	for _, line := range strings.Split(strings.TrimRight(string(out), "\n"), "\n") {
		// Skip blank lines (including the single empty line of an empty tree).
		if strings.TrimSpace(line) == "" {
			continue
		}
		entry, parseErr := parseLsTreeLine(line)
		if parseErr != nil {
			return nil, parseErr
		}
		entries = append(entries, entry)
	}
	return entries, nil
}
func (a *GitAPIImpl) CatFileType(ctx context.Context, oid OID) (string, error) {
out, err := a.r.Run(ctx, RunOptions{}, "cat-file", "-t", oid.String())
if err != nil {
@@ -141,6 +197,26 @@ func (a *GitAPIImpl) UpdateIndexCacheInfo(ctx context.Context, indexFile string,
return err
}
// RemoveIndexPaths removes |paths| from |indexFile| if present. It is a no-op
// for an empty path list.
func (a *GitAPIImpl) RemoveIndexPaths(ctx context.Context, indexFile string, paths []string) error {
	if len(paths) == 0 {
		return nil
	}
	// `git update-index --remove` is about removing *missing worktree files*
	// and requires a worktree. For bare repos / index-only workflows, feed
	// `--index-info` records with mode 0 instead, which deletes the path:
	//
	//	<mode> <object> <stage>\t<path>\n
	//	0 0000000000000000000000000000000000000000 0\t<path>\n
	const zeroOID = "0000000000000000000000000000000000000000"
	stdin := &bytes.Buffer{}
	for _, p := range paths {
		stdin.WriteString("0 " + zeroOID + " 0\t" + p + "\n")
	}
	_, err := a.r.Run(ctx, RunOptions{IndexFile: indexFile, Stdin: stdin}, "update-index", "--index-info")
	return err
}
func (a *GitAPIImpl) WriteTree(ctx context.Context, indexFile string) (OID, error) {
out, err := a.r.Run(ctx, RunOptions{IndexFile: indexFile}, "write-tree")
if err != nil {
@@ -243,3 +319,24 @@ func isPathNotFoundErr(err error) bool {
}
return false
}
// parseLsTreeLine decodes a single `git ls-tree` entry of the form:
//
//	<mode> SP <type> SP <oid>\t<name>
//
// Example:
//
//	100644 blob e69de29bb2d1d6434b8b29ae775ad8c2e48c5391\tfile.txt
func parseLsTreeLine(line string) (TreeEntry, error) {
	// The name follows the first tab; everything before it is metadata.
	meta, name, ok := strings.Cut(line, "\t")
	if !ok {
		return TreeEntry{}, fmt.Errorf("git ls-tree: malformed line %q", line)
	}
	fields := strings.Fields(meta)
	if len(fields) != 3 {
		return TreeEntry{}, fmt.Errorf("git ls-tree: malformed line %q", line)
	}
	entry := TreeEntry{
		Mode: fields[0],
		Type: fields[1],
		OID:  OID(fields[2]),
		Name: name,
	}
	return entry, nil
}
@@ -366,6 +366,216 @@ func TestGitAPIImpl_UpdateIndexCacheInfo_FileDirectoryConflictErrors(t *testing.
}
}
// TestGitAPIImpl_ResolvePathObject_BlobAndTree exercises ResolvePathObject
// against a real repo: a file path resolves to ("blob", its blob OID) and a
// directory path resolves to type "tree".
func TestGitAPIImpl_ResolvePathObject_BlobAndTree(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	// Bare repo + temp index file: the commit is built purely from plumbing.
	repo, err := gitrepo.InitBareTemp(ctx, "")
	if err != nil {
		t.Fatal(err)
	}
	r, err := NewRunner(repo.GitDir)
	if err != nil {
		t.Fatal(err)
	}
	api := NewGitAPIImpl(r)
	indexFile := tempIndexFile(t)
	if err := api.ReadTreeEmpty(ctx, indexFile); err != nil {
		t.Fatal(err)
	}
	// Seed one blob at dir/file.txt and commit the resulting tree.
	blobOID, err := api.HashObject(ctx, bytes.NewReader([]byte("hi\n")))
	if err != nil {
		t.Fatal(err)
	}
	if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", blobOID, "dir/file.txt"); err != nil {
		t.Fatal(err)
	}
	treeOID, err := api.WriteTree(ctx, indexFile)
	if err != nil {
		t.Fatal(err)
	}
	commitOID, err := api.CommitTree(ctx, treeOID, nil, "seed", testAuthor())
	if err != nil {
		t.Fatal(err)
	}
	// File path -> blob with the exact OID we hashed.
	gotOID, gotTyp, err := api.ResolvePathObject(ctx, commitOID, "dir/file.txt")
	if err != nil {
		t.Fatal(err)
	}
	if gotTyp != "blob" {
		t.Fatalf("expected type blob, got %q", gotTyp)
	}
	if gotOID != blobOID {
		t.Fatalf("expected oid %q, got %q", blobOID, gotOID)
	}
	// Directory path -> tree.
	_, gotTyp, err = api.ResolvePathObject(ctx, commitOID, "dir")
	if err != nil {
		t.Fatal(err)
	}
	if gotTyp != "tree" {
		t.Fatalf("expected type tree, got %q", gotTyp)
	}
}
// TestGitAPIImpl_ListTree_NonRecursive verifies that ListTree returns only the
// immediate children of a tree path: two blobs plus a nested "sub" tree, and
// nothing from inside the nested tree.
func TestGitAPIImpl_ListTree_NonRecursive(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	repo, err := gitrepo.InitBareTemp(ctx, "")
	if err != nil {
		t.Fatal(err)
	}
	r, err := NewRunner(repo.GitDir)
	if err != nil {
		t.Fatal(err)
	}
	api := NewGitAPIImpl(r)
	indexFile := tempIndexFile(t)
	if err := api.ReadTreeEmpty(ctx, indexFile); err != nil {
		t.Fatal(err)
	}
	// Three blobs: two directly under dir/, one nested under dir/sub/.
	oidA, err := api.HashObject(ctx, bytes.NewReader([]byte("a\n")))
	if err != nil {
		t.Fatal(err)
	}
	oidB, err := api.HashObject(ctx, bytes.NewReader([]byte("b\n")))
	if err != nil {
		t.Fatal(err)
	}
	oidX, err := api.HashObject(ctx, bytes.NewReader([]byte("x\n")))
	if err != nil {
		t.Fatal(err)
	}
	if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidA, "dir/a.txt"); err != nil {
		t.Fatal(err)
	}
	if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidB, "dir/b.txt"); err != nil {
		t.Fatal(err)
	}
	if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidX, "dir/sub/x.txt"); err != nil {
		t.Fatal(err)
	}
	treeOID, err := api.WriteTree(ctx, indexFile)
	if err != nil {
		t.Fatal(err)
	}
	commitOID, err := api.CommitTree(ctx, treeOID, nil, "seed", testAuthor())
	if err != nil {
		t.Fatal(err)
	}
	entries, err := api.ListTree(ctx, commitOID, "dir")
	if err != nil {
		t.Fatal(err)
	}
	// Expect: a.txt (blob), b.txt (blob), sub (tree)
	if len(entries) != 3 {
		t.Fatalf("expected 3 entries, got %d: %+v", len(entries), entries)
	}
	// Order-independent check of the three expected entries.
	var gotA, gotB, gotSub bool
	for _, e := range entries {
		switch e.Name {
		case "a.txt":
			gotA = true
			if e.Type != "blob" || e.OID != oidA {
				t.Fatalf("unexpected a.txt entry: %+v", e)
			}
		case "b.txt":
			gotB = true
			if e.Type != "blob" || e.OID != oidB {
				t.Fatalf("unexpected b.txt entry: %+v", e)
			}
		case "sub":
			gotSub = true
			if e.Type != "tree" || e.OID == "" {
				t.Fatalf("unexpected sub entry: %+v", e)
			}
		default:
			t.Fatalf("unexpected entry: %+v", e)
		}
	}
	if !gotA || !gotB || !gotSub {
		t.Fatalf("missing expected entries: gotA=%v gotB=%v gotSub=%v", gotA, gotB, gotSub)
	}
}
// TestGitAPIImpl_RemoveIndexPaths_RemovesFromIndex stages two blobs, removes
// one via RemoveIndexPaths, commits the index, and verifies the removed path
// resolves to PathNotFoundError while the other path is untouched.
func TestGitAPIImpl_RemoveIndexPaths_RemovesFromIndex(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	repo, err := gitrepo.InitBareTemp(ctx, "")
	if err != nil {
		t.Fatal(err)
	}
	r, err := NewRunner(repo.GitDir)
	if err != nil {
		t.Fatal(err)
	}
	api := NewGitAPIImpl(r)
	indexFile := tempIndexFile(t)
	if err := api.ReadTreeEmpty(ctx, indexFile); err != nil {
		t.Fatal(err)
	}
	// Stage a.txt and b.txt, then drop a.txt from the index.
	oidA, err := api.HashObject(ctx, bytes.NewReader([]byte("a\n")))
	if err != nil {
		t.Fatal(err)
	}
	oidB, err := api.HashObject(ctx, bytes.NewReader([]byte("b\n")))
	if err != nil {
		t.Fatal(err)
	}
	if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidA, "a.txt"); err != nil {
		t.Fatal(err)
	}
	if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidB, "b.txt"); err != nil {
		t.Fatal(err)
	}
	if err := api.RemoveIndexPaths(ctx, indexFile, []string{"a.txt"}); err != nil {
		t.Fatal(err)
	}
	treeOID, err := api.WriteTree(ctx, indexFile)
	if err != nil {
		t.Fatal(err)
	}
	commitOID, err := api.CommitTree(ctx, treeOID, nil, "seed", testAuthor())
	if err != nil {
		t.Fatal(err)
	}
	// a.txt removed, b.txt still present
	_, err = api.ResolvePathBlob(ctx, commitOID, "a.txt")
	if err == nil {
		t.Fatalf("expected a.txt missing")
	}
	var pnf *PathNotFoundError
	if !errors.As(err, &pnf) {
		t.Fatalf("expected PathNotFoundError, got %T: %v", err, err)
	}
	gotB, err := api.ResolvePathBlob(ctx, commitOID, "b.txt")
	if err != nil {
		t.Fatal(err)
	}
	if gotB != oidB {
		t.Fatalf("expected b.txt oid %q, got %q", oidB, gotB)
	}
}
func TestGitAPIImpl_ReadTree_PreservesExistingPaths(t *testing.T) {
t.Parallel()