diff --git a/.gitignore b/.gitignore index 1e6b24365d..251f66cd75 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,5 @@ CLAUDE.md .beads .gitattributes +.de/ +AGENTS.md diff --git a/go/go.mod b/go/go.mod index 9c3b81da1b..fb5804552e 100644 --- a/go/go.mod +++ b/go/go.mod @@ -61,7 +61,7 @@ require ( github.com/dolthub/dolt-mcp v0.2.2 github.com/dolthub/eventsapi_schema v0.0.0-20260205214132-a7a3c84c84a1 github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2 - github.com/dolthub/go-mysql-server v0.20.1-0.20260210000147-1ce36a7d1e8f + github.com/dolthub/go-mysql-server v0.20.1-0.20260210005347-46fe127d0460 github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63 github.com/edsrzf/mmap-go v1.2.0 github.com/esote/minmaxheap v1.0.0 diff --git a/go/go.sum b/go/go.sum index 014d12fafd..4c96b0eee2 100644 --- a/go/go.sum +++ b/go/go.sum @@ -196,8 +196,8 @@ github.com/dolthub/fslock v0.0.0-20251215194149-ef20baba2318 h1:n+vdH5G5Db+1qnDC github.com/dolthub/fslock v0.0.0-20251215194149-ef20baba2318/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0= github.com/dolthub/go-icu-regex v0.0.0-20250916051405-78a38d478790 h1:zxMsH7RLiG+dlZ/y0LgJHTV26XoiSJcuWq+em6t6VVc= github.com/dolthub/go-icu-regex v0.0.0-20250916051405-78a38d478790/go.mod h1:F3cnm+vMRK1HaU6+rNqQrOCyR03HHhR1GWG2gnPOqaE= -github.com/dolthub/go-mysql-server v0.20.1-0.20260210000147-1ce36a7d1e8f h1:1XL5lO5pbL6xomeC5DBzfT9pUoDZGpd9809TFsDiEWY= -github.com/dolthub/go-mysql-server v0.20.1-0.20260210000147-1ce36a7d1e8f/go.mod h1:LEWdXw6LKjdonOv2X808RpUc8wZVtQx4ZEPvmDWkvY4= +github.com/dolthub/go-mysql-server v0.20.1-0.20260210005347-46fe127d0460 h1:ku4qVcwZUUImcaWOOrPWwhjD5BD34wS6LuENxU3XJUU= +github.com/dolthub/go-mysql-server v0.20.1-0.20260210005347-46fe127d0460/go.mod h1:LEWdXw6LKjdonOv2X808RpUc8wZVtQx4ZEPvmDWkvY4= github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63 h1:OAsXLAPL4du6tfbBgK0xXHZkOlos63RdKYS3Sgw/dfI= github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63/go.mod 
h1:lV7lUeuDhH5thVGDCKXbatwKy2KW80L4rMT46n+Y2/Q= github.com/dolthub/ishell v0.0.0-20240701202509-2b217167d718 h1:lT7hE5k+0nkBdj/1UOSFwjWpNxf+LCApbRHgnCA17XE= diff --git a/go/libraries/doltcore/sqle/dsess/transactions.go b/go/libraries/doltcore/sqle/dsess/transactions.go index f9f40cae04..d5b3a0cf06 100644 --- a/go/libraries/doltcore/sqle/dsess/transactions.go +++ b/go/libraries/doltcore/sqle/dsess/transactions.go @@ -386,10 +386,11 @@ func (tx *DoltTransaction) doCommit( if !ok { return nil, nil, fmt.Errorf("database %s unknown to transaction, this is a bug", dbName) } + normalizedDbName := strings.ToLower(branchState.dbState.dbName) // Load the start state for this working set from the noms root at tx start // Get the base DB name from the db state, not the branch state - startPoint, ok := tx.dbStartPoints[strings.ToLower(branchState.dbState.dbName)] + startPoint, ok := tx.dbStartPoints[normalizedDbName] if !ok { return nil, nil, fmt.Errorf("database %s unknown to transaction, this is a bug", dbName) } @@ -403,7 +404,7 @@ func (tx *DoltTransaction) doCommit( mergeOpts := branchState.EditOpts() - lockID := dbName + "\u0000" + workingSet.Ref().String() + lockID := normalizedDbName + "\u0000" + workingSet.Ref().String() for i := 0; i < maxTxCommitRetries; i++ { updatedWs, newCommit, err := func() (*doltdb.WorkingSet, *doltdb.Commit, error) { @@ -501,7 +502,6 @@ func (tx *DoltTransaction) mergeRoots( workingSet *doltdb.WorkingSet, mergeOpts editor.Options, ) (*doltdb.WorkingSet, error) { - tableResolver, err := GetTableResolver(ctx, dbName) if err != nil { return nil, err diff --git a/go/store/blobstore/blobstore.go b/go/store/blobstore/blobstore.go index 2ff5a97259..cfcfc0281c 100644 --- a/go/store/blobstore/blobstore.go +++ b/go/store/blobstore/blobstore.go @@ -33,7 +33,12 @@ type Blobstore interface { // Get returns a byte range of from the blob keyed by |key|, and the latest store version. 
Get(ctx context.Context, key string, br BlobRange) (rc io.ReadCloser, size uint64, version string, err error) - // Put creates a new blob from |reader| keyed by |key|, it returns the latest store version. + // Put stores a blob from |reader| keyed by |key|, returning the latest store version. + // + // If |key| already exists, behavior is implementation-defined: some Blobstore + // implementations overwrite, while others may treat Put as idempotent and fast-succeed + // without consuming |reader|. Callers that require an explicit check-and-set should use + // CheckAndPut. Put(ctx context.Context, key string, totalSize int64, reader io.Reader) (version string, err error) // CheckAndPut updates the blob keyed by |key| using a check-and-set on |expectedVersion|. diff --git a/go/store/blobstore/git_blobstore.go b/go/store/blobstore/git_blobstore.go index a7ba33f27f..c005dd00d6 100644 --- a/go/store/blobstore/git_blobstore.go +++ b/go/store/blobstore/git_blobstore.go @@ -15,12 +15,16 @@ package blobstore import ( + "bytes" "context" "errors" "fmt" "io" + "math" "os" "path/filepath" + "sort" + "strconv" "strings" "time" @@ -29,13 +33,230 @@ import ( git "github.com/dolthub/dolt/go/store/blobstore/internal/git" ) +const gitblobstorePartNameWidth = 8 // "00000001" + +type chunkPartRef struct { + oidHex string + size uint64 +} + +type chunkPartSlice struct { + oidHex string + offset int64 + length int64 +} + +type treeWrite struct { + path string + oid git.OID +} + +type putPlan struct { + writes []treeWrite + // If true, the key should be represented as a tree (chunked parts under key/NNNNNNNN). 
+ chunked bool +} + +type limitReadCloser struct { + r io.Reader + c io.Closer +} + +func (l *limitReadCloser) Read(p []byte) (int, error) { return l.r.Read(p) } +func (l *limitReadCloser) Close() error { return l.c.Close() } + +type multiPartReadCloser struct { + ctx context.Context + api git.GitAPI + + slices []chunkPartSlice + curIdx int + + curRC io.ReadCloser + rem int64 +} + +func (m *multiPartReadCloser) Read(p []byte) (int, error) { + for { + if err := m.ensureCurrent(); err != nil { + return 0, err + } + if m.curRC == nil { + return 0, io.EOF + } + + if m.rem == 0 { + if err := m.closeCurrentAndAdvance(); err != nil { + return 0, err + } + continue + } + + n, err := m.readCurrent(p) + if n > 0 || err != nil { + return n, err + } + } +} + +func (m *multiPartReadCloser) ensureCurrent() error { + if m.curRC != nil { + return nil + } + if m.curIdx >= len(m.slices) { + return nil + } + s := m.slices[m.curIdx] + rc, err := m.openSliceReader(s) + if err != nil { + return err + } + m.curRC = rc + m.rem = s.length + return nil +} + +func (m *multiPartReadCloser) openSliceReader(s chunkPartSlice) (io.ReadCloser, error) { + rc, err := m.api.BlobReader(m.ctx, git.OID(s.oidHex)) + if err != nil { + return nil, err + } + if err := skipN(rc, s.offset); err != nil { + _ = rc.Close() + return nil, err + } + return rc, nil +} + +func (m *multiPartReadCloser) closeCurrentAndAdvance() error { + if m.curRC != nil { + err := m.curRC.Close() + m.curRC = nil + m.rem = 0 + m.curIdx++ + return err + } + m.curIdx++ + return nil +} + +func (m *multiPartReadCloser) readCurrent(p []byte) (int, error) { + toRead := len(p) + if int64(toRead) > m.rem { + toRead = int(m.rem) + } + + n, err := m.curRC.Read(p[:toRead]) + if n > 0 { + m.rem -= int64(n) + return n, nil + } + if err == nil { + return 0, nil + } + if errors.Is(err, io.EOF) { + // End of underlying part blob; if we still expected bytes, that's corruption. 
+ if m.rem > 0 { + return 0, io.ErrUnexpectedEOF + } + _ = m.closeCurrentAndAdvance() + return 0, nil + } + return 0, err +} + +func (m *multiPartReadCloser) Close() error { + if m.curRC != nil { + err := m.curRC.Close() + m.curRC = nil + return err + } + return nil +} + +type concatReadCloser struct { + ctx context.Context + keys []string + open func(ctx context.Context, key string) (io.ReadCloser, error) + cur int + curRC io.ReadCloser + done bool +} + +func (c *concatReadCloser) ensureCurrent() error { + if c.done || c.curRC != nil { + return nil + } + if c.cur >= len(c.keys) { + c.done = true + return nil + } + rc, err := c.open(c.ctx, c.keys[c.cur]) + if err != nil { + return err + } + c.curRC = rc + return nil +} + +func (c *concatReadCloser) closeCurrentAndAdvance() error { + if c.curRC != nil { + err := c.curRC.Close() + c.curRC = nil + c.cur++ + return err + } + c.cur++ + return nil +} + +func (c *concatReadCloser) Read(p []byte) (int, error) { + for { + if err := c.ensureCurrent(); err != nil { + return 0, err + } + if c.curRC == nil { + return 0, io.EOF + } + + n, err := c.curRC.Read(p) + if n > 0 { + // Preserve data; defer advancement until next Read call. + if err == io.EOF { + _ = c.closeCurrentAndAdvance() + return n, nil + } + return n, err + } + if err == nil { + continue + } + if err == io.EOF { + if cerr := c.closeCurrentAndAdvance(); cerr != nil { + return 0, cerr + } + continue + } + return 0, err + } +} + +func (c *concatReadCloser) Close() error { + c.done = true + if c.curRC != nil { + err := c.curRC.Close() + c.curRC = nil + return err + } + return nil +} + // GitBlobstore is a Blobstore implementation backed by a git repository's object // database (bare repo or .git directory). It stores keys as paths within the tree // of the commit referenced by a git ref (e.g. refs/dolt/data). // -// This implementation is being developed in phases. Read paths are implemented first, -// then write paths are added incrementally. 
At the moment, Put is implemented, while -// CheckAndPut and Concatenate are still unimplemented. +// This implementation is being developed in phases. Read paths were implemented first, +// then write paths were added incrementally. type GitBlobstore struct { gitDir string ref string @@ -45,6 +266,12 @@ type GitBlobstore struct { // When nil, we prefer whatever identity git derives from env/config, falling back // to a deterministic default only if git reports the identity is missing. identity *git.Identity + // maxPartSize, when non-zero, enables the chunked-object representation for objects + // written by Put/CheckAndPut/Concatenate. When enabled, no single part blob created + // by this blobstore should exceed maxPartSize bytes. + // + // A zero value means "disabled" (store values inline as a single git blob). + maxPartSize uint64 } var _ Blobstore = (*GitBlobstore)(nil) @@ -53,17 +280,38 @@ var _ Blobstore = (*GitBlobstore)(nil) // |gitDir| should point at a bare repo directory or a .git directory. Put is implemented, // while CheckAndPut and Concatenate are still unimplemented (see type-level docs). func NewGitBlobstore(gitDir, ref string) (*GitBlobstore, error) { - return NewGitBlobstoreWithIdentity(gitDir, ref, nil) + return NewGitBlobstoreWithOptions(gitDir, ref, GitBlobstoreOptions{}) } // NewGitBlobstoreWithIdentity creates a GitBlobstore rooted at |gitDir| and |ref|, optionally // forcing an author/committer identity for write paths. func NewGitBlobstoreWithIdentity(gitDir, ref string, identity *git.Identity) (*GitBlobstore, error) { + return NewGitBlobstoreWithOptions(gitDir, ref, GitBlobstoreOptions{Identity: identity}) +} + +// GitBlobstoreOptions configures optional behaviors of GitBlobstore. +type GitBlobstoreOptions struct { + // Identity, when non-nil, forces the author/committer identity for commits created by write paths. + Identity *git.Identity + // MaxPartSize enables chunked-object writes when non-zero. 
+ // Read paths always support chunked objects if encountered. + MaxPartSize uint64 +} + +// NewGitBlobstoreWithOptions creates a GitBlobstore rooted at |gitDir| and |ref|. +func NewGitBlobstoreWithOptions(gitDir, ref string, opts GitBlobstoreOptions) (*GitBlobstore, error) { r, err := git.NewRunner(gitDir) if err != nil { return nil, err } - return &GitBlobstore{gitDir: gitDir, ref: ref, runner: r, api: git.NewGitAPIImpl(r), identity: identity}, nil + return &GitBlobstore{ + gitDir: gitDir, + ref: ref, + runner: r, + api: git.NewGitAPIImpl(r), + identity: opts.Identity, + maxPartSize: opts.MaxPartSize, + }, nil } func (gbs *GitBlobstore) Path() string { @@ -82,7 +330,7 @@ func (gbs *GitBlobstore) Exists(ctx context.Context, key string) (bool, error) { if !ok { return false, nil } - _, err = gbs.api.ResolvePathBlob(ctx, commit, key) + _, _, err = gbs.api.ResolvePathObject(ctx, commit, key) if err != nil { if git.IsPathNotFound(err) { return false, nil @@ -97,106 +345,262 @@ func (gbs *GitBlobstore) Get(ctx context.Context, key string, br BlobRange) (io. if err != nil { return nil, 0, "", err } - commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) + commit, err := gbs.resolveCommitForGet(ctx, key) if err != nil { return nil, 0, "", err } - if !ok { - // If the ref doesn't exist, treat the manifest as missing (empty store), - // but surface a hard error for other keys: the store itself is missing. 
- if key == "manifest" { - return nil, 0, "", NotFound{Key: key} - } - return nil, 0, "", &git.RefNotFoundError{Ref: gbs.ref} + + oid, typ, err := gbs.resolveObjectForGet(ctx, commit, key) + if err != nil { + return nil, 0, "", err } - blobOID, err := gbs.api.ResolvePathBlob(ctx, commit, key) + switch typ { + case git.ObjectTypeBlob: + sz, ver, err := gbs.resolveBlobSizeForGet(ctx, commit, oid) + if err != nil { + return nil, 0, ver, err + } + rc, err := gbs.api.BlobReader(ctx, oid) + if err != nil { + return nil, 0, ver, err + } + // Per-key version: blob object id. + return sliceInlineBlob(rc, sz, br, oid.String()) + + case git.ObjectTypeTree: + // Per-key version: tree object id at this key. + rc, sz, _, err := gbs.openChunkedTreeRange(ctx, commit, key, br) + return rc, sz, oid.String(), err + + default: + return nil, 0, "", fmt.Errorf("gitblobstore: unsupported object type %q for key %q", typ, key) + } +} + +func (gbs *GitBlobstore) openChunkedTreeRange(ctx context.Context, commit git.OID, key string, br BlobRange) (io.ReadCloser, uint64, string, error) { + ver := commit.String() + + entries, err := gbs.api.ListTree(ctx, commit, key) + if err != nil { + return nil, 0, ver, err + } + parts, totalSize, err := gbs.validateAndSizeChunkedParts(ctx, entries) + if err != nil { + return nil, 0, ver, err + } + + total := int64(totalSize) + start, end, err := normalizeRange(total, br.offset, br.length) + if err != nil { + return nil, totalSize, ver, err + } + slices, err := sliceChunkParts(parts, start, end) + if err != nil { + return nil, totalSize, ver, err + } + + // Stream across part blobs. 
+ streamRC := &multiPartReadCloser{ + ctx: ctx, + api: gbs.api, + slices: slices, + } + return streamRC, totalSize, ver, nil +} + +func (gbs *GitBlobstore) validateAndSizeChunkedParts(ctx context.Context, entries []git.TreeEntry) ([]chunkPartRef, uint64, error) { + if len(entries) == 0 { + return nil, 0, fmt.Errorf("gitblobstore: chunked tree has no parts") + } + + width := len(entries[0].Name) + // First pass: validate names + types, and determine width. + if width < 4 { + return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected at least 4 digits)", entries[0].Name) + } + + parts := make([]chunkPartRef, 0, len(entries)) + var total uint64 + for i, e := range entries { + if e.Type != git.ObjectTypeBlob { + return nil, 0, fmt.Errorf("gitblobstore: invalid part %q: expected blob, got %q", e.Name, e.Type) + } + if len(e.Name) != width { + return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected width %d)", e.Name, width) + } + n, err := strconv.Atoi(e.Name) + if err != nil { + return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected digits): %w", e.Name, err) + } + if n != i+1 { + want := fmt.Sprintf("%0*d", width, i+1) + return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected %q)", e.Name, want) + } + if want := fmt.Sprintf("%0*d", width, n); want != e.Name { + return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected %q)", e.Name, want) + } + + sz, err := gbs.api.BlobSize(ctx, e.OID) + if err != nil { + return nil, 0, err + } + if sz < 0 { + return nil, 0, fmt.Errorf("gitblobstore: invalid part size %d for %q", sz, e.Name) + } + if uint64(sz) > math.MaxUint64-total { + return nil, 0, fmt.Errorf("gitblobstore: total size overflow") + } + total += uint64(sz) + parts = append(parts, chunkPartRef{oidHex: e.OID.String(), size: uint64(sz)}) + } + return parts, total, nil +} + +func (gbs *GitBlobstore) resolveCommitForGet(ctx context.Context, key string) (commit git.OID, err error) { + commit, ok, 
err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) + if err != nil { + return git.OID(""), err + } + if ok { + return commit, nil + } + + // If the ref doesn't exist, treat the manifest as missing (empty store), + // but surface a hard error for other keys: the store itself is missing. + if key == "manifest" { + return git.OID(""), NotFound{Key: key} + } + return git.OID(""), &git.RefNotFoundError{Ref: gbs.ref} +} + +func (gbs *GitBlobstore) resolveObjectForGet(ctx context.Context, commit git.OID, key string) (oid git.OID, typ git.ObjectType, err error) { + oid, typ, err = gbs.api.ResolvePathObject(ctx, commit, key) if err != nil { if git.IsPathNotFound(err) { - return nil, 0, commit.String(), NotFound{Key: key} + return git.OID(""), git.ObjectTypeUnknown, NotFound{Key: key} } - return nil, 0, commit.String(), err + return git.OID(""), git.ObjectTypeUnknown, err } - - sz, err := gbs.api.BlobSize(ctx, blobOID) - if err != nil { - return nil, 0, commit.String(), err - } - - // TODO(gitblobstore): This streaming implementation is correct but may be slow for workloads - // that do many small ranged reads (e.g. table index/footer reads). Consider caching/materializing - // blobs to a local file (or using a batched git cat-file mode) to serve ranges efficiently. - rc, err := gbs.api.BlobReader(ctx, blobOID) - if err != nil { - return nil, 0, commit.String(), err - } - - // Implement BlobRange by slicing the streamed blob contents. - if br.isAllRange() { - return rc, uint64(sz), commit.String(), nil - } - - pos := br.positiveRange(sz) - if pos.offset < 0 || pos.offset > sz { - _ = rc.Close() - return nil, uint64(sz), commit.String(), fmt.Errorf("invalid BlobRange offset %d for blob of size %d", pos.offset, sz) - } - if pos.length < 0 { - _ = rc.Close() - return nil, uint64(sz), commit.String(), fmt.Errorf("invalid BlobRange length %d", pos.length) - } - if pos.length == 0 { - // Read from offset to end. 
- pos.length = sz - pos.offset - } - // Clamp to end (defensive; positiveRange should already do this). - if pos.offset+pos.length > sz { - pos.length = sz - pos.offset - } - - // Skip to offset. - if pos.offset > 0 { - if _, err := io.CopyN(io.Discard, rc, pos.offset); err != nil { - _ = rc.Close() - return nil, uint64(sz), commit.String(), err - } - } - - return &limitReadCloser{r: io.LimitReader(rc, pos.length), c: rc}, uint64(sz), commit.String(), nil + return oid, typ, nil } -type limitReadCloser struct { - r io.Reader - c io.Closer +func (gbs *GitBlobstore) resolveBlobSizeForGet(ctx context.Context, commit git.OID, oid git.OID) (sz int64, ver string, err error) { + sz, err = gbs.api.BlobSize(ctx, oid) + if err != nil { + return 0, commit.String(), err + } + return sz, commit.String(), nil } -func (l *limitReadCloser) Read(p []byte) (int, error) { return l.r.Read(p) } -func (l *limitReadCloser) Close() error { return l.c.Close() } - func (gbs *GitBlobstore) Put(ctx context.Context, key string, totalSize int64, reader io.Reader) (string, error) { key, err := normalizeGitTreePath(key) if err != nil { return "", err } + // Many NBS/table-file writes are content-addressed: if the key already exists, callers + // assume it refers to the same bytes and treat the operation as idempotent. + // + // GitBlobstore enforces that assumption by fast-succeeding when a non-manifest key + // already exists: it returns the existing per-key version and does not overwrite the + // key (and does not consume |reader|). + // + // The manifest is the main exception (it is mutable and updated via CheckAndPut). + if ver, ok, err := gbs.tryFastSucceedPutIfKeyExists(ctx, key); err != nil { + return "", err + } else if ok { + return ver, nil + } + + msg := fmt.Sprintf("gitblobstore: put %s", key) + // Hash the contents once. If we need to retry due to concurrent updates to |gbs.ref|, - // we can reuse the blob OID without re-reading |reader|. 
- blobOID, err := gbs.api.HashObject(ctx, reader) + // we can reuse the resulting object OIDs without re-reading |reader|. + plan, err := gbs.planPutWrites(ctx, key, totalSize, reader) if err != nil { return "", err } + return gbs.putWithCASRetries(ctx, key, plan, msg) +} +func (gbs *GitBlobstore) planPutWrites(ctx context.Context, key string, totalSize int64, reader io.Reader) (putPlan, error) { + // Minimal policy: chunk only when explicitly enabled and |totalSize| exceeds MaxPartSize. + if gbs.maxPartSize == 0 || totalSize <= 0 || uint64(totalSize) <= gbs.maxPartSize { + blobOID, err := gbs.api.HashObject(ctx, reader) + if err != nil { + return putPlan{}, err + } + return putPlan{writes: []treeWrite{{path: key, oid: blobOID}}}, nil + } + + partOIDs, err := gbs.hashChunkedParts(ctx, reader) + if err != nil { + return putPlan{}, err + } + + writes := make([]treeWrite, 0, len(partOIDs)) + for i, p := range partOIDs { + partName := fmt.Sprintf("%0*d", gitblobstorePartNameWidth, i+1) + writes = append(writes, treeWrite{path: key + "/" + partName, oid: p}) + } + return putPlan{writes: writes, chunked: true}, nil +} + +func (gbs *GitBlobstore) hashChunkedParts(ctx context.Context, reader io.Reader) (partOIDs []git.OID, err error) { + max := int64(gbs.maxPartSize) + if max <= 0 { + return nil, fmt.Errorf("gitblobstore: invalid maxPartSize %d", gbs.maxPartSize) + } + + _, partOIDs, _, err = gbs.hashParts(ctx, reader) + if err != nil { + return nil, err + } + return partOIDs, nil +} + +func (gbs *GitBlobstore) hashParts(ctx context.Context, reader io.Reader) (parts []chunkPartRef, partOIDs []git.OID, total uint64, err error) { + max := int64(gbs.maxPartSize) + if max <= 0 { + return nil, nil, 0, fmt.Errorf("gitblobstore: invalid maxPartSize %d", gbs.maxPartSize) + } + + buf := make([]byte, max) + for { + n, rerr := io.ReadFull(reader, buf) + if rerr != nil { + if errors.Is(rerr, io.EOF) { + break + } + if !errors.Is(rerr, io.ErrUnexpectedEOF) { + return nil, nil, 0, rerr 
+ } + // ErrUnexpectedEOF: process final short chunk and stop. + } + if n == 0 { + break + } + partBytes := append([]byte(nil), buf[:n]...) + oid, err := gbs.api.HashObject(ctx, bytes.NewReader(partBytes)) + if err != nil { + return nil, nil, 0, err + } + partOIDs = append(partOIDs, oid) + parts = append(parts, chunkPartRef{oidHex: oid.String(), size: uint64(n)}) + total += uint64(n) + if errors.Is(rerr, io.ErrUnexpectedEOF) { + break + } + } + return parts, partOIDs, total, nil +} + +func (gbs *GitBlobstore) putWithCASRetries(ctx context.Context, key string, plan putPlan, msg string) (string, error) { // Make Put resilient to concurrent writers updating unrelated keys by using a CAS loop // under the hood. This matches typical object-store semantics more closely than an // unconditional ref update (which could clobber other keys). - const maxRetries = 31 // 32 total attempts (initial + retries) - bo := backoff.NewExponentialBackOff() - bo.InitialInterval = 5 * time.Millisecond - bo.Multiplier = 2 - bo.MaxInterval = 320 * time.Millisecond - bo.RandomizationFactor = 0 // deterministic; can add jitter later if needed - bo.Reset() - policy := backoff.WithContext(backoff.WithMaxRetries(bo, maxRetries), ctx) + policy := gbs.casRetryPolicy(ctx) var ver string op := func() error { @@ -205,36 +609,20 @@ func (gbs *GitBlobstore) Put(ctx context.Context, key string, totalSize int64, r return backoff.Permanent(err) } - newCommit, msg, err := gbs.buildPutCommit(ctx, parent, ok, key, blobOID) + newCommit, err := gbs.buildCommitForKeyWrite(ctx, parent, ok, key, plan, msg) if err != nil { return backoff.Permanent(err) } - if !ok { - // Best-effort ref creation. If a concurrent writer created the ref first, retry - // and take the normal CAS path on the new head. 
- if err := gbs.api.UpdateRef(ctx, gbs.ref, newCommit, msg); err != nil { - if gbs.refAdvanced(ctx, parent) { - return err - } - return backoff.Permanent(err) - } - ver = newCommit.String() - return nil - } - - err = gbs.api.UpdateRefCAS(ctx, gbs.ref, newCommit, parent, msg) - if err == nil { - ver = newCommit.String() - return nil - } - - // If the ref changed since we read |parent|, retry on the new head. Otherwise - // surface the error (e.g. permissions, corruption). - if gbs.refAdvanced(ctx, parent) { + if err := gbs.updateRefCASForWrite(ctx, parent, ok, newCommit, msg); err != nil { return err } - return backoff.Permanent(err) + + ver, err = gbs.resolveKeyVersionAtCommit(ctx, newCommit, key) + if err != nil { + return backoff.Permanent(err) + } + return nil } if err := backoff.Retry(op, policy); err != nil { @@ -246,16 +634,18 @@ func (gbs *GitBlobstore) Put(ctx context.Context, key string, totalSize int64, r return ver, nil } -func (gbs *GitBlobstore) buildPutCommit(ctx context.Context, parent git.OID, hasParent bool, key string, blobOID git.OID) (git.OID, string, error) { - msg := fmt.Sprintf("gitblobstore: put %s", key) - commitOID, err := gbs.buildCommitWithMessage(ctx, parent, hasParent, key, blobOID, msg) - if err != nil { - return "", "", err - } - return commitOID, msg, nil +func (gbs *GitBlobstore) casRetryPolicy(ctx context.Context) backoff.BackOff { + const maxRetries = 31 // 32 total attempts (initial + retries) + bo := backoff.NewExponentialBackOff() + bo.InitialInterval = 5 * time.Millisecond + bo.Multiplier = 2 + bo.MaxInterval = 320 * time.Millisecond + bo.RandomizationFactor = 0 // deterministic; can add jitter later if needed + bo.Reset() + return backoff.WithContext(backoff.WithMaxRetries(bo, maxRetries), ctx) } -func (gbs *GitBlobstore) buildCommitWithMessage(ctx context.Context, parent git.OID, hasParent bool, key string, blobOID git.OID, msg string) (git.OID, error) { +func (gbs *GitBlobstore) buildCommitForKeyWrite(ctx context.Context, 
parent git.OID, hasParent bool, key string, plan putPlan, msg string) (git.OID, error) { _, indexFile, cleanup, err := newTempIndex() if err != nil { return "", err @@ -272,14 +662,17 @@ func (gbs *GitBlobstore) buildCommitWithMessage(ctx context.Context, parent git. } } - // TODO(gitblobstore): Decide on a policy for file-vs-directory prefix conflicts when staging keys. - // For example, staging "a" when "a/b" already exists in the tree/index (or vice-versa) can fail - // with a git index error (path appears as both a file and directory). Today our NBS keyspace is - // flat (e.g. "manifest", "", ".records"), so this should not occur. If we ever - // namespace keys into directories, consider proactively removing conflicting paths from the index - // before UpdateIndexCacheInfo so Put/CheckAndPut remain robust. - if err := gbs.api.UpdateIndexCacheInfo(ctx, indexFile, "100644", blobOID, key); err != nil { - return "", err + if hasParent { + if err := gbs.removeKeyConflictsFromIndex(ctx, parent, indexFile, key, plan.chunked); err != nil { + return "", err + } + } + + sort.Slice(plan.writes, func(i, j int) bool { return plan.writes[i].path < plan.writes[j].path }) + for _, w := range plan.writes { + if err := gbs.api.UpdateIndexCacheInfo(ctx, indexFile, "100644", w.oid, w.path); err != nil { + return "", err + } } treeOID, err := gbs.api.WriteTree(ctx, indexFile) @@ -293,7 +686,6 @@ func (gbs *GitBlobstore) buildCommitWithMessage(ctx context.Context, parent git. parentPtr = &p } - // Prefer git's default identity from env/config when not explicitly configured. commitOID, err := gbs.api.CommitTree(ctx, treeOID, parentPtr, msg, gbs.identity) if err != nil && gbs.identity == nil && isMissingGitIdentityErr(err) { commitOID, err = gbs.api.CommitTree(ctx, treeOID, parentPtr, msg, defaultGitBlobstoreIdentity()) @@ -301,10 +693,472 @@ func (gbs *GitBlobstore) buildCommitWithMessage(ctx context.Context, parent git. 
if err != nil { return "", err } - return commitOID, nil } +func (gbs *GitBlobstore) removeKeyConflictsFromIndex(ctx context.Context, parent git.OID, indexFile string, key string, newIsChunked bool) error { + _, typ, err := gbs.api.ResolvePathObject(ctx, parent, key) + if err != nil { + if git.IsPathNotFound(err) { + return nil + } + return err + } + + switch typ { + case git.ObjectTypeBlob: + if newIsChunked { + // blob -> tree: must remove the file entry at + return gbs.api.RemoveIndexPaths(ctx, indexFile, []string{key}) + } + return nil + + case git.ObjectTypeTree: + // tree -> blob OR tree overwrite: remove old child entries under /... + entries, err := gbs.api.ListTree(ctx, parent, key) + if err != nil { + return err + } + if len(entries) == 0 { + return nil + } + paths := make([]string, 0, len(entries)) + for _, e := range entries { + paths = append(paths, key+"/"+e.Name) + } + return gbs.api.RemoveIndexPaths(ctx, indexFile, paths) + + default: + return fmt.Errorf("gitblobstore: unsupported existing object type %q at key %q", typ, key) + } +} + +func (gbs *GitBlobstore) updateRefCASForWrite(ctx context.Context, parent git.OID, haveParent bool, newCommit git.OID, msg string) error { + if !haveParent { + // Create-only CAS: oldOID=all-zero requires the ref to not exist. This avoids + // losing concurrent writes when multiple goroutines create the ref at once. + const zeroOID = git.OID("0000000000000000000000000000000000000000") + if err := gbs.api.UpdateRefCAS(ctx, gbs.ref, newCommit, zeroOID, msg); err != nil { + if gbs.refAdvanced(ctx, parent) { + return err + } + return backoff.Permanent(err) + } + return nil + } + + if err := gbs.api.UpdateRefCAS(ctx, gbs.ref, newCommit, parent, msg); err != nil { + // If the ref changed since we read |parent|, retry on the new head. Otherwise + // surface the error (e.g. permissions, corruption). 
+ if gbs.refAdvanced(ctx, parent) { + return err + } + return backoff.Permanent(err) + } + return nil +} + +func (gbs *GitBlobstore) refAdvanced(ctx context.Context, old git.OID) bool { + if ctx.Err() != nil { + return false + } + cur, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) + return err == nil && ok && cur != old +} + +func (gbs *GitBlobstore) resolveKeyVersionAtCommit(ctx context.Context, commit git.OID, key string) (string, error) { + oid, _, err := gbs.api.ResolvePathObject(ctx, commit, key) + if err != nil { + return "", err + } + return oid.String(), nil +} + +func (gbs *GitBlobstore) tryFastSucceedPutIfKeyExists(ctx context.Context, key string) (ver string, ok bool, err error) { + if key == "manifest" { + return "", false, nil + } + + commit, haveCommit, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) + if err != nil { + return "", false, err + } + if !haveCommit { + return "", false, nil + } + + oid, _, err := gbs.api.ResolvePathObject(ctx, commit, key) + if err == nil { + // Per-key version: existing object id. 
+ return oid.String(), true, nil + } + if git.IsPathNotFound(err) { + return "", false, nil + } + return "", false, err +} + +func (gbs *GitBlobstore) CheckAndPut(ctx context.Context, expectedVersion, key string, totalSize int64, reader io.Reader) (string, error) { + key, err := normalizeGitTreePath(key) + if err != nil { + return "", err + } + + msg := fmt.Sprintf("gitblobstore: checkandput %s", key) + + policy := gbs.casRetryPolicy(ctx) + + var newKeyVersion string + var cachedPlan *putPlan + op := func() error { + parent, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) + if err != nil { + return backoff.Permanent(err) + } + + actualKeyVersion, err := gbs.currentKeyVersion(ctx, parent, ok, key) + if err != nil { + return backoff.Permanent(err) + } + if expectedVersion != actualKeyVersion { + return backoff.Permanent(CheckAndPutError{Key: key, ExpectedVersion: expectedVersion, ActualVersion: actualKeyVersion}) + } + + // Only hash/consume the reader once we know the expectedVersion matches. + // If we need to retry due to unrelated ref advances, reuse the cached plan so we + // don't re-read |reader| (which may not be rewindable). 
+ if cachedPlan == nil { + plan, err := gbs.planPutWrites(ctx, key, totalSize, reader) + if err != nil { + return backoff.Permanent(err) + } + cachedPlan = &plan + } + + newCommit, err := gbs.buildCommitForKeyWrite(ctx, parent, ok, key, *cachedPlan, msg) + if err != nil { + return backoff.Permanent(err) + } + + if err := gbs.updateRefCASForWrite(ctx, parent, ok, newCommit, msg); err != nil { + return err + } + + ver, err := gbs.resolveKeyVersionAtCommit(ctx, newCommit, key) + if err != nil { + return backoff.Permanent(err) + } + newKeyVersion = ver + return nil + } + + if err := backoff.Retry(op, policy); err != nil { + if ctx.Err() != nil { + return "", ctx.Err() + } + return "", err + } + + return newKeyVersion, nil +} + +func (gbs *GitBlobstore) currentKeyVersion(ctx context.Context, commit git.OID, haveCommit bool, key string) (string, error) { + if !haveCommit { + // Ref missing => empty store => key missing. + return "", nil + } + oid, _, err := gbs.api.ResolvePathObject(ctx, commit, key) + if err != nil { + if git.IsPathNotFound(err) { + return "", nil + } + return "", err + } + return oid.String(), nil +} + +func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources []string) (string, error) { + // Keep key validation for consistent error behavior. + var err error + key, err = normalizeGitTreePath(key) + if err != nil { + return "", err + } + if len(sources) == 0 { + return "", fmt.Errorf("gitblobstore: concatenate requires at least one source") + } + normSources := make([]string, 0, len(sources)) + for _, src := range sources { + norm, err := normalizeGitTreePath(src) + if err != nil { + return "", err + } + normSources = append(normSources, norm) + } + sources = normSources + + // For non-manifest keys, match Put's behavior: if the key already exists, succeed without overwriting. 
+ if ver, ok, err := gbs.tryFastSucceedPutIfKeyExists(ctx, key); err != nil { + return "", err + } else if ok { + return ver, nil + } + + // Resolve a snapshot commit for the sources. + commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) + if err != nil { + return "", err + } + if !ok { + // Consistent with Get: empty store => manifest missing, other keys => ref missing. + if key == "manifest" { + return "", NotFound{Key: key} + } + return "", &git.RefNotFoundError{Ref: gbs.ref} + } + + totalSz, err := gbs.totalSizeAtCommit(ctx, commit, sources) + if err != nil { + return "", err + } + + rc := &concatReadCloser{ + ctx: ctx, + keys: sources, + open: func(ctx context.Context, k string) (io.ReadCloser, error) { + return gbs.openReaderAtCommit(ctx, commit, k) + }, + } + defer rc.Close() + + plan, err := gbs.planPutWrites(ctx, key, totalSz, rc) + if err != nil { + return "", err + } + + msg := fmt.Sprintf("gitblobstore: concatenate %s", key) + return gbs.putWithCASRetries(ctx, key, plan, msg) +} + +func (gbs *GitBlobstore) openReaderAtCommit(ctx context.Context, commit git.OID, key string) (io.ReadCloser, error) { + oid, typ, err := gbs.resolveObjectForGet(ctx, commit, key) + if err != nil { + return nil, err + } + switch typ { + case git.ObjectTypeBlob: + return gbs.api.BlobReader(ctx, oid) + case git.ObjectTypeTree: + rc, _, _, err := gbs.openChunkedTreeRange(ctx, commit, key, AllRange) + if err != nil { + // Defensive: resolveObjectForGet succeeded, but keep NotFound mapping consistent. + var pnf *git.PathNotFoundError + if errors.As(err, &pnf) { + return nil, NotFound{Key: key} + } + return nil, err + } + return rc, nil + default: + return nil, fmt.Errorf("gitblobstore: unsupported object type %q for key %q", typ, key) + } +} + +// sizeAtCommit returns the byte size of |key| as of |commit|. +// It supports both inline blobs and the chunked-tree representation used by GitBlobstore. +// If |key| is missing at |commit|, it returns NotFound{Key: key}. 
+func (gbs *GitBlobstore) sizeAtCommit(ctx context.Context, commit git.OID, key string) (uint64, error) { + oid, typ, err := gbs.api.ResolvePathObject(ctx, commit, key) + if err != nil { + if git.IsPathNotFound(err) { + return 0, NotFound{Key: key} + } + return 0, err + } + + switch typ { + case git.ObjectTypeBlob: + sz, err := gbs.api.BlobSize(ctx, oid) + if err != nil { + return 0, err + } + if sz < 0 { + return 0, fmt.Errorf("gitblobstore: invalid blob size %d for key %q", sz, key) + } + return uint64(sz), nil + + case git.ObjectTypeTree: + entries, err := gbs.api.ListTree(ctx, commit, key) + if err != nil { + if git.IsPathNotFound(err) { + return 0, NotFound{Key: key} + } + return 0, err + } + _, total, err := gbs.validateAndSizeChunkedParts(ctx, entries) + return total, err + + default: + return 0, fmt.Errorf("gitblobstore: unsupported object type %q for key %q", typ, key) + } +} + +// totalSizeAtCommit sums the sizes of |sources| at |commit| and returns the total as int64. +// Returns an error on overflow or if any source is missing. +func (gbs *GitBlobstore) totalSizeAtCommit(ctx context.Context, commit git.OID, sources []string) (int64, error) { + var total uint64 + for _, src := range sources { + sz, err := gbs.sizeAtCommit(ctx, commit, src) + if err != nil { + return 0, err + } + if sz > math.MaxUint64-total { + return 0, fmt.Errorf("gitblobstore: concatenated size overflow") + } + total += sz + } + if total > uint64(math.MaxInt64) { + return 0, fmt.Errorf("gitblobstore: concatenated size %d overflows int64", total) + } + return int64(total), nil +} + +func sliceInlineBlob(rc io.ReadCloser, sz int64, br BlobRange, ver string) (io.ReadCloser, uint64, string, error) { + // Implement BlobRange by slicing the streamed blob contents. + // TODO(gitblobstore): This streaming implementation is correct but may be slow for workloads + // that do many small ranged reads (e.g. table index/footer reads). 
Consider caching/materializing + // blobs to a local file (or using a batched git cat-file mode) to serve ranges efficiently. + if br.isAllRange() { + return rc, uint64(sz), ver, nil + } + + pos := br.positiveRange(sz) + if pos.offset < 0 || pos.offset > sz { + _ = rc.Close() + return nil, uint64(sz), ver, fmt.Errorf("invalid BlobRange offset %d for blob of size %d", pos.offset, sz) + } + if pos.length < 0 { + _ = rc.Close() + return nil, uint64(sz), ver, fmt.Errorf("invalid BlobRange length %d", pos.length) + } + if pos.length == 0 { + // Read from offset to end. + pos.length = sz - pos.offset + } + // Clamp to end (defensive; positiveRange should already do this). + if pos.offset+pos.length > sz { + pos.length = sz - pos.offset + } + + // Skip to offset. + if pos.offset > 0 { + if _, err := io.CopyN(io.Discard, rc, pos.offset); err != nil { + _ = rc.Close() + return nil, uint64(sz), ver, err + } + } + + return &limitReadCloser{r: io.LimitReader(rc, pos.length), c: rc}, uint64(sz), ver, nil +} + +func skipN(r io.Reader, n int64) error { + if n <= 0 { + return nil + } + _, err := io.CopyN(io.Discard, r, n) + return err +} + +func normalizeRange(total int64, offset int64, length int64) (start, end int64, err error) { + if total < 0 { + return 0, 0, fmt.Errorf("invalid total size %d", total) + } + if length < 0 { + return 0, 0, fmt.Errorf("invalid length %d", length) + } + start = offset + if start < 0 { + start = total + start + } + if start < 0 || start > total { + return 0, 0, fmt.Errorf("invalid offset %d for total size %d", offset, total) + } + if length == 0 { + end = total + } else { + end = start + length + if end < start { + return 0, 0, fmt.Errorf("range overflow") + } + if end > total { + end = total + } + } + return start, end, nil +} + +func sliceChunkParts(parts []chunkPartRef, start, end int64) ([]chunkPartSlice, error) { + if start < 0 || end < 0 || end < start { + return nil, fmt.Errorf("invalid start/end: %d/%d", start, end) + } + if start == end { 
+ return nil, nil + } + + var ( + out []chunkPartSlice + pos int64 + ) + + for _, p := range parts { + if p.size == 0 { + return nil, fmt.Errorf("invalid part size 0") + } + partStart := pos + partEnd := pos + int64(p.size) + if partEnd < partStart { + return nil, fmt.Errorf("part size overflow") + } + + if end <= partStart { + break + } + if start >= partEnd { + pos = partEnd + continue + } + + s := start + if s < partStart { + s = partStart + } + e := end + if e > partEnd { + e = partEnd + } + if e > s { + out = append(out, chunkPartSlice{ + oidHex: p.oidHex, + offset: s - partStart, + length: e - s, + }) + } + pos = partEnd + } + + if len(out) == 0 { + return nil, fmt.Errorf("range [%d,%d) not covered by parts", start, end) + } + var covered int64 + for _, s := range out { + covered += s.length + } + if covered != (end - start) { + return nil, fmt.Errorf("range [%d,%d) not fully covered by parts", start, end) + } + return out, nil +} + func defaultGitBlobstoreIdentity() *git.Identity { // Deterministic fallback identity for environments without git identity configured. return &git.Identity{Name: "dolt gitblobstore", Email: "gitblobstore@dolt.invalid"} @@ -345,80 +1199,6 @@ func newTempIndex() (dir, indexFile string, cleanup func(), err error) { return dir, indexFile, cleanup, nil } -func (gbs *GitBlobstore) refAdvanced(ctx context.Context, old git.OID) bool { - if ctx.Err() != nil { - return false - } - cur, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) - return err == nil && ok && cur != old -} - -func (gbs *GitBlobstore) CheckAndPut(ctx context.Context, expectedVersion, key string, totalSize int64, reader io.Reader) (string, error) { - key, err := normalizeGitTreePath(key) - if err != nil { - return "", err - } - - // Resolve current head and validate expectedVersion before consuming |reader|. 
- parent, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) - if err != nil { - return "", err - } - actualVersion := "" - if ok { - actualVersion = parent.String() - } - if expectedVersion != actualVersion { - return "", CheckAndPutError{Key: key, ExpectedVersion: expectedVersion, ActualVersion: actualVersion} - } - - blobOID, err := gbs.api.HashObject(ctx, reader) - if err != nil { - return "", err - } - - msg := fmt.Sprintf("gitblobstore: checkandput %s", key) - newCommit, err := gbs.buildCommitWithMessage(ctx, parent, ok, key, blobOID, msg) - if err != nil { - return "", err - } - - if ok { - if err := gbs.api.UpdateRefCAS(ctx, gbs.ref, newCommit, parent, msg); err != nil { - // If the ref changed, surface as a standard mismatch error. - cur, ok2, err2 := gbs.api.TryResolveRefCommit(ctx, gbs.ref) - if err2 == nil && ok2 && cur != parent { - return "", CheckAndPutError{Key: key, ExpectedVersion: expectedVersion, ActualVersion: cur.String()} - } - return "", err - } - return newCommit.String(), nil - } - - // Create-only CAS: oldOID=all-zero requires the ref to not exist. - const zeroOID = git.OID("0000000000000000000000000000000000000000") - if err := gbs.api.UpdateRefCAS(ctx, gbs.ref, newCommit, zeroOID, msg); err != nil { - cur, ok2, err2 := gbs.api.TryResolveRefCommit(ctx, gbs.ref) - if err2 == nil && ok2 { - return "", CheckAndPutError{Key: key, ExpectedVersion: expectedVersion, ActualVersion: cur.String()} - } - return "", err - } - return newCommit.String(), nil -} - -func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources []string) (string, error) { - if _, err := normalizeGitTreePath(key); err != nil { - return "", err - } - for _, src := range sources { - if _, err := normalizeGitTreePath(src); err != nil { - return "", err - } - } - return "", fmt.Errorf("%w: GitBlobstore.Concatenate", git.ErrUnimplemented) -} - // normalizeGitTreePath normalizes and validates a blobstore key for use as a git tree path. 
// // Rules: diff --git a/go/store/blobstore/git_blobstore_chunked_checkandput_test.go b/go/store/blobstore/git_blobstore_chunked_checkandput_test.go new file mode 100644 index 0000000000..cc1e18ff9e --- /dev/null +++ b/go/store/blobstore/git_blobstore_chunked_checkandput_test.go @@ -0,0 +1,81 @@ +// Copyright 2026 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package blobstore + +import ( + "bytes" + "context" + "errors" + "io" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/dolthub/dolt/go/store/testutils/gitrepo" +) + +func TestGitBlobstore_CheckAndPut_ChunkedRoundTrip_CreateOnly(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{ + Identity: testIdentity(), + MaxPartSize: 3, + }) + require.NoError(t, err) + + want := []byte("abcdefghij") // 10 bytes -> chunked tree + ver, err := bs.CheckAndPut(ctx, "", "big", int64(len(want)), bytes.NewReader(want)) + require.NoError(t, err) + require.NotEmpty(t, ver) + + got, ver2, err := GetBytes(ctx, bs, "big", AllRange) + require.NoError(t, err) + require.Equal(t, ver, ver2) + require.Equal(t, want, got) +} + +type chunkedFailReader struct{} + +func (chunkedFailReader) Read(_ []byte) (int, error) { + return 0, errors.New("read should not be called") +} + +func 
TestGitBlobstore_CheckAndPut_MismatchDoesNotConsumeReader_WithChunkingEnabled(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + // Seed any commit so actualVersion != "". + bs0, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{Identity: testIdentity()}) + require.NoError(t, err) + _, err = bs0.Put(ctx, "x", 1, bytes.NewReader([]byte("x"))) + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{ + Identity: testIdentity(), + MaxPartSize: 3, + }) + require.NoError(t, err) + + _, err = bs.CheckAndPut(ctx, "definitely-wrong", "y", 1, io.Reader(chunkedFailReader{})) + require.Error(t, err) + require.True(t, IsCheckAndPutError(err)) +} diff --git a/go/store/blobstore/git_blobstore_chunked_get_test.go b/go/store/blobstore/git_blobstore_chunked_get_test.go new file mode 100644 index 0000000000..2f6054f632 --- /dev/null +++ b/go/store/blobstore/git_blobstore_chunked_get_test.go @@ -0,0 +1,101 @@ +// Copyright 2026 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package blobstore + +import ( + "context" + "os/exec" + "testing" + + "github.com/stretchr/testify/require" + + git "github.com/dolthub/dolt/go/store/blobstore/internal/git" + "github.com/dolthub/dolt/go/store/testutils/gitrepo" +) + +func TestGitBlobstore_Get_ChunkedTree_AllAndRanges(t *testing.T) { + if _, err := exec.LookPath("git"); err != nil { + t.Skip("git not found on PATH") + } + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + part1 := []byte("abc") + part2 := []byte("defgh") + commitOID, err := repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ + "chunked/0001": part1, + "chunked/0002": part2, + }, "seed chunked tree") + require.NoError(t, err) + + runner, err := git.NewRunner(repo.GitDir) + require.NoError(t, err) + api := git.NewGitAPIImpl(runner) + treeOID, _, err := api.ResolvePathObject(ctx, git.OID(commitOID), "chunked") + require.NoError(t, err) + + bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef) + require.NoError(t, err) + + wantAll := append(append([]byte(nil), part1...), part2...) + + got, ver, err := GetBytes(ctx, bs, "chunked", AllRange) + require.NoError(t, err) + require.Equal(t, treeOID.String(), ver) + require.Equal(t, wantAll, got) + + // Range spanning boundary: offset 2 length 4 => "cdef" + got, ver, err = GetBytes(ctx, bs, "chunked", NewBlobRange(2, 4)) + require.NoError(t, err) + require.Equal(t, treeOID.String(), ver) + require.Equal(t, []byte("cdef"), got) + + // Tail read last 3 bytes => "fgh" + got, ver, err = GetBytes(ctx, bs, "chunked", NewBlobRange(-3, 0)) + require.NoError(t, err) + require.Equal(t, treeOID.String(), ver) + require.Equal(t, []byte("fgh"), got) + + // Validate size returned is logical size. 
+ rc, sz, ver2, err := bs.Get(ctx, "chunked", NewBlobRange(0, 1)) + require.NoError(t, err) + require.Equal(t, uint64(len(wantAll)), sz) + require.Equal(t, treeOID.String(), ver2) + _ = rc.Close() +} + +func TestGitBlobstore_Get_ChunkedTree_InvalidPartsError(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + // Gap: 0001, 0003 + _, err = repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ + "chunked/0001": []byte("a"), + "chunked/0003": []byte("b"), + }, "seed invalid chunked tree") + require.NoError(t, err) + + bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef) + require.NoError(t, err) + + _, _, err = GetBytes(ctx, bs, "chunked", AllRange) + require.Error(t, err) + require.False(t, IsNotFoundError(err)) +} diff --git a/go/store/blobstore/git_blobstore_chunked_put_test.go b/go/store/blobstore/git_blobstore_chunked_put_test.go new file mode 100644 index 0000000000..3096310bc6 --- /dev/null +++ b/go/store/blobstore/git_blobstore_chunked_put_test.go @@ -0,0 +1,121 @@ +// Copyright 2026 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package blobstore + +import ( + "bytes" + "context" + "testing" + + "github.com/stretchr/testify/require" + + git "github.com/dolthub/dolt/go/store/blobstore/internal/git" + "github.com/dolthub/dolt/go/store/testutils/gitrepo" +) + +func TestGitBlobstore_Put_ChunkedWritesTreeParts(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{ + Identity: testIdentity(), + MaxPartSize: 3, + }) + require.NoError(t, err) + + want := []byte("abcdefghij") // 10 bytes -> 3,3,3,1 + ver, err := bs.Put(ctx, "big", int64(len(want)), bytes.NewReader(want)) + require.NoError(t, err) + + got, ver2, err := GetBytes(ctx, bs, "big", AllRange) + require.NoError(t, err) + require.Equal(t, ver, ver2) + require.Equal(t, want, got) + + runner, err := git.NewRunner(repo.GitDir) + require.NoError(t, err) + api := git.NewGitAPIImpl(runner) + + head, ok, err := api.TryResolveRefCommit(ctx, DoltDataRef) + require.NoError(t, err) + require.True(t, ok) + + _, typ, err := api.ResolvePathObject(ctx, head, "big") + require.NoError(t, err) + require.Equal(t, git.ObjectTypeTree, typ) + + entries, err := api.ListTree(ctx, head, "big") + require.NoError(t, err) + require.Len(t, entries, 4) + require.Equal(t, "00000001", entries[0].Name) + require.Equal(t, "00000004", entries[3].Name) +} + +func TestGitBlobstore_Put_IdempotentDoesNotChangeExistingRepresentation(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{ + Identity: testIdentity(), + MaxPartSize: 3, + }) + require.NoError(t, err) + + runner, err := git.NewRunner(repo.GitDir) + require.NoError(t, err) + api := git.NewGitAPIImpl(runner) + + // blob stays blob (even if 
the caller would have triggered chunked mode) + verBlob, err := bs.Put(ctx, "k", 2, bytes.NewReader([]byte("hi"))) + require.NoError(t, err) + verNoop, err := bs.Put(ctx, "k", 10, putShouldNotRead{}) + require.NoError(t, err) + require.Equal(t, verBlob, verNoop) + + head1, ok, err := api.TryResolveRefCommit(ctx, DoltDataRef) + require.NoError(t, err) + require.True(t, ok) + _, typ, err := api.ResolvePathObject(ctx, head1, "k") + require.NoError(t, err) + require.Equal(t, git.ObjectTypeBlob, typ) + + got, _, err := GetBytes(ctx, bs, "k", AllRange) + require.NoError(t, err) + require.Equal(t, []byte("hi"), got) + + // tree stays tree + verTree, err := bs.Put(ctx, "ktree", 10, bytes.NewReader([]byte("abcdefghij"))) + require.NoError(t, err) + head2, ok, err := api.TryResolveRefCommit(ctx, DoltDataRef) + require.NoError(t, err) + require.True(t, ok) + _, typ, err = api.ResolvePathObject(ctx, head2, "ktree") + require.NoError(t, err) + require.Equal(t, git.ObjectTypeTree, typ) + + verTreeNoop, err := bs.Put(ctx, "ktree", 2, putShouldNotRead{}) + require.NoError(t, err) + require.Equal(t, verTree, verTreeNoop) + + got, _, err = GetBytes(ctx, bs, "ktree", AllRange) + require.NoError(t, err) + require.Equal(t, []byte("abcdefghij"), got) +} diff --git a/go/store/blobstore/git_blobstore_helpers_test.go b/go/store/blobstore/git_blobstore_helpers_test.go new file mode 100644 index 0000000000..8080f4e838 --- /dev/null +++ b/go/store/blobstore/git_blobstore_helpers_test.go @@ -0,0 +1,412 @@ +// Copyright 2026 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package blobstore + +import ( + "bytes" + "context" + "errors" + "io" + "testing" + + "github.com/stretchr/testify/require" + + git "github.com/dolthub/dolt/go/store/blobstore/internal/git" +) + +type fakeGitAPI struct { + tryResolveRefCommit func(ctx context.Context, ref string) (git.OID, bool, error) + resolvePathBlob func(ctx context.Context, commit git.OID, path string) (git.OID, error) + resolvePathObject func(ctx context.Context, commit git.OID, path string) (git.OID, git.ObjectType, error) + listTree func(ctx context.Context, commit git.OID, treePath string) ([]git.TreeEntry, error) + blobSize func(ctx context.Context, oid git.OID) (int64, error) + blobReader func(ctx context.Context, oid git.OID) (io.ReadCloser, error) +} + +func (f fakeGitAPI) TryResolveRefCommit(ctx context.Context, ref string) (git.OID, bool, error) { + return f.tryResolveRefCommit(ctx, ref) +} +func (f fakeGitAPI) ResolveRefCommit(ctx context.Context, ref string) (git.OID, error) { + panic("unexpected call") +} +func (f fakeGitAPI) ResolvePathBlob(ctx context.Context, commit git.OID, path string) (git.OID, error) { + return f.resolvePathBlob(ctx, commit, path) +} +func (f fakeGitAPI) ResolvePathObject(ctx context.Context, commit git.OID, path string) (git.OID, git.ObjectType, error) { + return f.resolvePathObject(ctx, commit, path) +} +func (f fakeGitAPI) ListTree(ctx context.Context, commit git.OID, treePath string) ([]git.TreeEntry, error) { + return f.listTree(ctx, commit, treePath) +} +func (f fakeGitAPI) CatFileType(ctx context.Context, oid git.OID) (string, error) { + panic("unexpected call") +} +func (f fakeGitAPI) BlobSize(ctx context.Context, oid git.OID) (int64, error) { + return f.blobSize(ctx, oid) +} +func (f fakeGitAPI) BlobReader(ctx context.Context, oid git.OID) (io.ReadCloser, error) { + return f.blobReader(ctx, oid) +} +func (f fakeGitAPI) HashObject(ctx 
context.Context, contents io.Reader) (git.OID, error) { + panic("unexpected call") +} +func (f fakeGitAPI) ReadTree(ctx context.Context, commit git.OID, indexFile string) error { + panic("unexpected call") +} +func (f fakeGitAPI) ReadTreeEmpty(ctx context.Context, indexFile string) error { + panic("unexpected call") +} +func (f fakeGitAPI) UpdateIndexCacheInfo(ctx context.Context, indexFile string, mode string, oid git.OID, path string) error { + panic("unexpected call") +} +func (f fakeGitAPI) RemoveIndexPaths(ctx context.Context, indexFile string, paths []string) error { + panic("unexpected call") +} +func (f fakeGitAPI) WriteTree(ctx context.Context, indexFile string) (git.OID, error) { + panic("unexpected call") +} +func (f fakeGitAPI) CommitTree(ctx context.Context, tree git.OID, parent *git.OID, message string, author *git.Identity) (git.OID, error) { + panic("unexpected call") +} +func (f fakeGitAPI) UpdateRefCAS(ctx context.Context, ref string, newOID git.OID, oldOID git.OID, msg string) error { + panic("unexpected call") +} +func (f fakeGitAPI) UpdateRef(ctx context.Context, ref string, newOID git.OID, msg string) error { + panic("unexpected call") +} + +func TestGitBlobstoreHelpers_resolveCommitForGet(t *testing.T) { + ctx := context.Background() + + t.Run("ok", func(t *testing.T) { + api := fakeGitAPI{ + tryResolveRefCommit: func(ctx context.Context, ref string) (git.OID, bool, error) { + require.Equal(t, DoltDataRef, ref) + return git.OID("0123456789abcdef0123456789abcdef01234567"), true, nil + }, + } + gbs := &GitBlobstore{ref: DoltDataRef, api: api} + + commit, err := gbs.resolveCommitForGet(ctx, "k") + require.NoError(t, err) + require.Equal(t, git.OID("0123456789abcdef0123456789abcdef01234567"), commit) + }) + + t.Run("missingRef_manifestIsNotFound", func(t *testing.T) { + api := fakeGitAPI{ + tryResolveRefCommit: func(ctx context.Context, ref string) (git.OID, bool, error) { + return git.OID(""), false, nil + }, + } + gbs := &GitBlobstore{ref: 
DoltDataRef, api: api} + + _, err := gbs.resolveCommitForGet(ctx, "manifest") + var nf NotFound + require.ErrorAs(t, err, &nf) + require.Equal(t, "manifest", nf.Key) + }) + + t.Run("missingRef_nonManifestIsRefNotFound", func(t *testing.T) { + api := fakeGitAPI{ + tryResolveRefCommit: func(ctx context.Context, ref string) (git.OID, bool, error) { + return git.OID(""), false, nil + }, + } + gbs := &GitBlobstore{ref: DoltDataRef, api: api} + + _, err := gbs.resolveCommitForGet(ctx, "somekey") + var rnf *git.RefNotFoundError + require.ErrorAs(t, err, &rnf) + require.Equal(t, DoltDataRef, rnf.Ref) + }) + + t.Run("propagatesError", func(t *testing.T) { + sentinel := errors.New("boom") + api := fakeGitAPI{ + tryResolveRefCommit: func(ctx context.Context, ref string) (git.OID, bool, error) { + return git.OID(""), false, sentinel + }, + } + gbs := &GitBlobstore{ref: DoltDataRef, api: api} + + _, err := gbs.resolveCommitForGet(ctx, "k") + require.ErrorIs(t, err, sentinel) + }) +} + +func TestGitBlobstoreHelpers_resolveObjectForGet(t *testing.T) { + ctx := context.Background() + commit := git.OID("0123456789abcdef0123456789abcdef01234567") + + t.Run("ok", func(t *testing.T) { + api := fakeGitAPI{ + resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, git.ObjectType, error) { + require.Equal(t, commit, gotCommit) + require.Equal(t, "k", path) + return git.OID("89abcdef0123456789abcdef0123456789abcdef"), git.ObjectTypeBlob, nil + }, + } + gbs := &GitBlobstore{api: api} + + oid, typ, err := gbs.resolveObjectForGet(ctx, commit, "k") + require.NoError(t, err) + require.Equal(t, git.ObjectTypeBlob, typ) + require.Equal(t, git.OID("89abcdef0123456789abcdef0123456789abcdef"), oid) + }) + + t.Run("pathNotFoundMapsToNotFound", func(t *testing.T) { + api := fakeGitAPI{ + resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, git.ObjectType, error) { + return git.OID(""), git.ObjectTypeUnknown, 
&git.PathNotFoundError{Commit: gotCommit.String(), Path: path} + }, + } + gbs := &GitBlobstore{api: api} + + _, _, err := gbs.resolveObjectForGet(ctx, commit, "k") + var nf NotFound + require.ErrorAs(t, err, &nf) + require.Equal(t, "k", nf.Key) + }) +} + +func TestGitBlobstoreHelpers_resolveBlobSizeForGet(t *testing.T) { + ctx := context.Background() + commit := git.OID("0123456789abcdef0123456789abcdef01234567") + oid := git.OID("89abcdef0123456789abcdef0123456789abcdef") + + t.Run("ok", func(t *testing.T) { + api := fakeGitAPI{ + blobSize: func(ctx context.Context, gotOID git.OID) (int64, error) { + require.Equal(t, oid, gotOID) + return 123, nil + }, + } + gbs := &GitBlobstore{api: api} + + sz, ver, err := gbs.resolveBlobSizeForGet(ctx, commit, oid) + require.NoError(t, err) + require.Equal(t, commit.String(), ver) + require.Equal(t, int64(123), sz) + }) +} + +func TestGitBlobstoreHelpers_validateAndSizeChunkedParts(t *testing.T) { + ctx := context.Background() + + api := fakeGitAPI{ + blobSize: func(ctx context.Context, oid git.OID) (int64, error) { + switch oid { + case "0123456789abcdef0123456789abcdef01234567": + return 3, nil + case "89abcdef0123456789abcdef0123456789abcdef": + return 5, nil + default: + return 0, errors.New("unexpected oid") + } + }, + } + gbs := &GitBlobstore{api: api} + + parts, total, err := gbs.validateAndSizeChunkedParts(ctx, []git.TreeEntry{ + {Name: "0001", Type: git.ObjectTypeBlob, OID: "0123456789abcdef0123456789abcdef01234567"}, + {Name: "0002", Type: git.ObjectTypeBlob, OID: "89abcdef0123456789abcdef0123456789abcdef"}, + }) + require.NoError(t, err) + require.Equal(t, uint64(8), total) + require.Len(t, parts, 2) + require.Equal(t, "0123456789abcdef0123456789abcdef01234567", parts[0].oidHex) + require.Equal(t, uint64(3), parts[0].size) + + _, _, err = gbs.validateAndSizeChunkedParts(ctx, []git.TreeEntry{{Name: "1", Type: git.ObjectTypeBlob, OID: "0123456789abcdef0123456789abcdef01234567"}}) + require.Error(t, err) +} + +func 
TestGitBlobstoreHelpers_sizeAtCommit(t *testing.T) { + ctx := context.Background() + commit := git.OID("0123456789abcdef0123456789abcdef01234567") + + t.Run("blob", func(t *testing.T) { + api := fakeGitAPI{ + resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, git.ObjectType, error) { + require.Equal(t, commit, gotCommit) + require.Equal(t, "k", path) + return git.OID("89abcdef0123456789abcdef0123456789abcdef"), git.ObjectTypeBlob, nil + }, + blobSize: func(ctx context.Context, gotOID git.OID) (int64, error) { + require.Equal(t, git.OID("89abcdef0123456789abcdef0123456789abcdef"), gotOID) + return 123, nil + }, + } + gbs := &GitBlobstore{api: api} + sz, err := gbs.sizeAtCommit(ctx, commit, "k") + require.NoError(t, err) + require.Equal(t, uint64(123), sz) + }) + + t.Run("chunkedTree", func(t *testing.T) { + api := fakeGitAPI{ + resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, git.ObjectType, error) { + require.Equal(t, commit, gotCommit) + require.Equal(t, "k", path) + return git.OID("treeoid"), git.ObjectTypeTree, nil + }, + listTree: func(ctx context.Context, gotCommit git.OID, treePath string) ([]git.TreeEntry, error) { + require.Equal(t, commit, gotCommit) + require.Equal(t, "k", treePath) + return []git.TreeEntry{ + {Name: "0001", Type: git.ObjectTypeBlob, OID: "0123456789abcdef0123456789abcdef01234567"}, + {Name: "0002", Type: git.ObjectTypeBlob, OID: "89abcdef0123456789abcdef0123456789abcdef"}, + }, nil + }, + blobSize: func(ctx context.Context, oid git.OID) (int64, error) { + switch oid { + case "0123456789abcdef0123456789abcdef01234567": + return 3, nil + case "89abcdef0123456789abcdef0123456789abcdef": + return 5, nil + default: + return 0, errors.New("unexpected oid") + } + }, + } + gbs := &GitBlobstore{api: api} + sz, err := gbs.sizeAtCommit(ctx, commit, "k") + require.NoError(t, err) + require.Equal(t, uint64(8), sz) + }) + + t.Run("notFound", func(t *testing.T) { + api := 
fakeGitAPI{ + resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, git.ObjectType, error) { + return git.OID(""), git.ObjectTypeUnknown, &git.PathNotFoundError{Commit: gotCommit.String(), Path: path} + }, + } + gbs := &GitBlobstore{api: api} + _, err := gbs.sizeAtCommit(ctx, commit, "missing") + var nf NotFound + require.ErrorAs(t, err, &nf) + require.Equal(t, "missing", nf.Key) + }) +} + +func TestGitBlobstoreHelpers_totalSizeAtCommit_overflowInt64(t *testing.T) { + ctx := context.Background() + commit := git.OID("0123456789abcdef0123456789abcdef01234567") + + api := fakeGitAPI{ + resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, git.ObjectType, error) { + return git.OID(path + "_oid"), git.ObjectTypeBlob, nil + }, + blobSize: func(ctx context.Context, gotOID git.OID) (int64, error) { + // Make the total exceed int64 max with two sources. + if gotOID == "a_oid" { + return int64(^uint64(0) >> 1), nil // math.MaxInt64 without importing math + } + return 1, nil + }, + } + gbs := &GitBlobstore{api: api} + _, err := gbs.totalSizeAtCommit(ctx, commit, []string{"a", "b"}) + require.Error(t, err) +} + +func TestConcatReadCloser(t *testing.T) { + ctx := context.Background() + closed := map[string]int{} + opened := map[string]int{} + + mk := func(s string) io.ReadCloser { + r := bytes.NewReader([]byte(s)) + return &trackedReadCloser{ + r: r, + onClose: func() { + closed[s]++ + }, + } + } + + crc := &concatReadCloser{ + ctx: ctx, + keys: []string{"a", "b"}, + open: func(ctx context.Context, key string) (io.ReadCloser, error) { + opened[key]++ + if key == "a" { + return mk("hi"), nil + } + return mk("there"), nil + }, + } + + out, err := io.ReadAll(crc) + require.NoError(t, err) + require.Equal(t, "hithere", string(out)) + require.NoError(t, crc.Close()) + require.Equal(t, 1, opened["a"]) + require.Equal(t, 1, opened["b"]) + require.Equal(t, 1, closed["hi"]) + require.Equal(t, 1, closed["there"]) +} + 
+func TestConcatReadCloser_CloseEarlyClosesCurrent(t *testing.T) { + ctx := context.Background() + closed := map[string]int{} + opened := map[string]int{} + + mk := func(id string, s string) io.ReadCloser { + r := bytes.NewReader([]byte(s)) + return &trackedReadCloser{ + r: r, + onClose: func() { + closed[id]++ + }, + } + } + + crc := &concatReadCloser{ + ctx: ctx, + keys: []string{"a", "b"}, + open: func(ctx context.Context, key string) (io.ReadCloser, error) { + opened[key]++ + if key == "a" { + return mk("a", "hello"), nil + } + return mk("b", "world"), nil + }, + } + + buf := make([]byte, 1) + n, err := crc.Read(buf) + require.Equal(t, 1, n) + require.NoError(t, err) + + require.NoError(t, crc.Close()) + require.Equal(t, 1, opened["a"]) + require.Equal(t, 0, opened["b"], "expected not to open second reader when closing early") + require.Equal(t, 1, closed["a"]) + require.Equal(t, 0, closed["b"]) +} + +type trackedReadCloser struct { + r io.Reader + onClose func() +} + +func (t *trackedReadCloser) Read(p []byte) (int, error) { return t.r.Read(p) } +func (t *trackedReadCloser) Close() error { + if t.onClose != nil { + t.onClose() + } + return nil +} diff --git a/go/store/blobstore/git_blobstore_multipart_test.go b/go/store/blobstore/git_blobstore_multipart_test.go new file mode 100644 index 0000000000..a28636e291 --- /dev/null +++ b/go/store/blobstore/git_blobstore_multipart_test.go @@ -0,0 +1,124 @@ +// Copyright 2026 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package blobstore + +import ( + "bytes" + "context" + "errors" + "io" + "testing" + + "github.com/stretchr/testify/require" + + git "github.com/dolthub/dolt/go/store/blobstore/internal/git" +) + +type trackingReadCloser struct { + io.Reader + closed bool +} + +func (t *trackingReadCloser) Close() error { + t.closed = true + return nil +} + +func TestMultiPartReadCloser_ReadConcatenatesAcrossPartsWithOffsets(t *testing.T) { + ctx := context.Background() + + oid1 := "0123456789abcdef0123456789abcdef01234567" + oid2 := "89abcdef0123456789abcdef0123456789abcdef" + + blobs := map[string][]byte{ + oid1: []byte("hello"), + oid2: []byte("world!"), + } + + api := fakeGitAPI{ + blobReader: func(ctx context.Context, oid git.OID) (io.ReadCloser, error) { + b, ok := blobs[oid.String()] + require.True(t, ok, "unexpected oid %s", oid.String()) + return io.NopCloser(bytes.NewReader(b)), nil + }, + } + + rc := &multiPartReadCloser{ + ctx: ctx, + api: api, + slices: []chunkPartSlice{ + {oidHex: oid1, offset: 1, length: 3}, // "ell" + {oidHex: oid2, offset: 2, length: 3}, // "rld" + }, + } + defer func() { _ = rc.Close() }() + + got, err := io.ReadAll(rc) + require.NoError(t, err) + require.Equal(t, []byte("ellrld"), got) +} + +func TestMultiPartReadCloser_ReadUnexpectedEOFWhenPartShorterThanDeclared(t *testing.T) { + ctx := context.Background() + + oid := "0123456789abcdef0123456789abcdef01234567" + api := fakeGitAPI{ + blobReader: func(ctx context.Context, oid git.OID) (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader([]byte("hi"))), nil // 2 bytes + }, + } + + rc := &multiPartReadCloser{ + ctx: ctx, + api: api, + slices: []chunkPartSlice{ + {oidHex: oid, offset: 0, length: 3}, // expect 3 bytes, only 2 available + }, + } + defer func() { _ = rc.Close() }() + + _, err := io.ReadAll(rc) + require.Error(t, err) + require.True(t, errors.Is(err, 
io.ErrUnexpectedEOF)) +} + +func TestMultiPartReadCloser_CloseClosesUnderlyingPartReader(t *testing.T) { + ctx := context.Background() + + oid := "0123456789abcdef0123456789abcdef01234567" + underlying := &trackingReadCloser{Reader: bytes.NewReader([]byte("hello"))} + + api := fakeGitAPI{ + blobReader: func(ctx context.Context, oid git.OID) (io.ReadCloser, error) { + return underlying, nil + }, + } + + rc := &multiPartReadCloser{ + ctx: ctx, + api: api, + slices: []chunkPartSlice{ + {oidHex: oid, offset: 0, length: 1}, + }, + } + + // Force the underlying reader to be opened. + buf := make([]byte, 1) + _, err := rc.Read(buf) + require.NoError(t, err) + + require.NoError(t, rc.Close()) + require.True(t, underlying.closed) +} diff --git a/go/store/blobstore/git_blobstore_test.go b/go/store/blobstore/git_blobstore_test.go index a543808572..18b7156ec5 100644 --- a/go/store/blobstore/git_blobstore_test.go +++ b/go/store/blobstore/git_blobstore_test.go @@ -85,6 +85,12 @@ func TestGitBlobstore_ExistsAndGet_AllRange(t *testing.T) { bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef) require.NoError(t, err) + runner, err := git.NewRunner(repo.GitDir) + require.NoError(t, err) + api := git.NewGitAPIImpl(runner) + manifestOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "manifest") + require.NoError(t, err) + ok, err := bs.Exists(ctx, "manifest") require.NoError(t, err) require.True(t, ok) @@ -100,14 +106,14 @@ func TestGitBlobstore_ExistsAndGet_AllRange(t *testing.T) { got, ver, err := GetBytes(ctx, bs, "manifest", AllRange) require.NoError(t, err) - require.Equal(t, commit, ver) + require.Equal(t, manifestOID.String(), ver) require.Equal(t, want, got) // Validate size + version on Get. 
rc, sz, ver2, err := bs.Get(ctx, "manifest", NewBlobRange(0, 5)) require.NoError(t, err) require.Equal(t, uint64(len(want)), sz) - require.Equal(t, commit, ver2) + require.Equal(t, manifestOID.String(), ver2) _ = rc.Close() } @@ -149,34 +155,40 @@ func TestGitBlobstore_BlobRangeSemantics(t *testing.T) { bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef) require.NoError(t, err) + runner, err := git.NewRunner(repo.GitDir) + require.NoError(t, err) + api := git.NewGitAPIImpl(runner) + rangeOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "range") + require.NoError(t, err) + // full range got, ver, err := GetBytes(ctx, bs, "range", AllRange) require.NoError(t, err) - require.Equal(t, commit, ver) + require.Equal(t, rangeOID.String(), ver) require.Equal(t, rangeData(0, maxValue), got) // first 2048 bytes (1024 shorts) got, ver, err = GetBytes(ctx, bs, "range", NewBlobRange(0, 2048)) require.NoError(t, err) - require.Equal(t, commit, ver) + require.Equal(t, rangeOID.String(), ver) require.Equal(t, rangeData(0, 1024), got) // bytes 2048..4096 of original got, ver, err = GetBytes(ctx, bs, "range", NewBlobRange(2*1024, 2*1024)) require.NoError(t, err) - require.Equal(t, commit, ver) + require.Equal(t, rangeOID.String(), ver) require.Equal(t, rangeData(1024, 2048), got) // last 2048 bytes got, ver, err = GetBytes(ctx, bs, "range", NewBlobRange(-2*1024, 0)) require.NoError(t, err) - require.Equal(t, commit, ver) + require.Equal(t, rangeOID.String(), ver) require.Equal(t, rangeData(maxValue-1024, maxValue), got) // tail slice: beginning 2048 bytes from end, size 512 got, ver, err = GetBytes(ctx, bs, "range", NewBlobRange(-2*1024, 512)) require.NoError(t, err) - require.Equal(t, commit, ver) + require.Equal(t, rangeOID.String(), ver) require.Equal(t, rangeData(maxValue-1024, maxValue-768), got) } @@ -243,7 +255,148 @@ func TestGitBlobstore_Put_RoundTripAndVersion(t *testing.T) { require.Equal(t, want, got) } -func TestGitBlobstore_Put_Overwrite(t *testing.T) { +func 
TestGitBlobstore_Concatenate_Basic(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + require.NoError(t, err) + + _, err = PutBytes(ctx, bs, "a", []byte("hi ")) + require.NoError(t, err) + _, err = PutBytes(ctx, bs, "b", []byte("there")) + require.NoError(t, err) + + ver, err := bs.Concatenate(ctx, "c", []string{"a", "b"}) + require.NoError(t, err) + require.NotEmpty(t, ver) + + got, ver2, err := GetBytes(ctx, bs, "c", AllRange) + require.NoError(t, err) + require.Equal(t, ver, ver2) + require.Equal(t, []byte("hi there"), got) +} + +func TestGitBlobstore_Concatenate_ChunkedResult(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{ + Identity: testIdentity(), + MaxPartSize: 1024, + }) + require.NoError(t, err) + + a := bytes.Repeat([]byte("a"), 700) + b := bytes.Repeat([]byte("b"), 700) + want := append(append([]byte(nil), a...), b...) + + _, err = PutBytes(ctx, bs, "a", a) + require.NoError(t, err) + _, err = PutBytes(ctx, bs, "b", b) + require.NoError(t, err) + + ver, err := bs.Concatenate(ctx, "c", []string{"a", "b"}) + require.NoError(t, err) + require.NotEmpty(t, ver) + + // Verify the resulting key is stored as a chunked tree (not a single blob). 
+ head, ok, err := bs.api.TryResolveRefCommit(ctx, DoltDataRef) + require.NoError(t, err) + require.True(t, ok) + oid, typ, err := bs.api.ResolvePathObject(ctx, head, "c") + require.NoError(t, err) + require.Equal(t, git.ObjectTypeTree, typ) + require.Equal(t, oid.String(), ver) + + parts, err := bs.api.ListTree(ctx, head, "c") + require.NoError(t, err) + require.GreaterOrEqual(t, len(parts), 2) + require.Equal(t, "00000001", parts[0].Name) + + got, ver2, err := GetBytes(ctx, bs, "c", AllRange) + require.NoError(t, err) + require.Equal(t, ver, ver2) + require.Equal(t, want, got) +} + +func TestGitBlobstore_Concatenate_KeyExistsFastSucceeds(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + require.NoError(t, err) + + ver1, err := PutBytes(ctx, bs, "c", []byte("original")) + require.NoError(t, err) + require.NotEmpty(t, ver1) + + _, err = PutBytes(ctx, bs, "a", []byte("new ")) + require.NoError(t, err) + _, err = PutBytes(ctx, bs, "b", []byte("value")) + require.NoError(t, err) + + ver2, err := bs.Concatenate(ctx, "c", []string{"a", "b"}) + require.NoError(t, err) + require.Equal(t, ver1, ver2, "expected concatenate to fast-succeed without overwriting existing key") + + got, ver3, err := GetBytes(ctx, bs, "c", AllRange) + require.NoError(t, err) + require.Equal(t, ver1, ver3) + require.Equal(t, []byte("original"), got) +} + +func TestGitBlobstore_Concatenate_MissingSourceIsNotFound(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + require.NoError(t, err) + + _, err = PutBytes(ctx, bs, "present", []byte("x")) + require.NoError(t, err) + + _, err = bs.Concatenate(ctx, "c", 
[]string{"present", "missing"}) + require.Error(t, err) + require.True(t, IsNotFoundError(err)) + var nf NotFound + require.ErrorAs(t, err, &nf) + require.Equal(t, "missing", nf.Key) +} + +func TestGitBlobstore_Concatenate_EmptySourcesErrors(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + require.NoError(t, err) + + _, err = bs.Concatenate(ctx, "c", nil) + require.Error(t, err) +} + +type putShouldNotRead struct{} + +func (putShouldNotRead) Read(_ []byte) (int, error) { + return 0, errors.New("read should not be called") +} + +func TestGitBlobstore_Put_IdempotentIfKeyExists(t *testing.T) { requireGitOnPath(t) ctx := context.Background() @@ -257,15 +410,14 @@ func TestGitBlobstore_Put_Overwrite(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, ver1) - ver2, err := PutBytes(ctx, bs, "k", []byte("v2\n")) + ver2, err := bs.Put(ctx, "k", 3, putShouldNotRead{}) require.NoError(t, err) - require.NotEmpty(t, ver2) - require.NotEqual(t, ver1, ver2) + require.Equal(t, ver1, ver2) got, ver3, err := GetBytes(ctx, bs, "k", AllRange) require.NoError(t, err) - require.Equal(t, ver2, ver3) - require.Equal(t, []byte("v2\n"), got) + require.Equal(t, ver1, ver3) + require.Equal(t, []byte("v1\n"), got) } type hookGitAPI struct { @@ -431,8 +583,14 @@ func TestGitBlobstore_CheckAndPut_MismatchDoesNotRead(t *testing.T) { bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) require.NoError(t, err) + runner, err := git.NewRunner(repo.GitDir) + require.NoError(t, err) + api := git.NewGitAPIImpl(runner) + keyOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "k") + require.NoError(t, err) + r := &failReader{} - _, err = bs.CheckAndPut(ctx, commit+"-wrong", "k", 1, r) + _, err = bs.CheckAndPut(ctx, keyOID.String()+"-wrong", "k", 1, r) require.Error(t, 
err) require.True(t, IsCheckAndPutError(err)) require.False(t, r.called.Load(), "expected reader not to be consumed on version mismatch") @@ -454,11 +612,17 @@ func TestGitBlobstore_CheckAndPut_UpdateSuccess(t *testing.T) { bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) require.NoError(t, err) + runner, err := git.NewRunner(repo.GitDir) + require.NoError(t, err) + api := git.NewGitAPIImpl(runner) + keyOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "k") + require.NoError(t, err) + want := []byte("updated\n") - ver2, err := bs.CheckAndPut(ctx, commit, "k", int64(len(want)), bytes.NewReader(want)) + ver2, err := bs.CheckAndPut(ctx, keyOID.String(), "k", int64(len(want)), bytes.NewReader(want)) require.NoError(t, err) require.NotEmpty(t, ver2) - require.NotEqual(t, commit, ver2) + require.NotEqual(t, keyOID.String(), ver2) got, ver3, err := GetBytes(ctx, bs, "k", AllRange) require.NoError(t, err) @@ -470,7 +634,7 @@ func TestGitBlobstore_CheckAndPut_UpdateSuccess(t *testing.T) { require.Equal(t, []byte("keep\n"), got) } -func TestGitBlobstore_CheckAndPut_ConcurrentUpdateReturnsMismatch(t *testing.T) { +func TestGitBlobstore_CheckAndPut_ConcurrentUnrelatedUpdateStillSucceeds(t *testing.T) { requireGitOnPath(t) ctx := context.Background() @@ -485,6 +649,12 @@ func TestGitBlobstore_CheckAndPut_ConcurrentUpdateReturnsMismatch(t *testing.T) bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) require.NoError(t, err) + runner, err := git.NewRunner(repo.GitDir) + require.NoError(t, err) + api := git.NewGitAPIImpl(runner) + keyOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "k") + require.NoError(t, err) + origAPI := bs.api h := &hookGitAPI{GitAPI: origAPI, ref: DoltDataRef} h.onFirstCAS = func(ctx context.Context, old git.OID) { @@ -493,12 +663,17 @@ func TestGitBlobstore_CheckAndPut_ConcurrentUpdateReturnsMismatch(t *testing.T) } bs.api = h - _, err = bs.CheckAndPut(ctx, commit, "k", 0, 
bytes.NewReader([]byte("mine\n"))) - require.Error(t, err) - require.True(t, IsCheckAndPutError(err)) - - // Verify key did not change, since our CAS should have failed. - got, _, err := GetBytes(ctx, bs, "k", AllRange) + ver2, err := bs.CheckAndPut(ctx, keyOID.String(), "k", 0, bytes.NewReader([]byte("mine\n"))) require.NoError(t, err) - require.Equal(t, []byte("base\n"), got) + require.NotEmpty(t, ver2) + require.NotEqual(t, keyOID.String(), ver2) + + got, ver3, err := GetBytes(ctx, bs, "k", AllRange) + require.NoError(t, err) + require.Equal(t, ver2, ver3) + require.Equal(t, []byte("mine\n"), got) + + got, _, err = GetBytes(ctx, bs, "external", AllRange) + require.NoError(t, err) + require.Equal(t, []byte("external\n"), got) } diff --git a/go/store/blobstore/internal/git/api.go b/go/store/blobstore/internal/git/api.go index d767f5bc81..cabda14d60 100644 --- a/go/store/blobstore/internal/git/api.go +++ b/go/store/blobstore/internal/git/api.go @@ -19,6 +19,17 @@ import ( "io" ) +// ObjectType is a git object type returned by plumbing (e.g. "blob", "tree"). +type ObjectType string + +const ( + ObjectTypeUnknown ObjectType = "" + ObjectTypeBlob ObjectType = "blob" + ObjectTypeTree ObjectType = "tree" + ObjectTypeCommit ObjectType = "commit" + ObjectTypeTag ObjectType = "tag" +) + // GitAPI defines the git plumbing operations needed by GitBlobstore. It includes both // read and write operations to allow swapping implementations (e.g. git CLI vs a Go git // library) while keeping callers stable. @@ -34,6 +45,16 @@ type GitAPI interface { // resolves to a non-blob object. ResolvePathBlob(ctx context.Context, commit OID, path string) (OID, error) + // ResolvePathObject resolves |path| within |commit| to an object OID and type. + // It returns PathNotFoundError if the path does not exist. + ResolvePathObject(ctx context.Context, commit OID, path string) (oid OID, typ ObjectType, err error) + + // ListTree lists the entries of the tree at |treePath| within |commit|. 
+	// The listing is non-recursive: it returns only immediate children.
+	//
+	// It returns PathNotFoundError if |treePath| does not exist.
+	ListTree(ctx context.Context, commit OID, treePath string) ([]TreeEntry, error)
+
 	// CatFileType returns the git object type for |oid| (e.g. "blob", "tree", "commit").
 	CatFileType(ctx context.Context, oid OID) (string, error)
 
@@ -63,6 +84,11 @@ type GitAPI interface {
 	// GIT_DIR=... GIT_INDEX_FILE= git update-index --add --cacheinfo
 	UpdateIndexCacheInfo(ctx context.Context, indexFile string, mode string, oid OID, path string) error
 
+	// RemoveIndexPaths removes |paths| from |indexFile| if present.
+	// Equivalent plumbing (one mode-0 entry per path on stdin; `--remove` would need a worktree):
+	// GIT_DIR=... GIT_INDEX_FILE=<indexFile> git update-index --index-info
+	RemoveIndexPaths(ctx context.Context, indexFile string, paths []string) error
+
 	// WriteTree writes a tree object from the contents of |indexFile| and returns its oid.
 	// Equivalent plumbing:
 	// GIT_DIR=... GIT_INDEX_FILE= git write-tree
@@ -84,6 +110,14 @@ type GitAPI interface {
 	UpdateRef(ctx context.Context, ref string, newOID OID, msg string) error
 }
 
+// TreeEntry describes one entry in a git tree listing.
+type TreeEntry struct {
+	Mode string
+	Type ObjectType
+	OID  OID
+	Name string
+}
+
 // Identity represents git author/committer metadata. A future implementation may set
 // this via environment variables (GIT_AUTHOR_NAME, etc.).
type Identity struct { diff --git a/go/store/blobstore/internal/git/impl.go b/go/store/blobstore/internal/git/impl.go index 53d63edeb8..8e78ccfaf6 100644 --- a/go/store/blobstore/internal/git/impl.go +++ b/go/store/blobstore/internal/git/impl.go @@ -88,6 +88,62 @@ func (a *GitAPIImpl) ResolvePathBlob(ctx context.Context, commit OID, path strin return OID(oid), nil } +func (a *GitAPIImpl) ResolvePathObject(ctx context.Context, commit OID, path string) (oid OID, typ ObjectType, err error) { + spec := commit.String() + ":" + path + out, err := a.r.Run(ctx, RunOptions{}, "rev-parse", "--verify", spec) + if err != nil { + if isPathNotFoundErr(err) { + return "", ObjectTypeUnknown, &PathNotFoundError{Commit: commit.String(), Path: path} + } + return "", ObjectTypeUnknown, err + } + oidStr := strings.TrimSpace(string(out)) + if oidStr == "" { + return "", ObjectTypeUnknown, fmt.Errorf("git rev-parse returned empty oid for %q", spec) + } + + typStr, err := a.CatFileType(ctx, OID(oidStr)) + if err != nil { + return "", ObjectTypeUnknown, err + } + return OID(oidStr), ObjectType(typStr), nil +} + +func (a *GitAPIImpl) ListTree(ctx context.Context, commit OID, treePath string) ([]TreeEntry, error) { + // Note: `git ls-tree ` accepts a tree-ish of the form ":". + // Use that to list children of a tree path without needing to pre-resolve the tree OID. 
+ spec := commit.String() + if treePath != "" { + spec = spec + ":" + treePath + } else { + spec = spec + "^{tree}" + } + + out, err := a.r.Run(ctx, RunOptions{}, "ls-tree", spec) + if err != nil { + if isPathNotFoundErr(err) && treePath != "" { + return nil, &PathNotFoundError{Commit: commit.String(), Path: treePath} + } + return nil, err + } + lines := strings.Split(strings.TrimRight(string(out), "\n"), "\n") + if len(lines) == 1 && strings.TrimSpace(lines[0]) == "" { + return nil, nil + } + entries := make([]TreeEntry, 0, len(lines)) + for _, line := range lines { + if strings.TrimSpace(line) == "" { + continue + } + e, err := parseLsTreeLine(line) + if err != nil { + return nil, err + } + entries = append(entries, e) + } + return entries, nil +} + func (a *GitAPIImpl) CatFileType(ctx context.Context, oid OID) (string, error) { out, err := a.r.Run(ctx, RunOptions{}, "cat-file", "-t", oid.String()) if err != nil { @@ -141,6 +197,26 @@ func (a *GitAPIImpl) UpdateIndexCacheInfo(ctx context.Context, indexFile string, return err } +func (a *GitAPIImpl) RemoveIndexPaths(ctx context.Context, indexFile string, paths []string) error { + if len(paths) == 0 { + return nil + } + var buf bytes.Buffer + // `git update-index --remove` is about removing *missing worktree files*, and requires a worktree. + // For bare repos / index-only workflows, use `--index-info` to remove paths by writing mode "0". 
+	//
+	// Format:
+	// <mode> <oid> <stage>\t<path>\n
+	// To remove:
+	// 0 0000000000000000000000000000000000000000 0\t<path>\n
+	const zeroOID = "0000000000000000000000000000000000000000"
+	for _, p := range paths {
+		fmt.Fprintf(&buf, "0 %s 0\t%s\n", zeroOID, p)
+	}
+	_, err := a.r.Run(ctx, RunOptions{IndexFile: indexFile, Stdin: &buf}, "update-index", "--index-info")
+	return err
+}
+
 func (a *GitAPIImpl) WriteTree(ctx context.Context, indexFile string) (OID, error) {
 	out, err := a.r.Run(ctx, RunOptions{IndexFile: indexFile}, "write-tree")
 	if err != nil {
@@ -243,3 +319,24 @@ func isPathNotFoundErr(err error) bool {
 	}
 	return false
 }
+
+func parseLsTreeLine(line string) (TreeEntry, error) {
+	// Format (one entry):
+	// <mode> SP <type> SP <oid>\t<name>
+	// Example:
+	// 100644 blob e69de29bb2d1d6434b8b29ae775ad8c2e48c5391\tfile.txt
+	parts := strings.SplitN(line, "\t", 2)
+	if len(parts) != 2 {
+		return TreeEntry{}, fmt.Errorf("git ls-tree: malformed line %q", line)
+	}
+	left := strings.Fields(parts[0])
+	if len(left) != 3 {
+		return TreeEntry{}, fmt.Errorf("git ls-tree: malformed line %q", line)
+	}
+	return TreeEntry{
+		Mode: left[0],
+		Type: ObjectType(left[1]),
+		OID:  OID(left[2]),
+		Name: parts[1],
+	}, nil
+}
diff --git a/go/store/blobstore/internal/git/impl_test.go b/go/store/blobstore/internal/git/impl_test.go
index 8219f27ce1..216cb880c7 100644
--- a/go/store/blobstore/internal/git/impl_test.go
+++ b/go/store/blobstore/internal/git/impl_test.go
@@ -366,6 +366,216 @@ func TestGitAPIImpl_UpdateIndexCacheInfo_FileDirectoryConflictErrors(t *testing.
} } +func TestGitAPIImpl_ResolvePathObject_BlobAndTree(t *testing.T) { + t.Parallel() + + ctx := context.Background() + repo, err := gitrepo.InitBareTemp(ctx, "") + if err != nil { + t.Fatal(err) + } + + r, err := NewRunner(repo.GitDir) + if err != nil { + t.Fatal(err) + } + api := NewGitAPIImpl(r) + + indexFile := tempIndexFile(t) + if err := api.ReadTreeEmpty(ctx, indexFile); err != nil { + t.Fatal(err) + } + + blobOID, err := api.HashObject(ctx, bytes.NewReader([]byte("hi\n"))) + if err != nil { + t.Fatal(err) + } + if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", blobOID, "dir/file.txt"); err != nil { + t.Fatal(err) + } + treeOID, err := api.WriteTree(ctx, indexFile) + if err != nil { + t.Fatal(err) + } + commitOID, err := api.CommitTree(ctx, treeOID, nil, "seed", testAuthor()) + if err != nil { + t.Fatal(err) + } + + gotOID, gotTyp, err := api.ResolvePathObject(ctx, commitOID, "dir/file.txt") + if err != nil { + t.Fatal(err) + } + if gotTyp != ObjectTypeBlob { + t.Fatalf("expected type blob, got %q", gotTyp) + } + if gotOID != blobOID { + t.Fatalf("expected oid %q, got %q", blobOID, gotOID) + } + + _, gotTyp, err = api.ResolvePathObject(ctx, commitOID, "dir") + if err != nil { + t.Fatal(err) + } + if gotTyp != ObjectTypeTree { + t.Fatalf("expected type tree, got %q", gotTyp) + } +} + +func TestGitAPIImpl_ListTree_NonRecursive(t *testing.T) { + t.Parallel() + + ctx := context.Background() + repo, err := gitrepo.InitBareTemp(ctx, "") + if err != nil { + t.Fatal(err) + } + + r, err := NewRunner(repo.GitDir) + if err != nil { + t.Fatal(err) + } + api := NewGitAPIImpl(r) + + indexFile := tempIndexFile(t) + if err := api.ReadTreeEmpty(ctx, indexFile); err != nil { + t.Fatal(err) + } + + oidA, err := api.HashObject(ctx, bytes.NewReader([]byte("a\n"))) + if err != nil { + t.Fatal(err) + } + oidB, err := api.HashObject(ctx, bytes.NewReader([]byte("b\n"))) + if err != nil { + t.Fatal(err) + } + oidX, err := api.HashObject(ctx, bytes.NewReader([]byte("x\n"))) 
+ if err != nil { + t.Fatal(err) + } + if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidA, "dir/a.txt"); err != nil { + t.Fatal(err) + } + if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidB, "dir/b.txt"); err != nil { + t.Fatal(err) + } + if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidX, "dir/sub/x.txt"); err != nil { + t.Fatal(err) + } + + treeOID, err := api.WriteTree(ctx, indexFile) + if err != nil { + t.Fatal(err) + } + commitOID, err := api.CommitTree(ctx, treeOID, nil, "seed", testAuthor()) + if err != nil { + t.Fatal(err) + } + + entries, err := api.ListTree(ctx, commitOID, "dir") + if err != nil { + t.Fatal(err) + } + // Expect: a.txt (blob), b.txt (blob), sub (tree) + if len(entries) != 3 { + t.Fatalf("expected 3 entries, got %d: %+v", len(entries), entries) + } + + var gotA, gotB, gotSub bool + for _, e := range entries { + switch e.Name { + case "a.txt": + gotA = true + if e.Type != ObjectTypeBlob || e.OID != oidA { + t.Fatalf("unexpected a.txt entry: %+v", e) + } + case "b.txt": + gotB = true + if e.Type != ObjectTypeBlob || e.OID != oidB { + t.Fatalf("unexpected b.txt entry: %+v", e) + } + case "sub": + gotSub = true + if e.Type != ObjectTypeTree || e.OID == "" { + t.Fatalf("unexpected sub entry: %+v", e) + } + default: + t.Fatalf("unexpected entry: %+v", e) + } + } + if !gotA || !gotB || !gotSub { + t.Fatalf("missing expected entries: gotA=%v gotB=%v gotSub=%v", gotA, gotB, gotSub) + } +} + +func TestGitAPIImpl_RemoveIndexPaths_RemovesFromIndex(t *testing.T) { + t.Parallel() + + ctx := context.Background() + repo, err := gitrepo.InitBareTemp(ctx, "") + if err != nil { + t.Fatal(err) + } + + r, err := NewRunner(repo.GitDir) + if err != nil { + t.Fatal(err) + } + api := NewGitAPIImpl(r) + + indexFile := tempIndexFile(t) + if err := api.ReadTreeEmpty(ctx, indexFile); err != nil { + t.Fatal(err) + } + + oidA, err := api.HashObject(ctx, bytes.NewReader([]byte("a\n"))) + if err != nil { + t.Fatal(err) + } + 
oidB, err := api.HashObject(ctx, bytes.NewReader([]byte("b\n"))) + if err != nil { + t.Fatal(err) + } + if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidA, "a.txt"); err != nil { + t.Fatal(err) + } + if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidB, "b.txt"); err != nil { + t.Fatal(err) + } + + if err := api.RemoveIndexPaths(ctx, indexFile, []string{"a.txt"}); err != nil { + t.Fatal(err) + } + + treeOID, err := api.WriteTree(ctx, indexFile) + if err != nil { + t.Fatal(err) + } + commitOID, err := api.CommitTree(ctx, treeOID, nil, "seed", testAuthor()) + if err != nil { + t.Fatal(err) + } + + // a.txt removed, b.txt still present + _, err = api.ResolvePathBlob(ctx, commitOID, "a.txt") + if err == nil { + t.Fatalf("expected a.txt missing") + } + var pnf *PathNotFoundError + if !errors.As(err, &pnf) { + t.Fatalf("expected PathNotFoundError, got %T: %v", err, err) + } + + gotB, err := api.ResolvePathBlob(ctx, commitOID, "b.txt") + if err != nil { + t.Fatal(err) + } + if gotB != oidB { + t.Fatalf("expected b.txt oid %q, got %q", oidB, gotB) + } +} + func TestGitAPIImpl_ReadTree_PreservesExistingPaths(t *testing.T) { t.Parallel() diff --git a/go/store/nbs/git_blobstore_read_smoke_test.go b/go/store/nbs/git_blobstore_read_smoke_test.go index acae17284a..8be83c3e0c 100644 --- a/go/store/nbs/git_blobstore_read_smoke_test.go +++ b/go/store/nbs/git_blobstore_read_smoke_test.go @@ -85,13 +85,22 @@ func TestGitBlobstoreReadSmoke_ManifestAndTableAccessPatterns(t *testing.T) { rc, totalSz, ver, err := bs.Get(ctx, "table", blobstore.NewBlobRange(-tailN, 0)) require.NoError(t, err) require.Equal(t, uint64(len(table)), totalSz) - require.Equal(t, commit, ver) + require.NotEmpty(t, ver) tail := make([]byte, tailN) _, err = io.ReadFull(rc, tail) require.NoError(t, err) require.NoError(t, rc.Close()) require.Equal(t, table[len(table)-tailN:], tail) + // Per-key version should be stable across reads. 
+ rc2, _, ver2, err := bs.Get(ctx, "table", blobstore.AllRange) + require.NoError(t, err) + // Drain before close to avoid broken-pipe errors from killing git early. + _, err = io.Copy(io.Discard, rc2) + require.NoError(t, err) + require.NoError(t, rc2.Close()) + require.Equal(t, ver, ver2) + // 3) ReadAt-style ranged reads used by table readers. tr := &bsTableReaderAt{bs: bs, key: "table"} out := make([]byte, 4096) diff --git a/integration-tests/go-sql-server-driver/concurrent_writes_test.go b/integration-tests/go-sql-server-driver/concurrent_writes_test.go new file mode 100644 index 0000000000..da6c203bfb --- /dev/null +++ b/integration-tests/go-sql-server-driver/concurrent_writes_test.go @@ -0,0 +1,134 @@ +// Copyright 2026 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver" +) + +// TestConcurrentWrites verifies concurrent write behavior and transaction locking in the SQL server driver. 
+func TestConcurrentWrites(t *testing.T) { + t.Parallel() + var ports DynamicResources + ports.global = &GlobalPorts + ports.t = t + u, err := driver.NewDoltUser() + require.NoError(t, err) + t.Cleanup(func() { + u.Cleanup() + }) + + rs, err := u.MakeRepoStore() + require.NoError(t, err) + + repo, err := rs.MakeRepo("concurrent_writes_test") + require.NoError(t, err) + + srvSettings := &driver.Server{ + Args: []string{"-P", `{{get_port "server_port"}}`}, + DynamicPort: "server_port", + } + server := MakeServer(t, repo, srvSettings, &ports) + server.DBName = "concurrent_writes_test" + + db, err := server.DB(driver.Connection{User: "root"}) + require.NoError(t, err) + db.SetMaxIdleConns(0) + defer func() { + require.NoError(t, db.Close()) + }() + ctx := t.Context() + func() { + conn, err := db.Conn(ctx) + require.NoError(t, err) + defer conn.Close() + // Create table and initial data. + _, err = conn.ExecContext(ctx, "CREATE TABLE data (id VARCHAR(64) PRIMARY KEY, worker INT, data TEXT, created_at TIMESTAMP)") + require.NoError(t, err) + _, err = conn.ExecContext(ctx, "CALL DOLT_COMMIT('-Am', 'init with table')") + require.NoError(t, err) + }() + + eg, ctx := errgroup.WithContext(ctx) + start := time.Now() + + nextInt := uint32(0) + const numWriters = 32 + const testDuration = 8 * time.Second + startCh := make(chan struct{}) + for i := range numWriters { + eg.Go(func() error { + select { + case <-startCh: + case <-ctx.Done(): + return nil + } + db, err := server.DB(driver.Connection{User: "root"}) + require.NoError(t, err) + defer db.Close() + db.SetMaxOpenConns(1) + conn, err := db.Conn(ctx) + if err != nil { + return err + } + defer conn.Close() + j := 0 + for { + if time.Since(start) > testDuration { + return nil + } + if ctx.Err() != nil { + return nil + } + key := fmt.Sprintf("main-%d-%d", i, j) + _, err := conn.ExecContext(ctx, "INSERT INTO data VALUES (?,?,?,?)", key, i, key, time.Now()) + if err != nil { + return err + } + atomic.AddUint32(&nextInt, 1) + _, 
err = conn.ExecContext(ctx, fmt.Sprintf("CALL DOLT_COMMIT('-Am', 'insert %s')", key)) + if err != nil { + return err + } + j += 1 + } + }) + } + time.Sleep(500 * time.Millisecond) + close(startCh) + require.NoError(t, eg.Wait()) + t.Logf("wrote %d", nextInt) + ctx = t.Context() + conn, err := db.Conn(ctx) + require.NoError(t, err) + defer func () { + require.NoError(t, conn.Close()) + }() + var i int + err = conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM data").Scan(&i) + require.NoError(t, err) + t.Logf("read %d", i) + err = conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM dolt_log").Scan(&i) + require.NoError(t, err) + t.Logf("created %d commits", i) +} diff --git a/integration-tests/go-sql-server-driver/repro_10331_test.go b/integration-tests/go-sql-server-driver/repro_10331_test.go index 516095c96a..04809822ce 100644 --- a/integration-tests/go-sql-server-driver/repro_10331_test.go +++ b/integration-tests/go-sql-server-driver/repro_10331_test.go @@ -15,11 +15,6 @@ package main import ( - // "context" - // "database/sql" - // sqldriver "database/sql/driver" - // "fmt" - // "strings" "crypto/rand" "encoding/base64" "fmt" @@ -29,14 +24,9 @@ import ( "testing" "time" - // "time" - - // "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" - // "golang.org/x/sync/errgroup" - driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver" )