From 6d9cf16dada2e7ef156384ff4ed1c54129d5e73f Mon Sep 17 00:00:00 2001 From: Yaroslav Bolyukin Date: Sun, 14 Jun 2026 03:03:08 +0000 Subject: [PATCH] feat: drain stdout/stderr after signal --- --- a/Cargo.lock +++ b/Cargo.lock @@ -308,9 +308,9 @@ [[package]] name = "bifrostlink" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad2d0e30a2aa432b78f41f9676572f88201d4dc73bc2b7bc90704d2e02b7d062" +checksum = "910f9286588d13e3dbdbbc1ad4d292656e704bc93e1f41b8a13b48e3a8e95f39" dependencies = [ "async-trait", "async_fn_traits", @@ -327,9 +327,9 @@ [[package]] name = "bifrostlink-macros" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2121559c45cbe89c4f8d1d741360d5b028577254f6beca053dc02332da85b43" +checksum = "a0ea5c423c3831c523c8ef78debdf6a64e72b21ec92148a44163a4c25c05dfd0" dependencies = [ "proc-macro2", "quote", @@ -338,9 +338,9 @@ [[package]] name = "bifrostlink-ports" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9395c4ccca497b0c50583e6de57aca921c046ae0c10f56030cd2c5a20db05f8" +checksum = "9e3a9a01ec1b8bd7d44b47cd0183a1465880e241027d9f5afcb076e11704ec70" dependencies = [ "bifrostlink", "bytes", @@ -1835,7 +1835,7 @@ [[package]] name = "polkit-backend" -version = "0.1.4" +version = "0.1.6" dependencies = [ "anyhow", "clap", @@ -2055,7 +2055,7 @@ [[package]] name = "remowt-agent" -version = "0.1.4" +version = "0.1.6" dependencies = [ "anyhow", "bifrostlink", @@ -2083,7 +2083,7 @@ [[package]] name = "remowt-client" -version = "0.1.4" +version = "0.1.6" dependencies = [ "anyhow", "bifrostlink", @@ -2106,7 +2106,7 @@ [[package]] name = "remowt-endpoints" -version = "0.1.4" +version = "0.1.6" dependencies = [ "anyhow", "bifrostlink", @@ -2124,7 +2124,7 @@ [[package]] name = "remowt-link-shared" -version = "0.1.4" +version = "0.1.6" dependencies = [ "bifrostlink", "bytes", @@ -2138,7 +2138,7 @@ [[package]] name = "remowt-plugin" -version = "0.1.4" +version = "0.1.6" dependencies = [ "anyhow", "bifrostlink", @@ -2152,7 +2152,7 @@ [[package]] name = "remowt-polkit-shared" -version = "0.1.4" +version = "0.1.6" dependencies = [ "nix", "serde", @@ -2161,7 +2161,7 @@ [[package]] name = "remowt-ssh" -version = "0.1.4" +version = "0.1.6" dependencies = [ "anyhow", "async-trait", @@ -2189,7 +2189,7 @@ [[package]] name = "remowt-ui-prompt" -version = "0.1.4" +version = "0.1.6" dependencies = [ "anyhow", "bifrostlink", --- a/Cargo.toml +++ b/Cargo.toml @@ -3,18 +3,18 @@ resolver = "2" [workspace.package] -version = "0.1.4" +version = "0.1.6" license = "MIT" edition = "2021" repository = "https://git.delta.rocks/r/remowt" [workspace.dependencies] -remowt-client = { version = "0.1.3", path = "crates/remowt-client" } -remowt-polkit-shared = { version = "0.1.3", path = "crates/polkit-shared" } -remowt-link-shared = { version = "0.1.3", path = "crates/remowt-link-shared" } -remowt-plugin = { version = "0.1.3", path = "crates/remowt-plugin" } -remowt-ui-prompt = { version = "0.1.3", path = "crates/remowt-ui-prompt" } -remowt-endpoints = { version = "0.1.3", path = "crates/remowt-endpoints" } +remowt-client = { version = "0.1.6", path = "crates/remowt-client" } +remowt-polkit-shared = { version = "0.1.6", path = "crates/polkit-shared" } +remowt-link-shared = { version = "0.1.6", path = "crates/remowt-link-shared" } +remowt-plugin = { version = "0.1.6", path = "crates/remowt-plugin" } +remowt-ui-prompt = { version = "0.1.6", path = "crates/remowt-ui-prompt" } +remowt-endpoints = { version = "0.1.6", path = "crates/remowt-endpoints" } bifrostlink = "0.2.0" bifrostlink-macros = "0.2.0" @@ -52,7 +52,9 @@ thiserror = "2.0.18" [profile.release] -panic = "abort" +panic = "unwind" opt-level = "z" lto = true codegen-units = 1 +debug = "full" +split-debuginfo = "off" --- a/cmds/remowt-agent/src/main.rs +++ b/cmds/remowt-agent/src/main.rs @@ -2,6 +2,7 @@ use std::collections::{BTreeMap, HashMap}; use std::fs::Permissions; use std::future::pending; +use std::io; use std::os::unix::fs::PermissionsExt as _; use std::path::PathBuf; use std::sync::{Arc, Mutex, OnceLock}; @@ -230,7 +231,7 @@ fn main() -> anyhow::Result<()> { tracing_subscriber::fmt() - .with_writer(std::io::stderr) + .with_writer(io::stderr) .without_time() .init(); let opts = Opts::parse(); --- a/crates/remowt-client/src/lib.rs +++ b/crates/remowt-client/src/lib.rs @@ -17,13 +17,15 @@ use russh::keys::ssh_key::PublicKey; use russh::Channel; use tempfile::TempDir; +use tokio::io::AsyncRead; use tokio::net::UnixListener; use tokio::sync::oneshot; +use tokio::task::JoinHandle; use tokio::{ fs, - io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, DuplexStream}, + io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader}, }; -use tracing::{debug, warn}; +use tracing::{debug, info, warn}; use uuid::Uuid; pub mod editor; @@ -97,7 +99,7 @@ let mut child = SshExecChild::from_exec(ch); drop(child.stdin); - drain_stderr(child.stderr, cmd.to_owned()); + drain_to_tracing(child.stderr, cmd.to_owned(), true); let mut out = Vec::new(); child.stdout.read_to_end(&mut out).await?; @@ -317,7 +319,7 @@ .await?; let child = SshExecChild::from_exec(cmd_chan); - drain_stderr(child.stderr, "agent".to_owned()); + drain_to_tracing(child.stderr, "agent".to_owned(), true); rpc.add_direct( Address::Agent, child_port(child.stdout, child.stdin), @@ -518,20 +520,33 @@ } } -fn drain_stderr(stream: DuplexStream, context: String) { +pub(crate) fn drain_to_tracing( + stream: impl AsyncRead + Unpin + 'static + Send, + context: String, + stderr: bool, +) -> JoinHandle<()> { tokio::spawn(async move { - let mut reader = BufReader::new(stream).lines(); + let mut reader = BufReader::new(stream); + let mut buf = Vec::with_capacity(4096); loop { - match reader.next_line().await { - Ok(Some(line)) => warn!(context = %context, "{line}"), - Ok(None) => break, + buf.clear(); + match reader.read_until(b'\n', &mut buf).await { + Ok(0) => break, + Ok(_) => { + let line = String::from_utf8_lossy(buf.strip_suffix(b"\n").unwrap_or(&buf)); + if stderr { + warn!(context = %context, "{line}"); + } else { + info!(context = %context, "{line}"); + } + } Err(e) => { - warn!(context = %context, "stderr read failed: {e}"); + warn!(context = %context, "child stdio read failed: {e}"); break; } } } - }); + }) } fn local_runtime_dir() -> Result<(Utf8PathBuf, Option)> { --- a/crates/remowt-client/src/subprocess.rs +++ b/crates/remowt-client/src/subprocess.rs @@ -6,13 +6,13 @@ use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient}; use remowt_link_shared::BifConfig; use serde::de::DeserializeOwned; -use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufReader}; +use tokio::io::AsyncWriteExt as _; use tokio::select; use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec}; use tracing::{debug, info, warn}; use crate::forwarded::{RemowtListener, RemowtStream}; -use crate::Remowt; +use crate::{drain_to_tracing, Remowt}; #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] pub enum StdioMode { @@ -62,12 +62,24 @@ client, } = self; drop(stdin); - drop(stdout); - drop(stderr); - client - .wait(id) - .await? - .map_err(|e| anyhow!("agent wait failed: {e}")) + let drain_out = async move { + if let Some(s) = stdout { + drain_to_tracing(s, "".to_owned(), false).await; + } + }; + let drain_err = async move { + if let Some(s) = stderr { + drain_to_tracing(s, "".to_owned(), true).await; + } + }; + let wait = async move { + client + .wait(id) + .await? + .map_err(|e| anyhow!("agent wait failed: {e}")) + }; + let (code, _, _) = tokio::join!(wait, drain_out, drain_err); + code } pub async fn kill(&self, signal: i32) -> Result<()> { @@ -163,7 +175,7 @@ ); let stdin_stream = handle_stdin(stdin, stdin_res?, &program); - let stdout_stream = handle_output(stdout, stdout_res?, &program, false); + let stdout_stream = handle_output(stdout, stdout_res?, &program); let stderr_stream = handle_output_err(stderr, stderr_res?, &program); Ok(RemowtChild { @@ -215,18 +227,13 @@ } } -fn handle_output( - mode: StdioMode, - s: Option, - program: &str, - is_stderr: bool, -) -> Option { +fn handle_output(mode: StdioMode, s: Option, program: &str) -> Option { match mode { StdioMode::Pipe => s, StdioMode::Inherit => { if let Some(s) = s { let program = program.to_owned(); - tokio::spawn(pump_to_tracing(s, program, is_stderr)); + tokio::spawn(drain_to_tracing(s, program, false)); } None } @@ -244,7 +251,7 @@ StderrMode::Inherit => { if let Some(s) = s { let program = program.to_owned(); - tokio::spawn(pump_to_tracing(s, program, true)); + tokio::spawn(drain_to_tracing(s, program, true)); } None } @@ -252,26 +259,6 @@ } } -async fn pump_to_tracing(stream: RemowtStream, program: String, is_stderr: bool) { - let mut reader = BufReader::new(stream).lines(); - loop { - match reader.next_line().await { - Ok(Some(line)) => { - if is_stderr { - warn!(program, "{line}"); - } else { - info!(program, "{line}"); - } - } - Ok(None) => break, - Err(e) => { - warn!(program, "child log stream error: {e}"); - break; - } - } - } -} - fn escape_bash(input: &str, out: &mut String) { const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}"; if input.chars().all(|c| !TO_ESCAPE.contains(c)) { @@ -422,6 +409,27 @@ } }; + while let Some(e) = err.next().await { + if let Ok(line) = e { + warn!(program = %program, "{line}"); + } + } + if want_stdout { + if let Some(out_bytes) = out_bytes.as_mut() { + while let Some(o) = out_bytes.next().await { + if let Ok(chunk) = o { + buf.as_mut().expect("want_stdout").extend_from_slice(&chunk); + } + } + } + } else if let Some(out_lines) = out_lines.as_mut() { + while let Some(o) = out_lines.next().await { + if let Ok(line) = o { + info!(program = %program, "{line}"); + } + } + } + match exit { Some(0) => Ok(buf), Some(c) => bail!("command '{line}' failed with status {c}"), --- a/crates/remowt-link-shared/src/port.rs +++ b/crates/remowt-link-shared/src/port.rs @@ -3,10 +3,8 @@ use bifrostlink::Port; use bytes::{Bytes, BytesMut}; use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _}; +use tokio::select; -/// Wire a length-prefixed duplex byte stream (e.g. a child process's -/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32` -/// length followed by that many payload bytes. pub fn child_port(mut reader: R, mut writer: W) -> Port where R: AsyncRead + Unpin + Send + 'static, @@ -18,13 +16,13 @@ let len = match reader.read_u32().await { Ok(len) => len, Err(e) => { - tracing::error!("child read failed: {e}"); + log_read_end(&e); break; } }; let mut buf = BytesMut::zeroed(len as usize); if let Err(e) = reader.read_exact(&mut buf).await { - tracing::error!("child read failed: {e}"); + log_read_end(&e); break; } if tx.send(buf.freeze()).is_err() { @@ -35,15 +33,45 @@ let write_task = async move { while let Some(msg) = rx.recv().await { if let Err(e) = write_frame(&mut writer, msg).await { - tracing::error!("child write failed: {e}"); + log_write_end(&e); break; } } }; - tokio::join!(read_task, write_task); + select! { + _ = read_task => {}, + _ = write_task => {}, + } }) } +fn log_read_end(e: &io::Error) { + if matches!( + e.kind(), + io::ErrorKind::UnexpectedEof + | io::ErrorKind::BrokenPipe + | io::ErrorKind::ConnectionReset + | io::ErrorKind::ConnectionAborted + ) { + tracing::debug!("child read ended: {e}"); + } else { + tracing::error!("child read failed: {e}"); + } +} + +fn log_write_end(e: &io::Error) { + if matches!( + e.kind(), + io::ErrorKind::BrokenPipe + | io::ErrorKind::ConnectionReset + | io::ErrorKind::ConnectionAborted + ) { + tracing::debug!("child write ended: {e}"); + } else { + tracing::error!("child write failed: {e}"); + } +} + async fn write_frame(writer: &mut W, msg: Bytes) -> io::Result<()> { let len = u32::try_from(msg.len()) .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?; -- gitstuff