1use std::collections::HashMap;2use std::io;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc};9use bifrostlink_ports::unix_socket::from_socket;10use bytes::{Bytes, BytesMut};11use camino::{Utf8Path, Utf8PathBuf};12use remowt_link_shared::plugin::PluginEndpointsClient;13use remowt_link_shared::{14 Address, BifConfig, ElevateEndpoints, ElevateError, Elevator, Fs, Pty, PtyClient, ShellId,15 Systemd,16};17use russh::client::{connect, Config, Handle, Handler, Msg, Session};18use russh::keys::agent::client::AgentClient;19use russh::keys::agent::AgentIdentity;20use russh::keys::check_known_hosts;21use russh::keys::ssh_key::PublicKey;22use russh::{Channel, ChannelMsg, ChannelStream};23use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf};24use tokio::join;25use tokio::net::UnixListener;26use tokio::sync::mpsc;27use tokio::sync::oneshot::{self, channel};28use tracing::error;29use uuid::Uuid;3031pub mod editor;3233type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;3435async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {36 let len = srx.read_u32().await?;37 let mut buf = BytesMut::zeroed(len as usize);38 srx.read_exact(&mut buf).await?;39 Ok(buf)40}41async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {42 stx.write_u32(value.len().try_into().expect("can't be larger"))43 .await?;44 stx.write_all(&value).await?;45 Ok(())46}4748fn sh_quote(s: impl AsRef<str>) -> String {49 format!("'{}'", s.as_ref().replace('\'', "'\\''"))50}5152const ESCALATORS: [(&str, &[&str]); 3] = [53 ("run0", &["--background=", "--pipe"]),54 ("sudo", &[]),55 ("doas", &[]),56];5758pub struct AgentBundle {59 dir: PathBuf,60 hashes: HashMap<String, String>,61}6263impl AgentBundle {64 pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {65 let dir = dir.into();66 let hashes_path = dir.join("hashes");67 let raw = std::fs::read_to_string(&hashes_path)68 .with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;69 let mut hashes = HashMap::new();70 for line in raw.lines() {71 let line = line.trim();72 if line.is_empty() {73 continue;74 }75 let (arch, hash) = line76 .split_once(char::is_whitespace)77 .ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;78 hashes.insert(arch.to_owned(), hash.trim().to_owned());79 }80 ensure!(81 !hashes.is_empty(),82 "agent bundle {} has no hashes",83 dir.display()84 );85 Ok(Self { dir, hashes })86 }8788 fn binary(&self, arch: &str) -> PathBuf {89 self.dir.join(format!("remowt-agent-{arch}"))90 }91}9293async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {94 let mut ch = sess.channel_open_session().await?;95 ch.exec(true, cmd).await?;96 let mut out = Vec::new();97 let mut code = None;98 while let Some(msg) = ch.wait().await {99 match msg {100 ChannelMsg::Data { data } => out.extend(data.as_ref()),101 ChannelMsg::ExtendedData { data, .. } => {102 error!(103 "remote stderr: {}",104 String::from_utf8_lossy(data.as_ref()).trim()105 );106 }107 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),108 _ => {}109 }110 }111 Ok((code, out))112}113114async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {115 let (code, mut out) = run(sess, cmd).await?;116 ensure!(117 code == Some(0),118 "remote command failed (exit {code:?}): {cmd}"119 );120 ensure!(out.ends_with(b"\n"));121 out.pop();122 String::from_utf8(out).context("expected utf8 output for command")123}124125async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {126 let arch = run_string_ok(sess, "uname -m").await?;127 let hash = bundle128 .hashes129 .get(&arch)130 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;131132 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")133 .await?134 .trim()135 .to_owned();136 let dir = if cache.is_empty() {137 let home = run_string_ok(sess, "echo \"$HOME\"").await?;138 ensure!(139 !home.is_empty(),140 "remote $HOME and $XDG_CACHE_HOME both empty"141 );142 Utf8PathBuf::from(home).join("cache/remowt")143 } else {144 Utf8PathBuf::from(cache).join("remowt")145 };146 let path = dir.join(hash);147148 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;149 if present != Some(0) {150 let bin = bundle.binary(&arch);151 let bytes = std::fs::read(&bin)152 .with_context(|| format!("reading agent binary {}", bin.display()))?;153 upload_agent(sess, &dir, &path, bytes).await?;154 }155 Ok(path)156}157158async fn upload_agent(159 sess: &Handle<SshHandler>,160 dir: &Utf8Path,161 path: &Utf8Path,162 bytes: Vec<u8>,163) -> Result<()> {164 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;165166 let tmp = path.join(format!("tmp.{}", Uuid::new_v4()));167 let ch = sess.channel_open_session().await?;168 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;169 ch.data_bytes(bytes).await?;170 ch.eof().await?;171 let mut ch = ch;172 let mut code = None;173 while let Some(msg) = ch.wait().await {174 match msg {175 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),176 ChannelMsg::ExtendedData { data, .. } => {177 error!(178 "agent upload: {}",179 String::from_utf8_lossy(data.as_ref()).trim()180 );181 }182 _ => {}183 }184 }185 ensure!(code == Some(0), "agent upload failed (exit {code:?})");186187 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;188 run_string_ok(189 sess,190 &format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),191 )192 .await?;193 Ok(())194}195196async fn detect_escalation(197 sess: &Handle<SshHandler>,198) -> Result<(&'static str, &'static [&'static str])> {199 for (tool, flags) in ESCALATORS {200 201 let (code, _) = run(sess, &format!("command -v {tool}")).await?;202 if code == Some(0) {203 return Ok((tool, flags));204 }205 }206 bail!("no escalation tool (run0/sudo/doas) found on remote")207}208209fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {210 let mut parts = vec![tool.to_owned()];211 parts.extend(flags.iter().map(|f| f.to_string()));212 parts.push(sh_quote(agent_path));213 parts.push("real-agent".to_owned());214 parts.push("--privileged".to_owned());215 if let Some(p) = path {216 parts.push("--path".to_owned());217 parts.push(sh_quote(p));218 }219 parts.join(" ")220}221222fn find_in_path(name: &str) -> Option<std::path::PathBuf> {223 let path = std::env::var_os("PATH")?;224 std::env::split_paths(&path)225 .map(|dir| dir.join(name))226 .find(|p| p.is_file())227}228229fn port_from_channel(ch: Channel<Msg>) -> Port {230 Port::new(move |mut rx, tx| async move {231 let (mut srx, mut stx) = tokio::io::split(ch.into_stream());232 let srx_task = async move {233 loop {234 match read(&mut srx).await {235 Ok(buf) => {236 if tx.send(buf.freeze()).is_err() {237 break;238 }239 }240 Err(e) => {241 error!("channel read failed: {e}");242 break;243 }244 }245 }246 };247 let stx_task = async move {248 while let Some(value) = rx.recv().await {249 if let Err(e) = write(&mut stx, value).await {250 error!("channel write failed: {e}");251 break;252 }253 }254 };255 join!(srx_task, stx_task);256 })257}258259pub struct SshHandler {260 host: String,261 port: u16,262 subs: Subs,263}264impl Handler for SshHandler {265 type Error = russh::Error;266 async fn check_server_key(267 &mut self,268 server_public_key: &PublicKey,269 ) -> Result<bool, Self::Error> {270 Ok(check_known_hosts(&self.host, self.port, server_public_key)?)271 }272 async fn server_channel_open_forwarded_streamlocal(273 &mut self,274 channel: Channel<Msg>,275 socket_path: &str,276 _session: &mut Session,277 ) -> Result<(), Self::Error> {278 let Some(ch) = self279 .subs280 .lock()281 .expect("lock")282 .remove(&Utf8PathBuf::from(socket_path))283 else {284 return Err(russh::Error::WrongChannel);285 };286 let _ = ch.send(channel);287 Ok(())288 }289}290291struct SshElevator {292 sess: Arc<Handle<SshHandler>>,293 rpc: WeakRpc<BifConfig>,294 agent_path: Utf8PathBuf,295}296impl Elevator for SshElevator {297 async fn elevate(&self) -> Result<(), ElevateError> {298 let fail = |e: String| ElevateError::Failed(e);299 let (tool, flags) = detect_escalation(&self.sess)300 .await301 .map_err(|e| fail(e.to_string()))?;302 let ch = self303 .sess304 .channel_open_session()305 .await306 .map_err(|e| fail(e.to_string()))?;307 ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None))308 .await309 .map_err(|e| fail(e.to_string()))?;310 let rpc = self311 .rpc312 .clone()313 .upgrade()314 .ok_or_else(|| fail("rpc is gone".to_owned()))?;315 rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0));316 Ok(())317 }318}319320pub struct RemoteChild {321 pub stdout: DuplexStream,322 pub stderr: DuplexStream,323 pub exit: oneshot::Receiver<Option<u32>>,324}325326enum Transport {327 Ssh {328 sess: Arc<Handle<SshHandler>>,329 subs: Subs,330 remote_dir: Utf8PathBuf,331 agent_path: Utf8PathBuf,332 },333 Local {334 #[allow(dead_code)]335 agent: Rpc<BifConfig>,336 agent_path: String,337 },338}339340pub struct Remowt {341 transport: Transport,342 rpc: Rpc<BifConfig>,343 elevated: tokio::sync::OnceCell<()>,344 children: Mutex<Vec<tokio::process::Child>>,345}346347pub type RemowtRemote = Remote<BifConfig>;348349fn loopback() -> (Port, Port) {350 let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::<Bytes>();351 let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::<Bytes>();352 let user = Port::new(move |mut rx, tx| async move {353 loop {354 tokio::select! {355 msg = rx.recv() => match msg {356 Some(msg) => if a2b_tx.send(msg).is_err() { break },357 None => break,358 },359 msg = b2a_rx.recv() => match msg {360 Some(msg) => if tx.send(msg).is_err() { break },361 None => break,362 },363 }364 }365 });366 let agent = Port::new(move |mut rx, tx| async move {367 loop {368 tokio::select! {369 msg = rx.recv() => match msg {370 Some(msg) => if b2a_tx.send(msg).is_err() { break },371 None => break,372 },373 msg = a2b_rx.recv() => match msg {374 Some(msg) => if tx.send(msg).is_err() { break },375 None => break,376 },377 }378 }379 });380 (user, agent)381}382383impl Remowt {384 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {385 let conf = russh_config::parse_home(host)?;386 let port = conf.host_config.port.unwrap_or(22);387 let hostname = conf388 .host_config389 .hostname390 .clone()391 .unwrap_or_else(|| conf.host_name.clone());392 let user = conf393 .user394 .clone()395 .unwrap_or_else(|| std::env::var("USER").unwrap_or_else(|_| "root".to_owned()));396397 let subs: Subs = Arc::new(Mutex::new(HashMap::new()));398 let mut sess = connect(399 Arc::new(Config::default()),400 (hostname.clone(), port),401 SshHandler {402 host: hostname,403 port,404 subs: subs.clone(),405 },406 )407 .await?;408409 let mut agent = AgentClient::connect_env().await?;410 let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();411 let mut authenticated = false;412 for ident in agent.request_identities().await? {413 let AgentIdentity::PublicKey { key, .. } = ident else {414 continue;415 };416 if sess417 .authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)418 .await?419 .success()420 {421 authenticated = true;422 break;423 }424 }425 ensure!(authenticated, "ssh authentication failed");426427 428 let sess = Arc::new(sess);429430 let agent_path = deploy_agent(&sess, bundle).await?;431432 let remote_dir = remote_mktemp(&sess).await?;433 let primary = remote_dir.join("primary.sock");434435 let (onetx, onerx) = channel();436 subs.lock().expect("lock").insert(primary.clone(), onetx);437 sess.streamlocal_forward(primary.clone()).await?;438439 let rpc = Rpc::<BifConfig>::new(Address::User);440441 442 let cmd_chan = sess.channel_open_session().await?;443 cmd_chan444 .exec(445 true,446 format!(447 "{} real-agent --path={}",448 sh_quote(&agent_path),449 sh_quote(&primary)450 ),451 )452 .await?;453454 let port = port_from_channel(455 onerx456 .await457 .map_err(|_| anyhow!("agent never opened its channel"))?,458 );459 rpc.add_direct(Address::Agent, port, Rtt(0));460461 Ok(Self {462 transport: Transport::Ssh {463 sess,464 subs,465 remote_dir,466 agent_path,467 },468 rpc,469 elevated: tokio::sync::OnceCell::new(),470 children: Mutex::new(Vec::new()),471 })472 }473474 pub async fn connect_local(agent_path: &str) -> Result<Self> {475 let (port_user, port_agent) = loopback();476 let rpc = Rpc::<BifConfig>::new(Address::User);477 let mut agent = Rpc::<BifConfig>::new(Address::Agent);478479 480 Fs::new().register_endpoints(&mut agent);481 Systemd.register_endpoints(&mut agent);482 Pty::new().register_endpoints(&mut agent);483484 agent.add_direct(Address::User, port_agent, Rtt(0));485 rpc.add_direct(Address::Agent, port_user, Rtt(0));486487 Ok(Self {488 transport: Transport::Local {489 agent,490 agent_path: agent_path.to_owned(),491 },492 rpc,493 elevated: tokio::sync::OnceCell::new(),494 children: Mutex::new(Vec::new()),495 })496 }497498 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {499 match &self.transport {500 Transport::Ssh { sess, .. } => Some(sess.clone()),501 Transport::Local { .. } => None,502 }503 }504505 pub fn rpc(&self) -> Rpc<BifConfig> {506 self.rpc.clone()507 }508509 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {510 R::wrap(self.rpc.remote(Address::Agent))511 }512513 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {514 let client: PluginEndpointsClient<BifConfig> = self.endpoints();515 client516 .load_plugin(id, name.to_owned())517 .await?518 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))519 }520 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {521 self.ensure_elevated().await?;522 let client: PluginEndpointsClient<BifConfig> =523 PluginEndpointsClient::wrap(self.rpc.remote(Address::AgentPrivileged));524 client525 .load_plugin_path(id, path.to_owned())526 .await?527 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))528 }529 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {530 R::wrap(self.rpc.remote(Address::Plugin(id)))531 }532 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {533 self.ensure_elevated().await?;534 Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))535 }536537 async fn ensure_elevated(&self) -> Result<()> {538 self.elevated539 .get_or_try_init(|| async {540 let port = match &self.transport {541 Transport::Ssh {542 sess, agent_path, ..543 } => {544 let (tool, flags) = detect_escalation(sess).await?;545 let ch = sess.channel_open_session().await?;546 ch.exec(true, privileged_cmd(tool, flags, agent_path, None))547 .await?;548 port_from_channel(ch)549 }550 Transport::Local { agent_path, .. } => {551 let sock = std::env::temp_dir()552 .join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4()));553 let _ = std::fs::remove_file(&sock);554 let listener = UnixListener::bind(&sock)?;555 let (tool, flags) = ESCALATORS556 .iter()557 .find(|(t, _)| find_in_path(t).is_some())558 .ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?;559 let child = tokio::process::Command::new(tool)560 .args(*flags)561 .arg(agent_path)562 .arg("real-agent")563 .arg("--privileged")564 .arg("--path")565 .arg(sock.to_str().expect("temp path is utf-8"))566 .kill_on_drop(true)567 .spawn()?;568 self.children.lock().expect("lock").push(child);569 let (stream, _) = listener.accept().await?;570 let _ = std::fs::remove_file(&sock);571 from_socket(stream)572 }573 };574 self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));575 anyhow::Ok(())576 })577 .await?;578 Ok(())579 }580581 pub async fn exec(&self, command: String) -> Result<RemoteChild> {582 let Some(sess) = self.ssh() else {583 bail!("exec should not be called on local")584 };585 let ch = sess.channel_open_session().await?;586 ch.exec(true, command).await?;587588 let (mut out_w, stdout) = tokio::io::duplex(64 * 1024);589 let (mut err_w, stderr) = tokio::io::duplex(64 * 1024);590 let (exit_tx, exit) = oneshot::channel();591592 tokio::spawn(async move {593 let mut ch = ch;594 let mut code = None;595 while let Some(msg) = ch.wait().await {596 match msg {597 ChannelMsg::Data { data } => {598 if out_w.write_all(&data).await.is_err() {599 break;600 }601 }602 ChannelMsg::ExtendedData { data, .. } => {603 if err_w.write_all(&data).await.is_err() {604 break;605 }606 }607 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),608 _ => {}609 }610 }611 let _ = out_w.shutdown().await;612 let _ = err_w.shutdown().await;613 let _ = exit_tx.send(code);614 });615616 Ok(RemoteChild {617 stdout,618 stderr,619 exit,620 })621 }622623 pub fn serve_elevate(&self) -> Result<()> {624 let Transport::Ssh {625 sess, agent_path, ..626 } = &self.transport627 else {628 bail!("elevate should not be called on local")629 };630 let mut rpc = self.rpc.clone();631 ElevateEndpoints(SshElevator {632 sess: sess.clone(),633 rpc: self.rpc.clone().downgrade(),634 agent_path: agent_path.to_owned(),635 })636 .register_endpoints(&mut rpc);637 Ok(())638 }639640 pub fn remote_dir(&self) -> Option<&Utf8Path> {641 match &self.transport {642 Transport::Ssh { remote_dir, .. } => Some(remote_dir),643 Transport::Local { .. } => None,644 }645 }646647 pub async fn forward_socket(648 &self,649 remote_path: &Utf8Path,650 ) -> Result<oneshot::Receiver<Channel<Msg>>> {651 let Transport::Ssh { sess, subs, .. } = &self.transport else {652 bail!("forward_socket should not be called on local")653 };654 let (tx, rx) = oneshot::channel();655 subs.lock()656 .expect("lock")657 .insert(remote_path.to_owned(), tx);658 sess.streamlocal_forward(remote_path.to_owned()).await?;659 Ok(rx)660 }661662 pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<Shell> {663 let Transport::Ssh { remote_dir, .. } = &self.transport else {664 bail!("open_shell should not be called on local")665 };666 let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4()));667668 let rx = self.forward_socket(&sock).await?;669 let client: PtyClient<BifConfig> = self.endpoints();670 let id = client671 .open_shell(sock, term.to_owned(), cols, rows)672 .await?673 .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;674 let ch = rx675 .await676 .map_err(|_| anyhow!("agent never connected the shell socket"))?;677678 Ok(Shell {679 id,680 stream: ch.into_stream(),681 remote: self.rpc.remote(Address::Agent),682 })683 }684}685686pub struct Shell {687 pub id: ShellId,688 pub stream: ChannelStream<Msg>,689 remote: Remote<BifConfig>,690}691692impl Shell {693 pub fn resizer(&self) -> ShellResizer {694 ShellResizer {695 remote: self.remote.clone(),696 id: self.id,697 }698 }699}700701#[derive(Clone)]702pub struct ShellResizer {703 remote: Remote<BifConfig>,704 id: ShellId,705}706707impl ShellResizer {708 pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {709 PtyClient::wrap(self.remote.clone())710 .resize(self.id, cols, rows)711 .await?712 .map_err(|e| anyhow!("failed to resize remote shell: {e}"))713 }714}715716async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {717 let mut cmd_chan = sess.channel_open_session().await?;718 cmd_chan719 .exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")720 .await?;721 let mut stdout = vec![];722 loop {723 let Some(msg) = cmd_chan.wait().await else {724 bail!("unexpected channel end");725 };726 match msg {727 russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),728 russh::ChannelMsg::ExitStatus { exit_status } => {729 if exit_status != 0 {730 bail!("mktemp failed");731 }732 break;733 }734 _ => {}735 }736 }737 ensure!(stdout.ends_with(b"\n"));738 stdout.pop();739 Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))740}