mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-11 18:49:14 -06:00
s/recoveryCb/warningsCb/
This commit is contained in:
@@ -135,7 +135,7 @@ func (fact FileFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat,
|
||||
return s.ddb, s.vrw, s.ns, nil
|
||||
}
|
||||
|
||||
ddb, vrw, ns, err := fact.CreateDbNoCache(ctx, nbf, urlObj, params, nbs.JournalParserLoggingRecoveryCb)
|
||||
ddb, vrw, ns, err := fact.CreateDbNoCache(ctx, nbf, urlObj, params, nbs.JournalParserLoggingWarningsCb)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
@@ -71,7 +71,7 @@ var _ manifest = &ChunkJournal{}
|
||||
var _ manifestGCGenUpdater = &ChunkJournal{}
|
||||
var _ io.Closer = &ChunkJournal{}
|
||||
|
||||
func newChunkJournal(ctx context.Context, nbfVers, dir string, m *journalManifest, p *fsTablePersister, recoveryCb func(error)) (*ChunkJournal, error) {
|
||||
func newChunkJournal(ctx context.Context, nbfVers, dir string, m *journalManifest, p *fsTablePersister, warningsCb func(error)) (*ChunkJournal, error) {
|
||||
path, err := filepath.Abs(filepath.Join(dir, chunkJournalName))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -87,14 +87,14 @@ func newChunkJournal(ctx context.Context, nbfVers, dir string, m *journalManifes
|
||||
} else if ok {
|
||||
// only bootstrap journalWriter if the journal file exists,
|
||||
// otherwise we wait to open in case we're cloning
|
||||
if err = j.bootstrapJournalWriter(ctx, recoveryCb); err != nil {
|
||||
if err = j.bootstrapJournalWriter(ctx, warningsCb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return j, nil
|
||||
}
|
||||
|
||||
func JournalParserLoggingRecoveryCb(err error) {
|
||||
func JournalParserLoggingWarningsCb(err error) {
|
||||
logrus.Error(err.Error())
|
||||
}
|
||||
|
||||
@@ -138,9 +138,9 @@ func reflogBufferSize() int {
|
||||
// As we process journal records, we keep track of the latest root hash record we see
|
||||
// and update the manifest file with the last root hash we saw.
|
||||
//
|
||||
// |recoveryCb| is a callback function invoked if a recoverable parse error is encountered. Usually used
|
||||
// |warningsCb| is a callback function invoked if a recoverable parse error is encountered. Usually used
|
||||
// for printing an error message to the user, but also used for fsck to report the error and exit with an error.
|
||||
func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, recoveryCb func(error)) (err error) {
|
||||
func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, warningsCb func(error)) (err error) {
|
||||
var ok bool
|
||||
ok, err = fileExists(j.path)
|
||||
if err != nil {
|
||||
@@ -155,7 +155,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, recoveryCb fu
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, recoveryCb)
|
||||
_, err = j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, warningsCb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -183,7 +183,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, recoveryCb fu
|
||||
}
|
||||
|
||||
// parse existing journal file
|
||||
root, err := j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, recoveryCb)
|
||||
root, err := j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, warningsCb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -256,7 +256,7 @@ func (j *ChunkJournal) IterateRoots(f func(root string, timestamp *time.Time) er
|
||||
func (j *ChunkJournal) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) {
|
||||
if j.backing.readOnly() {
|
||||
return nil, gcBehavior_Continue, errReadOnlyManifest
|
||||
} else if err := j.maybeInit(ctx, JournalParserLoggingRecoveryCb); err != nil {
|
||||
} else if err := j.maybeInit(ctx, JournalParserLoggingWarningsCb); err != nil {
|
||||
return nil, gcBehavior_Continue, err
|
||||
}
|
||||
|
||||
@@ -294,7 +294,7 @@ func (j *ChunkJournal) ConjoinAll(ctx context.Context, sources chunkSources, sta
|
||||
// Open implements tablePersister.
|
||||
func (j *ChunkJournal) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
|
||||
if name == journalAddr {
|
||||
if err := j.maybeInit(ctx, JournalParserLoggingRecoveryCb); err != nil {
|
||||
if err := j.maybeInit(ctx, JournalParserLoggingWarningsCb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return journalChunkSource{journal: j.wr}, nil
|
||||
@@ -473,9 +473,9 @@ func (j *ChunkJournal) ParseIfExists(ctx context.Context, stats *Stats, readHook
|
||||
return
|
||||
}
|
||||
|
||||
func (j *ChunkJournal) maybeInit(ctx context.Context, recoveryCb func(error)) (err error) {
|
||||
func (j *ChunkJournal) maybeInit(ctx context.Context, warningsCb func(error)) (err error) {
|
||||
if j.wr == nil {
|
||||
err = j.bootstrapJournalWriter(ctx, recoveryCb)
|
||||
err = j.bootstrapJournalWriter(ctx, warningsCb)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -256,9 +256,9 @@ func validateJournalRecord(buf []byte) error {
|
||||
// without preventing the server from starting up, so we are careful to only return the journal file
|
||||
// offset that points to end of the last valid record.
|
||||
//
|
||||
// The |recoveryCb| callback is called with any errors encountered that we automatically recover from. This allows the caller
|
||||
// The |warningsCb| callback is called with any errors encountered that we automatically recover from. This allows the caller
|
||||
// to handle the situation in a context specific way.
|
||||
func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb func(o int64, r journalRec) error, recoveryCb func(error)) (int64, error) {
|
||||
func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb func(o int64, r journalRec) error, warningsCb func(error)) (int64, error) {
|
||||
var (
|
||||
buf []byte
|
||||
err error
|
||||
@@ -296,20 +296,20 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
|
||||
|
||||
if l > journalWriterBuffSize {
|
||||
// Probably hit a corrupted record. Report error and stop processing.
|
||||
if recoveryCb != nil {
|
||||
if warningsCb != nil {
|
||||
// We don't assign this error to err, because we want to recover from this.
|
||||
jErr := fmt.Errorf("invalid journal record length: %d exceeds max allowed size of %d", l, journalWriterBuffSize)
|
||||
recoveryCb(jErr)
|
||||
warningsCb(jErr)
|
||||
}
|
||||
recovered = true
|
||||
break
|
||||
}
|
||||
|
||||
if buf, err = rdr.Peek(int(l)); err != nil {
|
||||
if recoveryCb != nil {
|
||||
if warningsCb != nil {
|
||||
// We probably wend off the end of the file. Report error and recover.
|
||||
jErr := fmt.Errorf("failed to read full journal record of length %d at offset %d: %w", l, off, err)
|
||||
recoveryCb(jErr)
|
||||
warningsCb(jErr)
|
||||
}
|
||||
recovered = true
|
||||
break
|
||||
@@ -321,10 +321,10 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
|
||||
// clean shutdown, we expect all journal records to be valid, and could safely error out during startup
|
||||
// for invalid records.
|
||||
if validationErr := validateJournalRecord(buf); validationErr != nil {
|
||||
if recoveryCb != nil {
|
||||
if warningsCb != nil {
|
||||
// We don't assign the validation error to err, because we want to recover from this.
|
||||
jErr := fmt.Errorf("invalid journal record at offset %d: %w", off, validationErr)
|
||||
recoveryCb(jErr)
|
||||
warningsCb(jErr)
|
||||
}
|
||||
recovered = true
|
||||
break
|
||||
@@ -357,8 +357,8 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
|
||||
if dErr != nil {
|
||||
// Finding an error at this point is considered recoverable since we were already in a recovery state.
|
||||
// Report the error and continue.
|
||||
if recoveryCb != nil {
|
||||
recoveryCb(fmt.Errorf("error while checking for possible data loss in journal at offset %d: %w", off, dErr))
|
||||
if warningsCb != nil {
|
||||
warningsCb(fmt.Errorf("error while checking for possible data loss in journal at offset %d: %w", off, dErr))
|
||||
}
|
||||
}
|
||||
if dataLossFound {
|
||||
|
||||
@@ -187,7 +187,7 @@ var _ io.Closer = &journalWriter{}
|
||||
// are added to the novel ranges map. If the number of novel lookups exceeds |wr.maxNovel|, we
|
||||
// extend the journal index with one metadata flush before existing this function to save indexing
|
||||
// progress.
|
||||
func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer *reflogRingBuffer, recoveryCb func(error)) (last hash.Hash, err error) {
|
||||
func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer *reflogRingBuffer, warningsCb func(error)) (last hash.Hash, err error) {
|
||||
wr.lock.Lock()
|
||||
defer wr.lock.Unlock()
|
||||
|
||||
@@ -330,7 +330,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer
|
||||
return fmt.Errorf("unknown journal record kind (%d)", r.kind)
|
||||
}
|
||||
return nil
|
||||
}, recoveryCb)
|
||||
}, warningsCb)
|
||||
if err != nil {
|
||||
return hash.Hash{}, err
|
||||
}
|
||||
|
||||
@@ -666,7 +666,7 @@ func newLocalStore(ctx context.Context, nbfVerStr string, dir string, memTableSi
|
||||
return newNomsBlockStore(ctx, nbfVerStr, makeManifestManager(m), p, q, c, memTableSize)
|
||||
}
|
||||
|
||||
func NewLocalJournalingStore(ctx context.Context, nbfVers, dir string, q MemoryQuotaProvider, mmapArchiveIndexes bool, recoveryCb func(error)) (*NomsBlockStore, error) {
|
||||
func NewLocalJournalingStore(ctx context.Context, nbfVers, dir string, q MemoryQuotaProvider, mmapArchiveIndexes bool, warningsCb func(error)) (*NomsBlockStore, error) {
|
||||
cacheOnce.Do(makeGlobalCaches)
|
||||
if err := checkDir(dir); err != nil {
|
||||
return nil, err
|
||||
@@ -678,7 +678,7 @@ func NewLocalJournalingStore(ctx context.Context, nbfVers, dir string, q MemoryQ
|
||||
}
|
||||
p := newFSTablePersister(dir, q, mmapArchiveIndexes)
|
||||
|
||||
journal, err := newChunkJournal(ctx, nbfVers, dir, m, p.(*fsTablePersister), recoveryCb)
|
||||
journal, err := newChunkJournal(ctx, nbfVers, dir, m, p.(*fsTablePersister), warningsCb)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -506,7 +506,7 @@ func (sp Spec) createDatabase(ctx context.Context) (datas.Database, types.ValueR
|
||||
return getStandardLocalStore(ctx, sp.DatabaseName)
|
||||
}
|
||||
|
||||
newGenSt, err := nbs.NewLocalJournalingStore(ctx, types.Format_Default.VersionString(), sp.DatabaseName, nbs.NewUnlimitedMemQuotaProvider(), false, nbs.JournalParserLoggingRecoveryCb)
|
||||
newGenSt, err := nbs.NewLocalJournalingStore(ctx, types.Format_Default.VersionString(), sp.DatabaseName, nbs.NewUnlimitedMemQuotaProvider(), false, nbs.JournalParserLoggingWarningsCb)
|
||||
|
||||
// If the journaling store can't be created, fall back to a standard local store
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user