mirror of
https://github.com/dolthub/dolt.git
synced 2026-01-26 10:37:04 -06:00
go/store/nbs: Flush a journal writer to fsync and write index records when it builds up substantial unflushed data, even if there is no commit yet.
This commit is contained in:
@@ -45,6 +45,10 @@ const (
|
||||
// journalIndexDefaultMaxNovel determines how often we flush
|
||||
// records qto the out-of-band journal index file.
|
||||
journalIndexDefaultMaxNovel = 16384
|
||||
|
||||
// journalMaybeSyncThreshold determines how much un-syncd written data
|
||||
// can be outstanding to the journal before we will sync it.
|
||||
journalMaybeSyncThreshold = 64 * 1024 * 1024
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -151,6 +155,9 @@ type journalWriter struct {
|
||||
path string
|
||||
uncmpSz uint64
|
||||
|
||||
unsyncd uint64
|
||||
currentRoot hash.Hash
|
||||
|
||||
ranges rangeIndex
|
||||
index *os.File
|
||||
maxNovel int
|
||||
@@ -286,6 +293,8 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer
|
||||
return hash.Hash{}, err
|
||||
}
|
||||
|
||||
wr.currentRoot = last
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -354,8 +363,29 @@ func (wr *journalWriter) writeCompressedChunk(cc CompressedChunk) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
wr.unsyncd += uint64(recordLen)
|
||||
_ = writeChunkRecord(buf, cc)
|
||||
wr.ranges.put(addr(cc.H), rng)
|
||||
|
||||
// To fulfill our durability guarantees, we technically only need to
|
||||
// file.Sync() the journal when we commit a new root chunk. However,
|
||||
// allowing an unbounded amount of unflushed dirty pages to accumulate
|
||||
// in the OS's page cache makes it possible for small writes which come
|
||||
// along during a large non-committing write to block on flushing all
|
||||
// of the unflushed data. To minimize interference from large
|
||||
// non-committing writes, we cap the amount of unflushed data here.
|
||||
//
|
||||
// We go through |commitRootHash|, instead of directly |Sync()|ing the
|
||||
// file, because we also have accumulating delayed work in the form of
|
||||
// journal index records which may need to be serialized and flushed.
|
||||
// Assumptions in journal bootstraping and the contents of the journal
|
||||
// index require us to have a newly written root hash record anytime we
|
||||
// write index records out. It's perfectly fine to reuse the current
|
||||
// root hash, and this will also take care of the |Sync|.
|
||||
if wr.unsyncd > journalMaybeSyncThreshold && !wr.currentRoot.IsEmpty() {
|
||||
return wr.commitRootHashUnlocked(wr.currentRoot)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -363,11 +393,15 @@ func (wr *journalWriter) writeCompressedChunk(cc CompressedChunk) error {
|
||||
func (wr *journalWriter) commitRootHash(root hash.Hash) error {
|
||||
wr.lock.Lock()
|
||||
defer wr.lock.Unlock()
|
||||
return wr.commitRootHashUnlocked(root)
|
||||
}
|
||||
|
||||
func (wr *journalWriter) commitRootHashUnlocked(root hash.Hash) error {
|
||||
buf, err := wr.getBytes(rootHashRecordSize())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
wr.currentRoot = root
|
||||
n := writeRootHashRecord(buf, addr(root))
|
||||
if err = wr.flush(); err != nil {
|
||||
return err
|
||||
@@ -375,6 +409,7 @@ func (wr *journalWriter) commitRootHash(root hash.Hash) error {
|
||||
if err = wr.journal.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
wr.unsyncd = 0
|
||||
if wr.ranges.novelCount() > wr.maxNovel {
|
||||
o := wr.offset() - int64(n) // pre-commit journal offset
|
||||
err = wr.flushIndexRecord(root, o)
|
||||
|
||||
Reference in New Issue
Block a user