git.delta.rocks / remowt / refs/commits / 6d9cf16dada2

difftreelog

feat drain stdout/stderr after signal

vmkslqpuYaroslav Bolyukin4 days agoparent: #eadb0bb.patch.diff
in: trunk

6 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -308,9 +308,9 @@
 
 [[package]]
 name = "bifrostlink"
-version = "0.2.4"
+version = "0.2.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ad2d0e30a2aa432b78f41f9676572f88201d4dc73bc2b7bc90704d2e02b7d062"
+checksum = "910f9286588d13e3dbdbbc1ad4d292656e704bc93e1f41b8a13b48e3a8e95f39"
 dependencies = [
  "async-trait",
  "async_fn_traits",
@@ -327,9 +327,9 @@
 
 [[package]]
 name = "bifrostlink-macros"
-version = "0.2.4"
+version = "0.2.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e2121559c45cbe89c4f8d1d741360d5b028577254f6beca053dc02332da85b43"
+checksum = "a0ea5c423c3831c523c8ef78debdf6a64e72b21ec92148a44163a4c25c05dfd0"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -338,9 +338,9 @@
 
 [[package]]
 name = "bifrostlink-ports"
-version = "0.2.4"
+version = "0.2.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e9395c4ccca497b0c50583e6de57aca921c046ae0c10f56030cd2c5a20db05f8"
+checksum = "9e3a9a01ec1b8bd7d44b47cd0183a1465880e241027d9f5afcb076e11704ec70"
 dependencies = [
  "bifrostlink",
  "bytes",
@@ -1835,7 +1835,7 @@
 
 [[package]]
 name = "polkit-backend"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "clap",
@@ -2055,7 +2055,7 @@
 
 [[package]]
 name = "remowt-agent"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "bifrostlink",
@@ -2083,7 +2083,7 @@
 
 [[package]]
 name = "remowt-client"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "bifrostlink",
@@ -2106,7 +2106,7 @@
 
 [[package]]
 name = "remowt-endpoints"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "bifrostlink",
@@ -2124,7 +2124,7 @@
 
 [[package]]
 name = "remowt-link-shared"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "bifrostlink",
  "bytes",
@@ -2138,7 +2138,7 @@
 
 [[package]]
 name = "remowt-plugin"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "bifrostlink",
@@ -2152,7 +2152,7 @@
 
 [[package]]
 name = "remowt-polkit-shared"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "nix",
  "serde",
@@ -2161,7 +2161,7 @@
 
 [[package]]
 name = "remowt-ssh"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -2189,7 +2189,7 @@
 
 [[package]]
 name = "remowt-ui-prompt"
-version = "0.1.4"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "bifrostlink",
modifiedCargo.tomldiffbeforeafterboth
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,18 +3,18 @@
 resolver = "2"
 
 [workspace.package]
-version = "0.1.4"
+version = "0.1.6"
 license = "MIT"
 edition = "2021"
 repository = "https://git.delta.rocks/r/remowt"
 
 [workspace.dependencies]
-remowt-client = { version = "0.1.3", path = "crates/remowt-client" }
-remowt-polkit-shared = { version = "0.1.3", path = "crates/polkit-shared" }
-remowt-link-shared = { version = "0.1.3", path = "crates/remowt-link-shared" }
-remowt-plugin = { version = "0.1.3", path = "crates/remowt-plugin" }
-remowt-ui-prompt = { version = "0.1.3", path = "crates/remowt-ui-prompt" }
-remowt-endpoints = { version = "0.1.3", path = "crates/remowt-endpoints" }
+remowt-client = { version = "0.1.6", path = "crates/remowt-client" }
+remowt-polkit-shared = { version = "0.1.6", path = "crates/polkit-shared" }
+remowt-link-shared = { version = "0.1.6", path = "crates/remowt-link-shared" }
+remowt-plugin = { version = "0.1.6", path = "crates/remowt-plugin" }
+remowt-ui-prompt = { version = "0.1.6", path = "crates/remowt-ui-prompt" }
+remowt-endpoints = { version = "0.1.6", path = "crates/remowt-endpoints" }
 
 bifrostlink = "0.2.0"
 bifrostlink-macros = "0.2.0"
@@ -52,7 +52,9 @@
 thiserror = "2.0.18"
 
 [profile.release]
-panic = "abort"
+panic = "unwind"
 opt-level = "z"
 lto = true
 codegen-units = 1
+debug = "full"
+split-debuginfo = "off"
modifiedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -2,6 +2,7 @@
 use std::collections::{BTreeMap, HashMap};
 use std::fs::Permissions;
 use std::future::pending;
+use std::io;
 use std::os::unix::fs::PermissionsExt as _;
 use std::path::PathBuf;
 use std::sync::{Arc, Mutex, OnceLock};
@@ -230,7 +231,7 @@
 
 fn main() -> anyhow::Result<()> {
 	tracing_subscriber::fmt()
-		.with_writer(std::io::stderr)
+		.with_writer(io::stderr)
 		.without_time()
 		.init();
 	let opts = Opts::parse();
modifiedcrates/remowt-client/src/lib.rsdiffbeforeafterboth
before · crates/remowt-client/src/lib.rs
1use std::collections::HashMap;2use std::env;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Remote, Rpc, Rtt};9use camino::{Utf8Path, Utf8PathBuf};10use remowt_link_shared::plugin::PluginEndpointsClient;11use remowt_link_shared::port::child_port;12use remowt_link_shared::{Address, BifConfig};13use russh::client::{connect, Config, Handle, Handler, Msg, Session};14use russh::keys::agent::client::AgentClient;15use russh::keys::agent::AgentIdentity;16use russh::keys::check_known_hosts;17use russh::keys::ssh_key::PublicKey;18use russh::Channel;19use tempfile::TempDir;20use tokio::net::UnixListener;21use tokio::sync::oneshot;22use tokio::{23	fs,24	io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, DuplexStream},25};26use tracing::{debug, warn};27use uuid::Uuid;2829pub mod editor;30mod forwarded;31mod shell;32mod ssh_exec;33mod subprocess;3435use self::ssh_exec::SshExecChild;36pub use self::subprocess::{RemowtChild, SpawnOptions, StderrMode, StdioMode};37pub use forwarded::{RemowtListener, RemowtStream};38pub use shell::{RemowtShell, RemowtShellResizer};3940type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;4142fn sh_quote(s: impl AsRef<str>) -> String {43	format!("'{}'", s.as_ref().replace('\'', "'\\''"))44}4546const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];4748pub struct AgentBundle {49	dir: PathBuf,50	hashes: HashMap<String, String>,51}5253impl AgentBundle {54	pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {55		let dir = dir.into();56		let hashes_path = dir.join("hashes");57		let raw = std::fs::read_to_string(&hashes_path)58			.with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;59		let mut hashes = HashMap::new();60		for line in raw.lines() {61			let line = line.trim();62			if line.is_empty() {63				continue;64			}65			let (arch, hash) = line66				.split_once(char::is_whitespace)67				.ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;68			hashes.insert(arch.to_owned(), hash.trim().to_owned());69		}70		ensure!(71			!hashes.is_empty(),72			"agent bundle {} has no hashes",73			dir.display()74		);75		Ok(Self { dir, hashes })76	}7778	fn binary(&self, arch: &str) -> PathBuf {79		self.dir.join(format!("remowt-agent-{arch}"))80	}8182	fn local_binary(&self) -> Result<PathBuf> {83		let arch = env::consts::ARCH;84		let path = self.binary(arch);85		ensure!(86			path.is_file(),87			"no local remowt-agent build for arch {arch} in bundle {}",88			self.dir.display()89		);90		Ok(path)91	}92}9394async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {95	let ch = sess.channel_open_session().await?;96	ch.exec(true, cmd).await?;9798	let mut child = SshExecChild::from_exec(ch);99	drop(child.stdin);100	drain_stderr(child.stderr, cmd.to_owned());101102	let mut out = Vec::new();103	child.stdout.read_to_end(&mut out).await?;104	let code = child.exit.await.ok().flatten();105	Ok((code, out))106}107108async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {109	let (code, mut out) = run(sess, cmd).await?;110	ensure!(111		code == Some(0),112		"remote command failed (exit {code:?}): {cmd}"113	);114	if !out.is_empty() {115		ensure!(116			out.ends_with(b"\n"),117			"remote command was not newline-terminated: {cmd}: {out:?}"118		);119		out.pop();120	}121	String::from_utf8(out).context("expected utf8 output for command")122}123124async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {125	debug!("uname -a");126	let arch = run_string_ok(sess, "uname -m").await?;127	let hash = bundle128		.hashes129		.get(&arch)130		.ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;131132	debug!("get dir");133	let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"").await?;134	let dir = if cache.is_empty() {135		let home = run_string_ok(sess, "echo \"$HOME\"").await?;136		ensure!(137			!home.is_empty(),138			"remote $HOME and $XDG_CACHE_HOME both empty"139		);140		Utf8PathBuf::from(home).join(".cache/remowt")141	} else {142		Utf8PathBuf::from(cache).join("remowt")143	};144	let path = dir.join(hash);145146	debug!("presence");147	let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;148	if present != Some(0) {149		let bin = bundle.binary(&arch);150		debug!("read");151		let bytes = fs::read(&bin)152			.await153			.with_context(|| format!("reading agent binary {}", bin.display()))?;154		debug!("upload");155		upload_agent(sess, &dir, &path, bytes).await?;156	}157	Ok(path)158}159160async fn upload_agent(161	sess: &Handle<SshHandler>,162	dir: &Utf8Path,163	path: &Utf8Path,164	bytes: Vec<u8>,165) -> Result<()> {166	debug!("mkdirp");167	run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;168169	let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));170	let ch = sess.channel_open_session().await?;171	debug!("cat");172	ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;173174	let mut child = SshExecChild::from_exec(ch);175	child176		.stdin177		.write_all(&bytes)178		.await179		.context("sending agent binary")?;180	child181		.stdin182		.shutdown()183		.await184		.context("sending agent binary")?;185	let code = child.wait().await;186	ensure!(code == Some(0), "agent upload failed (exit {code:?})");187188	debug!("chmod");189	run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;190	run_string_ok(191		sess,192		&format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),193	)194	.await?;195	Ok(())196}197198pub struct SshHandler {199	host: String,200	port: u16,201	subs: Subs,202}203impl Handler for SshHandler {204	type Error = russh::Error;205	async fn check_server_key(206		&mut self,207		server_public_key: &PublicKey,208	) -> Result<bool, Self::Error> {209		Ok(check_known_hosts(&self.host, self.port, server_public_key)?)210	}211	async fn server_channel_open_forwarded_streamlocal(212		&mut self,213		channel: Channel<Msg>,214		socket_path: &str,215		_session: &mut Session,216	) -> Result<(), Self::Error> {217		let Some(ch) = self218			.subs219			.lock()220			.expect("lock")221			.remove(&Utf8PathBuf::from(socket_path))222		else {223			return Err(russh::Error::WrongChannel);224		};225		let _ = ch.send(channel);226		Ok(())227	}228}229230enum Transport {231	Ssh {232		sess: Arc<Handle<SshHandler>>,233		subs: Subs,234		runtime_dir: Utf8PathBuf,235		agent_path: Utf8PathBuf,236	},237	Local {238		agent_path: PathBuf,239		runtime_dir: Utf8PathBuf,240	},241}242243struct RemowtInner {244	transport: Transport,245	rpc: Rpc<BifConfig>,246	elevated: tokio::sync::OnceCell<()>,247	#[allow(dead_code)]248	children: Mutex<Vec<tokio::process::Child>>,249	_runtime_tmp: Option<TempDir>,250}251252#[derive(Clone)]253pub struct Remowt(Arc<RemowtInner>);254255pub type RemowtRemote = Remote<BifConfig>;256257impl Remowt {258	/// Connect to the remote host over ssh, detect the architecture and deploy the required259	/// agent binary.260	pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {261		let conf = russh_config::parse_home(host)?;262		let port = conf.host_config.port.or(conf.port).unwrap_or(22);263		let hostname = conf264			.host_config265			.hostname266			.clone()267			.unwrap_or_else(|| conf.host_name.clone());268		let user = conf269			.user270			.clone()271			.unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));272273		let subs: Subs = Arc::new(Mutex::new(HashMap::new()));274		let mut sess = connect(275			Arc::new(Config::default()),276			(hostname.clone(), port),277			SshHandler {278				host: hostname,279				port,280				subs: subs.clone(),281			},282		)283		.await?;284285		let mut agent = AgentClient::connect_env().await?;286		let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();287		let mut authenticated = false;288		for ident in agent.request_identities().await? {289			let AgentIdentity::PublicKey { key, .. } = ident else {290				continue;291			};292			if sess293				.authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)294				.await?295				.success()296			{297				authenticated = true;298				break;299			}300		}301		ensure!(authenticated, "ssh authentication failed");302303		let sess = Arc::new(sess);304305		debug!("deploying agent");306		let agent_path = deploy_agent(&sess, bundle).await?;307308		debug!("runtime dir");309		let runtime_dir = remote_runtime_dir(&sess).await?;310311		let rpc = Rpc::<BifConfig>::new(Address::User);312313		let cmd_chan = sess.channel_open_session().await?;314		debug!("starting agent");315		cmd_chan316			.exec(true, format!("{} real-agent", sh_quote(&agent_path)))317			.await?;318319		let child = SshExecChild::from_exec(cmd_chan);320		drain_stderr(child.stderr, "agent".to_owned());321		rpc.add_direct(322			Address::Agent,323			child_port(child.stdout, child.stdin),324			Rtt(0),325		);326327		Ok(Self(Arc::new(RemowtInner {328			transport: Transport::Ssh {329				sess,330				subs,331				runtime_dir,332				agent_path,333			},334			rpc,335			elevated: tokio::sync::OnceCell::new(),336			children: Mutex::new(Vec::new()),337			_runtime_tmp: None,338		})))339	}340341	/// "Connect" to the local machine's agent, by starting the agent binary locally.342	pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {343		let agent_path = bundle.local_binary()?;344		let mut child = tokio::process::Command::new(&agent_path)345			.arg("real-agent")346			.arg("--local")347			.stdin(std::process::Stdio::piped())348			.stdout(std::process::Stdio::piped())349			.kill_on_drop(true)350			.spawn()351			.with_context(|| format!("spawning agent binary {}", agent_path.display()))?;352		let stdin = child.stdin.take().expect("stdin piped");353		let stdout = child.stdout.take().expect("stdout piped");354355		let rpc = Rpc::<BifConfig>::new(Address::User);356		rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));357358		let (runtime_dir, runtime_tmp) = local_runtime_dir()?;359360		Ok(Self(Arc::new(RemowtInner {361			transport: Transport::Local {362				agent_path,363				runtime_dir,364			},365			rpc,366			elevated: tokio::sync::OnceCell::new(),367			children: Mutex::new(vec![child]),368			_runtime_tmp: runtime_tmp,369		})))370	}371372	/// Get the handle to the underlying russh session handle.373	pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {374		match &self.0.transport {375			Transport::Ssh { sess, .. } => Some(sess.clone()),376			Transport::Local { .. } => None,377		}378	}379380	pub fn rpc(&self) -> Rpc<BifConfig> {381		self.0.rpc.clone()382	}383384	pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {385		let client: PluginEndpointsClient<BifConfig> = self.endpoints();386		client387			.load_plugin(id, name.to_owned())388			.await?389			.map_err(|e| anyhow!("agent failed to load plugin: {e}"))390	}391	pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {392		self.ensure_escalated().await?;393		let client: PluginEndpointsClient<BifConfig> =394			PluginEndpointsClient::wrap(self.0.rpc.remote(Address::AgentPrivileged));395		client396			.load_plugin_path(id, path.to_owned())397			.await?398			.map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))399	}400	pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {401		R::wrap(self.0.rpc.remote(Address::Plugin(id)))402	}403404	pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {405		R::wrap(self.0.rpc.remote(Address::Agent))406	}407	pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {408		self.ensure_escalated().await?;409		Ok(R::wrap(self.0.rpc.remote(Address::AgentPrivileged)))410	}411412	async fn ensure_escalated(&self) -> Result<()> {413		self.0414			.elevated415			.get_or_try_init(|| async {416				let (agent_path, local) = match &self.0.transport {417					Transport::Ssh { agent_path, .. } => (agent_path.as_str().to_owned(), false),418					Transport::Local { agent_path, .. } => (419						agent_path420							.to_str()421							.ok_or_else(|| anyhow!("local agent path is not utf-8"))?422							.to_owned(),423						true,424					),425				};426427				let (tool, flags) = self.detect_escalation().await?;428				let mut args: Vec<String> = flags.iter().map(|f| (*f).to_owned()).collect();429				args.push(agent_path);430				args.push("real-agent".to_owned());431				args.push("--privileged".to_owned());432				if local {433					args.push("--local".to_owned());434				}435436				let child = self437					.spawn(SpawnOptions {438						program: tool.to_owned(),439						args,440						stdin: StdioMode::Pipe,441						stdout: StdioMode::Pipe,442						stderr: StderrMode::Inherit,443						..Default::default()444					})445					.await446					.context("spawning privileged agent")?;447448				let stdin = child449					.stdin450					.ok_or_else(|| anyhow!("privileged agent stdin missing"))?;451				let stdout = child452					.stdout453					.ok_or_else(|| anyhow!("privileged agent stdout missing"))?;454455				let port = child_port(stdout, stdin);456				self.0457					.rpc458					.add_direct(Address::AgentPrivileged, port, Rtt(0));459				anyhow::Ok(())460			})461			.await?;462		Ok(())463	}464465	async fn detect_escalation(&self) -> Result<(&'static str, &'static [&'static str])> {466		for (tool, flags) in ESCALATORS {467			let probe = self468				.spawn(SpawnOptions {469					program: (*tool).to_owned(),470					args: vec!["--version".to_owned()],471					stdout: StdioMode::Null,472					stderr: StderrMode::Null,473					..Default::default()474				})475				.await;476			if let Ok(child) = probe {477				let _ = child.wait().await;478				return Ok((tool, flags));479			}480		}481		bail!("no escalation tool found")482	}483484	/// XDG_RUNTIME_DIR on the remote machine.485	pub fn runtime_dir(&self) -> Utf8PathBuf {486		match &self.0.transport {487			Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),488			Transport::Local { runtime_dir, .. } => runtime_dir.clone(),489		}490	}491492	/// Bind unix listener socket on the remote machine with auto-generated path, returning the path.493	pub async fn bind_runtime_unix(&self, hint: &str) -> Result<(RemowtListener, Utf8PathBuf)> {494		let sock = self495			.runtime_dir()496			.join(format!("remowt-{hint}-{}.sock", Uuid::new_v4()));497		let listener = self.bind_unix(&sock).await?;498		Ok((listener, sock))499	}500501	/// Bind unix listener socket on the remote machine on the specified path.502	pub async fn bind_unix(&self, path: &Utf8Path) -> Result<RemowtListener> {503		match &self.0.transport {504			Transport::Ssh { sess, subs, .. } => {505				let (tx, rx) = oneshot::channel();506				subs.lock().expect("lock").insert(path.to_owned(), tx);507				sess.streamlocal_forward(path.to_owned()).await?;508				Ok(RemowtListener::Ssh(rx))509			}510			Transport::Local { .. } => {511				let _ = std::fs::remove_file(path);512				Ok(RemowtListener::Local(513					UnixListener::bind(path)?,514					path.to_owned(),515				))516			}517		}518	}519}520521fn drain_stderr(stream: DuplexStream, context: String) {522	tokio::spawn(async move {523		let mut reader = BufReader::new(stream).lines();524		loop {525			match reader.next_line().await {526				Ok(Some(line)) => warn!(context = %context, "{line}"),527				Ok(None) => break,528				Err(e) => {529					warn!(context = %context, "stderr read failed: {e}");530					break;531				}532			}533		}534	});535}536537fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {538	if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {539		if !dir.is_empty() {540			return Ok((Utf8PathBuf::from(dir), None));541		}542	}543	let tmp = tempfile::Builder::new()544		.prefix("remowt.")545		.rand_bytes(12)546		.tempdir()?;547	let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())548		.map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;549	Ok((dir, Some(tmp)))550}551552async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {553	let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;554	let dir = dir.trim();555	if dir.is_empty() {556		let tmp = run_string_ok(sess, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir").await?;557		Ok(Utf8PathBuf::from(tmp))558	} else {559		Ok(Utf8PathBuf::from(dir))560	}561}
after · crates/remowt-client/src/lib.rs
1use std::collections::HashMap;2use std::env;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Remote, Rpc, Rtt};9use camino::{Utf8Path, Utf8PathBuf};10use remowt_link_shared::plugin::PluginEndpointsClient;11use remowt_link_shared::port::child_port;12use remowt_link_shared::{Address, BifConfig};13use russh::client::{connect, Config, Handle, Handler, Msg, Session};14use russh::keys::agent::client::AgentClient;15use russh::keys::agent::AgentIdentity;16use russh::keys::check_known_hosts;17use russh::keys::ssh_key::PublicKey;18use russh::Channel;19use tempfile::TempDir;20use tokio::io::AsyncRead;21use tokio::net::UnixListener;22use tokio::sync::oneshot;23use tokio::task::JoinHandle;24use tokio::{25	fs,26	io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader},27};28use tracing::{debug, info, warn};29use uuid::Uuid;3031pub mod editor;32mod forwarded;33mod shell;34mod ssh_exec;35mod subprocess;3637use self::ssh_exec::SshExecChild;38pub use self::subprocess::{RemowtChild, SpawnOptions, StderrMode, StdioMode};39pub use forwarded::{RemowtListener, RemowtStream};40pub use shell::{RemowtShell, RemowtShellResizer};4142type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;4344fn sh_quote(s: impl AsRef<str>) -> String {45	format!("'{}'", s.as_ref().replace('\'', "'\\''"))46}4748const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];4950pub struct AgentBundle {51	dir: PathBuf,52	hashes: HashMap<String, String>,53}5455impl AgentBundle {56	pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {57		let dir = dir.into();58		let hashes_path = dir.join("hashes");59		let raw = std::fs::read_to_string(&hashes_path)60			.with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;61		let mut hashes = HashMap::new();62		for line in raw.lines() {63			let line = line.trim();64			if line.is_empty() {65				continue;66			}67			let (arch, hash) = line68				.split_once(char::is_whitespace)69				.ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;70			hashes.insert(arch.to_owned(), hash.trim().to_owned());71		}72		ensure!(73			!hashes.is_empty(),74			"agent bundle {} has no hashes",75			dir.display()76		);77		Ok(Self { dir, hashes })78	}7980	fn binary(&self, arch: &str) -> PathBuf {81		self.dir.join(format!("remowt-agent-{arch}"))82	}8384	fn local_binary(&self) -> Result<PathBuf> {85		let arch = env::consts::ARCH;86		let path = self.binary(arch);87		ensure!(88			path.is_file(),89			"no local remowt-agent build for arch {arch} in bundle {}",90			self.dir.display()91		);92		Ok(path)93	}94}9596async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {97	let ch = sess.channel_open_session().await?;98	ch.exec(true, cmd).await?;99100	let mut child = SshExecChild::from_exec(ch);101	drop(child.stdin);102	drain_to_tracing(child.stderr, cmd.to_owned(), true);103104	let mut out = Vec::new();105	child.stdout.read_to_end(&mut out).await?;106	let code = child.exit.await.ok().flatten();107	Ok((code, out))108}109110async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {111	let (code, mut out) = run(sess, cmd).await?;112	ensure!(113		code == Some(0),114		"remote command failed (exit {code:?}): {cmd}"115	);116	if !out.is_empty() {117		ensure!(118			out.ends_with(b"\n"),119			"remote command was not newline-terminated: {cmd}: {out:?}"120		);121		out.pop();122	}123	String::from_utf8(out).context("expected utf8 output for command")124}125126async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {127	debug!("uname -a");128	let arch = run_string_ok(sess, "uname -m").await?;129	let hash = bundle130		.hashes131		.get(&arch)132		.ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;133134	debug!("get dir");135	let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"").await?;136	let dir = if cache.is_empty() {137		let home = run_string_ok(sess, "echo \"$HOME\"").await?;138		ensure!(139			!home.is_empty(),140			"remote $HOME and $XDG_CACHE_HOME both empty"141		);142		Utf8PathBuf::from(home).join(".cache/remowt")143	} else {144		Utf8PathBuf::from(cache).join("remowt")145	};146	let path = dir.join(hash);147148	debug!("presence");149	let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;150	if present != Some(0) {151		let bin = bundle.binary(&arch);152		debug!("read");153		let bytes = fs::read(&bin)154			.await155			.with_context(|| format!("reading agent binary {}", bin.display()))?;156		debug!("upload");157		upload_agent(sess, &dir, &path, bytes).await?;158	}159	Ok(path)160}161162async fn upload_agent(163	sess: &Handle<SshHandler>,164	dir: &Utf8Path,165	path: &Utf8Path,166	bytes: Vec<u8>,167) -> Result<()> {168	debug!("mkdirp");169	run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;170171	let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));172	let ch = sess.channel_open_session().await?;173	debug!("cat");174	ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;175176	let mut child = SshExecChild::from_exec(ch);177	child178		.stdin179		.write_all(&bytes)180		.await181		.context("sending agent binary")?;182	child183		.stdin184		.shutdown()185		.await186		.context("sending agent binary")?;187	let code = child.wait().await;188	ensure!(code == Some(0), "agent upload failed (exit {code:?})");189190	debug!("chmod");191	run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;192	run_string_ok(193		sess,194		&format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),195	)196	.await?;197	Ok(())198}199200pub struct SshHandler {201	host: String,202	port: u16,203	subs: Subs,204}205impl Handler for SshHandler {206	type Error = russh::Error;207	async fn check_server_key(208		&mut self,209		server_public_key: &PublicKey,210	) -> Result<bool, Self::Error> {211		Ok(check_known_hosts(&self.host, self.port, server_public_key)?)212	}213	async fn server_channel_open_forwarded_streamlocal(214		&mut self,215		channel: Channel<Msg>,216		socket_path: &str,217		_session: &mut Session,218	) -> Result<(), Self::Error> {219		let Some(ch) = self220			.subs221			.lock()222			.expect("lock")223			.remove(&Utf8PathBuf::from(socket_path))224		else {225			return Err(russh::Error::WrongChannel);226		};227		let _ = ch.send(channel);228		Ok(())229	}230}231232enum Transport {233	Ssh {234		sess: Arc<Handle<SshHandler>>,235		subs: Subs,236		runtime_dir: Utf8PathBuf,237		agent_path: Utf8PathBuf,238	},239	Local {240		agent_path: PathBuf,241		runtime_dir: Utf8PathBuf,242	},243}244245struct RemowtInner {246	transport: Transport,247	rpc: Rpc<BifConfig>,248	elevated: tokio::sync::OnceCell<()>,249	#[allow(dead_code)]250	children: Mutex<Vec<tokio::process::Child>>,251	_runtime_tmp: Option<TempDir>,252}253254#[derive(Clone)]255pub struct Remowt(Arc<RemowtInner>);256257pub type RemowtRemote = Remote<BifConfig>;258259impl Remowt {260	/// Connect to the remote host over ssh, detect the architecture and deploy the required261	/// agent binary.262	pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {263		let conf = russh_config::parse_home(host)?;264		let port = conf.host_config.port.or(conf.port).unwrap_or(22);265		let hostname = conf266			.host_config267			.hostname268			.clone()269			.unwrap_or_else(|| conf.host_name.clone());270		let user = conf271			.user272			.clone()273			.unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));274275		let subs: Subs = Arc::new(Mutex::new(HashMap::new()));276		let mut sess = connect(277			Arc::new(Config::default()),278			(hostname.clone(), port),279			SshHandler {280				host: hostname,281				port,282				subs: subs.clone(),283			},284		)285		.await?;286287		let mut agent = AgentClient::connect_env().await?;288		let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();289		let mut authenticated = false;290		for ident in agent.request_identities().await? {291			let AgentIdentity::PublicKey { key, .. } = ident else {292				continue;293			};294			if sess295				.authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)296				.await?297				.success()298			{299				authenticated = true;300				break;301			}302		}303		ensure!(authenticated, "ssh authentication failed");304305		let sess = Arc::new(sess);306307		debug!("deploying agent");308		let agent_path = deploy_agent(&sess, bundle).await?;309310		debug!("runtime dir");311		let runtime_dir = remote_runtime_dir(&sess).await?;312313		let rpc = Rpc::<BifConfig>::new(Address::User);314315		let cmd_chan = sess.channel_open_session().await?;316		debug!("starting agent");317		cmd_chan318			.exec(true, format!("{} real-agent", sh_quote(&agent_path)))319			.await?;320321		let child = SshExecChild::from_exec(cmd_chan);322		drain_to_tracing(child.stderr, "agent".to_owned(), true);323		rpc.add_direct(324			Address::Agent,325			child_port(child.stdout, child.stdin),326			Rtt(0),327		);328329		Ok(Self(Arc::new(RemowtInner {330			transport: Transport::Ssh {331				sess,332				subs,333				runtime_dir,334				agent_path,335			},336			rpc,337			elevated: tokio::sync::OnceCell::new(),338			children: Mutex::new(Vec::new()),339			_runtime_tmp: None,340		})))341	}342343	/// "Connect" to the local machine's agent, by starting the agent binary locally.344	pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {345		let agent_path = bundle.local_binary()?;346		let mut child = tokio::process::Command::new(&agent_path)347			.arg("real-agent")348			.arg("--local")349			.stdin(std::process::Stdio::piped())350			.stdout(std::process::Stdio::piped())351			.kill_on_drop(true)352			.spawn()353			.with_context(|| format!("spawning agent binary {}", agent_path.display()))?;354		let stdin = child.stdin.take().expect("stdin piped");355		let stdout = child.stdout.take().expect("stdout piped");356357		let rpc = Rpc::<BifConfig>::new(Address::User);358		rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));359360		let (runtime_dir, runtime_tmp) = local_runtime_dir()?;361362		Ok(Self(Arc::new(RemowtInner {363			transport: Transport::Local {364				agent_path,365				runtime_dir,366			},367			rpc,368			elevated: tokio::sync::OnceCell::new(),369			children: Mutex::new(vec![child]),370			_runtime_tmp: runtime_tmp,371		})))372	}373374	/// Get the handle to the underlying russh session handle.375	pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {376		match &self.0.transport {377			Transport::Ssh { sess, .. } => Some(sess.clone()),378			Transport::Local { .. } => None,379		}380	}381382	pub fn rpc(&self) -> Rpc<BifConfig> {383		self.0.rpc.clone()384	}385386	pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {387		let client: PluginEndpointsClient<BifConfig> = self.endpoints();388		client389			.load_plugin(id, name.to_owned())390			.await?391			.map_err(|e| anyhow!("agent failed to load plugin: {e}"))392	}393	pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {394		self.ensure_escalated().await?;395		let client: PluginEndpointsClient<BifConfig> =396			PluginEndpointsClient::wrap(self.0.rpc.remote(Address::AgentPrivileged));397		client398			.load_plugin_path(id, path.to_owned())399			.await?400			.map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))401	}402	pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {403		R::wrap(self.0.rpc.remote(Address::Plugin(id)))404	}405406	pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {407		R::wrap(self.0.rpc.remote(Address::Agent))408	}409	pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {410		self.ensure_escalated().await?;411		Ok(R::wrap(self.0.rpc.remote(Address::AgentPrivileged)))412	}413414	async fn ensure_escalated(&self) -> Result<()> {415		self.0416			.elevated417			.get_or_try_init(|| async {418				let (agent_path, local) = match &self.0.transport {419					Transport::Ssh { agent_path, .. } => (agent_path.as_str().to_owned(), false),420					Transport::Local { agent_path, .. } => (421						agent_path422							.to_str()423							.ok_or_else(|| anyhow!("local agent path is not utf-8"))?424							.to_owned(),425						true,426					),427				};428429				let (tool, flags) = self.detect_escalation().await?;430				let mut args: Vec<String> = flags.iter().map(|f| (*f).to_owned()).collect();431				args.push(agent_path);432				args.push("real-agent".to_owned());433				args.push("--privileged".to_owned());434				if local {435					args.push("--local".to_owned());436				}437438				let child = self439					.spawn(SpawnOptions {440						program: tool.to_owned(),441						args,442						stdin: StdioMode::Pipe,443						stdout: StdioMode::Pipe,444						stderr: StderrMode::Inherit,445						..Default::default()446					})447					.await448					.context("spawning privileged agent")?;449450				let stdin = child451					.stdin452					.ok_or_else(|| anyhow!("privileged agent stdin missing"))?;453				let stdout = child454					.stdout455					.ok_or_else(|| anyhow!("privileged agent stdout missing"))?;456457				let port = child_port(stdout, stdin);458				self.0459					.rpc460					.add_direct(Address::AgentPrivileged, port, Rtt(0));461				anyhow::Ok(())462			})463			.await?;464		Ok(())465	}466467	async fn detect_escalation(&self) -> Result<(&'static str, &'static [&'static str])> {468		for (tool, flags) in ESCALATORS {469			let probe = self470				.spawn(SpawnOptions {471					program: (*tool).to_owned(),472					args: vec!["--version".to_owned()],473					stdout: StdioMode::Null,474					stderr: StderrMode::Null,475					..Default::default()476				})477				.await;478			if let Ok(child) = probe {479				let _ = child.wait().await;480				return Ok((tool, flags));481			}482		}483		bail!("no escalation tool found")484	}485486	/// XDG_RUNTIME_DIR on the remote machine.487	pub fn runtime_dir(&self) -> Utf8PathBuf {488		match &self.0.transport {489			Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),490			Transport::Local { runtime_dir, .. } => runtime_dir.clone(),491		}492	}493494	/// Bind unix listener socket on the remote machine with auto-generated path, returning the path.495	pub async fn bind_runtime_unix(&self, hint: &str) -> Result<(RemowtListener, Utf8PathBuf)> {496		let sock = self497			.runtime_dir()498			.join(format!("remowt-{hint}-{}.sock", Uuid::new_v4()));499		let listener = self.bind_unix(&sock).await?;500		Ok((listener, sock))501	}502503	/// Bind unix listener socket on the remote machine on the specified path.504	pub async fn bind_unix(&self, path: &Utf8Path) -> Result<RemowtListener> {505		match &self.0.transport {506			Transport::Ssh { sess, subs, .. } => {507				let (tx, rx) = oneshot::channel();508				subs.lock().expect("lock").insert(path.to_owned(), tx);509				sess.streamlocal_forward(path.to_owned()).await?;510				Ok(RemowtListener::Ssh(rx))511			}512			Transport::Local { .. } => {513				let _ = std::fs::remove_file(path);514				Ok(RemowtListener::Local(515					UnixListener::bind(path)?,516					path.to_owned(),517				))518			}519		}520	}521}522523pub(crate) fn drain_to_tracing(524	stream: impl AsyncRead + Unpin + 'static + Send,525	context: String,526	stderr: bool,527) -> JoinHandle<()> {528	tokio::spawn(async move {529		let mut reader = BufReader::new(stream);530		let mut buf = Vec::with_capacity(4096);531		loop {532			buf.clear();533			match reader.read_until(b'\n', &mut buf).await {534				Ok(0) => break,535				Ok(_) => {536					let line = String::from_utf8_lossy(buf.strip_suffix(b"\n").unwrap_or(&buf));537					if stderr {538						warn!(context = %context, "{line}");539					} else {540						info!(context = %context, "{line}");541					}542				}543				Err(e) => {544					warn!(context = %context, "child stdio read failed: {e}");545					break;546				}547			}548		}549	})550}551552fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {553	if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {554		if !dir.is_empty() {555			return Ok((Utf8PathBuf::from(dir), None));556		}557	}558	let tmp = tempfile::Builder::new()559		.prefix("remowt.")560		.rand_bytes(12)561		.tempdir()?;562	let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())563		.map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;564	Ok((dir, Some(tmp)))565}566567async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {568	let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;569	let dir = dir.trim();570	if dir.is_empty() {571		let tmp = run_string_ok(sess, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir").await?;572		Ok(Utf8PathBuf::from(tmp))573	} else {574		Ok(Utf8PathBuf::from(dir))575	}576}
modifiedcrates/remowt-client/src/subprocess.rsdiffbeforeafterboth
--- a/crates/remowt-client/src/subprocess.rs
+++ b/crates/remowt-client/src/subprocess.rs
@@ -6,13 +6,13 @@
 use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};
 use remowt_link_shared::BifConfig;
 use serde::de::DeserializeOwned;
-use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufReader};
+use tokio::io::AsyncWriteExt as _;
 use tokio::select;
 use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
 use tracing::{debug, info, warn};
 
 use crate::forwarded::{RemowtListener, RemowtStream};
-use crate::Remowt;
+use crate::{drain_to_tracing, Remowt};
 
 #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
 pub enum StdioMode {
@@ -62,12 +62,24 @@
 			client,
 		} = self;
 		drop(stdin);
-		drop(stdout);
-		drop(stderr);
-		client
-			.wait(id)
-			.await?
-			.map_err(|e| anyhow!("agent wait failed: {e}"))
+		let drain_out = async move {
+			if let Some(s) = stdout {
+				drain_to_tracing(s, "<child stdout>".to_owned(), false).await;
+			}
+		};
+		let drain_err = async move {
+			if let Some(s) = stderr {
+				drain_to_tracing(s, "<child stderr>".to_owned(), true).await;
+			}
+		};
+		let wait = async move {
+			client
+				.wait(id)
+				.await?
+				.map_err(|e| anyhow!("agent wait failed: {e}"))
+		};
+		let (code, _, _) = tokio::join!(wait, drain_out, drain_err);
+		code
 	}
 
 	pub async fn kill(&self, signal: i32) -> Result<()> {
@@ -163,7 +175,7 @@
 		);
 
 		let stdin_stream = handle_stdin(stdin, stdin_res?, &program);
-		let stdout_stream = handle_output(stdout, stdout_res?, &program, false);
+		let stdout_stream = handle_output(stdout, stdout_res?, &program);
 		let stderr_stream = handle_output_err(stderr, stderr_res?, &program);
 
 		Ok(RemowtChild {
@@ -215,18 +227,13 @@
 	}
 }
 
-fn handle_output(
-	mode: StdioMode,
-	s: Option<RemowtStream>,
-	program: &str,
-	is_stderr: bool,
-) -> Option<RemowtStream> {
+fn handle_output(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {
 	match mode {
 		StdioMode::Pipe => s,
 		StdioMode::Inherit => {
 			if let Some(s) = s {
 				let program = program.to_owned();
-				tokio::spawn(pump_to_tracing(s, program, is_stderr));
+				tokio::spawn(drain_to_tracing(s, program, false));
 			}
 			None
 		}
@@ -244,7 +251,7 @@
 		StderrMode::Inherit => {
 			if let Some(s) = s {
 				let program = program.to_owned();
-				tokio::spawn(pump_to_tracing(s, program, true));
+				tokio::spawn(drain_to_tracing(s, program, true));
 			}
 			None
 		}
@@ -252,26 +259,6 @@
 	}
 }
 
-async fn pump_to_tracing(stream: RemowtStream, program: String, is_stderr: bool) {
-	let mut reader = BufReader::new(stream).lines();
-	loop {
-		match reader.next_line().await {
-			Ok(Some(line)) => {
-				if is_stderr {
-					warn!(program, "{line}");
-				} else {
-					info!(program, "{line}");
-				}
-			}
-			Ok(None) => break,
-			Err(e) => {
-				warn!(program, "child log stream error: {e}");
-				break;
-			}
-		}
-	}
-}
-
 fn escape_bash(input: &str, out: &mut String) {
 	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
 	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {
@@ -422,6 +409,27 @@
 		}
 	};
 
+	while let Some(e) = err.next().await {
+		if let Ok(line) = e {
+			warn!(program = %program, "{line}");
+		}
+	}
+	if want_stdout {
+		if let Some(out_bytes) = out_bytes.as_mut() {
+			while let Some(o) = out_bytes.next().await {
+				if let Ok(chunk) = o {
+					buf.as_mut().expect("want_stdout").extend_from_slice(&chunk);
+				}
+			}
+		}
+	} else if let Some(out_lines) = out_lines.as_mut() {
+		while let Some(o) = out_lines.next().await {
+			if let Ok(line) = o {
+				info!(program = %program, "{line}");
+			}
+		}
+	}
+
 	match exit {
 		Some(0) => Ok(buf),
 		Some(c) => bail!("command '{line}' failed with status {c}"),
modifiedcrates/remowt-link-shared/src/port.rsdiffbeforeafterboth
--- a/crates/remowt-link-shared/src/port.rs
+++ b/crates/remowt-link-shared/src/port.rs
@@ -3,10 +3,8 @@
 use bifrostlink::Port;
 use bytes::{Bytes, BytesMut};
 use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
+use tokio::select;
 
-/// Wire a length-prefixed duplex byte stream (e.g. a child process's
-/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32`
-/// length followed by that many payload bytes.
 pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port
 where
 	R: AsyncRead + Unpin + Send + 'static,
@@ -18,13 +16,13 @@
 				let len = match reader.read_u32().await {
 					Ok(len) => len,
 					Err(e) => {
-						tracing::error!("child read failed: {e}");
+						log_read_end(&e);
 						break;
 					}
 				};
 				let mut buf = BytesMut::zeroed(len as usize);
 				if let Err(e) = reader.read_exact(&mut buf).await {
-					tracing::error!("child read failed: {e}");
+					log_read_end(&e);
 					break;
 				}
 				if tx.send(buf.freeze()).is_err() {
@@ -35,15 +33,45 @@
 		let write_task = async move {
 			while let Some(msg) = rx.recv().await {
 				if let Err(e) = write_frame(&mut writer, msg).await {
-					tracing::error!("child write failed: {e}");
+					log_write_end(&e);
 					break;
 				}
 			}
 		};
-		tokio::join!(read_task, write_task);
+		select! {
+			_ = read_task => {},
+			_ = write_task => {},
+		}
 	})
 }
 
+fn log_read_end(e: &io::Error) {
+	if matches!(
+		e.kind(),
+		io::ErrorKind::UnexpectedEof
+			| io::ErrorKind::BrokenPipe
+			| io::ErrorKind::ConnectionReset
+			| io::ErrorKind::ConnectionAborted
+	) {
+		tracing::debug!("child read ended: {e}");
+	} else {
+		tracing::error!("child read failed: {e}");
+	}
+}
+
+fn log_write_end(e: &io::Error) {
+	if matches!(
+		e.kind(),
+		io::ErrorKind::BrokenPipe
+			| io::ErrorKind::ConnectionReset
+			| io::ErrorKind::ConnectionAborted
+	) {
+		tracing::debug!("child write ended: {e}");
+	} else {
+		tracing::error!("child write failed: {e}");
+	}
+}
+
 async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {
 	let len = u32::try_from(msg.len())
 		.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;