git.delta.rocks / remowt / refs/commits / 7e5126608cef

difftreelog

feat(client) proper support for local

uzyqskstYaroslav Bolyukin2026-06-13parent: #600e6ed.patch.diff
in: trunk

16 files changed

modified.gitignorediffbeforeafterboth
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
 /target
 /.direnv
+/result
+/result-*
modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2096,6 +2096,7 @@
  "russh-config",
  "serde",
  "serde_json",
+ "tempfile",
  "tokio",
  "tracing",
  "uuid",
@@ -2131,6 +2132,7 @@
  "serde_json",
  "thiserror",
  "tokio",
+ "tracing",
 ]
 
 [[package]]
@@ -2140,7 +2142,6 @@
  "anyhow",
  "bifrostlink",
  "bifrostlink-ports",
- "bytes",
  "remowt-link-shared",
  "serde_json",
  "tokio",
modifiedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -225,8 +225,6 @@
 }
 
 fn main() -> anyhow::Result<()> {
-	// Log to stderr: `privileged-agent` uses stdout as the bifrost transport,
-	// so anything written there would corrupt the stream.
 	tracing_subscriber::fmt()
 		.with_writer(std::io::stderr)
 		.without_time()
modifiedcmds/remowt-ssh/Cargo.tomldiffbeforeafterboth
--- a/cmds/remowt-ssh/Cargo.toml
+++ b/cmds/remowt-ssh/Cargo.toml
@@ -11,8 +11,15 @@
 tracing-subscriber.workspace = true
 bifrostlink.workspace = true
 remowt-link-shared.workspace = true
-remowt-client.workspace = true
-tokio = { workspace = true, features = ["macros", "fs", "net", "io-util", "rt", "signal"] }
+remowt-client = { workspace = true, features = ["shell"] }
+tokio = { workspace = true, features = [
+	"macros",
+	"fs",
+	"net",
+	"io-util",
+	"rt",
+	"signal",
+] }
 nix = { workspace = true, features = ["term"] }
 anyhow.workspace = true
 bifrostlink-ports.workspace = true
modifiedcmds/remowt-ssh/src/main.rsdiffbeforeafterboth
--- a/cmds/remowt-ssh/src/main.rs
+++ b/cmds/remowt-ssh/src/main.rs
@@ -19,11 +19,14 @@
 use tokio::io::unix::AsyncFd;
 use tokio::io::{AsyncRead, ReadBuf};
 use tokio::signal::unix::{signal, SignalKind};
-use tracing::info;
+use tracing::debug;
 
 #[derive(Parser)]
-struct Opts {
-	host: String,
+enum Opts {
+	/// Connect to remote host with remowt agent.
+	Ssh { host: String },
+	/// Connect to local host for testing the connectivity.
+	Local,
 }
 
 fn agents_dir() -> anyhow::Result<PathBuf> {
@@ -35,18 +38,27 @@
 
 #[tokio::main(flavor = "current_thread")]
 async fn main() -> anyhow::Result<()> {
-	tracing_subscriber::fmt::init();
+	tracing_subscriber::fmt()
+		.with_writer(std::io::stderr)
+		.without_time()
+		.init();
 	let opts = Opts::parse();
 
 	let bundle = AgentBundle::from_dir(agents_dir()?)?;
-	let conn = Remowt::connect(&opts.host, &bundle).await?;
+	let conn = match &opts {
+		Opts::Ssh { host } => Remowt::connect(host, &bundle).await?,
+		Opts::Local => Remowt::connect_local(&bundle).await?,
+	};
 	let mut rpc = conn.rpc();
 
 	serve_prompts(
 		&mut rpc,
 		PrependSourcePrompter {
 			prompter: RofiPrompter,
-			source: vec![Source(Cow::Owned(format!("ssh host: {}", opts.host)))],
+			source: match opts {
+				Opts::Ssh { host } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))],
+				Opts::Local => vec![],
+			},
 			description: "".to_owned(),
 		},
 	);
@@ -54,9 +66,9 @@
 		serve_editor(&mut rpc, SshEditor { sess });
 	}
 
-	info!("entering shell");
+	debug!("entering shell");
 	run_shell(&conn).await?;
-	info!("shell ended");
+	debug!("shell ended");
 
 	Ok(())
 }
modifiedcrates/remowt-client/Cargo.tomldiffbeforeafterboth
--- a/crates/remowt-client/Cargo.toml
+++ b/crates/remowt-client/Cargo.toml
@@ -16,7 +16,18 @@
 remowt-link-shared.workspace = true
 russh.workspace = true
 russh-config.workspace = true
-tokio = { workspace = true, features = ["net", "io-util", "rt", "sync", "macros", "process"] }
+tempfile.workspace = true
+tokio = { workspace = true, features = [
+	"net",
+	"io-util",
+	"rt",
+	"sync",
+	"macros",
+	"process",
+] }
 tracing.workspace = true
 uuid = { workspace = true, features = ["v4"] }
-remowt-endpoints.workspace = true
+remowt-endpoints = { workspace = true, optional = true }
+
+[features]
+shell = ["dep:remowt-endpoints"]
addedcrates/remowt-client/src/forwarded.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-client/src/forwarded.rs
@@ -0,0 +1,79 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use anyhow::{anyhow, Result};
+use camino::Utf8PathBuf;
+use russh::client::Msg;
+use russh::{Channel, ChannelStream};
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tokio::net::{UnixListener, UnixStream};
+use tokio::sync::oneshot;
+
+pub enum RemowtListener {
+	Ssh(oneshot::Receiver<Channel<Msg>>),
+	Local(UnixListener, Utf8PathBuf),
+}
+
+impl RemowtListener {
+	pub async fn accept(self) -> Result<RemowtStream> {
+		match self {
+			RemowtListener::Ssh(rx) => {
+				let ch = rx
+					.await
+					.map_err(|_| anyhow!("agent never connected the forwarded socket"))?;
+				Ok(RemowtStream::Ssh(ch.into_stream()))
+			}
+			RemowtListener::Local(listener, path) => {
+				let (stream, _) = listener.accept().await?;
+				let _ = std::fs::remove_file(&path);
+				Ok(RemowtStream::Local(stream))
+			}
+		}
+	}
+}
+
+pub enum RemowtStream {
+	Ssh(ChannelStream<Msg>),
+	Local(UnixStream),
+}
+
+impl AsyncRead for RemowtStream {
+	fn poll_read(
+		self: Pin<&mut Self>,
+		cx: &mut Context<'_>,
+		buf: &mut ReadBuf<'_>,
+	) -> Poll<io::Result<()>> {
+		match self.get_mut() {
+			RemowtStream::Ssh(s) => Pin::new(s).poll_read(cx, buf),
+			RemowtStream::Local(s) => Pin::new(s).poll_read(cx, buf),
+		}
+	}
+}
+
+impl AsyncWrite for RemowtStream {
+	fn poll_write(
+		self: Pin<&mut Self>,
+		cx: &mut Context<'_>,
+		buf: &[u8],
+	) -> Poll<io::Result<usize>> {
+		match self.get_mut() {
+			RemowtStream::Ssh(s) => Pin::new(s).poll_write(cx, buf),
+			RemowtStream::Local(s) => Pin::new(s).poll_write(cx, buf),
+		}
+	}
+
+	fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+		match self.get_mut() {
+			RemowtStream::Ssh(s) => Pin::new(s).poll_flush(cx),
+			RemowtStream::Local(s) => Pin::new(s).poll_flush(cx),
+		}
+	}
+
+	fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+		match self.get_mut() {
+			RemowtStream::Ssh(s) => Pin::new(s).poll_shutdown(cx),
+			RemowtStream::Local(s) => Pin::new(s).poll_shutdown(cx),
+		}
+	}
+}
modifiedcrates/remowt-client/src/lib.rsdiffbeforeafterboth
after · crates/remowt-client/src/lib.rs
1use std::collections::HashMap;2use std::env;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Remote, Rpc, Rtt};9use bifrostlink_ports::unix_socket::from_socket;10use camino::{Utf8Path, Utf8PathBuf};11use remowt_link_shared::plugin::PluginEndpointsClient;12use remowt_link_shared::port::child_port;13use remowt_link_shared::{Address, BifConfig};14use russh::client::{connect, Config, Handle, Handler, Msg, Session};15use russh::keys::agent::client::AgentClient;16use russh::keys::agent::AgentIdentity;17use russh::keys::check_known_hosts;18use russh::keys::ssh_key::PublicKey;19use russh::Channel;20use tempfile::TempDir;21use tokio::net::UnixListener;22use tokio::sync::oneshot::{self, channel};23use tokio::{24	fs,25	io::{AsyncReadExt as _, AsyncWriteExt as _},26};27use tracing::{debug, error};28use uuid::Uuid;2930use self::port::channel_port;31use self::subprocess::RemowtChild;3233pub mod editor;34mod forwarded;35mod port;36#[cfg(feature = "shell")]37mod shell;38mod subprocess;3940pub use forwarded::{RemowtListener, RemowtStream};41#[cfg(feature = "shell")]42pub use shell::{RemowtShell, RemowtShellResizer};4344type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;4546fn sh_quote(s: impl AsRef<str>) -> String {47	format!("'{}'", s.as_ref().replace('\'', "'\\''"))48}4950const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];5152pub struct AgentBundle {53	dir: PathBuf,54	hashes: HashMap<String, String>,55}5657impl AgentBundle {58	pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {59		let dir = dir.into();60		let hashes_path = dir.join("hashes");61		let raw = std::fs::read_to_string(&hashes_path)62			.with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;63		let mut hashes = HashMap::new();64		for line in raw.lines() {65			let line = line.trim();66			if line.is_empty() {67				continue;68			}69			let (arch, hash) = line70				.split_once(char::is_whitespace)71				.ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;72			hashes.insert(arch.to_owned(), hash.trim().to_owned());73		}74		ensure!(75			!hashes.is_empty(),76			"agent bundle {} has no hashes",77			dir.display()78		);79		Ok(Self { dir, hashes })80	}8182	fn binary(&self, arch: &str) -> PathBuf {83		self.dir.join(format!("remowt-agent-{arch}"))84	}8586	fn local_binary(&self) -> Result<PathBuf> {87		let arch = env::consts::ARCH;88		let path = self.binary(arch);89		ensure!(90			path.is_file(),91			"no local remowt-agent build for arch {arch} in bundle {}",92			self.dir.display()93		);94		Ok(path)95	}96}9798async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {99	let ch = sess.channel_open_session().await?;100	ch.exec(true, cmd).await?;101102	let mut child = RemowtChild::from_exec(ch);103	drop(child.stdin);104105	let mut out = Vec::new();106	let mut err = Vec::new();107	tokio::try_join!(108		child.stdout.read_to_end(&mut out),109		child.stderr.read_to_end(&mut err),110	)?;111	if !err.is_empty() {112		error!("remote stderr: {}", String::from_utf8_lossy(&err).trim());113	}114	let code = child.exit.await.ok().flatten();115	Ok((code, out))116}117118async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {119	let (code, mut out) = run(sess, cmd).await?;120	ensure!(121		code == Some(0),122		"remote command failed (exit {code:?}): {cmd}"123	);124	if !out.is_empty() {125		ensure!(126			out.ends_with(b"\n"),127			"remote command was not newline-terminated: {cmd}: {out:?}"128		);129		out.pop();130	}131	String::from_utf8(out).context("expected utf8 output for command")132}133134async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {135	debug!("uname -a");136	let arch = run_string_ok(sess, "uname -m").await?;137	let hash = bundle138		.hashes139		.get(&arch)140		.ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;141142	debug!("get dir");143	let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")144		.await?145		.to_owned();146	let dir = if cache.is_empty() {147		let home = run_string_ok(sess, "echo \"$HOME\"").await?;148		ensure!(149			!home.is_empty(),150			"remote $HOME and $XDG_CACHE_HOME both empty"151		);152		Utf8PathBuf::from(home).join(".cache/remowt")153	} else {154		Utf8PathBuf::from(cache).join("remowt")155	};156	let path = dir.join(hash);157158	debug!("presence");159	let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;160	if present != Some(0) {161		let bin = bundle.binary(&arch);162		debug!("read");163		let bytes = fs::read(&bin)164			.await165			.with_context(|| format!("reading agent binary {}", bin.display()))?;166		debug!("upload");167		upload_agent(sess, &dir, &path, bytes).await?;168	}169	Ok(path)170}171172async fn upload_agent(173	sess: &Handle<SshHandler>,174	dir: &Utf8Path,175	path: &Utf8Path,176	bytes: Vec<u8>,177) -> Result<()> {178	debug!("mkdirp");179	run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;180181	let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));182	let ch = sess.channel_open_session().await?;183	debug!("cat");184	ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;185186	let mut child = RemowtChild::from_exec(ch);187	child188		.stdin189		.write_all(&bytes)190		.await191		.context("sending agent binary")?;192	child193		.stdin194		.shutdown()195		.await196		.context("sending agent binary")?;197	let code = child.wait().await;198	ensure!(code == Some(0), "agent upload failed (exit {code:?})");199200	debug!("chmod");201	run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;202	run_string_ok(203		sess,204		&format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),205	)206	.await?;207	Ok(())208}209210async fn detect_escalation(211	sess: &Handle<SshHandler>,212) -> Result<(&'static str, &'static [&'static str])> {213	for (tool, flags) in ESCALATORS {214		// `tool` is a fixed identifier (no metacharacters), safe to interpolate.215		let (code, _) = run(sess, &format!("command -v {tool}")).await?;216		if code == Some(0) {217			return Ok((tool, flags));218		}219	}220	bail!("no escalation tool found on remote")221}222223fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {224	let mut parts = vec![tool.to_owned()];225	parts.extend(flags.iter().map(|f| f.to_string()));226	parts.push(sh_quote(agent_path));227	parts.push("real-agent".to_owned());228	parts.push("--privileged".to_owned());229	if let Some(p) = path {230		parts.push("--path".to_owned());231		parts.push(sh_quote(p));232	}233	parts.join(" ")234}235236fn find_in_path(name: &str) -> Option<std::path::PathBuf> {237	let path = env::var_os("PATH")?;238	env::split_paths(&path)239		.map(|dir| dir.join(name))240		.find(|p| p.is_file())241}242243pub struct SshHandler {244	host: String,245	port: u16,246	subs: Subs,247}248impl Handler for SshHandler {249	type Error = russh::Error;250	async fn check_server_key(251		&mut self,252		server_public_key: &PublicKey,253	) -> Result<bool, Self::Error> {254		Ok(check_known_hosts(&self.host, self.port, server_public_key)?)255	}256	async fn server_channel_open_forwarded_streamlocal(257		&mut self,258		channel: Channel<Msg>,259		socket_path: &str,260		_session: &mut Session,261	) -> Result<(), Self::Error> {262		let Some(ch) = self263			.subs264			.lock()265			.expect("lock")266			.remove(&Utf8PathBuf::from(socket_path))267		else {268			return Err(russh::Error::WrongChannel);269		};270		let _ = ch.send(channel);271		Ok(())272	}273}274275enum Transport {276	Ssh {277		sess: Arc<Handle<SshHandler>>,278		subs: Subs,279		runtime_dir: Utf8PathBuf,280		agent_path: Utf8PathBuf,281	},282	Local {283		agent_path: PathBuf,284		runtime_dir: Utf8PathBuf,285	},286}287288pub struct Remowt {289	transport: Transport,290	rpc: Rpc<BifConfig>,291	elevated: tokio::sync::OnceCell<()>,292	children: Mutex<Vec<tokio::process::Child>>,293	_runtime_tmp: Option<TempDir>,294}295296pub type RemowtRemote = Remote<BifConfig>;297298impl Remowt {299	pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {300		let conf = russh_config::parse_home(host)?;301		let port = conf.host_config.port.or(conf.port).unwrap_or(22);302		let hostname = conf303			.host_config304			.hostname305			.clone()306			.unwrap_or_else(|| conf.host_name.clone());307		let user = conf308			.user309			.clone()310			.unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));311312		let subs: Subs = Arc::new(Mutex::new(HashMap::new()));313		let mut sess = connect(314			Arc::new(Config::default()),315			(hostname.clone(), port),316			SshHandler {317				host: hostname,318				port,319				subs: subs.clone(),320			},321		)322		.await?;323324		let mut agent = AgentClient::connect_env().await?;325		let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();326		let mut authenticated = false;327		for ident in agent.request_identities().await? {328			let AgentIdentity::PublicKey { key, .. } = ident else {329				continue;330			};331			if sess332				.authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)333				.await?334				.success()335			{336				authenticated = true;337				break;338			}339		}340		ensure!(authenticated, "ssh authentication failed");341342		let sess = Arc::new(sess);343344		debug!("deploying agent");345		let agent_path = deploy_agent(&sess, bundle).await?;346347		debug!("runtime dir");348		let runtime_dir = remote_runtime_dir(&sess).await?;349		let primary = runtime_dir.join(format!("remowt-{}.sock", Uuid::new_v4()));350351		let (onetx, onerx) = channel();352		subs.lock().expect("lock").insert(primary.clone(), onetx);353		sess.streamlocal_forward(primary.clone()).await?;354355		let rpc = Rpc::<BifConfig>::new(Address::User);356357		// TODO: ensure no injection is possible in the socket path.358		let cmd_chan = sess.channel_open_session().await?;359		debug!("starting agent");360		cmd_chan361			.exec(362				true,363				format!(364					"{} real-agent --path={}",365					sh_quote(&agent_path),366					sh_quote(&primary)367				),368			)369			.await?;370371		let port = channel_port(372			onerx373				.await374				.map_err(|_| anyhow!("agent never opened its channel"))?,375		);376		rpc.add_direct(Address::Agent, port, Rtt(0));377378		Ok(Self {379			transport: Transport::Ssh {380				sess,381				subs,382				runtime_dir,383				agent_path,384			},385			rpc,386			elevated: tokio::sync::OnceCell::new(),387			children: Mutex::new(Vec::new()),388			_runtime_tmp: None,389		})390	}391392	pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {393		let agent_path = bundle.local_binary()?;394		let mut child = tokio::process::Command::new(&agent_path)395			.arg("real-agent")396			.stdin(std::process::Stdio::piped())397			.stdout(std::process::Stdio::piped())398			.kill_on_drop(true)399			.spawn()400			.with_context(|| format!("spawning agent binary {}", agent_path.display()))?;401		let stdin = child.stdin.take().expect("stdin piped");402		let stdout = child.stdout.take().expect("stdout piped");403404		let rpc = Rpc::<BifConfig>::new(Address::User);405		rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));406407		let (runtime_dir, runtime_tmp) = local_runtime_dir()?;408409		Ok(Self {410			transport: Transport::Local {411				agent_path,412				runtime_dir,413			},414			rpc,415			elevated: tokio::sync::OnceCell::new(),416			children: Mutex::new(vec![child]),417			_runtime_tmp: runtime_tmp,418		})419	}420421	pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {422		match &self.transport {423			Transport::Ssh { sess, .. } => Some(sess.clone()),424			Transport::Local { .. } => None,425		}426	}427428	pub fn rpc(&self) -> Rpc<BifConfig> {429		self.rpc.clone()430	}431432	pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {433		R::wrap(self.rpc.remote(Address::Agent))434	}435436	pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {437		let client: PluginEndpointsClient<BifConfig> = self.endpoints();438		client439			.load_plugin(id, name.to_owned())440			.await?441			.map_err(|e| anyhow!("agent failed to load plugin: {e}"))442	}443	pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {444		self.ensure_elevated().await?;445		let client: PluginEndpointsClient<BifConfig> =446			PluginEndpointsClient::wrap(self.rpc.remote(Address::AgentPrivileged));447		client448			.load_plugin_path(id, path.to_owned())449			.await?450			.map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))451	}452	pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {453		R::wrap(self.rpc.remote(Address::Plugin(id)))454	}455	pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {456		self.ensure_elevated().await?;457		Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))458	}459460	async fn ensure_elevated(&self) -> Result<()> {461		self.elevated462			.get_or_try_init(|| async {463				let port = match &self.transport {464					Transport::Ssh {465						sess, agent_path, ..466					} => {467						let (tool, flags) = detect_escalation(sess).await?;468						let ch = sess.channel_open_session().await?;469						ch.exec(true, privileged_cmd(tool, flags, agent_path, None))470							.await?;471						channel_port(ch)472					}473					Transport::Local { agent_path, .. } => {474						let sock = self475							.runtime_dir()476							.join(format!("remowt-priv-{}.sock", Uuid::new_v4()));477						let _ = std::fs::remove_file(&sock);478						let listener = UnixListener::bind(&sock)?;479						let (tool, flags) = ESCALATORS480							.iter()481							.find(|(t, _)| find_in_path(t).is_some())482							.ok_or_else(|| anyhow!("no escalation tool found"))?;483						let child = tokio::process::Command::new(tool)484							.args(*flags)485							.arg(agent_path)486							.arg("real-agent")487							.arg("--privileged")488							.arg("--path")489							.arg(sock.as_str())490							.kill_on_drop(true)491							.spawn()?;492						self.children.lock().expect("lock").push(child);493						let (stream, _) = listener.accept().await?;494						let _ = std::fs::remove_file(&sock);495						from_socket(stream)496					}497				};498				self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));499				anyhow::Ok(())500			})501			.await?;502		Ok(())503	}504505	pub async fn exec(&self, command: String) -> Result<RemowtChild> {506		let Some(sess) = self.ssh() else {507			bail!("exec should not be called on local")508		};509		let ch = sess.channel_open_session().await?;510		ch.exec(true, command).await?;511		Ok(RemowtChild::from_exec(ch))512	}513514	fn runtime_dir(&self) -> Utf8PathBuf {515		match &self.transport {516			Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),517			Transport::Local { runtime_dir, .. } => runtime_dir.clone(),518		}519	}520521	pub async fn forward_socket(&self, path: &Utf8Path) -> Result<RemowtListener> {522		match &self.transport {523			Transport::Ssh { sess, subs, .. } => {524				let (tx, rx) = oneshot::channel();525				subs.lock().expect("lock").insert(path.to_owned(), tx);526				sess.streamlocal_forward(path.to_owned()).await?;527				Ok(RemowtListener::Ssh(rx))528			}529			Transport::Local { .. } => {530				let _ = std::fs::remove_file(path);531				Ok(RemowtListener::Local(532					UnixListener::bind(path)?,533					path.to_owned(),534				))535			}536		}537	}538}539540fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {541	if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {542		if !dir.is_empty() {543			return Ok((Utf8PathBuf::from(dir), None));544		}545	}546	let tmp = tempfile::Builder::new()547		.prefix("remowt.")548		.rand_bytes(12)549		.tempdir()?;550	let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())551		.map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;552	Ok((dir, Some(tmp)))553}554555async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {556	let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;557	let dir = dir.trim();558	if dir.is_empty() {559		remote_mktemp(sess).await560	} else {561		Ok(Utf8PathBuf::from(dir))562	}563}564565async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {566	let mut cmd_chan = sess.channel_open_session().await?;567	cmd_chan568		.exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")569		.await?;570	let mut stdout = vec![];571	loop {572		let Some(msg) = cmd_chan.wait().await else {573			bail!("unexpected channel end");574		};575		match msg {576			russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),577			russh::ChannelMsg::ExitStatus { exit_status } => {578				if exit_status != 0 {579					bail!("mktemp failed");580				}581				break;582			}583			_ => {}584		}585	}586	ensure!(stdout.ends_with(b"\n"));587	stdout.pop();588	Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))589}
addedcrates/remowt-client/src/port.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-client/src/port.rs
@@ -0,0 +1,52 @@
+use std::io;
+
+use bifrostlink::Port;
+use bytes::{Bytes, BytesMut};
+use russh::{Channel, ChannelStream};
+use russh::client::Msg;
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, ReadHalf, WriteHalf};
+use tokio::join;
+use tracing::error;
+
+async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {
+	let len = srx.read_u32().await?;
+	let mut buf = BytesMut::zeroed(len as usize);
+	srx.read_exact(&mut buf).await?;
+	Ok(buf)
+}
+async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {
+	stx.write_u32(value.len().try_into().expect("can't be larger"))
+		.await?;
+	stx.write_all(&value).await?;
+	Ok(())
+}
+
+pub fn channel_port(ch: Channel<Msg>) -> Port {
+	Port::new(move |mut rx, tx| async move {
+		let (mut srx, mut stx) = tokio::io::split(ch.into_stream());
+		let srx_task = async move {
+			loop {
+				match read(&mut srx).await {
+					Ok(buf) => {
+						if tx.send(buf.freeze()).is_err() {
+							break;
+						}
+					}
+					Err(e) => {
+						error!("channel read failed: {e}");
+						break;
+					}
+				}
+			}
+		};
+		let stx_task = async move {
+			while let Some(value) = rx.recv().await {
+				if let Err(e) = write(&mut stx, value).await {
+					error!("channel write failed: {e}");
+					break;
+				}
+			}
+		};
+		join!(srx_task, stx_task);
+	})
+}
addedcrates/remowt-client/src/shell.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-client/src/shell.rs
@@ -0,0 +1,60 @@
+use anyhow::{anyhow, Result};
+use bifrostlink::declarative::RemoteEndpoints as _;
+use bifrostlink::Remote;
+use remowt_endpoints::pty::{PtyClient, ShellId};
+use remowt_link_shared::{Address, BifConfig};
+use uuid::Uuid;
+
+use crate::forwarded::RemowtStream;
+use crate::Remowt;
+
+pub struct RemowtShell {
+	pub id: ShellId,
+	pub stream: RemowtStream,
+	remote: Remote<BifConfig>,
+}
+impl RemowtShell {
+	pub fn resizer(&self) -> RemowtShellResizer {
+		RemowtShellResizer {
+			remote: self.remote.clone(),
+			id: self.id,
+		}
+	}
+}
+
+#[derive(Clone)]
+pub struct RemowtShellResizer {
+	remote: Remote<BifConfig>,
+	id: ShellId,
+}
+
+impl RemowtShellResizer {
+	pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {
+		PtyClient::wrap(self.remote.clone())
+			.resize(self.id, cols, rows)
+			.await?
+			.map_err(|e| anyhow!("failed to resize remote shell: {e}"))
+	}
+}
+
+impl Remowt {
+	pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<RemowtShell> {
+		let sock = self
+			.runtime_dir()
+			.join(format!("remowt-shell-{}.sock", Uuid::new_v4()));
+
+		let forwarded = self.forward_socket(&sock).await?;
+		let client: PtyClient<BifConfig> = self.endpoints();
+		let id = client
+			.open_shell(sock, term.to_owned(), cols, rows)
+			.await?
+			.map_err(|e| anyhow!("agent failed to open shell: {e}"))?;
+		let stream = forwarded.accept().await?;
+
+		Ok(RemowtShell {
+			id,
+			stream,
+			remote: self.rpc.remote(Address::Agent),
+		})
+	}
+}
addedcrates/remowt-client/src/subprocess.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-client/src/subprocess.rs
@@ -0,0 +1,84 @@
+use bytes::Bytes;
+use russh::client::Msg;
+use russh::{Channel, ChannelMsg};
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};
+use tokio::sync::oneshot;
+
+const BUF: usize = 64 * 1024;
+
+pub struct RemowtChild {
+	pub stdin: DuplexStream,
+	pub stdout: DuplexStream,
+	pub stderr: DuplexStream,
+	pub exit: oneshot::Receiver<Option<u32>>,
+}
+
+impl RemowtChild {
+	/// Manage channel returned by russh exec().
+	pub(crate) fn from_exec(ch: Channel<Msg>) -> Self {
+		let (stdin, mut stdin_r) = tokio::io::duplex(BUF);
+		let (mut out_w, stdout) = tokio::io::duplex(BUF);
+		let (mut err_w, stderr) = tokio::io::duplex(BUF);
+		let (exit_tx, exit) = oneshot::channel();
+
+		tokio::spawn(async move {
+			let (mut read, write) = ch.split();
+
+			// Forward our stdin to the channel, signalling EOF when it closes.
+			let stdin_pump = tokio::spawn(async move {
+				let mut buf = vec![0u8; BUF];
+				loop {
+					match stdin_r.read(&mut buf).await {
+						Ok(0) | Err(_) => break,
+						Ok(n) => {
+							if write
+								.data_bytes(Bytes::copy_from_slice(&buf[..n]))
+								.await
+								.is_err()
+							{
+								return;
+							}
+						}
+					}
+				}
+				let _ = write.eof().await;
+			});
+
+			let mut code = None;
+			while let Some(msg) = read.wait().await {
+				match msg {
+					ChannelMsg::Data { data } => {
+						if out_w.write_all(&data).await.is_err() {
+							break;
+						}
+					}
+					ChannelMsg::ExtendedData { data, .. } => {
+						if err_w.write_all(&data).await.is_err() {
+							break;
+						}
+					}
+					ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
+					_ => {}
+				}
+			}
+
+			// The process is gone; stop waiting on stdin we'll never forward.
+			stdin_pump.abort();
+			let _ = out_w.shutdown().await;
+			let _ = err_w.shutdown().await;
+			let _ = exit_tx.send(code);
+		});
+
+		RemowtChild {
+			stdin,
+			stdout,
+			stderr,
+			exit,
+		}
+	}
+
+	/// Wait for the process to finish, returning its exit status.
+	pub async fn wait(self) -> Option<u32> {
+		self.exit.await.ok().flatten()
+	}
+}
modifiedcrates/remowt-link-shared/Cargo.tomldiffbeforeafterboth
--- a/crates/remowt-link-shared/Cargo.toml
+++ b/crates/remowt-link-shared/Cargo.toml
@@ -11,6 +11,7 @@
 serde = { workspace = true, features = ["derive"] }
 serde_json.workspace = true
 thiserror.workspace = true
-tokio = { workspace = true, features = ["fs"] }
+tokio = { workspace = true, features = ["fs", "io-util", "macros"] }
+tracing.workspace = true
 remowt-ui-prompt.workspace = true
 camino = { workspace = true, features = ["serde1"] }
modifiedcrates/remowt-link-shared/src/lib.rsdiffbeforeafterboth
--- a/crates/remowt-link-shared/src/lib.rs
+++ b/crates/remowt-link-shared/src/lib.rs
@@ -1,14 +1,12 @@
-use std::future::Future;
-
-use bifrostlink::declarative::endpoints;
 use bifrostlink::error::{ErrorT, ListenerForYourRequestHasBeenDeadError, ResponseError};
 use bifrostlink::notification;
 use bifrostlink::packet::OpaquePacketWrapper;
-use bifrostlink::{AddressT, Config};
+use bifrostlink::AddressT;
 use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize};
 
 pub mod editor;
+pub mod port;
 
 #[derive(Clone, Serialize, Hash, Eq, Debug, PartialEq, Deserialize)]
 pub enum Address {
@@ -20,26 +18,6 @@
 impl AddressT for Address {}
 
 pub mod plugin;
-
-#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
-pub enum ElevateError {
-	#[error("elevation failed: {0}")]
-	Failed(String),
-}
-
-pub trait Elevator: Send + Sync {
-	fn elevate(&self) -> impl Future<Output = Result<(), ElevateError>> + Send;
-}
-
-pub struct ElevateEndpoints<E>(pub E);
-
-#[endpoints(ns = 3)]
-impl<E: Elevator + 'static> ElevateEndpoints<E> {
-	#[endpoints(id = 1)]
-	async fn elevate(&self) -> Result<(), ElevateError> {
-		self.0.elevate().await
-	}
-}
 
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
addedcrates/remowt-link-shared/src/port.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-link-shared/src/port.rs
@@ -0,0 +1,54 @@
+use std::io;
+
+use bifrostlink::Port;
+use bytes::{Bytes, BytesMut};
+use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
+
+/// Wire a length-prefixed duplex byte stream (e.g. a child process's
+/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32`
+/// length followed by that many payload bytes.
+pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port
+where
+	R: AsyncRead + Unpin + Send + 'static,
+	W: AsyncWrite + Unpin + Send + 'static,
+{
+	Port::new(|mut rx, tx| async move {
+		let read_task = async move {
+			loop {
+				let len = match reader.read_u32().await {
+					Ok(len) => len,
+					Err(e) => {
+						tracing::error!("child read failed: {e}");
+						break;
+					}
+				};
+				let mut buf = BytesMut::zeroed(len as usize);
+				if let Err(e) = reader.read_exact(&mut buf).await {
+					tracing::error!("child read failed: {e}");
+					break;
+				}
+				if tx.send(buf.freeze()).is_err() {
+					break;
+				}
+			}
+		};
+		let write_task = async move {
+			while let Some(msg) = rx.recv().await {
+				if let Err(e) = write_frame(&mut writer, msg).await {
+					tracing::error!("child write failed: {e}");
+					break;
+				}
+			}
+		};
+		tokio::join!(read_task, write_task);
+	})
+}
+
+async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {
+	let len = u32::try_from(msg.len())
+		.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;
+	writer.write_u32(len).await?;
+	writer.write_all(&msg).await?;
+	writer.flush().await?;
+	Ok(())
+}
modifiedcrates/remowt-plugin/Cargo.tomldiffbeforeafterboth
--- a/crates/remowt-plugin/Cargo.toml
+++ b/crates/remowt-plugin/Cargo.toml
@@ -9,7 +9,6 @@
 anyhow.workspace = true
 bifrostlink.workspace = true
 bifrostlink-ports.workspace = true
-bytes.workspace = true
 remowt-link-shared.workspace = true
 serde_json.workspace = true
 tokio = { workspace = true, features = [
modifiedcrates/remowt-plugin/src/host.rsdiffbeforeafterboth
--- a/crates/remowt-plugin/src/host.rs
+++ b/crates/remowt-plugin/src/host.rs
@@ -1,14 +1,12 @@
 use std::ffi::OsStr;
-use std::io;
 use std::process::Stdio;
 use std::sync::Mutex;
 
-use bifrostlink::{Port, Rpc, Rtt, WeakRpc};
-use bytes::{Bytes, BytesMut};
-use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
-use tokio::process::{Child, ChildStdin, ChildStdout, Command};
+use bifrostlink::{Rpc, Rtt, WeakRpc};
+use tokio::process::{Child, Command};
 
 use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};
+use remowt_link_shared::port::child_port;
 use remowt_link_shared::{Address, BifConfig};
 
 pub fn serve(rpc: &mut Rpc<BifConfig>) {
@@ -68,46 +66,4 @@
 		}
 		self.spawn(id, path)
 	}
-}
-
-fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port {
-	Port::new(|mut rx, tx| async move {
-		let reader = async move {
-			loop {
-				let len = match stdout.read_u32().await {
-					Ok(len) => len,
-					Err(e) => {
-						tracing::error!("plugin stdout read failed: {e}");
-						break;
-					}
-				};
-				let mut buf = BytesMut::zeroed(len as usize);
-				if let Err(e) = stdout.read_exact(&mut buf).await {
-					tracing::error!("plugin stdout read failed: {e}");
-					break;
-				}
-				if tx.send(buf.freeze()).is_err() {
-					break;
-				}
-			}
-		};
-		let writer = async move {
-			while let Some(msg) = rx.recv().await {
-				if let Err(e) = write_frame(&mut stdin, msg).await {
-					tracing::error!("plugin stdin write failed: {e}");
-					break;
-				}
-			}
-		};
-		tokio::join!(reader, writer);
-	})
-}
-
-async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> {
-	let len = u32::try_from(msg.len())
-		.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;
-	stdin.write_u32(len).await?;
-	stdin.write_all(&msg).await?;
-	stdin.flush().await?;
-	Ok(())
 }