mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-05-02 07:29:59 -05:00
more tenant related repo methods (#2854)
This commit is contained in:
@@ -642,3 +642,40 @@ RETURNING *;
|
||||
-- name: DeleteTenantMember :exec
|
||||
DELETE FROM "TenantMember"
|
||||
WHERE "id" = @id::uuid;
|
||||
|
||||
-- name: DeleteTenant :exec
|
||||
UPDATE "Tenant"
|
||||
SET "deletedAt" = NOW(),
|
||||
slug = slug || '_deleted_' || gen_random_uuid()
|
||||
WHERE "id" = @id::uuid;
|
||||
|
||||
-- name: GetTenantUsageData :one
|
||||
WITH active_workers AS (
|
||||
SELECT
|
||||
workers."id",
|
||||
workers."maxRuns"
|
||||
FROM
|
||||
"Worker" workers
|
||||
WHERE
|
||||
workers."tenantId" = @tenantId::uuid
|
||||
AND workers."dispatcherId" IS NOT NULL
|
||||
AND workers."lastHeartbeatAt" > NOW() - INTERVAL '5 seconds'
|
||||
AND workers."isActive" = true
|
||||
AND workers."isPaused" = false
|
||||
), worker_slots AS (
|
||||
SELECT
|
||||
aw."id" AS worker_id,
|
||||
aw."maxRuns" - (
|
||||
SELECT COUNT(*)
|
||||
FROM v1_task_runtime runtime
|
||||
WHERE
|
||||
runtime.tenant_id = @tenantId::uuid AND
|
||||
runtime.worker_id = aw."id"
|
||||
) AS "remainingSlots"
|
||||
FROM
|
||||
active_workers aw
|
||||
)
|
||||
SELECT
|
||||
(SELECT COUNT(*) FROM active_workers) AS "workerCount",
|
||||
(SELECT SUM("maxRuns") - SUM("remainingSlots") FROM active_workers aw JOIN worker_slots ws ON aw."id" = ws.worker_id) AS "usedWorkerSlotsCount",
|
||||
(SELECT COUNT(*) FROM "TenantMember" WHERE "tenantId" = @tenantId::uuid) AS "tenantMembersCount";
|
||||
|
||||
@@ -298,6 +298,18 @@ func (q *Queries) DeleteSchedulerPartition(ctx context.Context, db DBTX, id stri
|
||||
return &i, err
|
||||
}
|
||||
|
||||
const deleteTenant = `-- name: DeleteTenant :exec
|
||||
UPDATE "Tenant"
|
||||
SET "deletedAt" = NOW(),
|
||||
slug = slug || '_deleted_' || gen_random_uuid()
|
||||
WHERE "id" = $1::uuid
|
||||
`
|
||||
|
||||
func (q *Queries) DeleteTenant(ctx context.Context, db DBTX, id pgtype.UUID) error {
|
||||
_, err := db.Exec(ctx, deleteTenant, id)
|
||||
return err
|
||||
}
|
||||
|
||||
const deleteTenantAlertGroup = `-- name: DeleteTenantAlertGroup :exec
|
||||
DELETE FROM
|
||||
"TenantAlertEmailGroup"
|
||||
@@ -745,6 +757,51 @@ func (q *Queries) GetTenantTotalQueueMetrics(ctx context.Context, db DBTX, arg G
|
||||
return &i, err
|
||||
}
|
||||
|
||||
const getTenantUsageData = `-- name: GetTenantUsageData :one
|
||||
WITH active_workers AS (
|
||||
SELECT
|
||||
workers."id",
|
||||
workers."maxRuns"
|
||||
FROM
|
||||
"Worker" workers
|
||||
WHERE
|
||||
workers."tenantId" = $1::uuid
|
||||
AND workers."dispatcherId" IS NOT NULL
|
||||
AND workers."lastHeartbeatAt" > NOW() - INTERVAL '5 seconds'
|
||||
AND workers."isActive" = true
|
||||
AND workers."isPaused" = false
|
||||
), worker_slots AS (
|
||||
SELECT
|
||||
aw."id" AS worker_id,
|
||||
aw."maxRuns" - (
|
||||
SELECT COUNT(*)
|
||||
FROM v1_task_runtime runtime
|
||||
WHERE
|
||||
runtime.tenant_id = $1::uuid AND
|
||||
runtime.worker_id = aw."id"
|
||||
) AS "remainingSlots"
|
||||
FROM
|
||||
active_workers aw
|
||||
)
|
||||
SELECT
|
||||
(SELECT COUNT(*) FROM active_workers) AS "workerCount",
|
||||
(SELECT SUM("maxRuns") - SUM("remainingSlots") FROM active_workers aw JOIN worker_slots ws ON aw."id" = ws.worker_id) AS "usedWorkerSlotsCount",
|
||||
(SELECT COUNT(*) FROM "TenantMember" WHERE "tenantId" = $1::uuid) AS "tenantMembersCount"
|
||||
`
|
||||
|
||||
type GetTenantUsageDataRow struct {
|
||||
WorkerCount int64 `json:"workerCount"`
|
||||
UsedWorkerSlotsCount int32 `json:"usedWorkerSlotsCount"`
|
||||
TenantMembersCount int64 `json:"tenantMembersCount"`
|
||||
}
|
||||
|
||||
func (q *Queries) GetTenantUsageData(ctx context.Context, db DBTX, tenantid pgtype.UUID) (*GetTenantUsageDataRow, error) {
|
||||
row := db.QueryRow(ctx, getTenantUsageData, tenantid)
|
||||
var i GetTenantUsageDataRow
|
||||
err := row.Scan(&i.WorkerCount, &i.UsedWorkerSlotsCount, &i.TenantMembersCount)
|
||||
return &i, err
|
||||
}
|
||||
|
||||
const getTenantWorkflowQueueMetrics = `-- name: GetTenantWorkflowQueueMetrics :many
|
||||
WITH valid_workflow_runs AS (
|
||||
SELECT
|
||||
|
||||
@@ -169,6 +169,10 @@ type TenantRepository interface {
|
||||
RebalanceAllTenantWorkerPartitions(ctx context.Context) error
|
||||
|
||||
RebalanceInactiveTenantWorkerPartitions(ctx context.Context) error
|
||||
|
||||
DeleteTenant(ctx context.Context, id string) error
|
||||
|
||||
GetTenantUsageData(ctx context.Context, tenantId string) (*sqlcv1.GetTenantUsageDataRow, error)
|
||||
}
|
||||
|
||||
type tenantRepository struct {
|
||||
@@ -790,6 +794,14 @@ func (r *tenantRepository) RebalanceInactiveSchedulerPartitions(ctx context.Cont
|
||||
return r.queries.RebalanceInactiveSchedulerPartitions(ctx, r.pool)
|
||||
}
|
||||
|
||||
func (r *tenantRepository) DeleteTenant(ctx context.Context, id string) error {
|
||||
return r.queries.DeleteTenant(ctx, r.pool, sqlchelpers.UUIDFromStr(id))
|
||||
}
|
||||
|
||||
func (r *tenantRepository) GetTenantUsageData(ctx context.Context, tenantId string) (*sqlcv1.GetTenantUsageDataRow, error) {
|
||||
return r.queries.GetTenantUsageData(ctx, r.pool, sqlchelpers.UUIDFromStr(tenantId))
|
||||
}
|
||||
|
||||
func getPartitionName() pgtype.Text {
|
||||
hostname, ok := os.LookupEnv("HOSTNAME")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user