diff --git a/go/store/nbs/journal_writer.go b/go/store/nbs/journal_writer.go index f18c1eda00..a0d0a3b01b 100644 --- a/go/store/nbs/journal_writer.go +++ b/go/store/nbs/journal_writer.go @@ -45,6 +45,10 @@ const ( // journalIndexDefaultMaxNovel determines how often we flush // records qto the out-of-band journal index file. journalIndexDefaultMaxNovel = 16384 + + // journalMaybeSyncThreshold determines how much unsynced written data + // can be outstanding to the journal before we will sync it. + journalMaybeSyncThreshold = 64 * 1024 * 1024 ) var ( @@ -151,6 +155,9 @@ type journalWriter struct { path string uncmpSz uint64 + unsyncd uint64 + currentRoot hash.Hash + ranges rangeIndex index *os.File maxNovel int @@ -286,6 +293,8 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer return hash.Hash{}, err } + wr.currentRoot = last + return } @@ -354,8 +363,29 @@ func (wr *journalWriter) writeCompressedChunk(cc CompressedChunk) error { if err != nil { return err } + wr.unsyncd += uint64(recordLen) _ = writeChunkRecord(buf, cc) wr.ranges.put(addr(cc.H), rng) + + // To fulfill our durability guarantees, we technically only need to + // file.Sync() the journal when we commit a new root chunk. However, + // allowing an unbounded amount of unflushed dirty pages to accumulate + // in the OS's page cache makes it possible for small writes which come + // along during a large non-committing write to block on flushing all + // of the unflushed data. To minimize interference from large + // non-committing writes, we cap the amount of unflushed data here. + // + // We go through |commitRootHash|, instead of directly |Sync()|ing the + // file, because we also have accumulating delayed work in the form of + // journal index records which may need to be serialized and flushed. + // Assumptions in journal bootstrapping and the contents of the journal + // index require us to have a newly written root hash record anytime we + // write index records out. 
It's perfectly fine to reuse the current + // root hash, and this will also take care of the |Sync|. + if wr.unsyncd > journalMaybeSyncThreshold && !wr.currentRoot.IsEmpty() { + return wr.commitRootHashUnlocked(wr.currentRoot) + } + return nil } @@ -363,11 +393,15 @@ func (wr *journalWriter) writeCompressedChunk(cc CompressedChunk) error { func (wr *journalWriter) commitRootHash(root hash.Hash) error { wr.lock.Lock() defer wr.lock.Unlock() + return wr.commitRootHashUnlocked(root) +} +func (wr *journalWriter) commitRootHashUnlocked(root hash.Hash) error { buf, err := wr.getBytes(rootHashRecordSize()) if err != nil { return err } + wr.currentRoot = root n := writeRootHashRecord(buf, addr(root)) if err = wr.flush(); err != nil { return err @@ -375,6 +409,7 @@ func (wr *journalWriter) commitRootHash(root hash.Hash) error { if err = wr.journal.Sync(); err != nil { return err } + wr.unsyncd = 0 if wr.ranges.novelCount() > wr.maxNovel { o := wr.offset() - int64(n) // pre-commit journal offset err = wr.flushIndexRecord(root, o)