diff --git a/go/libraries/doltcore/remotesrv/grpc.go b/go/libraries/doltcore/remotesrv/grpc.go
index 84ef77a866..9fb7283491 100644
--- a/go/libraries/doltcore/remotesrv/grpc.go
+++ b/go/libraries/doltcore/remotesrv/grpc.go
@@ -418,7 +418,7 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes
 }
 
 func (rs *RemoteChunkStore) getUploadUrl(md metadata.MD, repoPath string, tfd *remotesapi.TableFileDetails) *url.URL {
-	fileID := hash.New(tfd.Id).String()
+	fileID := hash.New(tfd.Id).String() + tfd.Suffix
 	params := url.Values{}
 	params.Add("num_chunks", strconv.Itoa(int(tfd.NumChunks)))
 	params.Add("content_length", strconv.Itoa(int(tfd.ContentLength)))
diff --git a/go/libraries/doltcore/remotesrv/http.go b/go/libraries/doltcore/remotesrv/http.go
index ba1ea046ee..3737d71f22 100644
--- a/go/libraries/doltcore/remotesrv/http.go
+++ b/go/libraries/doltcore/remotesrv/http.go
@@ -123,8 +123,7 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
 	}
 
 	i := strings.LastIndex(path, "/")
-	// a table file name is currently 32 characters, plus the '/' is 33.
-	if i < 0 || len(path[i:]) != 33 {
+	if i < 0 || !validateFileName(path[i+1:]) {
 		logger = logger.WithField("status", http.StatusNotFound)
 		respWr.WriteHeader(http.StatusNotFound)
 		return
@@ -231,7 +230,7 @@ func readTableFile(logger *logrus.Entry, path string, respWr http.ResponseWriter
 		}
 	}()
 
-	if rangeStr == "" {
+	if rangeStr != "" {
 		respWr.WriteHeader(http.StatusPartialContent)
 	} else {
 		respWr.WriteHeader(http.StatusOK)
@@ -288,8 +287,7 @@ func (u *uploadreader) Close() error {
 }
 
 func writeTableFile(ctx context.Context, logger *logrus.Entry, dbCache DBCache, path, fileId string, numChunks int, contentHash []byte, contentLength uint64, body io.ReadCloser) (*logrus.Entry, int) {
-	_, ok := hash.MaybeParse(fileId)
-	if !ok {
+	if !validateFileName(fileId) {
 		logger = logger.WithField("status", http.StatusBadRequest)
 		logger.Warnf("%s is not a valid hash", fileId)
 		return logger, http.StatusBadRequest
@@ -443,3 +441,15 @@ func ExtractBasicAuthCreds(ctx context.Context) (*RequestCredentials, error) {
 		return &RequestCredentials{Username: username, Password: password, Address: addr.Addr.String()}, nil
 	}
 }
+
+func validateFileName(fileName string) bool {
+	if len(fileName) == 32 {
+		_, ok := hash.MaybeParse(fileName)
+		return ok
+	}
+	if len(fileName) == 32+len(nbs.ArchiveFileSuffix) && strings.HasSuffix(fileName, nbs.ArchiveFileSuffix) {
+		_, ok := hash.MaybeParse(fileName[:32])
+		return ok
+	}
+	return false
+}
diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go
index cd4f34f2d7..f731f0a4ce 100644
--- a/go/libraries/doltcore/remotestorage/chunk_store.go
+++ b/go/libraries/doltcore/remotestorage/chunk_store.go
@@ -933,7 +933,9 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[has
 	}
 
 	for h, contentHash := range hashToContentHash {
-		err := dcs.uploadTableFileWithRetries(ctx, h, uint64(hashToCount[h]), contentHash, func() (io.ReadCloser, uint64, error) {
+		// Tables created on this path always start from memory tables and end up as noms table files.
+		// As a result, the suffix is always empty.
+		err := dcs.uploadTableFileWithRetries(ctx, h, "", uint64(hashToCount[h]), contentHash, func() (io.ReadCloser, uint64, error) {
 			data := hashToData[h]
 			return io.NopCloser(bytes.NewReader(data)), uint64(len(data)), nil
 		})
@@ -945,7 +947,7 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[has
 	return hashToCount, nil
 }
 
-func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, tableFileId hash.Hash, numChunks uint64, tableFileContentHash []byte, getContent func() (io.ReadCloser, uint64, error)) error {
+func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, tableFileId hash.Hash, suffix string, numChunks uint64, tableFileContentHash []byte, getContent func() (io.ReadCloser, uint64, error)) error {
 	op := func() error {
 		body, contentLength, err := getContent()
 		if err != nil {
@@ -957,6 +959,7 @@ func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, table
 			ContentLength: contentLength,
 			ContentHash:   tableFileContentHash,
 			NumChunks:     numChunks,
+			Suffix:        suffix,
 		}
 
 		dcs.logf("getting upload location for file %s", tableFileId.String())
@@ -1061,17 +1064,14 @@ func (dcs *DoltChunkStore) SupportedOperations() chunks.TableFileStoreOps {
 
 // WriteTableFile reads a table file from the provided reader and writes it to the chunk store.
 func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
-	// Err if the suffix is an archive file
+	suffix := ""
 	if strings.HasSuffix(fileId, nbs.ArchiveFileSuffix) {
-		return errors.New("cannot write archive file ids currently.")
+		suffix = nbs.ArchiveFileSuffix
+		fileId = strings.TrimSuffix(fileId, nbs.ArchiveFileSuffix)
 	}
 
 	fileIdBytes := hash.Parse(fileId)
-	err := dcs.uploadTableFileWithRetries(ctx, fileIdBytes, uint64(numChunks), contentHash, getRd)
-	if err != nil {
-		return err
-	}
-	return nil
+	return dcs.uploadTableFileWithRetries(ctx, fileIdBytes, suffix, uint64(numChunks), contentHash, getRd)
 }
 
 // AddTableFilesToManifest adds table files to the manifest
diff --git a/go/store/datas/pull/pull_table_file_writer.go b/go/store/datas/pull/pull_table_file_writer.go
index 8e27dff9a4..2e4ca96c1f 100644
--- a/go/store/datas/pull/pull_table_file_writer.go
+++ b/go/store/datas/pull/pull_table_file_writer.go
@@ -17,6 +17,8 @@ package pull
 import (
 	"context"
 	"io"
+	"os"
+	"strings"
 	"sync"
 	"sync/atomic"
 
@@ -54,7 +56,7 @@ import (
 type PullTableFileWriter struct {
 	cfg PullTableFileWriterConfig
 
-	addChunkCh  chan nbs.CompressedChunk
+	addChunkCh  chan nbs.ToChunker
 	newWriterCh chan nbs.GenericTableWriter
 	egCtx       context.Context
 	eg          *errgroup.Group
@@ -99,7 +101,7 @@ type PullTableFileWriterStats struct {
 func NewPullTableFileWriter(ctx context.Context, cfg PullTableFileWriterConfig) *PullTableFileWriter {
 	ret := &PullTableFileWriter{
 		cfg:         cfg,
-		addChunkCh:  make(chan nbs.CompressedChunk),
+		addChunkCh:  make(chan nbs.ToChunker),
 		newWriterCh: make(chan nbs.GenericTableWriter, cfg.MaximumBufferedFiles),
 		getAddrs:    cfg.GetAddrs,
 	}
@@ -124,7 +126,7 @@ func (w *PullTableFileWriter) GetStats() PullTableFileWriterStats {
 // This method may block for arbitrary amounts of time if there is already a
 // lot of buffered table files and we are waiting for uploads to succeed before
 // creating more table files.
-func (w *PullTableFileWriter) AddCompressedChunk(ctx context.Context, chk nbs.CompressedChunk) error {
+func (w *PullTableFileWriter) AddToChunker(ctx context.Context, chk nbs.ToChunker) error {
 	select {
 	case w.addChunkCh <- chk:
 		return nil
@@ -169,7 +171,12 @@ func (w *PullTableFileWriter) uploadAndFinalizeThread() (err error) {
 	go func() {
 		defer manifestWg.Done()
 		for ttf := range respCh {
-			manifestUpdates[ttf.id] = ttf.numChunks
+			id := ttf.id
+			if strings.HasSuffix(id, nbs.ArchiveFileSuffix) {
+				id = id[:len(id)-len(nbs.ArchiveFileSuffix)]
+			}
+
+			manifestUpdates[id] = ttf.numChunks
 		}
 	}()
 
@@ -235,19 +242,23 @@ LOOP:
 		}
 
 		if curWr == nil {
-			// curWr, err = nbs.NewCmpChunkTableWriter(w.cfg.TempDir)
-			curWr, err = nbs.NewArchiveStreamWriter()
+			if os.Getenv("DOLT_ARCHIVE_PULL_STREAMER") != "" {
+				curWr, err = nbs.NewArchiveStreamWriter(w.cfg.TempDir)
+			} else {
+				curWr, err = nbs.NewCmpChunkTableWriter(w.cfg.TempDir)
+			}
 			if err != nil {
 				return err
 			}
 		}
 
 		// Add the chunk to writer.
-		err = curWr.AddChunk(newChnk)
+		bytes, err := curWr.AddChunk(newChnk)
 		if err != nil {
 			return err
 		}
-		atomic.AddUint64(&w.bufferedSendBytes, uint64(len(newChnk.FullCompressedChunk)))
+
+		atomic.AddUint64(&w.bufferedSendBytes, uint64(bytes))
 	}
 }
 
diff --git a/go/store/datas/pull/puller.go b/go/store/datas/pull/puller.go
index a95e1e3559..a5ffde0102 100644
--- a/go/store/datas/pull/puller.go
+++ b/go/store/datas/pull/puller.go
@@ -381,19 +381,9 @@ func (p *Puller) Pull(ctx context.Context) error {
 			}
 
 			tracker.TickProcessed()
-			if compressedChunk, ok := cChk.(nbs.CompressedChunk); ok {
-				err = p.wr.AddCompressedChunk(ctx, compressedChunk)
-				if err != nil {
-					return err
-				}
-			} else if _, ok := cChk.(nbs.ArchiveToChunker); ok {
-				// NM4 - Until we can write quickly to archives.....
-				cc := nbs.ChunkToCompressedChunk(chnk)
-
-				err = p.wr.AddCompressedChunk(ctx, cc)
-				if err != nil {
-					return err
-				}
+			err = p.wr.AddToChunker(ctx, cChk)
+			if err != nil {
+				return err
 			}
 		}
 	})
diff --git a/go/store/nbs/archive_build.go b/go/store/nbs/archive_build.go
index 42a8835a44..5c19d8190b 100644
--- a/go/store/nbs/archive_build.go
+++ b/go/store/nbs/archive_build.go
@@ -68,7 +68,7 @@ func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, p
 
 	err = arc.iterate(ctx, func(chk chunks.Chunk) error {
 		cmpChk := ChunkToCompressedChunk(chk)
-		err := classicTable.AddChunk(cmpChk)
+		_, err := classicTable.AddChunk(cmpChk)
 		if err != nil {
 			return err
 		}
@@ -233,7 +233,7 @@ func convertTableFileToArchive(
 	cmpBuff = gozstd.Compress(cmpBuff[:0], defaultDict)
 	// p("Default Dict Raw vs Compressed: %d , %d\n", len(defaultDict), len(cmpDefDict))
 
-	arcW, err := newArchiveWriter()
+	arcW, err := newArchiveWriter("")
 	if err != nil {
 		return "", hash.Hash{}, err
 	}
diff --git a/go/store/nbs/archive_writer.go b/go/store/nbs/archive_writer.go
index a042af254e..d727a4dd8d 100644
--- a/go/store/nbs/archive_writer.go
+++ b/go/store/nbs/archive_writer.go
@@ -91,8 +91,8 @@ There is a workflow to writing an archive:
 
 // newArchiveWriter creates a new archiveWriter. Output is written to a temp file, as the file name won't be known
 // until we've finished writing the footer.
-func newArchiveWriter() (*archiveWriter, error) {
-	bs, err := NewBufferedFileByteSink("", defaultTableSinkBlockSize, defaultChBufferSize)
+func newArchiveWriter(tmpDir string) (*archiveWriter, error) {
+	bs, err := NewBufferedFileByteSink(tmpDir, defaultTableSinkBlockSize, defaultChBufferSize)
 	if err != nil {
 		return nil, err
 	}
@@ -537,8 +537,8 @@ type ArchiveStreamWriter struct {
 	chunkCount *int32
 }
 
-func NewArchiveStreamWriter() (*ArchiveStreamWriter, error) {
-	writer, err := newArchiveWriter()
+func NewArchiveStreamWriter(tmpDir string) (*ArchiveStreamWriter, error) {
+	writer, err := newArchiveWriter(tmpDir)
 	if err != nil {
 		return nil, err
 	}
@@ -575,18 +575,18 @@ func (asw *ArchiveStreamWriter) ChunkCount() int {
 	return int(atomic.LoadInt32(asw.chunkCount))
 }
 
-func (asw *ArchiveStreamWriter) AddChunk(chunker ToChunker) error {
+func (asw *ArchiveStreamWriter) AddChunk(chunker ToChunker) (uint32, error) {
 	if cc, ok := chunker.(CompressedChunk); ok {
 		return asw.writeCompressedChunk(cc)
 	}
 	if ac, ok := chunker.(ArchiveToChunker); ok {
 		return asw.writeArchiveToChunker(ac)
 	}
-	return fmt.Errorf("Unknown chunk type: %T", chunker)
+	return 0, fmt.Errorf("Unknown chunk type: %T", chunker)
 }
 
 func (asw *ArchiveStreamWriter) ContentLength() uint64 {
-	return asw.writer.bytesWritten
+	return asw.writer.md5Summer.Size()
 }
 
 func (asw *ArchiveStreamWriter) GetMD5() []byte {
@@ -597,9 +597,11 @@ func (asw *ArchiveStreamWriter) Remove() error {
 	return os.Remove(asw.writer.finalPath)
 }
 
-func (asw *ArchiveStreamWriter) writeArchiveToChunker(chunker ArchiveToChunker) error {
+func (asw *ArchiveStreamWriter) writeArchiveToChunker(chunker ArchiveToChunker) (uint32, error) {
 	dict := chunker.dict
 
+	bytesWritten := uint32(0)
+	var err error
 	dictId, ok := asw.dictMap[dict]
 	if !ok {
@@ -618,6 +620,7 @@ func (asw *ArchiveStreamWriter) writeArchiveToChunker(chunker ArchiveToChunker)
 			if err != nil {
 				return err
 			}
+			bytesWritten += uint32(len(*dict.rawDictionary))
 			asw.dictMap[dict] = dictId
 			return nil
 		}()
@@ -625,18 +628,20 @@ func (asw *ArchiveStreamWriter) writeArchiveToChunker(chunker ArchiveToChunker)
 
 	dataId, err := asw.writer.writeByteSpan(chunker.chunkData)
 	if err != nil {
-		return err
+		return bytesWritten, err
 	}
+	bytesWritten += uint32(len(chunker.chunkData))
 	atomic.AddInt32(asw.chunkCount, 1)
-	return asw.writer.stageZStdChunk(chunker.Hash(), dictId, dataId)
+	return bytesWritten, asw.writer.stageZStdChunk(chunker.Hash(), dictId, dataId)
 }
 
-func (asw *ArchiveStreamWriter) writeCompressedChunk(chunker CompressedChunk) error {
+func (asw *ArchiveStreamWriter) writeCompressedChunk(chunker CompressedChunk) (uint32, error) {
+	writeCount := uint32(len(chunker.FullCompressedChunk))
 	dataId, err := asw.writer.writeByteSpan(chunker.FullCompressedChunk)
 	if err != nil {
-		return err
+		return 0, err
 	}
 	atomic.AddInt32(asw.chunkCount, 1)
-	return asw.writer.stageSnappyChunk(chunker.Hash(), dataId)
+	return writeCount, asw.writer.stageSnappyChunk(chunker.Hash(), dataId)
 }
diff --git a/go/store/nbs/byte_sink.go b/go/store/nbs/byte_sink.go
index e2ba443225..1d02013542 100644
--- a/go/store/nbs/byte_sink.go
+++ b/go/store/nbs/byte_sink.go
@@ -316,7 +316,7 @@ func (sink *BufferedFileByteSink) Reader() (io.ReadCloser, error) {
 	return os.Open(sink.path)
 }
 
-// HashingByteSink is a ByteSink that keeps an md5 hash of all the data written to it.
+// HashingByteSink is a ByteSink that keeps a hash of all the data written to it.
 type HashingByteSink struct {
 	backingSink ByteSink
 	hasher      hash.Hash
 }
diff --git a/go/store/nbs/cmp_chunk_table_writer.go b/go/store/nbs/cmp_chunk_table_writer.go
index 13a4feb519..856975d748 100644
--- a/go/store/nbs/cmp_chunk_table_writer.go
+++ b/go/store/nbs/cmp_chunk_table_writer.go
@@ -18,6 +18,7 @@ import (
 	"crypto/sha512"
 	"encoding/binary"
 	"errors"
+	"fmt"
 	gohash "hash"
 	"io"
 	"os"
@@ -38,9 +39,11 @@ type GenericTableWriter interface {
 	// ChunkCount returns the number of chunks written to the table file. This can be called before Finish to determine
 	// if the maximum number of chunks has been reached.
 	ChunkCount() int
-	// AddChunk adds a chunk to the table file. The underlying implementation of ToChunker will probably be exploided
+	// AddChunk adds a chunk to the table file. The underlying implementation of ToChunker will probably be exploited
 	// by implementors of GenericTableWriter so that their bytes can be efficiently written to the table file.
-	AddChunk(ToChunker) error
+	//
+	// If no error occurs, the number of bytes written to the store is returned.
+	AddChunk(ToChunker) (uint32, error)
 	// ContentLength returns the number of bytes written to the table file. This can be called before Finish to determine
 	// if the file is too large.
 	ContentLength() uint64
@@ -65,7 +68,6 @@ var ErrDuplicateChunkWritten = errors.New("duplicate chunks written")
 // CmpChunkTableWriter writes CompressedChunks to a table file
 type CmpChunkTableWriter struct {
 	sink                  *HashingByteSink
-	totalCompressedData   uint64
 	totalUncompressedData uint64
 	prefixes              prefixIndexSlice
 	blockAddr             *hash.Hash
@@ -81,7 +83,7 @@ func NewCmpChunkTableWriter(tempDir string) (*CmpChunkTableWriter, error) {
 		return nil, err
 	}
 
-	return &CmpChunkTableWriter{NewMD5HashingByteSink(s), 0, 0, nil, nil, s.path}, nil
+	return &CmpChunkTableWriter{NewMD5HashingByteSink(s), 0, nil, nil, s.path}, nil
 }
 
 func (tw *CmpChunkTableWriter) ChunkCount() int {
@@ -99,12 +101,12 @@ func (tw *CmpChunkTableWriter) GetMD5() []byte {
 }
 
 // AddCmpChunk adds a compressed chunk
-func (tw *CmpChunkTableWriter) AddChunk(tc ToChunker) error {
+func (tw *CmpChunkTableWriter) AddChunk(tc ToChunker) (uint32, error) {
 	if tc.IsGhost() {
 		// Ghost chunks cannot be written to a table file. They should
 		// always be filtered by the write processes before landing
 		// here.
-		return ErrGhostChunkRequested
+		return 0, ErrGhostChunkRequested
 	}
 	if tc.IsEmpty() {
 		panic("NBS blocks cannot be zero length")
@@ -112,33 +114,41 @@ func (tw *CmpChunkTableWriter) AddChunk(tc ToChunker) {
 
 	c, ok := tc.(CompressedChunk)
 	if !ok {
-		panic("runtime error: Require a CompressedChunk instance")
+		if arc, ok := tc.(ArchiveToChunker); ok {
+			// Decompress and recompress, since we can only write snappy-compressed objects to this store.
+			chk, err := arc.ToChunk()
+			if err != nil {
+				return 0, err
+			}
+			c = ChunkToCompressedChunk(chk)
+		} else {
+			panic(fmt.Sprintf("Unknown chunk type: %T", tc))
+		}
 	}
 
 	uncmpLen, err := snappy.DecodedLen(c.CompressedData)
 	if err != nil {
-		return err
+		return 0, err
 	}
 
-	fullLen := len(c.FullCompressedChunk)
+	fullLen := uint32(len(c.FullCompressedChunk))
 	_, err = tw.sink.Write(c.FullCompressedChunk)
 	if err != nil {
-		return err
+		return 0, err
 	}
 
-	tw.totalCompressedData += uint64(len(c.CompressedData))
 	tw.totalUncompressedData += uint64(uncmpLen)
 
 	// Stored in insertion order
 	tw.prefixes = append(tw.prefixes, prefixIndexRec{
 		c.H,
 		uint32(len(tw.prefixes)),
-		uint32(fullLen),
+		fullLen,
 	})
 
-	return nil
+	return fullLen, nil
 }
 
 // Finish will write the index and footer of the table file and return the id of the file.
diff --git a/go/store/nbs/cmp_chunk_table_writer_test.go b/go/store/nbs/cmp_chunk_table_writer_test.go
index d860c6348a..af6a701d30 100644
--- a/go/store/nbs/cmp_chunk_table_writer_test.go
+++ b/go/store/nbs/cmp_chunk_table_writer_test.go
@@ -59,7 +59,7 @@ func TestCmpChunkTableWriter(t *testing.T) {
 	tw, err := NewCmpChunkTableWriter("")
 	require.NoError(t, err)
 	for _, cmpChnk := range found {
-		err = tw.AddChunk(cmpChnk)
+		_, err = tw.AddChunk(cmpChnk)
 		require.NoError(t, err)
 	}
 
@@ -70,9 +70,9 @@ func TestCmpChunkTableWriter(t *testing.T) {
 	tw, err := NewCmpChunkTableWriter("")
 	require.NoError(t, err)
 	for _, cmpChnk := range found {
-		err = tw.AddChunk(cmpChnk)
+		_, err = tw.AddChunk(cmpChnk)
 		require.NoError(t, err)
-		err = tw.AddChunk(cmpChnk)
+		_, err = tw.AddChunk(cmpChnk)
 		require.NoError(t, err)
 	}
 	_, err = tw.Finish()
@@ -98,7 +98,8 @@
 func TestCmpChunkTableWriterGhostChunk(t *testing.T) {
 	tw, err := NewCmpChunkTableWriter("")
 	require.NoError(t, err)
-	require.Error(t, tw.AddChunk(NewGhostCompressedChunk(hash.Parse("6af71afc2ea0hmp4olev0vp9q1q5gvb1"))))
+	_, err = tw.AddChunk(NewGhostCompressedChunk(hash.Parse("6af71afc2ea0hmp4olev0vp9q1q5gvb1")))
+	require.Error(t, err)
 }
 
 func TestContainsDuplicates(t *testing.T) {
diff --git a/go/store/nbs/gc_copier.go b/go/store/nbs/gc_copier.go
index c60b61199b..d3511e9dca 100644
--- a/go/store/nbs/gc_copier.go
+++ b/go/store/nbs/gc_copier.go
@@ -57,7 +57,8 @@ func newGarbageCollectionCopier(tfp tableFilePersister) (*gcCopier, error) {
 }
 
 func (gcc *gcCopier) addChunk(ctx context.Context, c ToChunker) error {
-	return gcc.writer.AddChunk(c)
+	_, err := gcc.writer.AddChunk(c)
+	return err
 }
 
 // If the writer should be closed and deleted, instead of being used with
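
Note on the validation change above: remotesrv now accepts a table file name either as a bare 32-character hash or as a hash followed by nbs.ArchiveFileSuffix, replacing the old fixed-length-33 path check. Below is a minimal, self-contained sketch of the accepted forms; the concrete suffix value and the base32 alphabet check are stand-ins for nbs.ArchiveFileSuffix and hash.MaybeParse, assumed here for illustration only:

package main

import (
	"fmt"
	"strings"
)

// archiveFileSuffix stands in for nbs.ArchiveFileSuffix; the ".darc" value
// is an assumption for this sketch, not taken from the diff.
const archiveFileSuffix = ".darc"

// looksLikeHash stands in for hash.MaybeParse: table file names are
// 32-character strings over a base32 alphabet.
func looksLikeHash(s string) bool {
	if len(s) != 32 {
		return false
	}
	for _, r := range s {
		if !strings.ContainsRune("0123456789abcdefghijklmnopqrstuv", r) {
			return false
		}
	}
	return true
}

// validFileName mirrors the validateFileName helper added in http.go.
func validFileName(name string) bool {
	if len(name) == 32 {
		return looksLikeHash(name)
	}
	if len(name) == 32+len(archiveFileSuffix) && strings.HasSuffix(name, archiveFileSuffix) {
		return looksLikeHash(name[:32])
	}
	return false
}

func main() {
	fmt.Println(validFileName("6af71afc2ea0hmp4olev0vp9q1q5gvb1"))                     // true: classic table file
	fmt.Println(validFileName("6af71afc2ea0hmp4olev0vp9q1q5gvb1" + archiveFileSuffix)) // true: archive file
	fmt.Println(validFileName("6af71afc2ea0"))                                         // false: too short
}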
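
The GenericTableWriter.AddChunk signature change ripples through every caller in this diff: the byte count now comes from the writer's return value rather than from len(FullCompressedChunk), which only exists for snappy chunks. A rough sketch of the caller-side accounting pattern used in the pull writer loop; the function name is illustrative, not from the codebase:

package pullsketch

import (
	"sync/atomic"

	"github.com/dolthub/dolt/go/store/nbs"
)

// addAndAccount sketches the accounting now done in uploadAndFinalizeThread:
// AddChunk reports how many bytes landed in the table file, so the same
// counter update works for snappy table files and archives alike.
func addAndAccount(wr nbs.GenericTableWriter, chk nbs.ToChunker, bufferedSendBytes *uint64) error {
	n, err := wr.AddChunk(chk)
	if err != nil {
		return err
	}
	atomic.AddUint64(bufferedSendBytes, uint64(n))
	return nil
}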
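
Archive streaming on the pull path stays opt-in behind the DOLT_ARCHIVE_PULL_STREAMER environment variable, with the classic snappy writer as the default. A sketch of that gate, assuming (as the diff implies) that both constructors satisfy nbs.GenericTableWriter:

package pullsketch

import (
	"os"

	"github.com/dolthub/dolt/go/store/nbs"
)

// newTableWriter sketches the gated construction used when a new table file
// is started: an archive writer only when DOLT_ARCHIVE_PULL_STREAMER is set.
func newTableWriter(tempDir string) (nbs.GenericTableWriter, error) {
	if os.Getenv("DOLT_ARCHIVE_PULL_STREAMER") != "" {
		return nbs.NewArchiveStreamWriter(tempDir)
	}
	return nbs.NewCmpChunkTableWriter(tempDir)
}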