1use std::collections::HashMap;2use std::io;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc};9use bifrostlink_ports::unix_socket::from_socket;10use bytes::{Bytes, BytesMut};11use camino::{Utf8Path, Utf8PathBuf};12use remowt_link_shared::{13 Address, BifConfig, ElevateEndpoints, ElevateError, Elevator, Fs, Pty, PtyClient, ShellId,14 Systemd,15};16use russh::client::{connect, Config, Handle, Handler, Msg, Session};17use russh::keys::agent::client::AgentClient;18use russh::keys::agent::AgentIdentity;19use russh::keys::check_known_hosts;20use russh::keys::ssh_key::PublicKey;21use russh::{Channel, ChannelMsg, ChannelStream};22use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf};23use tokio::join;24use tokio::net::UnixListener;25use tokio::sync::mpsc;26use tokio::sync::oneshot::{self, channel};27use tracing::error;28use uuid::Uuid;2930pub mod editor;3132type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;3334async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {35 let len = srx.read_u32().await?;36 let mut buf = BytesMut::zeroed(len as usize);37 srx.read_exact(&mut buf).await?;38 Ok(buf)39}40async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {41 stx.write_u32(value.len().try_into().expect("can't be larger"))42 .await?;43 stx.write_all(&value).await?;44 Ok(())45}4647fn sh_quote(s: impl AsRef<str>) -> String {48 format!("'{}'", s.as_ref().replace('\'', "'\\''"))49}5051const ESCALATORS: [(&str, &[&str]); 3] = [52 ("run0", &["--background=", "--pipe"]),53 ("sudo", &[]),54 ("doas", &[]),55];5657pub struct AgentBundle {58 dir: PathBuf,59 hashes: HashMap<String, String>,60}6162impl AgentBundle {63 pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {64 let dir = dir.into();65 let hashes_path = dir.join("hashes");66 let raw = std::fs::read_to_string(&hashes_path)67 .with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;68 let mut hashes = HashMap::new();69 for line in raw.lines() {70 let line = line.trim();71 if line.is_empty() {72 continue;73 }74 let (arch, hash) = line75 .split_once(char::is_whitespace)76 .ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;77 hashes.insert(arch.to_owned(), hash.trim().to_owned());78 }79 ensure!(80 !hashes.is_empty(),81 "agent bundle {} has no hashes",82 dir.display()83 );84 Ok(Self { dir, hashes })85 }8687 fn binary(&self, arch: &str) -> PathBuf {88 self.dir.join(format!("remowt-agent-{arch}"))89 }90}9192async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {93 let mut ch = sess.channel_open_session().await?;94 ch.exec(true, cmd).await?;95 let mut out = Vec::new();96 let mut code = None;97 while let Some(msg) = ch.wait().await {98 match msg {99 ChannelMsg::Data { data } => out.extend(data.as_ref()),100 ChannelMsg::ExtendedData { data, .. } => {101 error!(102 "remote stderr: {}",103 String::from_utf8_lossy(data.as_ref()).trim()104 );105 }106 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),107 _ => {}108 }109 }110 Ok((code, out))111}112113async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {114 let (code, mut out) = run(sess, cmd).await?;115 ensure!(116 code == Some(0),117 "remote command failed (exit {code:?}): {cmd}"118 );119 ensure!(out.ends_with(b"\n"));120 out.pop();121 String::from_utf8(out).context("expected utf8 output for command")122}123124async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {125 let arch = run_string_ok(sess, "uname -m").await?;126 let hash = bundle127 .hashes128 .get(&arch)129 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;130131 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")132 .await?133 .trim()134 .to_owned();135 let dir = if cache.is_empty() {136 let home = run_string_ok(sess, "echo \"$HOME\"").await?;137 ensure!(138 !home.is_empty(),139 "remote $HOME and $XDG_CACHE_HOME both empty"140 );141 Utf8PathBuf::from(home).join("cache/remowt")142 } else {143 Utf8PathBuf::from(cache).join("remowt")144 };145 let path = dir.join(hash);146147 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;148 if present != Some(0) {149 let bin = bundle.binary(&arch);150 let bytes = std::fs::read(&bin)151 .with_context(|| format!("reading agent binary {}", bin.display()))?;152 upload_agent(sess, &dir, &path, bytes).await?;153 }154 Ok(path)155}156157async fn upload_agent(158 sess: &Handle<SshHandler>,159 dir: &Utf8Path,160 path: &Utf8Path,161 bytes: Vec<u8>,162) -> Result<()> {163 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;164165 let tmp = path.join(format!("tmp.{}", Uuid::new_v4()));166 let ch = sess.channel_open_session().await?;167 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;168 ch.data_bytes(bytes).await?;169 ch.eof().await?;170 let mut ch = ch;171 let mut code = None;172 while let Some(msg) = ch.wait().await {173 match msg {174 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),175 ChannelMsg::ExtendedData { data, .. } => {176 error!(177 "agent upload: {}",178 String::from_utf8_lossy(data.as_ref()).trim()179 );180 }181 _ => {}182 }183 }184 ensure!(code == Some(0), "agent upload failed (exit {code:?})");185186 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;187 run_string_ok(188 sess,189 &format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),190 )191 .await?;192 Ok(())193}194195async fn detect_escalation(196 sess: &Handle<SshHandler>,197) -> Result<(&'static str, &'static [&'static str])> {198 for (tool, flags) in ESCALATORS {199 200 let (code, _) = run(sess, &format!("command -v {tool}")).await?;201 if code == Some(0) {202 return Ok((tool, flags));203 }204 }205 bail!("no escalation tool (run0/sudo/doas) found on remote")206}207208fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {209 let mut parts = vec![tool.to_owned()];210 parts.extend(flags.iter().map(|f| f.to_string()));211 parts.push(sh_quote(agent_path));212 parts.push("real-agent".to_owned());213 parts.push("--privileged".to_owned());214 if let Some(p) = path {215 parts.push("--path".to_owned());216 parts.push(sh_quote(p));217 }218 parts.join(" ")219}220221fn find_in_path(name: &str) -> Option<std::path::PathBuf> {222 let path = std::env::var_os("PATH")?;223 std::env::split_paths(&path)224 .map(|dir| dir.join(name))225 .find(|p| p.is_file())226}227228fn port_from_channel(ch: Channel<Msg>) -> Port {229 Port::new(move |mut rx, tx| async move {230 let (mut srx, mut stx) = tokio::io::split(ch.into_stream());231 let srx_task = async move {232 loop {233 match read(&mut srx).await {234 Ok(buf) => {235 if tx.send(buf.freeze()).is_err() {236 break;237 }238 }239 Err(e) => {240 error!("channel read failed: {e}");241 break;242 }243 }244 }245 };246 let stx_task = async move {247 while let Some(value) = rx.recv().await {248 if let Err(e) = write(&mut stx, value).await {249 error!("channel write failed: {e}");250 break;251 }252 }253 };254 join!(srx_task, stx_task);255 })256}257258pub struct SshHandler {259 host: String,260 port: u16,261 subs: Subs,262}263impl Handler for SshHandler {264 type Error = russh::Error;265 async fn check_server_key(266 &mut self,267 server_public_key: &PublicKey,268 ) -> Result<bool, Self::Error> {269 Ok(check_known_hosts(&self.host, self.port, server_public_key)?)270 }271 async fn server_channel_open_forwarded_streamlocal(272 &mut self,273 channel: Channel<Msg>,274 socket_path: &str,275 _session: &mut Session,276 ) -> Result<(), Self::Error> {277 let Some(ch) = self278 .subs279 .lock()280 .expect("lock")281 .remove(&Utf8PathBuf::from(socket_path))282 else {283 return Err(russh::Error::WrongChannel);284 };285 let _ = ch.send(channel);286 Ok(())287 }288}289290struct SshElevator {291 sess: Arc<Handle<SshHandler>>,292 rpc: WeakRpc<BifConfig>,293 agent_path: Utf8PathBuf,294}295impl Elevator for SshElevator {296 async fn elevate(&self) -> Result<(), ElevateError> {297 let fail = |e: String| ElevateError::Failed(e);298 let (tool, flags) = detect_escalation(&self.sess)299 .await300 .map_err(|e| fail(e.to_string()))?;301 let ch = self302 .sess303 .channel_open_session()304 .await305 .map_err(|e| fail(e.to_string()))?;306 ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None))307 .await308 .map_err(|e| fail(e.to_string()))?;309 let rpc = self310 .rpc311 .clone()312 .upgrade()313 .ok_or_else(|| fail("rpc is gone".to_owned()))?;314 rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0));315 Ok(())316 }317}318319pub struct RemoteChild {320 pub stdout: DuplexStream,321 pub stderr: DuplexStream,322 pub exit: oneshot::Receiver<Option<u32>>,323}324325enum Transport {326 Ssh {327 sess: Arc<Handle<SshHandler>>,328 subs: Subs,329 remote_dir: Utf8PathBuf,330 agent_path: Utf8PathBuf,331 },332 Local {333 #[allow(dead_code)]334 agent: Rpc<BifConfig>,335 agent_path: String,336 },337}338339pub struct Remowt {340 transport: Transport,341 rpc: Rpc<BifConfig>,342 elevated: tokio::sync::OnceCell<()>,343 children: Mutex<Vec<tokio::process::Child>>,344}345346pub type RemowtRemote = Remote<BifConfig>;347348fn loopback() -> (Port, Port) {349 let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::<Bytes>();350 let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::<Bytes>();351 let user = Port::new(move |mut rx, tx| async move {352 loop {353 tokio::select! {354 msg = rx.recv() => match msg {355 Some(msg) => if a2b_tx.send(msg).is_err() { break },356 None => break,357 },358 msg = b2a_rx.recv() => match msg {359 Some(msg) => if tx.send(msg).is_err() { break },360 None => break,361 },362 }363 }364 });365 let agent = Port::new(move |mut rx, tx| async move {366 loop {367 tokio::select! {368 msg = rx.recv() => match msg {369 Some(msg) => if b2a_tx.send(msg).is_err() { break },370 None => break,371 },372 msg = a2b_rx.recv() => match msg {373 Some(msg) => if tx.send(msg).is_err() { break },374 None => break,375 },376 }377 }378 });379 (user, agent)380}381382impl Remowt {383 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {384 let conf = russh_config::parse_home(host)?;385 let port = conf.host_config.port.unwrap_or(22);386 let hostname = conf387 .host_config388 .hostname389 .clone()390 .unwrap_or_else(|| conf.host_name.clone());391 let user = conf392 .user393 .clone()394 .unwrap_or_else(|| std::env::var("USER").unwrap_or_else(|_| "root".to_owned()));395396 let subs: Subs = Arc::new(Mutex::new(HashMap::new()));397 let mut sess = connect(398 Arc::new(Config::default()),399 (hostname.clone(), port),400 SshHandler {401 host: hostname,402 port,403 subs: subs.clone(),404 },405 )406 .await?;407408 let mut agent = AgentClient::connect_env().await?;409 let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();410 let mut authenticated = false;411 for ident in agent.request_identities().await? {412 let AgentIdentity::PublicKey { key, .. } = ident else {413 continue;414 };415 if sess416 .authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)417 .await?418 .success()419 {420 authenticated = true;421 break;422 }423 }424 ensure!(authenticated, "ssh authentication failed");425426 427 let sess = Arc::new(sess);428429 let agent_path = deploy_agent(&sess, bundle).await?;430431 let remote_dir = remote_mktemp(&sess).await?;432 let primary = remote_dir.join("primary.sock");433434 let (onetx, onerx) = channel();435 subs.lock().expect("lock").insert(primary.clone(), onetx);436 sess.streamlocal_forward(primary.clone()).await?;437438 let rpc = Rpc::<BifConfig>::new(Address::User);439440 441 let cmd_chan = sess.channel_open_session().await?;442 cmd_chan443 .exec(444 true,445 format!(446 "{} real-agent --path={}",447 sh_quote(&agent_path),448 sh_quote(&primary)449 ),450 )451 .await?;452453 let port = port_from_channel(454 onerx455 .await456 .map_err(|_| anyhow!("agent never opened its channel"))?,457 );458 rpc.add_direct(Address::Agent, port, Rtt(0));459460 Ok(Self {461 transport: Transport::Ssh {462 sess,463 subs,464 remote_dir,465 agent_path,466 },467 rpc,468 elevated: tokio::sync::OnceCell::new(),469 children: Mutex::new(Vec::new()),470 })471 }472473 pub async fn connect_local(agent_path: &str) -> Result<Self> {474 let (port_user, port_agent) = loopback();475 let rpc = Rpc::<BifConfig>::new(Address::User);476 let mut agent = Rpc::<BifConfig>::new(Address::Agent);477478 479 Fs::new().register_endpoints(&mut agent);480 Systemd.register_endpoints(&mut agent);481 Pty::new().register_endpoints(&mut agent);482483 agent.add_direct(Address::User, port_agent, Rtt(0));484 rpc.add_direct(Address::Agent, port_user, Rtt(0));485486 Ok(Self {487 transport: Transport::Local {488 agent,489 agent_path: agent_path.to_owned(),490 },491 rpc,492 elevated: tokio::sync::OnceCell::new(),493 children: Mutex::new(Vec::new()),494 })495 }496497 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {498 match &self.transport {499 Transport::Ssh { sess, .. } => Some(sess.clone()),500 Transport::Local { .. } => None,501 }502 }503504 pub fn rpc(&self) -> Rpc<BifConfig> {505 self.rpc.clone()506 }507508 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {509 R::wrap(self.rpc.remote(Address::Agent))510 }511 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {512 self.ensure_elevated().await?;513 Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))514 }515516 async fn ensure_elevated(&self) -> Result<()> {517 self.elevated518 .get_or_try_init(|| async {519 let port = match &self.transport {520 Transport::Ssh {521 sess, agent_path, ..522 } => {523 let (tool, flags) = detect_escalation(sess).await?;524 let ch = sess.channel_open_session().await?;525 ch.exec(true, privileged_cmd(tool, flags, agent_path, None))526 .await?;527 port_from_channel(ch)528 }529 Transport::Local { agent_path, .. } => {530 let sock = std::env::temp_dir()531 .join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4()));532 let _ = std::fs::remove_file(&sock);533 let listener = UnixListener::bind(&sock)?;534 let (tool, flags) = ESCALATORS535 .iter()536 .find(|(t, _)| find_in_path(t).is_some())537 .ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?;538 let child = tokio::process::Command::new(tool)539 .args(*flags)540 .arg(agent_path)541 .arg("real-agent")542 .arg("--privileged")543 .arg("--path")544 .arg(sock.to_str().expect("temp path is utf-8"))545 .kill_on_drop(true)546 .spawn()?;547 self.children.lock().expect("lock").push(child);548 let (stream, _) = listener.accept().await?;549 let _ = std::fs::remove_file(&sock);550 from_socket(stream)551 }552 };553 self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));554 anyhow::Ok(())555 })556 .await?;557 Ok(())558 }559560 pub async fn exec(&self, command: String) -> Result<RemoteChild> {561 let Some(sess) = self.ssh() else {562 bail!("exec should not be called on local")563 };564 let ch = sess.channel_open_session().await?;565 ch.exec(true, command).await?;566567 let (mut out_w, stdout) = tokio::io::duplex(64 * 1024);568 let (mut err_w, stderr) = tokio::io::duplex(64 * 1024);569 let (exit_tx, exit) = oneshot::channel();570571 tokio::spawn(async move {572 let mut ch = ch;573 let mut code = None;574 while let Some(msg) = ch.wait().await {575 match msg {576 ChannelMsg::Data { data } => {577 if out_w.write_all(&data).await.is_err() {578 break;579 }580 }581 ChannelMsg::ExtendedData { data, .. } => {582 if err_w.write_all(&data).await.is_err() {583 break;584 }585 }586 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),587 _ => {}588 }589 }590 let _ = out_w.shutdown().await;591 let _ = err_w.shutdown().await;592 let _ = exit_tx.send(code);593 });594595 Ok(RemoteChild {596 stdout,597 stderr,598 exit,599 })600 }601602 pub fn serve_elevate(&self) -> Result<()> {603 let Transport::Ssh {604 sess, agent_path, ..605 } = &self.transport606 else {607 bail!("elevate should not be called on local")608 };609 let mut rpc = self.rpc.clone();610 ElevateEndpoints(SshElevator {611 sess: sess.clone(),612 rpc: self.rpc.clone().downgrade(),613 agent_path: agent_path.to_owned(),614 })615 .register_endpoints(&mut rpc);616 Ok(())617 }618619 pub fn remote_dir(&self) -> Option<&Utf8Path> {620 match &self.transport {621 Transport::Ssh { remote_dir, .. } => Some(remote_dir),622 Transport::Local { .. } => None,623 }624 }625626 pub async fn forward_socket(627 &self,628 remote_path: &Utf8Path,629 ) -> Result<oneshot::Receiver<Channel<Msg>>> {630 let Transport::Ssh { sess, subs, .. } = &self.transport else {631 bail!("forward_socket should not be called on local")632 };633 let (tx, rx) = oneshot::channel();634 subs.lock()635 .expect("lock")636 .insert(remote_path.to_owned(), tx);637 sess.streamlocal_forward(remote_path.to_owned()).await?;638 Ok(rx)639 }640641 pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<Shell> {642 let Transport::Ssh { remote_dir, .. } = &self.transport else {643 bail!("open_shell should not be called on local")644 };645 let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4()));646647 let rx = self.forward_socket(&sock).await?;648 let client: PtyClient<BifConfig> = self.endpoints();649 let id = client650 .open_shell(sock, term.to_owned(), cols, rows)651 .await?652 .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;653 let ch = rx654 .await655 .map_err(|_| anyhow!("agent never connected the shell socket"))?;656657 Ok(Shell {658 id,659 stream: ch.into_stream(),660 remote: self.rpc.remote(Address::Agent),661 })662 }663}664665pub struct Shell {666 pub id: ShellId,667 pub stream: ChannelStream<Msg>,668 remote: Remote<BifConfig>,669}670671impl Shell {672 pub fn resizer(&self) -> ShellResizer {673 ShellResizer {674 remote: self.remote.clone(),675 id: self.id,676 }677 }678}679680#[derive(Clone)]681pub struct ShellResizer {682 remote: Remote<BifConfig>,683 id: ShellId,684}685686impl ShellResizer {687 pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {688 PtyClient::wrap(self.remote.clone())689 .resize(self.id, cols, rows)690 .await?691 .map_err(|e| anyhow!("failed to resize remote shell: {e}"))692 }693}694695async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {696 let mut cmd_chan = sess.channel_open_session().await?;697 cmd_chan698 .exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")699 .await?;700 let mut stdout = vec![];701 loop {702 let Some(msg) = cmd_chan.wait().await else {703 bail!("unexpected channel end");704 };705 match msg {706 russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),707 russh::ChannelMsg::ExitStatus { exit_status } => {708 if exit_status != 0 {709 bail!("mktemp failed");710 }711 break;712 }713 _ => {}714 }715 }716 ensure!(stdout.ends_with(b"\n"));717 stdout.pop();718 Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))719}