git.delta.rocks / remowt / refs/commits / 6d9cf16dada2

difftreelog

feat drain stdout/stderr after signal

vmkslqpuYaroslav Bolyukin4 days agoparent: #eadb0bb.patch.diff
in: trunk

6 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -308,9 +308,9 @@
 
 [[package]]
 name = "bifrostlink"
-version = "0.2.4"
+version = "0.2.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ad2d0e30a2aa432b78f41f9676572f88201d4dc73bc2b7bc90704d2e02b7d062"
+checksum = "910f9286588d13e3dbdbbc1ad4d292656e704bc93e1f41b8a13b48e3a8e95f39"
 dependencies = [
  "async-trait",
  "async_fn_traits",
@@ -327,9 +327,9 @@
 
 [[package]]
 name = "bifrostlink-macros"
-version = "0.2.4"
+version = "0.2.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e2121559c45cbe89c4f8d1d741360d5b028577254f6beca053dc02332da85b43"
+checksum = "a0ea5c423c3831c523c8ef78debdf6a64e72b21ec92148a44163a4c25c05dfd0"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -338,9 +338,9 @@
 
 [[package]]
 name = "bifrostlink-ports"
-version = "0.2.4"
+version = "0.2.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e9395c4ccca497b0c50583e6de57aca921c046ae0c10f56030cd2c5a20db05f8"
+checksum = "9e3a9a01ec1b8bd7d44b47cd0183a1465880e241027d9f5afcb076e11704ec70"
 dependencies = [
  "bifrostlink",
  "bytes",
@@ -1835,7 +1835,7 @@
 
 [[package]]
 name = "polkit-backend"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "clap",
@@ -2055,7 +2055,7 @@
 
 [[package]]
 name = "remowt-agent"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "bifrostlink",
@@ -2083,7 +2083,7 @@
 
 [[package]]
 name = "remowt-client"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "bifrostlink",
@@ -2106,7 +2106,7 @@
 
 [[package]]
 name = "remowt-endpoints"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "bifrostlink",
@@ -2124,7 +2124,7 @@
 
 [[package]]
 name = "remowt-link-shared"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "bifrostlink",
  "bytes",
@@ -2138,7 +2138,7 @@
 
 [[package]]
 name = "remowt-plugin"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "bifrostlink",
@@ -2152,7 +2152,7 @@
 
 [[package]]
 name = "remowt-polkit-shared"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "nix",
  "serde",
@@ -2161,7 +2161,7 @@
 
 [[package]]
 name = "remowt-ssh"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -2189,7 +2189,7 @@
 
 [[package]]
 name = "remowt-ui-prompt"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "bifrostlink",
modifiedCargo.tomldiffbeforeafterboth
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,18 +3,18 @@
 resolver = "2"
 
 [workspace.package]
-version = "0.1.4"
+version = "0.1.6"
 license = "MIT"
 edition = "2021"
 repository = "https://git.delta.rocks/r/remowt"
 
 [workspace.dependencies]
-remowt-client = { version = "0.1.3", path = "crates/remowt-client" }
-remowt-polkit-shared = { version = "0.1.3", path = "crates/polkit-shared" }
-remowt-link-shared = { version = "0.1.3", path = "crates/remowt-link-shared" }
-remowt-plugin = { version = "0.1.3", path = "crates/remowt-plugin" }
-remowt-ui-prompt = { version = "0.1.3", path = "crates/remowt-ui-prompt" }
-remowt-endpoints = { version = "0.1.3", path = "crates/remowt-endpoints" }
+remowt-client = { version = "0.1.6", path = "crates/remowt-client" }
+remowt-polkit-shared = { version = "0.1.6", path = "crates/polkit-shared" }
+remowt-link-shared = { version = "0.1.6", path = "crates/remowt-link-shared" }
+remowt-plugin = { version = "0.1.6", path = "crates/remowt-plugin" }
+remowt-ui-prompt = { version = "0.1.6", path = "crates/remowt-ui-prompt" }
+remowt-endpoints = { version = "0.1.6", path = "crates/remowt-endpoints" }
 
 bifrostlink = "0.2.0"
 bifrostlink-macros = "0.2.0"
@@ -52,7 +52,9 @@
 thiserror = "2.0.18"
 
 [profile.release]
-panic = "abort"
+panic = "unwind"
 opt-level = "z"
 lto = true
 codegen-units = 1
+debug = "full"
+split-debuginfo = "off"
modifiedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -2,6 +2,7 @@
 use std::collections::{BTreeMap, HashMap};
 use std::fs::Permissions;
 use std::future::pending;
+use std::io;
 use std::os::unix::fs::PermissionsExt as _;
 use std::path::PathBuf;
 use std::sync::{Arc, Mutex, OnceLock};
@@ -230,7 +231,7 @@
 
 fn main() -> anyhow::Result<()> {
 	tracing_subscriber::fmt()
-		.with_writer(std::io::stderr)
+		.with_writer(io::stderr)
 		.without_time()
 		.init();
 	let opts = Opts::parse();
modifiedcrates/remowt-client/src/lib.rsdiffbeforeafterboth
--- a/crates/remowt-client/src/lib.rs
+++ b/crates/remowt-client/src/lib.rs
@@ -17,13 +17,15 @@
 use russh::keys::ssh_key::PublicKey;
 use russh::Channel;
 use tempfile::TempDir;
+use tokio::io::AsyncRead;
 use tokio::net::UnixListener;
 use tokio::sync::oneshot;
+use tokio::task::JoinHandle;
 use tokio::{
 	fs,
-	io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, DuplexStream},
+	io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader},
 };
-use tracing::{debug, warn};
+use tracing::{debug, info, warn};
 use uuid::Uuid;
 
 pub mod editor;
@@ -97,7 +99,7 @@
 
 	let mut child = SshExecChild::from_exec(ch);
 	drop(child.stdin);
-	drain_stderr(child.stderr, cmd.to_owned());
+	drain_to_tracing(child.stderr, cmd.to_owned(), true);
 
 	let mut out = Vec::new();
 	child.stdout.read_to_end(&mut out).await?;
@@ -317,7 +319,7 @@
 			.await?;
 
 		let child = SshExecChild::from_exec(cmd_chan);
-		drain_stderr(child.stderr, "agent".to_owned());
+		drain_to_tracing(child.stderr, "agent".to_owned(), true);
 		rpc.add_direct(
 			Address::Agent,
 			child_port(child.stdout, child.stdin),
@@ -518,20 +520,33 @@
 	}
 }
 
-fn drain_stderr(stream: DuplexStream, context: String) {
+pub(crate) fn drain_to_tracing(
+	stream: impl AsyncRead + Unpin + 'static + Send,
+	context: String,
+	stderr: bool,
+) -> JoinHandle<()> {
 	tokio::spawn(async move {
-		let mut reader = BufReader::new(stream).lines();
+		let mut reader = BufReader::new(stream);
+		let mut buf = Vec::with_capacity(4096);
 		loop {
-			match reader.next_line().await {
-				Ok(Some(line)) => warn!(context = %context, "{line}"),
-				Ok(None) => break,
+			buf.clear();
+			match reader.read_until(b'\n', &mut buf).await {
+				Ok(0) => break,
+				Ok(_) => {
+					let line = String::from_utf8_lossy(buf.strip_suffix(b"\n").unwrap_or(&buf));
+					if stderr {
+						warn!(context = %context, "{line}");
+					} else {
+						info!(context = %context, "{line}");
+					}
+				}
 				Err(e) => {
-					warn!(context = %context, "stderr read failed: {e}");
+					warn!(context = %context, "child stdio read failed: {e}");
 					break;
 				}
 			}
 		}
-	});
+	})
 }
 
 fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {
modifiedcrates/remowt-client/src/subprocess.rsdiffbeforeafterboth
before · crates/remowt-client/src/subprocess.rs
1use std::pin::pin;23use anyhow::{anyhow, bail, Result};4use camino::Utf8PathBuf;5use futures::StreamExt as _;6use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};7use remowt_link_shared::BifConfig;8use serde::de::DeserializeOwned;9use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufReader};10use tokio::select;11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{debug, info, warn};1314use crate::forwarded::{RemowtListener, RemowtStream};15use crate::Remowt;1617#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]18pub enum StdioMode {19	#[default]20	Null,21	Pipe,22	Inherit,23}2425#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]26pub enum StderrMode {27	#[default]28	Null,29	Pipe,30	Inherit,31	MergeWithStdout,32}3334#[derive(Default)]35pub struct SpawnOptions {36	pub program: String,37	pub args: Vec<String>,38	pub env: Vec<(String, String)>,39	pub env_clear: bool,40	pub cwd: Option<Utf8PathBuf>,41	pub escalated: bool,42	pub stdin: StdioMode,43	pub stdout: StdioMode,44	pub stderr: StderrMode,45}4647pub struct RemowtChild {48	pub stdin: Option<RemowtStream>,49	pub stdout: Option<RemowtStream>,50	pub stderr: Option<RemowtStream>,51	id: ProcId,52	client: SubprocessClient<BifConfig>,53}5455impl RemowtChild {56	pub async fn wait(self) -> Result<Option<i32>> {57		let RemowtChild {58			stdin,59			stdout,60			stderr,61			id,62			client,63		} = self;64		drop(stdin);65		drop(stdout);66		drop(stderr);67		client68			.wait(id)69			.await?70			.map_err(|e| anyhow!("agent wait failed: {e}"))71	}7273	pub async fn kill(&self, signal: i32) -> Result<()> {74		self.client75			.kill(self.id, signal)76			.await?77			.map_err(|e| anyhow!("agent kill failed: {e}"))78	}79}8081fn needs_socket(m: StdioMode) -> bool {82	matches!(m, StdioMode::Pipe | StdioMode::Inherit)83}8485fn stderr_needs_socket(m: StderrMode) -> bool {86	matches!(m, StderrMode::Pipe | StderrMode::Inherit)87}8889impl Remowt {90	pub async fn spawn(&self, opts: SpawnOptions) -> Result<RemowtChild> {91		let SpawnOptions {92			program,93			args,94			env,95			env_clear,96			cwd,97			escalated,98			stdin,99			stdout,100			stderr,101		} = opts;102103		if matches!(stderr, StderrMode::MergeWithStdout) && !needs_socket(stdout) {104			bail!("stderr=MergeWithStdout requires stdout=Pipe or Inherit");105		}106107		let stdin_bound = if needs_socket(stdin) {108			Some(self.bind_runtime_unix("proc-stdin").await?)109		} else {110			None111		};112		let stdout_bound = if needs_socket(stdout) {113			Some(self.bind_runtime_unix("proc-stdout").await?)114		} else {115			None116		};117		let stderr_bound = if stderr_needs_socket(stderr) {118			Some(self.bind_runtime_unix("proc-stderr").await?)119		} else {120			None121		};122123		let stdin_spec = match &stdin_bound {124			Some((_, p)) => StdioSpec::Socket(p.clone()),125			None => StdioSpec::Null,126		};127		let stdout_spec = match &stdout_bound {128			Some((_, p)) => StdioSpec::Socket(p.clone()),129			None => StdioSpec::Null,130		};131		let stderr_spec = match (&stderr, &stderr_bound) {132			(StderrMode::Pipe | StderrMode::Inherit, Some((_, p))) => StderrSpec::Socket(p.clone()),133			(StderrMode::MergeWithStdout, _) => StderrSpec::MergeWithStdout,134			_ => StderrSpec::Null,135		};136137		let client: SubprocessClient<BifConfig> = if escalated {138			// Boxed to break the async-fn type cycle139			Box::pin(self.run0_endpoints::<SubprocessClient<BifConfig>>()).await?140		} else {141			self.endpoints()142		};143144		let spec = SpawnSpec {145			program: program.clone(),146			args,147			env,148			env_clear,149			cwd,150			stdin: stdin_spec,151			stdout: stdout_spec,152			stderr: stderr_spec,153		};154		let id = client155			.spawn(spec)156			.await?157			.map_err(|e| anyhow!("agent spawn failed: {e}"))?;158159		let (stdin_res, stdout_res, stderr_res) = tokio::join!(160			accept(stdin_bound),161			accept(stdout_bound),162			accept(stderr_bound),163		);164165		let stdin_stream = handle_stdin(stdin, stdin_res?, &program);166		let stdout_stream = handle_output(stdout, stdout_res?, &program, false);167		let stderr_stream = handle_output_err(stderr, stderr_res?, &program);168169		Ok(RemowtChild {170			stdin: stdin_stream,171			stdout: stdout_stream,172			stderr: stderr_stream,173			id,174			client,175		})176	}177178	pub fn cmd(&self, program: impl AsRef<str>) -> RemowtCommand {179		let program = program.as_ref().to_owned();180		RemowtCommand {181			program,182			args: vec![],183			env: vec![],184			remowt: self.clone(),185			escalated: false,186		}187	}188}189190async fn accept(b: Option<(RemowtListener, Utf8PathBuf)>) -> Result<Option<RemowtStream>> {191	match b {192		Some((l, _)) => Ok(Some(l.accept().await?)),193		None => Ok(None),194	}195}196197fn handle_stdin(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {198	match mode {199		StdioMode::Pipe => s,200		StdioMode::Inherit => {201			if let Some(s) = s {202				let program = program.to_owned();203				tokio::spawn(async move {204					let mut stdin = tokio::io::stdin();205					let mut s = s;206					if let Err(e) = tokio::io::copy(&mut stdin, &mut s).await {207						warn!(program, "stdin forward ended: {e}");208					}209					let _ = s.shutdown().await;210				});211			}212			None213		}214		StdioMode::Null => None,215	}216}217218fn handle_output(219	mode: StdioMode,220	s: Option<RemowtStream>,221	program: &str,222	is_stderr: bool,223) -> Option<RemowtStream> {224	match mode {225		StdioMode::Pipe => s,226		StdioMode::Inherit => {227			if let Some(s) = s {228				let program = program.to_owned();229				tokio::spawn(pump_to_tracing(s, program, is_stderr));230			}231			None232		}233		StdioMode::Null => None,234	}235}236237fn handle_output_err(238	mode: StderrMode,239	s: Option<RemowtStream>,240	program: &str,241) -> Option<RemowtStream> {242	match mode {243		StderrMode::Pipe => s,244		StderrMode::Inherit => {245			if let Some(s) = s {246				let program = program.to_owned();247				tokio::spawn(pump_to_tracing(s, program, true));248			}249			None250		}251		StderrMode::MergeWithStdout | StderrMode::Null => None,252	}253}254255async fn pump_to_tracing(stream: RemowtStream, program: String, is_stderr: bool) {256	let mut reader = BufReader::new(stream).lines();257	loop {258		match reader.next_line().await {259			Ok(Some(line)) => {260				if is_stderr {261					warn!(program, "{line}");262				} else {263					info!(program, "{line}");264				}265			}266			Ok(None) => break,267			Err(e) => {268				warn!(program, "child log stream error: {e}");269				break;270			}271		}272	}273}274275fn escape_bash(input: &str, out: &mut String) {276	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";277	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {278		out.push_str(input);279		return;280	}281	out.push('\'');282	for (i, v) in input.split('\'').enumerate() {283		if i != 0 {284			out.push_str("'\"'\"'");285		}286		out.push_str(v);287	}288	out.push('\'');289}290291#[derive(Clone)]292pub struct RemowtCommand {293	program: String,294	args: Vec<String>,295	env: Vec<(String, String)>,296	remowt: Remowt,297	escalated: bool,298}299300impl RemowtCommand {301	pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {302		self.args.push(arg.as_ref().to_owned());303		self304	}305	pub fn args<V: AsRef<str>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {306		for arg in args {307			self.args.push(arg.as_ref().to_owned());308		}309		self310	}311	pub fn eqarg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {312		self.args313			.push(format!("{}={}", key.as_ref(), value.as_ref()));314		self315	}316	pub fn comparg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {317		self.args.push(key.as_ref().to_owned());318		self.args.push(value.as_ref().to_owned());319		self320	}321	pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {322		self.env323			.push((name.as_ref().to_owned(), value.as_ref().to_owned()));324		self325	}326327	pub fn sudo(mut self) -> Self {328		self.escalated = true;329		self330	}331332	/// Only for display.333	fn shell_line(&self) -> String {334		let mut out = String::new();335		if self.escalated {336			out.push_str("run0 ");337		}338		if !self.env.is_empty() {339			out.push_str("env");340			for (k, v) in &self.env {341				out.push(' ');342				assert!(!k.contains('='));343				escape_bash(k, &mut out);344				out.push('=');345				escape_bash(v, &mut out);346			}347			out.push(' ');348		}349		escape_bash(&self.program, &mut out);350		for arg in &self.args {351			out.push(' ');352			escape_bash(arg, &mut out);353		}354		out355	}356357	fn into_spawn_options(self) -> (Remowt, SpawnOptions, String) {358		let line = self.shell_line();359		let opts = SpawnOptions {360			program: self.program,361			args: self.args,362			env: self.env,363			env_clear: false,364			cwd: None,365			escalated: self.escalated,366			stdin: StdioMode::Null,367			stdout: StdioMode::Pipe,368			stderr: StderrMode::Pipe,369		};370		(self.remowt, opts, line)371	}372373	pub async fn run(self) -> Result<()> {374		run_inner(self, false).await.map(|_| ())375	}376	pub async fn run_string(self) -> Result<String> {377		let bytes = run_inner(self, true).await?.expect("want_stdout");378		Ok(String::from_utf8(bytes)?)379	}380	pub async fn run_value<T: DeserializeOwned>(self) -> Result<T> {381		let s = self.run_string().await?;382		Ok(serde_json::from_str(&s)?)383	}384}385386async fn run_inner(cmd: RemowtCommand, want_stdout: bool) -> Result<Option<Vec<u8>>> {387	let (remowt, opts, line) = cmd.into_spawn_options();388	debug!("running command {line:?} over remowt");389	let program = opts.program.clone();390	let mut child = remowt.spawn(opts).await?;391	let stderr = child.stderr.take().expect("stderr=Pipe");392	let stdout = child.stdout.take().expect("stdout=Pipe");393394	let mut err = FramedRead::new(stderr, LinesCodec::new());395	let (mut out_bytes, mut out_lines) = if want_stdout {396		(Some(FramedRead::new(stdout, BytesCodec::new())), None)397	} else {398		(None, Some(FramedRead::new(stdout, LinesCodec::new())))399	};400401	let mut buf = if want_stdout { Some(Vec::new()) } else { None };402403	let mut wait = pin!(child.wait());404	let exit = loop {405		select! {406			biased;407408			Some(e) = err.next() => {409				let e = e?;410				warn!(program = %program, "{e}");411			}412			Some(o) = async { out_bytes.as_mut()?.next().await }, if want_stdout => {413				buf.as_mut().expect("want_stdout").extend_from_slice(&o?);414			}415			Some(o) = async { out_lines.as_mut()?.next().await }, if !want_stdout => {416				let o = o?;417				info!(program = %program, "{o}");418			}419			res = &mut wait => {420				break res?;421			}422		}423	};424425	match exit {426		Some(0) => Ok(buf),427		Some(c) => bail!("command '{line}' failed with status {c}"),428		None => Err(anyhow!("command '{line}' ended without an exit status")),429	}430}
after · crates/remowt-client/src/subprocess.rs
1use std::pin::pin;23use anyhow::{anyhow, bail, Result};4use camino::Utf8PathBuf;5use futures::StreamExt as _;6use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};7use remowt_link_shared::BifConfig;8use serde::de::DeserializeOwned;9use tokio::io::AsyncWriteExt as _;10use tokio::select;11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{debug, info, warn};1314use crate::forwarded::{RemowtListener, RemowtStream};15use crate::{drain_to_tracing, Remowt};1617#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]18pub enum StdioMode {19	#[default]20	Null,21	Pipe,22	Inherit,23}2425#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]26pub enum StderrMode {27	#[default]28	Null,29	Pipe,30	Inherit,31	MergeWithStdout,32}3334#[derive(Default)]35pub struct SpawnOptions {36	pub program: String,37	pub args: Vec<String>,38	pub env: Vec<(String, String)>,39	pub env_clear: bool,40	pub cwd: Option<Utf8PathBuf>,41	pub escalated: bool,42	pub stdin: StdioMode,43	pub stdout: StdioMode,44	pub stderr: StderrMode,45}4647pub struct RemowtChild {48	pub stdin: Option<RemowtStream>,49	pub stdout: Option<RemowtStream>,50	pub stderr: Option<RemowtStream>,51	id: ProcId,52	client: SubprocessClient<BifConfig>,53}5455impl RemowtChild {56	pub async fn wait(self) -> Result<Option<i32>> {57		let RemowtChild {58			stdin,59			stdout,60			stderr,61			id,62			client,63		} = self;64		drop(stdin);65		let drain_out = async move {66			if let Some(s) = stdout {67				drain_to_tracing(s, "<child stdout>".to_owned(), false).await;68			}69		};70		let drain_err = async move {71			if let Some(s) = stderr {72				drain_to_tracing(s, "<child stderr>".to_owned(), true).await;73			}74		};75		let wait = async move {76			client77				.wait(id)78				.await?79				.map_err(|e| anyhow!("agent wait failed: {e}"))80		};81		let (code, _, _) = tokio::join!(wait, drain_out, drain_err);82		code83	}8485	pub async fn kill(&self, signal: i32) -> Result<()> {86		self.client87			.kill(self.id, signal)88			.await?89			.map_err(|e| anyhow!("agent kill failed: {e}"))90	}91}9293fn needs_socket(m: StdioMode) -> bool {94	matches!(m, StdioMode::Pipe | StdioMode::Inherit)95}9697fn stderr_needs_socket(m: StderrMode) -> bool {98	matches!(m, StderrMode::Pipe | StderrMode::Inherit)99}100101impl Remowt {102	pub async fn spawn(&self, opts: SpawnOptions) -> Result<RemowtChild> {103		let SpawnOptions {104			program,105			args,106			env,107			env_clear,108			cwd,109			escalated,110			stdin,111			stdout,112			stderr,113		} = opts;114115		if matches!(stderr, StderrMode::MergeWithStdout) && !needs_socket(stdout) {116			bail!("stderr=MergeWithStdout requires stdout=Pipe or Inherit");117		}118119		let stdin_bound = if needs_socket(stdin) {120			Some(self.bind_runtime_unix("proc-stdin").await?)121		} else {122			None123		};124		let stdout_bound = if needs_socket(stdout) {125			Some(self.bind_runtime_unix("proc-stdout").await?)126		} else {127			None128		};129		let stderr_bound = if stderr_needs_socket(stderr) {130			Some(self.bind_runtime_unix("proc-stderr").await?)131		} else {132			None133		};134135		let stdin_spec = match &stdin_bound {136			Some((_, p)) => StdioSpec::Socket(p.clone()),137			None => StdioSpec::Null,138		};139		let stdout_spec = match &stdout_bound {140			Some((_, p)) => StdioSpec::Socket(p.clone()),141			None => StdioSpec::Null,142		};143		let stderr_spec = match (&stderr, &stderr_bound) {144			(StderrMode::Pipe | StderrMode::Inherit, Some((_, p))) => StderrSpec::Socket(p.clone()),145			(StderrMode::MergeWithStdout, _) => StderrSpec::MergeWithStdout,146			_ => StderrSpec::Null,147		};148149		let client: SubprocessClient<BifConfig> = if escalated {150			// Boxed to break the async-fn type cycle151			Box::pin(self.run0_endpoints::<SubprocessClient<BifConfig>>()).await?152		} else {153			self.endpoints()154		};155156		let spec = SpawnSpec {157			program: program.clone(),158			args,159			env,160			env_clear,161			cwd,162			stdin: stdin_spec,163			stdout: stdout_spec,164			stderr: stderr_spec,165		};166		let id = client167			.spawn(spec)168			.await?169			.map_err(|e| anyhow!("agent spawn failed: {e}"))?;170171		let (stdin_res, stdout_res, stderr_res) = tokio::join!(172			accept(stdin_bound),173			accept(stdout_bound),174			accept(stderr_bound),175		);176177		let stdin_stream = handle_stdin(stdin, stdin_res?, &program);178		let stdout_stream = handle_output(stdout, stdout_res?, &program);179		let stderr_stream = handle_output_err(stderr, stderr_res?, &program);180181		Ok(RemowtChild {182			stdin: stdin_stream,183			stdout: stdout_stream,184			stderr: stderr_stream,185			id,186			client,187		})188	}189190	pub fn cmd(&self, program: impl AsRef<str>) -> RemowtCommand {191		let program = program.as_ref().to_owned();192		RemowtCommand {193			program,194			args: vec![],195			env: vec![],196			remowt: self.clone(),197			escalated: false,198		}199	}200}201202async fn accept(b: Option<(RemowtListener, Utf8PathBuf)>) -> Result<Option<RemowtStream>> {203	match b {204		Some((l, _)) => Ok(Some(l.accept().await?)),205		None => Ok(None),206	}207}208209fn handle_stdin(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {210	match mode {211		StdioMode::Pipe => s,212		StdioMode::Inherit => {213			if let Some(s) = s {214				let program = program.to_owned();215				tokio::spawn(async move {216					let mut stdin = tokio::io::stdin();217					let mut s = s;218					if let Err(e) = tokio::io::copy(&mut stdin, &mut s).await {219						warn!(program, "stdin forward ended: {e}");220					}221					let _ = s.shutdown().await;222				});223			}224			None225		}226		StdioMode::Null => None,227	}228}229230fn handle_output(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {231	match mode {232		StdioMode::Pipe => s,233		StdioMode::Inherit => {234			if let Some(s) = s {235				let program = program.to_owned();236				tokio::spawn(drain_to_tracing(s, program, false));237			}238			None239		}240		StdioMode::Null => None,241	}242}243244fn handle_output_err(245	mode: StderrMode,246	s: Option<RemowtStream>,247	program: &str,248) -> Option<RemowtStream> {249	match mode {250		StderrMode::Pipe => s,251		StderrMode::Inherit => {252			if let Some(s) = s {253				let program = program.to_owned();254				tokio::spawn(drain_to_tracing(s, program, true));255			}256			None257		}258		StderrMode::MergeWithStdout | StderrMode::Null => None,259	}260}261262fn escape_bash(input: &str, out: &mut String) {263	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";264	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {265		out.push_str(input);266		return;267	}268	out.push('\'');269	for (i, v) in input.split('\'').enumerate() {270		if i != 0 {271			out.push_str("'\"'\"'");272		}273		out.push_str(v);274	}275	out.push('\'');276}277278#[derive(Clone)]279pub struct RemowtCommand {280	program: String,281	args: Vec<String>,282	env: Vec<(String, String)>,283	remowt: Remowt,284	escalated: bool,285}286287impl RemowtCommand {288	pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {289		self.args.push(arg.as_ref().to_owned());290		self291	}292	pub fn args<V: AsRef<str>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {293		for arg in args {294			self.args.push(arg.as_ref().to_owned());295		}296		self297	}298	pub fn eqarg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {299		self.args300			.push(format!("{}={}", key.as_ref(), value.as_ref()));301		self302	}303	pub fn comparg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {304		self.args.push(key.as_ref().to_owned());305		self.args.push(value.as_ref().to_owned());306		self307	}308	pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {309		self.env310			.push((name.as_ref().to_owned(), value.as_ref().to_owned()));311		self312	}313314	pub fn sudo(mut self) -> Self {315		self.escalated = true;316		self317	}318319	/// Only for display.320	fn shell_line(&self) -> String {321		let mut out = String::new();322		if self.escalated {323			out.push_str("run0 ");324		}325		if !self.env.is_empty() {326			out.push_str("env");327			for (k, v) in &self.env {328				out.push(' ');329				assert!(!k.contains('='));330				escape_bash(k, &mut out);331				out.push('=');332				escape_bash(v, &mut out);333			}334			out.push(' ');335		}336		escape_bash(&self.program, &mut out);337		for arg in &self.args {338			out.push(' ');339			escape_bash(arg, &mut out);340		}341		out342	}343344	fn into_spawn_options(self) -> (Remowt, SpawnOptions, String) {345		let line = self.shell_line();346		let opts = SpawnOptions {347			program: self.program,348			args: self.args,349			env: self.env,350			env_clear: false,351			cwd: None,352			escalated: self.escalated,353			stdin: StdioMode::Null,354			stdout: StdioMode::Pipe,355			stderr: StderrMode::Pipe,356		};357		(self.remowt, opts, line)358	}359360	pub async fn run(self) -> Result<()> {361		run_inner(self, false).await.map(|_| ())362	}363	pub async fn run_string(self) -> Result<String> {364		let bytes = run_inner(self, true).await?.expect("want_stdout");365		Ok(String::from_utf8(bytes)?)366	}367	pub async fn run_value<T: DeserializeOwned>(self) -> Result<T> {368		let s = self.run_string().await?;369		Ok(serde_json::from_str(&s)?)370	}371}372373async fn run_inner(cmd: RemowtCommand, want_stdout: bool) -> Result<Option<Vec<u8>>> {374	let (remowt, opts, line) = cmd.into_spawn_options();375	debug!("running command {line:?} over remowt");376	let program = opts.program.clone();377	let mut child = remowt.spawn(opts).await?;378	let stderr = child.stderr.take().expect("stderr=Pipe");379	let stdout = child.stdout.take().expect("stdout=Pipe");380381	let mut err = FramedRead::new(stderr, LinesCodec::new());382	let (mut out_bytes, mut out_lines) = if want_stdout {383		(Some(FramedRead::new(stdout, BytesCodec::new())), None)384	} else {385		(None, Some(FramedRead::new(stdout, LinesCodec::new())))386	};387388	let mut buf = if want_stdout { Some(Vec::new()) } else { None };389390	let mut wait = pin!(child.wait());391	let exit = loop {392		select! {393			biased;394395			Some(e) = err.next() => {396				let e = e?;397				warn!(program = %program, "{e}");398			}399			Some(o) = async { out_bytes.as_mut()?.next().await }, if want_stdout => {400				buf.as_mut().expect("want_stdout").extend_from_slice(&o?);401			}402			Some(o) = async { out_lines.as_mut()?.next().await }, if !want_stdout => {403				let o = o?;404				info!(program = %program, "{o}");405			}406			res = &mut wait => {407				break res?;408			}409		}410	};411412	while let Some(e) = err.next().await {413		if let Ok(line) = e {414			warn!(program = %program, "{line}");415		}416	}417	if want_stdout {418		if let Some(out_bytes) = out_bytes.as_mut() {419			while let Some(o) = out_bytes.next().await {420				if let Ok(chunk) = o {421					buf.as_mut().expect("want_stdout").extend_from_slice(&chunk);422				}423			}424		}425	} else if let Some(out_lines) = out_lines.as_mut() {426		while let Some(o) = out_lines.next().await {427			if let Ok(line) = o {428				info!(program = %program, "{line}");429			}430		}431	}432433	match exit {434		Some(0) => Ok(buf),435		Some(c) => bail!("command '{line}' failed with status {c}"),436		None => Err(anyhow!("command '{line}' ended without an exit status")),437	}438}
modifiedcrates/remowt-link-shared/src/port.rsdiffbeforeafterboth
--- a/crates/remowt-link-shared/src/port.rs
+++ b/crates/remowt-link-shared/src/port.rs
@@ -3,10 +3,8 @@
 use bifrostlink::Port;
 use bytes::{Bytes, BytesMut};
 use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
+use tokio::select;
 
-/// Wire a length-prefixed duplex byte stream (e.g. a child process's
-/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32`
-/// length followed by that many payload bytes.
 pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port
 where
 	R: AsyncRead + Unpin + Send + 'static,
@@ -18,13 +16,13 @@
 				let len = match reader.read_u32().await {
 					Ok(len) => len,
 					Err(e) => {
-						tracing::error!("child read failed: {e}");
+						log_read_end(&e);
 						break;
 					}
 				};
 				let mut buf = BytesMut::zeroed(len as usize);
 				if let Err(e) = reader.read_exact(&mut buf).await {
-					tracing::error!("child read failed: {e}");
+					log_read_end(&e);
 					break;
 				}
 				if tx.send(buf.freeze()).is_err() {
@@ -35,15 +33,45 @@
 		let write_task = async move {
 			while let Some(msg) = rx.recv().await {
 				if let Err(e) = write_frame(&mut writer, msg).await {
-					tracing::error!("child write failed: {e}");
+					log_write_end(&e);
 					break;
 				}
 			}
 		};
-		tokio::join!(read_task, write_task);
+		select! {
+			_ = read_task => {},
+			_ = write_task => {},
+		}
 	})
 }
 
+fn log_read_end(e: &io::Error) {
+	if matches!(
+		e.kind(),
+		io::ErrorKind::UnexpectedEof
+			| io::ErrorKind::BrokenPipe
+			| io::ErrorKind::ConnectionReset
+			| io::ErrorKind::ConnectionAborted
+	) {
+		tracing::debug!("child read ended: {e}");
+	} else {
+		tracing::error!("child read failed: {e}");
+	}
+}
+
+fn log_write_end(e: &io::Error) {
+	if matches!(
+		e.kind(),
+		io::ErrorKind::BrokenPipe
+			| io::ErrorKind::ConnectionReset
+			| io::ErrorKind::ConnectionAborted
+	) {
+		tracing::debug!("child write ended: {e}");
+	} else {
+		tracing::error!("child write failed: {e}");
+	}
+}
+
 async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {
 	let len = u32::try_from(msg.len())
 		.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;