mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-20 16:52:08 -05:00
d56dee4266
* placeholder * feat: db tables for user events (#2862) * feat: db tables for user events * move event payloads to payloads table, fix env var loading * fix: address pr review comments * missed save * feat: optimistic scheduling (#2867) * feat: db tables for user events * move event payloads to payloads table, fix env var loading * refactor: small changes to prepare optimistic txs * feat: optimistic scheduling * address pr review comments * rm comments * fix: rampup test race condition * fix: goleak * feat: grpc-side triggers * fix: config and sem logic * fix: respect optimistic scheduling env var * add optimistic to testing matrix, remove pg-only mode * fix cleanup of pubbuffers * merge migrations * last testing fixes
63 lines
1.0 KiB
Go
63 lines
1.0 KiB
Go
package repository
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/hatchet-dev/hatchet/pkg/repository/sqlchelpers"
|
|
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
|
|
)
|
|
|
|
type OptimisticTx struct {
|
|
tx sqlcv1.DBTX
|
|
commit func(ctx context.Context) error
|
|
rollback func()
|
|
postCommit []func()
|
|
}
|
|
|
|
func (s *sharedRepository) PrepareOptimisticTx(ctx context.Context) (*OptimisticTx, error) {
|
|
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, s.pool, s.l)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &OptimisticTx{
|
|
tx: tx,
|
|
commit: commit,
|
|
rollback: rollback,
|
|
postCommit: make([]func(), 0),
|
|
}, nil
|
|
}
|
|
|
|
func (o *OptimisticTx) AddPostCommit(f func()) {
|
|
o.postCommit = append(o.postCommit, f)
|
|
}
|
|
|
|
func (o *OptimisticTx) Commit(ctx context.Context) error {
|
|
err := o.commit(ctx)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, f := range o.postCommit {
|
|
doCallback(f)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (o *OptimisticTx) Rollback() {
|
|
o.rollback()
|
|
}
|
|
|
|
// doCallback runs f on a new goroutine and returns immediately without
// waiting for it. A panic raised by f is recovered and discarded so that it
// cannot take down the process.
func doCallback(f func()) {
	guarded := func() {
		defer func() {
			// Best-effort callback: the recovered value is intentionally
			// dropped. recover must be called directly from this deferred
			// function for it to stop the panic.
			_ = recover()
		}()

		f()
	}

	go guarded()
}
|