- Playground/{stepRun?.step?.readableId}
+ {stepRun?.step?.readableId}
@@ -333,7 +340,9 @@ export function StepRunPlayground({
- Inputs
+
+ Input
+
{stepInput && (
-
-
-
Outputs
+
+
+
+
+
+ Output
+
+
+ Logs
+
+
+
0
@@ -357,20 +375,29 @@ export function StepRunPlayground({
}
/>
- !!e) as string[]
- }
- />
-
+
+
+ !!e) as string[]
+ }
+ />
+
+
+
+
+
+
+
+
+
>
diff --git a/frontend/docs/pages/home/python-sdk/creating-a-workflow.mdx b/frontend/docs/pages/home/python-sdk/creating-a-workflow.mdx
index 80432059c..f06bf684a 100644
--- a/frontend/docs/pages/home/python-sdk/creating-a-workflow.mdx
+++ b/frontend/docs/pages/home/python-sdk/creating-a-workflow.mdx
@@ -160,4 +160,24 @@ def step1(self, context):
pass
```
-If you need control over cancellation, you can also use `context.cancel()` to cancel the current step, though this is not recommended.
\ No newline at end of file
+If you need control over cancellation, you can also use `context.cancel()` to cancel the current step, though this is not recommended.
+
+## Logging
+
+Hatchet comes with a built-in logging view where you can push debug logs from your workflows. To push a log line, use the `context.log` method. For example:
+
+```py
+@hatchet.workflow(on_events=["user:create"], schedule_timeout="10m")
+class LoggingWorkflow:
+ @hatchet.step()
+ def logger(self, context : Context):
+
+ for i in range(1000):
+ context.log(f"Logging message {i}")
+
+ return {
+ "step1": "completed",
+ }
+```
+
+Each step is currently limited to 1000 log lines.
\ No newline at end of file
diff --git a/frontend/docs/pages/home/typescript-sdk/creating-a-workflow.mdx b/frontend/docs/pages/home/typescript-sdk/creating-a-workflow.mdx
index 89bf77d20..270d5d2d2 100644
--- a/frontend/docs/pages/home/typescript-sdk/creating-a-workflow.mdx
+++ b/frontend/docs/pages/home/typescript-sdk/creating-a-workflow.mdx
@@ -296,4 +296,32 @@ await worker.registerWorkflow({
});
```
-This will then appear in the Hatchet UI under the `prompt` value.
\ No newline at end of file
+This will then appear in the Hatchet UI under the `prompt` value.
+
+## Logging
+
+Hatchet comes with a built-in logging view where you can push debug logs from your workflows. To push a log line, use the `ctx.log` method. For example:
+
+```ts
+const workflow: Workflow = {
+ id: 'logger-example',
+ description: 'test',
+ on: {
+ event: 'user:create',
+ },
+ steps: [
+ {
+ name: 'logger-step1',
+ run: async (ctx) => {
+ for (let i = 0; i < 1000; i++) {
+ ctx.log(`log message ${i}`);
+ }
+
+ return { step1: 'completed step run' };
+ },
+ },
+ ],
+};
+```
+
+Each step is currently limited to 1000 log lines.
\ No newline at end of file
diff --git a/internal/config/loader/loader.go b/internal/config/loader/loader.go
index a69daa5b2..2012e7eb2 100644
--- a/internal/config/loader/loader.go
+++ b/internal/config/loader/loader.go
@@ -181,6 +181,7 @@ func GetServerConfigFromConfigfile(dc *database.Config, cf *server.ServerConfigF
ingestor, err := ingestor.NewIngestor(
ingestor.WithEventRepository(dc.Repository.Event()),
+ ingestor.WithLogRepository(dc.Repository.Log()),
ingestor.WithTaskQueue(tq),
)
diff --git a/internal/repository/logs.go b/internal/repository/logs.go
new file mode 100644
index 000000000..edc920013
--- /dev/null
+++ b/internal/repository/logs.go
@@ -0,0 +1,60 @@
+package repository
+
+import (
+ "time"
+
+ "github.com/hatchet-dev/hatchet/internal/repository/prisma/dbsqlc"
+)
+
+type CreateLogLineOpts struct {
+ // The step run id
+ StepRunId string `validate:"required,uuid"`
+
+ // (optional) The time when the log line was created.
+ CreatedAt *time.Time
+
+ // (required) The message of the log line.
+ Message string `validate:"required,min=1,max=10000"`
+
+ // (optional) The level of the log line.
+ Level *string `validate:"omitnil,oneof=INFO ERROR WARN DEBUG"`
+
+ // (optional) The metadata of the log line.
+ Metadata []byte
+}
+
+type ListLogsOpts struct {
+ // (optional) number of logs to skip
+ Offset *int
+
+ // (optional) number of logs to return
+ Limit *int `validate:"omitnil,min=1,max=1000"`
+
+ // (optional) a list of log levels to filter by
+ Levels []string `validate:"omitnil,dive,oneof=INFO ERROR WARN DEBUG"`
+
+ // (optional) a step run id to filter by
+ StepRunId *string `validate:"omitempty,uuid"`
+
+ // (optional) a search query
+ Search *string
+
+ // (optional) the order by field
+ OrderBy *string `validate:"omitempty,oneof=createdAt"`
+
+ // (optional) the order direction
+ OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`
+}
+
+type ListLogsResult struct {
+ Rows []*dbsqlc.LogLine
+ Count int
+}
+
+type LogsRepository interface {
+ // PutLog creates a new log line.
+ PutLog(tenantId string, opts *CreateLogLineOpts) (*dbsqlc.LogLine, error)
+
+	// ListLogLines returns a list of log lines for a given tenant, optionally filtered by step run, level, and search query.
+ ListLogLines(tenantId string, opts *ListLogsOpts) (*ListLogsResult, error)
+}
diff --git a/internal/repository/prisma/dbsqlc/logs.sql b/internal/repository/prisma/dbsqlc/logs.sql
new file mode 100644
index 000000000..98400f339
--- /dev/null
+++ b/internal/repository/prisma/dbsqlc/logs.sql
@@ -0,0 +1,41 @@
+-- name: CreateLogLine :one
+INSERT INTO "LogLine" (
+ "createdAt",
+ "tenantId",
+ "stepRunId",
+ "message",
+ "level",
+ "metadata"
+) VALUES (
+ coalesce(sqlc.narg('createdAt')::timestamp, now()),
+ @tenantId::uuid,
+ @stepRunId::uuid,
+ @message::text,
+ coalesce(sqlc.narg('level')::"LogLineLevel", 'INFO'::"LogLineLevel"),
+ coalesce(sqlc.narg('metadata')::jsonb, '{}'::jsonb)
+) RETURNING *;
+
+-- name: ListLogLines :many
+SELECT * FROM "LogLine"
+WHERE
+ "tenantId" = @tenantId::uuid AND
+ (sqlc.narg('stepRunId')::uuid IS NULL OR "stepRunId" = sqlc.narg('stepRunId')::uuid) AND
+ (sqlc.narg('search')::text IS NULL OR "message" LIKE concat('%', sqlc.narg('search')::text, '%')) AND
+ (sqlc.narg('levels')::"LogLineLevel"[] IS NULL OR "level" = ANY(sqlc.narg('levels')::"LogLineLevel"[]))
+ORDER BY
+ CASE WHEN sqlc.narg('orderBy')::text = 'createdAt ASC' THEN "createdAt" END ASC,
+ CASE WHEN sqlc.narg('orderBy')::text = 'createdAt DESC' THEN "createdAt" END DESC,
+ -- add order by id to make sure the order is deterministic
+ CASE WHEN sqlc.narg('orderBy')::text = 'createdAt ASC' THEN "id" END ASC,
+ CASE WHEN sqlc.narg('orderBy')::text = 'createdAt DESC' THEN "id" END DESC
+LIMIT COALESCE(sqlc.narg('limit'), 50)
+OFFSET COALESCE(sqlc.narg('offset'), 0);
+
+-- name: CountLogLines :one
+SELECT COUNT(*) AS total
+FROM "LogLine"
+WHERE
+ "tenantId" = @tenantId::uuid AND
+ (sqlc.narg('stepRunId')::uuid IS NULL OR "stepRunId" = sqlc.narg('stepRunId')::uuid) AND
+ (sqlc.narg('search')::text IS NULL OR "message" LIKE concat('%', sqlc.narg('search')::text, '%')) AND
+ (sqlc.narg('levels')::"LogLineLevel"[] IS NULL OR "level" = ANY(sqlc.narg('levels')::"LogLineLevel"[]));
diff --git a/internal/repository/prisma/dbsqlc/logs.sql.go b/internal/repository/prisma/dbsqlc/logs.sql.go
new file mode 100644
index 000000000..1b9dbe809
--- /dev/null
+++ b/internal/repository/prisma/dbsqlc/logs.sql.go
@@ -0,0 +1,153 @@
+// Code generated by sqlc. DO NOT EDIT.
+// versions:
+// sqlc v1.24.0
+// source: logs.sql
+
+package dbsqlc
+
+import (
+ "context"
+
+ "github.com/jackc/pgx/v5/pgtype"
+)
+
+const countLogLines = `-- name: CountLogLines :one
+SELECT COUNT(*) AS total
+FROM "LogLine"
+WHERE
+ "tenantId" = $1::uuid AND
+ ($2::uuid IS NULL OR "stepRunId" = $2::uuid) AND
+ ($3::text IS NULL OR "message" LIKE concat('%', $3::text, '%')) AND
+ ($4::"LogLineLevel"[] IS NULL OR "level" = ANY($4::"LogLineLevel"[]))
+`
+
+type CountLogLinesParams struct {
+ Tenantid pgtype.UUID `json:"tenantid"`
+ StepRunId pgtype.UUID `json:"stepRunId"`
+ Search pgtype.Text `json:"search"`
+ Levels []LogLineLevel `json:"levels"`
+}
+
+func (q *Queries) CountLogLines(ctx context.Context, db DBTX, arg CountLogLinesParams) (int64, error) {
+ row := db.QueryRow(ctx, countLogLines,
+ arg.Tenantid,
+ arg.StepRunId,
+ arg.Search,
+ arg.Levels,
+ )
+ var total int64
+ err := row.Scan(&total)
+ return total, err
+}
+
+const createLogLine = `-- name: CreateLogLine :one
+INSERT INTO "LogLine" (
+ "createdAt",
+ "tenantId",
+ "stepRunId",
+ "message",
+ "level",
+ "metadata"
+) VALUES (
+ coalesce($1::timestamp, now()),
+ $2::uuid,
+ $3::uuid,
+ $4::text,
+ coalesce($5::"LogLineLevel", 'INFO'::"LogLineLevel"),
+ coalesce($6::jsonb, '{}'::jsonb)
+) RETURNING id, "createdAt", "tenantId", "stepRunId", message, level, metadata
+`
+
+type CreateLogLineParams struct {
+ CreatedAt pgtype.Timestamp `json:"createdAt"`
+ Tenantid pgtype.UUID `json:"tenantid"`
+ Steprunid pgtype.UUID `json:"steprunid"`
+ Message string `json:"message"`
+ Level NullLogLineLevel `json:"level"`
+ Metadata []byte `json:"metadata"`
+}
+
+func (q *Queries) CreateLogLine(ctx context.Context, db DBTX, arg CreateLogLineParams) (*LogLine, error) {
+ row := db.QueryRow(ctx, createLogLine,
+ arg.CreatedAt,
+ arg.Tenantid,
+ arg.Steprunid,
+ arg.Message,
+ arg.Level,
+ arg.Metadata,
+ )
+ var i LogLine
+ err := row.Scan(
+ &i.ID,
+ &i.CreatedAt,
+ &i.TenantId,
+ &i.StepRunId,
+ &i.Message,
+ &i.Level,
+ &i.Metadata,
+ )
+ return &i, err
+}
+
+const listLogLines = `-- name: ListLogLines :many
+SELECT id, "createdAt", "tenantId", "stepRunId", message, level, metadata FROM "LogLine"
+WHERE
+ "tenantId" = $1::uuid AND
+ ($2::uuid IS NULL OR "stepRunId" = $2::uuid) AND
+ ($3::text IS NULL OR "message" LIKE concat('%', $3::text, '%')) AND
+ ($4::"LogLineLevel"[] IS NULL OR "level" = ANY($4::"LogLineLevel"[]))
+ORDER BY
+ CASE WHEN $5::text = 'createdAt ASC' THEN "createdAt" END ASC,
+ CASE WHEN $5::text = 'createdAt DESC' THEN "createdAt" END DESC,
+ -- add order by id to make sure the order is deterministic
+ CASE WHEN $5::text = 'createdAt ASC' THEN "id" END ASC,
+ CASE WHEN $5::text = 'createdAt DESC' THEN "id" END DESC
+LIMIT COALESCE($7, 50)
+OFFSET COALESCE($6, 0)
+`
+
+type ListLogLinesParams struct {
+ Tenantid pgtype.UUID `json:"tenantid"`
+ StepRunId pgtype.UUID `json:"stepRunId"`
+ Search pgtype.Text `json:"search"`
+ Levels []LogLineLevel `json:"levels"`
+ OrderBy pgtype.Text `json:"orderBy"`
+ Offset interface{} `json:"offset"`
+ Limit interface{} `json:"limit"`
+}
+
+func (q *Queries) ListLogLines(ctx context.Context, db DBTX, arg ListLogLinesParams) ([]*LogLine, error) {
+ rows, err := db.Query(ctx, listLogLines,
+ arg.Tenantid,
+ arg.StepRunId,
+ arg.Search,
+ arg.Levels,
+ arg.OrderBy,
+ arg.Offset,
+ arg.Limit,
+ )
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ var items []*LogLine
+ for rows.Next() {
+ var i LogLine
+ if err := rows.Scan(
+ &i.ID,
+ &i.CreatedAt,
+ &i.TenantId,
+ &i.StepRunId,
+ &i.Message,
+ &i.Level,
+ &i.Metadata,
+ ); err != nil {
+ return nil, err
+ }
+ items = append(items, &i)
+ }
+ if err := rows.Err(); err != nil {
+ return nil, err
+ }
+ return items, nil
+}
diff --git a/internal/repository/prisma/dbsqlc/models.go b/internal/repository/prisma/dbsqlc/models.go
index 23165e018..f016b3dac 100644
--- a/internal/repository/prisma/dbsqlc/models.go
+++ b/internal/repository/prisma/dbsqlc/models.go
@@ -143,6 +143,50 @@ func (ns NullJobRunStatus) Value() (driver.Value, error) {
return string(ns.JobRunStatus), nil
}
+type LogLineLevel string
+
+const (
+ LogLineLevelDEBUG LogLineLevel = "DEBUG"
+ LogLineLevelINFO LogLineLevel = "INFO"
+ LogLineLevelWARN LogLineLevel = "WARN"
+ LogLineLevelERROR LogLineLevel = "ERROR"
+)
+
+func (e *LogLineLevel) Scan(src interface{}) error {
+ switch s := src.(type) {
+ case []byte:
+ *e = LogLineLevel(s)
+ case string:
+ *e = LogLineLevel(s)
+ default:
+ return fmt.Errorf("unsupported scan type for LogLineLevel: %T", src)
+ }
+ return nil
+}
+
+type NullLogLineLevel struct {
+ LogLineLevel LogLineLevel `json:"LogLineLevel"`
+ Valid bool `json:"valid"` // Valid is true if LogLineLevel is not NULL
+}
+
+// Scan implements the Scanner interface.
+func (ns *NullLogLineLevel) Scan(value interface{}) error {
+ if value == nil {
+ ns.LogLineLevel, ns.Valid = "", false
+ return nil
+ }
+ ns.Valid = true
+ return ns.LogLineLevel.Scan(value)
+}
+
+// Value implements the driver Valuer interface.
+func (ns NullLogLineLevel) Value() (driver.Value, error) {
+ if !ns.Valid {
+ return nil, nil
+ }
+ return string(ns.LogLineLevel), nil
+}
+
type StepRunStatus string
const (
@@ -547,6 +591,16 @@ type JobRunLookupData struct {
Data []byte `json:"data"`
}
+type LogLine struct {
+ ID int64 `json:"id"`
+ CreatedAt pgtype.Timestamp `json:"createdAt"`
+ TenantId pgtype.UUID `json:"tenantId"`
+ StepRunId pgtype.UUID `json:"stepRunId"`
+ Message string `json:"message"`
+ Level LogLineLevel `json:"level"`
+ Metadata []byte `json:"metadata"`
+}
+
type Service struct {
ID pgtype.UUID `json:"id"`
CreatedAt pgtype.Timestamp `json:"createdAt"`
diff --git a/internal/repository/prisma/dbsqlc/schema.sql b/internal/repository/prisma/dbsqlc/schema.sql
index 9fb492013..d11932483 100644
--- a/internal/repository/prisma/dbsqlc/schema.sql
+++ b/internal/repository/prisma/dbsqlc/schema.sql
@@ -7,6 +7,9 @@ CREATE TYPE "InviteLinkStatus" AS ENUM ('PENDING', 'ACCEPTED', 'REJECTED');
-- CreateEnum
CREATE TYPE "JobRunStatus" AS ENUM ('PENDING', 'RUNNING', 'SUCCEEDED', 'FAILED', 'CANCELLED');
+-- CreateEnum
+CREATE TYPE "LogLineLevel" AS ENUM ('DEBUG', 'INFO', 'WARN', 'ERROR');
+
-- CreateEnum
CREATE TYPE "StepRunStatus" AS ENUM ('PENDING', 'PENDING_ASSIGNMENT', 'ASSIGNED', 'RUNNING', 'SUCCEEDED', 'FAILED', 'CANCELLED');
@@ -227,6 +230,19 @@ CREATE TABLE "JobRunLookupData" (
CONSTRAINT "JobRunLookupData_pkey" PRIMARY KEY ("id")
);
+-- CreateTable
+CREATE TABLE "LogLine" (
+ "id" BIGSERIAL NOT NULL,
+ "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ "tenantId" UUID NOT NULL,
+ "stepRunId" UUID,
+ "message" TEXT NOT NULL,
+ "level" "LogLineLevel" NOT NULL DEFAULT 'INFO',
+ "metadata" JSONB,
+
+ CONSTRAINT "LogLine_pkey" PRIMARY KEY ("id")
+);
+
-- CreateTable
CREATE TABLE "Service" (
"id" UUID NOT NULL,
@@ -928,6 +944,12 @@ ALTER TABLE "JobRunLookupData" ADD CONSTRAINT "JobRunLookupData_jobRunId_fkey" F
-- AddForeignKey
ALTER TABLE "JobRunLookupData" ADD CONSTRAINT "JobRunLookupData_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
+-- AddForeignKey
+ALTER TABLE "LogLine" ADD CONSTRAINT "LogLine_stepRunId_fkey" FOREIGN KEY ("stepRunId") REFERENCES "StepRun"("id") ON DELETE SET NULL ON UPDATE CASCADE;
+
+-- AddForeignKey
+ALTER TABLE "LogLine" ADD CONSTRAINT "LogLine_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
+
-- AddForeignKey
ALTER TABLE "Service" ADD CONSTRAINT "Service_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
diff --git a/internal/repository/prisma/dbsqlc/sqlc.yaml b/internal/repository/prisma/dbsqlc/sqlc.yaml
index 11626c680..e2a1a18ab 100644
--- a/internal/repository/prisma/dbsqlc/sqlc.yaml
+++ b/internal/repository/prisma/dbsqlc/sqlc.yaml
@@ -14,6 +14,7 @@ sql:
- tickers.sql
- dispatchers.sql
- workers.sql
+ - logs.sql
schema:
- schema.sql
strict_order_by: false
diff --git a/internal/repository/prisma/log.go b/internal/repository/prisma/log.go
new file mode 100644
index 000000000..a5f02609a
--- /dev/null
+++ b/internal/repository/prisma/log.go
@@ -0,0 +1,190 @@
+package prisma
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ "github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgxpool"
+ "github.com/rs/zerolog"
+
+ "github.com/hatchet-dev/hatchet/internal/repository"
+ "github.com/hatchet-dev/hatchet/internal/repository/prisma/db"
+ "github.com/hatchet-dev/hatchet/internal/repository/prisma/dbsqlc"
+ "github.com/hatchet-dev/hatchet/internal/repository/prisma/sqlchelpers"
+ "github.com/hatchet-dev/hatchet/internal/validator"
+)
+
+type logRepository struct {
+ client *db.PrismaClient
+ pool *pgxpool.Pool
+ v validator.Validator
+ queries *dbsqlc.Queries
+ l *zerolog.Logger
+}
+
+func NewLogRepository(client *db.PrismaClient, pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger) repository.LogsRepository {
+ queries := dbsqlc.New()
+
+ return &logRepository{
+ client: client,
+ pool: pool,
+ v: v,
+ queries: queries,
+ l: l,
+ }
+}
+
+func (r *logRepository) PutLog(tenantId string, opts *repository.CreateLogLineOpts) (*dbsqlc.LogLine, error) {
+ if err := r.v.Validate(opts); err != nil {
+ return nil, err
+ }
+
+ createParams := dbsqlc.CreateLogLineParams{
+ Tenantid: sqlchelpers.UUIDFromStr(tenantId),
+ Message: opts.Message,
+ Steprunid: sqlchelpers.UUIDFromStr(opts.StepRunId),
+ }
+
+ if opts.CreatedAt != nil {
+ utcTime := opts.CreatedAt.UTC()
+ createParams.CreatedAt = sqlchelpers.TimestampFromTime(utcTime)
+ }
+
+ if opts.Level != nil {
+ createParams.Level = dbsqlc.NullLogLineLevel{
+ LogLineLevel: dbsqlc.LogLineLevel(*opts.Level),
+ Valid: true,
+ }
+ }
+
+ if opts.Metadata != nil {
+ createParams.Metadata = opts.Metadata
+ }
+
+ tx, err := r.pool.Begin(context.Background())
+
+ if err != nil {
+ return nil, err
+ }
+
+ defer deferRollback(context.Background(), r.l, tx.Rollback)
+
+ logLine, err := r.queries.CreateLogLine(
+ context.Background(),
+ tx,
+ createParams,
+ )
+
+ if err != nil {
+ return nil, fmt.Errorf("could not create log line: %w", err)
+ }
+
+ err = tx.Commit(context.Background())
+
+ if err != nil {
+ return nil, fmt.Errorf("could not commit transaction: %w", err)
+ }
+
+ return logLine, nil
+}
+
+func (r *logRepository) ListLogLines(tenantId string, opts *repository.ListLogsOpts) (*repository.ListLogsResult, error) {
+ if err := r.v.Validate(opts); err != nil {
+ return nil, err
+ }
+
+ res := &repository.ListLogsResult{}
+
+ pgTenantId := sqlchelpers.UUIDFromStr(tenantId)
+
+ queryParams := dbsqlc.ListLogLinesParams{
+ Tenantid: pgTenantId,
+ }
+
+ countParams := dbsqlc.CountLogLinesParams{
+ Tenantid: pgTenantId,
+ }
+
+ if opts.Search != nil {
+ queryParams.Search = sqlchelpers.TextFromStr(*opts.Search)
+ countParams.Search = sqlchelpers.TextFromStr(*opts.Search)
+ }
+
+ if opts.Offset != nil {
+ queryParams.Offset = *opts.Offset
+ }
+
+ if opts.Limit != nil {
+ queryParams.Limit = *opts.Limit
+ }
+
+ if opts.StepRunId != nil {
+ queryParams.StepRunId = sqlchelpers.UUIDFromStr(*opts.StepRunId)
+ countParams.StepRunId = sqlchelpers.UUIDFromStr(*opts.StepRunId)
+ }
+
+ if opts.Levels != nil {
+ var levels []dbsqlc.LogLineLevel
+
+ for _, level := range opts.Levels {
+ levels = append(levels, dbsqlc.LogLineLevel(level))
+ }
+
+ queryParams.Levels = levels
+ countParams.Levels = levels
+ }
+
+ orderByField := "createdAt"
+ orderByDirection := "DESC"
+
+ if opts.OrderBy != nil {
+ orderByField = *opts.OrderBy
+ }
+
+ if opts.OrderDirection != nil {
+ orderByDirection = *opts.OrderDirection
+ }
+
+ queryParams.OrderBy = sqlchelpers.TextFromStr(orderByField + " " + orderByDirection)
+
+ tx, err := r.pool.Begin(context.Background())
+
+ if err != nil {
+ return nil, err
+ }
+
+ defer deferRollback(context.Background(), r.l, tx.Rollback)
+
+ logLines, err := r.queries.ListLogLines(context.Background(), tx, queryParams)
+
+ if err != nil {
+ if errors.Is(err, pgx.ErrNoRows) {
+ logLines = make([]*dbsqlc.LogLine, 0)
+ } else {
+ return nil, fmt.Errorf("could not list log lines: %w", err)
+ }
+ }
+
+ count, err := r.queries.CountLogLines(context.Background(), tx, countParams)
+
+ if err != nil {
+ if errors.Is(err, pgx.ErrNoRows) {
+ count = 0
+ } else {
+			return nil, fmt.Errorf("could not count log lines: %w", err)
+ }
+ }
+
+ err = tx.Commit(context.Background())
+
+ if err != nil {
+ return nil, fmt.Errorf("could not commit transaction: %w", err)
+ }
+
+ res.Rows = logLines
+ res.Count = int(count)
+
+ return res, nil
+}
diff --git a/internal/repository/prisma/repository.go b/internal/repository/prisma/repository.go
index b9dd04071..1f12a4b07 100644
--- a/internal/repository/prisma/repository.go
+++ b/internal/repository/prisma/repository.go
@@ -12,6 +12,7 @@ import (
type prismaRepository struct {
apiToken repository.APITokenRepository
event repository.EventRepository
+ log repository.LogsRepository
tenant repository.TenantRepository
tenantInvite repository.TenantInviteRepository
workflow repository.WorkflowRepository
@@ -67,6 +68,7 @@ func NewPrismaRepository(client *db.PrismaClient, pool *pgxpool.Pool, fs ...Pris
return &prismaRepository{
apiToken: NewAPITokenRepository(client, opts.v),
event: NewEventRepository(client, pool, opts.v, opts.l),
+ log: NewLogRepository(client, pool, opts.v, opts.l),
tenant: NewTenantRepository(client, opts.v),
tenantInvite: NewTenantInviteRepository(client, opts.v),
workflow: NewWorkflowRepository(client, pool, opts.v, opts.l),
@@ -97,6 +99,10 @@ func (r *prismaRepository) Event() repository.EventRepository {
return r.event
}
+func (r *prismaRepository) Log() repository.LogsRepository {
+ return r.log
+}
+
func (r *prismaRepository) Tenant() repository.TenantRepository {
return r.tenant
}
diff --git a/internal/repository/repository.go b/internal/repository/repository.go
index f05f7b7ca..b108a1609 100644
--- a/internal/repository/repository.go
+++ b/internal/repository/repository.go
@@ -4,6 +4,7 @@ type Repository interface {
Health() HealthRepository
APIToken() APITokenRepository
Event() EventRepository
+ Log() LogsRepository
Tenant() TenantRepository
TenantInvite() TenantInviteRepository
Workflow() WorkflowRepository
diff --git a/internal/services/ingestor/contracts/events.pb.go b/internal/services/ingestor/contracts/events.pb.go
index 93b3c91a7..ccad481c0 100644
--- a/internal/services/ingestor/contracts/events.pb.go
+++ b/internal/services/ingestor/contracts/events.pb.go
@@ -105,6 +105,128 @@ func (x *Event) GetEventTimestamp() *timestamppb.Timestamp {
return nil
}
+type PutLogRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // the step run id for the request
+ StepRunId string `protobuf:"bytes,1,opt,name=stepRunId,proto3" json:"stepRunId,omitempty"`
+ // when the log line was created
+ CreatedAt *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=createdAt,proto3" json:"createdAt,omitempty"`
+ // the log line message
+ Message string `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"`
+ // the log line level
+ Level *string `protobuf:"bytes,4,opt,name=level,proto3,oneof" json:"level,omitempty"`
+ // associated log line metadata
+ Metadata string `protobuf:"bytes,5,opt,name=metadata,proto3" json:"metadata,omitempty"`
+}
+
+func (x *PutLogRequest) Reset() {
+ *x = PutLogRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_events_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *PutLogRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*PutLogRequest) ProtoMessage() {}
+
+func (x *PutLogRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_events_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use PutLogRequest.ProtoReflect.Descriptor instead.
+func (*PutLogRequest) Descriptor() ([]byte, []int) {
+ return file_events_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *PutLogRequest) GetStepRunId() string {
+ if x != nil {
+ return x.StepRunId
+ }
+ return ""
+}
+
+func (x *PutLogRequest) GetCreatedAt() *timestamppb.Timestamp {
+ if x != nil {
+ return x.CreatedAt
+ }
+ return nil
+}
+
+func (x *PutLogRequest) GetMessage() string {
+ if x != nil {
+ return x.Message
+ }
+ return ""
+}
+
+func (x *PutLogRequest) GetLevel() string {
+ if x != nil && x.Level != nil {
+ return *x.Level
+ }
+ return ""
+}
+
+func (x *PutLogRequest) GetMetadata() string {
+ if x != nil {
+ return x.Metadata
+ }
+ return ""
+}
+
+type PutLogResponse struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+}
+
+func (x *PutLogResponse) Reset() {
+ *x = PutLogResponse{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_events_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *PutLogResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*PutLogResponse) ProtoMessage() {}
+
+func (x *PutLogResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_events_proto_msgTypes[2]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use PutLogResponse.ProtoReflect.Descriptor instead.
+func (*PutLogResponse) Descriptor() ([]byte, []int) {
+ return file_events_proto_rawDescGZIP(), []int{2}
+}
+
type PushEventRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -121,7 +243,7 @@ type PushEventRequest struct {
func (x *PushEventRequest) Reset() {
*x = PushEventRequest{}
if protoimpl.UnsafeEnabled {
- mi := &file_events_proto_msgTypes[1]
+ mi := &file_events_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -134,7 +256,7 @@ func (x *PushEventRequest) String() string {
func (*PushEventRequest) ProtoMessage() {}
func (x *PushEventRequest) ProtoReflect() protoreflect.Message {
- mi := &file_events_proto_msgTypes[1]
+ mi := &file_events_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -147,7 +269,7 @@ func (x *PushEventRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use PushEventRequest.ProtoReflect.Descriptor instead.
func (*PushEventRequest) Descriptor() ([]byte, []int) {
- return file_events_proto_rawDescGZIP(), []int{1}
+ return file_events_proto_rawDescGZIP(), []int{3}
}
func (x *PushEventRequest) GetKey() string {
@@ -185,7 +307,7 @@ type ListEventRequest struct {
func (x *ListEventRequest) Reset() {
*x = ListEventRequest{}
if protoimpl.UnsafeEnabled {
- mi := &file_events_proto_msgTypes[2]
+ mi := &file_events_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -198,7 +320,7 @@ func (x *ListEventRequest) String() string {
func (*ListEventRequest) ProtoMessage() {}
func (x *ListEventRequest) ProtoReflect() protoreflect.Message {
- mi := &file_events_proto_msgTypes[2]
+ mi := &file_events_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -211,7 +333,7 @@ func (x *ListEventRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use ListEventRequest.ProtoReflect.Descriptor instead.
func (*ListEventRequest) Descriptor() ([]byte, []int) {
- return file_events_proto_rawDescGZIP(), []int{2}
+ return file_events_proto_rawDescGZIP(), []int{4}
}
func (x *ListEventRequest) GetOffset() int32 {
@@ -240,7 +362,7 @@ type ListEventResponse struct {
func (x *ListEventResponse) Reset() {
*x = ListEventResponse{}
if protoimpl.UnsafeEnabled {
- mi := &file_events_proto_msgTypes[3]
+ mi := &file_events_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -253,7 +375,7 @@ func (x *ListEventResponse) String() string {
func (*ListEventResponse) ProtoMessage() {}
func (x *ListEventResponse) ProtoReflect() protoreflect.Message {
- mi := &file_events_proto_msgTypes[3]
+ mi := &file_events_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -266,7 +388,7 @@ func (x *ListEventResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use ListEventResponse.ProtoReflect.Descriptor instead.
func (*ListEventResponse) Descriptor() ([]byte, []int) {
- return file_events_proto_rawDescGZIP(), []int{3}
+ return file_events_proto_rawDescGZIP(), []int{5}
}
func (x *ListEventResponse) GetEvents() []*Event {
@@ -288,7 +410,7 @@ type ReplayEventRequest struct {
func (x *ReplayEventRequest) Reset() {
*x = ReplayEventRequest{}
if protoimpl.UnsafeEnabled {
- mi := &file_events_proto_msgTypes[4]
+ mi := &file_events_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -301,7 +423,7 @@ func (x *ReplayEventRequest) String() string {
func (*ReplayEventRequest) ProtoMessage() {}
func (x *ReplayEventRequest) ProtoReflect() protoreflect.Message {
- mi := &file_events_proto_msgTypes[4]
+ mi := &file_events_proto_msgTypes[6]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -314,7 +436,7 @@ func (x *ReplayEventRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use ReplayEventRequest.ProtoReflect.Descriptor instead.
func (*ReplayEventRequest) Descriptor() ([]byte, []int) {
- return file_events_proto_rawDescGZIP(), []int{4}
+ return file_events_proto_rawDescGZIP(), []int{6}
}
func (x *ReplayEventRequest) GetEventId() string {
@@ -341,40 +463,56 @@ var file_events_proto_rawDesc = []byte{
0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52,
0x0e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22,
- 0x82, 0x01, 0x0a, 0x10, 0x50, 0x75, 0x73, 0x68, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71,
- 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28,
- 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61,
- 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
- 0x12, 0x42, 0x0a, 0x0e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
- 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
- 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
- 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73,
- 0x74, 0x61, 0x6d, 0x70, 0x22, 0x3c, 0x0a, 0x10, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x76, 0x65, 0x6e,
- 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73,
- 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74,
- 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b,
- 0x65, 0x79, 0x22, 0x33, 0x0a, 0x11, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52,
- 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1e, 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74,
- 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x06, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52,
- 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x2e, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6c, 0x61,
- 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a,
- 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07,
- 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x32, 0x99, 0x01, 0x0a, 0x0d, 0x45, 0x76, 0x65, 0x6e,
- 0x74, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x23, 0x0a, 0x04, 0x50, 0x75, 0x73,
- 0x68, 0x12, 0x11, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71,
- 0x75, 0x65, 0x73, 0x74, 0x1a, 0x06, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x00, 0x12, 0x2f,
- 0x0a, 0x04, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x11, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x76, 0x65,
- 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x4c, 0x69, 0x73, 0x74,
- 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12,
- 0x32, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x61, 0x79, 0x53, 0x69, 0x6e, 0x67, 0x6c, 0x65, 0x45,
- 0x76, 0x65, 0x6e, 0x74, 0x12, 0x13, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x61, 0x79, 0x45, 0x76, 0x65,
- 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x06, 0x2e, 0x45, 0x76, 0x65, 0x6e,
- 0x74, 0x22, 0x00, 0x42, 0x47, 0x5a, 0x45, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
- 0x6d, 0x2f, 0x68, 0x61, 0x74, 0x63, 0x68, 0x65, 0x74, 0x2d, 0x64, 0x65, 0x76, 0x2f, 0x68, 0x61,
- 0x74, 0x63, 0x68, 0x65, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x73,
- 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x64, 0x69, 0x73, 0x70, 0x61, 0x74, 0x63, 0x68,
- 0x65, 0x72, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x73, 0x62, 0x06, 0x70, 0x72,
- 0x6f, 0x74, 0x6f, 0x33,
+ 0xc2, 0x01, 0x0a, 0x0d, 0x50, 0x75, 0x74, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
+ 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x74, 0x65, 0x70, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x74, 0x65, 0x70, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x12,
+ 0x38, 0x0a, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x18, 0x02, 0x20, 0x01,
+ 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
+ 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09,
+ 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73,
+ 0x73, 0x61, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73,
+ 0x61, 0x67, 0x65, 0x12, 0x19, 0x0a, 0x05, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x04, 0x20, 0x01,
+ 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x88, 0x01, 0x01, 0x12, 0x1a,
+ 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09,
+ 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x42, 0x08, 0x0a, 0x06, 0x5f, 0x6c,
+ 0x65, 0x76, 0x65, 0x6c, 0x22, 0x10, 0x0a, 0x0e, 0x50, 0x75, 0x74, 0x4c, 0x6f, 0x67, 0x52, 0x65,
+ 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x82, 0x01, 0x0a, 0x10, 0x50, 0x75, 0x73, 0x68, 0x45,
+ 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b,
+ 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x18, 0x0a,
+ 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07,
+ 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x42, 0x0a, 0x0e, 0x65, 0x76, 0x65, 0x6e, 0x74,
+ 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32,
+ 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
+ 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0e, 0x65, 0x76, 0x65,
+ 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22, 0x3c, 0x0a, 0x10, 0x4c,
+ 0x69, 0x73, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
+ 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52,
+ 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x02,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0x33, 0x0a, 0x11, 0x4c, 0x69, 0x73,
+ 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1e,
+ 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x06,
+ 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x2e,
+ 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6c, 0x61, 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71,
+ 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x18,
+ 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x32, 0xc6,
+ 0x01, 0x0a, 0x0d, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
+ 0x12, 0x23, 0x0a, 0x04, 0x50, 0x75, 0x73, 0x68, 0x12, 0x11, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x45,
+ 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x06, 0x2e, 0x45, 0x76,
+ 0x65, 0x6e, 0x74, 0x22, 0x00, 0x12, 0x2f, 0x0a, 0x04, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x11, 0x2e,
+ 0x4c, 0x69, 0x73, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
+ 0x1a, 0x12, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70,
+ 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x32, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x61, 0x79,
+ 0x53, 0x69, 0x6e, 0x67, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x13, 0x2e, 0x52, 0x65,
+ 0x70, 0x6c, 0x61, 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
+ 0x1a, 0x06, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x00, 0x12, 0x2b, 0x0a, 0x06, 0x50, 0x75,
+ 0x74, 0x4c, 0x6f, 0x67, 0x12, 0x0e, 0x2e, 0x50, 0x75, 0x74, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x71,
+ 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0f, 0x2e, 0x50, 0x75, 0x74, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x73,
+ 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x47, 0x5a, 0x45, 0x67, 0x69, 0x74, 0x68, 0x75,
+ 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x74, 0x63, 0x68, 0x65, 0x74, 0x2d, 0x64, 0x65,
+ 0x76, 0x2f, 0x68, 0x61, 0x74, 0x63, 0x68, 0x65, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e,
+ 0x61, 0x6c, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x64, 0x69, 0x73, 0x70,
+ 0x61, 0x74, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x73,
+ 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -389,30 +527,35 @@ func file_events_proto_rawDescGZIP() []byte {
return file_events_proto_rawDescData
}
-var file_events_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
+var file_events_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
var file_events_proto_goTypes = []interface{}{
(*Event)(nil), // 0: Event
- (*PushEventRequest)(nil), // 1: PushEventRequest
- (*ListEventRequest)(nil), // 2: ListEventRequest
- (*ListEventResponse)(nil), // 3: ListEventResponse
- (*ReplayEventRequest)(nil), // 4: ReplayEventRequest
- (*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp
+ (*PutLogRequest)(nil), // 1: PutLogRequest
+ (*PutLogResponse)(nil), // 2: PutLogResponse
+ (*PushEventRequest)(nil), // 3: PushEventRequest
+ (*ListEventRequest)(nil), // 4: ListEventRequest
+ (*ListEventResponse)(nil), // 5: ListEventResponse
+ (*ReplayEventRequest)(nil), // 6: ReplayEventRequest
+ (*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp
}
var file_events_proto_depIdxs = []int32{
- 5, // 0: Event.eventTimestamp:type_name -> google.protobuf.Timestamp
- 5, // 1: PushEventRequest.eventTimestamp:type_name -> google.protobuf.Timestamp
- 0, // 2: ListEventResponse.events:type_name -> Event
- 1, // 3: EventsService.Push:input_type -> PushEventRequest
- 2, // 4: EventsService.List:input_type -> ListEventRequest
- 4, // 5: EventsService.ReplaySingleEvent:input_type -> ReplayEventRequest
- 0, // 6: EventsService.Push:output_type -> Event
- 3, // 7: EventsService.List:output_type -> ListEventResponse
- 0, // 8: EventsService.ReplaySingleEvent:output_type -> Event
- 6, // [6:9] is the sub-list for method output_type
- 3, // [3:6] is the sub-list for method input_type
- 3, // [3:3] is the sub-list for extension type_name
- 3, // [3:3] is the sub-list for extension extendee
- 0, // [0:3] is the sub-list for field type_name
+ 7, // 0: Event.eventTimestamp:type_name -> google.protobuf.Timestamp
+ 7, // 1: PutLogRequest.createdAt:type_name -> google.protobuf.Timestamp
+ 7, // 2: PushEventRequest.eventTimestamp:type_name -> google.protobuf.Timestamp
+ 0, // 3: ListEventResponse.events:type_name -> Event
+ 3, // 4: EventsService.Push:input_type -> PushEventRequest
+ 4, // 5: EventsService.List:input_type -> ListEventRequest
+ 6, // 6: EventsService.ReplaySingleEvent:input_type -> ReplayEventRequest
+ 1, // 7: EventsService.PutLog:input_type -> PutLogRequest
+ 0, // 8: EventsService.Push:output_type -> Event
+ 5, // 9: EventsService.List:output_type -> ListEventResponse
+ 0, // 10: EventsService.ReplaySingleEvent:output_type -> Event
+ 2, // 11: EventsService.PutLog:output_type -> PutLogResponse
+ 8, // [8:12] is the sub-list for method output_type
+ 4, // [4:8] is the sub-list for method input_type
+ 4, // [4:4] is the sub-list for extension type_name
+ 4, // [4:4] is the sub-list for extension extendee
+ 0, // [0:4] is the sub-list for field type_name
}
func init() { file_events_proto_init() }
@@ -434,7 +577,7 @@ func file_events_proto_init() {
}
}
file_events_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*PushEventRequest); i {
+ switch v := v.(*PutLogRequest); i {
case 0:
return &v.state
case 1:
@@ -446,7 +589,7 @@ func file_events_proto_init() {
}
}
file_events_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*ListEventRequest); i {
+ switch v := v.(*PutLogResponse); i {
case 0:
return &v.state
case 1:
@@ -458,7 +601,7 @@ func file_events_proto_init() {
}
}
file_events_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*ListEventResponse); i {
+ switch v := v.(*PushEventRequest); i {
case 0:
return &v.state
case 1:
@@ -470,6 +613,30 @@ func file_events_proto_init() {
}
}
file_events_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*ListEventRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_events_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*ListEventResponse); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_events_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReplayEventRequest); i {
case 0:
return &v.state
@@ -482,13 +649,14 @@ func file_events_proto_init() {
}
}
}
+ file_events_proto_msgTypes[1].OneofWrappers = []interface{}{}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_events_proto_rawDesc,
NumEnums: 0,
- NumMessages: 5,
+ NumMessages: 7,
NumExtensions: 0,
NumServices: 1,
},
diff --git a/internal/services/ingestor/contracts/events_grpc.pb.go b/internal/services/ingestor/contracts/events_grpc.pb.go
index 6a009cc8a..0acd8ad13 100644
--- a/internal/services/ingestor/contracts/events_grpc.pb.go
+++ b/internal/services/ingestor/contracts/events_grpc.pb.go
@@ -25,6 +25,7 @@ type EventsServiceClient interface {
Push(ctx context.Context, in *PushEventRequest, opts ...grpc.CallOption) (*Event, error)
List(ctx context.Context, in *ListEventRequest, opts ...grpc.CallOption) (*ListEventResponse, error)
ReplaySingleEvent(ctx context.Context, in *ReplayEventRequest, opts ...grpc.CallOption) (*Event, error)
+ PutLog(ctx context.Context, in *PutLogRequest, opts ...grpc.CallOption) (*PutLogResponse, error)
}
type eventsServiceClient struct {
@@ -62,6 +63,15 @@ func (c *eventsServiceClient) ReplaySingleEvent(ctx context.Context, in *ReplayE
return out, nil
}
+func (c *eventsServiceClient) PutLog(ctx context.Context, in *PutLogRequest, opts ...grpc.CallOption) (*PutLogResponse, error) {
+ out := new(PutLogResponse)
+ err := c.cc.Invoke(ctx, "/EventsService/PutLog", in, out, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
// EventsServiceServer is the server API for EventsService service.
// All implementations must embed UnimplementedEventsServiceServer
// for forward compatibility
@@ -69,6 +79,7 @@ type EventsServiceServer interface {
Push(context.Context, *PushEventRequest) (*Event, error)
List(context.Context, *ListEventRequest) (*ListEventResponse, error)
ReplaySingleEvent(context.Context, *ReplayEventRequest) (*Event, error)
+ PutLog(context.Context, *PutLogRequest) (*PutLogResponse, error)
mustEmbedUnimplementedEventsServiceServer()
}
@@ -85,6 +96,9 @@ func (UnimplementedEventsServiceServer) List(context.Context, *ListEventRequest)
func (UnimplementedEventsServiceServer) ReplaySingleEvent(context.Context, *ReplayEventRequest) (*Event, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReplaySingleEvent not implemented")
}
+func (UnimplementedEventsServiceServer) PutLog(context.Context, *PutLogRequest) (*PutLogResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method PutLog not implemented")
+}
func (UnimplementedEventsServiceServer) mustEmbedUnimplementedEventsServiceServer() {}
// UnsafeEventsServiceServer may be embedded to opt out of forward compatibility for this service.
@@ -152,6 +166,24 @@ func _EventsService_ReplaySingleEvent_Handler(srv interface{}, ctx context.Conte
return interceptor(ctx, in, info, handler)
}
+func _EventsService_PutLog_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(PutLogRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(EventsServiceServer).PutLog(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: "/EventsService/PutLog",
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(EventsServiceServer).PutLog(ctx, req.(*PutLogRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
// EventsService_ServiceDesc is the grpc.ServiceDesc for EventsService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -171,6 +203,10 @@ var EventsService_ServiceDesc = grpc.ServiceDesc{
MethodName: "ReplaySingleEvent",
Handler: _EventsService_ReplaySingleEvent_Handler,
},
+ {
+ MethodName: "PutLog",
+ Handler: _EventsService_PutLog_Handler,
+ },
},
Streams: []grpc.StreamDesc{},
Metadata: "events.proto",
diff --git a/internal/services/ingestor/ingestor.go b/internal/services/ingestor/ingestor.go
index 5c17c73af..91d3471df 100644
--- a/internal/services/ingestor/ingestor.go
+++ b/internal/services/ingestor/ingestor.go
@@ -25,6 +25,7 @@ type IngestorOptFunc func(*IngestorOpts)
type IngestorOpts struct {
eventRepository repository.EventRepository
+ logRepository repository.LogsRepository
taskQueue taskqueue.TaskQueue
}
@@ -34,6 +35,12 @@ func WithEventRepository(r repository.EventRepository) IngestorOptFunc {
}
}
+func WithLogRepository(r repository.LogsRepository) IngestorOptFunc {
+ return func(opts *IngestorOpts) {
+ opts.logRepository = r
+ }
+}
+
func WithTaskQueue(tq taskqueue.TaskQueue) IngestorOptFunc {
return func(opts *IngestorOpts) {
opts.taskQueue = tq
@@ -48,6 +55,7 @@ type IngestorImpl struct {
contracts.UnimplementedEventsServiceServer
eventRepository repository.EventRepository
+ logRepository repository.LogsRepository
tq taskqueue.TaskQueue
}
@@ -62,12 +70,17 @@ func NewIngestor(fs ...IngestorOptFunc) (Ingestor, error) {
return nil, fmt.Errorf("event repository is required. use WithEventRepository")
}
+ if opts.logRepository == nil {
+ return nil, fmt.Errorf("log repository is required. use WithLogRepository")
+ }
+
if opts.taskQueue == nil {
return nil, fmt.Errorf("task queue is required. use WithTaskQueue")
}
return &IngestorImpl{
eventRepository: opts.eventRepository,
+ logRepository: opts.logRepository,
tq: opts.taskQueue,
}, nil
}
diff --git a/internal/services/ingestor/server.go b/internal/services/ingestor/server.go
index 3673dfc17..8e34ccfc7 100644
--- a/internal/services/ingestor/server.go
+++ b/internal/services/ingestor/server.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
+ "time"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -101,6 +102,36 @@ func (i *IngestorImpl) ReplaySingleEvent(ctx context.Context, req *contracts.Rep
return e, nil
}
+func (i *IngestorImpl) PutLog(ctx context.Context, req *contracts.PutLogRequest) (*contracts.PutLogResponse, error) {
+ tenant := ctx.Value("tenant").(*db.TenantModel)
+
+ var createdAt *time.Time
+
+ if t := req.CreatedAt.AsTime(); !t.IsZero() {
+ createdAt = &t
+ }
+
+ var metadata []byte
+
+ if req.Metadata != "" {
+ metadata = []byte(req.Metadata)
+ }
+
+ _, err := i.logRepository.PutLog(tenant.ID, &repository.CreateLogLineOpts{
+ StepRunId: req.StepRunId,
+ CreatedAt: createdAt,
+ Message: req.Message,
+ Level: req.Level,
+ Metadata: metadata,
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ return &contracts.PutLogResponse{}, nil
+}
+
func toEventFromSQLC(eventRow *dbsqlc.ListEventsRow) (*contracts.Event, error) {
event := eventRow.Event
diff --git a/prisma/migrations/20240229232811_v0_14_0/migration.sql b/prisma/migrations/20240229232811_v0_14_0/migration.sql
new file mode 100644
index 000000000..d1e9d658b
--- /dev/null
+++ b/prisma/migrations/20240229232811_v0_14_0/migration.sql
@@ -0,0 +1,21 @@
+-- CreateEnum
+CREATE TYPE "LogLineLevel" AS ENUM ('DEBUG', 'INFO', 'WARN', 'ERROR');
+
+-- CreateTable
+CREATE TABLE
+ "LogLine" (
+ "id" BIGSERIAL NOT NULL,
+ "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ "tenantId" UUID NOT NULL,
+ "stepRunId" UUID,
+ "message" TEXT NOT NULL,
+ "level" "LogLineLevel" NOT NULL DEFAULT 'INFO',
+ "metadata" JSONB,
+ CONSTRAINT "LogLine_pkey" PRIMARY KEY ("id")
+ );
+
+-- AddForeignKey
+ALTER TABLE "LogLine" ADD CONSTRAINT "LogLine_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant" ("id") ON DELETE CASCADE ON UPDATE CASCADE;
+
+-- AddForeignKey
+ALTER TABLE "LogLine" ADD CONSTRAINT "LogLine_stepRunId_fkey" FOREIGN KEY ("stepRunId") REFERENCES "StepRun" ("id") ON DELETE SET NULL ON UPDATE CASCADE;
\ No newline at end of file
diff --git a/prisma/schema.prisma b/prisma/schema.prisma
index 80790b430..5b0554855 100644
--- a/prisma/schema.prisma
+++ b/prisma/schema.prisma
@@ -125,6 +125,7 @@ model Tenant {
githubPullRequests GithubPullRequest[]
githubPullRequestComments GithubPullRequestComment[]
githubWebhooks GithubWebhook[]
+ logs LogLine[]
}
enum TenantMemberRole {
@@ -868,6 +869,8 @@ model StepRun {
gitRepoBranch String?
archivedResults StepRunResultArchive[]
+
+ logs LogLine[]
}
model StepRunResultArchive {
@@ -1184,3 +1187,33 @@ model GithubWebhook {
@@unique([tenantId, repositoryOwner, repositoryName])
}
+
+enum LogLineLevel {
+ DEBUG
+ INFO
+ WARN
+ ERROR
+}
+
+model LogLine {
+ // base fields
+ id BigInt @id @default(autoincrement()) @db.BigInt
+ createdAt DateTime @default(now())
+
+ // the parent tenant
+ tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
+ tenantId String @db.Uuid
+
+ // the step run id this log is associated with
+ stepRun StepRun? @relation(fields: [stepRunId], references: [id], onDelete: SetNull, onUpdate: Cascade)
+ stepRunId String? @db.Uuid
+
+ // the log line message
+ message String
+
+ // the log line level
+ level LogLineLevel @default(INFO)
+
+ // (optional) the log line metadata
+ metadata Json?
+}
diff --git a/python-sdk/examples/logger/worker.py b/python-sdk/examples/logger/worker.py
new file mode 100644
index 000000000..77a688abe
--- /dev/null
+++ b/python-sdk/examples/logger/worker.py
@@ -0,0 +1,25 @@
+import time
+from hatchet_sdk import Hatchet, Context
+from dotenv import load_dotenv
+
+load_dotenv()
+
+hatchet = Hatchet()
+
+@hatchet.workflow(on_events=["user:create"],schedule_timeout="10m")
+class LoggingWorkflow:
+ @hatchet.step()
+ def logger(self, context : Context):
+
+ for i in range(1000):
+ context.log(f"Logging message {i}")
+
+ return {
+ "step1": "completed",
+ }
+
+workflow = LoggingWorkflow()
+worker = hatchet.worker('logging-worker-py')
+worker.register_workflow(workflow)
+
+worker.start()
\ No newline at end of file
diff --git a/python-sdk/hatchet_sdk/clients/events.py b/python-sdk/hatchet_sdk/clients/events.py
index 8297129c6..e5a6e1381 100644
--- a/python-sdk/hatchet_sdk/clients/events.py
+++ b/python-sdk/hatchet_sdk/clients/events.py
@@ -1,5 +1,5 @@
from ..events_pb2_grpc import EventsServiceStub
-from ..events_pb2 import PushEventRequest
+from ..events_pb2 import PushEventRequest, PutLogRequest
import datetime
from ..loader import ClientConfig
@@ -14,6 +14,13 @@ def new_event(conn, config: ClientConfig):
token=config.token,
)
+def proto_timestamp_now():
+ t = datetime.datetime.now().timestamp()
+ seconds = int(t)
+ nanos = int(t % 1 * 1e9)
+
+ return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
+
class EventClientImpl:
def __init__(self, client, token):
self.client = client
@@ -28,10 +35,22 @@ class EventClientImpl:
request = PushEventRequest(
key=event_key,
payload=payload_bytes,
- eventTimestamp=timestamp_pb2.Timestamp().FromDatetime(datetime.datetime.now()),
+ eventTimestamp=proto_timestamp_now(),
)
try:
self.client.Push(request, metadata=get_metadata(self.token))
except grpc.RpcError as e:
raise ValueError(f"gRPC error: {e}")
+
+ def log(self, message: str, step_run_id: str):
+ try:
+ request = PutLogRequest(
+ stepRunId=step_run_id,
+ createdAt=proto_timestamp_now(),
+ message=message,
+ )
+
+ self.client.PutLog(request, metadata=get_metadata(self.token))
+ except Exception as e:
+ raise ValueError(f"Error logging: {e}")
\ No newline at end of file
diff --git a/python-sdk/hatchet_sdk/context.py b/python-sdk/hatchet_sdk/context.py
index 3a3897a34..c3ac88bdb 100644
--- a/python-sdk/hatchet_sdk/context.py
+++ b/python-sdk/hatchet_sdk/context.py
@@ -1,8 +1,13 @@
+from concurrent.futures import ThreadPoolExecutor
+import datetime
import inspect
from multiprocessing import Event
import os
from .clients.dispatcher import Action, DispatcherClient
+from google.protobuf import timestamp_pb2
+from .clients.events import EventClientImpl
from .dispatcher_pb2 import OverridesData
+from .events_pb2 import PutLogRequest
from .logger import logger
import json
@@ -12,7 +17,7 @@ def get_caller_file_path():
return caller_frame.filename
class Context:
- def __init__(self, action: Action, client: DispatcherClient):
+ def __init__(self, action: Action, client: DispatcherClient, eventClient: EventClientImpl):
try:
self.data = json.loads(action.action_payload)
except Exception as e:
@@ -21,6 +26,11 @@ class Context:
self.stepRunId = action.step_run_id
self.exit_flag = Event()
self.client = client
+ self.eventClient = eventClient
+
+ # FIXME: this limits the number of concurrent log requests to 1, which means we can do about
+ # 100 log lines per second but this depends on network.
+ self.logger_thread_pool = ThreadPoolExecutor(max_workers=1)
# store each key in the overrides field in a lookup table
# overrides_data is a dictionary of key-value pairs
@@ -73,4 +83,16 @@ class Context:
)
)
- return default
\ No newline at end of file
+ return default
+
+ def _log(self, line: str):
+ try:
+ self.eventClient.log(message=line, step_run_id=self.stepRunId)
+ except Exception as e:
+ logger.error(f"Error logging: {e}")
+
+ def log(self, line: str):
+ if self.stepRunId == "":
+ return
+
+ self.logger_thread_pool.submit(self._log, line)
diff --git a/python-sdk/hatchet_sdk/dispatcher_pb2.py b/python-sdk/hatchet_sdk/dispatcher_pb2.py
index bb37c5efc..5984c6f25 100644
--- a/python-sdk/hatchet_sdk/dispatcher_pb2.py
+++ b/python-sdk/hatchet_sdk/dispatcher_pb2.py
@@ -15,7 +15,7 @@ _sym_db = _symbol_database.Default()
from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x64ispatcher.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"p\n\x15WorkerRegisterRequest\x12\x12\n\nworkerName\x18\x01 \x01(\t\x12\x0f\n\x07\x61\x63tions\x18\x02 \x03(\t\x12\x10\n\x08services\x18\x03 \x03(\t\x12\x14\n\x07maxRuns\x18\x04 \x01(\x05H\x00\x88\x01\x01\x42\n\n\x08_maxRuns\"P\n\x16WorkerRegisterResponse\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x10\n\x08workerId\x18\x02 \x01(\t\x12\x12\n\nworkerName\x18\x03 \x01(\t\"\xf2\x01\n\x0e\x41ssignedAction\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x15\n\rworkflowRunId\x18\x02 \x01(\t\x12\x18\n\x10getGroupKeyRunId\x18\x03 \x01(\t\x12\r\n\x05jobId\x18\x04 \x01(\t\x12\x0f\n\x07jobName\x18\x05 \x01(\t\x12\x10\n\x08jobRunId\x18\x06 \x01(\t\x12\x0e\n\x06stepId\x18\x07 \x01(\t\x12\x11\n\tstepRunId\x18\x08 \x01(\t\x12\x10\n\x08\x61\x63tionId\x18\t \x01(\t\x12\x1f\n\nactionType\x18\n \x01(\x0e\x32\x0b.ActionType\x12\x15\n\ractionPayload\x18\x0b \x01(\t\"\'\n\x13WorkerListenRequest\x12\x10\n\x08workerId\x18\x01 \x01(\t\",\n\x18WorkerUnsubscribeRequest\x12\x10\n\x08workerId\x18\x01 \x01(\t\"?\n\x19WorkerUnsubscribeResponse\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x10\n\x08workerId\x18\x02 \x01(\t\"\xe1\x01\n\x13GroupKeyActionEvent\x12\x10\n\x08workerId\x18\x01 \x01(\t\x12\x15\n\rworkflowRunId\x18\x02 \x01(\t\x12\x18\n\x10getGroupKeyRunId\x18\x03 \x01(\t\x12\x10\n\x08\x61\x63tionId\x18\x04 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12+\n\teventType\x18\x06 \x01(\x0e\x32\x18.GroupKeyActionEventType\x12\x14\n\x0c\x65ventPayload\x18\x07 \x01(\t\"\xec\x01\n\x0fStepActionEvent\x12\x10\n\x08workerId\x18\x01 \x01(\t\x12\r\n\x05jobId\x18\x02 \x01(\t\x12\x10\n\x08jobRunId\x18\x03 \x01(\t\x12\x0e\n\x06stepId\x18\x04 \x01(\t\x12\x11\n\tstepRunId\x18\x05 \x01(\t\x12\x10\n\x08\x61\x63tionId\x18\x06 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x07 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\'\n\teventType\x18\x08 
\x01(\x0e\x32\x14.StepActionEventType\x12\x14\n\x0c\x65ventPayload\x18\t \x01(\t\"9\n\x13\x41\x63tionEventResponse\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x10\n\x08workerId\x18\x02 \x01(\t\"9\n SubscribeToWorkflowEventsRequest\x12\x15\n\rworkflowRunId\x18\x01 \x01(\t\"\xe0\x01\n\rWorkflowEvent\x12\x15\n\rworkflowRunId\x18\x01 \x01(\t\x12#\n\x0cresourceType\x18\x02 \x01(\x0e\x32\r.ResourceType\x12%\n\teventType\x18\x03 \x01(\x0e\x32\x12.ResourceEventType\x12\x12\n\nresourceId\x18\x04 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x14\n\x0c\x65ventPayload\x18\x06 \x01(\t\x12\x0e\n\x06hangup\x18\x07 \x01(\x08\"W\n\rOverridesData\x12\x11\n\tstepRunId\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\r\n\x05value\x18\x03 \x01(\t\x12\x16\n\x0e\x63\x61llerFilename\x18\x04 \x01(\t\"\x17\n\x15OverridesDataResponse*N\n\nActionType\x12\x12\n\x0eSTART_STEP_RUN\x10\x00\x12\x13\n\x0f\x43\x41NCEL_STEP_RUN\x10\x01\x12\x17\n\x13START_GET_GROUP_KEY\x10\x02*\xa2\x01\n\x17GroupKeyActionEventType\x12 \n\x1cGROUP_KEY_EVENT_TYPE_UNKNOWN\x10\x00\x12 
\n\x1cGROUP_KEY_EVENT_TYPE_STARTED\x10\x01\x12\"\n\x1eGROUP_KEY_EVENT_TYPE_COMPLETED\x10\x02\x12\x1f\n\x1bGROUP_KEY_EVENT_TYPE_FAILED\x10\x03*\x8a\x01\n\x13StepActionEventType\x12\x1b\n\x17STEP_EVENT_TYPE_UNKNOWN\x10\x00\x12\x1b\n\x17STEP_EVENT_TYPE_STARTED\x10\x01\x12\x1d\n\x19STEP_EVENT_TYPE_COMPLETED\x10\x02\x12\x1a\n\x16STEP_EVENT_TYPE_FAILED\x10\x03*e\n\x0cResourceType\x12\x19\n\x15RESOURCE_TYPE_UNKNOWN\x10\x00\x12\x1a\n\x16RESOURCE_TYPE_STEP_RUN\x10\x01\x12\x1e\n\x1aRESOURCE_TYPE_WORKFLOW_RUN\x10\x02*\xde\x01\n\x11ResourceEventType\x12\x1f\n\x1bRESOURCE_EVENT_TYPE_UNKNOWN\x10\x00\x12\x1f\n\x1bRESOURCE_EVENT_TYPE_STARTED\x10\x01\x12!\n\x1dRESOURCE_EVENT_TYPE_COMPLETED\x10\x02\x12\x1e\n\x1aRESOURCE_EVENT_TYPE_FAILED\x10\x03\x12!\n\x1dRESOURCE_EVENT_TYPE_CANCELLED\x10\x04\x12!\n\x1dRESOURCE_EVENT_TYPE_TIMED_OUT\x10\x05\x32\xe4\x03\n\nDispatcher\x12=\n\x08Register\x12\x16.WorkerRegisterRequest\x1a\x17.WorkerRegisterResponse\"\x00\x12\x33\n\x06Listen\x12\x14.WorkerListenRequest\x1a\x0f.AssignedAction\"\x00\x30\x01\x12R\n\x19SubscribeToWorkflowEvents\x12!.SubscribeToWorkflowEventsRequest\x1a\x0e.WorkflowEvent\"\x00\x30\x01\x12?\n\x13SendStepActionEvent\x12\x10.StepActionEvent\x1a\x14.ActionEventResponse\"\x00\x12G\n\x17SendGroupKeyActionEvent\x12\x14.GroupKeyActionEvent\x1a\x14.ActionEventResponse\"\x00\x12<\n\x10PutOverridesData\x12\x0e.OverridesData\x1a\x16.OverridesDataResponse\"\x00\x12\x46\n\x0bUnsubscribe\x12\x19.WorkerUnsubscribeRequest\x1a\x1a.WorkerUnsubscribeResponse\"\x00\x42GZEgithub.com/hatchet-dev/hatchet/internal/services/dispatcher/contractsb\x06proto3')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x64ispatcher.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"p\n\x15WorkerRegisterRequest\x12\x12\n\nworkerName\x18\x01 \x01(\t\x12\x0f\n\x07\x61\x63tions\x18\x02 \x03(\t\x12\x10\n\x08services\x18\x03 \x03(\t\x12\x14\n\x07maxRuns\x18\x04 \x01(\x05H\x00\x88\x01\x01\x42\n\n\x08_maxRuns\"P\n\x16WorkerRegisterResponse\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x10\n\x08workerId\x18\x02 \x01(\t\x12\x12\n\nworkerName\x18\x03 \x01(\t\"\x84\x02\n\x0e\x41ssignedAction\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x15\n\rworkflowRunId\x18\x02 \x01(\t\x12\x18\n\x10getGroupKeyRunId\x18\x03 \x01(\t\x12\r\n\x05jobId\x18\x04 \x01(\t\x12\x0f\n\x07jobName\x18\x05 \x01(\t\x12\x10\n\x08jobRunId\x18\x06 \x01(\t\x12\x0e\n\x06stepId\x18\x07 \x01(\t\x12\x11\n\tstepRunId\x18\x08 \x01(\t\x12\x10\n\x08\x61\x63tionId\x18\t \x01(\t\x12\x1f\n\nactionType\x18\n \x01(\x0e\x32\x0b.ActionType\x12\x15\n\ractionPayload\x18\x0b \x01(\t\x12\x10\n\x08stepName\x18\x0c \x01(\t\"\'\n\x13WorkerListenRequest\x12\x10\n\x08workerId\x18\x01 \x01(\t\",\n\x18WorkerUnsubscribeRequest\x12\x10\n\x08workerId\x18\x01 \x01(\t\"?\n\x19WorkerUnsubscribeResponse\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x10\n\x08workerId\x18\x02 \x01(\t\"\xe1\x01\n\x13GroupKeyActionEvent\x12\x10\n\x08workerId\x18\x01 \x01(\t\x12\x15\n\rworkflowRunId\x18\x02 \x01(\t\x12\x18\n\x10getGroupKeyRunId\x18\x03 \x01(\t\x12\x10\n\x08\x61\x63tionId\x18\x04 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12+\n\teventType\x18\x06 \x01(\x0e\x32\x18.GroupKeyActionEventType\x12\x14\n\x0c\x65ventPayload\x18\x07 \x01(\t\"\xec\x01\n\x0fStepActionEvent\x12\x10\n\x08workerId\x18\x01 \x01(\t\x12\r\n\x05jobId\x18\x02 \x01(\t\x12\x10\n\x08jobRunId\x18\x03 \x01(\t\x12\x0e\n\x06stepId\x18\x04 \x01(\t\x12\x11\n\tstepRunId\x18\x05 \x01(\t\x12\x10\n\x08\x61\x63tionId\x18\x06 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x07 
\x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\'\n\teventType\x18\x08 \x01(\x0e\x32\x14.StepActionEventType\x12\x14\n\x0c\x65ventPayload\x18\t \x01(\t\"9\n\x13\x41\x63tionEventResponse\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x10\n\x08workerId\x18\x02 \x01(\t\"9\n SubscribeToWorkflowEventsRequest\x12\x15\n\rworkflowRunId\x18\x01 \x01(\t\"\xe0\x01\n\rWorkflowEvent\x12\x15\n\rworkflowRunId\x18\x01 \x01(\t\x12#\n\x0cresourceType\x18\x02 \x01(\x0e\x32\r.ResourceType\x12%\n\teventType\x18\x03 \x01(\x0e\x32\x12.ResourceEventType\x12\x12\n\nresourceId\x18\x04 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x14\n\x0c\x65ventPayload\x18\x06 \x01(\t\x12\x0e\n\x06hangup\x18\x07 \x01(\x08\"W\n\rOverridesData\x12\x11\n\tstepRunId\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\r\n\x05value\x18\x03 \x01(\t\x12\x16\n\x0e\x63\x61llerFilename\x18\x04 \x01(\t\"\x17\n\x15OverridesDataResponse*N\n\nActionType\x12\x12\n\x0eSTART_STEP_RUN\x10\x00\x12\x13\n\x0f\x43\x41NCEL_STEP_RUN\x10\x01\x12\x17\n\x13START_GET_GROUP_KEY\x10\x02*\xa2\x01\n\x17GroupKeyActionEventType\x12 \n\x1cGROUP_KEY_EVENT_TYPE_UNKNOWN\x10\x00\x12 
\n\x1cGROUP_KEY_EVENT_TYPE_STARTED\x10\x01\x12\"\n\x1eGROUP_KEY_EVENT_TYPE_COMPLETED\x10\x02\x12\x1f\n\x1bGROUP_KEY_EVENT_TYPE_FAILED\x10\x03*\x8a\x01\n\x13StepActionEventType\x12\x1b\n\x17STEP_EVENT_TYPE_UNKNOWN\x10\x00\x12\x1b\n\x17STEP_EVENT_TYPE_STARTED\x10\x01\x12\x1d\n\x19STEP_EVENT_TYPE_COMPLETED\x10\x02\x12\x1a\n\x16STEP_EVENT_TYPE_FAILED\x10\x03*e\n\x0cResourceType\x12\x19\n\x15RESOURCE_TYPE_UNKNOWN\x10\x00\x12\x1a\n\x16RESOURCE_TYPE_STEP_RUN\x10\x01\x12\x1e\n\x1aRESOURCE_TYPE_WORKFLOW_RUN\x10\x02*\xde\x01\n\x11ResourceEventType\x12\x1f\n\x1bRESOURCE_EVENT_TYPE_UNKNOWN\x10\x00\x12\x1f\n\x1bRESOURCE_EVENT_TYPE_STARTED\x10\x01\x12!\n\x1dRESOURCE_EVENT_TYPE_COMPLETED\x10\x02\x12\x1e\n\x1aRESOURCE_EVENT_TYPE_FAILED\x10\x03\x12!\n\x1dRESOURCE_EVENT_TYPE_CANCELLED\x10\x04\x12!\n\x1dRESOURCE_EVENT_TYPE_TIMED_OUT\x10\x05\x32\xe4\x03\n\nDispatcher\x12=\n\x08Register\x12\x16.WorkerRegisterRequest\x1a\x17.WorkerRegisterResponse\"\x00\x12\x33\n\x06Listen\x12\x14.WorkerListenRequest\x1a\x0f.AssignedAction\"\x00\x30\x01\x12R\n\x19SubscribeToWorkflowEvents\x12!.SubscribeToWorkflowEventsRequest\x1a\x0e.WorkflowEvent\"\x00\x30\x01\x12?\n\x13SendStepActionEvent\x12\x10.StepActionEvent\x1a\x14.ActionEventResponse\"\x00\x12G\n\x17SendGroupKeyActionEvent\x12\x14.GroupKeyActionEvent\x1a\x14.ActionEventResponse\"\x00\x12<\n\x10PutOverridesData\x12\x0e.OverridesData\x1a\x16.OverridesDataResponse\"\x00\x12\x46\n\x0bUnsubscribe\x12\x19.WorkerUnsubscribeRequest\x1a\x1a.WorkerUnsubscribeResponse\"\x00\x42GZEgithub.com/hatchet-dev/hatchet/internal/services/dispatcher/contractsb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -23,42 +23,42 @@ _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'dispatcher_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
_globals['DESCRIPTOR']._options = None
_globals['DESCRIPTOR']._serialized_options = b'ZEgithub.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts'
- _globals['_ACTIONTYPE']._serialized_start=1572
- _globals['_ACTIONTYPE']._serialized_end=1650
- _globals['_GROUPKEYACTIONEVENTTYPE']._serialized_start=1653
- _globals['_GROUPKEYACTIONEVENTTYPE']._serialized_end=1815
- _globals['_STEPACTIONEVENTTYPE']._serialized_start=1818
- _globals['_STEPACTIONEVENTTYPE']._serialized_end=1956
- _globals['_RESOURCETYPE']._serialized_start=1958
- _globals['_RESOURCETYPE']._serialized_end=2059
- _globals['_RESOURCEEVENTTYPE']._serialized_start=2062
- _globals['_RESOURCEEVENTTYPE']._serialized_end=2284
+ _globals['_ACTIONTYPE']._serialized_start=1590
+ _globals['_ACTIONTYPE']._serialized_end=1668
+ _globals['_GROUPKEYACTIONEVENTTYPE']._serialized_start=1671
+ _globals['_GROUPKEYACTIONEVENTTYPE']._serialized_end=1833
+ _globals['_STEPACTIONEVENTTYPE']._serialized_start=1836
+ _globals['_STEPACTIONEVENTTYPE']._serialized_end=1974
+ _globals['_RESOURCETYPE']._serialized_start=1976
+ _globals['_RESOURCETYPE']._serialized_end=2077
+ _globals['_RESOURCEEVENTTYPE']._serialized_start=2080
+ _globals['_RESOURCEEVENTTYPE']._serialized_end=2302
_globals['_WORKERREGISTERREQUEST']._serialized_start=53
_globals['_WORKERREGISTERREQUEST']._serialized_end=165
_globals['_WORKERREGISTERRESPONSE']._serialized_start=167
_globals['_WORKERREGISTERRESPONSE']._serialized_end=247
_globals['_ASSIGNEDACTION']._serialized_start=250
- _globals['_ASSIGNEDACTION']._serialized_end=492
- _globals['_WORKERLISTENREQUEST']._serialized_start=494
- _globals['_WORKERLISTENREQUEST']._serialized_end=533
- _globals['_WORKERUNSUBSCRIBEREQUEST']._serialized_start=535
- _globals['_WORKERUNSUBSCRIBEREQUEST']._serialized_end=579
- _globals['_WORKERUNSUBSCRIBERESPONSE']._serialized_start=581
- _globals['_WORKERUNSUBSCRIBERESPONSE']._serialized_end=644
- _globals['_GROUPKEYACTIONEVENT']._serialized_start=647
- _globals['_GROUPKEYACTIONEVENT']._serialized_end=872
- _globals['_STEPACTIONEVENT']._serialized_start=875
- _globals['_STEPACTIONEVENT']._serialized_end=1111
- _globals['_ACTIONEVENTRESPONSE']._serialized_start=1113
- _globals['_ACTIONEVENTRESPONSE']._serialized_end=1170
- _globals['_SUBSCRIBETOWORKFLOWEVENTSREQUEST']._serialized_start=1172
- _globals['_SUBSCRIBETOWORKFLOWEVENTSREQUEST']._serialized_end=1229
- _globals['_WORKFLOWEVENT']._serialized_start=1232
- _globals['_WORKFLOWEVENT']._serialized_end=1456
- _globals['_OVERRIDESDATA']._serialized_start=1458
- _globals['_OVERRIDESDATA']._serialized_end=1545
- _globals['_OVERRIDESDATARESPONSE']._serialized_start=1547
- _globals['_OVERRIDESDATARESPONSE']._serialized_end=1570
- _globals['_DISPATCHER']._serialized_start=2287
- _globals['_DISPATCHER']._serialized_end=2771
+ _globals['_ASSIGNEDACTION']._serialized_end=510
+ _globals['_WORKERLISTENREQUEST']._serialized_start=512
+ _globals['_WORKERLISTENREQUEST']._serialized_end=551
+ _globals['_WORKERUNSUBSCRIBEREQUEST']._serialized_start=553
+ _globals['_WORKERUNSUBSCRIBEREQUEST']._serialized_end=597
+ _globals['_WORKERUNSUBSCRIBERESPONSE']._serialized_start=599
+ _globals['_WORKERUNSUBSCRIBERESPONSE']._serialized_end=662
+ _globals['_GROUPKEYACTIONEVENT']._serialized_start=665
+ _globals['_GROUPKEYACTIONEVENT']._serialized_end=890
+ _globals['_STEPACTIONEVENT']._serialized_start=893
+ _globals['_STEPACTIONEVENT']._serialized_end=1129
+ _globals['_ACTIONEVENTRESPONSE']._serialized_start=1131
+ _globals['_ACTIONEVENTRESPONSE']._serialized_end=1188
+ _globals['_SUBSCRIBETOWORKFLOWEVENTSREQUEST']._serialized_start=1190
+ _globals['_SUBSCRIBETOWORKFLOWEVENTSREQUEST']._serialized_end=1247
+ _globals['_WORKFLOWEVENT']._serialized_start=1250
+ _globals['_WORKFLOWEVENT']._serialized_end=1474
+ _globals['_OVERRIDESDATA']._serialized_start=1476
+ _globals['_OVERRIDESDATA']._serialized_end=1563
+ _globals['_OVERRIDESDATARESPONSE']._serialized_start=1565
+ _globals['_OVERRIDESDATARESPONSE']._serialized_end=1588
+ _globals['_DISPATCHER']._serialized_start=2305
+ _globals['_DISPATCHER']._serialized_end=2789
# @@protoc_insertion_point(module_scope)
diff --git a/python-sdk/hatchet_sdk/dispatcher_pb2.pyi b/python-sdk/hatchet_sdk/dispatcher_pb2.pyi
index 3df6a60cd..d6806835a 100644
--- a/python-sdk/hatchet_sdk/dispatcher_pb2.pyi
+++ b/python-sdk/hatchet_sdk/dispatcher_pb2.pyi
@@ -85,7 +85,7 @@ class WorkerRegisterResponse(_message.Message):
def __init__(self, tenantId: _Optional[str] = ..., workerId: _Optional[str] = ..., workerName: _Optional[str] = ...) -> None: ...
class AssignedAction(_message.Message):
- __slots__ = ("tenantId", "workflowRunId", "getGroupKeyRunId", "jobId", "jobName", "jobRunId", "stepId", "stepRunId", "actionId", "actionType", "actionPayload")
+ __slots__ = ("tenantId", "workflowRunId", "getGroupKeyRunId", "jobId", "jobName", "jobRunId", "stepId", "stepRunId", "actionId", "actionType", "actionPayload", "stepName")
TENANTID_FIELD_NUMBER: _ClassVar[int]
WORKFLOWRUNID_FIELD_NUMBER: _ClassVar[int]
GETGROUPKEYRUNID_FIELD_NUMBER: _ClassVar[int]
@@ -97,6 +97,7 @@ class AssignedAction(_message.Message):
ACTIONID_FIELD_NUMBER: _ClassVar[int]
ACTIONTYPE_FIELD_NUMBER: _ClassVar[int]
ACTIONPAYLOAD_FIELD_NUMBER: _ClassVar[int]
+ STEPNAME_FIELD_NUMBER: _ClassVar[int]
tenantId: str
workflowRunId: str
getGroupKeyRunId: str
@@ -108,7 +109,8 @@ class AssignedAction(_message.Message):
actionId: str
actionType: ActionType
actionPayload: str
- def __init__(self, tenantId: _Optional[str] = ..., workflowRunId: _Optional[str] = ..., getGroupKeyRunId: _Optional[str] = ..., jobId: _Optional[str] = ..., jobName: _Optional[str] = ..., jobRunId: _Optional[str] = ..., stepId: _Optional[str] = ..., stepRunId: _Optional[str] = ..., actionId: _Optional[str] = ..., actionType: _Optional[_Union[ActionType, str]] = ..., actionPayload: _Optional[str] = ...) -> None: ...
+ stepName: str
+ def __init__(self, tenantId: _Optional[str] = ..., workflowRunId: _Optional[str] = ..., getGroupKeyRunId: _Optional[str] = ..., jobId: _Optional[str] = ..., jobName: _Optional[str] = ..., jobRunId: _Optional[str] = ..., stepId: _Optional[str] = ..., stepRunId: _Optional[str] = ..., actionId: _Optional[str] = ..., actionType: _Optional[_Union[ActionType, str]] = ..., actionPayload: _Optional[str] = ..., stepName: _Optional[str] = ...) -> None: ...
class WorkerListenRequest(_message.Message):
__slots__ = ("workerId",)
diff --git a/python-sdk/hatchet_sdk/events_pb2.py b/python-sdk/hatchet_sdk/events_pb2.py
index da31c3c55..f3557d668 100644
--- a/python-sdk/hatchet_sdk/events_pb2.py
+++ b/python-sdk/hatchet_sdk/events_pb2.py
@@ -15,7 +15,7 @@ _sym_db = _symbol_database.Default()
from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x65vents.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"|\n\x05\x45vent\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x0f\n\x07\x65ventId\x18\x02 \x01(\t\x12\x0b\n\x03key\x18\x03 \x01(\t\x12\x0f\n\x07payload\x18\x04 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"d\n\x10PushEventRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x0f\n\x07payload\x18\x02 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"/\n\x10ListEventRequest\x12\x0e\n\x06offset\x18\x01 \x01(\x05\x12\x0b\n\x03key\x18\x02 \x01(\t\"+\n\x11ListEventResponse\x12\x16\n\x06\x65vents\x18\x01 \x03(\x0b\x32\x06.Event\"%\n\x12ReplayEventRequest\x12\x0f\n\x07\x65ventId\x18\x01 \x01(\t2\x99\x01\n\rEventsService\x12#\n\x04Push\x12\x11.PushEventRequest\x1a\x06.Event\"\x00\x12/\n\x04List\x12\x11.ListEventRequest\x1a\x12.ListEventResponse\"\x00\x12\x32\n\x11ReplaySingleEvent\x12\x13.ReplayEventRequest\x1a\x06.Event\"\x00\x42GZEgithub.com/hatchet-dev/hatchet/internal/services/dispatcher/contractsb\x06proto3')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x65vents.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"|\n\x05\x45vent\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x0f\n\x07\x65ventId\x18\x02 \x01(\t\x12\x0b\n\x03key\x18\x03 \x01(\t\x12\x0f\n\x07payload\x18\x04 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x92\x01\n\rPutLogRequest\x12\x11\n\tstepRunId\x18\x01 \x01(\t\x12-\n\tcreatedAt\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0f\n\x07message\x18\x03 \x01(\t\x12\x12\n\x05level\x18\x04 \x01(\tH\x00\x88\x01\x01\x12\x10\n\x08metadata\x18\x05 \x01(\tB\x08\n\x06_level\"\x10\n\x0ePutLogResponse\"d\n\x10PushEventRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x0f\n\x07payload\x18\x02 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"/\n\x10ListEventRequest\x12\x0e\n\x06offset\x18\x01 \x01(\x05\x12\x0b\n\x03key\x18\x02 \x01(\t\"+\n\x11ListEventResponse\x12\x16\n\x06\x65vents\x18\x01 \x03(\x0b\x32\x06.Event\"%\n\x12ReplayEventRequest\x12\x0f\n\x07\x65ventId\x18\x01 \x01(\t2\xc6\x01\n\rEventsService\x12#\n\x04Push\x12\x11.PushEventRequest\x1a\x06.Event\"\x00\x12/\n\x04List\x12\x11.ListEventRequest\x1a\x12.ListEventResponse\"\x00\x12\x32\n\x11ReplaySingleEvent\x12\x13.ReplayEventRequest\x1a\x06.Event\"\x00\x12+\n\x06PutLog\x12\x0e.PutLogRequest\x1a\x0f.PutLogResponse\"\x00\x42GZEgithub.com/hatchet-dev/hatchet/internal/services/dispatcher/contractsb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -25,14 +25,18 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_globals['DESCRIPTOR']._serialized_options = b'ZEgithub.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts'
_globals['_EVENT']._serialized_start=49
_globals['_EVENT']._serialized_end=173
- _globals['_PUSHEVENTREQUEST']._serialized_start=175
- _globals['_PUSHEVENTREQUEST']._serialized_end=275
- _globals['_LISTEVENTREQUEST']._serialized_start=277
- _globals['_LISTEVENTREQUEST']._serialized_end=324
- _globals['_LISTEVENTRESPONSE']._serialized_start=326
- _globals['_LISTEVENTRESPONSE']._serialized_end=369
- _globals['_REPLAYEVENTREQUEST']._serialized_start=371
- _globals['_REPLAYEVENTREQUEST']._serialized_end=408
- _globals['_EVENTSSERVICE']._serialized_start=411
- _globals['_EVENTSSERVICE']._serialized_end=564
+ _globals['_PUTLOGREQUEST']._serialized_start=176
+ _globals['_PUTLOGREQUEST']._serialized_end=322
+ _globals['_PUTLOGRESPONSE']._serialized_start=324
+ _globals['_PUTLOGRESPONSE']._serialized_end=340
+ _globals['_PUSHEVENTREQUEST']._serialized_start=342
+ _globals['_PUSHEVENTREQUEST']._serialized_end=442
+ _globals['_LISTEVENTREQUEST']._serialized_start=444
+ _globals['_LISTEVENTREQUEST']._serialized_end=491
+ _globals['_LISTEVENTRESPONSE']._serialized_start=493
+ _globals['_LISTEVENTRESPONSE']._serialized_end=536
+ _globals['_REPLAYEVENTREQUEST']._serialized_start=538
+ _globals['_REPLAYEVENTREQUEST']._serialized_end=575
+ _globals['_EVENTSSERVICE']._serialized_start=578
+ _globals['_EVENTSSERVICE']._serialized_end=776
# @@protoc_insertion_point(module_scope)
diff --git a/python-sdk/hatchet_sdk/events_pb2.pyi b/python-sdk/hatchet_sdk/events_pb2.pyi
index f118711d1..50cf3a516 100644
--- a/python-sdk/hatchet_sdk/events_pb2.pyi
+++ b/python-sdk/hatchet_sdk/events_pb2.pyi
@@ -20,6 +20,24 @@ class Event(_message.Message):
eventTimestamp: _timestamp_pb2.Timestamp
def __init__(self, tenantId: _Optional[str] = ..., eventId: _Optional[str] = ..., key: _Optional[str] = ..., payload: _Optional[str] = ..., eventTimestamp: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...) -> None: ...
+class PutLogRequest(_message.Message):
+ __slots__ = ("stepRunId", "createdAt", "message", "level", "metadata")
+ STEPRUNID_FIELD_NUMBER: _ClassVar[int]
+ CREATEDAT_FIELD_NUMBER: _ClassVar[int]
+ MESSAGE_FIELD_NUMBER: _ClassVar[int]
+ LEVEL_FIELD_NUMBER: _ClassVar[int]
+ METADATA_FIELD_NUMBER: _ClassVar[int]
+ stepRunId: str
+ createdAt: _timestamp_pb2.Timestamp
+ message: str
+ level: str
+ metadata: str
+ def __init__(self, stepRunId: _Optional[str] = ..., createdAt: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ..., message: _Optional[str] = ..., level: _Optional[str] = ..., metadata: _Optional[str] = ...) -> None: ...
+
+class PutLogResponse(_message.Message):
+ __slots__ = ()
+ def __init__(self) -> None: ...
+
class PushEventRequest(_message.Message):
__slots__ = ("key", "payload", "eventTimestamp")
KEY_FIELD_NUMBER: _ClassVar[int]
diff --git a/python-sdk/hatchet_sdk/events_pb2_grpc.py b/python-sdk/hatchet_sdk/events_pb2_grpc.py
index 95c13d699..e67bc9d00 100644
--- a/python-sdk/hatchet_sdk/events_pb2_grpc.py
+++ b/python-sdk/hatchet_sdk/events_pb2_grpc.py
@@ -29,6 +29,11 @@ class EventsServiceStub(object):
request_serializer=events__pb2.ReplayEventRequest.SerializeToString,
response_deserializer=events__pb2.Event.FromString,
)
+ self.PutLog = channel.unary_unary(
+ '/EventsService/PutLog',
+ request_serializer=events__pb2.PutLogRequest.SerializeToString,
+ response_deserializer=events__pb2.PutLogResponse.FromString,
+ )
class EventsServiceServicer(object):
@@ -52,6 +57,12 @@ class EventsServiceServicer(object):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
+ def PutLog(self, request, context):
+ """Missing associated documentation comment in .proto file."""
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
def add_EventsServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
@@ -70,6 +81,11 @@ def add_EventsServiceServicer_to_server(servicer, server):
request_deserializer=events__pb2.ReplayEventRequest.FromString,
response_serializer=events__pb2.Event.SerializeToString,
),
+ 'PutLog': grpc.unary_unary_rpc_method_handler(
+ servicer.PutLog,
+ request_deserializer=events__pb2.PutLogRequest.FromString,
+ response_serializer=events__pb2.PutLogResponse.SerializeToString,
+ ),
}
generic_handler = grpc.method_handlers_generic_handler(
'EventsService', rpc_method_handlers)
@@ -130,3 +146,20 @@ class EventsService(object):
events__pb2.Event.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
+
+ @staticmethod
+ def PutLog(request,
+ target,
+ options=(),
+ channel_credentials=None,
+ call_credentials=None,
+ insecure=False,
+ compression=None,
+ wait_for_ready=None,
+ timeout=None,
+ metadata=None):
+ return grpc.experimental.unary_unary(request, target, '/EventsService/PutLog',
+ events__pb2.PutLogRequest.SerializeToString,
+ events__pb2.PutLogResponse.FromString,
+ options, channel_credentials,
+ insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
diff --git a/python-sdk/hatchet_sdk/worker.py b/python-sdk/hatchet_sdk/worker.py
index 9b4ffca5d..0dba28dfb 100644
--- a/python-sdk/hatchet_sdk/worker.py
+++ b/python-sdk/hatchet_sdk/worker.py
@@ -36,7 +36,7 @@ class Worker:
def handle_start_step_run(self, action : Action):
action_name = action.action_id
- context = Context(action, self.client.dispatcher)
+ context = Context(action, self.client.dispatcher, self.client.event)
self.contexts[action.step_run_id] = context
@@ -110,7 +110,7 @@ class Worker:
def handle_start_group_key_run(self, action : Action):
action_name = action.action_id
- context = Context(action, self.client.dispatcher)
+ context = Context(action, self.client.dispatcher, self.client.event)
self.contexts[action.get_group_key_run_id] = context
diff --git a/python-sdk/pyproject.toml b/python-sdk/pyproject.toml
index 90ff03654..9e659ca6d 100644
--- a/python-sdk/pyproject.toml
+++ b/python-sdk/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "hatchet-sdk"
-version = "0.13.0"
+version = "0.14.0"
description = ""
authors = ["Alexander Belanger "]
readme = "README.md"
diff --git a/typescript-sdk/examples/logger.ts b/typescript-sdk/examples/logger.ts
new file mode 100644
index 000000000..e3fdcd2d3
--- /dev/null
+++ b/typescript-sdk/examples/logger.ts
@@ -0,0 +1,42 @@
+import Hatchet from '../src/sdk';
+import { Workflow } from '../src/workflow';
+
+const hatchet = Hatchet.init({
+ log_level: 'OFF',
+});
+
+const sleep = (ms: number) =>
+ new Promise((resolve) => {
+ setTimeout(resolve, ms);
+ });
+
+const workflow: Workflow = {
+ id: 'logger-example',
+ description: 'test',
+ on: {
+ event: 'user:create',
+ },
+ steps: [
+ {
+ name: 'logger-step1',
+ run: async (ctx) => {
+ // log in a for loop
+ // eslint-disable-next-line no-plusplus
+ for (let i = 0; i < 10; i++) {
+ ctx.log(`log message ${i}`);
+ await sleep(200);
+ }
+
+ return { step1: 'completed step run' };
+ },
+ },
+ ],
+};
+
+async function main() {
+ const worker = await hatchet.worker('logger-worker', 1);
+ await worker.registerWorkflow(workflow);
+ worker.start();
+}
+
+main();
diff --git a/typescript-sdk/package.json b/typescript-sdk/package.json
index 95b3ef0cc..62063eb6f 100644
--- a/typescript-sdk/package.json
+++ b/typescript-sdk/package.json
@@ -1,6 +1,6 @@
{
"name": "@hatchet-dev/typescript-sdk",
- "version": "0.1.23",
+ "version": "0.1.24",
"description": "Background task orchestration & visibility for developers",
"main": "dist/index.js",
"types": "dist/index.d.ts",
@@ -38,6 +38,7 @@
"worker:playground": "npm run exec -- ./examples/playground.ts",
"worker:retries": "npm run exec -- ./examples/retries-worker.ts",
"worker:multi-workflow": "npm run exec -- ./examples/multi-workflow.ts",
+ "worker:logger": "npm run exec -- ./examples/logger.ts",
"api": "npm run exec -- ./examples/api.ts",
"prepublish": "cp package.json dist/package.json;",
"publish:ci": "rm -rf ./dist && npm run tsc:build && npm run prepublish && cd dist && npm publish --access public --no-git-checks",
diff --git a/typescript-sdk/src/clients/event/event-client.ts b/typescript-sdk/src/clients/event/event-client.ts
index eff91ab35..e7df983a1 100644
--- a/typescript-sdk/src/clients/event/event-client.ts
+++ b/typescript-sdk/src/clients/event/event-client.ts
@@ -7,6 +7,15 @@ import {
import HatchetError from '@util/errors/hatchet-error';
import { ClientConfig } from '@clients/hatchet-client/client-config';
import { Logger } from '@hatchet/util/logger';
+import { retrier } from '@hatchet/util/retrier';
+
+// eslint-disable-next-line no-shadow
+export enum LogLevel {
+ INFO = 'INFO',
+ WARN = 'WARN',
+ ERROR = 'ERROR',
+ DEBUG = 'DEBUG',
+}
export class EventClient {
config: ClientConfig;
@@ -35,4 +44,24 @@ export class EventClient {
throw new HatchetError(e.message);
}
}
+
+ putLog(stepRunId: string, log: string, level?: LogLevel) {
+ const createdAt = new Date();
+
+ try {
+ retrier(
+ async () =>
+ this.client.putLog({
+ stepRunId,
+ createdAt,
+ message: log,
+ level: level || LogLevel.INFO,
+ }),
+ this.logger
+ );
+ } catch (e: any) {
+ // log a warning, but this is not a fatal error
+ this.logger.warn(`Could not put log: ${e.message}`);
+ }
+ }
}
diff --git a/typescript-sdk/src/clients/rest/generated/Api.ts b/typescript-sdk/src/clients/rest/generated/Api.ts
index 594b5e126..be431d34c 100644
--- a/typescript-sdk/src/clients/rest/generated/Api.ts
+++ b/typescript-sdk/src/clients/rest/generated/Api.ts
@@ -34,6 +34,11 @@ import {
ListGithubBranchesResponse,
ListGithubReposResponse,
ListPullRequestsResponse,
+ LogLineLevelField,
+ LogLineList,
+ LogLineOrderByDirection,
+ LogLineOrderByField,
+ LogLineSearch,
PullRequestState,
RejectInviteRequest,
ReplayEventRequest,
@@ -784,6 +789,47 @@ export class Api extends HttpClient
+ this.request({
+ path: `/api/v1/step-runs/${stepRun}/logs`,
+ method: 'GET',
+ query: query,
+ secure: true,
+ format: 'json',
+ ...params,
+ });
/**
* @description Get the diff for a step run between the most recent run and the first run.
*
diff --git a/typescript-sdk/src/clients/rest/generated/data-contracts.ts b/typescript-sdk/src/clients/rest/generated/data-contracts.ts
index bd698bb09..4b4edf194 100644
--- a/typescript-sdk/src/clients/rest/generated/data-contracts.ts
+++ b/typescript-sdk/src/clients/rest/generated/data-contracts.ts
@@ -684,3 +684,40 @@ export enum PullRequestState {
Open = 'open',
Closed = 'closed',
}
+
+export interface LogLine {
+ /**
+ * The creation date of the log line.
+ * @format date-time
+ */
+ createdAt: string;
+ /** The log message. */
+ message: string;
+ /** The log metadata. */
+ metadata: object;
+}
+
+export enum LogLineLevel {
+ DEBUG = 'DEBUG',
+ INFO = 'INFO',
+ WARN = 'WARN',
+ ERROR = 'ERROR',
+}
+
+export interface LogLineList {
+ pagination?: PaginationResponse;
+ rows?: LogLine[];
+}
+
+export enum LogLineOrderByField {
+ CreatedAt = 'createdAt',
+}
+
+export enum LogLineOrderByDirection {
+ Asc = 'asc',
+ Desc = 'desc',
+}
+
+export type LogLineSearch = string;
+
+export type LogLineLevelField = LogLineLevel[];
diff --git a/typescript-sdk/src/clients/worker/worker.ts b/typescript-sdk/src/clients/worker/worker.ts
index b5af71d78..dd55a1a73 100644
--- a/typescript-sdk/src/clients/worker/worker.ts
+++ b/typescript-sdk/src/clients/worker/worker.ts
@@ -122,7 +122,7 @@ export class Worker {
const { actionId } = action;
try {
- const context = new Context(action, this.client.dispatcher);
+ const context = new Context(action, this.client.dispatcher, this.client.event);
this.contexts[action.stepRunId] = context;
const step = this.action_registry[actionId];
@@ -188,7 +188,7 @@ export class Worker {
const { actionId } = action;
try {
- const context = new Context(action, this.client.dispatcher);
+ const context = new Context(action, this.client.dispatcher, this.client.event);
const key = action.getGroupKeyRunId;
diff --git a/typescript-sdk/src/protoc/events/events.ts b/typescript-sdk/src/protoc/events/events.ts
index fed7e6757..0eac3fa01 100644
--- a/typescript-sdk/src/protoc/events/events.ts
+++ b/typescript-sdk/src/protoc/events/events.ts
@@ -18,6 +18,26 @@ export interface Event {
eventTimestamp: Date | undefined;
}
+export interface PutLogRequest {
+ /** the step run id for the request */
+ stepRunId: string;
+ /** when the log line was created */
+ createdAt:
+ | Date
+ | undefined;
+ /** the log line message */
+ message: string;
+ /** the log line level */
+ level?:
+ | string
+ | undefined;
+ /** associated log line metadata */
+ metadata: string;
+}
+
+export interface PutLogResponse {
+}
+
export interface PushEventRequest {
/** the key for the event */
key: string;
@@ -163,6 +183,168 @@ export const Event = {
},
};
+function createBasePutLogRequest(): PutLogRequest {
+ return { stepRunId: "", createdAt: undefined, message: "", level: undefined, metadata: "" };
+}
+
+export const PutLogRequest = {
+ encode(message: PutLogRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
+ if (message.stepRunId !== "") {
+ writer.uint32(10).string(message.stepRunId);
+ }
+ if (message.createdAt !== undefined) {
+ Timestamp.encode(toTimestamp(message.createdAt), writer.uint32(18).fork()).ldelim();
+ }
+ if (message.message !== "") {
+ writer.uint32(26).string(message.message);
+ }
+ if (message.level !== undefined) {
+ writer.uint32(34).string(message.level);
+ }
+ if (message.metadata !== "") {
+ writer.uint32(42).string(message.metadata);
+ }
+ return writer;
+ },
+
+ decode(input: _m0.Reader | Uint8Array, length?: number): PutLogRequest {
+ const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
+ let end = length === undefined ? reader.len : reader.pos + length;
+ const message = createBasePutLogRequest();
+ while (reader.pos < end) {
+ const tag = reader.uint32();
+ switch (tag >>> 3) {
+ case 1:
+ if (tag !== 10) {
+ break;
+ }
+
+ message.stepRunId = reader.string();
+ continue;
+ case 2:
+ if (tag !== 18) {
+ break;
+ }
+
+ message.createdAt = fromTimestamp(Timestamp.decode(reader, reader.uint32()));
+ continue;
+ case 3:
+ if (tag !== 26) {
+ break;
+ }
+
+ message.message = reader.string();
+ continue;
+ case 4:
+ if (tag !== 34) {
+ break;
+ }
+
+ message.level = reader.string();
+ continue;
+ case 5:
+ if (tag !== 42) {
+ break;
+ }
+
+ message.metadata = reader.string();
+ continue;
+ }
+ if ((tag & 7) === 4 || tag === 0) {
+ break;
+ }
+ reader.skipType(tag & 7);
+ }
+ return message;
+ },
+
+ fromJSON(object: any): PutLogRequest {
+ return {
+ stepRunId: isSet(object.stepRunId) ? globalThis.String(object.stepRunId) : "",
+ createdAt: isSet(object.createdAt) ? fromJsonTimestamp(object.createdAt) : undefined,
+ message: isSet(object.message) ? globalThis.String(object.message) : "",
+ level: isSet(object.level) ? globalThis.String(object.level) : undefined,
+ metadata: isSet(object.metadata) ? globalThis.String(object.metadata) : "",
+ };
+ },
+
+ toJSON(message: PutLogRequest): unknown {
+ const obj: any = {};
+ if (message.stepRunId !== "") {
+ obj.stepRunId = message.stepRunId;
+ }
+ if (message.createdAt !== undefined) {
+ obj.createdAt = message.createdAt.toISOString();
+ }
+ if (message.message !== "") {
+ obj.message = message.message;
+ }
+ if (message.level !== undefined) {
+ obj.level = message.level;
+ }
+ if (message.metadata !== "") {
+ obj.metadata = message.metadata;
+ }
+ return obj;
+ },
+
+ create(base?: DeepPartial): PutLogRequest {
+ return PutLogRequest.fromPartial(base ?? {});
+ },
+ fromPartial(object: DeepPartial): PutLogRequest {
+ const message = createBasePutLogRequest();
+ message.stepRunId = object.stepRunId ?? "";
+ message.createdAt = object.createdAt ?? undefined;
+ message.message = object.message ?? "";
+ message.level = object.level ?? undefined;
+ message.metadata = object.metadata ?? "";
+ return message;
+ },
+};
+
+function createBasePutLogResponse(): PutLogResponse {
+ return {};
+}
+
+export const PutLogResponse = {
+ encode(_: PutLogResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
+ return writer;
+ },
+
+ decode(input: _m0.Reader | Uint8Array, length?: number): PutLogResponse {
+ const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
+ let end = length === undefined ? reader.len : reader.pos + length;
+ const message = createBasePutLogResponse();
+ while (reader.pos < end) {
+ const tag = reader.uint32();
+ switch (tag >>> 3) {
+ }
+ if ((tag & 7) === 4 || tag === 0) {
+ break;
+ }
+ reader.skipType(tag & 7);
+ }
+ return message;
+ },
+
+ fromJSON(_: any): PutLogResponse {
+ return {};
+ },
+
+ toJSON(_: PutLogResponse): unknown {
+ const obj: any = {};
+ return obj;
+ },
+
+ create(base?: DeepPartial): PutLogResponse {
+ return PutLogResponse.fromPartial(base ?? {});
+ },
+ fromPartial(_: DeepPartial): PutLogResponse {
+ const message = createBasePutLogResponse();
+ return message;
+ },
+};
+
function createBasePushEventRequest(): PushEventRequest {
return { key: "", payload: "", eventTimestamp: undefined };
}
@@ -469,6 +651,14 @@ export const EventsServiceDefinition = {
responseStream: false,
options: {},
},
+ putLog: {
+ name: "PutLog",
+ requestType: PutLogRequest,
+ requestStream: false,
+ responseType: PutLogResponse,
+ responseStream: false,
+ options: {},
+ },
},
} as const;
@@ -476,12 +666,14 @@ export interface EventsServiceImplementation {
push(request: PushEventRequest, context: CallContext & CallContextExt): Promise>;
list(request: ListEventRequest, context: CallContext & CallContextExt): Promise>;
replaySingleEvent(request: ReplayEventRequest, context: CallContext & CallContextExt): Promise>;
+ putLog(request: PutLogRequest, context: CallContext & CallContextExt): Promise>;
}
export interface EventsServiceClient {
push(request: DeepPartial, options?: CallOptions & CallOptionsExt): Promise;
list(request: DeepPartial, options?: CallOptions & CallOptionsExt): Promise;
replaySingleEvent(request: DeepPartial, options?: CallOptions & CallOptionsExt): Promise;
+ putLog(request: DeepPartial, options?: CallOptions & CallOptionsExt): Promise;
}
type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined;
diff --git a/typescript-sdk/src/step.ts b/typescript-sdk/src/step.ts
index 387085046..9969e3b6b 100644
--- a/typescript-sdk/src/step.ts
+++ b/typescript-sdk/src/step.ts
@@ -3,6 +3,8 @@ import * as z from 'zod';
import { HatchetTimeoutSchema } from './workflow';
import { Action } from './clients/dispatcher/action-listener';
import { DispatcherClient } from './clients/dispatcher/dispatcher-client';
+import { EventClient, LogLevel } from './clients/event/event-client';
+import { Logger } from './util/logger';
export const CreateStepSchema = z.object({
name: z.string(),
@@ -28,14 +30,18 @@ export class Context {
controller = new AbortController();
action: Action;
client: DispatcherClient;
+ eventClient: EventClient;
overridesData: Record = {};
+ logger: Logger;
- constructor(action: Action, client: DispatcherClient) {
+ constructor(action: Action, client: DispatcherClient, eventClient: EventClient) {
try {
const data = JSON.parse(JSON.parse(action.actionPayload));
this.data = data;
this.action = action;
this.client = client;
+ this.eventClient = eventClient;
+ this.logger = new Logger(`Context Logger`, client.config.log_level);
// if this is a getGroupKeyRunId, the data is the workflow input
if (action.getGroupKeyRunId !== '') {
@@ -93,6 +99,18 @@ export class Context {
return defaultValue;
}
+
+ log(message: string, level?: LogLevel): void {
+ const { stepRunId } = this.action;
+
+ if (!stepRunId) {
+ // log a warning
+ this.logger.warn('cannot log from context without stepRunId');
+ return;
+ }
+
+ this.eventClient.putLog(stepRunId, message, level);
+ }
}
export type StepRunFunction = (ctx: Context) => Promise | NextStep | void;