mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-24 19:49:43 -05:00
validate parent closure
This commit is contained in:
@@ -19,6 +19,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -388,7 +389,7 @@ func fsckOnChunkStore(ctx context.Context, gs *nbs.GenerationalNBS, errs *[]erro
|
||||
|
||||
vs := types.NewValueStore(gs)
|
||||
|
||||
commitReachableChunks, err := validateCommitTrees(ctx, vs, &reachableCommits, progress, appendErr)
|
||||
commitReachableChunks, err := validateCommitTrees(ctx, vs, gs, &reachableCommits, progress, appendErr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("commit tree validation failed: %w", err)
|
||||
}
|
||||
@@ -549,13 +550,15 @@ func (rt *roundTripper) decodeMsg(chk chunks.Chunk) string {
|
||||
func validateCommitTrees(
|
||||
ctx context.Context,
|
||||
vs *types.ValueStore,
|
||||
cs chunks.ChunkStore,
|
||||
reachableCommits *hash.HashSet,
|
||||
progress chan FsckProgressMessage,
|
||||
appendErr func(error),
|
||||
) (*hash.HashSet, error) {
|
||||
|
||||
reachableChunks := &hash.HashSet{}
|
||||
treeScnr := newTreeScanner(vs, reachableChunks, appendErr, progress)
|
||||
ns := tree.NewNodeStore(cs)
|
||||
treeScnr := newTreeScanner(vs, ns, reachableChunks, appendErr, progress)
|
||||
|
||||
totalCommits := len(*reachableCommits)
|
||||
processedCommits := 0
|
||||
@@ -601,6 +604,7 @@ func validateCommitTrees(
|
||||
// treeScanner walks and validates prolly tree structures.
|
||||
type treeScanner struct {
|
||||
vs *types.ValueStore
|
||||
ns tree.NodeStore
|
||||
// avoid re-validation of chunks we've already seen. If the chunk hash is in this map, we've already validated it.
|
||||
// If the value is non-nil, it indicates an error was found during prior validation.
|
||||
history map[hash.Hash]*error
|
||||
@@ -609,9 +613,10 @@ type treeScanner struct {
|
||||
progress chan FsckProgressMessage
|
||||
}
|
||||
|
||||
func newTreeScanner(vs *types.ValueStore, reachableChunks *hash.HashSet, appendErr func(error), progress chan FsckProgressMessage) *treeScanner {
|
||||
func newTreeScanner(vs *types.ValueStore, ns tree.NodeStore, reachableChunks *hash.HashSet, appendErr func(error), progress chan FsckProgressMessage) *treeScanner {
|
||||
return &treeScanner{
|
||||
vs: vs,
|
||||
ns: ns,
|
||||
history: make(map[hash.Hash]*error),
|
||||
appendErr: appendErr,
|
||||
reachableChunks: reachableChunks,
|
||||
@@ -683,6 +688,33 @@ func (ts *treeScanner) processCommitContent(
|
||||
ts.appendErr(fmt.Errorf("::commit:%s: missing data. failed to read commit closure %s", commitHash.String(), parentClosureHash.String()))
|
||||
} else {
|
||||
// All hashes in the closure should be reachable commits.
|
||||
// Use the proper commit closure approach instead of WalkAddrs
|
||||
closure, err := datas.NewParentsClosure(ctx, nil, serialMsg, ts.vs, ts.ns)
|
||||
if err != nil {
|
||||
ts.appendErr(fmt.Errorf("commit %s failed to load parent closure %s: %w", commitHash.String(), parentClosureHash.String(), err))
|
||||
} else if !closure.IsEmpty() {
|
||||
// Verify all commits in the closure are things we expect to see.
|
||||
iter, err := closure.IterAllReverse(ctx)
|
||||
if err != nil {
|
||||
ts.appendErr(fmt.Errorf("commit %s failed to iterate parent closure %s: %w", commitHash.String(), parentClosureHash.String(), err))
|
||||
} else {
|
||||
for {
|
||||
key, _, err := iter.Next(ctx)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
ts.appendErr(fmt.Errorf("commit %s error iterating parent closure %s: %w", commitHash.String(), parentClosureHash.String(), err))
|
||||
break
|
||||
}
|
||||
|
||||
closureCommitAddr := key.Addr()
|
||||
if !reachableCommits.Has(closureCommitAddr) {
|
||||
ts.appendErr(fmt.Errorf("commit %s parent closure references unreachable commit %s", commitHash.String(), closureCommitAddr.String()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if len(parentAddrs) != 0 {
|
||||
// Empty closure should happen only for root commits. Make sure there are no parents.
|
||||
@@ -801,7 +833,7 @@ func (ts *treeScanner) validateTree(
|
||||
// walkCommitDAGFromRefs loads all branches/tags and walks the commit DAG to find reachable commits
|
||||
// This is lightweight - only validates commit objects, parent closures, and parent hashes (no trees)
|
||||
func walkCommitDAGFromRefs(ctx context.Context, gs *nbs.GenerationalNBS, allCommits *hash.HashSet, progress chan FsckProgressMessage, appendErr func(error)) (hash.HashSet, error) {
|
||||
startingCommits, err := getRawReferencesFromStoreRoot(ctx, gs, progress, appendErr)
|
||||
startingCommits, err := getRawReferencesFromStoreRoot(ctx, gs, appendErr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get references from store root: %w", err)
|
||||
}
|
||||
@@ -869,7 +901,7 @@ func walkCommitDAGFromRefs(ctx context.Context, gs *nbs.GenerationalNBS, allComm
|
||||
|
||||
// getRawReferencesFromStoreRoot accesses raw references from the chunk store root.
|
||||
// Returns a map from commit hash to list of reference names that point to it.
|
||||
func getRawReferencesFromStoreRoot(ctx context.Context, cs chunks.ChunkStore, progress chan FsckProgressMessage, appendErr func(error)) (map[hash.Hash][]string, error) {
|
||||
func getRawReferencesFromStoreRoot(ctx context.Context, cs chunks.ChunkStore, appendErr func(error)) (map[hash.Hash][]string, error) {
|
||||
// Get the root hash from the chunk store
|
||||
rootHash, err := cs.Root(ctx)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user