mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-23 18:49:47 -05:00
fix: event keys (#951)
* feat: insert unique event keys * fix: list query * feat: bulk * chore: gen
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -59,6 +59,25 @@ SELECT
|
||||
FROM
|
||||
events;
|
||||
|
||||
-- name: CreateEventKeys :exec
|
||||
INSERT INTO "EventKey" (
|
||||
"key",
|
||||
"tenantId"
|
||||
)
|
||||
SELECT
|
||||
unnest(@keys::text[]) AS "key",
|
||||
unnest(@tenantIds::uuid[]) AS "tenantId"
|
||||
ON CONFLICT ("key", "tenantId") DO NOTHING;
|
||||
|
||||
-- name: ListEventKeys :many
|
||||
SELECT
|
||||
"key"
|
||||
FROM
|
||||
"EventKey"
|
||||
WHERE
|
||||
"tenantId" = @tenantId::uuid
|
||||
ORDER BY "key" ASC;
|
||||
|
||||
-- name: CreateEvent :one
|
||||
INSERT INTO "Event" (
|
||||
"id",
|
||||
|
||||
@@ -202,6 +202,27 @@ func (q *Queries) CreateEvent(ctx context.Context, db DBTX, arg CreateEventParam
|
||||
return &i, err
|
||||
}
|
||||
|
||||
const createEventKeys = `-- name: CreateEventKeys :exec
|
||||
INSERT INTO "EventKey" (
|
||||
"key",
|
||||
"tenantId"
|
||||
)
|
||||
SELECT
|
||||
unnest($1::text[]) AS "key",
|
||||
unnest($2::uuid[]) AS "tenantId"
|
||||
ON CONFLICT ("key", "tenantId") DO NOTHING
|
||||
`
|
||||
|
||||
type CreateEventKeysParams struct {
|
||||
Keys []string `json:"keys"`
|
||||
Tenantids []pgtype.UUID `json:"tenantids"`
|
||||
}
|
||||
|
||||
func (q *Queries) CreateEventKeys(ctx context.Context, db DBTX, arg CreateEventKeysParams) error {
|
||||
_, err := db.Exec(ctx, createEventKeys, arg.Keys, arg.Tenantids)
|
||||
return err
|
||||
}
|
||||
|
||||
type CreateEventsParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
Key string `json:"key"`
|
||||
@@ -317,6 +338,36 @@ func (q *Queries) GetInsertedEvents(ctx context.Context, db DBTX, ids []pgtype.U
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listEventKeys = `-- name: ListEventKeys :many
|
||||
SELECT
|
||||
"key"
|
||||
FROM
|
||||
"EventKey"
|
||||
WHERE
|
||||
"tenantId" = $1::uuid
|
||||
ORDER BY "key" ASC
|
||||
`
|
||||
|
||||
func (q *Queries) ListEventKeys(ctx context.Context, db DBTX, tenantid pgtype.UUID) ([]string, error) {
|
||||
rows, err := db.Query(ctx, listEventKeys, tenantid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []string
|
||||
for rows.Next() {
|
||||
var key string
|
||||
if err := rows.Scan(&key); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, key)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listEvents = `-- name: ListEvents :many
|
||||
WITH filtered_events AS (
|
||||
SELECT
|
||||
|
||||
@@ -996,6 +996,11 @@ type Event struct {
|
||||
InsertOrder pgtype.Int4 `json:"insertOrder"`
|
||||
}
|
||||
|
||||
type EventKey struct {
|
||||
Key string `json:"key"`
|
||||
TenantId pgtype.UUID `json:"tenantId"`
|
||||
}
|
||||
|
||||
type GetGroupKeyRun struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
CreatedAt pgtype.Timestamp `json:"createdAt"`
|
||||
|
||||
@@ -125,6 +125,14 @@ CREATE TABLE "Event" (
|
||||
CONSTRAINT "Event_pkey" PRIMARY KEY ("id")
|
||||
);
|
||||
|
||||
-- CreateTable
|
||||
CREATE TABLE "EventKey" (
|
||||
"key" TEXT NOT NULL,
|
||||
"tenantId" UUID NOT NULL,
|
||||
|
||||
CONSTRAINT "EventKey_pkey" PRIMARY KEY ("key")
|
||||
);
|
||||
|
||||
-- CreateTable
|
||||
CREATE TABLE "GetGroupKeyRun" (
|
||||
"id" UUID NOT NULL,
|
||||
@@ -968,6 +976,9 @@ CREATE INDEX "Event_tenantId_createdAt_idx" ON "Event"("tenantId" ASC, "createdA
|
||||
-- CreateIndex
|
||||
CREATE INDEX "Event_tenantId_idx" ON "Event"("tenantId" ASC);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE UNIQUE INDEX "EventKey_key_tenantId_key" ON "EventKey"("key" ASC, "tenantId" ASC);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "GetGroupKeyRun_createdAt_idx" ON "GetGroupKeyRun"("createdAt" ASC);
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
@@ -167,30 +168,15 @@ func (r *eventAPIRepository) ListEvents(ctx context.Context, tenantId string, op
|
||||
}
|
||||
|
||||
func (r *eventAPIRepository) ListEventKeys(tenantId string) ([]string, error) {
|
||||
var rows []struct {
|
||||
Key string `json:"key"`
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err := r.client.Prisma.QueryRaw(
|
||||
`
|
||||
SELECT DISTINCT ON("Event"."key") "Event"."key"
|
||||
FROM "Event"
|
||||
WHERE
|
||||
"Event"."tenantId"::text = $1
|
||||
`,
|
||||
tenantId,
|
||||
).Exec(context.Background(), &rows)
|
||||
keys, err := r.queries.ListEventKeys(ctx, r.pool, sqlchelpers.UUIDFromStr(tenantId))
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
keys := make([]string, len(rows))
|
||||
|
||||
for i, row := range rows {
|
||||
keys[i] = row.Key
|
||||
}
|
||||
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
@@ -224,13 +210,14 @@ func (r *eventAPIRepository) ListEventsById(tenantId string, ids []string) ([]db
|
||||
}
|
||||
|
||||
type eventEngineRepository struct {
|
||||
pool *pgxpool.Pool
|
||||
v validator.Validator
|
||||
queries *dbsqlc.Queries
|
||||
l *zerolog.Logger
|
||||
m *metered.Metered
|
||||
bulkCreateBuffer *TenantBufferManager[*repository.CreateEventOpts, *dbsqlc.Event]
|
||||
callbacks []repository.Callback[*dbsqlc.Event]
|
||||
pool *pgxpool.Pool
|
||||
v validator.Validator
|
||||
queries *dbsqlc.Queries
|
||||
l *zerolog.Logger
|
||||
m *metered.Metered
|
||||
bulkCreateBuffer *TenantBufferManager[*repository.CreateEventOpts, *dbsqlc.Event]
|
||||
callbacks []repository.Callback[*dbsqlc.Event]
|
||||
createEventKeyCache *lru.Cache[string, bool]
|
||||
}
|
||||
|
||||
func (r *eventEngineRepository) cleanup() error {
|
||||
@@ -240,12 +227,15 @@ func (r *eventEngineRepository) cleanup() error {
|
||||
func NewEventEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, m *metered.Metered) (repository.EventEngineRepository, func() error, error) {
|
||||
queries := dbsqlc.New()
|
||||
|
||||
createEventKeyCache, _ := lru.New[string, bool](2000) // nolint: errcheck - this only returns an error if the size is less than 0
|
||||
|
||||
e := eventEngineRepository{
|
||||
pool: pool,
|
||||
v: v,
|
||||
queries: queries,
|
||||
l: l,
|
||||
m: m,
|
||||
pool: pool,
|
||||
v: v,
|
||||
queries: queries,
|
||||
l: l,
|
||||
m: m,
|
||||
createEventKeyCache: createEventKeyCache,
|
||||
}
|
||||
err := e.startBufferLoop()
|
||||
|
||||
@@ -264,6 +254,44 @@ func (r *eventEngineRepository) GetEventForEngine(ctx context.Context, tenantId,
|
||||
return r.queries.GetEventForEngine(ctx, r.pool, sqlchelpers.UUIDFromStr(id))
|
||||
}
|
||||
|
||||
func (r *eventEngineRepository) createEventKeys(ctx context.Context, tx pgx.Tx, keys map[string]struct {
|
||||
key string
|
||||
tenantId string
|
||||
}) error {
|
||||
|
||||
eventKeys := make([]string, 0)
|
||||
eventKeysTenantIds := make([]pgtype.UUID, 0)
|
||||
|
||||
for _, eventKey := range keys {
|
||||
cacheKey := fmt.Sprintf("%s-%s", eventKey.tenantId, eventKey.key)
|
||||
|
||||
// if the key is already in the cache, skip it
|
||||
if _, ok := r.createEventKeyCache.Get(cacheKey); ok {
|
||||
continue
|
||||
}
|
||||
|
||||
r.l.Debug().Msgf("creating event key %s for tenant %s", eventKey.key, eventKey.tenantId)
|
||||
eventKeys = append(eventKeys, eventKey.key)
|
||||
eventKeysTenantIds = append(eventKeysTenantIds, sqlchelpers.UUIDFromStr(eventKey.tenantId))
|
||||
}
|
||||
|
||||
err := r.queries.CreateEventKeys(ctx, tx, dbsqlc.CreateEventKeysParams{
|
||||
Tenantids: eventKeysTenantIds,
|
||||
Keys: eventKeys,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// add to cache
|
||||
for i := range eventKeys {
|
||||
r.createEventKeyCache.Add(fmt.Sprintf("%s-%s", sqlchelpers.UUIDToStr(eventKeysTenantIds[i]), eventKeys[i]), true)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *eventEngineRepository) CreateEvent(ctx context.Context, opts *repository.CreateEventOpts) (*dbsqlc.Event, error) {
|
||||
return metered.MakeMetered(ctx, r.m, dbsqlc.LimitResourceEVENT, opts.TenantId, 1, func() (*string, *dbsqlc.Event, error) {
|
||||
|
||||
@@ -335,6 +363,11 @@ func (r *eventEngineRepository) BulkCreateEvent(ctx context.Context, opts *repos
|
||||
params := make([]dbsqlc.CreateEventsParams, len(opts.Events))
|
||||
ids := make([]pgtype.UUID, len(opts.Events))
|
||||
|
||||
uniqueEventKeys := make(map[string]struct {
|
||||
key string
|
||||
tenantId string
|
||||
})
|
||||
|
||||
for i, event := range opts.Events {
|
||||
eventId := uuid.New().String()
|
||||
|
||||
@@ -350,6 +383,14 @@ func (r *eventEngineRepository) BulkCreateEvent(ctx context.Context, opts *repos
|
||||
params[i].ReplayedFromId = sqlchelpers.UUIDFromStr(*event.ReplayedEvent)
|
||||
}
|
||||
|
||||
uniqueEventKeys[fmt.Sprintf("%s-%s", event.TenantId, event.Key)] = struct {
|
||||
key string
|
||||
tenantId string
|
||||
}{
|
||||
key: event.Key,
|
||||
tenantId: event.TenantId,
|
||||
}
|
||||
|
||||
ids[i] = sqlchelpers.UUIDFromStr(eventId)
|
||||
}
|
||||
|
||||
@@ -362,6 +403,12 @@ func (r *eventEngineRepository) BulkCreateEvent(ctx context.Context, opts *repos
|
||||
|
||||
defer deferRollback(ctx, r.l, tx.Rollback)
|
||||
|
||||
err = r.createEventKeys(ctx, tx, uniqueEventKeys)
|
||||
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("could not create event keys: %w", err)
|
||||
}
|
||||
// create events
|
||||
insertCount, err := r.queries.CreateEvents(
|
||||
ctx,
|
||||
tx,
|
||||
|
||||
@@ -0,0 +1,10 @@
|
||||
-- CreateTable
|
||||
CREATE TABLE "EventKey" (
|
||||
"key" TEXT NOT NULL,
|
||||
"tenantId" UUID NOT NULL,
|
||||
|
||||
CONSTRAINT "EventKey_pkey" PRIMARY KEY ("key")
|
||||
);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE UNIQUE INDEX "EventKey_key_tenantId_key" ON "EventKey"("key", "tenantId");
|
||||
@@ -397,6 +397,13 @@ model APIToken {
|
||||
webhookWorkers WebhookWorker[]
|
||||
}
|
||||
|
||||
model EventKey {
|
||||
key String @id
|
||||
tenantId String @db.Uuid
|
||||
|
||||
@@unique([key, tenantId])
|
||||
}
|
||||
|
||||
// Event represents an event in the database.
|
||||
model Event {
|
||||
// base fields
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
-- Create "EventKey" table
|
||||
CREATE TABLE "EventKey" ("key" text NOT NULL, "tenantId" uuid NOT NULL, PRIMARY KEY ("key"));
|
||||
-- Create index "EventKey_key_tenantId_key" to table: "EventKey"
|
||||
CREATE UNIQUE INDEX "EventKey_key_tenantId_key" ON "EventKey" ("key", "tenantId");
|
||||
@@ -1,4 +1,4 @@
|
||||
h1:MdUIwxqQIsS9pvLCRU1mB2nai1wTMJ7Vjl3FRjCr6uw=
|
||||
h1:4CduYOxaYNUq+L7PFTvWFiMncxTz5b43B6rgB/ryJUA=
|
||||
20240115180414_init.sql h1:Ef3ZyjAHkmJPdGF/dEWCahbwgcg6uGJKnDxW2JCRi2k=
|
||||
20240122014727_v0_6_0.sql h1:o/LdlteAeFgoHJ3e/M4Xnghqt9826IE/Y/h0q95Acuo=
|
||||
20240126235456_v0_7_0.sql h1:KiVzt/hXgQ6esbdC6OMJOOWuYEXmy1yeCpmsVAHTFKs=
|
||||
@@ -64,3 +64,4 @@ h1:MdUIwxqQIsS9pvLCRU1mB2nai1wTMJ7Vjl3FRjCr6uw=
|
||||
20240930202706_v0.48.1.sql h1:CcgVHTRA4c9u5rQvSFFy/R9AdiSdTapqfpyUs0KGAf0=
|
||||
20240930233257_v0.49.0.sql h1:B+JMbME62DxaCnesydvQXPg+ZNB0kB/V8gSclh1VdY4=
|
||||
20241004122206_v0.49.1.sql h1:Fas5TXOp4a2g+y5sGBJG9wTaVL/WCaVJ9+ZlASN9Md4=
|
||||
20241008124038_v0.49.2.sql h1:YT40sN8Wtqh21emrzDMZIcvcOkipw+4fdwIoBpF+Dek=
|
||||
|
||||
@@ -125,6 +125,14 @@ CREATE TABLE "Event" (
|
||||
CONSTRAINT "Event_pkey" PRIMARY KEY ("id")
|
||||
);
|
||||
|
||||
-- CreateTable
|
||||
CREATE TABLE "EventKey" (
|
||||
"key" TEXT NOT NULL,
|
||||
"tenantId" UUID NOT NULL,
|
||||
|
||||
CONSTRAINT "EventKey_pkey" PRIMARY KEY ("key")
|
||||
);
|
||||
|
||||
-- CreateTable
|
||||
CREATE TABLE "GetGroupKeyRun" (
|
||||
"id" UUID NOT NULL,
|
||||
@@ -968,6 +976,9 @@ CREATE INDEX "Event_tenantId_createdAt_idx" ON "Event"("tenantId" ASC, "createdA
|
||||
-- CreateIndex
|
||||
CREATE INDEX "Event_tenantId_idx" ON "Event"("tenantId" ASC);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE UNIQUE INDEX "EventKey_key_tenantId_key" ON "EventKey"("key" ASC, "tenantId" ASC);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "GetGroupKeyRun_createdAt_idx" ON "GetGroupKeyRun"("createdAt" ASC);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user