1use std::collections::HashMap;2use std::env;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Remote, Rpc, Rtt};9use camino::{Utf8Path, Utf8PathBuf};10use remowt_link_shared::plugin::PluginEndpointsClient;11use remowt_link_shared::port::child_port;12use remowt_link_shared::{Address, BifConfig};13use russh::client::{connect, Config, Handle, Handler, Msg, Session};14use russh::keys::agent::client::AgentClient;15use russh::keys::agent::AgentIdentity;16use russh::keys::check_known_hosts;17use russh::keys::ssh_key::PublicKey;18use russh::Channel;19use tempfile::TempDir;20use tokio::net::UnixListener;21use tokio::sync::oneshot;22use tokio::{23 fs,24 io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, DuplexStream},25};26use tracing::{debug, warn};27use uuid::Uuid;2829pub mod editor;30mod forwarded;31mod shell;32mod ssh_exec;33mod subprocess;3435use self::ssh_exec::SshExecChild;36pub use self::subprocess::{RemowtChild, SpawnOptions, StderrMode, StdioMode};37pub use forwarded::{RemowtListener, RemowtStream};38pub use shell::{RemowtShell, RemowtShellResizer};3940type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;4142fn sh_quote(s: impl AsRef<str>) -> String {43 format!("'{}'", s.as_ref().replace('\'', "'\\''"))44}4546const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];4748pub struct AgentBundle {49 dir: PathBuf,50 hashes: HashMap<String, String>,51}5253impl AgentBundle {54 pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {55 let dir = dir.into();56 let hashes_path = dir.join("hashes");57 let raw = std::fs::read_to_string(&hashes_path)58 .with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;59 let mut hashes = HashMap::new();60 for line in raw.lines() {61 let line = line.trim();62 if line.is_empty() {63 continue;64 }65 let (arch, hash) = line66 .split_once(char::is_whitespace)67 .ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;68 hashes.insert(arch.to_owned(), hash.trim().to_owned());69 }70 ensure!(71 !hashes.is_empty(),72 "agent bundle {} has no hashes",73 dir.display()74 );75 Ok(Self { dir, hashes })76 }7778 fn binary(&self, arch: &str) -> PathBuf {79 self.dir.join(format!("remowt-agent-{arch}"))80 }8182 fn local_binary(&self) -> Result<PathBuf> {83 let arch = env::consts::ARCH;84 let path = self.binary(arch);85 ensure!(86 path.is_file(),87 "no local remowt-agent build for arch {arch} in bundle {}",88 self.dir.display()89 );90 Ok(path)91 }92}9394async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {95 let ch = sess.channel_open_session().await?;96 ch.exec(true, cmd).await?;9798 let mut child = SshExecChild::from_exec(ch);99 drop(child.stdin);100 drain_stderr(child.stderr, cmd.to_owned());101102 let mut out = Vec::new();103 child.stdout.read_to_end(&mut out).await?;104 let code = child.exit.await.ok().flatten();105 Ok((code, out))106}107108async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {109 let (code, mut out) = run(sess, cmd).await?;110 ensure!(111 code == Some(0),112 "remote command failed (exit {code:?}): {cmd}"113 );114 if !out.is_empty() {115 ensure!(116 out.ends_with(b"\n"),117 "remote command was not newline-terminated: {cmd}: {out:?}"118 );119 out.pop();120 }121 String::from_utf8(out).context("expected utf8 output for command")122}123124async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {125 debug!("uname -a");126 let arch = run_string_ok(sess, "uname -m").await?;127 let hash = bundle128 .hashes129 .get(&arch)130 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;131132 debug!("get dir");133 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"").await?;134 let dir = if cache.is_empty() {135 let home = run_string_ok(sess, "echo \"$HOME\"").await?;136 ensure!(137 !home.is_empty(),138 "remote $HOME and $XDG_CACHE_HOME both empty"139 );140 Utf8PathBuf::from(home).join(".cache/remowt")141 } else {142 Utf8PathBuf::from(cache).join("remowt")143 };144 let path = dir.join(hash);145146 debug!("presence");147 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;148 if present != Some(0) {149 let bin = bundle.binary(&arch);150 debug!("read");151 let bytes = fs::read(&bin)152 .await153 .with_context(|| format!("reading agent binary {}", bin.display()))?;154 debug!("upload");155 upload_agent(sess, &dir, &path, bytes).await?;156 }157 Ok(path)158}159160async fn upload_agent(161 sess: &Handle<SshHandler>,162 dir: &Utf8Path,163 path: &Utf8Path,164 bytes: Vec<u8>,165) -> Result<()> {166 debug!("mkdirp");167 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;168169 let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));170 let ch = sess.channel_open_session().await?;171 debug!("cat");172 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;173174 let mut child = SshExecChild::from_exec(ch);175 child176 .stdin177 .write_all(&bytes)178 .await179 .context("sending agent binary")?;180 child181 .stdin182 .shutdown()183 .await184 .context("sending agent binary")?;185 let code = child.wait().await;186 ensure!(code == Some(0), "agent upload failed (exit {code:?})");187188 debug!("chmod");189 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;190 run_string_ok(191 sess,192 &format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),193 )194 .await?;195 Ok(())196}197198pub struct SshHandler {199 host: String,200 port: u16,201 subs: Subs,202}203impl Handler for SshHandler {204 type Error = russh::Error;205 async fn check_server_key(206 &mut self,207 server_public_key: &PublicKey,208 ) -> Result<bool, Self::Error> {209 Ok(check_known_hosts(&self.host, self.port, server_public_key)?)210 }211 async fn server_channel_open_forwarded_streamlocal(212 &mut self,213 channel: Channel<Msg>,214 socket_path: &str,215 _session: &mut Session,216 ) -> Result<(), Self::Error> {217 let Some(ch) = self218 .subs219 .lock()220 .expect("lock")221 .remove(&Utf8PathBuf::from(socket_path))222 else {223 return Err(russh::Error::WrongChannel);224 };225 let _ = ch.send(channel);226 Ok(())227 }228}229230enum Transport {231 Ssh {232 sess: Arc<Handle<SshHandler>>,233 subs: Subs,234 runtime_dir: Utf8PathBuf,235 agent_path: Utf8PathBuf,236 },237 Local {238 agent_path: PathBuf,239 runtime_dir: Utf8PathBuf,240 },241}242243struct RemowtInner {244 transport: Transport,245 rpc: Rpc<BifConfig>,246 elevated: tokio::sync::OnceCell<()>,247 #[allow(dead_code)]248 children: Mutex<Vec<tokio::process::Child>>,249 _runtime_tmp: Option<TempDir>,250}251252#[derive(Clone)]253pub struct Remowt(Arc<RemowtInner>);254255pub type RemowtRemote = Remote<BifConfig>;256257impl Remowt {258 259 260 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {261 let conf = russh_config::parse_home(host)?;262 let port = conf.host_config.port.or(conf.port).unwrap_or(22);263 let hostname = conf264 .host_config265 .hostname266 .clone()267 .unwrap_or_else(|| conf.host_name.clone());268 let user = conf269 .user270 .clone()271 .unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));272273 let subs: Subs = Arc::new(Mutex::new(HashMap::new()));274 let mut sess = connect(275 Arc::new(Config::default()),276 (hostname.clone(), port),277 SshHandler {278 host: hostname,279 port,280 subs: subs.clone(),281 },282 )283 .await?;284285 let mut agent = AgentClient::connect_env().await?;286 let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();287 let mut authenticated = false;288 for ident in agent.request_identities().await? {289 let AgentIdentity::PublicKey { key, .. } = ident else {290 continue;291 };292 if sess293 .authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)294 .await?295 .success()296 {297 authenticated = true;298 break;299 }300 }301 ensure!(authenticated, "ssh authentication failed");302303 let sess = Arc::new(sess);304305 debug!("deploying agent");306 let agent_path = deploy_agent(&sess, bundle).await?;307308 debug!("runtime dir");309 let runtime_dir = remote_runtime_dir(&sess).await?;310311 let rpc = Rpc::<BifConfig>::new(Address::User);312313 let cmd_chan = sess.channel_open_session().await?;314 debug!("starting agent");315 cmd_chan316 .exec(true, format!("{} real-agent", sh_quote(&agent_path)))317 .await?;318319 let child = SshExecChild::from_exec(cmd_chan);320 drain_stderr(child.stderr, "agent".to_owned());321 rpc.add_direct(322 Address::Agent,323 child_port(child.stdout, child.stdin),324 Rtt(0),325 );326327 Ok(Self(Arc::new(RemowtInner {328 transport: Transport::Ssh {329 sess,330 subs,331 runtime_dir,332 agent_path,333 },334 rpc,335 elevated: tokio::sync::OnceCell::new(),336 children: Mutex::new(Vec::new()),337 _runtime_tmp: None,338 })))339 }340341 342 pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {343 let agent_path = bundle.local_binary()?;344 let mut child = tokio::process::Command::new(&agent_path)345 .arg("real-agent")346 .arg("--local")347 .stdin(std::process::Stdio::piped())348 .stdout(std::process::Stdio::piped())349 .kill_on_drop(true)350 .spawn()351 .with_context(|| format!("spawning agent binary {}", agent_path.display()))?;352 let stdin = child.stdin.take().expect("stdin piped");353 let stdout = child.stdout.take().expect("stdout piped");354355 let rpc = Rpc::<BifConfig>::new(Address::User);356 rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));357358 let (runtime_dir, runtime_tmp) = local_runtime_dir()?;359360 Ok(Self(Arc::new(RemowtInner {361 transport: Transport::Local {362 agent_path,363 runtime_dir,364 },365 rpc,366 elevated: tokio::sync::OnceCell::new(),367 children: Mutex::new(vec![child]),368 _runtime_tmp: runtime_tmp,369 })))370 }371372 373 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {374 match &self.0.transport {375 Transport::Ssh { sess, .. } => Some(sess.clone()),376 Transport::Local { .. } => None,377 }378 }379380 pub fn rpc(&self) -> Rpc<BifConfig> {381 self.0.rpc.clone()382 }383384 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {385 let client: PluginEndpointsClient<BifConfig> = self.endpoints();386 client387 .load_plugin(id, name.to_owned())388 .await?389 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))390 }391 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {392 self.ensure_escalated().await?;393 let client: PluginEndpointsClient<BifConfig> =394 PluginEndpointsClient::wrap(self.0.rpc.remote(Address::AgentPrivileged));395 client396 .load_plugin_path(id, path.to_owned())397 .await?398 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))399 }400 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {401 R::wrap(self.0.rpc.remote(Address::Plugin(id)))402 }403404 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {405 R::wrap(self.0.rpc.remote(Address::Agent))406 }407 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {408 self.ensure_escalated().await?;409 Ok(R::wrap(self.0.rpc.remote(Address::AgentPrivileged)))410 }411412 async fn ensure_escalated(&self) -> Result<()> {413 self.0414 .elevated415 .get_or_try_init(|| async {416 let (agent_path, local) = match &self.0.transport {417 Transport::Ssh { agent_path, .. } => (agent_path.as_str().to_owned(), false),418 Transport::Local { agent_path, .. } => (419 agent_path420 .to_str()421 .ok_or_else(|| anyhow!("local agent path is not utf-8"))?422 .to_owned(),423 true,424 ),425 };426427 let (tool, flags) = self.detect_escalation().await?;428 let mut args: Vec<String> = flags.iter().map(|f| (*f).to_owned()).collect();429 args.push(agent_path);430 args.push("real-agent".to_owned());431 args.push("--privileged".to_owned());432 if local {433 args.push("--local".to_owned());434 }435436 let child = self437 .spawn(SpawnOptions {438 program: tool.to_owned(),439 args,440 stdin: StdioMode::Pipe,441 stdout: StdioMode::Pipe,442 stderr: StderrMode::Inherit,443 ..Default::default()444 })445 .await446 .context("spawning privileged agent")?;447448 let stdin = child449 .stdin450 .ok_or_else(|| anyhow!("privileged agent stdin missing"))?;451 let stdout = child452 .stdout453 .ok_or_else(|| anyhow!("privileged agent stdout missing"))?;454455 let port = child_port(stdout, stdin);456 self.0457 .rpc458 .add_direct(Address::AgentPrivileged, port, Rtt(0));459 anyhow::Ok(())460 })461 .await?;462 Ok(())463 }464465 async fn detect_escalation(&self) -> Result<(&'static str, &'static [&'static str])> {466 for (tool, flags) in ESCALATORS {467 let probe = self468 .spawn(SpawnOptions {469 program: (*tool).to_owned(),470 args: vec!["--version".to_owned()],471 stdout: StdioMode::Null,472 stderr: StderrMode::Null,473 ..Default::default()474 })475 .await;476 if let Ok(child) = probe {477 let _ = child.wait().await;478 return Ok((tool, flags));479 }480 }481 bail!("no escalation tool found")482 }483484 485 pub fn runtime_dir(&self) -> Utf8PathBuf {486 match &self.0.transport {487 Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),488 Transport::Local { runtime_dir, .. } => runtime_dir.clone(),489 }490 }491492 493 pub async fn bind_runtime_unix(&self, hint: &str) -> Result<(RemowtListener, Utf8PathBuf)> {494 let sock = self495 .runtime_dir()496 .join(format!("remowt-{hint}-{}.sock", Uuid::new_v4()));497 let listener = self.bind_unix(&sock).await?;498 Ok((listener, sock))499 }500501 502 pub async fn bind_unix(&self, path: &Utf8Path) -> Result<RemowtListener> {503 match &self.0.transport {504 Transport::Ssh { sess, subs, .. } => {505 let (tx, rx) = oneshot::channel();506 subs.lock().expect("lock").insert(path.to_owned(), tx);507 sess.streamlocal_forward(path.to_owned()).await?;508 Ok(RemowtListener::Ssh(rx))509 }510 Transport::Local { .. } => {511 let _ = std::fs::remove_file(path);512 Ok(RemowtListener::Local(513 UnixListener::bind(path)?,514 path.to_owned(),515 ))516 }517 }518 }519}520521fn drain_stderr(stream: DuplexStream, context: String) {522 tokio::spawn(async move {523 let mut reader = BufReader::new(stream).lines();524 loop {525 match reader.next_line().await {526 Ok(Some(line)) => warn!(context = %context, "{line}"),527 Ok(None) => break,528 Err(e) => {529 warn!(context = %context, "stderr read failed: {e}");530 break;531 }532 }533 }534 });535}536537fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {538 if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {539 if !dir.is_empty() {540 return Ok((Utf8PathBuf::from(dir), None));541 }542 }543 let tmp = tempfile::Builder::new()544 .prefix("remowt.")545 .rand_bytes(12)546 .tempdir()?;547 let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())548 .map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;549 Ok((dir, Some(tmp)))550}551552async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {553 let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;554 let dir = dir.trim();555 if dir.is_empty() {556 let tmp = run_string_ok(sess, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir").await?;557 Ok(Utf8PathBuf::from(tmp))558 } else {559 Ok(Utf8PathBuf::from(dir))560 }561}