skip progress append if ctx.Done()

This commit is contained in:
Neil Macneale IV
2026-01-07 15:26:03 -08:00
parent 74c9f75bd4
commit 673e388ea3

View File

@@ -47,12 +47,20 @@ import (
type ProgressReporter chan FsckProgressMessage
func (pr ProgressReporter) Milestonef(msg string, args ...any) {
pr <- FsckProgressMessage{Type: FsckProgressMilestone, Message: fmt.Sprintf(msg, args...)}
func (pr ProgressReporter) Milestonef(ctx context.Context, msg string, args ...any) {
pr.Progress(ctx, FsckProgressMessage{Type: FsckProgressMilestone, Message: fmt.Sprintf(msg, args...)})
}
func (pr ProgressReporter) Milestone(msg string) {
pr <- FsckProgressMessage{Type: FsckProgressMilestone, Message: msg}
func (pr ProgressReporter) Milestone(ctx context.Context, msg string) {
pr.Progress(ctx, FsckProgressMessage{Type: FsckProgressMilestone, Message: msg})
}
func (pr ProgressReporter) Progress(ctx context.Context, msg FsckProgressMessage) {
select {
case <-ctx.Done():
return
case pr <- msg:
}
}
// FsckProgressMessageType indicates the type of progress message
@@ -356,8 +364,8 @@ func fsckHandleProgress(ctx context.Context, progress <-chan FsckProgressMessage
// 3. Commit tree validation: For each commit found in phase 2, we validate its tree structure and all referenced objects.
//
// As with the other code in this file, we try and continue processing as much as possible even in the presence of corruption,
// so that a full report can be generated. Errors encountered during processing are appended to the |errs| slice passed in. Only
// when there is an unexpected failure (such as inability to read from storage) is an error returned. In that situation,
// so that a full report can be generated. Errors encountered during processing are appended to the |errs| object. Only
// when there is an unexpected failure (such as inability to open a storage file) is an error returned. In that situation,
// we halt processing.
func fsckOnChunkStore(ctx context.Context, gs *nbs.GenerationalNBS, errs *Errs, progress ProgressReporter) error {
rt, err := newRoundTripper(ctx, gs, progress, errs)
@@ -373,13 +381,13 @@ func fsckOnChunkStore(ctx context.Context, gs *nbs.GenerationalNBS, errs *Errs,
chunksByType := rt.chunksByType
// Report chunk type summary
progress.Milestone("--------------- Chunk Type Summary ---------------")
progress.Milestone(ctx, "--------------- Chunk Type Summary ---------------")
for chunkType, hashes := range chunksByType {
progress.Milestonef("Found %d chunks of type: %s", len(hashes), chunkType)
progress.Milestonef(ctx, "Found %d chunks of type: %s", len(hashes), chunkType)
}
// Perform commit DAG validation from all branch HEADs and tags to identify unreachable chunks
progress.Milestone("--------------- All Objects scanned. Starting commit validation ---------------")
progress.Milestone(ctx, "--------------- All Objects scanned. Starting commit validation ---------------")
// Find all commit objects from our scanned chunks
allCommitsSet := make(hash.HashSet)
@@ -387,9 +395,9 @@ func fsckOnChunkStore(ctx context.Context, gs *nbs.GenerationalNBS, errs *Errs,
for _, commitHash := range commitChunks {
allCommitsSet.Insert(commitHash)
}
progress.Milestonef("Found %d commit objects", len(allCommitsSet))
progress.Milestonef(ctx, "Found %d commit objects", len(allCommitsSet))
} else {
progress.Milestone("No commit objects found during chunk scan")
progress.Milestone(ctx, "No commit objects found during chunk scan")
}
reachableCommits, err := walkCommitDAGFromRefs(ctx, gs, &allCommitsSet, progress, errs)
@@ -399,7 +407,7 @@ func fsckOnChunkStore(ctx context.Context, gs *nbs.GenerationalNBS, errs *Errs,
// Phase 3: Tree validation for commits (performance heavy)
if len(reachableCommits) > 0 {
progress.Milestonef("Starting tree validation for %d commit objects...", len(reachableCommits))
progress.Milestonef(ctx, "Starting tree validation for %d commit objects...", len(reachableCommits))
vs := types.NewValueStore(gs)
@@ -417,10 +425,10 @@ func fsckOnChunkStore(ctx context.Context, gs *nbs.GenerationalNBS, errs *Errs,
}
unreachableChunks := chunkCount - uint32(commitReachableChunks.Size())
progress.Milestonef("Found %d unreachable commits (not reachable from any branch/tag)", unreachableCommits)
progress.Milestonef("Validated %d chunks reachable by branches and tags (unreachable: %d)", commitReachableChunks.Size(), unreachableChunks)
progress.Milestonef(ctx, "Found %d unreachable commits (not reachable from any branch/tag)", unreachableCommits)
progress.Milestonef(ctx, "Validated %d chunks reachable by branches and tags (unreachable: %d)", commitReachableChunks.Size(), unreachableChunks)
} else {
progress.Milestone("No branches or tags found. Skipping tree validation.")
progress.Milestone(ctx, "No branches or tags found. Skipping tree validation.")
}
return nil
@@ -540,13 +548,13 @@ func (rt *roundTripper) roundTripAndCategorizeChunk(chunk chunks.Chunk) {
status = "FAIL"
}
rt.progress <- FsckProgressMessage{
rt.progress.Progress(rt.ctx, FsckProgressMessage{
Type: FsckProgressChunkScan,
Message: fmt.Sprintf("%s: %s", status, h.String()),
Percentage: percentage,
Current: int(rt.proccessedCnt),
Total: int(rt.chunkCount),
}
})
}
// decodeMsg attempts to decode the chunk into a human-readable string for error reporting.
@@ -584,13 +592,13 @@ func validateCommitTrees(
percentage := (float64(processedCommits) * 100) / float64(totalCommits)
// Send progress update for tree validation
progress <- FsckProgressMessage{
progress.Progress(ctx, FsckProgressMessage{
Type: FsckProgressTreeValidation,
Message: fmt.Sprintf("Validating commit %s", commitHash.String()),
Percentage: percentage,
Current: processedCommits,
Total: totalCommits,
}
})
// Load and validate the commit
commitValue, err := vs.MustReadValue(ctx, commitHash)
if err != nil {
@@ -844,10 +852,10 @@ func walkCommitDAGFromRefs(ctx context.Context, gs *nbs.GenerationalNBS, allComm
for _, refs := range startingCommits {
refCount += len(refs)
}
progress.Milestonef("Found %d refs pointing to %d unique starting commits", refCount, len(startingCommits))
progress.Milestonef(ctx, "Found %d refs pointing to %d unique starting commits", refCount, len(startingCommits))
if len(startingCommits) == 0 {
progress.Milestone("No refs found - no commits are reachable")
progress.Milestone(ctx, "No refs found - no commits are reachable")
return hash.HashSet{}, nil
}
@@ -868,20 +876,20 @@ func walkCommitDAGFromRefs(ctx context.Context, gs *nbs.GenerationalNBS, allComm
// Skip if this commit doesn't exist in our found commits
if !allCommits.Has(commitHash) {
errs.CmtAppendF(commitHash, "missing commit object")
_ = errs.CmtAppendF(commitHash, "missing commit object")
continue
}
commitValue, err := vs.ReadValue(ctx, commitHash)
if err != nil {
errs.CmtAppendF(commitHash, "read error: %w", err)
_ = errs.CmtAppendF(commitHash, "read error: %w", err)
continue
}
if serialMsg, ok := commitValue.(types.SerialMessage); ok {
parentAddrs, err := types.SerialCommitParentAddrs(vs.Format(), serialMsg)
if err != nil {
errs.CmtAppendF(commitHash, "corrupted parent data: %w", err)
_ = errs.CmtAppendF(commitHash, "corrupted parent data: %w", err)
continue
}
@@ -895,7 +903,7 @@ func walkCommitDAGFromRefs(ctx context.Context, gs *nbs.GenerationalNBS, allComm
panic(fmt.Sprintf("::commit:%s: is not a SerialMessage, got type %T", commitHash.String(), commitValue))
}
}
progress.Milestonef("Found %d commits reachable from branches/tags", len(reachableCommits))
progress.Milestonef(ctx, "Found %d commits reachable from branches/tags", len(reachableCommits))
return reachableCommits, nil
}