From 8228548e369bf5cedf2aa61a42870ec3d600cf28 Mon Sep 17 00:00:00 2001 From: Yaroslav Bolyukin Date: Fri, 19 Dec 2025 23:29:20 +0000 Subject: [PATCH] feat: reference pusher server --- --- /dev/null +++ b/cmds/pusher/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "pusher" +version.workspace = true +edition.workspace = true +rust-version.workspace = true + +[dependencies] +anyhow.workspace = true +axum = { version = "0.8.7", features = ["http2", "macros", "ws"] } +axum-extra = { version = "0.12.2", features = ["typed-header"] } +futures-util = { version = "0.3.31", features = ["sink"] } +hyper = "1.8.1" +tokio = { workspace = true, features = ["macros", "process", "rt"] } +tokio-util = { version = "0.7.17", features = ["io"] } +tracing.workspace = true --- /dev/null +++ b/cmds/pusher/src/main.rs @@ -0,0 +1,70 @@ +use std::process::Stdio; +use std::{io, pin::pin}; + +use anyhow::Result; +use axum::Router; +use axum::body::Bytes; +use axum::extract::WebSocketUpgrade; +use axum::extract::ws::{Message, WebSocket}; +use axum::response::Response; +use axum::routing::{any, get}; +use axum_extra::{ + TypedHeader, + headers::{Authorization, authorization::Bearer}, +}; +use futures_util::{SinkExt, StreamExt as _}; +use tokio::net::TcpListener; +use tokio::process::Command; +use tokio_util::io::{CopyToBytes, SinkWriter, StreamReader}; + +async fn push(bearer: TypedHeader>, req: WebSocketUpgrade) -> Response { + if bearer.token() != "test-token" { + todo!() + // return Response::builder().status(403).body(()).expect("build"); + } + req.on_upgrade(serve_nix) +} +async fn list_generations(bearer: TypedHeader>, machine: String) -> Response { + if bearer.token() != "test-token" { + todo!() + // return Response::builder().status(403).body(()).expect("build"); + } + todo!() +} + +async fn serve_nix(ws: WebSocket) { + let child = Command::new("nix-store") + .arg("--serve") + .arg("--write") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + let (tx, rx) = ws.split(); + + let mut read = pin!(StreamReader::new(rx.filter_map(|msg| async move { + match msg { + Ok(Message::Binary(b)) => Some(Ok(b)), + Ok(Message::Ping(_)) => None, + Ok(Message::Close(_)) => None, + Ok(_) => Some(Err(io::Error::other("unexpected frame"))), + Err(e) => Some(Err(io::Error::other(e))), + } + }))); + let mut write = pin!(SinkWriter::new(CopyToBytes::new( + tx.with(|data: Bytes| async move { Ok::<_, axum::Error>(Message::Binary(data)) }) + .sink_map_err(|_| io::Error::other("idk")), + ))); + let _ = tokio::io::copy(&mut read, &mut child.stdin.expect("stdin")).await; + let _ = tokio::io::copy(&mut child.stdout.expect("stdout"), &mut write).await; +} + +#[tokio::main] +async fn main() -> Result<()> { + let router = Router::new() + .route("/push", any(push)) + .route("/generations/{machine}", get(list_generations)); + axum::serve(TcpListener::bind("localhost:8111").await?, router).await?; + Ok(()) +} -- gitstuff