Gather commit messages with first pass

This commit is contained in:
Neil Macneale IV
2025-12-19 00:12:30 +00:00
parent 8aaa6fb73e
commit de24553ec3
+36 -87
View File
@@ -501,6 +501,10 @@ func fsckOnChunkStore(ctx context.Context, ddb *doltdb.DoltDB, gs *nbs.Generatio
// Build a set of all chunks found during the full scan
allChunks := make(hash.HashSet)
// Track chunks by their message type during scanning
chunksByType := make(map[string][]hash.Hash)
chunksByTypeLock := &sync.Mutex{}
// Callback for validating chunks. This code could be called concurrently, though that is not currently the case.
validationCallback := func(chunk chunks.Chunk) {
chunkOk := true
@@ -512,6 +516,24 @@ func fsckOnChunkStore(ctx context.Context, ddb *doltdb.DoltDB, gs *nbs.Generatio
// Add chunk to our set of all found chunks
allChunks.Insert(h)
// Determine chunk type using serial message file ID
var chunkType string
if len(raw) >= serial.MessagePrefixSz+4 { // Check if we have enough bytes for a serial message
fileID := serial.GetFileID(raw)
if fileID != "" {
chunkType = fileID
} else {
chunkType = "UNKNOWN"
}
} else {
chunkType = "TOO_SHORT"
}
// Thread-safely add to chunks by type. NM4?
chunksByTypeLock.Lock()
chunksByType[chunkType] = append(chunksByType[chunkType], h)
chunksByTypeLock.Unlock()
if h != calcChkSum {
fuzzyMatch := false
// Special case for the journal chunk source. We may have an address which has 4 null bytes at the end.
@@ -560,96 +582,23 @@ func fsckOnChunkStore(ctx context.Context, ddb *doltdb.DoltDB, gs *nbs.Generatio
return nil, err
}
// Report chunk type summary
progress <- "--------------- Chunk Type Summary ---------------"
for chunkType, chunks := range chunksByType {
progress <- fmt.Sprintf("Found %d chunks of type: %s", len(chunks), chunkType)
}
// Perform commit DAG validation from all branch HEADs and tags to identify unreachable chunks
progress <- "--------------- All Objects scanned. Starting commit DAG validation ---------------"
progress <- "--------------- All Objects scanned. Starting commit validation ---------------"
references := make(map[hash.Hash][]string)
// Helper to get refs, resolve them to commits, and add them to startCommits/references.
// The callback is expected to work; we'll error out if it doesn't. Resolving commits is less certain though,
// so those errors are appended via appendErr to be reported at the end.
refGetter := func(cb func(context.Context) ([]ref.DoltRef, error)) error {
refs, err := cb(ctx)
if err != nil {
return fmt.Errorf("failed to get references: %w", err)
}
for _, ref := range refs {
head, err := ddb.ResolveCommitRef(ctx, ref)
if err != nil {
appendErr(fmt.Errorf("failed to resolve reference %s: %w", ref.GetPath(), err))
continue
}
commitHash, err := head.HashOf()
if err != nil {
appendErr(fmt.Errorf("failed to get hash for refeence %s: %w", ref.GetPath(), err))
continue
}
references[commitHash] = append(references[commitHash], ref.GetPath())
}
return nil
}
err = refGetter(ddb.GetBranches)
if err != nil {
return nil, fmt.Errorf("failed to get branches: %w", err)
}
err = refGetter(ddb.GetRemoteRefs)
if err != nil {
return nil, fmt.Errorf("failed to get remote branches: %w", err)
}
err = refGetter(ddb.GetTags)
if err != nil {
return nil, fmt.Errorf("failed to get tags: %w", err)
}
startCommits := make([]hash.Hash, 0, len(references))
for commitHash := range references {
startCommits = append(startCommits, commitHash)
}
// NM4 - TODO: Get working sets too. Currently commit DAG walking will report
// uncommitted working set chunks as unreachable.
if len(startCommits) > 0 {
progress <- fmt.Sprintf("Starting commit DAG validation from %d local branches, remote branches, and tags...", len(startCommits))
reachedChunks := make(hash.HashSet)
err = validateCommitDAGAndTrackChunks(ctx, vs, startCommits, progress, appendErr, &reachedChunks)
if err != nil {
appendErr(fmt.Errorf("commit DAG validation failed: %w", err))
}
// Report unreachable chunks grouped by message type and count. NM4 - include size.
typeMap := make(map[string]int)
unreachableCount := 0
for chunkHash := range allChunks {
if !reachedChunks.Has(chunkHash) {
// Try to read the chunk to determine its type. We'll summarize.
chunkValue, err := vs.ReadValue(ctx, chunkHash)
if err != nil || chunkValue == nil {
// Highly suspect, as allChunks contains chunks which we loaded and whose addresses we verified.
appendErr(fmt.Errorf("ReadValue failure to read object previously loaded %s: %w", chunkHash.String(), err))
continue
}
if serialMsg, ok := chunkValue.(types.SerialMessage); ok {
id := serial.GetFileID(serialMsg)
typeMap[id]++
} else {
// Spit on the old format.
panic(fmt.Sprintf("hash %s is not a SerialMessage, got type %T", chunkHash.String(), chunkValue))
}
}
}
for t, count := range typeMap {
progress <- fmt.Sprintf("Unreachable chunks of type %s: %d", t, count)
unreachableCount += count
}
progress <- fmt.Sprintf("Found %d unreachable chunks out of %d total chunks)", unreachableCount, len(allChunks))
// Find all commit objects from our scanned chunks
var startCommits []hash.Hash
if commitChunks, hasCommits := chunksByType[serial.CommitFileID]; hasCommits {
startCommits = make([]hash.Hash, len(commitChunks))
copy(startCommits, commitChunks)
progress <- fmt.Sprintf("Found %d commit objects to validate", len(startCommits))
} else {
progress <- "No branches, remote branches, or tags found - skipping commit DAG validation"
progress <- "No commit objects found during chunk scan"
}
FSCKReport := FSCKReport{Problems: errs, ChunkCount: chunkCount}