git.delta.rocks / remowt / refs/commits / 751b8f1ca8aa

difftreelog

feat subprocess through agent

kwlonlsvYaroslav Bolyukin6 days agoparent: #99930b3.patch.diff
in: trunk

15 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -308,9 +308,9 @@
 
 [[package]]
 name = "bifrostlink"
-version = "0.2.2"
+version = "0.2.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4bd6d4c7cf73270cbf6846a42361d409df23262d276ff15110e787067d89ad1d"
+checksum = "704657867d2107831c57edd363726440c86675464b5fc113702854e17062cafc"
 dependencies = [
  "async-trait",
  "async_fn_traits",
@@ -327,9 +327,9 @@
 
 [[package]]
 name = "bifrostlink-macros"
-version = "0.2.2"
+version = "0.2.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "329813d3e34c7e65638480cefbb09d1e1dc47fa5dbf0f5f0cf8d9eee424f4f19"
+checksum = "158308eb569b467c0116680f79d0ecc389f4d540f6d5a0c9279bfe79b1cd5bdb"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -338,9 +338,9 @@
 
 [[package]]
 name = "bifrostlink-ports"
-version = "0.2.2"
+version = "0.2.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d6683eb0d4366b762fc45c0eb14e384d3554a23bc1ba3f1246368900d41e3700"
+checksum = "7612993f0bd8bc6a71867461266567212a35a716b2a5aef5f9967ab08c891782"
 dependencies = [
  "bifrostlink",
  "bytes",
@@ -1835,7 +1835,7 @@
 
 [[package]]
 name = "polkit-backend"
-version = "0.1.1"
+version = "0.1.3"
 dependencies = [
  "anyhow",
  "clap",
@@ -2055,7 +2055,7 @@
 
 [[package]]
 name = "remowt-agent"
-version = "0.1.1"
+version = "0.1.3"
 dependencies = [
  "anyhow",
  "bifrostlink",
@@ -2083,13 +2083,14 @@
 
 [[package]]
 name = "remowt-client"
-version = "0.1.1"
+version = "0.1.3"
 dependencies = [
  "anyhow",
  "bifrostlink",
  "bifrostlink-ports",
  "bytes",
  "camino",
+ "futures",
  "remowt-endpoints",
  "remowt-link-shared",
  "russh",
@@ -2098,13 +2099,14 @@
  "serde_json",
  "tempfile",
  "tokio",
+ "tokio-util",
  "tracing",
  "uuid",
 ]
 
 [[package]]
 name = "remowt-endpoints"
-version = "0.1.1"
+version = "0.1.3"
 dependencies = [
  "anyhow",
  "bifrostlink",
@@ -2122,7 +2124,7 @@
 
 [[package]]
 name = "remowt-link-shared"
-version = "0.1.1"
+version = "0.1.3"
 dependencies = [
  "bifrostlink",
  "bytes",
@@ -2136,7 +2138,7 @@
 
 [[package]]
 name = "remowt-plugin"
-version = "0.1.1"
+version = "0.1.3"
 dependencies = [
  "anyhow",
  "bifrostlink",
@@ -2150,7 +2152,7 @@
 
 [[package]]
 name = "remowt-polkit-shared"
-version = "0.1.1"
+version = "0.1.3"
 dependencies = [
  "nix",
  "serde",
@@ -2159,7 +2161,7 @@
 
 [[package]]
 name = "remowt-ssh"
-version = "0.1.1"
+version = "0.1.3"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -2187,7 +2189,7 @@
 
 [[package]]
 name = "remowt-ui-prompt"
-version = "0.1.1"
+version = "0.1.3"
 dependencies = [
  "bifrostlink",
  "bifrostlink-macros",
modifiedCargo.tomldiffbeforeafterboth
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,18 +3,18 @@
 resolver = "2"
 
 [workspace.package]
-version = "0.1.1"
+version = "0.1.3"
 license = "MIT"
 edition = "2021"
-repository = "https://gitlab.delta.directory/iam/remowt"
+repository = "https://git.delta.rocks/r/remowt"
 
 [workspace.dependencies]
-remowt-client = { version = "0.1.1", path = "crates/remowt-client" }
-remowt-polkit-shared = { version = "0.1.1", path = "crates/polkit-shared" }
-remowt-link-shared = { version = "0.1.1", path = "crates/remowt-link-shared" }
-remowt-plugin = { version = "0.1.1", path = "crates/remowt-plugin" }
-remowt-ui-prompt = { version = "0.1.1", path = "crates/remowt-ui-prompt" }
-remowt-endpoints = { version = "0.1.1", path = "crates/remowt-endpoints" }
+remowt-client = { version = "0.1.3", path = "crates/remowt-client" }
+remowt-polkit-shared = { version = "0.1.3", path = "crates/polkit-shared" }
+remowt-link-shared = { version = "0.1.3", path = "crates/remowt-link-shared" }
+remowt-plugin = { version = "0.1.3", path = "crates/remowt-plugin" }
+remowt-ui-prompt = { version = "0.1.3", path = "crates/remowt-ui-prompt" }
+remowt-endpoints = { version = "0.1.3", path = "crates/remowt-endpoints" }
 
 bifrostlink = "0.2.0"
 bifrostlink-macros = "0.2.0"
modifiedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -11,7 +11,7 @@
 use bifrostlink_ports::stdio::from_stdio;
 use bifrostlink_ports::unix_socket::from_socket;
 use clap::Parser;
-use remowt_endpoints::{fs::Fs, pty::Pty, systemd::Systemd};
+use remowt_endpoints::{fs::Fs, nix_daemon::NixDaemon, pty::Pty, subprocess::Subprocess, systemd::Systemd};
 use remowt_link_shared::{editor::EditorEndpointsClient, Address, BifConfig};
 use remowt_polkit_shared::{emphasize, BackendRequest, Identity, PidDisplay};
 use remowt_ui_prompt::bifrost::PromptEndpointsClient;
@@ -21,7 +21,7 @@
 use tokio::net::UnixStream;
 use tokio::runtime::Builder;
 use tokio::task::AbortHandle;
-use tracing::{debug, info, trace};
+use tracing::{debug, trace};
 use zbus::fdo;
 use zbus::zvariant::{OwnedValue, Str};
 use zbus::{interface, proxy, Connection};
@@ -85,7 +85,7 @@
 		identities: Vec<Identity>,
 	) -> zbus::fdo::Result<()> {
 		use std::fmt::Write;
-		info!("begin auth");
+		debug!("begin auth");
 		let _cancel_guard = Arc::new(OnceLock::new());
 		let task = {
 			let helper = self.helper.clone();
@@ -220,6 +220,8 @@
 		/// Expect own address to be AgentPrivileged, skip installing polkit agent
 		#[arg(long)]
 		privileged: bool,
+		#[arg(long)]
+		local: bool,
 	},
 	LocalAgent,
 }
@@ -240,7 +242,11 @@
 		} => runtime.block_on(askpass::ask(&prompt, description)),
 		Opts::LocalAgent => runtime.block_on(main_real()),
 		Opts::Editor { path } => runtime.block_on(editor::edit(path)),
-		Opts::RealAgent { path, privileged } => runtime.block_on(main_real_agent(path, privileged)),
+		Opts::RealAgent {
+			path,
+			privileged,
+			local,
+		} => runtime.block_on(main_real_agent(path, privileged, local)),
 	}
 }
 async fn main_real() -> anyhow::Result<()> {
@@ -253,7 +259,11 @@
 	let _conn = conn;
 	pending().await
 }
-async fn main_real_agent(path: Option<PathBuf>, privileged: bool) -> anyhow::Result<()> {
+async fn main_real_agent(
+	path: Option<PathBuf>,
+	privileged: bool,
+	local: bool,
+) -> anyhow::Result<()> {
 	let address = if privileged {
 		Address::AgentPrivileged
 	} else {
@@ -264,6 +274,8 @@
 	Fs::new().register_endpoints(&mut rpc);
 	Systemd.register_endpoints(&mut rpc);
 	Pty::new().register_endpoints(&mut rpc);
+	Subprocess::new().register_endpoints(&mut rpc);
+	NixDaemon.register_endpoints(&mut rpc);
 
 	remowt_plugin::host::serve(&mut rpc);
 
@@ -312,7 +324,7 @@
 	};
 	rpc.add_direct(Address::User, port, bifrostlink::Rtt(0));
 
-	let polkit_conn = if !privileged {
+	let polkit_conn = if !privileged && !local {
 		// The unprivileged agent doubles as a polkit authentication agent so
 		// `run0` (e.g. our own elevation) routes its prompt to the User over
 		// bifrost instead of failing on a tty-less session.
modifiedcmds/remowt-ssh/Cargo.tomldiffbeforeafterboth
--- a/cmds/remowt-ssh/Cargo.toml
+++ b/cmds/remowt-ssh/Cargo.toml
@@ -11,7 +11,7 @@
 tracing-subscriber.workspace = true
 bifrostlink.workspace = true
 remowt-link-shared.workspace = true
-remowt-client = { workspace = true, features = ["shell"] }
+remowt-client.workspace = true
 tokio = { workspace = true, features = [
 	"macros",
 	"fs",
modifiedcmds/remowt-ssh/src/main.rsdiffbeforeafterboth
--- a/cmds/remowt-ssh/src/main.rs
+++ b/cmds/remowt-ssh/src/main.rs
@@ -24,9 +24,16 @@
 #[derive(Parser)]
 enum Opts {
 	/// Connect to remote host with remowt agent.
-	Ssh { host: String },
+	Ssh {
+		host: String,
+		#[arg(long)]
+		escalate: bool,
+	},
 	/// Connect to local host for testing the connectivity.
-	Local,
+	Local {
+		#[arg(long)]
+		escalate: bool,
+	},
 }
 
 fn agents_dir() -> anyhow::Result<PathBuf> {
@@ -45,9 +52,9 @@
 	let opts = Opts::parse();
 
 	let bundle = AgentBundle::from_dir(agents_dir()?)?;
-	let conn = match &opts {
-		Opts::Ssh { host } => Remowt::connect(host, &bundle).await?,
-		Opts::Local => Remowt::connect_local(&bundle).await?,
+	let (conn, escalate) = match &opts {
+		Opts::Ssh { host, escalate } => (Remowt::connect(host, &bundle).await?, *escalate),
+		Opts::Local { escalate } => (Remowt::connect_local(&bundle).await?, *escalate),
 	};
 	let mut rpc = conn.rpc();
 
@@ -56,8 +63,8 @@
 		PrependSourcePrompter {
 			prompter: RofiPrompter,
 			source: match opts {
-				Opts::Ssh { host } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))],
-				Opts::Local => vec![],
+				Opts::Ssh { host, .. } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))],
+				Opts::Local { .. } => vec![],
 			},
 			description: "".to_owned(),
 		},
@@ -67,13 +74,13 @@
 	}
 
 	debug!("entering shell");
-	run_shell(&conn).await?;
+	run_shell(&conn, escalate).await?;
 	debug!("shell ended");
 
 	Ok(())
 }
 
-async fn run_shell(conn: &Remowt) -> anyhow::Result<()> {
+async fn run_shell(conn: &Remowt, escalate: bool) -> anyhow::Result<()> {
 	let term = match std::env::var("TERM") {
 		Ok(v) => v,
 		Err(VarError::NotPresent) => "xterm-256color".to_owned(),
@@ -81,7 +88,7 @@
 	};
 	let (cols, rows) = term_size().unwrap_or((80, 24));
 
-	let shell = conn.open_shell(&term, cols, rows).await?;
+	let shell = conn.open_shell(&term, cols, rows, escalate).await?;
 	let resizer = shell.resizer();
 	let stream = shell.stream;
 
modifiedcrates/remowt-client/Cargo.tomldiffbeforeafterboth
--- a/crates/remowt-client/Cargo.toml
+++ b/crates/remowt-client/Cargo.toml
@@ -27,7 +27,6 @@
 ] }
 tracing.workspace = true
 uuid = { workspace = true, features = ["v4"] }
-remowt-endpoints = { workspace = true, optional = true }
-
-[features]
-shell = ["dep:remowt-endpoints"]
+remowt-endpoints.workspace = true
+tokio-util = { workspace = true, features = ["codec"] }
+futures.workspace = true
modifiedcrates/remowt-client/src/lib.rsdiffbeforeafterboth
--- a/crates/remowt-client/src/lib.rs
+++ b/crates/remowt-client/src/lib.rs
@@ -6,7 +6,6 @@
 use anyhow::{anyhow, bail, ensure, Context as _, Result};
 use bifrostlink::declarative::RemoteEndpoints;
 use bifrostlink::{Remote, Rpc, Rtt};
-use bifrostlink_ports::unix_socket::from_socket;
 use camino::{Utf8Path, Utf8PathBuf};
 use remowt_link_shared::plugin::PluginEndpointsClient;
 use remowt_link_shared::port::child_port;
@@ -19,26 +18,23 @@
 use russh::Channel;
 use tempfile::TempDir;
 use tokio::net::UnixListener;
-use tokio::sync::oneshot::{self, channel};
+use tokio::sync::oneshot;
 use tokio::{
 	fs,
-	io::{AsyncReadExt as _, AsyncWriteExt as _},
+	io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, DuplexStream},
 };
-use tracing::{debug, error};
+use tracing::{debug, warn};
 use uuid::Uuid;
-
-use self::port::channel_port;
-use self::subprocess::RemowtChild;
 
 pub mod editor;
 mod forwarded;
-mod port;
-#[cfg(feature = "shell")]
 mod shell;
+mod ssh_exec;
 mod subprocess;
 
+use self::ssh_exec::SshExecChild;
+pub use self::subprocess::{RemowtChild, SpawnOptions, StderrMode, StdioMode};
 pub use forwarded::{RemowtListener, RemowtStream};
-#[cfg(feature = "shell")]
 pub use shell::{RemowtShell, RemowtShellResizer};
 
 type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;
@@ -99,18 +95,12 @@
 	let ch = sess.channel_open_session().await?;
 	ch.exec(true, cmd).await?;
 
-	let mut child = RemowtChild::from_exec(ch);
+	let mut child = SshExecChild::from_exec(ch);
 	drop(child.stdin);
+	drain_stderr(child.stderr, cmd.to_owned());
 
 	let mut out = Vec::new();
-	let mut err = Vec::new();
-	tokio::try_join!(
-		child.stdout.read_to_end(&mut out),
-		child.stderr.read_to_end(&mut err),
-	)?;
-	if !err.is_empty() {
-		error!("remote stderr: {}", String::from_utf8_lossy(&err).trim());
-	}
+	child.stdout.read_to_end(&mut out).await?;
 	let code = child.exit.await.ok().flatten();
 	Ok((code, out))
 }
@@ -140,9 +130,7 @@
 		.ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;
 
 	debug!("get dir");
-	let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")
-		.await?
-		.to_owned();
+	let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"").await?;
 	let dir = if cache.is_empty() {
 		let home = run_string_ok(sess, "echo \"$HOME\"").await?;
 		ensure!(
@@ -183,7 +171,7 @@
 	debug!("cat");
 	ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;
 
-	let mut child = RemowtChild::from_exec(ch);
+	let mut child = SshExecChild::from_exec(ch);
 	child
 		.stdin
 		.write_all(&bytes)
@@ -205,39 +193,6 @@
 	)
 	.await?;
 	Ok(())
-}
-
-async fn detect_escalation(
-	sess: &Handle<SshHandler>,
-) -> Result<(&'static str, &'static [&'static str])> {
-	for (tool, flags) in ESCALATORS {
-		// `tool` is a fixed identifier (no metacharacters), safe to interpolate.
-		let (code, _) = run(sess, &format!("command -v {tool}")).await?;
-		if code == Some(0) {
-			return Ok((tool, flags));
-		}
-	}
-	bail!("no escalation tool found on remote")
-}
-
-fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {
-	let mut parts = vec![tool.to_owned()];
-	parts.extend(flags.iter().map(|f| f.to_string()));
-	parts.push(sh_quote(agent_path));
-	parts.push("real-agent".to_owned());
-	parts.push("--privileged".to_owned());
-	if let Some(p) = path {
-		parts.push("--path".to_owned());
-		parts.push(sh_quote(p));
-	}
-	parts.join(" ")
-}
-
-fn find_in_path(name: &str) -> Option<std::path::PathBuf> {
-	let path = env::var_os("PATH")?;
-	env::split_paths(&path)
-		.map(|dir| dir.join(name))
-		.find(|p| p.is_file())
 }
 
 pub struct SshHandler {
@@ -285,17 +240,23 @@
 	},
 }
 
-pub struct Remowt {
+struct RemowtInner {
 	transport: Transport,
 	rpc: Rpc<BifConfig>,
 	elevated: tokio::sync::OnceCell<()>,
+	#[allow(dead_code)]
 	children: Mutex<Vec<tokio::process::Child>>,
 	_runtime_tmp: Option<TempDir>,
 }
 
+#[derive(Clone)]
+pub struct Remowt(Arc<RemowtInner>);
+
 pub type RemowtRemote = Remote<BifConfig>;
 
 impl Remowt {
+	/// Connect to the remote host over ssh, detect the architecture and deploy the required
+	/// agent binary.
 	pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {
 		let conf = russh_config::parse_home(host)?;
 		let port = conf.host_config.port.or(conf.port).unwrap_or(22);
@@ -346,36 +307,24 @@
 
 		debug!("runtime dir");
 		let runtime_dir = remote_runtime_dir(&sess).await?;
-		let primary = runtime_dir.join(format!("remowt-{}.sock", Uuid::new_v4()));
 
-		let (onetx, onerx) = channel();
-		subs.lock().expect("lock").insert(primary.clone(), onetx);
-		sess.streamlocal_forward(primary.clone()).await?;
-
 		let rpc = Rpc::<BifConfig>::new(Address::User);
 
-		// TODO: ensure no injection is possible in the socket path.
 		let cmd_chan = sess.channel_open_session().await?;
 		debug!("starting agent");
 		cmd_chan
-			.exec(
-				true,
-				format!(
-					"{} real-agent --path={}",
-					sh_quote(&agent_path),
-					sh_quote(&primary)
-				),
-			)
+			.exec(true, format!("{} real-agent", sh_quote(&agent_path)))
 			.await?;
 
-		let port = channel_port(
-			onerx
-				.await
-				.map_err(|_| anyhow!("agent never opened its channel"))?,
+		let child = SshExecChild::from_exec(cmd_chan);
+		drain_stderr(child.stderr, "agent".to_owned());
+		rpc.add_direct(
+			Address::Agent,
+			child_port(child.stdout, child.stdin),
+			Rtt(0),
 		);
-		rpc.add_direct(Address::Agent, port, Rtt(0));
 
-		Ok(Self {
+		Ok(Self(Arc::new(RemowtInner {
 			transport: Transport::Ssh {
 				sess,
 				subs,
@@ -386,13 +335,15 @@
 			elevated: tokio::sync::OnceCell::new(),
 			children: Mutex::new(Vec::new()),
 			_runtime_tmp: None,
-		})
+		})))
 	}
 
+	/// "Connect" to the local machine's agent, by starting the agent binary locally.
 	pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {
 		let agent_path = bundle.local_binary()?;
 		let mut child = tokio::process::Command::new(&agent_path)
 			.arg("real-agent")
+			.arg("--local")
 			.stdin(std::process::Stdio::piped())
 			.stdout(std::process::Stdio::piped())
 			.kill_on_drop(true)
@@ -406,7 +357,7 @@
 
 		let (runtime_dir, runtime_tmp) = local_runtime_dir()?;
 
-		Ok(Self {
+		Ok(Self(Arc::new(RemowtInner {
 			transport: Transport::Local {
 				agent_path,
 				runtime_dir,
@@ -415,22 +366,19 @@
 			elevated: tokio::sync::OnceCell::new(),
 			children: Mutex::new(vec![child]),
 			_runtime_tmp: runtime_tmp,
-		})
+		})))
 	}
 
+	/// Get the handle to the underlying russh session handle.
 	pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {
-		match &self.transport {
+		match &self.0.transport {
 			Transport::Ssh { sess, .. } => Some(sess.clone()),
 			Transport::Local { .. } => None,
 		}
 	}
 
 	pub fn rpc(&self) -> Rpc<BifConfig> {
-		self.rpc.clone()
-	}
-
-	pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {
-		R::wrap(self.rpc.remote(Address::Agent))
+		self.0.rpc.clone()
 	}
 
 	pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {
@@ -441,85 +389,118 @@
 			.map_err(|e| anyhow!("agent failed to load plugin: {e}"))
 	}
 	pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {
-		self.ensure_elevated().await?;
+		self.ensure_escalated().await?;
 		let client: PluginEndpointsClient<BifConfig> =
-			PluginEndpointsClient::wrap(self.rpc.remote(Address::AgentPrivileged));
+			PluginEndpointsClient::wrap(self.0.rpc.remote(Address::AgentPrivileged));
 		client
 			.load_plugin_path(id, path.to_owned())
 			.await?
 			.map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))
 	}
 	pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {
-		R::wrap(self.rpc.remote(Address::Plugin(id)))
+		R::wrap(self.0.rpc.remote(Address::Plugin(id)))
 	}
+
+	pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {
+		R::wrap(self.0.rpc.remote(Address::Agent))
+	}
 	pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {
-		self.ensure_elevated().await?;
-		Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))
+		self.ensure_escalated().await?;
+		Ok(R::wrap(self.0.rpc.remote(Address::AgentPrivileged)))
 	}
 
-	async fn ensure_elevated(&self) -> Result<()> {
-		self.elevated
+	async fn ensure_escalated(&self) -> Result<()> {
+		self.0
+			.elevated
 			.get_or_try_init(|| async {
-				let port = match &self.transport {
-					Transport::Ssh {
-						sess, agent_path, ..
-					} => {
-						let (tool, flags) = detect_escalation(sess).await?;
-						let ch = sess.channel_open_session().await?;
-						ch.exec(true, privileged_cmd(tool, flags, agent_path, None))
-							.await?;
-						channel_port(ch)
-					}
-					Transport::Local { agent_path, .. } => {
-						let sock = self
-							.runtime_dir()
-							.join(format!("remowt-priv-{}.sock", Uuid::new_v4()));
-						let _ = std::fs::remove_file(&sock);
-						let listener = UnixListener::bind(&sock)?;
-						let (tool, flags) = ESCALATORS
-							.iter()
-							.find(|(t, _)| find_in_path(t).is_some())
-							.ok_or_else(|| anyhow!("no escalation tool found"))?;
-						let child = tokio::process::Command::new(tool)
-							.args(*flags)
-							.arg(agent_path)
-							.arg("real-agent")
-							.arg("--privileged")
-							.arg("--path")
-							.arg(sock.as_str())
-							.kill_on_drop(true)
-							.spawn()?;
-						self.children.lock().expect("lock").push(child);
-						let (stream, _) = listener.accept().await?;
-						let _ = std::fs::remove_file(&sock);
-						from_socket(stream)
-					}
+				let (agent_path, local) = match &self.0.transport {
+					Transport::Ssh { agent_path, .. } => (agent_path.as_str().to_owned(), false),
+					Transport::Local { agent_path, .. } => (
+						agent_path
+							.to_str()
+							.ok_or_else(|| anyhow!("local agent path is not utf-8"))?
+							.to_owned(),
+						true,
+					),
 				};
-				self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));
+
+				let (tool, flags) = self.detect_escalation().await?;
+				let mut args: Vec<String> = flags.iter().map(|f| (*f).to_owned()).collect();
+				args.push(agent_path);
+				args.push("real-agent".to_owned());
+				args.push("--privileged".to_owned());
+				if local {
+					args.push("--local".to_owned());
+				}
+
+				let child = self
+					.spawn(SpawnOptions {
+						program: tool.to_owned(),
+						args,
+						stdin: StdioMode::Pipe,
+						stdout: StdioMode::Pipe,
+						stderr: StderrMode::Inherit,
+						..Default::default()
+					})
+					.await
+					.context("spawning privileged agent")?;
+
+				let stdin = child
+					.stdin
+					.ok_or_else(|| anyhow!("privileged agent stdin missing"))?;
+				let stdout = child
+					.stdout
+					.ok_or_else(|| anyhow!("privileged agent stdout missing"))?;
+
+				let port = child_port(stdout, stdin);
+				self.0
+					.rpc
+					.add_direct(Address::AgentPrivileged, port, Rtt(0));
 				anyhow::Ok(())
 			})
 			.await?;
 		Ok(())
 	}
 
-	pub async fn exec(&self, command: String) -> Result<RemowtChild> {
-		let Some(sess) = self.ssh() else {
-			bail!("exec should not be called on local")
-		};
-		let ch = sess.channel_open_session().await?;
-		ch.exec(true, command).await?;
-		Ok(RemowtChild::from_exec(ch))
+	async fn detect_escalation(&self) -> Result<(&'static str, &'static [&'static str])> {
+		for (tool, flags) in ESCALATORS {
+			let probe = self
+				.spawn(SpawnOptions {
+					program: (*tool).to_owned(),
+					args: vec!["--version".to_owned()],
+					stdout: StdioMode::Null,
+					stderr: StderrMode::Null,
+					..Default::default()
+				})
+				.await;
+			if let Ok(child) = probe {
+				let _ = child.wait().await;
+				return Ok((tool, flags));
+			}
+		}
+		bail!("no escalation tool found")
 	}
 
-	fn runtime_dir(&self) -> Utf8PathBuf {
-		match &self.transport {
+	/// XDG_RUNTIME_DIR on the remote machine.
+	pub fn runtime_dir(&self) -> Utf8PathBuf {
+		match &self.0.transport {
 			Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),
 			Transport::Local { runtime_dir, .. } => runtime_dir.clone(),
 		}
 	}
 
-	pub async fn forward_socket(&self, path: &Utf8Path) -> Result<RemowtListener> {
-		match &self.transport {
+	/// Bind unix listener socket on the remote machine with auto-generated path, returning the path.
+	pub async fn bind_runtime_unix(&self, hint: &str) -> Result<(RemowtListener, Utf8PathBuf)> {
+		let sock = self
+			.runtime_dir()
+			.join(format!("remowt-{hint}-{}.sock", Uuid::new_v4()));
+		let listener = self.bind_unix(&sock).await?;
+		Ok((listener, sock))
+	}
+
+	/// Bind unix listener socket on the remote machine on the specified path.
+	pub async fn bind_unix(&self, path: &Utf8Path) -> Result<RemowtListener> {
+		match &self.0.transport {
 			Transport::Ssh { sess, subs, .. } => {
 				let (tx, rx) = oneshot::channel();
 				subs.lock().expect("lock").insert(path.to_owned(), tx);
@@ -537,6 +518,22 @@
 	}
 }
 
+fn drain_stderr(stream: DuplexStream, context: String) {
+	tokio::spawn(async move {
+		let mut reader = BufReader::new(stream).lines();
+		loop {
+			match reader.next_line().await {
+				Ok(Some(line)) => warn!(context = %context, "{line}"),
+				Ok(None) => break,
+				Err(e) => {
+					warn!(context = %context, "stderr read failed: {e}");
+					break;
+				}
+			}
+		}
+	});
+}
+
 fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {
 	if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {
 		if !dir.is_empty() {
@@ -556,34 +553,9 @@
 	let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;
 	let dir = dir.trim();
 	if dir.is_empty() {
-		remote_mktemp(sess).await
+		let tmp = run_string_ok(sess, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir").await?;
+		Ok(Utf8PathBuf::from(tmp))
 	} else {
 		Ok(Utf8PathBuf::from(dir))
-	}
-}
-
-async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {
-	let mut cmd_chan = sess.channel_open_session().await?;
-	cmd_chan
-		.exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")
-		.await?;
-	let mut stdout = vec![];
-	loop {
-		let Some(msg) = cmd_chan.wait().await else {
-			bail!("unexpected channel end");
-		};
-		match msg {
-			russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),
-			russh::ChannelMsg::ExitStatus { exit_status } => {
-				if exit_status != 0 {
-					bail!("mktemp failed");
-				}
-				break;
-			}
-			_ => {}
-		}
 	}
-	ensure!(stdout.ends_with(b"\n"));
-	stdout.pop();
-	Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))
 }
deletedcrates/remowt-client/src/port.rsdiffbeforeafterboth
--- a/crates/remowt-client/src/port.rs
+++ /dev/null
@@ -1,52 +0,0 @@
-use std::io;
-
-use bifrostlink::Port;
-use bytes::{Bytes, BytesMut};
-use russh::{Channel, ChannelStream};
-use russh::client::Msg;
-use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, ReadHalf, WriteHalf};
-use tokio::join;
-use tracing::error;
-
-async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {
-	let len = srx.read_u32().await?;
-	let mut buf = BytesMut::zeroed(len as usize);
-	srx.read_exact(&mut buf).await?;
-	Ok(buf)
-}
-async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {
-	stx.write_u32(value.len().try_into().expect("can't be larger"))
-		.await?;
-	stx.write_all(&value).await?;
-	Ok(())
-}
-
-pub fn channel_port(ch: Channel<Msg>) -> Port {
-	Port::new(move |mut rx, tx| async move {
-		let (mut srx, mut stx) = tokio::io::split(ch.into_stream());
-		let srx_task = async move {
-			loop {
-				match read(&mut srx).await {
-					Ok(buf) => {
-						if tx.send(buf.freeze()).is_err() {
-							break;
-						}
-					}
-					Err(e) => {
-						error!("channel read failed: {e}");
-						break;
-					}
-				}
-			}
-		};
-		let stx_task = async move {
-			while let Some(value) = rx.recv().await {
-				if let Err(e) = write(&mut stx, value).await {
-					error!("channel write failed: {e}");
-					break;
-				}
-			}
-		};
-		join!(srx_task, stx_task);
-	})
-}
modifiedcrates/remowt-client/src/shell.rsdiffbeforeafterboth
--- a/crates/remowt-client/src/shell.rs
+++ b/crates/remowt-client/src/shell.rs
@@ -3,7 +3,6 @@
 use bifrostlink::Remote;
 use remowt_endpoints::pty::{PtyClient, ShellId};
 use remowt_link_shared::{Address, BifConfig};
-use uuid::Uuid;
 
 use crate::forwarded::RemowtStream;
 use crate::Remowt;
@@ -38,15 +37,21 @@
 }
 
 impl Remowt {
-	pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<RemowtShell> {
-		let sock = self
-			.runtime_dir()
-			.join(format!("remowt-shell-{}.sock", Uuid::new_v4()));
-
-		let forwarded = self.forward_socket(&sock).await?;
-		let client: PtyClient<BifConfig> = self.endpoints();
+	pub async fn open_shell(
+		&self,
+		term: &str,
+		cols: u16,
+		rows: u16,
+		escalate: bool,
+	) -> Result<RemowtShell> {
+		let (forwarded, path) = self.bind_runtime_unix("shell").await?;
+		let client: PtyClient<BifConfig> = if escalate {
+			self.run0_endpoints().await?
+		} else {
+			self.endpoints()
+		};
 		let id = client
-			.open_shell(sock, term.to_owned(), cols, rows)
+			.open_shell(path, term.to_owned(), cols, rows)
 			.await?
 			.map_err(|e| anyhow!("agent failed to open shell: {e}"))?;
 		let stream = forwarded.accept().await?;
@@ -54,7 +59,7 @@
 		Ok(RemowtShell {
 			id,
 			stream,
-			remote: self.rpc.remote(Address::Agent),
+			remote: self.0.rpc.remote(Address::Agent),
 		})
 	}
 }
addedcrates/remowt-client/src/ssh_exec.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-client/src/ssh_exec.rs
@@ -0,0 +1,82 @@
+use bytes::Bytes;
+use russh::client::Msg;
+use russh::{Channel, ChannelMsg};
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};
+use tokio::sync::oneshot;
+
+const BUF: usize = 64 * 1024;
+
+pub(crate) struct SshExecChild {
+	pub stdin: DuplexStream,
+	pub stdout: DuplexStream,
+	pub stderr: DuplexStream,
+	pub exit: oneshot::Receiver<Option<u32>>,
+}
+
+impl SshExecChild {
+	/// Manage channel returned by russh exec().
+	pub(crate) fn from_exec(ch: Channel<Msg>) -> Self {
+		let (stdin, mut stdin_r) = tokio::io::duplex(BUF);
+		let (mut out_w, stdout) = tokio::io::duplex(BUF);
+		let (mut err_w, stderr) = tokio::io::duplex(BUF);
+		let (exit_tx, exit) = oneshot::channel();
+
+		tokio::spawn(async move {
+			let (mut read, write) = ch.split();
+
+			let stdin_pump = tokio::spawn(async move {
+				let mut buf = vec![0u8; BUF];
+				loop {
+					match stdin_r.read(&mut buf).await {
+						Ok(0) | Err(_) => break,
+						Ok(n) => {
+							if write
+								.data_bytes(Bytes::copy_from_slice(&buf[..n]))
+								.await
+								.is_err()
+							{
+								return;
+							}
+						}
+					}
+				}
+				let _ = write.eof().await;
+			});
+
+			let mut code = None;
+			while let Some(msg) = read.wait().await {
+				match msg {
+					ChannelMsg::Data { data } => {
+						if out_w.write_all(&data).await.is_err() {
+							break;
+						}
+					}
+					ChannelMsg::ExtendedData { data, .. } => {
+						if err_w.write_all(&data).await.is_err() {
+							break;
+						}
+					}
+					ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
+					_ => {}
+				}
+			}
+
+			stdin_pump.abort();
+			let _ = out_w.shutdown().await;
+			let _ = err_w.shutdown().await;
+			let _ = exit_tx.send(code);
+		});
+
+		SshExecChild {
+			stdin,
+			stdout,
+			stderr,
+			exit,
+		}
+	}
+
+	/// Wait for the process to finish, returning its exit status.
+	pub(crate) async fn wait(self) -> Option<u32> {
+		self.exit.await.ok().flatten()
+	}
+}
modifiedcrates/remowt-client/src/subprocess.rsdiffbeforeafterboth
--- a/crates/remowt-client/src/subprocess.rs
+++ b/crates/remowt-client/src/subprocess.rs
@@ -1,84 +1,430 @@
-use bytes::Bytes;
-use russh::client::Msg;
-use russh::{Channel, ChannelMsg};
-use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};
-use tokio::sync::oneshot;
+use std::pin::pin;
+
+use anyhow::{anyhow, bail, Result};
+use camino::Utf8PathBuf;
+use futures::StreamExt as _;
+use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};
+use remowt_link_shared::BifConfig;
+use serde::de::DeserializeOwned;
+use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufReader};
+use tokio::select;
+use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
+use tracing::{debug, info, warn};
+
+use crate::forwarded::{RemowtListener, RemowtStream};
+use crate::Remowt;
+
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
+pub enum StdioMode {
+	#[default]
+	Null,
+	Pipe,
+	Inherit,
+}
+
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
+pub enum StderrMode {
+	#[default]
+	Null,
+	Pipe,
+	Inherit,
+	MergeWithStdout,
+}
 
-const BUF: usize = 64 * 1024;
+#[derive(Default)]
+pub struct SpawnOptions {
+	pub program: String,
+	pub args: Vec<String>,
+	pub env: Vec<(String, String)>,
+	pub env_clear: bool,
+	pub cwd: Option<Utf8PathBuf>,
+	pub escalated: bool,
+	pub stdin: StdioMode,
+	pub stdout: StdioMode,
+	pub stderr: StderrMode,
+}
 
 pub struct RemowtChild {
-	pub stdin: DuplexStream,
-	pub stdout: DuplexStream,
-	pub stderr: DuplexStream,
-	pub exit: oneshot::Receiver<Option<u32>>,
+	pub stdin: Option<RemowtStream>,
+	pub stdout: Option<RemowtStream>,
+	pub stderr: Option<RemowtStream>,
+	id: ProcId,
+	client: SubprocessClient<BifConfig>,
 }
 
 impl RemowtChild {
-	/// Manage channel returned by russh exec().
-	pub(crate) fn from_exec(ch: Channel<Msg>) -> Self {
-		let (stdin, mut stdin_r) = tokio::io::duplex(BUF);
-		let (mut out_w, stdout) = tokio::io::duplex(BUF);
-		let (mut err_w, stderr) = tokio::io::duplex(BUF);
-		let (exit_tx, exit) = oneshot::channel();
+	pub async fn wait(self) -> Result<Option<i32>> {
+		let RemowtChild {
+			stdin,
+			stdout,
+			stderr,
+			id,
+			client,
+		} = self;
+		drop(stdin);
+		drop(stdout);
+		drop(stderr);
+		client
+			.wait(id)
+			.await?
+			.map_err(|e| anyhow!("agent wait failed: {e}"))
+	}
+
+	pub async fn kill(&self, signal: i32) -> Result<()> {
+		self.client
+			.kill(self.id, signal)
+			.await?
+			.map_err(|e| anyhow!("agent kill failed: {e}"))
+	}
+}
+
+fn needs_socket(m: StdioMode) -> bool {
+	matches!(m, StdioMode::Pipe | StdioMode::Inherit)
+}
+
+fn stderr_needs_socket(m: StderrMode) -> bool {
+	matches!(m, StderrMode::Pipe | StderrMode::Inherit)
+}
+
+impl Remowt {
+	pub async fn spawn(&self, opts: SpawnOptions) -> Result<RemowtChild> {
+		let SpawnOptions {
+			program,
+			args,
+			env,
+			env_clear,
+			cwd,
+			escalated,
+			stdin,
+			stdout,
+			stderr,
+		} = opts;
+
+		if matches!(stderr, StderrMode::MergeWithStdout) && !needs_socket(stdout) {
+			bail!("stderr=MergeWithStdout requires stdout=Pipe or Inherit");
+		}
+
+		let stdin_bound = if needs_socket(stdin) {
+			Some(self.bind_runtime_unix("proc-stdin").await?)
+		} else {
+			None
+		};
+		let stdout_bound = if needs_socket(stdout) {
+			Some(self.bind_runtime_unix("proc-stdout").await?)
+		} else {
+			None
+		};
+		let stderr_bound = if stderr_needs_socket(stderr) {
+			Some(self.bind_runtime_unix("proc-stderr").await?)
+		} else {
+			None
+		};
+
+		let stdin_spec = match &stdin_bound {
+			Some((_, p)) => StdioSpec::Socket(p.clone()),
+			None => StdioSpec::Null,
+		};
+		let stdout_spec = match &stdout_bound {
+			Some((_, p)) => StdioSpec::Socket(p.clone()),
+			None => StdioSpec::Null,
+		};
+		let stderr_spec = match (&stderr, &stderr_bound) {
+			(StderrMode::Pipe | StderrMode::Inherit, Some((_, p))) => StderrSpec::Socket(p.clone()),
+			(StderrMode::MergeWithStdout, _) => StderrSpec::MergeWithStdout,
+			_ => StderrSpec::Null,
+		};
 
-		tokio::spawn(async move {
-			let (mut read, write) = ch.split();
+		let client: SubprocessClient<BifConfig> = if escalated {
+			// Boxed to break the async-fn type cycle
+			Box::pin(self.run0_endpoints::<SubprocessClient<BifConfig>>()).await?
+		} else {
+			self.endpoints()
+		};
 
-			// Forward our stdin to the channel, signalling EOF when it closes.
-			let stdin_pump = tokio::spawn(async move {
-				let mut buf = vec![0u8; BUF];
-				loop {
-					match stdin_r.read(&mut buf).await {
-						Ok(0) | Err(_) => break,
-						Ok(n) => {
-							if write
-								.data_bytes(Bytes::copy_from_slice(&buf[..n]))
-								.await
-								.is_err()
-							{
-								return;
-							}
-						}
+		let spec = SpawnSpec {
+			program: program.clone(),
+			args,
+			env,
+			env_clear,
+			cwd,
+			stdin: stdin_spec,
+			stdout: stdout_spec,
+			stderr: stderr_spec,
+		};
+		let id = client
+			.spawn(spec)
+			.await?
+			.map_err(|e| anyhow!("agent spawn failed: {e}"))?;
+
+		let (stdin_res, stdout_res, stderr_res) = tokio::join!(
+			accept(stdin_bound),
+			accept(stdout_bound),
+			accept(stderr_bound),
+		);
+
+		let stdin_stream = handle_stdin(stdin, stdin_res?, &program);
+		let stdout_stream = handle_output(stdout, stdout_res?, &program, false);
+		let stderr_stream = handle_output_err(stderr, stderr_res?, &program);
+
+		Ok(RemowtChild {
+			stdin: stdin_stream,
+			stdout: stdout_stream,
+			stderr: stderr_stream,
+			id,
+			client,
+		})
+	}
+
+	pub fn cmd(&self, program: impl AsRef<str>) -> RemowtCommand {
+		let program = program.as_ref().to_owned();
+		RemowtCommand {
+			program,
+			args: vec![],
+			env: vec![],
+			remowt: self.clone(),
+			escalated: false,
+		}
+	}
+}
+
+async fn accept(b: Option<(RemowtListener, Utf8PathBuf)>) -> Result<Option<RemowtStream>> {
+	match b {
+		Some((l, _)) => Ok(Some(l.accept().await?)),
+		None => Ok(None),
+	}
+}
+
+fn handle_stdin(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {
+	match mode {
+		StdioMode::Pipe => s,
+		StdioMode::Inherit => {
+			if let Some(s) = s {
+				let program = program.to_owned();
+				tokio::spawn(async move {
+					let mut stdin = tokio::io::stdin();
+					let mut s = s;
+					if let Err(e) = tokio::io::copy(&mut stdin, &mut s).await {
+						warn!(program, "stdin forward ended: {e}");
 					}
-				}
-				let _ = write.eof().await;
-			});
+					let _ = s.shutdown().await;
+				});
+			}
+			None
+		}
+		StdioMode::Null => None,
+	}
+}
+
+fn handle_output(
+	mode: StdioMode,
+	s: Option<RemowtStream>,
+	program: &str,
+	is_stderr: bool,
+) -> Option<RemowtStream> {
+	match mode {
+		StdioMode::Pipe => s,
+		StdioMode::Inherit => {
+			if let Some(s) = s {
+				let program = program.to_owned();
+				tokio::spawn(pump_to_tracing(s, program, is_stderr));
+			}
+			None
+		}
+		StdioMode::Null => None,
+	}
+}
+
+fn handle_output_err(
+	mode: StderrMode,
+	s: Option<RemowtStream>,
+	program: &str,
+) -> Option<RemowtStream> {
+	match mode {
+		StderrMode::Pipe => s,
+		StderrMode::Inherit => {
+			if let Some(s) = s {
+				let program = program.to_owned();
+				tokio::spawn(pump_to_tracing(s, program, true));
+			}
+			None
+		}
+		StderrMode::MergeWithStdout | StderrMode::Null => None,
+	}
+}
 
-			let mut code = None;
-			while let Some(msg) = read.wait().await {
-				match msg {
-					ChannelMsg::Data { data } => {
-						if out_w.write_all(&data).await.is_err() {
-							break;
-						}
-					}
-					ChannelMsg::ExtendedData { data, .. } => {
-						if err_w.write_all(&data).await.is_err() {
-							break;
-						}
-					}
-					ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
-					_ => {}
+async fn pump_to_tracing(stream: RemowtStream, program: String, is_stderr: bool) {
+	let mut reader = BufReader::new(stream).lines();
+	loop {
+		match reader.next_line().await {
+			Ok(Some(line)) => {
+				if is_stderr {
+					warn!(program, "{line}");
+				} else {
+					info!(program, "{line}");
 				}
 			}
+			Ok(None) => break,
+			Err(e) => {
+				warn!(program, "child log stream error: {e}");
+				break;
+			}
+		}
+	}
+}
+
+fn escape_bash(input: &str, out: &mut String) {
+	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
+	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {
+		out.push_str(input);
+		return;
+	}
+	out.push('\'');
+	for (i, v) in input.split('\'').enumerate() {
+		if i != 0 {
+			out.push_str("'\"'\"'");
+		}
+		out.push_str(v);
+	}
+	out.push('\'');
+}
+
+#[derive(Clone)]
+pub struct RemowtCommand {
+	program: String,
+	args: Vec<String>,
+	env: Vec<(String, String)>,
+	remowt: Remowt,
+	escalated: bool,
+}
+
+impl RemowtCommand {
+	pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {
+		self.args.push(arg.as_ref().to_owned());
+		self
+	}
+	pub fn args<V: AsRef<str>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
+		for arg in args {
+			self.args.push(arg.as_ref().to_owned());
+		}
+		self
+	}
+	pub fn eqarg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
+		self.args
+			.push(format!("{}={}", key.as_ref(), value.as_ref()));
+		self
+	}
+	pub fn comparg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
+		self.args.push(key.as_ref().to_owned());
+		self.args.push(value.as_ref().to_owned());
+		self
+	}
+	pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
+		self.env
+			.push((name.as_ref().to_owned(), value.as_ref().to_owned()));
+		self
+	}
 
-			// The process is gone; stop waiting on stdin we'll never forward.
-			stdin_pump.abort();
-			let _ = out_w.shutdown().await;
-			let _ = err_w.shutdown().await;
-			let _ = exit_tx.send(code);
-		});
+	pub fn sudo(mut self) -> Self {
+		self.escalated = true;
+		self
+	}
 
-		RemowtChild {
-			stdin,
-			stdout,
-			stderr,
-			exit,
+	/// Only for display.
+	fn shell_line(&self) -> String {
+		let mut out = String::new();
+		if self.escalated {
+			out.push_str("run0 ");
+		}
+		if !self.env.is_empty() {
+			out.push_str("env");
+			for (k, v) in &self.env {
+				out.push(' ');
+				assert!(!k.contains('='));
+				escape_bash(k, &mut out);
+				out.push('=');
+				escape_bash(v, &mut out);
+			}
+			out.push(' ');
 		}
+		escape_bash(&self.program, &mut out);
+		for arg in &self.args {
+			out.push(' ');
+			escape_bash(arg, &mut out);
+		}
+		out
 	}
 
-	/// Wait for the process to finish, returning its exit status.
-	pub async fn wait(self) -> Option<u32> {
-		self.exit.await.ok().flatten()
+	fn into_spawn_options(self) -> (Remowt, SpawnOptions, String) {
+		let line = self.shell_line();
+		let opts = SpawnOptions {
+			program: self.program,
+			args: self.args,
+			env: self.env,
+			env_clear: false,
+			cwd: None,
+			escalated: self.escalated,
+			stdin: StdioMode::Null,
+			stdout: StdioMode::Pipe,
+			stderr: StderrMode::Pipe,
+		};
+		(self.remowt, opts, line)
+	}
+
+	pub async fn run(self) -> Result<()> {
+		run_inner(self, false).await.map(|_| ())
+	}
+	pub async fn run_string(self) -> Result<String> {
+		let bytes = run_inner(self, true).await?.expect("want_stdout");
+		Ok(String::from_utf8(bytes)?)
+	}
+	pub async fn run_value<T: DeserializeOwned>(self) -> Result<T> {
+		let s = self.run_string().await?;
+		Ok(serde_json::from_str(&s)?)
+	}
+}
+
+async fn run_inner(cmd: RemowtCommand, want_stdout: bool) -> Result<Option<Vec<u8>>> {
+	let (remowt, opts, line) = cmd.into_spawn_options();
+	debug!("running command {line:?} over remowt");
+	let program = opts.program.clone();
+	let mut child = remowt.spawn(opts).await?;
+	let stderr = child.stderr.take().expect("stderr=Pipe");
+	let stdout = child.stdout.take().expect("stdout=Pipe");
+
+	let mut err = FramedRead::new(stderr, LinesCodec::new());
+	let (mut out_bytes, mut out_lines) = if want_stdout {
+		(Some(FramedRead::new(stdout, BytesCodec::new())), None)
+	} else {
+		(None, Some(FramedRead::new(stdout, LinesCodec::new())))
+	};
+
+	let mut buf = if want_stdout { Some(Vec::new()) } else { None };
+
+	let mut wait = pin!(child.wait());
+	let exit = loop {
+		select! {
+			biased;
+
+			Some(e) = err.next() => {
+				let e = e?;
+				warn!(program = %program, "{e}");
+			}
+			Some(o) = async { out_bytes.as_mut()?.next().await }, if want_stdout => {
+				buf.as_mut().expect("want_stdout").extend_from_slice(&o?);
+			}
+			Some(o) = async { out_lines.as_mut()?.next().await }, if !want_stdout => {
+				let o = o?;
+				info!(program = %program, "{o}");
+			}
+			res = &mut wait => {
+				break res?;
+			}
+		}
+	};
+
+	match exit {
+		Some(0) => Ok(buf),
+		Some(c) => bail!("command '{line}' failed with status {c}"),
+		None => Err(anyhow!("command '{line}' ended without an exit status")),
 	}
 }
modifiedcrates/remowt-endpoints/Cargo.tomldiffbeforeafterboth
--- a/crates/remowt-endpoints/Cargo.toml
+++ b/crates/remowt-endpoints/Cargo.toml
@@ -16,5 +16,5 @@
 tokio = { workspace = true, features = ["net", "io-util", "rt", "process"] }
 tracing.workspace = true
 uuid.workspace = true
-nix = { workspace = true, features = ["process", "term"] }
+nix = { workspace = true, features = ["process", "signal", "term"] }
 zbus.workspace = true
modifiedcrates/remowt-endpoints/src/lib.rsdiffbeforeafterboth
--- a/crates/remowt-endpoints/src/lib.rs
+++ b/crates/remowt-endpoints/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod fs;
 pub mod nix_daemon;
 pub mod pty;
+pub mod subprocess;
 pub mod systemd;
modifiedcrates/remowt-endpoints/src/pty.rsdiffbeforeafterboth
after · crates/remowt-endpoints/src/pty.rs
1use std::collections::HashMap;2use std::io;3use std::os::fd::{AsRawFd, OwnedFd};4use std::pin::Pin;5use std::process::Stdio;6use std::sync::atomic::{AtomicU64, Ordering};7use std::sync::{Arc, Mutex};8use std::task::{Context, Poll};910use bifrostlink::declarative::endpoints;11use bifrostlink::Config;12use camino::Utf8PathBuf;13use nix::libc;14use nix::pty::{openpty, OpenptyResult, Winsize};15use serde::{Deserialize, Serialize};16use tokio::io::unix::AsyncFd;17use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};18use tokio::net::UnixStream;19use tracing::{debug, info, warn};2021pub type ShellId = u64;2223#[derive(Serialize, Deserialize, Debug, thiserror::Error)]24pub enum Error {25	#[error("openpty failed: {0}")]26	Open(String),27	#[error("failed to spawn shell: {0}")]28	Spawn(String),29	#[error("failed to connect to forwarded socket: {0}")]30	Connect(String),31	#[error("no shell with that id")]32	NoSuchShell,33	#[error("resize failed: {0}")]34	Resize(String),35	#[error("io error: {0}")]36	Io(String),37}3839impl From<io::Error> for Error {40	fn from(e: io::Error) -> Self {41		Error::Io(e.to_string())42	}43}4445#[derive(Clone, Default)]46pub struct Pty {47	shells: Arc<Mutex<HashMap<ShellId, OwnedFd>>>,48	next_id: Arc<AtomicU64>,49}5051impl Pty {52	pub fn new() -> Self {53		Self::default()54	}55}5657#[endpoints(ns = 7)]58impl Pty {59	#[endpoints(id = 1)]60	async fn open_shell(61		&self,62		socket_path: Utf8PathBuf,63		term: String,64		cols: u16,65		rows: u16,66	) -> Result<ShellId, Error> {67		let ws = Winsize {68			ws_row: rows,69			ws_col: cols,70			ws_xpixel: 0,71			ws_ypixel: 0,72		};73		let OpenptyResult { master, slave } =74			openpty(Some(&ws), None).map_err(|e| Error::Open(e.to_string()))?;7576		let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_owned());7778		let slave_in = slave.try_clone()?;79		let slave_out = slave.try_clone()?;80		let slave_err = slave;8182		let mut cmd = tokio::process::Command::new(&shell);83		cmd.env("TERM", &term);84		if let Ok(home) = std::env::var("HOME") {85			cmd.current_dir(home);86		}87		cmd.stdin(Stdio::from(slave_in));88		cmd.stdout(Stdio::from(slave_out));89		cmd.stderr(Stdio::from(slave_err));90		// SAFETY: only async-signal-safe calls (setsid, ioctl) before exec.91		unsafe {92			cmd.pre_exec(|| {93				nix::unistd::setsid().map_err(|e| io::Error::from_raw_os_error(e as i32))?;94				if libc::ioctl(0, libc::TIOCSCTTY as _, 0) < 0 {95					return Err(io::Error::last_os_error());96				}97				Ok(())98			});99		}100101		let mut child = cmd.spawn().map_err(|e| Error::Spawn(e.to_string()))?;102103		let resize_fd = master.try_clone()?;104		let id = self.next_id.fetch_add(1, Ordering::Relaxed);105		self.shells106			.lock()107			.expect("not poisoned")108			.insert(id, resize_fd);109110		let sock = match UnixStream::connect(&socket_path).await {111			Ok(s) => s,112			Err(e) => {113				self.shells.lock().expect("not poisoned").remove(&id);114				let _ = child.kill().await;115				return Err(Error::Connect(e.to_string()));116			}117		};118		let pty = AsyncPty::new(master)?;119120		debug!(id, shell, "shell opened");121		let shells = self.shells.clone();122		tokio::spawn(async move {123			let mut pty = pty;124			let mut sock = sock;125			if let Err(e) = tokio::io::copy_bidirectional(&mut pty, &mut sock).await {126				warn!(id, "shell pump ended: {e}");127			}128			let _ = child.kill().await;129			shells.lock().expect("not poisoned").remove(&id);130			info!(id, "shell closed");131		});132133		Ok(id)134	}135136	#[endpoints(id = 2)]137	async fn resize(&self, id: ShellId, cols: u16, rows: u16) -> Result<(), Error> {138		let ws = libc::winsize {139			ws_row: rows,140			ws_col: cols,141			ws_xpixel: 0,142			ws_ypixel: 0,143		};144		let shells = self.shells.lock().expect("not poisoned");145		let fd = shells.get(&id).ok_or(Error::NoSuchShell)?;146		// SAFETY: `fd` is a live PTY master147		let rc = unsafe { libc::ioctl(fd.as_raw_fd(), libc::TIOCSWINSZ as _, &ws) };148		if rc < 0 {149			return Err(Error::Resize(io::Error::last_os_error().to_string()));150		}151		Ok(())152	}153}154155struct AsyncPty {156	fd: AsyncFd<OwnedFd>,157}158159impl AsyncPty {160	fn new(fd: OwnedFd) -> io::Result<Self> {161		let raw = fd.as_raw_fd();162		// SAFETY: standard F_GETFL/F_SETFL round-trip on a valid fd.163		unsafe {164			let flags = libc::fcntl(raw, libc::F_GETFL);165			if flags < 0 {166				return Err(io::Error::last_os_error());167			}168			if libc::fcntl(raw, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {169				return Err(io::Error::last_os_error());170			}171		}172		Ok(Self {173			fd: AsyncFd::new(fd)?,174		})175	}176}177178impl AsyncRead for AsyncPty {179	fn poll_read(180		self: Pin<&mut Self>,181		cx: &mut Context<'_>,182		buf: &mut ReadBuf<'_>,183	) -> Poll<io::Result<()>> {184		let this = self.get_mut();185		loop {186			let mut guard = match this.fd.poll_read_ready(cx) {187				Poll::Ready(Ok(g)) => g,188				Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),189				Poll::Pending => return Poll::Pending,190			};191			let unfilled = buf.initialize_unfilled();192			let res = guard.try_io(|inner| {193				let fd = inner.get_ref().as_raw_fd();194				// SAFETY: writing into `unfilled`'s own backing storage.195				let n = unsafe { libc::read(fd, unfilled.as_mut_ptr().cast(), unfilled.len()) };196				if n < 0 {197					let err = io::Error::last_os_error();198					if err.raw_os_error() == Some(libc::EIO) {199						Ok(0)200					} else {201						Err(err)202					}203				} else {204					Ok(n as usize)205				}206			});207			match res {208				Ok(Ok(n)) => {209					buf.advance(n);210					return Poll::Ready(Ok(()));211				}212				Ok(Err(e)) => return Poll::Ready(Err(e)),213				Err(_would_block) => continue,214			}215		}216	}217}218219impl AsyncWrite for AsyncPty {220	fn poll_write(221		self: Pin<&mut Self>,222		cx: &mut Context<'_>,223		buf: &[u8],224	) -> Poll<io::Result<usize>> {225		let this = self.get_mut();226		loop {227			let mut guard = match this.fd.poll_write_ready(cx) {228				Poll::Ready(Ok(g)) => g,229				Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),230				Poll::Pending => return Poll::Pending,231			};232			let res = guard.try_io(|inner| {233				let fd = inner.get_ref().as_raw_fd();234				// SAFETY: reading from `buf` for `buf.len()` bytes.235				let n = unsafe { libc::write(fd, buf.as_ptr().cast(), buf.len()) };236				if n < 0 {237					Err(io::Error::last_os_error())238				} else {239					Ok(n as usize)240				}241			});242			match res {243				Ok(r) => return Poll::Ready(r),244				Err(_would_block) => continue,245			}246		}247	}248249	fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {250		Poll::Ready(Ok(()))251	}252253	fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {254		Poll::Ready(Ok(()))255	}256}
addedcrates/remowt-endpoints/src/subprocess.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-endpoints/src/subprocess.rs
@@ -0,0 +1,278 @@
+use std::collections::HashMap;
+use std::io;
+use std::process::Stdio;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, Mutex};
+
+use bifrostlink::declarative::endpoints;
+use bifrostlink::Config;
+use camino::Utf8PathBuf;
+use nix::sys::signal::{self, Signal};
+use nix::unistd::Pid;
+use serde::{Deserialize, Serialize};
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
+use tokio::net::UnixStream;
+use tokio::process::{ChildStderr, ChildStdout, Command};
+use tokio::sync::{mpsc, watch};
+use tracing::{debug, warn};
+
+pub type ProcId = u64;
+
+#[derive(Serialize, Deserialize, Debug)]
+pub enum StdioSpec {
+	Null,
+	Socket(Utf8PathBuf),
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub enum StderrSpec {
+	Null,
+	Socket(Utf8PathBuf),
+	MergeWithStdout,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct SpawnSpec {
+	pub program: String,
+	pub args: Vec<String>,
+	pub env: Vec<(String, String)>,
+	pub env_clear: bool,
+	pub cwd: Option<Utf8PathBuf>,
+	pub stdin: StdioSpec,
+	pub stdout: StdioSpec,
+	pub stderr: StderrSpec,
+}
+
+#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
+pub enum Error {
+	#[error("spawn failed: {0}")]
+	Spawn(String),
+	#[error("connect to forwarded socket failed: {0}")]
+	Connect(String),
+	#[error("no process with that id")]
+	NoSuchProcess,
+	#[error("MergeWithStdout requires stdout=Socket")]
+	BadMerge,
+	#[error("invalid signal: {0}")]
+	BadSignal(i32),
+	#[error("kill failed: {0}")]
+	Kill(String),
+	#[error("io error: {0}")]
+	Io(String),
+}
+
+impl From<io::Error> for Error {
+	fn from(e: io::Error) -> Self {
+		Error::Io(e.to_string())
+	}
+}
+
+struct ChildState {
+	pid: u32,
+	exit_rx: watch::Receiver<Option<Option<i32>>>,
+}
+
+#[derive(Clone, Default)]
+pub struct Subprocess {
+	children: Arc<Mutex<HashMap<ProcId, ChildState>>>,
+	next_id: Arc<AtomicU64>,
+}
+
+impl Subprocess {
+	pub fn new() -> Self {
+		Self::default()
+	}
+}
+
+#[endpoints(ns = 10)]
+impl Subprocess {
+	#[endpoints(id = 1)]
+	async fn spawn(&self, spec: SpawnSpec) -> Result<ProcId, Error> {
+		let SpawnSpec {
+			program,
+			args,
+			env,
+			env_clear,
+			cwd,
+			stdin,
+			stdout,
+			stderr,
+		} = spec;
+
+		if matches!(stderr, StderrSpec::MergeWithStdout) && !matches!(stdout, StdioSpec::Socket(_))
+		{
+			return Err(Error::BadMerge);
+		}
+
+		let mut cmd = Command::new(&program);
+		cmd.args(&args);
+		if env_clear {
+			cmd.env_clear();
+		}
+		for (k, v) in &env {
+			cmd.env(k, v);
+		}
+		if let Some(cwd) = &cwd {
+			cmd.current_dir(cwd);
+		}
+		cmd.stdin(match &stdin {
+			StdioSpec::Socket(_) => Stdio::piped(),
+			StdioSpec::Null => Stdio::null(),
+		});
+		cmd.stdout(match &stdout {
+			StdioSpec::Socket(_) => Stdio::piped(),
+			StdioSpec::Null => Stdio::null(),
+		});
+		cmd.stderr(match &stderr {
+			StderrSpec::Socket(_) | StderrSpec::MergeWithStdout => Stdio::piped(),
+			StderrSpec::Null => Stdio::null(),
+		});
+		cmd.kill_on_drop(false);
+
+		let mut child = cmd.spawn().map_err(|e| Error::Spawn(e.to_string()))?;
+		let pid = child
+			.id()
+			.ok_or_else(|| Error::Spawn("child exited before pid available".to_owned()))?;
+
+		if let StdioSpec::Socket(path) = &stdin {
+			let sock = UnixStream::connect(path)
+				.await
+				.map_err(|e| Error::Connect(e.to_string()))?;
+			let mut stdin_w = child.stdin.take().expect("piped");
+			tokio::spawn(async move {
+				let (mut sr, _) = tokio::io::split(sock);
+				let _ = tokio::io::copy(&mut sr, &mut stdin_w).await;
+				let _ = stdin_w.shutdown().await;
+			});
+		}
+
+		let stdout_handle = child.stdout.take();
+		let stderr_handle = child.stderr.take();
+
+		match (&stdout, &stderr, stdout_handle, stderr_handle) {
+			(StdioSpec::Socket(out_path), StderrSpec::MergeWithStdout, Some(out), Some(err)) => {
+				let sock = UnixStream::connect(out_path)
+					.await
+					.map_err(|e| Error::Connect(e.to_string()))?;
+				tokio::spawn(merge_to_sock(out, err, sock));
+			}
+			(StdioSpec::Socket(out_path), _, Some(out), err_opt) => {
+				let sock = UnixStream::connect(out_path)
+					.await
+					.map_err(|e| Error::Connect(e.to_string()))?;
+				tokio::spawn(pump_to_sock(out, sock));
+				if let (StderrSpec::Socket(err_path), Some(err)) = (&stderr, err_opt) {
+					let err_sock = UnixStream::connect(err_path)
+						.await
+						.map_err(|e| Error::Connect(e.to_string()))?;
+					tokio::spawn(pump_to_sock(err, err_sock));
+				}
+			}
+			(StdioSpec::Null, StderrSpec::Socket(err_path), _, Some(err)) => {
+				let sock = UnixStream::connect(err_path)
+					.await
+					.map_err(|e| Error::Connect(e.to_string()))?;
+				tokio::spawn(pump_to_sock(err, sock));
+			}
+			_ => {}
+		}
+
+		let (exit_tx, exit_rx) = watch::channel(None);
+		let id = self.next_id.fetch_add(1, Ordering::Relaxed);
+		self.children
+			.lock()
+			.expect("not poisoned")
+			.insert(id, ChildState { pid, exit_rx });
+
+		debug!(id, pid, program, "subprocess spawned");
+		tokio::spawn(async move {
+			let result = child.wait().await;
+			let code = match result {
+				Ok(status) => status.code(),
+				Err(e) => {
+					warn!(id, "child.wait failed: {e}");
+					None
+				}
+			};
+			let _ = exit_tx.send(Some(code));
+		});
+
+		Ok(id)
+	}
+
+	#[endpoints(id = 2)]
+	async fn wait(&self, id: ProcId) -> Result<Option<i32>, Error> {
+		let mut rx = {
+			let map = self.children.lock().expect("not poisoned");
+			let entry = map.get(&id).ok_or(Error::NoSuchProcess)?;
+			entry.exit_rx.clone()
+		};
+		rx.wait_for(|v| v.is_some())
+			.await
+			.map_err(|_| Error::Io("exit channel closed".to_owned()))?;
+		let code = rx.borrow().flatten();
+		self.children.lock().expect("not poisoned").remove(&id);
+		Ok(code)
+	}
+
+	#[endpoints(id = 3)]
+	async fn kill(&self, id: ProcId, signal: i32) -> Result<(), Error> {
+		let pid = {
+			let map = self.children.lock().expect("not poisoned");
+			let entry = map.get(&id).ok_or(Error::NoSuchProcess)?;
+			entry.pid
+		};
+		let sig = Signal::try_from(signal).map_err(|_| Error::BadSignal(signal))?;
+		signal::kill(Pid::from_raw(pid as i32), sig).map_err(|e| Error::Kill(e.to_string()))?;
+		Ok(())
+	}
+}
+
+async fn pump_to_sock<R>(mut from: R, sock: UnixStream)
+where
+	R: tokio::io::AsyncRead + Unpin,
+{
+	let (_, mut sw) = tokio::io::split(sock);
+	let _ = tokio::io::copy(&mut from, &mut sw).await;
+	let _ = sw.shutdown().await;
+}
+
+async fn merge_to_sock(mut stdout: ChildStdout, mut stderr: ChildStderr, sock: UnixStream) {
+	let (_, mut sw) = tokio::io::split(sock);
+	let (tx, mut rx) = mpsc::channel::<Vec<u8>>(64);
+	let tx_out = tx.clone();
+	let out_pump = tokio::spawn(async move {
+		let mut buf = vec![0u8; 4096];
+		loop {
+			match stdout.read(&mut buf).await {
+				Ok(0) | Err(_) => break,
+				Ok(n) => {
+					if tx_out.send(buf[..n].to_vec()).await.is_err() {
+						break;
+					}
+				}
+			}
+		}
+	});
+	let err_pump = tokio::spawn(async move {
+		let mut buf = vec![0u8; 4096];
+		loop {
+			match stderr.read(&mut buf).await {
+				Ok(0) | Err(_) => break,
+				Ok(n) => {
+					if tx.send(buf[..n].to_vec()).await.is_err() {
+						break;
+					}
+				}
+			}
+		}
+	});
+	while let Some(chunk) = rx.recv().await {
+		if sw.write_all(&chunk).await.is_err() {
+			break;
+		}
+	}
+	let _ = out_pump.await;
+	let _ = err_pump.await;
+	let _ = sw.shutdown().await;
+}