diff --git a/pkg/repository/v1/olap.go b/pkg/repository/v1/olap.go index 1e05c8ad4..03faa9ff3 100644 --- a/pkg/repository/v1/olap.go +++ b/pkg/repository/v1/olap.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "log" + "math/rand" "sort" "sync" "time" @@ -1459,9 +1460,17 @@ func (r *OLAPRepositoryImpl) writeTaskEventBatch(ctx context.Context, tenantId s } if event.ExternalID.Valid { + // generating a dummy id + inserted at to use for creating the external keys for the task events + // we do this since we don't have the id + inserted at of the events themselves on the opts, and we don't + // actually need those for anything once the keys are created. + dummyId := rand.Int63() + // randomly jitter the inserted at time by +/- 300ms to make collisions virtually impossible + dummyInsertedAt := time.Now().Add(time.Duration(rand.Intn(2*300+1)-300) * time.Millisecond) + payloadsToWrite = append(payloadsToWrite, StoreOLAPPayloadOpts{ + Id: dummyId, ExternalId: event.ExternalID, - InsertedAt: event.TaskInsertedAt, + InsertedAt: sqlchelpers.TimestamptzFromTime(dummyInsertedAt), Payload: event.Output, }) } @@ -1772,6 +1781,7 @@ func (r *OLAPRepositoryImpl) writeTaskBatch(ctx context.Context, tenantId string }) putPayloadOpts = append(putPayloadOpts, StoreOLAPPayloadOpts{ + Id: task.ID, ExternalId: task.ExternalID, InsertedAt: task.InsertedAt, Payload: payload, @@ -1833,6 +1843,7 @@ func (r *OLAPRepositoryImpl) writeDAGBatch(ctx context.Context, tenantId string, }) putPayloadOpts = append(putPayloadOpts, StoreOLAPPayloadOpts{ + Id: dag.ID, ExternalId: dag.ExternalID, InsertedAt: dag.InsertedAt, Payload: dag.Input, @@ -2049,29 +2060,6 @@ func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, ev return fmt.Errorf("error creating events: %v", err) } - tenantIdToPutPayloadOpts := make(map[string][]StoreOLAPPayloadOpts) - - for _, event := range insertedEvents { - if event == nil { - continue - } - - tenantIdToPutPayloadOpts[event.TenantID.String()] = append(tenantIdToPutPayloadOpts[event.TenantID.String()], StoreOLAPPayloadOpts{ - ExternalId: event.ExternalID, - InsertedAt: event.SeenAt, - Payload: event.Payload, - }) - - } - - for tenantId, putPayloadOpts := range tenantIdToPutPayloadOpts { - err = r.PutPayloads(ctx, tx, tenantId, putPayloadOpts) - - if err != nil { - return fmt.Errorf("error putting event payloads: %v", err) - } - } - eventExternalIdToId := make(map[pgtype.UUID]int64) for _, event := range insertedEvents { @@ -2102,73 +2090,33 @@ func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, ev return fmt.Errorf("error creating event triggers: %v", err) } - if err := commit(ctx); err != nil { - return fmt.Errorf("error committing transaction: %v", err) - } - - if !r.payloadStore.ExternalStoreEnabled() { - return nil - } - - offloadToExternalOpts := make([]OffloadToExternalStoreOpts, 0) - idInsertedAtToExternalId := make(map[IdInsertedAt]pgtype.UUID) + tenantIdToPutPayloadOpts := make(map[string][]StoreOLAPPayloadOpts) for _, event := range insertedEvents { - id := event.ID - insertedAt := event.SeenAt - idInsertedAtToExternalId[IdInsertedAt{ - ID: id, - InsertedAt: insertedAt, - }] = event.ExternalID + if event == nil { + continue + } + payload := eventExternalIdToPayload[event.ExternalID] - offloadToExternalOpts = append(offloadToExternalOpts, OffloadToExternalStoreOpts{ - StorePayloadOpts: &StorePayloadOpts{ - Id: event.ID, - InsertedAt: event.SeenAt, - ExternalId: event.ExternalID, - Type: sqlcv1.V1PayloadTypeTASKINPUT, - Payload: payload, - TenantId: event.TenantID.String(), - }, - OffloadAt: time.Now(), + tenantIdToPutPayloadOpts[event.TenantID.String()] = append(tenantIdToPutPayloadOpts[event.TenantID.String()], StoreOLAPPayloadOpts{ + Id: event.ID, + ExternalId: event.ExternalID, + InsertedAt: event.SeenAt, + Payload: payload, }) } - if len(offloadToExternalOpts) == 0 { - return nil - } - - retrieveOptsToKey, err := r.PayloadStore().ExternalStore().Store(ctx, offloadToExternalOpts...) - - if err != nil { - return err - } - - tenantIdToffloadOpts := make(map[string][]OffloadPayloadOpts) - - for opt, key := range retrieveOptsToKey { - externalId := idInsertedAtToExternalId[IdInsertedAt{ - ID: opt.Id, - InsertedAt: opt.InsertedAt, - }] - - tenantIdToffloadOpts[opt.TenantId.String()] = append(tenantIdToffloadOpts[opt.TenantId.String()], OffloadPayloadOpts{ - ExternalId: externalId, - ExternalLocationKey: string(key), - }) - } - - for tenantId, opts := range tenantIdToffloadOpts { - err = r.OffloadPayloads(ctx, tenantId, opts) + for tenantId, putPayloadOpts := range tenantIdToPutPayloadOpts { + err = r.PutPayloads(ctx, tx, tenantId, putPayloadOpts) if err != nil { - return fmt.Errorf("error offloading payloads: %v", err) + return fmt.Errorf("error putting event payloads: %v", err) } } - if len(offloadToExternalOpts) == 0 { - return nil + if err := commit(ctx); err != nil { + return fmt.Errorf("error committing transaction: %v", err) } return nil @@ -2426,27 +2374,107 @@ type OffloadPayloadOpts struct { } func (r *OLAPRepositoryImpl) PutPayloads(ctx context.Context, tx sqlcv1.DBTX, tenantId string, putPayloadOpts []StoreOLAPPayloadOpts) error { - insertedAts := make([]pgtype.Timestamptz, len(putPayloadOpts)) - tenantIds := make([]pgtype.UUID, len(putPayloadOpts)) - externalIds := make([]pgtype.UUID, len(putPayloadOpts)) - payloads := make([][]byte, len(putPayloadOpts)) - locations := make([]string, len(putPayloadOpts)) + localTx := false + var ( + commit func(context.Context) error + rollback func() + err error + ) - for i, opt := range putPayloadOpts { - externalIds[i] = opt.ExternalId - insertedAts[i] = opt.InsertedAt - tenantIds[i] = sqlchelpers.UUIDFromStr(tenantId) - payloads[i] = opt.Payload - locations[i] = string(sqlcv1.V1PayloadLocationOlapINLINE) + if tx == nil { + localTx = true + tx, commit, rollback, err = sqlchelpers.PrepareTx(ctx, r.pool, r.l, 5000) + + if err != nil { + return fmt.Errorf("error beginning transaction in `PutPayload`: %v", err) + } + + defer rollback() } - return r.queries.PutPayloads(ctx, tx, sqlcv1.PutPayloadsParams{ - Externalids: externalIds, - Insertedats: insertedAts, - Tenantids: tenantIds, - Payloads: payloads, - Locations: locations, + placeholderPayloadType := sqlcv1.V1PayloadTypeTASKEVENTDATA // placeholder, not used in OLAP + retrieveOptsToKey := make(map[RetrievePayloadOpts]ExternalPayloadLocationKey) + + if r.payloadStore.ExternalStoreEnabled() { + storeExternalPayloadOpts := make([]OffloadToExternalStoreOpts, len(putPayloadOpts)) + + for i, opt := range putPayloadOpts { + storeOpts := OffloadToExternalStoreOpts{ + StorePayloadOpts: &StorePayloadOpts{ + Id: opt.Id, + InsertedAt: opt.InsertedAt, + ExternalId: opt.ExternalId, + Type: placeholderPayloadType, + Payload: opt.Payload, + TenantId: tenantId, + }, + OffloadAt: opt.InsertedAt.Time, // placeholder, offloaded immediately + } + + storeExternalPayloadOpts[i] = storeOpts + } + + retrieveOptsToKey, err = r.payloadStore.ExternalStore().Store(ctx, storeExternalPayloadOpts...) + + if err != nil { + return fmt.Errorf("error offloading payloads to external store: %v", err) + } + } + + insertedAts := make([]pgtype.Timestamptz, 0, len(putPayloadOpts)) + tenantIds := make([]pgtype.UUID, 0, len(putPayloadOpts)) + externalIds := make([]pgtype.UUID, 0, len(putPayloadOpts)) + payloads := make([][]byte, 0, len(putPayloadOpts)) + locations := make([]string, 0, len(putPayloadOpts)) + externalKeys := make([]string, 0, len(putPayloadOpts)) + + tenantIdUUID := sqlchelpers.UUIDFromStr(tenantId) + + for _, opt := range putPayloadOpts { + retrieveOpts := RetrievePayloadOpts{ + Id: opt.Id, + InsertedAt: opt.InsertedAt, + Type: placeholderPayloadType, + TenantId: tenantIdUUID, + } + + key, ok := retrieveOptsToKey[retrieveOpts] + + externalIds = append(externalIds, opt.ExternalId) + insertedAts = append(insertedAts, opt.InsertedAt) + tenantIds = append(tenantIds, tenantIdUUID) + + if ok { + payloads = append(payloads, nil) + locations = append(locations, string(sqlcv1.V1PayloadLocationOlapEXTERNAL)) + externalKeys = append(externalKeys, string(key)) + } else { + payloads = append(payloads, opt.Payload) + locations = append(locations, string(sqlcv1.V1PayloadLocationOlapINLINE)) + externalKeys = append(externalKeys, "") + } + } + + err = r.queries.PutPayloads(ctx, tx, sqlcv1.PutPayloadsParams{ + Externalids: externalIds, + Insertedats: insertedAts, + Tenantids: tenantIds, + Payloads: payloads, + Locations: locations, + Externallocationkeys: externalKeys, }) + + if err != nil { + return fmt.Errorf("error putting payloads: %v", err) + } + + if localTx { + if err := commit(ctx); err != nil { + return fmt.Errorf("error committing transaction in `PutPayload`: %v", err) + } + } + + return nil } func (r *OLAPRepositoryImpl) ReadPayload(ctx context.Context, tenantId string, externalId pgtype.UUID) ([]byte, error) { diff --git a/pkg/repository/v1/payloadstore.go b/pkg/repository/v1/payloadstore.go index 71b704538..c208d1cb1 100644 --- a/pkg/repository/v1/payloadstore.go +++ b/pkg/repository/v1/payloadstore.go @@ -26,6 +26,7 @@ type StorePayloadOpts struct { } type StoreOLAPPayloadOpts struct { + Id int64 ExternalId pgtype.UUID InsertedAt pgtype.Timestamptz Payload []byte @@ -45,6 +46,7 @@ type RetrievePayloadOpts struct { type PayloadLocation string type ExternalPayloadLocationKey string +type TenantID string type ExternalStore interface { Store(ctx context.Context, payloads ...OffloadToExternalStoreOpts) (map[RetrievePayloadOpts]ExternalPayloadLocationKey, error) @@ -64,6 +66,7 @@ type PayloadStoreRepository interface { OLAPDualWritesEnabled() bool WALPollLimit() int WALProcessInterval() time.Duration + WALEnabled() bool ExternalCutoverProcessInterval() time.Duration ExternalStoreEnabled() bool ExternalStore() ExternalStore @@ -697,6 +700,10 @@ func (p *payloadStoreRepositoryImpl) ExternalStore() ExternalStore { return p.externalStore } +func (p *payloadStoreRepositoryImpl) WALEnabled() bool { + return p.walEnabled +} + type NoOpExternalStore struct{} func (n *NoOpExternalStore) Store(ctx context.Context, payloads ...OffloadToExternalStoreOpts) (map[RetrievePayloadOpts]ExternalPayloadLocationKey, error) { diff --git a/pkg/repository/v1/sqlcv1/olap.sql b/pkg/repository/v1/sqlcv1/olap.sql index 0885efde1..3e561335c 100644 --- a/pkg/repository/v1/sqlcv1/olap.sql +++ b/pkg/repository/v1/sqlcv1/olap.sql @@ -1626,7 +1626,8 @@ WITH inputs AS ( UNNEST(@insertedAts::TIMESTAMPTZ[]) AS inserted_at, UNNEST(@payloads::JSONB[]) AS payload, UNNEST(@tenantIds::UUID[]) AS tenant_id, - UNNEST(CAST(@locations::TEXT[] AS v1_payload_location_olap[])) AS location + UNNEST(CAST(@locations::TEXT[] AS v1_payload_location_olap[])) AS location, + UNNEST(@externalLocationKeys::TEXT[]) AS external_location_key ) INSERT INTO v1_payloads_olap ( @@ -1644,7 +1645,7 @@ SELECT i.inserted_at, i.location, CASE - WHEN i.location = 'EXTERNAL' THEN i.payload + WHEN i.location = 'EXTERNAL' THEN i.external_location_key ELSE NULL END, CASE diff --git a/pkg/repository/v1/sqlcv1/olap.sql.go b/pkg/repository/v1/sqlcv1/olap.sql.go index ada636ba1..605203496 100644 --- a/pkg/repository/v1/sqlcv1/olap.sql.go +++ b/pkg/repository/v1/sqlcv1/olap.sql.go @@ -2165,7 +2165,8 @@ WITH inputs AS ( UNNEST($2::TIMESTAMPTZ[]) AS inserted_at, UNNEST($3::JSONB[]) AS payload, UNNEST($4::UUID[]) AS tenant_id, - UNNEST(CAST($5::TEXT[] AS v1_payload_location_olap[])) AS location + UNNEST(CAST($5::TEXT[] AS v1_payload_location_olap[])) AS location, + UNNEST($6::TEXT[]) AS external_location_key ) INSERT INTO v1_payloads_olap ( @@ -2183,7 +2184,7 @@ SELECT i.inserted_at, i.location, CASE - WHEN i.location = 'EXTERNAL' THEN i.payload + WHEN i.location = 'EXTERNAL' THEN i.external_location_key ELSE NULL END, CASE @@ -2200,11 +2201,12 @@ SET ` type PutPayloadsParams struct { - Externalids []pgtype.UUID `json:"externalids"` - Insertedats []pgtype.Timestamptz `json:"insertedats"` - Payloads [][]byte `json:"payloads"` - Tenantids []pgtype.UUID `json:"tenantids"` - Locations []string `json:"locations"` + Externalids []pgtype.UUID `json:"externalids"` + Insertedats []pgtype.Timestamptz `json:"insertedats"` + Payloads [][]byte `json:"payloads"` + Tenantids []pgtype.UUID `json:"tenantids"` + Locations []string `json:"locations"` + Externallocationkeys []string `json:"externallocationkeys"` } func (q *Queries) PutPayloads(ctx context.Context, db DBTX, arg PutPayloadsParams) error { @@ -2214,6 +2216,7 @@ func (q *Queries) PutPayloads(ctx context.Context, db DBTX, arg PutPayloadsParam arg.Payloads, arg.Tenantids, arg.Locations, + arg.Externallocationkeys, ) return err }