Hotfix: Go SDK scheduled workflows issues (#1727)

* fix: scheduled workflow namespace + input

* fix: send namespace through to schedules client

* tweak: do nothing with invalid json

* fix: helper
This commit is contained in:
Matt Kaye
2025-05-16 08:11:33 -04:00
committed by GitHub
parent 51f2bc42dc
commit 3e7a73a81c
9 changed files with 50 additions and 30 deletions
@@ -530,6 +530,11 @@ func ToScheduledWorkflowsFromSQLC(scheduled *dbsqlc.ListScheduledWorkflowsRow) *
workflowRunIdPtr = &workflowRunId
}
input := make(map[string]interface{})
if scheduled.Input != nil {
json.Unmarshal(scheduled.Input, &input)
}
res := &gen.ScheduledWorkflows{
Metadata: *toAPIMetadata(sqlchelpers.UUIDToStr(scheduled.ID), scheduled.CreatedAt.Time, scheduled.UpdatedAt.Time),
WorkflowVersionId: sqlchelpers.UUIDToStr(scheduled.WorkflowVersionId),
@@ -544,6 +549,7 @@ func ToScheduledWorkflowsFromSQLC(scheduled *dbsqlc.ListScheduledWorkflowsRow) *
WorkflowRunName: &scheduled.WorkflowRunName.String,
Method: gen.ScheduledWorkflowsMethod(scheduled.Method),
Priority: &scheduled.Priority,
Input: &input,
}
return res
+6 -12
View File
@@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"math"
"strings"
"sync"
"time"
@@ -17,6 +16,7 @@ import (
admincontracts "github.com/hatchet-dev/hatchet/internal/services/admin/contracts"
"github.com/hatchet-dev/hatchet/pkg/client/types"
"github.com/hatchet-dev/hatchet/pkg/config/client"
"github.com/hatchet-dev/hatchet/pkg/validator"
v1contracts "github.com/hatchet-dev/hatchet/internal/services/shared/proto/v1"
@@ -190,6 +190,8 @@ func (a *adminClientImpl) ScheduleWorkflow(workflowName string, fs ...ScheduleOp
return err
}
workflowName = client.ApplyNamespace(workflowName, &a.namespace)
_, err = a.client.ScheduleWorkflow(a.ctx.newContext(context.Background()), &admincontracts.ScheduleWorkflowRequest{
Name: workflowName,
Schedules: pbSchedules,
@@ -244,9 +246,7 @@ func (a *adminClientImpl) RunWorkflow(workflowName string, input interface{}, op
return nil, fmt.Errorf("could not marshal input: %w", err)
}
if a.namespace != "" && !strings.HasPrefix(workflowName, a.namespace) {
workflowName = fmt.Sprintf("%s%s", a.namespace, workflowName)
}
workflowName = client.ApplyNamespace(workflowName, &a.namespace)
request := &admincontracts.TriggerWorkflowRequest{
Name: workflowName,
@@ -328,9 +328,7 @@ func (a *adminClientImpl) RunChildWorkflow(workflowName string, input interface{
return "", fmt.Errorf("could not marshal input: %w", err)
}
if a.namespace != "" && !strings.HasPrefix(workflowName, a.namespace) {
workflowName = fmt.Sprintf("%s%s", a.namespace, workflowName)
}
workflowName = client.ApplyNamespace(workflowName, &a.namespace)
childIndex := int32(opts.ChildIndex) // nolint: gosec
@@ -389,11 +387,7 @@ func (a *adminClientImpl) RunChildWorkflows(workflows []*RunChildWorkflowsOpts)
return nil, fmt.Errorf("could not marshal input: %w", err)
}
var workflowName = workflow.WorkflowName
if a.namespace != "" && !strings.HasPrefix(workflow.WorkflowName, a.namespace) {
workflowName = fmt.Sprintf("%s%s", a.namespace, workflow.WorkflowName)
}
workflowName := client.ApplyNamespace(workflow.WorkflowName, &a.namespace)
if workflow.Opts.ChildIndex < math.MinInt32 || workflow.Opts.ChildIndex > math.MaxInt32 {
return nil, fmt.Errorf("child index out of range")
+3 -1
View File
@@ -10,6 +10,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
eventcontracts "github.com/hatchet-dev/hatchet/internal/services/ingestor/contracts"
"github.com/hatchet-dev/hatchet/pkg/config/client"
"github.com/hatchet-dev/hatchet/pkg/validator"
)
@@ -74,9 +75,10 @@ func WithEventMetadata(metadata map[string]string) PushOpFunc {
}
func (a *eventClientImpl) Push(ctx context.Context, eventKey string, payload interface{}, options ...PushOpFunc) error {
key := client.ApplyNamespace(eventKey, &a.namespace)
request := eventcontracts.PushEventRequest{
Key: a.namespace + eventKey,
Key: key,
EventTimestamp: timestamppb.Now(),
}
+14
View File
@@ -2,6 +2,8 @@ package client
import (
"crypto/tls"
"fmt"
"strings"
"github.com/spf13/viper"
@@ -79,3 +81,15 @@ func BindAllEnv(v *viper.Viper) {
_ = v.BindEnv("tls.base.tlsRootCA", "HATCHET_CLIENT_TLS_ROOT_CA")
_ = v.BindEnv("tls.tlsServerName", "HATCHET_CLIENT_TLS_SERVER_NAME")
}
func ApplyNamespace(resourceName string, namespace *string) string {
if namespace == nil || *namespace == "" {
return resourceName
}
if strings.HasPrefix(resourceName, *namespace) {
return resourceName
}
return fmt.Sprintf("%s%s", *namespace, resourceName)
}
+3 -1
View File
@@ -158,7 +158,9 @@ func (c *v1HatchetClientImpl) Schedules() features.SchedulesClient {
if c.schedules == nil {
api := c.V0().API()
tenantId := c.V0().TenantId()
c.schedules = features.NewSchedulesClient(api, &tenantId)
namespace := c.V0().Namespace()
c.schedules = features.NewSchedulesClient(api, &tenantId, &namespace)
}
return c.schedules
}
+9 -4
View File
@@ -6,6 +6,7 @@ import (
"github.com/google/uuid"
"github.com/hatchet-dev/hatchet/pkg/client/rest"
"github.com/hatchet-dev/hatchet/pkg/config/client"
)
// SchedulesClient provides methods for interacting with workflow schedules
@@ -40,25 +41,29 @@ type CreateScheduledRunTrigger struct {
// schedulesClientImpl implements the SchedulesClient interface.
type schedulesClientImpl struct {
api *rest.ClientWithResponses
tenantId uuid.UUID
api *rest.ClientWithResponses
tenantId uuid.UUID
namespace *string
}
// NewSchedulesClient creates a new client for interacting with workflow schedules.
func NewSchedulesClient(
api *rest.ClientWithResponses,
tenantId *string,
namespace *string,
) SchedulesClient {
tenantIdUUID := uuid.MustParse(*tenantId)
return &schedulesClientImpl{
api: api,
tenantId: tenantIdUUID,
api: api,
tenantId: tenantIdUUID,
namespace: namespace,
}
}
// Create creates a new scheduled workflow run.
func (s *schedulesClientImpl) Create(ctx context.Context, workflowName string, trigger CreateScheduledRunTrigger) (*rest.ScheduledWorkflows, error) {
workflowName = client.ApplyNamespace(workflowName, s.namespace)
request := rest.ScheduleWorkflowRunRequest{
Input: trigger.Input,
+2 -1
View File
@@ -157,9 +157,10 @@ func NewWorkflowDeclaration[I any, O any](opts create.WorkflowCreateOpts[I], v0
api := v0.API()
tenantId := v0.TenantId()
namespace := v0.Namespace()
crons := features.NewCronsClient(api, &tenantId)
schedules := features.NewSchedulesClient(api, &tenantId)
schedules := features.NewSchedulesClient(api, &tenantId, &namespace)
metrics := features.NewMetricsClient(api, &tenantId)
workflows := features.NewWorkflowsClient(api, &tenantId)
+3 -8
View File
@@ -15,6 +15,7 @@ import (
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/client/compute"
"github.com/hatchet-dev/hatchet/pkg/client/types"
clientconfig "github.com/hatchet-dev/hatchet/pkg/config/client"
"github.com/hatchet-dev/hatchet/pkg/errors"
"github.com/hatchet-dev/hatchet/pkg/integrations"
"github.com/hatchet-dev/hatchet/pkg/logger"
@@ -269,14 +270,8 @@ func (w *Worker) Use(mws ...MiddlewareFunc) {
}
func (w *Worker) NewService(name string) *Service {
svcName := name
if w.client.Namespace() != "" && !strings.HasPrefix(name, w.client.Namespace()) {
namespace := w.client.Namespace()
svcName = namespace + name
}
svcName = strings.ToLower(svcName)
ns := w.client.Namespace()
svcName := strings.ToLower(clientconfig.ApplyNamespace(name, &ns))
svc := &Service{
Name: svcName,
+4 -3
View File
@@ -9,6 +9,7 @@ import (
"github.com/hatchet-dev/hatchet/pkg/client/compute"
"github.com/hatchet-dev/hatchet/pkg/client/types"
"github.com/hatchet-dev/hatchet/pkg/config/client"
)
type triggerConverter interface {
@@ -101,7 +102,7 @@ func (e event) ToWorkflowTriggers(wt *types.WorkflowTriggers, namespace string)
// Prepend the namespace to each event
for i, event := range wt.Events {
wt.Events[i] = namespace + event
wt.Events[i] = client.ApplyNamespace(event, &namespace)
}
}
@@ -120,7 +121,7 @@ func (e eventsArr) ToWorkflowTriggers(wt *types.WorkflowTriggers, namespace stri
// Prepend the namespace to each event
for i, event := range wt.Events {
wt.Events[i] = namespace + event
wt.Events[i] = client.ApplyNamespace(event, &namespace)
}
}
@@ -207,7 +208,7 @@ func (j *WorkflowJob) ToWorkflow(svcName string, namespace string) types.Workflo
}
w := types.Workflow{
Name: namespace + j.Name,
Name: client.ApplyNamespace(j.Name, &namespace),
Jobs: jobs,
OnFailureJob: onFailureJob,
ScheduleTimeout: j.ScheduleTimeout,