Files
hatchet/internal/queueutils/batch.go
abelanger5 2cdee59aea refactor: optimize v0.50.0 release (#975)
- Simplifies architecture for splitting engine services into different components. The three supported services are now `grpc-api`, `scheduler`, and `controllers`. The `grpc-api` service is the only one which needs to be exposed for workers. The other two can run as unexposed services.
- Fixes a set of bugs and race conditions in the `v2` scheduler
- Adds a `lastActive` time to the `Queue` table and includes a migration which sets this `lastActive` time for the most recent 24 hours of queues. Effectively this means that the max scheduling time in a queue is 24 hours. 
- Rewrites the `ListWorkflowsForEvent` query to improve performance and select far fewer rows.
2024-10-23 12:05:16 +00:00

55 lines
870 B
Go

package queueutils
import (
"github.com/hashicorp/go-multierror"
"golang.org/x/sync/errgroup"
)
// BatchConcurrent splits things into consecutive groups of at most batchSize
// elements and hands every group to fn through errgroup.Group.Go, so the
// groups are processed concurrently.
//
// The call blocks until all scheduled invocations of fn have returned and
// then reports whatever errgroup.Group.Wait yields (nil when no invocation
// failed). When batchSize is zero or negative, fn is never invoked and nil is
// returned.
//
// Every group is a sub-slice of things and therefore shares its backing array
// with the input.
func BatchConcurrent[T any](batchSize int, things []T, fn func(group []T) error) error {
	if batchSize <= 0 {
		return nil
	}

	var eg errgroup.Group

	total := len(things)
	for lo := 0; lo < total; lo += batchSize {
		// The final group may be shorter than batchSize.
		hi := total
		if total-lo > batchSize {
			hi = lo + batchSize
		}

		// chunk is declared per iteration so each closure sees its own group.
		chunk := things[lo:hi]
		eg.Go(func() error {
			return fn(chunk)
		})
	}

	return eg.Wait()
}
// BatchLinear splits things into consecutive groups of at most batchSize
// elements and calls fn on each group one after another, in input order.
//
// A failing group does not stop the iteration: every non-nil error returned
// by fn is accumulated with multierror.Append, and the accumulated error is
// returned after the last group has been processed. The result is nil when no
// call failed. When batchSize is zero or negative, fn is never invoked and
// nil is returned.
//
// Every group is a sub-slice of things and therefore shares its backing array
// with the input.
func BatchLinear[T any](batchSize int, things []T, fn func(group []T) error) error {
	if batchSize <= 0 {
		return nil
	}

	var combined error

	// Peel groups off the front of the remaining slice until nothing is left.
	for rest := things; len(rest) > 0; {
		// The final group may be shorter than batchSize.
		n := batchSize
		if n > len(rest) {
			n = len(rest)
		}

		if batchErr := fn(rest[:n]); batchErr != nil {
			combined = multierror.Append(combined, batchErr)
		}

		rest = rest[n:]
	}

	return combined
}