mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-26 00:51:33 -06:00
Merge pull request #10461 from dolthub/angelamayxie-46fe127d
[auto-bump] [no-release-notes] dependency by angelamayxie
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -25,3 +25,5 @@ CLAUDE.md
|
||||
.beads
|
||||
.gitattributes
|
||||
|
||||
.de/
|
||||
AGENTS.md
|
||||
|
||||
@@ -61,7 +61,7 @@ require (
|
||||
github.com/dolthub/dolt-mcp v0.2.2
|
||||
github.com/dolthub/eventsapi_schema v0.0.0-20260205214132-a7a3c84c84a1
|
||||
github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2
|
||||
github.com/dolthub/go-mysql-server v0.20.1-0.20260210000147-1ce36a7d1e8f
|
||||
github.com/dolthub/go-mysql-server v0.20.1-0.20260210005347-46fe127d0460
|
||||
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63
|
||||
github.com/edsrzf/mmap-go v1.2.0
|
||||
github.com/esote/minmaxheap v1.0.0
|
||||
|
||||
@@ -196,8 +196,8 @@ github.com/dolthub/fslock v0.0.0-20251215194149-ef20baba2318 h1:n+vdH5G5Db+1qnDC
|
||||
github.com/dolthub/fslock v0.0.0-20251215194149-ef20baba2318/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
|
||||
github.com/dolthub/go-icu-regex v0.0.0-20250916051405-78a38d478790 h1:zxMsH7RLiG+dlZ/y0LgJHTV26XoiSJcuWq+em6t6VVc=
|
||||
github.com/dolthub/go-icu-regex v0.0.0-20250916051405-78a38d478790/go.mod h1:F3cnm+vMRK1HaU6+rNqQrOCyR03HHhR1GWG2gnPOqaE=
|
||||
github.com/dolthub/go-mysql-server v0.20.1-0.20260210000147-1ce36a7d1e8f h1:1XL5lO5pbL6xomeC5DBzfT9pUoDZGpd9809TFsDiEWY=
|
||||
github.com/dolthub/go-mysql-server v0.20.1-0.20260210000147-1ce36a7d1e8f/go.mod h1:LEWdXw6LKjdonOv2X808RpUc8wZVtQx4ZEPvmDWkvY4=
|
||||
github.com/dolthub/go-mysql-server v0.20.1-0.20260210005347-46fe127d0460 h1:ku4qVcwZUUImcaWOOrPWwhjD5BD34wS6LuENxU3XJUU=
|
||||
github.com/dolthub/go-mysql-server v0.20.1-0.20260210005347-46fe127d0460/go.mod h1:LEWdXw6LKjdonOv2X808RpUc8wZVtQx4ZEPvmDWkvY4=
|
||||
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63 h1:OAsXLAPL4du6tfbBgK0xXHZkOlos63RdKYS3Sgw/dfI=
|
||||
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63/go.mod h1:lV7lUeuDhH5thVGDCKXbatwKy2KW80L4rMT46n+Y2/Q=
|
||||
github.com/dolthub/ishell v0.0.0-20240701202509-2b217167d718 h1:lT7hE5k+0nkBdj/1UOSFwjWpNxf+LCApbRHgnCA17XE=
|
||||
|
||||
@@ -386,10 +386,11 @@ func (tx *DoltTransaction) doCommit(
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("database %s unknown to transaction, this is a bug", dbName)
|
||||
}
|
||||
normalizedDbName := strings.ToLower(branchState.dbState.dbName)
|
||||
|
||||
// Load the start state for this working set from the noms root at tx start
|
||||
// Get the base DB name from the db state, not the branch state
|
||||
startPoint, ok := tx.dbStartPoints[strings.ToLower(branchState.dbState.dbName)]
|
||||
startPoint, ok := tx.dbStartPoints[normalizedDbName]
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("database %s unknown to transaction, this is a bug", dbName)
|
||||
}
|
||||
@@ -403,7 +404,7 @@ func (tx *DoltTransaction) doCommit(
|
||||
|
||||
mergeOpts := branchState.EditOpts()
|
||||
|
||||
lockID := dbName + "\u0000" + workingSet.Ref().String()
|
||||
lockID := normalizedDbName + "\u0000" + workingSet.Ref().String()
|
||||
|
||||
for i := 0; i < maxTxCommitRetries; i++ {
|
||||
updatedWs, newCommit, err := func() (*doltdb.WorkingSet, *doltdb.Commit, error) {
|
||||
@@ -501,7 +502,6 @@ func (tx *DoltTransaction) mergeRoots(
|
||||
workingSet *doltdb.WorkingSet,
|
||||
mergeOpts editor.Options,
|
||||
) (*doltdb.WorkingSet, error) {
|
||||
|
||||
tableResolver, err := GetTableResolver(ctx, dbName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -33,7 +33,12 @@ type Blobstore interface {
|
||||
// Get returns a byte range of from the blob keyed by |key|, and the latest store version.
|
||||
Get(ctx context.Context, key string, br BlobRange) (rc io.ReadCloser, size uint64, version string, err error)
|
||||
|
||||
// Put creates a new blob from |reader| keyed by |key|, it returns the latest store version.
|
||||
// Put stores a blob from |reader| keyed by |key|, returning the latest store version.
|
||||
//
|
||||
// If |key| already exists, behavior is implementation-defined: some Blobstore
|
||||
// implementations overwrite, while others may treat Put as idempotent and fast-succeed
|
||||
// without consuming |reader|. Callers that require an explicit check-and-set should use
|
||||
// CheckAndPut.
|
||||
Put(ctx context.Context, key string, totalSize int64, reader io.Reader) (version string, err error)
|
||||
|
||||
// CheckAndPut updates the blob keyed by |key| using a check-and-set on |expectedVersion|.
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
81
go/store/blobstore/git_blobstore_chunked_checkandput_test.go
Normal file
81
go/store/blobstore/git_blobstore_chunked_checkandput_test.go
Normal file
@@ -0,0 +1,81 @@
|
||||
// Copyright 2026 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package blobstore
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/testutils/gitrepo"
|
||||
)
|
||||
|
||||
func TestGitBlobstore_CheckAndPut_ChunkedRoundTrip_CreateOnly(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
Identity: testIdentity(),
|
||||
MaxPartSize: 3,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
want := []byte("abcdefghij") // 10 bytes -> chunked tree
|
||||
ver, err := bs.CheckAndPut(ctx, "", "big", int64(len(want)), bytes.NewReader(want))
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, ver)
|
||||
|
||||
got, ver2, err := GetBytes(ctx, bs, "big", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ver, ver2)
|
||||
require.Equal(t, want, got)
|
||||
}
|
||||
|
||||
type chunkedFailReader struct{}
|
||||
|
||||
func (chunkedFailReader) Read(_ []byte) (int, error) {
|
||||
return 0, errors.New("read should not be called")
|
||||
}
|
||||
|
||||
func TestGitBlobstore_CheckAndPut_MismatchDoesNotConsumeReader_WithChunkingEnabled(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Seed any commit so actualVersion != "".
|
||||
bs0, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{Identity: testIdentity()})
|
||||
require.NoError(t, err)
|
||||
_, err = bs0.Put(ctx, "x", 1, bytes.NewReader([]byte("x")))
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
Identity: testIdentity(),
|
||||
MaxPartSize: 3,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = bs.CheckAndPut(ctx, "definitely-wrong", "y", 1, io.Reader(chunkedFailReader{}))
|
||||
require.Error(t, err)
|
||||
require.True(t, IsCheckAndPutError(err))
|
||||
}
|
||||
101
go/store/blobstore/git_blobstore_chunked_get_test.go
Normal file
101
go/store/blobstore/git_blobstore_chunked_get_test.go
Normal file
@@ -0,0 +1,101 @@
|
||||
// Copyright 2026 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package blobstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os/exec"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
git "github.com/dolthub/dolt/go/store/blobstore/internal/git"
|
||||
"github.com/dolthub/dolt/go/store/testutils/gitrepo"
|
||||
)
|
||||
|
||||
func TestGitBlobstore_Get_ChunkedTree_AllAndRanges(t *testing.T) {
|
||||
if _, err := exec.LookPath("git"); err != nil {
|
||||
t.Skip("git not found on PATH")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
part1 := []byte("abc")
|
||||
part2 := []byte("defgh")
|
||||
commitOID, err := repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
"chunked/0001": part1,
|
||||
"chunked/0002": part2,
|
||||
}, "seed chunked tree")
|
||||
require.NoError(t, err)
|
||||
|
||||
runner, err := git.NewRunner(repo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
treeOID, _, err := api.ResolvePathObject(ctx, git.OID(commitOID), "chunked")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
|
||||
wantAll := append(append([]byte(nil), part1...), part2...)
|
||||
|
||||
got, ver, err := GetBytes(ctx, bs, "chunked", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, treeOID.String(), ver)
|
||||
require.Equal(t, wantAll, got)
|
||||
|
||||
// Range spanning boundary: offset 2 length 4 => "cdef"
|
||||
got, ver, err = GetBytes(ctx, bs, "chunked", NewBlobRange(2, 4))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, treeOID.String(), ver)
|
||||
require.Equal(t, []byte("cdef"), got)
|
||||
|
||||
// Tail read last 3 bytes => "fgh"
|
||||
got, ver, err = GetBytes(ctx, bs, "chunked", NewBlobRange(-3, 0))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, treeOID.String(), ver)
|
||||
require.Equal(t, []byte("fgh"), got)
|
||||
|
||||
// Validate size returned is logical size.
|
||||
rc, sz, ver2, err := bs.Get(ctx, "chunked", NewBlobRange(0, 1))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(len(wantAll)), sz)
|
||||
require.Equal(t, treeOID.String(), ver2)
|
||||
_ = rc.Close()
|
||||
}
|
||||
|
||||
func TestGitBlobstore_Get_ChunkedTree_InvalidPartsError(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Gap: 0001, 0003
|
||||
_, err = repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{
|
||||
"chunked/0001": []byte("a"),
|
||||
"chunked/0003": []byte("b"),
|
||||
}, "seed invalid chunked tree")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, _, err = GetBytes(ctx, bs, "chunked", AllRange)
|
||||
require.Error(t, err)
|
||||
require.False(t, IsNotFoundError(err))
|
||||
}
|
||||
121
go/store/blobstore/git_blobstore_chunked_put_test.go
Normal file
121
go/store/blobstore/git_blobstore_chunked_put_test.go
Normal file
@@ -0,0 +1,121 @@
|
||||
// Copyright 2026 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package blobstore
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
git "github.com/dolthub/dolt/go/store/blobstore/internal/git"
|
||||
"github.com/dolthub/dolt/go/store/testutils/gitrepo"
|
||||
)
|
||||
|
||||
func TestGitBlobstore_Put_ChunkedWritesTreeParts(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
Identity: testIdentity(),
|
||||
MaxPartSize: 3,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
want := []byte("abcdefghij") // 10 bytes -> 3,3,3,1
|
||||
ver, err := bs.Put(ctx, "big", int64(len(want)), bytes.NewReader(want))
|
||||
require.NoError(t, err)
|
||||
|
||||
got, ver2, err := GetBytes(ctx, bs, "big", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ver, ver2)
|
||||
require.Equal(t, want, got)
|
||||
|
||||
runner, err := git.NewRunner(repo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
|
||||
head, ok, err := api.TryResolveRefCommit(ctx, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
|
||||
_, typ, err := api.ResolvePathObject(ctx, head, "big")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, git.ObjectTypeTree, typ)
|
||||
|
||||
entries, err := api.ListTree(ctx, head, "big")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, entries, 4)
|
||||
require.Equal(t, "00000001", entries[0].Name)
|
||||
require.Equal(t, "00000004", entries[3].Name)
|
||||
}
|
||||
|
||||
func TestGitBlobstore_Put_IdempotentDoesNotChangeExistingRepresentation(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
Identity: testIdentity(),
|
||||
MaxPartSize: 3,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
runner, err := git.NewRunner(repo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
|
||||
// blob stays blob (even if the caller would have triggered chunked mode)
|
||||
verBlob, err := bs.Put(ctx, "k", 2, bytes.NewReader([]byte("hi")))
|
||||
require.NoError(t, err)
|
||||
verNoop, err := bs.Put(ctx, "k", 10, putShouldNotRead{})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, verBlob, verNoop)
|
||||
|
||||
head1, ok, err := api.TryResolveRefCommit(ctx, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
_, typ, err := api.ResolvePathObject(ctx, head1, "k")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, git.ObjectTypeBlob, typ)
|
||||
|
||||
got, _, err := GetBytes(ctx, bs, "k", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte("hi"), got)
|
||||
|
||||
// tree stays tree
|
||||
verTree, err := bs.Put(ctx, "ktree", 10, bytes.NewReader([]byte("abcdefghij")))
|
||||
require.NoError(t, err)
|
||||
head2, ok, err := api.TryResolveRefCommit(ctx, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
_, typ, err = api.ResolvePathObject(ctx, head2, "ktree")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, git.ObjectTypeTree, typ)
|
||||
|
||||
verTreeNoop, err := bs.Put(ctx, "ktree", 2, putShouldNotRead{})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, verTree, verTreeNoop)
|
||||
|
||||
got, _, err = GetBytes(ctx, bs, "ktree", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte("abcdefghij"), got)
|
||||
}
|
||||
412
go/store/blobstore/git_blobstore_helpers_test.go
Normal file
412
go/store/blobstore/git_blobstore_helpers_test.go
Normal file
@@ -0,0 +1,412 @@
|
||||
// Copyright 2026 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package blobstore
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
git "github.com/dolthub/dolt/go/store/blobstore/internal/git"
|
||||
)
|
||||
|
||||
type fakeGitAPI struct {
|
||||
tryResolveRefCommit func(ctx context.Context, ref string) (git.OID, bool, error)
|
||||
resolvePathBlob func(ctx context.Context, commit git.OID, path string) (git.OID, error)
|
||||
resolvePathObject func(ctx context.Context, commit git.OID, path string) (git.OID, git.ObjectType, error)
|
||||
listTree func(ctx context.Context, commit git.OID, treePath string) ([]git.TreeEntry, error)
|
||||
blobSize func(ctx context.Context, oid git.OID) (int64, error)
|
||||
blobReader func(ctx context.Context, oid git.OID) (io.ReadCloser, error)
|
||||
}
|
||||
|
||||
func (f fakeGitAPI) TryResolveRefCommit(ctx context.Context, ref string) (git.OID, bool, error) {
|
||||
return f.tryResolveRefCommit(ctx, ref)
|
||||
}
|
||||
func (f fakeGitAPI) ResolveRefCommit(ctx context.Context, ref string) (git.OID, error) {
|
||||
panic("unexpected call")
|
||||
}
|
||||
func (f fakeGitAPI) ResolvePathBlob(ctx context.Context, commit git.OID, path string) (git.OID, error) {
|
||||
return f.resolvePathBlob(ctx, commit, path)
|
||||
}
|
||||
func (f fakeGitAPI) ResolvePathObject(ctx context.Context, commit git.OID, path string) (git.OID, git.ObjectType, error) {
|
||||
return f.resolvePathObject(ctx, commit, path)
|
||||
}
|
||||
func (f fakeGitAPI) ListTree(ctx context.Context, commit git.OID, treePath string) ([]git.TreeEntry, error) {
|
||||
return f.listTree(ctx, commit, treePath)
|
||||
}
|
||||
func (f fakeGitAPI) CatFileType(ctx context.Context, oid git.OID) (string, error) {
|
||||
panic("unexpected call")
|
||||
}
|
||||
func (f fakeGitAPI) BlobSize(ctx context.Context, oid git.OID) (int64, error) {
|
||||
return f.blobSize(ctx, oid)
|
||||
}
|
||||
func (f fakeGitAPI) BlobReader(ctx context.Context, oid git.OID) (io.ReadCloser, error) {
|
||||
return f.blobReader(ctx, oid)
|
||||
}
|
||||
func (f fakeGitAPI) HashObject(ctx context.Context, contents io.Reader) (git.OID, error) {
|
||||
panic("unexpected call")
|
||||
}
|
||||
func (f fakeGitAPI) ReadTree(ctx context.Context, commit git.OID, indexFile string) error {
|
||||
panic("unexpected call")
|
||||
}
|
||||
func (f fakeGitAPI) ReadTreeEmpty(ctx context.Context, indexFile string) error {
|
||||
panic("unexpected call")
|
||||
}
|
||||
func (f fakeGitAPI) UpdateIndexCacheInfo(ctx context.Context, indexFile string, mode string, oid git.OID, path string) error {
|
||||
panic("unexpected call")
|
||||
}
|
||||
func (f fakeGitAPI) RemoveIndexPaths(ctx context.Context, indexFile string, paths []string) error {
|
||||
panic("unexpected call")
|
||||
}
|
||||
func (f fakeGitAPI) WriteTree(ctx context.Context, indexFile string) (git.OID, error) {
|
||||
panic("unexpected call")
|
||||
}
|
||||
func (f fakeGitAPI) CommitTree(ctx context.Context, tree git.OID, parent *git.OID, message string, author *git.Identity) (git.OID, error) {
|
||||
panic("unexpected call")
|
||||
}
|
||||
func (f fakeGitAPI) UpdateRefCAS(ctx context.Context, ref string, newOID git.OID, oldOID git.OID, msg string) error {
|
||||
panic("unexpected call")
|
||||
}
|
||||
func (f fakeGitAPI) UpdateRef(ctx context.Context, ref string, newOID git.OID, msg string) error {
|
||||
panic("unexpected call")
|
||||
}
|
||||
|
||||
func TestGitBlobstoreHelpers_resolveCommitForGet(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
t.Run("ok", func(t *testing.T) {
|
||||
api := fakeGitAPI{
|
||||
tryResolveRefCommit: func(ctx context.Context, ref string) (git.OID, bool, error) {
|
||||
require.Equal(t, DoltDataRef, ref)
|
||||
return git.OID("0123456789abcdef0123456789abcdef01234567"), true, nil
|
||||
},
|
||||
}
|
||||
gbs := &GitBlobstore{ref: DoltDataRef, api: api}
|
||||
|
||||
commit, err := gbs.resolveCommitForGet(ctx, "k")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, git.OID("0123456789abcdef0123456789abcdef01234567"), commit)
|
||||
})
|
||||
|
||||
t.Run("missingRef_manifestIsNotFound", func(t *testing.T) {
|
||||
api := fakeGitAPI{
|
||||
tryResolveRefCommit: func(ctx context.Context, ref string) (git.OID, bool, error) {
|
||||
return git.OID(""), false, nil
|
||||
},
|
||||
}
|
||||
gbs := &GitBlobstore{ref: DoltDataRef, api: api}
|
||||
|
||||
_, err := gbs.resolveCommitForGet(ctx, "manifest")
|
||||
var nf NotFound
|
||||
require.ErrorAs(t, err, &nf)
|
||||
require.Equal(t, "manifest", nf.Key)
|
||||
})
|
||||
|
||||
t.Run("missingRef_nonManifestIsRefNotFound", func(t *testing.T) {
|
||||
api := fakeGitAPI{
|
||||
tryResolveRefCommit: func(ctx context.Context, ref string) (git.OID, bool, error) {
|
||||
return git.OID(""), false, nil
|
||||
},
|
||||
}
|
||||
gbs := &GitBlobstore{ref: DoltDataRef, api: api}
|
||||
|
||||
_, err := gbs.resolveCommitForGet(ctx, "somekey")
|
||||
var rnf *git.RefNotFoundError
|
||||
require.ErrorAs(t, err, &rnf)
|
||||
require.Equal(t, DoltDataRef, rnf.Ref)
|
||||
})
|
||||
|
||||
t.Run("propagatesError", func(t *testing.T) {
|
||||
sentinel := errors.New("boom")
|
||||
api := fakeGitAPI{
|
||||
tryResolveRefCommit: func(ctx context.Context, ref string) (git.OID, bool, error) {
|
||||
return git.OID(""), false, sentinel
|
||||
},
|
||||
}
|
||||
gbs := &GitBlobstore{ref: DoltDataRef, api: api}
|
||||
|
||||
_, err := gbs.resolveCommitForGet(ctx, "k")
|
||||
require.ErrorIs(t, err, sentinel)
|
||||
})
|
||||
}
|
||||
|
||||
func TestGitBlobstoreHelpers_resolveObjectForGet(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
commit := git.OID("0123456789abcdef0123456789abcdef01234567")
|
||||
|
||||
t.Run("ok", func(t *testing.T) {
|
||||
api := fakeGitAPI{
|
||||
resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, git.ObjectType, error) {
|
||||
require.Equal(t, commit, gotCommit)
|
||||
require.Equal(t, "k", path)
|
||||
return git.OID("89abcdef0123456789abcdef0123456789abcdef"), git.ObjectTypeBlob, nil
|
||||
},
|
||||
}
|
||||
gbs := &GitBlobstore{api: api}
|
||||
|
||||
oid, typ, err := gbs.resolveObjectForGet(ctx, commit, "k")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, git.ObjectTypeBlob, typ)
|
||||
require.Equal(t, git.OID("89abcdef0123456789abcdef0123456789abcdef"), oid)
|
||||
})
|
||||
|
||||
t.Run("pathNotFoundMapsToNotFound", func(t *testing.T) {
|
||||
api := fakeGitAPI{
|
||||
resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, git.ObjectType, error) {
|
||||
return git.OID(""), git.ObjectTypeUnknown, &git.PathNotFoundError{Commit: gotCommit.String(), Path: path}
|
||||
},
|
||||
}
|
||||
gbs := &GitBlobstore{api: api}
|
||||
|
||||
_, _, err := gbs.resolveObjectForGet(ctx, commit, "k")
|
||||
var nf NotFound
|
||||
require.ErrorAs(t, err, &nf)
|
||||
require.Equal(t, "k", nf.Key)
|
||||
})
|
||||
}
|
||||
|
||||
func TestGitBlobstoreHelpers_resolveBlobSizeForGet(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
commit := git.OID("0123456789abcdef0123456789abcdef01234567")
|
||||
oid := git.OID("89abcdef0123456789abcdef0123456789abcdef")
|
||||
|
||||
t.Run("ok", func(t *testing.T) {
|
||||
api := fakeGitAPI{
|
||||
blobSize: func(ctx context.Context, gotOID git.OID) (int64, error) {
|
||||
require.Equal(t, oid, gotOID)
|
||||
return 123, nil
|
||||
},
|
||||
}
|
||||
gbs := &GitBlobstore{api: api}
|
||||
|
||||
sz, ver, err := gbs.resolveBlobSizeForGet(ctx, commit, oid)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, commit.String(), ver)
|
||||
require.Equal(t, int64(123), sz)
|
||||
})
|
||||
}
|
||||
|
||||
func TestGitBlobstoreHelpers_validateAndSizeChunkedParts(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
api := fakeGitAPI{
|
||||
blobSize: func(ctx context.Context, oid git.OID) (int64, error) {
|
||||
switch oid {
|
||||
case "0123456789abcdef0123456789abcdef01234567":
|
||||
return 3, nil
|
||||
case "89abcdef0123456789abcdef0123456789abcdef":
|
||||
return 5, nil
|
||||
default:
|
||||
return 0, errors.New("unexpected oid")
|
||||
}
|
||||
},
|
||||
}
|
||||
gbs := &GitBlobstore{api: api}
|
||||
|
||||
parts, total, err := gbs.validateAndSizeChunkedParts(ctx, []git.TreeEntry{
|
||||
{Name: "0001", Type: git.ObjectTypeBlob, OID: "0123456789abcdef0123456789abcdef01234567"},
|
||||
{Name: "0002", Type: git.ObjectTypeBlob, OID: "89abcdef0123456789abcdef0123456789abcdef"},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(8), total)
|
||||
require.Len(t, parts, 2)
|
||||
require.Equal(t, "0123456789abcdef0123456789abcdef01234567", parts[0].oidHex)
|
||||
require.Equal(t, uint64(3), parts[0].size)
|
||||
|
||||
_, _, err = gbs.validateAndSizeChunkedParts(ctx, []git.TreeEntry{{Name: "1", Type: git.ObjectTypeBlob, OID: "0123456789abcdef0123456789abcdef01234567"}})
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestGitBlobstoreHelpers_sizeAtCommit(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
commit := git.OID("0123456789abcdef0123456789abcdef01234567")
|
||||
|
||||
t.Run("blob", func(t *testing.T) {
|
||||
api := fakeGitAPI{
|
||||
resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, git.ObjectType, error) {
|
||||
require.Equal(t, commit, gotCommit)
|
||||
require.Equal(t, "k", path)
|
||||
return git.OID("89abcdef0123456789abcdef0123456789abcdef"), git.ObjectTypeBlob, nil
|
||||
},
|
||||
blobSize: func(ctx context.Context, gotOID git.OID) (int64, error) {
|
||||
require.Equal(t, git.OID("89abcdef0123456789abcdef0123456789abcdef"), gotOID)
|
||||
return 123, nil
|
||||
},
|
||||
}
|
||||
gbs := &GitBlobstore{api: api}
|
||||
sz, err := gbs.sizeAtCommit(ctx, commit, "k")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(123), sz)
|
||||
})
|
||||
|
||||
t.Run("chunkedTree", func(t *testing.T) {
|
||||
api := fakeGitAPI{
|
||||
resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, git.ObjectType, error) {
|
||||
require.Equal(t, commit, gotCommit)
|
||||
require.Equal(t, "k", path)
|
||||
return git.OID("treeoid"), git.ObjectTypeTree, nil
|
||||
},
|
||||
listTree: func(ctx context.Context, gotCommit git.OID, treePath string) ([]git.TreeEntry, error) {
|
||||
require.Equal(t, commit, gotCommit)
|
||||
require.Equal(t, "k", treePath)
|
||||
return []git.TreeEntry{
|
||||
{Name: "0001", Type: git.ObjectTypeBlob, OID: "0123456789abcdef0123456789abcdef01234567"},
|
||||
{Name: "0002", Type: git.ObjectTypeBlob, OID: "89abcdef0123456789abcdef0123456789abcdef"},
|
||||
}, nil
|
||||
},
|
||||
blobSize: func(ctx context.Context, oid git.OID) (int64, error) {
|
||||
switch oid {
|
||||
case "0123456789abcdef0123456789abcdef01234567":
|
||||
return 3, nil
|
||||
case "89abcdef0123456789abcdef0123456789abcdef":
|
||||
return 5, nil
|
||||
default:
|
||||
return 0, errors.New("unexpected oid")
|
||||
}
|
||||
},
|
||||
}
|
||||
gbs := &GitBlobstore{api: api}
|
||||
sz, err := gbs.sizeAtCommit(ctx, commit, "k")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(8), sz)
|
||||
})
|
||||
|
||||
t.Run("notFound", func(t *testing.T) {
|
||||
api := fakeGitAPI{
|
||||
resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, git.ObjectType, error) {
|
||||
return git.OID(""), git.ObjectTypeUnknown, &git.PathNotFoundError{Commit: gotCommit.String(), Path: path}
|
||||
},
|
||||
}
|
||||
gbs := &GitBlobstore{api: api}
|
||||
_, err := gbs.sizeAtCommit(ctx, commit, "missing")
|
||||
var nf NotFound
|
||||
require.ErrorAs(t, err, &nf)
|
||||
require.Equal(t, "missing", nf.Key)
|
||||
})
|
||||
}
|
||||
|
||||
func TestGitBlobstoreHelpers_totalSizeAtCommit_overflowInt64(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
commit := git.OID("0123456789abcdef0123456789abcdef01234567")
|
||||
|
||||
api := fakeGitAPI{
|
||||
resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, git.ObjectType, error) {
|
||||
return git.OID(path + "_oid"), git.ObjectTypeBlob, nil
|
||||
},
|
||||
blobSize: func(ctx context.Context, gotOID git.OID) (int64, error) {
|
||||
// Make the total exceed int64 max with two sources.
|
||||
if gotOID == "a_oid" {
|
||||
return int64(^uint64(0) >> 1), nil // math.MaxInt64 without importing math
|
||||
}
|
||||
return 1, nil
|
||||
},
|
||||
}
|
||||
gbs := &GitBlobstore{api: api}
|
||||
_, err := gbs.totalSizeAtCommit(ctx, commit, []string{"a", "b"})
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestConcatReadCloser(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
closed := map[string]int{}
|
||||
opened := map[string]int{}
|
||||
|
||||
mk := func(s string) io.ReadCloser {
|
||||
r := bytes.NewReader([]byte(s))
|
||||
return &trackedReadCloser{
|
||||
r: r,
|
||||
onClose: func() {
|
||||
closed[s]++
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
crc := &concatReadCloser{
|
||||
ctx: ctx,
|
||||
keys: []string{"a", "b"},
|
||||
open: func(ctx context.Context, key string) (io.ReadCloser, error) {
|
||||
opened[key]++
|
||||
if key == "a" {
|
||||
return mk("hi"), nil
|
||||
}
|
||||
return mk("there"), nil
|
||||
},
|
||||
}
|
||||
|
||||
out, err := io.ReadAll(crc)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "hithere", string(out))
|
||||
require.NoError(t, crc.Close())
|
||||
require.Equal(t, 1, opened["a"])
|
||||
require.Equal(t, 1, opened["b"])
|
||||
require.Equal(t, 1, closed["hi"])
|
||||
require.Equal(t, 1, closed["there"])
|
||||
}
|
||||
|
||||
func TestConcatReadCloser_CloseEarlyClosesCurrent(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
closed := map[string]int{}
|
||||
opened := map[string]int{}
|
||||
|
||||
mk := func(id string, s string) io.ReadCloser {
|
||||
r := bytes.NewReader([]byte(s))
|
||||
return &trackedReadCloser{
|
||||
r: r,
|
||||
onClose: func() {
|
||||
closed[id]++
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
crc := &concatReadCloser{
|
||||
ctx: ctx,
|
||||
keys: []string{"a", "b"},
|
||||
open: func(ctx context.Context, key string) (io.ReadCloser, error) {
|
||||
opened[key]++
|
||||
if key == "a" {
|
||||
return mk("a", "hello"), nil
|
||||
}
|
||||
return mk("b", "world"), nil
|
||||
},
|
||||
}
|
||||
|
||||
buf := make([]byte, 1)
|
||||
n, err := crc.Read(buf)
|
||||
require.Equal(t, 1, n)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, crc.Close())
|
||||
require.Equal(t, 1, opened["a"])
|
||||
require.Equal(t, 0, opened["b"], "expected not to open second reader when closing early")
|
||||
require.Equal(t, 1, closed["a"])
|
||||
require.Equal(t, 0, closed["b"])
|
||||
}
|
||||
|
||||
type trackedReadCloser struct {
|
||||
r io.Reader
|
||||
onClose func()
|
||||
}
|
||||
|
||||
func (t *trackedReadCloser) Read(p []byte) (int, error) { return t.r.Read(p) }
|
||||
func (t *trackedReadCloser) Close() error {
|
||||
if t.onClose != nil {
|
||||
t.onClose()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
124
go/store/blobstore/git_blobstore_multipart_test.go
Normal file
124
go/store/blobstore/git_blobstore_multipart_test.go
Normal file
@@ -0,0 +1,124 @@
|
||||
// Copyright 2026 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package blobstore
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
git "github.com/dolthub/dolt/go/store/blobstore/internal/git"
|
||||
)
|
||||
|
||||
type trackingReadCloser struct {
|
||||
io.Reader
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (t *trackingReadCloser) Close() error {
|
||||
t.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestMultiPartReadCloser_ReadConcatenatesAcrossPartsWithOffsets(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
oid1 := "0123456789abcdef0123456789abcdef01234567"
|
||||
oid2 := "89abcdef0123456789abcdef0123456789abcdef"
|
||||
|
||||
blobs := map[string][]byte{
|
||||
oid1: []byte("hello"),
|
||||
oid2: []byte("world!"),
|
||||
}
|
||||
|
||||
api := fakeGitAPI{
|
||||
blobReader: func(ctx context.Context, oid git.OID) (io.ReadCloser, error) {
|
||||
b, ok := blobs[oid.String()]
|
||||
require.True(t, ok, "unexpected oid %s", oid.String())
|
||||
return io.NopCloser(bytes.NewReader(b)), nil
|
||||
},
|
||||
}
|
||||
|
||||
rc := &multiPartReadCloser{
|
||||
ctx: ctx,
|
||||
api: api,
|
||||
slices: []chunkPartSlice{
|
||||
{oidHex: oid1, offset: 1, length: 3}, // "ell"
|
||||
{oidHex: oid2, offset: 2, length: 3}, // "rld"
|
||||
},
|
||||
}
|
||||
defer func() { _ = rc.Close() }()
|
||||
|
||||
got, err := io.ReadAll(rc)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte("ellrld"), got)
|
||||
}
|
||||
|
||||
func TestMultiPartReadCloser_ReadUnexpectedEOFWhenPartShorterThanDeclared(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
oid := "0123456789abcdef0123456789abcdef01234567"
|
||||
api := fakeGitAPI{
|
||||
blobReader: func(ctx context.Context, oid git.OID) (io.ReadCloser, error) {
|
||||
return io.NopCloser(bytes.NewReader([]byte("hi"))), nil // 2 bytes
|
||||
},
|
||||
}
|
||||
|
||||
rc := &multiPartReadCloser{
|
||||
ctx: ctx,
|
||||
api: api,
|
||||
slices: []chunkPartSlice{
|
||||
{oidHex: oid, offset: 0, length: 3}, // expect 3 bytes, only 2 available
|
||||
},
|
||||
}
|
||||
defer func() { _ = rc.Close() }()
|
||||
|
||||
_, err := io.ReadAll(rc)
|
||||
require.Error(t, err)
|
||||
require.True(t, errors.Is(err, io.ErrUnexpectedEOF))
|
||||
}
|
||||
|
||||
func TestMultiPartReadCloser_CloseClosesUnderlyingPartReader(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
oid := "0123456789abcdef0123456789abcdef01234567"
|
||||
underlying := &trackingReadCloser{Reader: bytes.NewReader([]byte("hello"))}
|
||||
|
||||
api := fakeGitAPI{
|
||||
blobReader: func(ctx context.Context, oid git.OID) (io.ReadCloser, error) {
|
||||
return underlying, nil
|
||||
},
|
||||
}
|
||||
|
||||
rc := &multiPartReadCloser{
|
||||
ctx: ctx,
|
||||
api: api,
|
||||
slices: []chunkPartSlice{
|
||||
{oidHex: oid, offset: 0, length: 1},
|
||||
},
|
||||
}
|
||||
|
||||
// Force the underlying reader to be opened.
|
||||
buf := make([]byte, 1)
|
||||
_, err := rc.Read(buf)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, rc.Close())
|
||||
require.True(t, underlying.closed)
|
||||
}
|
||||
@@ -85,6 +85,12 @@ func TestGitBlobstore_ExistsAndGet_AllRange(t *testing.T) {
|
||||
bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
|
||||
runner, err := git.NewRunner(repo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
manifestOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "manifest")
|
||||
require.NoError(t, err)
|
||||
|
||||
ok, err := bs.Exists(ctx, "manifest")
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
@@ -100,14 +106,14 @@ func TestGitBlobstore_ExistsAndGet_AllRange(t *testing.T) {
|
||||
|
||||
got, ver, err := GetBytes(ctx, bs, "manifest", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, commit, ver)
|
||||
require.Equal(t, manifestOID.String(), ver)
|
||||
require.Equal(t, want, got)
|
||||
|
||||
// Validate size + version on Get.
|
||||
rc, sz, ver2, err := bs.Get(ctx, "manifest", NewBlobRange(0, 5))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(len(want)), sz)
|
||||
require.Equal(t, commit, ver2)
|
||||
require.Equal(t, manifestOID.String(), ver2)
|
||||
_ = rc.Close()
|
||||
}
|
||||
|
||||
@@ -149,34 +155,40 @@ func TestGitBlobstore_BlobRangeSemantics(t *testing.T) {
|
||||
bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
|
||||
runner, err := git.NewRunner(repo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
rangeOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "range")
|
||||
require.NoError(t, err)
|
||||
|
||||
// full range
|
||||
got, ver, err := GetBytes(ctx, bs, "range", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, commit, ver)
|
||||
require.Equal(t, rangeOID.String(), ver)
|
||||
require.Equal(t, rangeData(0, maxValue), got)
|
||||
|
||||
// first 2048 bytes (1024 shorts)
|
||||
got, ver, err = GetBytes(ctx, bs, "range", NewBlobRange(0, 2048))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, commit, ver)
|
||||
require.Equal(t, rangeOID.String(), ver)
|
||||
require.Equal(t, rangeData(0, 1024), got)
|
||||
|
||||
// bytes 2048..4096 of original
|
||||
got, ver, err = GetBytes(ctx, bs, "range", NewBlobRange(2*1024, 2*1024))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, commit, ver)
|
||||
require.Equal(t, rangeOID.String(), ver)
|
||||
require.Equal(t, rangeData(1024, 2048), got)
|
||||
|
||||
// last 2048 bytes
|
||||
got, ver, err = GetBytes(ctx, bs, "range", NewBlobRange(-2*1024, 0))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, commit, ver)
|
||||
require.Equal(t, rangeOID.String(), ver)
|
||||
require.Equal(t, rangeData(maxValue-1024, maxValue), got)
|
||||
|
||||
// tail slice: beginning 2048 bytes from end, size 512
|
||||
got, ver, err = GetBytes(ctx, bs, "range", NewBlobRange(-2*1024, 512))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, commit, ver)
|
||||
require.Equal(t, rangeOID.String(), ver)
|
||||
require.Equal(t, rangeData(maxValue-1024, maxValue-768), got)
|
||||
}
|
||||
|
||||
@@ -243,7 +255,148 @@ func TestGitBlobstore_Put_RoundTripAndVersion(t *testing.T) {
|
||||
require.Equal(t, want, got)
|
||||
}
|
||||
|
||||
func TestGitBlobstore_Put_Overwrite(t *testing.T) {
|
||||
func TestGitBlobstore_Concatenate_Basic(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = PutBytes(ctx, bs, "a", []byte("hi "))
|
||||
require.NoError(t, err)
|
||||
_, err = PutBytes(ctx, bs, "b", []byte("there"))
|
||||
require.NoError(t, err)
|
||||
|
||||
ver, err := bs.Concatenate(ctx, "c", []string{"a", "b"})
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, ver)
|
||||
|
||||
got, ver2, err := GetBytes(ctx, bs, "c", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ver, ver2)
|
||||
require.Equal(t, []byte("hi there"), got)
|
||||
}
|
||||
|
||||
func TestGitBlobstore_Concatenate_ChunkedResult(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{
|
||||
Identity: testIdentity(),
|
||||
MaxPartSize: 1024,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
a := bytes.Repeat([]byte("a"), 700)
|
||||
b := bytes.Repeat([]byte("b"), 700)
|
||||
want := append(append([]byte(nil), a...), b...)
|
||||
|
||||
_, err = PutBytes(ctx, bs, "a", a)
|
||||
require.NoError(t, err)
|
||||
_, err = PutBytes(ctx, bs, "b", b)
|
||||
require.NoError(t, err)
|
||||
|
||||
ver, err := bs.Concatenate(ctx, "c", []string{"a", "b"})
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, ver)
|
||||
|
||||
// Verify the resulting key is stored as a chunked tree (not a single blob).
|
||||
head, ok, err := bs.api.TryResolveRefCommit(ctx, DoltDataRef)
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
oid, typ, err := bs.api.ResolvePathObject(ctx, head, "c")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, git.ObjectTypeTree, typ)
|
||||
require.Equal(t, oid.String(), ver)
|
||||
|
||||
parts, err := bs.api.ListTree(ctx, head, "c")
|
||||
require.NoError(t, err)
|
||||
require.GreaterOrEqual(t, len(parts), 2)
|
||||
require.Equal(t, "00000001", parts[0].Name)
|
||||
|
||||
got, ver2, err := GetBytes(ctx, bs, "c", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ver, ver2)
|
||||
require.Equal(t, want, got)
|
||||
}
|
||||
|
||||
func TestGitBlobstore_Concatenate_KeyExistsFastSucceeds(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
ver1, err := PutBytes(ctx, bs, "c", []byte("original"))
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, ver1)
|
||||
|
||||
_, err = PutBytes(ctx, bs, "a", []byte("new "))
|
||||
require.NoError(t, err)
|
||||
_, err = PutBytes(ctx, bs, "b", []byte("value"))
|
||||
require.NoError(t, err)
|
||||
|
||||
ver2, err := bs.Concatenate(ctx, "c", []string{"a", "b"})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ver1, ver2, "expected concatenate to fast-succeed without overwriting existing key")
|
||||
|
||||
got, ver3, err := GetBytes(ctx, bs, "c", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ver1, ver3)
|
||||
require.Equal(t, []byte("original"), got)
|
||||
}
|
||||
|
||||
func TestGitBlobstore_Concatenate_MissingSourceIsNotFound(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = PutBytes(ctx, bs, "present", []byte("x"))
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = bs.Concatenate(ctx, "c", []string{"present", "missing"})
|
||||
require.Error(t, err)
|
||||
require.True(t, IsNotFoundError(err))
|
||||
var nf NotFound
|
||||
require.ErrorAs(t, err, &nf)
|
||||
require.Equal(t, "missing", nf.Key)
|
||||
}
|
||||
|
||||
func TestGitBlobstore_Concatenate_EmptySourcesErrors(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git")
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = bs.Concatenate(ctx, "c", nil)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
type putShouldNotRead struct{}
|
||||
|
||||
func (putShouldNotRead) Read(_ []byte) (int, error) {
|
||||
return 0, errors.New("read should not be called")
|
||||
}
|
||||
|
||||
func TestGitBlobstore_Put_IdempotentIfKeyExists(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
@@ -257,15 +410,14 @@ func TestGitBlobstore_Put_Overwrite(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, ver1)
|
||||
|
||||
ver2, err := PutBytes(ctx, bs, "k", []byte("v2\n"))
|
||||
ver2, err := bs.Put(ctx, "k", 3, putShouldNotRead{})
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, ver2)
|
||||
require.NotEqual(t, ver1, ver2)
|
||||
require.Equal(t, ver1, ver2)
|
||||
|
||||
got, ver3, err := GetBytes(ctx, bs, "k", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ver2, ver3)
|
||||
require.Equal(t, []byte("v2\n"), got)
|
||||
require.Equal(t, ver1, ver3)
|
||||
require.Equal(t, []byte("v1\n"), got)
|
||||
}
|
||||
|
||||
type hookGitAPI struct {
|
||||
@@ -431,8 +583,14 @@ func TestGitBlobstore_CheckAndPut_MismatchDoesNotRead(t *testing.T) {
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
runner, err := git.NewRunner(repo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
keyOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "k")
|
||||
require.NoError(t, err)
|
||||
|
||||
r := &failReader{}
|
||||
_, err = bs.CheckAndPut(ctx, commit+"-wrong", "k", 1, r)
|
||||
_, err = bs.CheckAndPut(ctx, keyOID.String()+"-wrong", "k", 1, r)
|
||||
require.Error(t, err)
|
||||
require.True(t, IsCheckAndPutError(err))
|
||||
require.False(t, r.called.Load(), "expected reader not to be consumed on version mismatch")
|
||||
@@ -454,11 +612,17 @@ func TestGitBlobstore_CheckAndPut_UpdateSuccess(t *testing.T) {
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
runner, err := git.NewRunner(repo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
keyOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "k")
|
||||
require.NoError(t, err)
|
||||
|
||||
want := []byte("updated\n")
|
||||
ver2, err := bs.CheckAndPut(ctx, commit, "k", int64(len(want)), bytes.NewReader(want))
|
||||
ver2, err := bs.CheckAndPut(ctx, keyOID.String(), "k", int64(len(want)), bytes.NewReader(want))
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, ver2)
|
||||
require.NotEqual(t, commit, ver2)
|
||||
require.NotEqual(t, keyOID.String(), ver2)
|
||||
|
||||
got, ver3, err := GetBytes(ctx, bs, "k", AllRange)
|
||||
require.NoError(t, err)
|
||||
@@ -470,7 +634,7 @@ func TestGitBlobstore_CheckAndPut_UpdateSuccess(t *testing.T) {
|
||||
require.Equal(t, []byte("keep\n"), got)
|
||||
}
|
||||
|
||||
func TestGitBlobstore_CheckAndPut_ConcurrentUpdateReturnsMismatch(t *testing.T) {
|
||||
func TestGitBlobstore_CheckAndPut_ConcurrentUnrelatedUpdateStillSucceeds(t *testing.T) {
|
||||
requireGitOnPath(t)
|
||||
|
||||
ctx := context.Background()
|
||||
@@ -485,6 +649,12 @@ func TestGitBlobstore_CheckAndPut_ConcurrentUpdateReturnsMismatch(t *testing.T)
|
||||
bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity())
|
||||
require.NoError(t, err)
|
||||
|
||||
runner, err := git.NewRunner(repo.GitDir)
|
||||
require.NoError(t, err)
|
||||
api := git.NewGitAPIImpl(runner)
|
||||
keyOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "k")
|
||||
require.NoError(t, err)
|
||||
|
||||
origAPI := bs.api
|
||||
h := &hookGitAPI{GitAPI: origAPI, ref: DoltDataRef}
|
||||
h.onFirstCAS = func(ctx context.Context, old git.OID) {
|
||||
@@ -493,12 +663,17 @@ func TestGitBlobstore_CheckAndPut_ConcurrentUpdateReturnsMismatch(t *testing.T)
|
||||
}
|
||||
bs.api = h
|
||||
|
||||
_, err = bs.CheckAndPut(ctx, commit, "k", 0, bytes.NewReader([]byte("mine\n")))
|
||||
require.Error(t, err)
|
||||
require.True(t, IsCheckAndPutError(err))
|
||||
|
||||
// Verify key did not change, since our CAS should have failed.
|
||||
got, _, err := GetBytes(ctx, bs, "k", AllRange)
|
||||
ver2, err := bs.CheckAndPut(ctx, keyOID.String(), "k", 0, bytes.NewReader([]byte("mine\n")))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte("base\n"), got)
|
||||
require.NotEmpty(t, ver2)
|
||||
require.NotEqual(t, keyOID.String(), ver2)
|
||||
|
||||
got, ver3, err := GetBytes(ctx, bs, "k", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ver2, ver3)
|
||||
require.Equal(t, []byte("mine\n"), got)
|
||||
|
||||
got, _, err = GetBytes(ctx, bs, "external", AllRange)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte("external\n"), got)
|
||||
}
|
||||
|
||||
@@ -19,6 +19,17 @@ import (
|
||||
"io"
|
||||
)
|
||||
|
||||
// ObjectType is a git object type returned by plumbing (e.g. "blob", "tree").
|
||||
type ObjectType string
|
||||
|
||||
const (
|
||||
ObjectTypeUnknown ObjectType = ""
|
||||
ObjectTypeBlob ObjectType = "blob"
|
||||
ObjectTypeTree ObjectType = "tree"
|
||||
ObjectTypeCommit ObjectType = "commit"
|
||||
ObjectTypeTag ObjectType = "tag"
|
||||
)
|
||||
|
||||
// GitAPI defines the git plumbing operations needed by GitBlobstore. It includes both
|
||||
// read and write operations to allow swapping implementations (e.g. git CLI vs a Go git
|
||||
// library) while keeping callers stable.
|
||||
@@ -34,6 +45,16 @@ type GitAPI interface {
|
||||
// resolves to a non-blob object.
|
||||
ResolvePathBlob(ctx context.Context, commit OID, path string) (OID, error)
|
||||
|
||||
// ResolvePathObject resolves |path| within |commit| to an object OID and type.
|
||||
// It returns PathNotFoundError if the path does not exist.
|
||||
ResolvePathObject(ctx context.Context, commit OID, path string) (oid OID, typ ObjectType, err error)
|
||||
|
||||
// ListTree lists the entries of the tree at |treePath| within |commit|.
|
||||
// The listing is non-recursive: it returns only immediate children.
|
||||
//
|
||||
// It returns PathNotFoundError if |treePath| does not exist.
|
||||
ListTree(ctx context.Context, commit OID, treePath string) ([]TreeEntry, error)
|
||||
|
||||
// CatFileType returns the git object type for |oid| (e.g. "blob", "tree", "commit").
|
||||
CatFileType(ctx context.Context, oid OID) (string, error)
|
||||
|
||||
@@ -63,6 +84,11 @@ type GitAPI interface {
|
||||
// GIT_DIR=... GIT_INDEX_FILE=<indexFile> git update-index --add --cacheinfo <mode> <oid> <path>
|
||||
UpdateIndexCacheInfo(ctx context.Context, indexFile string, mode string, oid OID, path string) error
|
||||
|
||||
// RemoveIndexPaths removes |paths| from |indexFile| if present.
|
||||
// Equivalent plumbing:
|
||||
// GIT_DIR=... GIT_INDEX_FILE=<indexFile> git update-index --remove -z --stdin
|
||||
RemoveIndexPaths(ctx context.Context, indexFile string, paths []string) error
|
||||
|
||||
// WriteTree writes a tree object from the contents of |indexFile| and returns its oid.
|
||||
// Equivalent plumbing:
|
||||
// GIT_DIR=... GIT_INDEX_FILE=<indexFile> git write-tree
|
||||
@@ -84,6 +110,14 @@ type GitAPI interface {
|
||||
UpdateRef(ctx context.Context, ref string, newOID OID, msg string) error
|
||||
}
|
||||
|
||||
// TreeEntry describes one entry in a git tree listing.
|
||||
type TreeEntry struct {
|
||||
Mode string
|
||||
Type ObjectType
|
||||
OID OID
|
||||
Name string
|
||||
}
|
||||
|
||||
// Identity represents git author/committer metadata. A future implementation may set
|
||||
// this via environment variables (GIT_AUTHOR_NAME, etc.).
|
||||
type Identity struct {
|
||||
|
||||
@@ -88,6 +88,62 @@ func (a *GitAPIImpl) ResolvePathBlob(ctx context.Context, commit OID, path strin
|
||||
return OID(oid), nil
|
||||
}
|
||||
|
||||
func (a *GitAPIImpl) ResolvePathObject(ctx context.Context, commit OID, path string) (oid OID, typ ObjectType, err error) {
|
||||
spec := commit.String() + ":" + path
|
||||
out, err := a.r.Run(ctx, RunOptions{}, "rev-parse", "--verify", spec)
|
||||
if err != nil {
|
||||
if isPathNotFoundErr(err) {
|
||||
return "", ObjectTypeUnknown, &PathNotFoundError{Commit: commit.String(), Path: path}
|
||||
}
|
||||
return "", ObjectTypeUnknown, err
|
||||
}
|
||||
oidStr := strings.TrimSpace(string(out))
|
||||
if oidStr == "" {
|
||||
return "", ObjectTypeUnknown, fmt.Errorf("git rev-parse returned empty oid for %q", spec)
|
||||
}
|
||||
|
||||
typStr, err := a.CatFileType(ctx, OID(oidStr))
|
||||
if err != nil {
|
||||
return "", ObjectTypeUnknown, err
|
||||
}
|
||||
return OID(oidStr), ObjectType(typStr), nil
|
||||
}
|
||||
|
||||
func (a *GitAPIImpl) ListTree(ctx context.Context, commit OID, treePath string) ([]TreeEntry, error) {
|
||||
// Note: `git ls-tree <tree-ish>` accepts a tree-ish of the form "<commit>:<path>".
|
||||
// Use that to list children of a tree path without needing to pre-resolve the tree OID.
|
||||
spec := commit.String()
|
||||
if treePath != "" {
|
||||
spec = spec + ":" + treePath
|
||||
} else {
|
||||
spec = spec + "^{tree}"
|
||||
}
|
||||
|
||||
out, err := a.r.Run(ctx, RunOptions{}, "ls-tree", spec)
|
||||
if err != nil {
|
||||
if isPathNotFoundErr(err) && treePath != "" {
|
||||
return nil, &PathNotFoundError{Commit: commit.String(), Path: treePath}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
lines := strings.Split(strings.TrimRight(string(out), "\n"), "\n")
|
||||
if len(lines) == 1 && strings.TrimSpace(lines[0]) == "" {
|
||||
return nil, nil
|
||||
}
|
||||
entries := make([]TreeEntry, 0, len(lines))
|
||||
for _, line := range lines {
|
||||
if strings.TrimSpace(line) == "" {
|
||||
continue
|
||||
}
|
||||
e, err := parseLsTreeLine(line)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
entries = append(entries, e)
|
||||
}
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
func (a *GitAPIImpl) CatFileType(ctx context.Context, oid OID) (string, error) {
|
||||
out, err := a.r.Run(ctx, RunOptions{}, "cat-file", "-t", oid.String())
|
||||
if err != nil {
|
||||
@@ -141,6 +197,26 @@ func (a *GitAPIImpl) UpdateIndexCacheInfo(ctx context.Context, indexFile string,
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *GitAPIImpl) RemoveIndexPaths(ctx context.Context, indexFile string, paths []string) error {
|
||||
if len(paths) == 0 {
|
||||
return nil
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
// `git update-index --remove` is about removing *missing worktree files*, and requires a worktree.
|
||||
// For bare repos / index-only workflows, use `--index-info` to remove paths by writing mode "0".
|
||||
//
|
||||
// Format:
|
||||
// <mode> <object> <stage>\t<path>\n
|
||||
// To remove:
|
||||
// 0 0000000000000000000000000000000000000000 0\t<path>\n
|
||||
const zeroOID = "0000000000000000000000000000000000000000"
|
||||
for _, p := range paths {
|
||||
fmt.Fprintf(&buf, "0 %s 0\t%s\n", zeroOID, p)
|
||||
}
|
||||
_, err := a.r.Run(ctx, RunOptions{IndexFile: indexFile, Stdin: &buf}, "update-index", "--index-info")
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *GitAPIImpl) WriteTree(ctx context.Context, indexFile string) (OID, error) {
|
||||
out, err := a.r.Run(ctx, RunOptions{IndexFile: indexFile}, "write-tree")
|
||||
if err != nil {
|
||||
@@ -243,3 +319,24 @@ func isPathNotFoundErr(err error) bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func parseLsTreeLine(line string) (TreeEntry, error) {
|
||||
// Format (one entry):
|
||||
// <mode> SP <type> SP <oid>\t<name>
|
||||
// Example:
|
||||
// 100644 blob e69de29bb2d1d6434b8b29ae775ad8c2e48c5391\tfile.txt
|
||||
parts := strings.SplitN(line, "\t", 2)
|
||||
if len(parts) != 2 {
|
||||
return TreeEntry{}, fmt.Errorf("git ls-tree: malformed line %q", line)
|
||||
}
|
||||
left := strings.Fields(parts[0])
|
||||
if len(left) != 3 {
|
||||
return TreeEntry{}, fmt.Errorf("git ls-tree: malformed line %q", line)
|
||||
}
|
||||
return TreeEntry{
|
||||
Mode: left[0],
|
||||
Type: ObjectType(left[1]),
|
||||
OID: OID(left[2]),
|
||||
Name: parts[1],
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -366,6 +366,216 @@ func TestGitAPIImpl_UpdateIndexCacheInfo_FileDirectoryConflictErrors(t *testing.
|
||||
}
|
||||
}
|
||||
|
||||
func TestGitAPIImpl_ResolvePathObject_BlobAndTree(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBareTemp(ctx, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err := NewRunner(repo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
api := NewGitAPIImpl(r)
|
||||
|
||||
indexFile := tempIndexFile(t)
|
||||
if err := api.ReadTreeEmpty(ctx, indexFile); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
blobOID, err := api.HashObject(ctx, bytes.NewReader([]byte("hi\n")))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", blobOID, "dir/file.txt"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
treeOID, err := api.WriteTree(ctx, indexFile)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
commitOID, err := api.CommitTree(ctx, treeOID, nil, "seed", testAuthor())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
gotOID, gotTyp, err := api.ResolvePathObject(ctx, commitOID, "dir/file.txt")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if gotTyp != ObjectTypeBlob {
|
||||
t.Fatalf("expected type blob, got %q", gotTyp)
|
||||
}
|
||||
if gotOID != blobOID {
|
||||
t.Fatalf("expected oid %q, got %q", blobOID, gotOID)
|
||||
}
|
||||
|
||||
_, gotTyp, err = api.ResolvePathObject(ctx, commitOID, "dir")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if gotTyp != ObjectTypeTree {
|
||||
t.Fatalf("expected type tree, got %q", gotTyp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGitAPIImpl_ListTree_NonRecursive(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBareTemp(ctx, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err := NewRunner(repo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
api := NewGitAPIImpl(r)
|
||||
|
||||
indexFile := tempIndexFile(t)
|
||||
if err := api.ReadTreeEmpty(ctx, indexFile); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
oidA, err := api.HashObject(ctx, bytes.NewReader([]byte("a\n")))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
oidB, err := api.HashObject(ctx, bytes.NewReader([]byte("b\n")))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
oidX, err := api.HashObject(ctx, bytes.NewReader([]byte("x\n")))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidA, "dir/a.txt"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidB, "dir/b.txt"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidX, "dir/sub/x.txt"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
treeOID, err := api.WriteTree(ctx, indexFile)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
commitOID, err := api.CommitTree(ctx, treeOID, nil, "seed", testAuthor())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
entries, err := api.ListTree(ctx, commitOID, "dir")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Expect: a.txt (blob), b.txt (blob), sub (tree)
|
||||
if len(entries) != 3 {
|
||||
t.Fatalf("expected 3 entries, got %d: %+v", len(entries), entries)
|
||||
}
|
||||
|
||||
var gotA, gotB, gotSub bool
|
||||
for _, e := range entries {
|
||||
switch e.Name {
|
||||
case "a.txt":
|
||||
gotA = true
|
||||
if e.Type != ObjectTypeBlob || e.OID != oidA {
|
||||
t.Fatalf("unexpected a.txt entry: %+v", e)
|
||||
}
|
||||
case "b.txt":
|
||||
gotB = true
|
||||
if e.Type != ObjectTypeBlob || e.OID != oidB {
|
||||
t.Fatalf("unexpected b.txt entry: %+v", e)
|
||||
}
|
||||
case "sub":
|
||||
gotSub = true
|
||||
if e.Type != ObjectTypeTree || e.OID == "" {
|
||||
t.Fatalf("unexpected sub entry: %+v", e)
|
||||
}
|
||||
default:
|
||||
t.Fatalf("unexpected entry: %+v", e)
|
||||
}
|
||||
}
|
||||
if !gotA || !gotB || !gotSub {
|
||||
t.Fatalf("missing expected entries: gotA=%v gotB=%v gotSub=%v", gotA, gotB, gotSub)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGitAPIImpl_RemoveIndexPaths_RemovesFromIndex(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
repo, err := gitrepo.InitBareTemp(ctx, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err := NewRunner(repo.GitDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
api := NewGitAPIImpl(r)
|
||||
|
||||
indexFile := tempIndexFile(t)
|
||||
if err := api.ReadTreeEmpty(ctx, indexFile); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
oidA, err := api.HashObject(ctx, bytes.NewReader([]byte("a\n")))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
oidB, err := api.HashObject(ctx, bytes.NewReader([]byte("b\n")))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidA, "a.txt"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidB, "b.txt"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := api.RemoveIndexPaths(ctx, indexFile, []string{"a.txt"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
treeOID, err := api.WriteTree(ctx, indexFile)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
commitOID, err := api.CommitTree(ctx, treeOID, nil, "seed", testAuthor())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// a.txt removed, b.txt still present
|
||||
_, err = api.ResolvePathBlob(ctx, commitOID, "a.txt")
|
||||
if err == nil {
|
||||
t.Fatalf("expected a.txt missing")
|
||||
}
|
||||
var pnf *PathNotFoundError
|
||||
if !errors.As(err, &pnf) {
|
||||
t.Fatalf("expected PathNotFoundError, got %T: %v", err, err)
|
||||
}
|
||||
|
||||
gotB, err := api.ResolvePathBlob(ctx, commitOID, "b.txt")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if gotB != oidB {
|
||||
t.Fatalf("expected b.txt oid %q, got %q", oidB, gotB)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGitAPIImpl_ReadTree_PreservesExistingPaths(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
||||
@@ -85,13 +85,22 @@ func TestGitBlobstoreReadSmoke_ManifestAndTableAccessPatterns(t *testing.T) {
|
||||
rc, totalSz, ver, err := bs.Get(ctx, "table", blobstore.NewBlobRange(-tailN, 0))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(len(table)), totalSz)
|
||||
require.Equal(t, commit, ver)
|
||||
require.NotEmpty(t, ver)
|
||||
tail := make([]byte, tailN)
|
||||
_, err = io.ReadFull(rc, tail)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, rc.Close())
|
||||
require.Equal(t, table[len(table)-tailN:], tail)
|
||||
|
||||
// Per-key version should be stable across reads.
|
||||
rc2, _, ver2, err := bs.Get(ctx, "table", blobstore.AllRange)
|
||||
require.NoError(t, err)
|
||||
// Drain before close to avoid broken-pipe errors from killing git early.
|
||||
_, err = io.Copy(io.Discard, rc2)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, rc2.Close())
|
||||
require.Equal(t, ver, ver2)
|
||||
|
||||
// 3) ReadAt-style ranged reads used by table readers.
|
||||
tr := &bsTableReaderAt{bs: bs, key: "table"}
|
||||
out := make([]byte, 4096)
|
||||
|
||||
134
integration-tests/go-sql-server-driver/concurrent_writes_test.go
Normal file
134
integration-tests/go-sql-server-driver/concurrent_writes_test.go
Normal file
@@ -0,0 +1,134 @@
|
||||
// Copyright 2026 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver"
|
||||
)
|
||||
|
||||
// TestConcurrentWrites verifies concurrent write behavior and transaction locking in the SQL server driver.
|
||||
func TestConcurrentWrites(t *testing.T) {
|
||||
t.Parallel()
|
||||
var ports DynamicResources
|
||||
ports.global = &GlobalPorts
|
||||
ports.t = t
|
||||
u, err := driver.NewDoltUser()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
u.Cleanup()
|
||||
})
|
||||
|
||||
rs, err := u.MakeRepoStore()
|
||||
require.NoError(t, err)
|
||||
|
||||
repo, err := rs.MakeRepo("concurrent_writes_test")
|
||||
require.NoError(t, err)
|
||||
|
||||
srvSettings := &driver.Server{
|
||||
Args: []string{"-P", `{{get_port "server_port"}}`},
|
||||
DynamicPort: "server_port",
|
||||
}
|
||||
server := MakeServer(t, repo, srvSettings, &ports)
|
||||
server.DBName = "concurrent_writes_test"
|
||||
|
||||
db, err := server.DB(driver.Connection{User: "root"})
|
||||
require.NoError(t, err)
|
||||
db.SetMaxIdleConns(0)
|
||||
defer func() {
|
||||
require.NoError(t, db.Close())
|
||||
}()
|
||||
ctx := t.Context()
|
||||
func() {
|
||||
conn, err := db.Conn(ctx)
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
// Create table and initial data.
|
||||
_, err = conn.ExecContext(ctx, "CREATE TABLE data (id VARCHAR(64) PRIMARY KEY, worker INT, data TEXT, created_at TIMESTAMP)")
|
||||
require.NoError(t, err)
|
||||
_, err = conn.ExecContext(ctx, "CALL DOLT_COMMIT('-Am', 'init with table')")
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
start := time.Now()
|
||||
|
||||
nextInt := uint32(0)
|
||||
const numWriters = 32
|
||||
const testDuration = 8 * time.Second
|
||||
startCh := make(chan struct{})
|
||||
for i := range numWriters {
|
||||
eg.Go(func() error {
|
||||
select {
|
||||
case <-startCh:
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
db, err := server.DB(driver.Connection{User: "root"})
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
db.SetMaxOpenConns(1)
|
||||
conn, err := db.Conn(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
j := 0
|
||||
for {
|
||||
if time.Since(start) > testDuration {
|
||||
return nil
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
return nil
|
||||
}
|
||||
key := fmt.Sprintf("main-%d-%d", i, j)
|
||||
_, err := conn.ExecContext(ctx, "INSERT INTO data VALUES (?,?,?,?)", key, i, key, time.Now())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
atomic.AddUint32(&nextInt, 1)
|
||||
_, err = conn.ExecContext(ctx, fmt.Sprintf("CALL DOLT_COMMIT('-Am', 'insert %s')", key))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
j += 1
|
||||
}
|
||||
})
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
close(startCh)
|
||||
require.NoError(t, eg.Wait())
|
||||
t.Logf("wrote %d", nextInt)
|
||||
ctx = t.Context()
|
||||
conn, err := db.Conn(ctx)
|
||||
require.NoError(t, err)
|
||||
defer func () {
|
||||
require.NoError(t, conn.Close())
|
||||
}()
|
||||
var i int
|
||||
err = conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM data").Scan(&i)
|
||||
require.NoError(t, err)
|
||||
t.Logf("read %d", i)
|
||||
err = conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM dolt_log").Scan(&i)
|
||||
require.NoError(t, err)
|
||||
t.Logf("created %d commits", i)
|
||||
}
|
||||
@@ -15,11 +15,6 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
// "context"
|
||||
// "database/sql"
|
||||
// sqldriver "database/sql/driver"
|
||||
// "fmt"
|
||||
// "strings"
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
@@ -29,14 +24,9 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
// "time"
|
||||
|
||||
// "github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
// "golang.org/x/sync/errgroup"
|
||||
|
||||
driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver"
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user