git.delta.rocks / remowt / refs/commits / eadb0bb3c19e

difftreelog

source

crates/remowt-client/src/lib.rs15.0 KiBsourcehistory
1use std::collections::HashMap;2use std::env;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Remote, Rpc, Rtt};9use camino::{Utf8Path, Utf8PathBuf};10use remowt_link_shared::plugin::PluginEndpointsClient;11use remowt_link_shared::port::child_port;12use remowt_link_shared::{Address, BifConfig};13use russh::client::{connect, Config, Handle, Handler, Msg, Session};14use russh::keys::agent::client::AgentClient;15use russh::keys::agent::AgentIdentity;16use russh::keys::check_known_hosts;17use russh::keys::ssh_key::PublicKey;18use russh::Channel;19use tempfile::TempDir;20use tokio::net::UnixListener;21use tokio::sync::oneshot;22use tokio::{23	fs,24	io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, DuplexStream},25};26use tracing::{debug, warn};27use uuid::Uuid;2829pub mod editor;30mod forwarded;31mod shell;32mod ssh_exec;33mod subprocess;3435use self::ssh_exec::SshExecChild;36pub use self::subprocess::{RemowtChild, SpawnOptions, StderrMode, StdioMode};37pub use forwarded::{RemowtListener, RemowtStream};38pub use shell::{RemowtShell, RemowtShellResizer};3940type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;4142fn sh_quote(s: impl AsRef<str>) -> String {43	format!("'{}'", s.as_ref().replace('\'', "'\\''"))44}4546const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];4748pub struct AgentBundle {49	dir: PathBuf,50	hashes: HashMap<String, String>,51}5253impl AgentBundle {54	pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {55		let dir = dir.into();56		let hashes_path = dir.join("hashes");57		let raw = std::fs::read_to_string(&hashes_path)58			.with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;59		let mut hashes = HashMap::new();60		for line in raw.lines() {61			let line = line.trim();62			if line.is_empty() {63				continue;64			}65			let (arch, hash) = line66				.split_once(char::is_whitespace)67				.ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;68			hashes.insert(arch.to_owned(), hash.trim().to_owned());69		}70		ensure!(71			!hashes.is_empty(),72			"agent bundle {} has no hashes",73			dir.display()74		);75		Ok(Self { dir, hashes })76	}7778	fn binary(&self, arch: &str) -> PathBuf {79		self.dir.join(format!("remowt-agent-{arch}"))80	}8182	fn local_binary(&self) -> Result<PathBuf> {83		let arch = env::consts::ARCH;84		let path = self.binary(arch);85		ensure!(86			path.is_file(),87			"no local remowt-agent build for arch {arch} in bundle {}",88			self.dir.display()89		);90		Ok(path)91	}92}9394async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {95	let ch = sess.channel_open_session().await?;96	ch.exec(true, cmd).await?;9798	let mut child = SshExecChild::from_exec(ch);99	drop(child.stdin);100	drain_stderr(child.stderr, cmd.to_owned());101102	let mut out = Vec::new();103	child.stdout.read_to_end(&mut out).await?;104	let code = child.exit.await.ok().flatten();105	Ok((code, out))106}107108async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {109	let (code, mut out) = run(sess, cmd).await?;110	ensure!(111		code == Some(0),112		"remote command failed (exit {code:?}): {cmd}"113	);114	if !out.is_empty() {115		ensure!(116			out.ends_with(b"\n"),117			"remote command was not newline-terminated: {cmd}: {out:?}"118		);119		out.pop();120	}121	String::from_utf8(out).context("expected utf8 output for command")122}123124async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {125	debug!("uname -a");126	let arch = run_string_ok(sess, "uname -m").await?;127	let hash = bundle128		.hashes129		.get(&arch)130		.ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;131132	debug!("get dir");133	let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"").await?;134	let dir = if cache.is_empty() {135		let home = run_string_ok(sess, "echo \"$HOME\"").await?;136		ensure!(137			!home.is_empty(),138			"remote $HOME and $XDG_CACHE_HOME both empty"139		);140		Utf8PathBuf::from(home).join(".cache/remowt")141	} else {142		Utf8PathBuf::from(cache).join("remowt")143	};144	let path = dir.join(hash);145146	debug!("presence");147	let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;148	if present != Some(0) {149		let bin = bundle.binary(&arch);150		debug!("read");151		let bytes = fs::read(&bin)152			.await153			.with_context(|| format!("reading agent binary {}", bin.display()))?;154		debug!("upload");155		upload_agent(sess, &dir, &path, bytes).await?;156	}157	Ok(path)158}159160async fn upload_agent(161	sess: &Handle<SshHandler>,162	dir: &Utf8Path,163	path: &Utf8Path,164	bytes: Vec<u8>,165) -> Result<()> {166	debug!("mkdirp");167	run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;168169	let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));170	let ch = sess.channel_open_session().await?;171	debug!("cat");172	ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;173174	let mut child = SshExecChild::from_exec(ch);175	child176		.stdin177		.write_all(&bytes)178		.await179		.context("sending agent binary")?;180	child181		.stdin182		.shutdown()183		.await184		.context("sending agent binary")?;185	let code = child.wait().await;186	ensure!(code == Some(0), "agent upload failed (exit {code:?})");187188	debug!("chmod");189	run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;190	run_string_ok(191		sess,192		&format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),193	)194	.await?;195	Ok(())196}197198pub struct SshHandler {199	host: String,200	port: u16,201	subs: Subs,202}203impl Handler for SshHandler {204	type Error = russh::Error;205	async fn check_server_key(206		&mut self,207		server_public_key: &PublicKey,208	) -> Result<bool, Self::Error> {209		Ok(check_known_hosts(&self.host, self.port, server_public_key)?)210	}211	async fn server_channel_open_forwarded_streamlocal(212		&mut self,213		channel: Channel<Msg>,214		socket_path: &str,215		_session: &mut Session,216	) -> Result<(), Self::Error> {217		let Some(ch) = self218			.subs219			.lock()220			.expect("lock")221			.remove(&Utf8PathBuf::from(socket_path))222		else {223			return Err(russh::Error::WrongChannel);224		};225		let _ = ch.send(channel);226		Ok(())227	}228}229230enum Transport {231	Ssh {232		sess: Arc<Handle<SshHandler>>,233		subs: Subs,234		runtime_dir: Utf8PathBuf,235		agent_path: Utf8PathBuf,236	},237	Local {238		agent_path: PathBuf,239		runtime_dir: Utf8PathBuf,240	},241}242243struct RemowtInner {244	transport: Transport,245	rpc: Rpc<BifConfig>,246	elevated: tokio::sync::OnceCell<()>,247	#[allow(dead_code)]248	children: Mutex<Vec<tokio::process::Child>>,249	_runtime_tmp: Option<TempDir>,250}251252#[derive(Clone)]253pub struct Remowt(Arc<RemowtInner>);254255pub type RemowtRemote = Remote<BifConfig>;256257impl Remowt {258	/// Connect to the remote host over ssh, detect the architecture and deploy the required259	/// agent binary.260	pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {261		let conf = russh_config::parse_home(host)?;262		let port = conf.host_config.port.or(conf.port).unwrap_or(22);263		let hostname = conf264			.host_config265			.hostname266			.clone()267			.unwrap_or_else(|| conf.host_name.clone());268		let user = conf269			.user270			.clone()271			.unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));272273		let subs: Subs = Arc::new(Mutex::new(HashMap::new()));274		let mut sess = connect(275			Arc::new(Config::default()),276			(hostname.clone(), port),277			SshHandler {278				host: hostname,279				port,280				subs: subs.clone(),281			},282		)283		.await?;284285		let mut agent = AgentClient::connect_env().await?;286		let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();287		let mut authenticated = false;288		for ident in agent.request_identities().await? {289			let AgentIdentity::PublicKey { key, .. } = ident else {290				continue;291			};292			if sess293				.authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)294				.await?295				.success()296			{297				authenticated = true;298				break;299			}300		}301		ensure!(authenticated, "ssh authentication failed");302303		let sess = Arc::new(sess);304305		debug!("deploying agent");306		let agent_path = deploy_agent(&sess, bundle).await?;307308		debug!("runtime dir");309		let runtime_dir = remote_runtime_dir(&sess).await?;310311		let rpc = Rpc::<BifConfig>::new(Address::User);312313		let cmd_chan = sess.channel_open_session().await?;314		debug!("starting agent");315		cmd_chan316			.exec(true, format!("{} real-agent", sh_quote(&agent_path)))317			.await?;318319		let child = SshExecChild::from_exec(cmd_chan);320		drain_stderr(child.stderr, "agent".to_owned());321		rpc.add_direct(322			Address::Agent,323			child_port(child.stdout, child.stdin),324			Rtt(0),325		);326327		Ok(Self(Arc::new(RemowtInner {328			transport: Transport::Ssh {329				sess,330				subs,331				runtime_dir,332				agent_path,333			},334			rpc,335			elevated: tokio::sync::OnceCell::new(),336			children: Mutex::new(Vec::new()),337			_runtime_tmp: None,338		})))339	}340341	/// "Connect" to the local machine's agent, by starting the agent binary locally.342	pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {343		let agent_path = bundle.local_binary()?;344		let mut child = tokio::process::Command::new(&agent_path)345			.arg("real-agent")346			.arg("--local")347			.stdin(std::process::Stdio::piped())348			.stdout(std::process::Stdio::piped())349			.kill_on_drop(true)350			.spawn()351			.with_context(|| format!("spawning agent binary {}", agent_path.display()))?;352		let stdin = child.stdin.take().expect("stdin piped");353		let stdout = child.stdout.take().expect("stdout piped");354355		let rpc = Rpc::<BifConfig>::new(Address::User);356		rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));357358		let (runtime_dir, runtime_tmp) = local_runtime_dir()?;359360		Ok(Self(Arc::new(RemowtInner {361			transport: Transport::Local {362				agent_path,363				runtime_dir,364			},365			rpc,366			elevated: tokio::sync::OnceCell::new(),367			children: Mutex::new(vec![child]),368			_runtime_tmp: runtime_tmp,369		})))370	}371372	/// Get the handle to the underlying russh session handle.373	pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {374		match &self.0.transport {375			Transport::Ssh { sess, .. } => Some(sess.clone()),376			Transport::Local { .. } => None,377		}378	}379380	pub fn rpc(&self) -> Rpc<BifConfig> {381		self.0.rpc.clone()382	}383384	pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {385		let client: PluginEndpointsClient<BifConfig> = self.endpoints();386		client387			.load_plugin(id, name.to_owned())388			.await?389			.map_err(|e| anyhow!("agent failed to load plugin: {e}"))390	}391	pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {392		self.ensure_escalated().await?;393		let client: PluginEndpointsClient<BifConfig> =394			PluginEndpointsClient::wrap(self.0.rpc.remote(Address::AgentPrivileged));395		client396			.load_plugin_path(id, path.to_owned())397			.await?398			.map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))399	}400	pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {401		R::wrap(self.0.rpc.remote(Address::Plugin(id)))402	}403404	pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {405		R::wrap(self.0.rpc.remote(Address::Agent))406	}407	pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {408		self.ensure_escalated().await?;409		Ok(R::wrap(self.0.rpc.remote(Address::AgentPrivileged)))410	}411412	async fn ensure_escalated(&self) -> Result<()> {413		self.0414			.elevated415			.get_or_try_init(|| async {416				let (agent_path, local) = match &self.0.transport {417					Transport::Ssh { agent_path, .. } => (agent_path.as_str().to_owned(), false),418					Transport::Local { agent_path, .. } => (419						agent_path420							.to_str()421							.ok_or_else(|| anyhow!("local agent path is not utf-8"))?422							.to_owned(),423						true,424					),425				};426427				let (tool, flags) = self.detect_escalation().await?;428				let mut args: Vec<String> = flags.iter().map(|f| (*f).to_owned()).collect();429				args.push(agent_path);430				args.push("real-agent".to_owned());431				args.push("--privileged".to_owned());432				if local {433					args.push("--local".to_owned());434				}435436				let child = self437					.spawn(SpawnOptions {438						program: tool.to_owned(),439						args,440						stdin: StdioMode::Pipe,441						stdout: StdioMode::Pipe,442						stderr: StderrMode::Inherit,443						..Default::default()444					})445					.await446					.context("spawning privileged agent")?;447448				let stdin = child449					.stdin450					.ok_or_else(|| anyhow!("privileged agent stdin missing"))?;451				let stdout = child452					.stdout453					.ok_or_else(|| anyhow!("privileged agent stdout missing"))?;454455				let port = child_port(stdout, stdin);456				self.0457					.rpc458					.add_direct(Address::AgentPrivileged, port, Rtt(0));459				anyhow::Ok(())460			})461			.await?;462		Ok(())463	}464465	async fn detect_escalation(&self) -> Result<(&'static str, &'static [&'static str])> {466		for (tool, flags) in ESCALATORS {467			let probe = self468				.spawn(SpawnOptions {469					program: (*tool).to_owned(),470					args: vec!["--version".to_owned()],471					stdout: StdioMode::Null,472					stderr: StderrMode::Null,473					..Default::default()474				})475				.await;476			if let Ok(child) = probe {477				let _ = child.wait().await;478				return Ok((tool, flags));479			}480		}481		bail!("no escalation tool found")482	}483484	/// XDG_RUNTIME_DIR on the remote machine.485	pub fn runtime_dir(&self) -> Utf8PathBuf {486		match &self.0.transport {487			Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),488			Transport::Local { runtime_dir, .. } => runtime_dir.clone(),489		}490	}491492	/// Bind unix listener socket on the remote machine with auto-generated path, returning the path.493	pub async fn bind_runtime_unix(&self, hint: &str) -> Result<(RemowtListener, Utf8PathBuf)> {494		let sock = self495			.runtime_dir()496			.join(format!("remowt-{hint}-{}.sock", Uuid::new_v4()));497		let listener = self.bind_unix(&sock).await?;498		Ok((listener, sock))499	}500501	/// Bind unix listener socket on the remote machine on the specified path.502	pub async fn bind_unix(&self, path: &Utf8Path) -> Result<RemowtListener> {503		match &self.0.transport {504			Transport::Ssh { sess, subs, .. } => {505				let (tx, rx) = oneshot::channel();506				subs.lock().expect("lock").insert(path.to_owned(), tx);507				sess.streamlocal_forward(path.to_owned()).await?;508				Ok(RemowtListener::Ssh(rx))509			}510			Transport::Local { .. } => {511				let _ = std::fs::remove_file(path);512				Ok(RemowtListener::Local(513					UnixListener::bind(path)?,514					path.to_owned(),515				))516			}517		}518	}519}520521fn drain_stderr(stream: DuplexStream, context: String) {522	tokio::spawn(async move {523		let mut reader = BufReader::new(stream).lines();524		loop {525			match reader.next_line().await {526				Ok(Some(line)) => warn!(context = %context, "{line}"),527				Ok(None) => break,528				Err(e) => {529					warn!(context = %context, "stderr read failed: {e}");530					break;531				}532			}533		}534	});535}536537fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {538	if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {539		if !dir.is_empty() {540			return Ok((Utf8PathBuf::from(dir), None));541		}542	}543	let tmp = tempfile::Builder::new()544		.prefix("remowt.")545		.rand_bytes(12)546		.tempdir()?;547	let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())548		.map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;549	Ok((dir, Some(tmp)))550}551552async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {553	let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;554	let dir = dir.trim();555	if dir.is_empty() {556		let tmp = run_string_ok(sess, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir").await?;557		Ok(Utf8PathBuf::from(tmp))558	} else {559		Ok(Utf8PathBuf::from(dir))560	}561}