fix: make extension less memory intensive (#1241)

This commit is contained in:
abelanger5
2025-01-31 10:28:53 -05:00
committed by GitHub
parent 3185a6740d
commit 9b30a3c5a3
4 changed files with 211 additions and 19 deletions

View File

@@ -8,9 +8,12 @@ import (
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc"
)
type PostScheduleInput struct {
Workers map[string]*WorkerCp
type PostAssignInput struct {
HasUnassignedStepRuns bool
}
type SnapshotInput struct {
Workers map[string]*WorkerCp
WorkerSlotUtilization map[string]*SlotUtilization
}
@@ -32,7 +35,8 @@ type SlotCp struct {
type SchedulerExtension interface {
SetTenants(tenants []*dbsqlc.Tenant)
PostSchedule(tenantId string, input *PostScheduleInput)
ReportSnapshot(tenantId string, input *SnapshotInput)
PostAssign(tenantId string, input *PostAssignInput)
Cleanup() error
}
@@ -52,12 +56,22 @@ func (e *Extensions) Add(ext SchedulerExtension) {
e.exts = append(e.exts, ext)
}
func (e *Extensions) PostSchedule(tenantId string, input *PostScheduleInput) {
func (e *Extensions) ReportSnapshot(tenantId string, input *SnapshotInput) {
e.mu.RLock()
defer e.mu.RUnlock()
for _, ext := range e.exts {
f := ext.PostSchedule
f := ext.ReportSnapshot
go f(tenantId, input)
}
}
func (e *Extensions) PostAssign(tenantId string, input *PostAssignInput) {
e.mu.RLock()
defer e.mu.RUnlock()
for _, ext := range e.exts {
f := ext.PostAssign
go f(tenantId, input)
}
}

View File

@@ -0,0 +1,84 @@
// Copyright (c) 2020 Filip Wojciechowski
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package randomticker
import (
"math/rand"
"time"
)
// RandomTicker is similar to time.Ticker but ticks at random intervals between
// the min and max duration values (stored internally as int64 nanosecond
// counts).
type RandomTicker struct {
C chan time.Time
stopc chan chan struct{}
min int64
max int64
}
// NewRandomTicker returns a pointer to an initialized instance of the
// RandomTicker. Min and max are durations of the shortest and longest allowed
// ticks. Ticker will run in a goroutine until explicitly stopped.
func NewRandomTicker(minDuration, maxDuration time.Duration) *RandomTicker {
rt := &RandomTicker{
C: make(chan time.Time),
stopc: make(chan chan struct{}),
min: minDuration.Nanoseconds(),
max: maxDuration.Nanoseconds(),
}
go rt.loop()
return rt
}
// Stop terminates the ticker goroutine and closes the C channel.
func (rt *RandomTicker) Stop() {
c := make(chan struct{})
rt.stopc <- c
<-c
}
func (rt *RandomTicker) loop() {
defer close(rt.C)
t := time.NewTimer(rt.nextInterval())
for {
// either a stop signal or a timeout
select {
case c := <-rt.stopc:
t.Stop()
close(c)
return
case <-t.C:
select {
case rt.C <- time.Now():
t.Stop()
t = time.NewTimer(rt.nextInterval())
default:
// there could be noone receiving...
}
}
}
}
func (rt *RandomTicker) nextInterval() time.Duration {
interval := rand.Int63n(rt.max-rt.min) + rt.min // nolint: gosec
return time.Duration(interval) * time.Nanosecond
}

View File

@@ -0,0 +1,61 @@
// Copyright (c) 2020 Filip Wojciechowski
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package randomticker_test
import (
"testing"
"time"
"github.com/hatchet-dev/hatchet/pkg/scheduling/v2/randomticker"
)
func TestRandomTicker(t *testing.T) {
t.Parallel()
minDuration := time.Duration(10)
maxDuration := time.Duration(20)
// tick can take a little longer since we're not adjusting it to account for
// processing.
precision := time.Duration(4)
rt := randomticker.NewRandomTicker(minDuration*time.Millisecond, maxDuration*time.Millisecond)
for i := 0; i < 5; i++ {
t0 := time.Now()
t1 := <-rt.C
td := t1.Sub(t0)
if td < minDuration*time.Millisecond {
t.Fatalf("tick was shorter than expected: %s", td)
} else if td > (maxDuration+precision)*time.Millisecond {
t.Fatalf("tick was longer than expected: %s", td)
}
}
rt.Stop()
time.Sleep((maxDuration + precision) * time.Millisecond)
select {
case v, ok := <-rt.C:
if ok || !v.IsZero() {
t.Fatal("ticker did not shut down")
}
default:
t.Fatal("expected to receive close channel signal")
}
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/hatchet-dev/hatchet/pkg/repository"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers"
"github.com/hatchet-dev/hatchet/pkg/scheduling/v2/randomticker"
)
// Scheduler is responsible for scheduling steps to workers as efficiently as possible.
@@ -379,8 +380,34 @@ func (s *Scheduler) loopReplenish(ctx context.Context) {
}
}
func (s *Scheduler) loopSnapshot(ctx context.Context) {
ticker := randomticker.NewRandomTicker(10*time.Millisecond, 90*time.Millisecond)
defer ticker.Stop()
for {
count := 0
select {
case <-ctx.Done():
return
case <-ticker.C:
// require that 1 out of every 20 snapshots is taken
must := count%20 == 0
in, ok := s.getSnapshotInput(must)
if !ok {
continue
}
s.exts.ReportSnapshot(sqlchelpers.UUIDToStr(s.tenantId), in)
}
}
}
func (s *Scheduler) start(ctx context.Context) {
go s.loopReplenish(ctx)
go s.loopSnapshot(ctx)
}
type scheduleRateLimitResult struct {
@@ -751,9 +778,7 @@ func (s *Scheduler) tryAssign(
span.End()
close(resultsCh)
extInput := s.getExtensionInput(extensionResults)
s.exts.PostSchedule(sqlchelpers.UUIDToStr(s.tenantId), extInput)
s.exts.PostAssign(sqlchelpers.UUIDToStr(s.tenantId), s.getExtensionInput(extensionResults))
if sinceStart := time.Since(startTotal); sinceStart > 100*time.Millisecond {
s.l.Warn().Dur("duration", sinceStart).Msgf("assigning queue items took longer than 100ms")
@@ -763,16 +788,32 @@ func (s *Scheduler) tryAssign(
return resultsCh
}
func (s *Scheduler) getExtensionInput(results []*assignResults) *PostScheduleInput {
func (s *Scheduler) getExtensionInput(results []*assignResults) *PostAssignInput {
unassigned := make([]*dbsqlc.QueueItem, 0)
for _, res := range results {
unassigned = append(unassigned, res.unassigned...)
}
return &PostAssignInput{
HasUnassignedStepRuns: len(unassigned) > 0,
}
}
func (s *Scheduler) getSnapshotInput(mustSnapshot bool) (*SnapshotInput, bool) {
if mustSnapshot {
s.actionsMu.RLock()
} else {
if ok := s.actionsMu.TryRLock(); !ok {
return nil, false
}
}
defer s.actionsMu.RUnlock()
workers := s.getWorkers()
res := &PostScheduleInput{
res := &SnapshotInput{
Workers: make(map[string]*WorkerCp),
}
@@ -788,14 +829,10 @@ func (s *Scheduler) getExtensionInput(results []*assignResults) *PostScheduleInp
// functions. we always acquire actionsMu first and then the specific action's lock.
actionKeys := make([]string, 0, len(s.actions))
s.actionsMu.RLock()
for actionId := range s.actions {
actionKeys = append(actionKeys, actionId)
}
s.actionsMu.RUnlock()
uniqueSlots := make(map[*slot]bool)
workerSlotUtilization := make(map[string]*SlotUtilization)
@@ -808,9 +845,7 @@ func (s *Scheduler) getExtensionInput(results []*assignResults) *PostScheduleInp
}
for _, actionId := range actionKeys {
s.actionsMu.RLock()
action, ok := s.actions[actionId]
s.actionsMu.RUnlock()
if !ok || action == nil {
continue
@@ -845,9 +880,7 @@ func (s *Scheduler) getExtensionInput(results []*assignResults) *PostScheduleInp
res.WorkerSlotUtilization = workerSlotUtilization
res.HasUnassignedStepRuns = len(unassigned) > 0
return res
return res, true
}
func isTimedOut(qi *dbsqlc.QueueItem) bool {