mirror of
https://github.com/dolthub/dolt.git
synced 2026-01-07 08:50:34 -06:00
mem table WriteChunks to return splitOffset
This commit is contained in:
@@ -922,14 +922,14 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[has
|
||||
|
||||
// structuring so this can be done as multiple files in the future.
|
||||
{
|
||||
name, data, err := nbs.WriteChunks(chnks)
|
||||
name, data, splitOffset, err := nbs.WriteChunks(chnks)
|
||||
|
||||
if err != nil {
|
||||
return map[hash.Hash]int{}, err
|
||||
}
|
||||
|
||||
h := hash.Parse(name)
|
||||
hashToSplitOffset[h] = uint64(len(data))
|
||||
hashToSplitOffset[h] = splitOffset
|
||||
hashToData[h] = data
|
||||
hashToCount[h] = len(chnks)
|
||||
|
||||
|
||||
@@ -136,7 +136,7 @@ func (s3p awsTablePersister) key(k string) string {
|
||||
}
|
||||
|
||||
func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) {
|
||||
name, data, chunkCount, gcb, err := mt.write(haver, keeper, stats)
|
||||
name, data, _, chunkCount, gcb, err := mt.write(haver, keeper, stats)
|
||||
if err != nil {
|
||||
return emptyChunkSource{}, gcBehavior_Continue, err
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ var _ tableFilePersister = &blobstorePersister{}
|
||||
// Persist makes the contents of mt durable. Chunks already present in
|
||||
// |haver| may be dropped in the process.
|
||||
func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) {
|
||||
address, data, chunkCount, gcb, err := mt.write(haver, keeper, stats)
|
||||
address, data, splitOffset, chunkCount, gcb, err := mt.write(haver, keeper, stats)
|
||||
if err != nil {
|
||||
return emptyChunkSource{}, gcBehavior_Continue, err
|
||||
}
|
||||
@@ -58,7 +58,7 @@ func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver
|
||||
name := address.String()
|
||||
|
||||
// persist this table in two parts to facilitate later conjoins
|
||||
records, tail := splitTableParts(data, chunkCount)
|
||||
records, tail := data[:splitOffset], data[splitOffset:]
|
||||
|
||||
// first write table records and tail (index+footer) as separate blobs
|
||||
eg, ectx := errgroup.WithContext(ctx)
|
||||
@@ -395,17 +395,6 @@ func newBSTableChunkSource(ctx context.Context, bs blobstore.Blobstore, name has
|
||||
return &chunkSourceAdapter{tr, name}, nil
|
||||
}
|
||||
|
||||
// splitTableParts separates a table into chunk records and meta data.
|
||||
//
|
||||
// +----------------------+-------+--------+
|
||||
// table format: | Chunk Record 0 ... N | Index | Footer |
|
||||
// +----------------------+-------+--------+
|
||||
func splitTableParts(data []byte, count uint32) (records, tail []byte) {
|
||||
o := tableTailOffset(uint64(len(data)), count)
|
||||
records, tail = data[:o], data[o:]
|
||||
return
|
||||
}
|
||||
|
||||
func tableTailOffset(size uint64, count uint32) uint64 {
|
||||
return size - (indexSize(count) + footerSize)
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ func TestCmpChunkTableWriter(t *testing.T) {
|
||||
// Put some chunks in a table file and get the buffer back which contains the table file data
|
||||
ctx := context.Background()
|
||||
|
||||
expectedId, buff, err := WriteChunks(testMDChunks)
|
||||
expectedId, buff, _, err := WriteChunks(testMDChunks)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Setup a TableReader to read compressed chunks out of
|
||||
|
||||
@@ -95,7 +95,7 @@ func (ftp *fsTablePersister) Persist(ctx context.Context, mt *memTable, haver ch
|
||||
t1 := time.Now()
|
||||
defer stats.PersistLatency.SampleTimeSince(t1)
|
||||
|
||||
name, data, chunkCount, gcb, err := mt.write(haver, keeper, stats)
|
||||
name, data, _, chunkCount, gcb, err := mt.write(haver, keeper, stats)
|
||||
if err != nil {
|
||||
return emptyChunkSource{}, gcBehavior_Continue, err
|
||||
}
|
||||
|
||||
@@ -41,7 +41,9 @@ const (
|
||||
chunkNotAdded
|
||||
)
|
||||
|
||||
func WriteChunks(chunks []chunks.Chunk) (string, []byte, error) {
|
||||
// WriteChunks writes the provided chunks to a newly created memory table and returns the name and data of the resulting
|
||||
// table, plus splitOffset, the byte offset within data at which the index and footer begin.
|
||||
func WriteChunks(chunks []chunks.Chunk) (name string, data []byte, splitOffset uint64, err error) {
|
||||
var size uint64
|
||||
for _, chunk := range chunks {
|
||||
size += uint64(len(chunk.Data()))
|
||||
@@ -52,26 +54,25 @@ func WriteChunks(chunks []chunks.Chunk) (string, []byte, error) {
|
||||
return writeChunksToMT(mt, chunks)
|
||||
}
|
||||
|
||||
func writeChunksToMT(mt *memTable, chunks []chunks.Chunk) (string, []byte, error) {
|
||||
func writeChunksToMT(mt *memTable, chunks []chunks.Chunk) (name string, data []byte, splitOffset uint64, err error) {
|
||||
for _, chunk := range chunks {
|
||||
res := mt.addChunk(chunk.Hash(), chunk.Data())
|
||||
if res == chunkNotAdded {
|
||||
return "", nil, errors.New("didn't create this memory table with enough space to add all the chunks")
|
||||
return "", nil, 0, errors.New("didn't create this memory table with enough space to add all the chunks")
|
||||
}
|
||||
}
|
||||
|
||||
var stats Stats
|
||||
name, data, count, _, err := mt.write(nil, nil, &stats)
|
||||
|
||||
h, data, splitOffset, count, _, err := mt.write(nil, nil, &stats)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
return "", nil, 0, err
|
||||
}
|
||||
|
||||
if count != uint32(len(chunks)) {
|
||||
return "", nil, errors.New("didn't write everything")
|
||||
return "", nil, 0, errors.New("didn't write everything")
|
||||
}
|
||||
|
||||
return name.String(), data, nil
|
||||
return h.String(), data, splitOffset, nil
|
||||
}
|
||||
|
||||
type memTable struct {
|
||||
@@ -81,7 +82,7 @@ type memTable struct {
|
||||
pendingRefs []hasRecord
|
||||
getChildAddrs []chunks.GetAddrsCb
|
||||
maxData uint64
|
||||
totalData uint64
|
||||
totalData uint64 // size of uncompressed data in chunks
|
||||
}
|
||||
|
||||
func newMemTable(memTableSize uint64) *memTable {
|
||||
@@ -220,11 +221,11 @@ func (mt *memTable) extract(ctx context.Context, chunks chan<- extractRecord) er
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mt *memTable) write(haver chunkReader, keeper keeperF, stats *Stats) (name hash.Hash, data []byte, count uint32, gcb gcBehavior, err error) {
|
||||
func (mt *memTable) write(haver chunkReader, keeper keeperF, stats *Stats) (name hash.Hash, data []byte, splitOffset uint64, chunkCount uint32, gcb gcBehavior, err error) {
|
||||
gcb = gcBehavior_Continue
|
||||
numChunks := uint64(len(mt.order))
|
||||
if numChunks == 0 {
|
||||
return hash.Hash{}, nil, 0, gcBehavior_Continue, fmt.Errorf("mem table cannot write with zero chunks")
|
||||
return hash.Hash{}, nil, 0, 0, gcBehavior_Continue, fmt.Errorf("mem table cannot write with zero chunks")
|
||||
}
|
||||
maxSize := maxTableSize(uint64(len(mt.order)), mt.totalData)
|
||||
// todo: memory quota
|
||||
@@ -235,10 +236,10 @@ func (mt *memTable) write(haver chunkReader, keeper keeperF, stats *Stats) (name
|
||||
sort.Sort(hasRecordByPrefix(mt.order)) // hasMany() requires addresses to be sorted.
|
||||
_, gcb, err = haver.hasMany(mt.order, keeper)
|
||||
if err != nil {
|
||||
return hash.Hash{}, nil, 0, gcBehavior_Continue, err
|
||||
return hash.Hash{}, nil, 0, 0, gcBehavior_Continue, err
|
||||
}
|
||||
if gcb != gcBehavior_Continue {
|
||||
return hash.Hash{}, nil, 0, gcb, err
|
||||
return hash.Hash{}, nil, 0, 0, gcb, err
|
||||
}
|
||||
|
||||
sort.Sort(hasRecordByOrder(mt.order)) // restore "insertion" order for write
|
||||
@@ -248,23 +249,24 @@ func (mt *memTable) write(haver chunkReader, keeper keeperF, stats *Stats) (name
|
||||
if !addr.has {
|
||||
h := addr.a
|
||||
tw.addChunk(*h, mt.chunks[*h])
|
||||
count++
|
||||
chunkCount++
|
||||
}
|
||||
}
|
||||
tableSize, name, err := tw.finish()
|
||||
|
||||
if err != nil {
|
||||
return hash.Hash{}, nil, 0, gcBehavior_Continue, err
|
||||
return hash.Hash{}, nil, 0, 0, gcBehavior_Continue, err
|
||||
}
|
||||
|
||||
if count > 0 {
|
||||
splitOffset = tableSize - (indexSize(chunkCount) + footerSize)
|
||||
|
||||
if chunkCount > 0 {
|
||||
stats.BytesPerPersist.Sample(uint64(tableSize))
|
||||
stats.CompressedChunkBytesPerPersist.Sample(uint64(tw.totalCompressedData))
|
||||
stats.UncompressedChunkBytesPerPersist.Sample(uint64(tw.totalUncompressedData))
|
||||
stats.ChunksPerPersist.Sample(uint64(count))
|
||||
stats.ChunksPerPersist.Sample(uint64(chunkCount))
|
||||
}
|
||||
|
||||
return name, buff[:tableSize], count, gcBehavior_Continue, nil
|
||||
return name, buff[:tableSize], splitOffset, chunkCount, gcBehavior_Continue, nil
|
||||
}
|
||||
|
||||
func (mt *memTable) close() error {
|
||||
|
||||
@@ -64,20 +64,17 @@ func mustChunk(chunk chunks.Chunk, err error) chunks.Chunk {
|
||||
}
|
||||
|
||||
func TestWriteChunks(t *testing.T) {
|
||||
name, data, err := WriteChunks(testMDChunks)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
name, data, splitOffSet, err := WriteChunks(testMDChunks)
|
||||
require.NoError(t, err)
|
||||
// Size of written data is stable so long as we don't change testMDChunks
|
||||
assert.Equal(t, uint64(845), splitOffSet)
|
||||
assert.Equal(t, 1089, len(data))
|
||||
|
||||
dir, err := os.MkdirTemp("", "write_chunks_test")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
err = os.WriteFile(dir+name, data, os.ModePerm)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestMemTableAddHasGetChunk(t *testing.T) {
|
||||
@@ -169,7 +166,7 @@ func TestMemTableWrite(t *testing.T) {
|
||||
defer tr2.close()
|
||||
assert.True(tr2.has(computeAddr(chunks[2]), nil))
|
||||
|
||||
_, data, count, _, err := mt.write(chunkReaderGroup{tr1, tr2}, nil, &Stats{})
|
||||
_, data, _, count, _, err := mt.write(chunkReaderGroup{tr1, tr2}, nil, &Stats{})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(uint32(1), count)
|
||||
|
||||
|
||||
@@ -40,7 +40,7 @@ var _ tableFilePersister = &noConjoinBlobstorePersister{}
|
||||
// Persist makes the contents of mt durable. Chunks already present in
|
||||
// |haver| may be dropped in the process.
|
||||
func (bsp *noConjoinBlobstorePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) {
|
||||
address, data, chunkCount, gcb, err := mt.write(haver, keeper, stats)
|
||||
address, data, _, chunkCount, gcb, err := mt.write(haver, keeper, stats)
|
||||
if err != nil {
|
||||
return emptyChunkSource{}, gcBehavior_Continue, err
|
||||
} else if gcb != gcBehavior_Continue {
|
||||
|
||||
@@ -510,7 +510,7 @@ func (ftp fakeTablePersister) Persist(ctx context.Context, mt *memTable, haver c
|
||||
return emptyChunkSource{}, gcBehavior_Continue, nil
|
||||
}
|
||||
|
||||
name, data, chunkCount, gcb, err := mt.write(haver, keeper, stats)
|
||||
name, data, _, chunkCount, gcb, err := mt.write(haver, keeper, stats)
|
||||
if err != nil {
|
||||
return emptyChunkSource{}, gcBehavior_Continue, err
|
||||
} else if gcb != gcBehavior_Continue {
|
||||
|
||||
@@ -123,6 +123,8 @@ func (tw *tableWriter) addChunk(h hash.Hash, data []byte) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// finish completes the table by writing the index and footer. It returns the total length of the table file and the hash used
|
||||
// to identify the table.
|
||||
func (tw *tableWriter) finish() (tableFileLength uint64, blockAddr hash.Hash, err error) {
|
||||
err = tw.writeIndex()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user