Add webhooks mechanism

This commit is contained in:
Luis Eduardo Jeréz Girón
2024-08-18 23:02:25 -06:00
parent 7bbe9a5ae0
commit c9ea76be56
21 changed files with 461 additions and 21 deletions

View File

@@ -0,0 +1,55 @@
-- +goose Up
-- +goose StatementBegin
CREATE TABLE IF NOT EXISTS webhooks (
id UUID NOT NULL DEFAULT uuid_generate_v4() PRIMARY KEY,
name TEXT NOT NULL,
is_active BOOLEAN NOT NULL DEFAULT FALSE,
event_type TEXT NOT NULL CHECK (event_type IN (
'database_healthy', 'database_unhealthy',
'destination_healthy', 'destination_unhealthy',
'execution_success', 'execution_failed'
)),
target_ids UUID[] NOT NULL, -- database_id, restoration_id, etc.
url TEXT NOT NULL,
method TEXT NOT NULL CHECK (method IN ('GET', 'POST')),
headers TEXT, -- user-defined headers in JSON format
body TEXT, -- user-defined body in JSON format
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ
);
CREATE TRIGGER webhooks_change_updated_at
BEFORE UPDATE ON webhooks FOR EACH ROW EXECUTE FUNCTION change_updated_at();
CREATE INDEX IF NOT EXISTS
idx_webhooks_target_ids ON webhooks USING GIN (target_ids);
CREATE TABLE IF NOT EXISTS webhook_results (
id UUID NOT NULL DEFAULT uuid_generate_v4() PRIMARY KEY,
webhook_id UUID NOT NULL REFERENCES webhooks(id) ON DELETE CASCADE,
req_method TEXT CHECK (req_method IN ('GET', 'POST')),
req_headers TEXT,
req_body TEXT,
res_status SMALLINT,
res_headers TEXT,
res_body TEXT,
res_duration INTEGER, -- in milliseconds
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS
idx_webhook_results_webhook_id ON webhook_results(webhook_id);
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
DROP TABLE IF EXISTS webhook_results;
DROP TABLE IF EXISTS webhooks;
-- +goose StatementEnd

View File

@@ -4,20 +4,24 @@ import (
"github.com/eduardolat/pgbackweb/internal/config"
"github.com/eduardolat/pgbackweb/internal/database/dbgen"
"github.com/eduardolat/pgbackweb/internal/integration"
"github.com/eduardolat/pgbackweb/internal/service/webhooks"
)
type Service struct {
env *config.Env
dbgen *dbgen.Queries
ints *integration.Integration
env *config.Env
dbgen *dbgen.Queries
ints *integration.Integration
webhooksService *webhooks.Service
}
func New(
env *config.Env, dbgen *dbgen.Queries, ints *integration.Integration,
webhooksService *webhooks.Service,
) *Service {
return &Service{
env: env,
dbgen: dbgen,
ints: ints,
env: env,
dbgen: dbgen,
ints: ints,
webhooksService: webhooksService,
}
}

View File

@@ -37,10 +37,16 @@ func (s *Service) TestDatabaseAndStoreResult(
}
err = s.TestDatabase(ctx, db.PgVersion, db.DecryptedConnectionString)
if err != nil && db.TestOk.Valid && db.TestOk.Bool {
s.webhooksService.RunDatabaseUnhealthy(db.ID)
}
if err != nil {
return storeRes(false, err)
}
if db.TestOk.Valid && !db.TestOk.Bool {
s.webhooksService.RunDatabaseHealthy(db.ID)
}
return storeRes(true, nil)
}

View File

@@ -4,20 +4,24 @@ import (
"github.com/eduardolat/pgbackweb/internal/config"
"github.com/eduardolat/pgbackweb/internal/database/dbgen"
"github.com/eduardolat/pgbackweb/internal/integration"
"github.com/eduardolat/pgbackweb/internal/service/webhooks"
)
type Service struct {
env *config.Env
dbgen *dbgen.Queries
ints *integration.Integration
env *config.Env
dbgen *dbgen.Queries
ints *integration.Integration
webhooksService *webhooks.Service
}
func New(
env *config.Env, dbgen *dbgen.Queries, ints *integration.Integration,
webhooksService *webhooks.Service,
) *Service {
return &Service{
env: env,
dbgen: dbgen,
ints: ints,
env: env,
dbgen: dbgen,
ints: ints,
webhooksService: webhooksService,
}
}

View File

@@ -40,10 +40,16 @@ func (s *Service) TestDestinationAndStoreResult(
dest.DecryptedAccessKey, dest.DecryptedSecretKey, dest.Region,
dest.Endpoint, dest.BucketName,
)
if err != nil && dest.TestOk.Valid && dest.TestOk.Bool {
s.webhooksService.RunDestinationUnhealthy(dest.ID)
}
if err != nil {
return storeRes(false, err)
}
if dest.TestOk.Valid && !dest.TestOk.Bool {
s.webhooksService.RunDestinationHealthy(dest.ID)
}
return storeRes(true, nil)
}

View File

@@ -4,20 +4,24 @@ import (
"github.com/eduardolat/pgbackweb/internal/config"
"github.com/eduardolat/pgbackweb/internal/database/dbgen"
"github.com/eduardolat/pgbackweb/internal/integration"
"github.com/eduardolat/pgbackweb/internal/service/webhooks"
)
type Service struct {
env *config.Env
dbgen *dbgen.Queries
ints *integration.Integration
env *config.Env
dbgen *dbgen.Queries
ints *integration.Integration
webhooksService *webhooks.Service
}
func New(
env *config.Env, dbgen *dbgen.Queries, ints *integration.Integration,
webhooksService *webhooks.Service,
) *Service {
return &Service{
env: env,
dbgen: dbgen,
ints: ints,
env: env,
dbgen: dbgen,
ints: ints,
webhooksService: webhooksService,
}
}

View File

@@ -17,6 +17,14 @@ import (
// RunExecution runs a backup execution
func (s *Service) RunExecution(ctx context.Context, backupID uuid.UUID) error {
updateExec := func(params dbgen.ExecutionsServiceUpdateExecutionParams) error {
if params.Status.String == "success" {
s.webhooksService.RunExecutionSuccess(backupID)
}
if params.Status.String == "failed" {
s.webhooksService.RunExecutionFailed(backupID)
}
_, err := s.dbgen.ExecutionsServiceUpdateExecution(
ctx, params,
)

View File

@@ -12,6 +12,7 @@ import (
"github.com/eduardolat/pgbackweb/internal/service/executions"
"github.com/eduardolat/pgbackweb/internal/service/restorations"
"github.com/eduardolat/pgbackweb/internal/service/users"
"github.com/eduardolat/pgbackweb/internal/service/webhooks"
)
type Service struct {
@@ -22,16 +23,18 @@ type Service struct {
ExecutionsService *executions.Service
UsersService *users.Service
RestorationsService *restorations.Service
WebhooksService *webhooks.Service
}
func New(
env *config.Env, dbgen *dbgen.Queries,
cr *cron.Cron, ints *integration.Integration,
) *Service {
webhooksService := webhooks.New(dbgen)
authService := auth.New(env, dbgen)
databasesService := databases.New(env, dbgen, ints)
destinationsService := destinations.New(env, dbgen, ints)
executionsService := executions.New(env, dbgen, ints)
databasesService := databases.New(env, dbgen, ints, webhooksService)
destinationsService := destinations.New(env, dbgen, ints, webhooksService)
executionsService := executions.New(env, dbgen, ints, webhooksService)
usersService := users.New(dbgen)
backupsService := backups.New(dbgen, cr, executionsService)
restorationsService := restorations.New(
@@ -46,5 +49,6 @@ func New(
ExecutionsService: executionsService,
UsersService: usersService,
RestorationsService: restorationsService,
WebhooksService: webhooksService,
}
}

View File

@@ -0,0 +1,13 @@
package webhooks
import (
"context"
"github.com/eduardolat/pgbackweb/internal/database/dbgen"
)
func (s *Service) CreateWebhook(
ctx context.Context, params dbgen.WebhooksServiceCreateWebhookParams,
) (dbgen.Webhook, error) {
return s.dbgen.WebhooksServiceCreateWebhook(ctx, params)
}

View File

@@ -0,0 +1,8 @@
-- name: WebhooksServiceCreateWebhook :one
INSERT INTO webhooks (
name, is_active, event_type, target_ids,
url, method, headers, body
) VALUES (
@name, @is_active, @event_type, @target_ids,
@url, @method, @headers, @body
) RETURNING *;

View File

@@ -0,0 +1,13 @@
package webhooks
import (
"context"
"github.com/google/uuid"
)
func (s *Service) DeleteWebhook(
ctx context.Context, id uuid.UUID,
) error {
return s.dbgen.WebhooksServiceDeleteWebhook(ctx, id)
}

View File

@@ -0,0 +1,2 @@
-- name: WebhooksServiceDeleteWebhook :exec
DELETE FROM webhooks WHERE id = @webhook_id;

View File

@@ -0,0 +1,14 @@
package webhooks
import (
"context"
"github.com/eduardolat/pgbackweb/internal/database/dbgen"
"github.com/google/uuid"
)
func (s *Service) GetWebhook(
ctx context.Context, id uuid.UUID,
) (dbgen.Webhook, error) {
return s.dbgen.WebhooksServiceGetWebhook(ctx, id)
}

View File

@@ -0,0 +1,2 @@
-- name: WebhooksServiceGetWebhook :one
SELECT * FROM webhooks WHERE id = @webhook_id;

View File

@@ -0,0 +1,44 @@
package webhooks
import (
"context"
"github.com/eduardolat/pgbackweb/internal/database/dbgen"
"github.com/eduardolat/pgbackweb/internal/util/paginateutil"
)
type PaginateWebhooksParams struct {
Page int
Limit int
}
func (s *Service) PaginateWebhooks(
ctx context.Context, params PaginateWebhooksParams,
) (paginateutil.PaginateResponse, []dbgen.Webhook, error) {
page := max(params.Page, 1)
limit := min(max(params.Limit, 1), 100)
count, err := s.dbgen.WebhooksServicePaginateWebhooksCount(ctx)
if err != nil {
return paginateutil.PaginateResponse{}, nil, err
}
paginateParams := paginateutil.PaginateParams{
Page: page,
Limit: limit,
}
offset := paginateutil.CreateOffsetFromParams(paginateParams)
paginateResponse := paginateutil.CreatePaginateResponse(paginateParams, int(count))
webhooks, err := s.dbgen.WebhooksServicePaginateWebhooks(
ctx, dbgen.WebhooksServicePaginateWebhooksParams{
Limit: int32(params.Limit),
Offset: int32(offset),
},
)
if err != nil {
return paginateutil.PaginateResponse{}, nil, err
}
return paginateResponse, webhooks, nil
}

View File

@@ -0,0 +1,7 @@
-- name: WebhooksServicePaginateWebhooksCount :one
SELECT COUNT(*) FROM webhooks;
-- name: WebhooksServicePaginateWebhooks :many
SELECT * FROM webhooks
ORDER BY created_at DESC
LIMIT sqlc.arg('limit') OFFSET sqlc.arg('offset');

View File

@@ -0,0 +1,174 @@
package webhooks
import (
"context"
"database/sql"
"encoding/json"
"io"
"net/http"
"strings"
"time"
"github.com/eduardolat/pgbackweb/internal/database/dbgen"
"github.com/eduardolat/pgbackweb/internal/logger"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
)
// RunDatabaseHealthy runs the healthy webhooks for the given database ID.
func (s *Service) RunDatabaseHealthy(databaseID uuid.UUID) {
go func() {
ctx := context.Background()
runWebhook(s, ctx, eventTypeDatabaseHealthy, databaseID)
}()
}
// RunDatabaseUnhealthy runs the unhealthy webhooks for the given database ID.
func (s *Service) RunDatabaseUnhealthy(databaseID uuid.UUID) {
go func() {
ctx := context.Background()
runWebhook(s, ctx, eventTypeDatabaseUnhealthy, databaseID)
}()
}
// RunDestinationHealthy runs the healthy webhooks for the given destination ID.
func (s *Service) RunDestinationHealthy(destinationID uuid.UUID) {
go func() {
ctx := context.Background()
runWebhook(s, ctx, eventTypeDestinationHealthy, destinationID)
}()
}
// RunDestinationUnhealthy runs the unhealthy webhooks for the given
// destination ID.
func (s *Service) RunDestinationUnhealthy(destinationID uuid.UUID) {
go func() {
ctx := context.Background()
runWebhook(s, ctx, eventTypeDestinationUnhealthy, destinationID)
}()
}
// RunExecutionSuccess runs the success webhooks for the given execution ID.
func (s *Service) RunExecutionSuccess(backupID uuid.UUID) {
go func() {
ctx := context.Background()
runWebhook(s, ctx, eventTypeExecutionSuccess, backupID)
}()
}
// RunExecutionFailed runs the failed webhooks for the given execution ID.
func (s *Service) RunExecutionFailed(backupID uuid.UUID) {
go func() {
ctx := context.Background()
runWebhook(s, ctx, eventTypeExecutionFailed, backupID)
}()
}
// runWebhook runs the webhooks for the given event type and target ID.
func runWebhook(
s *Service, ctx context.Context, eventType webhook, targetID uuid.UUID,
) {
webhooks, err := s.dbgen.WebhooksServiceGetWebhooksToRun(
ctx, dbgen.WebhooksServiceGetWebhooksToRunParams{
EventType: eventType.Value,
TargetID: []uuid.UUID{targetID},
},
)
if err != nil {
logger.Error("error getting webhooks to run", logger.KV{"error": err})
return
}
if len(webhooks) == 0 {
return
}
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(5)
for _, webhook := range webhooks {
eg.Go(func() error {
err := sendWebhookRequest(s, ctx, webhook)
if err != nil {
logger.Error("error sending webhook request", logger.KV{
"webhook_id": webhook.ID,
"error": err.Error(),
})
}
return nil
})
}
_ = eg.Wait()
}
// sendWebhookRequest sends a webhook request to the given webhook and
// stores the result in the database.
func sendWebhookRequest(
s *Service, ctx context.Context, webhook dbgen.Webhook,
) error {
timeStart := time.Now()
body := strings.NewReader(webhook.Body.String)
headers := map[string]string{}
if webhook.Headers.Valid {
err := json.Unmarshal([]byte(webhook.Headers.String), &headers)
if err != nil {
return err
}
}
client := http.Client{Timeout: time.Second * 30}
req, err := http.NewRequestWithContext(
ctx, webhook.Method, webhook.Url, body,
)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
for k, v := range headers {
req.Header.Set(k, v)
}
res, err := client.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
resHeaders, err := json.Marshal(res.Header)
if err != nil {
return err
}
resBody, err := io.ReadAll(res.Body)
if err != nil {
return err
}
_, err = s.dbgen.WebhooksServiceCreateWebhookResult(
ctx, dbgen.WebhooksServiceCreateWebhookResultParams{
WebhookID: webhook.ID,
ReqMethod: sql.NullString{String: req.Method, Valid: true},
ReqHeaders: webhook.Headers,
ReqBody: webhook.Body,
ResStatus: sql.NullInt16{Int16: int16(res.StatusCode), Valid: true},
ResHeaders: sql.NullString{String: string(resHeaders), Valid: true},
ResBody: sql.NullString{String: string(resBody), Valid: true},
ResDuration: sql.NullInt32{
Int32: int32(time.Since(timeStart).Milliseconds()),
Valid: true,
},
},
)
if err != nil {
return err
}
logger.Info("webhook sent successfully", logger.KV{
"webhook_id": webhook.ID,
"status": res.Status,
})
return nil
}

View File

@@ -0,0 +1,16 @@
-- name: WebhooksServiceGetWebhooksToRun :many
SELECT * FROM webhooks
WHERE is_active = true
AND event_type = @event_type
AND @target_id = ANY(target_ids);
-- name: WebhooksServiceCreateWebhookResult :one
INSERT INTO webhook_results (
webhook_id, req_method, req_headers, req_body,
res_status, res_headers, res_body, res_duration
)
VALUES (
@webhook_id, @req_method, @req_headers, @req_body,
@res_status, @res_headers, @res_body, @res_duration
)
RETURNING *;

View File

@@ -0,0 +1,13 @@
package webhooks
import (
"context"
"github.com/eduardolat/pgbackweb/internal/database/dbgen"
)
func (s *Service) UpdateWebhook(
ctx context.Context, params dbgen.WebhooksServiceUpdateWebhookParams,
) (dbgen.Webhook, error) {
return s.dbgen.WebhooksServiceUpdateWebhook(ctx, params)
}

View File

@@ -0,0 +1,12 @@
-- name: WebhooksServiceUpdateWebhook :one
UPDATE webhooks
SET
name = COALESCE(sqlc.narg('name'), name),
is_active = COALESCE(sqlc.narg('is_active'), is_active),
target_ids = COALESCE(sqlc.narg('target_ids'), target_ids),
url = COALESCE(sqlc.narg('url'), url),
method = COALESCE(sqlc.narg('method'), method),
headers = COALESCE(sqlc.narg('headers'), headers),
body = COALESCE(sqlc.narg('body'), body)
WHERE id = @webhook_id
RETURNING *;

View File

@@ -0,0 +1,31 @@
package webhooks
import (
"github.com/eduardolat/pgbackweb/internal/database/dbgen"
"github.com/orsinium-labs/enum"
)
type webhook = enum.Member[string]
var (
eventTypeDatabaseHealthy = webhook{Value: "database_healthy"}
eventTypeDatabaseUnhealthy = webhook{Value: "database_unhealthy"}
eventTypeDestinationHealthy = webhook{Value: "destination_healthy"}
eventTypeDestinationUnhealthy = webhook{Value: "destination_unhealthy"}
eventTypeExecutionSuccess = webhook{Value: "execution_success"}
eventTypeExecutionFailed = webhook{Value: "execution_failed"}
)
type Service struct {
dbgen *dbgen.Queries
}
func New(
dbgen *dbgen.Queries,
) *Service {
return &Service{
dbgen: dbgen,
}
}