feat: streaming events (#309)

* feat: add stream event model

* docs: how to work with db models

* feat: put stream event

* chore: rm comments

* feat: add stream resource type

* feat: enqueue stream event

* fix: contracts

* feat: protos

* chore: set properties correctly for typing

* fix: stream example

* chore: rm old example

* fix: async on

* fix: bytea type

* fix: worker

* feat: put stream data

* feat: stream type

* fix: correct queue

* feat: streaming payloads

* fix: cleanup

* fix: validation

* feat: example file streaming

* chore: rm unused query

* fix: tenant check and read only consumer

* fix: check tenant-steprun relation

* Update prisma/schema.prisma

Co-authored-by: abelanger5 <belanger@sas.upenn.edu>

* chore: generate protos

* chore: rename migration

* release: 0.20.0

* feat(go-sdk): implement streaming in go

---------

Co-authored-by: gabriel ruttner <gabe@hatchet.run>
Co-authored-by: abelanger5 <belanger@sas.upenn.edu>
This commit is contained in:
Gabe Ruttner
2024-04-01 12:46:21 -07:00
committed by GitHub
parent 7b7fbe3668
commit d8b6843dec
49 changed files with 1173 additions and 185 deletions

View File

@@ -103,3 +103,11 @@ Make sure you call `.Disconnect` on the database config object when writing CLI
```
export HATCHET_CLIENT_TOKEN="$(go run ./cmd/hatchet-admin token create --tenant-id <tenant>)"
```
## Working with Database Models
1. Add or modify the model schema in `./prisma/schema.prisma`
2. Create or modify the required SQL queries in `./internal/repository/prisma/dbsqlc`
3. Add new queries files to `./internal/repository/prisma/dbsqlc/sqlc.yaml`
4. Create a new migration file with `task prisma-migrate`
5. Generate Go with `task generate-all`

View File

@@ -203,6 +203,7 @@ enum ResourceEventType {
RESOURCE_EVENT_TYPE_FAILED = 3;
RESOURCE_EVENT_TYPE_CANCELLED = 4;
RESOURCE_EVENT_TYPE_TIMED_OUT = 5;
RESOURCE_EVENT_TYPE_STREAM = 6;
}
message WorkflowEvent {

View File

@@ -10,6 +10,8 @@ service EventsService {
rpc ReplaySingleEvent(ReplayEventRequest) returns (Event) {}
rpc PutLog(PutLogRequest) returns (PutLogResponse) {}
rpc PutStreamEvent(PutStreamEventRequest) returns (PutStreamEventResponse) {}
}
message Event {
@@ -48,6 +50,22 @@ message PutLogRequest {
message PutLogResponse {}
message PutStreamEventRequest {
// the step run id for the request
string stepRunId = 1;
// when the stream event was created
google.protobuf.Timestamp createdAt = 2;
// the stream event message
bytes message = 3;
// associated stream event metadata
string metadata = 5;
}
message PutStreamEventResponse {}
message PushEventRequest {
// the key for the event
string key = 1;

View File

@@ -201,6 +201,9 @@ func Run(ctx context.Context, cf *loader.ConfigLoader) error {
ingestor.WithEventRepository(
sc.EngineRepository.Event(),
),
ingestor.WithStreamEventsRepository(
sc.EngineRepository.StreamEvent(),
),
ingestor.WithLogRepository(
sc.EngineRepository.Log(),
),

View File

@@ -58,7 +58,7 @@ func run(ch <-chan interface{}, events chan<- string) error {
interruptCtx, cancel := cmdutils.InterruptContextFromChan(ch)
defer cancel()
err = c.Run().On(interruptCtx, workflowRunId, func(event client.RunEvent) error {
err = c.Subscribe().On(interruptCtx, workflowRunId, func(event client.RunEvent) error {
fmt.Println(event.EventPayload)
return nil

View File

@@ -0,0 +1,102 @@
package main
import (
"fmt"
"time"
"github.com/joho/godotenv"
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
"github.com/hatchet-dev/hatchet/pkg/worker"
)
type streamEventInput struct {
Index int `json:"index"`
}
type stepOneOutput struct {
Message string `json:"message"`
}
func StepOne(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
input := &streamEventInput{}
err = ctx.WorkflowInput(input)
if err != nil {
return nil, err
}
ctx.StreamEvent([]byte(fmt.Sprintf("This is a stream event %d", input.Index)))
return &stepOneOutput{
Message: fmt.Sprintf("This ran at %s", time.Now().String()),
}, nil
}
func main() {
err := godotenv.Load()
if err != nil {
panic(err)
}
c, err := client.New()
if err != nil {
panic(err)
}
w, err := worker.NewWorker(
worker.WithClient(
c,
),
)
if err != nil {
panic(err)
}
err = w.On(
worker.NoTrigger(),
&worker.WorkflowJob{
Name: "stream-event-workflow",
Description: "This sends a stream event.",
Steps: []*worker.WorkflowStep{
worker.Fn(StepOne).SetName("step-one"),
},
},
)
if err != nil {
panic(err)
}
interruptCtx, cancel := cmdutils.InterruptContextFromChan(cmdutils.InterruptChan())
defer cancel()
_, err = w.Start()
if err != nil {
panic(fmt.Errorf("error cleaning up: %w", err))
}
workflowRunId, err := c.Admin().RunWorkflow("stream-event-workflow", &streamEventInput{
Index: 0,
})
if err != nil {
panic(err)
}
err = c.Subscribe().Stream(interruptCtx, workflowRunId, func(event client.StreamEvent) error {
fmt.Println(string(event.Message))
return nil
})
if err != nil {
panic(err)
}
}

View File

@@ -192,6 +192,7 @@ func GetServerConfigFromConfigfile(dc *database.Config, cf *server.ServerConfigF
ingestor, err := ingestor.NewIngestor(
ingestor.WithEventRepository(dc.EngineRepository.Event()),
ingestor.WithStreamEventsRepository(dc.EngineRepository.StreamEvent()),
ingestor.WithLogRepository(dc.EngineRepository.Log()),
ingestor.WithMessageQueue(mq),
)

View File

@@ -6,14 +6,18 @@ INSERT INTO "LogLine" (
"message",
"level",
"metadata"
) VALUES (
)
SELECT
coalesce(sqlc.narg('createdAt')::timestamp, now()),
@tenantId::uuid,
@stepRunId::uuid,
@message::text,
coalesce(sqlc.narg('level')::"LogLineLevel", 'INFO'::"LogLineLevel"),
coalesce(sqlc.narg('metadata')::jsonb, '{}'::jsonb)
) RETURNING *;
FROM "StepRun"
WHERE "StepRun"."id" = @stepRunId::uuid
AND "StepRun"."tenantId" = @tenantId::uuid
RETURNING *;
-- name: ListLogLines :many
SELECT * FROM "LogLine"

View File

@@ -48,14 +48,18 @@ INSERT INTO "LogLine" (
"message",
"level",
"metadata"
) VALUES (
)
SELECT
coalesce($1::timestamp, now()),
$2::uuid,
$3::uuid,
$4::text,
coalesce($5::"LogLineLevel", 'INFO'::"LogLineLevel"),
coalesce($6::jsonb, '{}'::jsonb)
) RETURNING id, "createdAt", "tenantId", "stepRunId", message, level, metadata
FROM "StepRun"
WHERE "StepRun"."id" = $3::uuid
AND "StepRun"."tenantId" = $2::uuid
RETURNING id, "createdAt", "tenantId", "stepRunId", message, level, metadata
`
type CreateLogLineParams struct {

View File

@@ -696,6 +696,15 @@ type StepRunResultArchive struct {
CancelledError pgtype.Text `json:"cancelledError"`
}
type StreamEvent struct {
ID int64 `json:"id"`
CreatedAt pgtype.Timestamp `json:"createdAt"`
TenantId pgtype.UUID `json:"tenantId"`
StepRunId pgtype.UUID `json:"stepRunId"`
Message []byte `json:"message"`
Metadata []byte `json:"metadata"`
}
type Tenant struct {
ID pgtype.UUID `json:"id"`
CreatedAt pgtype.Timestamp `json:"createdAt"`

View File

@@ -338,6 +338,18 @@ CREATE TABLE "StepRunResultArchive" (
CONSTRAINT "StepRunResultArchive_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "StreamEvent" (
"id" BIGSERIAL NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"tenantId" UUID NOT NULL,
"stepRunId" UUID,
"message" BYTEA NOT NULL,
"metadata" JSONB,
CONSTRAINT "StreamEvent_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "Tenant" (
"id" UUID NOT NULL,
@@ -1023,6 +1035,12 @@ ALTER TABLE "StepRun" ADD CONSTRAINT "StepRun_workerId_fkey" FOREIGN KEY ("worke
-- AddForeignKey
ALTER TABLE "StepRunResultArchive" ADD CONSTRAINT "StepRunResultArchive_stepRunId_fkey" FOREIGN KEY ("stepRunId") REFERENCES "StepRun"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StreamEvent" ADD CONSTRAINT "StreamEvent_stepRunId_fkey" FOREIGN KEY ("stepRunId") REFERENCES "StepRun"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StreamEvent" ADD CONSTRAINT "StreamEvent_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "TenantInviteLink" ADD CONSTRAINT "TenantInviteLink_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;

View File

@@ -15,6 +15,7 @@ sql:
- tickers.sql
- dispatchers.sql
- workers.sql
- stream_event.sql
- logs.sql
- tenants.sql
schema:

View File

@@ -0,0 +1,30 @@
-- name: CreateStreamEvent :one
INSERT INTO "StreamEvent" (
"createdAt",
"tenantId",
"stepRunId",
"message",
"metadata"
)
SELECT
coalesce(sqlc.narg('createdAt')::timestamp, now()),
@tenantId::uuid,
@stepRunId::uuid,
@message::bytea,
coalesce(sqlc.narg('metadata')::jsonb, '{}'::jsonb)
FROM "StepRun"
WHERE "StepRun"."id" = @stepRunId::uuid
AND "StepRun"."tenantId" = @tenantId::uuid
RETURNING *;
-- name: GetStreamEvent :one
SELECT * FROM "StreamEvent"
WHERE
"tenantId" = @tenantId::uuid AND
"id" = @id::bigint;
-- name: CleanupStreamEvents :exec
DELETE FROM "StreamEvent"
WHERE
-- older than than 5 minutes ago
"createdAt" < NOW() - INTERVAL '5 minutes';

View File

@@ -0,0 +1,98 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.24.0
// source: stream_event.sql
package dbsqlc
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const cleanupStreamEvents = `-- name: CleanupStreamEvents :exec
DELETE FROM "StreamEvent"
WHERE
-- older than than 5 minutes ago
"createdAt" < NOW() - INTERVAL '5 minutes'
`
func (q *Queries) CleanupStreamEvents(ctx context.Context, db DBTX) error {
_, err := db.Exec(ctx, cleanupStreamEvents)
return err
}
const createStreamEvent = `-- name: CreateStreamEvent :one
INSERT INTO "StreamEvent" (
"createdAt",
"tenantId",
"stepRunId",
"message",
"metadata"
)
SELECT
coalesce($1::timestamp, now()),
$2::uuid,
$3::uuid,
$4::bytea,
coalesce($5::jsonb, '{}'::jsonb)
FROM "StepRun"
WHERE "StepRun"."id" = $3::uuid
AND "StepRun"."tenantId" = $2::uuid
RETURNING id, "createdAt", "tenantId", "stepRunId", message, metadata
`
type CreateStreamEventParams struct {
CreatedAt pgtype.Timestamp `json:"createdAt"`
Tenantid pgtype.UUID `json:"tenantid"`
Steprunid pgtype.UUID `json:"steprunid"`
Message []byte `json:"message"`
Metadata []byte `json:"metadata"`
}
func (q *Queries) CreateStreamEvent(ctx context.Context, db DBTX, arg CreateStreamEventParams) (*StreamEvent, error) {
row := db.QueryRow(ctx, createStreamEvent,
arg.CreatedAt,
arg.Tenantid,
arg.Steprunid,
arg.Message,
arg.Metadata,
)
var i StreamEvent
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.TenantId,
&i.StepRunId,
&i.Message,
&i.Metadata,
)
return &i, err
}
const getStreamEvent = `-- name: GetStreamEvent :one
SELECT id, "createdAt", "tenantId", "stepRunId", message, metadata FROM "StreamEvent"
WHERE
"tenantId" = $1::uuid AND
"id" = $2::bigint
`
type GetStreamEventParams struct {
Tenantid pgtype.UUID `json:"tenantid"`
ID int64 `json:"id"`
}
func (q *Queries) GetStreamEvent(ctx context.Context, db DBTX, arg GetStreamEventParams) (*StreamEvent, error) {
row := db.QueryRow(ctx, getStreamEvent, arg.Tenantid, arg.ID)
var i StreamEvent
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.TenantId,
&i.StepRunId,
&i.Message,
&i.Metadata,
)
return &i, err
}

View File

@@ -174,6 +174,7 @@ type engineRepository struct {
worker repository.WorkerEngineRepository
workflow repository.WorkflowEngineRepository
workflowRun repository.WorkflowRunEngineRepository
streamEvent repository.StreamEventsEngineRepository
log repository.LogsEngineRepository
}
@@ -225,6 +226,10 @@ func (r *engineRepository) WorkflowRun() repository.WorkflowRunEngineRepository
return r.workflowRun
}
func (r *engineRepository) StreamEvent() repository.StreamEventsEngineRepository {
return r.streamEvent
}
func (r *engineRepository) Log() repository.LogsEngineRepository {
return r.log
}
@@ -256,6 +261,7 @@ func NewEngineRepository(pool *pgxpool.Pool, fs ...PrismaRepositoryOpt) reposito
worker: NewWorkerEngineRepository(pool, opts.v, opts.l),
workflow: NewWorkflowEngineRepository(pool, opts.v, opts.l),
workflowRun: NewWorkflowRunEngineRepository(pool, opts.v, opts.l),
streamEvent: NewStreamEventsEngineRepository(pool, opts.v, opts.l),
log: NewLogEngineRepository(pool, opts.v, opts.l),
}
}

View File

@@ -0,0 +1,128 @@
package prisma
import (
"context"
"errors"
"fmt"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rs/zerolog"
"github.com/hatchet-dev/hatchet/internal/repository"
"github.com/hatchet-dev/hatchet/internal/repository/prisma/dbsqlc"
"github.com/hatchet-dev/hatchet/internal/repository/prisma/sqlchelpers"
"github.com/hatchet-dev/hatchet/internal/validator"
)
type streamEventEngineRepository struct {
pool *pgxpool.Pool
v validator.Validator
queries *dbsqlc.Queries
l *zerolog.Logger
}
func NewStreamEventsEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger) repository.StreamEventsEngineRepository {
queries := dbsqlc.New()
return &streamEventEngineRepository{
pool: pool,
v: v,
queries: queries,
l: l,
}
}
func (r *streamEventEngineRepository) PutStreamEvent(tenantId string, opts *repository.CreateStreamEventOpts) (*dbsqlc.StreamEvent, error) {
if err := r.v.Validate(opts); err != nil {
return nil, err
}
createParams := dbsqlc.CreateStreamEventParams{
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Message: opts.Message,
Steprunid: sqlchelpers.UUIDFromStr(opts.StepRunId),
}
if opts.CreatedAt != nil {
utcTime := opts.CreatedAt.UTC()
createParams.CreatedAt = sqlchelpers.TimestampFromTime(utcTime)
}
if opts.Metadata != nil {
createParams.Metadata = opts.Metadata
}
tx, err := r.pool.Begin(context.Background())
if err != nil {
return nil, err
}
defer deferRollback(context.Background(), r.l, tx.Rollback)
streamEvent, err := r.queries.CreateStreamEvent(
context.Background(),
tx,
createParams,
)
if err != nil {
return nil, fmt.Errorf("could not create stream event: %w", err)
}
err = tx.Commit(context.Background())
if err != nil {
return nil, fmt.Errorf("could not commit transaction: %w", err)
}
return streamEvent, nil
}
func (r *streamEventEngineRepository) GetStreamEvent(tenantId string, streamEventId int64) (*dbsqlc.StreamEvent, error) {
pgTenantId := sqlchelpers.UUIDFromStr(tenantId)
tx, err := r.pool.Begin(context.Background())
if err != nil {
return nil, err
}
defer deferRollback(context.Background(), r.l, tx.Rollback)
streamEvent, err := r.queries.GetStreamEvent(context.Background(), tx, dbsqlc.GetStreamEventParams{
ID: streamEventId,
Tenantid: pgTenantId,
})
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, fmt.Errorf("stream event not found")
}
return nil, fmt.Errorf("could not get stream event: %w", err)
}
err = tx.Commit(context.Background())
if err != nil {
return nil, fmt.Errorf("could not commit transaction: %w", err)
}
return streamEvent, nil
}
func (r *streamEventEngineRepository) CleanupStreamEvents() error {
tx, err := r.pool.Begin(context.Background())
if err != nil {
return err
}
defer deferRollback(context.Background(), r.l, tx.Rollback)
err = r.queries.CleanupStreamEvents(context.Background(), r.pool)
if err != nil {
return fmt.Errorf("could not cleanup stream events: %w", err)
}
return nil
}

View File

@@ -32,6 +32,7 @@ type EngineRepository interface {
Worker() WorkerEngineRepository
Workflow() WorkflowEngineRepository
WorkflowRun() WorkflowRunEngineRepository
StreamEvent() StreamEventsEngineRepository
Log() LogsEngineRepository
}

View File

@@ -0,0 +1,32 @@
package repository
import (
"time"
"github.com/hatchet-dev/hatchet/internal/repository/prisma/dbsqlc"
)
type CreateStreamEventOpts struct {
// The step run id
StepRunId string `validate:"required,uuid"`
// (optional) The time when the StreamEvent was created.
CreatedAt *time.Time
// (required) The message of the Stream Event.
Message []byte `validate:"required,min=1"`
// (optional) The metadata of the Stream Event.
Metadata []byte
}
type StreamEventsEngineRepository interface {
// PutStreamEvent creates a new StreamEvent line.
PutStreamEvent(tenantId string, opts *CreateStreamEventOpts) (*dbsqlc.StreamEvent, error)
// GetStreamEvent returns a StreamEvent line by id.
GetStreamEvent(tenantId string, streamEventId int64) (*dbsqlc.StreamEvent, error)
// CleanupStreamEvents deletes all stale StreamEvents.
CleanupStreamEvents() error
}

View File

@@ -217,7 +217,6 @@ func (ec *JobsControllerImpl) handleTask(ctx context.Context, task *msgqueue.Mes
case "step-run-timed-out":
return ec.handleStepRunTimedOut(ctx, task)
}
return fmt.Errorf("unknown task: %s", task.ID)
}

View File

@@ -232,6 +232,7 @@ const (
ResourceEventType_RESOURCE_EVENT_TYPE_FAILED ResourceEventType = 3
ResourceEventType_RESOURCE_EVENT_TYPE_CANCELLED ResourceEventType = 4
ResourceEventType_RESOURCE_EVENT_TYPE_TIMED_OUT ResourceEventType = 5
ResourceEventType_RESOURCE_EVENT_TYPE_STREAM ResourceEventType = 6
)
// Enum value maps for ResourceEventType.
@@ -243,6 +244,7 @@ var (
3: "RESOURCE_EVENT_TYPE_FAILED",
4: "RESOURCE_EVENT_TYPE_CANCELLED",
5: "RESOURCE_EVENT_TYPE_TIMED_OUT",
6: "RESOURCE_EVENT_TYPE_STREAM",
}
ResourceEventType_value = map[string]int32{
"RESOURCE_EVENT_TYPE_UNKNOWN": 0,
@@ -251,6 +253,7 @@ var (
"RESOURCE_EVENT_TYPE_FAILED": 3,
"RESOURCE_EVENT_TYPE_CANCELLED": 4,
"RESOURCE_EVENT_TYPE_TIMED_OUT": 5,
"RESOURCE_EVENT_TYPE_STREAM": 6,
}
)
@@ -1556,7 +1559,7 @@ var file_dispatcher_proto_rawDesc = []byte{
0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x45,
0x50, 0x5f, 0x52, 0x55, 0x4e, 0x10, 0x01, 0x12, 0x1e, 0x0a, 0x1a, 0x52, 0x45, 0x53, 0x4f, 0x55,
0x52, 0x43, 0x45, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f,
0x57, 0x5f, 0x52, 0x55, 0x4e, 0x10, 0x02, 0x2a, 0xde, 0x01, 0x0a, 0x11, 0x52, 0x65, 0x73, 0x6f,
0x57, 0x5f, 0x52, 0x55, 0x4e, 0x10, 0x02, 0x2a, 0xfe, 0x01, 0x0a, 0x11, 0x52, 0x65, 0x73, 0x6f,
0x75, 0x72, 0x63, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a,
0x1b, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f,
0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1f,
@@ -1570,7 +1573,9 @@ var file_dispatcher_proto_rawDesc = []byte{
0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c,
0x4c, 0x45, 0x44, 0x10, 0x04, 0x12, 0x21, 0x0a, 0x1d, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43,
0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x54, 0x49, 0x4d,
0x45, 0x44, 0x5f, 0x4f, 0x55, 0x54, 0x10, 0x05, 0x32, 0xd1, 0x04, 0x0a, 0x0a, 0x44, 0x69, 0x73,
0x45, 0x44, 0x5f, 0x4f, 0x55, 0x54, 0x10, 0x05, 0x12, 0x1e, 0x0a, 0x1a, 0x52, 0x45, 0x53, 0x4f,
0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f,
0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x10, 0x06, 0x32, 0xd1, 0x04, 0x0a, 0x0a, 0x44, 0x69, 0x73,
0x70, 0x61, 0x74, 0x63, 0x68, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73,
0x74, 0x65, 0x72, 0x12, 0x16, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x67, 0x69,
0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x57, 0x6f,

View File

@@ -3,6 +3,7 @@ package dispatcher
import (
"context"
"fmt"
"strconv"
"sync"
"time"
@@ -373,6 +374,7 @@ func (s *DispatcherImpl) SubscribeToWorkflowEvents(request *contracts.SubscribeT
f := func(task *msgqueue.Message) error {
wg.Add(1)
defer wg.Done()
e, err := s.tenantTaskToWorkflowEvent(task, tenantId, request.WorkflowRunId)
if err != nil {
@@ -466,18 +468,6 @@ func (s *DispatcherImpl) PutOverridesData(ctx context.Context, request *contract
return nil, err
}
// jsonSchemaBytes, err := schema.SchemaBytesFromBytes(input)
// if err != nil {
// return nil, err
// }
// _, err = s.repo.StepRun().UpdateStepRunInputSchema(tenantId, request.StepRunId, jsonSchemaBytes)
// if err != nil {
// return nil, err
// }
return &contracts.OverridesDataResponse{}, nil
}
@@ -747,6 +737,11 @@ func (s *DispatcherImpl) tenantTaskToWorkflowEvent(task *msgqueue.Message, tenan
workflowEvent.ResourceType = contracts.ResourceType_RESOURCE_TYPE_STEP_RUN
workflowEvent.ResourceId = stepRunId
workflowEvent.EventType = contracts.ResourceEventType_RESOURCE_EVENT_TYPE_TIMED_OUT
case "step-run-stream-event":
stepRunId = task.Payload["step_run_id"].(string)
workflowEvent.ResourceType = contracts.ResourceType_RESOURCE_TYPE_STEP_RUN
workflowEvent.ResourceId = stepRunId
workflowEvent.EventType = contracts.ResourceEventType_RESOURCE_EVENT_TYPE_STREAM
case "workflow-run-finished":
workflowRunId := task.Payload["workflow_run_id"].(string)
workflowEvent.ResourceType = contracts.ResourceType_RESOURCE_TYPE_WORKFLOW_RUN
@@ -768,11 +763,24 @@ func (s *DispatcherImpl) tenantTaskToWorkflowEvent(task *msgqueue.Message, tenan
return nil, nil
}
unquoted := workflowEvent.EventPayload
workflowEvent.EventPayload = unquoted
workflowEvent.StepRetries = &stepRun.StepRetries
workflowEvent.RetryCount = &stepRun.StepRun.RetryCount
if workflowEvent.EventType == contracts.ResourceEventType_RESOURCE_EVENT_TYPE_STREAM {
streamEventId, err := strconv.ParseInt(task.Metadata["stream_event_id"].(string), 10, 64)
if err != nil {
return nil, err
}
streamEvent, err := s.repo.StreamEvent().GetStreamEvent(tenantId, streamEventId)
if err != nil {
return nil, err
}
workflowEvent.EventPayload = string(streamEvent.Message)
}
} else if workflowEvent.ResourceType == contracts.ResourceType_RESOURCE_TYPE_WORKFLOW_RUN {
if workflowEvent.ResourceId != workflowRunId {
return nil, nil

View File

@@ -227,6 +227,119 @@ func (*PutLogResponse) Descriptor() ([]byte, []int) {
return file_events_proto_rawDescGZIP(), []int{2}
}
type PutStreamEventRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// the step run id for the request
StepRunId string `protobuf:"bytes,1,opt,name=stepRunId,proto3" json:"stepRunId,omitempty"`
// when the stream event was created
CreatedAt *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=createdAt,proto3" json:"createdAt,omitempty"`
// the stream event message
Message []byte `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"`
// associated stream event metadata
Metadata string `protobuf:"bytes,5,opt,name=metadata,proto3" json:"metadata,omitempty"`
}
func (x *PutStreamEventRequest) Reset() {
*x = PutStreamEventRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_events_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *PutStreamEventRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PutStreamEventRequest) ProtoMessage() {}
func (x *PutStreamEventRequest) ProtoReflect() protoreflect.Message {
mi := &file_events_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PutStreamEventRequest.ProtoReflect.Descriptor instead.
func (*PutStreamEventRequest) Descriptor() ([]byte, []int) {
return file_events_proto_rawDescGZIP(), []int{3}
}
func (x *PutStreamEventRequest) GetStepRunId() string {
if x != nil {
return x.StepRunId
}
return ""
}
func (x *PutStreamEventRequest) GetCreatedAt() *timestamppb.Timestamp {
if x != nil {
return x.CreatedAt
}
return nil
}
func (x *PutStreamEventRequest) GetMessage() []byte {
if x != nil {
return x.Message
}
return nil
}
func (x *PutStreamEventRequest) GetMetadata() string {
if x != nil {
return x.Metadata
}
return ""
}
type PutStreamEventResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *PutStreamEventResponse) Reset() {
*x = PutStreamEventResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_events_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *PutStreamEventResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PutStreamEventResponse) ProtoMessage() {}
func (x *PutStreamEventResponse) ProtoReflect() protoreflect.Message {
mi := &file_events_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PutStreamEventResponse.ProtoReflect.Descriptor instead.
func (*PutStreamEventResponse) Descriptor() ([]byte, []int) {
return file_events_proto_rawDescGZIP(), []int{4}
}
type PushEventRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -243,7 +356,7 @@ type PushEventRequest struct {
func (x *PushEventRequest) Reset() {
*x = PushEventRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_events_proto_msgTypes[3]
mi := &file_events_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -256,7 +369,7 @@ func (x *PushEventRequest) String() string {
func (*PushEventRequest) ProtoMessage() {}
func (x *PushEventRequest) ProtoReflect() protoreflect.Message {
mi := &file_events_proto_msgTypes[3]
mi := &file_events_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -269,7 +382,7 @@ func (x *PushEventRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use PushEventRequest.ProtoReflect.Descriptor instead.
func (*PushEventRequest) Descriptor() ([]byte, []int) {
return file_events_proto_rawDescGZIP(), []int{3}
return file_events_proto_rawDescGZIP(), []int{5}
}
func (x *PushEventRequest) GetKey() string {
@@ -305,7 +418,7 @@ type ReplayEventRequest struct {
func (x *ReplayEventRequest) Reset() {
*x = ReplayEventRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_events_proto_msgTypes[4]
mi := &file_events_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -318,7 +431,7 @@ func (x *ReplayEventRequest) String() string {
func (*ReplayEventRequest) ProtoMessage() {}
func (x *ReplayEventRequest) ProtoReflect() protoreflect.Message {
mi := &file_events_proto_msgTypes[4]
mi := &file_events_proto_msgTypes[6]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -331,7 +444,7 @@ func (x *ReplayEventRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use ReplayEventRequest.ProtoReflect.Descriptor instead.
func (*ReplayEventRequest) Descriptor() ([]byte, []int) {
return file_events_proto_rawDescGZIP(), []int{4}
return file_events_proto_rawDescGZIP(), []int{6}
}
func (x *ReplayEventRequest) GetEventId() string {
@@ -371,33 +484,49 @@ var file_events_proto_rawDesc = []byte{
0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09,
0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x42, 0x08, 0x0a, 0x06, 0x5f, 0x6c,
0x65, 0x76, 0x65, 0x6c, 0x22, 0x10, 0x0a, 0x0e, 0x50, 0x75, 0x74, 0x4c, 0x6f, 0x67, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x82, 0x01, 0x0a, 0x10, 0x50, 0x75, 0x73, 0x68, 0x45,
0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b,
0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x18, 0x0a,
0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07,
0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x42, 0x0a, 0x0e, 0x65, 0x76, 0x65, 0x6e, 0x74,
0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0e, 0x65, 0x76, 0x65,
0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22, 0x2e, 0x0a, 0x12, 0x52,
0x65, 0x70, 0x6c, 0x61, 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x32, 0x95, 0x01, 0x0a, 0x0d,
0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x23, 0x0a,
0x04, 0x50, 0x75, 0x73, 0x68, 0x12, 0x11, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x45, 0x76, 0x65, 0x6e,
0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x06, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74,
0x22, 0x00, 0x12, 0x32, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x61, 0x79, 0x53, 0x69, 0x6e, 0x67,
0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x13, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x61, 0x79,
0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x06, 0x2e, 0x45,
0x76, 0x65, 0x6e, 0x74, 0x22, 0x00, 0x12, 0x2b, 0x0a, 0x06, 0x50, 0x75, 0x74, 0x4c, 0x6f, 0x67,
0x12, 0x0e, 0x2e, 0x50, 0x75, 0x74, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x0f, 0x2e, 0x50, 0x75, 0x74, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x22, 0x00, 0x42, 0x47, 0x5a, 0x45, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
0x6d, 0x2f, 0x68, 0x61, 0x74, 0x63, 0x68, 0x65, 0x74, 0x2d, 0x64, 0x65, 0x76, 0x2f, 0x68, 0x61,
0x74, 0x63, 0x68, 0x65, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x73,
0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x64, 0x69, 0x73, 0x70, 0x61, 0x74, 0x63, 0x68,
0x65, 0x72, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x73, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xa5, 0x01, 0x0a, 0x15, 0x50, 0x75, 0x74, 0x53, 0x74,
0x72, 0x65, 0x61, 0x6d, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x12, 0x1c, 0x0a, 0x09, 0x73, 0x74, 0x65, 0x70, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x74, 0x65, 0x70, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x12, 0x38,
0x0a, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x63,
0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05,
0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x22, 0x18,
0x0a, 0x16, 0x50, 0x75, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x45, 0x76, 0x65, 0x6e, 0x74,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x82, 0x01, 0x0a, 0x10, 0x50, 0x75, 0x73,
0x68, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a,
0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12,
0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x42, 0x0a, 0x0e, 0x65, 0x76, 0x65,
0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0e, 0x65,
0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22, 0x2e, 0x0a,
0x12, 0x52, 0x65, 0x70, 0x6c, 0x61, 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x32, 0xda, 0x01,
0x0a, 0x0d, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12,
0x23, 0x0a, 0x04, 0x50, 0x75, 0x73, 0x68, 0x12, 0x11, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x45, 0x76,
0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x06, 0x2e, 0x45, 0x76, 0x65,
0x6e, 0x74, 0x22, 0x00, 0x12, 0x32, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x61, 0x79, 0x53, 0x69,
0x6e, 0x67, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x13, 0x2e, 0x52, 0x65, 0x70, 0x6c,
0x61, 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x06,
0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x00, 0x12, 0x2b, 0x0a, 0x06, 0x50, 0x75, 0x74, 0x4c,
0x6f, 0x67, 0x12, 0x0e, 0x2e, 0x50, 0x75, 0x74, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x0f, 0x2e, 0x50, 0x75, 0x74, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x43, 0x0a, 0x0e, 0x50, 0x75, 0x74, 0x53, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x16, 0x2e, 0x50, 0x75, 0x74, 0x53, 0x74, 0x72,
0x65, 0x61, 0x6d, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x17, 0x2e, 0x50, 0x75, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x45, 0x76, 0x65, 0x6e, 0x74,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x47, 0x5a, 0x45, 0x67, 0x69,
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x74, 0x63, 0x68, 0x65, 0x74,
0x2d, 0x64, 0x65, 0x76, 0x2f, 0x68, 0x61, 0x74, 0x63, 0x68, 0x65, 0x74, 0x2f, 0x69, 0x6e, 0x74,
0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x64,
0x69, 0x73, 0x70, 0x61, 0x74, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61,
0x63, 0x74, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -412,30 +541,35 @@ func file_events_proto_rawDescGZIP() []byte {
return file_events_proto_rawDescData
}
var file_events_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
var file_events_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
var file_events_proto_goTypes = []interface{}{
(*Event)(nil), // 0: Event
(*PutLogRequest)(nil), // 1: PutLogRequest
(*PutLogResponse)(nil), // 2: PutLogResponse
(*PushEventRequest)(nil), // 3: PushEventRequest
(*ReplayEventRequest)(nil), // 4: ReplayEventRequest
(*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp
(*Event)(nil), // 0: Event
(*PutLogRequest)(nil), // 1: PutLogRequest
(*PutLogResponse)(nil), // 2: PutLogResponse
(*PutStreamEventRequest)(nil), // 3: PutStreamEventRequest
(*PutStreamEventResponse)(nil), // 4: PutStreamEventResponse
(*PushEventRequest)(nil), // 5: PushEventRequest
(*ReplayEventRequest)(nil), // 6: ReplayEventRequest
(*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp
}
var file_events_proto_depIdxs = []int32{
5, // 0: Event.eventTimestamp:type_name -> google.protobuf.Timestamp
5, // 1: PutLogRequest.createdAt:type_name -> google.protobuf.Timestamp
5, // 2: PushEventRequest.eventTimestamp:type_name -> google.protobuf.Timestamp
3, // 3: EventsService.Push:input_type -> PushEventRequest
4, // 4: EventsService.ReplaySingleEvent:input_type -> ReplayEventRequest
1, // 5: EventsService.PutLog:input_type -> PutLogRequest
0, // 6: EventsService.Push:output_type -> Event
0, // 7: EventsService.ReplaySingleEvent:output_type -> Event
2, // 8: EventsService.PutLog:output_type -> PutLogResponse
6, // [6:9] is the sub-list for method output_type
3, // [3:6] is the sub-list for method input_type
3, // [3:3] is the sub-list for extension type_name
3, // [3:3] is the sub-list for extension extendee
0, // [0:3] is the sub-list for field type_name
7, // 0: Event.eventTimestamp:type_name -> google.protobuf.Timestamp
7, // 1: PutLogRequest.createdAt:type_name -> google.protobuf.Timestamp
7, // 2: PutStreamEventRequest.createdAt:type_name -> google.protobuf.Timestamp
7, // 3: PushEventRequest.eventTimestamp:type_name -> google.protobuf.Timestamp
5, // 4: EventsService.Push:input_type -> PushEventRequest
6, // 5: EventsService.ReplaySingleEvent:input_type -> ReplayEventRequest
1, // 6: EventsService.PutLog:input_type -> PutLogRequest
3, // 7: EventsService.PutStreamEvent:input_type -> PutStreamEventRequest
0, // 8: EventsService.Push:output_type -> Event
0, // 9: EventsService.ReplaySingleEvent:output_type -> Event
2, // 10: EventsService.PutLog:output_type -> PutLogResponse
4, // 11: EventsService.PutStreamEvent:output_type -> PutStreamEventResponse
8, // [8:12] is the sub-list for method output_type
4, // [4:8] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
}
func init() { file_events_proto_init() }
@@ -481,7 +615,7 @@ func file_events_proto_init() {
}
}
file_events_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PushEventRequest); i {
switch v := v.(*PutStreamEventRequest); i {
case 0:
return &v.state
case 1:
@@ -493,6 +627,30 @@ func file_events_proto_init() {
}
}
file_events_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PutStreamEventResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_events_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PushEventRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_events_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReplayEventRequest); i {
case 0:
return &v.state
@@ -512,7 +670,7 @@ func file_events_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_events_proto_rawDesc,
NumEnums: 0,
NumMessages: 5,
NumMessages: 7,
NumExtensions: 0,
NumServices: 1,
},

View File

@@ -25,6 +25,7 @@ type EventsServiceClient interface {
Push(ctx context.Context, in *PushEventRequest, opts ...grpc.CallOption) (*Event, error)
ReplaySingleEvent(ctx context.Context, in *ReplayEventRequest, opts ...grpc.CallOption) (*Event, error)
PutLog(ctx context.Context, in *PutLogRequest, opts ...grpc.CallOption) (*PutLogResponse, error)
PutStreamEvent(ctx context.Context, in *PutStreamEventRequest, opts ...grpc.CallOption) (*PutStreamEventResponse, error)
}
type eventsServiceClient struct {
@@ -62,6 +63,15 @@ func (c *eventsServiceClient) PutLog(ctx context.Context, in *PutLogRequest, opt
return out, nil
}
func (c *eventsServiceClient) PutStreamEvent(ctx context.Context, in *PutStreamEventRequest, opts ...grpc.CallOption) (*PutStreamEventResponse, error) {
out := new(PutStreamEventResponse)
err := c.cc.Invoke(ctx, "/EventsService/PutStreamEvent", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// EventsServiceServer is the server API for EventsService service.
// All implementations must embed UnimplementedEventsServiceServer
// for forward compatibility
@@ -69,6 +79,7 @@ type EventsServiceServer interface {
Push(context.Context, *PushEventRequest) (*Event, error)
ReplaySingleEvent(context.Context, *ReplayEventRequest) (*Event, error)
PutLog(context.Context, *PutLogRequest) (*PutLogResponse, error)
PutStreamEvent(context.Context, *PutStreamEventRequest) (*PutStreamEventResponse, error)
mustEmbedUnimplementedEventsServiceServer()
}
@@ -85,6 +96,9 @@ func (UnimplementedEventsServiceServer) ReplaySingleEvent(context.Context, *Repl
func (UnimplementedEventsServiceServer) PutLog(context.Context, *PutLogRequest) (*PutLogResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method PutLog not implemented")
}
func (UnimplementedEventsServiceServer) PutStreamEvent(context.Context, *PutStreamEventRequest) (*PutStreamEventResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method PutStreamEvent not implemented")
}
func (UnimplementedEventsServiceServer) mustEmbedUnimplementedEventsServiceServer() {}
// UnsafeEventsServiceServer may be embedded to opt out of forward compatibility for this service.
@@ -152,6 +166,24 @@ func _EventsService_PutLog_Handler(srv interface{}, ctx context.Context, dec fun
return interceptor(ctx, in, info, handler)
}
func _EventsService_PutStreamEvent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PutStreamEventRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(EventsServiceServer).PutStreamEvent(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/EventsService/PutStreamEvent",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(EventsServiceServer).PutStreamEvent(ctx, req.(*PutStreamEventRequest))
}
return interceptor(ctx, in, info, handler)
}
// EventsService_ServiceDesc is the grpc.ServiceDesc for EventsService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -171,6 +203,10 @@ var EventsService_ServiceDesc = grpc.ServiceDesc{
MethodName: "PutLog",
Handler: _EventsService_PutLog_Handler,
},
{
MethodName: "PutStreamEvent",
Handler: _EventsService_PutStreamEvent_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "events.proto",

View File

@@ -23,9 +23,10 @@ type Ingestor interface {
type IngestorOptFunc func(*IngestorOpts)
type IngestorOpts struct {
eventRepository repository.EventEngineRepository
logRepository repository.LogsEngineRepository
mq msgqueue.MessageQueue
eventRepository repository.EventEngineRepository
streamEventRepository repository.StreamEventsEngineRepository
logRepository repository.LogsEngineRepository
mq msgqueue.MessageQueue
}
func WithEventRepository(r repository.EventEngineRepository) IngestorOptFunc {
@@ -34,6 +35,12 @@ func WithEventRepository(r repository.EventEngineRepository) IngestorOptFunc {
}
}
func WithStreamEventsRepository(r repository.StreamEventsEngineRepository) IngestorOptFunc {
return func(opts *IngestorOpts) {
opts.streamEventRepository = r
}
}
func WithLogRepository(r repository.LogsEngineRepository) IngestorOptFunc {
return func(opts *IngestorOpts) {
opts.logRepository = r
@@ -53,9 +60,10 @@ func defaultIngestorOpts() *IngestorOpts {
type IngestorImpl struct {
contracts.UnimplementedEventsServiceServer
eventRepository repository.EventEngineRepository
logRepository repository.LogsEngineRepository
mq msgqueue.MessageQueue
eventRepository repository.EventEngineRepository
logRepository repository.LogsEngineRepository
streamEventRepository repository.StreamEventsEngineRepository
mq msgqueue.MessageQueue
}
func NewIngestor(fs ...IngestorOptFunc) (Ingestor, error) {
@@ -69,6 +77,10 @@ func NewIngestor(fs ...IngestorOptFunc) (Ingestor, error) {
return nil, fmt.Errorf("event repository is required. use WithEventRepository")
}
if opts.streamEventRepository == nil {
return nil, fmt.Errorf("stream event repository is required. use WithStreamEventRepository")
}
if opts.logRepository == nil {
return nil, fmt.Errorf("log repository is required. use WithLogRepository")
}
@@ -78,9 +90,10 @@ func NewIngestor(fs ...IngestorOptFunc) (Ingestor, error) {
}
return &IngestorImpl{
eventRepository: opts.eventRepository,
logRepository: opts.logRepository,
mq: opts.mq,
eventRepository: opts.eventRepository,
streamEventRepository: opts.streamEventRepository,
logRepository: opts.logRepository,
mq: opts.mq,
}, nil
}

View File

@@ -2,14 +2,18 @@ package ingestor
import (
"context"
"strconv"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/hatchet-dev/hatchet/internal/datautils"
"github.com/hatchet-dev/hatchet/internal/msgqueue"
"github.com/hatchet-dev/hatchet/internal/repository"
"github.com/hatchet-dev/hatchet/internal/repository/prisma/dbsqlc"
"github.com/hatchet-dev/hatchet/internal/repository/prisma/sqlchelpers"
"github.com/hatchet-dev/hatchet/internal/services/ingestor/contracts"
"github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes"
)
func (i *IngestorImpl) Push(ctx context.Context, req *contracts.PushEventRequest) (*contracts.Event, error) {
@@ -58,6 +62,49 @@ func (i *IngestorImpl) ReplaySingleEvent(ctx context.Context, req *contracts.Rep
return e, nil
}
func (i *IngestorImpl) PutStreamEvent(ctx context.Context, req *contracts.PutStreamEventRequest) (*contracts.PutStreamEventResponse, error) {
tenant := ctx.Value("tenant").(*dbsqlc.Tenant)
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
var createdAt *time.Time
if t := req.CreatedAt.AsTime().UTC(); !t.IsZero() {
createdAt = &t
}
var metadata []byte
if req.Metadata != "" {
metadata = []byte(req.Metadata)
}
streamEvent, err := i.streamEventRepository.PutStreamEvent(tenantId, &repository.CreateStreamEventOpts{
StepRunId: req.StepRunId,
CreatedAt: createdAt,
Message: req.Message,
Metadata: metadata,
})
if err != nil {
return nil, err
}
q, err := msgqueue.TenantEventConsumerQueue(tenantId)
if err != nil {
return nil, err
}
err = i.mq.AddMessage(context.Background(), q, streamEventToTask(streamEvent))
if err != nil {
return nil, err
}
return &contracts.PutStreamEventResponse{}, nil
}
func (i *IngestorImpl) PutLog(ctx context.Context, req *contracts.PutLogRequest) (*contracts.PutLogResponse, error) {
tenant := ctx.Value("tenant").(*dbsqlc.Tenant)
@@ -102,3 +149,27 @@ func toEvent(e *dbsqlc.Event) (*contracts.Event, error) {
EventTimestamp: timestamppb.New(e.CreatedAt.Time),
}, nil
}
func streamEventToTask(e *dbsqlc.StreamEvent) *msgqueue.Message {
tenantId := sqlchelpers.UUIDToStr(e.TenantId)
payloadTyped := tasktypes.StepRunStreamEventTaskPayload{
StepRunId: sqlchelpers.UUIDToStr(e.StepRunId),
CreatedAt: e.CreatedAt.Time.String(),
StreamEventId: strconv.FormatInt(e.ID, 10),
}
payload, _ := datautils.ToJSONMap(payloadTyped)
metadata, _ := datautils.ToJSONMap(tasktypes.StepRunStreamEventTaskMetadata{
TenantId: tenantId,
StreamEventId: strconv.FormatInt(e.ID, 10),
})
return &msgqueue.Message{
ID: "step-run-stream-event",
Payload: payload,
Metadata: metadata,
Retries: 3,
}
}

View File

@@ -80,6 +80,17 @@ type StepRunFinishedTaskMetadata struct {
TenantId string `json:"tenant_id" validate:"required,uuid"`
}
type StepRunStreamEventTaskPayload struct {
StepRunId string `json:"step_run_id" validate:"required,uuid"`
CreatedAt string `json:"created_at" validate:"required"`
StreamEventId string `json:"stream_event_id"`
}
type StepRunStreamEventTaskMetadata struct {
TenantId string `json:"tenant_id" validate:"required,uuid"`
StreamEventId string `json:"stream_event_id" validate:"required,integer"`
}
type StepRunFailedTaskPayload struct {
StepRunId string `json:"step_run_id" validate:"required,uuid"`
FailedAt string `json:"failed_at" validate:"required"`

View File

@@ -183,6 +183,18 @@ func (t *TickerImpl) Start() (func() error, error) {
return nil, fmt.Errorf("could not create poll cron schedules job: %w", err)
}
_, err = t.s.NewJob(
gocron.DurationJob(time.Minute*5),
gocron.NewTask(
t.runStreamEventCleanup(),
),
)
if err != nil {
cancel()
return nil, fmt.Errorf("could not schedule stream event cleanup: %w", err)
}
t.s.Start()
cleanup := func() error {
@@ -224,3 +236,15 @@ func (t *TickerImpl) runUpdateHeartbeat(ctx context.Context) func() {
}
}
}
func (t *TickerImpl) runStreamEventCleanup() func() {
return func() {
t.l.Debug().Msgf("ticker: cleaning up stream event")
err := t.repo.StreamEvent().CleanupStreamEvents()
if err != nil {
t.l.Err(err).Msg("could not cleanup stream events")
}
}
}

View File

@@ -23,7 +23,7 @@ type Client interface {
Admin() AdminClient
Dispatcher() DispatcherClient
Event() EventClient
Run() RunClient
Subscribe() SubscribeClient
API() *rest.ClientWithResponses
TenantId() string
}
@@ -34,7 +34,7 @@ type clientImpl struct {
admin AdminClient
dispatcher DispatcherClient
event EventClient
run RunClient
subscribe SubscribeClient
rest *rest.ClientWithResponses
// the tenant id
@@ -210,7 +210,7 @@ func newFromOpts(opts *ClientOpts) (Client, error) {
admin := newAdmin(conn, shared)
dispatcher := newDispatcher(conn, shared)
event := newEvent(conn, shared)
run := newRun(conn, shared)
subscribe := newSubscribe(conn, shared)
rest, err := rest.NewClientWithResponses(opts.serverURL, rest.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", opts.token))
return nil
@@ -233,7 +233,7 @@ func newFromOpts(opts *ClientOpts) (Client, error) {
l: opts.l,
admin: admin,
dispatcher: dispatcher,
run: run,
subscribe: subscribe,
event: event,
v: opts.v,
rest: rest,
@@ -252,8 +252,8 @@ func (c *clientImpl) Event() EventClient {
return c.event
}
func (c *clientImpl) Run() RunClient {
return c.run
func (c *clientImpl) Subscribe() SubscribeClient {
return c.subscribe
}
func (c *clientImpl) API() *rest.ClientWithResponses {

View File

@@ -16,6 +16,8 @@ type EventClient interface {
Push(ctx context.Context, eventKey string, payload interface{}) error
PutLog(ctx context.Context, stepRunId, msg string) error
PutStreamEvent(ctx context.Context, stepRunId string, message []byte) error
}
type eventClientImpl struct {
@@ -69,3 +71,13 @@ func (a *eventClientImpl) PutLog(ctx context.Context, stepRunId, msg string) err
return err
}
func (a *eventClientImpl) PutStreamEvent(ctx context.Context, stepRunId string, message []byte) error {
_, err := a.client.PutStreamEvent(a.ctx.newContext(ctx), &eventcontracts.PutStreamEventRequest{
CreatedAt: timestamppb.Now(),
StepRunId: stepRunId,
Message: message,
})
return err
}

View File

@@ -14,17 +14,24 @@ import (
type RunEvent *dispatchercontracts.WorkflowEvent
type RunHandler func(event RunEvent) error
type StreamEvent struct {
Message []byte
}
type RunClient interface {
type RunHandler func(event RunEvent) error
type StreamHandler func(event StreamEvent) error
type SubscribeClient interface {
On(ctx context.Context, workflowRunId string, handler RunHandler) error
Stream(ctx context.Context, workflowRunId string, handler StreamHandler) error
}
type ClientEventListener interface {
OnRunEvent(ctx context.Context, event *RunEvent) error
}
type runClientImpl struct {
type subscribeClientImpl struct {
client dispatchercontracts.DispatcherClient
l *zerolog.Logger
@@ -34,8 +41,8 @@ type runClientImpl struct {
ctx *contextLoader
}
func newRun(conn *grpc.ClientConn, opts *sharedClientOpts) RunClient {
return &runClientImpl{
func newSubscribe(conn *grpc.ClientConn, opts *sharedClientOpts) SubscribeClient {
return &subscribeClientImpl{
client: dispatchercontracts.NewDispatcherClient(conn),
l: opts.l,
v: opts.v,
@@ -43,7 +50,7 @@ func newRun(conn *grpc.ClientConn, opts *sharedClientOpts) RunClient {
}
}
func (r *runClientImpl) On(ctx context.Context, workflowRunId string, handler RunHandler) error {
func (r *subscribeClientImpl) On(ctx context.Context, workflowRunId string, handler RunHandler) error {
stream, err := r.client.SubscribeToWorkflowEvents(r.ctx.newContext(ctx), &dispatchercontracts.SubscribeToWorkflowEventsRequest{
WorkflowRunId: workflowRunId,
})
@@ -63,8 +70,44 @@ func (r *runClientImpl) On(ctx context.Context, workflowRunId string, handler Ru
return err
}
if event.EventType == dispatchercontracts.ResourceEventType_RESOURCE_EVENT_TYPE_STREAM {
continue
}
if err := handler(event); err != nil {
return err
}
}
}
func (r *subscribeClientImpl) Stream(ctx context.Context, workflowRunId string, handler StreamHandler) error {
stream, err := r.client.SubscribeToWorkflowEvents(r.ctx.newContext(ctx), &dispatchercontracts.SubscribeToWorkflowEventsRequest{
WorkflowRunId: workflowRunId,
})
if err != nil {
return err
}
for {
event, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
if event.EventType != dispatchercontracts.ResourceEventType_RESOURCE_EVENT_TYPE_STREAM {
continue
}
if err := handler(StreamEvent{
Message: []byte(event.EventPayload),
}); err != nil {
return err
}
}
}

View File

@@ -104,7 +104,7 @@ func (c *ChildWorkflow) Result() (*ChildWorkflowResult, error) {
go func() {
// listen for workflow finished events
err := c.client.Run().On(
err := c.client.Subscribe().On(
ctx,
c.workflowRunId,
func(event client.RunEvent) error {

View File

@@ -32,6 +32,8 @@ type HatchetContext interface {
Log(message string)
StreamEvent(message []byte)
SpawnWorkflow(workflowName string, input any, opts *SpawnWorkflowOpts) (*ChildWorkflow, error)
client() client.Client
@@ -152,6 +154,14 @@ func (h *hatchetContext) Log(message string) {
}
}
func (h *hatchetContext) StreamEvent(message []byte) {
err := h.c.Event().PutStreamEvent(h, h.action.StepRunId, message)
if err != nil {
h.l.Err(err).Msg("could not put stream event")
}
}
func (h *hatchetContext) index() int {
return h.i
}

View File

@@ -52,6 +52,10 @@ func (c *testHatchetContext) Log(message string) {
panic("not implemented")
}
func (c *testHatchetContext) StreamEvent(message []byte) {
panic("not implemented")
}
func (c *testHatchetContext) index() int {
panic("not implemented")
}

View File

@@ -0,0 +1,17 @@
-- CreateTable
CREATE TABLE "StreamEvent" (
"id" BIGSERIAL NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"tenantId" UUID NOT NULL,
"stepRunId" UUID,
"message" BYTEA NOT NULL,
"metadata" JSONB,
CONSTRAINT "StreamEvent_pkey" PRIMARY KEY ("id")
);
-- AddForeignKey
ALTER TABLE "StreamEvent" ADD CONSTRAINT "StreamEvent_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StreamEvent" ADD CONSTRAINT "StreamEvent_stepRunId_fkey" FOREIGN KEY ("stepRunId") REFERENCES "StepRun"("id") ON DELETE SET NULL ON UPDATE CASCADE;

View File

@@ -125,6 +125,7 @@ model Tenant {
githubPullRequests GithubPullRequest[]
githubPullRequestComments GithubPullRequestComment[]
githubWebhooks GithubWebhook[]
streamEvents StreamEvent[]
logs LogLine[]
snsIntegrations SNSIntegration[]
}
@@ -904,6 +905,7 @@ model StepRun {
archivedResults StepRunResultArchive[]
streamEvents StreamEvent[]
logs LogLine[]
childWorkflowRuns WorkflowRun[]
@@ -1266,6 +1268,26 @@ model LogLine {
metadata Json?
}
model StreamEvent {
// base fields
id BigInt @id @default(autoincrement()) @db.BigInt
createdAt DateTime @default(now())
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the step run id this stream event is associated with
stepRun StepRun? @relation(fields: [stepRunId], references: [id], onDelete: SetNull, onUpdate: Cascade)
stepRunId String? @db.Uuid
// the stream event bytes
message Bytes
// (optional) the stream event metadata
metadata Json?
}
model SNSIntegration {
// base fields
id String @id @unique @default(uuid()) @db.Uuid

Binary file not shown.

After

Width:  |  Height:  |  Size: 177 KiB

View File

@@ -0,0 +1,40 @@
import os
from hatchet_sdk import new_client
from dotenv import load_dotenv
import json
import asyncio
from hatchet_sdk.clients.listener import StepRunEventType
import base64
async def main():
load_dotenv()
hatchet = new_client()
workflowRunId = hatchet.admin.run_workflow("ManualTriggerWorkflow", {"test": "test"})
listener = hatchet.listener.stream(workflowRunId)
# Get the directory of the current script
script_dir = os.path.dirname(os.path.abspath(__file__))
# Create the "out" directory if it doesn't exist
out_dir = os.path.join(script_dir, "out")
os.makedirs(out_dir, exist_ok=True)
async for event in listener:
if event.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM:
# Decode the base64-encoded payload
decoded_payload = base64.b64decode(event.payload)
# Construct the path to the payload file in the "out" directory
payload_path = os.path.join(out_dir, "payload.jpg")
with open(payload_path, "wb") as f:
f.write(decoded_payload)
data = json.dumps({
"type": event.type,
"messageId": workflowRunId
})
print("data: " + data + "\n\n")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,14 +0,0 @@
from hatchet_sdk import new_client
from dotenv import load_dotenv
import json
load_dotenv()
client = new_client()
workflowRunId = client.admin.run_workflow("ManualTriggerWorkflow", {
"test": "test"
})
client.listener.on(workflowRunId, lambda event: print(
'EVENT: ' + event.type + ' ' + json.dumps(event.payload)))

View File

@@ -1,5 +1,7 @@
from hatchet_sdk import Hatchet
from hatchet_sdk import Hatchet, Context
from dotenv import load_dotenv
import base64
import os
load_dotenv()
@@ -9,9 +11,27 @@ hatchet = Hatchet(debug=True)
@hatchet.workflow(on_events=["man:create"])
class ManualTriggerWorkflow:
@hatchet.step()
def step1(self, context):
def step1(self, context: Context):
res = context.playground('res', "HELLO")
# Get the directory of the current script
script_dir = os.path.dirname(os.path.abspath(__file__))
# Construct the path to the image file relative to the script's directory
image_path = os.path.join(script_dir, "image.jpeg")
# Load the image file
with open(image_path, "rb") as image_file:
image_data = image_file.read()
print(len(image_data))
# Encode the image data as base64
base64_image = base64.b64encode(image_data).decode('utf-8')
# Stream the base64-encoded image data
context.put_stream(base64_image)
context.sleep(3)
print("executed step1")
return {"step1": "data1 "+res}
@@ -23,22 +43,6 @@ class ManualTriggerWorkflow:
print("finished step2")
return {"step2": "data2"}
# @hatchet.step()
# def stepb(self, context):
# res = context.playground('res', "HELLO")
# context.sleep(3)
# print("executed step1")
# return {"step1": "data1 "+res}
# @hatchet.step(parents=["stepb"], timeout='4s')
# def stepc(self, context):
# print("started step2")
# context.sleep(1)
# print("finished step2")
# return {"step2": "data2"}
workflow = ManualTriggerWorkflow()
worker = hatchet.worker('manual-worker', max_runs=4)
worker.register_workflow(workflow)

View File

@@ -18,20 +18,11 @@ from .clients.rest.configuration import Configuration
from .clients.rest_client import RestApi
class Client:
def admin(self):
raise NotImplementedError
def dispatcher(self):
raise NotImplementedError
def event(self):
raise NotImplementedError
def listener(self):
raise NotImplementedError
def rest(self):
raise NotImplementedError
admin: AdminClientImpl
dispatcher: DispatcherClientImpl
event: EventClientImpl
listener: ListenerClientImpl
rest: RestApi
class ClientImpl(Client):
@@ -43,30 +34,11 @@ class ClientImpl(Client):
listener_client: ListenerClientImpl,
rest_client: RestApi
):
# self.conn = conn
# self.tenant_id = tenant_id
# self.logger = logger
# self.validator = validator
self.admin = admin_client
self.dispatcher = dispatcher_client
self.event = event_client
self.listener = listener_client
self.rest_client = rest_client
def admin(self) -> AdminClientImpl:
return self.admin
def dispatcher(self) -> DispatcherClientImpl:
return self.dispatcher
def event(self) -> EventClientImpl:
return self.event
def listener(self) -> ListenerClientImpl:
return self.listener
def rest(self) -> RestApi:
return self.rest_client
self.rest = rest_client
def with_host_port(host: str, port: int):
def with_host_port_impl(config: ClientConfig):

View File

@@ -1,5 +1,5 @@
from ..events_pb2_grpc import EventsServiceStub
from ..events_pb2 import PushEventRequest, PutLogRequest
from ..events_pb2 import PushEventRequest, PutLogRequest, PutStreamEventRequest
import datetime
from ..loader import ClientConfig
@@ -22,7 +22,7 @@ def proto_timestamp_now():
return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
class EventClientImpl:
def __init__(self, client, token):
def __init__(self, client: EventsServiceStub, token):
self.client = client
self.token = token
@@ -53,4 +53,22 @@ class EventClientImpl:
self.client.PutLog(request, metadata=get_metadata(self.token))
except Exception as e:
raise ValueError(f"Error logging: {e}")
raise ValueError(f"Error logging: {e}")
def stream(self, data: str | bytes, step_run_id: str):
try:
if isinstance(data, str):
data_bytes = data.encode('utf-8')
elif isinstance(data, bytes):
data_bytes = data
else:
raise ValueError("Invalid data type. Expected str, bytes, or file.")
request = PutStreamEventRequest(
stepRunId=step_run_id,
createdAt=proto_timestamp_now(),
message=data_bytes,
)
self.client.PutStreamEvent(request, metadata=get_metadata(self.token))
except Exception as e:
raise ValueError(f"Error putting stream event: {e}")

View File

@@ -20,6 +20,7 @@ class StepRunEventType:
STEP_RUN_EVENT_TYPE_FAILED = 'STEP_RUN_EVENT_TYPE_FAILED'
STEP_RUN_EVENT_TYPE_CANCELLED = 'STEP_RUN_EVENT_TYPE_CANCELLED'
STEP_RUN_EVENT_TYPE_TIMED_OUT = 'STEP_RUN_EVENT_TYPE_TIMED_OUT'
STEP_RUN_EVENT_TYPE_STREAM = 'STEP_RUN_EVENT_TYPE_STREAM'
class WorkflowRunEventType:
WORKFLOW_RUN_EVENT_TYPE_STARTED = 'WORKFLOW_RUN_EVENT_TYPE_STARTED'
@@ -34,6 +35,7 @@ step_run_event_type_mapping = {
ResourceEventType.RESOURCE_EVENT_TYPE_FAILED: StepRunEventType.STEP_RUN_EVENT_TYPE_FAILED,
ResourceEventType.RESOURCE_EVENT_TYPE_CANCELLED: StepRunEventType.STEP_RUN_EVENT_TYPE_CANCELLED,
ResourceEventType.RESOURCE_EVENT_TYPE_TIMED_OUT: StepRunEventType.STEP_RUN_EVENT_TYPE_TIMED_OUT,
ResourceEventType.RESOURCE_EVENT_TYPE_STREAM: StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM,
}
workflow_run_event_type_mapping = {
@@ -95,6 +97,7 @@ class HatchetListener:
if workflow_event.eventPayload:
payload = json.loads(workflow_event.eventPayload)
except Exception as e:
payload = workflow_event.eventPayload
pass
yield StepRunEvent(type=eventType, payload=payload)
@@ -166,8 +169,8 @@ class ListenerClientImpl:
def stream(self, workflow_run_id: str):
return HatchetListener(workflow_run_id, self.token, self.config)
def on(self, workflow_run_id: str, handler: callable = None):
for event in self.stream(workflow_run_id):
async def on(self, workflow_run_id: str, handler: callable = None):
async for event in self.stream(workflow_run_id):
# call the handler if provided
if handler:
handler(event)

View File

@@ -127,6 +127,7 @@ class Context:
# FIXME: this limits the number of concurrent log requests to 1, which means we can do about
# 100 log lines per second but this depends on network.
self.logger_thread_pool = ThreadPoolExecutor(max_workers=1)
self.stream_event_thread_pool = ThreadPoolExecutor(max_workers=1)
# store each key in the overrides field in a lookup table
# overrides_data is a dictionary of key-value pairs
@@ -216,3 +217,15 @@ class Context:
return
self.logger_thread_pool.submit(self._log, line)
def _put_stream(self, data: str | bytes):
try:
self.client.event.stream(data=data, step_run_id=self.stepRunId)
except Exception as e:
logger.error(f"Error putting stream event: {e}")
def put_stream(self, data: str | bytes):
if self.stepRunId == "":
return
self.stream_event_thread_pool.submit(self._put_stream, data)

File diff suppressed because one or more lines are too long

View File

@@ -41,6 +41,7 @@ class ResourceEventType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
RESOURCE_EVENT_TYPE_FAILED: _ClassVar[ResourceEventType]
RESOURCE_EVENT_TYPE_CANCELLED: _ClassVar[ResourceEventType]
RESOURCE_EVENT_TYPE_TIMED_OUT: _ClassVar[ResourceEventType]
RESOURCE_EVENT_TYPE_STREAM: _ClassVar[ResourceEventType]
START_STEP_RUN: ActionType
CANCEL_STEP_RUN: ActionType
START_GET_GROUP_KEY: ActionType
@@ -61,6 +62,7 @@ RESOURCE_EVENT_TYPE_COMPLETED: ResourceEventType
RESOURCE_EVENT_TYPE_FAILED: ResourceEventType
RESOURCE_EVENT_TYPE_CANCELLED: ResourceEventType
RESOURCE_EVENT_TYPE_TIMED_OUT: ResourceEventType
RESOURCE_EVENT_TYPE_STREAM: ResourceEventType
class WorkerRegisterRequest(_message.Message):
__slots__ = ("workerName", "actions", "services", "maxRuns")

View File

@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: events.proto
# Protobuf Python Version: 4.25.0
# Protobuf Python Version: 4.25.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
@@ -15,7 +15,7 @@ _sym_db = _symbol_database.Default()
from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x65vents.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"|\n\x05\x45vent\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x0f\n\x07\x65ventId\x18\x02 \x01(\t\x12\x0b\n\x03key\x18\x03 \x01(\t\x12\x0f\n\x07payload\x18\x04 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x92\x01\n\rPutLogRequest\x12\x11\n\tstepRunId\x18\x01 \x01(\t\x12-\n\tcreatedAt\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0f\n\x07message\x18\x03 \x01(\t\x12\x12\n\x05level\x18\x04 \x01(\tH\x00\x88\x01\x01\x12\x10\n\x08metadata\x18\x05 \x01(\tB\x08\n\x06_level\"\x10\n\x0ePutLogResponse\"d\n\x10PushEventRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x0f\n\x07payload\x18\x02 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"%\n\x12ReplayEventRequest\x12\x0f\n\x07\x65ventId\x18\x01 \x01(\t2\x95\x01\n\rEventsService\x12#\n\x04Push\x12\x11.PushEventRequest\x1a\x06.Event\"\x00\x12\x32\n\x11ReplaySingleEvent\x12\x13.ReplayEventRequest\x1a\x06.Event\"\x00\x12+\n\x06PutLog\x12\x0e.PutLogRequest\x1a\x0f.PutLogResponse\"\x00\x42GZEgithub.com/hatchet-dev/hatchet/internal/services/dispatcher/contractsb\x06proto3')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x65vents.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"|\n\x05\x45vent\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x0f\n\x07\x65ventId\x18\x02 \x01(\t\x12\x0b\n\x03key\x18\x03 \x01(\t\x12\x0f\n\x07payload\x18\x04 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x92\x01\n\rPutLogRequest\x12\x11\n\tstepRunId\x18\x01 \x01(\t\x12-\n\tcreatedAt\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0f\n\x07message\x18\x03 \x01(\t\x12\x12\n\x05level\x18\x04 \x01(\tH\x00\x88\x01\x01\x12\x10\n\x08metadata\x18\x05 \x01(\tB\x08\n\x06_level\"\x10\n\x0ePutLogResponse\"|\n\x15PutStreamEventRequest\x12\x11\n\tstepRunId\x18\x01 \x01(\t\x12-\n\tcreatedAt\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0f\n\x07message\x18\x03 \x01(\x0c\x12\x10\n\x08metadata\x18\x05 \x01(\t\"\x18\n\x16PutStreamEventResponse\"d\n\x10PushEventRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x0f\n\x07payload\x18\x02 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"%\n\x12ReplayEventRequest\x12\x0f\n\x07\x65ventId\x18\x01 \x01(\t2\xda\x01\n\rEventsService\x12#\n\x04Push\x12\x11.PushEventRequest\x1a\x06.Event\"\x00\x12\x32\n\x11ReplaySingleEvent\x12\x13.ReplayEventRequest\x1a\x06.Event\"\x00\x12+\n\x06PutLog\x12\x0e.PutLogRequest\x1a\x0f.PutLogResponse\"\x00\x12\x43\n\x0ePutStreamEvent\x12\x16.PutStreamEventRequest\x1a\x17.PutStreamEventResponse\"\x00\x42GZEgithub.com/hatchet-dev/hatchet/internal/services/dispatcher/contractsb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -29,10 +29,14 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_globals['_PUTLOGREQUEST']._serialized_end=322
_globals['_PUTLOGRESPONSE']._serialized_start=324
_globals['_PUTLOGRESPONSE']._serialized_end=340
_globals['_PUSHEVENTREQUEST']._serialized_start=342
_globals['_PUSHEVENTREQUEST']._serialized_end=442
_globals['_REPLAYEVENTREQUEST']._serialized_start=444
_globals['_REPLAYEVENTREQUEST']._serialized_end=481
_globals['_EVENTSSERVICE']._serialized_start=484
_globals['_EVENTSSERVICE']._serialized_end=633
_globals['_PUTSTREAMEVENTREQUEST']._serialized_start=342
_globals['_PUTSTREAMEVENTREQUEST']._serialized_end=466
_globals['_PUTSTREAMEVENTRESPONSE']._serialized_start=468
_globals['_PUTSTREAMEVENTRESPONSE']._serialized_end=492
_globals['_PUSHEVENTREQUEST']._serialized_start=494
_globals['_PUSHEVENTREQUEST']._serialized_end=594
_globals['_REPLAYEVENTREQUEST']._serialized_start=596
_globals['_REPLAYEVENTREQUEST']._serialized_end=633
_globals['_EVENTSSERVICE']._serialized_start=636
_globals['_EVENTSSERVICE']._serialized_end=854
# @@protoc_insertion_point(module_scope)

View File

@@ -37,6 +37,22 @@ class PutLogResponse(_message.Message):
__slots__ = ()
def __init__(self) -> None: ...
class PutStreamEventRequest(_message.Message):
__slots__ = ("stepRunId", "createdAt", "message", "metadata")
STEPRUNID_FIELD_NUMBER: _ClassVar[int]
CREATEDAT_FIELD_NUMBER: _ClassVar[int]
MESSAGE_FIELD_NUMBER: _ClassVar[int]
METADATA_FIELD_NUMBER: _ClassVar[int]
stepRunId: str
createdAt: _timestamp_pb2.Timestamp
message: bytes
metadata: str
def __init__(self, stepRunId: _Optional[str] = ..., createdAt: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ..., message: _Optional[bytes] = ..., metadata: _Optional[str] = ...) -> None: ...
class PutStreamEventResponse(_message.Message):
__slots__ = ()
def __init__(self) -> None: ...
class PushEventRequest(_message.Message):
__slots__ = ("key", "payload", "eventTimestamp")
KEY_FIELD_NUMBER: _ClassVar[int]

View File

@@ -28,6 +28,11 @@ class EventsServiceStub(object):
request_serializer=events__pb2.PutLogRequest.SerializeToString,
response_deserializer=events__pb2.PutLogResponse.FromString,
)
self.PutStreamEvent = channel.unary_unary(
'/EventsService/PutStreamEvent',
request_serializer=events__pb2.PutStreamEventRequest.SerializeToString,
response_deserializer=events__pb2.PutStreamEventResponse.FromString,
)
class EventsServiceServicer(object):
@@ -51,6 +56,12 @@ class EventsServiceServicer(object):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def PutStreamEvent(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_EventsServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
@@ -69,6 +80,11 @@ def add_EventsServiceServicer_to_server(servicer, server):
request_deserializer=events__pb2.PutLogRequest.FromString,
response_serializer=events__pb2.PutLogResponse.SerializeToString,
),
'PutStreamEvent': grpc.unary_unary_rpc_method_handler(
servicer.PutStreamEvent,
request_deserializer=events__pb2.PutStreamEventRequest.FromString,
response_serializer=events__pb2.PutStreamEventResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'EventsService', rpc_method_handlers)
@@ -129,3 +145,20 @@ class EventsService(object):
events__pb2.PutLogResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def PutStreamEvent(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/EventsService/PutStreamEvent',
events__pb2.PutStreamEventRequest.SerializeToString,
events__pb2.PutStreamEventResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "hatchet-sdk"
version = "0.19.0"
version = "0.20.0"
description = ""
authors = ["Alexander Belanger <alexander@hatchet.run>"]
readme = "README.md"