fix: separate context (#929)

* fix: separate context

* chore: comments

* chore: generate

* chore: gen

* chore: update protoc 28.2
This commit is contained in:
Gabe Ruttner
2024-10-02 07:53:51 -07:00
committed by GitHub
parent 15ff36bd3a
commit 5fcf5eff6a
8 changed files with 43 additions and 19 deletions
+1 -1
View File
@@ -12,7 +12,7 @@ jobs:
- name: Install Protoc
uses: arduino/setup-protoc@v3
with:
version: "28.1"
version: "28.2"
- name: Install Task
uses: arduino/setup-task@v2
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v5.28.1
// protoc v5.28.2
// source: workflows.proto
package contracts
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v5.28.1
// - protoc v5.28.2
// source: workflows.proto
package contracts
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v5.28.1
// protoc v5.28.2
// source: dispatcher.proto
package contracts
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v5.28.1
// - protoc v5.28.2
// source: dispatcher.proto
package contracts
+36 -12
View File
@@ -1038,10 +1038,14 @@ func (d *DispatcherImpl) RefreshTimeout(ctx context.Context, request *contracts.
}
func (s *DispatcherImpl) handleStepRunStarted(ctx context.Context, request *contracts.StepActionEvent) (*contracts.ActionEventResponse, error) {
tenant := ctx.Value("tenant").(*dbsqlc.Tenant)
func (s *DispatcherImpl) handleStepRunStarted(inputCtx context.Context, request *contracts.StepActionEvent) (*contracts.ActionEventResponse, error) {
tenant := inputCtx.Value("tenant").(*dbsqlc.Tenant)
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
// run the rest on a separate context to always send to job controller
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
s.l.Debug().Msgf("Received step started event for step run %s", request.StepRunId)
startedAt := request.EventTimestamp.AsTime()
@@ -1082,12 +1086,16 @@ func (s *DispatcherImpl) handleStepRunStarted(ctx context.Context, request *cont
}, nil
}
func (s *DispatcherImpl) handleStepRunCompleted(ctx context.Context, request *contracts.StepActionEvent) (*contracts.ActionEventResponse, error) {
tenant := ctx.Value("tenant").(*dbsqlc.Tenant)
func (s *DispatcherImpl) handleStepRunCompleted(inputCtx context.Context, request *contracts.StepActionEvent) (*contracts.ActionEventResponse, error) {
tenant := inputCtx.Value("tenant").(*dbsqlc.Tenant)
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
s.l.Debug().Msgf("Received step completed event for step run %s", request.StepRunId)
// run the rest on a separate context to always send to job controller
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// verify that the event payload can be unmarshalled into a map type
if request.EventPayload != "" {
res := make(map[string]interface{})
@@ -1146,10 +1154,14 @@ func (s *DispatcherImpl) handleStepRunCompleted(ctx context.Context, request *co
}, nil
}
func (s *DispatcherImpl) handleStepRunFailed(ctx context.Context, request *contracts.StepActionEvent) (*contracts.ActionEventResponse, error) {
tenant := ctx.Value("tenant").(*dbsqlc.Tenant)
func (s *DispatcherImpl) handleStepRunFailed(inputCtx context.Context, request *contracts.StepActionEvent) (*contracts.ActionEventResponse, error) {
tenant := inputCtx.Value("tenant").(*dbsqlc.Tenant)
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
// run the rest on a separate context to always send to job controller
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
s.l.Debug().Msgf("Received step failed event for step run %s", request.StepRunId)
failedAt := request.EventTimestamp.AsTime()
@@ -1191,10 +1203,14 @@ func (s *DispatcherImpl) handleStepRunFailed(ctx context.Context, request *contr
}, nil
}
func (s *DispatcherImpl) handleGetGroupKeyRunStarted(ctx context.Context, request *contracts.GroupKeyActionEvent) (*contracts.ActionEventResponse, error) {
tenant := ctx.Value("tenant").(*dbsqlc.Tenant)
func (s *DispatcherImpl) handleGetGroupKeyRunStarted(inputCtx context.Context, request *contracts.GroupKeyActionEvent) (*contracts.ActionEventResponse, error) {
tenant := inputCtx.Value("tenant").(*dbsqlc.Tenant)
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
// run the rest on a separate context to always send to job controller
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
s.l.Debug().Msgf("Received step started event for step run %s", request.GetGroupKeyRunId)
startedAt := request.EventTimestamp.AsTime()
@@ -1226,10 +1242,14 @@ func (s *DispatcherImpl) handleGetGroupKeyRunStarted(ctx context.Context, reques
}, nil
}
func (s *DispatcherImpl) handleGetGroupKeyRunCompleted(ctx context.Context, request *contracts.GroupKeyActionEvent) (*contracts.ActionEventResponse, error) {
tenant := ctx.Value("tenant").(*dbsqlc.Tenant)
func (s *DispatcherImpl) handleGetGroupKeyRunCompleted(inputCtx context.Context, request *contracts.GroupKeyActionEvent) (*contracts.ActionEventResponse, error) {
tenant := inputCtx.Value("tenant").(*dbsqlc.Tenant)
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
// run the rest on a separate context to always send to job controller
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
s.l.Debug().Msgf("Received step completed event for step run %s", request.GetGroupKeyRunId)
finishedAt := request.EventTimestamp.AsTime()
@@ -1262,10 +1282,14 @@ func (s *DispatcherImpl) handleGetGroupKeyRunCompleted(ctx context.Context, requ
}, nil
}
func (s *DispatcherImpl) handleGetGroupKeyRunFailed(ctx context.Context, request *contracts.GroupKeyActionEvent) (*contracts.ActionEventResponse, error) {
tenant := ctx.Value("tenant").(*dbsqlc.Tenant)
func (s *DispatcherImpl) handleGetGroupKeyRunFailed(inputCtx context.Context, request *contracts.GroupKeyActionEvent) (*contracts.ActionEventResponse, error) {
tenant := inputCtx.Value("tenant").(*dbsqlc.Tenant)
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
// run the rest on a separate context to always send to job controller
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
s.l.Debug().Msgf("Received step failed event for step run %s", request.GetGroupKeyRunId)
failedAt := request.EventTimestamp.AsTime()
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v5.28.1
// protoc v5.28.2
// source: events.proto
package contracts
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v5.28.1
// - protoc v5.28.2
// source: events.proto
package contracts