feat: db tables for user events (#2862)

* feat: db tables for user events

* move event payloads to payloads table, fix env var loading

* fix: address pr review comments

* missed save
This commit is contained in:
abelanger5
2026-01-27 16:47:05 -05:00
committed by GitHub
parent d411a06d67
commit 03339c4a8b
13 changed files with 586 additions and 21 deletions
@@ -0,0 +1,82 @@
-- +goose Up
-- +goose StatementBegin
-- Register a payload type for user-event input data so event payloads can be
-- stored in the shared v1_payload table. The new value is not referenced
-- elsewhere in this same migration (ADD VALUE'd enum members cannot be used
-- in the transaction that adds them on some Postgres versions).
ALTER TYPE v1_payload_type ADD VALUE IF NOT EXISTS 'USER_EVENT_INPUT';
-- Durable log of user events, range-partitioned by seen_at so retention can
-- drop whole partitions cheaply.
CREATE TABLE IF NOT EXISTS v1_event (
id bigint GENERATED ALWAYS AS IDENTITY,
seen_at TIMESTAMPTZ NOT NULL,
tenant_id UUID NOT NULL,
external_id UUID NOT NULL DEFAULT gen_random_uuid(),
key TEXT NOT NULL,
additional_metadata JSONB,
scope TEXT,
triggering_webhook_name TEXT,
-- seen_at must be in the PK because it is the partition key.
PRIMARY KEY (tenant_id, seen_at, id)
) PARTITION BY RANGE(seen_at);
-- Supports per-tenant lookups of events by key.
CREATE INDEX IF NOT EXISTS v1_event_key_idx ON v1_event (tenant_id, key);
-- Maps an event's external UUID to its (event_id, event_seen_at) so a row in
-- the partitioned v1_event table can be located without knowing its seen_at.
-- Partitioned weekly (see the partition helper call below).
CREATE TABLE IF NOT EXISTS v1_event_lookup_table (
tenant_id UUID NOT NULL,
external_id UUID NOT NULL,
event_id BIGINT NOT NULL,
event_seen_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (external_id, event_seen_at)
) PARTITION BY RANGE(event_seen_at);
-- Statement-level trigger function: mirrors each batch of inserted v1_event
-- rows into the lookup table via the NEW transition table ("new_rows").
CREATE OR REPLACE FUNCTION v1_event_lookup_table_insert_function()
RETURNS TRIGGER AS
$$
BEGIN
INSERT INTO v1_event_lookup_table (
tenant_id,
external_id,
event_id,
event_seen_at
)
SELECT
tenant_id,
external_id,
id,
seen_at
FROM new_rows
-- Idempotent on retries / duplicate external ids within a partition week.
ON CONFLICT (external_id, event_seen_at) DO NOTHING;
-- AFTER triggers ignore the return value; NULL is conventional.
RETURN NULL;
END;
$$
LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS v1_event_lookup_table_insert_trigger ON v1_event;
-- One trigger firing per INSERT statement (not per row) keeps bulk inserts cheap.
CREATE TRIGGER v1_event_lookup_table_insert_trigger
AFTER INSERT ON v1_event
REFERENCING NEW TABLE AS new_rows
FOR EACH STATEMENT
EXECUTE FUNCTION v1_event_lookup_table_insert_function();
-- Links an event to each run it triggered; filter_id is NULL when the run
-- was not matched through a filter.
CREATE TABLE IF NOT EXISTS v1_event_to_run (
run_external_id UUID NOT NULL,
event_id BIGINT NOT NULL,
event_seen_at TIMESTAMPTZ NOT NULL,
filter_id UUID,
PRIMARY KEY (event_id, event_seen_at, run_external_id)
) PARTITION BY RANGE(event_seen_at);
-- Seed today's partitions so inserts succeed immediately after migrating.
-- (create_v1_range_partition / create_v1_weekly_range_partition are presumably
-- defined by an earlier migration — verify before reordering migrations.)
SELECT create_v1_range_partition('v1_event', DATE 'today');
SELECT create_v1_weekly_range_partition('v1_event_lookup_table', DATE 'today');
SELECT create_v1_range_partition('v1_event_to_run', DATE 'today');
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
-- NOTE: the 'USER_EVENT_INPUT' enum value added in Up is intentionally not
-- removed here; Postgres has no DROP VALUE for enum types.
DROP TABLE IF EXISTS v1_event_to_run;
DROP TABLE IF EXISTS v1_event_lookup_table;
DROP TRIGGER IF EXISTS v1_event_lookup_table_insert_trigger ON v1_event;
DROP FUNCTION IF EXISTS v1_event_lookup_table_insert_function();
DROP INDEX IF EXISTS v1_event_key_idx;
DROP TABLE IF EXISTS v1_event;
-- +goose StatementEnd
@@ -591,7 +591,7 @@ func (tc *OLAPControllerImpl) handleCreateEventTriggers(ctx context.Context, ten
}
}
bulkCreateEventParams := sqlcv1.BulkCreateEventsParams{
bulkCreateEventParams := sqlcv1.BulkCreateEventsOLAPParams{
Tenantids: tenantIds,
Externalids: externalIds,
Seenats: seenAts,
+1
View File
@@ -300,6 +300,7 @@ func (c *ConfigLoader) InitDataLayer() (res *database.Layer, err error) {
scf.Runtime.Limits,
scf.Runtime.EnforceLimits,
scf.Runtime.EnforceLimitsFunc,
scf.Runtime.EnableDurableUserEventLog,
)
if readReplicaPool != nil {
+7
View File
@@ -284,6 +284,10 @@ type ConfigFileRuntime struct {
// TaskOperationLimits controls the limits for various task operations
TaskOperationLimits TaskOperationLimitsConfigFile `mapstructure:"taskOperationLimits" json:"taskOperationLimits,omitempty"`
// EnableDurableUserEventLog controls whether we enable the durable event log for user events. By default, we don't persist user events
// to the core database, we only use them to trigger workflows. Enabling this will persist them to the core database.
EnableDurableUserEventLog bool `mapstructure:"enableDurableUserEventLog" json:"enableDurableUserEventLog,omitempty" default:"false"`
}
type InternalClientTLSConfigFile struct {
@@ -769,6 +773,9 @@ func BindAllEnv(v *viper.Viper) {
_ = v.BindEnv("runtime.updateHashFactor", "SERVER_UPDATE_HASH_FACTOR")
_ = v.BindEnv("runtime.updateConcurrentFactor", "SERVER_UPDATE_CONCURRENT_FACTOR")
// enable durable user event log
_ = v.BindEnv("runtime.enableDurableUserEventLog", "SERVER_ENABLE_DURABLE_USER_EVENT_LOG")
// internal client options
_ = v.BindEnv("internalClient.base.tlsStrategy", "SERVER_INTERNAL_CLIENT_BASE_STRATEGY")
_ = v.BindEnv("internalClient.inheritBase", "SERVER_INTERNAL_CLIENT_BASE_INHERIT_BASE")
+5 -3
View File
@@ -246,7 +246,9 @@ type OLAPRepository interface {
ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
GetTaskTimings(ctx context.Context, tenantId string, workflowRunId pgtype.UUID, depth int32) ([]*sqlcv1.PopulateTaskRunDataRow, map[string]int32, error)
BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, triggers []EventTriggersFromExternalId) error
// Events queries
BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsOLAPParams, triggers []EventTriggersFromExternalId) error
ListEvents(ctx context.Context, opts sqlcv1.ListEventsParams) ([]*EventWithPayload, *int64, error)
GetEvent(ctx context.Context, externalId string) (*sqlcv1.V1EventsOlap, error)
GetEventWithPayload(ctx context.Context, externalId, tenantId string) (*EventWithPayload, error)
@@ -2056,7 +2058,7 @@ type EventTriggersFromExternalId struct {
FilterId pgtype.UUID `json:"filter_id"`
}
func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, triggers []EventTriggersFromExternalId) error {
func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsOLAPParams, triggers []EventTriggersFromExternalId) error {
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l)
if err != nil {
@@ -2083,7 +2085,7 @@ func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, ev
eventsToInsert.Payloads = payloads
}
insertedEvents, err := r.queries.BulkCreateEvents(ctx, tx, eventsToInsert)
insertedEvents, err := r.queries.BulkCreateEventsOLAP(ctx, tx, eventsToInsert)
if err != nil {
return fmt.Errorf("error creating events: %v", err)
+2 -1
View File
@@ -100,6 +100,7 @@ func NewRepository(
tenantLimitConfig limits.LimitConfigFile,
enforceLimits bool,
enforceLimitsFunc func(ctx context.Context, tenantId string) (bool, error),
enableDurableUserEventLog bool,
) (Repository, func() error) {
v := validator.NewDefaultValidator()
@@ -113,7 +114,7 @@ func NewRepository(
health: newHealthRepository(shared),
messageQueue: mq,
rateLimit: newRateLimitRepository(shared),
triggers: newTriggerRepository(shared),
triggers: newTriggerRepository(shared, enableDurableUserEventLog),
tasks: newTaskRepository(shared, taskRetentionPeriod, maxInternalRetryCount, taskLimits.TimeoutLimit, taskLimits.ReassignLimit, taskLimits.RetryQueueLimit, taskLimits.DurableSleepLimit),
scheduler: newSchedulerRepository(shared),
matches: newMatchRepository(shared),
+30 -4
View File
@@ -1482,10 +1482,11 @@ func (ns NullV1PayloadLocationOlap) Value() (driver.Value, error) {
type V1PayloadType string
const (
V1PayloadTypeTASKINPUT V1PayloadType = "TASK_INPUT"
V1PayloadTypeDAGINPUT V1PayloadType = "DAG_INPUT"
V1PayloadTypeTASKOUTPUT V1PayloadType = "TASK_OUTPUT"
V1PayloadTypeTASKEVENTDATA V1PayloadType = "TASK_EVENT_DATA"
V1PayloadTypeTASKINPUT V1PayloadType = "TASK_INPUT"
V1PayloadTypeDAGINPUT V1PayloadType = "DAG_INPUT"
V1PayloadTypeTASKOUTPUT V1PayloadType = "TASK_OUTPUT"
V1PayloadTypeTASKEVENTDATA V1PayloadType = "TASK_EVENT_DATA"
V1PayloadTypeUSEREVENTINPUT V1PayloadType = "USER_EVENT_INPUT"
)
func (e *V1PayloadType) Scan(src interface{}) error {
@@ -3019,6 +3020,24 @@ type V1DurableSleep struct {
SleepDuration string `json:"sleep_duration"`
}
// V1Event is a row of the core v1_event table: a durable record of a user
// event. The table is range-partitioned by seen_at, so (TenantID, SeenAt, ID)
// together form the primary key.
type V1Event struct {
ID int64 `json:"id"`
SeenAt pgtype.Timestamptz `json:"seen_at"`
TenantID pgtype.UUID `json:"tenant_id"`
ExternalID pgtype.UUID `json:"external_id"`
Key string `json:"key"`
AdditionalMetadata []byte `json:"additional_metadata"`
// Scope and TriggeringWebhookName map to nullable TEXT columns, hence pgtype.Text.
Scope pgtype.Text `json:"scope"`
TriggeringWebhookName pgtype.Text `json:"triggering_webhook_name"`
}
// V1EventLookupTable maps an event's external UUID to the (EventID,
// EventSeenAt) key components of its row in the partitioned v1_event table,
// allowing lookup by external id without scanning all partitions.
type V1EventLookupTable struct {
TenantID pgtype.UUID `json:"tenant_id"`
ExternalID pgtype.UUID `json:"external_id"`
EventID int64 `json:"event_id"`
EventSeenAt pgtype.Timestamptz `json:"event_seen_at"`
}
type V1EventLookupTableOlap struct {
TenantID pgtype.UUID `json:"tenant_id"`
ExternalID pgtype.UUID `json:"external_id"`
@@ -3026,6 +3045,13 @@ type V1EventLookupTableOlap struct {
EventSeenAt pgtype.Timestamptz `json:"event_seen_at"`
}
// V1EventToRun links an event to one run it triggered. FilterID maps to a
// nullable UUID column and is unset when the run was not matched via a filter.
type V1EventToRun struct {
RunExternalID pgtype.UUID `json:"run_external_id"`
EventID int64 `json:"event_id"`
EventSeenAt pgtype.Timestamptz `json:"event_seen_at"`
FilterID pgtype.UUID `json:"filter_id"`
}
type V1EventToRunOlap struct {
RunID int64 `json:"run_id"`
RunInsertedAt pgtype.Timestamptz `json:"run_inserted_at"`
+4 -4
View File
@@ -374,7 +374,7 @@ func (q *Queries) ListTasksOlap(ctx context.Context, db DBTX, arg ListTasksOlapP
return items, nil
}
const bulkCreateEvents = `-- name: BulkCreateEvents :many
const bulkCreateEventsOLAP = `-- name: BulkCreateEventsOLAP :many
WITH to_insert AS (
SELECT
UNNEST($1::UUID[]) AS tenant_id,
@@ -404,7 +404,7 @@ FROM to_insert
RETURNING tenant_id, id, external_id, seen_at, key, payload, additional_metadata, scope, triggering_webhook_name
`
type BulkCreateEventsParams struct {
type BulkCreateEventsOLAPParams struct {
Tenantids []pgtype.UUID `json:"tenantids"`
Externalids []pgtype.UUID `json:"externalids"`
Seenats []pgtype.Timestamptz `json:"seenats"`
@@ -415,8 +415,8 @@ type BulkCreateEventsParams struct {
TriggeringWebhookNames []pgtype.Text `json:"triggeringWebhookName"`
}
func (q *Queries) BulkCreateEvents(ctx context.Context, db DBTX, arg BulkCreateEventsParams) ([]*V1EventsOlap, error) {
rows, err := db.Query(ctx, bulkCreateEvents,
func (q *Queries) BulkCreateEventsOLAP(ctx context.Context, db DBTX, arg BulkCreateEventsOLAPParams) ([]*V1EventsOlap, error) {
rows, err := db.Query(ctx, bulkCreateEventsOLAP,
arg.Tenantids,
arg.Externalids,
arg.Seenats,
+74
View File
@@ -839,3 +839,77 @@ func (q *Queries) ReleaseTasks(ctx context.Context, db DBTX, arg ReleaseTasksPar
return items, nil
}
// bulkCreateEvents inserts a batch of user events into the core v1_event
// table in a single statement by UNNESTing seven parallel arrays (one element
// per event) and returns the inserted rows, including the generated identity
// id. The `-- name:` annotation suggests this constant is sqlc-generated —
// presumably edit the .sql query source rather than this file (TODO: confirm).
const bulkCreateEvents = `-- name: BulkCreateEvents :many
WITH to_insert AS (
SELECT
UNNEST($1::UUID[]) AS tenant_id,
UNNEST($2::UUID[]) AS external_id,
UNNEST($3::TIMESTAMPTZ[]) AS seen_at,
UNNEST($4::TEXT[]) AS key,
UNNEST($5::JSONB[]) AS additional_metadata,
-- Scopes are nullable
UNNEST($6::TEXT[]) AS scope,
-- Webhook names are nullable
UNNEST($7::TEXT[]) AS triggering_webhook_name
)
INSERT INTO v1_event (
tenant_id,
external_id,
seen_at,
key,
additional_metadata,
scope,
triggering_webhook_name
)
SELECT tenant_id, external_id, seen_at, key, additional_metadata, scope, triggering_webhook_name
FROM to_insert
RETURNING tenant_id, id, external_id, seen_at, key, additional_metadata, scope, triggering_webhook_name
`
// BulkCreateEventsParams holds parallel arrays for the bulkCreateEvents
// query: element i of every slice describes event i, so all slices must have
// the same length. Scopes and TriggeringWebhookNames use pgtype.Text so
// individual elements may be NULL.
type BulkCreateEventsParams struct {
Tenantids []pgtype.UUID `json:"tenantids"`
Externalids []pgtype.UUID `json:"externalids"`
Seenats []pgtype.Timestamptz `json:"seenats"`
Keys []string `json:"keys"`
Additionalmetadatas [][]byte `json:"additionalmetadatas"`
Scopes []pgtype.Text `json:"scopes"`
TriggeringWebhookNames []pgtype.Text `json:"triggeringWebhookName"`
}
// BulkCreateEvents inserts a batch of user events into the core v1_event
// table with a single UNNEST-based statement and returns the inserted rows
// (including the generated identity id and server-side defaults). The scan
// order must match the query's RETURNING clause: tenant_id, id, external_id,
// seen_at, key, additional_metadata, scope, triggering_webhook_name.
func (q *Queries) BulkCreateEvents(ctx context.Context, db DBTX, arg BulkCreateEventsParams) ([]*V1Event, error) {
	rows, err := db.Query(ctx, bulkCreateEvents,
		arg.Tenantids,
		arg.Externalids,
		arg.Seenats,
		arg.Keys,
		arg.Additionalmetadatas,
		arg.Scopes,
		arg.TriggeringWebhookNames,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var events []*V1Event
	for rows.Next() {
		ev := new(V1Event)
		if scanErr := rows.Scan(
			&ev.TenantID,
			&ev.ID,
			&ev.ExternalID,
			&ev.SeenAt,
			&ev.Key,
			&ev.AdditionalMetadata,
			&ev.Scope,
			&ev.TriggeringWebhookName,
		); scanErr != nil {
			return nil, scanErr
		}
		events = append(events, ev)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return events, nil
}
+54 -1
View File
@@ -4,7 +4,10 @@ SELECT
create_v1_range_partition('v1_dag', @date::date),
create_v1_range_partition('v1_task_event', @date::date),
create_v1_range_partition('v1_log_line', @date::date),
create_v1_range_partition('v1_payload', @date::date);
create_v1_range_partition('v1_payload', @date::date),
create_v1_range_partition('v1_event', @date::date),
create_v1_weekly_range_partition('v1_event_lookup_table', @date::date),
create_v1_range_partition('v1_event_to_run', @date::date);
-- name: EnsureTablePartitionsExist :one
WITH tomorrow_date AS (
@@ -18,6 +21,10 @@ WITH tomorrow_date AS (
SELECT 'v1_task_event_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD')
UNION ALL
SELECT 'v1_log_line_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD')
UNION ALL
SELECT 'v1_payload_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD')
UNION ALL
SELECT 'v1_event_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD')
), partition_check AS (
SELECT
COUNT(*) AS total_tables,
@@ -43,6 +50,12 @@ WITH task_partitions AS (
SELECT 'v1_log_line' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_log_line', @date::date) AS p
), payload_partitions AS (
SELECT 'v1_payload' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_payload', @date::date) AS p
), event_partitions AS (
SELECT 'v1_event' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_event', @date::date) AS p
), event_lookup_table_partitions AS (
SELECT 'v1_event_lookup_table' AS parent_table, p::text as partition_name FROM get_v1_weekly_partitions_before_date('v1_event_lookup_table', @date::date) AS p
), event_to_run_partitions AS (
SELECT 'v1_event_to_run' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_event_to_run', @date::date) AS p
)
SELECT
@@ -77,6 +90,27 @@ SELECT
*
FROM
payload_partitions
UNION ALL
SELECT
*
FROM
event_partitions
UNION ALL
SELECT
*
FROM
event_lookup_table_partitions
UNION ALL
SELECT
*
FROM
event_to_run_partitions
;
-- name: DefaultTaskActivityGauge :one
@@ -1171,3 +1205,22 @@ WHERE
FROM inputs
)
;
-- name: CreateEventToRuns :many
WITH input AS (
SELECT
UNNEST(@runExternalIds::uuid[]) AS run_external_id,
UNNEST(@eventIds::bigint[]) AS event_id,
UNNEST(@eventSeenAts::timestamptz[]) AS event_seen_at,
UNNEST(@filterIds::uuid[]) AS filter_id
)
INSERT INTO v1_event_to_run (run_external_id, event_id, event_seen_at, filter_id)
SELECT
run_external_id,
event_id,
event_seen_at,
filter_id
FROM
input
RETURNING
*;
+92 -1
View File
@@ -166,13 +166,73 @@ func (q *Queries) CleanupWorkflowConcurrencySlotsAfterInsert(ctx context.Context
return err
}
// createEventToRuns bulk-inserts event→run link rows by UNNESTing four
// parallel arrays (one element per link) and returns the inserted rows. The
// `-- name:` annotation suggests this constant is sqlc-generated — presumably
// edit the .sql query source rather than this file (TODO: confirm).
const createEventToRuns = `-- name: CreateEventToRuns :many
WITH input AS (
SELECT
UNNEST($1::uuid[]) AS run_external_id,
UNNEST($2::bigint[]) AS event_id,
UNNEST($3::timestamptz[]) AS event_seen_at,
UNNEST($4::uuid[]) AS filter_id
)
INSERT INTO v1_event_to_run (run_external_id, event_id, event_seen_at, filter_id)
SELECT
run_external_id,
event_id,
event_seen_at,
filter_id
FROM
input
RETURNING
run_external_id, event_id, event_seen_at, filter_id
`
// CreateEventToRunsParams holds parallel arrays for the createEventToRuns
// query: element i of every slice describes link i, so all slices must have
// the same length. Filterids elements may be invalid (NULL) when no filter
// matched the run.
type CreateEventToRunsParams struct {
Runexternalids []pgtype.UUID `json:"runexternalids"`
Eventids []int64 `json:"eventids"`
Eventseenats []pgtype.Timestamptz `json:"eventseenats"`
Filterids []pgtype.UUID `json:"filterids"`
}
func (q *Queries) CreateEventToRuns(ctx context.Context, db DBTX, arg CreateEventToRunsParams) ([]*V1EventToRun, error) {
rows, err := db.Query(ctx, createEventToRuns,
arg.Runexternalids,
arg.Eventids,
arg.Eventseenats,
arg.Filterids,
)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*V1EventToRun
for rows.Next() {
var i V1EventToRun
if err := rows.Scan(
&i.RunExternalID,
&i.EventID,
&i.EventSeenAt,
&i.FilterID,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const createPartitions = `-- name: CreatePartitions :exec
SELECT
create_v1_range_partition('v1_task', $1::date),
create_v1_range_partition('v1_dag', $1::date),
create_v1_range_partition('v1_task_event', $1::date),
create_v1_range_partition('v1_log_line', $1::date),
create_v1_range_partition('v1_payload', $1::date)
create_v1_range_partition('v1_payload', $1::date),
create_v1_range_partition('v1_event', $1::date),
create_v1_weekly_range_partition('v1_event_lookup_table', $1::date),
create_v1_range_partition('v1_event_to_run', $1::date)
`
func (q *Queries) CreatePartitions(ctx context.Context, db DBTX, date pgtype.Date) error {
@@ -264,6 +324,10 @@ WITH tomorrow_date AS (
SELECT 'v1_task_event_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD')
UNION ALL
SELECT 'v1_log_line_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD')
UNION ALL
SELECT 'v1_payload_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD')
UNION ALL
SELECT 'v1_event_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD')
), partition_check AS (
SELECT
COUNT(*) AS total_tables,
@@ -1112,6 +1176,12 @@ WITH task_partitions AS (
SELECT 'v1_log_line' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_log_line', $1::date) AS p
), payload_partitions AS (
SELECT 'v1_payload' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_payload', $1::date) AS p
), event_partitions AS (
SELECT 'v1_event' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_event', $1::date) AS p
), event_lookup_table_partitions AS (
SELECT 'v1_event_lookup_table' AS parent_table, p::text as partition_name FROM get_v1_weekly_partitions_before_date('v1_event_lookup_table', $1::date) AS p
), event_to_run_partitions AS (
SELECT 'v1_event_to_run' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_event_to_run', $1::date) AS p
)
SELECT
@@ -1146,6 +1216,27 @@ SELECT
parent_table, partition_name
FROM
payload_partitions
UNION ALL
SELECT
parent_table, partition_name
FROM
event_partitions
UNION ALL
SELECT
parent_table, partition_name
FROM
event_lookup_table_partitions
UNION ALL
SELECT
parent_table, partition_name
FROM
event_to_run_partitions
`
type ListPartitionsBeforeDateRow struct {
+170 -5
View File
@@ -103,11 +103,14 @@ type TriggerRepository interface {
type TriggerRepositoryImpl struct {
*sharedRepository
enableDurableUserEventLog bool
}
func newTriggerRepository(s *sharedRepository) TriggerRepository {
func newTriggerRepository(s *sharedRepository, enableDurableUserEventLog bool) TriggerRepository {
return &TriggerRepositoryImpl{
sharedRepository: s,
sharedRepository: s,
enableDurableUserEventLog: enableDurableUserEventLog,
}
}
@@ -216,6 +219,11 @@ type EventExternalIdFilterId struct {
FilterId *string
}
type EventIds struct {
SeenAt pgtype.Timestamptz
Id int64
}
type WorkflowAndScope struct {
WorkflowId pgtype.UUID
Scope string
@@ -230,6 +238,19 @@ func (r *TriggerRepositoryImpl) TriggerFromEvents(ctx context.Context, tenantId
eventKeysToOpts := make(map[string][]EventTriggerOpts)
eventExternalIdToRuns := make(map[string][]*Run)
var createCoreEventOpts *createCoreUserEventOpts
createCoreEventsTenantIds := []pgtype.UUID{}
createCoreEventsExternalIds := []pgtype.UUID{}
createCoreEventsSeenAts := []pgtype.Timestamptz{}
createCoreEventsKeys := []string{}
createCoreEventsAdditionalMetadatas := [][]byte{}
createCoreEventsScopes := []pgtype.Text{}
createCoreEventsTriggeringWebhookNames := []pgtype.Text{}
eventExternalIdsToPayloads := make(map[string][]byte)
seenAt := time.Now().UTC() // TODO: propagate this to caller, and figure out how we should be setting this
eventKeys := make([]string, 0, len(opts))
uniqueEventKeys := make(map[string]struct{})
@@ -237,6 +258,26 @@ func (r *TriggerRepositoryImpl) TriggerFromEvents(ctx context.Context, tenantId
for _, opt := range opts {
eventExternalIdToRuns[opt.ExternalId] = []*Run{}
if r.enableDurableUserEventLog {
createCoreEventsTenantIds = append(createCoreEventsTenantIds, sqlchelpers.UUIDFromStr(tenantId))
createCoreEventsExternalIds = append(createCoreEventsExternalIds, sqlchelpers.UUIDFromStr(opt.ExternalId))
createCoreEventsSeenAts = append(createCoreEventsSeenAts, sqlchelpers.TimestamptzFromTime(seenAt))
createCoreEventsKeys = append(createCoreEventsKeys, opt.Key)
eventExternalIdsToPayloads[opt.ExternalId] = opt.Data
createCoreEventsAdditionalMetadatas = append(createCoreEventsAdditionalMetadatas, opt.AdditionalMetadata)
if opt.Scope != nil {
createCoreEventsScopes = append(createCoreEventsScopes, pgtype.Text{String: *opt.Scope, Valid: true})
} else {
createCoreEventsScopes = append(createCoreEventsScopes, pgtype.Text{Valid: false})
}
if opt.TriggeringWebhookName != nil {
createCoreEventsTriggeringWebhookNames = append(createCoreEventsTriggeringWebhookNames, pgtype.Text{String: *opt.TriggeringWebhookName, Valid: true})
} else {
createCoreEventsTriggeringWebhookNames = append(createCoreEventsTriggeringWebhookNames, pgtype.Text{Valid: false})
}
}
eventKeysToOpts[opt.Key] = append(eventKeysToOpts[opt.Key], opt)
if _, ok := uniqueEventKeys[opt.Key]; ok {
@@ -395,7 +436,23 @@ func (r *TriggerRepositoryImpl) TriggerFromEvents(ctx context.Context, tenantId
}
}
tasks, dags, err := r.triggerWorkflows(ctx, tenantId, triggerOpts)
if r.enableDurableUserEventLog {
createCoreEventOpts = &createCoreUserEventOpts{
params: sqlcv1.BulkCreateEventsParams{
Tenantids: createCoreEventsTenantIds,
Externalids: createCoreEventsExternalIds,
Seenats: createCoreEventsSeenAts,
Keys: createCoreEventsKeys,
Additionalmetadatas: createCoreEventsAdditionalMetadatas,
Scopes: createCoreEventsScopes,
TriggeringWebhookNames: createCoreEventsTriggeringWebhookNames,
},
externalIdToEventIdAndFilterId: externalIdToEventIdAndFilterId,
externalIdsToPayloads: eventExternalIdsToPayloads,
}
}
tasks, dags, err := r.triggerWorkflows(ctx, tenantId, triggerOpts, createCoreEventOpts)
if err != nil {
return nil, fmt.Errorf("failed to trigger workflows: %w", err)
@@ -532,7 +589,7 @@ func (r *TriggerRepositoryImpl) TriggerFromWorkflowNames(ctx context.Context, te
}
}
return r.triggerWorkflows(ctx, tenantId, triggerOpts)
return r.triggerWorkflows(ctx, tenantId, triggerOpts, nil)
}
type ErrNamesNotFound struct {
@@ -679,7 +736,13 @@ type triggerTuple struct {
childKey *string
}
func (r *TriggerRepositoryImpl) triggerWorkflows(ctx context.Context, tenantId string, tuples []triggerTuple) ([]*V1TaskWithPayload, []*DAGWithData, error) {
type createCoreUserEventOpts struct {
externalIdToEventIdAndFilterId map[string]EventExternalIdFilterId
externalIdsToPayloads map[string][]byte
params sqlcv1.BulkCreateEventsParams
}
func (r *TriggerRepositoryImpl) triggerWorkflows(ctx context.Context, tenantId string, tuples []triggerTuple, coreEvents *createCoreUserEventOpts) ([]*V1TaskWithPayload, []*DAGWithData, error) {
// get unique workflow version ids
uniqueWorkflowVersionIds := make(map[string]struct{})
@@ -1268,6 +1331,108 @@ func (r *TriggerRepositoryImpl) triggerWorkflows(ctx context.Context, tenantId s
})
}
if coreEvents != nil {
eventExternalIdsToIds := make(map[string]EventIds)
createdEvents, err := r.queries.BulkCreateEvents(ctx, tx, coreEvents.params)
if err != nil {
return nil, nil, fmt.Errorf("failed to create core events: %w", err)
}
for _, createdEvent := range createdEvents {
eventExternalIdsToIds[createdEvent.ExternalID.String()] = EventIds{
Id: createdEvent.ID,
SeenAt: createdEvent.SeenAt,
}
}
eventToRunExternalIds := []pgtype.UUID{}
eventToRunEventIds := []int64{}
eventToRunEventSeenAts := []pgtype.Timestamptz{}
eventToRunRunFilterIds := []pgtype.UUID{}
for _, task := range tasks {
externalId := task.ExternalID
eventIdAndFilterId, ok := coreEvents.externalIdToEventIdAndFilterId[externalId.String()]
if !ok {
continue
}
eventIds, ok := eventExternalIdsToIds[eventIdAndFilterId.ExternalId]
if !ok {
continue
}
eventToRunExternalIds = append(eventToRunExternalIds, task.ExternalID)
eventToRunEventIds = append(eventToRunEventIds, eventIds.Id)
eventToRunEventSeenAts = append(eventToRunEventSeenAts, eventIds.SeenAt)
if eventIdAndFilterId.FilterId != nil {
eventToRunRunFilterIds = append(eventToRunRunFilterIds, sqlchelpers.UUIDFromStr(*eventIdAndFilterId.FilterId))
} else {
eventToRunRunFilterIds = append(eventToRunRunFilterIds, pgtype.UUID{Valid: false})
}
}
for _, dag := range dags {
externalId := dag.ExternalID
eventIdAndFilterId, ok := coreEvents.externalIdToEventIdAndFilterId[externalId.String()]
if !ok {
continue
}
eventIds, ok := eventExternalIdsToIds[eventIdAndFilterId.ExternalId]
if !ok {
continue
}
eventToRunExternalIds = append(eventToRunExternalIds, dag.ExternalID)
eventToRunEventIds = append(eventToRunEventIds, eventIds.Id)
eventToRunEventSeenAts = append(eventToRunEventSeenAts, eventIds.SeenAt)
if eventIdAndFilterId.FilterId != nil {
eventToRunRunFilterIds = append(eventToRunRunFilterIds, sqlchelpers.UUIDFromStr(*eventIdAndFilterId.FilterId))
} else {
eventToRunRunFilterIds = append(eventToRunRunFilterIds, pgtype.UUID{Valid: false})
}
}
_, err = r.queries.CreateEventToRuns(ctx, tx, sqlcv1.CreateEventToRunsParams{
Runexternalids: eventToRunExternalIds,
Eventids: eventToRunEventIds,
Eventseenats: eventToRunEventSeenAts,
Filterids: eventToRunRunFilterIds,
})
if err != nil {
return nil, nil, fmt.Errorf("failed to create event to runs: %w", err)
}
for _, e := range createdEvents {
payload, ok := coreEvents.externalIdsToPayloads[e.ExternalID.String()]
if !ok {
continue
}
storePayloadOpts = append(storePayloadOpts, StorePayloadOpts{
Id: e.ID,
InsertedAt: e.SeenAt,
ExternalId: e.ExternalID,
Type: sqlcv1.V1PayloadTypeUSEREVENTINPUT,
Payload: payload,
TenantId: tenantId,
})
}
}
err = r.payloadStore.Store(ctx, tx, storePayloadOpts...)
if err != nil {
+64 -1
View File
@@ -1640,7 +1640,7 @@ CREATE TABLE v1_durable_sleep (
PRIMARY KEY (tenant_id, sleep_until, id)
);
CREATE TYPE v1_payload_type AS ENUM ('TASK_INPUT', 'DAG_INPUT', 'TASK_OUTPUT', 'TASK_EVENT_DATA');
CREATE TYPE v1_payload_type AS ENUM ('TASK_INPUT', 'DAG_INPUT', 'TASK_OUTPUT', 'TASK_EVENT_DATA', 'USER_EVENT_INPUT');
-- IMPORTANT: Keep these values in sync with `v1_payload_type_olap` in the OLAP db
CREATE TYPE v1_payload_location AS ENUM ('INLINE', 'EXTERNAL');
@@ -2146,3 +2146,66 @@ CREATE TABLE v1_operation_interval_settings (
interval_nanoseconds BIGINT NOT NULL,
PRIMARY KEY (tenant_id, operation_id)
);
-- Events tables
-- Durable log of user events, range-partitioned by seen_at so retention can
-- drop whole partitions cheaply. seen_at is in the PK because it is the
-- partition key.
CREATE TABLE v1_event (
id bigint GENERATED ALWAYS AS IDENTITY,
seen_at TIMESTAMPTZ NOT NULL,
tenant_id UUID NOT NULL,
external_id UUID NOT NULL DEFAULT gen_random_uuid(),
key TEXT NOT NULL,
additional_metadata JSONB,
scope TEXT,
triggering_webhook_name TEXT,
PRIMARY KEY (tenant_id, seen_at, id)
) PARTITION BY RANGE(seen_at);
-- Supports per-tenant lookups of events by key.
CREATE INDEX v1_event_key_idx ON v1_event (tenant_id, key);
-- Maps an event's external UUID to the (event_id, event_seen_at) key of its
-- row in the partitioned v1_event table, so events can be found by external
-- id without scanning all partitions.
CREATE TABLE v1_event_lookup_table (
tenant_id UUID NOT NULL,
external_id UUID NOT NULL,
event_id BIGINT NOT NULL,
event_seen_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (external_id, event_seen_at)
) PARTITION BY RANGE(event_seen_at);
-- Statement-level trigger function: mirrors each batch of inserted v1_event
-- rows into the lookup table via the NEW transition table ("new_rows").
CREATE OR REPLACE FUNCTION v1_event_lookup_table_insert_function()
RETURNS TRIGGER AS
$$
BEGIN
INSERT INTO v1_event_lookup_table (
tenant_id,
external_id,
event_id,
event_seen_at
)
SELECT
tenant_id,
external_id,
id,
seen_at
FROM new_rows
-- Idempotent on retries / duplicate external ids within a partition range.
ON CONFLICT (external_id, event_seen_at) DO NOTHING;
-- AFTER triggers ignore the return value; NULL is conventional.
RETURN NULL;
END;
$$
LANGUAGE plpgsql;
-- Fires once per INSERT statement (not per row) to keep bulk inserts cheap.
CREATE TRIGGER v1_event_lookup_table_insert_trigger
AFTER INSERT ON v1_event
REFERENCING NEW TABLE AS new_rows
FOR EACH STATEMENT
EXECUTE FUNCTION v1_event_lookup_table_insert_function();
-- Links an event to each run it triggered; filter_id is NULL when the run
-- was not matched through a filter.
CREATE TABLE v1_event_to_run (
run_external_id UUID NOT NULL,
event_id BIGINT NOT NULL,
event_seen_at TIMESTAMPTZ NOT NULL,
filter_id UUID,
PRIMARY KEY (event_id, event_seen_at, run_external_id)
) PARTITION BY RANGE(event_seen_at);