Trigger sqlite log writes on a specific span.

This commit is contained in:
Sebastian Jeltsch
2024-12-07 12:45:10 +01:00
parent b8be749244
commit 765401af35
4 changed files with 101 additions and 87 deletions

View File

@@ -18,9 +18,7 @@ pub async fn handler(State(_state): State<AppState>, user: Option<User>) -> Resp
#[tokio::main]
async fn main() -> Result<(), BoxError> {
env_logger::init_from_env(
env_logger::Env::new().default_filter_or("info,trailbase_core=debug,refinery_core=warn"),
);
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info,refinery_core=warn"));
let custom_routes: Router<AppState> = Router::new().route("/", get(handler));
@@ -43,23 +41,21 @@ async fn main() -> Result<(), BoxError> {
)
.await?;
let filter = || {
filter::Targets::new()
.with_target("tower_http::trace::on_response", filter::LevelFilter::DEBUG)
.with_target("tower_http::trace::on_request", filter::LevelFilter::DEBUG)
.with_target("tower_http::trace::make_span", filter::LevelFilter::DEBUG)
.with_default(filter::LevelFilter::INFO)
};
// This declares **where** tracing is being logged to, e.g. stderr, file, sqlite.
let layer = tracing_subscriber::registry()
.with(trailbase_core::logging::SqliteLogLayer::new(app.state()).with_filter(filter()));
let _ = layer
let _layer = tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer()
.compact()
.with_filter(filter()),
trailbase_core::logging::SqliteLogLayer::new(app.state())
.with_filter(filter::LevelFilter::INFO),
)
.with(
tracing_subscriber::fmt::layer().compact().with_filter(
// Limit messages to INFO and above except for request handling logs.
filter::Targets::new()
.with_target("tower_http::trace::on_response", filter::LevelFilter::DEBUG)
.with_target("tower_http::trace::on_request", filter::LevelFilter::DEBUG)
.with_target("tower_http::trace::make_span", filter::LevelFilter::DEBUG)
.with_default(filter::LevelFilter::INFO),
),
)
.try_init();

View File

@@ -91,29 +91,28 @@ async fn async_main() -> Result<(), BoxError> {
})
.await?;
let filter = || {
filter::Targets::new()
.with_target("tower_http::trace::on_response", filter::LevelFilter::DEBUG)
.with_target("tower_http::trace::on_request", filter::LevelFilter::DEBUG)
.with_target("tower_http::trace::make_span", filter::LevelFilter::DEBUG)
.with_default(filter::LevelFilter::INFO)
};
// This declares **where** tracing is being logged to, e.g. stderr, file, sqlite.
//
// NOTE: the try_init() will actually fail because the tracing system was already initialized
// by the env_logger above.
// FIXME: Without the sqlite logger here, logging is broken despite us trying to initialize
// in app.server() as well.
let layer = tracing_subscriber::registry()
.with(trailbase_core::logging::SqliteLogLayer::new(app.state()).with_filter(filter()));
let layer = tracing_subscriber::registry().with(
trailbase_core::logging::SqliteLogLayer::new(app.state())
.with_filter(filter::LevelFilter::INFO),
);
if stderr_logging {
let _ = layer
.with(
tracing_subscriber::fmt::layer()
.compact()
.with_filter(filter()),
tracing_subscriber::fmt::layer().compact().with_filter(
// Limit messages to INFO and above except for request handling logs.
filter::Targets::new()
.with_target("tower_http::trace::on_response", filter::LevelFilter::DEBUG)
.with_target("tower_http::trace::on_request", filter::LevelFilter::DEBUG)
.with_target("tower_http::trace::make_span", filter::LevelFilter::DEBUG)
.with_default(filter::LevelFilter::INFO),
),
)
.try_init();
} else {

View File

@@ -8,6 +8,7 @@ use serde_json::json;
use std::time::Duration;
use tracing::field::Field;
use tracing::span::{Attributes, Id, Record, Span};
use tracing::Level;
use tracing_subscriber::layer::{Context, Layer};
use crate::AppState;
@@ -41,6 +42,8 @@ pub(crate) struct Log {
pub created: Option<f64>,
pub r#type: i32,
// NOTE: Level isn't particularly useful at the moment. It's not derived from status but rather
// whatever we choose for the span, i.e. `LEVEL`.
pub level: i32,
pub status: u16,
pub method: String,
@@ -55,46 +58,49 @@ pub(crate) struct Log {
pub data: Option<serde_json::Value>,
}
const LEVEL: Level = Level::INFO;
const NAME: &str = "TB::sqlog";
pub(super) fn sqlite_logger_make_span(request: &Request<Body>) -> Span {
let headers = request.headers();
let host = get_header(headers, "host").unwrap_or("");
let user_agent = get_header(headers, "user-agent").unwrap_or("");
let referer = get_header(headers, "referer").unwrap_or("");
let client_ip = InsecureClientIp::from(headers, request.extensions()).map(|ip| ip.0.to_string());
let client_ip = InsecureClientIp::from(headers, request.extensions())
.map_or_else(|_err| String::new(), |ip| ip.0.to_string());
// NOTE: "%" means print using fmt::Display, and "?" means fmt::Debug.
let span = tracing::info_span!(
"request",
let span = tracing::span!(
LEVEL,
NAME,
method = %request.method(),
uri = %request.uri(),
version = ?request.version(),
host,
client_ip = client_ip.as_ref().map_or("", |s| s.as_str()),
client_ip,
user_agent,
referer,
latency_ms = tracing::field::Empty,
status = tracing::field::Empty,
length= tracing::field::Empty,
);
return span;
}
pub(super) fn sqlite_logger_on_request(_req: &Request<Body>, _span: &Span) {
// We're deliberately not creating a request event, since we're already inserting all the
// request related information into the span
// We don't need to record anything extra, since we already unpacked the request during span
// creation above.
}
pub(super) fn sqlite_logger_on_response(
response: &Response<Body>,
latency: Duration,
_span: &Span,
) {
pub(super) fn sqlite_logger_on_response(response: &Response<Body>, latency: Duration, span: &Span) {
let length = get_header(response.headers(), "content-length");
span.record("latency_ms", as_millis_f64(&latency));
span.record("status", response.status().as_u16());
span.record("length", length.and_then(|l| l.parse::<i64>().ok()));
tracing::info!(
name: "response",
latency_ms = as_millis_f64(&latency),
status = response.status().as_u16(),
length = length.and_then(|l| l.parse::<i64>().ok()),
);
// Log an event that can actually be seen, e.g. when a stderr logger is installed.
tracing::info!("response sent");
}
pub struct SqliteLogLayer {
@@ -149,7 +155,9 @@ impl SqliteLogLayer {
// then writes to Sqlite.
#[inline]
fn write_log(&self, log: LogFieldStorage) {
self.sender.send(Box::new(log)).expect(BUG_TEXT);
if let Err(err) = self.sender.send(Box::new(log)) {
panic!("Sending logs failed: {err}");
}
}
#[inline]
@@ -190,35 +198,62 @@ where
S: tracing::Subscriber,
S: for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
{
/// When a new "__tbreq" span is created, attach field storage.
fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
let span = ctx.span(id).expect(BUG_TEXT);
let span = ctx.span(id).expect("span must exist in on_new_span");
let _metadata = match ctx.metadata(&span.id()) {
Some(metadata) if metadata.name() == NAME => metadata,
_ => {
return;
}
};
let mut storage = LogFieldStorage::default();
attrs.record(&mut LogJsonVisitor(&mut storage));
span.extensions_mut().insert(storage);
}
// Then the (request->response) span "__tbreq" is closed, write out the logs.
fn on_close(&self, id: Id, ctx: Context<'_, S>) {
let Some(span) = ctx.span(&id) else {
return;
};
let metadata = span.metadata();
if metadata.name() != NAME {
return;
}
let mut extensions = span.extensions_mut();
if let Some(mut storage) = extensions.remove::<LogFieldStorage>() {
storage.level = level_to_int(metadata.level());
self.write_log(storage);
} else {
error!("span already closed/consumed?!");
}
}
// When span.record() is called, add to field storage.
fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
let span = ctx.span(id).expect(BUG_TEXT);
let Some(span) = ctx.span(id) else {
return;
};
let mut extensions = span.extensions_mut();
if let Some(storage) = extensions.get_mut::<LogFieldStorage>() {
values.record(&mut LogJsonVisitor(storage));
} else {
info!("logs already consumed");
}
}
// Add events to field storage.
fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
let span = ctx.event_span(event).expect(BUG_TEXT);
let Some(span) = ctx.event_span(event) else {
return;
};
let mut extensions = span.extensions_mut();
if let Some(mut storage) = extensions.remove::<LogFieldStorage>() {
event.record(&mut LogJsonVisitor(&mut storage));
storage.level = level_to_int(event.metadata().level());
self.write_log(storage);
if let Some(storage) = extensions.get_mut::<LogFieldStorage>() {
event.record(&mut LogJsonVisitor(storage));
}
}
}
@@ -338,5 +373,3 @@ fn level_to_int(level: &tracing::Level) -> i64 {
tracing::Level::ERROR => 0,
}
}
const BUG_TEXT: &str = "Span not found, this is a bug";

View File

@@ -4,10 +4,7 @@ use axum_test::multipart::MultipartForm;
use axum_test::TestServer;
use cookie::Cookie;
use std::rc::Rc;
use tracing_subscriber::{
filter::{self, LevelFilter},
prelude::*,
};
use tracing_subscriber::prelude::*;
use trailbase_sqlite::params;
use trailbase_core::api::{create_user_handler, login_with_password, CreateUserRequest};
@@ -87,26 +84,11 @@ async fn test_record_apis() {
add_user_to_room(conn, user_x, room).await.unwrap();
// Set up logging
let filter = || {
filter::Targets::new()
.with_target("tower_http::trace::on_response", LevelFilter::DEBUG)
.with_target("tower_http::trace::on_request", LevelFilter::DEBUG)
.with_target("tower_http::trace::make_span", LevelFilter::DEBUG)
.with_default(LevelFilter::INFO)
};
// This declares **where** tracing is being logged to, e.g. stderr, file, sqlite.
let layer = tracing_subscriber::registry()
.with(trailbase_core::logging::SqliteLogLayer::new(app.state()).with_filter(filter()));
let _ = layer
.with(
tracing_subscriber::fmt::layer()
.compact()
.with_filter(filter()),
)
.try_init();
// Set up logging: declares **where** tracing is being logged to, e.g. stderr, file, sqlite.
tracing_subscriber::registry()
.with(trailbase_core::logging::SqliteLogLayer::new(app.state()))
.try_init()
.unwrap();
{
let server = TestServer::new(app.router().clone()).unwrap();
@@ -218,14 +200,18 @@ async fn test_record_apis() {
let row = logs_conn
.query_row(
"SELECT client_ip FROM _logs WHERE client_ip = $1",
"SELECT client_ip, latency, status FROM _logs WHERE client_ip = $1",
trailbase_sqlite::params!(client_ip),
)
.await
.unwrap()
.unwrap();
// We're also testing stiching here, since client_ip is recorded on_request and latency/status
// on_response.
assert_eq!(row.get::<String>(0).unwrap(), client_ip);
assert!(row.get::<f64>(1).unwrap() > 0.0);
assert_eq!(row.get::<i64>(2).unwrap(), 200);
}
pub async fn create_chat_message_app_tables(