[Go SDK] Case on worker labels for durable tasks (#2511)

* fix durable task worker labels

* fix assignment
This commit is contained in:
Mohammed Nafees
2025-11-12 23:02:58 +05:30
committed by GitHub
parent 9a797610fc
commit f97171f245
5 changed files with 69 additions and 4 deletions

View File

@@ -643,7 +643,6 @@ func (a *adminClientImpl) getJobOpts(jobName string, job *types.WorkflowJob) (*a
c := admincontracts.WorkerLabelComparator(*desiredLabel.Comparator)
stepOpt.WorkerLabels[key].Comparator = &c
}
}
}

View File

@@ -199,6 +199,45 @@ func makeContractTaskOpts(t *TaskShared, taskDefaults *create.TaskDefaults) *con
taskOpts.BackoffMaxSeconds = t.RetryMaxBackoffSeconds
}
if len(t.WorkerLabels) > 0 {
taskOpts.WorkerLabels = make(map[string]*contracts.DesiredWorkerLabels)
for key, value := range t.WorkerLabels {
taskOpts.WorkerLabels[key] = &contracts.DesiredWorkerLabels{}
switch v := value.Value.(type) {
case string:
strValue := v
taskOpts.WorkerLabels[key].StrValue = &strValue
case int:
intValue := int32(v) // nolint: gosec
taskOpts.WorkerLabels[key].IntValue = &intValue
case int32:
taskOpts.WorkerLabels[key].IntValue = &v
case int64:
intValue := int32(v) // nolint: gosec
taskOpts.WorkerLabels[key].IntValue = &intValue
default:
// For any other type, convert to string
strValue := fmt.Sprintf("%v", v)
taskOpts.WorkerLabels[key].StrValue = &strValue
}
if value.Required {
taskOpts.WorkerLabels[key].Required = &value.Required
}
if value.Weight != 0 {
taskOpts.WorkerLabels[key].Weight = &value.Weight
}
if value.Comparator != nil {
c := contracts.WorkerLabelComparator(*value.Comparator)
taskOpts.WorkerLabels[key].Comparator = &c
}
}
}
// Apply workflow task defaults if they are not set
if taskDefaults != nil {
if t.Retries == nil && taskDefaults.Retries != 0 {

View File

@@ -190,13 +190,18 @@ func (w *WorkerImpl) RegisterWorkflows(workflows ...workflow.WorkflowBase) error
logger = w.nonDurableWorker.Logger()
}
labels := make(map[string]interface{})
for k, v := range w.labels {
labels[k] = fmt.Sprintf("%v-durable", v)
}
opts := []worker.WorkerOpt{
worker.WithClient(w.v0),
worker.WithName(w.name + "-durable"),
worker.WithMaxRuns(w.durableSlots),
worker.WithLogger(logger),
worker.WithLogLevel(w.logLevel),
worker.WithLabels(w.labels),
worker.WithLabels(labels),
}
durableWorker, err := worker.NewWorker(

View File

@@ -398,6 +398,17 @@ func (w *workflowDeclarationImpl[I, O]) DurableTask(opts create.WorkflowTask[I,
parentNames[i] = parent.GetName()
}
labels := make(map[string]*types.DesiredWorkerLabel)
for k, v := range opts.WorkerLabels {
labels[k] = &types.DesiredWorkerLabel{
Value: fmt.Sprintf("%v-durable", v.Value),
Required: v.Required,
Weight: v.Weight,
Comparator: v.Comparator,
}
}
taskDecl := &task.DurableTaskDeclaration[I]{
Name: opts.Name,
Fn: genericFn,
@@ -412,7 +423,7 @@ func (w *workflowDeclarationImpl[I, O]) DurableTask(opts create.WorkflowTask[I,
RetryBackoffFactor: retryBackoffFactor,
RetryMaxBackoffSeconds: retryMaxBackoffSeconds,
RateLimits: opts.RateLimits,
WorkerLabels: opts.WorkerLabels,
WorkerLabels: labels,
Concurrency: opts.Concurrency,
},
}

View File

@@ -391,6 +391,17 @@ func (w *workflowDeclarationImpl[I, O]) DurableTask(opts create.WorkflowTask[I,
parentNames[i] = parent.GetName()
}
labels := make(map[string]*types.DesiredWorkerLabel)
for k, v := range opts.WorkerLabels {
labels[k] = &types.DesiredWorkerLabel{
Value: fmt.Sprintf("%v-durable", v.Value),
Required: v.Required,
Weight: v.Weight,
Comparator: v.Comparator,
}
}
taskDecl := &task.DurableTaskDeclaration[I]{
Name: opts.Name,
Fn: genericFn,
@@ -405,7 +416,7 @@ func (w *workflowDeclarationImpl[I, O]) DurableTask(opts create.WorkflowTask[I,
RetryBackoffFactor: retryBackoffFactor,
RetryMaxBackoffSeconds: retryMaxBackoffSeconds,
RateLimits: opts.RateLimits,
WorkerLabels: opts.WorkerLabels,
WorkerLabels: labels,
Concurrency: opts.Concurrency,
},
}