mirror of
https://github.com/MizuchiLabs/mantrae.git
synced 2026-05-04 15:10:09 -05:00
fix potential panic on empty fetch
This commit is contained in:
@@ -2,14 +2,15 @@ package handler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/mizuchilabs/mantrae/pkg/meta"
|
||||
mantraev1 "github.com/mizuchilabs/mantrae/proto/gen/mantrae/v1"
|
||||
"github.com/mizuchilabs/mantrae/server/internal/config"
|
||||
"github.com/mizuchilabs/mantrae/server/internal/store/db"
|
||||
"github.com/mizuchilabs/mantrae/server/internal/traefik"
|
||||
"golang.org/x/sync/singleflight"
|
||||
"gopkg.in/yaml.v3"
|
||||
@@ -17,18 +18,20 @@ import (
|
||||
|
||||
var (
|
||||
updateGroup singleflight.Group
|
||||
lastUpdateTime sync.Map // map[string]time.Time
|
||||
lastUpdateTime sync.Map
|
||||
)
|
||||
|
||||
func scheduleUpdate(r *http.Request, q *db.Queries, profileID int64) {
|
||||
func scheduleUpdate(r *http.Request, app *config.App, profileID int64) {
|
||||
instanceName := r.Header.Get(meta.HeaderTraefikName)
|
||||
if instanceName == "" {
|
||||
instanceURL := r.Header.Get(meta.HeaderTraefikURL)
|
||||
if instanceName == "" || instanceURL == "" {
|
||||
slog.Debug("Skipping traefik update, missing headers")
|
||||
return
|
||||
}
|
||||
|
||||
// Check if we recently updated this instance
|
||||
if lastUpdate, ok := lastUpdateTime.Load(instanceName); ok {
|
||||
if time.Since(lastUpdate.(time.Time)) < 30*time.Second {
|
||||
if time.Since(lastUpdate.(time.Time)) < 5*time.Second {
|
||||
return // Skip if updated recently
|
||||
}
|
||||
}
|
||||
@@ -37,7 +40,20 @@ func scheduleUpdate(r *http.Request, q *db.Queries, profileID int64) {
|
||||
go func() {
|
||||
_, _, _ = updateGroup.Do(instanceName, func() (any, error) {
|
||||
lastUpdateTime.Store(instanceName, time.Now())
|
||||
traefik.UpdateTraefikInstance(r, q, profileID)
|
||||
result, err := traefik.UpdateTraefikInstance(r, app.Conn.GetQuery(), profileID)
|
||||
if err != nil {
|
||||
slog.Error("failed to update traefik instance", "error", err)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if result != nil {
|
||||
app.Event.Broadcast(&mantraev1.EventStreamResponse{
|
||||
Action: mantraev1.EventAction_EVENT_ACTION_UPDATED,
|
||||
Data: &mantraev1.EventStreamResponse_TraefikInstance{
|
||||
TraefikInstance: result.ToProto(),
|
||||
},
|
||||
})
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
}()
|
||||
@@ -61,7 +77,7 @@ func PublishTraefikConfig(a *config.App) http.HandlerFunc {
|
||||
}
|
||||
|
||||
// Create or update traefik instance
|
||||
scheduleUpdate(r, a.Conn.GetQuery(), profile.ID)
|
||||
scheduleUpdate(r, a, profile.ID)
|
||||
|
||||
cfg, err := traefik.BuildDynamicConfig(r.Context(), a.Conn.GetQuery(), profile)
|
||||
if err != nil {
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@@ -30,29 +31,24 @@ type APIResponse struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
func UpdateTraefikInstance(r *http.Request, q *db.Queries, profileID int64) {
|
||||
func UpdateTraefikInstance(
|
||||
r *http.Request,
|
||||
q *db.Queries,
|
||||
profileID int64,
|
||||
) (*db.TraefikInstance, error) {
|
||||
instanceName := r.Header.Get(meta.HeaderTraefikName)
|
||||
instanceURL := r.Header.Get(meta.HeaderTraefikURL)
|
||||
if instanceName == "" || instanceURL == "" {
|
||||
slog.Debug("Skipping traefik update, missing headers")
|
||||
return
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
instance, err := q.GetTraefikInstanceByName(
|
||||
context.Background(),
|
||||
db.GetTraefikInstanceByNameParams{
|
||||
ProfileID: profileID,
|
||||
Name: instanceName,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
// Create new temporary instance
|
||||
instance = db.TraefikInstance{
|
||||
ID: uuid.New().String(),
|
||||
ProfileID: profileID,
|
||||
Name: instanceName,
|
||||
Url: instanceURL,
|
||||
}
|
||||
params := db.UpsertTraefikInstanceParams{
|
||||
ID: uuid.New().String(),
|
||||
ProfileID: profileID,
|
||||
Name: instanceName,
|
||||
Url: instanceURL,
|
||||
Tls: strings.HasPrefix(instanceURL, "https"),
|
||||
}
|
||||
|
||||
endpoints := []struct {
|
||||
@@ -70,7 +66,7 @@ func UpdateTraefikInstance(r *http.Request, q *db.Queries, profileID int64) {
|
||||
// Fetch all endpoints concurrently
|
||||
for _, endpoint := range endpoints {
|
||||
go func(name, path string) {
|
||||
data, err := fetch(instance, path)
|
||||
data, err := fetch(params, path)
|
||||
responses <- APIResponse{Name: name, Data: data, Err: err}
|
||||
}(endpoint.name, endpoint.path)
|
||||
}
|
||||
@@ -102,57 +98,55 @@ func UpdateTraefikInstance(r *http.Request, q *db.Queries, profileID int64) {
|
||||
|
||||
// If any critical fetch failed, abort
|
||||
if len(fetchErrors) > 0 {
|
||||
return
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Decode responses
|
||||
var config schema.Configuration
|
||||
if err := json.NewDecoder(results["raw"]).Decode(&config); err != nil {
|
||||
slog.Error("failed to decode raw data", "error", err)
|
||||
return
|
||||
if results["raw"] != nil {
|
||||
var config schema.Configuration
|
||||
if err := json.NewDecoder(results["raw"]).Decode(&config); err != nil {
|
||||
slog.Warn("failed to decode raw config", "error", err)
|
||||
} else {
|
||||
params.Config = &config
|
||||
}
|
||||
}
|
||||
|
||||
var entrypoints schema.EntryPoints
|
||||
if err := json.NewDecoder(results["entrypoints"]).Decode(&entrypoints); err != nil {
|
||||
slog.Error("failed to decode entrypoints", "error", err)
|
||||
return
|
||||
if results["entrypoints"] != nil {
|
||||
var entrypoints schema.EntryPoints
|
||||
if err := json.NewDecoder(results["entrypoints"]).Decode(&entrypoints); err != nil {
|
||||
slog.Warn("failed to decode entrypoints", "error", err)
|
||||
} else {
|
||||
params.Entrypoints = &entrypoints
|
||||
}
|
||||
}
|
||||
|
||||
var overview schema.Overview
|
||||
if err := json.NewDecoder(results["overview"]).Decode(&overview); err != nil {
|
||||
slog.Error("failed to decode overview", "error", err)
|
||||
return
|
||||
if results["overview"] != nil {
|
||||
var overview schema.Overview
|
||||
params.Overview = &overview
|
||||
if err := json.NewDecoder(results["overview"]).Decode(&overview); err != nil {
|
||||
slog.Warn("failed to decode overview", "error", err)
|
||||
} else {
|
||||
params.Overview = &overview
|
||||
}
|
||||
}
|
||||
|
||||
var version schema.Version
|
||||
if err := json.NewDecoder(results["version"]).Decode(&version); err != nil {
|
||||
slog.Error("failed to decode version", "error", err)
|
||||
return
|
||||
if results["version"] != nil {
|
||||
var version schema.Version
|
||||
if err := json.NewDecoder(results["version"]).Decode(&version); err != nil {
|
||||
slog.Warn("failed to decode version", "error", err)
|
||||
} else {
|
||||
params.Version = &version
|
||||
}
|
||||
}
|
||||
|
||||
// Upsert with timeout
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if _, err := q.UpsertTraefikInstance(ctx, db.UpsertTraefikInstanceParams{
|
||||
ID: instance.ID,
|
||||
Name: instance.Name,
|
||||
Url: instance.Url,
|
||||
Username: instance.Username,
|
||||
Password: instance.Password,
|
||||
Tls: instance.Tls,
|
||||
Entrypoints: &entrypoints,
|
||||
Overview: &overview,
|
||||
Config: &config,
|
||||
Version: &version,
|
||||
ProfileID: profileID,
|
||||
}); err != nil {
|
||||
slog.Error("failed to update traefik instance", "error", err)
|
||||
return
|
||||
result, err := q.UpsertTraefikInstance(context.Background(), params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func fetch(instance db.TraefikInstance, endpoint string) (io.ReadCloser, error) {
|
||||
func fetch(instance db.UpsertTraefikInstanceParams, endpoint string) (io.ReadCloser, error) {
|
||||
if instance.Url == "" {
|
||||
return nil, fmt.Errorf("invalid URL or endpoint")
|
||||
}
|
||||
@@ -181,9 +175,6 @@ func fetch(instance db.TraefikInstance, endpoint string) (io.ReadCloser, error)
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
if err = resp.Body.Close(); err != nil {
|
||||
slog.Error("failed to close response body", "error", err)
|
||||
}
|
||||
return nil, fmt.Errorf("failed to fetch %s: %w", instance.Url+endpoint, err)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
import {
|
||||
agentClient,
|
||||
dnsClient,
|
||||
entryPointClient,
|
||||
middlewareClient,
|
||||
profileClient,
|
||||
routerClient,
|
||||
serversTransportClient,
|
||||
serviceClient,
|
||||
traefikClient,
|
||||
userClient,
|
||||
utilClient
|
||||
agentClient,
|
||||
dnsClient,
|
||||
entryPointClient,
|
||||
middlewareClient,
|
||||
profileClient,
|
||||
routerClient,
|
||||
serversTransportClient,
|
||||
serviceClient,
|
||||
traefikClient,
|
||||
userClient,
|
||||
utilClient
|
||||
} from '$lib/api';
|
||||
import type { Agent } from '$lib/gen/mantrae/v1/agent_pb';
|
||||
import type { DnsProvider } from '$lib/gen/mantrae/v1/dns_provider_pb';
|
||||
|
||||
Reference in New Issue
Block a user