refactor
This commit is contained in:
parent
3114a88619
commit
c8ee7a0734
3 changed files with 93 additions and 80 deletions
33
src/lib.rs
33
src/lib.rs
|
|
@ -2,6 +2,9 @@ use std::{
|
||||||
//sync::{Arc, OnceLock},
|
//sync::{Arc, OnceLock},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||||
|
|
||||||
|
pub mod server;
|
||||||
|
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use sqlx::{
|
use sqlx::{
|
||||||
|
|
@ -10,6 +13,7 @@ use sqlx::{
|
||||||
types::chrono::Utc,
|
types::chrono::Utc,
|
||||||
SqlitePool,
|
SqlitePool,
|
||||||
};
|
};
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
const MAX_CONNS: u32 = 200;
|
const MAX_CONNS: u32 = 200;
|
||||||
|
|
@ -17,8 +21,9 @@ const MIN_CONNS: u32 = 5;
|
||||||
const TIMEOUT: u64 = 2000; // in milliseconds
|
const TIMEOUT: u64 = 2000; // in milliseconds
|
||||||
|
|
||||||
pub struct BlogdorTheAggregator {
|
pub struct BlogdorTheAggregator {
|
||||||
pub db: SqlitePool,
|
db: SqlitePool,
|
||||||
pub client: reqwest::Client,
|
client: reqwest::Client,
|
||||||
|
cancel: CancellationToken,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone, PartialEq, Eq)]
|
#[derive(Debug, Default, Clone, PartialEq, Eq)]
|
||||||
|
|
@ -33,15 +38,25 @@ pub struct FeedEntry {
|
||||||
|
|
||||||
impl BlogdorTheAggregator {
|
impl BlogdorTheAggregator {
|
||||||
pub async fn new() -> Self {
|
pub async fn new() -> Self {
|
||||||
|
tracing_subscriber::registry()
|
||||||
|
.with(
|
||||||
|
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||||
|
.unwrap_or_else(|_| "blogdor=debug,axum=debug".into()),
|
||||||
|
)
|
||||||
|
.with(tracing_subscriber::fmt::layer())
|
||||||
|
.init();
|
||||||
|
|
||||||
let db = get_db_pool().await;
|
let db = get_db_pool().await;
|
||||||
let client = reqwest::Client::new();
|
let client = reqwest::Client::new();
|
||||||
|
let cancel = CancellationToken::new();
|
||||||
|
|
||||||
Self { db, client }
|
Self { db, client, cancel }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn aggregate(&self, cancel: CancellationToken) {
|
pub async fn aggregate(&self) -> JoinHandle<()> {
|
||||||
let db = self.db.clone();
|
let db = self.db.clone();
|
||||||
let client = self.client.clone();
|
let client = self.client.clone();
|
||||||
|
let cancel = self.cancel.clone();
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
let mut alarm = tokio::time::interval(Duration::from_hours(1));
|
let mut alarm = tokio::time::interval(Duration::from_hours(1));
|
||||||
loop {
|
loop {
|
||||||
|
|
@ -55,7 +70,15 @@ impl BlogdorTheAggregator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn listen(&self) -> JoinHandle<()> {
|
||||||
|
server::spawn_server(self.db.clone(), self.cancel.clone()).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn close_db(&self) {
|
||||||
|
self.db.close().await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
82
src/main.rs
82
src/main.rs
|
|
@ -1,79 +1,11 @@
|
||||||
use axum::{routing::post, Router};
|
|
||||||
use blogdor::BlogdorTheAggregator;
|
use blogdor::BlogdorTheAggregator;
|
||||||
use sqlx::SqlitePool;
|
|
||||||
use std::net::SocketAddr;
|
|
||||||
use tokio_util::sync::CancellationToken;
|
|
||||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread")]
|
#[tokio::main(flavor = "multi_thread")]
|
||||||
async fn main() -> Result<(), ()> {
|
async fn main() {
|
||||||
tracing_subscriber::registry()
|
let bta = BlogdorTheAggregator::new().await;
|
||||||
.with(
|
let aggregator_handle = bta.aggregate().await;
|
||||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
let server_handle = bta.listen().await;
|
||||||
.unwrap_or_else(|_| "blogdor=debug,axum=debug".into()),
|
server_handle.await.unwrap_or_default();
|
||||||
)
|
aggregator_handle.await.unwrap_or_default();
|
||||||
.with(tracing_subscriber::fmt::layer())
|
bta.close_db().await;
|
||||||
.init();
|
|
||||||
|
|
||||||
let blogdor_the_aggregator = BlogdorTheAggregator::new().await;
|
|
||||||
|
|
||||||
let pool = blogdor_the_aggregator.db.clone();
|
|
||||||
|
|
||||||
let server = make_router(pool.clone());
|
|
||||||
|
|
||||||
let cancel = CancellationToken::new();
|
|
||||||
|
|
||||||
blogdor_the_aggregator.aggregate(cancel.clone()).await;
|
|
||||||
|
|
||||||
let server_handle = tokio::task::spawn(async move {
|
|
||||||
let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
|
|
||||||
tracing::debug!("binding to {addr:?}");
|
|
||||||
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
|
|
||||||
axum::serve(listener, server)
|
|
||||||
.with_graceful_shutdown(graceful_shutdown(cancel))
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
});
|
|
||||||
|
|
||||||
server_handle.await.unwrap();
|
|
||||||
pool.close().await;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn make_router(db: SqlitePool) -> Router {
|
|
||||||
Router::new()
|
|
||||||
.route(
|
|
||||||
"/api/v1/add-feed",
|
|
||||||
post(async move || {
|
|
||||||
tracing::debug!("got a post to add a feed");
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
.with_state(db)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn graceful_shutdown(cancel: CancellationToken) {
|
|
||||||
use tokio::signal;
|
|
||||||
let ctrl_c = async {
|
|
||||||
signal::ctrl_c()
|
|
||||||
.await
|
|
||||||
.expect("failed to install Ctrl+C handler");
|
|
||||||
};
|
|
||||||
|
|
||||||
#[cfg(unix)]
|
|
||||||
let terminate = async {
|
|
||||||
signal::unix::signal(signal::unix::SignalKind::terminate())
|
|
||||||
.expect("failed to install signal handler")
|
|
||||||
.recv()
|
|
||||||
.await;
|
|
||||||
};
|
|
||||||
|
|
||||||
#[cfg(not(unix))]
|
|
||||||
let terminate = std::future::pending::<()>();
|
|
||||||
|
|
||||||
tokio::select! {
|
|
||||||
_ = ctrl_c => {tracing::info!("received ctrl-c, shutting down web server")},
|
|
||||||
_ = terminate => {tracing::info!("received kill signal, shutting down web server")},
|
|
||||||
}
|
|
||||||
cancel.cancel();
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
58
src/server.rs
Normal file
58
src/server.rs
Normal file
|
|
@ -0,0 +1,58 @@
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
use axum::{routing::post, Router};
|
||||||
|
use sqlx::SqlitePool;
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
|
pub async fn spawn_server(
|
||||||
|
pool: SqlitePool,
|
||||||
|
cancel: CancellationToken,
|
||||||
|
) -> tokio::task::JoinHandle<()> {
|
||||||
|
tokio::task::spawn(async move {
|
||||||
|
let server = make_router(pool);
|
||||||
|
let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
|
||||||
|
tracing::debug!("binding to {addr:?}");
|
||||||
|
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
|
||||||
|
axum::serve(listener, server)
|
||||||
|
.with_graceful_shutdown(graceful_shutdown(cancel))
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_router(db: SqlitePool) -> Router {
|
||||||
|
Router::new()
|
||||||
|
.route(
|
||||||
|
"/api/v1/add-feed",
|
||||||
|
post(async move || {
|
||||||
|
tracing::debug!("got a post to add a feed");
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.with_state(db)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn graceful_shutdown(cancel: CancellationToken) {
|
||||||
|
use tokio::signal;
|
||||||
|
let ctrl_c = async {
|
||||||
|
signal::ctrl_c()
|
||||||
|
.await
|
||||||
|
.expect("failed to install Ctrl+C handler");
|
||||||
|
};
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
let terminate = async {
|
||||||
|
signal::unix::signal(signal::unix::SignalKind::terminate())
|
||||||
|
.expect("failed to install signal handler")
|
||||||
|
.recv()
|
||||||
|
.await;
|
||||||
|
};
|
||||||
|
|
||||||
|
#[cfg(not(unix))]
|
||||||
|
let terminate = std::future::pending::<()>();
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
_ = ctrl_c => {tracing::info!("received ctrl-c, shutting down web server")},
|
||||||
|
_ = terminate => {tracing::info!("received kill signal, shutting down web server")},
|
||||||
|
}
|
||||||
|
cancel.cancel();
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue