git.delta.rocks / remowt / refs/commits / 600e6edc0e86

difftreelog

source

crates/remowt-client/src/lib.rs19.3 KiBsourcehistory
1use std::collections::HashMap;2use std::path::PathBuf;3use std::sync::{Arc, Mutex};4use std::{env, io};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc};9use bifrostlink_ports::unix_socket::from_socket;10use bytes::{Bytes, BytesMut};11use camino::{Utf8Path, Utf8PathBuf};12use remowt_endpoints::{13	fs::Fs,14	pty::{Pty, PtyClient, ShellId},15	systemd::Systemd,16};17use remowt_link_shared::plugin::PluginEndpointsClient;18use remowt_link_shared::{Address, BifConfig, ElevateEndpoints, ElevateError, Elevator};19use russh::client::{connect, Config, Handle, Handler, Msg, Session};20use russh::keys::agent::client::AgentClient;21use russh::keys::agent::AgentIdentity;22use russh::keys::check_known_hosts;23use russh::keys::ssh_key::PublicKey;24use russh::{Channel, ChannelMsg, ChannelStream};25use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf};26use tokio::join;27use tokio::net::UnixListener;28use tokio::sync::mpsc;29use tokio::sync::oneshot::{self, channel};30use tracing::error;31use uuid::Uuid;3233pub mod editor;3435type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;3637async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {38	let len = srx.read_u32().await?;39	let mut buf = BytesMut::zeroed(len as usize);40	srx.read_exact(&mut buf).await?;41	Ok(buf)42}43async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {44	stx.write_u32(value.len().try_into().expect("can't be larger"))45		.await?;46	stx.write_all(&value).await?;47	Ok(())48}4950fn sh_quote(s: impl AsRef<str>) -> String {51	format!("'{}'", s.as_ref().replace('\'', "'\\''"))52}5354const ESCALATORS: [(&str, &[&str]); 3] = [55	("run0", &["--background=", "--pipe"]),56	("sudo", &[]),57	("doas", &[]),58];5960pub struct AgentBundle {61	dir: PathBuf,62	hashes: HashMap<String, String>,63}6465impl AgentBundle {66	pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {67		let dir = dir.into();68		let hashes_path = dir.join("hashes");69		let raw = std::fs::read_to_string(&hashes_path)70			.with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;71		let mut hashes = HashMap::new();72		for line in raw.lines() {73			let line = line.trim();74			if line.is_empty() {75				continue;76			}77			let (arch, hash) = line78				.split_once(char::is_whitespace)79				.ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;80			hashes.insert(arch.to_owned(), hash.trim().to_owned());81		}82		ensure!(83			!hashes.is_empty(),84			"agent bundle {} has no hashes",85			dir.display()86		);87		Ok(Self { dir, hashes })88	}8990	fn binary(&self, arch: &str) -> PathBuf {91		self.dir.join(format!("remowt-agent-{arch}"))92	}93}9495async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {96	let mut ch = sess.channel_open_session().await?;97	ch.exec(true, cmd).await?;98	let mut out = Vec::new();99	let mut code = None;100	while let Some(msg) = ch.wait().await {101		match msg {102			ChannelMsg::Data { data } => out.extend(data.as_ref()),103			ChannelMsg::ExtendedData { data, .. } => {104				error!(105					"remote stderr: {}",106					String::from_utf8_lossy(data.as_ref()).trim()107				);108			}109			ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),110			_ => {}111		}112	}113	Ok((code, out))114}115116async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {117	let (code, mut out) = run(sess, cmd).await?;118	ensure!(119		code == Some(0),120		"remote command failed (exit {code:?}): {cmd}"121	);122	ensure!(out.ends_with(b"\n"));123	out.pop();124	String::from_utf8(out).context("expected utf8 output for command")125}126127async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {128	let arch = run_string_ok(sess, "uname -m").await?;129	let hash = bundle130		.hashes131		.get(&arch)132		.ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;133134	let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")135		.await?136		.trim()137		.to_owned();138	let dir = if cache.is_empty() {139		let home = run_string_ok(sess, "echo \"$HOME\"").await?;140		ensure!(141			!home.is_empty(),142			"remote $HOME and $XDG_CACHE_HOME both empty"143		);144		Utf8PathBuf::from(home).join("cache/remowt")145	} else {146		Utf8PathBuf::from(cache).join("remowt")147	};148	let path = dir.join(hash);149150	let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;151	if present != Some(0) {152		let bin = bundle.binary(&arch);153		let bytes = std::fs::read(&bin)154			.with_context(|| format!("reading agent binary {}", bin.display()))?;155		upload_agent(sess, &dir, &path, bytes).await?;156	}157	Ok(path)158}159160async fn upload_agent(161	sess: &Handle<SshHandler>,162	dir: &Utf8Path,163	path: &Utf8Path,164	bytes: Vec<u8>,165) -> Result<()> {166	run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;167168	let tmp = path.join(format!("tmp.{}", Uuid::new_v4()));169	let ch = sess.channel_open_session().await?;170	ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;171	ch.data_bytes(bytes).await?;172	ch.eof().await?;173	let mut ch = ch;174	let mut code = None;175	while let Some(msg) = ch.wait().await {176		match msg {177			ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),178			ChannelMsg::ExtendedData { data, .. } => {179				error!(180					"agent upload: {}",181					String::from_utf8_lossy(data.as_ref()).trim()182				);183			}184			_ => {}185		}186	}187	ensure!(code == Some(0), "agent upload failed (exit {code:?})");188189	run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;190	run_string_ok(191		sess,192		&format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),193	)194	.await?;195	Ok(())196}197198async fn detect_escalation(199	sess: &Handle<SshHandler>,200) -> Result<(&'static str, &'static [&'static str])> {201	for (tool, flags) in ESCALATORS {202		// `tool` is a fixed identifier (no metacharacters), safe to interpolate.203		let (code, _) = run(sess, &format!("command -v {tool}")).await?;204		if code == Some(0) {205			return Ok((tool, flags));206		}207	}208	bail!("no escalation tool (run0/sudo/doas) found on remote")209}210211fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {212	let mut parts = vec![tool.to_owned()];213	parts.extend(flags.iter().map(|f| f.to_string()));214	parts.push(sh_quote(agent_path));215	parts.push("real-agent".to_owned());216	parts.push("--privileged".to_owned());217	if let Some(p) = path {218		parts.push("--path".to_owned());219		parts.push(sh_quote(p));220	}221	parts.join(" ")222}223224fn find_in_path(name: &str) -> Option<std::path::PathBuf> {225	let path = env::var_os("PATH")?;226	env::split_paths(&path)227		.map(|dir| dir.join(name))228		.find(|p| p.is_file())229}230231fn port_from_channel(ch: Channel<Msg>) -> Port {232	Port::new(move |mut rx, tx| async move {233		let (mut srx, mut stx) = tokio::io::split(ch.into_stream());234		let srx_task = async move {235			loop {236				match read(&mut srx).await {237					Ok(buf) => {238						if tx.send(buf.freeze()).is_err() {239							break;240						}241					}242					Err(e) => {243						error!("channel read failed: {e}");244						break;245					}246				}247			}248		};249		let stx_task = async move {250			while let Some(value) = rx.recv().await {251				if let Err(e) = write(&mut stx, value).await {252					error!("channel write failed: {e}");253					break;254				}255			}256		};257		join!(srx_task, stx_task);258	})259}260261pub struct SshHandler {262	host: String,263	port: u16,264	subs: Subs,265}266impl Handler for SshHandler {267	type Error = russh::Error;268	async fn check_server_key(269		&mut self,270		server_public_key: &PublicKey,271	) -> Result<bool, Self::Error> {272		Ok(check_known_hosts(&self.host, self.port, server_public_key)?)273	}274	async fn server_channel_open_forwarded_streamlocal(275		&mut self,276		channel: Channel<Msg>,277		socket_path: &str,278		_session: &mut Session,279	) -> Result<(), Self::Error> {280		let Some(ch) = self281			.subs282			.lock()283			.expect("lock")284			.remove(&Utf8PathBuf::from(socket_path))285		else {286			return Err(russh::Error::WrongChannel);287		};288		let _ = ch.send(channel);289		Ok(())290	}291}292293struct SshElevator {294	sess: Arc<Handle<SshHandler>>,295	rpc: WeakRpc<BifConfig>,296	agent_path: Utf8PathBuf,297}298impl Elevator for SshElevator {299	async fn elevate(&self) -> Result<(), ElevateError> {300		let fail = |e: String| ElevateError::Failed(e);301		let (tool, flags) = detect_escalation(&self.sess)302			.await303			.map_err(|e| fail(e.to_string()))?;304		let ch = self305			.sess306			.channel_open_session()307			.await308			.map_err(|e| fail(e.to_string()))?;309		ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None))310			.await311			.map_err(|e| fail(e.to_string()))?;312		let rpc = self313			.rpc314			.clone()315			.upgrade()316			.ok_or_else(|| fail("rpc is gone".to_owned()))?;317		rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0));318		Ok(())319	}320}321322pub struct RemoteChild {323	pub stdout: DuplexStream,324	pub stderr: DuplexStream,325	pub exit: oneshot::Receiver<Option<u32>>,326}327328enum Transport {329	Ssh {330		sess: Arc<Handle<SshHandler>>,331		subs: Subs,332		remote_dir: Utf8PathBuf,333		agent_path: Utf8PathBuf,334	},335	Local {336		#[allow(dead_code)]337		agent: Rpc<BifConfig>,338		agent_path: String,339	},340}341342pub struct Remowt {343	transport: Transport,344	rpc: Rpc<BifConfig>,345	elevated: tokio::sync::OnceCell<()>,346	children: Mutex<Vec<tokio::process::Child>>,347}348349pub type RemowtRemote = Remote<BifConfig>;350351fn loopback() -> (Port, Port) {352	let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::<Bytes>();353	let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::<Bytes>();354	let user = Port::new(move |mut rx, tx| async move {355		loop {356			tokio::select! {357				msg = rx.recv() => match msg {358					Some(msg) => if a2b_tx.send(msg).is_err() { break },359					None => break,360				},361				msg = b2a_rx.recv() => match msg {362					Some(msg) => if tx.send(msg).is_err() { break },363					None => break,364				},365			}366		}367	});368	let agent = Port::new(move |mut rx, tx| async move {369		loop {370			tokio::select! {371				msg = rx.recv() => match msg {372					Some(msg) => if b2a_tx.send(msg).is_err() { break },373					None => break,374				},375				msg = a2b_rx.recv() => match msg {376					Some(msg) => if tx.send(msg).is_err() { break },377					None => break,378				},379			}380		}381	});382	(user, agent)383}384385impl Remowt {386	pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {387		let conf = russh_config::parse_home(host)?;388		let port = conf.host_config.port.or(conf.port).unwrap_or(22);389		let hostname = conf390			.host_config391			.hostname392			.clone()393			.unwrap_or_else(|| conf.host_name.clone());394		let user = conf395			.user396			.clone()397			.unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));398399		let subs: Subs = Arc::new(Mutex::new(HashMap::new()));400		let mut sess = connect(401			Arc::new(Config::default()),402			(hostname.clone(), port),403			SshHandler {404				host: hostname,405				port,406				subs: subs.clone(),407			},408		)409		.await?;410411		let mut agent = AgentClient::connect_env().await?;412		let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();413		let mut authenticated = false;414		for ident in agent.request_identities().await? {415			let AgentIdentity::PublicKey { key, .. } = ident else {416				continue;417			};418			if sess419				.authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)420				.await?421				.success()422			{423				authenticated = true;424				break;425			}426		}427		ensure!(authenticated, "ssh authentication failed");428429		// All remaining session ops take `&self`; share the handle.430		let sess = Arc::new(sess);431432		let agent_path = deploy_agent(&sess, bundle).await?;433434		let remote_dir = remote_mktemp(&sess).await?;435		let primary = remote_dir.join("primary.sock");436437		let (onetx, onerx) = channel();438		subs.lock().expect("lock").insert(primary.clone(), onetx);439		sess.streamlocal_forward(primary.clone()).await?;440441		let rpc = Rpc::<BifConfig>::new(Address::User);442443		// TODO: ensure no injection is possible in the socket path.444		let cmd_chan = sess.channel_open_session().await?;445		cmd_chan446			.exec(447				true,448				format!(449					"{} real-agent --path={}",450					sh_quote(&agent_path),451					sh_quote(&primary)452				),453			)454			.await?;455456		let port = port_from_channel(457			onerx458				.await459				.map_err(|_| anyhow!("agent never opened its channel"))?,460		);461		rpc.add_direct(Address::Agent, port, Rtt(0));462463		Ok(Self {464			transport: Transport::Ssh {465				sess,466				subs,467				remote_dir,468				agent_path,469			},470			rpc,471			elevated: tokio::sync::OnceCell::new(),472			children: Mutex::new(Vec::new()),473		})474	}475476	pub async fn connect_local(agent_path: &str) -> Result<Self> {477		let (port_user, port_agent) = loopback();478		let rpc = Rpc::<BifConfig>::new(Address::User);479		let mut agent = Rpc::<BifConfig>::new(Address::Agent);480481		// Register handlers before wiring up the link (see the agent binary).482		Fs::new().register_endpoints(&mut agent);483		Systemd.register_endpoints(&mut agent);484		Pty::new().register_endpoints(&mut agent);485486		agent.add_direct(Address::User, port_agent, Rtt(0));487		rpc.add_direct(Address::Agent, port_user, Rtt(0));488489		Ok(Self {490			transport: Transport::Local {491				agent,492				agent_path: agent_path.to_owned(),493			},494			rpc,495			elevated: tokio::sync::OnceCell::new(),496			children: Mutex::new(Vec::new()),497		})498	}499500	pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {501		match &self.transport {502			Transport::Ssh { sess, .. } => Some(sess.clone()),503			Transport::Local { .. } => None,504		}505	}506507	pub fn rpc(&self) -> Rpc<BifConfig> {508		self.rpc.clone()509	}510511	pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {512		R::wrap(self.rpc.remote(Address::Agent))513	}514515	pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {516		let client: PluginEndpointsClient<BifConfig> = self.endpoints();517		client518			.load_plugin(id, name.to_owned())519			.await?520			.map_err(|e| anyhow!("agent failed to load plugin: {e}"))521	}522	pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {523		self.ensure_elevated().await?;524		let client: PluginEndpointsClient<BifConfig> =525			PluginEndpointsClient::wrap(self.rpc.remote(Address::AgentPrivileged));526		client527			.load_plugin_path(id, path.to_owned())528			.await?529			.map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))530	}531	pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {532		R::wrap(self.rpc.remote(Address::Plugin(id)))533	}534	pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {535		self.ensure_elevated().await?;536		Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))537	}538539	async fn ensure_elevated(&self) -> Result<()> {540		self.elevated541			.get_or_try_init(|| async {542				let port = match &self.transport {543					Transport::Ssh {544						sess, agent_path, ..545					} => {546						let (tool, flags) = detect_escalation(sess).await?;547						let ch = sess.channel_open_session().await?;548						ch.exec(true, privileged_cmd(tool, flags, agent_path, None))549							.await?;550						port_from_channel(ch)551					}552					Transport::Local { agent_path, .. } => {553						let sock = env::temp_dir()554							.join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4()));555						let _ = std::fs::remove_file(&sock);556						let listener = UnixListener::bind(&sock)?;557						let (tool, flags) = ESCALATORS558							.iter()559							.find(|(t, _)| find_in_path(t).is_some())560							.ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?;561						let child = tokio::process::Command::new(tool)562							.args(*flags)563							.arg(agent_path)564							.arg("real-agent")565							.arg("--privileged")566							.arg("--path")567							.arg(sock.to_str().expect("temp path is utf-8"))568							.kill_on_drop(true)569							.spawn()?;570						self.children.lock().expect("lock").push(child);571						let (stream, _) = listener.accept().await?;572						let _ = std::fs::remove_file(&sock);573						from_socket(stream)574					}575				};576				self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));577				anyhow::Ok(())578			})579			.await?;580		Ok(())581	}582583	pub async fn exec(&self, command: String) -> Result<RemoteChild> {584		let Some(sess) = self.ssh() else {585			bail!("exec should not be called on local")586		};587		let ch = sess.channel_open_session().await?;588		ch.exec(true, command).await?;589590		let (mut out_w, stdout) = tokio::io::duplex(64 * 1024);591		let (mut err_w, stderr) = tokio::io::duplex(64 * 1024);592		let (exit_tx, exit) = oneshot::channel();593594		tokio::spawn(async move {595			let mut ch = ch;596			let mut code = None;597			while let Some(msg) = ch.wait().await {598				match msg {599					ChannelMsg::Data { data } => {600						if out_w.write_all(&data).await.is_err() {601							break;602						}603					}604					ChannelMsg::ExtendedData { data, .. } => {605						if err_w.write_all(&data).await.is_err() {606							break;607						}608					}609					ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),610					_ => {}611				}612			}613			let _ = out_w.shutdown().await;614			let _ = err_w.shutdown().await;615			let _ = exit_tx.send(code);616		});617618		Ok(RemoteChild {619			stdout,620			stderr,621			exit,622		})623	}624625	pub fn serve_elevate(&self) -> Result<()> {626		let Transport::Ssh {627			sess, agent_path, ..628		} = &self.transport629		else {630			bail!("elevate should not be called on local")631		};632		let mut rpc = self.rpc.clone();633		ElevateEndpoints(SshElevator {634			sess: sess.clone(),635			rpc: self.rpc.clone().downgrade(),636			agent_path: agent_path.to_owned(),637		})638		.register_endpoints(&mut rpc);639		Ok(())640	}641642	pub fn remote_dir(&self) -> Option<&Utf8Path> {643		match &self.transport {644			Transport::Ssh { remote_dir, .. } => Some(remote_dir),645			Transport::Local { .. } => None,646		}647	}648649	pub async fn forward_socket(650		&self,651		remote_path: &Utf8Path,652	) -> Result<oneshot::Receiver<Channel<Msg>>> {653		let Transport::Ssh { sess, subs, .. } = &self.transport else {654			bail!("forward_socket should not be called on local")655		};656		let (tx, rx) = oneshot::channel();657		subs.lock()658			.expect("lock")659			.insert(remote_path.to_owned(), tx);660		sess.streamlocal_forward(remote_path.to_owned()).await?;661		Ok(rx)662	}663664	pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<Shell> {665		let Transport::Ssh { remote_dir, .. } = &self.transport else {666			bail!("open_shell should not be called on local")667		};668		let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4()));669670		let rx = self.forward_socket(&sock).await?;671		let client: PtyClient<BifConfig> = self.endpoints();672		let id = client673			.open_shell(sock, term.to_owned(), cols, rows)674			.await?675			.map_err(|e| anyhow!("agent failed to open shell: {e}"))?;676		let ch = rx677			.await678			.map_err(|_| anyhow!("agent never connected the shell socket"))?;679680		Ok(Shell {681			id,682			stream: ch.into_stream(),683			remote: self.rpc.remote(Address::Agent),684		})685	}686}687688pub struct Shell {689	pub id: ShellId,690	pub stream: ChannelStream<Msg>,691	remote: Remote<BifConfig>,692}693694impl Shell {695	pub fn resizer(&self) -> ShellResizer {696		ShellResizer {697			remote: self.remote.clone(),698			id: self.id,699		}700	}701}702703#[derive(Clone)]704pub struct ShellResizer {705	remote: Remote<BifConfig>,706	id: ShellId,707}708709impl ShellResizer {710	pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {711		PtyClient::wrap(self.remote.clone())712			.resize(self.id, cols, rows)713			.await?714			.map_err(|e| anyhow!("failed to resize remote shell: {e}"))715	}716}717718async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {719	let mut cmd_chan = sess.channel_open_session().await?;720	cmd_chan721		.exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")722		.await?;723	let mut stdout = vec![];724	loop {725		let Some(msg) = cmd_chan.wait().await else {726			bail!("unexpected channel end");727		};728		match msg {729			russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),730			russh::ChannelMsg::ExitStatus { exit_status } => {731				if exit_status != 0 {732					bail!("mktemp failed");733				}734				break;735			}736			_ => {}737		}738	}739	ensure!(stdout.ends_with(b"\n"));740	stdout.pop();741	Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))742}