Complain when a closure object is missing

This commit is contained in:
Neil Macneale IV
2025-12-11 00:56:40 +00:00
parent 8dbfc8b68a
commit 9d13fc4031
+49 -48
View File
@@ -283,7 +283,7 @@ func validateCommitDAGAndTrackChunks(ctx context.Context, vs *types.ValueStore,
// processSerialCommitAndTrack handles validation and tracking of new-style SerialMessage (flatbuffer) commits
func processSerialCommitAndTrack(
ctx context.Context,
commitHash hash.Hash,
commitHash hash.Hash, // Use just for error messages.
vs *types.ValueStore,
serialMsg types.SerialMessage,
commitQueue *[]hash.Hash,
@@ -325,10 +325,26 @@ func processSerialCommitAndTrack(
// Get and track the parent closure (commit closure) reference
parentClosureBytes := commit.ParentClosureBytes()
if len(parentClosureBytes) == hash.ByteLen {
// NM4 - TODO: validate we see all these parents during commit DAG traversal
parentClosureHash := hash.New(parentClosureBytes)
reachableChunks.Insert(parentClosureHash) // Mark parent closure as reachable
progress <- fmt.Sprintf("Tracking parent closure: %s", parentClosureHash.String())
if !parentClosureHash.IsEmpty() {
reachableChunks.Insert(parentClosureHash) // Mark parent closure as reachable
// Closure will be a single object
value, err := vs.ReadValue(ctx, parentClosureHash)
if err != nil {
appendErr(fmt.Errorf("Commit %s is missing data. Failed to read commit closure %s: %w", commitHash.String(), parentClosureHash.String(), err))
} else if value == nil {
appendErr(fmt.Errorf("Commit %s is missing data. Failed to read commit closure %s", commitHash.String(), parentClosureHash.String()))
} else {
// NM4 - TODO: Validate closure contents. We should probably do this after we walk the graph,
// then confirm all parents were seen.
// progress <- fmt.Sprintf("Found commit closure: %s", parentClosureHash.String())
}
} else if len(parentAddrs) != 0 {
// Empty closure should happen only for root commits. Make sure there are no parents.
appendErr(fmt.Errorf("commit %s has empty parent closure but has %d parents", commitHash.String(), len(parentAddrs)))
}
} else {
panic(fmt.Sprintf("invalid parent closure length: %d", len(parentClosureBytes)))
}
@@ -340,8 +356,17 @@ func processSerialCommitAndTrack(
}
// validateTreeAndTrack performs breadth-first validation of a tree structure and tracks all reachable chunks
func validateTreeAndTrack(ctx context.Context, vs *types.ValueStore, commitHash, treeHash hash.Hash, progress chan string, appendErr func(error), reachableChunks *hash.HashSet) error {
progress <- fmt.Sprintf("Validating tree: %s", treeHash.String())
func validateTreeAndTrack(
ctx context.Context,
vs *types.ValueStore,
commitHash, // Use just for error messages.
treeHash hash.Hash,
progress chan string,
appendErr func(error),
reachableChunks *hash.HashSet,
) error {
progress <- fmt.Sprintf("Validating commit %s's data tree: %s", commitHash.String(), treeHash.String())
// Queue for tree entries to process (breadth-first)
treeQueue := []hash.Hash{treeHash}
@@ -536,7 +561,7 @@ func fsckOnChunkStore(ctx context.Context, ddb *doltdb.DoltDB, gs *nbs.Generatio
// Perform commit DAG validation from all branch HEADs and tags to identify unreachable chunks
progress <- "--------------- All Objects scanned. Starting commit DAG validation ---------------"
var references map[hash.Hash][]string
references := make(map[hash.Hash][]string)
// helper to get refs, resolve them to commits, and add them to startCommits/references.
// The callback is expected to work; we'll error out if it doesn't. Resolving commits is less certain though,
@@ -561,7 +586,6 @@ func fsckOnChunkStore(ctx context.Context, ddb *doltdb.DoltDB, gs *nbs.Generatio
}
return nil
}
err = refGetter(ddb.GetBranches)
if err != nil {
return nil, fmt.Errorf("failed to get branches: %w", err)
@@ -586,19 +610,19 @@ func fsckOnChunkStore(ctx context.Context, ddb *doltdb.DoltDB, gs *nbs.Generatio
if len(startCommits) > 0 {
progress <- fmt.Sprintf("Starting commit DAG validation from %d local branches, remote branches, and tags...", len(startCommits))
reachableChunks := make(hash.HashSet)
reachedChunks := make(hash.HashSet)
err = validateCommitDAGAndTrackChunks(ctx, vs, startCommits, progress, appendErr, &reachableChunks)
err = validateCommitDAGAndTrackChunks(ctx, vs, startCommits, progress, appendErr, &reachedChunks)
if err != nil {
appendErr(fmt.Errorf("commit DAG validation failed: %w", err))
}
// Report unreachable chunks (excluding essential repository infrastructure)
// Report unreachable chunks grouped by message type and count. NM4 - include size.
typeMap := make(map[string]int)
unreachableCount := 0
infrastructureCount := 0
for chunkHash := range allChunks {
if !reachableChunks.Has(chunkHash) {
// Try to read the chunk to determine if it's infrastructure
if !reachedChunks.Has(chunkHash) {
// Try to read the chunk to determine its type. We'll summarize.
chunkValue, err := vs.ReadValue(ctx, chunkHash)
if err != nil || chunkValue == nil {
// Highly suspect, as allChunks contains chunks which we loaded and verified their address.
@@ -606,45 +630,22 @@ func fsckOnChunkStore(ctx context.Context, ddb *doltdb.DoltDB, gs *nbs.Generatio
continue
}
// Check if this is essential repository infrastructure
isInfrastructure := false
if serialMsg, ok := chunkValue.(types.SerialMessage); ok {
// Check for StoreRoot and WorkingSet chunks using proper file ID
id := serial.GetFileID(serialMsg)
if id == serial.StoreRootFileID {
isInfrastructure = true
infrastructureCount++
progress <- fmt.Sprintf("Infrastructure chunk: %s (type: StoreRoot)", chunkHash.String())
} else if id == serial.WorkingSetFileID {
isInfrastructure = true
infrastructureCount++
progress <- fmt.Sprintf("Infrastructure chunk: %s (type: WorkingSet)", chunkHash.String())
}
}
if !isInfrastructure {
unreachableCount++
humanStr, err := types.EncodedValue(ctx, chunkValue)
progress <- fmt.Sprintf("Unreachable chunk: %s", chunkHash.String())
progress <- fmt.Sprintf(" Type: %T", chunkValue)
if err != nil {
progress <- fmt.Sprintf(" Human readable: (error: %v)", err)
} else {
progress <- fmt.Sprintf(" Human readable: %s", humanStr)
}
// For SerialMessage, also try to show some raw content
if serialMsg, ok := chunkValue.(types.SerialMessage); ok {
if len(serialMsg) < 200 {
progress <- fmt.Sprintf(" Raw content: %q", string(serialMsg))
} else {
progress <- fmt.Sprintf(" Raw content (first 200 bytes): %q", string(serialMsg[:200]))
}
}
typeMap[id]++
} else {
// Spit on the old format.
panic(fmt.Sprintf("hash %s is not a SerialMessage, got type %T", chunkHash.String(), chunkValue))
}
}
}
progress <- fmt.Sprintf("Found %d unreachable chunks out of %d total chunks (%d infrastructure chunks excluded)", unreachableCount, len(allChunks), infrastructureCount)
for t, count := range typeMap {
progress <- fmt.Sprintf("Unreachable chunks of type %s: %d", t, count)
unreachableCount += count
}
progress <- fmt.Sprintf("Found %d unreachable chunks out of %d total chunks)", unreachableCount, len(allChunks))
} else {
progress <- "No branches, remote branches, or tags found - skipping commit DAG validation"
}