352 lines
9.5 KiB
Rust
352 lines
9.5 KiB
Rust
use std::{collections::HashMap, net::SocketAddr, path::PathBuf};
|
|
|
|
use axum::{
|
|
Extension, Json,
|
|
body::Bytes,
|
|
extract::{ConnectInfo, Query, State},
|
|
http::StatusCode,
|
|
response::IntoResponse,
|
|
};
|
|
use julid::Julid;
|
|
use log::{debug, info, warn};
|
|
use serde::{Deserialize, Serialize};
|
|
use tokio::sync::mpsc::unbounded_channel;
|
|
|
|
use crate::{
|
|
JoecalState, TransferEvent, UploadDialog,
|
|
error::{LocalSendError, Result},
|
|
models::{Device, FileMetadata},
|
|
};
|
|
|
|
#[derive(Deserialize, Serialize)]
|
|
pub struct Session {
|
|
pub session_id: String,
|
|
pub files: HashMap<String, FileMetadata>,
|
|
pub file_tokens: HashMap<String, String>,
|
|
pub receiver: Device,
|
|
pub sender: Device,
|
|
pub status: SessionStatus,
|
|
pub addr: SocketAddr,
|
|
}
|
|
|
|
#[derive(PartialEq, Deserialize, Serialize)]
|
|
pub enum SessionStatus {
|
|
Pending,
|
|
Active,
|
|
Completed,
|
|
Failed,
|
|
Cancelled,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct PrepareUploadResponse {
|
|
pub session_id: String,
|
|
pub files: HashMap<String, String>,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct PrepareUploadRequest {
|
|
pub info: Device,
|
|
pub files: HashMap<String, FileMetadata>,
|
|
}
|
|
|
|
impl JoecalState {
|
|
pub async fn prepare_upload(
|
|
&self,
|
|
peer: String,
|
|
files: HashMap<String, FileMetadata>,
|
|
) -> Result<PrepareUploadResponse> {
|
|
if !self.peers.lock().await.contains_key(&peer) {
|
|
return Err(LocalSendError::PeerNotFound);
|
|
}
|
|
|
|
let peer = self.peers.lock().await.get(&peer).unwrap().clone();
|
|
|
|
let response = self
|
|
.client
|
|
.post(format!(
|
|
"{}://{}/api/localsend/v2/prepare-upload",
|
|
peer.1.protocol,
|
|
peer.0.clone()
|
|
))
|
|
.json(&PrepareUploadRequest {
|
|
info: self.device.clone(),
|
|
files: files.clone(),
|
|
})
|
|
.send()
|
|
.await?;
|
|
|
|
debug!("Response: {response:?}");
|
|
|
|
let response: PrepareUploadResponse = response.json().await?;
|
|
|
|
let session = Session {
|
|
session_id: response.session_id.clone(),
|
|
files,
|
|
file_tokens: response.files.clone(),
|
|
receiver: peer.1,
|
|
sender: self.device.clone(),
|
|
status: SessionStatus::Active,
|
|
addr: peer.0,
|
|
};
|
|
|
|
self.sessions
|
|
.lock()
|
|
.await
|
|
.insert(response.session_id.clone(), session);
|
|
|
|
Ok(response)
|
|
}
|
|
|
|
pub async fn upload(
|
|
&self,
|
|
session_id: String,
|
|
file_id: String,
|
|
token: String,
|
|
body: Bytes,
|
|
) -> Result<()> {
|
|
let sessions = self.sessions.lock().await;
|
|
let session = sessions.get(&session_id).unwrap();
|
|
|
|
if session.status != SessionStatus::Active {
|
|
return Err(LocalSendError::SessionInactive);
|
|
}
|
|
|
|
if session.file_tokens.get(&file_id) != Some(&token) {
|
|
return Err(LocalSendError::InvalidToken);
|
|
}
|
|
|
|
let request = self.client
|
|
.post(format!(
|
|
"{}://{}/api/localsend/v2/upload?sessionId={session_id}&fileId={file_id}&token={token}",
|
|
session.receiver.protocol, session.addr))
|
|
.body(body);
|
|
|
|
debug!("Uploading file: {request:?}");
|
|
let response = request.send().await?;
|
|
|
|
if response.status() != 200 {
|
|
warn!("Upload failed: {response:?}");
|
|
return Err(LocalSendError::UploadFailed);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn send_file(&self, peer: String, file_path: PathBuf) -> Result<()> {
|
|
// Generate file metadata
|
|
let file_metadata = FileMetadata::from_path(&file_path)?;
|
|
|
|
// Prepare files map
|
|
let mut files = HashMap::new();
|
|
files.insert(file_metadata.id.clone(), file_metadata.clone());
|
|
|
|
// Prepare upload
|
|
let prepare_response = self.prepare_upload(peer, files).await?;
|
|
|
|
// Get file token
|
|
let token = prepare_response
|
|
.files
|
|
.get(&file_metadata.id)
|
|
.ok_or(LocalSendError::InvalidToken)?;
|
|
|
|
// Read file contents
|
|
let file_contents = tokio::fs::read(&file_path).await?;
|
|
let bytes = Bytes::from(file_contents);
|
|
|
|
// Upload file
|
|
self.upload(
|
|
prepare_response.session_id,
|
|
file_metadata.id,
|
|
token.clone(),
|
|
bytes,
|
|
)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn cancel_upload(&self, session_id: String) -> Result<()> {
|
|
let sessions = self.sessions.lock().await;
|
|
let session = sessions.get(&session_id).unwrap();
|
|
|
|
let request = self
|
|
.client
|
|
.post(format!(
|
|
"{}://{}/api/localsend/v2/cancel?sessionId={}",
|
|
session.receiver.protocol, session.addr, session_id
|
|
))
|
|
.send()
|
|
.await?;
|
|
|
|
if request.status() != 200 {
|
|
return Err(LocalSendError::CancelFailed);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
pub async fn register_prepare_upload(
|
|
State(state): State<JoecalState>,
|
|
ConnectInfo(addr): ConnectInfo<SocketAddr>,
|
|
Json(req): Json<PrepareUploadRequest>,
|
|
) -> impl IntoResponse {
|
|
info!("Received upload request from alias: {}", req.info.alias);
|
|
|
|
let id = Julid::new();
|
|
let (tx, mut rx) = unbounded_channel();
|
|
// be sure to clear this request before this function exits!
|
|
state.add_upload_request(id, tx).await;
|
|
|
|
let dialog_send = state.transfer_event_tx.send(TransferEvent::UploadRequest {
|
|
alias: req.info.alias.clone(),
|
|
id,
|
|
});
|
|
match dialog_send {
|
|
Ok(_) => {}
|
|
Err(_e) => {
|
|
state.clear_upload_request(id).await;
|
|
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
|
|
}
|
|
}
|
|
|
|
let confirmation = rx.recv().await;
|
|
state.clear_upload_request(id).await;
|
|
|
|
let Some(confirmation) = confirmation else {
|
|
// the frontend must have dropped the tx before trying to send a reply back
|
|
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
|
|
};
|
|
|
|
if confirmation != UploadDialog::UploadConfirm {
|
|
return StatusCode::FORBIDDEN.into_response();
|
|
}
|
|
|
|
let session_id = id.as_string();
|
|
|
|
let file_tokens: HashMap<String, String> = req
|
|
.files
|
|
.keys()
|
|
.map(|id| (id.clone(), Julid::new().to_string())) // Replace with actual token logic
|
|
.collect();
|
|
|
|
let session = Session {
|
|
session_id: session_id.clone(),
|
|
files: req.files.clone(),
|
|
file_tokens: file_tokens.clone(),
|
|
receiver: state.device.clone(),
|
|
sender: req.info.clone(),
|
|
status: SessionStatus::Active,
|
|
addr,
|
|
};
|
|
|
|
state
|
|
.sessions
|
|
.lock()
|
|
.await
|
|
.insert(session_id.clone(), session);
|
|
|
|
(
|
|
StatusCode::OK,
|
|
Json(PrepareUploadResponse {
|
|
session_id,
|
|
files: file_tokens,
|
|
}),
|
|
)
|
|
.into_response()
|
|
}
|
|
|
|
pub async fn register_upload(
|
|
Query(params): Query<UploadParams>,
|
|
State(state): State<JoecalState>,
|
|
Extension(download_dir): Extension<String>,
|
|
body: Bytes,
|
|
) -> impl IntoResponse {
|
|
// Extract query parameters
|
|
let session_id = ¶ms.session_id;
|
|
let file_id = ¶ms.file_id;
|
|
let token = ¶ms.token;
|
|
|
|
// Get session and validate
|
|
let mut sessions_lock = state.sessions.lock().await;
|
|
let session = match sessions_lock.get_mut(session_id) {
|
|
Some(session) => session,
|
|
None => return StatusCode::BAD_REQUEST.into_response(),
|
|
};
|
|
|
|
if session.status != SessionStatus::Active {
|
|
return StatusCode::BAD_REQUEST.into_response();
|
|
}
|
|
|
|
// Validate token
|
|
if session.file_tokens.get(file_id) != Some(&token.to_string()) {
|
|
return StatusCode::FORBIDDEN.into_response();
|
|
}
|
|
|
|
// Get file metadata
|
|
let file_metadata = match session.files.get(file_id) {
|
|
Some(metadata) => metadata,
|
|
None => {
|
|
return (
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
"File not found".to_string(),
|
|
)
|
|
.into_response();
|
|
}
|
|
};
|
|
|
|
// Create directory if it doesn't exist
|
|
if let Err(e) = tokio::fs::create_dir_all(&*download_dir).await {
|
|
return (
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
format!("Failed to create directory: {e}"),
|
|
)
|
|
.into_response();
|
|
}
|
|
|
|
// Create file path
|
|
let file_path = format!("{}/{}", download_dir, file_metadata.file_name);
|
|
|
|
// Write file
|
|
if let Err(e) = tokio::fs::write(&file_path, body).await {
|
|
return (
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
format!("Failed to write file: {e}"),
|
|
)
|
|
.into_response();
|
|
}
|
|
|
|
StatusCode::OK.into_response()
|
|
}
|
|
|
|
// Query parameters struct
|
|
#[derive(Deserialize)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct UploadParams {
|
|
session_id: String,
|
|
file_id: String,
|
|
token: String,
|
|
}
|
|
|
|
pub async fn register_cancel(
|
|
Query(params): Query<CancelParams>,
|
|
State(state): State<JoecalState>,
|
|
) -> impl IntoResponse {
|
|
let mut sessions_lock = state.sessions.lock().await;
|
|
let session = match sessions_lock.get_mut(¶ms.session_id) {
|
|
Some(session) => session,
|
|
None => return StatusCode::BAD_REQUEST.into_response(),
|
|
};
|
|
session.status = SessionStatus::Cancelled;
|
|
StatusCode::OK.into_response()
|
|
}
|
|
|
|
// Cancel parameters struct
|
|
#[derive(Deserialize)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct CancelParams {
|
|
session_id: String,
|
|
}
|