[no-release-notes] go/store/nbs: Change lifecycle for io.Reader in WriteTableFile / CopyTableFile so that the thing that opens the Reader is responsible for closing it.

This commit is contained in:
Aaron Son
2023-10-26 13:59:24 -07:00
parent 9c6c9761c9
commit 4318e77b99
7 changed files with 19 additions and 39 deletions
+1 -10
View File
@@ -112,16 +112,7 @@ func (s3p awsTablePersister) Exists(ctx context.Context, name addr, chunkCount u
)
}
func (s3p awsTablePersister) CopyTableFile(ctx context.Context, r io.ReadCloser, fileId string, fileSz uint64, chunkCount uint32) error {
var err error
defer func() {
cerr := r.Close()
if err == nil {
err = cerr
}
}()
func (s3p awsTablePersister) CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, chunkCount uint32) error {
name, err := parseAddr(fileId)
if err != nil {
return err
+13 -19
View File
@@ -178,13 +178,7 @@ func (bsp *blobstorePersister) Path() string {
return ""
}
func (bsp *blobstorePersister) CopyTableFile(ctx context.Context, r io.ReadCloser, name string, fileSz uint64, chunkCount uint32) (err error) {
defer func() {
if cerr := r.Close(); cerr != nil {
err = cerr
}
}()
func (bsp *blobstorePersister) CopyTableFile(ctx context.Context, r io.Reader, name string, fileSz uint64, chunkCount uint32) error {
// sanity check file size
if fileSz < indexSize(chunkCount)+footerSize {
return fmt.Errorf("table file size %d too small for chunk count %d", fileSz, chunkCount)
@@ -197,36 +191,36 @@ func (bsp *blobstorePersister) CopyTableFile(ctx context.Context, r io.ReadClose
rr, ok := r.(io.ReaderAt)
if !ok {
// sequentially write chunk records then tail
if _, err = bsp.bs.Put(ctx, name+tableRecordsExt, lr); err != nil {
if _, err := bsp.bs.Put(ctx, name+tableRecordsExt, lr); err != nil {
return err
}
if _, err = bsp.bs.Put(ctx, name+tableTailExt, r); err != nil {
if _, err := bsp.bs.Put(ctx, name+tableTailExt, r); err != nil {
return err
}
} else {
// on the push path, we expect to Put concurrently
// see BufferedFileByteSink in byte_sink.go
eg, ectx := errgroup.WithContext(ctx)
eg.Go(func() (err error) {
eg.Go(func() error {
buf := make([]byte, indexSize(chunkCount)+footerSize)
if _, err = rr.ReadAt(buf, off); err != nil {
if _, err := rr.ReadAt(buf, off); err != nil {
return err
}
_, err = bsp.bs.Put(ectx, name+tableTailExt, bytes.NewBuffer(buf))
return
_, err := bsp.bs.Put(ectx, name+tableTailExt, bytes.NewBuffer(buf))
return err
})
eg.Go(func() (err error) {
_, err = bsp.bs.Put(ectx, name+tableRecordsExt, lr)
return
eg.Go(func() error {
_, err := bsp.bs.Put(ectx, name+tableRecordsExt, lr)
return err
})
if err = eg.Wait(); err != nil {
if err := eg.Wait(); err != nil {
return err
}
}
// finally concatenate into the complete table
_, err = bsp.bs.Concatenate(ctx, name, []string{name + tableRecordsExt, name + tableTailExt})
return
_, err := bsp.bs.Concatenate(ctx, name, []string{name + tableRecordsExt, name + tableTailExt})
return err
}
type bsTableReaderAt struct {
+1 -8
View File
@@ -95,15 +95,8 @@ func (ftp *fsTablePersister) Path() string {
return ftp.dir
}
func (ftp *fsTablePersister) CopyTableFile(ctx context.Context, r io.ReadCloser, fileId string, fileSz uint64, chunkCount uint32) error {
func (ftp *fsTablePersister) CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, chunkCount uint32) error {
tn, f, err := func() (n string, cleanup func(), err error) {
defer func() {
cerr := r.Close()
if err == nil {
err = cerr
}
}()
ftp.removeMu.Lock()
var temp *os.File
temp, err = tempfiles.MovableTempFileProvider.NewFile(ftp.dir, tempTablePrefix)
+1
View File
@@ -110,6 +110,7 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, tfp tableFilePersister
if err != nil {
return nil, err
}
defer r.Close()
sz := gcc.writer.ContentLength()
err = tfp.CopyTableFile(ctx, r, filename, sz, uint32(gcc.writer.ChunkCount()))
+1 -1
View File
@@ -246,7 +246,7 @@ func (j *chunkJournal) Path() string {
return filepath.Dir(j.path)
}
func (j *chunkJournal) CopyTableFile(ctx context.Context, r io.ReadCloser, fileId string, fileSz uint64, chunkCount uint32) error {
func (j *chunkJournal) CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, chunkCount uint32) error {
if j.backing.readOnly() {
return errReadOnlyManifest
}
+1
View File
@@ -1464,6 +1464,7 @@ func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileId string, nu
if err != nil {
return err
}
defer r.Close()
return tfp.CopyTableFile(ctx, r, fileId, sz, uint32(numChunks))
}
+1 -1
View File
@@ -70,7 +70,7 @@ type tableFilePersister interface {
tablePersister
// CopyTableFile copies the table file with the given fileId from the reader to the TableFileStore.
CopyTableFile(ctx context.Context, r io.ReadCloser, fileId string, fileSz uint64, chunkCount uint32) error
CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, chunkCount uint32) error
// Path returns the file system path. Use CopyTableFile instead of Path to
// copy a file to the TableFileStore. Path cannot be removed because it's used