diff --git a/pkg/scheduling/v2/extension.go b/pkg/scheduling/v2/extension.go index 2ab2c3b61..867fc607e 100644 --- a/pkg/scheduling/v2/extension.go +++ b/pkg/scheduling/v2/extension.go @@ -8,9 +8,12 @@ import ( "github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc" ) -type PostScheduleInput struct { - Workers map[string]*WorkerCp +type PostAssignInput struct { HasUnassignedStepRuns bool +} + +type SnapshotInput struct { + Workers map[string]*WorkerCp WorkerSlotUtilization map[string]*SlotUtilization } @@ -32,7 +35,8 @@ type SlotCp struct { type SchedulerExtension interface { SetTenants(tenants []*dbsqlc.Tenant) - PostSchedule(tenantId string, input *PostScheduleInput) + ReportSnapshot(tenantId string, input *SnapshotInput) + PostAssign(tenantId string, input *PostAssignInput) Cleanup() error } @@ -52,12 +56,22 @@ func (e *Extensions) Add(ext SchedulerExtension) { e.exts = append(e.exts, ext) } -func (e *Extensions) PostSchedule(tenantId string, input *PostScheduleInput) { +func (e *Extensions) ReportSnapshot(tenantId string, input *SnapshotInput) { e.mu.RLock() defer e.mu.RUnlock() for _, ext := range e.exts { - f := ext.PostSchedule + f := ext.ReportSnapshot + go f(tenantId, input) + } +} + +func (e *Extensions) PostAssign(tenantId string, input *PostAssignInput) { + e.mu.RLock() + defer e.mu.RUnlock() + + for _, ext := range e.exts { + f := ext.PostAssign go f(tenantId, input) } } diff --git a/pkg/scheduling/v2/randomticker/ticker.go b/pkg/scheduling/v2/randomticker/ticker.go new file mode 100644 index 000000000..3ca8319e4 --- /dev/null +++ b/pkg/scheduling/v2/randomticker/ticker.go @@ -0,0 +1,84 @@ +// Copyright (c) 2020 Filip Wojciechowski + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package randomticker + +import ( + "math/rand" + "time" +) + +// RandomTicker is similar to time.Ticker but ticks at random intervals between +// the min and max duration values (stored internally as int64 nanosecond +// counts). +type RandomTicker struct { + C chan time.Time + stopc chan chan struct{} + min int64 + max int64 +} + +// NewRandomTicker returns a pointer to an initialized instance of the +// RandomTicker. Min and max are durations of the shortest and longest allowed +// ticks. Ticker will run in a goroutine until explicitly stopped. +func NewRandomTicker(minDuration, maxDuration time.Duration) *RandomTicker { + rt := &RandomTicker{ + C: make(chan time.Time), + stopc: make(chan chan struct{}), + min: minDuration.Nanoseconds(), + max: maxDuration.Nanoseconds(), + } + go rt.loop() + return rt +} + +// Stop terminates the ticker goroutine and closes the C channel. +func (rt *RandomTicker) Stop() { + c := make(chan struct{}) + rt.stopc <- c + <-c +} + +func (rt *RandomTicker) loop() { + defer close(rt.C) + t := time.NewTimer(rt.nextInterval()) + for { + // either a stop signal or a timeout + select { + case c := <-rt.stopc: + t.Stop() + close(c) + return + case <-t.C: + select { + case rt.C <- time.Now(): + t.Stop() + t = time.NewTimer(rt.nextInterval()) + default: + // there could be noone receiving... + } + } + } +} + +func (rt *RandomTicker) nextInterval() time.Duration { + interval := rand.Int63n(rt.max-rt.min) + rt.min // nolint: gosec + return time.Duration(interval) * time.Nanosecond +} diff --git a/pkg/scheduling/v2/randomticker/ticker_test.go b/pkg/scheduling/v2/randomticker/ticker_test.go new file mode 100644 index 000000000..a87834994 --- /dev/null +++ b/pkg/scheduling/v2/randomticker/ticker_test.go @@ -0,0 +1,61 @@ +// Copyright (c) 2020 Filip Wojciechowski + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package randomticker_test + +import ( + "testing" + "time" + + "github.com/hatchet-dev/hatchet/pkg/scheduling/v2/randomticker" +) + +func TestRandomTicker(t *testing.T) { + t.Parallel() + + minDuration := time.Duration(10) + maxDuration := time.Duration(20) + + // tick can take a little longer since we're not adjusting it to account for + // processing. + precision := time.Duration(4) + + rt := randomticker.NewRandomTicker(minDuration*time.Millisecond, maxDuration*time.Millisecond) + for i := 0; i < 5; i++ { + t0 := time.Now() + t1 := <-rt.C + td := t1.Sub(t0) + if td < minDuration*time.Millisecond { + t.Fatalf("tick was shorter than expected: %s", td) + } else if td > (maxDuration+precision)*time.Millisecond { + t.Fatalf("tick was longer than expected: %s", td) + } + } + rt.Stop() + time.Sleep((maxDuration + precision) * time.Millisecond) + select { + case v, ok := <-rt.C: + if ok || !v.IsZero() { + t.Fatal("ticker did not shut down") + } + default: + t.Fatal("expected to receive close channel signal") + } +} diff --git a/pkg/scheduling/v2/scheduler.go b/pkg/scheduling/v2/scheduler.go index 3525e983b..b32bdcc31 100644 --- a/pkg/scheduling/v2/scheduler.go +++ b/pkg/scheduling/v2/scheduler.go @@ -14,6 +14,7 @@ import ( "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/scheduling/v2/randomticker" ) // Scheduler is responsible for scheduling steps to workers as efficiently as possible. @@ -379,8 +380,34 @@ func (s *Scheduler) loopReplenish(ctx context.Context) { } } +func (s *Scheduler) loopSnapshot(ctx context.Context) { + ticker := randomticker.NewRandomTicker(10*time.Millisecond, 90*time.Millisecond) + defer ticker.Stop() + + for { + count := 0 + + select { + case <-ctx.Done(): + return + case <-ticker.C: + // require that 1 out of every 20 snapshots is taken + must := count%20 == 0 + + in, ok := s.getSnapshotInput(must) + + if !ok { + continue + } + + s.exts.ReportSnapshot(sqlchelpers.UUIDToStr(s.tenantId), in) + } + } +} + func (s *Scheduler) start(ctx context.Context) { go s.loopReplenish(ctx) + go s.loopSnapshot(ctx) } type scheduleRateLimitResult struct { @@ -751,9 +778,7 @@ func (s *Scheduler) tryAssign( span.End() close(resultsCh) - extInput := s.getExtensionInput(extensionResults) - - s.exts.PostSchedule(sqlchelpers.UUIDToStr(s.tenantId), extInput) + s.exts.PostAssign(sqlchelpers.UUIDToStr(s.tenantId), s.getExtensionInput(extensionResults)) if sinceStart := time.Since(startTotal); sinceStart > 100*time.Millisecond { s.l.Warn().Dur("duration", sinceStart).Msgf("assigning queue items took longer than 100ms") @@ -763,16 +788,32 @@ func (s *Scheduler) tryAssign( return resultsCh } -func (s *Scheduler) getExtensionInput(results []*assignResults) *PostScheduleInput { +func (s *Scheduler) getExtensionInput(results []*assignResults) *PostAssignInput { unassigned := make([]*dbsqlc.QueueItem, 0) for _, res := range results { unassigned = append(unassigned, res.unassigned...) } + return &PostAssignInput{ + HasUnassignedStepRuns: len(unassigned) > 0, + } +} + +func (s *Scheduler) getSnapshotInput(mustSnapshot bool) (*SnapshotInput, bool) { + if mustSnapshot { + s.actionsMu.RLock() + } else { + if ok := s.actionsMu.TryRLock(); !ok { + return nil, false + } + } + + defer s.actionsMu.RUnlock() + workers := s.getWorkers() - res := &PostScheduleInput{ + res := &SnapshotInput{ Workers: make(map[string]*WorkerCp), } @@ -788,14 +829,10 @@ func (s *Scheduler) getExtensionInput(results []*assignResults) *PostScheduleInp // functions. we always acquire actionsMu first and then the specific action's lock. actionKeys := make([]string, 0, len(s.actions)) - s.actionsMu.RLock() - for actionId := range s.actions { actionKeys = append(actionKeys, actionId) } - s.actionsMu.RUnlock() - uniqueSlots := make(map[*slot]bool) workerSlotUtilization := make(map[string]*SlotUtilization) @@ -808,9 +845,7 @@ func (s *Scheduler) getExtensionInput(results []*assignResults) *PostScheduleInp } for _, actionId := range actionKeys { - s.actionsMu.RLock() action, ok := s.actions[actionId] - s.actionsMu.RUnlock() if !ok || action == nil { continue @@ -845,9 +880,7 @@ func (s *Scheduler) getExtensionInput(results []*assignResults) *PostScheduleInp res.WorkerSlotUtilization = workerSlotUtilization - res.HasUnassignedStepRuns = len(unassigned) > 0 - - return res + return res, true } func isTimedOut(qi *dbsqlc.QueueItem) bool {