feat: new listen strategy with worker-side heartbeats (#308)

This commit is contained in:
abelanger5
2024-03-31 19:45:10 -07:00
committed by GitHub
parent 8183dd509a
commit cdf203dc3e
10 changed files with 673 additions and 113 deletions

View File

@@ -9,6 +9,13 @@ service Dispatcher {
rpc Listen(WorkerListenRequest) returns (stream AssignedAction) {}
// ListenV2 is like listen, but implementation does not include heartbeats. This should only used by SDKs
// against engine version v0.18.1+
rpc ListenV2(WorkerListenRequest) returns (stream AssignedAction) {}
// Heartbeat is a method for workers to send heartbeats to the dispatcher
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
rpc SubscribeToWorkflowEvents(SubscribeToWorkflowEventsRequest) returns (stream WorkflowEvent) {}
rpc SendStepActionEvent(StepActionEvent) returns (ActionEventResponse) {}
@@ -238,4 +245,14 @@ message OverridesData {
string callerFilename = 4;
}
message OverridesDataResponse {}
message OverridesDataResponse {}
message HeartbeatRequest {
// the id of the worker
string workerId = 1;
// heartbeatAt is the time the worker sent the heartbeat
google.protobuf.Timestamp heartbeatAt = 2;
}
message HeartbeatResponse {}

View File

@@ -1276,6 +1276,101 @@ func (*OverridesDataResponse) Descriptor() ([]byte, []int) {
return file_dispatcher_proto_rawDescGZIP(), []int{12}
}
type HeartbeatRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// the id of the worker
WorkerId string `protobuf:"bytes,1,opt,name=workerId,proto3" json:"workerId,omitempty"`
// heartbeatAt is the time the worker sent the heartbeat
HeartbeatAt *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=heartbeatAt,proto3" json:"heartbeatAt,omitempty"`
}
func (x *HeartbeatRequest) Reset() {
*x = HeartbeatRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_dispatcher_proto_msgTypes[13]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *HeartbeatRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*HeartbeatRequest) ProtoMessage() {}
func (x *HeartbeatRequest) ProtoReflect() protoreflect.Message {
mi := &file_dispatcher_proto_msgTypes[13]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use HeartbeatRequest.ProtoReflect.Descriptor instead.
func (*HeartbeatRequest) Descriptor() ([]byte, []int) {
return file_dispatcher_proto_rawDescGZIP(), []int{13}
}
func (x *HeartbeatRequest) GetWorkerId() string {
if x != nil {
return x.WorkerId
}
return ""
}
func (x *HeartbeatRequest) GetHeartbeatAt() *timestamppb.Timestamp {
if x != nil {
return x.HeartbeatAt
}
return nil
}
type HeartbeatResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *HeartbeatResponse) Reset() {
*x = HeartbeatResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_dispatcher_proto_msgTypes[14]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *HeartbeatResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*HeartbeatResponse) ProtoMessage() {}
func (x *HeartbeatResponse) ProtoReflect() protoreflect.Message {
mi := &file_dispatcher_proto_msgTypes[14]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use HeartbeatResponse.ProtoReflect.Descriptor instead.
func (*HeartbeatResponse) Descriptor() ([]byte, []int) {
return file_dispatcher_proto_rawDescGZIP(), []int{14}
}
var File_dispatcher_proto protoreflect.FileDescriptor
var file_dispatcher_proto_rawDesc = []byte{
@@ -1422,60 +1517,75 @@ var file_dispatcher_proto_rawDesc = []byte{
0x6c, 0x6c, 0x65, 0x72, 0x46, 0x69, 0x6c, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01,
0x28, 0x09, 0x52, 0x0e, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x46, 0x69, 0x6c, 0x65, 0x6e, 0x61,
0x6d, 0x65, 0x22, 0x17, 0x0a, 0x15, 0x4f, 0x76, 0x65, 0x72, 0x72, 0x69, 0x64, 0x65, 0x73, 0x44,
0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2a, 0x4e, 0x0a, 0x0a, 0x41,
0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, 0x41,
0x52, 0x54, 0x5f, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x52, 0x55, 0x4e, 0x10, 0x00, 0x12, 0x13, 0x0a,
0x0f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x5f, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x52, 0x55, 0x4e,
0x10, 0x01, 0x12, 0x17, 0x0a, 0x13, 0x53, 0x54, 0x41, 0x52, 0x54, 0x5f, 0x47, 0x45, 0x54, 0x5f,
0x47, 0x52, 0x4f, 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x10, 0x02, 0x2a, 0xa2, 0x01, 0x0a, 0x17,
0x47, 0x72, 0x6f, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76,
0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x20, 0x0a, 0x1c, 0x47, 0x52, 0x4f, 0x55, 0x50,
0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f,
0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x20, 0x0a, 0x1c, 0x47, 0x52, 0x4f,
0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50,
0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x22, 0x0a, 0x1e, 0x47,
0x52, 0x4f, 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54,
0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x02, 0x12,
0x1f, 0x0a, 0x1b, 0x47, 0x52, 0x4f, 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x45, 0x56, 0x45,
0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x6c, 0x0a, 0x10, 0x48,
0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
0x1a, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x12, 0x3c, 0x0a, 0x0b, 0x68,
0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x41, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b,
0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x68, 0x65,
0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x41, 0x74, 0x22, 0x13, 0x0a, 0x11, 0x48, 0x65, 0x61,
0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2a, 0x4e,
0x0a, 0x0a, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x0e,
0x53, 0x54, 0x41, 0x52, 0x54, 0x5f, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x52, 0x55, 0x4e, 0x10, 0x00,
0x12, 0x13, 0x0a, 0x0f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x5f, 0x53, 0x54, 0x45, 0x50, 0x5f,
0x52, 0x55, 0x4e, 0x10, 0x01, 0x12, 0x17, 0x0a, 0x13, 0x53, 0x54, 0x41, 0x52, 0x54, 0x5f, 0x47,
0x45, 0x54, 0x5f, 0x47, 0x52, 0x4f, 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x10, 0x02, 0x2a, 0xa2,
0x01, 0x0a, 0x17, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x41, 0x63, 0x74, 0x69, 0x6f,
0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x20, 0x0a, 0x1c, 0x47, 0x52,
0x4f, 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59,
0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x20, 0x0a, 0x1c,
0x47, 0x52, 0x4f, 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f,
0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x22,
0x0a, 0x1e, 0x47, 0x52, 0x4f, 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x45, 0x56, 0x45, 0x4e,
0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54, 0x45, 0x44,
0x10, 0x02, 0x12, 0x1f, 0x0a, 0x1b, 0x47, 0x52, 0x4f, 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x5f,
0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x46, 0x41, 0x49, 0x4c, 0x45,
0x44, 0x10, 0x03, 0x2a, 0x8a, 0x01, 0x0a, 0x13, 0x53, 0x74, 0x65, 0x70, 0x41, 0x63, 0x74, 0x69,
0x6f, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1b, 0x0a, 0x17, 0x53,
0x54, 0x45, 0x50, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55,
0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1b, 0x0a, 0x17, 0x53, 0x54, 0x45, 0x50,
0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52,
0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x45, 0x56,
0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54,
0x45, 0x44, 0x10, 0x02, 0x12, 0x1a, 0x0a, 0x16, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x45, 0x56, 0x45,
0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x03,
0x2a, 0x8a, 0x01, 0x0a, 0x13, 0x53, 0x74, 0x65, 0x70, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x45,
0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1b, 0x0a, 0x17, 0x53, 0x54, 0x45, 0x50,
0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e,
0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1b, 0x0a, 0x17, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x45, 0x56,
0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x45, 0x44,
0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54,
0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10,
0x02, 0x12, 0x1a, 0x0a, 0x16, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f,
0x54, 0x59, 0x50, 0x45, 0x5f, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x03, 0x2a, 0x65, 0x0a,
0x0c, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x19, 0x0a,
0x15, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55,
0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x52, 0x45, 0x53, 0x4f,
0x55, 0x52, 0x43, 0x45, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x52,
0x55, 0x4e, 0x10, 0x01, 0x12, 0x1e, 0x0a, 0x1a, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45,
0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x52,
0x55, 0x4e, 0x10, 0x02, 0x2a, 0xde, 0x01, 0x0a, 0x11, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63,
0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x52, 0x45,
0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50,
0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x52,
0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59,
0x50, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x21, 0x0a, 0x1d,
0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54,
0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x02, 0x12,
0x1e, 0x0a, 0x1a, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e,
0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x03, 0x12,
0x2a, 0x65, 0x0a, 0x0c, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, 0x65,
0x12, 0x19, 0x0a, 0x15, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x54, 0x59, 0x50,
0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x52,
0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x45,
0x50, 0x5f, 0x52, 0x55, 0x4e, 0x10, 0x01, 0x12, 0x1e, 0x0a, 0x1a, 0x52, 0x45, 0x53, 0x4f, 0x55,
0x52, 0x43, 0x45, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f,
0x57, 0x5f, 0x52, 0x55, 0x4e, 0x10, 0x02, 0x2a, 0xde, 0x01, 0x0a, 0x11, 0x52, 0x65, 0x73, 0x6f,
0x75, 0x72, 0x63, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a,
0x1b, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f,
0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1f,
0x0a, 0x1b, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54,
0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12,
0x21, 0x0a, 0x1d, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e,
0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x4c, 0x45, 0x44,
0x10, 0x04, 0x12, 0x21, 0x0a, 0x1d, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45,
0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x54, 0x49, 0x4d, 0x45, 0x44, 0x5f,
0x4f, 0x55, 0x54, 0x10, 0x05, 0x32, 0xe4, 0x03, 0x0a, 0x0a, 0x44, 0x69, 0x73, 0x70, 0x61, 0x74,
0x63, 0x68, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72,
0x12, 0x16, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65,
0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65,
0x72, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x22, 0x00, 0x12, 0x33, 0x0a, 0x06, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x12, 0x14, 0x2e,
0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x0f, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x41, 0x63,
0x74, 0x69, 0x6f, 0x6e, 0x22, 0x00, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x19, 0x53, 0x75, 0x62, 0x73,
0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54, 0x45, 0x44,
0x10, 0x02, 0x12, 0x1e, 0x0a, 0x1a, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45,
0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44,
0x10, 0x03, 0x12, 0x21, 0x0a, 0x1d, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45,
0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c,
0x4c, 0x45, 0x44, 0x10, 0x04, 0x12, 0x21, 0x0a, 0x1d, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43,
0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x54, 0x49, 0x4d,
0x45, 0x44, 0x5f, 0x4f, 0x55, 0x54, 0x10, 0x05, 0x32, 0xd1, 0x04, 0x0a, 0x0a, 0x44, 0x69, 0x73,
0x70, 0x61, 0x74, 0x63, 0x68, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73,
0x74, 0x65, 0x72, 0x12, 0x16, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x67, 0x69,
0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x57, 0x6f,
0x72, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x33, 0x0a, 0x06, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e,
0x12, 0x14, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0f, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65,
0x64, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x00, 0x30, 0x01, 0x12, 0x35, 0x0a, 0x08, 0x4c,
0x69, 0x73, 0x74, 0x65, 0x6e, 0x56, 0x32, 0x12, 0x14, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72,
0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0f, 0x2e,
0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x00,
0x30, 0x01, 0x12, 0x34, 0x0a, 0x09, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x12,
0x11, 0x2e, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x12, 0x2e, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x52, 0x0a, 0x19, 0x53, 0x75, 0x62, 0x73,
0x63, 0x72, 0x69, 0x62, 0x65, 0x54, 0x6f, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x45,
0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x21, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62,
0x65, 0x54, 0x6f, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e, 0x74,
@@ -1518,7 +1628,7 @@ func file_dispatcher_proto_rawDescGZIP() []byte {
}
var file_dispatcher_proto_enumTypes = make([]protoimpl.EnumInfo, 5)
var file_dispatcher_proto_msgTypes = make([]protoimpl.MessageInfo, 13)
var file_dispatcher_proto_msgTypes = make([]protoimpl.MessageInfo, 15)
var file_dispatcher_proto_goTypes = []interface{}{
(ActionType)(0), // 0: ActionType
(GroupKeyActionEventType)(0), // 1: GroupKeyActionEventType
@@ -1538,36 +1648,43 @@ var file_dispatcher_proto_goTypes = []interface{}{
(*WorkflowEvent)(nil), // 15: WorkflowEvent
(*OverridesData)(nil), // 16: OverridesData
(*OverridesDataResponse)(nil), // 17: OverridesDataResponse
(*timestamppb.Timestamp)(nil), // 18: google.protobuf.Timestamp
(*HeartbeatRequest)(nil), // 18: HeartbeatRequest
(*HeartbeatResponse)(nil), // 19: HeartbeatResponse
(*timestamppb.Timestamp)(nil), // 20: google.protobuf.Timestamp
}
var file_dispatcher_proto_depIdxs = []int32{
0, // 0: AssignedAction.actionType:type_name -> ActionType
18, // 1: GroupKeyActionEvent.eventTimestamp:type_name -> google.protobuf.Timestamp
20, // 1: GroupKeyActionEvent.eventTimestamp:type_name -> google.protobuf.Timestamp
1, // 2: GroupKeyActionEvent.eventType:type_name -> GroupKeyActionEventType
18, // 3: StepActionEvent.eventTimestamp:type_name -> google.protobuf.Timestamp
20, // 3: StepActionEvent.eventTimestamp:type_name -> google.protobuf.Timestamp
2, // 4: StepActionEvent.eventType:type_name -> StepActionEventType
3, // 5: WorkflowEvent.resourceType:type_name -> ResourceType
4, // 6: WorkflowEvent.eventType:type_name -> ResourceEventType
18, // 7: WorkflowEvent.eventTimestamp:type_name -> google.protobuf.Timestamp
5, // 8: Dispatcher.Register:input_type -> WorkerRegisterRequest
8, // 9: Dispatcher.Listen:input_type -> WorkerListenRequest
14, // 10: Dispatcher.SubscribeToWorkflowEvents:input_type -> SubscribeToWorkflowEventsRequest
12, // 11: Dispatcher.SendStepActionEvent:input_type -> StepActionEvent
11, // 12: Dispatcher.SendGroupKeyActionEvent:input_type -> GroupKeyActionEvent
16, // 13: Dispatcher.PutOverridesData:input_type -> OverridesData
9, // 14: Dispatcher.Unsubscribe:input_type -> WorkerUnsubscribeRequest
6, // 15: Dispatcher.Register:output_type -> WorkerRegisterResponse
7, // 16: Dispatcher.Listen:output_type -> AssignedAction
15, // 17: Dispatcher.SubscribeToWorkflowEvents:output_type -> WorkflowEvent
13, // 18: Dispatcher.SendStepActionEvent:output_type -> ActionEventResponse
13, // 19: Dispatcher.SendGroupKeyActionEvent:output_type -> ActionEventResponse
17, // 20: Dispatcher.PutOverridesData:output_type -> OverridesDataResponse
10, // 21: Dispatcher.Unsubscribe:output_type -> WorkerUnsubscribeResponse
15, // [15:22] is the sub-list for method output_type
8, // [8:15] is the sub-list for method input_type
8, // [8:8] is the sub-list for extension type_name
8, // [8:8] is the sub-list for extension extendee
0, // [0:8] is the sub-list for field type_name
20, // 7: WorkflowEvent.eventTimestamp:type_name -> google.protobuf.Timestamp
20, // 8: HeartbeatRequest.heartbeatAt:type_name -> google.protobuf.Timestamp
5, // 9: Dispatcher.Register:input_type -> WorkerRegisterRequest
8, // 10: Dispatcher.Listen:input_type -> WorkerListenRequest
8, // 11: Dispatcher.ListenV2:input_type -> WorkerListenRequest
18, // 12: Dispatcher.Heartbeat:input_type -> HeartbeatRequest
14, // 13: Dispatcher.SubscribeToWorkflowEvents:input_type -> SubscribeToWorkflowEventsRequest
12, // 14: Dispatcher.SendStepActionEvent:input_type -> StepActionEvent
11, // 15: Dispatcher.SendGroupKeyActionEvent:input_type -> GroupKeyActionEvent
16, // 16: Dispatcher.PutOverridesData:input_type -> OverridesData
9, // 17: Dispatcher.Unsubscribe:input_type -> WorkerUnsubscribeRequest
6, // 18: Dispatcher.Register:output_type -> WorkerRegisterResponse
7, // 19: Dispatcher.Listen:output_type -> AssignedAction
7, // 20: Dispatcher.ListenV2:output_type -> AssignedAction
19, // 21: Dispatcher.Heartbeat:output_type -> HeartbeatResponse
15, // 22: Dispatcher.SubscribeToWorkflowEvents:output_type -> WorkflowEvent
13, // 23: Dispatcher.SendStepActionEvent:output_type -> ActionEventResponse
13, // 24: Dispatcher.SendGroupKeyActionEvent:output_type -> ActionEventResponse
17, // 25: Dispatcher.PutOverridesData:output_type -> OverridesDataResponse
10, // 26: Dispatcher.Unsubscribe:output_type -> WorkerUnsubscribeResponse
18, // [18:27] is the sub-list for method output_type
9, // [9:18] is the sub-list for method input_type
9, // [9:9] is the sub-list for extension type_name
9, // [9:9] is the sub-list for extension extendee
0, // [0:9] is the sub-list for field type_name
}
func init() { file_dispatcher_proto_init() }
@@ -1732,6 +1849,30 @@ func file_dispatcher_proto_init() {
return nil
}
}
file_dispatcher_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*HeartbeatRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_dispatcher_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*HeartbeatResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
file_dispatcher_proto_msgTypes[0].OneofWrappers = []interface{}{}
file_dispatcher_proto_msgTypes[10].OneofWrappers = []interface{}{}
@@ -1741,7 +1882,7 @@ func file_dispatcher_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_dispatcher_proto_rawDesc,
NumEnums: 5,
NumMessages: 13,
NumMessages: 15,
NumExtensions: 0,
NumServices: 1,
},

View File

@@ -24,6 +24,11 @@ const _ = grpc.SupportPackageIsVersion7
type DispatcherClient interface {
Register(ctx context.Context, in *WorkerRegisterRequest, opts ...grpc.CallOption) (*WorkerRegisterResponse, error)
Listen(ctx context.Context, in *WorkerListenRequest, opts ...grpc.CallOption) (Dispatcher_ListenClient, error)
// ListenV2 is like listen, but implementation does not include heartbeats. This should only used by SDKs
// against engine version v0.18.1+
ListenV2(ctx context.Context, in *WorkerListenRequest, opts ...grpc.CallOption) (Dispatcher_ListenV2Client, error)
// Heartbeat is a method for workers to send heartbeats to the dispatcher
Heartbeat(ctx context.Context, in *HeartbeatRequest, opts ...grpc.CallOption) (*HeartbeatResponse, error)
SubscribeToWorkflowEvents(ctx context.Context, in *SubscribeToWorkflowEventsRequest, opts ...grpc.CallOption) (Dispatcher_SubscribeToWorkflowEventsClient, error)
SendStepActionEvent(ctx context.Context, in *StepActionEvent, opts ...grpc.CallOption) (*ActionEventResponse, error)
SendGroupKeyActionEvent(ctx context.Context, in *GroupKeyActionEvent, opts ...grpc.CallOption) (*ActionEventResponse, error)
@@ -80,8 +85,49 @@ func (x *dispatcherListenClient) Recv() (*AssignedAction, error) {
return m, nil
}
func (c *dispatcherClient) ListenV2(ctx context.Context, in *WorkerListenRequest, opts ...grpc.CallOption) (Dispatcher_ListenV2Client, error) {
stream, err := c.cc.NewStream(ctx, &Dispatcher_ServiceDesc.Streams[1], "/Dispatcher/ListenV2", opts...)
if err != nil {
return nil, err
}
x := &dispatcherListenV2Client{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Dispatcher_ListenV2Client interface {
Recv() (*AssignedAction, error)
grpc.ClientStream
}
type dispatcherListenV2Client struct {
grpc.ClientStream
}
func (x *dispatcherListenV2Client) Recv() (*AssignedAction, error) {
m := new(AssignedAction)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *dispatcherClient) Heartbeat(ctx context.Context, in *HeartbeatRequest, opts ...grpc.CallOption) (*HeartbeatResponse, error) {
out := new(HeartbeatResponse)
err := c.cc.Invoke(ctx, "/Dispatcher/Heartbeat", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *dispatcherClient) SubscribeToWorkflowEvents(ctx context.Context, in *SubscribeToWorkflowEventsRequest, opts ...grpc.CallOption) (Dispatcher_SubscribeToWorkflowEventsClient, error) {
stream, err := c.cc.NewStream(ctx, &Dispatcher_ServiceDesc.Streams[1], "/Dispatcher/SubscribeToWorkflowEvents", opts...)
stream, err := c.cc.NewStream(ctx, &Dispatcher_ServiceDesc.Streams[2], "/Dispatcher/SubscribeToWorkflowEvents", opts...)
if err != nil {
return nil, err
}
@@ -154,6 +200,11 @@ func (c *dispatcherClient) Unsubscribe(ctx context.Context, in *WorkerUnsubscrib
type DispatcherServer interface {
Register(context.Context, *WorkerRegisterRequest) (*WorkerRegisterResponse, error)
Listen(*WorkerListenRequest, Dispatcher_ListenServer) error
// ListenV2 is like listen, but implementation does not include heartbeats. This should only used by SDKs
// against engine version v0.18.1+
ListenV2(*WorkerListenRequest, Dispatcher_ListenV2Server) error
// Heartbeat is a method for workers to send heartbeats to the dispatcher
Heartbeat(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error)
SubscribeToWorkflowEvents(*SubscribeToWorkflowEventsRequest, Dispatcher_SubscribeToWorkflowEventsServer) error
SendStepActionEvent(context.Context, *StepActionEvent) (*ActionEventResponse, error)
SendGroupKeyActionEvent(context.Context, *GroupKeyActionEvent) (*ActionEventResponse, error)
@@ -172,6 +223,12 @@ func (UnimplementedDispatcherServer) Register(context.Context, *WorkerRegisterRe
func (UnimplementedDispatcherServer) Listen(*WorkerListenRequest, Dispatcher_ListenServer) error {
return status.Errorf(codes.Unimplemented, "method Listen not implemented")
}
func (UnimplementedDispatcherServer) ListenV2(*WorkerListenRequest, Dispatcher_ListenV2Server) error {
return status.Errorf(codes.Unimplemented, "method ListenV2 not implemented")
}
func (UnimplementedDispatcherServer) Heartbeat(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Heartbeat not implemented")
}
func (UnimplementedDispatcherServer) SubscribeToWorkflowEvents(*SubscribeToWorkflowEventsRequest, Dispatcher_SubscribeToWorkflowEventsServer) error {
return status.Errorf(codes.Unimplemented, "method SubscribeToWorkflowEvents not implemented")
}
@@ -239,6 +296,45 @@ func (x *dispatcherListenServer) Send(m *AssignedAction) error {
return x.ServerStream.SendMsg(m)
}
func _Dispatcher_ListenV2_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(WorkerListenRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(DispatcherServer).ListenV2(m, &dispatcherListenV2Server{stream})
}
type Dispatcher_ListenV2Server interface {
Send(*AssignedAction) error
grpc.ServerStream
}
type dispatcherListenV2Server struct {
grpc.ServerStream
}
func (x *dispatcherListenV2Server) Send(m *AssignedAction) error {
return x.ServerStream.SendMsg(m)
}
func _Dispatcher_Heartbeat_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HeartbeatRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DispatcherServer).Heartbeat(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Dispatcher/Heartbeat",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DispatcherServer).Heartbeat(ctx, req.(*HeartbeatRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Dispatcher_SubscribeToWorkflowEvents_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SubscribeToWorkflowEventsRequest)
if err := stream.RecvMsg(m); err != nil {
@@ -343,6 +439,10 @@ var Dispatcher_ServiceDesc = grpc.ServiceDesc{
MethodName: "Register",
Handler: _Dispatcher_Register_Handler,
},
{
MethodName: "Heartbeat",
Handler: _Dispatcher_Heartbeat_Handler,
},
{
MethodName: "SendStepActionEvent",
Handler: _Dispatcher_SendStepActionEvent_Handler,
@@ -366,6 +466,11 @@ var Dispatcher_ServiceDesc = grpc.ServiceDesc{
Handler: _Dispatcher_Listen_Handler,
ServerStreams: true,
},
{
StreamName: "ListenV2",
Handler: _Dispatcher_ListenV2_Handler,
ServerStreams: true,
},
{
StreamName: "SubscribeToWorkflowEvents",
Handler: _Dispatcher_SubscribeToWorkflowEvents_Handler,

View File

@@ -258,6 +258,100 @@ func (s *DispatcherImpl) Listen(request *contracts.WorkerListenRequest, stream c
}
}
// ListenV2 is like Listen, but implementation does not include heartbeats. This should only used by SDKs
// against engine version v0.18.1+
func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream contracts.Dispatcher_ListenV2Server) error {
tenant := stream.Context().Value("tenant").(*dbsqlc.Tenant)
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
s.l.Debug().Msgf("Received subscribe request from ID: %s", request.WorkerId)
worker, err := s.repo.Worker().GetWorkerForEngine(tenantId, request.WorkerId)
if err != nil {
s.l.Error().Err(err).Msgf("could not get worker %s", request.WorkerId)
return err
}
// check the worker's dispatcher against the current dispatcher. if they don't match, then update the worker
if worker.DispatcherId.Valid && sqlchelpers.UUIDToStr(worker.DispatcherId) != s.dispatcherId {
_, err = s.repo.Worker().UpdateWorker(tenantId, request.WorkerId, &repository.UpdateWorkerOpts{
DispatcherId: &s.dispatcherId,
})
if err != nil {
s.l.Error().Err(err).Msgf("could not update worker %s dispatcher", request.WorkerId)
return err
}
}
fin := make(chan bool)
s.workers.Store(request.WorkerId, subscribedWorker{stream: stream, finished: fin})
defer func() {
// non-blocking send
select {
case fin <- true:
default:
}
s.workers.Delete(request.WorkerId)
inactive := db.WorkerStatusInactive
_, err := s.repo.Worker().UpdateWorker(tenantId, request.WorkerId, &repository.UpdateWorkerOpts{
Status: &inactive,
})
if err != nil {
s.l.Error().Err(err).Msgf("could not update worker %s status to inactive", request.WorkerId)
}
}()
ctx := stream.Context()
// Keep the connection alive for sending messages
for {
select {
case <-fin:
s.l.Debug().Msgf("closing stream for worker id: %s", request.WorkerId)
return nil
case <-ctx.Done():
s.l.Debug().Msgf("worker id %s has disconnected", request.WorkerId)
return nil
}
}
}
const HeartbeatInterval = 4 * time.Second
// Heartbeat is used to update the last heartbeat time for a worker
func (s *DispatcherImpl) Heartbeat(ctx context.Context, req *contracts.HeartbeatRequest) (*contracts.HeartbeatResponse, error) {
tenant := ctx.Value("tenant").(*dbsqlc.Tenant)
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
heartbeatAt := time.Now().UTC()
s.l.Debug().Msgf("Received heartbeat request from ID: %s", req.WorkerId)
// if heartbeat time is greater than expected heartbeat interval, show a warning
if req.HeartbeatAt.AsTime().Before(heartbeatAt.Add(-1 * HeartbeatInterval)) {
s.l.Warn().Msgf("heartbeat time is greater than expected heartbeat interval")
}
_, err := s.repo.Worker().UpdateWorker(tenantId, req.WorkerId, &repository.UpdateWorkerOpts{
// use the system time for heartbeat
LastHeartbeatAt: &heartbeatAt,
})
if err != nil {
return nil, err
}
return &contracts.HeartbeatResponse{}, nil
}
// SubscribeToWorkflowEvents registers workflow events with the dispatcher
func (s *DispatcherImpl) SubscribeToWorkflowEvents(request *contracts.SubscribeToWorkflowEventsRequest, stream contracts.Dispatcher_SubscribeToWorkflowEventsServer) error {
tenant := stream.Context().Value("tenant").(*dbsqlc.Tenant)

View File

@@ -8,6 +8,8 @@ import (
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
dispatchercontracts "github.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts"
@@ -145,6 +147,13 @@ func newDispatcher(conn *grpc.ClientConn, opts *sharedClientOpts) DispatcherClie
}
}
type ListenerStrategy string
const (
ListenerStrategyV1 ListenerStrategy = "v1"
ListenerStrategyV2 ListenerStrategy = "v2"
)
type actionListenerImpl struct {
client dispatchercontracts.DispatcherClient
@@ -159,6 +168,8 @@ type actionListenerImpl struct {
v validator.Validator
ctx *contextLoader
listenerStrategy ListenerStrategy
}
func (d *dispatcherClientImpl) newActionListener(ctx context.Context, req *GetActionListenerRequest) (*actionListenerImpl, error) {
@@ -188,7 +199,7 @@ func (d *dispatcherClientImpl) newActionListener(ctx context.Context, req *GetAc
d.l.Debug().Msgf("Registered worker with id: %s", resp.WorkerId)
// subscribe to the worker
listener, err := d.client.Listen(d.ctx.newContext(ctx), &dispatchercontracts.WorkerListenRequest{
listener, err := d.client.ListenV2(d.ctx.newContext(ctx), &dispatchercontracts.WorkerListenRequest{
WorkerId: resp.WorkerId,
})
@@ -197,13 +208,14 @@ func (d *dispatcherClientImpl) newActionListener(ctx context.Context, req *GetAc
}
return &actionListenerImpl{
client: d.client,
listenClient: listener,
workerId: resp.WorkerId,
l: d.l,
v: d.v,
tenantId: d.tenantId,
ctx: d.ctx,
client: d.client,
listenClient: listener,
workerId: resp.WorkerId,
l: d.l,
v: d.v,
tenantId: d.tenantId,
ctx: d.ctx,
listenerStrategy: ListenerStrategyV2,
}, nil
}
@@ -212,6 +224,42 @@ func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error
a.l.Debug().Msgf("Starting to listen for actions")
// update the worker with a last heartbeat time every 4 seconds as long as the worker is connected
go func() {
timer := time.NewTicker(100 * time.Millisecond)
defer timer.Stop()
// set last heartbeat to 5 seconds ago so that the first heartbeat is sent immediately
lastHeartbeat := time.Now().Add(-5 * time.Second)
for {
select {
case <-ctx.Done():
return
case <-timer.C:
if now := time.Now().UTC(); lastHeartbeat.Add(4 * time.Second).Before(now) {
a.l.Debug().Msgf("updating worker %s heartbeat", a.workerId)
_, err := a.client.Heartbeat(a.ctx.newContext(ctx), &dispatchercontracts.HeartbeatRequest{
WorkerId: a.workerId,
HeartbeatAt: timestamppb.New(now),
})
if err != nil {
a.l.Error().Err(err).Msgf("could not update worker %s heartbeat", a.workerId)
// if the heartbeat method is unimplemented, don't continue to send heartbeats
if status.Code(err) == codes.Unimplemented {
return
}
}
lastHeartbeat = time.Now().UTC()
}
}
}
}()
go func() {
for {
assignedAction, err := a.listenClient.Recv()
@@ -232,6 +280,12 @@ func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error
return
}
// if this is an unimplemented error, default to v1
if a.listenerStrategy == ListenerStrategyV2 && status.Code(err) == codes.Unimplemented {
a.l.Debug().Msgf("Falling back to v1 listener strategy")
a.listenerStrategy = ListenerStrategyV1
}
err = a.retrySubscribe(ctx)
if err != nil {
@@ -287,9 +341,18 @@ func (a *actionListenerImpl) retrySubscribe(ctx context.Context) error {
for retries < DefaultActionListenerRetryCount {
time.Sleep(DefaultActionListenerRetryInterval)
listenClient, err := a.client.Listen(a.ctx.newContext(ctx), &dispatchercontracts.WorkerListenRequest{
WorkerId: a.workerId,
})
var err error
var listenClient dispatchercontracts.Dispatcher_ListenClient
if a.listenerStrategy == ListenerStrategyV1 {
listenClient, err = a.client.Listen(a.ctx.newContext(ctx), &dispatchercontracts.WorkerListenRequest{
WorkerId: a.workerId,
})
} else if a.listenerStrategy == ListenerStrategyV2 {
listenClient, err = a.client.ListenV2(a.ctx.newContext(ctx), &dispatchercontracts.WorkerListenRequest{
WorkerId: a.workerId,
})
}
if err != nil {
retries++

View File

@@ -4,7 +4,7 @@ from dotenv import load_dotenv
load_dotenv()
hatchet = Hatchet()
hatchet = Hatchet(debug=True)
@hatchet.workflow(on_events=["user:create"],schedule_timeout="10m")
class MyWorkflow:

View File

@@ -1,5 +1,6 @@
# relative imports
from ..dispatcher_pb2 import GroupKeyActionEvent, StepActionEvent, ActionEventResponse, ActionType, AssignedAction, WorkerListenRequest, WorkerRegisterRequest, WorkerUnsubscribeRequest, WorkerRegisterResponse, OverridesData
import threading
from ..dispatcher_pb2 import GroupKeyActionEvent, StepActionEvent, ActionEventResponse, ActionType, AssignedAction, WorkerListenRequest, WorkerRegisterRequest, WorkerUnsubscribeRequest, WorkerRegisterResponse, OverridesData, HeartbeatRequest
from ..dispatcher_pb2_grpc import DispatcherStub
import time
@@ -9,6 +10,7 @@ import json
import grpc
from typing import Callable, List, Union
from ..metadata import get_metadata
from .events import proto_timestamp_now
import time
@@ -72,8 +74,43 @@ class ActionListenerImpl(WorkerActionListener):
self.worker_id = worker_id
self.retries = 0
self.last_connection_attempt = 0
# self.logger = logger
# self.validator = validator
self.heartbeat_thread = None
self.run_heartbeat = True
self.listen_strategy = "v2"
def heartbeat(self):
# send a heartbeat every 4 seconds
while True:
if not self.run_heartbeat:
break
try:
self.client.Heartbeat(
HeartbeatRequest(
workerId=self.worker_id,
heartbeatAt=proto_timestamp_now(),
),
timeout=DEFAULT_REGISTER_TIMEOUT,
metadata=get_metadata(self.token),
)
except grpc.RpcError as e:
# we don't reraise the error here, as we don't want to stop the heartbeat thread
logger.error(f"Failed to send heartbeat: {e}")
if e.code() == grpc.StatusCode.UNIMPLEMENTED:
break
time.sleep(4)
def start_heartbeater(self):
if self.heartbeat_thread is not None:
return
# create a new thread to send heartbeats
heartbeat_thread = threading.Thread(target=self.heartbeat)
heartbeat_thread.start()
self.heartbeat_thread = heartbeat_thread
def actions(self):
while True:
@@ -119,6 +156,12 @@ class ActionListenerImpl(WorkerActionListener):
elif e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
logger.info("Deadline exceeded, retrying subscription")
continue
elif self.listen_strategy == "v2" and e.code() == grpc.StatusCode.UNIMPLEMENTED:
# ListenV2 is not available, fallback to Listen
self.listen_strategy = "v1"
self.run_heartbeat = False
logger.info("ListenV2 not available, falling back to Listen")
continue
else:
# Unknown error, report and break
# self.logger.error(f"Failed to receive message: {e}")
@@ -160,13 +203,23 @@ class ActionListenerImpl(WorkerActionListener):
time.sleep(DEFAULT_ACTION_LISTENER_RETRY_INTERVAL)
logger.info(
f"Could not connect to Hatchet, retrying... {self.retries}/{DEFAULT_ACTION_LISTENER_RETRY_COUNT}")
if self.listen_strategy == "v2":
listener = self.client.ListenV2(WorkerListenRequest(
workerId=self.worker_id
),
metadata=get_metadata(self.token),
)
listener = self.client.Listen(WorkerListenRequest(
workerId=self.worker_id
),
timeout=DEFAULT_ACTION_TIMEOUT,
metadata=get_metadata(self.token),
)
self.start_heartbeater()
else:
# if ListenV2 is not available, fallback to Listen
listener = self.client.Listen(WorkerListenRequest(
workerId=self.worker_id
),
timeout=DEFAULT_ACTION_TIMEOUT,
metadata=get_metadata(self.token),
)
self.last_connection_attempt = current_time
@@ -174,6 +227,8 @@ class ActionListenerImpl(WorkerActionListener):
return listener
def unregister(self):
self.run_heartbeat = False
try:
self.client.Unsubscribe(
WorkerUnsubscribeRequest(

File diff suppressed because one or more lines are too long

View File

@@ -223,3 +223,15 @@ class OverridesData(_message.Message):
class OverridesDataResponse(_message.Message):
__slots__ = ()
def __init__(self) -> None: ...
class HeartbeatRequest(_message.Message):
__slots__ = ("workerId", "heartbeatAt")
WORKERID_FIELD_NUMBER: _ClassVar[int]
HEARTBEATAT_FIELD_NUMBER: _ClassVar[int]
workerId: str
heartbeatAt: _timestamp_pb2.Timestamp
def __init__(self, workerId: _Optional[str] = ..., heartbeatAt: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...) -> None: ...
class HeartbeatResponse(_message.Message):
__slots__ = ()
def __init__(self) -> None: ...

View File

@@ -23,6 +23,16 @@ class DispatcherStub(object):
request_serializer=dispatcher__pb2.WorkerListenRequest.SerializeToString,
response_deserializer=dispatcher__pb2.AssignedAction.FromString,
)
self.ListenV2 = channel.unary_stream(
'/Dispatcher/ListenV2',
request_serializer=dispatcher__pb2.WorkerListenRequest.SerializeToString,
response_deserializer=dispatcher__pb2.AssignedAction.FromString,
)
self.Heartbeat = channel.unary_unary(
'/Dispatcher/Heartbeat',
request_serializer=dispatcher__pb2.HeartbeatRequest.SerializeToString,
response_deserializer=dispatcher__pb2.HeartbeatResponse.FromString,
)
self.SubscribeToWorkflowEvents = channel.unary_stream(
'/Dispatcher/SubscribeToWorkflowEvents',
request_serializer=dispatcher__pb2.SubscribeToWorkflowEventsRequest.SerializeToString,
@@ -65,6 +75,21 @@ class DispatcherServicer(object):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def ListenV2(self, request, context):
"""ListenV2 is like listen, but implementation does not include heartbeats. This should only used by SDKs
against engine version v0.18.1+
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Heartbeat(self, request, context):
"""Heartbeat is a method for workers to send heartbeats to the dispatcher
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def SubscribeToWorkflowEvents(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
@@ -108,6 +133,16 @@ def add_DispatcherServicer_to_server(servicer, server):
request_deserializer=dispatcher__pb2.WorkerListenRequest.FromString,
response_serializer=dispatcher__pb2.AssignedAction.SerializeToString,
),
'ListenV2': grpc.unary_stream_rpc_method_handler(
servicer.ListenV2,
request_deserializer=dispatcher__pb2.WorkerListenRequest.FromString,
response_serializer=dispatcher__pb2.AssignedAction.SerializeToString,
),
'Heartbeat': grpc.unary_unary_rpc_method_handler(
servicer.Heartbeat,
request_deserializer=dispatcher__pb2.HeartbeatRequest.FromString,
response_serializer=dispatcher__pb2.HeartbeatResponse.SerializeToString,
),
'SubscribeToWorkflowEvents': grpc.unary_stream_rpc_method_handler(
servicer.SubscribeToWorkflowEvents,
request_deserializer=dispatcher__pb2.SubscribeToWorkflowEventsRequest.FromString,
@@ -177,6 +212,40 @@ class Dispatcher(object):
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def ListenV2(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_stream(request, target, '/Dispatcher/ListenV2',
dispatcher__pb2.WorkerListenRequest.SerializeToString,
dispatcher__pb2.AssignedAction.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def Heartbeat(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/Dispatcher/Heartbeat',
dispatcher__pb2.HeartbeatRequest.SerializeToString,
dispatcher__pb2.HeartbeatResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def SubscribeToWorkflowEvents(request,
target,