mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-03-23 13:11:38 -05:00
* print error log temporarily * casing * only for create-monitoring-event * rate limit iterator * add a debugger * remove rate limiter * improve otel on trigger * cache probability stuff * track misses * move down one ln * default * Fix: Pass tx down into payload retrieve (#2483) * [Python] Feat: Dataclass Support (#2476) * fix: prevent lifespan error from hanging worker * fix: handle cleanup * feat: dataclass outputs * feat: dataclasses * feat: incremental dataclass work * feat: dataclass tests * fix: lint * fix: register wf * fix: ugh * chore: changelog * fix: validation issue * fix: none check * fix: lint * fix: error type * chore: regenerate examples (#2477) Co-authored-by: GitHub Action <action@github.com> * feat: add health and metrics api on typescript sdk worker (#2457) * feat: add health and metrics api on typescript sdk worker add: prom-client to fetch metrics data add: track health status of worker across different states * refactor: keep prom-client as optional dependency * refactor: remove async import of prom-client * chore: update package version for ts sdk * fix: lint * fix: lint, const enum --------- Co-authored-by: mrkaye97 <mrkaye97@gmail.com> * Update frontend onboarding steps (#2478) * Update frontend onboarding steps * Update sidebar as well * Fix Go SDK cron inputs (#2481) * cron input in Go SDK * add example * fix: pass tx down to retrieve * fix: attempt 2, another pool use * fix: spans and debugging for task statuses * attempted hotfix on olap statuses * process tenants in parallel in prom worker --------- Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: GitHub Action <action@github.com> Co-authored-by: Jishnu <jishnun789@gmail.com> Co-authored-by: Sid Premkumar <sid.premkumar@gmail.com> Co-authored-by: Mohammed Nafees <hello@mnafees.me> Co-authored-by: Alexander Belanger <alexander@hatchet.run> * move debugger package, clean up init * remove probability factor logic * remove debug 
* fix: debugger instantiation --------- Co-authored-by: Alexander Belanger <alexander@hatchet.run> Co-authored-by: gabriel ruttner <gabriel.ruttner@gmail.com> Co-authored-by: mrkaye97 <mrkaye97@gmail.com> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: GitHub Action <action@github.com> Co-authored-by: Jishnu <jishnun789@gmail.com> Co-authored-by: Sid Premkumar <sid.premkumar@gmail.com>
160 lines
2.6 KiB
Go
160 lines
2.6 KiB
Go
package debugger
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"runtime"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
"github.com/rs/zerolog"
|
|
)
|
|
|
|
// Debugger hooks into a pgxpool.Pool's acquire/release lifecycle to track
// which call sites are holding database connections, so connection-pool
// saturation can be diagnosed from the logs.
type Debugger struct {
	// callerCounts accumulates acquire counts per "file:line" call site
	// since the last periodic report (reset by printCallerCounts).
	callerCounts map[string]int
	// activeConns maps each currently-held connection to the call site
	// that acquired it; entries are removed in AfterRelease.
	activeConns map[*pgx.Conn]string
	// callerMu guards callerCounts and activeConns.
	callerMu sync.Mutex

	// lastPrint is when the caller-count report was last emitted (nil until
	// the first report). NOTE(review): written under callerMu in
	// printCallerCounts but read without the lock in BeforeAcquire —
	// possible data race; confirm and guard the read.
	lastPrint *time.Time

	l *zerolog.Logger
	// pool is the tracked pool, set by Setup; guarded by poolMu.
	pool *pgxpool.Pool
	poolMu sync.Mutex
}
|
|
|
|
func NewDebugger(l *zerolog.Logger) *Debugger {
|
|
return &Debugger{
|
|
callerCounts: make(map[string]int),
|
|
activeConns: make(map[*pgx.Conn]string),
|
|
l: l,
|
|
}
|
|
}
|
|
|
|
func (d *Debugger) Setup(pool *pgxpool.Pool) {
|
|
d.poolMu.Lock()
|
|
defer d.poolMu.Unlock()
|
|
|
|
d.pool = pool
|
|
d.activeConns = make(map[*pgx.Conn]string)
|
|
}
|
|
|
|
func (d *Debugger) getPool() *pgxpool.Pool {
|
|
d.poolMu.Lock()
|
|
defer d.poolMu.Unlock()
|
|
|
|
return d.pool
|
|
}
|
|
|
|
func (d *Debugger) BeforeAcquire(ctx context.Context, conn *pgx.Conn) bool {
|
|
// if we don't have a pool set yet, skip
|
|
if d.getPool() == nil {
|
|
return true
|
|
}
|
|
|
|
_, file, line, ok := runtime.Caller(5)
|
|
|
|
caller := "unknown"
|
|
|
|
if ok {
|
|
if strings.Contains(file, "tx.go") {
|
|
_, file, line, _ = runtime.Caller(6)
|
|
}
|
|
|
|
caller = fmt.Sprintf("%s:%d", file, line)
|
|
}
|
|
|
|
d.callerMu.Lock()
|
|
d.callerCounts[caller]++
|
|
d.activeConns[conn] = caller
|
|
d.callerMu.Unlock()
|
|
|
|
if d.lastPrint == nil || time.Since(*d.lastPrint) > 120*time.Second {
|
|
d.printCallerCounts()
|
|
}
|
|
|
|
if d.pool.Stat().AcquiredConns() == d.pool.Config().MaxConns {
|
|
d.printActiveCallers()
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (d *Debugger) AfterRelease(conn *pgx.Conn) bool {
|
|
if d.getPool() == nil {
|
|
return true
|
|
}
|
|
|
|
d.callerMu.Lock()
|
|
delete(d.activeConns, conn)
|
|
d.callerMu.Unlock()
|
|
|
|
return true
|
|
}
|
|
|
|
// callerCount pairs a "file:line" call site with its acquire count; used
// for sorting the report in printCallerCounts.
type callerCount struct {
	caller string
	count int
}
|
|
|
|
func (d *Debugger) printCallerCounts() {
|
|
d.callerMu.Lock()
|
|
defer d.callerMu.Unlock()
|
|
|
|
var counts []callerCount
|
|
|
|
for caller, count := range d.callerCounts {
|
|
counts = append(counts, callerCount{caller, count})
|
|
}
|
|
|
|
sort.Slice(counts, func(i, j int) bool {
|
|
return counts[i].count > counts[j].count
|
|
})
|
|
|
|
sl := d.l.Warn()
|
|
|
|
for i, c := range counts {
|
|
// print only the top 20 callers
|
|
if i >= 20 {
|
|
break
|
|
}
|
|
|
|
sl.Int(
|
|
c.caller,
|
|
c.count,
|
|
)
|
|
}
|
|
|
|
sl.Msg("top 20 callers")
|
|
|
|
d.callerCounts = make(map[string]int)
|
|
now := time.Now()
|
|
d.lastPrint = &now
|
|
}
|
|
|
|
func (d *Debugger) printActiveCallers() {
|
|
// print the active callers, grouped by caller
|
|
d.callerMu.Lock()
|
|
defer d.callerMu.Unlock()
|
|
|
|
callerMap := make(map[string]int)
|
|
|
|
for _, caller := range d.activeConns {
|
|
callerMap[caller]++
|
|
}
|
|
|
|
sl := d.l.Warn()
|
|
|
|
for caller, count := range callerMap {
|
|
sl.Int(
|
|
caller,
|
|
count,
|
|
)
|
|
}
|
|
|
|
sl.Msg("hit max database connections, showing active callers")
|
|
}
|