remove todos and clean up some dead code

This commit is contained in:
Neil Macneale IV
2025-11-17 13:43:18 -08:00
parent d672608348
commit 7c0bc33c12
6 changed files with 24 additions and 30 deletions
+3 -5
View File
@@ -135,9 +135,7 @@ func (fact FileFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat,
return s.ddb, s.vrw, s.ns, nil
}
ddb, vrw, ns, err := fact.CreateDbNoCache(ctx, nbf, urlObj, params, func(vErr error) {
logrus.Error(vErr.Error())
})
ddb, vrw, ns, err := fact.CreateDbNoCache(ctx, nbf, urlObj, params, nbs.JournalParserLoggingRecoveryCb)
if err != nil {
return nil, nil, nil, err
}
@@ -157,7 +155,7 @@ func (fact FileFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat,
//
// Furthermore, regular database loading uses this code path to construct the GenerationalCS, which is desired because we
// want the same underlying implementation.
func (fact FileFactory) CreateDbNoCache(ctx context.Context, nbf *types.NomsBinFormat, urlObj *url.URL, params map[string]interface{}, valCb func(error)) (datas.Database, types.ValueReadWriter, tree.NodeStore, error) {
func (fact FileFactory) CreateDbNoCache(ctx context.Context, nbf *types.NomsBinFormat, urlObj *url.URL, params map[string]interface{}, recCb func(error)) (datas.Database, types.ValueReadWriter, tree.NodeStore, error) {
path, err := url.PathUnescape(urlObj.Path)
if err != nil {
return nil, nil, nil, err
@@ -181,7 +179,7 @@ func (fact FileFactory) CreateDbNoCache(ctx context.Context, nbf *types.NomsBinF
var newGenSt *nbs.NomsBlockStore
q := nbs.NewUnlimitedMemQuotaProvider()
if useJournal && chunkJournalFeatureFlag {
newGenSt, err = nbs.NewLocalJournalingStore(ctx, nbf.VersionString(), path, q, mmapArchiveIndexes, valCb)
newGenSt, err = nbs.NewLocalJournalingStore(ctx, nbf.VersionString(), path, q, mmapArchiveIndexes, recCb)
} else {
newGenSt, err = nbs.NewLocalStore(ctx, nbf.VersionString(), path, defaultMemTableSize, q, mmapArchiveIndexes)
}
+16 -10
View File
@@ -71,7 +71,7 @@ var _ manifest = &ChunkJournal{}
var _ manifestGCGenUpdater = &ChunkJournal{}
var _ io.Closer = &ChunkJournal{}
func newChunkJournal(ctx context.Context, nbfVers, dir string, m *journalManifest, p *fsTablePersister, valCb func(error)) (*ChunkJournal, error) {
func newChunkJournal(ctx context.Context, nbfVers, dir string, m *journalManifest, p *fsTablePersister, recoveryCb func(error)) (*ChunkJournal, error) {
path, err := filepath.Abs(filepath.Join(dir, chunkJournalName))
if err != nil {
return nil, err
@@ -87,13 +87,17 @@ func newChunkJournal(ctx context.Context, nbfVers, dir string, m *journalManifes
} else if ok {
// only bootstrap journalWriter if the journal file exists,
// otherwise we wait to open in case we're cloning
if err = j.bootstrapJournalWriter(ctx, valCb); err != nil {
if err = j.bootstrapJournalWriter(ctx, recoveryCb); err != nil {
return nil, err
}
}
return j, nil
}
func JournalParserLoggingRecoveryCb(err error) {
logrus.Errorf(err.Error())
}
// reflogBufferSize returns the size of the ring buffer to allocate to store in-memory roots references when
// new roots are written to a chunk journal. If reflog queries have been disabled, this function will return 0.
// If the default buffer size has been overridden via DOLT_REFLOG_RECORD_LIMIT, that value will be returned if
@@ -133,7 +137,10 @@ func reflogBufferSize() int {
// file (see indexRec). The journal file is the source of truth for latest root hash.
// As we process journal records, we keep track of the latest root hash record we see
// and update the manifest file with the last root hash we saw.
func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, valCb func(error)) (err error) {
//
// |recoveryCb| is a callback function invoked if a recoverable parse error is encountered. Usually used
// for printing an error message to the user, but also used for fsck to report the error and exit with an error.
func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, recoveryCb func(error)) (err error) {
var ok bool
ok, err = fileExists(j.path)
if err != nil {
@@ -148,7 +155,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, valCb func(er
return err
}
_, err = j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, valCb)
_, err = j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, recoveryCb)
if err != nil {
return err
}
@@ -176,7 +183,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, valCb func(er
}
// parse existing journal file
root, err := j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, valCb)
root, err := j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, recoveryCb)
if err != nil {
return err
}
@@ -249,7 +256,7 @@ func (j *ChunkJournal) IterateRoots(f func(root string, timestamp *time.Time) er
func (j *ChunkJournal) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) {
if j.backing.readOnly() {
return nil, gcBehavior_Continue, errReadOnlyManifest
} else if err := j.maybeInit(ctx, nil); err != nil { // NM4 - default, nil?
} else if err := j.maybeInit(ctx, JournalParserLoggingRecoveryCb); err != nil {
return nil, gcBehavior_Continue, err
}
@@ -287,8 +294,7 @@ func (j *ChunkJournal) ConjoinAll(ctx context.Context, sources chunkSources, sta
// Open implements tablePersister.
func (j *ChunkJournal) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
if name == journalAddr {
// NM4 - default log?
if err := j.maybeInit(ctx, nil); err != nil {
if err := j.maybeInit(ctx, JournalParserLoggingRecoveryCb); err != nil {
return nil, err
}
return journalChunkSource{journal: j.wr}, nil
@@ -467,9 +473,9 @@ func (j *ChunkJournal) ParseIfExists(ctx context.Context, stats *Stats, readHook
return
}
func (j *ChunkJournal) maybeInit(ctx context.Context, valCb func(error)) (err error) {
func (j *ChunkJournal) maybeInit(ctx context.Context, recoveryCb func(error)) (err error) {
if j.wr == nil {
err = j.bootstrapJournalWriter(ctx, valCb)
err = j.bootstrapJournalWriter(ctx, recoveryCb)
}
return
}
-10
View File
@@ -246,16 +246,6 @@ func validateJournalRecord(buf []byte) error {
return nil
}
type CorruptJournalRecortError struct {
Data []byte
Offset int64
Why string
}
func (e *CorruptJournalRecortError) Error() string {
return fmt.Sprintf("Error validating journal record; skipping remaining journal records past offset %d: %s", e.Offset, e.Why)
}
// processJournalRecords iterates over a chunk journal's records by reading from disk using |r|, starting at
// offset |off|, and calls the callback function |cb| with each journal record. The offset where reading was stopped
// is returned, or any error encountered along the way.
+2 -2
View File
@@ -187,7 +187,7 @@ var _ io.Closer = &journalWriter{}
// are added to the novel ranges map. If the number of novel lookups exceeds |wr.maxNovel|, we
// extend the journal index with one metadata flush before existing this function to save indexing
// progress.
func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer *reflogRingBuffer, valCb func(error)) (last hash.Hash, err error) {
func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer *reflogRingBuffer, recoveryCb func(error)) (last hash.Hash, err error) {
wr.lock.Lock()
defer wr.lock.Unlock()
@@ -330,7 +330,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer
return fmt.Errorf("unknown journal record kind (%d)", r.kind)
}
return nil
}, valCb)
}, recoveryCb)
if err != nil {
return hash.Hash{}, err
}
+2 -2
View File
@@ -666,7 +666,7 @@ func newLocalStore(ctx context.Context, nbfVerStr string, dir string, memTableSi
return newNomsBlockStore(ctx, nbfVerStr, makeManifestManager(m), p, q, c, memTableSize)
}
func NewLocalJournalingStore(ctx context.Context, nbfVers, dir string, q MemoryQuotaProvider, mmapArchiveIndexes bool, valCb func(error)) (*NomsBlockStore, error) {
func NewLocalJournalingStore(ctx context.Context, nbfVers, dir string, q MemoryQuotaProvider, mmapArchiveIndexes bool, recoveryCb func(error)) (*NomsBlockStore, error) {
cacheOnce.Do(makeGlobalCaches)
if err := checkDir(dir); err != nil {
return nil, err
@@ -678,7 +678,7 @@ func NewLocalJournalingStore(ctx context.Context, nbfVers, dir string, q MemoryQ
}
p := newFSTablePersister(dir, q, mmapArchiveIndexes)
journal, err := newChunkJournal(ctx, nbfVers, dir, m, p.(*fsTablePersister), valCb)
journal, err := newChunkJournal(ctx, nbfVers, dir, m, p.(*fsTablePersister), recoveryCb)
if err != nil {
return nil, err
}
+1 -1
View File
@@ -506,7 +506,7 @@ func (sp Spec) createDatabase(ctx context.Context) (datas.Database, types.ValueR
return getStandardLocalStore(ctx, sp.DatabaseName)
}
newGenSt, err := nbs.NewLocalJournalingStore(ctx, types.Format_Default.VersionString(), sp.DatabaseName, nbs.NewUnlimitedMemQuotaProvider(), false, nil) // NM4.
newGenSt, err := nbs.NewLocalJournalingStore(ctx, types.Format_Default.VersionString(), sp.DatabaseName, nbs.NewUnlimitedMemQuotaProvider(), false, nbs.JournalParserLoggingRecoveryCb)
// If the journaling store can't be created, fall back to a standard local store
if err != nil {