difftreelog
refactor drop pusher for now
in: trunk
2 files changed
cmds/pusher/Cargo.tomldiffbeforeafterboth--- a/cmds/pusher/Cargo.toml
+++ /dev/null
@@ -1,15 +0,0 @@
-[package]
-name = "pusher"
-version.workspace = true
-edition.workspace = true
-rust-version.workspace = true
-
-[dependencies]
-anyhow.workspace = true
-axum.workspace = true
-axum-extra.workspace = true
-futures-util.workspace = true
-hyper.workspace = true
-tokio = { workspace = true, features = ["macros", "process", "rt"] }
-tokio-util = { workspace = true, features = ["io"] }
-tracing.workspace = true
cmds/pusher/src/main.rsdiffbeforeafterboth1use std::process::Stdio;2use std::{io, pin::pin};34use anyhow::Result;5use axum::Router;6use axum::body::Bytes;7use axum::extract::WebSocketUpgrade;8use axum::extract::ws::{Message, WebSocket};9use axum::response::Response;10use axum::routing::{any, get};11use axum_extra::{12 TypedHeader,13 headers::{Authorization, authorization::Bearer},14};15use futures_util::{SinkExt, StreamExt as _};16use tokio::net::TcpListener;17use tokio::process::Command;18use tokio_util::io::{CopyToBytes, SinkWriter, StreamReader};1920async fn push(bearer: TypedHeader<Authorization<Bearer>>, req: WebSocketUpgrade) -> Response {21 if bearer.token() != "test-token" {22 todo!()23 // return Response::builder().status(403).body(()).expect("build");24 }25 req.on_upgrade(serve_nix)26}27async fn list_generations(bearer: TypedHeader<Authorization<Bearer>>, machine: String) -> Response {28 if bearer.token() != "test-token" {29 todo!()30 // return Response::builder().status(403).body(()).expect("build");31 }32 todo!()33}3435async fn serve_nix(ws: WebSocket) {36 let child = Command::new("nix-store")37 .arg("--serve")38 .arg("--write")39 .stdin(Stdio::piped())40 .stdout(Stdio::piped())41 .spawn()42 .unwrap();4344 let (tx, rx) = ws.split();4546 let mut read = pin!(StreamReader::new(rx.filter_map(|msg| async move {47 match msg {48 Ok(Message::Binary(b)) => Some(Ok(b)),49 Ok(Message::Ping(_)) => None,50 Ok(Message::Close(_)) => None,51 Ok(_) => Some(Err(io::Error::other("unexpected frame"))),52 Err(e) => Some(Err(io::Error::other(e))),53 }54 })));55 let mut write = pin!(SinkWriter::new(CopyToBytes::new(56 tx.with(|data: Bytes| async move { Ok::<_, axum::Error>(Message::Binary(data)) })57 .sink_map_err(|_| io::Error::other("idk")),58 )));59 let _ = tokio::io::copy(&mut read, &mut child.stdin.expect("stdin")).await;60 let _ = tokio::io::copy(&mut child.stdout.expect("stdout"), &mut write).await;61}6263#[tokio::main]64async fn main() -> Result<()> {65 let router = Router::new()66 .route("/push", any(push))67 .route("/generations/{machine}", get(list_generations));68 axum::serve(TcpListener::bind("localhost:8111").await?, router).await?;69 Ok(())70}