From 7aef8fcde1460148718cb9e9e89c354a201adfc2 Mon Sep 17 00:00:00 2001
From: Maximilian Hoffman
Date: Wed, 26 Jun 2024 11:59:46 -0700
Subject: [PATCH] [no-release-notes] tracing infra (#7783)

* tracing starter

* bump

* trace reads

* bump

* undo some unnecessary changes

* fix build

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

---------

Co-authored-by: max-hoffman
---
 go/cmd/dolt/commands/sql.go                  |  2 +-
 go/go.mod                                    |  4 +-
 go/go.sum                                    |  8 +--
 go/libraries/doltcore/remotesrv/dbcache.go   |  2 +-
 go/libraries/doltcore/remotesrv/grpc.go      |  4 +-
 go/performance/import_benchmarker/testdef.go |  2 +-
 .../utils/benchmark_runner/tpcc.lua          |  1 +
 go/store/nbs/archive_chunk_source.go         |  2 +-
 go/store/nbs/empty_chunk_source.go           |  2 +-
 go/store/nbs/generational_chunk_store.go     | 12 ++--
 go/store/nbs/journal.go                      |  6 +-
 go/store/nbs/journal_chunk_source.go         | 18 ++++--
 go/store/nbs/journal_test.go                 |  4 +-
 go/store/nbs/journal_writer.go               | 63 +++++++++++--------
 go/store/nbs/journal_writer_test.go          | 26 ++++----
 go/store/nbs/store.go                        |  8 +--
 go/store/nbs/table.go                        |  2 +-
 go/store/nbs/table_reader.go                 |  2 +-
 18 files changed, 94 insertions(+), 74 deletions(-)
 create mode 120000 go/performance/utils/benchmark_runner/tpcc.lua

diff --git a/go/cmd/dolt/commands/sql.go b/go/cmd/dolt/commands/sql.go
index 36cbf851bc..73ed7af316 100644
--- a/go/cmd/dolt/commands/sql.go
+++ b/go/cmd/dolt/commands/sql.go
@@ -626,7 +626,7 @@ func execBatchMode(ctx *sql.Context, qryist cli.Queryist, input io.Reader, conti
 
 		sqlMode := sql.LoadSqlMode(ctx)
 
-		sqlStatement, err := sqlparser.ParseWithOptions(query, sqlMode.ParserOptions())
+		sqlStatement, err := sqlparser.ParseWithOptions(ctx, query, sqlMode.ParserOptions())
 		if err == sqlparser.ErrEmpty {
 			continue
 		} else if err != nil {
diff --git a/go/go.mod b/go/go.mod
index 11617617cb..5ca608b217 100644
--- a/go/go.mod
+++ b/go/go.mod
@@ -15,7 +15,7 @@ require (
 	github.com/dolthub/fslock v0.0.3
 	github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488
 	github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81
-	github.com/dolthub/vitess v0.0.0-20240617225939-55a46c5dcfc8
+	github.com/dolthub/vitess v0.0.0-20240626174323-4083c07f5e9c
 	github.com/dustin/go-humanize v1.0.1
 	github.com/fatih/color v1.13.0
 	github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
@@ -57,7 +57,7 @@ require (
 	github.com/cespare/xxhash v1.1.0
 	github.com/creasty/defaults v1.6.0
 	github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2
-	github.com/dolthub/go-mysql-server v0.18.2-0.20240625212035-80f4e402d726
+	github.com/dolthub/go-mysql-server v0.18.2-0.20240626180128-807a2e35937f
 	github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63
 	github.com/dolthub/swiss v0.1.0
 	github.com/goccy/go-json v0.10.2
diff --git a/go/go.sum b/go/go.sum
index 19e23a4e8f..8a871b2c1f 100644
--- a/go/go.sum
+++ b/go/go.sum
@@ -183,8 +183,8 @@ github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
 github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
 github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e h1:kPsT4a47cw1+y/N5SSCkma7FhAPw7KeGmD6c9PBZW9Y=
 github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e/go.mod h1:KPUcpx070QOfJK1gNe0zx4pA5sicIK1GMikIGLKC168=
-github.com/dolthub/go-mysql-server v0.18.2-0.20240625212035-80f4e402d726 h1:zVMMW0gpT/Cq+xEUPulbt5y7dWz0K1A5BtNRDarW+i0=
-github.com/dolthub/go-mysql-server v0.18.2-0.20240625212035-80f4e402d726/go.mod h1:XdiHsd2TX3OOhjwY6tPcw1ztT2BdBiP6Wp0m/7OYHn4=
+github.com/dolthub/go-mysql-server v0.18.2-0.20240626180128-807a2e35937f h1:ouZxORcShC3qeQdsTkKDpROh+OI1OZuoOAJx8HThMrs=
+github.com/dolthub/go-mysql-server v0.18.2-0.20240626180128-807a2e35937f/go.mod h1:JahRYjx/Py6T/bWrnTu25CaGn94Df+McAuWGEG0shwU=
 github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63 h1:OAsXLAPL4du6tfbBgK0xXHZkOlos63RdKYS3Sgw/dfI=
 github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63/go.mod h1:lV7lUeuDhH5thVGDCKXbatwKy2KW80L4rMT46n+Y2/Q=
 github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488 h1:0HHu0GWJH0N6a6keStrHhUAK5/o9LVfkh44pvsV4514=
@@ -197,8 +197,8 @@ github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 h1:7/v8q9X
 github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81/go.mod h1:siLfyv2c92W1eN/R4QqG/+RjjX5W2+gCTRjZxBjI3TY=
 github.com/dolthub/swiss v0.1.0 h1:EaGQct3AqeP/MjASHLiH6i4TAmgbG/c4rA6a1bzCOPc=
 github.com/dolthub/swiss v0.1.0/go.mod h1:BeucyB08Vb1G9tumVN3Vp/pyY4AMUnr9p7Rz7wJ7kAQ=
-github.com/dolthub/vitess v0.0.0-20240617225939-55a46c5dcfc8 h1:d+dOTwI8dkwNYmcweXNjei2ot3GHJB3HqLWUeNvAkC0=
-github.com/dolthub/vitess v0.0.0-20240617225939-55a46c5dcfc8/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
+github.com/dolthub/vitess v0.0.0-20240626174323-4083c07f5e9c h1:Y3M0hPCUvT+5RTNbJLKywGc9aHIRCIlg+0NOhC91GYE=
+github.com/dolthub/vitess v0.0.0-20240626174323-4083c07f5e9c/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
 github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
 github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
 github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
diff --git a/go/libraries/doltcore/remotesrv/dbcache.go b/go/libraries/doltcore/remotesrv/dbcache.go
index 882aa6e9c8..ca2ca05a05 100644
--- a/go/libraries/doltcore/remotesrv/dbcache.go
+++ b/go/libraries/doltcore/remotesrv/dbcache.go
@@ -31,7 +31,7 @@ type RemoteSrvStore interface {
 	chunks.TableFileStore
 
 	Path() (string, bool)
-	GetChunkLocationsWithPaths(hashes hash.HashSet) (map[string]map[hash.Hash]nbs.Range, error)
+	GetChunkLocationsWithPaths(ctx context.Context, hashes hash.HashSet) (map[string]map[hash.Hash]nbs.Range, error)
 }
 
 var _ RemoteSrvStore = &nbs.NomsBlockStore{}
diff --git a/go/libraries/doltcore/remotesrv/grpc.go b/go/libraries/doltcore/remotesrv/grpc.go
index e9aa6d1d6e..d158f47e05 100644
--- a/go/libraries/doltcore/remotesrv/grpc.go
+++ b/go/libraries/doltcore/remotesrv/grpc.go
@@ -172,7 +172,7 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot
 
 	numHashes := len(hashes)
 
-	locations, err := cs.GetChunkLocationsWithPaths(hashes)
+	locations, err := cs.GetChunkLocationsWithPaths(ctx, hashes)
 	if err != nil {
 		logger.WithError(err).Error("error getting chunk locations for hashes")
 		return nil, err
@@ -277,7 +277,7 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
 			return err
 		}
 		numHashes += len(hashes)
-		locations, err := cs.GetChunkLocationsWithPaths(hashes)
+		locations, err := cs.GetChunkLocationsWithPaths(context.Background(), hashes)
 		if err != nil {
 			logger.WithError(err).Error("error getting chunk locations for hashes")
 			return err
diff --git a/go/performance/import_benchmarker/testdef.go b/go/performance/import_benchmarker/testdef.go
index 7a0e12a579..486aff5a5b 100644
--- a/go/performance/import_benchmarker/testdef.go
+++ b/go/performance/import_benchmarker/testdef.go
@@ -623,7 +623,7 @@ func tableKey(t Table) (uint64, error) {
 }
 
 func parseTableAndSchema(q string) (string, []string, []sql2.Type) {
-	stmt, _, err := ast.ParseOne(q)
+	stmt, _, err := ast.ParseOne(context.Background(), q)
 	if err != nil {
 		panic(fmt.Sprintf("invalid query: %s; %s", q, err))
 	}
diff --git a/go/performance/utils/benchmark_runner/tpcc.lua b/go/performance/utils/benchmark_runner/tpcc.lua
new file mode 120000
index 0000000000..b61e7be4d5
--- /dev/null
+++ b/go/performance/utils/benchmark_runner/tpcc.lua
@@ -0,0 +1 @@
+/Users/maxhoffman/go/github.com/dolthub/sysbench-tpcc/tpcc.lua
\ No newline at end of file
diff --git a/go/store/nbs/archive_chunk_source.go b/go/store/nbs/archive_chunk_source.go
index f3b2e3df0a..9a68eb4bef 100644
--- a/go/store/nbs/archive_chunk_source.go
+++ b/go/store/nbs/archive_chunk_source.go
@@ -123,7 +123,7 @@ func (acs archiveChunkSource) clone() (chunkSource, error) {
 	return nil, errors.New("Archive chunk source does not support clone")
 }
 
-func (acs archiveChunkSource) getRecordRanges(requests []getRecord) (map[hash.Hash]Range, error) {
+func (acs archiveChunkSource) getRecordRanges(_ context.Context, _ []getRecord) (map[hash.Hash]Range, error) {
 	return nil, errors.New("Archive chunk source does not support getRecordRanges")
 }
 
diff --git a/go/store/nbs/empty_chunk_source.go b/go/store/nbs/empty_chunk_source.go
index 719b8539e5..c8f094f98a 100644
--- a/go/store/nbs/empty_chunk_source.go
+++ b/go/store/nbs/empty_chunk_source.go
@@ -74,7 +74,7 @@ func (ecs emptyChunkSource) reader(context.Context) (io.ReadCloser, uint64, erro
 	return io.NopCloser(&bytes.Buffer{}), 0, nil
 }
 
-func (ecs emptyChunkSource) getRecordRanges(lookups []getRecord) (map[hash.Hash]Range, error) {
+func (ecs emptyChunkSource) getRecordRanges(ctx context.Context, requests []getRecord) (map[hash.Hash]Range, error) {
 	return map[hash.Hash]Range{}, nil
 }
 
diff --git a/go/store/nbs/generational_chunk_store.go b/go/store/nbs/generational_chunk_store.go
index 00f70a02d4..5dc356361d 100644
--- a/go/store/nbs/generational_chunk_store.go
+++ b/go/store/nbs/generational_chunk_store.go
@@ -401,14 +401,14 @@ func (gcs *GenerationalNBS) SupportedOperations() chunks.TableFileStoreOps {
 	return gcs.newGen.SupportedOperations()
 }
 
-func (gcs *GenerationalNBS) GetChunkLocationsWithPaths(hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) {
-	res, err := gcs.newGen.GetChunkLocationsWithPaths(hashes)
+func (gcs *GenerationalNBS) GetChunkLocationsWithPaths(ctx context.Context, hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) {
+	res, err := gcs.newGen.GetChunkLocationsWithPaths(ctx, hashes)
 	if err != nil {
 		return nil, err
 	}
 	if len(hashes) > 0 {
 		prefix := gcs.RelativeOldGenPath()
-		toadd, err := gcs.oldGen.GetChunkLocationsWithPaths(hashes)
+		toadd, err := gcs.oldGen.GetChunkLocationsWithPaths(ctx, hashes)
 		if err != nil {
 			return nil, err
 		}
@@ -419,13 +419,13 @@ func (gcs *GenerationalNBS) GetChunkLocationsWithPaths(hashes hash.HashSet) (map
 	return res, nil
 }
 
-func (gcs *GenerationalNBS) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash]map[hash.Hash]Range, error) {
-	res, err := gcs.newGen.GetChunkLocations(hashes)
+func (gcs *GenerationalNBS) GetChunkLocations(ctx context.Context, hashes hash.HashSet) (map[hash.Hash]map[hash.Hash]Range, error) {
+	res, err := gcs.newGen.GetChunkLocations(ctx, hashes)
 	if err != nil {
 		return nil, err
 	}
 	if len(hashes) > 0 {
-		toadd, err := gcs.oldGen.GetChunkLocations(hashes)
+		toadd, err := gcs.oldGen.GetChunkLocations(ctx, hashes)
 		if err != nil {
 			return nil, err
 		}
diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go
index aef7dfb538..628f14d702 100644
--- a/go/store/nbs/journal.go
+++ b/go/store/nbs/journal.go
@@ -161,7 +161,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context) (err error) {
 	}
 	if ok {
 		// write the current root hash to the journal file
-		if err = j.wr.commitRootHash(contents.root); err != nil {
+		if err = j.wr.commitRootHash(ctx, contents.root); err != nil {
 			return
 		}
 		j.contents = contents
@@ -259,7 +259,7 @@ func (j *ChunkJournal) Persist(ctx context.Context, mt *memTable, haver chunkRea
 			continue
 		}
 		c := chunks.NewChunkWithHash(hash.Hash(*record.a), mt.chunks[*record.a])
-		err := j.wr.writeCompressedChunk(ChunkToCompressedChunk(c))
+		err := j.wr.writeCompressedChunk(ctx, ChunkToCompressedChunk(c))
 		if err != nil {
 			return nil, err
 		}
@@ -355,7 +355,7 @@ func (j *ChunkJournal) Update(ctx context.Context, lastLock hash.Hash, next mani
 		}
 	}
 
-	if err := j.wr.commitRootHash(next.root); err != nil {
+	if err := j.wr.commitRootHash(ctx, next.root); err != nil {
 		return manifestContents{}, err
 	}
 	j.contents = next
diff --git a/go/store/nbs/journal_chunk_source.go b/go/store/nbs/journal_chunk_source.go
index 4376b7922d..0b94b13e1b 100644
--- a/go/store/nbs/journal_chunk_source.go
+++ b/go/store/nbs/journal_chunk_source.go
@@ -19,6 +19,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"runtime/trace"
 	"sort"
 	"sync"
 
@@ -54,11 +55,14 @@ func (s journalChunkSource) hasMany(addrs []hasRecord) (missing bool, err error)
 	return
 }
 
-func (s journalChunkSource) getCompressed(_ context.Context, h hash.Hash, _ *Stats) (CompressedChunk, error) {
+func (s journalChunkSource) getCompressed(ctx context.Context, h hash.Hash, _ *Stats) (CompressedChunk, error) {
+	defer trace.StartRegion(ctx, "journalChunkSource.getCompressed").End()
 	return s.journal.getCompressedChunk(h)
 }
 
-func (s journalChunkSource) get(_ context.Context, h hash.Hash, _ *Stats) ([]byte, error) {
+func (s journalChunkSource) get(ctx context.Context, h hash.Hash, _ *Stats) ([]byte, error) {
+	defer trace.StartRegion(ctx, "journalChunkSource.get").End()
+
 	cc, err := s.journal.getCompressedChunk(h)
 	if err != nil {
 		return nil, err
@@ -100,6 +104,8 @@ func (s journalChunkSource) getMany(ctx context.Context, eg *errgroup.Group, req
 // lock after returning when all reads are completed, which can be after the
 // function returns.
 func (s journalChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
+	defer trace.StartRegion(ctx, "journalChunkSource.getManyCompressed").End()
+
 	var remaining bool
 	var jReqs []journalRecord
 	var wg sync.WaitGroup
@@ -160,18 +166,18 @@ func (s journalChunkSource) hash() hash.Hash {
 }
 
 // reader implements chunkSource.
-func (s journalChunkSource) reader(context.Context) (io.ReadCloser, uint64, error) {
-	rdr, sz, err := s.journal.snapshot()
+func (s journalChunkSource) reader(ctx context.Context) (io.ReadCloser, uint64, error) {
+	rdr, sz, err := s.journal.snapshot(ctx)
 	return rdr, uint64(sz), err
 }
 
-func (s journalChunkSource) getRecordRanges(requests []getRecord) (map[hash.Hash]Range, error) {
+func (s journalChunkSource) getRecordRanges(ctx context.Context, requests []getRecord) (map[hash.Hash]Range, error) {
 	ranges := make(map[hash.Hash]Range, len(requests))
 	for _, req := range requests {
 		if req.found {
 			continue
 		}
-		rng, ok, err := s.journal.getRange(*req.a)
+		rng, ok, err := s.journal.getRange(ctx, *req.a)
 		if err != nil {
 			return nil, err
 		} else if !ok {
diff --git a/go/store/nbs/journal_test.go b/go/store/nbs/journal_test.go
index 2c5165260b..9486f1edf1 100644
--- a/go/store/nbs/journal_test.go
+++ b/go/store/nbs/journal_test.go
@@ -99,7 +99,7 @@ func TestReadRecordRanges(t *testing.T) {
 	jcs, err := j.Persist(ctx, mt, emptyChunkSource{}, &Stats{})
 	require.NoError(t, err)
 
-	rdr, sz, err := jcs.(journalChunkSource).journal.snapshot()
+	rdr, sz, err := jcs.(journalChunkSource).journal.snapshot(context.Background())
 	require.NoError(t, err)
 	defer rdr.Close()
 
@@ -108,7 +108,7 @@ func TestReadRecordRanges(t *testing.T) {
 	require.NoError(t, err)
 	assert.Equal(t, int(sz), n)
 
-	ranges, err := jcs.getRecordRanges(gets)
+	ranges, err := jcs.getRecordRanges(ctx, gets)
 	require.NoError(t, err)
 
 	for h, rng := range ranges {
diff --git a/go/store/nbs/journal_writer.go b/go/store/nbs/journal_writer.go
index 517bf2566b..10febd736e 100644
--- a/go/store/nbs/journal_writer.go
+++ b/go/store/nbs/journal_writer.go
@@ -23,6 +23,7 @@ import (
 	"io"
 	"os"
 	"path/filepath"
+	"runtime/trace"
 	"sync"
 
 	"github.com/dolthub/swiss"
@@ -286,7 +287,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer
 		if err := wr.truncateIndex(safeIndexOffset); err != nil {
 			return hash.Hash{}, err
 		}
-		wr.ranges = wr.ranges.flatten()
+		wr.ranges = wr.ranges.flatten(ctx)
 	}
 
 	var lastOffset int64
@@ -331,7 +332,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer
 
 			if wr.ranges.novelCount() > wr.maxNovel {
 				// save bootstrap progress
-				if err := wr.flushIndexRecord(last, lastOffset); err != nil {
+				if err := wr.flushIndexRecord(ctx, last, lastOffset); err != nil {
 					return hash.Hash{}, err
 				}
 			}
@@ -397,10 +398,10 @@ func (wr *journalWriter) getCompressedChunkAtRange(r Range, h hash.Hash) (Compre
 }
 
 // getRange returns a Range for the chunk with addr |h|.
-func (wr *journalWriter) getRange(h hash.Hash) (rng Range, ok bool, err error) {
+func (wr *journalWriter) getRange(ctx context.Context, h hash.Hash) (rng Range, ok bool, err error) {
 	// callers will use |rng| to read directly from the
 	// journal file, so we must flush here
-	if err = wr.maybeFlush(); err != nil {
+	if err = wr.maybeFlush(ctx); err != nil {
 		return
 	}
 	wr.lock.RLock()
@@ -410,7 +411,7 @@ func (wr *journalWriter) getRange(h hash.Hash) (rng Range, ok bool, err error) {
 }
 
 // writeCompressedChunk writes |cc| to the journal.
-func (wr *journalWriter) writeCompressedChunk(cc CompressedChunk) error {
+func (wr *journalWriter) writeCompressedChunk(ctx context.Context, cc CompressedChunk) error {
 	wr.lock.Lock()
 	defer wr.lock.Unlock()
 	recordLen, payloadOff := chunkRecordSize(cc)
@@ -418,7 +419,7 @@ func (wr *journalWriter) writeCompressedChunk(cc CompressedChunk) error {
 		Offset: uint64(wr.offset()) + uint64(payloadOff),
 		Length: uint32(len(cc.FullCompressedChunk)),
 	}
-	buf, err := wr.getBytes(int(recordLen))
+	buf, err := wr.getBytes(ctx, int(recordLen))
 	if err != nil {
 		return err
 	}
@@ -448,37 +449,44 @@ func (wr *journalWriter) writeCompressedChunk(cc CompressedChunk) error {
 	// write index records out. It's perfectly fine to reuse the current
 	// root hash, and this will also take care of the |Sync|.
 	if wr.unsyncd > journalMaybeSyncThreshold && !wr.currentRoot.IsEmpty() {
-		return wr.commitRootHashUnlocked(wr.currentRoot)
+		return wr.commitRootHashUnlocked(ctx, wr.currentRoot)
 	}
 	return nil
 }
 
 // commitRootHash commits |root| to the journal and syncs the file to disk.
-func (wr *journalWriter) commitRootHash(root hash.Hash) error {
+func (wr *journalWriter) commitRootHash(ctx context.Context, root hash.Hash) error {
 	wr.lock.Lock()
 	defer wr.lock.Unlock()
-	return wr.commitRootHashUnlocked(root)
+	return wr.commitRootHashUnlocked(ctx, root)
 }
 
-func (wr *journalWriter) commitRootHashUnlocked(root hash.Hash) error {
-	buf, err := wr.getBytes(rootHashRecordSize())
+func (wr *journalWriter) commitRootHashUnlocked(ctx context.Context, root hash.Hash) error {
+	defer trace.StartRegion(ctx, "commit-root").End()
+
+	buf, err := wr.getBytes(ctx, rootHashRecordSize())
 	if err != nil {
 		return err
 	}
 	wr.currentRoot = root
 	n := writeRootHashRecord(buf, root)
-	if err = wr.flush(); err != nil {
+	if err = wr.flush(ctx); err != nil {
 		return err
 	}
-	if err = wr.journal.Sync(); err != nil {
+	func() {
+		defer trace.StartRegion(ctx, "sync").End()
+
+		err = wr.journal.Sync()
+	}()
+	if err != nil {
 		return err
 	}
 	wr.unsyncd = 0
 	if wr.ranges.novelCount() > wr.maxNovel {
 		o := wr.offset() - int64(n) // pre-commit journal offset
-		if err := wr.flushIndexRecord(root, o); err != nil {
+		if err := wr.flushIndexRecord(ctx, root, o); err != nil {
 			return err
 		}
 	}
@@ -488,12 +496,13 @@ func (wr *journalWriter) commitRootHashUnlocked(root hash.Hash) error {
 // flushIndexRecord writes metadata for a range of index lookups to the
 // out-of-band journal index file. Index records accelerate journal
 // bootstrapping by reducing the amount of the journal that must be processed.
-func (wr *journalWriter) flushIndexRecord(root hash.Hash, end int64) (err error) {
+func (wr *journalWriter) flushIndexRecord(ctx context.Context, root hash.Hash, end int64) (err error) {
+	defer trace.StartRegion(ctx, "flushIndexRecord").End()
 	if err := writeJournalIndexMeta(wr.indexWriter, root, wr.indexed, end, wr.batchCrc); err != nil {
 		return err
 	}
 	wr.batchCrc = 0
-	wr.ranges = wr.ranges.flatten()
+	wr.ranges = wr.ranges.flatten(ctx)
 	// set a new high-water-mark for the indexed portion of the journal
 	wr.indexed = end
 	return
@@ -524,13 +533,13 @@ func (wr *journalWriter) readAt(p []byte, off int64) (n int, err error) {
 }
 
 // getBytes returns a buffer for writers to copy data into.
-func (wr *journalWriter) getBytes(n int) (buf []byte, err error) {
+func (wr *journalWriter) getBytes(ctx context.Context, n int) (buf []byte, err error) {
 	c, l := cap(wr.buf), len(wr.buf)
 	if n > c {
 		err = fmt.Errorf("requested bytes (%d) exceeds capacity (%d)", n, c)
 		return
 	} else if n > c-l {
-		if err = wr.flush(); err != nil {
+		if err = wr.flush(ctx); err != nil {
 			return
 		}
 	}
@@ -541,7 +550,8 @@ func (wr *journalWriter) getBytes(n int) (buf []byte, err error) {
 }
 
 // flush writes buffered data into the journal file.
-func (wr *journalWriter) flush() (err error) {
+func (wr *journalWriter) flush(ctx context.Context) (err error) {
+	defer trace.StartRegion(ctx, "flush journal").End()
 	if _, err = wr.journal.WriteAt(wr.buf, wr.off); err != nil {
 		return err
 	}
@@ -551,7 +561,7 @@ func (wr *journalWriter) flush() (err error) {
 }
 
 // maybeFlush flushes buffered data, if any exists.
-func (wr *journalWriter) maybeFlush() (err error) {
+func (wr *journalWriter) maybeFlush(ctx context.Context) (err error) {
 	wr.lock.RLock()
 	empty := len(wr.buf) == 0
 	wr.lock.RUnlock()
@@ -560,7 +570,7 @@ func (wr *journalWriter) maybeFlush() (err error) {
 	}
 	wr.lock.Lock()
 	defer wr.lock.Unlock()
-	return wr.flush()
+	return wr.flush(ctx)
 }
 
 type journalWriterSnapshot struct {
@@ -574,10 +584,10 @@ func (s journalWriterSnapshot) Close() error {
 
 // snapshot returns an io.Reader with a consistent view of
 // the current state of the journal file.
-func (wr *journalWriter) snapshot() (io.ReadCloser, int64, error) {
+func (wr *journalWriter) snapshot(ctx context.Context) (io.ReadCloser, int64, error) {
 	wr.lock.Lock()
 	defer wr.lock.Unlock()
-	if err := wr.flush(); err != nil {
+	if err := wr.flush(ctx); err != nil {
 		return nil, 0, err
 	}
 	// open a new file descriptor with an
@@ -625,7 +635,7 @@ func (wr *journalWriter) Close() (err error) {
 		return nil
 	}
 
-	if err = wr.flush(); err != nil {
+	if err = wr.flush(context.Background()); err != nil {
 		return err
 	}
 	if wr.index != nil {
@@ -709,7 +719,10 @@ func (idx rangeIndex) novelLookups() (lookups []lookup) {
 	return
 }
 
-func (idx rangeIndex) flatten() rangeIndex {
+func (idx rangeIndex) flatten(ctx context.Context) rangeIndex {
+	defer trace.StartRegion(ctx, "flatten journal index").End()
+	trace.Logf(ctx, "swiss map current count", "%d", idx.cached.Count())
+	trace.Logf(ctx, "swiss map add count", "%d", idx.novel.Count())
 	idx.novel.Iter(func(a hash.Hash, r Range) (stop bool) {
 		idx.cached.Put(toAddr16(a), r)
 		return
diff --git a/go/store/nbs/journal_writer_test.go b/go/store/nbs/journal_writer_test.go
index cd233eae32..df8c45946f 100644
--- a/go/store/nbs/journal_writer_test.go
+++ b/go/store/nbs/journal_writer_test.go
@@ -163,13 +163,13 @@ func TestJournalWriterReadWrite(t *testing.T) {
 			assert.Equal(t, op.buf, act, "operation %d failed", i)
 		case writeOp:
 			var p []byte
-			p, err = j.getBytes(len(op.buf))
+			p, err = j.getBytes(context.Background(), len(op.buf))
 			require.NoError(t, err, "operation %d errored", i)
 			n := copy(p, op.buf)
 			assert.Equal(t, len(op.buf), n, "operation %d failed", i)
 			off += int64(n)
 		case flushOp:
-			err = j.flush()
+			err = j.flush(context.Background())
 			assert.NoError(t, err, "operation %d errored", i)
 		default:
 			t.Fatal("unknown opKind")
@@ -195,7 +195,7 @@ func TestJournalWriterWriteCompressedChunk(t *testing.T) {
 	j := newTestJournalWriter(t, path)
 	data := randomCompressedChunks(1024)
 	for a, cc := range data {
-		err := j.writeCompressedChunk(cc)
+		err := j.writeCompressedChunk(context.Background(), cc)
 		require.NoError(t, err)
 		r, _ := j.ranges.get(a)
 		validateLookup(t, j, r, cc)
@@ -210,11 +210,11 @@ func TestJournalWriterBootstrap(t *testing.T) {
 	data := randomCompressedChunks(1024)
 	var last hash.Hash
 	for _, cc := range data {
-		err := j.writeCompressedChunk(cc)
+		err := j.writeCompressedChunk(context.Background(), cc)
 		require.NoError(t, err)
 		last = cc.Hash()
 	}
-	require.NoError(t, j.commitRootHash(last))
+	require.NoError(t, j.commitRootHash(context.Background(), last))
 	require.NoError(t, j.Close())
 
 	j, _, err := openJournalWriter(ctx, path)
@@ -270,10 +270,10 @@ func TestJournalWriterSyncClose(t *testing.T) {
 	path := newTestFilePath(t)
 	j := newTestJournalWriter(t, path)
 	p := []byte("sit")
-	buf, err := j.getBytes(len(p))
+	buf, err := j.getBytes(context.Background(), len(p))
 	require.NoError(t, err)
 	copy(buf, p)
-	j.flush()
+	j.flush(context.Background())
 	assert.Equal(t, 0, len(j.buf))
 	assert.Equal(t, 3, int(j.off))
 }
@@ -341,17 +341,17 @@ func TestJournalIndexBootstrap(t *testing.T) {
 	for i, e := range epochs {
 		for _, cc := range e.records {
 			recordCnt++
-			assert.NoError(t, j.writeCompressedChunk(cc))
+			assert.NoError(t, j.writeCompressedChunk(context.Background(), cc))
 			if rand.Int()%10 == 0 { // periodic commits
-				assert.NoError(t, j.commitRootHash(cc.H))
+				assert.NoError(t, j.commitRootHash(context.Background(), cc.H))
 			}
 		}
-		o := j.offset()                             // precommit offset
-		assert.NoError(t, j.commitRootHash(e.last)) // commit |e.last|
+		o := j.offset()                                                    // precommit offset
+		assert.NoError(t, j.commitRootHash(context.Background(), e.last)) // commit |e.last|
 		if i == len(epochs) {
 			break // don't index |test.novel|
 		}
-		assert.NoError(t, j.flushIndexRecord(e.last, o)) // write index record
+		assert.NoError(t, j.flushIndexRecord(ctx, e.last, o)) // write index record
 	}
 	err := j.Close()
 	require.NoError(t, err)
@@ -448,7 +448,7 @@ func TestRangeIndex(t *testing.T) {
 	}
 	assert.Equal(t, len(data), idx.novelCount())
 	assert.Equal(t, len(data), int(idx.count()))
-	idx = idx.flatten()
+	idx = idx.flatten(context.Background())
 	assert.Equal(t, 0, idx.novelCount())
 	assert.Equal(t, len(data), int(idx.count()))
 }
diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go
index c9fd12be55..2d377afbd7 100644
--- a/go/store/nbs/store.go
+++ b/go/store/nbs/store.go
@@ -138,8 +138,8 @@ func (nbs *NomsBlockStore) ChunkJournal() *ChunkJournal {
 	return nil
 }
 
-func (nbs *NomsBlockStore) GetChunkLocationsWithPaths(hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) {
-	locs, err := nbs.GetChunkLocations(hashes)
+func (nbs *NomsBlockStore) GetChunkLocationsWithPaths(ctx context.Context, hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) {
+	locs, err := nbs.GetChunkLocations(ctx, hashes)
 	if err != nil {
 		return nil, err
 	}
@@ -150,13 +150,13 @@ func (nbs *NomsBlockStore) GetChunkLocationsWithPaths(hashes hash.HashSet) (map[
 	return toret, nil
 }
 
-func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash]map[hash.Hash]Range, error) {
+func (nbs *NomsBlockStore) GetChunkLocations(ctx context.Context, hashes hash.HashSet) (map[hash.Hash]map[hash.Hash]Range, error) {
 	gr := toGetRecords(hashes)
 	ranges := make(map[hash.Hash]map[hash.Hash]Range)
 
 	fn := func(css chunkSourceSet) error {
 		for _, cs := range css {
-			rng, err := cs.getRecordRanges(gr)
+			rng, err := cs.getRecordRanges(ctx, gr)
 			if err != nil {
 				return err
 			}
diff --git a/go/store/nbs/table.go b/go/store/nbs/table.go
index 1170ad7f06..e8e080c690 100644
--- a/go/store/nbs/table.go
+++ b/go/store/nbs/table.go
@@ -226,7 +226,7 @@ type chunkSource interface {
 	reader(context.Context) (io.ReadCloser, uint64, error)
 
 	// getRecordRanges sets getRecord.found to true, and returns a Range for each present getRecord query.
-	getRecordRanges(requests []getRecord) (map[hash.Hash]Range, error)
+	getRecordRanges(ctx context.Context, requests []getRecord) (map[hash.Hash]Range, error)
 
 	// index returns the tableIndex of this chunkSource.
 	index() (tableIndex, error)
diff --git a/go/store/nbs/table_reader.go b/go/store/nbs/table_reader.go
index e7593b9a1c..820dc9f81f 100644
--- a/go/store/nbs/table_reader.go
+++ b/go/store/nbs/table_reader.go
@@ -668,7 +668,7 @@ func (tr tableReader) reader(ctx context.Context) (io.ReadCloser, uint64, error)
 	return r, sz, nil
 }
 
-func (tr tableReader) getRecordRanges(requests []getRecord) (map[hash.Hash]Range, error) {
+func (tr tableReader) getRecordRanges(ctx context.Context, requests []getRecord) (map[hash.Hash]Range, error) {
 	// findOffsets sets getRecord.found
 	recs, _, err := tr.findOffsets(requests)
 	if err != nil {
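
Note on the pattern: the tracing threaded through the NBS read and flush paths above is Go's standard runtime/trace API. An operation is wrapped in a region with trace.StartRegion, annotated with trace.Logf, and the collected trace is inspected with `go tool trace`. A minimal, self-contained sketch of that pattern follows; the chunkFetch function, the region/task names, and the logged count are illustrative stand-ins, not code from this patch:

	package main

	import (
		"context"
		"os"
		"runtime/trace"
	)

	// chunkFetch stands in for a traced read path such as
	// journalChunkSource.get: the deferred region records the
	// call's duration in the execution trace.
	func chunkFetch(ctx context.Context) {
		defer trace.StartRegion(ctx, "chunkFetch").End()
		// Attach a key/value annotation to the trace, mirroring the
		// swiss-map counts logged in rangeIndex.flatten. The value 42
		// is a placeholder.
		trace.Logf(ctx, "chunk count", "%d", 42)
	}

	func main() {
		f, err := os.Create("trace.out")
		if err != nil {
			panic(err)
		}
		defer f.Close()

		// Collect an execution trace for the lifetime of main;
		// view it afterwards with `go tool trace trace.out`.
		if err := trace.Start(f); err != nil {
			panic(err)
		}
		defer trace.Stop()

		// A task groups related regions in the trace viewer's
		// user-defined tasks view.
		ctx, task := trace.NewTask(context.Background(), "read-chunks")
		defer task.End()
		chunkFetch(ctx)
	}

Wrapping wr.journal.Sync() in its own "sync" region, as commitRootHashUnlocked does above, isolates fsync latency from the rest of the commit path in the trace viewer.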