git.delta.rocks / jrsonnet / refs/commits / 4593f252154e

difftreelog

refactor drop pusher for now

xnoyxswxYaroslav Bolyukin2026-04-18parent: #4973cba.patch.diff
in: trunk

2 files changed

deletedcmds/pusher/Cargo.tomldiffbeforeafterboth
--- a/cmds/pusher/Cargo.toml
+++ /dev/null
@@ -1,15 +0,0 @@
-[package]
-name = "pusher"
-version.workspace = true
-edition.workspace = true
-rust-version.workspace = true
-
-[dependencies]
-anyhow.workspace = true
-axum.workspace = true
-axum-extra.workspace = true
-futures-util.workspace = true
-hyper.workspace = true
-tokio = { workspace = true, features = ["macros", "process", "rt"] }
-tokio-util = { workspace = true, features = ["io"] }
-tracing.workspace = true
deletedcmds/pusher/src/main.rsdiffbeforeafterboth
before · cmds/pusher/src/main.rs
1use std::process::Stdio;2use std::{io, pin::pin};34use anyhow::Result;5use axum::Router;6use axum::body::Bytes;7use axum::extract::WebSocketUpgrade;8use axum::extract::ws::{Message, WebSocket};9use axum::response::Response;10use axum::routing::{any, get};11use axum_extra::{12	TypedHeader,13	headers::{Authorization, authorization::Bearer},14};15use futures_util::{SinkExt, StreamExt as _};16use tokio::net::TcpListener;17use tokio::process::Command;18use tokio_util::io::{CopyToBytes, SinkWriter, StreamReader};1920async fn push(bearer: TypedHeader<Authorization<Bearer>>, req: WebSocketUpgrade) -> Response {21	if bearer.token() != "test-token" {22		todo!()23		// return Response::builder().status(403).body(()).expect("build");24	}25	req.on_upgrade(serve_nix)26}27async fn list_generations(bearer: TypedHeader<Authorization<Bearer>>, machine: String) -> Response {28	if bearer.token() != "test-token" {29		todo!()30		// return Response::builder().status(403).body(()).expect("build");31	}32	todo!()33}3435async fn serve_nix(ws: WebSocket) {36	let child = Command::new("nix-store")37		.arg("--serve")38		.arg("--write")39		.stdin(Stdio::piped())40		.stdout(Stdio::piped())41		.spawn()42		.unwrap();4344	let (tx, rx) = ws.split();4546	let mut read = pin!(StreamReader::new(rx.filter_map(|msg| async move {47		match msg {48			Ok(Message::Binary(b)) => Some(Ok(b)),49			Ok(Message::Ping(_)) => None,50			Ok(Message::Close(_)) => None,51			Ok(_) => Some(Err(io::Error::other("unexpected frame"))),52			Err(e) => Some(Err(io::Error::other(e))),53		}54	})));55	let mut write = pin!(SinkWriter::new(CopyToBytes::new(56		tx.with(|data: Bytes| async move { Ok::<_, axum::Error>(Message::Binary(data)) })57			.sink_map_err(|_| io::Error::other("idk")),58	)));59	let _ = tokio::io::copy(&mut read, &mut child.stdin.expect("stdin")).await;60	let _ = tokio::io::copy(&mut child.stdout.expect("stdout"), &mut write).await;61}6263#[tokio::main]64async fn main() -> Result<()> {65	let router = Router::new()66		.route("/push", any(push))67		.route("/generations/{machine}", get(list_generations));68	axum::serve(TcpListener::bind("localhost:8111").await?, router).await?;69	Ok(())70}