difftreelog
refactor drop NixDaemon::connect_daemon
in: trunk
4 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -308,9 +308,9 @@
[[package]]
name = "bifrostlink"
-version = "0.2.5"
+version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "910f9286588d13e3dbdbbc1ad4d292656e704bc93e1f41b8a13b48e3a8e95f39"
+checksum = "2fb01af731c11dd31b23783a83a36a29f644cc1972481f6fa4f4fabc709079eb"
dependencies = [
"async-trait",
"async_fn_traits",
@@ -327,9 +327,9 @@
[[package]]
name = "bifrostlink-macros"
-version = "0.2.5"
+version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a0ea5c423c3831c523c8ef78debdf6a64e72b21ec92148a44163a4c25c05dfd0"
+checksum = "8c4b7a5fb38b36bd81910c17ebf369f9296e508d92b1277a768a63c8a2254fdb"
dependencies = [
"proc-macro2",
"quote",
@@ -338,9 +338,9 @@
[[package]]
name = "bifrostlink-ports"
-version = "0.2.5"
+version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9e3a9a01ec1b8bd7d44b47cd0183a1465880e241027d9f5afcb076e11704ec70"
+checksum = "977acfcb8ed3c24ab7c2f76fb3eeebff1533c72708733ce6020f2501980b7cf2"
dependencies = [
"bifrostlink",
"bytes",
@@ -1835,7 +1835,7 @@
[[package]]
name = "polkit-backend"
-version = "0.1.6"
+version = "0.1.7"
dependencies = [
"anyhow",
"clap",
@@ -2055,7 +2055,7 @@
[[package]]
name = "remowt-agent"
-version = "0.1.6"
+version = "0.1.7"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2083,7 +2083,7 @@
[[package]]
name = "remowt-client"
-version = "0.1.6"
+version = "0.1.7"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2106,7 +2106,7 @@
[[package]]
name = "remowt-endpoints"
-version = "0.1.6"
+version = "0.1.7"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2124,7 +2124,7 @@
[[package]]
name = "remowt-link-shared"
-version = "0.1.6"
+version = "0.1.7"
dependencies = [
"bifrostlink",
"bytes",
@@ -2138,7 +2138,7 @@
[[package]]
name = "remowt-plugin"
-version = "0.1.6"
+version = "0.1.7"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2152,7 +2152,7 @@
[[package]]
name = "remowt-polkit-shared"
-version = "0.1.6"
+version = "0.1.7"
dependencies = [
"nix",
"serde",
@@ -2161,7 +2161,7 @@
[[package]]
name = "remowt-ssh"
-version = "0.1.6"
+version = "0.1.7"
dependencies = [
"anyhow",
"async-trait",
@@ -2189,7 +2189,7 @@
[[package]]
name = "remowt-ui-prompt"
-version = "0.1.6"
+version = "0.1.7"
dependencies = [
"anyhow",
"bifrostlink",
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,18 +3,18 @@
resolver = "2"
[workspace.package]
-version = "0.1.6"
+version = "0.1.7"
license = "MIT"
edition = "2021"
repository = "https://git.delta.rocks/r/remowt"
[workspace.dependencies]
-remowt-client = { version = "0.1.6", path = "crates/remowt-client" }
-remowt-polkit-shared = { version = "0.1.6", path = "crates/polkit-shared" }
-remowt-link-shared = { version = "0.1.6", path = "crates/remowt-link-shared" }
-remowt-plugin = { version = "0.1.6", path = "crates/remowt-plugin" }
-remowt-ui-prompt = { version = "0.1.6", path = "crates/remowt-ui-prompt" }
-remowt-endpoints = { version = "0.1.6", path = "crates/remowt-endpoints" }
+remowt-client = { version = "0.1.7", path = "crates/remowt-client" }
+remowt-polkit-shared = { version = "0.1.7", path = "crates/polkit-shared" }
+remowt-link-shared = { version = "0.1.7", path = "crates/remowt-link-shared" }
+remowt-plugin = { version = "0.1.7", path = "crates/remowt-plugin" }
+remowt-ui-prompt = { version = "0.1.7", path = "crates/remowt-ui-prompt" }
+remowt-endpoints = { version = "0.1.7", path = "crates/remowt-endpoints" }
bifrostlink = "0.2.0"
bifrostlink-macros = "0.2.0"
crates/remowt-client/src/lib.rsdiffbeforeafterboth1use std::collections::HashMap;2use std::env;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Remote, Rpc, Rtt};9use camino::{Utf8Path, Utf8PathBuf};10use remowt_link_shared::plugin::PluginEndpointsClient;11use remowt_link_shared::port::child_port;12use remowt_link_shared::{Address, BifConfig};13use russh::client::{connect, Config, Handle, Handler, Msg, Session};14use russh::keys::agent::client::AgentClient;15use russh::keys::agent::AgentIdentity;16use russh::keys::check_known_hosts;17use russh::keys::ssh_key::PublicKey;18use russh::Channel;19use tempfile::TempDir;20use tokio::io::AsyncRead;21use tokio::net::UnixListener;22use tokio::sync::oneshot;23use tokio::task::JoinHandle;24use tokio::{25 fs,26 io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader},27};28use tracing::{debug, info, warn};29use uuid::Uuid;3031pub mod editor;32mod forwarded;33mod shell;34mod ssh_exec;35mod subprocess;3637use self::ssh_exec::SshExecChild;38pub use self::subprocess::{RemowtChild, SpawnOptions, StderrMode, StdioMode};39pub use forwarded::{RemowtListener, RemowtStream};40pub use shell::{RemowtShell, RemowtShellResizer};4142type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;4344fn sh_quote(s: impl AsRef<str>) -> String {45 format!("'{}'", s.as_ref().replace('\'', "'\\''"))46}4748const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];4950pub struct AgentBundle {51 dir: PathBuf,52 hashes: HashMap<String, String>,53}5455impl AgentBundle {56 pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {57 let dir = dir.into();58 let hashes_path = dir.join("hashes");59 let raw = std::fs::read_to_string(&hashes_path)60 .with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;61 let mut hashes = HashMap::new();62 for line in raw.lines() {63 let line = line.trim();64 if line.is_empty() {65 continue;66 }67 let (arch, hash) = line68 .split_once(char::is_whitespace)69 .ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;70 hashes.insert(arch.to_owned(), hash.trim().to_owned());71 }72 ensure!(73 !hashes.is_empty(),74 "agent bundle {} has no hashes",75 dir.display()76 );77 Ok(Self { dir, hashes })78 }7980 fn binary(&self, arch: &str) -> PathBuf {81 self.dir.join(format!("remowt-agent-{arch}"))82 }8384 fn local_binary(&self) -> Result<PathBuf> {85 let arch = env::consts::ARCH;86 let path = self.binary(arch);87 ensure!(88 path.is_file(),89 "no local remowt-agent build for arch {arch} in bundle {}",90 self.dir.display()91 );92 Ok(path)93 }94}9596async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {97 let ch = sess.channel_open_session().await?;98 ch.exec(true, cmd).await?;99100 let mut child = SshExecChild::from_exec(ch);101 drop(child.stdin);102 drain_to_tracing(child.stderr, cmd.to_owned(), true);103104 let mut out = Vec::new();105 child.stdout.read_to_end(&mut out).await?;106 let code = child.exit.await.ok().flatten();107 Ok((code, out))108}109110async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {111 let (code, mut out) = run(sess, cmd).await?;112 ensure!(113 code == Some(0),114 "remote command failed (exit {code:?}): {cmd}"115 );116 if !out.is_empty() {117 ensure!(118 out.ends_with(b"\n"),119 "remote command was not newline-terminated: {cmd}: {out:?}"120 );121 out.pop();122 }123 String::from_utf8(out).context("expected utf8 output for command")124}125126async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {127 debug!("uname -a");128 let arch = run_string_ok(sess, "uname -m").await?;129 let hash = bundle130 .hashes131 .get(&arch)132 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;133134 debug!("get dir");135 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"").await?;136 let dir = if cache.is_empty() {137 let home = run_string_ok(sess, "echo \"$HOME\"").await?;138 ensure!(139 !home.is_empty(),140 "remote $HOME and $XDG_CACHE_HOME both empty"141 );142 Utf8PathBuf::from(home).join(".cache/remowt")143 } else {144 Utf8PathBuf::from(cache).join("remowt")145 };146 let path = dir.join(hash);147148 debug!("presence");149 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;150 if present != Some(0) {151 let bin = bundle.binary(&arch);152 debug!("read");153 let bytes = fs::read(&bin)154 .await155 .with_context(|| format!("reading agent binary {}", bin.display()))?;156 debug!("upload");157 upload_agent(sess, &dir, &path, bytes).await?;158 }159 Ok(path)160}161162async fn upload_agent(163 sess: &Handle<SshHandler>,164 dir: &Utf8Path,165 path: &Utf8Path,166 bytes: Vec<u8>,167) -> Result<()> {168 debug!("mkdirp");169 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;170171 let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));172 let ch = sess.channel_open_session().await?;173 debug!("cat");174 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;175176 let mut child = SshExecChild::from_exec(ch);177 child178 .stdin179 .write_all(&bytes)180 .await181 .context("sending agent binary")?;182 child183 .stdin184 .shutdown()185 .await186 .context("sending agent binary")?;187 let code = child.wait().await;188 ensure!(code == Some(0), "agent upload failed (exit {code:?})");189190 debug!("chmod");191 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;192 run_string_ok(193 sess,194 &format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),195 )196 .await?;197 Ok(())198}199200pub struct SshHandler {201 host: String,202 port: u16,203 subs: Subs,204}205impl Handler for SshHandler {206 type Error = russh::Error;207 async fn check_server_key(208 &mut self,209 server_public_key: &PublicKey,210 ) -> Result<bool, Self::Error> {211 Ok(check_known_hosts(&self.host, self.port, server_public_key)?)212 }213 async fn server_channel_open_forwarded_streamlocal(214 &mut self,215 channel: Channel<Msg>,216 socket_path: &str,217 _session: &mut Session,218 ) -> Result<(), Self::Error> {219 let Some(ch) = self220 .subs221 .lock()222 .expect("lock")223 .remove(&Utf8PathBuf::from(socket_path))224 else {225 return Err(russh::Error::WrongChannel);226 };227 let _ = ch.send(channel);228 Ok(())229 }230}231232enum Transport {233 Ssh {234 sess: Arc<Handle<SshHandler>>,235 subs: Subs,236 runtime_dir: Utf8PathBuf,237 agent_path: Utf8PathBuf,238 },239 Local {240 agent_path: PathBuf,241 runtime_dir: Utf8PathBuf,242 },243}244245struct RemowtInner {246 transport: Transport,247 rpc: Rpc<BifConfig>,248 elevated: tokio::sync::OnceCell<()>,249 #[allow(dead_code)]250 children: Mutex<Vec<tokio::process::Child>>,251 _runtime_tmp: Option<TempDir>,252}253254#[derive(Clone)]255pub struct Remowt(Arc<RemowtInner>);256257pub type RemowtRemote = Remote<BifConfig>;258259impl Remowt {260 /// Connect to the remote host over ssh, detect the architecture and deploy the required261 /// agent binary.262 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {263 let conf = russh_config::parse_home(host)?;264 let port = conf.host_config.port.or(conf.port).unwrap_or(22);265 let hostname = conf266 .host_config267 .hostname268 .clone()269 .unwrap_or_else(|| conf.host_name.clone());270 let user = conf271 .user272 .clone()273 .unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));274275 let subs: Subs = Arc::new(Mutex::new(HashMap::new()));276 let mut sess = connect(277 Arc::new(Config::default()),278 (hostname.clone(), port),279 SshHandler {280 host: hostname,281 port,282 subs: subs.clone(),283 },284 )285 .await?;286287 let mut agent = AgentClient::connect_env().await?;288 let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();289 let mut authenticated = false;290 for ident in agent.request_identities().await? {291 let AgentIdentity::PublicKey { key, .. } = ident else {292 continue;293 };294 if sess295 .authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)296 .await?297 .success()298 {299 authenticated = true;300 break;301 }302 }303 ensure!(authenticated, "ssh authentication failed");304305 let sess = Arc::new(sess);306307 debug!("deploying agent");308 let agent_path = deploy_agent(&sess, bundle).await?;309310 debug!("runtime dir");311 let runtime_dir = remote_runtime_dir(&sess).await?;312313 let rpc = Rpc::<BifConfig>::new(Address::User);314315 let cmd_chan = sess.channel_open_session().await?;316 debug!("starting agent");317 cmd_chan318 .exec(true, format!("{} real-agent", sh_quote(&agent_path)))319 .await?;320321 let child = SshExecChild::from_exec(cmd_chan);322 drain_to_tracing(child.stderr, "agent".to_owned(), true);323 rpc.add_direct(324 Address::Agent,325 child_port(child.stdout, child.stdin),326 Rtt(0),327 );328329 Ok(Self(Arc::new(RemowtInner {330 transport: Transport::Ssh {331 sess,332 subs,333 runtime_dir,334 agent_path,335 },336 rpc,337 elevated: tokio::sync::OnceCell::new(),338 children: Mutex::new(Vec::new()),339 _runtime_tmp: None,340 })))341 }342343 /// "Connect" to the local machine's agent, by starting the agent binary locally.344 pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {345 let agent_path = bundle.local_binary()?;346 let mut child = tokio::process::Command::new(&agent_path)347 .arg("real-agent")348 .arg("--local")349 .stdin(std::process::Stdio::piped())350 .stdout(std::process::Stdio::piped())351 .kill_on_drop(true)352 .spawn()353 .with_context(|| format!("spawning agent binary {}", agent_path.display()))?;354 let stdin = child.stdin.take().expect("stdin piped");355 let stdout = child.stdout.take().expect("stdout piped");356357 let rpc = Rpc::<BifConfig>::new(Address::User);358 rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));359360 let (runtime_dir, runtime_tmp) = local_runtime_dir()?;361362 Ok(Self(Arc::new(RemowtInner {363 transport: Transport::Local {364 agent_path,365 runtime_dir,366 },367 rpc,368 elevated: tokio::sync::OnceCell::new(),369 children: Mutex::new(vec![child]),370 _runtime_tmp: runtime_tmp,371 })))372 }373374 /// Get the handle to the underlying russh session handle.375 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {376 match &self.0.transport {377 Transport::Ssh { sess, .. } => Some(sess.clone()),378 Transport::Local { .. } => None,379 }380 }381382 pub fn rpc(&self) -> Rpc<BifConfig> {383 self.0.rpc.clone()384 }385386 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {387 let client: PluginEndpointsClient<BifConfig> = self.endpoints();388 client389 .load_plugin(id, name.to_owned())390 .await?391 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))392 }393 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {394 self.ensure_escalated().await?;395 let client: PluginEndpointsClient<BifConfig> =396 PluginEndpointsClient::wrap(self.0.rpc.remote(Address::AgentPrivileged));397 client398 .load_plugin_path(id, path.to_owned())399 .await?400 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))401 }402 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {403 R::wrap(self.0.rpc.remote(Address::Plugin(id)))404 }405406 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {407 R::wrap(self.0.rpc.remote(Address::Agent))408 }409 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {410 self.ensure_escalated().await?;411 Ok(R::wrap(self.0.rpc.remote(Address::AgentPrivileged)))412 }413414 async fn ensure_escalated(&self) -> Result<()> {415 self.0416 .elevated417 .get_or_try_init(|| async {418 let (agent_path, local) = match &self.0.transport {419 Transport::Ssh { agent_path, .. } => (agent_path.as_str().to_owned(), false),420 Transport::Local { agent_path, .. } => (421 agent_path422 .to_str()423 .ok_or_else(|| anyhow!("local agent path is not utf-8"))?424 .to_owned(),425 true,426 ),427 };428429 let (tool, flags) = self.detect_escalation().await?;430 let mut args: Vec<String> = flags.iter().map(|f| (*f).to_owned()).collect();431 args.push(agent_path);432 args.push("real-agent".to_owned());433 args.push("--privileged".to_owned());434 if local {435 args.push("--local".to_owned());436 }437438 let child = self439 .spawn(SpawnOptions {440 program: tool.to_owned(),441 args,442 stdin: StdioMode::Pipe,443 stdout: StdioMode::Pipe,444 stderr: StderrMode::Inherit,445 ..Default::default()446 })447 .await448 .context("spawning privileged agent")?;449450 let stdin = child451 .stdin452 .ok_or_else(|| anyhow!("privileged agent stdin missing"))?;453 let stdout = child454 .stdout455 .ok_or_else(|| anyhow!("privileged agent stdout missing"))?;456457 let port = child_port(stdout, stdin);458 self.0459 .rpc460 .add_direct(Address::AgentPrivileged, port, Rtt(0));461 anyhow::Ok(())462 })463 .await?;464 Ok(())465 }466467 async fn detect_escalation(&self) -> Result<(&'static str, &'static [&'static str])> {468 for (tool, flags) in ESCALATORS {469 let probe = self470 .spawn(SpawnOptions {471 program: (*tool).to_owned(),472 args: vec!["--version".to_owned()],473 stdout: StdioMode::Null,474 stderr: StderrMode::Null,475 ..Default::default()476 })477 .await;478 if let Ok(child) = probe {479 let _ = child.wait().await;480 return Ok((tool, flags));481 }482 }483 bail!("no escalation tool found")484 }485486 /// XDG_RUNTIME_DIR on the remote machine.487 pub fn runtime_dir(&self) -> Utf8PathBuf {488 match &self.0.transport {489 Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),490 Transport::Local { runtime_dir, .. } => runtime_dir.clone(),491 }492 }493494 /// Bind unix listener socket on the remote machine with auto-generated path, returning the path.495 pub async fn bind_runtime_unix(&self, hint: &str) -> Result<(RemowtListener, Utf8PathBuf)> {496 let sock = self497 .runtime_dir()498 .join(format!("remowt-{hint}-{}.sock", Uuid::new_v4()));499 let listener = self.bind_unix(&sock).await?;500 Ok((listener, sock))501 }502503 /// Bind unix listener socket on the remote machine on the specified path.504 pub async fn bind_unix(&self, path: &Utf8Path) -> Result<RemowtListener> {505 match &self.0.transport {506 Transport::Ssh { sess, subs, .. } => {507 let (tx, rx) = oneshot::channel();508 subs.lock().expect("lock").insert(path.to_owned(), tx);509 sess.streamlocal_forward(path.to_owned()).await?;510 Ok(RemowtListener::Ssh(rx))511 }512 Transport::Local { .. } => {513 let _ = std::fs::remove_file(path);514 Ok(RemowtListener::Local(515 UnixListener::bind(path)?,516 path.to_owned(),517 ))518 }519 }520 }521}522523pub(crate) fn drain_to_tracing(524 stream: impl AsyncRead + Unpin + 'static + Send,525 context: String,526 stderr: bool,527) -> JoinHandle<()> {528 tokio::spawn(async move {529 let mut reader = BufReader::new(stream);530 let mut buf = Vec::with_capacity(4096);531 loop {532 buf.clear();533 match reader.read_until(b'\n', &mut buf).await {534 Ok(0) => break,535 Ok(_) => {536 let line = String::from_utf8_lossy(buf.strip_suffix(b"\n").unwrap_or(&buf));537 if stderr {538 warn!(context = %context, "{line}");539 } else {540 info!(context = %context, "{line}");541 }542 }543 Err(e) => {544 warn!(context = %context, "child stdio read failed: {e}");545 break;546 }547 }548 }549 })550}551552fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {553 if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {554 if !dir.is_empty() {555 return Ok((Utf8PathBuf::from(dir), None));556 }557 }558 let tmp = tempfile::Builder::new()559 .prefix("remowt.")560 .rand_bytes(12)561 .tempdir()?;562 let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())563 .map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;564 Ok((dir, Some(tmp)))565}566567async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {568 let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;569 let dir = dir.trim();570 if dir.is_empty() {571 let tmp = run_string_ok(sess, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir").await?;572 Ok(Utf8PathBuf::from(tmp))573 } else {574 Ok(Utf8PathBuf::from(dir))575 }576}1use std::collections::HashMap;2use std::env;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Remote, Rpc, Rtt};9use camino::{Utf8Path, Utf8PathBuf};10use remowt_link_shared::plugin::PluginEndpointsClient;11use remowt_link_shared::port::child_port;12use remowt_link_shared::{Address, BifConfig};13use russh::client::{connect, Config, Handle, Handler, Msg, Session};14use russh::keys::agent::client::AgentClient;15use russh::keys::agent::AgentIdentity;16use russh::keys::check_known_hosts;17use russh::keys::ssh_key::PublicKey;18use russh::Channel;19use tempfile::TempDir;20use tokio::io::AsyncRead;21use tokio::net::UnixListener;22use tokio::sync::oneshot;23use tokio::task::JoinHandle;24use tokio::{25 fs,26 io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader},27};28use tracing::{debug, info, warn};29use uuid::Uuid;3031pub mod editor;32mod forwarded;33mod shell;34mod ssh_exec;35mod subprocess;3637use self::ssh_exec::SshExecChild;38pub use self::subprocess::{RemowtChild, SpawnOptions, StderrMode, StdioMode};39pub use forwarded::{RemowtListener, RemowtStream};40pub use shell::{RemowtShell, RemowtShellResizer};4142type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;4344fn sh_quote(s: impl AsRef<str>) -> String {45 format!("'{}'", s.as_ref().replace('\'', "'\\''"))46}4748const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];4950pub struct AgentBundle {51 dir: PathBuf,52 hashes: HashMap<String, String>,53}5455impl AgentBundle {56 pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {57 let dir = dir.into();58 let hashes_path = dir.join("hashes");59 let raw = std::fs::read_to_string(&hashes_path)60 .with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;61 let mut hashes = HashMap::new();62 for line in raw.lines() {63 let line = line.trim();64 if line.is_empty() {65 continue;66 }67 let (arch, hash) = line68 .split_once(char::is_whitespace)69 .ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;70 hashes.insert(arch.to_owned(), hash.trim().to_owned());71 }72 ensure!(73 !hashes.is_empty(),74 "agent bundle {} has no hashes",75 dir.display()76 );77 Ok(Self { dir, hashes })78 }7980 fn binary(&self, arch: &str) -> PathBuf {81 self.dir.join(format!("remowt-agent-{arch}"))82 }8384 fn local_binary(&self) -> Result<PathBuf> {85 let arch = env::consts::ARCH;86 let path = self.binary(arch);87 ensure!(88 path.is_file(),89 "no local remowt-agent build for arch {arch} in bundle {}",90 self.dir.display()91 );92 Ok(path)93 }94}9596async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {97 let ch = sess.channel_open_session().await?;98 ch.exec(true, cmd).await?;99100 let mut child = SshExecChild::from_exec(ch);101 drop(child.stdin);102 drain_to_tracing(child.stderr, cmd.to_owned(), true);103104 let mut out = Vec::new();105 child.stdout.read_to_end(&mut out).await?;106 let code = child.exit.await.ok().flatten();107 Ok((code, out))108}109110async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {111 let (code, mut out) = run(sess, cmd).await?;112 ensure!(113 code == Some(0),114 "remote command failed (exit {code:?}): {cmd}"115 );116 if !out.is_empty() {117 ensure!(118 out.ends_with(b"\n"),119 "remote command was not newline-terminated: {cmd}: {out:?}"120 );121 out.pop();122 }123 String::from_utf8(out).context("expected utf8 output for command")124}125126async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {127 debug!("uname -a");128 let arch = run_string_ok(sess, "uname -m").await?;129 let hash = bundle130 .hashes131 .get(&arch)132 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;133134 debug!("get dir");135 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"").await?;136 let dir = if cache.is_empty() {137 let home = run_string_ok(sess, "echo \"$HOME\"").await?;138 ensure!(139 !home.is_empty(),140 "remote $HOME and $XDG_CACHE_HOME both empty"141 );142 Utf8PathBuf::from(home).join(".cache/remowt")143 } else {144 Utf8PathBuf::from(cache).join("remowt")145 };146 let path = dir.join(hash);147148 debug!("presence");149 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;150 if present != Some(0) {151 let bin = bundle.binary(&arch);152 debug!("read");153 let bytes = fs::read(&bin)154 .await155 .with_context(|| format!("reading agent binary {}", bin.display()))?;156 debug!("upload");157 upload_agent(sess, &dir, &path, bytes).await?;158 }159 Ok(path)160}161162async fn upload_agent(163 sess: &Handle<SshHandler>,164 dir: &Utf8Path,165 path: &Utf8Path,166 bytes: Vec<u8>,167) -> Result<()> {168 debug!("mkdirp");169 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;170171 let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));172 let ch = sess.channel_open_session().await?;173 debug!("cat");174 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;175176 let mut child = SshExecChild::from_exec(ch);177 child178 .stdin179 .write_all(&bytes)180 .await181 .context("sending agent binary")?;182 child183 .stdin184 .shutdown()185 .await186 .context("sending agent binary")?;187 let code = child.wait().await;188 ensure!(code == Some(0), "agent upload failed (exit {code:?})");189190 debug!("chmod");191 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;192 run_string_ok(193 sess,194 &format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),195 )196 .await?;197 Ok(())198}199200pub struct SshHandler {201 host: String,202 port: u16,203 subs: Subs,204}205impl Handler for SshHandler {206 type Error = russh::Error;207 async fn check_server_key(208 &mut self,209 server_public_key: &PublicKey,210 ) -> Result<bool, Self::Error> {211 Ok(check_known_hosts(&self.host, self.port, server_public_key)?)212 }213 async fn server_channel_open_forwarded_streamlocal(214 &mut self,215 channel: Channel<Msg>,216 socket_path: &str,217 _session: &mut Session,218 ) -> Result<(), Self::Error> {219 let Some(ch) = self220 .subs221 .lock()222 .expect("lock")223 .remove(&Utf8PathBuf::from(socket_path))224 else {225 return Err(russh::Error::WrongChannel);226 };227 let _ = ch.send(channel);228 Ok(())229 }230}231232enum Transport {233 Ssh {234 sess: Arc<Handle<SshHandler>>,235 subs: Subs,236 runtime_dir: Utf8PathBuf,237 agent_path: Utf8PathBuf,238 },239 Local {240 agent_path: PathBuf,241 runtime_dir: Utf8PathBuf,242 },243}244245struct RemowtInner {246 transport: Transport,247 rpc: Rpc<BifConfig>,248 elevated: tokio::sync::OnceCell<()>,249 #[allow(dead_code)]250 children: Mutex<Vec<tokio::process::Child>>,251 _runtime_tmp: Option<TempDir>,252}253254#[derive(Clone)]255pub struct Remowt(Arc<RemowtInner>);256257pub type RemowtRemote = Remote<BifConfig>;258259impl Remowt {260 /// Connect to the remote host over ssh, detect the architecture and deploy the required261 /// agent binary.262 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {263 let conf = russh_config::parse_home(host)?;264 let port = conf.host_config.port.or(conf.port).unwrap_or(22);265 let hostname = conf266 .host_config267 .hostname268 .clone()269 .unwrap_or_else(|| conf.host_name.clone());270 let user = conf271 .user272 .clone()273 .unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));274275 let subs: Subs = Arc::new(Mutex::new(HashMap::new()));276 let mut sess = connect(277 Arc::new(Config::default()),278 (hostname.clone(), port),279 SshHandler {280 host: hostname,281 port,282 subs: subs.clone(),283 },284 )285 .await?;286287 let mut agent = AgentClient::connect_env().await?;288 let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();289 let mut authenticated = false;290 for ident in agent.request_identities().await? {291 let AgentIdentity::PublicKey { key, .. } = ident else {292 continue;293 };294 if sess295 .authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)296 .await?297 .success()298 {299 authenticated = true;300 break;301 }302 }303 ensure!(authenticated, "ssh authentication failed");304305 let sess = Arc::new(sess);306307 debug!("deploying agent");308 let agent_path = deploy_agent(&sess, bundle).await?;309310 debug!("runtime dir");311 let runtime_dir = remote_runtime_dir(&sess).await?;312313 let rpc = Rpc::<BifConfig>::new(Address::User);314315 let cmd_chan = sess.channel_open_session().await?;316 debug!("starting agent");317 cmd_chan318 .exec(true, format!("{} real-agent", sh_quote(&agent_path)))319 .await?;320321 let child = SshExecChild::from_exec(cmd_chan);322 drain_to_tracing(child.stderr, "agent".to_owned(), true);323 rpc.add_direct(324 Address::Agent,325 child_port(child.stdout, child.stdin),326 Rtt(0),327 );328329 Ok(Self(Arc::new(RemowtInner {330 transport: Transport::Ssh {331 sess,332 subs,333 runtime_dir,334 agent_path,335 },336 rpc,337 elevated: tokio::sync::OnceCell::new(),338 children: Mutex::new(Vec::new()),339 _runtime_tmp: None,340 })))341 }342343 /// "Connect" to the local machine's agent, by starting the agent binary locally.344 pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {345 let agent_path = bundle.local_binary()?;346 let mut child = tokio::process::Command::new(&agent_path)347 .arg("real-agent")348 .arg("--local")349 .stdin(std::process::Stdio::piped())350 .stdout(std::process::Stdio::piped())351 .kill_on_drop(true)352 .spawn()353 .with_context(|| format!("spawning agent binary {}", agent_path.display()))?;354 let stdin = child.stdin.take().expect("stdin piped");355 let stdout = child.stdout.take().expect("stdout piped");356357 let rpc = Rpc::<BifConfig>::new(Address::User);358 rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));359360 let (runtime_dir, runtime_tmp) = local_runtime_dir()?;361362 Ok(Self(Arc::new(RemowtInner {363 transport: Transport::Local {364 agent_path,365 runtime_dir,366 },367 rpc,368 elevated: tokio::sync::OnceCell::new(),369 children: Mutex::new(vec![child]),370 _runtime_tmp: runtime_tmp,371 })))372 }373374 /// Get the handle to the underlying russh session handle.375 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {376 match &self.0.transport {377 Transport::Ssh { sess, .. } => Some(sess.clone()),378 Transport::Local { .. } => None,379 }380 }381382 pub fn rpc(&self) -> Rpc<BifConfig> {383 self.0.rpc.clone()384 }385386 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {387 let client: PluginEndpointsClient<BifConfig> = self.endpoints();388 client389 .load_plugin(id, name.to_owned())390 .await?391 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))392 }393 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {394 self.ensure_escalated().await?;395 let client: PluginEndpointsClient<BifConfig> =396 PluginEndpointsClient::wrap(self.0.rpc.remote(Address::AgentPrivileged));397 client398 .load_plugin_path(id, path.to_owned())399 .await?400 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))401 }402 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {403 R::wrap(self.0.rpc.remote(Address::Plugin(id)))404 }405406 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {407 R::wrap(self.0.rpc.remote(Address::Agent))408 }409 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {410 self.ensure_escalated().await?;411 Ok(R::wrap(self.0.rpc.remote(Address::AgentPrivileged)))412 }413414 async fn ensure_escalated(&self) -> Result<()> {415 self.0416 .elevated417 .get_or_try_init(|| async {418 let (agent_path, local) = match &self.0.transport {419 Transport::Ssh { agent_path, .. } => (agent_path.as_str().to_owned(), false),420 Transport::Local { agent_path, .. } => (421 agent_path422 .to_str()423 .ok_or_else(|| anyhow!("local agent path is not utf-8"))?424 .to_owned(),425 true,426 ),427 };428429 let (tool, flags) = self.detect_escalation().await?;430 let mut args: Vec<String> = Vec::new();431 args.push("-w".to_owned());432 args.push(tool.to_owned());433 args.extend(flags.iter().copied().map(str::to_owned));434 args.push(agent_path);435 args.push("real-agent".to_owned());436 args.push("--privileged".to_owned());437 if local {438 args.push("--local".to_owned());439 }440441 let child = self442 .spawn(SpawnOptions {443 program: "setsid".to_owned(),444 args,445 stdin: StdioMode::Pipe,446 stdout: StdioMode::Pipe,447 stderr: StderrMode::Inherit,448 ..Default::default()449 })450 .await451 .context("spawning privileged agent")?;452453 let stdin = child454 .stdin455 .ok_or_else(|| anyhow!("privileged agent stdin missing"))?;456 let stdout = child457 .stdout458 .ok_or_else(|| anyhow!("privileged agent stdout missing"))?;459460 let port = child_port(stdout, stdin);461 self.0462 .rpc463 .add_direct(Address::AgentPrivileged, port, Rtt(0));464 anyhow::Ok(())465 })466 .await?;467 Ok(())468 }469470 async fn detect_escalation(&self) -> Result<(&'static str, &'static [&'static str])> {471 for (tool, flags) in ESCALATORS {472 let probe = self473 .spawn(SpawnOptions {474 program: (*tool).to_owned(),475 args: vec!["--version".to_owned()],476 stdout: StdioMode::Null,477 stderr: StderrMode::Null,478 ..Default::default()479 })480 .await;481 if let Ok(child) = probe {482 let _ = child.wait().await;483 return Ok((tool, flags));484 }485 }486 bail!("no escalation tool found")487 }488489 /// XDG_RUNTIME_DIR on the remote machine.490 pub fn runtime_dir(&self) -> Utf8PathBuf {491 match &self.0.transport {492 Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),493 Transport::Local { runtime_dir, .. } => runtime_dir.clone(),494 }495 }496497 /// Bind unix listener socket on the remote machine with auto-generated path, returning the path.498 pub async fn bind_runtime_unix(&self, hint: &str) -> Result<(RemowtListener, Utf8PathBuf)> {499 let sock = self500 .runtime_dir()501 .join(format!("remowt-{hint}-{}.sock", Uuid::new_v4()));502 let listener = self.bind_unix(&sock).await?;503 Ok((listener, sock))504 }505506 /// Bind unix listener socket on the remote machine on the specified path.507 pub async fn bind_unix(&self, path: &Utf8Path) -> Result<RemowtListener> {508 match &self.0.transport {509 Transport::Ssh { sess, subs, .. } => {510 let (tx, rx) = oneshot::channel();511 subs.lock().expect("lock").insert(path.to_owned(), tx);512 sess.streamlocal_forward(path.to_owned()).await?;513 Ok(RemowtListener::Ssh(rx))514 }515 Transport::Local { .. } => {516 let _ = std::fs::remove_file(path);517 Ok(RemowtListener::Local(518 UnixListener::bind(path)?,519 path.to_owned(),520 ))521 }522 }523 }524}525526pub(crate) fn drain_to_tracing(527 stream: impl AsyncRead + Unpin + 'static + Send,528 context: String,529 stderr: bool,530) -> JoinHandle<()> {531 tokio::spawn(async move {532 let mut reader = BufReader::new(stream);533 let mut buf = Vec::with_capacity(4096);534 loop {535 buf.clear();536 match reader.read_until(b'\n', &mut buf).await {537 Ok(0) => break,538 Ok(_) => {539 let line = String::from_utf8_lossy(buf.strip_suffix(b"\n").unwrap_or(&buf));540 if stderr {541 warn!(context = %context, "{line}");542 } else {543 info!(context = %context, "{line}");544 }545 }546 Err(e) => {547 warn!(context = %context, "child stdio read failed: {e}");548 break;549 }550 }551 }552 })553}554555fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {556 if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {557 if !dir.is_empty() {558 return Ok((Utf8PathBuf::from(dir), None));559 }560 }561 let tmp = tempfile::Builder::new()562 .prefix("remowt.")563 .rand_bytes(12)564 .tempdir()?;565 let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())566 .map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;567 Ok((dir, Some(tmp)))568}569570async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {571 let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;572 let dir = dir.trim();573 if dir.is_empty() {574 let tmp = run_string_ok(sess, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir").await?;575 Ok(Utf8PathBuf::from(tmp))576 } else {577 Ok(Utf8PathBuf::from(dir))578 }579}crates/remowt-endpoints/src/nix_daemon.rsdiffbeforeafterboth--- a/crates/remowt-endpoints/src/nix_daemon.rs
+++ b/crates/remowt-endpoints/src/nix_daemon.rs
@@ -6,8 +6,6 @@
use std::result::Result;
use tokio::process::Command;
-pub const NIX_DAEMON_SOCKET: &str = "/nix/var/nix/daemon-socket/socket";
-
pub struct NixDaemon;
#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
@@ -20,22 +18,6 @@
#[endpoints(ns = 4)]
impl NixDaemon {
- #[endpoints(id = 1)]
- async fn connect_daemon(&self, socket: String) -> Result<(), Error> {
- let mut daemon = tokio::net::UnixStream::connect(NIX_DAEMON_SOCKET)
- .await
- .map_err(|e| Error::DaemonUnavailable(e.to_string()))?;
- let mut tunnel = tokio::net::UnixStream::connect(&socket)
- .await
- .map_err(|e| Error::Tunnel(e.to_string()))?;
- tokio::spawn(async move {
- if let Err(e) = tokio::io::copy_bidirectional(&mut daemon, &mut tunnel).await {
- tracing::debug!("nix daemon tunnel ended: {e}");
- }
- });
- Ok(())
- }
-
#[endpoints(id = 2)]
async fn serve_store(&self, store: String, socket: String) -> Result<(), Error> {
let mut child = Command::new("nix-daemon")