git.delta.rocks / remowt / refs/commits / 600e6edc0e86

difftreelog

refactor merge well-known endpoints into a single crate

ymvmzxwvYaroslav Bolyukin2026-06-12parent: #875f55a.patch.diff
in: trunk

36 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2065,10 +2065,10 @@
  "futures-util",
  "nix",
  "rand 0.10.1",
+ "remowt-endpoints",
  "remowt-link-shared",
  "remowt-plugin",
  "remowt-polkit-shared",
- "remowt-pty",
  "remowt-ui-prompt",
  "serde",
  "tempfile",
@@ -2090,6 +2090,7 @@
  "bifrostlink-ports",
  "bytes",
  "camino",
+ "remowt-endpoints",
  "remowt-link-shared",
  "russh",
  "russh-config",
@@ -2101,16 +2102,21 @@
 ]
 
 [[package]]
-name = "remowt-fs"
+name = "remowt-endpoints"
 version = "0.1.1"
 dependencies = [
+ "anyhow",
  "bifrostlink",
  "bifrostlink-macros",
  "camino",
+ "nix",
  "serde",
  "tempfile",
  "thiserror",
  "tokio",
+ "tracing",
+ "uuid",
+ "zbus",
 ]
 
 [[package]]
@@ -2120,30 +2126,11 @@
  "bifrostlink",
  "bytes",
  "camino",
- "remowt-fs",
- "remowt-pty",
- "remowt-systemd",
  "remowt-ui-prompt",
  "serde",
  "serde_json",
- "thiserror",
- "tokio",
-]
-
-[[package]]
-name = "remowt-nix-daemon"
-version = "0.1.1"
-dependencies = [
- "anyhow",
- "bifrostlink",
- "bifrostlink-macros",
- "camino",
- "remowt-client",
- "serde",
  "thiserror",
  "tokio",
- "tracing",
- "uuid",
 ]
 
 [[package]]
@@ -2171,20 +2158,6 @@
 ]
 
 [[package]]
-name = "remowt-pty"
-version = "0.1.1"
-dependencies = [
- "bifrostlink",
- "bifrostlink-macros",
- "camino",
- "nix",
- "serde",
- "thiserror",
- "tokio",
- "tracing",
-]
-
-[[package]]
 name = "remowt-ssh"
 version = "0.1.1"
 dependencies = [
@@ -2210,17 +2183,6 @@
  "tracing",
  "tracing-subscriber",
  "uuid",
-]
-
-[[package]]
-name = "remowt-systemd"
-version = "0.1.1"
-dependencies = [
- "bifrostlink",
- "bifrostlink-macros",
- "serde",
- "thiserror",
- "zbus",
 ]
 
 [[package]]
modifiedCargo.tomldiffbeforeafterboth
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,14 +9,12 @@
 repository = "https://gitlab.delta.directory/iam/remowt"
 
 [workspace.dependencies]
-remowt-fs = { version = "0.1.1", path = "crates/remowt-fs" }
-remowt-pty = { version = "0.1.1", path = "crates/remowt-pty" }
-remowt-systemd = { version = "0.1.1", path = "crates/remowt-systemd" }
 remowt-client = { version = "0.1.1", path = "crates/remowt-client" }
 remowt-polkit-shared = { version = "0.1.1", path = "crates/polkit-shared" }
 remowt-link-shared = { version = "0.1.1", path = "crates/remowt-link-shared" }
 remowt-plugin = { version = "0.1.1", path = "crates/remowt-plugin" }
-remowt-ui-prompt = { version = "0.1.1", path = "crates/ui-prompt" }
+remowt-ui-prompt = { version = "0.1.1", path = "crates/remowt-ui-prompt" }
+remowt-endpoints = { version = "0.1.1", path = "crates/remowt-endpoints" }
 
 bifrostlink = "0.2.0"
 bifrostlink-macros = "0.2.0"
modifiedcmds/polkit-dbus-helper/src/main.rsdiffbeforeafterboth
--- a/cmds/polkit-dbus-helper/src/main.rs
+++ b/cmds/polkit-dbus-helper/src/main.rs
@@ -8,10 +8,10 @@
 use nix::unistd::{setuid, Uid, User};
 use pam_client::{Context, ConversationHandler, ErrorCode, Flag};
 use remowt_polkit_shared::BackendRequest;
+use remowt_ui_prompt::dbus::DbusPrompterProxyBlocking;
+use remowt_ui_prompt::BlockingPrompter;
 use tokio::task::{block_in_place, spawn_blocking};
 use tracing::trace;
-use remowt_ui_prompt::dbus::DbusPrompterProxyBlocking;
-use remowt_ui_prompt::BlockingPrompter;
 use zbus::fdo;
 use zbus::message::Header;
 use zbus::zvariant::OwnedValue;
modifiedcmds/remowt-agent/Cargo.tomldiffbeforeafterboth
--- a/cmds/remowt-agent/Cargo.toml
+++ b/cmds/remowt-agent/Cargo.toml
@@ -17,7 +17,6 @@
 rand.workspace = true
 remowt-link-shared.workspace = true
 remowt-plugin.workspace = true
-remowt-pty.workspace = true
 serde = { workspace = true, features = ["derive"] }
 tempfile.workspace = true
 tokio = { workspace = true, features = [
@@ -36,3 +35,4 @@
 uuid = { workspace = true, features = ["v4"] }
 zbus = { workspace = true, features = ["tokio"] }
 zbus_polkit = { workspace = true, features = ["tokio"] }
+remowt-endpoints.workspace = true
modifiedcmds/remowt-agent/src/helper/protocol.rsdiffbeforeafterboth
--- a/cmds/remowt-agent/src/helper/protocol.rs
+++ b/cmds/remowt-agent/src/helper/protocol.rs
@@ -3,10 +3,10 @@
 use anyhow::bail;
 use futures::stream::Peekable;
 use futures::StreamExt as _;
+use remowt_ui_prompt::Prompter;
 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt as _};
 use tokio::select;
 use tokio_util::codec::{FramedRead, LinesCodec};
-use remowt_ui_prompt::Prompter;
 
 pub async fn run_conversation<R, W, P>(reader: R, mut writer: W, prompt: P) -> anyhow::Result<()>
 where
modifiedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -11,8 +11,8 @@
 use bifrostlink_ports::stdio::from_stdio;
 use bifrostlink_ports::unix_socket::from_socket;
 use clap::Parser;
-use remowt_link_shared::editor::EditorEndpointsClient;
-use remowt_link_shared::{Address, BifConfig, Fs, Pty, Systemd};
+use remowt_endpoints::{fs::Fs, pty::Pty, systemd::Systemd};
+use remowt_link_shared::{editor::EditorEndpointsClient, Address, BifConfig};
 use remowt_polkit_shared::{emphasize, BackendRequest, Identity, PidDisplay};
 use remowt_ui_prompt::bifrost::PromptEndpointsClient;
 use remowt_ui_prompt::rofi::RofiPrompter;
modifiedcmds/remowt-ssh/src/main.rsdiffbeforeafterboth
--- a/cmds/remowt-ssh/src/main.rs
+++ b/cmds/remowt-ssh/src/main.rs
@@ -13,13 +13,13 @@
 use remowt_client::editor::SshEditor;
 use remowt_client::{AgentBundle, Remowt};
 use remowt_link_shared::editor::serve_editor;
+use remowt_ui_prompt::bifrost::serve_prompts;
+use remowt_ui_prompt::rofi::RofiPrompter;
+use remowt_ui_prompt::{PrependSourcePrompter, Source};
 use tokio::io::unix::AsyncFd;
 use tokio::io::{AsyncRead, ReadBuf};
 use tokio::signal::unix::{signal, SignalKind};
 use tracing::info;
-use remowt_ui_prompt::bifrost::serve_prompts;
-use remowt_ui_prompt::rofi::RofiPrompter;
-use remowt_ui_prompt::{PrependSourcePrompter, Source};
 
 #[derive(Parser)]
 struct Opts {
modifiedcrates/remowt-client/Cargo.tomldiffbeforeafterboth
--- a/crates/remowt-client/Cargo.toml
+++ b/crates/remowt-client/Cargo.toml
@@ -19,3 +19,4 @@
 tokio = { workspace = true, features = ["net", "io-util", "rt", "sync", "macros", "process"] }
 tracing.workspace = true
 uuid = { workspace = true, features = ["v4"] }
+remowt-endpoints.workspace = true
modifiedcrates/remowt-client/src/lib.rsdiffbeforeafterboth
before · crates/remowt-client/src/lib.rs
1use std::collections::HashMap;2use std::io;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc};9use bifrostlink_ports::unix_socket::from_socket;10use bytes::{Bytes, BytesMut};11use camino::{Utf8Path, Utf8PathBuf};12use remowt_link_shared::plugin::PluginEndpointsClient;13use remowt_link_shared::{14	Address, BifConfig, ElevateEndpoints, ElevateError, Elevator, Fs, Pty, PtyClient, ShellId,15	Systemd,16};17use russh::client::{connect, Config, Handle, Handler, Msg, Session};18use russh::keys::agent::client::AgentClient;19use russh::keys::agent::AgentIdentity;20use russh::keys::check_known_hosts;21use russh::keys::ssh_key::PublicKey;22use russh::{Channel, ChannelMsg, ChannelStream};23use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf};24use tokio::join;25use tokio::net::UnixListener;26use tokio::sync::mpsc;27use tokio::sync::oneshot::{self, channel};28use tracing::error;29use uuid::Uuid;3031pub mod editor;3233type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;3435async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {36	let len = srx.read_u32().await?;37	let mut buf = BytesMut::zeroed(len as usize);38	srx.read_exact(&mut buf).await?;39	Ok(buf)40}41async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {42	stx.write_u32(value.len().try_into().expect("can't be larger"))43		.await?;44	stx.write_all(&value).await?;45	Ok(())46}4748fn sh_quote(s: impl AsRef<str>) -> String {49	format!("'{}'", s.as_ref().replace('\'', "'\\''"))50}5152const ESCALATORS: [(&str, &[&str]); 3] = [53	("run0", &["--background=", "--pipe"]),54	("sudo", &[]),55	("doas", &[]),56];5758pub struct AgentBundle {59	dir: PathBuf,60	hashes: HashMap<String, String>,61}6263impl AgentBundle {64	pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {65		let dir = dir.into();66		let hashes_path = dir.join("hashes");67		let raw = std::fs::read_to_string(&hashes_path)68			.with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;69		let mut hashes = HashMap::new();70		for line in raw.lines() {71			let line = line.trim();72			if line.is_empty() {73				continue;74			}75			let (arch, hash) = line76				.split_once(char::is_whitespace)77				.ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;78			hashes.insert(arch.to_owned(), hash.trim().to_owned());79		}80		ensure!(81			!hashes.is_empty(),82			"agent bundle {} has no hashes",83			dir.display()84		);85		Ok(Self { dir, hashes })86	}8788	fn binary(&self, arch: &str) -> PathBuf {89		self.dir.join(format!("remowt-agent-{arch}"))90	}91}9293async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {94	let mut ch = sess.channel_open_session().await?;95	ch.exec(true, cmd).await?;96	let mut out = Vec::new();97	let mut code = None;98	while let Some(msg) = ch.wait().await {99		match msg {100			ChannelMsg::Data { data } => out.extend(data.as_ref()),101			ChannelMsg::ExtendedData { data, .. } => {102				error!(103					"remote stderr: {}",104					String::from_utf8_lossy(data.as_ref()).trim()105				);106			}107			ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),108			_ => {}109		}110	}111	Ok((code, out))112}113114async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {115	let (code, mut out) = run(sess, cmd).await?;116	ensure!(117		code == Some(0),118		"remote command failed (exit {code:?}): {cmd}"119	);120	ensure!(out.ends_with(b"\n"));121	out.pop();122	String::from_utf8(out).context("expected utf8 output for command")123}124125async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {126	let arch = run_string_ok(sess, "uname -m").await?;127	let hash = bundle128		.hashes129		.get(&arch)130		.ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;131132	let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")133		.await?134		.trim()135		.to_owned();136	let dir = if cache.is_empty() {137		let home = run_string_ok(sess, "echo \"$HOME\"").await?;138		ensure!(139			!home.is_empty(),140			"remote $HOME and $XDG_CACHE_HOME both empty"141		);142		Utf8PathBuf::from(home).join("cache/remowt")143	} else {144		Utf8PathBuf::from(cache).join("remowt")145	};146	let path = dir.join(hash);147148	let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;149	if present != Some(0) {150		let bin = bundle.binary(&arch);151		let bytes = std::fs::read(&bin)152			.with_context(|| format!("reading agent binary {}", bin.display()))?;153		upload_agent(sess, &dir, &path, bytes).await?;154	}155	Ok(path)156}157158async fn upload_agent(159	sess: &Handle<SshHandler>,160	dir: &Utf8Path,161	path: &Utf8Path,162	bytes: Vec<u8>,163) -> Result<()> {164	run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;165166	let tmp = path.join(format!("tmp.{}", Uuid::new_v4()));167	let ch = sess.channel_open_session().await?;168	ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;169	ch.data_bytes(bytes).await?;170	ch.eof().await?;171	let mut ch = ch;172	let mut code = None;173	while let Some(msg) = ch.wait().await {174		match msg {175			ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),176			ChannelMsg::ExtendedData { data, .. } => {177				error!(178					"agent upload: {}",179					String::from_utf8_lossy(data.as_ref()).trim()180				);181			}182			_ => {}183		}184	}185	ensure!(code == Some(0), "agent upload failed (exit {code:?})");186187	run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;188	run_string_ok(189		sess,190		&format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),191	)192	.await?;193	Ok(())194}195196async fn detect_escalation(197	sess: &Handle<SshHandler>,198) -> Result<(&'static str, &'static [&'static str])> {199	for (tool, flags) in ESCALATORS {200		// `tool` is a fixed identifier (no metacharacters), safe to interpolate.201		let (code, _) = run(sess, &format!("command -v {tool}")).await?;202		if code == Some(0) {203			return Ok((tool, flags));204		}205	}206	bail!("no escalation tool (run0/sudo/doas) found on remote")207}208209fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {210	let mut parts = vec![tool.to_owned()];211	parts.extend(flags.iter().map(|f| f.to_string()));212	parts.push(sh_quote(agent_path));213	parts.push("real-agent".to_owned());214	parts.push("--privileged".to_owned());215	if let Some(p) = path {216		parts.push("--path".to_owned());217		parts.push(sh_quote(p));218	}219	parts.join(" ")220}221222fn find_in_path(name: &str) -> Option<std::path::PathBuf> {223	let path = std::env::var_os("PATH")?;224	std::env::split_paths(&path)225		.map(|dir| dir.join(name))226		.find(|p| p.is_file())227}228229fn port_from_channel(ch: Channel<Msg>) -> Port {230	Port::new(move |mut rx, tx| async move {231		let (mut srx, mut stx) = tokio::io::split(ch.into_stream());232		let srx_task = async move {233			loop {234				match read(&mut srx).await {235					Ok(buf) => {236						if tx.send(buf.freeze()).is_err() {237							break;238						}239					}240					Err(e) => {241						error!("channel read failed: {e}");242						break;243					}244				}245			}246		};247		let stx_task = async move {248			while let Some(value) = rx.recv().await {249				if let Err(e) = write(&mut stx, value).await {250					error!("channel write failed: {e}");251					break;252				}253			}254		};255		join!(srx_task, stx_task);256	})257}258259pub struct SshHandler {260	host: String,261	port: u16,262	subs: Subs,263}264impl Handler for SshHandler {265	type Error = russh::Error;266	async fn check_server_key(267		&mut self,268		server_public_key: &PublicKey,269	) -> Result<bool, Self::Error> {270		Ok(check_known_hosts(&self.host, self.port, server_public_key)?)271	}272	async fn server_channel_open_forwarded_streamlocal(273		&mut self,274		channel: Channel<Msg>,275		socket_path: &str,276		_session: &mut Session,277	) -> Result<(), Self::Error> {278		let Some(ch) = self279			.subs280			.lock()281			.expect("lock")282			.remove(&Utf8PathBuf::from(socket_path))283		else {284			return Err(russh::Error::WrongChannel);285		};286		let _ = ch.send(channel);287		Ok(())288	}289}290291struct SshElevator {292	sess: Arc<Handle<SshHandler>>,293	rpc: WeakRpc<BifConfig>,294	agent_path: Utf8PathBuf,295}296impl Elevator for SshElevator {297	async fn elevate(&self) -> Result<(), ElevateError> {298		let fail = |e: String| ElevateError::Failed(e);299		let (tool, flags) = detect_escalation(&self.sess)300			.await301			.map_err(|e| fail(e.to_string()))?;302		let ch = self303			.sess304			.channel_open_session()305			.await306			.map_err(|e| fail(e.to_string()))?;307		ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None))308			.await309			.map_err(|e| fail(e.to_string()))?;310		let rpc = self311			.rpc312			.clone()313			.upgrade()314			.ok_or_else(|| fail("rpc is gone".to_owned()))?;315		rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0));316		Ok(())317	}318}319320pub struct RemoteChild {321	pub stdout: DuplexStream,322	pub stderr: DuplexStream,323	pub exit: oneshot::Receiver<Option<u32>>,324}325326enum Transport {327	Ssh {328		sess: Arc<Handle<SshHandler>>,329		subs: Subs,330		remote_dir: Utf8PathBuf,331		agent_path: Utf8PathBuf,332	},333	Local {334		#[allow(dead_code)]335		agent: Rpc<BifConfig>,336		agent_path: String,337	},338}339340pub struct Remowt {341	transport: Transport,342	rpc: Rpc<BifConfig>,343	elevated: tokio::sync::OnceCell<()>,344	children: Mutex<Vec<tokio::process::Child>>,345}346347pub type RemowtRemote = Remote<BifConfig>;348349fn loopback() -> (Port, Port) {350	let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::<Bytes>();351	let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::<Bytes>();352	let user = Port::new(move |mut rx, tx| async move {353		loop {354			tokio::select! {355				msg = rx.recv() => match msg {356					Some(msg) => if a2b_tx.send(msg).is_err() { break },357					None => break,358				},359				msg = b2a_rx.recv() => match msg {360					Some(msg) => if tx.send(msg).is_err() { break },361					None => break,362				},363			}364		}365	});366	let agent = Port::new(move |mut rx, tx| async move {367		loop {368			tokio::select! {369				msg = rx.recv() => match msg {370					Some(msg) => if b2a_tx.send(msg).is_err() { break },371					None => break,372				},373				msg = a2b_rx.recv() => match msg {374					Some(msg) => if tx.send(msg).is_err() { break },375					None => break,376				},377			}378		}379	});380	(user, agent)381}382383impl Remowt {384	pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {385		let conf = russh_config::parse_home(host)?;386		let port = conf.host_config.port.unwrap_or(22);387		let hostname = conf388			.host_config389			.hostname390			.clone()391			.unwrap_or_else(|| conf.host_name.clone());392		let user = conf393			.user394			.clone()395			.unwrap_or_else(|| std::env::var("USER").unwrap_or_else(|_| "root".to_owned()));396397		let subs: Subs = Arc::new(Mutex::new(HashMap::new()));398		let mut sess = connect(399			Arc::new(Config::default()),400			(hostname.clone(), port),401			SshHandler {402				host: hostname,403				port,404				subs: subs.clone(),405			},406		)407		.await?;408409		let mut agent = AgentClient::connect_env().await?;410		let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();411		let mut authenticated = false;412		for ident in agent.request_identities().await? {413			let AgentIdentity::PublicKey { key, .. } = ident else {414				continue;415			};416			if sess417				.authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)418				.await?419				.success()420			{421				authenticated = true;422				break;423			}424		}425		ensure!(authenticated, "ssh authentication failed");426427		// All remaining session ops take `&self`; share the handle.428		let sess = Arc::new(sess);429430		let agent_path = deploy_agent(&sess, bundle).await?;431432		let remote_dir = remote_mktemp(&sess).await?;433		let primary = remote_dir.join("primary.sock");434435		let (onetx, onerx) = channel();436		subs.lock().expect("lock").insert(primary.clone(), onetx);437		sess.streamlocal_forward(primary.clone()).await?;438439		let rpc = Rpc::<BifConfig>::new(Address::User);440441		// TODO: ensure no injection is possible in the socket path.442		let cmd_chan = sess.channel_open_session().await?;443		cmd_chan444			.exec(445				true,446				format!(447					"{} real-agent --path={}",448					sh_quote(&agent_path),449					sh_quote(&primary)450				),451			)452			.await?;453454		let port = port_from_channel(455			onerx456				.await457				.map_err(|_| anyhow!("agent never opened its channel"))?,458		);459		rpc.add_direct(Address::Agent, port, Rtt(0));460461		Ok(Self {462			transport: Transport::Ssh {463				sess,464				subs,465				remote_dir,466				agent_path,467			},468			rpc,469			elevated: tokio::sync::OnceCell::new(),470			children: Mutex::new(Vec::new()),471		})472	}473474	pub async fn connect_local(agent_path: &str) -> Result<Self> {475		let (port_user, port_agent) = loopback();476		let rpc = Rpc::<BifConfig>::new(Address::User);477		let mut agent = Rpc::<BifConfig>::new(Address::Agent);478479		// Register handlers before wiring up the link (see the agent binary).480		Fs::new().register_endpoints(&mut agent);481		Systemd.register_endpoints(&mut agent);482		Pty::new().register_endpoints(&mut agent);483484		agent.add_direct(Address::User, port_agent, Rtt(0));485		rpc.add_direct(Address::Agent, port_user, Rtt(0));486487		Ok(Self {488			transport: Transport::Local {489				agent,490				agent_path: agent_path.to_owned(),491			},492			rpc,493			elevated: tokio::sync::OnceCell::new(),494			children: Mutex::new(Vec::new()),495		})496	}497498	pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {499		match &self.transport {500			Transport::Ssh { sess, .. } => Some(sess.clone()),501			Transport::Local { .. } => None,502		}503	}504505	pub fn rpc(&self) -> Rpc<BifConfig> {506		self.rpc.clone()507	}508509	pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {510		R::wrap(self.rpc.remote(Address::Agent))511	}512513	pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {514		let client: PluginEndpointsClient<BifConfig> = self.endpoints();515		client516			.load_plugin(id, name.to_owned())517			.await?518			.map_err(|e| anyhow!("agent failed to load plugin: {e}"))519	}520	pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {521		self.ensure_elevated().await?;522		let client: PluginEndpointsClient<BifConfig> =523			PluginEndpointsClient::wrap(self.rpc.remote(Address::AgentPrivileged));524		client525			.load_plugin_path(id, path.to_owned())526			.await?527			.map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))528	}529	pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {530		R::wrap(self.rpc.remote(Address::Plugin(id)))531	}532	pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {533		self.ensure_elevated().await?;534		Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))535	}536537	async fn ensure_elevated(&self) -> Result<()> {538		self.elevated539			.get_or_try_init(|| async {540				let port = match &self.transport {541					Transport::Ssh {542						sess, agent_path, ..543					} => {544						let (tool, flags) = detect_escalation(sess).await?;545						let ch = sess.channel_open_session().await?;546						ch.exec(true, privileged_cmd(tool, flags, agent_path, None))547							.await?;548						port_from_channel(ch)549					}550					Transport::Local { agent_path, .. } => {551						let sock = std::env::temp_dir()552							.join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4()));553						let _ = std::fs::remove_file(&sock);554						let listener = UnixListener::bind(&sock)?;555						let (tool, flags) = ESCALATORS556							.iter()557							.find(|(t, _)| find_in_path(t).is_some())558							.ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?;559						let child = tokio::process::Command::new(tool)560							.args(*flags)561							.arg(agent_path)562							.arg("real-agent")563							.arg("--privileged")564							.arg("--path")565							.arg(sock.to_str().expect("temp path is utf-8"))566							.kill_on_drop(true)567							.spawn()?;568						self.children.lock().expect("lock").push(child);569						let (stream, _) = listener.accept().await?;570						let _ = std::fs::remove_file(&sock);571						from_socket(stream)572					}573				};574				self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));575				anyhow::Ok(())576			})577			.await?;578		Ok(())579	}580581	pub async fn exec(&self, command: String) -> Result<RemoteChild> {582		let Some(sess) = self.ssh() else {583			bail!("exec should not be called on local")584		};585		let ch = sess.channel_open_session().await?;586		ch.exec(true, command).await?;587588		let (mut out_w, stdout) = tokio::io::duplex(64 * 1024);589		let (mut err_w, stderr) = tokio::io::duplex(64 * 1024);590		let (exit_tx, exit) = oneshot::channel();591592		tokio::spawn(async move {593			let mut ch = ch;594			let mut code = None;595			while let Some(msg) = ch.wait().await {596				match msg {597					ChannelMsg::Data { data } => {598						if out_w.write_all(&data).await.is_err() {599							break;600						}601					}602					ChannelMsg::ExtendedData { data, .. } => {603						if err_w.write_all(&data).await.is_err() {604							break;605						}606					}607					ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),608					_ => {}609				}610			}611			let _ = out_w.shutdown().await;612			let _ = err_w.shutdown().await;613			let _ = exit_tx.send(code);614		});615616		Ok(RemoteChild {617			stdout,618			stderr,619			exit,620		})621	}622623	pub fn serve_elevate(&self) -> Result<()> {624		let Transport::Ssh {625			sess, agent_path, ..626		} = &self.transport627		else {628			bail!("elevate should not be called on local")629		};630		let mut rpc = self.rpc.clone();631		ElevateEndpoints(SshElevator {632			sess: sess.clone(),633			rpc: self.rpc.clone().downgrade(),634			agent_path: agent_path.to_owned(),635		})636		.register_endpoints(&mut rpc);637		Ok(())638	}639640	pub fn remote_dir(&self) -> Option<&Utf8Path> {641		match &self.transport {642			Transport::Ssh { remote_dir, .. } => Some(remote_dir),643			Transport::Local { .. } => None,644		}645	}646647	pub async fn forward_socket(648		&self,649		remote_path: &Utf8Path,650	) -> Result<oneshot::Receiver<Channel<Msg>>> {651		let Transport::Ssh { sess, subs, .. } = &self.transport else {652			bail!("forward_socket should not be called on local")653		};654		let (tx, rx) = oneshot::channel();655		subs.lock()656			.expect("lock")657			.insert(remote_path.to_owned(), tx);658		sess.streamlocal_forward(remote_path.to_owned()).await?;659		Ok(rx)660	}661662	pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<Shell> {663		let Transport::Ssh { remote_dir, .. } = &self.transport else {664			bail!("open_shell should not be called on local")665		};666		let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4()));667668		let rx = self.forward_socket(&sock).await?;669		let client: PtyClient<BifConfig> = self.endpoints();670		let id = client671			.open_shell(sock, term.to_owned(), cols, rows)672			.await?673			.map_err(|e| anyhow!("agent failed to open shell: {e}"))?;674		let ch = rx675			.await676			.map_err(|_| anyhow!("agent never connected the shell socket"))?;677678		Ok(Shell {679			id,680			stream: ch.into_stream(),681			remote: self.rpc.remote(Address::Agent),682		})683	}684}685686pub struct Shell {687	pub id: ShellId,688	pub stream: ChannelStream<Msg>,689	remote: Remote<BifConfig>,690}691692impl Shell {693	pub fn resizer(&self) -> ShellResizer {694		ShellResizer {695			remote: self.remote.clone(),696			id: self.id,697		}698	}699}700701#[derive(Clone)]702pub struct ShellResizer {703	remote: Remote<BifConfig>,704	id: ShellId,705}706707impl ShellResizer {708	pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {709		PtyClient::wrap(self.remote.clone())710			.resize(self.id, cols, rows)711			.await?712			.map_err(|e| anyhow!("failed to resize remote shell: {e}"))713	}714}715716async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {717	let mut cmd_chan = sess.channel_open_session().await?;718	cmd_chan719		.exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")720		.await?;721	let mut stdout = vec![];722	loop {723		let Some(msg) = cmd_chan.wait().await else {724			bail!("unexpected channel end");725		};726		match msg {727			russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),728			russh::ChannelMsg::ExitStatus { exit_status } => {729				if exit_status != 0 {730					bail!("mktemp failed");731				}732				break;733			}734			_ => {}735		}736	}737	ensure!(stdout.ends_with(b"\n"));738	stdout.pop();739	Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))740}
after · crates/remowt-client/src/lib.rs
1use std::collections::HashMap;2use std::path::PathBuf;3use std::sync::{Arc, Mutex};4use std::{env, io};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc};9use bifrostlink_ports::unix_socket::from_socket;10use bytes::{Bytes, BytesMut};11use camino::{Utf8Path, Utf8PathBuf};12use remowt_endpoints::{13	fs::Fs,14	pty::{Pty, PtyClient, ShellId},15	systemd::Systemd,16};17use remowt_link_shared::plugin::PluginEndpointsClient;18use remowt_link_shared::{Address, BifConfig, ElevateEndpoints, ElevateError, Elevator};19use russh::client::{connect, Config, Handle, Handler, Msg, Session};20use russh::keys::agent::client::AgentClient;21use russh::keys::agent::AgentIdentity;22use russh::keys::check_known_hosts;23use russh::keys::ssh_key::PublicKey;24use russh::{Channel, ChannelMsg, ChannelStream};25use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf};26use tokio::join;27use tokio::net::UnixListener;28use tokio::sync::mpsc;29use tokio::sync::oneshot::{self, channel};30use tracing::error;31use uuid::Uuid;3233pub mod editor;3435type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;3637async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {38	let len = srx.read_u32().await?;39	let mut buf = BytesMut::zeroed(len as usize);40	srx.read_exact(&mut buf).await?;41	Ok(buf)42}43async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {44	stx.write_u32(value.len().try_into().expect("can't be larger"))45		.await?;46	stx.write_all(&value).await?;47	Ok(())48}4950fn sh_quote(s: impl AsRef<str>) -> String {51	format!("'{}'", s.as_ref().replace('\'', "'\\''"))52}5354const ESCALATORS: [(&str, &[&str]); 3] = [55	("run0", &["--background=", "--pipe"]),56	("sudo", &[]),57	("doas", &[]),58];5960pub struct AgentBundle {61	dir: PathBuf,62	hashes: HashMap<String, String>,63}6465impl AgentBundle {66	pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {67		let dir = dir.into();68		let hashes_path = dir.join("hashes");69		let raw = std::fs::read_to_string(&hashes_path)70			.with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;71		let mut hashes = HashMap::new();72		for line in raw.lines() {73			let line = line.trim();74			if line.is_empty() {75				continue;76			}77			let (arch, hash) = line78				.split_once(char::is_whitespace)79				.ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;80			hashes.insert(arch.to_owned(), hash.trim().to_owned());81		}82		ensure!(83			!hashes.is_empty(),84			"agent bundle {} has no hashes",85			dir.display()86		);87		Ok(Self { dir, hashes })88	}8990	fn binary(&self, arch: &str) -> PathBuf {91		self.dir.join(format!("remowt-agent-{arch}"))92	}93}9495async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {96	let mut ch = sess.channel_open_session().await?;97	ch.exec(true, cmd).await?;98	let mut out = Vec::new();99	let mut code = None;100	while let Some(msg) = ch.wait().await {101		match msg {102			ChannelMsg::Data { data } => out.extend(data.as_ref()),103			ChannelMsg::ExtendedData { data, .. } => {104				error!(105					"remote stderr: {}",106					String::from_utf8_lossy(data.as_ref()).trim()107				);108			}109			ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),110			_ => {}111		}112	}113	Ok((code, out))114}115116async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {117	let (code, mut out) = run(sess, cmd).await?;118	ensure!(119		code == Some(0),120		"remote command failed (exit {code:?}): {cmd}"121	);122	ensure!(out.ends_with(b"\n"));123	out.pop();124	String::from_utf8(out).context("expected utf8 output for command")125}126127async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {128	let arch = run_string_ok(sess, "uname -m").await?;129	let hash = bundle130		.hashes131		.get(&arch)132		.ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;133134	let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")135		.await?136		.trim()137		.to_owned();138	let dir = if cache.is_empty() {139		let home = run_string_ok(sess, "echo \"$HOME\"").await?;140		ensure!(141			!home.is_empty(),142			"remote $HOME and $XDG_CACHE_HOME both empty"143		);144		Utf8PathBuf::from(home).join("cache/remowt")145	} else {146		Utf8PathBuf::from(cache).join("remowt")147	};148	let path = dir.join(hash);149150	let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;151	if present != Some(0) {152		let bin = bundle.binary(&arch);153		let bytes = std::fs::read(&bin)154			.with_context(|| format!("reading agent binary {}", bin.display()))?;155		upload_agent(sess, &dir, &path, bytes).await?;156	}157	Ok(path)158}159160async fn upload_agent(161	sess: &Handle<SshHandler>,162	dir: &Utf8Path,163	path: &Utf8Path,164	bytes: Vec<u8>,165) -> Result<()> {166	run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;167168	let tmp = path.join(format!("tmp.{}", Uuid::new_v4()));169	let ch = sess.channel_open_session().await?;170	ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;171	ch.data_bytes(bytes).await?;172	ch.eof().await?;173	let mut ch = ch;174	let mut code = None;175	while let Some(msg) = ch.wait().await {176		match msg {177			ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),178			ChannelMsg::ExtendedData { data, .. } => {179				error!(180					"agent upload: {}",181					String::from_utf8_lossy(data.as_ref()).trim()182				);183			}184			_ => {}185		}186	}187	ensure!(code == Some(0), "agent upload failed (exit {code:?})");188189	run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;190	run_string_ok(191		sess,192		&format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),193	)194	.await?;195	Ok(())196}197198async fn detect_escalation(199	sess: &Handle<SshHandler>,200) -> Result<(&'static str, &'static [&'static str])> {201	for (tool, flags) in ESCALATORS {202		// `tool` is a fixed identifier (no metacharacters), safe to interpolate.203		let (code, _) = run(sess, &format!("command -v {tool}")).await?;204		if code == Some(0) {205			return Ok((tool, flags));206		}207	}208	bail!("no escalation tool (run0/sudo/doas) found on remote")209}210211fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {212	let mut parts = vec![tool.to_owned()];213	parts.extend(flags.iter().map(|f| f.to_string()));214	parts.push(sh_quote(agent_path));215	parts.push("real-agent".to_owned());216	parts.push("--privileged".to_owned());217	if let Some(p) = path {218		parts.push("--path".to_owned());219		parts.push(sh_quote(p));220	}221	parts.join(" ")222}223224fn find_in_path(name: &str) -> Option<std::path::PathBuf> {225	let path = env::var_os("PATH")?;226	env::split_paths(&path)227		.map(|dir| dir.join(name))228		.find(|p| p.is_file())229}230231fn port_from_channel(ch: Channel<Msg>) -> Port {232	Port::new(move |mut rx, tx| async move {233		let (mut srx, mut stx) = tokio::io::split(ch.into_stream());234		let srx_task = async move {235			loop {236				match read(&mut srx).await {237					Ok(buf) => {238						if tx.send(buf.freeze()).is_err() {239							break;240						}241					}242					Err(e) => {243						error!("channel read failed: {e}");244						break;245					}246				}247			}248		};249		let stx_task = async move {250			while let Some(value) = rx.recv().await {251				if let Err(e) = write(&mut stx, value).await {252					error!("channel write failed: {e}");253					break;254				}255			}256		};257		join!(srx_task, stx_task);258	})259}260261pub struct SshHandler {262	host: String,263	port: u16,264	subs: Subs,265}266impl Handler for SshHandler {267	type Error = russh::Error;268	async fn check_server_key(269		&mut self,270		server_public_key: &PublicKey,271	) -> Result<bool, Self::Error> {272		Ok(check_known_hosts(&self.host, self.port, server_public_key)?)273	}274	async fn server_channel_open_forwarded_streamlocal(275		&mut self,276		channel: Channel<Msg>,277		socket_path: &str,278		_session: &mut Session,279	) -> Result<(), Self::Error> {280		let Some(ch) = self281			.subs282			.lock()283			.expect("lock")284			.remove(&Utf8PathBuf::from(socket_path))285		else {286			return Err(russh::Error::WrongChannel);287		};288		let _ = ch.send(channel);289		Ok(())290	}291}292293struct SshElevator {294	sess: Arc<Handle<SshHandler>>,295	rpc: WeakRpc<BifConfig>,296	agent_path: Utf8PathBuf,297}298impl Elevator for SshElevator {299	async fn elevate(&self) -> Result<(), ElevateError> {300		let fail = |e: String| ElevateError::Failed(e);301		let (tool, flags) = detect_escalation(&self.sess)302			.await303			.map_err(|e| fail(e.to_string()))?;304		let ch = self305			.sess306			.channel_open_session()307			.await308			.map_err(|e| fail(e.to_string()))?;309		ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None))310			.await311			.map_err(|e| fail(e.to_string()))?;312		let rpc = self313			.rpc314			.clone()315			.upgrade()316			.ok_or_else(|| fail("rpc is gone".to_owned()))?;317		rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0));318		Ok(())319	}320}321322pub struct RemoteChild {323	pub stdout: DuplexStream,324	pub stderr: DuplexStream,325	pub exit: oneshot::Receiver<Option<u32>>,326}327328enum Transport {329	Ssh {330		sess: Arc<Handle<SshHandler>>,331		subs: Subs,332		remote_dir: Utf8PathBuf,333		agent_path: Utf8PathBuf,334	},335	Local {336		#[allow(dead_code)]337		agent: Rpc<BifConfig>,338		agent_path: String,339	},340}341342pub struct Remowt {343	transport: Transport,344	rpc: Rpc<BifConfig>,345	elevated: tokio::sync::OnceCell<()>,346	children: Mutex<Vec<tokio::process::Child>>,347}348349pub type RemowtRemote = Remote<BifConfig>;350351fn loopback() -> (Port, Port) {352	let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::<Bytes>();353	let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::<Bytes>();354	let user = Port::new(move |mut rx, tx| async move {355		loop {356			tokio::select! {357				msg = rx.recv() => match msg {358					Some(msg) => if a2b_tx.send(msg).is_err() { break },359					None => break,360				},361				msg = b2a_rx.recv() => match msg {362					Some(msg) => if tx.send(msg).is_err() { break },363					None => break,364				},365			}366		}367	});368	let agent = Port::new(move |mut rx, tx| async move {369		loop {370			tokio::select! {371				msg = rx.recv() => match msg {372					Some(msg) => if b2a_tx.send(msg).is_err() { break },373					None => break,374				},375				msg = a2b_rx.recv() => match msg {376					Some(msg) => if tx.send(msg).is_err() { break },377					None => break,378				},379			}380		}381	});382	(user, agent)383}384385impl Remowt {386	pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {387		let conf = russh_config::parse_home(host)?;388		let port = conf.host_config.port.or(conf.port).unwrap_or(22);389		let hostname = conf390			.host_config391			.hostname392			.clone()393			.unwrap_or_else(|| conf.host_name.clone());394		let user = conf395			.user396			.clone()397			.unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));398399		let subs: Subs = Arc::new(Mutex::new(HashMap::new()));400		let mut sess = connect(401			Arc::new(Config::default()),402			(hostname.clone(), port),403			SshHandler {404				host: hostname,405				port,406				subs: subs.clone(),407			},408		)409		.await?;410411		let mut agent = AgentClient::connect_env().await?;412		let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();413		let mut authenticated = false;414		for ident in agent.request_identities().await? {415			let AgentIdentity::PublicKey { key, .. } = ident else {416				continue;417			};418			if sess419				.authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)420				.await?421				.success()422			{423				authenticated = true;424				break;425			}426		}427		ensure!(authenticated, "ssh authentication failed");428429		// All remaining session ops take `&self`; share the handle.430		let sess = Arc::new(sess);431432		let agent_path = deploy_agent(&sess, bundle).await?;433434		let remote_dir = remote_mktemp(&sess).await?;435		let primary = remote_dir.join("primary.sock");436437		let (onetx, onerx) = channel();438		subs.lock().expect("lock").insert(primary.clone(), onetx);439		sess.streamlocal_forward(primary.clone()).await?;440441		let rpc = Rpc::<BifConfig>::new(Address::User);442443		// TODO: ensure no injection is possible in the socket path.444		let cmd_chan = sess.channel_open_session().await?;445		cmd_chan446			.exec(447				true,448				format!(449					"{} real-agent --path={}",450					sh_quote(&agent_path),451					sh_quote(&primary)452				),453			)454			.await?;455456		let port = port_from_channel(457			onerx458				.await459				.map_err(|_| anyhow!("agent never opened its channel"))?,460		);461		rpc.add_direct(Address::Agent, port, Rtt(0));462463		Ok(Self {464			transport: Transport::Ssh {465				sess,466				subs,467				remote_dir,468				agent_path,469			},470			rpc,471			elevated: tokio::sync::OnceCell::new(),472			children: Mutex::new(Vec::new()),473		})474	}475476	pub async fn connect_local(agent_path: &str) -> Result<Self> {477		let (port_user, port_agent) = loopback();478		let rpc = Rpc::<BifConfig>::new(Address::User);479		let mut agent = Rpc::<BifConfig>::new(Address::Agent);480481		// Register handlers before wiring up the link (see the agent binary).482		Fs::new().register_endpoints(&mut agent);483		Systemd.register_endpoints(&mut agent);484		Pty::new().register_endpoints(&mut agent);485486		agent.add_direct(Address::User, port_agent, Rtt(0));487		rpc.add_direct(Address::Agent, port_user, Rtt(0));488489		Ok(Self {490			transport: Transport::Local {491				agent,492				agent_path: agent_path.to_owned(),493			},494			rpc,495			elevated: tokio::sync::OnceCell::new(),496			children: Mutex::new(Vec::new()),497		})498	}499500	pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {501		match &self.transport {502			Transport::Ssh { sess, .. } => Some(sess.clone()),503			Transport::Local { .. } => None,504		}505	}506507	pub fn rpc(&self) -> Rpc<BifConfig> {508		self.rpc.clone()509	}510511	pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {512		R::wrap(self.rpc.remote(Address::Agent))513	}514515	pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {516		let client: PluginEndpointsClient<BifConfig> = self.endpoints();517		client518			.load_plugin(id, name.to_owned())519			.await?520			.map_err(|e| anyhow!("agent failed to load plugin: {e}"))521	}522	pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {523		self.ensure_elevated().await?;524		let client: PluginEndpointsClient<BifConfig> =525			PluginEndpointsClient::wrap(self.rpc.remote(Address::AgentPrivileged));526		client527			.load_plugin_path(id, path.to_owned())528			.await?529			.map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))530	}531	pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {532		R::wrap(self.rpc.remote(Address::Plugin(id)))533	}534	pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {535		self.ensure_elevated().await?;536		Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))537	}538539	async fn ensure_elevated(&self) -> Result<()> {540		self.elevated541			.get_or_try_init(|| async {542				let port = match &self.transport {543					Transport::Ssh {544						sess, agent_path, ..545					} => {546						let (tool, flags) = detect_escalation(sess).await?;547						let ch = sess.channel_open_session().await?;548						ch.exec(true, privileged_cmd(tool, flags, agent_path, None))549							.await?;550						port_from_channel(ch)551					}552					Transport::Local { agent_path, .. } => {553						let sock = env::temp_dir()554							.join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4()));555						let _ = std::fs::remove_file(&sock);556						let listener = UnixListener::bind(&sock)?;557						let (tool, flags) = ESCALATORS558							.iter()559							.find(|(t, _)| find_in_path(t).is_some())560							.ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?;561						let child = tokio::process::Command::new(tool)562							.args(*flags)563							.arg(agent_path)564							.arg("real-agent")565							.arg("--privileged")566							.arg("--path")567							.arg(sock.to_str().expect("temp path is utf-8"))568							.kill_on_drop(true)569							.spawn()?;570						self.children.lock().expect("lock").push(child);571						let (stream, _) = listener.accept().await?;572						let _ = std::fs::remove_file(&sock);573						from_socket(stream)574					}575				};576				self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));577				anyhow::Ok(())578			})579			.await?;580		Ok(())581	}582583	pub async fn exec(&self, command: String) -> Result<RemoteChild> {584		let Some(sess) = self.ssh() else {585			bail!("exec should not be called on local")586		};587		let ch = sess.channel_open_session().await?;588		ch.exec(true, command).await?;589590		let (mut out_w, stdout) = tokio::io::duplex(64 * 1024);591		let (mut err_w, stderr) = tokio::io::duplex(64 * 1024);592		let (exit_tx, exit) = oneshot::channel();593594		tokio::spawn(async move {595			let mut ch = ch;596			let mut code = None;597			while let Some(msg) = ch.wait().await {598				match msg {599					ChannelMsg::Data { data } => {600						if out_w.write_all(&data).await.is_err() {601							break;602						}603					}604					ChannelMsg::ExtendedData { data, .. } => {605						if err_w.write_all(&data).await.is_err() {606							break;607						}608					}609					ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),610					_ => {}611				}612			}613			let _ = out_w.shutdown().await;614			let _ = err_w.shutdown().await;615			let _ = exit_tx.send(code);616		});617618		Ok(RemoteChild {619			stdout,620			stderr,621			exit,622		})623	}624625	pub fn serve_elevate(&self) -> Result<()> {626		let Transport::Ssh {627			sess, agent_path, ..628		} = &self.transport629		else {630			bail!("elevate should not be called on local")631		};632		let mut rpc = self.rpc.clone();633		ElevateEndpoints(SshElevator {634			sess: sess.clone(),635			rpc: self.rpc.clone().downgrade(),636			agent_path: agent_path.to_owned(),637		})638		.register_endpoints(&mut rpc);639		Ok(())640	}641642	pub fn remote_dir(&self) -> Option<&Utf8Path> {643		match &self.transport {644			Transport::Ssh { remote_dir, .. } => Some(remote_dir),645			Transport::Local { .. } => None,646		}647	}648649	pub async fn forward_socket(650		&self,651		remote_path: &Utf8Path,652	) -> Result<oneshot::Receiver<Channel<Msg>>> {653		let Transport::Ssh { sess, subs, .. } = &self.transport else {654			bail!("forward_socket should not be called on local")655		};656		let (tx, rx) = oneshot::channel();657		subs.lock()658			.expect("lock")659			.insert(remote_path.to_owned(), tx);660		sess.streamlocal_forward(remote_path.to_owned()).await?;661		Ok(rx)662	}663664	pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<Shell> {665		let Transport::Ssh { remote_dir, .. } = &self.transport else {666			bail!("open_shell should not be called on local")667		};668		let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4()));669670		let rx = self.forward_socket(&sock).await?;671		let client: PtyClient<BifConfig> = self.endpoints();672		let id = client673			.open_shell(sock, term.to_owned(), cols, rows)674			.await?675			.map_err(|e| anyhow!("agent failed to open shell: {e}"))?;676		let ch = rx677			.await678			.map_err(|_| anyhow!("agent never connected the shell socket"))?;679680		Ok(Shell {681			id,682			stream: ch.into_stream(),683			remote: self.rpc.remote(Address::Agent),684		})685	}686}687688pub struct Shell {689	pub id: ShellId,690	pub stream: ChannelStream<Msg>,691	remote: Remote<BifConfig>,692}693694impl Shell {695	pub fn resizer(&self) -> ShellResizer {696		ShellResizer {697			remote: self.remote.clone(),698			id: self.id,699		}700	}701}702703#[derive(Clone)]704pub struct ShellResizer {705	remote: Remote<BifConfig>,706	id: ShellId,707}708709impl ShellResizer {710	pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {711		PtyClient::wrap(self.remote.clone())712			.resize(self.id, cols, rows)713			.await?714			.map_err(|e| anyhow!("failed to resize remote shell: {e}"))715	}716}717718async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {719	let mut cmd_chan = sess.channel_open_session().await?;720	cmd_chan721		.exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")722		.await?;723	let mut stdout = vec![];724	loop {725		let Some(msg) = cmd_chan.wait().await else {726			bail!("unexpected channel end");727		};728		match msg {729			russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),730			russh::ChannelMsg::ExitStatus { exit_status } => {731				if exit_status != 0 {732					bail!("mktemp failed");733				}734				break;735			}736			_ => {}737		}738	}739	ensure!(stdout.ends_with(b"\n"));740	stdout.pop();741	Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))742}
addedcrates/remowt-endpoints/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-endpoints/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "remowt-endpoints"
+description = "Nix daemon proxy"
+version.workspace = true
+edition = "2021"
+license.workspace = true
+
+[dependencies]
+anyhow.workspace = true
+bifrostlink.workspace = true
+bifrostlink-macros.workspace = true
+camino.workspace = true
+serde = { workspace = true }
+tempfile.workspace = true
+thiserror.workspace = true
+tokio = { workspace = true, features = ["net", "io-util", "rt", "process"] }
+tracing.workspace = true
+uuid.workspace = true
+nix = { workspace = true, features = ["process", "term"] }
+zbus.workspace = true
addedcrates/remowt-endpoints/src/fs.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-endpoints/src/fs.rs
@@ -0,0 +1,105 @@
+use std::io::ErrorKind;
+use std::str::FromStr;
+use std::sync::Mutex;
+
+use bifrostlink::declarative::endpoints;
+use bifrostlink::Config;
+use camino::Utf8PathBuf;
+use serde::{Deserialize, Serialize};
+use tempfile::TempDir;
+
+#[derive(Default)]
+pub struct Fs {
+	tempdirs: Mutex<Vec<TempDir>>,
+}
+
+impl Fs {
+	pub fn new() -> Self {
+		Self::default()
+	}
+}
+
+#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
+pub enum Error {
+	#[error("file not found")]
+	NotFound,
+	#[error("file name/contents is not utf8")]
+	InvalidUtf8,
+	#[error("unknown fs error")]
+	Unknown,
+}
+
+#[endpoints(ns = 1)]
+impl Fs {
+	#[endpoints(id = 1)]
+	async fn read_file_tiny(&self, path: Utf8PathBuf) -> Result<Vec<u8>, Error> {
+		match tokio::fs::read(path).await {
+			Ok(v) => Ok(v),
+			Err(e) if e.kind() == ErrorKind::NotFound => Err(Error::NotFound),
+			_ => Err(Error::Unknown),
+		}
+	}
+	#[endpoints(id = 2)]
+	async fn file_exists(&self, path: Utf8PathBuf) -> bool {
+		tokio::fs::try_exists(path).await.unwrap_or(false)
+	}
+	#[endpoints(id = 3)]
+	async fn read_dir_raw(&self, path: Utf8PathBuf) -> Result<Vec<Utf8PathBuf>, Error> {
+		let mut dir = match tokio::fs::read_dir(path).await {
+			Ok(dir) => dir,
+			Err(e) if e.kind() == ErrorKind::NotFound => return Err(Error::NotFound),
+			Err(_) => return Err(Error::Unknown),
+		};
+		let mut out = Vec::new();
+		while let Ok(Some(entry)) = dir.next_entry().await {
+			let name = Utf8PathBuf::try_from(entry.file_name()).map_err(|_| Error::InvalidUtf8)?;
+			out.push(name);
+		}
+		Ok(out)
+	}
+	#[endpoints(id = 4)]
+	async fn mktemp_dir_raw(&self) -> Result<Utf8PathBuf, Error> {
+		let dir = tempfile::Builder::new()
+			.prefix("remowt.")
+			.tempdir()
+			.map_err(|_| Error::Unknown)?;
+		let mut tempdirs = self.tempdirs.lock().expect("not poisoned");
+		let path = Utf8PathBuf::try_from(dir.path().to_owned()).map_err(|_| Error::InvalidUtf8);
+		tempdirs.push(dir);
+		path
+	}
+	#[endpoints(id = 5)]
+	async fn rm_file(&self, path: Utf8PathBuf) -> Result<(), Error> {
+		match tokio::fs::remove_file(path).await {
+			Ok(()) => Ok(()),
+			Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
+			Err(_) => Err(Error::Unknown),
+		}
+	}
+}
+
+impl<C: Config> FsClient<C> {
+	pub async fn read_file_text(&self, path: impl Into<Utf8PathBuf>) -> Result<String, Error> {
+		let v = self
+			.read_file_tiny(path.into())
+			.await
+			.map_err(|_| Error::Unknown)?;
+		let v = v?;
+		String::from_utf8(v).map_err(|_| Error::InvalidUtf8)
+	}
+	pub async fn read_file_value<T: FromStr>(
+		&self,
+		path: impl Into<Utf8PathBuf>,
+	) -> Result<Result<T, T::Err>, Error> {
+		let text = self.read_file_text(path).await?;
+		Ok(T::from_str(&text))
+	}
+	pub async fn mktemp_dir(&self) -> Result<Utf8PathBuf, Error> {
+		self.mktemp_dir_raw().await.map_err(|_| Error::Unknown)?
+	}
+	pub async fn read_dir(&self, path: impl Into<Utf8PathBuf>) -> Result<Vec<Utf8PathBuf>, Error> {
+		self.read_dir_raw(path.into())
+			.await
+			.map_err(|_| Error::Unknown)?
+	}
+}
addedcrates/remowt-endpoints/src/lib.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-endpoints/src/lib.rs
@@ -0,0 +1,4 @@
+pub mod fs;
+pub mod nix_daemon;
+pub mod pty;
+pub mod systemd;
addedcrates/remowt-endpoints/src/nix_daemon.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-endpoints/src/nix_daemon.rs
@@ -0,0 +1,65 @@
+use std::process::Stdio;
+
+use bifrostlink::declarative::endpoints;
+use bifrostlink::Config;
+use serde::{Deserialize, Serialize};
+use std::result::Result;
+use tokio::process::Command;
+
+pub const NIX_DAEMON_SOCKET: &str = "/nix/var/nix/daemon-socket/socket";
+
+pub struct NixDaemon;
+
+#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
+pub enum Error {
+	#[error("nix daemon unavailable: {0}")]
+	DaemonUnavailable(String),
+	#[error("tunnel socket unavailable: {0}")]
+	Tunnel(String),
+}
+
+#[endpoints(ns = 4)]
+impl NixDaemon {
+	#[endpoints(id = 1)]
+	async fn connect_daemon(&self, socket: String) -> Result<(), Error> {
+		let mut daemon = tokio::net::UnixStream::connect(NIX_DAEMON_SOCKET)
+			.await
+			.map_err(|e| Error::DaemonUnavailable(e.to_string()))?;
+		let mut tunnel = tokio::net::UnixStream::connect(&socket)
+			.await
+			.map_err(|e| Error::Tunnel(e.to_string()))?;
+		tokio::spawn(async move {
+			if let Err(e) = tokio::io::copy_bidirectional(&mut daemon, &mut tunnel).await {
+				tracing::debug!("nix daemon tunnel ended: {e}");
+			}
+		});
+		Ok(())
+	}
+
+	#[endpoints(id = 2)]
+	async fn serve_store(&self, store: String, socket: String) -> Result<(), Error> {
+		let mut child = Command::new("nix-daemon")
+			.arg("--stdio")
+			.arg("--store")
+			.arg(&store)
+			.stdin(Stdio::piped())
+			.stdout(Stdio::piped())
+			.spawn()
+			.map_err(|e| Error::DaemonUnavailable(e.to_string()))?;
+		let tunnel = tokio::net::UnixStream::connect(&socket)
+			.await
+			.map_err(|e| Error::Tunnel(e.to_string()))?;
+		let mut stdin = child.stdin.take().expect("piped");
+		let mut stdout = child.stdout.take().expect("piped");
+		tokio::spawn(async move {
+			let mut tunnel = tunnel;
+			let (mut tr, mut tw) = tunnel.split();
+			let _ = tokio::join!(
+				tokio::io::copy(&mut tr, &mut stdin),
+				tokio::io::copy(&mut stdout, &mut tw),
+			);
+			let _ = child.wait().await;
+		});
+		Ok(())
+	}
+}
addedcrates/remowt-endpoints/src/pty.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-endpoints/src/pty.rs
@@ -0,0 +1,256 @@
+use std::collections::HashMap;
+use std::io;
+use std::os::fd::{AsRawFd, OwnedFd};
+use std::pin::Pin;
+use std::process::Stdio;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+
+use bifrostlink::declarative::endpoints;
+use bifrostlink::Config;
+use camino::Utf8PathBuf;
+use nix::libc;
+use nix::pty::{openpty, OpenptyResult, Winsize};
+use serde::{Deserialize, Serialize};
+use tokio::io::unix::AsyncFd;
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tokio::net::UnixStream;
+use tracing::{info, warn};
+
+pub type ShellId = u64;
+
+#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
+pub enum Error {
+	#[error("openpty failed: {0}")]
+	Open(String),
+	#[error("failed to spawn shell: {0}")]
+	Spawn(String),
+	#[error("failed to connect to forwarded socket: {0}")]
+	Connect(String),
+	#[error("no shell with that id")]
+	NoSuchShell,
+	#[error("resize failed: {0}")]
+	Resize(String),
+	#[error("io error: {0}")]
+	Io(String),
+}
+
+impl From<io::Error> for Error {
+	fn from(e: io::Error) -> Self {
+		Error::Io(e.to_string())
+	}
+}
+
+#[derive(Clone, Default)]
+pub struct Pty {
+	shells: Arc<Mutex<HashMap<ShellId, OwnedFd>>>,
+	next_id: Arc<AtomicU64>,
+}
+
+impl Pty {
+	pub fn new() -> Self {
+		Self::default()
+	}
+}
+
+#[endpoints(ns = 7)]
+impl Pty {
+	#[endpoints(id = 1)]
+	async fn open_shell(
+		&self,
+		socket_path: Utf8PathBuf,
+		term: String,
+		cols: u16,
+		rows: u16,
+	) -> Result<ShellId, Error> {
+		let ws = Winsize {
+			ws_row: rows,
+			ws_col: cols,
+			ws_xpixel: 0,
+			ws_ypixel: 0,
+		};
+		let OpenptyResult { master, slave } =
+			openpty(Some(&ws), None).map_err(|e| Error::Open(e.to_string()))?;
+
+		let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_owned());
+
+		let slave_in = slave.try_clone()?;
+		let slave_out = slave.try_clone()?;
+		let slave_err = slave;
+
+		let mut cmd = tokio::process::Command::new(&shell);
+		cmd.env("TERM", &term);
+		if let Ok(home) = std::env::var("HOME") {
+			cmd.current_dir(home);
+		}
+		cmd.stdin(Stdio::from(slave_in));
+		cmd.stdout(Stdio::from(slave_out));
+		cmd.stderr(Stdio::from(slave_err));
+		// SAFETY: only async-signal-safe calls (setsid, ioctl) before exec.
+		unsafe {
+			cmd.pre_exec(|| {
+				nix::unistd::setsid().map_err(|e| io::Error::from_raw_os_error(e as i32))?;
+				if libc::ioctl(0, libc::TIOCSCTTY as _, 0) < 0 {
+					return Err(io::Error::last_os_error());
+				}
+				Ok(())
+			});
+		}
+
+		let mut child = cmd.spawn().map_err(|e| Error::Spawn(e.to_string()))?;
+
+		let resize_fd = master.try_clone()?;
+		let id = self.next_id.fetch_add(1, Ordering::Relaxed);
+		self.shells
+			.lock()
+			.expect("not poisoned")
+			.insert(id, resize_fd);
+
+		let sock = match UnixStream::connect(&socket_path).await {
+			Ok(s) => s,
+			Err(e) => {
+				self.shells.lock().expect("not poisoned").remove(&id);
+				let _ = child.kill().await;
+				return Err(Error::Connect(e.to_string()));
+			}
+		};
+		let pty = AsyncPty::new(master)?;
+
+		info!(id, shell, "shell opened");
+		let shells = self.shells.clone();
+		tokio::spawn(async move {
+			let mut pty = pty;
+			let mut sock = sock;
+			if let Err(e) = tokio::io::copy_bidirectional(&mut pty, &mut sock).await {
+				warn!(id, "shell pump ended: {e}");
+			}
+			let _ = child.kill().await;
+			shells.lock().expect("not poisoned").remove(&id);
+			info!(id, "shell closed");
+		});
+
+		Ok(id)
+	}
+
+	#[endpoints(id = 2)]
+	async fn resize(&self, id: ShellId, cols: u16, rows: u16) -> Result<(), Error> {
+		let ws = libc::winsize {
+			ws_row: rows,
+			ws_col: cols,
+			ws_xpixel: 0,
+			ws_ypixel: 0,
+		};
+		let shells = self.shells.lock().expect("not poisoned");
+		let fd = shells.get(&id).ok_or(Error::NoSuchShell)?;
+		// SAFETY: `fd` is a live PTY master
+		let rc = unsafe { libc::ioctl(fd.as_raw_fd(), libc::TIOCSWINSZ as _, &ws) };
+		if rc < 0 {
+			return Err(Error::Resize(io::Error::last_os_error().to_string()));
+		}
+		Ok(())
+	}
+}
+
+struct AsyncPty {
+	fd: AsyncFd<OwnedFd>,
+}
+
+impl AsyncPty {
+	fn new(fd: OwnedFd) -> io::Result<Self> {
+		let raw = fd.as_raw_fd();
+		// SAFETY: standard F_GETFL/F_SETFL round-trip on a valid fd.
+		unsafe {
+			let flags = libc::fcntl(raw, libc::F_GETFL);
+			if flags < 0 {
+				return Err(io::Error::last_os_error());
+			}
+			if libc::fcntl(raw, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
+				return Err(io::Error::last_os_error());
+			}
+		}
+		Ok(Self {
+			fd: AsyncFd::new(fd)?,
+		})
+	}
+}
+
+impl AsyncRead for AsyncPty {
+	fn poll_read(
+		self: Pin<&mut Self>,
+		cx: &mut Context<'_>,
+		buf: &mut ReadBuf<'_>,
+	) -> Poll<io::Result<()>> {
+		let this = self.get_mut();
+		loop {
+			let mut guard = match this.fd.poll_read_ready(cx) {
+				Poll::Ready(Ok(g)) => g,
+				Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
+				Poll::Pending => return Poll::Pending,
+			};
+			let unfilled = buf.initialize_unfilled();
+			let res = guard.try_io(|inner| {
+				let fd = inner.get_ref().as_raw_fd();
+				// SAFETY: writing into `unfilled`'s own backing storage.
+				let n = unsafe { libc::read(fd, unfilled.as_mut_ptr().cast(), unfilled.len()) };
+				if n < 0 {
+					let err = io::Error::last_os_error();
+					if err.raw_os_error() == Some(libc::EIO) {
+						Ok(0)
+					} else {
+						Err(err)
+					}
+				} else {
+					Ok(n as usize)
+				}
+			});
+			match res {
+				Ok(Ok(n)) => {
+					buf.advance(n);
+					return Poll::Ready(Ok(()));
+				}
+				Ok(Err(e)) => return Poll::Ready(Err(e)),
+				Err(_would_block) => continue,
+			}
+		}
+	}
+}
+
+impl AsyncWrite for AsyncPty {
+	fn poll_write(
+		self: Pin<&mut Self>,
+		cx: &mut Context<'_>,
+		buf: &[u8],
+	) -> Poll<io::Result<usize>> {
+		let this = self.get_mut();
+		loop {
+			let mut guard = match this.fd.poll_write_ready(cx) {
+				Poll::Ready(Ok(g)) => g,
+				Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
+				Poll::Pending => return Poll::Pending,
+			};
+			let res = guard.try_io(|inner| {
+				let fd = inner.get_ref().as_raw_fd();
+				// SAFETY: reading from `buf` for `buf.len()` bytes.
+				let n = unsafe { libc::write(fd, buf.as_ptr().cast(), buf.len()) };
+				if n < 0 {
+					Err(io::Error::last_os_error())
+				} else {
+					Ok(n as usize)
+				}
+			});
+			match res {
+				Ok(r) => return Poll::Ready(r),
+				Err(_would_block) => continue,
+			}
+		}
+	}
+
+	fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+		Poll::Ready(Ok(()))
+	}
+
+	fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+		Poll::Ready(Ok(()))
+	}
+}
addedcrates/remowt-endpoints/src/systemd.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-endpoints/src/systemd.rs
@@ -0,0 +1,54 @@
+use bifrostlink::declarative::endpoints;
+use bifrostlink::Config;
+use serde::{Deserialize, Serialize};
+use zbus::proxy;
+use zbus::zvariant::OwnedObjectPath;
+
+pub struct Systemd;
+
+#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
+pub enum Error {
+	#[error("systemd request failed: {0}")]
+	Failed(String),
+}
+
+#[proxy(
+	interface = "org.freedesktop.systemd1.Manager",
+	default_service = "org.freedesktop.systemd1",
+	default_path = "/org/freedesktop/systemd1"
+)]
+trait Manager {
+	fn start_unit(&self, name: &str, mode: &str) -> zbus::Result<OwnedObjectPath>;
+	fn stop_unit(&self, name: &str, mode: &str) -> zbus::Result<OwnedObjectPath>;
+}
+
+async fn manager() -> Result<ManagerProxy<'static>, Error> {
+	let conn = zbus::Connection::system()
+		.await
+		.map_err(|e| Error::Failed(e.to_string()))?;
+	ManagerProxy::new(&conn)
+		.await
+		.map_err(|e| Error::Failed(e.to_string()))
+}
+
+#[endpoints(ns = 5)]
+impl Systemd {
+	#[endpoints(id = 1)]
+	async fn start(&self, unit: String) -> Result<(), Error> {
+		manager()
+			.await?
+			.start_unit(&unit, "replace")
+			.await
+			.map_err(|e| Error::Failed(e.to_string()))?;
+		Ok(())
+	}
+	#[endpoints(id = 2)]
+	async fn stop(&self, unit: String) -> Result<(), Error> {
+		manager()
+			.await?
+			.stop_unit(&unit, "replace")
+			.await
+			.map_err(|e| Error::Failed(e.to_string()))?;
+		Ok(())
+	}
+}
deletedcrates/remowt-fs/Cargo.tomldiffbeforeafterboth
--- a/crates/remowt-fs/Cargo.toml
+++ /dev/null
@@ -1,15 +0,0 @@
-[package]
-name = "remowt-fs"
-description = "Filesystem endpoint for remowt/bifrostlink"
-version.workspace = true
-edition = "2021"
-license.workspace = true
-
-[dependencies]
-bifrostlink.workspace = true
-bifrostlink-macros.workspace = true
-camino = { workspace = true, features = ["serde1"] }
-serde = { workspace = true, features = ["derive"] }
-tempfile.workspace = true
-thiserror.workspace = true
-tokio = { workspace = true, features = ["fs"] }
deletedcrates/remowt-fs/src/lib.rsdiffbeforeafterboth
--- a/crates/remowt-fs/src/lib.rs
+++ /dev/null
@@ -1,105 +0,0 @@
-use std::io::ErrorKind;
-use std::str::FromStr;
-use std::sync::Mutex;
-
-use bifrostlink::declarative::endpoints;
-use bifrostlink::Config;
-use camino::Utf8PathBuf;
-use serde::{Deserialize, Serialize};
-use tempfile::TempDir;
-
-#[derive(Default)]
-pub struct Fs {
-	tempdirs: Mutex<Vec<TempDir>>,
-}
-
-impl Fs {
-	pub fn new() -> Self {
-		Self::default()
-	}
-}
-
-#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
-pub enum Error {
-	#[error("file not found")]
-	NotFound,
-	#[error("file name/contents is not utf8")]
-	InvalidUtf8,
-	#[error("unknown fs error")]
-	Unknown,
-}
-
-#[endpoints(ns = 1)]
-impl Fs {
-	#[endpoints(id = 1)]
-	async fn read_file_tiny(&self, path: Utf8PathBuf) -> Result<Vec<u8>, Error> {
-		match tokio::fs::read(path).await {
-			Ok(v) => Ok(v),
-			Err(e) if e.kind() == ErrorKind::NotFound => Err(Error::NotFound),
-			_ => Err(Error::Unknown),
-		}
-	}
-	#[endpoints(id = 2)]
-	async fn file_exists(&self, path: Utf8PathBuf) -> bool {
-		tokio::fs::try_exists(path).await.unwrap_or(false)
-	}
-	#[endpoints(id = 3)]
-	async fn read_dir_raw(&self, path: Utf8PathBuf) -> Result<Vec<Utf8PathBuf>, Error> {
-		let mut dir = match tokio::fs::read_dir(path).await {
-			Ok(dir) => dir,
-			Err(e) if e.kind() == ErrorKind::NotFound => return Err(Error::NotFound),
-			Err(_) => return Err(Error::Unknown),
-		};
-		let mut out = Vec::new();
-		while let Ok(Some(entry)) = dir.next_entry().await {
-			let name = Utf8PathBuf::try_from(entry.file_name()).map_err(|_| Error::InvalidUtf8)?;
-			out.push(name);
-		}
-		Ok(out)
-	}
-	#[endpoints(id = 4)]
-	async fn mktemp_dir_raw(&self) -> Result<Utf8PathBuf, Error> {
-		let dir = tempfile::Builder::new()
-			.prefix("remowt.")
-			.tempdir()
-			.map_err(|_| Error::Unknown)?;
-		let mut tempdirs = self.tempdirs.lock().expect("not poisoned");
-		let path = Utf8PathBuf::try_from(dir.path().to_owned()).map_err(|_| Error::InvalidUtf8);
-		tempdirs.push(dir);
-		path
-	}
-	#[endpoints(id = 5)]
-	async fn rm_file(&self, path: Utf8PathBuf) -> Result<(), Error> {
-		match tokio::fs::remove_file(path).await {
-			Ok(()) => Ok(()),
-			Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
-			Err(_) => Err(Error::Unknown),
-		}
-	}
-}
-
-impl<C: Config> FsClient<C> {
-	pub async fn read_file_text(&self, path: impl Into<Utf8PathBuf>) -> Result<String, Error> {
-		let v = self
-			.read_file_tiny(path.into())
-			.await
-			.map_err(|_| Error::Unknown)?;
-		let v = v?;
-		String::from_utf8(v).map_err(|_| Error::InvalidUtf8)
-	}
-	pub async fn read_file_value<T: FromStr>(
-		&self,
-		path: impl Into<Utf8PathBuf>,
-	) -> Result<Result<T, T::Err>, Error> {
-		let text = self.read_file_text(path).await?;
-		Ok(T::from_str(&text))
-	}
-	pub async fn mktemp_dir(&self) -> Result<Utf8PathBuf, Error> {
-		self.mktemp_dir_raw().await.map_err(|_| Error::Unknown)?
-	}
-	pub async fn read_dir(&self, path: impl Into<Utf8PathBuf>) -> Result<Vec<Utf8PathBuf>, Error> {
-		self.read_dir_raw(path.into())
-			.await
-			.map_err(|_| Error::Unknown)?
-	}
-}
modifiedcrates/remowt-link-shared/Cargo.tomldiffbeforeafterboth
--- a/crates/remowt-link-shared/Cargo.toml
+++ b/crates/remowt-link-shared/Cargo.toml
@@ -12,8 +12,5 @@
 serde_json.workspace = true
 thiserror.workspace = true
 tokio = { workspace = true, features = ["fs"] }
-remowt-fs.workspace = true
-remowt-systemd.workspace = true
 remowt-ui-prompt.workspace = true
 camino = { workspace = true, features = ["serde1"] }
-remowt-pty.workspace = true
modifiedcrates/remowt-link-shared/src/lib.rsdiffbeforeafterboth
--- a/crates/remowt-link-shared/src/lib.rs
+++ b/crates/remowt-link-shared/src/lib.rs
@@ -21,10 +21,6 @@
 
 pub mod plugin;
 
-pub use remowt_fs::{Error as FsError, Fs, FsClient};
-pub use remowt_pty::{Error as PtyError, Pty, PtyClient, ShellId};
-pub use remowt_systemd::{Error as SystemdError, Systemd, SystemdClient};
-
 #[derive(Serialize, Deserialize, Debug, thiserror::Error)]
 pub enum ElevateError {
 	#[error("elevation failed: {0}")]
deletedcrates/remowt-nix-daemon/Cargo.tomldiffbeforeafterboth
--- a/crates/remowt-nix-daemon/Cargo.toml
+++ /dev/null
@@ -1,18 +0,0 @@
-[package]
-name = "remowt-nix-daemon"
-description = "Nix daemon proxy"
-version.workspace = true
-edition = "2021"
-license.workspace = true
-
-[dependencies]
-anyhow.workspace = true
-bifrostlink.workspace = true
-bifrostlink-macros.workspace = true
-camino.workspace = true
-remowt-client.workspace = true
-serde = { workspace = true }
-thiserror.workspace = true
-tokio = { workspace = true, features = ["net", "io-util", "rt", "process"] }
-tracing.workspace = true
-uuid.workspace = true
deletedcrates/remowt-nix-daemon/src/lib.rsdiffbeforeafterboth
--- a/crates/remowt-nix-daemon/src/lib.rs
+++ /dev/null
@@ -1,65 +0,0 @@
-use std::process::Stdio;
-
-use bifrostlink::declarative::endpoints;
-use bifrostlink::Config;
-use serde::{Deserialize, Serialize};
-use std::result::Result;
-use tokio::process::Command;
-
-pub const NIX_DAEMON_SOCKET: &str = "/nix/var/nix/daemon-socket/socket";
-
-pub struct NixDaemon;
-
-#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
-pub enum Error {
-	#[error("nix daemon unavailable: {0}")]
-	DaemonUnavailable(String),
-	#[error("tunnel socket unavailable: {0}")]
-	Tunnel(String),
-}
-
-#[endpoints(ns = 4)]
-impl NixDaemon {
-	#[endpoints(id = 1)]
-	async fn connect_daemon(&self, socket: String) -> Result<(), Error> {
-		let mut daemon = tokio::net::UnixStream::connect(NIX_DAEMON_SOCKET)
-			.await
-			.map_err(|e| Error::DaemonUnavailable(e.to_string()))?;
-		let mut tunnel = tokio::net::UnixStream::connect(&socket)
-			.await
-			.map_err(|e| Error::Tunnel(e.to_string()))?;
-		tokio::spawn(async move {
-			if let Err(e) = tokio::io::copy_bidirectional(&mut daemon, &mut tunnel).await {
-				tracing::debug!("nix daemon tunnel ended: {e}");
-			}
-		});
-		Ok(())
-	}
-
-	#[endpoints(id = 2)]
-	async fn serve_store(&self, store: String, socket: String) -> Result<(), Error> {
-		let mut child = Command::new("nix-daemon")
-			.arg("--stdio")
-			.arg("--store")
-			.arg(&store)
-			.stdin(Stdio::piped())
-			.stdout(Stdio::piped())
-			.spawn()
-			.map_err(|e| Error::DaemonUnavailable(e.to_string()))?;
-		let tunnel = tokio::net::UnixStream::connect(&socket)
-			.await
-			.map_err(|e| Error::Tunnel(e.to_string()))?;
-		let mut stdin = child.stdin.take().expect("piped");
-		let mut stdout = child.stdout.take().expect("piped");
-		tokio::spawn(async move {
-			let mut tunnel = tunnel;
-			let (mut tr, mut tw) = tunnel.split();
-			let _ = tokio::join!(
-				tokio::io::copy(&mut tr, &mut stdin),
-				tokio::io::copy(&mut stdout, &mut tw),
-			);
-			let _ = child.wait().await;
-		});
-		Ok(())
-	}
-}
modifiedcrates/remowt-plugin/src/lib.rsdiffbeforeafterboth
--- a/crates/remowt-plugin/src/lib.rs
+++ b/crates/remowt-plugin/src/lib.rs
@@ -8,7 +8,7 @@
 pub mod host;
 
 pub use bifrostlink;
-pub use remowt_link_shared::{self, Address, BifConfig, Fs, Pty, Systemd};
+pub use remowt_link_shared::{self, Address, BifConfig};
 
 pub fn plugin_index() -> Result<u16> {
 	let arg = std::env::args()
deletedcrates/remowt-pty/Cargo.tomldiffbeforeafterboth
--- a/crates/remowt-pty/Cargo.toml
+++ /dev/null
@@ -1,23 +0,0 @@
-[package]
-name = "remowt-pty"
-description = "PTY/shell endpoint for remowt"
-version.workspace = true
-edition = "2021"
-license.workspace = true
-
-[dependencies]
-bifrostlink.workspace = true
-bifrostlink-macros.workspace = true
-camino = { workspace = true, features = ["serde1"] }
-nix = { workspace = true, features = ["process", "term"] }
-serde = { workspace = true, features = ["derive"] }
-thiserror.workspace = true
-tokio = { workspace = true, features = [
-	"net",
-	"io-util",
-	"rt",
-	"macros",
-	"process",
-	"sync",
-] }
-tracing.workspace = true
deletedcrates/remowt-pty/src/lib.rsdiffbeforeafterboth
--- a/crates/remowt-pty/src/lib.rs
+++ /dev/null
@@ -1,256 +0,0 @@
-use std::collections::HashMap;
-use std::io;
-use std::os::fd::{AsRawFd, OwnedFd};
-use std::pin::Pin;
-use std::process::Stdio;
-use std::sync::atomic::{AtomicU64, Ordering};
-use std::sync::{Arc, Mutex};
-use std::task::{Context, Poll};
-
-use bifrostlink::declarative::endpoints;
-use bifrostlink::Config;
-use camino::Utf8PathBuf;
-use nix::libc;
-use nix::pty::{openpty, OpenptyResult, Winsize};
-use serde::{Deserialize, Serialize};
-use tokio::io::unix::AsyncFd;
-use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
-use tokio::net::UnixStream;
-use tracing::{info, warn};
-
-pub type ShellId = u64;
-
-#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
-pub enum Error {
-	#[error("openpty failed: {0}")]
-	Open(String),
-	#[error("failed to spawn shell: {0}")]
-	Spawn(String),
-	#[error("failed to connect to forwarded socket: {0}")]
-	Connect(String),
-	#[error("no shell with that id")]
-	NoSuchShell,
-	#[error("resize failed: {0}")]
-	Resize(String),
-	#[error("io error: {0}")]
-	Io(String),
-}
-
-impl From<io::Error> for Error {
-	fn from(e: io::Error) -> Self {
-		Error::Io(e.to_string())
-	}
-}
-
-#[derive(Clone, Default)]
-pub struct Pty {
-	shells: Arc<Mutex<HashMap<ShellId, OwnedFd>>>,
-	next_id: Arc<AtomicU64>,
-}
-
-impl Pty {
-	pub fn new() -> Self {
-		Self::default()
-	}
-}
-
-#[endpoints(ns = 7)]
-impl Pty {
-	#[endpoints(id = 1)]
-	async fn open_shell(
-		&self,
-		socket_path: Utf8PathBuf,
-		term: String,
-		cols: u16,
-		rows: u16,
-	) -> Result<ShellId, Error> {
-		let ws = Winsize {
-			ws_row: rows,
-			ws_col: cols,
-			ws_xpixel: 0,
-			ws_ypixel: 0,
-		};
-		let OpenptyResult { master, slave } =
-			openpty(Some(&ws), None).map_err(|e| Error::Open(e.to_string()))?;
-
-		let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_owned());
-
-		let slave_in = slave.try_clone()?;
-		let slave_out = slave.try_clone()?;
-		let slave_err = slave;
-
-		let mut cmd = tokio::process::Command::new(&shell);
-		cmd.env("TERM", &term);
-		if let Ok(home) = std::env::var("HOME") {
-			cmd.current_dir(home);
-		}
-		cmd.stdin(Stdio::from(slave_in));
-		cmd.stdout(Stdio::from(slave_out));
-		cmd.stderr(Stdio::from(slave_err));
-		// SAFETY: only async-signal-safe calls (setsid, ioctl) before exec.
-		unsafe {
-			cmd.pre_exec(|| {
-				nix::unistd::setsid().map_err(|e| io::Error::from_raw_os_error(e as i32))?;
-				if libc::ioctl(0, libc::TIOCSCTTY as _, 0) < 0 {
-					return Err(io::Error::last_os_error());
-				}
-				Ok(())
-			});
-		}
-
-		let mut child = cmd.spawn().map_err(|e| Error::Spawn(e.to_string()))?;
-
-		let resize_fd = master.try_clone()?;
-		let id = self.next_id.fetch_add(1, Ordering::Relaxed);
-		self.shells
-			.lock()
-			.expect("not poisoned")
-			.insert(id, resize_fd);
-
-		let sock = match UnixStream::connect(&socket_path).await {
-			Ok(s) => s,
-			Err(e) => {
-				self.shells.lock().expect("not poisoned").remove(&id);
-				let _ = child.kill().await;
-				return Err(Error::Connect(e.to_string()));
-			}
-		};
-		let pty = AsyncPty::new(master)?;
-
-		info!(id, shell, "shell opened");
-		let shells = self.shells.clone();
-		tokio::spawn(async move {
-			let mut pty = pty;
-			let mut sock = sock;
-			if let Err(e) = tokio::io::copy_bidirectional(&mut pty, &mut sock).await {
-				warn!(id, "shell pump ended: {e}");
-			}
-			let _ = child.kill().await;
-			shells.lock().expect("not poisoned").remove(&id);
-			info!(id, "shell closed");
-		});
-
-		Ok(id)
-	}
-
-	#[endpoints(id = 2)]
-	async fn resize(&self, id: ShellId, cols: u16, rows: u16) -> Result<(), Error> {
-		let ws = libc::winsize {
-			ws_row: rows,
-			ws_col: cols,
-			ws_xpixel: 0,
-			ws_ypixel: 0,
-		};
-		let shells = self.shells.lock().expect("not poisoned");
-		let fd = shells.get(&id).ok_or(Error::NoSuchShell)?;
-		// SAFETY: `fd` is a live PTY master
-		let rc = unsafe { libc::ioctl(fd.as_raw_fd(), libc::TIOCSWINSZ as _, &ws) };
-		if rc < 0 {
-			return Err(Error::Resize(io::Error::last_os_error().to_string()));
-		}
-		Ok(())
-	}
-}
-
-struct AsyncPty {
-	fd: AsyncFd<OwnedFd>,
-}
-
-impl AsyncPty {
-	fn new(fd: OwnedFd) -> io::Result<Self> {
-		let raw = fd.as_raw_fd();
-		// SAFETY: standard F_GETFL/F_SETFL round-trip on a valid fd.
-		unsafe {
-			let flags = libc::fcntl(raw, libc::F_GETFL);
-			if flags < 0 {
-				return Err(io::Error::last_os_error());
-			}
-			if libc::fcntl(raw, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
-				return Err(io::Error::last_os_error());
-			}
-		}
-		Ok(Self {
-			fd: AsyncFd::new(fd)?,
-		})
-	}
-}
-
-impl AsyncRead for AsyncPty {
-	fn poll_read(
-		self: Pin<&mut Self>,
-		cx: &mut Context<'_>,
-		buf: &mut ReadBuf<'_>,
-	) -> Poll<io::Result<()>> {
-		let this = self.get_mut();
-		loop {
-			let mut guard = match this.fd.poll_read_ready(cx) {
-				Poll::Ready(Ok(g)) => g,
-				Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
-				Poll::Pending => return Poll::Pending,
-			};
-			let unfilled = buf.initialize_unfilled();
-			let res = guard.try_io(|inner| {
-				let fd = inner.get_ref().as_raw_fd();
-				// SAFETY: writing into `unfilled`'s own backing storage.
-				let n = unsafe { libc::read(fd, unfilled.as_mut_ptr().cast(), unfilled.len()) };
-				if n < 0 {
-					let err = io::Error::last_os_error();
-					if err.raw_os_error() == Some(libc::EIO) {
-						Ok(0)
-					} else {
-						Err(err)
-					}
-				} else {
-					Ok(n as usize)
-				}
-			});
-			match res {
-				Ok(Ok(n)) => {
-					buf.advance(n);
-					return Poll::Ready(Ok(()));
-				}
-				Ok(Err(e)) => return Poll::Ready(Err(e)),
-				Err(_would_block) => continue,
-			}
-		}
-	}
-}
-
-impl AsyncWrite for AsyncPty {
-	fn poll_write(
-		self: Pin<&mut Self>,
-		cx: &mut Context<'_>,
-		buf: &[u8],
-	) -> Poll<io::Result<usize>> {
-		let this = self.get_mut();
-		loop {
-			let mut guard = match this.fd.poll_write_ready(cx) {
-				Poll::Ready(Ok(g)) => g,
-				Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
-				Poll::Pending => return Poll::Pending,
-			};
-			let res = guard.try_io(|inner| {
-				let fd = inner.get_ref().as_raw_fd();
-				// SAFETY: reading from `buf` for `buf.len()` bytes.
-				let n = unsafe { libc::write(fd, buf.as_ptr().cast(), buf.len()) };
-				if n < 0 {
-					Err(io::Error::last_os_error())
-				} else {
-					Ok(n as usize)
-				}
-			});
-			match res {
-				Ok(r) => return Poll::Ready(r),
-				Err(_would_block) => continue,
-			}
-		}
-	}
-
-	fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
-		Poll::Ready(Ok(()))
-	}
-
-	fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
-		Poll::Ready(Ok(()))
-	}
-}
deletedcrates/remowt-systemd/Cargo.tomldiffbeforeafterboth
--- a/crates/remowt-systemd/Cargo.toml
+++ /dev/null
@@ -1,13 +0,0 @@
-[package]
-name = "remowt-systemd"
-description = "systemd control endpoint for remowt/bifrostlink (over D-Bus)"
-version.workspace = true
-edition = "2021"
-license.workspace = true
-
-[dependencies]
-bifrostlink.workspace = true
-bifrostlink-macros.workspace = true
-serde = { workspace = true, features = ["derive"] }
-thiserror.workspace = true
-zbus = { workspace = true, features = ["tokio"] }
deletedcrates/remowt-systemd/src/lib.rsdiffbeforeafterboth
--- a/crates/remowt-systemd/src/lib.rs
+++ /dev/null
@@ -1,54 +0,0 @@
-use bifrostlink::declarative::endpoints;
-use bifrostlink::Config;
-use serde::{Deserialize, Serialize};
-use zbus::proxy;
-use zbus::zvariant::OwnedObjectPath;
-
-pub struct Systemd;
-
-#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
-pub enum Error {
-	#[error("systemd request failed: {0}")]
-	Failed(String),
-}
-
-#[proxy(
-	interface = "org.freedesktop.systemd1.Manager",
-	default_service = "org.freedesktop.systemd1",
-	default_path = "/org/freedesktop/systemd1"
-)]
-trait Manager {
-	fn start_unit(&self, name: &str, mode: &str) -> zbus::Result<OwnedObjectPath>;
-	fn stop_unit(&self, name: &str, mode: &str) -> zbus::Result<OwnedObjectPath>;
-}
-
-async fn manager() -> Result<ManagerProxy<'static>, Error> {
-	let conn = zbus::Connection::system()
-		.await
-		.map_err(|e| Error::Failed(e.to_string()))?;
-	ManagerProxy::new(&conn)
-		.await
-		.map_err(|e| Error::Failed(e.to_string()))
-}
-
-#[endpoints(ns = 5)]
-impl Systemd {
-	#[endpoints(id = 1)]
-	async fn start(&self, unit: String) -> Result<(), Error> {
-		manager()
-			.await?
-			.start_unit(&unit, "replace")
-			.await
-			.map_err(|e| Error::Failed(e.to_string()))?;
-		Ok(())
-	}
-	#[endpoints(id = 2)]
-	async fn stop(&self, unit: String) -> Result<(), Error> {
-		manager()
-			.await?
-			.stop_unit(&unit, "replace")
-			.await
-			.map_err(|e| Error::Failed(e.to_string()))?;
-		Ok(())
-	}
-}
addedcrates/remowt-ui-prompt/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-ui-prompt/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "remowt-ui-prompt"
+description = "Interactive UI prompt endpoint for remowt (D-Bus)"
+version.workspace = true
+edition = "2021"
+license.workspace = true
+
+[dependencies]
+bifrostlink.workspace = true
+bifrostlink-macros.workspace = true
+serde.workspace = true
+serde_json.workspace = true
+thiserror.workspace = true
+tokio = { workspace = true, features = ["io-util", "macros", "process", "rt"] }
+tracing.workspace = true
+zbus = { workspace = true, optional = true }
+
+[features]
+default = ["dbus"]
+dbus = ["dep:zbus"]
addedcrates/remowt-ui-prompt/src/bifrost.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-ui-prompt/src/bifrost.rs
@@ -0,0 +1,109 @@
+use bifrostlink::{Config, Rpc};
+use bifrostlink_macros::endpoints;
+use serde::{Deserialize, Serialize};
+
+use crate::{Error, Prompter, Source};
+
+pub struct PromptEndpoints<P>(pub P);
+
+#[endpoints(ns = 2)]
+impl<P> PromptEndpoints<P>
+where
+	P: Prompter + Send + Sync + 'static,
+{
+	#[endpoints(id = 1, cancel)]
+	async fn prompt_enum(
+		&self,
+		prompt: String,
+		description: String,
+		variants: Vec<String>,
+		source: Vec<Source>,
+	) -> Result<u32, Error> {
+		let variants: Vec<&str> = variants.iter().map(|v| v.as_str()).collect();
+		self.0
+			.prompt_enum(&prompt, &description, &variants, &source)
+			.await
+	}
+
+	#[endpoints(id = 2, cancel)]
+	async fn prompt_text(
+		&self,
+		echo: bool,
+		prompt: String,
+		description: String,
+		source: Vec<Source>,
+	) -> Result<String, Error> {
+		self.0
+			.prompt_text(echo, &prompt, &description, &source)
+			.await
+	}
+
+	#[endpoints(id = 3, cancel)]
+	async fn display_text(
+		&self,
+		error: bool,
+		description: String,
+		source: Vec<Source>,
+	) -> Result<(), Error> {
+		self.0.display_text(error, &description, &source).await
+	}
+}
+
+impl<C: Config> Prompter for PromptEndpointsClient<C>
+where
+	Error: ToString,
+{
+	async fn prompt_enum(
+		&self,
+		prompt: &str,
+		description: &str,
+		variants: &[&str],
+		source: &[Source],
+	) -> crate::Result<u32> {
+		self.prompt_enum(
+			prompt.to_owned(),
+			description.to_owned(),
+			variants.iter().map(|v| (*v).to_owned()).collect(),
+			source.to_vec(),
+		)
+		.await
+		.map_err(|e| Error::Remote(e.to_string()))?
+	}
+
+	async fn prompt_text(
+		&self,
+		echo: bool,
+		prompt: &str,
+		description: &str,
+		source: &[Source],
+	) -> crate::Result<String> {
+		self.prompt_text(
+			echo,
+			prompt.to_owned(),
+			description.to_owned(),
+			source.to_vec(),
+		)
+		.await
+		.map_err(|e| Error::Remote(e.to_string()))?
+	}
+
+	async fn display_text(
+		&self,
+		error: bool,
+		description: &str,
+		source: &[Source],
+	) -> crate::Result<()> {
+		self.display_text(error, description.to_owned(), source.to_vec())
+			.await
+			.map_err(|e| Error::Remote(e.to_string()))?
+	}
+}
+
+pub fn serve_prompts<P, C>(rpc: &mut Rpc<C>, prompt: P)
+where
+	P: Prompter + Send + Sync + 'static,
+	C: Config,
+	C::Error: From<Error>,
+{
+	PromptEndpoints(prompt).register_endpoints(rpc);
+}
addedcrates/remowt-ui-prompt/src/dbus.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-ui-prompt/src/dbus.rs
@@ -0,0 +1,135 @@
+use zbus::interface;
+use zbus::{fdo, proxy};
+
+use crate::Source;
+use crate::{BlockingPrompter, Result};
+use crate::{Error, Prompter};
+
+pub struct DbusPrompterInterface<P>(pub P);
+
+#[interface(name = "lach.PolkitInputHandler")]
+impl<P: Prompter + Send + Sync + 'static> DbusPrompterInterface<P> {
+	async fn prompt_radio(
+		&self,
+		prompt: &str,
+		description: &str,
+		source: Vec<Source>,
+	) -> fdo::Result<bool> {
+		Ok(self.0.prompt_radio(prompt, description, &source).await?)
+	}
+	async fn prompt_text(
+		&self,
+		echo: bool,
+		prompt: &str,
+		description: &str,
+		source: Vec<Source>,
+	) -> fdo::Result<String> {
+		Ok(self
+			.0
+			.prompt_text(echo, prompt, description, &source)
+			.await?)
+	}
+	async fn display_text(
+		&self,
+		error: bool,
+		description: &str,
+		source: Vec<Source>,
+	) -> fdo::Result<()> {
+		Ok(self.0.display_text(error, description, &source).await?)
+	}
+}
+
+#[proxy(interface = "lach.PolkitInputHandler")]
+pub trait DbusPrompter {
+	async fn prompt_enum(
+		&self,
+		prompt: &str,
+		description: &str,
+		variants: &[&str],
+		source: &[Source],
+	) -> fdo::Result<u32>;
+	async fn prompt_text(
+		&self,
+		echo: bool,
+		prompt: &str,
+		description: &str,
+		source: &[Source],
+	) -> fdo::Result<String>;
+	async fn display_text(
+		&self,
+		error: bool,
+		description: &str,
+		source: &[Source],
+	) -> fdo::Result<()>;
+}
+
+impl Prompter for DbusPrompterProxy<'_> {
+	async fn prompt_enum(
+		&self,
+		prompt: &str,
+		description: &str,
+		variants: &[&str],
+		source: &[Source],
+	) -> Result<u32> {
+		Ok(self
+			.prompt_enum(prompt, description, variants, source)
+			.await?)
+	}
+
+	async fn prompt_text(
+		&self,
+		echo: bool,
+		prompt: &str,
+		description: &str,
+		source: &[Source],
+	) -> Result<String> {
+		Ok(self.prompt_text(echo, prompt, description, source).await?)
+	}
+
+	async fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()> {
+		Ok(self.display_text(error, description, source).await?)
+	}
+}
+impl BlockingPrompter for DbusPrompterProxyBlocking<'_> {
+	fn prompt_enum(
+		&self,
+		prompt: &str,
+		description: &str,
+		variants: &[&str],
+		source: &[Source],
+	) -> Result<u32> {
+		Ok(self.prompt_enum(prompt, description, variants, source)?)
+	}
+
+	fn prompt_text(
+		&self,
+		echo: bool,
+		prompt: &str,
+		description: &str,
+		source: &[Source],
+	) -> Result<String> {
+		Ok(self.prompt_text(echo, prompt, description, source)?)
+	}
+
+	fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()> {
+		Ok(self.display_text(error, description, source)?)
+	}
+}
+
+impl From<fdo::Error> for Error {
+	fn from(value: fdo::Error) -> Self {
+		if matches!(value, fdo::Error::NoReply(_)) {
+			return Self::Cancel;
+		}
+		Self::InputError(format!("{value}"))
+	}
+}
+impl From<Error> for fdo::Error {
+	fn from(value: Error) -> Self {
+		match value {
+			Error::Cancel => fdo::Error::NoReply("input was cancelled".to_owned()),
+			Error::Remote(e) => fdo::Error::NoReply(format!("remote error occured: {e}")),
+			Error::InputError(e) => fdo::Error::Failed(e),
+		}
+	}
+}
addedcrates/remowt-ui-prompt/src/lib.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-ui-prompt/src/lib.rs
@@ -0,0 +1,201 @@
+use core::fmt;
+use std::borrow::Cow;
+use std::future::Future;
+use std::result;
+
+pub mod bifrost;
+pub mod dbus;
+pub mod rofi;
+
+#[derive(thiserror::Error, Debug, serde::Serialize, serde::Deserialize)]
+pub enum Error {
+	#[error("user has cancelled input")]
+	Cancel,
+	#[error("input error: {0}")]
+	InputError(String),
+	#[error("unknown remote error: {0}")]
+	Remote(String),
+}
+
+pub type Result<T, E = Error> = result::Result<T, E>;
+
+#[cfg_attr(feature = "dbus", derive(zbus::zvariant::Type))]
+#[derive(serde::Serialize, serde::Deserialize, Clone)]
+pub struct Source(pub Cow<'static, str>);
+impl fmt::Display for Source {
+	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+		write!(f, "<u>{}</u>", self.0)
+	}
+}
+
+pub trait Prompter: Send + Sync {
+	fn prompt_radio(
+		&self,
+		prompt: &str,
+		description: &str,
+		source: &[Source],
+	) -> impl Future<Output = Result<bool>> + Send {
+		let fut = self.prompt_enum(prompt, description, &["No", "Yes"], source);
+		async { fut.await.map(|v| v == 1) }
+	}
+	fn prompt_enum(
+		&self,
+		prompt: &str,
+		description: &str,
+		variants: &[&str],
+		source: &[Source],
+	) -> impl Future<Output = Result<u32>> + Send;
+	fn prompt_text(
+		&self,
+		echo: bool,
+		prompt: &str,
+		description: &str,
+		source: &[Source],
+	) -> impl Future<Output = Result<String>> + Send;
+	fn display_text(
+		&self,
+		error: bool,
+		description: &str,
+		source: &[Source],
+	) -> impl Future<Output = Result<()>> + Send;
+}
+pub trait BlockingPrompter {
+	fn prompt_radio(&self, prompt: &str, description: &str, source: &[Source]) -> Result<bool> {
+		self.prompt_enum(prompt, description, &["No", "Yes"], source)
+			.map(|v| v == 1)
+	}
+	fn prompt_enum(
+		&self,
+		prompt: &str,
+		description: &str,
+		variants: &[&str],
+		source: &[Source],
+	) -> Result<u32>;
+	fn prompt_text(
+		&self,
+		echo: bool,
+		prompt: &str,
+		description: &str,
+		source: &[Source],
+	) -> Result<String>;
+	fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()>;
+}
+impl<P> Prompter for &P
+where
+	P: Prompter,
+{
+	fn prompt_radio(
+		&self,
+		prompt: &str,
+		description: &str,
+		source: &[Source],
+	) -> impl Future<Output = Result<bool>> + Send {
+		(*self).prompt_radio(prompt, description, source)
+	}
+
+	fn prompt_enum(
+		&self,
+		prompt: &str,
+		description: &str,
+		variants: &[&str],
+		source: &[Source],
+	) -> impl Future<Output = Result<u32>> + Send {
+		(*self).prompt_enum(prompt, description, variants, source)
+	}
+
+	fn prompt_text(
+		&self,
+		echo: bool,
+		prompt: &str,
+		description: &str,
+		source: &[Source],
+	) -> impl Future<Output = Result<String>> + Send {
+		(*self).prompt_text(echo, prompt, description, source)
+	}
+
+	fn display_text(
+		&self,
+		error: bool,
+		description: &str,
+		source: &[Source],
+	) -> impl Future<Output = Result<()>> + Send {
+		(*self).display_text(error, description, source)
+	}
+}
+
+pub struct PrependSourcePrompter<P> {
+	pub prompter: P,
+	pub source: Vec<Source>,
+	pub description: String,
+}
+impl<P> PrependSourcePrompter<P> {
+	fn source(&self, input: &[Source]) -> Vec<Source> {
+		let mut out = self.source.clone();
+		out.extend(input.iter().cloned());
+		out
+	}
+	fn description(&self, input: &str) -> String {
+		if self.description.is_empty() {
+			input.to_owned()
+		} else if input.is_empty() {
+			self.description.to_owned()
+		} else {
+			format!("{input}\n\n{}", self.description)
+		}
+	}
+}
+impl<P> Prompter for PrependSourcePrompter<P>
+where
+	P: Prompter + Sync,
+{
+	async fn prompt_radio(
+		&self,
+		prompt: &str,
+		description: &str,
+		source: &[Source],
+	) -> Result<bool> {
+		self.prompter
+			.prompt_radio(prompt, &self.description(description), &self.source(source))
+			.await
+	}
+
+	async fn prompt_enum(
+		&self,
+		prompt: &str,
+		description: &str,
+		variants: &[&str],
+		source: &[Source],
+	) -> Result<u32> {
+		self.prompter
+			.prompt_enum(
+				prompt,
+				&self.description(description),
+				variants,
+				&self.source(source),
+			)
+			.await
+	}
+
+	async fn prompt_text(
+		&self,
+		echo: bool,
+		prompt: &str,
+		description: &str,
+		source: &[Source],
+	) -> Result<String> {
+		self.prompter
+			.prompt_text(
+				echo,
+				prompt,
+				&self.description(description),
+				&self.source(source),
+			)
+			.await
+	}
+
+	async fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()> {
+		self.prompter
+			.display_text(error, &self.description(description), &self.source(source))
+			.await
+	}
+}
addedcrates/remowt-ui-prompt/src/rofi.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-ui-prompt/src/rofi.rs
@@ -0,0 +1,208 @@
+use std::process::Stdio;
+
+use tokio::io::AsyncWriteExt;
+use tokio::process::Command;
+use tracing::trace;
+
+use crate::{Error, Prompter, Result, Source};
+
+#[derive(Clone)]
+pub struct RofiPrompter;
+
+fn fixup_prompt(prompt: &str) -> &str {
+	// Rofi always appends such suffix
+	prompt.strip_suffix(": ").unwrap_or(prompt)
+}
+
+fn rofi_command() -> Command {
+	Command::new(option_env!("ROFI").unwrap_or("rofi"))
+}
+
+impl Prompter for RofiPrompter {
+	async fn prompt_enum(
+		&self,
+		prompt: &str,
+		description: &str,
+		variants: &[&str],
+		source: &[Source],
+	) -> Result<u32> {
+		trace!("rofi radio");
+		let mut cmd = rofi_command();
+		let mesg = if source.is_empty() {
+			description.to_owned()
+		} else {
+			let mut out = format!("{description}\n\n<b>Requested on ",);
+			for (i, s) in source.iter().enumerate() {
+				if i != 0 {
+					out.push_str(" -> ");
+				}
+				out.push_str(&s.to_string());
+			}
+			out.push_str("</b>");
+			out
+		};
+		cmd.args([
+			"-dmenu",
+			"-mesg",
+			&mesg,
+			"-sync",
+			"-only-match",
+			"-p",
+			fixup_prompt(prompt),
+			"-format",
+			"i",
+			"-markup-rows",
+		]);
+		cmd.stdin(Stdio::piped());
+		cmd.stdout(Stdio::piped());
+		cmd.kill_on_drop(true);
+		let mut child = cmd
+			.spawn()
+			.map_err(|e| Error::InputError(format!("failed to spawn rofi: {e}")))?;
+
+		let mut stdin = child.stdin.take().expect("stdin is piped");
+		for var in variants {
+			stdin
+				.write_all(var.replace('\n', " ").as_bytes())
+				.await
+				.map_err(|e| Error::InputError(format!("failed to write rofi variants: {e}")))?;
+			stdin
+				.write_all(b"\n")
+				.await
+				.map_err(|e| Error::InputError(format!("failed to write rofi variants: {e}")))?;
+		}
+		// write_all already flushes, just to be sure.
+		let _ = stdin.flush().await;
+		drop(stdin);
+
+		let out = child
+			.wait_with_output()
+			.await
+			.map_err(|e| Error::InputError(format!("failed to wait for rofi: {e}")))?;
+		let stdout = out
+			.stdout
+			.strip_suffix(b"\n")
+			.unwrap_or(&out.stdout)
+			.to_owned();
+
+		let id: u32 = String::from_utf8(stdout)
+			.map_err(|e| Error::InputError(format!("rofi produced invalid output: {e}")))?
+			.parse()
+			.map_err(|e| Error::InputError(format!("rofi produced invalid output: {e}")))?;
+		if id as usize >= variants.len() {
+			return Err(Error::InputError("invalid rofi response".to_owned()));
+		}
+
+		Ok(id)
+	}
+
+	async fn prompt_text(
+		&self,
+		echo: bool,
+		prompt: &str,
+		description: &str,
+		source: &[Source],
+	) -> Result<String> {
+		trace!("rofi text");
+		let mut cmd = rofi_command();
+		let mesg = if source.is_empty() {
+			description.to_owned()
+		} else {
+			let mut out = format!("{description}\n\n<b>Requested on ",);
+			for (i, s) in source.iter().enumerate() {
+				if i != 0 {
+					out.push_str(" -> ");
+				}
+				out.push_str(&s.to_string());
+			}
+			out.push_str("</b>");
+			out
+		};
+		cmd.args(["-dmenu", "-mesg", &mesg, "-p", fixup_prompt(prompt)]);
+		if !echo {
+			cmd.arg("-password");
+		}
+		cmd.stdin(Stdio::null());
+		cmd.stdout(Stdio::piped());
+		cmd.kill_on_drop(true);
+		let child = cmd
+			.spawn()
+			.map_err(|e| Error::InputError(format!("failed to spawn rofi: {e}")))?;
+
+		let out = child
+			.wait_with_output()
+			.await
+			.map_err(|e| Error::InputError(format!("failed to wait for rofi: {e}")))?;
+		let stdout = out
+			.stdout
+			.strip_suffix(b"\n")
+			.unwrap_or(&out.stdout)
+			.to_owned();
+
+		Ok(String::from_utf8_lossy(&stdout).to_string())
+	}
+
+	async fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()> {
+		trace!("rofi display");
+		let mut cmd = rofi_command();
+		let mut mesg = if source.is_empty() {
+			description.to_owned()
+		} else {
+			let mut out = format!("{description}\n\n<b>Coming from ",);
+			for s in source.iter() {
+				out.push_str(&s.to_string());
+			}
+			out.push_str("</b>");
+			out
+		};
+		if error {
+			mesg.insert_str(0, "<span color=\"red\">");
+			mesg.push_str("</span>");
+		}
+		cmd.args(["-e", &mesg, "-markup"]);
+		cmd.stdin(Stdio::null());
+		cmd.stdout(Stdio::null());
+		cmd.kill_on_drop(true);
+		let mut child = cmd
+			.spawn()
+			.map_err(|e| Error::InputError(format!("failed to spawn rofi: {e}")))?;
+
+		child
+			.wait()
+			.await
+			.map_err(|e| Error::InputError(format!("failed to wait for rofi: {e}")))?;
+
+		Ok(())
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use std::borrow::Cow;
+
+	use crate::rofi::RofiPrompter;
+	use crate::{PrependSourcePrompter, Prompter as _, Source};
+
+	// #[tokio::test]
+	#[tokio::test]
+	#[ignore = "interactive"]
+	async fn test() {
+		let prompter = PrependSourcePrompter {
+			prompter: RofiPrompter,
+			description: "test".to_owned(),
+			source: vec![Source(Cow::Borrowed("ssh"))],
+		};
+		prompter
+			.prompt_radio("Enable", "Polkit needs access", &[])
+			.await
+			.expect("rofi");
+		prompter
+			.prompt_text(false, "Password", "Polkit needs access", &[])
+			.await
+			.expect("rofi");
+		prompter
+			.display_text(true, "Polkit needs access", &[])
+			.await
+			.expect("rofi");
+	}
+}
deletedcrates/ui-prompt/Cargo.tomldiffbeforeafterboth
--- a/crates/ui-prompt/Cargo.toml
+++ /dev/null
@@ -1,20 +0,0 @@
-[package]
-name = "remowt-ui-prompt"
-description = "Interactive UI prompt endpoint for remowt (D-Bus)"
-version.workspace = true
-edition = "2021"
-license.workspace = true
-
-[dependencies]
-bifrostlink.workspace = true
-bifrostlink-macros.workspace = true
-serde.workspace = true
-serde_json.workspace = true
-thiserror.workspace = true
-tokio = { workspace = true, features = ["io-util", "macros", "process", "rt"] }
-tracing.workspace = true
-zbus = { workspace = true, optional = true }
-
-[features]
-default = ["dbus"]
-dbus = ["dep:zbus"]
deletedcrates/ui-prompt/src/bifrost.rsdiffbeforeafterboth
--- a/crates/ui-prompt/src/bifrost.rs
+++ /dev/null
@@ -1,109 +0,0 @@
-use bifrostlink::{Config, Rpc};
-use bifrostlink_macros::endpoints;
-use serde::{Deserialize, Serialize};
-
-use crate::{Error, Prompter, Source};
-
-pub struct PromptEndpoints<P>(pub P);
-
-#[endpoints(ns = 2)]
-impl<P> PromptEndpoints<P>
-where
-	P: Prompter + Send + Sync + 'static,
-{
-	#[endpoints(id = 1, cancel)]
-	async fn prompt_enum(
-		&self,
-		prompt: String,
-		description: String,
-		variants: Vec<String>,
-		source: Vec<Source>,
-	) -> Result<u32, Error> {
-		let variants: Vec<&str> = variants.iter().map(|v| v.as_str()).collect();
-		self.0
-			.prompt_enum(&prompt, &description, &variants, &source)
-			.await
-	}
-
-	#[endpoints(id = 2, cancel)]
-	async fn prompt_text(
-		&self,
-		echo: bool,
-		prompt: String,
-		description: String,
-		source: Vec<Source>,
-	) -> Result<String, Error> {
-		self.0
-			.prompt_text(echo, &prompt, &description, &source)
-			.await
-	}
-
-	#[endpoints(id = 3, cancel)]
-	async fn display_text(
-		&self,
-		error: bool,
-		description: String,
-		source: Vec<Source>,
-	) -> Result<(), Error> {
-		self.0.display_text(error, &description, &source).await
-	}
-}
-
-impl<C: Config> Prompter for PromptEndpointsClient<C>
-where
-	Error: ToString,
-{
-	async fn prompt_enum(
-		&self,
-		prompt: &str,
-		description: &str,
-		variants: &[&str],
-		source: &[Source],
-	) -> crate::Result<u32> {
-		self.prompt_enum(
-			prompt.to_owned(),
-			description.to_owned(),
-			variants.iter().map(|v| (*v).to_owned()).collect(),
-			source.to_vec(),
-		)
-		.await
-		.map_err(|e| Error::Remote(e.to_string()))?
-	}
-
-	async fn prompt_text(
-		&self,
-		echo: bool,
-		prompt: &str,
-		description: &str,
-		source: &[Source],
-	) -> crate::Result<String> {
-		self.prompt_text(
-			echo,
-			prompt.to_owned(),
-			description.to_owned(),
-			source.to_vec(),
-		)
-		.await
-		.map_err(|e| Error::Remote(e.to_string()))?
-	}
-
-	async fn display_text(
-		&self,
-		error: bool,
-		description: &str,
-		source: &[Source],
-	) -> crate::Result<()> {
-		self.display_text(error, description.to_owned(), source.to_vec())
-			.await
-			.map_err(|e| Error::Remote(e.to_string()))?
-	}
-}
-
-pub fn serve_prompts<P, C>(rpc: &mut Rpc<C>, prompt: P)
-where
-	P: Prompter + Send + Sync + 'static,
-	C: Config,
-	C::Error: From<Error>,
-{
-	PromptEndpoints(prompt).register_endpoints(rpc);
-}
deletedcrates/ui-prompt/src/dbus.rsdiffbeforeafterboth
--- a/crates/ui-prompt/src/dbus.rs
+++ /dev/null
@@ -1,135 +0,0 @@
-use zbus::interface;
-use zbus::{fdo, proxy};
-
-use crate::Source;
-use crate::{BlockingPrompter, Result};
-use crate::{Error, Prompter};
-
-pub struct DbusPrompterInterface<P>(pub P);
-
-#[interface(name = "lach.PolkitInputHandler")]
-impl<P: Prompter + Send + Sync + 'static> DbusPrompterInterface<P> {
-	async fn prompt_radio(
-		&self,
-		prompt: &str,
-		description: &str,
-		source: Vec<Source>,
-	) -> fdo::Result<bool> {
-		Ok(self.0.prompt_radio(prompt, description, &source).await?)
-	}
-	async fn prompt_text(
-		&self,
-		echo: bool,
-		prompt: &str,
-		description: &str,
-		source: Vec<Source>,
-	) -> fdo::Result<String> {
-		Ok(self
-			.0
-			.prompt_text(echo, prompt, description, &source)
-			.await?)
-	}
-	async fn display_text(
-		&self,
-		error: bool,
-		description: &str,
-		source: Vec<Source>,
-	) -> fdo::Result<()> {
-		Ok(self.0.display_text(error, description, &source).await?)
-	}
-}
-
-#[proxy(interface = "lach.PolkitInputHandler")]
-pub trait DbusPrompter {
-	async fn prompt_enum(
-		&self,
-		prompt: &str,
-		description: &str,
-		variants: &[&str],
-		source: &[Source],
-	) -> fdo::Result<u32>;
-	async fn prompt_text(
-		&self,
-		echo: bool,
-		prompt: &str,
-		description: &str,
-		source: &[Source],
-	) -> fdo::Result<String>;
-	async fn display_text(
-		&self,
-		error: bool,
-		description: &str,
-		source: &[Source],
-	) -> fdo::Result<()>;
-}
-
-impl Prompter for DbusPrompterProxy<'_> {
-	async fn prompt_enum(
-		&self,
-		prompt: &str,
-		description: &str,
-		variants: &[&str],
-		source: &[Source],
-	) -> Result<u32> {
-		Ok(self
-			.prompt_enum(prompt, description, variants, source)
-			.await?)
-	}
-
-	async fn prompt_text(
-		&self,
-		echo: bool,
-		prompt: &str,
-		description: &str,
-		source: &[Source],
-	) -> Result<String> {
-		Ok(self.prompt_text(echo, prompt, description, source).await?)
-	}
-
-	async fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()> {
-		Ok(self.display_text(error, description, source).await?)
-	}
-}
-impl BlockingPrompter for DbusPrompterProxyBlocking<'_> {
-	fn prompt_enum(
-		&self,
-		prompt: &str,
-		description: &str,
-		variants: &[&str],
-		source: &[Source],
-	) -> Result<u32> {
-		Ok(self.prompt_enum(prompt, description, variants, source)?)
-	}
-
-	fn prompt_text(
-		&self,
-		echo: bool,
-		prompt: &str,
-		description: &str,
-		source: &[Source],
-	) -> Result<String> {
-		Ok(self.prompt_text(echo, prompt, description, source)?)
-	}
-
-	fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()> {
-		Ok(self.display_text(error, description, source)?)
-	}
-}
-
-impl From<fdo::Error> for Error {
-	fn from(value: fdo::Error) -> Self {
-		if matches!(value, fdo::Error::NoReply(_)) {
-			return Self::Cancel;
-		}
-		Self::InputError(format!("{value}"))
-	}
-}
-impl From<Error> for fdo::Error {
-	fn from(value: Error) -> Self {
-		match value {
-			Error::Cancel => fdo::Error::NoReply("input was cancelled".to_owned()),
-			Error::Remote(e) => fdo::Error::NoReply(format!("remote error occured: {e}")),
-			Error::InputError(e) => fdo::Error::Failed(e),
-		}
-	}
-}
deletedcrates/ui-prompt/src/lib.rsdiffbeforeafterboth
--- a/crates/ui-prompt/src/lib.rs
+++ /dev/null
@@ -1,201 +0,0 @@
-use core::fmt;
-use std::borrow::Cow;
-use std::future::Future;
-use std::result;
-
-pub mod bifrost;
-pub mod dbus;
-pub mod rofi;
-
-#[derive(thiserror::Error, Debug, serde::Serialize, serde::Deserialize)]
-pub enum Error {
-	#[error("user has cancelled input")]
-	Cancel,
-	#[error("input error: {0}")]
-	InputError(String),
-	#[error("unknown remote error: {0}")]
-	Remote(String),
-}
-
-pub type Result<T, E = Error> = result::Result<T, E>;
-
-#[cfg_attr(feature = "dbus", derive(zbus::zvariant::Type))]
-#[derive(serde::Serialize, serde::Deserialize, Clone)]
-pub struct Source(pub Cow<'static, str>);
-impl fmt::Display for Source {
-	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-		write!(f, "<u>{}</u>", self.0)
-	}
-}
-
-pub trait Prompter: Send + Sync {
-	fn prompt_radio(
-		&self,
-		prompt: &str,
-		description: &str,
-		source: &[Source],
-	) -> impl Future<Output = Result<bool>> + Send {
-		let fut = self.prompt_enum(prompt, description, &["No", "Yes"], source);
-		async { fut.await.map(|v| v == 1) }
-	}
-	fn prompt_enum(
-		&self,
-		prompt: &str,
-		description: &str,
-		variants: &[&str],
-		source: &[Source],
-	) -> impl Future<Output = Result<u32>> + Send;
-	fn prompt_text(
-		&self,
-		echo: bool,
-		prompt: &str,
-		description: &str,
-		source: &[Source],
-	) -> impl Future<Output = Result<String>> + Send;
-	fn display_text(
-		&self,
-		error: bool,
-		description: &str,
-		source: &[Source],
-	) -> impl Future<Output = Result<()>> + Send;
-}
-pub trait BlockingPrompter {
-	fn prompt_radio(&self, prompt: &str, description: &str, source: &[Source]) -> Result<bool> {
-		self.prompt_enum(prompt, description, &["No", "Yes"], source)
-			.map(|v| v == 1)
-	}
-	fn prompt_enum(
-		&self,
-		prompt: &str,
-		description: &str,
-		variants: &[&str],
-		source: &[Source],
-	) -> Result<u32>;
-	fn prompt_text(
-		&self,
-		echo: bool,
-		prompt: &str,
-		description: &str,
-		source: &[Source],
-	) -> Result<String>;
-	fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()>;
-}
-impl<P> Prompter for &P
-where
-	P: Prompter,
-{
-	fn prompt_radio(
-		&self,
-		prompt: &str,
-		description: &str,
-		source: &[Source],
-	) -> impl Future<Output = Result<bool>> + Send {
-		(*self).prompt_radio(prompt, description, source)
-	}
-
-	fn prompt_enum(
-		&self,
-		prompt: &str,
-		description: &str,
-		variants: &[&str],
-		source: &[Source],
-	) -> impl Future<Output = Result<u32>> + Send {
-		(*self).prompt_enum(prompt, description, variants, source)
-	}
-
-	fn prompt_text(
-		&self,
-		echo: bool,
-		prompt: &str,
-		description: &str,
-		source: &[Source],
-	) -> impl Future<Output = Result<String>> + Send {
-		(*self).prompt_text(echo, prompt, description, source)
-	}
-
-	fn display_text(
-		&self,
-		error: bool,
-		description: &str,
-		source: &[Source],
-	) -> impl Future<Output = Result<()>> + Send {
-		(*self).display_text(error, description, source)
-	}
-}
-
-pub struct PrependSourcePrompter<P> {
-	pub prompter: P,
-	pub source: Vec<Source>,
-	pub description: String,
-}
-impl<P> PrependSourcePrompter<P> {
-	fn source(&self, input: &[Source]) -> Vec<Source> {
-		let mut out = self.source.clone();
-		out.extend(input.iter().cloned());
-		out
-	}
-	fn description(&self, input: &str) -> String {
-		if self.description.is_empty() {
-			input.to_owned()
-		} else if input.is_empty() {
-			self.description.to_owned()
-		} else {
-			format!("{input}\n\n{}", self.description)
-		}
-	}
-}
-impl<P> Prompter for PrependSourcePrompter<P>
-where
-	P: Prompter + Sync,
-{
-	async fn prompt_radio(
-		&self,
-		prompt: &str,
-		description: &str,
-		source: &[Source],
-	) -> Result<bool> {
-		self.prompter
-			.prompt_radio(prompt, &self.description(description), &self.source(source))
-			.await
-	}
-
-	async fn prompt_enum(
-		&self,
-		prompt: &str,
-		description: &str,
-		variants: &[&str],
-		source: &[Source],
-	) -> Result<u32> {
-		self.prompter
-			.prompt_enum(
-				prompt,
-				&self.description(description),
-				variants,
-				&self.source(source),
-			)
-			.await
-	}
-
-	async fn prompt_text(
-		&self,
-		echo: bool,
-		prompt: &str,
-		description: &str,
-		source: &[Source],
-	) -> Result<String> {
-		self.prompter
-			.prompt_text(
-				echo,
-				prompt,
-				&self.description(description),
-				&self.source(source),
-			)
-			.await
-	}
-
-	async fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()> {
-		self.prompter
-			.display_text(error, &self.description(description), &self.source(source))
-			.await
-	}
-}
deletedcrates/ui-prompt/src/rofi.rsdiffbeforeafterboth
--- a/crates/ui-prompt/src/rofi.rs
+++ /dev/null
@@ -1,208 +0,0 @@
-use std::process::Stdio;
-
-use tokio::io::AsyncWriteExt;
-use tokio::process::Command;
-use tracing::trace;
-
-use crate::{Error, Prompter, Result, Source};
-
-#[derive(Clone)]
-pub struct RofiPrompter;
-
-fn fixup_prompt(prompt: &str) -> &str {
-	// Rofi always appends such suffix
-	prompt.strip_suffix(": ").unwrap_or(prompt)
-}
-
-fn rofi_command() -> Command {
-	Command::new(option_env!("ROFI").unwrap_or("rofi"))
-}
-
-impl Prompter for RofiPrompter {
-	async fn prompt_enum(
-		&self,
-		prompt: &str,
-		description: &str,
-		variants: &[&str],
-		source: &[Source],
-	) -> Result<u32> {
-		trace!("rofi radio");
-		let mut cmd = rofi_command();
-		let mesg = if source.is_empty() {
-			description.to_owned()
-		} else {
-			let mut out = format!("{description}\n\n<b>Requested on ",);
-			for (i, s) in source.iter().enumerate() {
-				if i != 0 {
-					out.push_str(" -> ");
-				}
-				out.push_str(&s.to_string());
-			}
-			out.push_str("</b>");
-			out
-		};
-		cmd.args([
-			"-dmenu",
-			"-mesg",
-			&mesg,
-			"-sync",
-			"-only-match",
-			"-p",
-			fixup_prompt(prompt),
-			"-format",
-			"i",
-			"-markup-rows",
-		]);
-		cmd.stdin(Stdio::piped());
-		cmd.stdout(Stdio::piped());
-		cmd.kill_on_drop(true);
-		let mut child = cmd
-			.spawn()
-			.map_err(|e| Error::InputError(format!("failed to spawn rofi: {e}")))?;
-
-		let mut stdin = child.stdin.take().expect("stdin is piped");
-		for var in variants {
-			stdin
-				.write_all(var.replace('\n', " ").as_bytes())
-				.await
-				.map_err(|e| Error::InputError(format!("failed to write rofi variants: {e}")))?;
-			stdin
-				.write_all(b"\n")
-				.await
-				.map_err(|e| Error::InputError(format!("failed to write rofi variants: {e}")))?;
-		}
-		// write_all already flushes, just to be sure.
-		let _ = stdin.flush().await;
-		drop(stdin);
-
-		let out = child
-			.wait_with_output()
-			.await
-			.map_err(|e| Error::InputError(format!("failed to wait for rofi: {e}")))?;
-		let stdout = out
-			.stdout
-			.strip_suffix(b"\n")
-			.unwrap_or(&out.stdout)
-			.to_owned();
-
-		let id: u32 = String::from_utf8(stdout)
-			.map_err(|e| Error::InputError(format!("rofi produced invalid output: {e}")))?
-			.parse()
-			.map_err(|e| Error::InputError(format!("rofi produced invalid output: {e}")))?;
-		if id as usize >= variants.len() {
-			return Err(Error::InputError("invalid rofi response".to_owned()));
-		}
-
-		Ok(id)
-	}
-
-	async fn prompt_text(
-		&self,
-		echo: bool,
-		prompt: &str,
-		description: &str,
-		source: &[Source],
-	) -> Result<String> {
-		trace!("rofi text");
-		let mut cmd = rofi_command();
-		let mesg = if source.is_empty() {
-			description.to_owned()
-		} else {
-			let mut out = format!("{description}\n\n<b>Requested on ",);
-			for (i, s) in source.iter().enumerate() {
-				if i != 0 {
-					out.push_str(" -> ");
-				}
-				out.push_str(&s.to_string());
-			}
-			out.push_str("</b>");
-			out
-		};
-		cmd.args(["-dmenu", "-mesg", &mesg, "-p", fixup_prompt(prompt)]);
-		if !echo {
-			cmd.arg("-password");
-		}
-		cmd.stdin(Stdio::null());
-		cmd.stdout(Stdio::piped());
-		cmd.kill_on_drop(true);
-		let child = cmd
-			.spawn()
-			.map_err(|e| Error::InputError(format!("failed to spawn rofi: {e}")))?;
-
-		let out = child
-			.wait_with_output()
-			.await
-			.map_err(|e| Error::InputError(format!("failed to wait for rofi: {e}")))?;
-		let stdout = out
-			.stdout
-			.strip_suffix(b"\n")
-			.unwrap_or(&out.stdout)
-			.to_owned();
-
-		Ok(String::from_utf8_lossy(&stdout).to_string())
-	}
-
-	async fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()> {
-		trace!("rofi display");
-		let mut cmd = rofi_command();
-		let mut mesg = if source.is_empty() {
-			description.to_owned()
-		} else {
-			let mut out = format!("{description}\n\n<b>Coming from ",);
-			for s in source.iter() {
-				out.push_str(&s.to_string());
-			}
-			out.push_str("</b>");
-			out
-		};
-		if error {
-			mesg.insert_str(0, "<span color=\"red\">");
-			mesg.push_str("</span>");
-		}
-		cmd.args(["-e", &mesg, "-markup"]);
-		cmd.stdin(Stdio::null());
-		cmd.stdout(Stdio::null());
-		cmd.kill_on_drop(true);
-		let mut child = cmd
-			.spawn()
-			.map_err(|e| Error::InputError(format!("failed to spawn rofi: {e}")))?;
-
-		child
-			.wait()
-			.await
-			.map_err(|e| Error::InputError(format!("failed to wait for rofi: {e}")))?;
-
-		Ok(())
-	}
-}
-
-#[cfg(test)]
-mod tests {
-	use std::borrow::Cow;
-
-	use crate::rofi::RofiPrompter;
-	use crate::{PrependSourcePrompter, Prompter as _, Source};
-
-	// #[tokio::test]
-	#[tokio::test]
-	#[ignore = "interactive"]
-	async fn test() {
-		let prompter = PrependSourcePrompter {
-			prompter: RofiPrompter,
-			description: "test".to_owned(),
-			source: vec![Source(Cow::Borrowed("ssh"))],
-		};
-		prompter
-			.prompt_radio("Enable", "Polkit needs access", &[])
-			.await
-			.expect("rofi");
-		prompter
-			.prompt_text(false, "Password", "Polkit needs access", &[])
-			.await
-			.expect("rofi");
-		prompter
-			.display_text(true, "Polkit needs access", &[])
-			.await
-			.expect("rofi");
-	}
-}