diff --git a/cpp/shard/ShardDB.cpp b/cpp/shard/ShardDB.cpp
index eac65bee..7d64e691 100644
--- a/cpp/shard/ShardDB.cpp
+++ b/cpp/shard/ShardDB.cpp
@@ -732,7 +732,8 @@ struct ShardDBImpl {
             break;
         }
         auto value = ExternalValue::FromSlice(it->value());
-        if (key().offset()+value().size() < req.byteOffset) { // can only happens if the first cursor is out of bounds
+        if (key().offset()+value().spanSize() < req.byteOffset) { // can only happen if the first cursor is out of bounds
+            LOG_DEBUG(_env, "exiting early from spans since current key starts at %s and ends at %s, which is less than offset %s", key().offset(), key().offset()+value().spanSize(), req.byteOffset);
             break;
         }
         auto& respSpan = resp.spans.els.emplace_back();
diff --git a/go/crc32csum/crc32csum.go b/go/crc32csum/crc32csum.go
index d70b4472..18fceffb 100644
--- a/go/crc32csum/crc32csum.go
+++ b/go/crc32csum/crc32csum.go
@@ -22,7 +22,7 @@ func printCrc32(name string, r io.Reader) {
         }
         crc = crc32c.Sum(crc, buf[:read])
     }
-    fmt.Printf("%v %v\n", name, msgs.Crc(crc))
+    fmt.Printf("%v %v\n", msgs.Crc(crc), name)
 }
 
 func main() {
diff --git a/go/eggscli/eggscli.go b/go/eggscli/eggscli.go
index cc774f33..bb08238a 100644
--- a/go/eggscli/eggscli.go
+++ b/go/eggscli/eggscli.go
@@ -271,39 +271,10 @@ func main() {
     cpIntoOut := cpIntoCmd.String("o", "", "Where to write the file to in Eggs")
     cpIntoRun := func() {
         path := filepath.Clean("/" + *cpIntoOut)
-        dirPath := filepath.Dir(path)
-        fileName := filepath.Base(path)
-        if fileName == dirPath {
-            fmt.Fprintf(os.Stderr, "Bad output path '%v'.\n", *cpIntoOut)
-            os.Exit(2)
-        }
         client, err := lib.NewClient(log, *shuckleAddress, nil, nil, nil)
         if err != nil {
             panic(err)
         }
-        dirId, err := client.ResolvePath(log, dirPath)
-        if err != nil {
-            panic(err)
-        }
-        dirInfoCache := lib.NewDirInfoCache()
-        spanPolicies := msgs.SpanPolicy{}
-        if _, err := client.ResolveDirectoryInfoEntry(log, dirInfoCache, dirId, &spanPolicies); err != nil {
-            panic(err)
-        }
-        blockPolicies := msgs.BlockPolicy{}
-        if _, err := client.ResolveDirectoryInfoEntry(log, dirInfoCache, dirId, &blockPolicies); err != nil {
-            panic(err)
-        }
-        stripePolicy := msgs.StripePolicy{}
-        if _, err := client.ResolveDirectoryInfoEntry(log, dirInfoCache, dirId, &stripePolicy); err != nil {
-            panic(err)
-        }
-        fileResp := msgs.ConstructFileResp{}
-        if err := client.ShardRequest(log, dirId.Shard(), &msgs.ConstructFileReq{Type: msgs.FILE}, &fileResp); err != nil {
-            panic(err)
-        }
-        fileId := fileResp.Id
-        cookie := fileResp.Cookie
         var input io.Reader
         if *cpIntoInput == "" {
             input = os.Stdin
@@ -314,33 +285,11 @@ func main() {
             panic(err)
             }
         }
-        maxSpanSize := spanPolicies.Entries[len(spanPolicies.Entries)-1].MaxSize
-        spanBuf := make([]byte, maxSpanSize)
-        offset := uint64(0)
-        for {
-            spanBuf = spanBuf[:maxSpanSize]
-            read, err := io.ReadFull(input, spanBuf)
-            if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
-                panic(err)
-            }
-            if err == io.EOF {
-                break
-            }
-            spanBuf, err = client.CreateSpan(
-                log, []msgs.BlockServiceId{}, &spanPolicies, &blockPolicies, &stripePolicy,
-                fileId, cookie, offset, uint32(read), spanBuf[:read],
-            )
-            if err != nil {
-                panic(err)
-            }
-            offset += uint64(read)
-            if read < len(spanBuf) {
-                break
-            }
-        }
-        if err := client.ShardRequest(log, dirId.Shard(), &msgs.LinkFileReq{FileId: fileId, Cookie: cookie, OwnerId: dirId, Name: fileName}, &msgs.LinkFileResp{}); err != nil {
+        fileId, err := client.CreateFile(log, lib.NewDirInfoCache(), path, input)
+        if err != nil {
             panic(err)
         }
+        log.Info("File created as %v", fileId)
     }
     commands["cp-into"] = commandSpec{
         flags: cpIntoCmd,
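Note on the eggscli hunk above: the inlined policy resolution and span-upload loop move into the new `(*lib.Client).CreateFile` helper (added in go/lib/span.go below), so a caller only needs an absolute destination path and an `io.Reader`. A minimal sketch of the new call-site shape; `uploadFile` is a hypothetical wrapper, and it assumes an already-constructed client and logger:

```go
package main

import (
	"os"

	"xtx/eggsfs/lib"
	"xtx/eggsfs/msgs"
)

// uploadFile is a hypothetical wrapper showing the new call shape; the
// *lib.Client and *lib.Logger setup from the surrounding CLI is elided.
func uploadFile(log *lib.Logger, client *lib.Client, localPath, eggsPath string) (msgs.InodeId, error) {
	f, err := os.Open(localPath)
	if err != nil {
		return 0, err
	}
	defer f.Close()
	// CreateFile resolves the parent directory, fetches the span/block/stripe
	// policies, streams the reader as spans, and links the file under its name.
	return client.CreateFile(log, lib.NewDirInfoCache(), eggsPath, f)
}
```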
diff --git a/go/eggsfuse/eggsfuse.go b/go/eggsfuse/eggsfuse.go
index 8eaa5cc2..03423c28 100644
--- a/go/eggsfuse/eggsfuse.go
+++ b/go/eggsfuse/eggsfuse.go
@@ -23,6 +23,14 @@ var dirInfoCache *lib.DirInfoCache
 var readBufPool *lib.ReadSpanBufPool
 var writeBufPool *sync.Pool
 
+type statCache struct {
+    size  uint64
+    mtime msgs.EggsTime
+}
+
+var fileStatCacheMu sync.RWMutex
+var fileStatCache map[msgs.InodeId]statCache
+
 func eggsErrToErrno(err error) syscall.Errno {
     switch err {
     case msgs.INTERNAL_ERROR:
@@ -319,12 +327,24 @@ func (n *eggsNode) Getattr(ctx context.Context, f fs.FileHandle, out *fuse.AttrO
             return err
         }
     } else {
-        resp := msgs.StatFileResp{}
-        if err := shardRequest(n.id.Shard(), &msgs.StatFileReq{Id: n.id}, &resp); err != 0 {
-            return err
+        fileStatCacheMu.RLock()
+        cached, found := fileStatCache[n.id]
+        fileStatCacheMu.RUnlock()
+
+        if !found {
+            resp := msgs.StatFileResp{}
+            if err := shardRequest(n.id.Shard(), &msgs.StatFileReq{Id: n.id}, &resp); err != 0 {
+                return err
+            }
+            cached.mtime = resp.Mtime
+            cached.size = resp.Size
+            fileStatCacheMu.Lock()
+            fileStatCache[n.id] = cached
+            fileStatCacheMu.Unlock()
         }
-        out.Size = resp.Size
-        mtime := msgs.EGGS_EPOCH + uint64(resp.Mtime)
+
+        out.Size = cached.size
+        mtime := msgs.EGGS_EPOCH + uint64(cached.mtime)
         mtimesec := mtime / 1000000000
         mtimens := uint32(mtime % 1000000000)
         out.Ctime = mtimesec
@@ -494,13 +514,11 @@ type openFile struct {
     id   msgs.InodeId
     size uint64 // total size of file
 
-    // We store the last successful span resp
-    spans *msgs.FileSpansResp
+    spanReader lib.TaintableReadCloser // the span we're currently reading from
+    readOffset int64                   // the offset we're currently reading at, in the span reader above
 
-    // The span we're currently in
-    currentSpanIx int
-    spanReader    io.ReadCloser // the span reader
-    offset        int64         // the current reading offset
+    // We store the last successful span resp, to avoid re-issuing it if we can
+    spans *msgs.FileSpansResp
 }
 
 func (n *eggsNode) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fuseFlags uint32, errno syscall.Errno) {
@@ -511,55 +529,76 @@ func (n *eggsNode) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fu
         return nil, 0, err
     }
     of := openFile{
-        id:     n.id,
-        size:   statResp.Size,
-        offset: -1,
+        id:         n.id,
+        size:       statResp.Size,
+        readOffset: -1,
     }
     return &of, 0, 0
 }
 
-// ensures that the span reader is at the current offset.
-func (of *openFile) ensureSpan() syscall.Errno {
-    if of.offset < 0 || of.offset >= int64(of.size) {
-        panic(fmt.Errorf("of.offset=%v < 0 || of.offset=%v >= of.size=%v", of.offset, of.offset, of.size))
+func (of *openFile) lookupSpan(offset int64) ([]msgs.BlockService, *msgs.FetchedSpan) {
+    for i := 0; i < len(of.spans.Spans); i++ {
+        span := &of.spans.Spans[i]
+        if int64(span.Header.ByteOffset) <= offset && offset < int64(span.Header.ByteOffset)+int64(span.Header.Size) {
+            log.Debug("picking span starting at %v, of size %v", span.Header.ByteOffset, span.Header.Size)
+            return of.spans.BlockServices, span
+        }
     }
-    of.currentSpanIx = -1
+    return nil, nil
+}
+
+func (of *openFile) getSpan(offset int64) ([]msgs.BlockService, *msgs.FetchedSpan, syscall.Errno) {
     // check if we're within current span index
     if of.spans != nil {
-        for i := 0; i < len(of.spans.Spans); i++ {
-            span := &of.spans.Spans[i]
-            if int64(span.Header.ByteOffset) <= of.offset && of.offset < int64(span.Header.ByteOffset)+int64(span.Header.Size) {
-                of.currentSpanIx = i
-                break
-            }
+        blockServices, span := of.lookupSpan(offset)
+        if blockServices != nil {
+            return blockServices, span, 0
         }
     }
-    // we're not within the span index, download new batch of spans
-    if of.currentSpanIx < 0 {
-        of.spans = &msgs.FileSpansResp{}
-        if err := shardRequest(of.id.Shard(), &msgs.FileSpansReq{FileId: of.id, ByteOffset: uint64(of.offset)}, of.spans); err != 0 {
+    // we need to fetch the spans again
+    of.spans = &msgs.FileSpansResp{}
+    if err := shardRequest(of.id.Shard(), &msgs.FileSpansReq{FileId: of.id, ByteOffset: uint64(offset)}, of.spans); err != 0 {
+        return nil, nil, eggsErrToErrno(err)
+    }
+    blockServices, span := of.lookupSpan(offset)
+    if blockServices == nil {
+        panic(fmt.Errorf("couldn't get span at offset %v", offset))
+    }
+    return blockServices, span, 0
+}
+
+// ensures that the span reader is at the given offset.
+func (of *openFile) getSpanReaderAt(offset int64) syscall.Errno {
+    if offset < 0 || offset >= int64(of.size) {
+        panic(fmt.Errorf("offset=%v < 0 || offset=%v >= of.size=%v", offset, offset, of.size))
+    }
+    if of.spanReader != nil {
+        panic(fmt.Errorf("we already have a span reader, close it explicitly first (it's not obvious how to do so from here)"))
+    }
+    blockServices, fetchedSpan, err := of.getSpan(offset)
+    if err != 0 {
+        return err
+    }
+    {
+        var err error
+        of.spanReader, err = client.ReadSpan(log, readBufPool, []msgs.BlockServiceId{}, blockServices, fetchedSpan)
+        if err != nil {
             return eggsErrToErrno(err)
         }
-        of.currentSpanIx = 0
     }
-    span := &of.spans.Spans[of.currentSpanIx]
-    var err error
-    of.spanReader, err = client.ReadSpan(log, readBufPool, []msgs.BlockServiceId{}, of.spans.BlockServices, span)
-    if err != nil {
-        return eggsErrToErrno(err)
-    }
-    currentOffset := int64(span.Header.ByteOffset)
+    of.readOffset = int64(fetchedSpan.Header.ByteOffset)
+    // fast forward to the offset we're interested in
     var buf [256]byte
-    for currentOffset < of.offset {
+    for of.readOffset < offset {
         end := 256
-        if of.offset-currentOffset < 256 {
-            end = int(of.offset - currentOffset)
+        if offset-of.readOffset < 256 {
+            end = int(offset - of.readOffset)
         }
         read, err := of.spanReader.Read(buf[:end])
         if err != nil {
             return eggsErrToErrno(err)
         }
-        currentOffset += int64(read)
+        of.readOffset += int64(read)
     }
     return 0
 }
@@ -577,13 +616,14 @@ func (of *openFile) readInternal(dest []byte, off int64) (int64, syscall.Errno)
     }
 
     // If the offset has changed, reset
-    if off != of.offset {
-        log.Debug("mismatching offset (%v vs %v), will reset span", off, of.offset)
+    if off != of.readOffset {
+        log.Debug("mismatching offset (%v vs %v), will reset span", off, of.readOffset)
         if of.spanReader != nil {
+            of.spanReader.Taint()
             of.spanReader.Close()
+            of.spanReader = nil
         }
-        of.offset = off
-        if err := of.ensureSpan(); err != 0 {
+        if err := of.getSpanReaderAt(off); err != 0 {
             return 0, err
         }
     }
@@ -591,12 +631,17 @@ func (of *openFile) readInternal(dest []byte, off int64) (int64, syscall.Errno)
     read, err := of.spanReader.Read(dest)
     if err == io.EOF { // load next span
         log.Debug("finished reading current span, loading next")
+        // close normally here so that we can reuse the connections
         of.spanReader.Close()
-        if err := of.ensureSpan(); err != 0 {
+        of.spanReader = nil
+        if err := of.getSpanReaderAt(off); err != 0 {
             return 0, err
         }
+        read, err = of.spanReader.Read(dest)
     }
+    of.readOffset += int64(read)
+
     if err != nil {
         return int64(read), eggsErrToErrno(err)
     }
 
@@ -605,9 +650,8 @@
 func (of *openFile) Flush(ctx context.Context) syscall.Errno {
     if of.spanReader != nil {
-        if err := of.spanReader.Close(); err != nil {
-            return eggsErrToErrno(err)
-        }
+        of.spanReader.Taint()
+        of.spanReader.Close()
     }
     return 0
 }
 
@@ -631,6 +675,7 @@ func (of *openFile) Read(ctx context.Context, dest []byte, off int64) (fuse.Read
             break
         }
     }
+    log.Debug("read %v bytes", internalOff)
     return fuse.ReadResultData(dest[:internalOff]), 0
 }
 
@@ -806,6 +851,8 @@ func main() {
 
     dirInfoCache = lib.NewDirInfoCache()
 
+    fileStatCache = make(map[msgs.InodeId]statCache)
+
     writeBufPool = &sync.Pool{
         New: func() any {
             buf := []byte{}
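A note on the Getattr caching in the eggsfuse hunk above: it takes the read lock for the common hit path and only takes the write lock after a miss, so two racing readers may both issue StatFile and both store the result; the last write wins, which is harmless for idempotent stat data (though nothing in this diff ever invalidates the cache). A minimal self-contained sketch of the same read-through shape, with a hypothetical fetch callback standing in for shardRequest:

```go
package main

import "sync"

type inodeId uint64 // stand-in for msgs.InodeId

type statCache struct {
	size  uint64
	mtime uint64
}

type attrCache struct {
	mu sync.RWMutex
	m  map[inodeId]statCache
}

// get returns the cached entry, filling it via fetch on a miss. Racing
// fillers may duplicate work, but they converge on the same value.
func (c *attrCache) get(id inodeId, fetch func(inodeId) (statCache, error)) (statCache, error) {
	c.mu.RLock()
	v, ok := c.m[id]
	c.mu.RUnlock()
	if ok {
		return v, nil
	}
	v, err := fetch(id)
	if err != nil {
		return statCache{}, err
	}
	c.mu.Lock()
	c.m[id] = v
	c.mu.Unlock()
	return v, nil
}
```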
diff --git a/go/integrationtest/filehistory.go b/go/integrationtest/filehistory.go
index d1cc0746..7be97213 100644
--- a/go/integrationtest/filehistory.go
+++ b/go/integrationtest/filehistory.go
@@ -165,7 +165,7 @@ func runCheckpoint(log *lib.Logger, client *lib.Client, prefix string, files *fi
 func runStep(log *lib.Logger, client *lib.Client, dirInfoCache *lib.DirInfoCache, files *fileHistoryFiles, stepAny any) any {
     switch step := stepAny.(type) {
     case fileHistoryCreateFile:
-        id, creationTime := createFile(log, client, dirInfoCache, msgs.ROOT_DIR_INODE_ID, 0, step.name, 0, 0, 0, nil)
+        id, creationTime := createFile(log, client, dirInfoCache, msgs.ROOT_DIR_INODE_ID, 0, step.name, 0, 0, nil)
         files.addFile(step.name, id, creationTime)
         return fileHistoryCreatedFile{
             name: step.name,
diff --git a/go/integrationtest/fstest.go b/go/integrationtest/fstest.go
index 4af06a40..9bec1231 100644
--- a/go/integrationtest/fstest.go
+++ b/go/integrationtest/fstest.go
@@ -30,9 +30,9 @@ type fsTestOpts struct {
 type fsTestHarness[Id comparable] interface {
     createDirectory(log *lib.Logger, owner Id, name string) (Id, msgs.EggsTime)
     rename(log *lib.Logger, targetId Id, oldOwner Id, oldCreationTime msgs.EggsTime, oldName string, newOwner Id, newName string) (Id, msgs.EggsTime)
-    createFile(log *lib.Logger, owner Id, spanSize uint32, name string, size uint64, trailingZeros uint32, dataSeed uint64) (Id, msgs.EggsTime)
+    createFile(log *lib.Logger, owner Id, spanSize uint32, name string, size uint64, dataSeed uint64) (Id, msgs.EggsTime)
     // if false, the harness does not support reading files (e.g. we're mocking block services)
-    checkFileData(log *lib.Logger, id Id, size uint64, trailingZeros uint32, dataSeed uint64)
+    checkFileData(log *lib.Logger, id Id, size uint64, dataSeed uint64)
     // files, directories
     readDirectory(log *lib.Logger, dir Id) ([]string, []string)
 }
@@ -104,9 +104,9 @@ func (c *apiFsTestHarness) rename(
 }
 
 func (c *apiFsTestHarness) createFile(
-    log *lib.Logger, owner msgs.InodeId, spanSize uint32, name string, size uint64, trailingZeros uint32, dataSeed uint64,
+    log *lib.Logger, owner msgs.InodeId, spanSize uint32, name string, size uint64, dataSeed uint64,
 ) (msgs.InodeId, msgs.EggsTime) {
-    return createFile(log, c.client, c.dirInfoCache, owner, spanSize, name, size, trailingZeros, dataSeed, &c.fileContentsBuf)
+    return createFile(log, c.client, c.dirInfoCache, owner, spanSize, name, size, dataSeed, &c.fileContentsBuf)
 }
 
 func (c *apiFsTestHarness) readDirectory(log *lib.Logger, dir msgs.InodeId) (files []string, dirs []string) {
@@ -155,9 +155,9 @@ func ensureLen(buf []byte, l int) []byte {
     return buf
 }
 
-func (c *apiFsTestHarness) checkFileData(log *lib.Logger, id msgs.InodeId, size uint64, trailingZeros uint32, dataSeed uint64) {
+func (c *apiFsTestHarness) checkFileData(log *lib.Logger, id msgs.InodeId, size uint64, dataSeed uint64) {
     fileData := readFile(log, c.readBufPool, c.client, id, &c.fileContentsBuf)
-    fullSize := int(size) + int(trailingZeros)
+    fullSize := int(size)
     c.fileContentsBuf = ensureLen(c.fileContentsBuf, fullSize)
     expectedData := c.fileContentsBuf[:int(size)]
     wyhash.New(dataSeed).Read(expectedData[:int(size)])
@@ -201,9 +201,9 @@ func (*posixFsTestHarness) rename(
 }
 
 func (c *posixFsTestHarness) createFile(
-    log *lib.Logger, dirFullPath string, spanSize uint32, name string, size uint64, trailingZeros uint32, dataSeed uint64,
+    log *lib.Logger, dirFullPath string, spanSize uint32, name string, size uint64, dataSeed uint64,
 ) (fileFullPath string, t msgs.EggsTime) {
-    c.actualDataBuf = ensureLen(c.actualDataBuf, int(size)+int(trailingZeros))
+    c.actualDataBuf = ensureLen(c.actualDataBuf, int(size))
     wyhash.New(dataSeed).Read(c.actualDataBuf[:size])
     fileFullPath = path.Join(dirFullPath, name)
     log.LogStack(1, lib.DEBUG, "posix create file %v", fileFullPath)
@@ -229,8 +229,8 @@ func (c *posixFsTestHarness) readDirectory(log *lib.Logger, dirFullPath string)
     return files, dirs
 }
 
-func (c *posixFsTestHarness) checkFileData(log *lib.Logger, fullFilePath string, size uint64, trailingZeros uint32, dataSeed uint64) {
-    fullSize := int(size) + int(trailingZeros)
+func (c *posixFsTestHarness) checkFileData(log *lib.Logger, fullFilePath string, size uint64, dataSeed uint64) {
+    fullSize := int(size)
     c.expectedDataBuf = ensureLen(c.expectedDataBuf, fullSize)
     wyhash.New(dataSeed).Read(c.expectedDataBuf[:int(size)])
     c.actualDataBuf = ensureLen(c.actualDataBuf, fullSize)
@@ -243,12 +243,13 @@ func (c *posixFsTestHarness) checkFileData(log *lib.Logger, fullFilePath string,
     if _, err := io.ReadFull(f, c.actualDataBuf); err != nil {
         panic(err)
     }
-    checkFileData(c.expectedDataBuf, c.actualDataBuf)
+    checkFileData(c.actualDataBuf, c.expectedDataBuf)
     // then we start doing random reads around
     if fullSize > 1 {
         for i := 0; i < 10; i++ {
-            offset := int(rand.Uint64()) % (fullSize - 1)
-            size := 1 + int(rand.Uint64())%(fullSize-offset-1)
+            offset := int(rand.Uint64() % uint64(fullSize-1))
+            size := 1 + int(rand.Uint64()%uint64(fullSize-offset-1))
+            log.Debug("reading from %v to %v in file of size %v", offset, offset+size, fullSize)
             if _, err := f.Seek(int64(offset), 0); err != nil {
                 panic(err)
             }
@@ -275,10 +276,9 @@ type fsTestChild[T any] struct {
 }
 
 type fsTestFile[Id comparable] struct {
-    id            Id
-    size          uint64
-    trailingZeros uint32
-    dataSeed      uint64
+    id       Id
+    size     uint64
+    dataSeed uint64
 }
 
 // We always use integers as names
@@ -387,7 +387,7 @@ func (state *fsTestState[Id]) incrementFiles(log *lib.Logger, opts *fsTestOpts)
     }
 }
 
-func (state *fsTestState[Id]) calcFileSize(log *lib.Logger, opts *fsTestOpts, rand *wyhash.Rand) (size uint64, trailingZeros uint32) {
+func (state *fsTestState[Id]) calcFileSize(log *lib.Logger, opts *fsTestOpts, rand *wyhash.Rand) (size uint64) {
     p := rand.Float64()
     if p < opts.emptyFileProb {
         size = 0
@@ -396,10 +396,9 @@ func (state *fsTestState[Id]) calcFileSize(log *lib.Logger, opts *fsTestOpts, ra
     } else {
         size = 1 + rand.Uint64()%uint64(opts.maxFileSize)
     }
-    trailingZeros = rand.Uint32() % 100
     state.totalFilesSize += size
-    log.Debug("creating file with size %v, trailing zeros %v, total size %v", size, trailingZeros, state.totalFilesSize)
-    return size, trailingZeros
+    log.Debug("creating file with size %v, total size %v", size, state.totalFilesSize)
+    return size
 }
 
 func (state *fsTestState[Id]) makeFile(log *lib.Logger, harness fsTestHarness[Id], opts *fsTestOpts, rand *wyhash.Rand, dirPath []int, name int) {
@@ -413,17 +412,16 @@
     if fileExists {
         panic("conflicting name (files)")
     }
-    size, trailingZeros := state.calcFileSize(log, opts, rand)
+    size := state.calcFileSize(log, opts, rand)
     dataSeed := rand.Uint64()
     id, creationTime := harness.createFile(
-        log, dir.id, uint32(opts.spanSize), strconv.Itoa(name), size, trailingZeros, dataSeed,
+        log, dir.id, uint32(opts.spanSize), strconv.Itoa(name), size, dataSeed,
     )
     dir.children.files[name] = fsTestChild[fsTestFile[Id]]{
         body: fsTestFile[Id]{
-            id:            id,
-            size:          size,
-            trailingZeros: trailingZeros,
-            dataSeed:      dataSeed,
+            id:       id,
+            size:     size,
+            dataSeed: dataSeed,
         },
         creationTime: creationTime,
     }
@@ -440,19 +438,18 @@ func (state *fsTestState[Id]) makeFileFromTemp(log *lib.Logger, harness fsTestHa
     if fileExists {
         panic("conflicting name (files)")
     }
-    size, trailingZeros := state.calcFileSize(log, opts, rand)
+    size := state.calcFileSize(log, opts, rand)
     dataSeed := rand.Uint64()
     tmpParentId := state.dir(tmpDirPath).id
     id, creationTime := harness.createFile(
-        log, tmpParentId, uint32(opts.spanSize), "tmp", size, trailingZeros, dataSeed,
+        log, tmpParentId, uint32(opts.spanSize), "tmp", size, dataSeed,
     )
     newId, creationTime := harness.rename(log, id, tmpParentId, creationTime, "tmp", dir.id, strconv.Itoa(name))
     dir.children.files[name] = fsTestChild[fsTestFile[Id]]{
         body: fsTestFile[Id]{
-            id:            newId,
-            size:          size,
-            trailingZeros: trailingZeros,
-            dataSeed:      dataSeed,
+            id:       newId,
+            size:     size,
+            dataSeed: dataSeed,
         },
         creationTime: creationTime,
     }
@@ -474,7 +471,7 @@ func (d *fsTestDir[Id]) check(log *lib.Logger, harness fsTestHarness[Id]) {
             panic(fmt.Errorf("file %v not found", name))
         }
         harness.checkFileData(
-            log, file.body.id, file.body.size, file.body.trailingZeros, file.body.dataSeed,
+            log, file.body.id, file.body.size, file.body.dataSeed,
         )
     }
     for _, dirName := range dirs {
diff --git a/go/integrationtest/integrationtest.go b/go/integrationtest/integrationtest.go
index 74fe61c1..0ef0e780 100644
--- a/go/integrationtest/integrationtest.go
+++ b/go/integrationtest/integrationtest.go
@@ -92,7 +92,7 @@ func runTest(
 
     counters := &lib.ClientCounters{}
 
-    fmt.Printf("running %s, %s\n", name, extra)
+    fmt.Printf("running %s test, %s\n", name, extra)
     t0 := time.Now()
     run(counters)
     elapsed := time.Since(t0)
@@ -153,7 +153,7 @@ func runTests(terminateChan chan any, log *lib.Logger, shuckleAddress string, fu
         log,
         shuckleAddress,
         filter,
-        "file history test",
+        "file history",
         fmt.Sprintf("%v threads, %v steps", fileHistoryOpts.threads, fileHistoryOpts.steps),
         func(counters *lib.ClientCounters) {
             fileHistoryTest(log, shuckleAddress, &fileHistoryOpts, counters)
@@ -180,7 +180,7 @@ func runTests(terminateChan chan any, log *lib.Logger, shuckleAddress string, fu
         log,
         shuckleAddress,
         filter,
-        "direct fs test",
+        "direct fs",
         fmt.Sprintf("%v dirs, %v files, %v depth", fsTestOpts.numDirs, fsTestOpts.numFiles, fsTestOpts.depth),
         func(counters *lib.ClientCounters) {
             fsTest(log, shuckleAddress, &fsTestOpts, counters, "")
@@ -191,13 +191,27 @@ func runTests(terminateChan chan any, log *lib.Logger, shuckleAddress string, fu
         log,
         shuckleAddress,
         filter,
-        "fuse fs test",
+        "fuse fs",
         fmt.Sprintf("%v dirs, %v files, %v depth", fsTestOpts.numDirs, fsTestOpts.numFiles, fsTestOpts.depth),
         func(counters *lib.ClientCounters) {
             fsTest(log, shuckleAddress, &fsTestOpts, counters, fuseMountPoint)
         },
     )
 
+    largeFileOpts := largeFileTestOpts{
+        fileSize: 1 << 30, // 1GiB
+    }
+    runTest(
+        log,
+        shuckleAddress,
+        filter,
+        "large file",
+        fmt.Sprintf("%vGB", float64(largeFileOpts.fileSize)/1e9),
+        func(counters *lib.ClientCounters) {
+            largeFileTest(log, shuckleAddress, &largeFileOpts, counters, fuseMountPoint)
+        },
+    )
+
     terminateChan <- nil
 }
@@ -362,7 +376,7 @@ func main() {
 
     fuseMountPoint := procs.StartFuse(log, &managedprocess.FuseOpts{
         Exe:            eggsFuseExe,
-        Path:           path.Join(*dataDir, "eggsfuse"),
+        Path:           path.Join(*dataDir, "fuse"),
         LogLevel:       level,
         Wait:           true,
         ShuckleAddress: shuckleAddress,
diff --git a/go/integrationtest/largefile.go b/go/integrationtest/largefile.go
new file mode 100644
index 00000000..93629782
--- /dev/null
+++ b/go/integrationtest/largefile.go
@@ -0,0 +1,85 @@
+package main
+
+import (
+    "bytes"
+    "crypto/sha256"
+    "fmt"
+    "io"
+    "os"
+    "os/exec"
+    "path"
+    "strings"
+    "xtx/eggsfs/lib"
+    "xtx/eggsfs/wyhash"
+)
+
+type largeFileTestOpts struct {
+    fileSize uint64
+}
+
+func largeFileTest(
+    log *lib.Logger,
+    shuckleAddress string,
+    opts *largeFileTestOpts,
+    counters *lib.ClientCounters,
+    mountPoint string,
+) {
+    // create single 1GiB file
+    log.Info("creating %vGB file", float64(opts.fileSize)/1e9)
+    fpath := path.Join(mountPoint, "big")
+    f, err := os.Create(fpath)
+    if err != nil {
+        panic(err)
+    }
+    rand := wyhash.New(0)
+    buf := make([]byte, 1<<20)
+    expectedSha := sha256.New()
+    written := uint64(0)
+    for written < opts.fileSize {
+        rand.Read(buf)
+        expectedSha.Write(buf)
+        if _, err := f.Write(buf); err != nil {
+            panic(err)
+        }
+        written += uint64(len(buf))
+    }
+    if err := f.Close(); err != nil {
+        panic(err)
+    }
+    log.Info("expected sha: %x", expectedSha.Sum(nil))
+    // first using our own reading
+    f, err = os.Open(fpath)
+    if err != nil {
+        panic(err)
+    }
+    defer f.Close()
+    actualSha := sha256.New()
+    for {
+        read, err := f.Read(buf)
+        if err == io.EOF {
+            break
+        }
+        if err != nil {
+            panic(err)
+        }
+        actualSha.Write(buf[:read])
+    }
+    log.Info("actual sha: %x", actualSha.Sum(nil))
+    if !bytes.Equal(expectedSha.Sum(nil), actualSha.Sum(nil)) {
+        panic(fmt.Errorf("expected sha %x, got %x", expectedSha.Sum(nil), actualSha.Sum(nil)))
+    }
+    // Purposefully using an external program to have somebody else issue syscalls.
+    // This one is currently interesting because sha256sum looks ahead in unpredictable
+    // ways, which means that it stimulates the "unhappy" reading path in interesting ways.
+    // That said, it'd be much better to just stimulate it with our code, given that we
+    // have no control over how `sha256sum` works.
+    sha256Out, err := exec.Command("sha256sum", fpath).Output()
+    if err != nil {
+        panic(err)
+    }
+    log.Info("sha256 out: %s", strings.TrimSpace(string(sha256Out)))
+    expectedSha256Out := fmt.Sprintf("%x  %s\n", expectedSha.Sum(nil), fpath)
+    if string(sha256Out) != expectedSha256Out {
+        panic(fmt.Errorf("expected sha256 output %q, got %q", expectedSha256Out, string(sha256Out)))
+    }
+}
diff --git a/go/integrationtest/req.go b/go/integrationtest/req.go
index 02939307..d32774d5 100644
--- a/go/integrationtest/req.go
+++ b/go/integrationtest/req.go
@@ -64,7 +64,6 @@ func createFile(
     spanSize uint32,
     name string,
     size uint64,
-    trailingZeros uint32,
     dataSeed uint64,
     buf *[]byte,
 ) (id msgs.InodeId, creationTime msgs.EggsTime) {
@@ -76,7 +75,7 @@ func createFile(
     constructResp := msgs.ConstructFileResp{}
     shardReq(log, client, dirId.Shard(), &constructReq, &constructResp)
     rand := wyhash.New(dataSeed)
-    if size+uint64(trailingZeros) > 0 {
+    if size > 0 {
         var spanPolicy msgs.SpanPolicy
         if _, err := client.ResolveDirectoryInfoEntry(log, dirInfoCache, dirId, &spanPolicy); err != nil {
             panic(err)
@@ -92,10 +91,8 @@ func createFile(
         // add spans
         for offset := uint64(0); offset < size; offset += uint64(spanSize) {
             thisSpanSize := spanSize
-            spanTrailingZeros := uint32(0)
             if uint32(size-offset) < thisSpanSize {
                 thisSpanSize = uint32(size - offset)
-                spanTrailingZeros = trailingZeros
             }
             if len(*buf) < int(thisSpanSize) {
                 *buf = append(*buf, make([]byte, int(thisSpanSize)-len(*buf))...)
@@ -103,7 +100,7 @@ func createFile(
             spanBuf := (*buf)[:thisSpanSize]
             rand.Read(spanBuf)
             var err error
-            *buf, err = client.CreateSpan(log, []msgs.BlockServiceId{}, &spanPolicy, &blockPolicies, &stripePolicy, constructResp.Id, constructResp.Cookie, offset, uint32(thisSpanSize+spanTrailingZeros), spanBuf)
+            *buf, err = client.CreateSpan(log, []msgs.BlockServiceId{}, &spanPolicy, &blockPolicies, &stripePolicy, constructResp.Id, constructResp.Cookie, offset, uint32(thisSpanSize), spanBuf)
             if err != nil {
                 panic(err)
             }
@@ -137,7 +134,7 @@ func readFile(log *lib.Logger, bufPool *lib.ReadSpanBufPool, client *lib.Client,
     for i := range spansResp.Spans {
         span := &spansResp.Spans[i]
         // TODO random blacklist
-        spanR, err := client.ReadSpan(log, bufPool, []msgs.BlockServiceId{}, id, spansResp.BlockServices, span)
+        spanR, err := client.ReadSpan(log, bufPool, []msgs.BlockServiceId{}, spansResp.BlockServices, span)
         if err != nil {
             panic(err)
         }
diff --git a/go/lib/client.go b/go/lib/client.go
index 2cb4e699..511ff44f 100644
--- a/go/lib/client.go
+++ b/go/lib/client.go
@@ -530,13 +530,26 @@ func (client *Client) ResolvePath(log *Logger, path string) (msgs.InodeId, error
     return id, nil
 }
 
+type Taintable interface {
+    Taint()
+}
+
+type TaintableReadCloser interface {
+    io.ReadCloser
+    Taintable
+}
+
+type TaintableCloser interface {
+    io.Closer
+    Taintable
+}
+
 type BlocksConn interface {
     io.Writer
     io.Reader
     io.ReaderFrom
     io.Closer
-
-    Taint()
+    Taintable
 }
 
 type trackedBlocksConn struct {
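The Taintable split in the client.go hunk above lets span readers distinguish two shutdown paths: a plain Close() after a clean EOF can hand the underlying block connections back for reuse, while Taint() followed by Close() (as the fuse Flush and seek-reset paths now do) marks them unusable, e.g. when a read is abandoned mid-span and the stream position is unknown. A sketch of a pool-backed TaintableReadCloser under that assumed contract — `pooledConn` and its free list are hypothetical, not part of this diff:

```go
package main

import "io"

// pooledConn is a hypothetical TaintableReadCloser over a pooled connection,
// assuming Taint() means "do not return this connection to the pool on Close",
// as the "close normally here so that we can reuse the connections" comment
// in eggsfuse.go suggests.
type pooledConn struct {
	r       io.ReadCloser
	free    chan io.ReadCloser // hypothetical free list shared with the pool
	tainted bool
}

func (c *pooledConn) Read(p []byte) (int, error) { return c.r.Read(p) }

// Taint marks the connection as mid-stream, i.e. unusable for future requests.
func (c *pooledConn) Taint() { c.tainted = true }

func (c *pooledConn) Close() error {
	if c.tainted {
		return c.r.Close() // position unknown: really close it
	}
	select {
	case c.free <- c.r: // fully drained: hand the connection back
		return nil
	default:
		return c.r.Close() // free list full
	}
}
```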
diff --git a/go/lib/span.go b/go/lib/span.go
index 8ca29cdd..e42ce272 100644
--- a/go/lib/span.go
+++ b/go/lib/span.go
@@ -5,6 +5,7 @@ import (
     "fmt"
     "io"
     "os"
+    "path/filepath"
     "sync"
     "xtx/eggsfs/crc32c"
     "xtx/eggsfs/msgs"
@@ -271,10 +272,76 @@ func (c *Client) CreateSpan(
     return data, nil
 }
 
+func (c *Client) CreateFile(
+    log *Logger,
+    dirInfoCache *DirInfoCache,
+    path string, // must be absolute
+    r io.Reader,
+) (msgs.InodeId, error) {
+    if path[0] != '/' {
+        return 0, fmt.Errorf("non-absolute file path %v", path)
+    }
+    dirPath := filepath.Dir(path)
+    fileName := filepath.Base(path)
+    if fileName == dirPath {
+        return 0, fmt.Errorf("bad file path %v", path)
+    }
+    dirId, err := c.ResolvePath(log, dirPath)
+    if err != nil {
+        return 0, err
+    }
+    spanPolicies := msgs.SpanPolicy{}
+    if _, err := c.ResolveDirectoryInfoEntry(log, dirInfoCache, dirId, &spanPolicies); err != nil {
+        return 0, err
+    }
+    blockPolicies := msgs.BlockPolicy{}
+    if _, err := c.ResolveDirectoryInfoEntry(log, dirInfoCache, dirId, &blockPolicies); err != nil {
+        return 0, err
+    }
+    stripePolicy := msgs.StripePolicy{}
+    if _, err := c.ResolveDirectoryInfoEntry(log, dirInfoCache, dirId, &stripePolicy); err != nil {
+        return 0, err
+    }
+    fileResp := msgs.ConstructFileResp{}
+    if err := c.ShardRequest(log, dirId.Shard(), &msgs.ConstructFileReq{Type: msgs.FILE}, &fileResp); err != nil {
+        return 0, err
+    }
+    fileId := fileResp.Id
+    cookie := fileResp.Cookie
+    maxSpanSize := spanPolicies.Entries[len(spanPolicies.Entries)-1].MaxSize
+    spanBuf := make([]byte, maxSpanSize)
+    offset := uint64(0)
+    for {
+        spanBuf = spanBuf[:maxSpanSize]
+        read, err := io.ReadFull(r, spanBuf)
+        if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
+            return 0, err
+        }
+        if err == io.EOF {
+            break
+        }
+        spanBuf, err = c.CreateSpan(
+            log, []msgs.BlockServiceId{}, &spanPolicies, &blockPolicies, &stripePolicy,
+            fileId, cookie, offset, uint32(read), spanBuf[:read],
+        )
+        if err != nil {
+            return 0, err
+        }
+        offset += uint64(read)
+        if read < int(maxSpanSize) {
+            break
+        }
+    }
+    if err := c.ShardRequest(log, dirId.Shard(), &msgs.LinkFileReq{FileId: fileId, Cookie: cookie, OwnerId: dirId, Name: fileName}, &msgs.LinkFileResp{}); err != nil {
+        return 0, err
+    }
+    return fileId, nil
+}
+
 type mirroredSpanReader struct {
     cursor    int
     block     int
-    blockConn io.ReadCloser
+    blockConn TaintableReadCloser
     cellBuf   *[]byte
     cellCrcs  []msgs.Crc // starting from the _next_ stripe crc
 }
@@ -283,11 +350,15 @@ func (r *mirroredSpanReader) Close() error {
     return r.blockConn.Close()
 }
 
+func (r *mirroredSpanReader) Taint() {
+    r.blockConn.Taint()
+}
+
 type rsNormalSpanReader struct {
     bufPool    *ReadSpanBufPool
     cursor     int
     haveBlocks []uint8 // which blocks are we fetching. most of the times it'll just be the data blocks
-    blockConns []io.ReadCloser
+    blockConns []TaintableReadCloser
     blocksRunningCrcs []msgs.Crc
     stripeBuf  *[]byte
     stripeCrcs []msgs.Crc // starting from the _next_ stripe CRC.
@@ -309,6 +380,12 @@ func (sr *rsNormalSpanReader) Close() error {
     return lastErr
 }
 
+func (sr *rsNormalSpanReader) Taint() {
+    for _, c := range sr.blockConns {
+        c.Taint()
+    }
+}
+
 // This is when we actively detect a bad CRC, and we have no choice but to load
 // the remainder of the span in its entirety to find out which block is broken,
 // and then resume.
@@ -326,6 +403,8 @@ func (r *rsCorruptedSpanReader) Close() error {
     return nil
 }
 
+func (*rsCorruptedSpanReader) Taint() {}
+
 type spanReader struct {
     bufPool  *ReadSpanBufPool
     spanSize uint32
@@ -336,14 +415,18 @@ type spanReader struct {
     stripes    uint8
     cellSize   uint32
     blocksCrcs []msgs.Crc
-    blockConn  func(block int, offset uint32, size uint32) (io.ReadCloser, error)
-    r          io.Closer
+    blockConn  func(block int, offset uint32, size uint32) (TaintableReadCloser, error)
+    r          TaintableCloser
 }
 
 func (sr *spanReader) Close() error {
     return sr.r.Close()
 }
 
+func (sr *spanReader) Taint() {
+    sr.r.Taint()
+}
+
 func (sr *spanReader) repairCorruptedStripe(
     bufPool *ReadSpanBufPool,
     // the broken stripe
@@ -352,7 +435,7 @@ func (sr *spanReader) repairCorruptedStripe(
     parityData []*[]byte,
     haveBlocks []uint8, // the blocks we have been using so far
     haveBlocksCrc []msgs.Crc, // the CRCs so far for the blocks that we have
-    haveBlocksConns []io.ReadCloser, // the connections to the blocks we already have
+    haveBlocksConns []TaintableReadCloser, // the connections to the blocks we already have
 ) (*rsCorruptedSpanReader, error) {
     D := sr.parity.DataBlocks()
     B := sr.parity.Blocks()
@@ -739,8 +822,8 @@ func readSpanFromBlocks(
     //
     // We currently make the assumption that the connections that are available
     // at the beginning will be available throughout the duration of span reading.
-    blockConn func(block int, offset uint32, size uint32) (io.ReadCloser, error),
-) (io.ReadCloser, error) {
+    blockConn func(block int, offset uint32, size uint32) (TaintableReadCloser, error),
+) (TaintableReadCloser, error) {
     D := parity.DataBlocks()
     B := parity.Blocks()
     sr := spanReader{
@@ -778,7 +861,7 @@ func readSpanFromBlocks(
             return nil, err
         }
     } else {
-        conns := make([]io.ReadCloser, 0)
+        conns := make([]TaintableReadCloser, 0)
         haveBlocks := make([]uint8, 0)
         parityBuffers := []*[]byte{}
         for i := 0; i < B; i++ {
@@ -844,10 +927,12 @@ func (r *inlineSpanReader) Read(p []byte) (int, error) {
     return read, nil
 }
 
-func (r *inlineSpanReader) Close() error {
+func (*inlineSpanReader) Close() error {
     return nil
 }
 
+func (*inlineSpanReader) Taint() {}
+
 type ReadSpanBufPool struct {
     pool sync.Pool
 }
@@ -889,7 +974,8 @@ func (c *Client) ReadSpan(
     blacklist []msgs.BlockServiceId,
     blockServices []msgs.BlockService,
     fetchedSpan *msgs.FetchedSpan,
-) (io.ReadCloser, error) {
+) (TaintableReadCloser, error) {
+    log.DebugStack(1, "starting to read span")
     if fetchedSpan.Header.StorageClass == msgs.INLINE_STORAGE {
         data := fetchedSpan.Body.(*msgs.FetchedInlineSpan).Body
         dataCrc := msgs.Crc(crc32c.Sum(0, data))
@@ -909,7 +995,8 @@ func (c *Client) ReadSpan(
     for i := range blocksCrcs {
         blocksCrcs[i] = body.Blocks[i].Crc
     }
-    blockConn := func(blockIx int, offset uint32, size uint32) (io.ReadCloser, error) {
+    blockConn := func(blockIx int, offset uint32, size uint32) (TaintableReadCloser, error) {
+        log.DebugStack(1, "requested connection for block ix %v, offset %v, size %v", blockIx, offset, size)
         block := body.Blocks[blockIx]
         blockService := blockServices[block.BlockServiceIx]
         for _, blacklisted := range blacklist {
diff --git a/go/lib/span_test.go b/go/lib/span_test.go
index d324a014..e788f600 100644
--- a/go/lib/span_test.go
+++ b/go/lib/span_test.go
@@ -38,6 +38,8 @@ func (c *mockedBlockConn) Close() error {
     return nil
 }
 
+func (*mockedBlockConn) Taint() {}
+
 func testSpan(
     t *testing.T,
     bufPool *ReadSpanBufPool,
@@ -141,7 +143,7 @@ func testSpan(
     openBlockConns := int64(0)
     spanReader, err := readSpanFromBlocks(
         bufPool, sizeWithZeros, req.Crc, req.Parity, req.Stripes, req.CellSize, blocksCrcs, stripesCrcs,
-        func(i int, offset uint32, size uint32) (io.ReadCloser, error) {
+        func(i int, offset uint32, size uint32) (TaintableReadCloser, error) {
             for _, b := range badConnections {
                 if int(b) == i {
                     return nil, nil
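One last note on the fuse reading path: getSpanReaderAt fast-forwards to the requested offset by repeatedly draining the span reader into a 256-byte scratch buffer, since the reader can only be consumed sequentially from the span boundary it was opened at. The same skip can be expressed with the standard library; a hedged equivalent, not code from this diff:

```go
package main

import "io"

// skipTo drains r from the current offset up to target, mirroring the
// fast-forward loop in getSpanReaderAt without the manual scratch buffer.
// io.Discard requires Go 1.16+.
func skipTo(r io.Reader, current, target int64) (int64, error) {
	n, err := io.CopyN(io.Discard, r, target-current)
	return current + n, err
}
```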