Files
hatchet/pkg/repository/optimistic_tx.go
abelanger5 d56dee4266 feat: durable user event log (#2861)
* placeholder

* feat: db tables for user events (#2862)

* feat: db tables for user events

* move event payloads to payloads table, fix env var loading

* fix: address pr review comments

* missed save

* feat: optimistic scheduling (#2867)

* feat: db tables for user events

* move event payloads to payloads table, fix env var loading

* refactor: small changes to prepare optimistic txs

* feat: optimistic scheduling

* address pr review comments

* rm comments

* fix: rampup test race condition

* fix: goleak

* feat: grpc-side triggers

* fix: config and sem logic

* fix: respect optimistic scheduling env var

* add optimistic to testing matrix, remove pg-only mode

* fix cleanup of pubbuffers

* merge migrations

* last testing fixes
2026-02-02 18:04:02 -05:00

63 lines
1.0 KiB
Go

package repository
import (
"context"
"github.com/hatchet-dev/hatchet/pkg/repository/sqlchelpers"
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
)
type OptimisticTx struct {
tx sqlcv1.DBTX
commit func(ctx context.Context) error
rollback func()
postCommit []func()
}
func (s *sharedRepository) PrepareOptimisticTx(ctx context.Context) (*OptimisticTx, error) {
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, s.pool, s.l)
if err != nil {
return nil, err
}
return &OptimisticTx{
tx: tx,
commit: commit,
rollback: rollback,
postCommit: make([]func(), 0),
}, nil
}
func (o *OptimisticTx) AddPostCommit(f func()) {
o.postCommit = append(o.postCommit, f)
}
func (o *OptimisticTx) Commit(ctx context.Context) error {
err := o.commit(ctx)
if err != nil {
return err
}
for _, f := range o.postCommit {
doCallback(f)
}
return err
}
func (o *OptimisticTx) Rollback() {
o.rollback()
}
func doCallback(f func()) {
go func() {
defer func() {
recover() // nolint: errcheck
}()
f()
}()
}