Wire up S3 storage configuration.

This commit is contained in:
Sebastian Jeltsch
2024-11-25 11:56:36 +01:00
parent 31781801ff
commit e986e73a55
8 changed files with 186 additions and 26 deletions

88
Cargo.lock generated
View File

@@ -1058,6 +1058,16 @@ dependencies = [
"libc",
]
[[package]]
name = "core-foundation"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b55271e5c8c478ad3f38ad24ef34923091e0548492a266d19b3c0b4d82574c63"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
@@ -1440,7 +1450,7 @@ dependencies = [
"dlopen2",
"dlopen2_derive",
"once_cell",
"rustls-native-certs",
"rustls-native-certs 0.7.3",
"rustls-pemfile",
]
@@ -2508,6 +2518,7 @@ dependencies = [
"hyper",
"hyper-util",
"rustls",
"rustls-native-certs 0.8.1",
"rustls-pki-types",
"tokio",
"tokio-rustls",
@@ -3142,6 +3153,16 @@ dependencies = [
"serde",
]
[[package]]
name = "md-5"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
dependencies = [
"cfg-if",
"digest",
]
[[package]]
name = "memchr"
version = "2.7.4"
@@ -3433,13 +3454,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6eb4c22c6154a1e759d7099f9ffad7cc5ef8245f9efbab4a41b92623079c82f3"
dependencies = [
"async-trait",
"base64 0.22.1",
"bytes",
"chrono",
"futures",
"humantime",
"hyper",
"itertools 0.13.0",
"md-5",
"parking_lot",
"percent-encoding",
"quick-xml",
"rand",
"reqwest",
"ring",
"serde",
"serde_json",
"snafu",
"tokio",
"tracing",
@@ -4045,6 +4075,16 @@ version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quick-xml"
version = "0.36.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "quinn"
version = "0.11.6"
@@ -4299,6 +4339,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http 1.1.0",
"http-body",
"http-body-util",
@@ -4314,6 +4355,7 @@ dependencies = [
"pin-project-lite",
"quinn",
"rustls",
"rustls-native-certs 0.8.1",
"rustls-pemfile",
"rustls-pki-types",
"serde",
@@ -4322,10 +4364,12 @@ dependencies = [
"sync_wrapper 1.0.2",
"tokio",
"tokio-rustls",
"tokio-util",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots",
"windows-registry",
@@ -4521,7 +4565,19 @@ dependencies = [
"rustls-pemfile",
"rustls-pki-types",
"schannel",
"security-framework",
"security-framework 2.11.1",
]
[[package]]
name = "rustls-native-certs"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3"
dependencies = [
"openssl-probe",
"rustls-pki-types",
"schannel",
"security-framework 3.0.1",
]
[[package]]
@@ -4689,7 +4745,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02"
dependencies = [
"bitflags",
"core-foundation",
"core-foundation 0.9.4",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1415a607e92bec364ea2cf9264646dcce0f91e6d65281bd6f2819cca3bf39c8"
dependencies = [
"bitflags",
"core-foundation 0.10.0",
"core-foundation-sys",
"libc",
"security-framework-sys",
@@ -6497,6 +6566,19 @@ version = "0.2.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d"
[[package]]
name = "wasm-streams"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
dependencies = [
"futures-util",
"js-sys",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "wasm_dep_analyzer"
version = "0.1.0"

View File

@@ -6,7 +6,6 @@ friction.
For context, some larger features we have on our Roadmap:
- Realtime/notification APIs for subscribing to data changes.
- S3 buckets and other cloud storage. The backend already supports it but it isn't wired up.
- Auth: more customizable settings, more customizable UI, and multi-factor.
Also, service-accounts to auth other backends as opposed to end-users.
- Many SQLite databases: imagine a separate database by tenant or user.

View File

@@ -54,6 +54,18 @@ message AuthConfig {
map<string, OAuthProviderConfig> oauth_providers = 11;
}
/// Connection settings for an S3 bucket used as the object store instead of
/// the local file system.
message S3StorageConfig {
/// URL of the S3 endpoint. An endpoint starting with "http://" additionally
/// enables plain (non-TLS) HTTP on the client.
optional string endpoint = 1;
/// S3 region. Optional.
optional string region = 2;
/// Name of the bucket to store objects in. Required whenever this message is
/// present: the object store cannot be built without it.
optional string bucket_name = 5;
/// S3 access key, a.k.a. username.
optional string access_key = 8;
/// S3 secret access key, a.k.a. password.
optional string secret_access_key = 9 [ (secret) = true ];
}
message ServerConfig {
/// Application name presented to users, e.g. when sending emails. Default:
/// "TrailBase".
@@ -71,6 +83,9 @@ message ServerConfig {
/// Interval at which backups are persisted. Setting it to 0 will disable
/// backups. Default: 0.
optional int64 backup_interval_sec = 12;
/// If present, S3 is used for storage instead of the local file system.
optional S3StorageConfig s3_storage_config = 13;
}
/// SQLite specific (as opposed to standard SQL) constraint-violation

View File

@@ -40,7 +40,7 @@ libsql = { workspace = true }
log = "^0.4.21"
minijinja = "2.1.2"
oauth2 = { version = "5.0.0-alpha.4", default-features = false, features = ["reqwest", "rustls-tls"] }
object_store = { version = "0.11.0", default-features = false }
object_store = { version = "0.11.0", default-features = false, features=["aws"] }
parking_lot = "0.12.3"
prost = "^0.12.6"
prost-reflect = { version = "^0.13.0", features = ["derive", "text-format"] }

View File

@@ -6,7 +6,7 @@ use std::sync::Arc;
use crate::auth::jwt::JwtHelper;
use crate::auth::oauth::providers::{ConfiguredOAuthProviders, OAuthProviderType};
use crate::config::proto::{Config, QueryApiConfig, RecordApiConfig};
use crate::config::proto::{Config, QueryApiConfig, RecordApiConfig, S3StorageConfig};
use crate::config::{validate_config, write_config_and_vault_textproto};
use crate::constants::SITE_URL_DEFAULT;
use crate::data_dir::DataDir;
@@ -36,8 +36,8 @@ struct InternalState {
jwt: JwtHelper,
table_metadata: TableMetadataCache,
object_store: Box<dyn ObjectStore + Send + Sync>,
#[allow(unused)]
runtime: RuntimeHandle,
#[cfg(test)]
@@ -54,6 +54,7 @@ pub(crate) struct AppStateArgs {
pub conn: Connection,
pub logs_conn: Connection,
pub jwt: JwtHelper,
pub object_store: Box<dyn ObjectStore + Send + Sync>,
pub js_runtime_threads: Option<usize>,
}
@@ -125,6 +126,7 @@ impl AppState {
logs_conn: args.logs_conn,
jwt: args.jwt,
table_metadata: args.table_metadata,
object_store: args.object_store,
runtime,
#[cfg(test)]
cleanup: vec![],
@@ -166,13 +168,8 @@ impl AppState {
self.table_metadata().invalidate_all().await
}
pub(crate) fn objectstore(
&self,
) -> Result<Box<dyn ObjectStore + Send + Sync>, object_store::Error> {
// FIXME: We should probably have a long-lived store on AppState.
return Ok(Box::new(
object_store::local::LocalFileSystem::new_with_prefix(self.data_dir().uploads_path())?,
));
pub(crate) fn objectstore(&self) -> &(dyn ObjectStore + Send + Sync) {
return &*self.state.object_store;
}
pub(crate) fn get_oauth_provider(&self, name: &str) -> Option<Arc<OAuthProviderType>> {
@@ -371,12 +368,32 @@ pub async fn test_state(options: Option<TestStateOptions>) -> anyhow::Result<App
let main_conn_clone1 = main_conn.clone();
let table_metadata_clone = table_metadata.clone();
let data_dir = DataDir(temp_dir.path().to_path_buf());
let object_store = if std::env::var("TEST_S3_OBJECT_STORE").map_or(false, |v| v == "TRUE") {
info!("Use S3 Storage for tests");
build_objectstore(
&data_dir,
Some(&S3StorageConfig {
endpoint: Some("http://127.0.0.1:9000".to_string()),
region: None,
bucket_name: Some("test".to_string()),
access_key: Some("minioadmin".to_string()),
secret_access_key: Some("minioadmin".to_string()),
}),
)
.unwrap()
} else {
build_objectstore(&data_dir, None).unwrap()
};
let runtime = RuntimeHandle::new();
runtime.set_connection(main_conn.clone());
return Ok(AppState {
state: Arc::new(InternalState {
data_dir: DataDir(temp_dir.path().to_path_buf()),
data_dir,
public_dir: None,
dev: true,
oauth: Computed::new(&config, |c| {
@@ -415,6 +432,7 @@ pub async fn test_state(options: Option<TestStateOptions>) -> anyhow::Result<App
logs_conn,
jwt: jwt::test_jwt_helper(),
table_metadata,
object_store,
runtime,
cleanup: vec![Box::new(temp_dir)],
}),
@@ -445,3 +463,44 @@ fn build_query_api(conn: libsql::Connection, config: QueryApiConfig) -> Result<Q
// TODO: Check virtual table exists
return QueryApi::from(conn, config);
}
pub(crate) fn build_objectstore(
data_dir: &DataDir,
config: Option<&S3StorageConfig>,
) -> Result<Box<dyn ObjectStore + Send + Sync>, object_store::Error> {
if let Some(config) = config {
let mut builder = object_store::aws::AmazonS3Builder::from_env();
if let Some(ref endpoint) = config.endpoint {
builder = builder.with_endpoint(endpoint);
if endpoint.starts_with("http://") {
builder =
builder.with_client_options(object_store::ClientOptions::default().with_allow_http(true))
}
}
if let Some(ref region) = config.region {
builder = builder.with_region(region);
}
let Some(ref bucket_name) = config.bucket_name else {
panic!("S3StorageConfig missing 'bucket_name'.");
};
builder = builder.with_bucket_name(bucket_name);
if let Some(ref access_key) = config.access_key {
builder = builder.with_access_key_id(access_key);
}
if let Some(ref secret_access_key) = config.secret_access_key {
builder = builder.with_secret_access_key(secret_access_key);
}
return Ok(Box::new(builder.build()?));
}
return Ok(Box::new(
object_store::local::LocalFileSystem::new_with_prefix(data_dir.uploads_path())?,
));
}

View File

@@ -25,7 +25,7 @@ pub(crate) async fn read_file_into_response(
state: &AppState,
file_upload: FileUpload,
) -> Result<Response, FileError> {
let store = state.objectstore()?;
let store = state.objectstore();
let path = object_store::path::Path::from(file_upload.path());
let result = store.get(&path).await?;
@@ -69,19 +69,19 @@ pub(crate) async fn delete_files_in_row(
};
if let Some(json) = &column_metadata.json {
let store = state.objectstore()?;
let store = state.objectstore();
match json {
JsonColumnMetadata::SchemaName(name) if name == "std.FileUpload" => {
if let Ok(json) = row.get_str(i) {
let file: FileUpload = serde_json::from_str(json)?;
delete_file(&*store, file).await?;
delete_file(store, file).await?;
}
}
JsonColumnMetadata::SchemaName(name) if name == "std.FileUploads" => {
if let Ok(json) = row.get_str(i) {
let file_uploads: FileUploads = serde_json::from_str(json)?;
for file in file_uploads.0 {
delete_file(&*store, file).await?;
delete_file(store, file).await?;
}
}
}

View File

@@ -417,9 +417,9 @@ impl InsertQueryBuilder {
// We're storing any files to the object store first to make sure the DB entry is valid right
// after commit and not racily pointing to soon-to-be-written files.
if !files.is_empty() {
let objectstore = state.objectstore()?;
let objectstore = state.objectstore();
for (metadata, content) in &mut files {
write_file(&*objectstore, metadata, content).await?;
write_file(objectstore, metadata, content).await?;
}
}
@@ -427,7 +427,7 @@ impl InsertQueryBuilder {
Ok(row) => row,
Err(err) => {
if !files.is_empty() {
let objectstore = state.objectstore()?;
let objectstore = state.objectstore();
for (metadata, _files) in &files {
let path = object_store::path::Path::from(metadata.path());
@@ -504,9 +504,9 @@ impl UpdateQueryBuilder {
// We're storing to object store before writing the entry to the DB.
let mut files = std::mem::take(&mut params.files);
if !files.is_empty() {
let store = state.objectstore()?;
let objectstore = state.objectstore();
for (metadata, content) in &mut files {
write_file(&*store, metadata, content).await?;
write_file(objectstore, metadata, content).await?;
}
}
@@ -560,7 +560,7 @@ impl UpdateQueryBuilder {
Ok(files_row) => files_row,
Err(err) => {
if !files.is_empty() {
let store = state.objectstore()?;
let store = state.objectstore();
for (metadata, _content) in &files {
let path = object_store::path::Path::from(metadata.path());
if let Err(err) = store.delete(&path).await {

View File

@@ -4,7 +4,7 @@ use std::path::PathBuf;
use thiserror::Error;
use trailbase_sqlite::{connect_sqlite, query_one_row};
use crate::app_state::{AppState, AppStateArgs};
use crate::app_state::{build_objectstore, AppState, AppStateArgs};
use crate::auth::jwt::{JwtHelper, JwtHelperError};
use crate::config::load_or_init_config_textproto;
use crate::constants::USER_TABLE;
@@ -35,6 +35,8 @@ pub enum InitError {
SchemaError(#[from] trailbase_sqlite::schema::SchemaError),
#[error("Script error: {0}")]
ScriptError(String),
#[error("ObjectStore error: {0}")]
ObjectStore(#[from] object_store::Error),
}
#[derive(Default)]
@@ -105,6 +107,8 @@ pub async fn init_app_state(
debug!("Failed to load maxmind geoip DB '{geoip_db_path:?}': {err}");
}
let object_store = build_objectstore(&data_dir, config.server.s3_storage_config.as_ref())?;
// Write out the latest .js/.d.ts runtime files.
#[cfg(feature = "v8")]
crate::js::write_js_runtime_files(&data_dir).await;
@@ -118,6 +122,7 @@ pub async fn init_app_state(
conn: main_conn.clone(),
logs_conn,
jwt,
object_store,
js_runtime_threads: args.js_runtime_threads,
});