git.delta.rocks / jrsonnet / refs/commits / 8228548e369b

difftreelog

feat reference pusher server

wmkppnkuYaroslav Bolyukin2026-01-22parent: #b10bfb8.patch.diff
in: trunk

2 files changed

addedcmds/pusher/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/cmds/pusher/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "pusher"
+version.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+anyhow.workspace = true
+axum = { version = "0.8.7", features = ["http2", "macros", "ws"] }
+axum-extra = { version = "0.12.2", features = ["typed-header"] }
+futures-util = { version = "0.3.31", features = ["sink"] }
+hyper = "1.8.1"
+tokio = { workspace = true, features = ["macros", "process", "rt"] }
+tokio-util = { version = "0.7.17", features = ["io"] }
+tracing.workspace = true
addedcmds/pusher/src/main.rsdiffbeforeafterboth
after · cmds/pusher/src/main.rs
1use std::process::Stdio;2use std::{io, pin::pin};34use anyhow::Result;5use axum::Router;6use axum::body::Bytes;7use axum::extract::WebSocketUpgrade;8use axum::extract::ws::{Message, WebSocket};9use axum::response::Response;10use axum::routing::{any, get};11use axum_extra::{12	TypedHeader,13	headers::{Authorization, authorization::Bearer},14};15use futures_util::{SinkExt, StreamExt as _};16use tokio::net::TcpListener;17use tokio::process::Command;18use tokio_util::io::{CopyToBytes, SinkWriter, StreamReader};1920async fn push(bearer: TypedHeader<Authorization<Bearer>>, req: WebSocketUpgrade) -> Response {21	if bearer.token() != "test-token" {22		todo!()23		// return Response::builder().status(403).body(()).expect("build");24	}25	req.on_upgrade(serve_nix)26}27async fn list_generations(bearer: TypedHeader<Authorization<Bearer>>, machine: String) -> Response {28	if bearer.token() != "test-token" {29		todo!()30		// return Response::builder().status(403).body(()).expect("build");31	}32	todo!()33}3435async fn serve_nix(ws: WebSocket) {36	let child = Command::new("nix-store")37		.arg("--serve")38		.arg("--write")39		.stdin(Stdio::piped())40		.stdout(Stdio::piped())41		.spawn()42		.unwrap();4344	let (tx, rx) = ws.split();4546	let mut read = pin!(StreamReader::new(rx.filter_map(|msg| async move {47		match msg {48			Ok(Message::Binary(b)) => Some(Ok(b)),49			Ok(Message::Ping(_)) => None,50			Ok(Message::Close(_)) => None,51			Ok(_) => Some(Err(io::Error::other("unexpected frame"))),52			Err(e) => Some(Err(io::Error::other(e))),53		}54	})));55	let mut write = pin!(SinkWriter::new(CopyToBytes::new(56		tx.with(|data: Bytes| async move { Ok::<_, axum::Error>(Message::Binary(data)) })57			.sink_map_err(|_| io::Error::other("idk")),58	)));59	let _ = tokio::io::copy(&mut read, &mut child.stdin.expect("stdin")).await;60	let _ = tokio::io::copy(&mut child.stdout.expect("stdout"), &mut write).await;61}6263#[tokio::main]64async fn main() -> Result<()> {65	let router = Router::new()66		.route("/push", any(push))67		.route("/generations/{machine}", get(list_generations));68	axum::serve(TcpListener::bind("localhost:8111").await?, router).await?;69	Ok(())70}