difftreelog
feat basic plugin loading
in: trunk
11 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1080,22 +1080,6 @@
]
[[package]]
-name = "fleet-nix-daemon"
-version = "0.1.0"
-dependencies = [
- "anyhow",
- "bifrostlink",
- "bifrostlink-macros",
- "camino",
- "remowt-client",
- "serde",
- "thiserror 2.0.18",
- "tokio",
- "tracing",
- "uuid",
-]
-
-[[package]]
name = "foldhash"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2270,6 +2254,7 @@
"polkit-shared",
"rand 0.8.5",
"remowt-link-shared",
+ "remowt-plugin",
"remowt-pty",
"serde",
"tempfile",
@@ -2333,6 +2318,36 @@
]
[[package]]
+name = "remowt-nix-daemon"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "bifrostlink",
+ "bifrostlink-macros",
+ "camino",
+ "remowt-client",
+ "serde",
+ "thiserror 2.0.18",
+ "tokio",
+ "tracing",
+ "uuid",
+]
+
+[[package]]
+name = "remowt-plugin"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "bifrostlink",
+ "bifrostlink-ports",
+ "bytes",
+ "remowt-link-shared",
+ "tokio",
+ "tracing",
+ "tracing-subscriber",
+]
+
+[[package]]
name = "remowt-pty"
version = "0.1.0"
dependencies = [
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,6 +10,7 @@
remowt-client = { path = "crates/remowt-client" }
polkit-shared = { version = "0.1.0", path = "crates/polkit-shared" }
remowt-link-shared = { version = "0.1.0", path = "crates/remowt-link-shared" }
+remowt-plugin = { version = "0.1.0", path = "crates/remowt-plugin" }
ui-prompt = { version = "0.1.0", path = "crates/ui-prompt" }
bifrostlink = "0.2.0"
cmds/remowt-agent/Cargo.tomldiffbeforeafterboth--- a/cmds/remowt-agent/Cargo.toml
+++ b/cmds/remowt-agent/Cargo.toml
@@ -14,6 +14,7 @@
polkit-shared.workspace = true
rand.workspace = true
remowt-link-shared.workspace = true
+remowt-plugin.workspace = true
remowt-pty.workspace = true
serde = { workspace = true, features = ["derive"] }
tempfile.workspace = true
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -253,6 +253,8 @@
Systemd.register_endpoints(&mut rpc);
Pty::new().register_endpoints(&mut rpc);
+ remowt_plugin::host::serve(&mut rpc);
+
let user_prompter = PromptEndpointsClient::wrap(rpc.remote(Address::User));
let editor_client = EditorEndpointsClient::wrap(rpc.remote(Address::User));
crates/remowt-client/src/lib.rsdiffbeforeafterboth1use std::collections::HashMap;2use std::io;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc};9use bifrostlink_ports::unix_socket::from_socket;10use bytes::{Bytes, BytesMut};11use camino::{Utf8Path, Utf8PathBuf};12use remowt_link_shared::plugin::PluginEndpointsClient;13use remowt_link_shared::{14 Address, BifConfig, ElevateEndpoints, ElevateError, Elevator, Fs, Pty, PtyClient, ShellId,15 Systemd,16};17use russh::client::{connect, Config, Handle, Handler, Msg, Session};18use russh::keys::agent::client::AgentClient;19use russh::keys::agent::AgentIdentity;20use russh::keys::check_known_hosts;21use russh::keys::ssh_key::PublicKey;22use russh::{Channel, ChannelMsg, ChannelStream};23use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf};24use tokio::join;25use tokio::net::UnixListener;26use tokio::sync::mpsc;27use tokio::sync::oneshot::{self, channel};28use tracing::error;29use uuid::Uuid;3031pub mod editor;3233type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;3435async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {36 let len = srx.read_u32().await?;37 let mut buf = BytesMut::zeroed(len as usize);38 srx.read_exact(&mut buf).await?;39 Ok(buf)40}41async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {42 stx.write_u32(value.len().try_into().expect("can't be larger"))43 .await?;44 stx.write_all(&value).await?;45 Ok(())46}4748fn sh_quote(s: impl AsRef<str>) -> String {49 format!("'{}'", s.as_ref().replace('\'', "'\\''"))50}5152const ESCALATORS: [(&str, &[&str]); 3] = [53 ("run0", &["--background=", "--pipe"]),54 ("sudo", &[]),55 ("doas", &[]),56];5758pub struct AgentBundle {59 dir: PathBuf,60 hashes: HashMap<String, String>,61}6263impl AgentBundle {64 pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {65 let dir = dir.into();66 let hashes_path = dir.join("hashes");67 let raw = std::fs::read_to_string(&hashes_path)68 .with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;69 let mut hashes = HashMap::new();70 for line in raw.lines() {71 let line = line.trim();72 if line.is_empty() {73 continue;74 }75 let (arch, hash) = line76 .split_once(char::is_whitespace)77 .ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;78 hashes.insert(arch.to_owned(), hash.trim().to_owned());79 }80 ensure!(81 !hashes.is_empty(),82 "agent bundle {} has no hashes",83 dir.display()84 );85 Ok(Self { dir, hashes })86 }8788 fn binary(&self, arch: &str) -> PathBuf {89 self.dir.join(format!("remowt-agent-{arch}"))90 }91}9293async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {94 let mut ch = sess.channel_open_session().await?;95 ch.exec(true, cmd).await?;96 let mut out = Vec::new();97 let mut code = None;98 while let Some(msg) = ch.wait().await {99 match msg {100 ChannelMsg::Data { data } => out.extend(data.as_ref()),101 ChannelMsg::ExtendedData { data, .. } => {102 error!(103 "remote stderr: {}",104 String::from_utf8_lossy(data.as_ref()).trim()105 );106 }107 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),108 _ => {}109 }110 }111 Ok((code, out))112}113114async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {115 let (code, mut out) = run(sess, cmd).await?;116 ensure!(117 code == Some(0),118 "remote command failed (exit {code:?}): {cmd}"119 );120 ensure!(out.ends_with(b"\n"));121 out.pop();122 String::from_utf8(out).context("expected utf8 output for command")123}124125async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {126 let arch = run_string_ok(sess, "uname -m").await?;127 let hash = bundle128 .hashes129 .get(&arch)130 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;131132 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")133 .await?134 .trim()135 .to_owned();136 let dir = if cache.is_empty() {137 let home = run_string_ok(sess, "echo \"$HOME\"").await?;138 ensure!(139 !home.is_empty(),140 "remote $HOME and $XDG_CACHE_HOME both empty"141 );142 Utf8PathBuf::from(home).join("cache/remowt")143 } else {144 Utf8PathBuf::from(cache).join("remowt")145 };146 let path = dir.join(hash);147148 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;149 if present != Some(0) {150 let bin = bundle.binary(&arch);151 let bytes = std::fs::read(&bin)152 .with_context(|| format!("reading agent binary {}", bin.display()))?;153 upload_agent(sess, &dir, &path, bytes).await?;154 }155 Ok(path)156}157158async fn upload_agent(159 sess: &Handle<SshHandler>,160 dir: &Utf8Path,161 path: &Utf8Path,162 bytes: Vec<u8>,163) -> Result<()> {164 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;165166 let tmp = path.join(format!("tmp.{}", Uuid::new_v4()));167 let ch = sess.channel_open_session().await?;168 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;169 ch.data_bytes(bytes).await?;170 ch.eof().await?;171 let mut ch = ch;172 let mut code = None;173 while let Some(msg) = ch.wait().await {174 match msg {175 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),176 ChannelMsg::ExtendedData { data, .. } => {177 error!(178 "agent upload: {}",179 String::from_utf8_lossy(data.as_ref()).trim()180 );181 }182 _ => {}183 }184 }185 ensure!(code == Some(0), "agent upload failed (exit {code:?})");186187 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;188 run_string_ok(189 sess,190 &format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),191 )192 .await?;193 Ok(())194}195196async fn detect_escalation(197 sess: &Handle<SshHandler>,198) -> Result<(&'static str, &'static [&'static str])> {199 for (tool, flags) in ESCALATORS {200 // `tool` is a fixed identifier (no metacharacters), safe to interpolate.201 let (code, _) = run(sess, &format!("command -v {tool}")).await?;202 if code == Some(0) {203 return Ok((tool, flags));204 }205 }206 bail!("no escalation tool (run0/sudo/doas) found on remote")207}208209fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {210 let mut parts = vec![tool.to_owned()];211 parts.extend(flags.iter().map(|f| f.to_string()));212 parts.push(sh_quote(agent_path));213 parts.push("real-agent".to_owned());214 parts.push("--privileged".to_owned());215 if let Some(p) = path {216 parts.push("--path".to_owned());217 parts.push(sh_quote(p));218 }219 parts.join(" ")220}221222fn find_in_path(name: &str) -> Option<std::path::PathBuf> {223 let path = std::env::var_os("PATH")?;224 std::env::split_paths(&path)225 .map(|dir| dir.join(name))226 .find(|p| p.is_file())227}228229fn port_from_channel(ch: Channel<Msg>) -> Port {230 Port::new(move |mut rx, tx| async move {231 let (mut srx, mut stx) = tokio::io::split(ch.into_stream());232 let srx_task = async move {233 loop {234 match read(&mut srx).await {235 Ok(buf) => {236 if tx.send(buf.freeze()).is_err() {237 break;238 }239 }240 Err(e) => {241 error!("channel read failed: {e}");242 break;243 }244 }245 }246 };247 let stx_task = async move {248 while let Some(value) = rx.recv().await {249 if let Err(e) = write(&mut stx, value).await {250 error!("channel write failed: {e}");251 break;252 }253 }254 };255 join!(srx_task, stx_task);256 })257}258259pub struct SshHandler {260 host: String,261 port: u16,262 subs: Subs,263}264impl Handler for SshHandler {265 type Error = russh::Error;266 async fn check_server_key(267 &mut self,268 server_public_key: &PublicKey,269 ) -> Result<bool, Self::Error> {270 Ok(check_known_hosts(&self.host, self.port, server_public_key)?)271 }272 async fn server_channel_open_forwarded_streamlocal(273 &mut self,274 channel: Channel<Msg>,275 socket_path: &str,276 _session: &mut Session,277 ) -> Result<(), Self::Error> {278 let Some(ch) = self279 .subs280 .lock()281 .expect("lock")282 .remove(&Utf8PathBuf::from(socket_path))283 else {284 return Err(russh::Error::WrongChannel);285 };286 let _ = ch.send(channel);287 Ok(())288 }289}290291struct SshElevator {292 sess: Arc<Handle<SshHandler>>,293 rpc: WeakRpc<BifConfig>,294 agent_path: Utf8PathBuf,295}296impl Elevator for SshElevator {297 async fn elevate(&self) -> Result<(), ElevateError> {298 let fail = |e: String| ElevateError::Failed(e);299 let (tool, flags) = detect_escalation(&self.sess)300 .await301 .map_err(|e| fail(e.to_string()))?;302 let ch = self303 .sess304 .channel_open_session()305 .await306 .map_err(|e| fail(e.to_string()))?;307 ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None))308 .await309 .map_err(|e| fail(e.to_string()))?;310 let rpc = self311 .rpc312 .clone()313 .upgrade()314 .ok_or_else(|| fail("rpc is gone".to_owned()))?;315 rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0));316 Ok(())317 }318}319320pub struct RemoteChild {321 pub stdout: DuplexStream,322 pub stderr: DuplexStream,323 pub exit: oneshot::Receiver<Option<u32>>,324}325326enum Transport {327 Ssh {328 sess: Arc<Handle<SshHandler>>,329 subs: Subs,330 remote_dir: Utf8PathBuf,331 agent_path: Utf8PathBuf,332 },333 Local {334 #[allow(dead_code)]335 agent: Rpc<BifConfig>,336 agent_path: String,337 },338}339340pub struct Remowt {341 transport: Transport,342 rpc: Rpc<BifConfig>,343 elevated: tokio::sync::OnceCell<()>,344 children: Mutex<Vec<tokio::process::Child>>,345}346347pub type RemowtRemote = Remote<BifConfig>;348349fn loopback() -> (Port, Port) {350 let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::<Bytes>();351 let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::<Bytes>();352 let user = Port::new(move |mut rx, tx| async move {353 loop {354 tokio::select! {355 msg = rx.recv() => match msg {356 Some(msg) => if a2b_tx.send(msg).is_err() { break },357 None => break,358 },359 msg = b2a_rx.recv() => match msg {360 Some(msg) => if tx.send(msg).is_err() { break },361 None => break,362 },363 }364 }365 });366 let agent = Port::new(move |mut rx, tx| async move {367 loop {368 tokio::select! {369 msg = rx.recv() => match msg {370 Some(msg) => if b2a_tx.send(msg).is_err() { break },371 None => break,372 },373 msg = a2b_rx.recv() => match msg {374 Some(msg) => if tx.send(msg).is_err() { break },375 None => break,376 },377 }378 }379 });380 (user, agent)381}382383impl Remowt {384 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {385 let conf = russh_config::parse_home(host)?;386 let port = conf.host_config.port.unwrap_or(22);387 let hostname = conf388 .host_config389 .hostname390 .clone()391 .unwrap_or_else(|| conf.host_name.clone());392 let user = conf393 .user394 .clone()395 .unwrap_or_else(|| std::env::var("USER").unwrap_or_else(|_| "root".to_owned()));396397 let subs: Subs = Arc::new(Mutex::new(HashMap::new()));398 let mut sess = connect(399 Arc::new(Config::default()),400 (hostname.clone(), port),401 SshHandler {402 host: hostname,403 port,404 subs: subs.clone(),405 },406 )407 .await?;408409 let mut agent = AgentClient::connect_env().await?;410 let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();411 let mut authenticated = false;412 for ident in agent.request_identities().await? {413 let AgentIdentity::PublicKey { key, .. } = ident else {414 continue;415 };416 if sess417 .authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)418 .await?419 .success()420 {421 authenticated = true;422 break;423 }424 }425 ensure!(authenticated, "ssh authentication failed");426427 // All remaining session ops take `&self`; share the handle.428 let sess = Arc::new(sess);429430 let agent_path = deploy_agent(&sess, bundle).await?;431432 let remote_dir = remote_mktemp(&sess).await?;433 let primary = remote_dir.join("primary.sock");434435 let (onetx, onerx) = channel();436 subs.lock().expect("lock").insert(primary.clone(), onetx);437 sess.streamlocal_forward(primary.clone()).await?;438439 let rpc = Rpc::<BifConfig>::new(Address::User);440441 // TODO: ensure no injection is possible in the socket path.442 let cmd_chan = sess.channel_open_session().await?;443 cmd_chan444 .exec(445 true,446 format!(447 "{} real-agent --path={}",448 sh_quote(&agent_path),449 sh_quote(&primary)450 ),451 )452 .await?;453454 let port = port_from_channel(455 onerx456 .await457 .map_err(|_| anyhow!("agent never opened its channel"))?,458 );459 rpc.add_direct(Address::Agent, port, Rtt(0));460461 Ok(Self {462 transport: Transport::Ssh {463 sess,464 subs,465 remote_dir,466 agent_path,467 },468 rpc,469 elevated: tokio::sync::OnceCell::new(),470 children: Mutex::new(Vec::new()),471 })472 }473474 pub async fn connect_local(agent_path: &str) -> Result<Self> {475 let (port_user, port_agent) = loopback();476 let rpc = Rpc::<BifConfig>::new(Address::User);477 let mut agent = Rpc::<BifConfig>::new(Address::Agent);478479 // Register handlers before wiring up the link (see the agent binary).480 Fs::new().register_endpoints(&mut agent);481 Systemd.register_endpoints(&mut agent);482 Pty::new().register_endpoints(&mut agent);483484 agent.add_direct(Address::User, port_agent, Rtt(0));485 rpc.add_direct(Address::Agent, port_user, Rtt(0));486487 Ok(Self {488 transport: Transport::Local {489 agent,490 agent_path: agent_path.to_owned(),491 },492 rpc,493 elevated: tokio::sync::OnceCell::new(),494 children: Mutex::new(Vec::new()),495 })496 }497498 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {499 match &self.transport {500 Transport::Ssh { sess, .. } => Some(sess.clone()),501 Transport::Local { .. } => None,502 }503 }504505 pub fn rpc(&self) -> Rpc<BifConfig> {506 self.rpc.clone()507 }508509 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {510 R::wrap(self.rpc.remote(Address::Agent))511 }512513 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {514 let client: PluginEndpointsClient<BifConfig> = self.endpoints();515 client516 .load_plugin(id, name.to_owned())517 .await?518 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))519 }520 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {521 self.ensure_elevated().await?;522 let client: PluginEndpointsClient<BifConfig> =523 PluginEndpointsClient::wrap(self.rpc.remote(Address::AgentPrivileged));524 client525 .load_plugin_path(id, path.to_owned())526 .await?527 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))528 }529 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {530 R::wrap(self.rpc.remote(Address::Plugin(id)))531 }532 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {533 self.ensure_elevated().await?;534 Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))535 }536537 async fn ensure_elevated(&self) -> Result<()> {538 self.elevated539 .get_or_try_init(|| async {540 let port = match &self.transport {541 Transport::Ssh {542 sess, agent_path, ..543 } => {544 let (tool, flags) = detect_escalation(sess).await?;545 let ch = sess.channel_open_session().await?;546 ch.exec(true, privileged_cmd(tool, flags, agent_path, None))547 .await?;548 port_from_channel(ch)549 }550 Transport::Local { agent_path, .. } => {551 let sock = std::env::temp_dir()552 .join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4()));553 let _ = std::fs::remove_file(&sock);554 let listener = UnixListener::bind(&sock)?;555 let (tool, flags) = ESCALATORS556 .iter()557 .find(|(t, _)| find_in_path(t).is_some())558 .ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?;559 let child = tokio::process::Command::new(tool)560 .args(*flags)561 .arg(agent_path)562 .arg("real-agent")563 .arg("--privileged")564 .arg("--path")565 .arg(sock.to_str().expect("temp path is utf-8"))566 .kill_on_drop(true)567 .spawn()?;568 self.children.lock().expect("lock").push(child);569 let (stream, _) = listener.accept().await?;570 let _ = std::fs::remove_file(&sock);571 from_socket(stream)572 }573 };574 self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));575 anyhow::Ok(())576 })577 .await?;578 Ok(())579 }580581 pub async fn exec(&self, command: String) -> Result<RemoteChild> {582 let Some(sess) = self.ssh() else {583 bail!("exec should not be called on local")584 };585 let ch = sess.channel_open_session().await?;586 ch.exec(true, command).await?;587588 let (mut out_w, stdout) = tokio::io::duplex(64 * 1024);589 let (mut err_w, stderr) = tokio::io::duplex(64 * 1024);590 let (exit_tx, exit) = oneshot::channel();591592 tokio::spawn(async move {593 let mut ch = ch;594 let mut code = None;595 while let Some(msg) = ch.wait().await {596 match msg {597 ChannelMsg::Data { data } => {598 if out_w.write_all(&data).await.is_err() {599 break;600 }601 }602 ChannelMsg::ExtendedData { data, .. } => {603 if err_w.write_all(&data).await.is_err() {604 break;605 }606 }607 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),608 _ => {}609 }610 }611 let _ = out_w.shutdown().await;612 let _ = err_w.shutdown().await;613 let _ = exit_tx.send(code);614 });615616 Ok(RemoteChild {617 stdout,618 stderr,619 exit,620 })621 }622623 pub fn serve_elevate(&self) -> Result<()> {624 let Transport::Ssh {625 sess, agent_path, ..626 } = &self.transport627 else {628 bail!("elevate should not be called on local")629 };630 let mut rpc = self.rpc.clone();631 ElevateEndpoints(SshElevator {632 sess: sess.clone(),633 rpc: self.rpc.clone().downgrade(),634 agent_path: agent_path.to_owned(),635 })636 .register_endpoints(&mut rpc);637 Ok(())638 }639640 pub fn remote_dir(&self) -> Option<&Utf8Path> {641 match &self.transport {642 Transport::Ssh { remote_dir, .. } => Some(remote_dir),643 Transport::Local { .. } => None,644 }645 }646647 pub async fn forward_socket(648 &self,649 remote_path: &Utf8Path,650 ) -> Result<oneshot::Receiver<Channel<Msg>>> {651 let Transport::Ssh { sess, subs, .. } = &self.transport else {652 bail!("forward_socket should not be called on local")653 };654 let (tx, rx) = oneshot::channel();655 subs.lock()656 .expect("lock")657 .insert(remote_path.to_owned(), tx);658 sess.streamlocal_forward(remote_path.to_owned()).await?;659 Ok(rx)660 }661662 pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<Shell> {663 let Transport::Ssh { remote_dir, .. } = &self.transport else {664 bail!("open_shell should not be called on local")665 };666 let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4()));667668 let rx = self.forward_socket(&sock).await?;669 let client: PtyClient<BifConfig> = self.endpoints();670 let id = client671 .open_shell(sock, term.to_owned(), cols, rows)672 .await?673 .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;674 let ch = rx675 .await676 .map_err(|_| anyhow!("agent never connected the shell socket"))?;677678 Ok(Shell {679 id,680 stream: ch.into_stream(),681 remote: self.rpc.remote(Address::Agent),682 })683 }684}685686pub struct Shell {687 pub id: ShellId,688 pub stream: ChannelStream<Msg>,689 remote: Remote<BifConfig>,690}691692impl Shell {693 pub fn resizer(&self) -> ShellResizer {694 ShellResizer {695 remote: self.remote.clone(),696 id: self.id,697 }698 }699}700701#[derive(Clone)]702pub struct ShellResizer {703 remote: Remote<BifConfig>,704 id: ShellId,705}706707impl ShellResizer {708 pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {709 PtyClient::wrap(self.remote.clone())710 .resize(self.id, cols, rows)711 .await?712 .map_err(|e| anyhow!("failed to resize remote shell: {e}"))713 }714}715716async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {717 let mut cmd_chan = sess.channel_open_session().await?;718 cmd_chan719 .exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")720 .await?;721 let mut stdout = vec![];722 loop {723 let Some(msg) = cmd_chan.wait().await else {724 bail!("unexpected channel end");725 };726 match msg {727 russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),728 russh::ChannelMsg::ExitStatus { exit_status } => {729 if exit_status != 0 {730 bail!("mktemp failed");731 }732 break;733 }734 _ => {}735 }736 }737 ensure!(stdout.ends_with(b"\n"));738 stdout.pop();739 Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))740}crates/remowt-link-shared/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-link-shared/src/lib.rs
+++ b/crates/remowt-link-shared/src/lib.rs
@@ -13,9 +13,12 @@
User,
Agent,
AgentPrivileged,
+ Plugin(u16),
}
impl AddressT for Address {}
+pub mod plugin;
+
pub use remowt_fs::{Error as FsError, Fs, FsClient};
pub use remowt_pty::{Error as PtyError, Pty, PtyClient, ShellId};
pub use remowt_systemd::{Error as SystemdError, Systemd, SystemdClient};
crates/remowt-link-shared/src/plugin.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-link-shared/src/plugin.rs
@@ -0,0 +1,39 @@
+use std::future::Future;
+
+use bifrostlink::declarative::endpoints;
+use bifrostlink::Config;
+use serde::{Deserialize, Serialize};
+
+#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
+pub enum Error {
+ #[error("plugin name must be a bare file name")]
+ BadName,
+ #[error("spawning plugin failed: {0}")]
+ Spawn(String),
+ #[error("agent is shutting down")]
+ Gone,
+}
+
+pub trait PluginHost: Send + Sync {
+ fn load_plugin(&self, id: u16, name: String) -> impl Future<Output = Result<(), Error>> + Send;
+
+ fn load_plugin_path(
+ &self,
+ id: u16,
+ path: String,
+ ) -> impl Future<Output = Result<(), Error>> + Send;
+}
+
+pub struct PluginEndpoints<H>(pub H);
+
+#[endpoints(ns = 9)]
+impl<H: PluginHost + 'static> PluginEndpoints<H> {
+ #[endpoints(id = 1)]
+ async fn load_plugin(&self, id: u16, name: String) -> Result<(), Error> {
+ self.0.load_plugin(id, name).await
+ }
+ #[endpoints(id = 2)]
+ async fn load_plugin_path(&self, id: u16, path: String) -> Result<(), Error> {
+ self.0.load_plugin_path(id, path).await
+ }
+}
crates/remowt-nix-daemon/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-nix-daemon/Cargo.toml
+++ b/crates/remowt-nix-daemon/Cargo.toml
@@ -1,6 +1,6 @@
[package]
-name = "fleet-nix-daemon"
-description = "Nix daemon proxy endpoint + connection logic for fleet"
+name = "remowt-nix-daemon"
+description = "Nix daemon proxy"
version.workspace = true
edition = "2021"
crates/remowt-plugin/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-plugin/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "remowt-plugin"
+version.workspace = true
+edition = "2021"
+
+[dependencies]
+anyhow.workspace = true
+bifrostlink.workspace = true
+bifrostlink-ports.workspace = true
+bytes.workspace = true
+remowt-link-shared.workspace = true
+tokio = { workspace = true, features = [
+ "rt",
+ "net",
+ "io-std",
+ "io-util",
+ "macros",
+ "time",
+ "process",
+] }
+tracing.workspace = true
+tracing-subscriber.workspace = true
crates/remowt-plugin/src/host.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-plugin/src/host.rs
@@ -0,0 +1,110 @@
+use std::ffi::OsStr;
+use std::io;
+use std::process::Stdio;
+use std::sync::Mutex;
+
+use bifrostlink::{Port, Rpc, Rtt, WeakRpc};
+use bytes::{Bytes, BytesMut};
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
+use tokio::process::{Child, ChildStdin, ChildStdout, Command};
+
+use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};
+use remowt_link_shared::{Address, BifConfig};
+
+pub fn serve(rpc: &mut Rpc<BifConfig>) {
+ let host = Host {
+ rpc: rpc.clone().downgrade(),
+ children: Mutex::new(Vec::new()),
+ };
+ PluginEndpoints(host).register_endpoints(rpc);
+}
+
+struct Host {
+ rpc: WeakRpc<BifConfig>,
+ children: Mutex<Vec<Child>>,
+}
+
+impl Host {
+ fn spawn(&self, id: u16, path: impl AsRef<OsStr>) -> Result<(), Error> {
+ let rpc = self.rpc.clone().upgrade().ok_or(Error::Gone)?;
+
+ let mut child = Command::new(path)
+ .arg(id.to_string())
+ .stdin(Stdio::piped())
+ .stdout(Stdio::piped())
+ .kill_on_drop(true)
+ .spawn()
+ .map_err(|e| Error::Spawn(e.to_string()))?;
+ let stdin = child.stdin.take().expect("stdin piped");
+ let stdout = child.stdout.take().expect("stdout piped");
+
+ rpc.add_direct(Address::Plugin(id), child_port(stdout, stdin), Rtt(0));
+ self.children.lock().expect("not poisoned").push(child);
+ Ok(())
+ }
+}
+
+impl PluginHost for Host {
+ async fn load_plugin(&self, id: u16, name: String) -> Result<(), Error> {
+ // TODO: Right now loads plugin next to the binary...
+ // But with our CA addressed schema, the plugins should be located in content-addressed subdir...
+ // Maybe it should just be scrapped in favor of load_plugin_path.
+ if name.is_empty() || name == "." || name == ".." || name.contains(['/', '\0']) {
+ return Err(Error::BadName);
+ }
+ let exe = std::env::current_exe().map_err(|e| Error::Spawn(e.to_string()))?;
+ let dir = exe
+ .parent()
+ .ok_or_else(|| Error::Spawn("primary agent has no parent directory".to_owned()))?;
+ self.spawn(id, dir.join(&name))
+ }
+
+ async fn load_plugin_path(&self, id: u16, path: String) -> Result<(), Error> {
+ if path.is_empty() || path.contains('\0') {
+ return Err(Error::BadName);
+ }
+ self.spawn(id, path)
+ }
+}
+
+fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port {
+ Port::new(|mut rx, tx| async move {
+ let reader = async move {
+ loop {
+ let len = match stdout.read_u32().await {
+ Ok(len) => len,
+ Err(e) => {
+ tracing::error!("plugin stdout read failed: {e}");
+ break;
+ }
+ };
+ let mut buf = BytesMut::zeroed(len as usize);
+ if let Err(e) = stdout.read_exact(&mut buf).await {
+ tracing::error!("plugin stdout read failed: {e}");
+ break;
+ }
+ if tx.send(buf.freeze()).is_err() {
+ break;
+ }
+ }
+ };
+ let writer = async move {
+ while let Some(msg) = rx.recv().await {
+ if let Err(e) = write_frame(&mut stdin, msg).await {
+ tracing::error!("plugin stdin write failed: {e}");
+ break;
+ }
+ }
+ };
+ tokio::join!(reader, writer);
+ })
+}
+
+async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> {
+ let len = u32::try_from(msg.len())
+ .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;
+ stdin.write_u32(len).await?;
+ stdin.write_all(&msg).await?;
+ stdin.flush().await?;
+ Ok(())
+}
crates/remowt-plugin/src/lib.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-plugin/src/lib.rs
@@ -0,0 +1,38 @@
+use std::future::pending;
+
+use anyhow::Result;
+use bifrostlink::{Rpc, Rtt};
+use bifrostlink_ports::stdio::from_stdio;
+use tokio::runtime::Builder;
+
+pub mod host;
+
+pub use bifrostlink;
+pub use remowt_link_shared::{self, Address, BifConfig, Fs, Pty, Systemd};
+
+pub fn plugin_index() -> Result<u16> {
+ let arg = std::env::args()
+ .nth(1)
+ .ok_or_else(|| anyhow::anyhow!("missing plugin index argument"))?;
+ arg.parse()
+ .map_err(|e| anyhow::anyhow!("invalid plugin index {arg:?}: {e}"))
+}
+
+pub fn run<F>(register: F) -> Result<()>
+where
+ F: FnOnce(&mut Rpc<BifConfig>),
+{
+ tracing_subscriber::fmt()
+ .with_writer(std::io::stderr)
+ .init();
+
+ let index = plugin_index()?;
+ let runtime = Builder::new_current_thread().enable_all().build()?;
+ runtime.block_on(async move {
+ let mut rpc = Rpc::<BifConfig>::new(Address::Plugin(index));
+ rpc.add_direct(Address::Agent, from_stdio(), Rtt(0));
+ register(&mut rpc);
+ let _rpc = rpc;
+ pending::<Result<()>>().await
+ })
+}