diff --git a/src/lib.rs b/src/lib.rs index c48df49..a65d4fc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,9 @@ use std::{ //sync::{Arc, OnceLock}, time::Duration, }; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +pub mod server; use reqwest::Client; use sqlx::{ @@ -10,6 +13,7 @@ use sqlx::{ types::chrono::Utc, SqlitePool, }; +use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; const MAX_CONNS: u32 = 200; @@ -17,8 +21,9 @@ const MIN_CONNS: u32 = 5; const TIMEOUT: u64 = 2000; // in milliseconds pub struct BlogdorTheAggregator { - pub db: SqlitePool, - pub client: reqwest::Client, + db: SqlitePool, + client: reqwest::Client, + cancel: CancellationToken, } #[derive(Debug, Default, Clone, PartialEq, Eq)] @@ -33,15 +38,25 @@ pub struct FeedEntry { impl BlogdorTheAggregator { pub async fn new() -> Self { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "blogdor=debug,axum=debug".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + let db = get_db_pool().await; let client = reqwest::Client::new(); + let cancel = CancellationToken::new(); - Self { db, client } + Self { db, client, cancel } } - pub async fn aggregate(&self, cancel: CancellationToken) { + pub async fn aggregate(&self) -> JoinHandle<()> { let db = self.db.clone(); let client = self.client.clone(); + let cancel = self.cancel.clone(); tokio::task::spawn(async move { let mut alarm = tokio::time::interval(Duration::from_hours(1)); loop { @@ -55,7 +70,15 @@ impl BlogdorTheAggregator { } } } - }); + }) + } + + pub async fn listen(&self) -> JoinHandle<()> { + server::spawn_server(self.db.clone(), self.cancel.clone()).await + } + + pub async fn close_db(&self) { + self.db.close().await; } } diff --git a/src/main.rs b/src/main.rs index ad75304..95ba837 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,79 +1,11 @@ -use axum::{routing::post, Router}; use blogdor::BlogdorTheAggregator; -use sqlx::SqlitePool; -use std::net::SocketAddr; -use tokio_util::sync::CancellationToken; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[tokio::main(flavor = "multi_thread")] -async fn main() -> Result<(), ()> { - tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "blogdor=debug,axum=debug".into()), - ) - .with(tracing_subscriber::fmt::layer()) - .init(); - - let blogdor_the_aggregator = BlogdorTheAggregator::new().await; - - let pool = blogdor_the_aggregator.db.clone(); - - let server = make_router(pool.clone()); - - let cancel = CancellationToken::new(); - - blogdor_the_aggregator.aggregate(cancel.clone()).await; - - let server_handle = tokio::task::spawn(async move { - let addr: SocketAddr = ([127, 0, 0, 1], 3000).into(); - tracing::debug!("binding to {addr:?}"); - let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); - axum::serve(listener, server) - .with_graceful_shutdown(graceful_shutdown(cancel)) - .await - .unwrap() - }); - - server_handle.await.unwrap(); - pool.close().await; - - Ok(()) -} - -fn make_router(db: SqlitePool) -> Router { - Router::new() - .route( - "/api/v1/add-feed", - post(async move || { - tracing::debug!("got a post to add a feed"); - }), - ) - .with_state(db) -} - -async fn graceful_shutdown(cancel: CancellationToken) { - use tokio::signal; - let ctrl_c = async { - signal::ctrl_c() - .await - .expect("failed to install Ctrl+C handler"); - }; - - #[cfg(unix)] - let terminate = async { - signal::unix::signal(signal::unix::SignalKind::terminate()) - .expect("failed to install signal handler") - .recv() - .await; - }; - - #[cfg(not(unix))] - let terminate = std::future::pending::<()>(); - - tokio::select! { - _ = ctrl_c => {tracing::info!("received ctrl-c, shutting down web server")}, - _ = terminate => {tracing::info!("received kill signal, shutting down web server")}, - } - cancel.cancel(); +async fn main() { + let bta = BlogdorTheAggregator::new().await; + let aggregator_handle = bta.aggregate().await; + let server_handle = bta.listen().await; + server_handle.await.unwrap_or_default(); + aggregator_handle.await.unwrap_or_default(); + bta.close_db().await; } diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..a2267a1 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,58 @@ +use std::net::SocketAddr; + +use axum::{routing::post, Router}; +use sqlx::SqlitePool; +use tokio_util::sync::CancellationToken; + +pub async fn spawn_server( + pool: SqlitePool, + cancel: CancellationToken, +) -> tokio::task::JoinHandle<()> { + tokio::task::spawn(async move { + let server = make_router(pool); + let addr: SocketAddr = ([127, 0, 0, 1], 3000).into(); + tracing::debug!("binding to {addr:?}"); + let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); + axum::serve(listener, server) + .with_graceful_shutdown(graceful_shutdown(cancel)) + .await + .unwrap() + }) +} + +fn make_router(db: SqlitePool) -> Router { + Router::new() + .route( + "/api/v1/add-feed", + post(async move || { + tracing::debug!("got a post to add a feed"); + }), + ) + .with_state(db) +} + +async fn graceful_shutdown(cancel: CancellationToken) { + use tokio::signal; + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => {tracing::info!("received ctrl-c, shutting down web server")}, + _ = terminate => {tracing::info!("received kill signal, shutting down web server")}, + } + cancel.cancel(); +}