mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2025-12-18 07:15:00 -06:00
- Simplifies architecture for splitting engine services into different components. The three supported services are now `grpc-api`, `scheduler`, and `controllers`. The `grpc-api` service is the only one which needs to be exposed for workers. The other two can run as unexposed services. - Fixes a set of bugs and race conditions in the `v2` scheduler - Adds a `lastActive` time to the `Queue` table and includes a migration which sets this `lastActive` time for the most recent 24 hours of queues. Effectively this means that the max scheduling time in a queue is 24 hours. - Rewrites the `ListWorkflowsForEvent` query to improve performance and select far fewer rows.
55 lines
870 B
Go
55 lines
870 B
Go
package queueutils
|
|
|
|
import (
|
|
"github.com/hashicorp/go-multierror"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
func BatchConcurrent[T any](batchSize int, things []T, fn func(group []T) error) error {
|
|
if batchSize <= 0 {
|
|
return nil
|
|
}
|
|
|
|
g := new(errgroup.Group)
|
|
|
|
for i := 0; i < len(things); i += batchSize {
|
|
end := i + batchSize
|
|
if end > len(things) {
|
|
end = len(things)
|
|
}
|
|
|
|
group := things[i:end]
|
|
|
|
g.Go(func() error {
|
|
return fn(group)
|
|
})
|
|
}
|
|
|
|
return g.Wait()
|
|
}
|
|
|
|
func BatchLinear[T any](batchSize int, things []T, fn func(group []T) error) error {
|
|
if batchSize <= 0 {
|
|
return nil
|
|
}
|
|
|
|
var err error
|
|
|
|
for i := 0; i < len(things); i += batchSize {
|
|
end := i + batchSize
|
|
if end > len(things) {
|
|
end = len(things)
|
|
}
|
|
|
|
group := things[i:end]
|
|
|
|
innerErr := fn(group)
|
|
|
|
if innerErr != nil {
|
|
err = multierror.Append(err, innerErr)
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|