git.delta.rocks / remowt / refs/commits / 42e2f16609cb

difftreelog

feat basic plugin loading

mwqtzvovYaroslav Bolyukin2026-06-07parent: #69f690d.patch.diff
in: trunk

11 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1080,22 +1080,6 @@
 ]
 
 [[package]]
-name = "fleet-nix-daemon"
-version = "0.1.0"
-dependencies = [
- "anyhow",
- "bifrostlink",
- "bifrostlink-macros",
- "camino",
- "remowt-client",
- "serde",
- "thiserror 2.0.18",
- "tokio",
- "tracing",
- "uuid",
-]
-
-[[package]]
 name = "foldhash"
 version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2270,6 +2254,7 @@
  "polkit-shared",
  "rand 0.8.5",
  "remowt-link-shared",
+ "remowt-plugin",
  "remowt-pty",
  "serde",
  "tempfile",
@@ -2333,6 +2318,36 @@
 ]
 
 [[package]]
+name = "remowt-nix-daemon"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "bifrostlink",
+ "bifrostlink-macros",
+ "camino",
+ "remowt-client",
+ "serde",
+ "thiserror 2.0.18",
+ "tokio",
+ "tracing",
+ "uuid",
+]
+
+[[package]]
+name = "remowt-plugin"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "bifrostlink",
+ "bifrostlink-ports",
+ "bytes",
+ "remowt-link-shared",
+ "tokio",
+ "tracing",
+ "tracing-subscriber",
+]
+
+[[package]]
 name = "remowt-pty"
 version = "0.1.0"
 dependencies = [
modifiedCargo.tomldiffbeforeafterboth
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,6 +10,7 @@
 remowt-client = { path = "crates/remowt-client" }
 polkit-shared = { version = "0.1.0", path = "crates/polkit-shared" }
 remowt-link-shared = { version = "0.1.0", path = "crates/remowt-link-shared" }
+remowt-plugin = { version = "0.1.0", path = "crates/remowt-plugin" }
 ui-prompt = { version = "0.1.0", path = "crates/ui-prompt" }
 
 bifrostlink = "0.2.0"
modifiedcmds/remowt-agent/Cargo.tomldiffbeforeafterboth
--- a/cmds/remowt-agent/Cargo.toml
+++ b/cmds/remowt-agent/Cargo.toml
@@ -14,6 +14,7 @@
 polkit-shared.workspace = true
 rand.workspace = true
 remowt-link-shared.workspace = true
+remowt-plugin.workspace = true
 remowt-pty.workspace = true
 serde = { workspace = true, features = ["derive"] }
 tempfile.workspace = true
modifiedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -253,6 +253,8 @@
 	Systemd.register_endpoints(&mut rpc);
 	Pty::new().register_endpoints(&mut rpc);
 
+	remowt_plugin::host::serve(&mut rpc);
+
 	let user_prompter = PromptEndpointsClient::wrap(rpc.remote(Address::User));
 	let editor_client = EditorEndpointsClient::wrap(rpc.remote(Address::User));
 
modifiedcrates/remowt-client/src/lib.rsdiffbeforeafterboth
after · crates/remowt-client/src/lib.rs
1use std::collections::HashMap;2use std::io;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc};9use bifrostlink_ports::unix_socket::from_socket;10use bytes::{Bytes, BytesMut};11use camino::{Utf8Path, Utf8PathBuf};12use remowt_link_shared::plugin::PluginEndpointsClient;13use remowt_link_shared::{14	Address, BifConfig, ElevateEndpoints, ElevateError, Elevator, Fs, Pty, PtyClient, ShellId,15	Systemd,16};17use russh::client::{connect, Config, Handle, Handler, Msg, Session};18use russh::keys::agent::client::AgentClient;19use russh::keys::agent::AgentIdentity;20use russh::keys::check_known_hosts;21use russh::keys::ssh_key::PublicKey;22use russh::{Channel, ChannelMsg, ChannelStream};23use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf};24use tokio::join;25use tokio::net::UnixListener;26use tokio::sync::mpsc;27use tokio::sync::oneshot::{self, channel};28use tracing::error;29use uuid::Uuid;3031pub mod editor;3233type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;3435async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {36	let len = srx.read_u32().await?;37	let mut buf = BytesMut::zeroed(len as usize);38	srx.read_exact(&mut buf).await?;39	Ok(buf)40}41async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {42	stx.write_u32(value.len().try_into().expect("can't be larger"))43		.await?;44	stx.write_all(&value).await?;45	Ok(())46}4748fn sh_quote(s: impl AsRef<str>) -> String {49	format!("'{}'", s.as_ref().replace('\'', "'\\''"))50}5152const ESCALATORS: [(&str, &[&str]); 3] = [53	("run0", &["--background=", "--pipe"]),54	("sudo", &[]),55	("doas", &[]),56];5758pub struct AgentBundle {59	dir: PathBuf,60	hashes: HashMap<String, String>,61}6263impl AgentBundle {64	pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {65		let dir = dir.into();66		let hashes_path = dir.join("hashes");67		let raw = std::fs::read_to_string(&hashes_path)68			.with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;69		let mut hashes = HashMap::new();70		for line in raw.lines() {71			let line = line.trim();72			if line.is_empty() {73				continue;74			}75			let (arch, hash) = line76				.split_once(char::is_whitespace)77				.ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;78			hashes.insert(arch.to_owned(), hash.trim().to_owned());79		}80		ensure!(81			!hashes.is_empty(),82			"agent bundle {} has no hashes",83			dir.display()84		);85		Ok(Self { dir, hashes })86	}8788	fn binary(&self, arch: &str) -> PathBuf {89		self.dir.join(format!("remowt-agent-{arch}"))90	}91}9293async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {94	let mut ch = sess.channel_open_session().await?;95	ch.exec(true, cmd).await?;96	let mut out = Vec::new();97	let mut code = None;98	while let Some(msg) = ch.wait().await {99		match msg {100			ChannelMsg::Data { data } => out.extend(data.as_ref()),101			ChannelMsg::ExtendedData { data, .. } => {102				error!(103					"remote stderr: {}",104					String::from_utf8_lossy(data.as_ref()).trim()105				);106			}107			ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),108			_ => {}109		}110	}111	Ok((code, out))112}113114async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {115	let (code, mut out) = run(sess, cmd).await?;116	ensure!(117		code == Some(0),118		"remote command failed (exit {code:?}): {cmd}"119	);120	ensure!(out.ends_with(b"\n"));121	out.pop();122	String::from_utf8(out).context("expected utf8 output for command")123}124125async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {126	let arch = run_string_ok(sess, "uname -m").await?;127	let hash = bundle128		.hashes129		.get(&arch)130		.ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;131132	let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")133		.await?134		.trim()135		.to_owned();136	let dir = if cache.is_empty() {137		let home = run_string_ok(sess, "echo \"$HOME\"").await?;138		ensure!(139			!home.is_empty(),140			"remote $HOME and $XDG_CACHE_HOME both empty"141		);142		Utf8PathBuf::from(home).join("cache/remowt")143	} else {144		Utf8PathBuf::from(cache).join("remowt")145	};146	let path = dir.join(hash);147148	let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;149	if present != Some(0) {150		let bin = bundle.binary(&arch);151		let bytes = std::fs::read(&bin)152			.with_context(|| format!("reading agent binary {}", bin.display()))?;153		upload_agent(sess, &dir, &path, bytes).await?;154	}155	Ok(path)156}157158async fn upload_agent(159	sess: &Handle<SshHandler>,160	dir: &Utf8Path,161	path: &Utf8Path,162	bytes: Vec<u8>,163) -> Result<()> {164	run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;165166	let tmp = path.join(format!("tmp.{}", Uuid::new_v4()));167	let ch = sess.channel_open_session().await?;168	ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;169	ch.data_bytes(bytes).await?;170	ch.eof().await?;171	let mut ch = ch;172	let mut code = None;173	while let Some(msg) = ch.wait().await {174		match msg {175			ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),176			ChannelMsg::ExtendedData { data, .. } => {177				error!(178					"agent upload: {}",179					String::from_utf8_lossy(data.as_ref()).trim()180				);181			}182			_ => {}183		}184	}185	ensure!(code == Some(0), "agent upload failed (exit {code:?})");186187	run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;188	run_string_ok(189		sess,190		&format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),191	)192	.await?;193	Ok(())194}195196async fn detect_escalation(197	sess: &Handle<SshHandler>,198) -> Result<(&'static str, &'static [&'static str])> {199	for (tool, flags) in ESCALATORS {200		// `tool` is a fixed identifier (no metacharacters), safe to interpolate.201		let (code, _) = run(sess, &format!("command -v {tool}")).await?;202		if code == Some(0) {203			return Ok((tool, flags));204		}205	}206	bail!("no escalation tool (run0/sudo/doas) found on remote")207}208209fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {210	let mut parts = vec![tool.to_owned()];211	parts.extend(flags.iter().map(|f| f.to_string()));212	parts.push(sh_quote(agent_path));213	parts.push("real-agent".to_owned());214	parts.push("--privileged".to_owned());215	if let Some(p) = path {216		parts.push("--path".to_owned());217		parts.push(sh_quote(p));218	}219	parts.join(" ")220}221222fn find_in_path(name: &str) -> Option<std::path::PathBuf> {223	let path = std::env::var_os("PATH")?;224	std::env::split_paths(&path)225		.map(|dir| dir.join(name))226		.find(|p| p.is_file())227}228229fn port_from_channel(ch: Channel<Msg>) -> Port {230	Port::new(move |mut rx, tx| async move {231		let (mut srx, mut stx) = tokio::io::split(ch.into_stream());232		let srx_task = async move {233			loop {234				match read(&mut srx).await {235					Ok(buf) => {236						if tx.send(buf.freeze()).is_err() {237							break;238						}239					}240					Err(e) => {241						error!("channel read failed: {e}");242						break;243					}244				}245			}246		};247		let stx_task = async move {248			while let Some(value) = rx.recv().await {249				if let Err(e) = write(&mut stx, value).await {250					error!("channel write failed: {e}");251					break;252				}253			}254		};255		join!(srx_task, stx_task);256	})257}258259pub struct SshHandler {260	host: String,261	port: u16,262	subs: Subs,263}264impl Handler for SshHandler {265	type Error = russh::Error;266	async fn check_server_key(267		&mut self,268		server_public_key: &PublicKey,269	) -> Result<bool, Self::Error> {270		Ok(check_known_hosts(&self.host, self.port, server_public_key)?)271	}272	async fn server_channel_open_forwarded_streamlocal(273		&mut self,274		channel: Channel<Msg>,275		socket_path: &str,276		_session: &mut Session,277	) -> Result<(), Self::Error> {278		let Some(ch) = self279			.subs280			.lock()281			.expect("lock")282			.remove(&Utf8PathBuf::from(socket_path))283		else {284			return Err(russh::Error::WrongChannel);285		};286		let _ = ch.send(channel);287		Ok(())288	}289}290291struct SshElevator {292	sess: Arc<Handle<SshHandler>>,293	rpc: WeakRpc<BifConfig>,294	agent_path: Utf8PathBuf,295}296impl Elevator for SshElevator {297	async fn elevate(&self) -> Result<(), ElevateError> {298		let fail = |e: String| ElevateError::Failed(e);299		let (tool, flags) = detect_escalation(&self.sess)300			.await301			.map_err(|e| fail(e.to_string()))?;302		let ch = self303			.sess304			.channel_open_session()305			.await306			.map_err(|e| fail(e.to_string()))?;307		ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None))308			.await309			.map_err(|e| fail(e.to_string()))?;310		let rpc = self311			.rpc312			.clone()313			.upgrade()314			.ok_or_else(|| fail("rpc is gone".to_owned()))?;315		rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0));316		Ok(())317	}318}319320pub struct RemoteChild {321	pub stdout: DuplexStream,322	pub stderr: DuplexStream,323	pub exit: oneshot::Receiver<Option<u32>>,324}325326enum Transport {327	Ssh {328		sess: Arc<Handle<SshHandler>>,329		subs: Subs,330		remote_dir: Utf8PathBuf,331		agent_path: Utf8PathBuf,332	},333	Local {334		#[allow(dead_code)]335		agent: Rpc<BifConfig>,336		agent_path: String,337	},338}339340pub struct Remowt {341	transport: Transport,342	rpc: Rpc<BifConfig>,343	elevated: tokio::sync::OnceCell<()>,344	children: Mutex<Vec<tokio::process::Child>>,345}346347pub type RemowtRemote = Remote<BifConfig>;348349fn loopback() -> (Port, Port) {350	let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::<Bytes>();351	let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::<Bytes>();352	let user = Port::new(move |mut rx, tx| async move {353		loop {354			tokio::select! {355				msg = rx.recv() => match msg {356					Some(msg) => if a2b_tx.send(msg).is_err() { break },357					None => break,358				},359				msg = b2a_rx.recv() => match msg {360					Some(msg) => if tx.send(msg).is_err() { break },361					None => break,362				},363			}364		}365	});366	let agent = Port::new(move |mut rx, tx| async move {367		loop {368			tokio::select! {369				msg = rx.recv() => match msg {370					Some(msg) => if b2a_tx.send(msg).is_err() { break },371					None => break,372				},373				msg = a2b_rx.recv() => match msg {374					Some(msg) => if tx.send(msg).is_err() { break },375					None => break,376				},377			}378		}379	});380	(user, agent)381}382383impl Remowt {384	pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {385		let conf = russh_config::parse_home(host)?;386		let port = conf.host_config.port.unwrap_or(22);387		let hostname = conf388			.host_config389			.hostname390			.clone()391			.unwrap_or_else(|| conf.host_name.clone());392		let user = conf393			.user394			.clone()395			.unwrap_or_else(|| std::env::var("USER").unwrap_or_else(|_| "root".to_owned()));396397		let subs: Subs = Arc::new(Mutex::new(HashMap::new()));398		let mut sess = connect(399			Arc::new(Config::default()),400			(hostname.clone(), port),401			SshHandler {402				host: hostname,403				port,404				subs: subs.clone(),405			},406		)407		.await?;408409		let mut agent = AgentClient::connect_env().await?;410		let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();411		let mut authenticated = false;412		for ident in agent.request_identities().await? {413			let AgentIdentity::PublicKey { key, .. } = ident else {414				continue;415			};416			if sess417				.authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)418				.await?419				.success()420			{421				authenticated = true;422				break;423			}424		}425		ensure!(authenticated, "ssh authentication failed");426427		// All remaining session ops take `&self`; share the handle.428		let sess = Arc::new(sess);429430		let agent_path = deploy_agent(&sess, bundle).await?;431432		let remote_dir = remote_mktemp(&sess).await?;433		let primary = remote_dir.join("primary.sock");434435		let (onetx, onerx) = channel();436		subs.lock().expect("lock").insert(primary.clone(), onetx);437		sess.streamlocal_forward(primary.clone()).await?;438439		let rpc = Rpc::<BifConfig>::new(Address::User);440441		// TODO: ensure no injection is possible in the socket path.442		let cmd_chan = sess.channel_open_session().await?;443		cmd_chan444			.exec(445				true,446				format!(447					"{} real-agent --path={}",448					sh_quote(&agent_path),449					sh_quote(&primary)450				),451			)452			.await?;453454		let port = port_from_channel(455			onerx456				.await457				.map_err(|_| anyhow!("agent never opened its channel"))?,458		);459		rpc.add_direct(Address::Agent, port, Rtt(0));460461		Ok(Self {462			transport: Transport::Ssh {463				sess,464				subs,465				remote_dir,466				agent_path,467			},468			rpc,469			elevated: tokio::sync::OnceCell::new(),470			children: Mutex::new(Vec::new()),471		})472	}473474	pub async fn connect_local(agent_path: &str) -> Result<Self> {475		let (port_user, port_agent) = loopback();476		let rpc = Rpc::<BifConfig>::new(Address::User);477		let mut agent = Rpc::<BifConfig>::new(Address::Agent);478479		// Register handlers before wiring up the link (see the agent binary).480		Fs::new().register_endpoints(&mut agent);481		Systemd.register_endpoints(&mut agent);482		Pty::new().register_endpoints(&mut agent);483484		agent.add_direct(Address::User, port_agent, Rtt(0));485		rpc.add_direct(Address::Agent, port_user, Rtt(0));486487		Ok(Self {488			transport: Transport::Local {489				agent,490				agent_path: agent_path.to_owned(),491			},492			rpc,493			elevated: tokio::sync::OnceCell::new(),494			children: Mutex::new(Vec::new()),495		})496	}497498	pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {499		match &self.transport {500			Transport::Ssh { sess, .. } => Some(sess.clone()),501			Transport::Local { .. } => None,502		}503	}504505	pub fn rpc(&self) -> Rpc<BifConfig> {506		self.rpc.clone()507	}508509	pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {510		R::wrap(self.rpc.remote(Address::Agent))511	}512513	pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {514		let client: PluginEndpointsClient<BifConfig> = self.endpoints();515		client516			.load_plugin(id, name.to_owned())517			.await?518			.map_err(|e| anyhow!("agent failed to load plugin: {e}"))519	}520	pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {521		self.ensure_elevated().await?;522		let client: PluginEndpointsClient<BifConfig> =523			PluginEndpointsClient::wrap(self.rpc.remote(Address::AgentPrivileged));524		client525			.load_plugin_path(id, path.to_owned())526			.await?527			.map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))528	}529	pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {530		R::wrap(self.rpc.remote(Address::Plugin(id)))531	}532	pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {533		self.ensure_elevated().await?;534		Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))535	}536537	async fn ensure_elevated(&self) -> Result<()> {538		self.elevated539			.get_or_try_init(|| async {540				let port = match &self.transport {541					Transport::Ssh {542						sess, agent_path, ..543					} => {544						let (tool, flags) = detect_escalation(sess).await?;545						let ch = sess.channel_open_session().await?;546						ch.exec(true, privileged_cmd(tool, flags, agent_path, None))547							.await?;548						port_from_channel(ch)549					}550					Transport::Local { agent_path, .. } => {551						let sock = std::env::temp_dir()552							.join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4()));553						let _ = std::fs::remove_file(&sock);554						let listener = UnixListener::bind(&sock)?;555						let (tool, flags) = ESCALATORS556							.iter()557							.find(|(t, _)| find_in_path(t).is_some())558							.ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?;559						let child = tokio::process::Command::new(tool)560							.args(*flags)561							.arg(agent_path)562							.arg("real-agent")563							.arg("--privileged")564							.arg("--path")565							.arg(sock.to_str().expect("temp path is utf-8"))566							.kill_on_drop(true)567							.spawn()?;568						self.children.lock().expect("lock").push(child);569						let (stream, _) = listener.accept().await?;570						let _ = std::fs::remove_file(&sock);571						from_socket(stream)572					}573				};574				self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));575				anyhow::Ok(())576			})577			.await?;578		Ok(())579	}580581	pub async fn exec(&self, command: String) -> Result<RemoteChild> {582		let Some(sess) = self.ssh() else {583			bail!("exec should not be called on local")584		};585		let ch = sess.channel_open_session().await?;586		ch.exec(true, command).await?;587588		let (mut out_w, stdout) = tokio::io::duplex(64 * 1024);589		let (mut err_w, stderr) = tokio::io::duplex(64 * 1024);590		let (exit_tx, exit) = oneshot::channel();591592		tokio::spawn(async move {593			let mut ch = ch;594			let mut code = None;595			while let Some(msg) = ch.wait().await {596				match msg {597					ChannelMsg::Data { data } => {598						if out_w.write_all(&data).await.is_err() {599							break;600						}601					}602					ChannelMsg::ExtendedData { data, .. } => {603						if err_w.write_all(&data).await.is_err() {604							break;605						}606					}607					ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),608					_ => {}609				}610			}611			let _ = out_w.shutdown().await;612			let _ = err_w.shutdown().await;613			let _ = exit_tx.send(code);614		});615616		Ok(RemoteChild {617			stdout,618			stderr,619			exit,620		})621	}622623	pub fn serve_elevate(&self) -> Result<()> {624		let Transport::Ssh {625			sess, agent_path, ..626		} = &self.transport627		else {628			bail!("elevate should not be called on local")629		};630		let mut rpc = self.rpc.clone();631		ElevateEndpoints(SshElevator {632			sess: sess.clone(),633			rpc: self.rpc.clone().downgrade(),634			agent_path: agent_path.to_owned(),635		})636		.register_endpoints(&mut rpc);637		Ok(())638	}639640	pub fn remote_dir(&self) -> Option<&Utf8Path> {641		match &self.transport {642			Transport::Ssh { remote_dir, .. } => Some(remote_dir),643			Transport::Local { .. } => None,644		}645	}646647	pub async fn forward_socket(648		&self,649		remote_path: &Utf8Path,650	) -> Result<oneshot::Receiver<Channel<Msg>>> {651		let Transport::Ssh { sess, subs, .. } = &self.transport else {652			bail!("forward_socket should not be called on local")653		};654		let (tx, rx) = oneshot::channel();655		subs.lock()656			.expect("lock")657			.insert(remote_path.to_owned(), tx);658		sess.streamlocal_forward(remote_path.to_owned()).await?;659		Ok(rx)660	}661662	pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<Shell> {663		let Transport::Ssh { remote_dir, .. } = &self.transport else {664			bail!("open_shell should not be called on local")665		};666		let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4()));667668		let rx = self.forward_socket(&sock).await?;669		let client: PtyClient<BifConfig> = self.endpoints();670		let id = client671			.open_shell(sock, term.to_owned(), cols, rows)672			.await?673			.map_err(|e| anyhow!("agent failed to open shell: {e}"))?;674		let ch = rx675			.await676			.map_err(|_| anyhow!("agent never connected the shell socket"))?;677678		Ok(Shell {679			id,680			stream: ch.into_stream(),681			remote: self.rpc.remote(Address::Agent),682		})683	}684}685686pub struct Shell {687	pub id: ShellId,688	pub stream: ChannelStream<Msg>,689	remote: Remote<BifConfig>,690}691692impl Shell {693	pub fn resizer(&self) -> ShellResizer {694		ShellResizer {695			remote: self.remote.clone(),696			id: self.id,697		}698	}699}700701#[derive(Clone)]702pub struct ShellResizer {703	remote: Remote<BifConfig>,704	id: ShellId,705}706707impl ShellResizer {708	pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {709		PtyClient::wrap(self.remote.clone())710			.resize(self.id, cols, rows)711			.await?712			.map_err(|e| anyhow!("failed to resize remote shell: {e}"))713	}714}715716async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {717	let mut cmd_chan = sess.channel_open_session().await?;718	cmd_chan719		.exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")720		.await?;721	let mut stdout = vec![];722	loop {723		let Some(msg) = cmd_chan.wait().await else {724			bail!("unexpected channel end");725		};726		match msg {727			russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),728			russh::ChannelMsg::ExitStatus { exit_status } => {729				if exit_status != 0 {730					bail!("mktemp failed");731				}732				break;733			}734			_ => {}735		}736	}737	ensure!(stdout.ends_with(b"\n"));738	stdout.pop();739	Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))740}
modifiedcrates/remowt-link-shared/src/lib.rsdiffbeforeafterboth
--- a/crates/remowt-link-shared/src/lib.rs
+++ b/crates/remowt-link-shared/src/lib.rs
@@ -13,9 +13,12 @@
 	User,
 	Agent,
 	AgentPrivileged,
+	Plugin(u16),
 }
 impl AddressT for Address {}
 
+pub mod plugin;
+
 pub use remowt_fs::{Error as FsError, Fs, FsClient};
 pub use remowt_pty::{Error as PtyError, Pty, PtyClient, ShellId};
 pub use remowt_systemd::{Error as SystemdError, Systemd, SystemdClient};
addedcrates/remowt-link-shared/src/plugin.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-link-shared/src/plugin.rs
@@ -0,0 +1,39 @@
+use std::future::Future;
+
+use bifrostlink::declarative::endpoints;
+use bifrostlink::Config;
+use serde::{Deserialize, Serialize};
+
+#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
+pub enum Error {
+	#[error("plugin name must be a bare file name")]
+	BadName,
+	#[error("spawning plugin failed: {0}")]
+	Spawn(String),
+	#[error("agent is shutting down")]
+	Gone,
+}
+
+pub trait PluginHost: Send + Sync {
+	fn load_plugin(&self, id: u16, name: String) -> impl Future<Output = Result<(), Error>> + Send;
+
+	fn load_plugin_path(
+		&self,
+		id: u16,
+		path: String,
+	) -> impl Future<Output = Result<(), Error>> + Send;
+}
+
+pub struct PluginEndpoints<H>(pub H);
+
+#[endpoints(ns = 9)]
+impl<H: PluginHost + 'static> PluginEndpoints<H> {
+	#[endpoints(id = 1)]
+	async fn load_plugin(&self, id: u16, name: String) -> Result<(), Error> {
+		self.0.load_plugin(id, name).await
+	}
+	#[endpoints(id = 2)]
+	async fn load_plugin_path(&self, id: u16, path: String) -> Result<(), Error> {
+		self.0.load_plugin_path(id, path).await
+	}
+}
modifiedcrates/remowt-nix-daemon/Cargo.tomldiffbeforeafterboth
--- a/crates/remowt-nix-daemon/Cargo.toml
+++ b/crates/remowt-nix-daemon/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
-name = "fleet-nix-daemon"
-description = "Nix daemon proxy endpoint + connection logic for fleet"
+name = "remowt-nix-daemon"
+description = "Nix daemon proxy"
 version.workspace = true
 edition = "2021"
 
addedcrates/remowt-plugin/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-plugin/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "remowt-plugin"
+version.workspace = true
+edition = "2021"
+
+[dependencies]
+anyhow.workspace = true
+bifrostlink.workspace = true
+bifrostlink-ports.workspace = true
+bytes.workspace = true
+remowt-link-shared.workspace = true
+tokio = { workspace = true, features = [
+	"rt",
+	"net",
+	"io-std",
+	"io-util",
+	"macros",
+	"time",
+	"process",
+] }
+tracing.workspace = true
+tracing-subscriber.workspace = true
addedcrates/remowt-plugin/src/host.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-plugin/src/host.rs
@@ -0,0 +1,110 @@
+use std::ffi::OsStr;
+use std::io;
+use std::process::Stdio;
+use std::sync::Mutex;
+
+use bifrostlink::{Port, Rpc, Rtt, WeakRpc};
+use bytes::{Bytes, BytesMut};
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
+use tokio::process::{Child, ChildStdin, ChildStdout, Command};
+
+use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};
+use remowt_link_shared::{Address, BifConfig};
+
+pub fn serve(rpc: &mut Rpc<BifConfig>) {
+	let host = Host {
+		rpc: rpc.clone().downgrade(),
+		children: Mutex::new(Vec::new()),
+	};
+	PluginEndpoints(host).register_endpoints(rpc);
+}
+
+struct Host {
+	rpc: WeakRpc<BifConfig>,
+	children: Mutex<Vec<Child>>,
+}
+
+impl Host {
+	fn spawn(&self, id: u16, path: impl AsRef<OsStr>) -> Result<(), Error> {
+		let rpc = self.rpc.clone().upgrade().ok_or(Error::Gone)?;
+
+		let mut child = Command::new(path)
+			.arg(id.to_string())
+			.stdin(Stdio::piped())
+			.stdout(Stdio::piped())
+			.kill_on_drop(true)
+			.spawn()
+			.map_err(|e| Error::Spawn(e.to_string()))?;
+		let stdin = child.stdin.take().expect("stdin piped");
+		let stdout = child.stdout.take().expect("stdout piped");
+
+		rpc.add_direct(Address::Plugin(id), child_port(stdout, stdin), Rtt(0));
+		self.children.lock().expect("not poisoned").push(child);
+		Ok(())
+	}
+}
+
+impl PluginHost for Host {
+	async fn load_plugin(&self, id: u16, name: String) -> Result<(), Error> {
+		// TODO: Right now loads plugin next to the binary...
+		// But with our CA addressed schema, the plugins should be located in content-addressed subdir...
+		// Maybe it should just be scrapped in favor of load_plugin_path.
+		if name.is_empty() || name == "." || name == ".." || name.contains(['/', '\0']) {
+			return Err(Error::BadName);
+		}
+		let exe = std::env::current_exe().map_err(|e| Error::Spawn(e.to_string()))?;
+		let dir = exe
+			.parent()
+			.ok_or_else(|| Error::Spawn("primary agent has no parent directory".to_owned()))?;
+		self.spawn(id, dir.join(&name))
+	}
+
+	async fn load_plugin_path(&self, id: u16, path: String) -> Result<(), Error> {
+		if path.is_empty() || path.contains('\0') {
+			return Err(Error::BadName);
+		}
+		self.spawn(id, path)
+	}
+}
+
+fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port {
+	Port::new(|mut rx, tx| async move {
+		let reader = async move {
+			loop {
+				let len = match stdout.read_u32().await {
+					Ok(len) => len,
+					Err(e) => {
+						tracing::error!("plugin stdout read failed: {e}");
+						break;
+					}
+				};
+				let mut buf = BytesMut::zeroed(len as usize);
+				if let Err(e) = stdout.read_exact(&mut buf).await {
+					tracing::error!("plugin stdout read failed: {e}");
+					break;
+				}
+				if tx.send(buf.freeze()).is_err() {
+					break;
+				}
+			}
+		};
+		let writer = async move {
+			while let Some(msg) = rx.recv().await {
+				if let Err(e) = write_frame(&mut stdin, msg).await {
+					tracing::error!("plugin stdin write failed: {e}");
+					break;
+				}
+			}
+		};
+		tokio::join!(reader, writer);
+	})
+}
+
+async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> {
+	let len = u32::try_from(msg.len())
+		.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;
+	stdin.write_u32(len).await?;
+	stdin.write_all(&msg).await?;
+	stdin.flush().await?;
+	Ok(())
+}
addedcrates/remowt-plugin/src/lib.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-plugin/src/lib.rs
@@ -0,0 +1,38 @@
+use std::future::pending;
+
+use anyhow::Result;
+use bifrostlink::{Rpc, Rtt};
+use bifrostlink_ports::stdio::from_stdio;
+use tokio::runtime::Builder;
+
+pub mod host;
+
+pub use bifrostlink;
+pub use remowt_link_shared::{self, Address, BifConfig, Fs, Pty, Systemd};
+
+pub fn plugin_index() -> Result<u16> {
+	let arg = std::env::args()
+		.nth(1)
+		.ok_or_else(|| anyhow::anyhow!("missing plugin index argument"))?;
+	arg.parse()
+		.map_err(|e| anyhow::anyhow!("invalid plugin index {arg:?}: {e}"))
+}
+
+pub fn run<F>(register: F) -> Result<()>
+where
+	F: FnOnce(&mut Rpc<BifConfig>),
+{
+	tracing_subscriber::fmt()
+		.with_writer(std::io::stderr)
+		.init();
+
+	let index = plugin_index()?;
+	let runtime = Builder::new_current_thread().enable_all().build()?;
+	runtime.block_on(async move {
+		let mut rpc = Rpc::<BifConfig>::new(Address::Plugin(index));
+		rpc.add_direct(Address::Agent, from_stdio(), Rtt(0));
+		register(&mut rpc);
+		let _rpc = rpc;
+		pending::<Result<()>>().await
+	})
+}