diff --git a/pkg/repository/v1/olap.go b/pkg/repository/v1/olap.go index 3517c5bcf..2c833498e 100644 --- a/pkg/repository/v1/olap.go +++ b/pkg/repository/v1/olap.go @@ -1090,8 +1090,7 @@ func (r *OLAPRepositoryImpl) writeTaskEventBatch(ctx context.Context, tenantId s payloadsToWrite = append(payloadsToWrite, PutOLAPPayloadOpts{ StoreOLAPPayloadOpts: &StoreOLAPPayloadOpts{ - // todo: wire this up properly - ExternalId: sqlchelpers.UUIDFromStr(uuid.NewString()), + ExternalId: event.ExternalID, InsertedAt: event.TaskInsertedAt, Payload: event.Output, }, diff --git a/pkg/repository/v1/sqlcv1/copyfrom.go b/pkg/repository/v1/sqlcv1/copyfrom.go index 0981ada20..4c6cef7c3 100644 --- a/pkg/repository/v1/sqlcv1/copyfrom.go +++ b/pkg/repository/v1/sqlcv1/copyfrom.go @@ -197,6 +197,7 @@ func (r iteratorForCreateTaskEventsOLAP) Values() ([]interface{}, error) { r.rows[0].WorkerID, r.rows[0].AdditionalEventData, r.rows[0].AdditionalEventMessage, + r.rows[0].ExternalID, }, nil } @@ -205,7 +206,7 @@ func (r iteratorForCreateTaskEventsOLAP) Err() error { } func (q *Queries) CreateTaskEventsOLAP(ctx context.Context, db DBTX, arg []CreateTaskEventsOLAPParams) (int64, error) { - return db.CopyFrom(ctx, []string{"v1_task_events_olap"}, []string{"tenant_id", "task_id", "task_inserted_at", "event_type", "workflow_id", "event_timestamp", "readable_status", "retry_count", "error_message", "output", "worker_id", "additional__event_data", "additional__event_message"}, &iteratorForCreateTaskEventsOLAP{rows: arg}) + return db.CopyFrom(ctx, []string{"v1_task_events_olap"}, []string{"tenant_id", "task_id", "task_inserted_at", "event_type", "workflow_id", "event_timestamp", "readable_status", "retry_count", "error_message", "output", "worker_id", "additional__event_data", "additional__event_message", "external_id"}, &iteratorForCreateTaskEventsOLAP{rows: arg}) } // iteratorForCreateTaskEventsOLAPTmp implements pgx.CopyFromSource. diff --git a/pkg/repository/v1/sqlcv1/olap.sql b/pkg/repository/v1/sqlcv1/olap.sql index f48f6bcae..e09ff21fa 100644 --- a/pkg/repository/v1/sqlcv1/olap.sql +++ b/pkg/repository/v1/sqlcv1/olap.sql @@ -217,7 +217,8 @@ INSERT INTO v1_task_events_olap ( output, worker_id, additional__event_data, - additional__event_message + additional__event_message, + external_id ) VALUES ( $1, $2, @@ -231,7 +232,8 @@ INSERT INTO v1_task_events_olap ( $10, $11, $12, - $13 + $13, + $14 ); -- name: ReadTaskByExternalID :one diff --git a/pkg/repository/v1/sqlcv1/olap.sql.go b/pkg/repository/v1/sqlcv1/olap.sql.go index 59290ccac..3229cba0e 100644 --- a/pkg/repository/v1/sqlcv1/olap.sql.go +++ b/pkg/repository/v1/sqlcv1/olap.sql.go @@ -232,6 +232,7 @@ type CreateTaskEventsOLAPParams struct { WorkerID pgtype.UUID `json:"worker_id"` AdditionalEventData pgtype.Text `json:"additional__event_data"` AdditionalEventMessage pgtype.Text `json:"additional__event_message"` + ExternalID pgtype.UUID `json:"external_id"` } type CreateTaskEventsOLAPTmpParams struct {