git.delta.rocks / remowt / refs/commits / 7e5126608cef

difftreelog

feat(client) proper support for local

uzyqskstYaroslav Bolyukin2026-06-13parent: #600e6ed.patch.diff
in: trunk

16 files changed

modified.gitignorediffbeforeafterboth
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
 /target
 /.direnv
+/result
+/result-*
modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2096,6 +2096,7 @@
  "russh-config",
  "serde",
  "serde_json",
+ "tempfile",
  "tokio",
  "tracing",
  "uuid",
@@ -2131,6 +2132,7 @@
  "serde_json",
  "thiserror",
  "tokio",
+ "tracing",
 ]
 
 [[package]]
@@ -2140,7 +2142,6 @@
  "anyhow",
  "bifrostlink",
  "bifrostlink-ports",
- "bytes",
  "remowt-link-shared",
  "serde_json",
  "tokio",
modifiedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
before · cmds/remowt-agent/src/main.rs
1use std::borrow::Cow;2use std::collections::{BTreeMap, HashMap};3use std::fs::Permissions;4use std::future::pending;5use std::os::unix::fs::PermissionsExt as _;6use std::path::PathBuf;7use std::sync::{Arc, Mutex, OnceLock};89use bifrostlink::declarative::RemoteEndpoints;10use bifrostlink::Rpc;11use bifrostlink_ports::stdio::from_stdio;12use bifrostlink_ports::unix_socket::from_socket;13use clap::Parser;14use remowt_endpoints::{fs::Fs, pty::Pty, systemd::Systemd};15use remowt_link_shared::{editor::EditorEndpointsClient, Address, BifConfig};16use remowt_polkit_shared::{emphasize, BackendRequest, Identity, PidDisplay};17use remowt_ui_prompt::bifrost::PromptEndpointsClient;18use remowt_ui_prompt::rofi::RofiPrompter;19use remowt_ui_prompt::{PrependSourcePrompter, Prompter, Source};20use tokio::fs;21use tokio::net::UnixStream;22use tokio::runtime::Builder;23use tokio::task::AbortHandle;24use tracing::{debug, info, trace};25use zbus::fdo;26use zbus::zvariant::{OwnedValue, Str};27use zbus::{interface, proxy, Connection};28use zbus_polkit::policykit1::Subject;2930use self::helper::{Helper, SocketHelper, SuidHelper};3132pub mod askpass;33pub mod bus;34pub mod editor;35pub mod helper;3637struct CancelTaskOnDrop {38	tasks: Arc<Mutex<HashMap<String, AbortHandle>>>,39	handle: String,40}41impl Drop for CancelTaskOnDrop {42	fn drop(&mut self) {43		debug!("cancel on drop");44		if let Some(task) = self45			.tasks46			.lock()47			.expect("not poisoned")48			.remove(&self.handle)49		{50			task.abort();51		}52	}53}5455struct Agent<H, P> {56	tasks: Arc<Mutex<HashMap<String, AbortHandle>>>,57	helper: H,58	prompter: P,59}60impl<H, P> Agent<H, P> {61	fn new(helper: H, prompter: P) -> Self {62		Agent {63			tasks: Arc::new(Mutex::new(HashMap::new())),64			helper,65			prompter,66		}67	}68}6970#[interface(name = "org.freedesktop.PolicyKit1.AuthenticationAgent")]71impl<H, P> Agent<H, P>72where73	H: Helper + Clone + Send + Sync + 'static,74	P: Prompter + Clone + Send + Sync + 'static,75{76	/// BeginAuthentication method77	#[allow(clippy::too_many_arguments)]78	async fn begin_authentication(79		&self,80		action_id: String,81		message: String,82		_icon_name: String,83		mut details: BTreeMap<String, String>,84		cookie: String,85		identities: Vec<Identity>,86	) -> zbus::fdo::Result<()> {87		use std::fmt::Write;88		info!("begin auth");89		let _cancel_guard = Arc::new(OnceLock::new());90		let task = {91			let helper = self.helper.clone();92			let prompter = self.prompter.clone();93			let cookie = cookie.clone();94			let _cancel_guard = _cancel_guard.clone();95			tokio::task::spawn(async move {96				let _cancel_guard = _cancel_guard.clone();97				trace!("conversation task");98				let mut description = format!("{message}\n\n<b>Action id:</b> {action_id}",);99				if let Some(subject) = details.remove("polkit.caller-pid") {100					let _ = write!(description, "\n<b>Caller:</b> ");101					if let Ok(pid) = subject.parse::<u32>() {102						let _ = write!(description, "{}", PidDisplay(pid));103					} else {104						let _ = write!(description, "{}", emphasize("invalid pid"));105					}106				}107				if let Some(subject) = details.remove("polkit.subject-pid") {108					let _ = write!(description, "\n<b>Subject:</b> ");109					if let Ok(pid) = subject.parse::<u32>() {110						let _ = write!(description, "{}", PidDisplay(pid));111					} else {112						let _ = write!(description, "{}", emphasize("invalid pid"));113					}114				}115				let mut prompter = PrependSourcePrompter {116					source: vec![Source(Cow::Borrowed("polkit agent"))],117					description: description.clone(),118					prompter,119				};120121				let identity_displays: Vec<String> =122					identities.iter().map(|v| v.to_string()).collect();123				let identity_displays: Vec<&str> =124					identity_displays.iter().map(|v| v.as_str()).collect();125				debug!("choose identity");126				let choosen_identity = match identity_displays.len() {127					0 => {128						return Err(fdo::Error::AuthFailed(129							"no identity to authenticate as".to_owned(),130						))131					}132					1 => 0,133					_ => {134						prompter135							.prompt_enum(136								"Identity",137								"Select identity to use for polkit authorization",138								&identity_displays,139								&[],140							)141							.await?142					}143				};144				debug!("identity chosen");145146				let _ = write!(147					description,148					"\n<b>Identity:</b> {}",149					identities[choosen_identity as usize]150				);151				prompter.description = description;152153				prompter.source.push(Source(Cow::Borrowed("polkit daemon")));154155				helper156					.help_me(157						&cookie,158						prompter,159						identities[choosen_identity as usize].clone(),160					)161					.await162					.map_err(|e| fdo::Error::Failed(e.to_string()))?;163				// let connection = Connection::system().await?;164				// let helper = PolkitHelperProxy::new(&connection).await?;165166				Ok(())167			})168		};169		self.tasks170			.lock()171			.unwrap()172			.insert(cookie.clone(), task.abort_handle());173		debug!("abort handle stored");174		let _ = _cancel_guard.set(CancelTaskOnDrop {175			tasks: self.tasks.clone(),176			handle: cookie.clone(),177		});178179		let _ = task.await;180181		Ok(())182	}183184	/// CancelAuthentication method185	async fn cancel_authentication(&self, cookie: &str) -> zbus::fdo::Result<()> {186		debug!("auth cancelled");187		if let Some(abort) = self.tasks.lock().unwrap().remove(cookie) {188			debug!("abort handle found");189			abort.abort();190		}191		// debug!("Authentication cancled ! {cookie}");192		Ok(())193	}194}195196const OBJ_PATH: &str = "/org/freedesktop/PolicyKit1/AuthenticationAgent";197198#[proxy(199	interface = "lach.PolkitHelper",200	default_service = "lach.polkit.helper1",201	default_path = "/lach/PolkitHelper"202)]203trait PolkitHelper {204	fn init_conversation(&self, request: BackendRequest) -> zbus::Result<()>;205}206207#[derive(Parser)]208enum Opts {209	AskPass {210		prompt: String,211		description: String,212	},213	Editor {214		/// Argument to nvim215		path: String,216	},217	RealAgent {218		#[arg(long)]219		path: Option<PathBuf>,220		/// Expect own address to be AgentPrivileged, skip installing polkit agent221		#[arg(long)]222		privileged: bool,223	},224	LocalAgent,225}226227fn main() -> anyhow::Result<()> {228	// Log to stderr: `privileged-agent` uses stdout as the bifrost transport,229	// so anything written there would corrupt the stream.230	tracing_subscriber::fmt()231		.with_writer(std::io::stderr)232		.without_time()233		.init();234	let opts = Opts::parse();235236	let runtime = Builder::new_current_thread().enable_all().build()?;237238	match opts {239		Opts::AskPass {240			prompt,241			description,242		} => runtime.block_on(askpass::ask(&prompt, description)),243		Opts::LocalAgent => runtime.block_on(main_real()),244		Opts::Editor { path } => runtime.block_on(editor::edit(path)),245		Opts::RealAgent { path, privileged } => runtime.block_on(main_real_agent(path, privileged)),246	}247}248async fn main_real() -> anyhow::Result<()> {249	let conn = Connection::system().await?;250	let helper = SocketHelper {251		fallback: SuidHelper,252	};253	register_auth_agent(&conn, Agent::new(helper, RofiPrompter)).await?;254255	let _conn = conn;256	pending().await257}258async fn main_real_agent(path: Option<PathBuf>, privileged: bool) -> anyhow::Result<()> {259	let address = if privileged {260		Address::AgentPrivileged261	} else {262		Address::Agent263	};264	let mut rpc = Rpc::<BifConfig>::new(address);265266	Fs::new().register_endpoints(&mut rpc);267	Systemd.register_endpoints(&mut rpc);268	Pty::new().register_endpoints(&mut rpc);269270	remowt_plugin::host::serve(&mut rpc);271272	let user_prompter = PromptEndpointsClient::wrap(rpc.remote(Address::User));273	let editor_client = EditorEndpointsClient::wrap(rpc.remote(Address::User));274275	let bus = bus::spawn().await?;276	askpass::serve(&bus.conn, user_prompter.clone()).await?;277	editor::serve(&bus.conn, editor_client).await?;278279	let helpers = tempfile::Builder::new().prefix("remowt-path.").tempdir()?;280	let exe = std::env::current_exe()?;281	let askpass_helper = helpers.path().join("remowt-askpass");282	let editor_helper = helpers.path().join("remowt-editor");283	{284		let script = format!(285			"#!/bin/sh\nexec {} ask-pass \"password\" \"$1\"\n",286			sh_quote(&exe.to_string_lossy())287		);288		fs::write(&askpass_helper, script).await?;289		fs::set_permissions(&askpass_helper, Permissions::from_mode(0o755)).await?;290	}291	{292		let script = format!(293			"#!/bin/sh\nexec {} editor \"$1\"\n",294			sh_quote(&exe.to_string_lossy())295		);296		fs::write(&editor_helper, script).await?;297		fs::set_permissions(&editor_helper, Permissions::from_mode(0o755)).await?;298	}299300	// Safety: Hoping tokio own threads won't read any of those...301	unsafe {302		prepend_path(helpers.path());303		std::env::set_var("SUDO_ASKPASS", &askpass_helper);304		std::env::set_var("SSH_ASKPASS", &askpass_helper);305		std::env::set_var("SSH_ASKPASS_REQUIRE", "force");306		std::env::set_var("EDITOR", &editor_helper);307		std::env::set_var("VISUAL", &editor_helper);308		std::env::set_var("DBUS_SESSION_BUS_ADDRESS", &bus.address);309	}310311	let port = match path {312		Some(path) => from_socket(UnixStream::connect(path).await?),313		None => from_stdio(),314	};315	rpc.add_direct(Address::User, port, bifrostlink::Rtt(0));316317	let polkit_conn = if !privileged {318		// The unprivileged agent doubles as a polkit authentication agent so319		// `run0` (e.g. our own elevation) routes its prompt to the User over320		// bifrost instead of failing on a tty-less session.321		let conn = Connection::system().await?;322		let helper = SocketHelper {323			fallback: SuidHelper,324		};325		register_auth_agent(&conn, Agent::new(helper, user_prompter)).await?;326		Some(conn)327	} else {328		None329	};330331	let _keep_alive = (bus, helpers, polkit_conn);332	pending().await333}334335async fn register_auth_agent<H, P>(conn: &Connection, agent: Agent<H, P>) -> anyhow::Result<()>336where337	H: Helper + Clone + Send + Sync + 'static,338	P: Prompter + Clone + Send + Sync + 'static,339{340	let proxy = zbus_polkit::policykit1::AuthorityProxy::new(conn).await?;341	conn.object_server().at(OBJ_PATH, agent).await?;342343	let subject = auth_agent_subject()?;344	proxy345		.register_authentication_agent(&subject, "C", OBJ_PATH)346		.await?;347	debug!(kind = subject.subject_kind, "registered polkit agent");348	Ok(())349}350351fn auth_agent_subject() -> anyhow::Result<Subject> {352	let mut details = HashMap::new();353	if let Ok(session_id) = std::env::var("XDG_SESSION_ID") {354		let val: OwnedValue = Str::from(session_id).into();355		details.insert("session-id".to_string(), val);356		return Ok(Subject {357			subject_kind: "unix-session".to_string(),358			subject_details: details,359		});360	}361362	details.insert("pid".to_string(), OwnedValue::from(std::process::id()));363	Ok(Subject {364		subject_kind: "unix-process".to_string(),365		subject_details: details,366	})367}368369fn sh_quote(s: &str) -> String {370	format!("'{}'", s.replace('\'', "'\\''"))371}372373/// Prepend `dir` to the process `PATH`.374///375/// # SAFETY376///377/// Same as `set_var`378unsafe fn prepend_path(dir: &std::path::Path) {379	let value = match std::env::var_os("PATH") {380		Some(existing) => {381			let mut v = dir.as_os_str().to_owned();382			v.push(":");383			v.push(existing);384			v385		}386		None => dir.as_os_str().to_owned(),387	};388	unsafe {389		std::env::set_var("PATH", value);390	}391}
after · cmds/remowt-agent/src/main.rs
1use std::borrow::Cow;2use std::collections::{BTreeMap, HashMap};3use std::fs::Permissions;4use std::future::pending;5use std::os::unix::fs::PermissionsExt as _;6use std::path::PathBuf;7use std::sync::{Arc, Mutex, OnceLock};89use bifrostlink::declarative::RemoteEndpoints;10use bifrostlink::Rpc;11use bifrostlink_ports::stdio::from_stdio;12use bifrostlink_ports::unix_socket::from_socket;13use clap::Parser;14use remowt_endpoints::{fs::Fs, pty::Pty, systemd::Systemd};15use remowt_link_shared::{editor::EditorEndpointsClient, Address, BifConfig};16use remowt_polkit_shared::{emphasize, BackendRequest, Identity, PidDisplay};17use remowt_ui_prompt::bifrost::PromptEndpointsClient;18use remowt_ui_prompt::rofi::RofiPrompter;19use remowt_ui_prompt::{PrependSourcePrompter, Prompter, Source};20use tokio::fs;21use tokio::net::UnixStream;22use tokio::runtime::Builder;23use tokio::task::AbortHandle;24use tracing::{debug, info, trace};25use zbus::fdo;26use zbus::zvariant::{OwnedValue, Str};27use zbus::{interface, proxy, Connection};28use zbus_polkit::policykit1::Subject;2930use self::helper::{Helper, SocketHelper, SuidHelper};3132pub mod askpass;33pub mod bus;34pub mod editor;35pub mod helper;3637struct CancelTaskOnDrop {38	tasks: Arc<Mutex<HashMap<String, AbortHandle>>>,39	handle: String,40}41impl Drop for CancelTaskOnDrop {42	fn drop(&mut self) {43		debug!("cancel on drop");44		if let Some(task) = self45			.tasks46			.lock()47			.expect("not poisoned")48			.remove(&self.handle)49		{50			task.abort();51		}52	}53}5455struct Agent<H, P> {56	tasks: Arc<Mutex<HashMap<String, AbortHandle>>>,57	helper: H,58	prompter: P,59}60impl<H, P> Agent<H, P> {61	fn new(helper: H, prompter: P) -> Self {62		Agent {63			tasks: Arc::new(Mutex::new(HashMap::new())),64			helper,65			prompter,66		}67	}68}6970#[interface(name = "org.freedesktop.PolicyKit1.AuthenticationAgent")]71impl<H, P> Agent<H, P>72where73	H: Helper + Clone + Send + Sync + 'static,74	P: Prompter + Clone + Send + Sync + 'static,75{76	/// BeginAuthentication method77	#[allow(clippy::too_many_arguments)]78	async fn begin_authentication(79		&self,80		action_id: String,81		message: String,82		_icon_name: String,83		mut details: BTreeMap<String, String>,84		cookie: String,85		identities: Vec<Identity>,86	) -> zbus::fdo::Result<()> {87		use std::fmt::Write;88		info!("begin auth");89		let _cancel_guard = Arc::new(OnceLock::new());90		let task = {91			let helper = self.helper.clone();92			let prompter = self.prompter.clone();93			let cookie = cookie.clone();94			let _cancel_guard = _cancel_guard.clone();95			tokio::task::spawn(async move {96				let _cancel_guard = _cancel_guard.clone();97				trace!("conversation task");98				let mut description = format!("{message}\n\n<b>Action id:</b> {action_id}",);99				if let Some(subject) = details.remove("polkit.caller-pid") {100					let _ = write!(description, "\n<b>Caller:</b> ");101					if let Ok(pid) = subject.parse::<u32>() {102						let _ = write!(description, "{}", PidDisplay(pid));103					} else {104						let _ = write!(description, "{}", emphasize("invalid pid"));105					}106				}107				if let Some(subject) = details.remove("polkit.subject-pid") {108					let _ = write!(description, "\n<b>Subject:</b> ");109					if let Ok(pid) = subject.parse::<u32>() {110						let _ = write!(description, "{}", PidDisplay(pid));111					} else {112						let _ = write!(description, "{}", emphasize("invalid pid"));113					}114				}115				let mut prompter = PrependSourcePrompter {116					source: vec![Source(Cow::Borrowed("polkit agent"))],117					description: description.clone(),118					prompter,119				};120121				let identity_displays: Vec<String> =122					identities.iter().map(|v| v.to_string()).collect();123				let identity_displays: Vec<&str> =124					identity_displays.iter().map(|v| v.as_str()).collect();125				debug!("choose identity");126				let choosen_identity = match identity_displays.len() {127					0 => {128						return Err(fdo::Error::AuthFailed(129							"no identity to authenticate as".to_owned(),130						))131					}132					1 => 0,133					_ => {134						prompter135							.prompt_enum(136								"Identity",137								"Select identity to use for polkit authorization",138								&identity_displays,139								&[],140							)141							.await?142					}143				};144				debug!("identity chosen");145146				let _ = write!(147					description,148					"\n<b>Identity:</b> {}",149					identities[choosen_identity as usize]150				);151				prompter.description = description;152153				prompter.source.push(Source(Cow::Borrowed("polkit daemon")));154155				helper156					.help_me(157						&cookie,158						prompter,159						identities[choosen_identity as usize].clone(),160					)161					.await162					.map_err(|e| fdo::Error::Failed(e.to_string()))?;163				// let connection = Connection::system().await?;164				// let helper = PolkitHelperProxy::new(&connection).await?;165166				Ok(())167			})168		};169		self.tasks170			.lock()171			.unwrap()172			.insert(cookie.clone(), task.abort_handle());173		debug!("abort handle stored");174		let _ = _cancel_guard.set(CancelTaskOnDrop {175			tasks: self.tasks.clone(),176			handle: cookie.clone(),177		});178179		let _ = task.await;180181		Ok(())182	}183184	/// CancelAuthentication method185	async fn cancel_authentication(&self, cookie: &str) -> zbus::fdo::Result<()> {186		debug!("auth cancelled");187		if let Some(abort) = self.tasks.lock().unwrap().remove(cookie) {188			debug!("abort handle found");189			abort.abort();190		}191		// debug!("Authentication cancled ! {cookie}");192		Ok(())193	}194}195196const OBJ_PATH: &str = "/org/freedesktop/PolicyKit1/AuthenticationAgent";197198#[proxy(199	interface = "lach.PolkitHelper",200	default_service = "lach.polkit.helper1",201	default_path = "/lach/PolkitHelper"202)]203trait PolkitHelper {204	fn init_conversation(&self, request: BackendRequest) -> zbus::Result<()>;205}206207#[derive(Parser)]208enum Opts {209	AskPass {210		prompt: String,211		description: String,212	},213	Editor {214		/// Argument to nvim215		path: String,216	},217	RealAgent {218		#[arg(long)]219		path: Option<PathBuf>,220		/// Expect own address to be AgentPrivileged, skip installing polkit agent221		#[arg(long)]222		privileged: bool,223	},224	LocalAgent,225}226227fn main() -> anyhow::Result<()> {228	tracing_subscriber::fmt()229		.with_writer(std::io::stderr)230		.without_time()231		.init();232	let opts = Opts::parse();233234	let runtime = Builder::new_current_thread().enable_all().build()?;235236	match opts {237		Opts::AskPass {238			prompt,239			description,240		} => runtime.block_on(askpass::ask(&prompt, description)),241		Opts::LocalAgent => runtime.block_on(main_real()),242		Opts::Editor { path } => runtime.block_on(editor::edit(path)),243		Opts::RealAgent { path, privileged } => runtime.block_on(main_real_agent(path, privileged)),244	}245}246async fn main_real() -> anyhow::Result<()> {247	let conn = Connection::system().await?;248	let helper = SocketHelper {249		fallback: SuidHelper,250	};251	register_auth_agent(&conn, Agent::new(helper, RofiPrompter)).await?;252253	let _conn = conn;254	pending().await255}256async fn main_real_agent(path: Option<PathBuf>, privileged: bool) -> anyhow::Result<()> {257	let address = if privileged {258		Address::AgentPrivileged259	} else {260		Address::Agent261	};262	let mut rpc = Rpc::<BifConfig>::new(address);263264	Fs::new().register_endpoints(&mut rpc);265	Systemd.register_endpoints(&mut rpc);266	Pty::new().register_endpoints(&mut rpc);267268	remowt_plugin::host::serve(&mut rpc);269270	let user_prompter = PromptEndpointsClient::wrap(rpc.remote(Address::User));271	let editor_client = EditorEndpointsClient::wrap(rpc.remote(Address::User));272273	let bus = bus::spawn().await?;274	askpass::serve(&bus.conn, user_prompter.clone()).await?;275	editor::serve(&bus.conn, editor_client).await?;276277	let helpers = tempfile::Builder::new().prefix("remowt-path.").tempdir()?;278	let exe = std::env::current_exe()?;279	let askpass_helper = helpers.path().join("remowt-askpass");280	let editor_helper = helpers.path().join("remowt-editor");281	{282		let script = format!(283			"#!/bin/sh\nexec {} ask-pass \"password\" \"$1\"\n",284			sh_quote(&exe.to_string_lossy())285		);286		fs::write(&askpass_helper, script).await?;287		fs::set_permissions(&askpass_helper, Permissions::from_mode(0o755)).await?;288	}289	{290		let script = format!(291			"#!/bin/sh\nexec {} editor \"$1\"\n",292			sh_quote(&exe.to_string_lossy())293		);294		fs::write(&editor_helper, script).await?;295		fs::set_permissions(&editor_helper, Permissions::from_mode(0o755)).await?;296	}297298	// Safety: Hoping tokio own threads won't read any of those...299	unsafe {300		prepend_path(helpers.path());301		std::env::set_var("SUDO_ASKPASS", &askpass_helper);302		std::env::set_var("SSH_ASKPASS", &askpass_helper);303		std::env::set_var("SSH_ASKPASS_REQUIRE", "force");304		std::env::set_var("EDITOR", &editor_helper);305		std::env::set_var("VISUAL", &editor_helper);306		std::env::set_var("DBUS_SESSION_BUS_ADDRESS", &bus.address);307	}308309	let port = match path {310		Some(path) => from_socket(UnixStream::connect(path).await?),311		None => from_stdio(),312	};313	rpc.add_direct(Address::User, port, bifrostlink::Rtt(0));314315	let polkit_conn = if !privileged {316		// The unprivileged agent doubles as a polkit authentication agent so317		// `run0` (e.g. our own elevation) routes its prompt to the User over318		// bifrost instead of failing on a tty-less session.319		let conn = Connection::system().await?;320		let helper = SocketHelper {321			fallback: SuidHelper,322		};323		register_auth_agent(&conn, Agent::new(helper, user_prompter)).await?;324		Some(conn)325	} else {326		None327	};328329	let _keep_alive = (bus, helpers, polkit_conn);330	pending().await331}332333async fn register_auth_agent<H, P>(conn: &Connection, agent: Agent<H, P>) -> anyhow::Result<()>334where335	H: Helper + Clone + Send + Sync + 'static,336	P: Prompter + Clone + Send + Sync + 'static,337{338	let proxy = zbus_polkit::policykit1::AuthorityProxy::new(conn).await?;339	conn.object_server().at(OBJ_PATH, agent).await?;340341	let subject = auth_agent_subject()?;342	proxy343		.register_authentication_agent(&subject, "C", OBJ_PATH)344		.await?;345	debug!(kind = subject.subject_kind, "registered polkit agent");346	Ok(())347}348349fn auth_agent_subject() -> anyhow::Result<Subject> {350	let mut details = HashMap::new();351	if let Ok(session_id) = std::env::var("XDG_SESSION_ID") {352		let val: OwnedValue = Str::from(session_id).into();353		details.insert("session-id".to_string(), val);354		return Ok(Subject {355			subject_kind: "unix-session".to_string(),356			subject_details: details,357		});358	}359360	details.insert("pid".to_string(), OwnedValue::from(std::process::id()));361	Ok(Subject {362		subject_kind: "unix-process".to_string(),363		subject_details: details,364	})365}366367fn sh_quote(s: &str) -> String {368	format!("'{}'", s.replace('\'', "'\\''"))369}370371/// Prepend `dir` to the process `PATH`.372///373/// # SAFETY374///375/// Same as `set_var`376unsafe fn prepend_path(dir: &std::path::Path) {377	let value = match std::env::var_os("PATH") {378		Some(existing) => {379			let mut v = dir.as_os_str().to_owned();380			v.push(":");381			v.push(existing);382			v383		}384		None => dir.as_os_str().to_owned(),385	};386	unsafe {387		std::env::set_var("PATH", value);388	}389}
modifiedcmds/remowt-ssh/Cargo.tomldiffbeforeafterboth
--- a/cmds/remowt-ssh/Cargo.toml
+++ b/cmds/remowt-ssh/Cargo.toml
@@ -11,8 +11,15 @@
 tracing-subscriber.workspace = true
 bifrostlink.workspace = true
 remowt-link-shared.workspace = true
-remowt-client.workspace = true
-tokio = { workspace = true, features = ["macros", "fs", "net", "io-util", "rt", "signal"] }
+remowt-client = { workspace = true, features = ["shell"] }
+tokio = { workspace = true, features = [
+	"macros",
+	"fs",
+	"net",
+	"io-util",
+	"rt",
+	"signal",
+] }
 nix = { workspace = true, features = ["term"] }
 anyhow.workspace = true
 bifrostlink-ports.workspace = true
modifiedcmds/remowt-ssh/src/main.rsdiffbeforeafterboth
--- a/cmds/remowt-ssh/src/main.rs
+++ b/cmds/remowt-ssh/src/main.rs
@@ -19,11 +19,14 @@
 use tokio::io::unix::AsyncFd;
 use tokio::io::{AsyncRead, ReadBuf};
 use tokio::signal::unix::{signal, SignalKind};
-use tracing::info;
+use tracing::debug;
 
 #[derive(Parser)]
-struct Opts {
-	host: String,
+enum Opts {
+	/// Connect to remote host with remowt agent.
+	Ssh { host: String },
+	/// Connect to local host for testing the connectivity.
+	Local,
 }
 
 fn agents_dir() -> anyhow::Result<PathBuf> {
@@ -35,18 +38,27 @@
 
 #[tokio::main(flavor = "current_thread")]
 async fn main() -> anyhow::Result<()> {
-	tracing_subscriber::fmt::init();
+	tracing_subscriber::fmt()
+		.with_writer(std::io::stderr)
+		.without_time()
+		.init();
 	let opts = Opts::parse();
 
 	let bundle = AgentBundle::from_dir(agents_dir()?)?;
-	let conn = Remowt::connect(&opts.host, &bundle).await?;
+	let conn = match &opts {
+		Opts::Ssh { host } => Remowt::connect(host, &bundle).await?,
+		Opts::Local => Remowt::connect_local(&bundle).await?,
+	};
 	let mut rpc = conn.rpc();
 
 	serve_prompts(
 		&mut rpc,
 		PrependSourcePrompter {
 			prompter: RofiPrompter,
-			source: vec![Source(Cow::Owned(format!("ssh host: {}", opts.host)))],
+			source: match opts {
+				Opts::Ssh { host } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))],
+				Opts::Local => vec![],
+			},
 			description: "".to_owned(),
 		},
 	);
@@ -54,9 +66,9 @@
 		serve_editor(&mut rpc, SshEditor { sess });
 	}
 
-	info!("entering shell");
+	debug!("entering shell");
 	run_shell(&conn).await?;
-	info!("shell ended");
+	debug!("shell ended");
 
 	Ok(())
 }
modifiedcrates/remowt-client/Cargo.tomldiffbeforeafterboth
--- a/crates/remowt-client/Cargo.toml
+++ b/crates/remowt-client/Cargo.toml
@@ -16,7 +16,18 @@
 remowt-link-shared.workspace = true
 russh.workspace = true
 russh-config.workspace = true
-tokio = { workspace = true, features = ["net", "io-util", "rt", "sync", "macros", "process"] }
+tempfile.workspace = true
+tokio = { workspace = true, features = [
+	"net",
+	"io-util",
+	"rt",
+	"sync",
+	"macros",
+	"process",
+] }
 tracing.workspace = true
 uuid = { workspace = true, features = ["v4"] }
-remowt-endpoints.workspace = true
+remowt-endpoints = { workspace = true, optional = true }
+
+[features]
+shell = ["dep:remowt-endpoints"]
addedcrates/remowt-client/src/forwarded.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-client/src/forwarded.rs
@@ -0,0 +1,79 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use anyhow::{anyhow, Result};
+use camino::Utf8PathBuf;
+use russh::client::Msg;
+use russh::{Channel, ChannelStream};
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tokio::net::{UnixListener, UnixStream};
+use tokio::sync::oneshot;
+
+pub enum RemowtListener {
+	Ssh(oneshot::Receiver<Channel<Msg>>),
+	Local(UnixListener, Utf8PathBuf),
+}
+
+impl RemowtListener {
+	pub async fn accept(self) -> Result<RemowtStream> {
+		match self {
+			RemowtListener::Ssh(rx) => {
+				let ch = rx
+					.await
+					.map_err(|_| anyhow!("agent never connected the forwarded socket"))?;
+				Ok(RemowtStream::Ssh(ch.into_stream()))
+			}
+			RemowtListener::Local(listener, path) => {
+				let (stream, _) = listener.accept().await?;
+				let _ = std::fs::remove_file(&path);
+				Ok(RemowtStream::Local(stream))
+			}
+		}
+	}
+}
+
+pub enum RemowtStream {
+	Ssh(ChannelStream<Msg>),
+	Local(UnixStream),
+}
+
+impl AsyncRead for RemowtStream {
+	fn poll_read(
+		self: Pin<&mut Self>,
+		cx: &mut Context<'_>,
+		buf: &mut ReadBuf<'_>,
+	) -> Poll<io::Result<()>> {
+		match self.get_mut() {
+			RemowtStream::Ssh(s) => Pin::new(s).poll_read(cx, buf),
+			RemowtStream::Local(s) => Pin::new(s).poll_read(cx, buf),
+		}
+	}
+}
+
+impl AsyncWrite for RemowtStream {
+	fn poll_write(
+		self: Pin<&mut Self>,
+		cx: &mut Context<'_>,
+		buf: &[u8],
+	) -> Poll<io::Result<usize>> {
+		match self.get_mut() {
+			RemowtStream::Ssh(s) => Pin::new(s).poll_write(cx, buf),
+			RemowtStream::Local(s) => Pin::new(s).poll_write(cx, buf),
+		}
+	}
+
+	fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+		match self.get_mut() {
+			RemowtStream::Ssh(s) => Pin::new(s).poll_flush(cx),
+			RemowtStream::Local(s) => Pin::new(s).poll_flush(cx),
+		}
+	}
+
+	fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+		match self.get_mut() {
+			RemowtStream::Ssh(s) => Pin::new(s).poll_shutdown(cx),
+			RemowtStream::Local(s) => Pin::new(s).poll_shutdown(cx),
+		}
+	}
+}
modifiedcrates/remowt-client/src/lib.rsdiffbeforeafterboth
--- a/crates/remowt-client/src/lib.rs
+++ b/crates/remowt-client/src/lib.rs
@@ -1,61 +1,53 @@
 use std::collections::HashMap;
+use std::env;
 use std::path::PathBuf;
 use std::sync::{Arc, Mutex};
-use std::{env, io};
 
 use anyhow::{anyhow, bail, ensure, Context as _, Result};
 use bifrostlink::declarative::RemoteEndpoints;
-use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc};
+use bifrostlink::{Remote, Rpc, Rtt};
 use bifrostlink_ports::unix_socket::from_socket;
-use bytes::{Bytes, BytesMut};
 use camino::{Utf8Path, Utf8PathBuf};
-use remowt_endpoints::{
-	fs::Fs,
-	pty::{Pty, PtyClient, ShellId},
-	systemd::Systemd,
-};
 use remowt_link_shared::plugin::PluginEndpointsClient;
-use remowt_link_shared::{Address, BifConfig, ElevateEndpoints, ElevateError, Elevator};
+use remowt_link_shared::port::child_port;
+use remowt_link_shared::{Address, BifConfig};
 use russh::client::{connect, Config, Handle, Handler, Msg, Session};
 use russh::keys::agent::client::AgentClient;
 use russh::keys::agent::AgentIdentity;
 use russh::keys::check_known_hosts;
 use russh::keys::ssh_key::PublicKey;
-use russh::{Channel, ChannelMsg, ChannelStream};
-use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf};
-use tokio::join;
+use russh::Channel;
+use tempfile::TempDir;
 use tokio::net::UnixListener;
-use tokio::sync::mpsc;
 use tokio::sync::oneshot::{self, channel};
-use tracing::error;
+use tokio::{
+	fs,
+	io::{AsyncReadExt as _, AsyncWriteExt as _},
+};
+use tracing::{debug, error};
 use uuid::Uuid;
 
+use self::port::channel_port;
+use self::subprocess::RemowtChild;
+
 pub mod editor;
+mod forwarded;
+mod port;
+#[cfg(feature = "shell")]
+mod shell;
+mod subprocess;
 
-type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;
+pub use forwarded::{RemowtListener, RemowtStream};
+#[cfg(feature = "shell")]
+pub use shell::{RemowtShell, RemowtShellResizer};
 
-async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {
-	let len = srx.read_u32().await?;
-	let mut buf = BytesMut::zeroed(len as usize);
-	srx.read_exact(&mut buf).await?;
-	Ok(buf)
-}
-async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {
-	stx.write_u32(value.len().try_into().expect("can't be larger"))
-		.await?;
-	stx.write_all(&value).await?;
-	Ok(())
-}
+type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;
 
 fn sh_quote(s: impl AsRef<str>) -> String {
 	format!("'{}'", s.as_ref().replace('\'', "'\\''"))
 }
 
-const ESCALATORS: [(&str, &[&str]); 3] = [
-	("run0", &["--background=", "--pipe"]),
-	("sudo", &[]),
-	("doas", &[]),
-];
+const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];
 
 pub struct AgentBundle {
 	dir: PathBuf,
@@ -90,26 +82,36 @@
 	fn binary(&self, arch: &str) -> PathBuf {
 		self.dir.join(format!("remowt-agent-{arch}"))
 	}
+
+	fn local_binary(&self) -> Result<PathBuf> {
+		let arch = env::consts::ARCH;
+		let path = self.binary(arch);
+		ensure!(
+			path.is_file(),
+			"no local remowt-agent build for arch {arch} in bundle {}",
+			self.dir.display()
+		);
+		Ok(path)
+	}
 }
 
 async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {
-	let mut ch = sess.channel_open_session().await?;
+	let ch = sess.channel_open_session().await?;
 	ch.exec(true, cmd).await?;
+
+	let mut child = RemowtChild::from_exec(ch);
+	drop(child.stdin);
+
 	let mut out = Vec::new();
-	let mut code = None;
-	while let Some(msg) = ch.wait().await {
-		match msg {
-			ChannelMsg::Data { data } => out.extend(data.as_ref()),
-			ChannelMsg::ExtendedData { data, .. } => {
-				error!(
-					"remote stderr: {}",
-					String::from_utf8_lossy(data.as_ref()).trim()
-				);
-			}
-			ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
-			_ => {}
-		}
+	let mut err = Vec::new();
+	tokio::try_join!(
+		child.stdout.read_to_end(&mut out),
+		child.stderr.read_to_end(&mut err),
+	)?;
+	if !err.is_empty() {
+		error!("remote stderr: {}", String::from_utf8_lossy(&err).trim());
 	}
+	let code = child.exit.await.ok().flatten();
 	Ok((code, out))
 }
 
@@ -119,21 +121,27 @@
 		code == Some(0),
 		"remote command failed (exit {code:?}): {cmd}"
 	);
-	ensure!(out.ends_with(b"\n"));
-	out.pop();
+	if !out.is_empty() {
+		ensure!(
+			out.ends_with(b"\n"),
+			"remote command was not newline-terminated: {cmd}: {out:?}"
+		);
+		out.pop();
+	}
 	String::from_utf8(out).context("expected utf8 output for command")
 }
 
 async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {
+	debug!("uname -a");
 	let arch = run_string_ok(sess, "uname -m").await?;
 	let hash = bundle
 		.hashes
 		.get(&arch)
 		.ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;
 
+	debug!("get dir");
 	let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")
 		.await?
-		.trim()
 		.to_owned();
 	let dir = if cache.is_empty() {
 		let home = run_string_ok(sess, "echo \"$HOME\"").await?;
@@ -141,17 +149,21 @@
 			!home.is_empty(),
 			"remote $HOME and $XDG_CACHE_HOME both empty"
 		);
-		Utf8PathBuf::from(home).join("cache/remowt")
+		Utf8PathBuf::from(home).join(".cache/remowt")
 	} else {
 		Utf8PathBuf::from(cache).join("remowt")
 	};
 	let path = dir.join(hash);
 
+	debug!("presence");
 	let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;
 	if present != Some(0) {
 		let bin = bundle.binary(&arch);
-		let bytes = std::fs::read(&bin)
+		debug!("read");
+		let bytes = fs::read(&bin)
+			.await
 			.with_context(|| format!("reading agent binary {}", bin.display()))?;
+		debug!("upload");
 		upload_agent(sess, &dir, &path, bytes).await?;
 	}
 	Ok(path)
@@ -163,29 +175,29 @@
 	path: &Utf8Path,
 	bytes: Vec<u8>,
 ) -> Result<()> {
+	debug!("mkdirp");
 	run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;
 
-	let tmp = path.join(format!("tmp.{}", Uuid::new_v4()));
+	let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));
 	let ch = sess.channel_open_session().await?;
+	debug!("cat");
 	ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;
-	ch.data_bytes(bytes).await?;
-	ch.eof().await?;
-	let mut ch = ch;
-	let mut code = None;
-	while let Some(msg) = ch.wait().await {
-		match msg {
-			ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
-			ChannelMsg::ExtendedData { data, .. } => {
-				error!(
-					"agent upload: {}",
-					String::from_utf8_lossy(data.as_ref()).trim()
-				);
-			}
-			_ => {}
-		}
-	}
+
+	let mut child = RemowtChild::from_exec(ch);
+	child
+		.stdin
+		.write_all(&bytes)
+		.await
+		.context("sending agent binary")?;
+	child
+		.stdin
+		.shutdown()
+		.await
+		.context("sending agent binary")?;
+	let code = child.wait().await;
 	ensure!(code == Some(0), "agent upload failed (exit {code:?})");
 
+	debug!("chmod");
 	run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;
 	run_string_ok(
 		sess,
@@ -205,7 +217,7 @@
 			return Ok((tool, flags));
 		}
 	}
-	bail!("no escalation tool (run0/sudo/doas) found on remote")
+	bail!("no escalation tool found on remote")
 }
 
 fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {
@@ -226,36 +238,6 @@
 	env::split_paths(&path)
 		.map(|dir| dir.join(name))
 		.find(|p| p.is_file())
-}
-
-fn port_from_channel(ch: Channel<Msg>) -> Port {
-	Port::new(move |mut rx, tx| async move {
-		let (mut srx, mut stx) = tokio::io::split(ch.into_stream());
-		let srx_task = async move {
-			loop {
-				match read(&mut srx).await {
-					Ok(buf) => {
-						if tx.send(buf.freeze()).is_err() {
-							break;
-						}
-					}
-					Err(e) => {
-						error!("channel read failed: {e}");
-						break;
-					}
-				}
-			}
-		};
-		let stx_task = async move {
-			while let Some(value) = rx.recv().await {
-				if let Err(e) = write(&mut stx, value).await {
-					error!("channel write failed: {e}");
-					break;
-				}
-			}
-		};
-		join!(srx_task, stx_task);
-	})
 }
 
 pub struct SshHandler {
@@ -286,56 +268,20 @@
 			return Err(russh::Error::WrongChannel);
 		};
 		let _ = ch.send(channel);
-		Ok(())
-	}
-}
-
-struct SshElevator {
-	sess: Arc<Handle<SshHandler>>,
-	rpc: WeakRpc<BifConfig>,
-	agent_path: Utf8PathBuf,
-}
-impl Elevator for SshElevator {
-	async fn elevate(&self) -> Result<(), ElevateError> {
-		let fail = |e: String| ElevateError::Failed(e);
-		let (tool, flags) = detect_escalation(&self.sess)
-			.await
-			.map_err(|e| fail(e.to_string()))?;
-		let ch = self
-			.sess
-			.channel_open_session()
-			.await
-			.map_err(|e| fail(e.to_string()))?;
-		ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None))
-			.await
-			.map_err(|e| fail(e.to_string()))?;
-		let rpc = self
-			.rpc
-			.clone()
-			.upgrade()
-			.ok_or_else(|| fail("rpc is gone".to_owned()))?;
-		rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0));
 		Ok(())
 	}
 }
 
-pub struct RemoteChild {
-	pub stdout: DuplexStream,
-	pub stderr: DuplexStream,
-	pub exit: oneshot::Receiver<Option<u32>>,
-}
-
 enum Transport {
 	Ssh {
 		sess: Arc<Handle<SshHandler>>,
 		subs: Subs,
-		remote_dir: Utf8PathBuf,
+		runtime_dir: Utf8PathBuf,
 		agent_path: Utf8PathBuf,
 	},
 	Local {
-		#[allow(dead_code)]
-		agent: Rpc<BifConfig>,
-		agent_path: String,
+		agent_path: PathBuf,
+		runtime_dir: Utf8PathBuf,
 	},
 }
 
@@ -344,44 +290,11 @@
 	rpc: Rpc<BifConfig>,
 	elevated: tokio::sync::OnceCell<()>,
 	children: Mutex<Vec<tokio::process::Child>>,
+	_runtime_tmp: Option<TempDir>,
 }
 
 pub type RemowtRemote = Remote<BifConfig>;
 
-fn loopback() -> (Port, Port) {
-	let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::<Bytes>();
-	let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::<Bytes>();
-	let user = Port::new(move |mut rx, tx| async move {
-		loop {
-			tokio::select! {
-				msg = rx.recv() => match msg {
-					Some(msg) => if a2b_tx.send(msg).is_err() { break },
-					None => break,
-				},
-				msg = b2a_rx.recv() => match msg {
-					Some(msg) => if tx.send(msg).is_err() { break },
-					None => break,
-				},
-			}
-		}
-	});
-	let agent = Port::new(move |mut rx, tx| async move {
-		loop {
-			tokio::select! {
-				msg = rx.recv() => match msg {
-					Some(msg) => if b2a_tx.send(msg).is_err() { break },
-					None => break,
-				},
-				msg = a2b_rx.recv() => match msg {
-					Some(msg) => if tx.send(msg).is_err() { break },
-					None => break,
-				},
-			}
-		}
-	});
-	(user, agent)
-}
-
 impl Remowt {
 	pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {
 		let conf = russh_config::parse_home(host)?;
@@ -426,13 +339,14 @@
 		}
 		ensure!(authenticated, "ssh authentication failed");
 
-		// All remaining session ops take `&self`; share the handle.
 		let sess = Arc::new(sess);
 
+		debug!("deploying agent");
 		let agent_path = deploy_agent(&sess, bundle).await?;
 
-		let remote_dir = remote_mktemp(&sess).await?;
-		let primary = remote_dir.join("primary.sock");
+		debug!("runtime dir");
+		let runtime_dir = remote_runtime_dir(&sess).await?;
+		let primary = runtime_dir.join(format!("remowt-{}.sock", Uuid::new_v4()));
 
 		let (onetx, onerx) = channel();
 		subs.lock().expect("lock").insert(primary.clone(), onetx);
@@ -442,6 +356,7 @@
 
 		// TODO: ensure no injection is possible in the socket path.
 		let cmd_chan = sess.channel_open_session().await?;
+		debug!("starting agent");
 		cmd_chan
 			.exec(
 				true,
@@ -453,7 +368,7 @@
 			)
 			.await?;
 
-		let port = port_from_channel(
+		let port = channel_port(
 			onerx
 				.await
 				.map_err(|_| anyhow!("agent never opened its channel"))?,
@@ -464,36 +379,42 @@
 			transport: Transport::Ssh {
 				sess,
 				subs,
-				remote_dir,
+				runtime_dir,
 				agent_path,
 			},
 			rpc,
 			elevated: tokio::sync::OnceCell::new(),
 			children: Mutex::new(Vec::new()),
+			_runtime_tmp: None,
 		})
 	}
 
-	pub async fn connect_local(agent_path: &str) -> Result<Self> {
-		let (port_user, port_agent) = loopback();
+	pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {
+		let agent_path = bundle.local_binary()?;
+		let mut child = tokio::process::Command::new(&agent_path)
+			.arg("real-agent")
+			.stdin(std::process::Stdio::piped())
+			.stdout(std::process::Stdio::piped())
+			.kill_on_drop(true)
+			.spawn()
+			.with_context(|| format!("spawning agent binary {}", agent_path.display()))?;
+		let stdin = child.stdin.take().expect("stdin piped");
+		let stdout = child.stdout.take().expect("stdout piped");
+
 		let rpc = Rpc::<BifConfig>::new(Address::User);
-		let mut agent = Rpc::<BifConfig>::new(Address::Agent);
+		rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));
 
-		// Register handlers before wiring up the link (see the agent binary).
-		Fs::new().register_endpoints(&mut agent);
-		Systemd.register_endpoints(&mut agent);
-		Pty::new().register_endpoints(&mut agent);
+		let (runtime_dir, runtime_tmp) = local_runtime_dir()?;
 
-		agent.add_direct(Address::User, port_agent, Rtt(0));
-		rpc.add_direct(Address::Agent, port_user, Rtt(0));
-
 		Ok(Self {
 			transport: Transport::Local {
-				agent,
-				agent_path: agent_path.to_owned(),
+				agent_path,
+				runtime_dir,
 			},
 			rpc,
 			elevated: tokio::sync::OnceCell::new(),
-			children: Mutex::new(Vec::new()),
+			children: Mutex::new(vec![child]),
+			_runtime_tmp: runtime_tmp,
 		})
 	}
 
@@ -547,24 +468,25 @@
 						let ch = sess.channel_open_session().await?;
 						ch.exec(true, privileged_cmd(tool, flags, agent_path, None))
 							.await?;
-						port_from_channel(ch)
+						channel_port(ch)
 					}
 					Transport::Local { agent_path, .. } => {
-						let sock = env::temp_dir()
-							.join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4()));
+						let sock = self
+							.runtime_dir()
+							.join(format!("remowt-priv-{}.sock", Uuid::new_v4()));
 						let _ = std::fs::remove_file(&sock);
 						let listener = UnixListener::bind(&sock)?;
 						let (tool, flags) = ESCALATORS
 							.iter()
 							.find(|(t, _)| find_in_path(t).is_some())
-							.ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?;
+							.ok_or_else(|| anyhow!("no escalation tool found"))?;
 						let child = tokio::process::Command::new(tool)
 							.args(*flags)
 							.arg(agent_path)
 							.arg("real-agent")
 							.arg("--privileged")
 							.arg("--path")
-							.arg(sock.to_str().expect("temp path is utf-8"))
+							.arg(sock.as_str())
 							.kill_on_drop(true)
 							.spawn()?;
 						self.children.lock().expect("lock").push(child);
@@ -580,138 +502,63 @@
 		Ok(())
 	}
 
-	pub async fn exec(&self, command: String) -> Result<RemoteChild> {
+	pub async fn exec(&self, command: String) -> Result<RemowtChild> {
 		let Some(sess) = self.ssh() else {
 			bail!("exec should not be called on local")
 		};
 		let ch = sess.channel_open_session().await?;
 		ch.exec(true, command).await?;
-
-		let (mut out_w, stdout) = tokio::io::duplex(64 * 1024);
-		let (mut err_w, stderr) = tokio::io::duplex(64 * 1024);
-		let (exit_tx, exit) = oneshot::channel();
-
-		tokio::spawn(async move {
-			let mut ch = ch;
-			let mut code = None;
-			while let Some(msg) = ch.wait().await {
-				match msg {
-					ChannelMsg::Data { data } => {
-						if out_w.write_all(&data).await.is_err() {
-							break;
-						}
-					}
-					ChannelMsg::ExtendedData { data, .. } => {
-						if err_w.write_all(&data).await.is_err() {
-							break;
-						}
-					}
-					ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
-					_ => {}
-				}
-			}
-			let _ = out_w.shutdown().await;
-			let _ = err_w.shutdown().await;
-			let _ = exit_tx.send(code);
-		});
-
-		Ok(RemoteChild {
-			stdout,
-			stderr,
-			exit,
-		})
+		Ok(RemowtChild::from_exec(ch))
 	}
 
-	pub fn serve_elevate(&self) -> Result<()> {
-		let Transport::Ssh {
-			sess, agent_path, ..
-		} = &self.transport
-		else {
-			bail!("elevate should not be called on local")
-		};
-		let mut rpc = self.rpc.clone();
-		ElevateEndpoints(SshElevator {
-			sess: sess.clone(),
-			rpc: self.rpc.clone().downgrade(),
-			agent_path: agent_path.to_owned(),
-		})
-		.register_endpoints(&mut rpc);
-		Ok(())
+	fn runtime_dir(&self) -> Utf8PathBuf {
+		match &self.transport {
+			Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),
+			Transport::Local { runtime_dir, .. } => runtime_dir.clone(),
+		}
 	}
 
-	pub fn remote_dir(&self) -> Option<&Utf8Path> {
+	pub async fn forward_socket(&self, path: &Utf8Path) -> Result<RemowtListener> {
 		match &self.transport {
-			Transport::Ssh { remote_dir, .. } => Some(remote_dir),
-			Transport::Local { .. } => None,
+			Transport::Ssh { sess, subs, .. } => {
+				let (tx, rx) = oneshot::channel();
+				subs.lock().expect("lock").insert(path.to_owned(), tx);
+				sess.streamlocal_forward(path.to_owned()).await?;
+				Ok(RemowtListener::Ssh(rx))
+			}
+			Transport::Local { .. } => {
+				let _ = std::fs::remove_file(path);
+				Ok(RemowtListener::Local(
+					UnixListener::bind(path)?,
+					path.to_owned(),
+				))
+			}
 		}
-	}
-
-	pub async fn forward_socket(
-		&self,
-		remote_path: &Utf8Path,
-	) -> Result<oneshot::Receiver<Channel<Msg>>> {
-		let Transport::Ssh { sess, subs, .. } = &self.transport else {
-			bail!("forward_socket should not be called on local")
-		};
-		let (tx, rx) = oneshot::channel();
-		subs.lock()
-			.expect("lock")
-			.insert(remote_path.to_owned(), tx);
-		sess.streamlocal_forward(remote_path.to_owned()).await?;
-		Ok(rx)
-	}
-
-	pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<Shell> {
-		let Transport::Ssh { remote_dir, .. } = &self.transport else {
-			bail!("open_shell should not be called on local")
-		};
-		let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4()));
-
-		let rx = self.forward_socket(&sock).await?;
-		let client: PtyClient<BifConfig> = self.endpoints();
-		let id = client
-			.open_shell(sock, term.to_owned(), cols, rows)
-			.await?
-			.map_err(|e| anyhow!("agent failed to open shell: {e}"))?;
-		let ch = rx
-			.await
-			.map_err(|_| anyhow!("agent never connected the shell socket"))?;
-
-		Ok(Shell {
-			id,
-			stream: ch.into_stream(),
-			remote: self.rpc.remote(Address::Agent),
-		})
 	}
 }
 
-pub struct Shell {
-	pub id: ShellId,
-	pub stream: ChannelStream<Msg>,
-	remote: Remote<BifConfig>,
-}
-
-impl Shell {
-	pub fn resizer(&self) -> ShellResizer {
-		ShellResizer {
-			remote: self.remote.clone(),
-			id: self.id,
+fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {
+	if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {
+		if !dir.is_empty() {
+			return Ok((Utf8PathBuf::from(dir), None));
 		}
 	}
-}
-
-#[derive(Clone)]
-pub struct ShellResizer {
-	remote: Remote<BifConfig>,
-	id: ShellId,
+	let tmp = tempfile::Builder::new()
+		.prefix("remowt.")
+		.rand_bytes(12)
+		.tempdir()?;
+	let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())
+		.map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;
+	Ok((dir, Some(tmp)))
 }
 
-impl ShellResizer {
-	pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {
-		PtyClient::wrap(self.remote.clone())
-			.resize(self.id, cols, rows)
-			.await?
-			.map_err(|e| anyhow!("failed to resize remote shell: {e}"))
+async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {
+	let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;
+	let dir = dir.trim();
+	if dir.is_empty() {
+		remote_mktemp(sess).await
+	} else {
+		Ok(Utf8PathBuf::from(dir))
 	}
 }
 
addedcrates/remowt-client/src/port.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-client/src/port.rs
@@ -0,0 +1,52 @@
+use std::io;
+
+use bifrostlink::Port;
+use bytes::{Bytes, BytesMut};
+use russh::{Channel, ChannelStream};
+use russh::client::Msg;
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, ReadHalf, WriteHalf};
+use tokio::join;
+use tracing::error;
+
+async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {
+	let len = srx.read_u32().await?;
+	let mut buf = BytesMut::zeroed(len as usize);
+	srx.read_exact(&mut buf).await?;
+	Ok(buf)
+}
+async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {
+	stx.write_u32(value.len().try_into().expect("can't be larger"))
+		.await?;
+	stx.write_all(&value).await?;
+	Ok(())
+}
+
+pub fn channel_port(ch: Channel<Msg>) -> Port {
+	Port::new(move |mut rx, tx| async move {
+		let (mut srx, mut stx) = tokio::io::split(ch.into_stream());
+		let srx_task = async move {
+			loop {
+				match read(&mut srx).await {
+					Ok(buf) => {
+						if tx.send(buf.freeze()).is_err() {
+							break;
+						}
+					}
+					Err(e) => {
+						error!("channel read failed: {e}");
+						break;
+					}
+				}
+			}
+		};
+		let stx_task = async move {
+			while let Some(value) = rx.recv().await {
+				if let Err(e) = write(&mut stx, value).await {
+					error!("channel write failed: {e}");
+					break;
+				}
+			}
+		};
+		join!(srx_task, stx_task);
+	})
+}
addedcrates/remowt-client/src/shell.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-client/src/shell.rs
@@ -0,0 +1,60 @@
+use anyhow::{anyhow, Result};
+use bifrostlink::declarative::RemoteEndpoints as _;
+use bifrostlink::Remote;
+use remowt_endpoints::pty::{PtyClient, ShellId};
+use remowt_link_shared::{Address, BifConfig};
+use uuid::Uuid;
+
+use crate::forwarded::RemowtStream;
+use crate::Remowt;
+
+pub struct RemowtShell {
+	pub id: ShellId,
+	pub stream: RemowtStream,
+	remote: Remote<BifConfig>,
+}
+impl RemowtShell {
+	pub fn resizer(&self) -> RemowtShellResizer {
+		RemowtShellResizer {
+			remote: self.remote.clone(),
+			id: self.id,
+		}
+	}
+}
+
+#[derive(Clone)]
+pub struct RemowtShellResizer {
+	remote: Remote<BifConfig>,
+	id: ShellId,
+}
+
+impl RemowtShellResizer {
+	pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {
+		PtyClient::wrap(self.remote.clone())
+			.resize(self.id, cols, rows)
+			.await?
+			.map_err(|e| anyhow!("failed to resize remote shell: {e}"))
+	}
+}
+
+impl Remowt {
+	pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<RemowtShell> {
+		let sock = self
+			.runtime_dir()
+			.join(format!("remowt-shell-{}.sock", Uuid::new_v4()));
+
+		let forwarded = self.forward_socket(&sock).await?;
+		let client: PtyClient<BifConfig> = self.endpoints();
+		let id = client
+			.open_shell(sock, term.to_owned(), cols, rows)
+			.await?
+			.map_err(|e| anyhow!("agent failed to open shell: {e}"))?;
+		let stream = forwarded.accept().await?;
+
+		Ok(RemowtShell {
+			id,
+			stream,
+			remote: self.rpc.remote(Address::Agent),
+		})
+	}
+}
addedcrates/remowt-client/src/subprocess.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-client/src/subprocess.rs
@@ -0,0 +1,84 @@
+use bytes::Bytes;
+use russh::client::Msg;
+use russh::{Channel, ChannelMsg};
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};
+use tokio::sync::oneshot;
+
+const BUF: usize = 64 * 1024;
+
+pub struct RemowtChild {
+	pub stdin: DuplexStream,
+	pub stdout: DuplexStream,
+	pub stderr: DuplexStream,
+	pub exit: oneshot::Receiver<Option<u32>>,
+}
+
+impl RemowtChild {
+	/// Manage channel returned by russh exec().
+	pub(crate) fn from_exec(ch: Channel<Msg>) -> Self {
+		let (stdin, mut stdin_r) = tokio::io::duplex(BUF);
+		let (mut out_w, stdout) = tokio::io::duplex(BUF);
+		let (mut err_w, stderr) = tokio::io::duplex(BUF);
+		let (exit_tx, exit) = oneshot::channel();
+
+		tokio::spawn(async move {
+			let (mut read, write) = ch.split();
+
+			// Forward our stdin to the channel, signalling EOF when it closes.
+			let stdin_pump = tokio::spawn(async move {
+				let mut buf = vec![0u8; BUF];
+				loop {
+					match stdin_r.read(&mut buf).await {
+						Ok(0) | Err(_) => break,
+						Ok(n) => {
+							if write
+								.data_bytes(Bytes::copy_from_slice(&buf[..n]))
+								.await
+								.is_err()
+							{
+								return;
+							}
+						}
+					}
+				}
+				let _ = write.eof().await;
+			});
+
+			let mut code = None;
+			while let Some(msg) = read.wait().await {
+				match msg {
+					ChannelMsg::Data { data } => {
+						if out_w.write_all(&data).await.is_err() {
+							break;
+						}
+					}
+					ChannelMsg::ExtendedData { data, .. } => {
+						if err_w.write_all(&data).await.is_err() {
+							break;
+						}
+					}
+					ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
+					_ => {}
+				}
+			}
+
+			// The process is gone; stop waiting on stdin we'll never forward.
+			stdin_pump.abort();
+			let _ = out_w.shutdown().await;
+			let _ = err_w.shutdown().await;
+			let _ = exit_tx.send(code);
+		});
+
+		RemowtChild {
+			stdin,
+			stdout,
+			stderr,
+			exit,
+		}
+	}
+
+	/// Wait for the process to finish, returning its exit status.
+	pub async fn wait(self) -> Option<u32> {
+		self.exit.await.ok().flatten()
+	}
+}
modifiedcrates/remowt-link-shared/Cargo.tomldiffbeforeafterboth
--- a/crates/remowt-link-shared/Cargo.toml
+++ b/crates/remowt-link-shared/Cargo.toml
@@ -11,6 +11,7 @@
 serde = { workspace = true, features = ["derive"] }
 serde_json.workspace = true
 thiserror.workspace = true
-tokio = { workspace = true, features = ["fs"] }
+tokio = { workspace = true, features = ["fs", "io-util", "macros"] }
+tracing.workspace = true
 remowt-ui-prompt.workspace = true
 camino = { workspace = true, features = ["serde1"] }
modifiedcrates/remowt-link-shared/src/lib.rsdiffbeforeafterboth
--- a/crates/remowt-link-shared/src/lib.rs
+++ b/crates/remowt-link-shared/src/lib.rs
@@ -1,14 +1,12 @@
-use std::future::Future;
-
-use bifrostlink::declarative::endpoints;
 use bifrostlink::error::{ErrorT, ListenerForYourRequestHasBeenDeadError, ResponseError};
 use bifrostlink::notification;
 use bifrostlink::packet::OpaquePacketWrapper;
-use bifrostlink::{AddressT, Config};
+use bifrostlink::AddressT;
 use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize};
 
 pub mod editor;
+pub mod port;
 
 #[derive(Clone, Serialize, Hash, Eq, Debug, PartialEq, Deserialize)]
 pub enum Address {
@@ -20,26 +18,6 @@
 impl AddressT for Address {}
 
 pub mod plugin;
-
-#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
-pub enum ElevateError {
-	#[error("elevation failed: {0}")]
-	Failed(String),
-}
-
-pub trait Elevator: Send + Sync {
-	fn elevate(&self) -> impl Future<Output = Result<(), ElevateError>> + Send;
-}
-
-pub struct ElevateEndpoints<E>(pub E);
-
-#[endpoints(ns = 3)]
-impl<E: Elevator + 'static> ElevateEndpoints<E> {
-	#[endpoints(id = 1)]
-	async fn elevate(&self) -> Result<(), ElevateError> {
-		self.0.elevate().await
-	}
-}
 
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
addedcrates/remowt-link-shared/src/port.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-link-shared/src/port.rs
@@ -0,0 +1,54 @@
+use std::io;
+
+use bifrostlink::Port;
+use bytes::{Bytes, BytesMut};
+use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
+
+/// Wire a length-prefixed duplex byte stream (e.g. a child process's
+/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32`
+/// length followed by that many payload bytes.
+pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port
+where
+	R: AsyncRead + Unpin + Send + 'static,
+	W: AsyncWrite + Unpin + Send + 'static,
+{
+	Port::new(|mut rx, tx| async move {
+		let read_task = async move {
+			loop {
+				let len = match reader.read_u32().await {
+					Ok(len) => len,
+					Err(e) => {
+						tracing::error!("child read failed: {e}");
+						break;
+					}
+				};
+				let mut buf = BytesMut::zeroed(len as usize);
+				if let Err(e) = reader.read_exact(&mut buf).await {
+					tracing::error!("child read failed: {e}");
+					break;
+				}
+				if tx.send(buf.freeze()).is_err() {
+					break;
+				}
+			}
+		};
+		let write_task = async move {
+			while let Some(msg) = rx.recv().await {
+				if let Err(e) = write_frame(&mut writer, msg).await {
+					tracing::error!("child write failed: {e}");
+					break;
+				}
+			}
+		};
+		tokio::join!(read_task, write_task);
+	})
+}
+
+async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {
+	let len = u32::try_from(msg.len())
+		.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;
+	writer.write_u32(len).await?;
+	writer.write_all(&msg).await?;
+	writer.flush().await?;
+	Ok(())
+}
modifiedcrates/remowt-plugin/Cargo.tomldiffbeforeafterboth
--- a/crates/remowt-plugin/Cargo.toml
+++ b/crates/remowt-plugin/Cargo.toml
@@ -9,7 +9,6 @@
 anyhow.workspace = true
 bifrostlink.workspace = true
 bifrostlink-ports.workspace = true
-bytes.workspace = true
 remowt-link-shared.workspace = true
 serde_json.workspace = true
 tokio = { workspace = true, features = [
modifiedcrates/remowt-plugin/src/host.rsdiffbeforeafterboth
--- a/crates/remowt-plugin/src/host.rs
+++ b/crates/remowt-plugin/src/host.rs
@@ -1,14 +1,12 @@
 use std::ffi::OsStr;
-use std::io;
 use std::process::Stdio;
 use std::sync::Mutex;
 
-use bifrostlink::{Port, Rpc, Rtt, WeakRpc};
-use bytes::{Bytes, BytesMut};
-use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
-use tokio::process::{Child, ChildStdin, ChildStdout, Command};
+use bifrostlink::{Rpc, Rtt, WeakRpc};
+use tokio::process::{Child, Command};
 
 use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};
+use remowt_link_shared::port::child_port;
 use remowt_link_shared::{Address, BifConfig};
 
 pub fn serve(rpc: &mut Rpc<BifConfig>) {
@@ -68,46 +66,4 @@
 		}
 		self.spawn(id, path)
 	}
-}
-
-fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port {
-	Port::new(|mut rx, tx| async move {
-		let reader = async move {
-			loop {
-				let len = match stdout.read_u32().await {
-					Ok(len) => len,
-					Err(e) => {
-						tracing::error!("plugin stdout read failed: {e}");
-						break;
-					}
-				};
-				let mut buf = BytesMut::zeroed(len as usize);
-				if let Err(e) = stdout.read_exact(&mut buf).await {
-					tracing::error!("plugin stdout read failed: {e}");
-					break;
-				}
-				if tx.send(buf.freeze()).is_err() {
-					break;
-				}
-			}
-		};
-		let writer = async move {
-			while let Some(msg) = rx.recv().await {
-				if let Err(e) = write_frame(&mut stdin, msg).await {
-					tracing::error!("plugin stdin write failed: {e}");
-					break;
-				}
-			}
-		};
-		tokio::join!(reader, writer);
-	})
-}
-
-async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> {
-	let len = u32::try_from(msg.len())
-		.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;
-	stdin.write_u32(len).await?;
-	stdin.write_all(&msg).await?;
-	stdin.flush().await?;
-	Ok(())
 }