chore(signoz): deprecate all flags (#8308)

Deprecate all flags

- Use `querier.flux_interval` in lieu of passing `--flux-interval` and `--flux-interval-for-trace-detail` (see the YAML sketch below)
- Remove `--gateway-url`; configure `gateway.url` instead
- Use `telemetrystore.clickhouse.cluster` in lieu of passing `--cluster` or `--cluster-name`
- Add an `unparam` check to the linter and update several functions across the querier codebase to satisfy it
- Remove the Prometheus config from Docker builds
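
For reference, a minimal sketch of the configuration keys that replace the deprecated flags. The values are the defaults from the bundled config shown further down in this diff; the gateway section applies to license builds only.

querier:
  # replaces --flux-interval and --flux-interval-for-trace-detail
  flux_interval: 5m
telemetrystore:
  clickhouse:
    # replaces --cluster and --cluster-name
    cluster: cluster
gateway:
  # replaces --gateway-url
  url: http://localhost:8080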
Vibhu Pandey
2025-06-21 00:55:38 +05:30
committed by GitHub
parent 5b342b9b5d
commit 5601c0886d
46 changed files with 411 additions and 350 deletions

View File

@@ -7,6 +7,7 @@ linters:
- sloglint
- depguard
- iface
- unparam
linters-settings:
sloglint:

View File

@@ -90,6 +90,15 @@ apiserver:
- /api/v1/version
- /
##################### Querier #####################
querier:
# The TTL for cached query results.
cache_ttl: 168h
# The interval for recent data that should not be cached.
flux_interval: 5m
# The maximum number of concurrent queries for missing ranges.
max_concurrent_queries: 4
##################### TelemetryStore #####################
telemetrystore:
# Maximum number of idle connections in the connection pool.
@@ -103,13 +112,15 @@ telemetrystore:
clickhouse:
# The DSN to use for clickhouse.
dsn: tcp://localhost:9000
# The cluster name to use for clickhouse.
cluster: cluster
# The query settings for clickhouse.
settings:
max_execution_time: 0
max_execution_time_leaf: 0
timeout_before_checking_execution_speed: 0
max_bytes_to_read: 0
max_result_rows_for_ch_query: 0
max_result_rows: 0
##################### Prometheus #####################
prometheus:
@@ -227,3 +238,9 @@ statsreporter:
collect:
# Whether to collect identities and traits (emails).
identities: true
##################### Gateway (License only) #####################
gateway:
# The URL of the gateway's api.
url: http://localhost:8080

View File

@@ -11,11 +11,9 @@ RUN apk update && \
COPY ./target/${OS}-${TARGETARCH}/signoz /root/signoz
COPY ./conf/prometheus.yml /root/config/prometheus.yml
COPY ./templates/email /root/templates
COPY frontend/build/ /etc/signoz/web/
RUN chmod 755 /root /root/signoz
ENTRYPOINT ["./signoz"]
CMD ["-config", "/root/config/prometheus.yml"]
ENTRYPOINT ["./signoz"]

View File

@@ -12,11 +12,9 @@ RUN apk update && \
rm -rf /var/cache/apk/*
COPY ./target/${OS}-${ARCH}/signoz /root/signoz
COPY ./conf/prometheus.yml /root/config/prometheus.yml
COPY ./templates/email /root/templates
COPY frontend/build/ /etc/signoz/web/
RUN chmod 755 /root /root/signoz
ENTRYPOINT ["./signoz"]
CMD ["-config", "/root/config/prometheus.yml"]

View File

@@ -7,7 +7,6 @@ import (
"github.com/SigNoz/signoz/ee/licensing/httplicensing"
"github.com/SigNoz/signoz/ee/query-service/integrations/gateway"
"github.com/SigNoz/signoz/ee/query-service/interfaces"
"github.com/SigNoz/signoz/ee/query-service/usage"
"github.com/SigNoz/signoz/pkg/alertmanager"
"github.com/SigNoz/signoz/pkg/apis/fields"
@@ -17,6 +16,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
basemodel "github.com/SigNoz/signoz/pkg/query-service/model"
rules "github.com/SigNoz/signoz/pkg/query-service/rules"
"github.com/SigNoz/signoz/pkg/signoz"
@@ -26,8 +26,7 @@ import (
)
type APIHandlerOptions struct {
DataConnector interfaces.DataConnector
PreferSpanMetrics bool
DataConnector interfaces.Reader
RulesManager *rules.Manager
UsageManager *usage.Manager
IntegrationsController *integrations.Controller
@@ -51,7 +50,6 @@ type APIHandler struct {
func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler, error) {
baseHandler, err := baseapp.NewAPIHandler(baseapp.APIHandlerOpts{
Reader: opts.DataConnector,
PreferSpanMetrics: opts.PreferSpanMetrics,
RuleManager: opts.RulesManager,
IntegrationsController: opts.IntegrationsController,
CloudIntegrationsController: opts.CloudIntegrationsController,

View File

@@ -96,7 +96,7 @@ func (ah *APIHandler) receiveSAML(w http.ResponseWriter, r *http.Request) {
return
}
nextPage, err := ah.Signoz.Modules.User.PrepareSsoRedirect(ctx, redirectUri, email, ah.opts.JWT)
nextPage, err := ah.Signoz.Modules.User.PrepareSsoRedirect(ctx, redirectUri, email)
if err != nil {
zap.L().Error("[receiveSAML] failed to generate redirect URI after successful login ", zap.String("domain", domain.String()), zap.Error(err))
handleSsoError(w, r, redirectUri)

View File

@@ -59,7 +59,7 @@ func (ah *APIHandler) getFeatureFlags(w http.ResponseWriter, r *http.Request) {
}
}
if ah.opts.PreferSpanMetrics {
if constants.IsPreferSpanMetrics {
for idx, feature := range featureSet {
if feature.Name == licensetypes.UseSpanMetrics {
featureSet[idx].Active = true

View File

@@ -1,39 +0,0 @@
package db
import (
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/prometheus"
basechr "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
)
type ClickhouseReader struct {
conn clickhouse.Conn
appdb sqlstore.SQLStore
*basechr.ClickHouseReader
}
func NewDataConnector(
sqlDB sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
prometheus prometheus.Prometheus,
cluster string,
fluxIntervalForTraceDetail time.Duration,
cache cache.Cache,
) *ClickhouseReader {
chReader := basechr.NewReader(sqlDB, telemetryStore, prometheus, cluster, fluxIntervalForTraceDetail, cache)
return &ClickhouseReader{
conn: telemetryStore.ClickhouseDB(),
appdb: sqlDB,
ClickHouseReader: chReader,
}
}
func (r *ClickhouseReader) GetSQLStore() sqlstore.SQLStore {
return r.appdb
}

View File

@@ -6,12 +6,10 @@ import (
"net"
"net/http"
_ "net/http/pprof" // http profiler
"time"
"github.com/gorilla/handlers"
"github.com/SigNoz/signoz/ee/query-service/app/api"
"github.com/SigNoz/signoz/ee/query-service/app/db"
"github.com/SigNoz/signoz/ee/query-service/constants"
"github.com/SigNoz/signoz/ee/query-service/integrations/gateway"
"github.com/SigNoz/signoz/ee/query-service/rules"
@@ -31,6 +29,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
baseapp "github.com/SigNoz/signoz/pkg/query-service/app"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
@@ -58,61 +57,55 @@ type ServerOptions struct {
Jwt *authtypes.JWT
}
// Server runs HTTP api service
// Server runs HTTP, Mux and a grpc server
type Server struct {
serverOptions *ServerOptions
ruleManager *baserules.Manager
config signoz.Config
signoz *signoz.SigNoz
jwt *authtypes.JWT
ruleManager *baserules.Manager
// public http router
httpConn net.Listener
httpServer *http.Server
httpConn net.Listener
httpServer *http.Server
httpHostPort string
// private http
privateConn net.Listener
privateHTTP *http.Server
privateConn net.Listener
privateHTTP *http.Server
privateHostPort string
opampServer *opamp.Server
// Usage manager
usageManager *usage.Manager
opampServer *opamp.Server
unavailableChannel chan healthcheck.Status
}
// HealthCheckStatus returns health check status channel a client can subscribe to
func (s Server) HealthCheckStatus() chan healthcheck.Status {
return s.unavailableChannel
}
// NewServer creates and initializes Server
func NewServer(serverOptions *ServerOptions) (*Server, error) {
gatewayProxy, err := gateway.NewProxy(serverOptions.GatewayUrl, gateway.RoutePrefix)
func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT) (*Server, error) {
gatewayProxy, err := gateway.NewProxy(config.Gateway.URL.String(), gateway.RoutePrefix)
if err != nil {
return nil, err
}
fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail)
if err != nil {
return nil, err
}
reader := db.NewDataConnector(
serverOptions.SigNoz.SQLStore,
serverOptions.SigNoz.TelemetryStore,
serverOptions.SigNoz.Prometheus,
serverOptions.Cluster,
fluxIntervalForTraceDetail,
serverOptions.SigNoz.Cache,
reader := clickhouseReader.NewReader(
signoz.SQLStore,
signoz.TelemetryStore,
signoz.Prometheus,
signoz.TelemetryStore.Cluster(),
config.Querier.FluxInterval,
signoz.Cache,
)
rm, err := makeRulesManager(
reader,
serverOptions.SigNoz.Cache,
serverOptions.SigNoz.Alertmanager,
serverOptions.SigNoz.SQLStore,
serverOptions.SigNoz.TelemetryStore,
serverOptions.SigNoz.Prometheus,
serverOptions.SigNoz.Modules.OrgGetter,
signoz.Cache,
signoz.Alertmanager,
signoz.SQLStore,
signoz.TelemetryStore,
signoz.Prometheus,
signoz.Modules.OrgGetter,
)
if err != nil {
@@ -120,16 +113,16 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}
// initiate opamp
opAmpModel.InitDB(serverOptions.SigNoz.SQLStore, serverOptions.SigNoz.Instrumentation.Logger(), serverOptions.SigNoz.Modules.OrgGetter)
opAmpModel.Init(signoz.SQLStore, signoz.Instrumentation.Logger(), signoz.Modules.OrgGetter)
integrationsController, err := integrations.NewController(serverOptions.SigNoz.SQLStore)
integrationsController, err := integrations.NewController(signoz.SQLStore)
if err != nil {
return nil, fmt.Errorf(
"couldn't create integrations controller: %w", err,
)
}
cloudIntegrationsController, err := cloudintegrations.NewController(serverOptions.SigNoz.SQLStore)
cloudIntegrationsController, err := cloudintegrations.NewController(signoz.SQLStore)
if err != nil {
return nil, fmt.Errorf(
"couldn't create cloud provider integrations controller: %w", err,
@@ -138,7 +131,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
// ingestion pipelines manager
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
serverOptions.SigNoz.SQLStore,
signoz.SQLStore,
integrationsController.GetPipelinesForInstalledIntegrations,
)
if err != nil {
@@ -147,7 +140,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
// initiate agent config handler
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
Store: serverOptions.SigNoz.SQLStore,
Store: signoz.SQLStore,
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController},
})
if err != nil {
@@ -155,7 +148,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}
// start the usagemanager
usageManager, err := usage.New(serverOptions.SigNoz.Licensing, serverOptions.SigNoz.TelemetryStore.ClickhouseDB(), serverOptions.SigNoz.Zeus, serverOptions.SigNoz.Modules.OrgGetter)
usageManager, err := usage.New(signoz.Licensing, signoz.TelemetryStore.ClickhouseDB(), signoz.Zeus, signoz.Modules.OrgGetter)
if err != nil {
return nil, err
}
@@ -165,7 +158,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}
telemetry.GetInstance().SetReader(reader)
telemetry.GetInstance().SetSqlStore(serverOptions.SigNoz.SQLStore)
telemetry.GetInstance().SetSqlStore(signoz.SQLStore)
telemetry.GetInstance().SetSaasOperator(constants.SaasSegmentKey)
telemetry.GetInstance().SetSavedViewsInfoCallback(telemetry.GetSavedViewsInfo)
telemetry.GetInstance().SetAlertsInfoCallback(telemetry.GetAlertsInfo)
@@ -173,38 +166,36 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
telemetry.GetInstance().SetUserCountCallback(telemetry.GetUserCount)
telemetry.GetInstance().SetDashboardsInfoCallback(telemetry.GetDashboardsInfo)
fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval)
if err != nil {
return nil, err
}
apiOpts := api.APIHandlerOptions{
DataConnector: reader,
PreferSpanMetrics: serverOptions.PreferSpanMetrics,
RulesManager: rm,
UsageManager: usageManager,
IntegrationsController: integrationsController,
CloudIntegrationsController: cloudIntegrationsController,
LogsParsingPipelineController: logParsingPipelineController,
FluxInterval: fluxInterval,
FluxInterval: config.Querier.FluxInterval,
Gateway: gatewayProxy,
GatewayUrl: serverOptions.GatewayUrl,
JWT: serverOptions.Jwt,
GatewayUrl: config.Gateway.URL.String(),
JWT: jwt,
}
apiHandler, err := api.NewAPIHandler(apiOpts, serverOptions.SigNoz)
apiHandler, err := api.NewAPIHandler(apiOpts, signoz)
if err != nil {
return nil, err
}
s := &Server{
config: config,
signoz: signoz,
jwt: jwt,
ruleManager: rm,
serverOptions: serverOptions,
httpHostPort: baseconst.HTTPHostPort,
privateHostPort: baseconst.PrivateHostPort,
unavailableChannel: make(chan healthcheck.Status),
usageManager: usageManager,
}
httpServer, err := s.createPublicServer(apiHandler, serverOptions.SigNoz.Web)
httpServer, err := s.createPublicServer(apiHandler, signoz.Web)
if err != nil {
return nil, err
@@ -237,18 +228,23 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return s, nil
}
// HealthCheckStatus returns health check status channel a client can subscribe to
func (s Server) HealthCheckStatus() chan healthcheck.Status {
return s.unavailableChannel
}
func (s *Server) createPrivateServer(apiHandler *api.APIHandler) (*http.Server, error) {
r := baseapp.NewRouter()
r.Use(middleware.NewAuth(s.serverOptions.Jwt, []string{"Authorization", "Sec-WebSocket-Protocol"}, s.serverOptions.SigNoz.Sharder, s.serverOptions.SigNoz.Instrumentation.Logger()).Wrap)
r.Use(middleware.NewAPIKey(s.serverOptions.SigNoz.SQLStore, []string{"SIGNOZ-API-KEY"}, s.serverOptions.SigNoz.Instrumentation.Logger(), s.serverOptions.SigNoz.Sharder).Wrap)
r.Use(middleware.NewTimeout(s.serverOptions.SigNoz.Instrumentation.Logger(),
s.serverOptions.Config.APIServer.Timeout.ExcludedRoutes,
s.serverOptions.Config.APIServer.Timeout.Default,
s.serverOptions.Config.APIServer.Timeout.Max,
r.Use(middleware.NewAuth(s.jwt, []string{"Authorization", "Sec-WebSocket-Protocol"}, s.signoz.Sharder, s.signoz.Instrumentation.Logger()).Wrap)
r.Use(middleware.NewAPIKey(s.signoz.SQLStore, []string{"SIGNOZ-API-KEY"}, s.signoz.Instrumentation.Logger(), s.signoz.Sharder).Wrap)
r.Use(middleware.NewTimeout(s.signoz.Instrumentation.Logger(),
s.config.APIServer.Timeout.ExcludedRoutes,
s.config.APIServer.Timeout.Default,
s.config.APIServer.Timeout.Max,
).Wrap)
r.Use(middleware.NewAnalytics().Wrap)
r.Use(middleware.NewLogging(s.serverOptions.SigNoz.Instrumentation.Logger(), s.serverOptions.Config.APIServer.Logging.ExcludedRoutes).Wrap)
r.Use(middleware.NewLogging(s.signoz.Instrumentation.Logger(), s.config.APIServer.Logging.ExcludedRoutes).Wrap)
apiHandler.RegisterPrivateRoutes(r)
@@ -270,17 +266,17 @@ func (s *Server) createPrivateServer(apiHandler *api.APIHandler) (*http.Server,
func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*http.Server, error) {
r := baseapp.NewRouter()
am := middleware.NewAuthZ(s.serverOptions.SigNoz.Instrumentation.Logger())
am := middleware.NewAuthZ(s.signoz.Instrumentation.Logger())
r.Use(middleware.NewAuth(s.serverOptions.Jwt, []string{"Authorization", "Sec-WebSocket-Protocol"}, s.serverOptions.SigNoz.Sharder, s.serverOptions.SigNoz.Instrumentation.Logger()).Wrap)
r.Use(middleware.NewAPIKey(s.serverOptions.SigNoz.SQLStore, []string{"SIGNOZ-API-KEY"}, s.serverOptions.SigNoz.Instrumentation.Logger(), s.serverOptions.SigNoz.Sharder).Wrap)
r.Use(middleware.NewTimeout(s.serverOptions.SigNoz.Instrumentation.Logger(),
s.serverOptions.Config.APIServer.Timeout.ExcludedRoutes,
s.serverOptions.Config.APIServer.Timeout.Default,
s.serverOptions.Config.APIServer.Timeout.Max,
r.Use(middleware.NewAuth(s.jwt, []string{"Authorization", "Sec-WebSocket-Protocol"}, s.signoz.Sharder, s.signoz.Instrumentation.Logger()).Wrap)
r.Use(middleware.NewAPIKey(s.signoz.SQLStore, []string{"SIGNOZ-API-KEY"}, s.signoz.Instrumentation.Logger(), s.signoz.Sharder).Wrap)
r.Use(middleware.NewTimeout(s.signoz.Instrumentation.Logger(),
s.config.APIServer.Timeout.ExcludedRoutes,
s.config.APIServer.Timeout.Default,
s.config.APIServer.Timeout.Max,
).Wrap)
r.Use(middleware.NewAnalytics().Wrap)
r.Use(middleware.NewLogging(s.serverOptions.SigNoz.Instrumentation.Logger(), s.serverOptions.Config.APIServer.Logging.ExcludedRoutes).Wrap)
r.Use(middleware.NewLogging(s.signoz.Instrumentation.Logger(), s.config.APIServer.Logging.ExcludedRoutes).Wrap)
apiHandler.RegisterRoutes(r, am)
apiHandler.RegisterLogsRoutes(r, am)
@@ -321,7 +317,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h
func (s *Server) initListeners() error {
// listen on public port
var err error
publicHostPort := s.serverOptions.HTTPHostPort
publicHostPort := s.httpHostPort
if publicHostPort == "" {
return fmt.Errorf("baseconst.HTTPHostPort is required")
}
@@ -331,10 +327,10 @@ func (s *Server) initListeners() error {
return err
}
zap.L().Info(fmt.Sprintf("Query server started listening on %s...", s.serverOptions.HTTPHostPort))
zap.L().Info(fmt.Sprintf("Query server started listening on %s...", s.httpHostPort))
// listen on private port to support internal services
privateHostPort := s.serverOptions.PrivateHostPort
privateHostPort := s.privateHostPort
if privateHostPort == "" {
return fmt.Errorf("baseconst.PrivateHostPort is required")
@@ -344,7 +340,7 @@ func (s *Server) initListeners() error {
if err != nil {
return err
}
zap.L().Info(fmt.Sprintf("Query server started listening on private port %s...", s.serverOptions.PrivateHostPort))
zap.L().Info(fmt.Sprintf("Query server started listening on private port %s...", s.privateHostPort))
return nil
}
@@ -364,7 +360,7 @@ func (s *Server) Start(ctx context.Context) error {
}
go func() {
zap.L().Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.serverOptions.HTTPHostPort))
zap.L().Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.httpHostPort))
switch err := s.httpServer.Serve(s.httpConn); err {
case nil, http.ErrServerClosed, cmux.ErrListenerClosed:
@@ -390,7 +386,7 @@ func (s *Server) Start(ctx context.Context) error {
}
go func() {
zap.L().Info("Starting Private HTTP server", zap.Int("port", privatePort), zap.String("addr", s.serverOptions.PrivateHostPort))
zap.L().Info("Starting Private HTTP server", zap.Int("port", privatePort), zap.String("addr", s.privateHostPort))
switch err := s.privateHTTP.Serve(s.privateConn); err {
case nil, http.ErrServerClosed, cmux.ErrListenerClosed:

View File

@@ -37,9 +37,14 @@ func GetDefaultSiteURL() string {
const DotMetricsEnabled = "DOT_METRICS_ENABLED"
var IsDotMetricsEnabled = false
var IsPreferSpanMetrics = false
func init() {
if GetOrDefaultEnv(DotMetricsEnabled, "false") == "true" {
IsDotMetricsEnabled = true
}
if GetOrDefaultEnv("USE_SPAN_METRICS", "false") == "true" {
IsPreferSpanMetrics = true
}
}

View File

@@ -1,11 +0,0 @@
package interfaces
import (
baseint "github.com/SigNoz/signoz/pkg/query-service/interfaces"
)
// Connector defines methods for interaction
// with o11y data. for example - clickhouse
type DataConnector interface {
baseint.Reader
}

View File

@@ -102,10 +102,14 @@ func main() {
fileprovider.NewFactory(),
},
}, signoz.DeprecatedFlags{
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
Config: promConfigPath,
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
Config: promConfigPath,
FluxInterval: fluxInterval,
FluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
Cluster: cluster,
GatewayUrl: gatewayUrl,
})
if err != nil {
zap.L().Fatal("Failed to create config", zap.Error(err))
@@ -148,20 +152,7 @@ func main() {
zap.L().Fatal("Failed to create signoz", zap.Error(err))
}
serverOptions := &app.ServerOptions{
Config: config,
SigNoz: signoz,
HTTPHostPort: baseconst.HTTPHostPort,
PreferSpanMetrics: preferSpanMetrics,
PrivateHostPort: baseconst.PrivateHostPort,
FluxInterval: fluxInterval,
FluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
Cluster: cluster,
GatewayUrl: gatewayUrl,
Jwt: jwt,
}
server, err := app.NewServer(serverOptions)
server, err := app.NewServer(config, signoz, jwt)
if err != nil {
zap.L().Fatal("Failed to create server", zap.Error(err))
}

pkg/gateway/config.go (new file, 33 additions)
View File

@@ -0,0 +1,33 @@
package gateway
import (
"errors"
"net/url"
"github.com/SigNoz/signoz/pkg/factory"
)
type Config struct {
URL *url.URL `mapstructure:"url"`
}
func NewConfigFactory() factory.ConfigFactory {
return factory.NewConfigFactory(factory.MustNewName("gateway"), newConfig)
}
func newConfig() factory.Config {
return &Config{
URL: &url.URL{
Scheme: "http",
Host: "localhost:8080",
Path: "/",
},
}
}
func (c Config) Validate() error {
if c.URL == nil {
return errors.New("url is required")
}
return nil
}
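
A minimal, hypothetical standalone sketch of how this config might be consumed: it mirrors the default built by newConfig() above and exercises the exported Validate, without going through the SigNoz factory wiring. The EE server then passes config.Gateway.URL.String() to its gateway proxy, as seen in the server diff above.

package main

import (
	"fmt"
	"net/url"

	"github.com/SigNoz/signoz/pkg/gateway"
)

func main() {
	// Mirror the default from newConfig(): http://localhost:8080/
	cfg := gateway.Config{URL: &url.URL{Scheme: "http", Host: "localhost:8080", Path: "/"}}

	// Validate rejects a nil URL, as defined above.
	if err := cfg.Validate(); err != nil {
		panic(err)
	}

	fmt.Println(cfg.URL.String()) // http://localhost:8080/
}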

View File

@@ -455,7 +455,7 @@ func (m *Module) CreateUserForSAMLRequest(ctx context.Context, email string) (*t
}
func (m *Module) PrepareSsoRedirect(ctx context.Context, redirectUri, email string, jwt *authtypes.JWT) (string, error) {
func (m *Module) PrepareSsoRedirect(ctx context.Context, redirectUri, email string) (string, error) {
users, err := m.GetUsersByEmail(ctx, email)
if err != nil {
m.settings.Logger().ErrorContext(ctx, "failed to get user with email received from auth provider", "error", err)

View File

@@ -7,7 +7,6 @@ import (
"github.com/SigNoz/signoz/pkg/statsreporter"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/google/uuid"
)
@@ -38,7 +37,7 @@ type Module interface {
LoginPrecheck(ctx context.Context, orgID, email, sourceUrl string) (*types.GettableLoginPrecheck, error)
// sso
PrepareSsoRedirect(ctx context.Context, redirectUri, email string, jwt *authtypes.JWT) (string, error)
PrepareSsoRedirect(ctx context.Context, redirectUri, email string) (string, error)
CanUsePassword(ctx context.Context, email string) (bool, error)
// password

View File

@@ -11,11 +11,9 @@ RUN apk update && \
COPY ./target/${OS}-${TARGETARCH}/signoz-community /root/signoz
COPY ./conf/prometheus.yml /root/config/prometheus.yml
COPY ./templates/email /root/templates
COPY frontend/build/ /etc/signoz/web/
RUN chmod 755 /root /root/signoz
ENTRYPOINT ["./signoz"]
CMD ["-config", "/root/config/prometheus.yml"]

View File

@@ -12,11 +12,9 @@ RUN apk update && \
rm -rf /var/cache/apk/*
COPY ./target/${OS}-${ARCH}/signoz-community /root/signoz-community
COPY ./conf/prometheus.yml /root/config/prometheus.yml
COPY ./templates/email /root/templates
COPY frontend/build/ /etc/signoz/web/
RUN chmod 755 /root /root/signoz-community
ENTRYPOINT ["./signoz-community"]
CMD ["-config", "/root/config/prometheus.yml"]

View File

@@ -2321,6 +2321,7 @@ func (r *ClickHouseReader) GetTotalSpans(ctx context.Context) (uint64, error) {
return totalSpans, nil
}
// deprecated: remove this function in the next major release
func (r *ClickHouseReader) GetSpansInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (uint64, error) {
var spansInLastHeartBeatInterval uint64
r.db.QueryRow(ctx, fmt.Sprintf("SELECT count() from %s.%s where ts_bucket_start >= toUInt64(toUnixTimestamp(now() - toIntervalMinute(%d))) - 1800 and timestamp > toUnixTimestamp(now()-toIntervalMinute(%d));", signozTraceDBName, r.traceTableName, int(interval.Minutes()), int(interval.Minutes()))).Scan(&spansInLastHeartBeatInterval)
@@ -2328,6 +2329,7 @@ func (r *ClickHouseReader) GetSpansInLastHeartBeatInterval(ctx context.Context,
return spansInLastHeartBeatInterval, nil
}
// deprecated: remove this function in the next major release
func (r *ClickHouseReader) GetTotalLogs(ctx context.Context) (uint64, error) {
var totalLogs uint64
@@ -2378,6 +2380,7 @@ func (r *ClickHouseReader) FetchTemporality(ctx context.Context, orgID valuer.UU
return metricNameToTemporality, nil
}
// deprecated: remove this function in the next major release
func (r *ClickHouseReader) GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) {
queryStr := fmt.Sprintf("SELECT countDistinct(fingerprint) as count from %s.%s where metric_name not like 'signoz_%%' group by metric_name order by count desc;", signozMetricDBName, signozTSTableNameV41Day)
@@ -2409,6 +2412,7 @@ func (r *ClickHouseReader) GetTimeSeriesInfo(ctx context.Context) (map[string]in
return timeSeriesData, nil
}
// deprecated: remove this function in the next major release
func (r *ClickHouseReader) GetSamplesInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (uint64, error) {
var totalSamples uint64
@@ -2420,6 +2424,7 @@ func (r *ClickHouseReader) GetSamplesInfoInLastHeartBeatInterval(ctx context.Con
return totalSamples, nil
}
// deprecated: remove this function in the next major release
func (r *ClickHouseReader) GetTotalSamples(ctx context.Context) (uint64, error) {
var totalSamples uint64
@@ -2430,6 +2435,7 @@ func (r *ClickHouseReader) GetTotalSamples(ctx context.Context) (uint64, error)
return totalSamples, nil
}
// deprecated: remove this function in the next major release
func (r *ClickHouseReader) GetDistributedInfoInLastHeartBeatInterval(ctx context.Context) (map[string]interface{}, error) {
clusterInfo := []model.ClusterInfo{}
@@ -2443,6 +2449,7 @@ func (r *ClickHouseReader) GetDistributedInfoInLastHeartBeatInterval(ctx context
return nil, nil
}
// deprecated: remove this function in the next major release
func (r *ClickHouseReader) GetLogsInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (uint64, error) {
var totalLogLines uint64
@@ -2454,6 +2461,7 @@ func (r *ClickHouseReader) GetLogsInfoInLastHeartBeatInterval(ctx context.Contex
return totalLogLines, err
}
// deprecated: remove this function in the next major release
func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (*model.TagsInfo, error) {
queryStr := fmt.Sprintf(`select serviceName, resources_string['deployment.environment'] as env,
resources_string['telemetry.sdk.language'] as language from %s.%s

View File

@@ -38,7 +38,6 @@ import (
jsoniter "github.com/json-iterator/go"
_ "github.com/mattn/go-sqlite3"
"github.com/SigNoz/signoz/pkg/cache"
traceFunnelsModule "github.com/SigNoz/signoz/pkg/modules/tracefunnel"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations"
@@ -96,12 +95,11 @@ func NewRouter() *mux.Router {
// APIHandler implements the query service public API
type APIHandler struct {
reader interfaces.Reader
ruleManager *rules.Manager
querier interfaces.Querier
querierV2 interfaces.Querier
queryBuilder *queryBuilder.QueryBuilder
preferSpanMetrics bool
reader interfaces.Reader
ruleManager *rules.Manager
querier interfaces.Querier
querierV2 interfaces.Querier
queryBuilder *queryBuilder.QueryBuilder
// temporalityMap is a map of metric name to temporality
// to avoid fetching temporality for the same metric multiple times
@@ -140,8 +138,6 @@ type APIHandler struct {
pvcsRepo *inframetrics.PvcsRepo
JWT *authtypes.JWT
AlertmanagerAPI *alertmanager.API
LicensingAPI licensing.API
@@ -154,12 +150,9 @@ type APIHandler struct {
}
type APIHandlerOpts struct {
// business data reader e.g. clickhouse
Reader interfaces.Reader
PreferSpanMetrics bool
// rule manager handles rule crud operations
RuleManager *rules.Manager
@@ -172,14 +165,9 @@ type APIHandlerOpts struct {
// Log parsing pipelines
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
// cache
Cache cache.Cache
// Querier Influx Interval
// Flux Interval
FluxInterval time.Duration
JWT *authtypes.JWT
AlertmanagerAPI *alertmanager.API
LicensingAPI licensing.API
@@ -195,14 +183,14 @@ type APIHandlerOpts struct {
func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
querierOpts := querier.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
Cache: opts.Signoz.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
}
querierOptsV2 := querierV2.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
Cache: opts.Signoz.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
}
@@ -226,7 +214,6 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
aH := &APIHandler{
reader: opts.Reader,
preferSpanMetrics: opts.PreferSpanMetrics,
temporalityMap: make(map[string]map[v3.Temporality]bool),
ruleManager: opts.RuleManager,
IntegrationsController: opts.IntegrationsController,
@@ -245,7 +232,6 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
statefulsetsRepo: statefulsetsRepo,
jobsRepo: jobsRepo,
pvcsRepo: pvcsRepo,
JWT: opts.JWT,
SummaryService: summaryService,
AlertmanagerAPI: opts.AlertmanagerAPI,
LicensingAPI: opts.LicensingAPI,
@@ -1980,7 +1966,7 @@ func (aH *APIHandler) getFeatureFlags(w http.ResponseWriter, r *http.Request) {
return
}
if aH.preferSpanMetrics {
if constants.PreferSpanMetrics {
for idx, feature := range featureSet {
if feature.Name == licensetypes.UseSpanMetrics {
featureSet[idx].Active = true
@@ -2096,7 +2082,7 @@ func (aH *APIHandler) receiveGoogleAuth(w http.ResponseWriter, r *http.Request)
return
}
nextPage, err := aH.Signoz.Modules.User.PrepareSsoRedirect(ctx, redirectUri, identity.Email, aH.JWT)
nextPage, err := aH.Signoz.Modules.User.PrepareSsoRedirect(ctx, redirectUri, identity.Email)
if err != nil {
zap.L().Error("[receiveGoogleAuth] failed to generate redirect URI after successful login ", zap.String("domain", domain.String()), zap.Error(err))
handleSsoError(w, r, redirectUri)

View File

@@ -238,7 +238,7 @@ func newTestbed(t *testing.T) *testbed {
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
require.Nil(t, err)
orgGetter := implorganization.NewGetter(implorganization.NewStore(testDB), sharder)
model.InitDB(testDB, slog.Default(), orgGetter)
model.Init(testDB, slog.Default(), orgGetter)
testConfigProvider := NewMockAgentConfigProvider()
opampServer := InitializeServer(nil, testConfigProvider)

View File

@@ -36,7 +36,7 @@ func (a *Agents) Count() int {
}
// Initialize the database and create schema if needed
func InitDB(sqlStore sqlstore.SQLStore, logger *slog.Logger, orgGetter organization.Getter) {
func Init(sqlStore sqlstore.SQLStore, logger *slog.Logger, orgGetter organization.Getter) {
AllAgents = Agents{
agentsById: make(map[string]*Agent),

View File

@@ -6,7 +6,6 @@ import (
"net"
"net/http"
_ "net/http/pprof" // http profiler
"time"
"github.com/gorilla/handlers"
@@ -42,95 +41,72 @@ import (
"go.uber.org/zap"
)
type ServerOptions struct {
Config signoz.Config
HTTPHostPort string
PrivateHostPort string
PreferSpanMetrics bool
CacheConfigPath string
FluxInterval string
FluxIntervalForTraceDetail string
Cluster string
SigNoz *signoz.SigNoz
Jwt *authtypes.JWT
}
// Server runs HTTP, Mux and a grpc server
type Server struct {
serverOptions *ServerOptions
ruleManager *rules.Manager
config signoz.Config
signoz *signoz.SigNoz
jwt *authtypes.JWT
ruleManager *rules.Manager
// public http router
httpConn net.Listener
httpServer *http.Server
httpConn net.Listener
httpServer *http.Server
httpHostPort string
// private http
privateConn net.Listener
privateHTTP *http.Server
privateConn net.Listener
privateHTTP *http.Server
privateHostPort string
opampServer *opamp.Server
unavailableChannel chan healthcheck.Status
}
// HealthCheckStatus returns health check status channel a client can subscribe to
func (s Server) HealthCheckStatus() chan healthcheck.Status {
return s.unavailableChannel
}
// NewServer creates and initializes Server
func NewServer(serverOptions *ServerOptions) (*Server, error) {
fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail)
func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT) (*Server, error) {
integrationsController, err := integrations.NewController(signoz.SQLStore)
if err != nil {
return nil, err
}
integrationsController, err := integrations.NewController(serverOptions.SigNoz.SQLStore)
if err != nil {
return nil, err
}
cloudIntegrationsController, err := cloudintegrations.NewController(serverOptions.SigNoz.SQLStore)
cloudIntegrationsController, err := cloudintegrations.NewController(signoz.SQLStore)
if err != nil {
return nil, err
}
reader := clickhouseReader.NewReader(
serverOptions.SigNoz.SQLStore,
serverOptions.SigNoz.TelemetryStore,
serverOptions.SigNoz.Prometheus,
serverOptions.Cluster,
fluxIntervalForTraceDetail,
serverOptions.SigNoz.Cache,
signoz.SQLStore,
signoz.TelemetryStore,
signoz.Prometheus,
signoz.TelemetryStore.Cluster(),
config.Querier.FluxInterval,
signoz.Cache,
)
rm, err := makeRulesManager(
reader,
serverOptions.SigNoz.Cache,
serverOptions.SigNoz.SQLStore,
serverOptions.SigNoz.TelemetryStore,
serverOptions.SigNoz.Prometheus,
serverOptions.SigNoz.Modules.OrgGetter,
signoz.Cache,
signoz.SQLStore,
signoz.TelemetryStore,
signoz.Prometheus,
signoz.Modules.OrgGetter,
)
if err != nil {
return nil, err
}
fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval)
if err != nil {
return nil, err
}
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
serverOptions.SigNoz.SQLStore, integrationsController.GetPipelinesForInstalledIntegrations,
signoz.SQLStore,
integrationsController.GetPipelinesForInstalledIntegrations,
)
if err != nil {
return nil, err
}
// todo(remove): remove in favour of statsreporter and analytics
telemetry.GetInstance().SetReader(reader)
telemetry.GetInstance().SetSqlStore(serverOptions.SigNoz.SQLStore)
telemetry.GetInstance().SetSqlStore(signoz.SQLStore)
telemetry.GetInstance().SetSavedViewsInfoCallback(telemetry.GetSavedViewsInfo)
telemetry.GetInstance().SetAlertsInfoCallback(telemetry.GetAlertsInfo)
telemetry.GetInstance().SetGetUsersCallback(telemetry.GetUsers)
@@ -139,30 +115,32 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
apiHandler, err := NewAPIHandler(APIHandlerOpts{
Reader: reader,
PreferSpanMetrics: serverOptions.PreferSpanMetrics,
RuleManager: rm,
IntegrationsController: integrationsController,
CloudIntegrationsController: cloudIntegrationsController,
LogsParsingPipelineController: logParsingPipelineController,
FluxInterval: fluxInterval,
JWT: serverOptions.Jwt,
AlertmanagerAPI: alertmanager.NewAPI(serverOptions.SigNoz.Alertmanager),
FluxInterval: config.Querier.FluxInterval,
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
LicensingAPI: nooplicensing.NewLicenseAPI(),
FieldsAPI: fields.NewAPI(serverOptions.SigNoz.Instrumentation.ToProviderSettings(), serverOptions.SigNoz.TelemetryStore),
Signoz: serverOptions.SigNoz,
QuerierAPI: querierAPI.NewAPI(serverOptions.SigNoz.Querier),
FieldsAPI: fields.NewAPI(signoz.Instrumentation.ToProviderSettings(), signoz.TelemetryStore),
Signoz: signoz,
QuerierAPI: querierAPI.NewAPI(signoz.Querier),
})
if err != nil {
return nil, err
}
s := &Server{
config: config,
signoz: signoz,
jwt: jwt,
ruleManager: rm,
serverOptions: serverOptions,
httpHostPort: constants.HTTPHostPort,
privateHostPort: constants.PrivateHostPort,
unavailableChannel: make(chan healthcheck.Status),
}
httpServer, err := s.createPublicServer(apiHandler, serverOptions.SigNoz.Web)
httpServer, err := s.createPublicServer(apiHandler, signoz.Web)
if err != nil {
return nil, err
@@ -177,20 +155,23 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
s.privateHTTP = privateServer
opAmpModel.InitDB(serverOptions.SigNoz.SQLStore, serverOptions.SigNoz.Instrumentation.Logger(), serverOptions.SigNoz.Modules.OrgGetter)
opAmpModel.Init(signoz.SQLStore, signoz.Instrumentation.Logger(), signoz.Modules.OrgGetter)
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
Store: serverOptions.SigNoz.SQLStore,
AgentFeatures: []agentConf.AgentFeature{
logParsingPipelineController,
agentConfMgr, err := agentConf.Initiate(
&agentConf.ManagerOptions{
Store: signoz.SQLStore,
AgentFeatures: []agentConf.AgentFeature{
logParsingPipelineController,
},
},
})
)
if err != nil {
return nil, err
}
s.opampServer = opamp.InitializeServer(
&opAmpModel.AllAgents, agentConfMgr,
&opAmpModel.AllAgents,
agentConfMgr,
)
orgs, err := apiHandler.Signoz.Modules.OrgGetter.ListByOwnedKeyRange(context.Background())
@@ -207,19 +188,24 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return s, nil
}
// HealthCheckStatus returns health check status channel a client can subscribe to
func (s Server) HealthCheckStatus() chan healthcheck.Status {
return s.unavailableChannel
}
func (s *Server) createPrivateServer(api *APIHandler) (*http.Server, error) {
r := NewRouter()
r.Use(middleware.NewAuth(s.serverOptions.Jwt, []string{"Authorization", "Sec-WebSocket-Protocol"}, s.serverOptions.SigNoz.Sharder, s.serverOptions.SigNoz.Instrumentation.Logger()).Wrap)
r.Use(middleware.NewTimeout(s.serverOptions.SigNoz.Instrumentation.Logger(),
s.serverOptions.Config.APIServer.Timeout.ExcludedRoutes,
s.serverOptions.Config.APIServer.Timeout.Default,
s.serverOptions.Config.APIServer.Timeout.Max,
r.Use(middleware.NewAuth(s.jwt, []string{"Authorization", "Sec-WebSocket-Protocol"}, s.signoz.Sharder, s.signoz.Instrumentation.Logger()).Wrap)
r.Use(middleware.NewTimeout(s.signoz.Instrumentation.Logger(),
s.config.APIServer.Timeout.ExcludedRoutes,
s.config.APIServer.Timeout.Default,
s.config.APIServer.Timeout.Max,
).Wrap)
r.Use(middleware.NewAnalytics().Wrap)
r.Use(middleware.NewAPIKey(s.serverOptions.SigNoz.SQLStore, []string{"SIGNOZ-API-KEY"}, s.serverOptions.SigNoz.Instrumentation.Logger(), s.serverOptions.SigNoz.Sharder).Wrap)
r.Use(middleware.NewLogging(s.serverOptions.SigNoz.Instrumentation.Logger(), s.serverOptions.Config.APIServer.Logging.ExcludedRoutes).Wrap)
r.Use(middleware.NewAPIKey(s.signoz.SQLStore, []string{"SIGNOZ-API-KEY"}, s.signoz.Instrumentation.Logger(), s.signoz.Sharder).Wrap)
r.Use(middleware.NewLogging(s.signoz.Instrumentation.Logger(), s.config.APIServer.Logging.ExcludedRoutes).Wrap)
api.RegisterPrivateRoutes(r)
@@ -242,17 +228,17 @@ func (s *Server) createPrivateServer(api *APIHandler) (*http.Server, error) {
func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server, error) {
r := NewRouter()
r.Use(middleware.NewAuth(s.serverOptions.Jwt, []string{"Authorization", "Sec-WebSocket-Protocol"}, s.serverOptions.SigNoz.Sharder, s.serverOptions.SigNoz.Instrumentation.Logger()).Wrap)
r.Use(middleware.NewTimeout(s.serverOptions.SigNoz.Instrumentation.Logger(),
s.serverOptions.Config.APIServer.Timeout.ExcludedRoutes,
s.serverOptions.Config.APIServer.Timeout.Default,
s.serverOptions.Config.APIServer.Timeout.Max,
r.Use(middleware.NewAuth(s.jwt, []string{"Authorization", "Sec-WebSocket-Protocol"}, s.signoz.Sharder, s.signoz.Instrumentation.Logger()).Wrap)
r.Use(middleware.NewTimeout(s.signoz.Instrumentation.Logger(),
s.config.APIServer.Timeout.ExcludedRoutes,
s.config.APIServer.Timeout.Default,
s.config.APIServer.Timeout.Max,
).Wrap)
r.Use(middleware.NewAnalytics().Wrap)
r.Use(middleware.NewAPIKey(s.serverOptions.SigNoz.SQLStore, []string{"SIGNOZ-API-KEY"}, s.serverOptions.SigNoz.Instrumentation.Logger(), s.serverOptions.SigNoz.Sharder).Wrap)
r.Use(middleware.NewLogging(s.serverOptions.SigNoz.Instrumentation.Logger(), s.serverOptions.Config.APIServer.Logging.ExcludedRoutes).Wrap)
r.Use(middleware.NewAPIKey(s.signoz.SQLStore, []string{"SIGNOZ-API-KEY"}, s.signoz.Instrumentation.Logger(), s.signoz.Sharder).Wrap)
r.Use(middleware.NewLogging(s.signoz.Instrumentation.Logger(), s.config.APIServer.Logging.ExcludedRoutes).Wrap)
am := middleware.NewAuthZ(s.serverOptions.SigNoz.Instrumentation.Logger())
am := middleware.NewAuthZ(s.signoz.Instrumentation.Logger())
api.RegisterRoutes(r, am)
api.RegisterLogsRoutes(r, am)
@@ -293,7 +279,7 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server,
func (s *Server) initListeners() error {
// listen on public port
var err error
publicHostPort := s.serverOptions.HTTPHostPort
publicHostPort := s.httpHostPort
if publicHostPort == "" {
return fmt.Errorf("constants.HTTPHostPort is required")
}
@@ -303,10 +289,10 @@ func (s *Server) initListeners() error {
return err
}
zap.L().Info(fmt.Sprintf("Query server started listening on %s...", s.serverOptions.HTTPHostPort))
zap.L().Info(fmt.Sprintf("Query server started listening on %s...", s.httpHostPort))
// listen on private port to support internal services
privateHostPort := s.serverOptions.PrivateHostPort
privateHostPort := s.privateHostPort
if privateHostPort == "" {
return fmt.Errorf("constants.PrivateHostPort is required")
@@ -316,7 +302,7 @@ func (s *Server) initListeners() error {
if err != nil {
return err
}
zap.L().Info(fmt.Sprintf("Query server started listening on private port %s...", s.serverOptions.PrivateHostPort))
zap.L().Info(fmt.Sprintf("Query server started listening on private port %s...", s.privateHostPort))
return nil
}
@@ -336,7 +322,7 @@ func (s *Server) Start(ctx context.Context) error {
}
go func() {
zap.L().Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.serverOptions.HTTPHostPort))
zap.L().Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.httpHostPort))
switch err := s.httpServer.Serve(s.httpConn); err {
case nil, http.ErrServerClosed, cmux.ErrListenerClosed:
@@ -362,7 +348,7 @@ func (s *Server) Start(ctx context.Context) error {
}
fmt.Println("starting private http")
go func() {
zap.L().Info("Starting Private HTTP server", zap.Int("port", privatePort), zap.String("addr", s.serverOptions.PrivateHostPort))
zap.L().Info("Starting Private HTTP server", zap.Int("port", privatePort), zap.String("addr", s.privateHostPort))
switch err := s.privateHTTP.Serve(s.privateConn); err {
case nil, http.ErrServerClosed, cmux.ErrListenerClosed:

View File

@@ -633,6 +633,7 @@ var DeprecatedStaticFieldsTraces = map[string]v3.AttributeKey{
var StaticFieldsTraces = map[string]v3.AttributeKey{}
var IsDotMetricsEnabled = false
var PreferSpanMetrics = false
func init() {
StaticFieldsTraces = maps.Clone(NewStaticFieldsTraces)
@@ -640,6 +641,9 @@ func init() {
if GetOrDefaultEnv(DotMetricsEnabled, "false") == "true" {
IsDotMetricsEnabled = true
}
if GetOrDefaultEnv("USE_SPAN_METRICS", "false") == "true" {
PreferSpanMetrics = true
}
}
const TRACE_V4_MAX_PAGINATION_LIMIT = 10000

View File

@@ -63,15 +63,19 @@ func main() {
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
// Deprecated
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
// Deprecated
flag.BoolVar(&preferSpanMetrics, "prefer-span-metrics", false, "(prefer span metrics for service level metrics)")
// Deprecated
flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)")
// Deprecated
flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)")
// Deprecated
flag.StringVar(&fluxInterval, "flux-interval", "5m", "(the interval to exclude data from being cached to avoid incorrect cache for data in motion)")
// Deprecated
flag.StringVar(&fluxIntervalForTraceDetail, "flux-interval-trace-detail", "2m", "(the interval to exclude data from being cached to avoid incorrect cache for trace data in motion)")
// Deprecated
flag.StringVar(&cluster, "cluster", "cluster", "(cluster name - defaults to 'cluster')")
// Allow using the consistent naming with the signoz collector
// Deprecated
flag.StringVar(&cluster, "cluster-name", "cluster", "(cluster name - defaults to 'cluster')")
// Deprecated
flag.IntVar(&maxIdleConns, "max-idle-conns", 50, "(number of connections to maintain in the pool, only used with clickhouse if not set in ClickHouseUrl env var DSN.)")
@@ -94,10 +98,14 @@ func main() {
fileprovider.NewFactory(),
},
}, signoz.DeprecatedFlags{
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
Config: promConfigPath,
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
Config: promConfigPath,
FluxInterval: fluxInterval,
FluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
PreferSpanMetrics: preferSpanMetrics,
Cluster: cluster,
})
if err != nil {
zap.L().Fatal("Failed to create config", zap.Error(err))
@@ -136,20 +144,7 @@ func main() {
zap.L().Fatal("Failed to create signoz", zap.Error(err))
}
serverOptions := &app.ServerOptions{
Config: config,
HTTPHostPort: constants.HTTPHostPort,
PreferSpanMetrics: preferSpanMetrics,
PrivateHostPort: constants.PrivateHostPort,
CacheConfigPath: cacheConfigPath,
FluxInterval: fluxInterval,
FluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
Cluster: cluster,
SigNoz: signoz,
Jwt: jwt,
}
server, err := app.NewServer(serverOptions)
server, err := app.NewServer(config, signoz, jwt)
if err != nil {
logger.Fatal("Failed to create server", zap.Error(err))
}

View File

@@ -12,7 +12,7 @@ import (
"go.uber.org/zap"
)
// GetDashboardsInfo returns analytics data for dashboards
// deprecated: remove this function in the next major release
func GetDashboardsInfo(ctx context.Context, sqlstore sqlstore.SQLStore) (*model.DashboardsInfo, error) {
dashboardsInfo := model.DashboardsInfo{}
// fetch dashboards from dashboard db
@@ -64,6 +64,7 @@ func GetDashboardsInfo(ctx context.Context, sqlstore sqlstore.SQLStore) (*model.
return &dashboardsInfo, nil
}
// deprecated: remove this function in the next major release
func isDashboardWithTSV2(data map[string]interface{}) bool {
jsonData, err := json.Marshal(data)
if err != nil {
@@ -72,6 +73,7 @@ func isDashboardWithTSV2(data map[string]interface{}) bool {
return strings.Contains(string(jsonData), "time_series_v2")
}
// deprecated: remove this function in the next major release
func isDashboardWithTagAttrs(data map[string]interface{}) bool {
jsonData, err := json.Marshal(data)
if err != nil {
@@ -81,6 +83,7 @@ func isDashboardWithTagAttrs(data map[string]interface{}) bool {
strings.Contains(string(jsonData), "tag_attributes")
}
// deprecated: remove this function in the next major release
func isDashboardWithLogsClickhouseQuery(data map[string]interface{}) bool {
jsonData, err := json.Marshal(data)
if err != nil {
@@ -91,6 +94,7 @@ func isDashboardWithLogsClickhouseQuery(data map[string]interface{}) bool {
return result
}
// deprecated: remove this function in the next major release
func isDashboardWithTracesClickhouseQuery(data map[string]interface{}) bool {
jsonData, err := json.Marshal(data)
if err != nil {
@@ -105,6 +109,7 @@ func isDashboardWithTracesClickhouseQuery(data map[string]interface{}) bool {
return result
}
// deprecated: remove this function in the next major release
func isDashboardWithPanelAndName(data map[string]interface{}) bool {
isDashboardName := false
isDashboardWithPanelAndName := false
@@ -125,8 +130,8 @@ func isDashboardWithPanelAndName(data map[string]interface{}) bool {
return isDashboardWithPanelAndName
}
// deprecated: remove this function in the next major release
func extractDashboardName(data map[string]interface{}) string {
if data != nil && data["title"] != nil {
title, ok := data["title"].(string)
if ok {
@@ -137,6 +142,7 @@ func extractDashboardName(data map[string]interface{}) string {
return ""
}
// deprecated: remove this function in the next major release
func checkLogPanelAttrContains(data map[string]interface{}) int {
var logsPanelsWithAttrContains int
filters, ok := data["filters"].(map[string]interface{})
@@ -163,6 +169,7 @@ func checkLogPanelAttrContains(data map[string]interface{}) int {
return logsPanelsWithAttrContains
}
// deprecated: remove this function in the next major release
func countPanelsInDashboard(inputData map[string]interface{}) model.DashboardsInfo {
var logsPanelCount, tracesPanelCount, metricsPanelCount, logsPanelsWithAttrContains int
traceChQueryCount := 0

View File

@@ -1,7 +1,7 @@
package telemetry
// deprecated: remove this function in the next major release
func ignoreEvents(event string, attributes map[string]interface{}) bool {
if event == TELEMETRY_EVENT_ACTIVE_USER {
for attr_key, attr_val := range attributes {

View File

@@ -14,6 +14,7 @@ import (
"go.uber.org/zap"
)
// deprecated: remove this function in the next major release
func getChannels(ctx context.Context, sqlstore sqlstore.SQLStore) ([]*alertmanagertypes.Channel, error) {
channels := []*alertmanagertypes.Channel{}
if err := sqlstore.BunDB().NewSelect().Model(&channels).Scan(ctx); err != nil {
@@ -23,6 +24,7 @@ func getChannels(ctx context.Context, sqlstore sqlstore.SQLStore) ([]*alertmanag
return channels, nil
}
// deprecated: remove this function in the next major release
func GetAlertsInfo(ctx context.Context, sqlstore sqlstore.SQLStore) (*model.AlertsInfo, error) {
alertsInfo := model.AlertsInfo{}

View File

@@ -14,6 +14,7 @@ import (
"go.uber.org/zap"
)
// deprecated: remove this function in the next major release
func GetViews(ctx context.Context, sqlstore sqlstore.SQLStore, orgID string) ([]*v3.SavedView, error) {
var views []types.SavedView
err := sqlstore.BunDB().NewSelect().Model(&views).Where("org_id = ?", orgID).Scan(ctx)
@@ -45,6 +46,7 @@ func GetViews(ctx context.Context, sqlstore sqlstore.SQLStore, orgID string) ([]
return savedViews, nil
}
// deprecated: remove this function in the next major release
func GetSavedViewsInfo(ctx context.Context, sqlstore sqlstore.SQLStore) (*model.SavedViewsInfo, error) {
savedViewsInfo := model.SavedViewsInfo{}
// get single org ID from db

View File

@@ -110,6 +110,7 @@ const RATE_LIMIT_VALUE = 1
var telemetry *Telemetry
var once sync.Once
// deprecated: remove this function in the next major release
func (a *Telemetry) IsSampled() bool {
random_number := a.minRandInt + rand.Intn(a.maxRandInt-a.minRandInt) + 1
@@ -122,6 +123,7 @@ func (a *Telemetry) IsSampled() bool {
}
// deprecated: remove this function in the next major release
func (telemetry *Telemetry) CheckQueryInfo(postData *v3.QueryRangeParamsV3) QueryInfoResult {
queryInfoResult := QueryInfoResult{}
if postData != nil && postData.CompositeQuery != nil {
@@ -172,22 +174,28 @@ func (telemetry *Telemetry) CheckQueryInfo(postData *v3.QueryRangeParamsV3) Quer
return queryInfoResult
}
// deprecated: remove this function in the next major release
func (telemetry *Telemetry) AddActiveTracesUser() {
telemetry.mutex.Lock()
telemetry.activeUser["traces"] = 1
telemetry.mutex.Unlock()
}
// deprecated: remove this function in the next major release
func (telemetry *Telemetry) AddActiveMetricsUser() {
telemetry.mutex.Lock()
telemetry.activeUser["metrics"] = 1
telemetry.mutex.Unlock()
}
// deprecated: remove this function in the next major release
func (telemetry *Telemetry) AddActiveLogsUser() {
telemetry.mutex.Lock()
telemetry.activeUser["logs"] = 1
telemetry.mutex.Unlock()
}
// deprecated: remove this function in the next major release
type Telemetry struct {
ossOperator analytics.Client
saasOperator analytics.Client
@@ -212,26 +220,32 @@ type Telemetry struct {
savedViewsInfoCallback func(ctx context.Context, store sqlstore.SQLStore) (*model.SavedViewsInfo, error)
}
// deprecated: remove this function in the next major release
func (a *Telemetry) SetAlertsInfoCallback(callback func(ctx context.Context, store sqlstore.SQLStore) (*model.AlertsInfo, error)) {
a.alertsInfoCallback = callback
}
// deprecated: remove this function in the next major release
func (a *Telemetry) SetUserCountCallback(callback func(ctx context.Context, store sqlstore.SQLStore) (int, error)) {
a.userCountCallback = callback
}
// deprecated: remove this function in the next major release
func (a *Telemetry) SetGetUsersCallback(callback func(ctx context.Context, store sqlstore.SQLStore) ([]TelemetryUser, error)) {
a.getUsersCallback = callback
}
// deprecated: remove this function in the next major release
func (a *Telemetry) SetSavedViewsInfoCallback(callback func(ctx context.Context, store sqlstore.SQLStore) (*model.SavedViewsInfo, error)) {
a.savedViewsInfoCallback = callback
}
// deprecated: remove this function in the next major release
func (a *Telemetry) SetDashboardsInfoCallback(callback func(ctx context.Context, store sqlstore.SQLStore) (*model.DashboardsInfo, error)) {
a.dashboardsInfoCallback = callback
}
// deprecated: remove this function in the next major release
func createTelemetry() {
// Do not do anything in CI (not even resolving the outbound IP address)
if testing.Testing() {
@@ -523,7 +537,7 @@ func createTelemetry() {
go s.StartBlocking()
}
// Get preferred outbound ip of this machine
// deprecated: remove this function in the next major release
func getOutboundIP() string {
ip := []byte(IP_NOT_FOUND_PLACEHOLDER)
@@ -542,6 +556,7 @@ func getOutboundIP() string {
return string(ip)
}
// deprecated: remove this function in the next major release
func (a *Telemetry) IdentifyUser(user *types.User) {
if user.Email == DEFAULT_CLOUD_EMAIL {
return
@@ -579,6 +594,7 @@ func (a *Telemetry) IdentifyUser(user *types.User) {
}
}
// deprecated: remove this function in the next major release
func (a *Telemetry) SendIdentifyEvent(data map[string]interface{}, userEmail string) {
if !a.isTelemetryEnabled() || a.isTelemetryAnonymous() {
@@ -612,6 +628,7 @@ func (a *Telemetry) SendIdentifyEvent(data map[string]interface{}, userEmail str
}
}
// deprecated: remove this function in the next major release
func (a *Telemetry) SendGroupEvent(data map[string]interface{}, userEmail string) {
if !a.isTelemetryEnabled() || a.isTelemetryAnonymous() {
return
@@ -646,18 +663,22 @@ func (a *Telemetry) SendGroupEvent(data map[string]interface{}, userEmail string
}
}
// deprecated: remove this function in the next major release
func (a *Telemetry) SetUserEmail(email string) {
a.userEmail = email
}
// deprecated: remove this function in the next major release
func (a *Telemetry) SetPatTokenUser() {
a.patTokenUser = true
}
// deprecated: remove this function in the next major release
func (a *Telemetry) GetUserEmail() string {
return a.userEmail
}
// deprecated: remove this function in the next major release
func (a *Telemetry) SetSaasOperator(saasOperatorKey string) {
if saasOperatorKey == "" {
return
@@ -665,6 +686,7 @@ func (a *Telemetry) SetSaasOperator(saasOperatorKey string) {
a.saasOperator = analytics.New(saasOperatorKey)
}
// deprecated: remove this function in the next major release
func (a *Telemetry) SetCompanyDomain(email string) {
email_split := strings.Split(email, "@")
@@ -675,10 +697,12 @@ func (a *Telemetry) SetCompanyDomain(email string) {
}
// deprecated: remove this function in the next major release
func (a *Telemetry) getCompanyDomain() string {
return a.companyDomain
}
// deprecated: remove this function in the next major release
func (a *Telemetry) checkEvents(event string) bool {
sendEvent := true
if event == TELEMETRY_EVENT_USER && a.isTelemetryAnonymous() {
@@ -687,6 +711,7 @@ func (a *Telemetry) checkEvents(event string) bool {
return sendEvent
}
// deprecated: remove this function in the next major release
func (a *Telemetry) SendEvent(event string, data map[string]interface{}, userEmail string, rateLimitFlag bool, viaEventsAPI bool) {
// ignore telemetry for default user
@@ -772,32 +797,38 @@ func (a *Telemetry) SendEvent(event string, data map[string]interface{}, userEma
}
}
// deprecated: remove this function in the next major release
func (a *Telemetry) isTelemetryAnonymous() bool {
return a.isAnonymous
}
// deprecated: remove this function in the next major release
func (a *Telemetry) SetTelemetryAnonymous(value bool) {
a.isAnonymous = value
}
// deprecated: remove this function in the next major release
func (a *Telemetry) isTelemetryEnabled() bool {
return a.isEnabled
}
// deprecated: remove this function in the next major release
func (a *Telemetry) SetTelemetryEnabled(value bool) {
a.isEnabled = value
}
// deprecated: remove this function in the next major release
func (a *Telemetry) SetReader(reader interfaces.Reader) {
a.reader = reader
}
// deprecated: remove this function in the next major release
func (a *Telemetry) SetSqlStore(store sqlstore.SQLStore) {
a.sqlStore = store
}
// deprecated: remove this function in the next major release
func GetInstance() *Telemetry {
once.Do(func() {
createTelemetry()
})
@@ -805,6 +836,7 @@ func GetInstance() *Telemetry {
return telemetry
}
// deprecated: remove this function in the next major release
func getDeploymentType() string {
deploymentType := os.Getenv("DEPLOYMENT_TYPE")
if deploymentType == "" {

View File

@@ -13,10 +13,12 @@ type TelemetryUser struct {
Organization string
}
// deprecated: remove this function in the next major release
func GetUsers(ctx context.Context, sqlstore sqlstore.SQLStore) ([]TelemetryUser, error) {
return GetUsersWithOpts(ctx, 0, sqlstore)
}
// deprecated: remove this function in the next major release
func GetUserCount(ctx context.Context, sqlstore sqlstore.SQLStore) (int, error) {
users, err := GetUsersWithOpts(ctx, 0, sqlstore)
if err != nil {
@@ -25,7 +27,7 @@ func GetUserCount(ctx context.Context, sqlstore sqlstore.SQLStore) (int, error)
return len(users), nil
}
// GetUsersWithOpts fetches users and supports additional search options
// Deprecated: remove this function in the next major release
func GetUsersWithOpts(ctx context.Context, limit int, sqlstore sqlstore.SQLStore) ([]TelemetryUser, error) {
var displayName string
err := sqlstore.BunDB().NewSelect().

View File

@@ -322,7 +322,6 @@ func NewFilterSuggestionsTestBed(t *testing.T) *FilterSuggestionsTestBed {
apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{
Reader: reader,
JWT: jwt,
Signoz: &signoz.SigNoz{
Modules: modules,
Handlers: handlers,

View File

@@ -468,6 +468,7 @@ type LogPipelinesTestBed struct {
opampClientConn *opamp.MockOpAmpConnection
store sqlstore.SQLStore
userModule user.Module
JWT *authtypes.JWT
}
// testDB can be injected for sharing a DB across multiple integration testbeds.
@@ -502,7 +503,6 @@ func NewTestbedWithoutOpamp(t *testing.T, sqlStore sqlstore.SQLStore) *LogPipeli
apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{
LogsParsingPipelineController: controller,
JWT: jwt,
Signoz: &signoz.SigNoz{
Modules: modules,
Handlers: handlers,
@@ -535,6 +535,7 @@ func NewTestbedWithoutOpamp(t *testing.T, sqlStore sqlstore.SQLStore) *LogPipeli
agentConfMgr: agentConfMgr,
store: sqlStore,
userModule: modules.User,
JWT: jwt,
}
}
@@ -545,7 +546,7 @@ func NewLogPipelinesTestBed(t *testing.T, testDB sqlstore.SQLStore, agentID stri
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
orgGetter := implorganization.NewGetter(implorganization.NewStore(testbed.store), sharder)
model.InitDB(testbed.store, slog.Default(), orgGetter)
model.Init(testbed.store, slog.Default(), orgGetter)
opampServer := opamp.InitializeServer(nil, testbed.agentConfMgr)
err = opampServer.Start(opamp.GetAvailableLocalAddress())
@@ -586,7 +587,7 @@ func (tb *LogPipelinesTestBed) PostPipelinesToQSExpectingStatusCode(
respWriter := httptest.NewRecorder()
ctx, err := tb.apiHandler.JWT.ContextFromRequest(req.Context(), req.Header.Get("Authorization"))
ctx, err := tb.JWT.ContextFromRequest(req.Context(), req.Header.Get("Authorization"))
if err != nil {
tb.t.Fatalf("couldn't get jwt from request: %v", err)
}

View File

@@ -384,7 +384,6 @@ func NewCloudIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *CloudI
apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{
Reader: reader,
CloudIntegrationsController: controller,
JWT: jwt,
Signoz: &signoz.SigNoz{
Modules: modules,
Handlers: handlers,

View File

@@ -597,10 +597,8 @@ func NewIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *Integration
handlers := signoz.NewHandlers(modules)
apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{
Reader: reader,
IntegrationsController: controller,
JWT: jwt,
Reader: reader,
IntegrationsController: controller,
CloudIntegrationsController: cloudIntegrationsController,
Signoz: &signoz.SigNoz{
Modules: modules,

View File

@@ -16,6 +16,7 @@ import (
"github.com/SigNoz/signoz/pkg/config"
"github.com/SigNoz/signoz/pkg/emailing"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/gateway"
"github.com/SigNoz/signoz/pkg/instrumentation"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/querier"
@@ -82,15 +83,23 @@ type Config struct {
// StatsReporter config
StatsReporter statsreporter.Config `mapstructure:"statsreporter"`
// Gateway config
Gateway gateway.Config `mapstructure:"gateway"`
}
// DeprecatedFlags carries the values of flags that are deprecated and scheduled for removal.
// They exist only to preserve backward compatibility with the old flags (see the
// illustrative call below).
type DeprecatedFlags struct {
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
Config string
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
Config string
FluxInterval string
FluxIntervalForTraceDetail string
PreferSpanMetrics bool
Cluster string
GatewayUrl string
}
func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprecatedFlags DeprecatedFlags) (Config, error) {
@@ -111,6 +120,7 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprec
emailing.NewConfigFactory(),
sharder.NewConfigFactory(),
statsreporter.NewConfigFactory(),
gateway.NewConfigFactory(),
}
conf, err := config.New(ctx, resolverConfig, configFactories)
@@ -265,4 +275,37 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
fmt.Println("[Deprecated] env TELEMETRY_ENABLED is deprecated and scheduled for removal. Please use SIGNOZ_ANALYTICS_ENABLED instead.")
config.Analytics.Enabled = os.Getenv("TELEMETRY_ENABLED") == "true"
}
if deprecatedFlags.FluxInterval != "" {
fmt.Println("[Deprecated] flag --flux-interval is deprecated and scheduled for removal. Please use SIGNOZ_QUERIER_FLUX__INTERVAL instead.")
fluxInterval, err := time.ParseDuration(deprecatedFlags.FluxInterval)
if err != nil {
fmt.Println("Error parsing --flux-interval, using default value.")
} else {
config.Querier.FluxInterval = fluxInterval
}
}
if deprecatedFlags.FluxIntervalForTraceDetail != "" {
fmt.Println("[Deprecated] flag --flux-interval-for-trace-detail is deprecated and scheduled for complete removal. Please use SIGNOZ_QUERIER_FLUX__INTERVAL instead.")
}
if deprecatedFlags.Cluster != "" {
fmt.Println("[Deprecated] flag --cluster is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER instead.")
config.TelemetryStore.Clickhouse.Cluster = deprecatedFlags.Cluster
}
if deprecatedFlags.PreferSpanMetrics {
fmt.Println("[Deprecated] flag --prefer-span-metrics is deprecated and scheduled for removal. Please use USE_SPAN_METRICS instead.")
}
if deprecatedFlags.GatewayUrl != "" {
fmt.Println("[Deprecated] flag --gateway-url is deprecated and scheduled for removal. Please use SIGNOZ_GATEWAY_URL instead.")
u, err := url.Parse(deprecatedFlags.GatewayUrl)
if err != nil {
fmt.Println("Error parsing --gateway-url, using default value.")
} else {
config.Gateway.URL = u
}
}
}
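// Illustrative only, not part of this change: a caller that still accepts the old CLI
// values can forward them through DeprecatedFlags and let NewConfig apply the mapping
// above. The surrounding wiring (how ctx and resolverConfig are produced) is assumed,
// and the literal values are placeholders, not authoritative defaults.
func exampleNewConfigFromDeprecatedFlags(ctx context.Context, resolverConfig config.ResolverConfig) (Config, error) {
    return NewConfig(ctx, resolverConfig, DeprecatedFlags{
        FluxInterval: "5m",                    // previously --flux-interval
        Cluster:      "cluster",               // previously --cluster / --cluster-name
        GatewayUrl:   "http://localhost:8080", // previously --gateway-url
    })
}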

View File

@@ -14,7 +14,7 @@ import (
"github.com/stretchr/testify/require"
)
func resourceFilterStmtBuilder() (qbtypes.StatementBuilder[qbtypes.LogAggregation], error) {
func resourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.LogAggregation] {
fm := resourcefilter.NewFieldMapper()
cb := resourcefilter.NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
@@ -30,7 +30,7 @@ func resourceFilterStmtBuilder() (qbtypes.StatementBuilder[qbtypes.LogAggregatio
fm,
cb,
mockMetadataStore,
), nil
)
}
func TestStatementBuilder(t *testing.T) {
@@ -118,8 +118,7 @@ func TestStatementBuilder(t *testing.T) {
aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)
resourceFilterStmtBuilder, err := resourceFilterStmtBuilder()
require.NoError(t, err)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),

View File

@@ -32,7 +32,7 @@ func NewFieldMapper() qbtypes.FieldMapper {
return &fieldMapper{}
}
func (m *fieldMapper) getColumn(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error) {
func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error) {
switch key.FieldContext {
case telemetrytypes.FieldContextResource:
return attributeMetadataColumns["resource_attributes"], nil

View File

@@ -184,9 +184,7 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement(
if b.canShortCircuitDelta(query) {
// spatial_aggregation_cte directly for certain delta queries
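// Note: buildTemporalAggDeltaFastPath and buildSpatialAggregationCTE below now return
// only (fragment, args); neither has a failure path, so the error checks here became
// dead code and were removed.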
if frag, args, err := b.buildTemporalAggDeltaFastPath(start, end, query, timeSeriesCTE, timeSeriesCTEArgs); err != nil {
return nil, err
} else if frag != "" {
if frag, args := b.buildTemporalAggDeltaFastPath(start, end, query, timeSeriesCTE, timeSeriesCTEArgs); frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
@@ -200,9 +198,7 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement(
}
// spatial_aggregation_cte
if frag, args, err := b.buildSpatialAggregationCTE(ctx, start, end, query, keys); err != nil {
return nil, err
} else if frag != "" {
if frag, args := b.buildSpatialAggregationCTE(ctx, start, end, query, keys); frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
@@ -222,7 +218,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, error) {
) (string, []any) {
stepSec := int64(query.StepInterval.Seconds())
sb := sqlbuilder.NewSelectBuilder()
@@ -261,7 +257,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
sb.GroupBy("ALL")
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args, nil
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
}
func (b *metricQueryStatementBuilder) buildTimeSeriesCTE(
@@ -450,7 +446,7 @@ func (b *metricQueryStatementBuilder) buildSpatialAggregationCTE(
_ uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
_ map[string][]*telemetrytypes.TelemetryFieldKey,
) (string, []any, error) {
) (string, []any) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select("ts")
@@ -466,7 +462,7 @@ func (b *metricQueryStatementBuilder) buildSpatialAggregationCTE(
sb.GroupBy("ALL")
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args, nil
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
}
func (b *metricQueryStatementBuilder) buildFinalSelect(

View File

@@ -12,6 +12,7 @@ import (
type provider struct {
settings factory.ScopedProviderSettings
clickHouseConn clickhouse.Conn
cluster string
hooks []telemetrystore.TelemetryStoreHook
}
@@ -49,6 +50,7 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
return &provider{
settings: settings,
clickHouseConn: chConn,
cluster: config.Clickhouse.Cluster,
hooks: hooks,
}, nil
}
@@ -57,6 +59,10 @@ func (p *provider) ClickhouseDB() clickhouse.Conn {
return p
}
func (p *provider) Cluster() string {
return p.cluster
}
func (p *provider) Close() error {
return p.clickHouseConn.Close()
}

View File

@@ -28,20 +28,23 @@ type ConnectionConfig struct {
DialTimeout time.Duration `mapstructure:"dial_timeout"`
}
type ClickhouseConfig struct {
// DSN is the database source name.
DSN string `mapstructure:"dsn"`
// Cluster is the cluster name to use for clickhouse.
Cluster string `mapstructure:"cluster"`
// QuerySettings is the query settings for clickhouse.
QuerySettings QuerySettings `mapstructure:"settings"`
}
type QuerySettings struct {
MaxExecutionTime int `mapstructure:"max_execution_time"`
MaxExecutionTimeLeaf int `mapstructure:"max_execution_time_leaf"`
TimeoutBeforeCheckingExecutionSpeed int `mapstructure:"timeout_before_checking_execution_speed"`
MaxBytesToRead int `mapstructure:"max_bytes_to_read"`
MaxResultRowsForCHQuery int `mapstructure:"max_result_rows_for_ch_query"`
}
type ClickhouseConfig struct {
// DSN is the database source name.
DSN string `mapstructure:"dsn"`
// QuerySettings is the query settings for clickhouse.
QuerySettings QuerySettings `mapstructure:"settings"`
MaxResultRows int `mapstructure:"max_result_rows"`
}
func NewConfigFactory() factory.ConfigFactory {
@@ -57,7 +60,8 @@ func newConfig() factory.Config {
DialTimeout: 5 * time.Second,
},
Clickhouse: ClickhouseConfig{
DSN: "tcp://localhost:9000",
DSN: "tcp://localhost:9000",
Cluster: "cluster",
},
}
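// Illustrative only, not part of this change: the cluster name is now ordinary
// telemetrystore configuration instead of a --cluster flag. A sketch of overriding the
// default in code is below; the enclosing Config type name is assumed from context, and
// the same value can also be supplied via SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER as
// noted in the deprecation message.
func exampleClickhouseConfig() Config {
    return Config{
        Clickhouse: ClickhouseConfig{
            DSN:     "tcp://clickhouse:9000", // hypothetical DSN
            Cluster: "signoz-cluster",        // hypothetical cluster name
        },
    }
}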

View File

@@ -54,7 +54,7 @@ func TestNewWithEnvProviderWithQuerySettings(t *testing.T) {
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_MAX__EXECUTION__TIME__LEAF", "10")
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_TIMEOUT__BEFORE__CHECKING__EXECUTION__SPEED", "10")
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_MAX__BYTES__TO__READ", "1000000")
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_MAX__RESULT__ROWS__FOR__CH__QUERY", "10000")
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_MAX__RESULT__ROWS", "10000")
conf, err := config.New(
context.Background(),
@@ -82,7 +82,7 @@ func TestNewWithEnvProviderWithQuerySettings(t *testing.T) {
MaxExecutionTimeLeaf: 10,
TimeoutBeforeCheckingExecutionSpeed: 10,
MaxBytesToRead: 1000000,
MaxResultRowsForCHQuery: 10000,
MaxResultRows: 10000,
},
},
}

View File

@@ -7,7 +7,11 @@ import (
)
type TelemetryStore interface {
// ClickhouseDB returns the clickhouse database connection.
ClickhouseDB() clickhouse.Conn
// Cluster returns the cluster name.
Cluster() string
}
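// Illustrative consumer, not part of this change: code that previously threaded a
// --cluster flag through can read the name from the TelemetryStore instead, e.g. when
// assembling distributed DDL. The helper below is hypothetical.
func onClusterClause(store TelemetryStore) string {
    return "ON CLUSTER " + store.Cluster()
}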
type TelemetryStoreHook interface {

View File

@@ -34,7 +34,7 @@ func (h *provider) BeforeQuery(ctx context.Context, _ *telemetrystore.QueryEvent
}
if ctx.Value("enforce_max_result_rows") != nil {
settings["max_result_rows"] = h.settings.MaxResultRowsForCHQuery
settings["max_result_rows"] = h.settings.MaxResultRows
}
if h.settings.MaxBytesToRead != 0 {
@@ -60,7 +60,9 @@ func (h *provider) BeforeQuery(ctx context.Context, _ *telemetrystore.QueryEvent
}
if ctx.Value("max_result_rows") != nil && ctx.Value("result_overflow_mode") != nil {
if maxResultRows, ok := ctx.Value("max_result_rows").(int); ok { settings["max_result_rows"] = maxResultRows }
if maxResultRows, ok := ctx.Value("max_result_rows").(int); ok {
settings["max_result_rows"] = maxResultRows
}
settings["result_overflow_mode"] = ctx.Value("result_overflow_mode")
}

View File

@@ -31,6 +31,11 @@ func (p *Provider) ClickhouseDB() clickhouse.Conn {
return p.clickhouseDB.(clickhouse.Conn)
}
// Cluster returns the cluster name
func (p *Provider) Cluster() string {
return "cluster"
}
// Mock returns the underlying Clickhouse mock instance for setting expectations
func (p *Provider) Mock() cmock.ClickConnMockCommon {
return p.clickhouseDB

View File

@@ -247,7 +247,7 @@ func (m *defaultFieldMapper) ColumnExpressionFor(
correction, found := telemetrytypes.SuggestCorrection(field.Name, maps.Keys(keys))
if found {
// we found a close match; send the suggestion in the error message
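// Passing the suggestion as a formatting argument (rather than as the format string
// itself) avoids a non-constant format string and keeps any '%' in the user-supplied
// value from being interpreted as a formatting verb.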
return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction)
return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "%s", correction)
} else {
// not even a close match; return an error
return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field %s not found", field.Name)

View File

@@ -14,7 +14,7 @@ import (
"github.com/stretchr/testify/require"
)
func resourceFilterStmtBuilder() (qbtypes.StatementBuilder[qbtypes.TraceAggregation], error) {
func resourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.TraceAggregation] {
fm := resourcefilter.NewFieldMapper()
cb := resourcefilter.NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
@@ -24,7 +24,7 @@ func resourceFilterStmtBuilder() (qbtypes.StatementBuilder[qbtypes.TraceAggregat
fm,
cb,
mockMetadataStore,
), nil
)
}
func TestStatementBuilder(t *testing.T) {
@@ -72,8 +72,7 @@ func TestStatementBuilder(t *testing.T) {
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)
resourceFilterStmtBuilder, err := resourceFilterStmtBuilder()
require.NoError(t, err)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),