From 4faefcde73a74b7bbc44ffc656170f18b46db1b6 Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Wed, 1 Oct 2025 10:35:33 +0000 Subject: [PATCH] Write out to cache, and enable page cache read/write in FUSE --- go/client/span.go | 260 +++++++++++++++++++++++++++++----------- go/client/span_test.go | 24 +++- go/ternfuse/ternfuse.go | 23 +++- 3 files changed, 231 insertions(+), 76 deletions(-) diff --git a/go/client/span.go b/go/client/span.go index 38a3b436..8a781d2b 100644 --- a/go/client/span.go +++ b/go/client/span.go @@ -728,6 +728,23 @@ func (f *FileReader) readMirrored(log *log.Logger, client *Client, bufPool *bufp return 0, err } +type readRsStateRecover struct { + start uint32 + end uint32 + // Blocks we're downloading because we're recovering stuff. + // Lenghth is D+P. + // Note that we might have to recover some blocks that we're + // also downloading because we need more of them. + blocks []BlockReader + blocksCh chan *blockCompletion + blocksPageCacheWriteCh chan struct{} + fetching uint32 + succeeded uint32 + failed uint32 + // This is how many page-cache writes we're doing. It's a counter, not a bitmask. + writersToPageCache uint32 +} + type readRsState struct { span *msgs.FetchedBlocksSpan // The indices of the region of the span we're interested in. @@ -737,22 +754,41 @@ type readRsState struct { spanEnd uint32 // Blocks we're downloading because we need the data. // Lenght is D - blocks []BlockReader - blocksCh chan *blockCompletion - // Blocks we're downloading because we're recovering stuff. - // Lenghth is D+P. - // Note that we might have to recover some blocks that we're - // also downloading because we need more of them. - recoverBlocks []BlockReader - recoverBlocksCh chan *blockCompletion - // Bitmaps with status of `blocks` + blocks []BlockReader + blocksCh chan *blockCompletion fetching uint32 succeeded uint32 failed uint32 - // Bitmaps with status of `recoverBlocks` - recoverFetching uint32 - recoverSucceeded uint32 - recoverFailed uint32 + // Recovery state + recover *readRsStateRecover +} + +func (s *readRsState) recoverFetching() uint32 { + if s.recover == nil { + return 0 + } + return s.recover.fetching +} + +func (s *readRsState) writersToPageCache() uint32 { + if s.recover == nil { + return 0 + } + return s.recover.writersToPageCache +} + +func (s *readRsState) recoverBlocksCh() chan *blockCompletion { + if s.recover == nil { + return nil + } + return s.recover.blocksCh +} + +func (s *readRsState) blocksPageCacheWriteCh() chan struct{} { + if s.recover == nil { + return nil + } + return s.recover.blocksPageCacheWriteCh } // Block start: where to download from in the block. @@ -790,6 +826,7 @@ func (rs *readRsState) cell(stripeIx uint32, blockIx uint32) (blockStart uint32, return blockStart, blockFrom, blockTo, outFrom, outTo } +// Returns blockStart == blockFrom == blockTo == 0 if we shouldn't download anything from this block. func (rs *readRsState) blockToDownload(blockIx uint32) (blockStart uint32, blockFrom uint32, blockTo uint32) { for stripeIx := range uint32(rs.span.Stripes) { thisStart, thisFrom, thisTo, _, _ := rs.cell(stripeIx, blockIx) @@ -821,9 +858,9 @@ func (rs *readRsState) freeBlocks(bufPool *bufpool.BufPool) { bufPool.Put(rs.blocks[i].AcquireBuf()) } } - if rs.recoverBlocks != nil { - for i := range rs.recoverBlocks { - bufPool.Put(rs.recoverBlocks[i].AcquireBuf()) + if rs.recover != nil { + for i := range rs.recover.blocks { + bufPool.Put(rs.recover.blocks[i].AcquireBuf()) } } } @@ -847,7 +884,12 @@ type PageCache interface { // Return 0 if things could not be found in the cache. Must read multiples of // TERN_PAGE_SIZE, or not at all. ReadCache(offset uint64, dest []byte) (count int) - // WriteCache(offset uint64, dest []byte) + WriteCache(offset uint64, data []byte) +} + +type readRsRecoverExtra struct { + blockIx uint8 + readFromPageCache uint32 // the portion of the read that came from page cache, always page-aligned } func readRsStartFetchBlockWithCache( @@ -858,27 +900,25 @@ func readRsStartFetchBlockWithCache( span *SpanWithBlockServices, state *readRsState, blockIx uint8, - recoverStart uint32, - recoverEnd uint32, ) error { - reader := &state.recoverBlocks[blockIx] - reader.New(bufPool, recoverStart, recoverEnd-recoverStart, nil) - offset := recoverStart - count := recoverEnd - recoverStart + reader := &state.recover.blocks[blockIx] + reader.New(bufPool, state.recover.start, state.recover.end-state.recover.start, nil) + offset := state.recover.start + count := state.recover.end - state.recover.start + extra := readRsRecoverExtra{blockIx: blockIx} // If we have the page cache, try to retrieve data blocks from it. We read // from the page cache until we can, left to right, and hand off to block // services from when we can't onwards. We trust the page cache to not return // bad results (i.e. we don't, and can't, CRC its pages). block := &state.span.Blocks[blockIx] if pageCache == nil || blockIx >= uint8(state.span.Parity.DataBlocks()) { - return startFetch.StartFetchBlock(log, &span.BlockServices[block.BlockServiceIx], block.BlockId, offset, count, reader, uint8(blockIx), state.recoverBlocksCh) + return startFetch.StartFetchBlock(log, &span.BlockServices[block.BlockServiceIx], block.BlockId, offset, count, reader, extra, state.recover.blocksCh) } go func() { // Go cell-by-cell, fetching from the page cache as much as we can - readFromPageCache := uint32(0) // without CRCs stripeSize := uint32(state.span.Parity.DataBlocks()) * state.span.CellSize readerBuf := reader.PagesWithCrcBuffer() - for stripeIx := uint32(0); stripeIx < uint32(state.span.Stripes); stripeIx++ { + for stripeIx := range uint32(state.span.Stripes) { blockCellStart := stripeIx * state.span.CellSize blockCellEnd := blockCellStart + state.span.CellSize if blockCellEnd <= offset { @@ -905,27 +945,27 @@ func readRsStartFetchBlockWithCache( } pageCacheReadCount = uint32(fileEnd - fileStart) } - r := pageCache.ReadCache(fileStart, readerBuf[readFromPageCache:readFromPageCache+pageCacheReadCount]) + r := pageCache.ReadCache(fileStart, readerBuf[extra.readFromPageCache:extra.readFromPageCache+pageCacheReadCount]) // Round down to page, Linux's page cache should never return half-pages I think, but // let's be safe. r = (r / int(msgs.TERN_PAGE_SIZE)) * int(msgs.TERN_PAGE_SIZE) - readFromPageCache += uint32(r) + extra.readFromPageCache += uint32(r) if r < int(pageCacheReadCount) { break } } - reader.Advance(readFromPageCache) - if readFromPageCache == count { // we got everything from page cache + reader.Advance(extra.readFromPageCache) + if extra.readFromPageCache == count { // we got everything from page cache _, err := reader.ReadFrom(bytes.NewReader([]byte{})) // realign - state.recoverBlocksCh <- &blockCompletion{ + state.recover.blocksCh <- &blockCompletion{ Error: err, - Extra: uint8(blockIx), + Extra: extra, } } else { - if err := startFetch.StartFetchBlock(log, &span.BlockServices[block.BlockServiceIx], block.BlockId, offset+readFromPageCache, count-readFromPageCache, reader, uint8(blockIx), state.recoverBlocksCh); err != nil { - state.recoverBlocksCh <- &blockCompletion{ + if err := startFetch.StartFetchBlock(log, &span.BlockServices[block.BlockServiceIx], block.BlockId, offset+extra.readFromPageCache, count-extra.readFromPageCache, reader, extra, state.recover.blocksCh); err != nil { + state.recover.blocksCh <- &blockCompletion{ Error: err, - Extra: uint8(blockIx), + Extra: extra, } } } @@ -933,6 +973,73 @@ func readRsStartFetchBlockWithCache( return nil } +func readRsWriteToPageCache( + log *log.Logger, + pageCache PageCache, + span *SpanWithBlockServices, + state *readRsState, + extra *readRsRecoverExtra, +) { + // If we have a page cache, and this is a data block, and this didn't all come from page + // cache, write out the other parts which are within file boundaries. + if pageCache != nil && int(extra.blockIx) < state.span.Parity.DataBlocks() && extra.readFromPageCache < state.recover.end-state.recover.start { + stripeSize := uint32(state.span.Parity.DataBlocks()) * state.span.CellSize + blockStart, _, blockTo := state.blockToDownload(uint32(extra.blockIx)) // blockStart is what we want -- it's already page-aligned. + noFetch := blockStart == blockTo + pageCacheFrom := state.recover.start + extra.readFromPageCache + pageCacheTo := state.recover.end + for stripeIx := range uint32(state.span.Stripes) { + blockCellStart := stripeIx * state.span.CellSize + blockCellEnd := blockCellStart + state.span.CellSize + if blockCellEnd <= pageCacheFrom { // didn't get to the interesting region yet + continue + } + if blockCellStart >= pageCacheTo { // reached the end + break + } + if blockCellStart < pageCacheFrom { + blockCellStart = pageCacheFrom + } + if blockCellEnd > pageCacheTo { + blockCellEnd = pageCacheTo + } + fileAdjustment := -stripeIx*state.span.CellSize + stripeIx*stripeSize + uint32(extra.blockIx)*state.span.CellSize + fileStart := span.Span.Header.ByteOffset + uint64(blockCellStart) + uint64(fileAdjustment) + fileEnd := span.Span.Header.ByteOffset + uint64(blockCellEnd) + uint64(fileAdjustment) + clippedFileEnd := min(fileEnd, span.Span.Header.ByteOffset+uint64(span.Span.Header.Size)) + clippedLen := int64(clippedFileEnd) - int64(fileStart) + if clippedLen < 0 { // ran over the file + break + } + blockCellEnd = min(blockCellEnd, blockCellStart+uint32(clippedLen)) + if noFetch { // we weren't fetching this at all, write everything + state.recover.writersToPageCache++ + go func() { + defer func() { state.recover.blocksPageCacheWriteCh <- struct{}{} }() + pageCache.WriteCache(uint64(fileStart), state.recover.blocks[extra.blockIx].Buf().Bytes()[uint32(blockCellStart)-state.recover.start:uint32(blockCellEnd)-state.recover.start]) + }() + } else { + if blockCellStart < blockStart { // we have stuff to write before + state.recover.writersToPageCache++ + end := min(blockCellEnd, blockStart) + go func() { + defer func() { state.recover.blocksPageCacheWriteCh <- struct{}{} }() + pageCache.WriteCache(fileStart, state.recover.blocks[extra.blockIx].Buf().Bytes()[blockCellStart-state.recover.start:end-state.recover.start]) + }() + } + if blockCellEnd > blockTo { // we have stuff to write after + state.recover.writersToPageCache++ + begin := max(blockTo, blockCellStart) + go func() { + defer func() { state.recover.blocksPageCacheWriteCh <- struct{}{} }() + pageCache.WriteCache(fileStart+uint64(begin-blockCellStart), state.recover.blocks[extra.blockIx].Buf().Bytes()[begin-state.recover.start:blockCellEnd-state.recover.start]) + }() + } + } + } + } +} + func readRsRecover( log *log.Logger, startFetch startFetch, @@ -950,13 +1057,18 @@ func readRsRecover( // more because another block is involved, too. So the first thing // we compute is the from/to recover we need for any recover block we // will start. - state.recoverBlocks = make([]BlockReader, state.span.Parity.Blocks()) - state.recoverBlocksCh = make(chan *blockCompletion, state.span.Parity.DataBlocks()) - var recoverStart, recoverEnd uint32 + state.recover = &readRsStateRecover{ + blocks: make([]BlockReader, state.span.Parity.Blocks()), + blocksCh: make(chan *blockCompletion, state.span.Parity.DataBlocks()), + blocksPageCacheWriteCh: make(chan struct{}), + } for blockIx := range uint32(state.span.Parity.DataBlocks()) { blockStart, _, blockTo := state.blockToDownload(blockIx) - recoverStart = min(recoverStart, blockStart) - recoverEnd = max(blockTo, recoverEnd) + if blockStart == blockTo { + continue + } + state.recover.start = min(state.recover.start, blockStart) + state.recover.end = max(blockTo, state.recover.end) } // Mark things that don't need recover fetching as complete already, // and those that are failed or can't be read failed also. Again we @@ -965,12 +1077,12 @@ func readRsRecover( // anything we just use the fetched data. for blockIx := range uint32(state.span.Parity.DataBlocks()) { if state.failed&(uint32(1)< 0 || recoverFetching > 0 { + recoverFetching := bits.OnesCount32(state.recoverFetching()) + if fetching > 0 || recoverFetching > 0 || state.writersToPageCache() > 0 { go func() { // Need to make sure that all requests to be finished // before we can free the buffers. No need to block // the exit though. - for fetching > 0 || recoverFetching > 0 { + for fetching > 0 || recoverFetching > 0 || state.writersToPageCache() > 0 { select { case <-state.blocksCh: fetching-- - case <-state.recoverBlocksCh: + case <-state.recoverBlocksCh(): recoverFetching-- + case <-state.blocksPageCacheWriteCh(): + state.recover.writersToPageCache-- } } @@ -1165,6 +1280,7 @@ func readRs( func (f *FileReader) Read( log *log.Logger, client *Client, + pageCache PageCache, bufPool *bufpool.BufPool, offset uint64, dest []byte, @@ -1188,7 +1304,7 @@ func (f *FileReader) Read( if body.Parity.DataBlocks() == 1 { // mirrored return f.readMirrored(log, client, bufPool, offset, span, dest) } else { // RS - return readRs(log, client, nil, bufPool, uint32(offset-span.Span.Header.ByteOffset), span, dest) + return readRs(log, client, pageCache, bufPool, uint32(offset-span.Span.Header.ByteOffset), span, dest) } } @@ -1216,7 +1332,7 @@ type fileReader struct { } func (f *fileReader) Read(p []byte) (int, error) { - n, err := f.reader.Read(f.log, f.client, f.bufPool, f.offset, p) + n, err := f.reader.Read(f.log, f.client, nil, f.bufPool, f.offset, p) f.Seek(int64(n), io.SeekCurrent) return n, err } diff --git a/go/client/span_test.go b/go/client/span_test.go index f72c8636..1508644a 100644 --- a/go/client/span_test.go +++ b/go/client/span_test.go @@ -66,7 +66,7 @@ func (bf *blockFetcher) StartFetchBlock(log *log.Logger, blockService *msgs.Bloc // TODO randomize completion order go func() { defer func() { - handleRecover(bf.errorsChan, recover(), fmt.Sprintf("%s blockId=%v offset=%v count=%v", bf.info, blockId, offset, count)) + handleRecover(bf.errorsChan, recover(), fmt.Sprintf("%s fetch block blockId=%v offset=%v count=%v", bf.info, blockId, offset, count)) }() if bf.bad&(uint32(1)< bf.spanOffset+uint64(len(bf.contents)) { + panic(fmt.Errorf("offset=%v + len(data)=%v > bf.spanOffset=%v + len(bf.contents)=%v", offset, len(data), bf.spanOffset, len(bf.contents))) + } + if !bytes.Equal(bf.contents[offset-bf.spanOffset:(offset-bf.spanOffset)+uint64(len(data))], data) { + panic(fmt.Errorf("mismatching page cache write")) + } +} + func insertPageCrcs(data []byte) []byte { pages := len(data) / int(msgs.TERN_PAGE_SIZE) data = append(data, make([]byte, pages*4)...) // add space for CRCs @@ -209,6 +226,9 @@ func (fpc *flakyPageCache) ReadCache(offset uint64, dest []byte) (count int) { return fpc.c.ReadCache(offset, dest) } +func (fpc *flakyPageCache) WriteCache(offset uint64, data []byte) { + fpc.c.WriteCache(offset, data) +} func runRsReadTest( log *log.Logger, @@ -226,7 +246,7 @@ func runRsReadTest( defer func() { handleRecover(errorsChan, recover(), fmt.Sprintf("test runner for parity=%v stripes=%v length=%v lastReadFrom=%v lastReadTo=%v lastBad=%s", parity, stripes, length, lastReadFrom, lastReadTo, printBlockBitmap(parity.Blocks(), lastBad))) }() - seed := uint64(length) | uint64(stripes)<<32 | uint64(parity)<<(32 + 8) + seed := uint64(length) | uint64(stripes)<<32 | uint64(parity)<<(32+8) r := wyhash.New(seed) contents := make([]byte, length) r.Read(contents) diff --git a/go/ternfuse/ternfuse.go b/go/ternfuse/ternfuse.go index 4012de90..1d73e96d 100644 --- a/go/ternfuse/ternfuse.go +++ b/go/ternfuse/ternfuse.go @@ -32,6 +32,7 @@ import ( ) var c *client.Client +var server *fuse.Server var logger *log.Logger var dirInfoCache *client.DirInfoCache var bufPool *bufpool.BufPool @@ -675,6 +676,24 @@ func (n *ternNode) OpendirHandle(ctx context.Context, flags uint32) (fh fs.FileH return od, fuse.FOPEN_CACHE_DIR, 0 } +type fusePageCache struct { + id msgs.InodeId +} + +func (fc fusePageCache) ReadCache(offset uint64, dest []byte) (count int) { + count, err := server.InodeRetrieveCache(uint64(fc.id), int64(offset), dest) + if err != fuse.OK { + logger.RaiseAlert("Could not read from cache id=%v offset=%v len=%v err=%v", fc.id, offset, len(dest), err) + } + return count +} + +func (fc fusePageCache) WriteCache(offset uint64, data []byte) { + if err := server.InodeNotifyStoreCache(uint64(fc.id), int64(offset), data); err != fuse.OK { + logger.RaiseAlert("Could not write to cache id=%v offset=%v len=%v err=%v", fc.id, offset, len(data), err) + } +} + func (f *ternFile) Read(ctx context.Context, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) { f.mu.RLock() defer f.mu.RUnlock() @@ -689,7 +708,7 @@ func (f *ternFile) Read(ctx context.Context, dest []byte, off int64) (fuse.ReadR // FUSE seems to require a full read r := 0 for r < len(dest) { - thisR, err := tf.Read(logger, c, bufPool, uint64(off)+uint64(r), dest[r:]) + thisR, err := tf.Read(logger, c, fusePageCache{f.id}, bufPool, uint64(off)+uint64(r), dest[r:]) if thisR == 0 || err == io.EOF { break } @@ -1184,7 +1203,7 @@ func main() { if *allowOther { fuseOptions.MountOptions.Options = append(fuseOptions.MountOptions.Options, "allow_other") } - server, err := fs.Mount(mountPoint, &root, fuseOptions) + server, err = fs.Mount(mountPoint, &root, fuseOptions) if err != nil { fmt.Fprintf(os.Stderr, "Could not mount: %v", err) os.Exit(1)