git.delta.rocks / remowt / refs/commits / 6d9cf16dada2

difftreelog

feat drain stdout/stderr after signal

vmkslqpuYaroslav Bolyukin4 days agoparent: #eadb0bb.patch.diff
in: trunk

6 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -308,9 +308,9 @@
 
 [[package]]
 name = "bifrostlink"
-version = "0.2.4"
+version = "0.2.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ad2d0e30a2aa432b78f41f9676572f88201d4dc73bc2b7bc90704d2e02b7d062"
+checksum = "910f9286588d13e3dbdbbc1ad4d292656e704bc93e1f41b8a13b48e3a8e95f39"
 dependencies = [
  "async-trait",
  "async_fn_traits",
@@ -327,9 +327,9 @@
 
 [[package]]
 name = "bifrostlink-macros"
-version = "0.2.4"
+version = "0.2.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e2121559c45cbe89c4f8d1d741360d5b028577254f6beca053dc02332da85b43"
+checksum = "a0ea5c423c3831c523c8ef78debdf6a64e72b21ec92148a44163a4c25c05dfd0"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -338,9 +338,9 @@
 
 [[package]]
 name = "bifrostlink-ports"
-version = "0.2.4"
+version = "0.2.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e9395c4ccca497b0c50583e6de57aca921c046ae0c10f56030cd2c5a20db05f8"
+checksum = "9e3a9a01ec1b8bd7d44b47cd0183a1465880e241027d9f5afcb076e11704ec70"
 dependencies = [
  "bifrostlink",
  "bytes",
@@ -1835,7 +1835,7 @@
 
 [[package]]
 name = "polkit-backend"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "clap",
@@ -2055,7 +2055,7 @@
 
 [[package]]
 name = "remowt-agent"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "bifrostlink",
@@ -2083,7 +2083,7 @@
 
 [[package]]
 name = "remowt-client"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "bifrostlink",
@@ -2106,7 +2106,7 @@
 
 [[package]]
 name = "remowt-endpoints"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "bifrostlink",
@@ -2124,7 +2124,7 @@
 
 [[package]]
 name = "remowt-link-shared"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "bifrostlink",
  "bytes",
@@ -2138,7 +2138,7 @@
 
 [[package]]
 name = "remowt-plugin"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "bifrostlink",
@@ -2152,7 +2152,7 @@
 
 [[package]]
 name = "remowt-polkit-shared"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "nix",
  "serde",
@@ -2161,7 +2161,7 @@
 
 [[package]]
 name = "remowt-ssh"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -2189,7 +2189,7 @@
 
 [[package]]
 name = "remowt-ui-prompt"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "bifrostlink",
modifiedCargo.tomldiffbeforeafterboth
before · Cargo.toml
1[workspace]2members = ["cmds/*", "crates/*"]3resolver = "2"45[workspace.package]6version = "0.1.4"7license = "MIT"8edition = "2021"9repository = "https://git.delta.rocks/r/remowt"1011[workspace.dependencies]12remowt-client = { version = "0.1.3", path = "crates/remowt-client" }13remowt-polkit-shared = { version = "0.1.3", path = "crates/polkit-shared" }14remowt-link-shared = { version = "0.1.3", path = "crates/remowt-link-shared" }15remowt-plugin = { version = "0.1.3", path = "crates/remowt-plugin" }16remowt-ui-prompt = { version = "0.1.3", path = "crates/remowt-ui-prompt" }17remowt-endpoints = { version = "0.1.3", path = "crates/remowt-endpoints" }1819bifrostlink = "0.2.0"20bifrostlink-macros = "0.2.0"21bifrostlink-ports = "0.2.0"2223camino = { version = "1.2.2", features = ["serde1"] }24anyhow = "1.0.86"25async-trait = "0.1.81"26bytes = "1.11.0"27clap = "4.5.16"28futures = "0.3.30"29futures-util = "0.3.30"30nix = "0.31.3"31openssh = "0.11.0"32pam-client = "0.5.0"33rand = "0.10.1"34russh = { version = "0.61.2", default-features = false, features = [35	"ring",36	"flate2",37	"rsa",38] }39russh-config = "0.58.0"40serde = "1.0.228"41serde_json = "1.0.149"42tempdir = "0.3.7"43tempfile = "3"44tokio = { version = "1.39.3", features = ["fs"] }45tokio-stream = "0.1.15"46tokio-util = "0.7.11"47tracing = "0.1.40"48tracing-subscriber = "0.3.18"49uuid = "1.10.0"50zbus = "5.16.0"51zbus_polkit = "5.0.0"52thiserror = "2.0.18"5354[profile.release]55panic = "abort"56opt-level = "z"57lto = true58codegen-units = 1
after · Cargo.toml
1[workspace]2members = ["cmds/*", "crates/*"]3resolver = "2"45[workspace.package]6version = "0.1.6"7license = "MIT"8edition = "2021"9repository = "https://git.delta.rocks/r/remowt"1011[workspace.dependencies]12remowt-client = { version = "0.1.6", path = "crates/remowt-client" }13remowt-polkit-shared = { version = "0.1.6", path = "crates/polkit-shared" }14remowt-link-shared = { version = "0.1.6", path = "crates/remowt-link-shared" }15remowt-plugin = { version = "0.1.6", path = "crates/remowt-plugin" }16remowt-ui-prompt = { version = "0.1.6", path = "crates/remowt-ui-prompt" }17remowt-endpoints = { version = "0.1.6", path = "crates/remowt-endpoints" }1819bifrostlink = "0.2.0"20bifrostlink-macros = "0.2.0"21bifrostlink-ports = "0.2.0"2223camino = { version = "1.2.2", features = ["serde1"] }24anyhow = "1.0.86"25async-trait = "0.1.81"26bytes = "1.11.0"27clap = "4.5.16"28futures = "0.3.30"29futures-util = "0.3.30"30nix = "0.31.3"31openssh = "0.11.0"32pam-client = "0.5.0"33rand = "0.10.1"34russh = { version = "0.61.2", default-features = false, features = [35	"ring",36	"flate2",37	"rsa",38] }39russh-config = "0.58.0"40serde = "1.0.228"41serde_json = "1.0.149"42tempdir = "0.3.7"43tempfile = "3"44tokio = { version = "1.39.3", features = ["fs"] }45tokio-stream = "0.1.15"46tokio-util = "0.7.11"47tracing = "0.1.40"48tracing-subscriber = "0.3.18"49uuid = "1.10.0"50zbus = "5.16.0"51zbus_polkit = "5.0.0"52thiserror = "2.0.18"5354[profile.release]55panic = "unwind"56opt-level = "z"57lto = true58codegen-units = 159debug = "full"60split-debuginfo = "off"
modifiedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -2,6 +2,7 @@
 use std::collections::{BTreeMap, HashMap};
 use std::fs::Permissions;
 use std::future::pending;
+use std::io;
 use std::os::unix::fs::PermissionsExt as _;
 use std::path::PathBuf;
 use std::sync::{Arc, Mutex, OnceLock};
@@ -230,7 +231,7 @@
 
 fn main() -> anyhow::Result<()> {
 	tracing_subscriber::fmt()
-		.with_writer(std::io::stderr)
+		.with_writer(io::stderr)
 		.without_time()
 		.init();
 	let opts = Opts::parse();
modifiedcrates/remowt-client/src/lib.rsdiffbeforeafterboth
--- a/crates/remowt-client/src/lib.rs
+++ b/crates/remowt-client/src/lib.rs
@@ -17,13 +17,15 @@
 use russh::keys::ssh_key::PublicKey;
 use russh::Channel;
 use tempfile::TempDir;
+use tokio::io::AsyncRead;
 use tokio::net::UnixListener;
 use tokio::sync::oneshot;
+use tokio::task::JoinHandle;
 use tokio::{
 	fs,
-	io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, DuplexStream},
+	io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader},
 };
-use tracing::{debug, warn};
+use tracing::{debug, info, warn};
 use uuid::Uuid;
 
 pub mod editor;
@@ -97,7 +99,7 @@
 
 	let mut child = SshExecChild::from_exec(ch);
 	drop(child.stdin);
-	drain_stderr(child.stderr, cmd.to_owned());
+	drain_to_tracing(child.stderr, cmd.to_owned(), true);
 
 	let mut out = Vec::new();
 	child.stdout.read_to_end(&mut out).await?;
@@ -317,7 +319,7 @@
 			.await?;
 
 		let child = SshExecChild::from_exec(cmd_chan);
-		drain_stderr(child.stderr, "agent".to_owned());
+		drain_to_tracing(child.stderr, "agent".to_owned(), true);
 		rpc.add_direct(
 			Address::Agent,
 			child_port(child.stdout, child.stdin),
@@ -518,20 +520,33 @@
 	}
 }
 
-fn drain_stderr(stream: DuplexStream, context: String) {
+pub(crate) fn drain_to_tracing(
+	stream: impl AsyncRead + Unpin + 'static + Send,
+	context: String,
+	stderr: bool,
+) -> JoinHandle<()> {
 	tokio::spawn(async move {
-		let mut reader = BufReader::new(stream).lines();
+		let mut reader = BufReader::new(stream);
+		let mut buf = Vec::with_capacity(4096);
 		loop {
-			match reader.next_line().await {
-				Ok(Some(line)) => warn!(context = %context, "{line}"),
-				Ok(None) => break,
+			buf.clear();
+			match reader.read_until(b'\n', &mut buf).await {
+				Ok(0) => break,
+				Ok(_) => {
+					let line = String::from_utf8_lossy(buf.strip_suffix(b"\n").unwrap_or(&buf));
+					if stderr {
+						warn!(context = %context, "{line}");
+					} else {
+						info!(context = %context, "{line}");
+					}
+				}
 				Err(e) => {
-					warn!(context = %context, "stderr read failed: {e}");
+					warn!(context = %context, "child stdio read failed: {e}");
 					break;
 				}
 			}
 		}
-	});
+	})
 }
 
 fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {
modifiedcrates/remowt-client/src/subprocess.rsdiffbeforeafterboth
--- a/crates/remowt-client/src/subprocess.rs
+++ b/crates/remowt-client/src/subprocess.rs
@@ -6,13 +6,13 @@
 use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};
 use remowt_link_shared::BifConfig;
 use serde::de::DeserializeOwned;
-use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufReader};
+use tokio::io::AsyncWriteExt as _;
 use tokio::select;
 use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
 use tracing::{debug, info, warn};
 
 use crate::forwarded::{RemowtListener, RemowtStream};
-use crate::Remowt;
+use crate::{drain_to_tracing, Remowt};
 
 #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
 pub enum StdioMode {
@@ -62,12 +62,24 @@
 			client,
 		} = self;
 		drop(stdin);
-		drop(stdout);
-		drop(stderr);
-		client
-			.wait(id)
-			.await?
-			.map_err(|e| anyhow!("agent wait failed: {e}"))
+		let drain_out = async move {
+			if let Some(s) = stdout {
+				drain_to_tracing(s, "<child stdout>".to_owned(), false).await;
+			}
+		};
+		let drain_err = async move {
+			if let Some(s) = stderr {
+				drain_to_tracing(s, "<child stderr>".to_owned(), true).await;
+			}
+		};
+		let wait = async move {
+			client
+				.wait(id)
+				.await?
+				.map_err(|e| anyhow!("agent wait failed: {e}"))
+		};
+		let (code, _, _) = tokio::join!(wait, drain_out, drain_err);
+		code
 	}
 
 	pub async fn kill(&self, signal: i32) -> Result<()> {
@@ -163,7 +175,7 @@
 		);
 
 		let stdin_stream = handle_stdin(stdin, stdin_res?, &program);
-		let stdout_stream = handle_output(stdout, stdout_res?, &program, false);
+		let stdout_stream = handle_output(stdout, stdout_res?, &program);
 		let stderr_stream = handle_output_err(stderr, stderr_res?, &program);
 
 		Ok(RemowtChild {
@@ -215,18 +227,13 @@
 	}
 }
 
-fn handle_output(
-	mode: StdioMode,
-	s: Option<RemowtStream>,
-	program: &str,
-	is_stderr: bool,
-) -> Option<RemowtStream> {
+fn handle_output(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {
 	match mode {
 		StdioMode::Pipe => s,
 		StdioMode::Inherit => {
 			if let Some(s) = s {
 				let program = program.to_owned();
-				tokio::spawn(pump_to_tracing(s, program, is_stderr));
+				tokio::spawn(drain_to_tracing(s, program, false));
 			}
 			None
 		}
@@ -244,7 +251,7 @@
 		StderrMode::Inherit => {
 			if let Some(s) = s {
 				let program = program.to_owned();
-				tokio::spawn(pump_to_tracing(s, program, true));
+				tokio::spawn(drain_to_tracing(s, program, true));
 			}
 			None
 		}
@@ -252,26 +259,6 @@
 	}
 }
 
-async fn pump_to_tracing(stream: RemowtStream, program: String, is_stderr: bool) {
-	let mut reader = BufReader::new(stream).lines();
-	loop {
-		match reader.next_line().await {
-			Ok(Some(line)) => {
-				if is_stderr {
-					warn!(program, "{line}");
-				} else {
-					info!(program, "{line}");
-				}
-			}
-			Ok(None) => break,
-			Err(e) => {
-				warn!(program, "child log stream error: {e}");
-				break;
-			}
-		}
-	}
-}
-
 fn escape_bash(input: &str, out: &mut String) {
 	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
 	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {
@@ -422,6 +409,27 @@
 		}
 	};
 
+	while let Some(e) = err.next().await {
+		if let Ok(line) = e {
+			warn!(program = %program, "{line}");
+		}
+	}
+	if want_stdout {
+		if let Some(out_bytes) = out_bytes.as_mut() {
+			while let Some(o) = out_bytes.next().await {
+				if let Ok(chunk) = o {
+					buf.as_mut().expect("want_stdout").extend_from_slice(&chunk);
+				}
+			}
+		}
+	} else if let Some(out_lines) = out_lines.as_mut() {
+		while let Some(o) = out_lines.next().await {
+			if let Ok(line) = o {
+				info!(program = %program, "{line}");
+			}
+		}
+	}
+
 	match exit {
 		Some(0) => Ok(buf),
 		Some(c) => bail!("command '{line}' failed with status {c}"),
modifiedcrates/remowt-link-shared/src/port.rsdiffbeforeafterboth
--- a/crates/remowt-link-shared/src/port.rs
+++ b/crates/remowt-link-shared/src/port.rs
@@ -3,10 +3,8 @@
 use bifrostlink::Port;
 use bytes::{Bytes, BytesMut};
 use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
+use tokio::select;
 
-/// Wire a length-prefixed duplex byte stream (e.g. a child process's
-/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32`
-/// length followed by that many payload bytes.
 pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port
 where
 	R: AsyncRead + Unpin + Send + 'static,
@@ -18,13 +16,13 @@
 				let len = match reader.read_u32().await {
 					Ok(len) => len,
 					Err(e) => {
-						tracing::error!("child read failed: {e}");
+						log_read_end(&e);
 						break;
 					}
 				};
 				let mut buf = BytesMut::zeroed(len as usize);
 				if let Err(e) = reader.read_exact(&mut buf).await {
-					tracing::error!("child read failed: {e}");
+					log_read_end(&e);
 					break;
 				}
 				if tx.send(buf.freeze()).is_err() {
@@ -35,15 +33,45 @@
 		let write_task = async move {
 			while let Some(msg) = rx.recv().await {
 				if let Err(e) = write_frame(&mut writer, msg).await {
-					tracing::error!("child write failed: {e}");
+					log_write_end(&e);
 					break;
 				}
 			}
 		};
-		tokio::join!(read_task, write_task);
+		select! {
+			_ = read_task => {},
+			_ = write_task => {},
+		}
 	})
 }
 
+fn log_read_end(e: &io::Error) {
+	if matches!(
+		e.kind(),
+		io::ErrorKind::UnexpectedEof
+			| io::ErrorKind::BrokenPipe
+			| io::ErrorKind::ConnectionReset
+			| io::ErrorKind::ConnectionAborted
+	) {
+		tracing::debug!("child read ended: {e}");
+	} else {
+		tracing::error!("child read failed: {e}");
+	}
+}
+
+fn log_write_end(e: &io::Error) {
+	if matches!(
+		e.kind(),
+		io::ErrorKind::BrokenPipe
+			| io::ErrorKind::ConnectionReset
+			| io::ErrorKind::ConnectionAborted
+	) {
+		tracing::debug!("child write ended: {e}");
+	} else {
+		tracing::error!("child write failed: {e}");
+	}
+}
+
 async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {
 	let len = u32::try_from(msg.len())
 		.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;