mirror of
https://github.com/dolthub/dolt.git
synced 2026-01-07 08:50:34 -06:00
[no-release-notes] tracing infra (#7783)
* tracing starter * bump * trace reads * bump * undo some unnecessary changes * fix build * [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh --------- Co-authored-by: max-hoffman <max-hoffman@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
f7abadf73e
commit
7aef8fcde1
@@ -626,7 +626,7 @@ func execBatchMode(ctx *sql.Context, qryist cli.Queryist, input io.Reader, conti
|
||||
|
||||
sqlMode := sql.LoadSqlMode(ctx)
|
||||
|
||||
sqlStatement, err := sqlparser.ParseWithOptions(query, sqlMode.ParserOptions())
|
||||
sqlStatement, err := sqlparser.ParseWithOptions(ctx, query, sqlMode.ParserOptions())
|
||||
if err == sqlparser.ErrEmpty {
|
||||
continue
|
||||
} else if err != nil {
|
||||
|
||||
@@ -15,7 +15,7 @@ require (
|
||||
github.com/dolthub/fslock v0.0.3
|
||||
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488
|
||||
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81
|
||||
github.com/dolthub/vitess v0.0.0-20240617225939-55a46c5dcfc8
|
||||
github.com/dolthub/vitess v0.0.0-20240626174323-4083c07f5e9c
|
||||
github.com/dustin/go-humanize v1.0.1
|
||||
github.com/fatih/color v1.13.0
|
||||
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
|
||||
@@ -57,7 +57,7 @@ require (
|
||||
github.com/cespare/xxhash v1.1.0
|
||||
github.com/creasty/defaults v1.6.0
|
||||
github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2
|
||||
github.com/dolthub/go-mysql-server v0.18.2-0.20240625212035-80f4e402d726
|
||||
github.com/dolthub/go-mysql-server v0.18.2-0.20240626180128-807a2e35937f
|
||||
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63
|
||||
github.com/dolthub/swiss v0.1.0
|
||||
github.com/goccy/go-json v0.10.2
|
||||
|
||||
@@ -183,8 +183,8 @@ github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
|
||||
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
|
||||
github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e h1:kPsT4a47cw1+y/N5SSCkma7FhAPw7KeGmD6c9PBZW9Y=
|
||||
github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e/go.mod h1:KPUcpx070QOfJK1gNe0zx4pA5sicIK1GMikIGLKC168=
|
||||
github.com/dolthub/go-mysql-server v0.18.2-0.20240625212035-80f4e402d726 h1:zVMMW0gpT/Cq+xEUPulbt5y7dWz0K1A5BtNRDarW+i0=
|
||||
github.com/dolthub/go-mysql-server v0.18.2-0.20240625212035-80f4e402d726/go.mod h1:XdiHsd2TX3OOhjwY6tPcw1ztT2BdBiP6Wp0m/7OYHn4=
|
||||
github.com/dolthub/go-mysql-server v0.18.2-0.20240626180128-807a2e35937f h1:ouZxORcShC3qeQdsTkKDpROh+OI1OZuoOAJx8HThMrs=
|
||||
github.com/dolthub/go-mysql-server v0.18.2-0.20240626180128-807a2e35937f/go.mod h1:JahRYjx/Py6T/bWrnTu25CaGn94Df+McAuWGEG0shwU=
|
||||
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63 h1:OAsXLAPL4du6tfbBgK0xXHZkOlos63RdKYS3Sgw/dfI=
|
||||
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63/go.mod h1:lV7lUeuDhH5thVGDCKXbatwKy2KW80L4rMT46n+Y2/Q=
|
||||
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488 h1:0HHu0GWJH0N6a6keStrHhUAK5/o9LVfkh44pvsV4514=
|
||||
@@ -197,8 +197,8 @@ github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 h1:7/v8q9X
|
||||
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81/go.mod h1:siLfyv2c92W1eN/R4QqG/+RjjX5W2+gCTRjZxBjI3TY=
|
||||
github.com/dolthub/swiss v0.1.0 h1:EaGQct3AqeP/MjASHLiH6i4TAmgbG/c4rA6a1bzCOPc=
|
||||
github.com/dolthub/swiss v0.1.0/go.mod h1:BeucyB08Vb1G9tumVN3Vp/pyY4AMUnr9p7Rz7wJ7kAQ=
|
||||
github.com/dolthub/vitess v0.0.0-20240617225939-55a46c5dcfc8 h1:d+dOTwI8dkwNYmcweXNjei2ot3GHJB3HqLWUeNvAkC0=
|
||||
github.com/dolthub/vitess v0.0.0-20240617225939-55a46c5dcfc8/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
|
||||
github.com/dolthub/vitess v0.0.0-20240626174323-4083c07f5e9c h1:Y3M0hPCUvT+5RTNbJLKywGc9aHIRCIlg+0NOhC91GYE=
|
||||
github.com/dolthub/vitess v0.0.0-20240626174323-4083c07f5e9c/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
|
||||
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||
|
||||
@@ -31,7 +31,7 @@ type RemoteSrvStore interface {
|
||||
chunks.TableFileStore
|
||||
|
||||
Path() (string, bool)
|
||||
GetChunkLocationsWithPaths(hashes hash.HashSet) (map[string]map[hash.Hash]nbs.Range, error)
|
||||
GetChunkLocationsWithPaths(ctx context.Context, hashes hash.HashSet) (map[string]map[hash.Hash]nbs.Range, error)
|
||||
}
|
||||
|
||||
var _ RemoteSrvStore = &nbs.NomsBlockStore{}
|
||||
|
||||
@@ -172,7 +172,7 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot
|
||||
|
||||
numHashes := len(hashes)
|
||||
|
||||
locations, err := cs.GetChunkLocationsWithPaths(hashes)
|
||||
locations, err := cs.GetChunkLocationsWithPaths(ctx, hashes)
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("error getting chunk locations for hashes")
|
||||
return nil, err
|
||||
@@ -277,7 +277,7 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
|
||||
return err
|
||||
}
|
||||
numHashes += len(hashes)
|
||||
locations, err := cs.GetChunkLocationsWithPaths(hashes)
|
||||
locations, err := cs.GetChunkLocationsWithPaths(context.Background(), hashes)
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("error getting chunk locations for hashes")
|
||||
return err
|
||||
|
||||
@@ -623,7 +623,7 @@ func tableKey(t Table) (uint64, error) {
|
||||
}
|
||||
|
||||
func parseTableAndSchema(q string) (string, []string, []sql2.Type) {
|
||||
stmt, _, err := ast.ParseOne(q)
|
||||
stmt, _, err := ast.ParseOne(context.Background(), q)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("invalid query: %s; %s", q, err))
|
||||
}
|
||||
|
||||
1
go/performance/utils/benchmark_runner/tpcc.lua
Symbolic link
1
go/performance/utils/benchmark_runner/tpcc.lua
Symbolic link
@@ -0,0 +1 @@
|
||||
/Users/maxhoffman/go/github.com/dolthub/sysbench-tpcc/tpcc.lua
|
||||
@@ -123,7 +123,7 @@ func (acs archiveChunkSource) clone() (chunkSource, error) {
|
||||
return nil, errors.New("Archive chunk source does not support clone")
|
||||
}
|
||||
|
||||
func (acs archiveChunkSource) getRecordRanges(requests []getRecord) (map[hash.Hash]Range, error) {
|
||||
func (acs archiveChunkSource) getRecordRanges(_ context.Context, _ []getRecord) (map[hash.Hash]Range, error) {
|
||||
return nil, errors.New("Archive chunk source does not support getRecordRanges")
|
||||
}
|
||||
|
||||
|
||||
@@ -74,7 +74,7 @@ func (ecs emptyChunkSource) reader(context.Context) (io.ReadCloser, uint64, erro
|
||||
return io.NopCloser(&bytes.Buffer{}), 0, nil
|
||||
}
|
||||
|
||||
func (ecs emptyChunkSource) getRecordRanges(lookups []getRecord) (map[hash.Hash]Range, error) {
|
||||
func (ecs emptyChunkSource) getRecordRanges(ctx context.Context, requests []getRecord) (map[hash.Hash]Range, error) {
|
||||
return map[hash.Hash]Range{}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -401,14 +401,14 @@ func (gcs *GenerationalNBS) SupportedOperations() chunks.TableFileStoreOps {
|
||||
return gcs.newGen.SupportedOperations()
|
||||
}
|
||||
|
||||
func (gcs *GenerationalNBS) GetChunkLocationsWithPaths(hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) {
|
||||
res, err := gcs.newGen.GetChunkLocationsWithPaths(hashes)
|
||||
func (gcs *GenerationalNBS) GetChunkLocationsWithPaths(ctx context.Context, hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) {
|
||||
res, err := gcs.newGen.GetChunkLocationsWithPaths(ctx, hashes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(hashes) > 0 {
|
||||
prefix := gcs.RelativeOldGenPath()
|
||||
toadd, err := gcs.oldGen.GetChunkLocationsWithPaths(hashes)
|
||||
toadd, err := gcs.oldGen.GetChunkLocationsWithPaths(ctx, hashes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -419,13 +419,13 @@ func (gcs *GenerationalNBS) GetChunkLocationsWithPaths(hashes hash.HashSet) (map
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (gcs *GenerationalNBS) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash]map[hash.Hash]Range, error) {
|
||||
res, err := gcs.newGen.GetChunkLocations(hashes)
|
||||
func (gcs *GenerationalNBS) GetChunkLocations(ctx context.Context, hashes hash.HashSet) (map[hash.Hash]map[hash.Hash]Range, error) {
|
||||
res, err := gcs.newGen.GetChunkLocations(ctx, hashes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(hashes) > 0 {
|
||||
toadd, err := gcs.oldGen.GetChunkLocations(hashes)
|
||||
toadd, err := gcs.oldGen.GetChunkLocations(ctx, hashes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -161,7 +161,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context) (err error) {
|
||||
}
|
||||
if ok {
|
||||
// write the current root hash to the journal file
|
||||
if err = j.wr.commitRootHash(contents.root); err != nil {
|
||||
if err = j.wr.commitRootHash(ctx, contents.root); err != nil {
|
||||
return
|
||||
}
|
||||
j.contents = contents
|
||||
@@ -259,7 +259,7 @@ func (j *ChunkJournal) Persist(ctx context.Context, mt *memTable, haver chunkRea
|
||||
continue
|
||||
}
|
||||
c := chunks.NewChunkWithHash(hash.Hash(*record.a), mt.chunks[*record.a])
|
||||
err := j.wr.writeCompressedChunk(ChunkToCompressedChunk(c))
|
||||
err := j.wr.writeCompressedChunk(ctx, ChunkToCompressedChunk(c))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -355,7 +355,7 @@ func (j *ChunkJournal) Update(ctx context.Context, lastLock hash.Hash, next mani
|
||||
}
|
||||
}
|
||||
|
||||
if err := j.wr.commitRootHash(next.root); err != nil {
|
||||
if err := j.wr.commitRootHash(ctx, next.root); err != nil {
|
||||
return manifestContents{}, err
|
||||
}
|
||||
j.contents = next
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"runtime/trace"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
@@ -54,11 +55,14 @@ func (s journalChunkSource) hasMany(addrs []hasRecord) (missing bool, err error)
|
||||
return
|
||||
}
|
||||
|
||||
func (s journalChunkSource) getCompressed(_ context.Context, h hash.Hash, _ *Stats) (CompressedChunk, error) {
|
||||
func (s journalChunkSource) getCompressed(ctx context.Context, h hash.Hash, _ *Stats) (CompressedChunk, error) {
|
||||
defer trace.StartRegion(ctx, "journalChunkSource.getCompressed").End()
|
||||
return s.journal.getCompressedChunk(h)
|
||||
}
|
||||
|
||||
func (s journalChunkSource) get(_ context.Context, h hash.Hash, _ *Stats) ([]byte, error) {
|
||||
func (s journalChunkSource) get(ctx context.Context, h hash.Hash, _ *Stats) ([]byte, error) {
|
||||
defer trace.StartRegion(ctx, "journalChunkSource.get").End()
|
||||
|
||||
cc, err := s.journal.getCompressedChunk(h)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -100,6 +104,8 @@ func (s journalChunkSource) getMany(ctx context.Context, eg *errgroup.Group, req
|
||||
// lock after returning when all reads are completed, which can be after the
|
||||
// function returns.
|
||||
func (s journalChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
|
||||
defer trace.StartRegion(ctx, "journalChunkSource.getManyCompressed").End()
|
||||
|
||||
var remaining bool
|
||||
var jReqs []journalRecord
|
||||
var wg sync.WaitGroup
|
||||
@@ -160,18 +166,18 @@ func (s journalChunkSource) hash() hash.Hash {
|
||||
}
|
||||
|
||||
// reader implements chunkSource.
|
||||
func (s journalChunkSource) reader(context.Context) (io.ReadCloser, uint64, error) {
|
||||
rdr, sz, err := s.journal.snapshot()
|
||||
func (s journalChunkSource) reader(ctx context.Context) (io.ReadCloser, uint64, error) {
|
||||
rdr, sz, err := s.journal.snapshot(ctx)
|
||||
return rdr, uint64(sz), err
|
||||
}
|
||||
|
||||
func (s journalChunkSource) getRecordRanges(requests []getRecord) (map[hash.Hash]Range, error) {
|
||||
func (s journalChunkSource) getRecordRanges(ctx context.Context, requests []getRecord) (map[hash.Hash]Range, error) {
|
||||
ranges := make(map[hash.Hash]Range, len(requests))
|
||||
for _, req := range requests {
|
||||
if req.found {
|
||||
continue
|
||||
}
|
||||
rng, ok, err := s.journal.getRange(*req.a)
|
||||
rng, ok, err := s.journal.getRange(ctx, *req.a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if !ok {
|
||||
|
||||
@@ -99,7 +99,7 @@ func TestReadRecordRanges(t *testing.T) {
|
||||
jcs, err := j.Persist(ctx, mt, emptyChunkSource{}, &Stats{})
|
||||
require.NoError(t, err)
|
||||
|
||||
rdr, sz, err := jcs.(journalChunkSource).journal.snapshot()
|
||||
rdr, sz, err := jcs.(journalChunkSource).journal.snapshot(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer rdr.Close()
|
||||
|
||||
@@ -108,7 +108,7 @@ func TestReadRecordRanges(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int(sz), n)
|
||||
|
||||
ranges, err := jcs.getRecordRanges(gets)
|
||||
ranges, err := jcs.getRecordRanges(ctx, gets)
|
||||
require.NoError(t, err)
|
||||
|
||||
for h, rng := range ranges {
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime/trace"
|
||||
"sync"
|
||||
|
||||
"github.com/dolthub/swiss"
|
||||
@@ -286,7 +287,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer
|
||||
if err := wr.truncateIndex(safeIndexOffset); err != nil {
|
||||
return hash.Hash{}, err
|
||||
}
|
||||
wr.ranges = wr.ranges.flatten()
|
||||
wr.ranges = wr.ranges.flatten(ctx)
|
||||
}
|
||||
|
||||
var lastOffset int64
|
||||
@@ -331,7 +332,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer
|
||||
|
||||
if wr.ranges.novelCount() > wr.maxNovel {
|
||||
// save bootstrap progress
|
||||
if err := wr.flushIndexRecord(last, lastOffset); err != nil {
|
||||
if err := wr.flushIndexRecord(ctx, last, lastOffset); err != nil {
|
||||
return hash.Hash{}, err
|
||||
}
|
||||
}
|
||||
@@ -397,10 +398,10 @@ func (wr *journalWriter) getCompressedChunkAtRange(r Range, h hash.Hash) (Compre
|
||||
}
|
||||
|
||||
// getRange returns a Range for the chunk with addr |h|.
|
||||
func (wr *journalWriter) getRange(h hash.Hash) (rng Range, ok bool, err error) {
|
||||
func (wr *journalWriter) getRange(ctx context.Context, h hash.Hash) (rng Range, ok bool, err error) {
|
||||
// callers will use |rng| to read directly from the
|
||||
// journal file, so we must flush here
|
||||
if err = wr.maybeFlush(); err != nil {
|
||||
if err = wr.maybeFlush(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
wr.lock.RLock()
|
||||
@@ -410,7 +411,7 @@ func (wr *journalWriter) getRange(h hash.Hash) (rng Range, ok bool, err error) {
|
||||
}
|
||||
|
||||
// writeCompressedChunk writes |cc| to the journal.
|
||||
func (wr *journalWriter) writeCompressedChunk(cc CompressedChunk) error {
|
||||
func (wr *journalWriter) writeCompressedChunk(ctx context.Context, cc CompressedChunk) error {
|
||||
wr.lock.Lock()
|
||||
defer wr.lock.Unlock()
|
||||
recordLen, payloadOff := chunkRecordSize(cc)
|
||||
@@ -418,7 +419,7 @@ func (wr *journalWriter) writeCompressedChunk(cc CompressedChunk) error {
|
||||
Offset: uint64(wr.offset()) + uint64(payloadOff),
|
||||
Length: uint32(len(cc.FullCompressedChunk)),
|
||||
}
|
||||
buf, err := wr.getBytes(int(recordLen))
|
||||
buf, err := wr.getBytes(ctx, int(recordLen))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -448,37 +449,44 @@ func (wr *journalWriter) writeCompressedChunk(cc CompressedChunk) error {
|
||||
// write index records out. It's perfectly fine to reuse the current
|
||||
// root hash, and this will also take care of the |Sync|.
|
||||
if wr.unsyncd > journalMaybeSyncThreshold && !wr.currentRoot.IsEmpty() {
|
||||
return wr.commitRootHashUnlocked(wr.currentRoot)
|
||||
return wr.commitRootHashUnlocked(ctx, wr.currentRoot)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// commitRootHash commits |root| to the journal and syncs the file to disk.
|
||||
func (wr *journalWriter) commitRootHash(root hash.Hash) error {
|
||||
func (wr *journalWriter) commitRootHash(ctx context.Context, root hash.Hash) error {
|
||||
wr.lock.Lock()
|
||||
defer wr.lock.Unlock()
|
||||
return wr.commitRootHashUnlocked(root)
|
||||
return wr.commitRootHashUnlocked(ctx, root)
|
||||
}
|
||||
|
||||
func (wr *journalWriter) commitRootHashUnlocked(root hash.Hash) error {
|
||||
buf, err := wr.getBytes(rootHashRecordSize())
|
||||
func (wr *journalWriter) commitRootHashUnlocked(ctx context.Context, root hash.Hash) error {
|
||||
defer trace.StartRegion(ctx, "commit-root").End()
|
||||
|
||||
buf, err := wr.getBytes(ctx, rootHashRecordSize())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
wr.currentRoot = root
|
||||
n := writeRootHashRecord(buf, root)
|
||||
if err = wr.flush(); err != nil {
|
||||
if err = wr.flush(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = wr.journal.Sync(); err != nil {
|
||||
func() {
|
||||
defer trace.StartRegion(ctx, "sync").End()
|
||||
|
||||
err = wr.journal.Sync()
|
||||
}()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
wr.unsyncd = 0
|
||||
if wr.ranges.novelCount() > wr.maxNovel {
|
||||
o := wr.offset() - int64(n) // pre-commit journal offset
|
||||
if err := wr.flushIndexRecord(root, o); err != nil {
|
||||
if err := wr.flushIndexRecord(ctx, root, o); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -488,12 +496,13 @@ func (wr *journalWriter) commitRootHashUnlocked(root hash.Hash) error {
|
||||
// flushIndexRecord writes metadata for a range of index lookups to the
|
||||
// out-of-band journal index file. Index records accelerate journal
|
||||
// bootstrapping by reducing the amount of the journal that must be processed.
|
||||
func (wr *journalWriter) flushIndexRecord(root hash.Hash, end int64) (err error) {
|
||||
func (wr *journalWriter) flushIndexRecord(ctx context.Context, root hash.Hash, end int64) (err error) {
|
||||
defer trace.StartRegion(ctx, "flushIndexRecord").End()
|
||||
if err := writeJournalIndexMeta(wr.indexWriter, root, wr.indexed, end, wr.batchCrc); err != nil {
|
||||
return err
|
||||
}
|
||||
wr.batchCrc = 0
|
||||
wr.ranges = wr.ranges.flatten()
|
||||
wr.ranges = wr.ranges.flatten(ctx)
|
||||
// set a new high-water-mark for the indexed portion of the journal
|
||||
wr.indexed = end
|
||||
return
|
||||
@@ -524,13 +533,13 @@ func (wr *journalWriter) readAt(p []byte, off int64) (n int, err error) {
|
||||
}
|
||||
|
||||
// getBytes returns a buffer for writers to copy data into.
|
||||
func (wr *journalWriter) getBytes(n int) (buf []byte, err error) {
|
||||
func (wr *journalWriter) getBytes(ctx context.Context, n int) (buf []byte, err error) {
|
||||
c, l := cap(wr.buf), len(wr.buf)
|
||||
if n > c {
|
||||
err = fmt.Errorf("requested bytes (%d) exceeds capacity (%d)", n, c)
|
||||
return
|
||||
} else if n > c-l {
|
||||
if err = wr.flush(); err != nil {
|
||||
if err = wr.flush(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -541,7 +550,8 @@ func (wr *journalWriter) getBytes(n int) (buf []byte, err error) {
|
||||
}
|
||||
|
||||
// flush writes buffered data into the journal file.
|
||||
func (wr *journalWriter) flush() (err error) {
|
||||
func (wr *journalWriter) flush(ctx context.Context) (err error) {
|
||||
defer trace.StartRegion(ctx, "flush journal").End()
|
||||
if _, err = wr.journal.WriteAt(wr.buf, wr.off); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -551,7 +561,7 @@ func (wr *journalWriter) flush() (err error) {
|
||||
}
|
||||
|
||||
// maybeFlush flushes buffered data, if any exists.
|
||||
func (wr *journalWriter) maybeFlush() (err error) {
|
||||
func (wr *journalWriter) maybeFlush(ctx context.Context) (err error) {
|
||||
wr.lock.RLock()
|
||||
empty := len(wr.buf) == 0
|
||||
wr.lock.RUnlock()
|
||||
@@ -560,7 +570,7 @@ func (wr *journalWriter) maybeFlush() (err error) {
|
||||
}
|
||||
wr.lock.Lock()
|
||||
defer wr.lock.Unlock()
|
||||
return wr.flush()
|
||||
return wr.flush(ctx)
|
||||
}
|
||||
|
||||
type journalWriterSnapshot struct {
|
||||
@@ -574,10 +584,10 @@ func (s journalWriterSnapshot) Close() error {
|
||||
|
||||
// snapshot returns an io.Reader with a consistent view of
|
||||
// the current state of the journal file.
|
||||
func (wr *journalWriter) snapshot() (io.ReadCloser, int64, error) {
|
||||
func (wr *journalWriter) snapshot(ctx context.Context) (io.ReadCloser, int64, error) {
|
||||
wr.lock.Lock()
|
||||
defer wr.lock.Unlock()
|
||||
if err := wr.flush(); err != nil {
|
||||
if err := wr.flush(ctx); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
// open a new file descriptor with an
|
||||
@@ -625,7 +635,7 @@ func (wr *journalWriter) Close() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err = wr.flush(); err != nil {
|
||||
if err = wr.flush(context.Background()); err != nil {
|
||||
return err
|
||||
}
|
||||
if wr.index != nil {
|
||||
@@ -709,7 +719,10 @@ func (idx rangeIndex) novelLookups() (lookups []lookup) {
|
||||
return
|
||||
}
|
||||
|
||||
func (idx rangeIndex) flatten() rangeIndex {
|
||||
func (idx rangeIndex) flatten(ctx context.Context) rangeIndex {
|
||||
defer trace.StartRegion(ctx, "flatten journal index").End()
|
||||
trace.Logf(ctx, "swiss map current count", "%d", idx.cached.Count())
|
||||
trace.Logf(ctx, "swiss map add count", "%d", idx.novel.Count())
|
||||
idx.novel.Iter(func(a hash.Hash, r Range) (stop bool) {
|
||||
idx.cached.Put(toAddr16(a), r)
|
||||
return
|
||||
|
||||
@@ -163,13 +163,13 @@ func TestJournalWriterReadWrite(t *testing.T) {
|
||||
assert.Equal(t, op.buf, act, "operation %d failed", i)
|
||||
case writeOp:
|
||||
var p []byte
|
||||
p, err = j.getBytes(len(op.buf))
|
||||
p, err = j.getBytes(context.Background(), len(op.buf))
|
||||
require.NoError(t, err, "operation %d errored", i)
|
||||
n := copy(p, op.buf)
|
||||
assert.Equal(t, len(op.buf), n, "operation %d failed", i)
|
||||
off += int64(n)
|
||||
case flushOp:
|
||||
err = j.flush()
|
||||
err = j.flush(context.Background())
|
||||
assert.NoError(t, err, "operation %d errored", i)
|
||||
default:
|
||||
t.Fatal("unknown opKind")
|
||||
@@ -195,7 +195,7 @@ func TestJournalWriterWriteCompressedChunk(t *testing.T) {
|
||||
j := newTestJournalWriter(t, path)
|
||||
data := randomCompressedChunks(1024)
|
||||
for a, cc := range data {
|
||||
err := j.writeCompressedChunk(cc)
|
||||
err := j.writeCompressedChunk(context.Background(), cc)
|
||||
require.NoError(t, err)
|
||||
r, _ := j.ranges.get(a)
|
||||
validateLookup(t, j, r, cc)
|
||||
@@ -210,11 +210,11 @@ func TestJournalWriterBootstrap(t *testing.T) {
|
||||
data := randomCompressedChunks(1024)
|
||||
var last hash.Hash
|
||||
for _, cc := range data {
|
||||
err := j.writeCompressedChunk(cc)
|
||||
err := j.writeCompressedChunk(context.Background(), cc)
|
||||
require.NoError(t, err)
|
||||
last = cc.Hash()
|
||||
}
|
||||
require.NoError(t, j.commitRootHash(last))
|
||||
require.NoError(t, j.commitRootHash(context.Background(), last))
|
||||
require.NoError(t, j.Close())
|
||||
|
||||
j, _, err := openJournalWriter(ctx, path)
|
||||
@@ -270,10 +270,10 @@ func TestJournalWriterSyncClose(t *testing.T) {
|
||||
path := newTestFilePath(t)
|
||||
j := newTestJournalWriter(t, path)
|
||||
p := []byte("sit")
|
||||
buf, err := j.getBytes(len(p))
|
||||
buf, err := j.getBytes(context.Background(), len(p))
|
||||
require.NoError(t, err)
|
||||
copy(buf, p)
|
||||
j.flush()
|
||||
j.flush(context.Background())
|
||||
assert.Equal(t, 0, len(j.buf))
|
||||
assert.Equal(t, 3, int(j.off))
|
||||
}
|
||||
@@ -341,17 +341,17 @@ func TestJournalIndexBootstrap(t *testing.T) {
|
||||
for i, e := range epochs {
|
||||
for _, cc := range e.records {
|
||||
recordCnt++
|
||||
assert.NoError(t, j.writeCompressedChunk(cc))
|
||||
assert.NoError(t, j.writeCompressedChunk(context.Background(), cc))
|
||||
if rand.Int()%10 == 0 { // periodic commits
|
||||
assert.NoError(t, j.commitRootHash(cc.H))
|
||||
assert.NoError(t, j.commitRootHash(context.Background(), cc.H))
|
||||
}
|
||||
}
|
||||
o := j.offset() // precommit offset
|
||||
assert.NoError(t, j.commitRootHash(e.last)) // commit |e.last|
|
||||
o := j.offset() // precommit offset
|
||||
assert.NoError(t, j.commitRootHash(context.Background(), e.last)) // commit |e.last|
|
||||
if i == len(epochs) {
|
||||
break // don't index |test.novel|
|
||||
}
|
||||
assert.NoError(t, j.flushIndexRecord(e.last, o)) // write index record
|
||||
assert.NoError(t, j.flushIndexRecord(ctx, e.last, o)) // write index record
|
||||
}
|
||||
err := j.Close()
|
||||
require.NoError(t, err)
|
||||
@@ -448,7 +448,7 @@ func TestRangeIndex(t *testing.T) {
|
||||
}
|
||||
assert.Equal(t, len(data), idx.novelCount())
|
||||
assert.Equal(t, len(data), int(idx.count()))
|
||||
idx = idx.flatten()
|
||||
idx = idx.flatten(context.Background())
|
||||
assert.Equal(t, 0, idx.novelCount())
|
||||
assert.Equal(t, len(data), int(idx.count()))
|
||||
}
|
||||
|
||||
@@ -138,8 +138,8 @@ func (nbs *NomsBlockStore) ChunkJournal() *ChunkJournal {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (nbs *NomsBlockStore) GetChunkLocationsWithPaths(hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) {
|
||||
locs, err := nbs.GetChunkLocations(hashes)
|
||||
func (nbs *NomsBlockStore) GetChunkLocationsWithPaths(ctx context.Context, hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) {
|
||||
locs, err := nbs.GetChunkLocations(ctx, hashes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -150,13 +150,13 @@ func (nbs *NomsBlockStore) GetChunkLocationsWithPaths(hashes hash.HashSet) (map[
|
||||
return toret, nil
|
||||
}
|
||||
|
||||
func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash]map[hash.Hash]Range, error) {
|
||||
func (nbs *NomsBlockStore) GetChunkLocations(ctx context.Context, hashes hash.HashSet) (map[hash.Hash]map[hash.Hash]Range, error) {
|
||||
gr := toGetRecords(hashes)
|
||||
ranges := make(map[hash.Hash]map[hash.Hash]Range)
|
||||
|
||||
fn := func(css chunkSourceSet) error {
|
||||
for _, cs := range css {
|
||||
rng, err := cs.getRecordRanges(gr)
|
||||
rng, err := cs.getRecordRanges(ctx, gr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -226,7 +226,7 @@ type chunkSource interface {
|
||||
reader(context.Context) (io.ReadCloser, uint64, error)
|
||||
|
||||
// getRecordRanges sets getRecord.found to true, and returns a Range for each present getRecord query.
|
||||
getRecordRanges(requests []getRecord) (map[hash.Hash]Range, error)
|
||||
getRecordRanges(ctx context.Context, requests []getRecord) (map[hash.Hash]Range, error)
|
||||
|
||||
// index returns the tableIndex of this chunkSource.
|
||||
index() (tableIndex, error)
|
||||
|
||||
@@ -668,7 +668,7 @@ func (tr tableReader) reader(ctx context.Context) (io.ReadCloser, uint64, error)
|
||||
return r, sz, nil
|
||||
}
|
||||
|
||||
func (tr tableReader) getRecordRanges(requests []getRecord) (map[hash.Hash]Range, error) {
|
||||
func (tr tableReader) getRecordRanges(ctx context.Context, requests []getRecord) (map[hash.Hash]Range, error) {
|
||||
// findOffsets sets getRecord.found
|
||||
recs, _, err := tr.findOffsets(requests)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user