mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-05-05 17:20:02 -05:00
Add missing Go SDK examples for worker affinity and manual slot release (#3179)
* add missing Go SDK examples for affinity and manual slot release * fix gen
This commit is contained in:
@@ -0,0 +1,82 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/pkg/client/types"
|
||||
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
|
||||
hatchet "github.com/hatchet-dev/hatchet/sdks/go"
|
||||
)
|
||||
|
||||
func main() {
|
||||
client, err := hatchet.NewClient()
|
||||
if err != nil {
|
||||
log.Fatalf("failed to create hatchet client: %v", err)
|
||||
}
|
||||
|
||||
type AffinityOutput struct {
|
||||
Worker string `json:"worker"`
|
||||
}
|
||||
|
||||
// > AffinityWorkflow
|
||||
|
||||
affinityWorkflow := client.NewWorkflow("affinity-workflow")
|
||||
|
||||
_ = affinityWorkflow.NewTask("step", func(ctx hatchet.Context, _ any) (*AffinityOutput, error) {
|
||||
// > AffinityTask
|
||||
if ctx.Worker().GetLabels()["model"] != "fancy-ai-model-v2" {
|
||||
_ = ctx.Worker().UpsertLabels(map[string]interface{}{"model": "unset"})
|
||||
// DO WORK TO EVICT OLD MODEL / LOAD NEW MODEL
|
||||
_ = ctx.Worker().UpsertLabels(map[string]interface{}{"model": "fancy-ai-model-v2"})
|
||||
}
|
||||
|
||||
return &AffinityOutput{Worker: ctx.Worker().ID()}, nil
|
||||
})
|
||||
|
||||
_ = func() error {
|
||||
// > AffinityRun
|
||||
result, runErr := affinityWorkflow.RunNoWait(context.Background(), nil,
|
||||
hatchet.WithDesiredWorkerLabels(map[string]*hatchet.DesiredWorkerLabel{
|
||||
"model": {
|
||||
Value: "fancy-ai-model-v2",
|
||||
Weight: 10,
|
||||
},
|
||||
"memory": {
|
||||
Value: 256,
|
||||
Required: true,
|
||||
Comparator: types.ComparatorPtr(types.WorkerLabelComparator_LESS_THAN),
|
||||
},
|
||||
}),
|
||||
)
|
||||
if runErr != nil {
|
||||
return fmt.Errorf("failed to run workflow: %w", runErr)
|
||||
}
|
||||
|
||||
fmt.Println(result.RunId)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// > AffinityWorker
|
||||
worker, err := client.NewWorker("affinity-worker",
|
||||
hatchet.WithWorkflows(affinityWorkflow),
|
||||
hatchet.WithSlots(10),
|
||||
hatchet.WithLabels(map[string]any{
|
||||
"model": "fancy-ai-model-v2",
|
||||
"memory": 512,
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to create worker: %v", err)
|
||||
}
|
||||
|
||||
interruptCtx, cancel := cmdutils.NewInterruptContext()
|
||||
defer cancel()
|
||||
|
||||
err = worker.StartBlocking(interruptCtx)
|
||||
if err != nil {
|
||||
log.Printf("failed to start worker: %v\n", err)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
|
||||
hatchet "github.com/hatchet-dev/hatchet/sdks/go"
|
||||
)
|
||||
|
||||
func main() {
|
||||
client, err := hatchet.NewClient()
|
||||
if err != nil {
|
||||
log.Fatalf("failed to create hatchet client: %v", err)
|
||||
}
|
||||
|
||||
type StepOutput struct {
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
workflow := client.NewWorkflow("slot-release-workflow")
|
||||
|
||||
// > SlotRelease
|
||||
|
||||
_ = workflow.NewTask("step1", func(ctx hatchet.Context, _ any) (*StepOutput, error) {
|
||||
fmt.Println("RESOURCE INTENSIVE PROCESS")
|
||||
time.Sleep(10 * time.Second)
|
||||
|
||||
// Release the slot after the resource-intensive process,
|
||||
// so that other steps can run on this worker.
|
||||
if releaseErr := ctx.ReleaseSlot(); releaseErr != nil {
|
||||
return nil, fmt.Errorf("failed to release slot: %w", releaseErr)
|
||||
}
|
||||
|
||||
fmt.Println("NON RESOURCE INTENSIVE PROCESS")
|
||||
|
||||
return &StepOutput{Status: "success"}, nil
|
||||
})
|
||||
|
||||
worker, err := client.NewWorker("slot-release-worker", hatchet.WithWorkflows(workflow))
|
||||
if err != nil {
|
||||
log.Fatalf("failed to create worker: %v", err)
|
||||
}
|
||||
|
||||
interruptCtx, cancel := cmdutils.NewInterruptContext()
|
||||
defer cancel()
|
||||
|
||||
err = worker.StartBlocking(interruptCtx)
|
||||
if err != nil {
|
||||
log.Printf("failed to start worker: %v\n", err)
|
||||
}
|
||||
}
|
||||
@@ -21,28 +21,14 @@ In some cases, you may have a task in your workflow that is resource-intensive a
|
||||
|
||||
You can manually release a slot in from within a running task in your workflow using the Hatchet context method `release_slot`:
|
||||
|
||||
<UniversalTabs items={['Python', "Ruby"]}>
|
||||
<UniversalTabs items={['Python', "Go", "Ruby"]}>
|
||||
<Tabs.Tab>
|
||||
<Snippet src={snippets.python.manual_slot_release.worker.slot_release} />
|
||||
|
||||
</Tabs.Tab>
|
||||
|
||||
<Tabs.Tab>
|
||||
|
||||
```go
|
||||
|
||||
func StepOne(ctx worker.HatchetContext) (result \*taskOneOutput, err error) {
|
||||
fmt.Println("RESOURCE INTENSIVE PROCESS")
|
||||
time.Sleep(10 * time.Second)
|
||||
// Release the slot after the resource-intensive process, so that other tasks can run
|
||||
ctx.ReleaseSlot()
|
||||
fmt.Println("NON RESOURCE INTENSIVE PROCESS")
|
||||
return &taskOneOutput{
|
||||
Message: "task1 results",
|
||||
}, nil
|
||||
},
|
||||
```
|
||||
|
||||
<Snippet src={snippets.go.manual_slot_release.main.slot_release} />
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab title="Ruby">
|
||||
<Snippet src={snippets.ruby.manual_slot_release.worker.slot_release} />
|
||||
|
||||
@@ -17,7 +17,7 @@ Specific tasks can then specify desired label state to ensure that workflows are
|
||||
|
||||
Labels can be set on workers when they are registered with Hatchet. Labels are key-value pairs that can be used to specify worker capabilities, resource availability, or other criteria that can be used to match workflows to workers. Values can be strings or numbers, and multiple labels can be set on a worker.
|
||||
|
||||
<UniversalTabs items={['Python', 'Typescript', "Ruby"]}>
|
||||
<UniversalTabs items={["Python", "Typescript", "Go", "Ruby"]}>
|
||||
<Tabs.Tab>
|
||||
|
||||
<Snippet src={snippets.python.affinity_workers.worker.affinity_worker} />
|
||||
@@ -28,17 +28,8 @@ Labels can be set on workers when they are registered with Hatchet. Labels are k
|
||||
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab>
|
||||
```go
|
||||
w, err := worker.NewWorker(
|
||||
worker.WithClient(
|
||||
c,
|
||||
),
|
||||
worker.WithLabels(map[string]interface{}{
|
||||
"model": "fancy-ai-model-v2",
|
||||
"memory": 512,
|
||||
}),
|
||||
)
|
||||
```
|
||||
<Snippet src={snippets.go.affinity_workers.main.affinity_worker} />
|
||||
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab title="Ruby">
|
||||
<Snippet src={snippets.ruby.affinity_workers.worker.affinity_worker} />
|
||||
|
||||
@@ -0,0 +1,87 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/pkg/client/types"
|
||||
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
|
||||
hatchet "github.com/hatchet-dev/hatchet/sdks/go"
|
||||
)
|
||||
|
||||
func main() {
|
||||
client, err := hatchet.NewClient()
|
||||
if err != nil {
|
||||
log.Fatalf("failed to create hatchet client: %v", err)
|
||||
}
|
||||
|
||||
type AffinityOutput struct {
|
||||
Worker string `json:"worker"`
|
||||
}
|
||||
|
||||
// > AffinityWorkflow
|
||||
|
||||
affinityWorkflow := client.NewWorkflow("affinity-workflow")
|
||||
|
||||
_ = affinityWorkflow.NewTask("step", func(ctx hatchet.Context, _ any) (*AffinityOutput, error) {
|
||||
// > AffinityTask
|
||||
if ctx.Worker().GetLabels()["model"] != "fancy-ai-model-v2" {
|
||||
_ = ctx.Worker().UpsertLabels(map[string]interface{}{"model": "unset"})
|
||||
// DO WORK TO EVICT OLD MODEL / LOAD NEW MODEL
|
||||
_ = ctx.Worker().UpsertLabels(map[string]interface{}{"model": "fancy-ai-model-v2"})
|
||||
}
|
||||
|
||||
return &AffinityOutput{Worker: ctx.Worker().ID()}, nil
|
||||
// !!
|
||||
})
|
||||
|
||||
// !!
|
||||
|
||||
_ = func() error {
|
||||
// > AffinityRun
|
||||
result, runErr := affinityWorkflow.RunNoWait(context.Background(), nil,
|
||||
hatchet.WithDesiredWorkerLabels(map[string]*hatchet.DesiredWorkerLabel{
|
||||
"model": {
|
||||
Value: "fancy-ai-model-v2",
|
||||
Weight: 10,
|
||||
},
|
||||
"memory": {
|
||||
Value: 256,
|
||||
Required: true,
|
||||
Comparator: types.ComparatorPtr(types.WorkerLabelComparator_LESS_THAN),
|
||||
},
|
||||
}),
|
||||
)
|
||||
if runErr != nil {
|
||||
return fmt.Errorf("failed to run workflow: %w", runErr)
|
||||
}
|
||||
|
||||
fmt.Println(result.RunId)
|
||||
// !!
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// > AffinityWorker
|
||||
worker, err := client.NewWorker("affinity-worker",
|
||||
hatchet.WithWorkflows(affinityWorkflow),
|
||||
hatchet.WithSlots(10),
|
||||
hatchet.WithLabels(map[string]any{
|
||||
"model": "fancy-ai-model-v2",
|
||||
"memory": 512,
|
||||
}),
|
||||
)
|
||||
// !!
|
||||
if err != nil {
|
||||
log.Fatalf("failed to create worker: %v", err)
|
||||
}
|
||||
|
||||
interruptCtx, cancel := cmdutils.NewInterruptContext()
|
||||
defer cancel()
|
||||
|
||||
err = worker.StartBlocking(interruptCtx)
|
||||
if err != nil {
|
||||
log.Printf("failed to start worker: %v\n", err)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
|
||||
hatchet "github.com/hatchet-dev/hatchet/sdks/go"
|
||||
)
|
||||
|
||||
func main() {
|
||||
client, err := hatchet.NewClient()
|
||||
if err != nil {
|
||||
log.Fatalf("failed to create hatchet client: %v", err)
|
||||
}
|
||||
|
||||
type StepOutput struct {
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
workflow := client.NewWorkflow("slot-release-workflow")
|
||||
|
||||
// > SlotRelease
|
||||
|
||||
_ = workflow.NewTask("step1", func(ctx hatchet.Context, _ any) (*StepOutput, error) {
|
||||
fmt.Println("RESOURCE INTENSIVE PROCESS")
|
||||
time.Sleep(10 * time.Second)
|
||||
|
||||
// Release the slot after the resource-intensive process,
|
||||
// so that other steps can run on this worker.
|
||||
if releaseErr := ctx.ReleaseSlot(); releaseErr != nil {
|
||||
return nil, fmt.Errorf("failed to release slot: %w", releaseErr)
|
||||
}
|
||||
|
||||
fmt.Println("NON RESOURCE INTENSIVE PROCESS")
|
||||
|
||||
return &StepOutput{Status: "success"}, nil
|
||||
})
|
||||
|
||||
// !!
|
||||
|
||||
worker, err := client.NewWorker("slot-release-worker", hatchet.WithWorkflows(workflow))
|
||||
if err != nil {
|
||||
log.Fatalf("failed to create worker: %v", err)
|
||||
}
|
||||
|
||||
interruptCtx, cancel := cmdutils.NewInterruptContext()
|
||||
defer cancel()
|
||||
|
||||
err = worker.StartBlocking(interruptCtx)
|
||||
if err != nil {
|
||||
log.Printf("failed to start worker: %v\n", err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user