1use std::collections::HashMap;2use std::env;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Remote, Rpc, Rtt};9use camino::{Utf8Path, Utf8PathBuf};10use remowt_link_shared::plugin::PluginEndpointsClient;11use remowt_link_shared::port::child_port;12use remowt_link_shared::{Address, BifConfig};13use russh::client::{connect, Config, Handle, Handler, Msg, Session};14use russh::keys::agent::client::AgentClient;15use russh::keys::agent::AgentIdentity;16use russh::keys::check_known_hosts;17use russh::keys::ssh_key::PublicKey;18use russh::Channel;19use tempfile::TempDir;20use tokio::io::AsyncRead;21use tokio::net::UnixListener;22use tokio::sync::oneshot;23use tokio::task::JoinHandle;24use tokio::{25 fs,26 io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader},27};28use tracing::{debug, info, warn};29use uuid::Uuid;3031pub mod editor;32mod forwarded;33mod shell;34mod ssh_exec;35mod subprocess;3637use self::ssh_exec::SshExecChild;38pub use self::subprocess::{RemowtChild, SpawnOptions, StderrMode, StdioMode};39pub use forwarded::{RemowtListener, RemowtStream};40pub use shell::{RemowtShell, RemowtShellResizer};4142type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;4344fn sh_quote(s: impl AsRef<str>) -> String {45 format!("'{}'", s.as_ref().replace('\'', "'\\''"))46}4748const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];4950pub struct AgentBundle {51 dir: PathBuf,52 hashes: HashMap<String, String>,53}5455impl AgentBundle {56 pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {57 let dir = dir.into();58 let hashes_path = dir.join("hashes");59 let raw = std::fs::read_to_string(&hashes_path)60 .with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;61 let mut hashes = HashMap::new();62 for line in raw.lines() {63 let line = line.trim();64 if line.is_empty() {65 continue;66 }67 let (arch, hash) = line68 .split_once(char::is_whitespace)69 .ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;70 hashes.insert(arch.to_owned(), hash.trim().to_owned());71 }72 ensure!(73 !hashes.is_empty(),74 "agent bundle {} has no hashes",75 dir.display()76 );77 Ok(Self { dir, hashes })78 }7980 fn binary(&self, arch: &str) -> PathBuf {81 self.dir.join(format!("remowt-agent-{arch}"))82 }8384 fn local_binary(&self) -> Result<PathBuf> {85 let arch = env::consts::ARCH;86 let path = self.binary(arch);87 ensure!(88 path.is_file(),89 "no local remowt-agent build for arch {arch} in bundle {}",90 self.dir.display()91 );92 Ok(path)93 }94}9596async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {97 let ch = sess.channel_open_session().await?;98 ch.exec(true, cmd).await?;99100 let mut child = SshExecChild::from_exec(ch);101 drop(child.stdin);102 drain_to_tracing(child.stderr, cmd.to_owned(), true);103104 let mut out = Vec::new();105 child.stdout.read_to_end(&mut out).await?;106 let code = child.exit.await.ok().flatten();107 Ok((code, out))108}109110async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {111 let (code, mut out) = run(sess, cmd).await?;112 ensure!(113 code == Some(0),114 "remote command failed (exit {code:?}): {cmd}"115 );116 if !out.is_empty() {117 ensure!(118 out.ends_with(b"\n"),119 "remote command was not newline-terminated: {cmd}: {out:?}"120 );121 out.pop();122 }123 String::from_utf8(out).context("expected utf8 output for command")124}125126async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {127 debug!("uname -a");128 let arch = run_string_ok(sess, "uname -m").await?;129 let hash = bundle130 .hashes131 .get(&arch)132 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;133134 debug!("get dir");135 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"").await?;136 let dir = if cache.is_empty() {137 let home = run_string_ok(sess, "echo \"$HOME\"").await?;138 ensure!(139 !home.is_empty(),140 "remote $HOME and $XDG_CACHE_HOME both empty"141 );142 Utf8PathBuf::from(home).join(".cache/remowt")143 } else {144 Utf8PathBuf::from(cache).join("remowt")145 };146 let path = dir.join(hash);147148 debug!("presence");149 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;150 if present != Some(0) {151 let bin = bundle.binary(&arch);152 debug!("read");153 let bytes = fs::read(&bin)154 .await155 .with_context(|| format!("reading agent binary {}", bin.display()))?;156 debug!("upload");157 upload_agent(sess, &dir, &path, bytes).await?;158 }159 Ok(path)160}161162async fn upload_agent(163 sess: &Handle<SshHandler>,164 dir: &Utf8Path,165 path: &Utf8Path,166 bytes: Vec<u8>,167) -> Result<()> {168 debug!("mkdirp");169 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;170171 let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));172 let ch = sess.channel_open_session().await?;173 debug!("cat");174 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;175176 let mut child = SshExecChild::from_exec(ch);177 child178 .stdin179 .write_all(&bytes)180 .await181 .context("sending agent binary")?;182 child183 .stdin184 .shutdown()185 .await186 .context("sending agent binary")?;187 let code = child.wait().await;188 ensure!(code == Some(0), "agent upload failed (exit {code:?})");189190 debug!("chmod");191 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;192 run_string_ok(193 sess,194 &format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),195 )196 .await?;197 Ok(())198}199200pub struct SshHandler {201 host: String,202 port: u16,203 subs: Subs,204}205impl Handler for SshHandler {206 type Error = russh::Error;207 async fn check_server_key(208 &mut self,209 server_public_key: &PublicKey,210 ) -> Result<bool, Self::Error> {211 Ok(check_known_hosts(&self.host, self.port, server_public_key)?)212 }213 async fn server_channel_open_forwarded_streamlocal(214 &mut self,215 channel: Channel<Msg>,216 socket_path: &str,217 _session: &mut Session,218 ) -> Result<(), Self::Error> {219 let Some(ch) = self220 .subs221 .lock()222 .expect("lock")223 .remove(&Utf8PathBuf::from(socket_path))224 else {225 return Err(russh::Error::WrongChannel);226 };227 let _ = ch.send(channel);228 Ok(())229 }230}231232enum Transport {233 Ssh {234 sess: Arc<Handle<SshHandler>>,235 subs: Subs,236 runtime_dir: Utf8PathBuf,237 agent_path: Utf8PathBuf,238 },239 Local {240 agent_path: PathBuf,241 runtime_dir: Utf8PathBuf,242 },243}244245struct RemowtInner {246 transport: Transport,247 rpc: Rpc<BifConfig>,248 elevated: tokio::sync::OnceCell<()>,249 #[allow(dead_code)]250 children: Mutex<Vec<tokio::process::Child>>,251 _runtime_tmp: Option<TempDir>,252 user: String,253}254255#[derive(Clone)]256pub struct Remowt(Arc<RemowtInner>);257258pub type RemowtRemote = Remote<BifConfig>;259260impl Remowt {261 262 263 pub async fn connect(host: &str, bundle: &AgentBundle, remowt_user: String) -> Result<Self> {264 let conf = russh_config::parse_home(host)?;265 let port = conf.host_config.port.or(conf.port).unwrap_or(22);266 let hostname = conf267 .host_config268 .hostname269 .clone()270 .unwrap_or_else(|| conf.host_name.clone());271 let user = conf272 .user273 .clone()274 .unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));275276 let subs: Subs = Arc::new(Mutex::new(HashMap::new()));277 let mut sess = connect(278 Arc::new(Config::default()),279 (hostname.clone(), port),280 SshHandler {281 host: hostname,282 port,283 subs: subs.clone(),284 },285 )286 .await?;287288 let mut agent = AgentClient::connect_env().await?;289 let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();290 let mut authenticated = false;291 for ident in agent.request_identities().await? {292 let AgentIdentity::PublicKey { key, .. } = ident else {293 continue;294 };295 if sess296 .authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)297 .await?298 .success()299 {300 authenticated = true;301 break;302 }303 }304 ensure!(authenticated, "ssh authentication failed");305306 let sess = Arc::new(sess);307308 debug!("deploying agent");309 let agent_path = deploy_agent(&sess, bundle).await?;310311 debug!("runtime dir");312 let runtime_dir = remote_runtime_dir(&sess).await?;313314 let rpc = Rpc::<BifConfig>::new(Address::User);315316 let cmd_chan = sess.channel_open_session().await?;317 debug!("starting agent");318 cmd_chan319 .exec(true, format!("{} real-agent", sh_quote(&agent_path)))320 .await?;321322 let child = SshExecChild::from_exec(cmd_chan);323 drain_to_tracing(child.stderr, "agent".to_owned(), true);324 rpc.add_direct(325 Address::Agent,326 child_port(child.stdout, child.stdin),327 Rtt(0),328 );329330 Ok(Self(Arc::new(RemowtInner {331 transport: Transport::Ssh {332 sess,333 subs,334 runtime_dir,335 agent_path,336 },337 rpc,338 elevated: tokio::sync::OnceCell::new(),339 children: Mutex::new(Vec::new()),340 _runtime_tmp: None,341 user: remowt_user,342 })))343 }344345 346 pub async fn connect_local(bundle: &AgentBundle, user: String) -> Result<Self> {347 let agent_path = bundle.local_binary()?;348 let mut child = tokio::process::Command::new(&agent_path)349 .arg("real-agent")350 .arg("--local")351 .stdin(std::process::Stdio::piped())352 .stdout(std::process::Stdio::piped())353 .kill_on_drop(true)354 .spawn()355 .with_context(|| format!("spawning agent binary {}", agent_path.display()))?;356 let stdin = child.stdin.take().expect("stdin piped");357 let stdout = child.stdout.take().expect("stdout piped");358359 let rpc = Rpc::<BifConfig>::new(Address::User);360 rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));361362 let (runtime_dir, runtime_tmp) = local_runtime_dir()?;363364 Ok(Self(Arc::new(RemowtInner {365 transport: Transport::Local {366 agent_path,367 runtime_dir,368 },369 rpc,370 elevated: tokio::sync::OnceCell::new(),371 children: Mutex::new(vec![child]),372 _runtime_tmp: runtime_tmp,373 user,374 })))375 }376377 378 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {379 match &self.0.transport {380 Transport::Ssh { sess, .. } => Some(sess.clone()),381 Transport::Local { .. } => None,382 }383 }384385 pub fn rpc(&self) -> Rpc<BifConfig> {386 self.0.rpc.clone()387 }388389 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {390 let client: PluginEndpointsClient<BifConfig> = self.endpoints();391 client392 .load_plugin(id, name.to_owned())393 .await?394 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))395 }396 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {397 self.ensure_escalated().await?;398 let client: PluginEndpointsClient<BifConfig> =399 PluginEndpointsClient::wrap(self.0.rpc.remote(Address::AgentPrivileged));400 client401 .load_plugin_path(id, path.to_owned())402 .await?403 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))404 }405 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {406 R::wrap(self.0.rpc.remote(Address::Plugin(id)))407 }408409 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {410 R::wrap(self.0.rpc.remote(Address::Agent))411 }412 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {413 self.ensure_escalated().await?;414 Ok(R::wrap(self.0.rpc.remote(Address::AgentPrivileged)))415 }416417 async fn ensure_escalated(&self) -> Result<()> {418 self.0419 .elevated420 .get_or_try_init(|| async {421 let (agent_path, local) = match &self.0.transport {422 Transport::Ssh { agent_path, .. } => (agent_path.as_str().to_owned(), false),423 Transport::Local { agent_path, .. } => (424 agent_path425 .to_str()426 .ok_or_else(|| anyhow!("local agent path is not utf-8"))?427 .to_owned(),428 true,429 ),430 };431432 let (tool, flags) = self.detect_escalation().await?;433 let mut args: Vec<String> = Vec::new();434 args.push("-w".to_owned());435 args.push(tool.to_owned());436 args.extend(flags.iter().copied().map(str::to_owned));437 if tool == "run0" {438 args.push(format!(439 "--unit={}-{}-{}.scope",440 self.0.user,441 std::process::id(),442 Uuid::new_v4()443 ));444 }445 args.push(agent_path);446 args.push("real-agent".to_owned());447 args.push("--privileged".to_owned());448 if local {449 args.push("--local".to_owned());450 }451452 let child = self453 .spawn(SpawnOptions {454 program: "setsid".to_owned(),455 args,456 stdin: StdioMode::Pipe,457 stdout: StdioMode::Pipe,458 stderr: StderrMode::Inherit,459 ..Default::default()460 })461 .await462 .context("spawning privileged agent")?;463464 let stdin = child465 .stdin466 .ok_or_else(|| anyhow!("privileged agent stdin missing"))?;467 let stdout = child468 .stdout469 .ok_or_else(|| anyhow!("privileged agent stdout missing"))?;470471 let port = child_port(stdout, stdin);472 self.0473 .rpc474 .add_direct(Address::AgentPrivileged, port, Rtt(0));475 anyhow::Ok(())476 })477 .await?;478 Ok(())479 }480481 async fn detect_escalation(&self) -> Result<(&'static str, &'static [&'static str])> {482 for (tool, flags) in ESCALATORS {483 let probe = self484 .spawn(SpawnOptions {485 program: (*tool).to_owned(),486 args: vec!["--version".to_owned()],487 stdout: StdioMode::Null,488 stderr: StderrMode::Null,489 ..Default::default()490 })491 .await;492 if let Ok(child) = probe {493 let _ = child.wait().await;494 return Ok((tool, flags));495 }496 }497 bail!("no escalation tool found")498 }499500 501 pub fn runtime_dir(&self) -> Utf8PathBuf {502 match &self.0.transport {503 Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),504 Transport::Local { runtime_dir, .. } => runtime_dir.clone(),505 }506 }507508 509 pub async fn bind_runtime_unix(&self, hint: &str) -> Result<(RemowtListener, Utf8PathBuf)> {510 let sock = self511 .runtime_dir()512 .join(format!("remowt-{hint}-{}.sock", Uuid::new_v4()));513 let listener = self.bind_unix(&sock).await?;514 Ok((listener, sock))515 }516517 518 pub async fn bind_unix(&self, path: &Utf8Path) -> Result<RemowtListener> {519 match &self.0.transport {520 Transport::Ssh { sess, subs, .. } => {521 let (tx, rx) = oneshot::channel();522 subs.lock().expect("lock").insert(path.to_owned(), tx);523 sess.streamlocal_forward(path.to_owned()).await?;524 Ok(RemowtListener::Ssh(rx))525 }526 Transport::Local { .. } => {527 let _ = std::fs::remove_file(path);528 Ok(RemowtListener::Local(529 UnixListener::bind(path)?,530 path.to_owned(),531 ))532 }533 }534 }535}536537pub(crate) fn drain_to_tracing(538 stream: impl AsyncRead + Unpin + 'static + Send,539 context: String,540 stderr: bool,541) -> JoinHandle<()> {542 tokio::spawn(async move {543 let mut reader = BufReader::new(stream);544 let mut buf = Vec::with_capacity(4096);545 loop {546 buf.clear();547 match reader.read_until(b'\n', &mut buf).await {548 Ok(0) => break,549 Ok(_) => {550 let line = String::from_utf8_lossy(buf.strip_suffix(b"\n").unwrap_or(&buf));551 if stderr {552 warn!(context = %context, "{line}");553 } else {554 info!(context = %context, "{line}");555 }556 }557 Err(e) => {558 warn!(context = %context, "child stdio read failed: {e}");559 break;560 }561 }562 }563 })564}565566fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {567 if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {568 if !dir.is_empty() {569 return Ok((Utf8PathBuf::from(dir), None));570 }571 }572 let tmp = tempfile::Builder::new()573 .prefix("remowt.")574 .rand_bytes(12)575 .tempdir()?;576 let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())577 .map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;578 Ok((dir, Some(tmp)))579}580581async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {582 let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;583 let dir = dir.trim();584 if dir.is_empty() {585 let tmp = run_string_ok(sess, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir").await?;586 Ok(Utf8PathBuf::from(tmp))587 } else {588 Ok(Utf8PathBuf::from(dir))589 }590}