git.delta.rocks / remowt / refs/commits / 36902687f0c5

difftreelog

source

crates/remowt-client/src/lib.rs18.4 KiBsourcehistory
1use std::collections::HashMap;2use std::io;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc};9use bifrostlink_ports::unix_socket::from_socket;10use bytes::{Bytes, BytesMut};11use camino::{Utf8Path, Utf8PathBuf};12use remowt_link_shared::{13	Address, BifConfig, ElevateEndpoints, ElevateError, Elevator, Fs, Pty, PtyClient, ShellId,14	Systemd,15};16use russh::client::{connect, Config, Handle, Handler, Msg, Session};17use russh::keys::agent::client::AgentClient;18use russh::keys::agent::AgentIdentity;19use russh::keys::check_known_hosts;20use russh::keys::ssh_key::PublicKey;21use russh::{Channel, ChannelMsg, ChannelStream};22use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf};23use tokio::join;24use tokio::net::UnixListener;25use tokio::sync::mpsc;26use tokio::sync::oneshot::{self, channel};27use tracing::error;28use uuid::Uuid;2930pub mod editor;3132type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;3334async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {35	let len = srx.read_u32().await?;36	let mut buf = BytesMut::zeroed(len as usize);37	srx.read_exact(&mut buf).await?;38	Ok(buf)39}40async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {41	stx.write_u32(value.len().try_into().expect("can't be larger"))42		.await?;43	stx.write_all(&value).await?;44	Ok(())45}4647fn sh_quote(s: impl AsRef<str>) -> String {48	format!("'{}'", s.as_ref().replace('\'', "'\\''"))49}5051const ESCALATORS: [(&str, &[&str]); 3] = [52	("run0", &["--background=", "--pipe"]),53	("sudo", &[]),54	("doas", &[]),55];5657pub struct AgentBundle {58	dir: PathBuf,59	hashes: HashMap<String, String>,60}6162impl AgentBundle {63	pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {64		let dir = dir.into();65		let hashes_path = dir.join("hashes");66		let raw = std::fs::read_to_string(&hashes_path)67			.with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;68		let mut hashes = HashMap::new();69		for line in raw.lines() {70			let line = line.trim();71			if line.is_empty() {72				continue;73			}74			let (arch, hash) = line75				.split_once(char::is_whitespace)76				.ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;77			hashes.insert(arch.to_owned(), hash.trim().to_owned());78		}79		ensure!(80			!hashes.is_empty(),81			"agent bundle {} has no hashes",82			dir.display()83		);84		Ok(Self { dir, hashes })85	}8687	fn binary(&self, arch: &str) -> PathBuf {88		self.dir.join(format!("remowt-agent-{arch}"))89	}90}9192async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {93	let mut ch = sess.channel_open_session().await?;94	ch.exec(true, cmd).await?;95	let mut out = Vec::new();96	let mut code = None;97	while let Some(msg) = ch.wait().await {98		match msg {99			ChannelMsg::Data { data } => out.extend(data.as_ref()),100			ChannelMsg::ExtendedData { data, .. } => {101				error!(102					"remote stderr: {}",103					String::from_utf8_lossy(data.as_ref()).trim()104				);105			}106			ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),107			_ => {}108		}109	}110	Ok((code, out))111}112113async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {114	let (code, mut out) = run(sess, cmd).await?;115	ensure!(116		code == Some(0),117		"remote command failed (exit {code:?}): {cmd}"118	);119	ensure!(out.ends_with(b"\n"));120	out.pop();121	String::from_utf8(out).context("expected utf8 output for command")122}123124async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {125	let arch = run_string_ok(sess, "uname -m").await?;126	let hash = bundle127		.hashes128		.get(&arch)129		.ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;130131	let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")132		.await?133		.trim()134		.to_owned();135	let dir = if cache.is_empty() {136		let home = run_string_ok(sess, "echo \"$HOME\"").await?;137		ensure!(138			!home.is_empty(),139			"remote $HOME and $XDG_CACHE_HOME both empty"140		);141		Utf8PathBuf::from(home).join("cache/remowt")142	} else {143		Utf8PathBuf::from(cache).join("remowt")144	};145	let path = dir.join(hash);146147	let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;148	if present != Some(0) {149		let bin = bundle.binary(&arch);150		let bytes = std::fs::read(&bin)151			.with_context(|| format!("reading agent binary {}", bin.display()))?;152		upload_agent(sess, &dir, &path, bytes).await?;153	}154	Ok(path)155}156157async fn upload_agent(158	sess: &Handle<SshHandler>,159	dir: &Utf8Path,160	path: &Utf8Path,161	bytes: Vec<u8>,162) -> Result<()> {163	run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;164165	let tmp = path.join(format!("tmp.{}", Uuid::new_v4()));166	let ch = sess.channel_open_session().await?;167	ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;168	ch.data_bytes(bytes).await?;169	ch.eof().await?;170	let mut ch = ch;171	let mut code = None;172	while let Some(msg) = ch.wait().await {173		match msg {174			ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),175			ChannelMsg::ExtendedData { data, .. } => {176				error!(177					"agent upload: {}",178					String::from_utf8_lossy(data.as_ref()).trim()179				);180			}181			_ => {}182		}183	}184	ensure!(code == Some(0), "agent upload failed (exit {code:?})");185186	run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;187	run_string_ok(188		sess,189		&format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),190	)191	.await?;192	Ok(())193}194195async fn detect_escalation(196	sess: &Handle<SshHandler>,197) -> Result<(&'static str, &'static [&'static str])> {198	for (tool, flags) in ESCALATORS {199		// `tool` is a fixed identifier (no metacharacters), safe to interpolate.200		let (code, _) = run(sess, &format!("command -v {tool}")).await?;201		if code == Some(0) {202			return Ok((tool, flags));203		}204	}205	bail!("no escalation tool (run0/sudo/doas) found on remote")206}207208fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {209	let mut parts = vec![tool.to_owned()];210	parts.extend(flags.iter().map(|f| f.to_string()));211	parts.push(sh_quote(agent_path));212	parts.push("real-agent".to_owned());213	parts.push("--privileged".to_owned());214	if let Some(p) = path {215		parts.push("--path".to_owned());216		parts.push(sh_quote(p));217	}218	parts.join(" ")219}220221fn find_in_path(name: &str) -> Option<std::path::PathBuf> {222	let path = std::env::var_os("PATH")?;223	std::env::split_paths(&path)224		.map(|dir| dir.join(name))225		.find(|p| p.is_file())226}227228fn port_from_channel(ch: Channel<Msg>) -> Port {229	Port::new(move |mut rx, tx| async move {230		let (mut srx, mut stx) = tokio::io::split(ch.into_stream());231		let srx_task = async move {232			loop {233				match read(&mut srx).await {234					Ok(buf) => {235						if tx.send(buf.freeze()).is_err() {236							break;237						}238					}239					Err(e) => {240						error!("channel read failed: {e}");241						break;242					}243				}244			}245		};246		let stx_task = async move {247			while let Some(value) = rx.recv().await {248				if let Err(e) = write(&mut stx, value).await {249					error!("channel write failed: {e}");250					break;251				}252			}253		};254		join!(srx_task, stx_task);255	})256}257258pub struct SshHandler {259	host: String,260	port: u16,261	subs: Subs,262}263impl Handler for SshHandler {264	type Error = russh::Error;265	async fn check_server_key(266		&mut self,267		server_public_key: &PublicKey,268	) -> Result<bool, Self::Error> {269		Ok(check_known_hosts(&self.host, self.port, server_public_key)?)270	}271	async fn server_channel_open_forwarded_streamlocal(272		&mut self,273		channel: Channel<Msg>,274		socket_path: &str,275		_session: &mut Session,276	) -> Result<(), Self::Error> {277		let Some(ch) = self278			.subs279			.lock()280			.expect("lock")281			.remove(&Utf8PathBuf::from(socket_path))282		else {283			return Err(russh::Error::WrongChannel);284		};285		let _ = ch.send(channel);286		Ok(())287	}288}289290struct SshElevator {291	sess: Arc<Handle<SshHandler>>,292	rpc: WeakRpc<BifConfig>,293	agent_path: Utf8PathBuf,294}295impl Elevator for SshElevator {296	async fn elevate(&self) -> Result<(), ElevateError> {297		let fail = |e: String| ElevateError::Failed(e);298		let (tool, flags) = detect_escalation(&self.sess)299			.await300			.map_err(|e| fail(e.to_string()))?;301		let ch = self302			.sess303			.channel_open_session()304			.await305			.map_err(|e| fail(e.to_string()))?;306		ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None))307			.await308			.map_err(|e| fail(e.to_string()))?;309		let rpc = self310			.rpc311			.clone()312			.upgrade()313			.ok_or_else(|| fail("rpc is gone".to_owned()))?;314		rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0));315		Ok(())316	}317}318319pub struct RemoteChild {320	pub stdout: DuplexStream,321	pub stderr: DuplexStream,322	pub exit: oneshot::Receiver<Option<u32>>,323}324325enum Transport {326	Ssh {327		sess: Arc<Handle<SshHandler>>,328		subs: Subs,329		remote_dir: Utf8PathBuf,330		agent_path: Utf8PathBuf,331	},332	Local {333		#[allow(dead_code)]334		agent: Rpc<BifConfig>,335		agent_path: String,336	},337}338339pub struct Remowt {340	transport: Transport,341	rpc: Rpc<BifConfig>,342	elevated: tokio::sync::OnceCell<()>,343	children: Mutex<Vec<tokio::process::Child>>,344}345346pub type RemowtRemote = Remote<BifConfig>;347348fn loopback() -> (Port, Port) {349	let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::<Bytes>();350	let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::<Bytes>();351	let user = Port::new(move |mut rx, tx| async move {352		loop {353			tokio::select! {354				msg = rx.recv() => match msg {355					Some(msg) => if a2b_tx.send(msg).is_err() { break },356					None => break,357				},358				msg = b2a_rx.recv() => match msg {359					Some(msg) => if tx.send(msg).is_err() { break },360					None => break,361				},362			}363		}364	});365	let agent = Port::new(move |mut rx, tx| async move {366		loop {367			tokio::select! {368				msg = rx.recv() => match msg {369					Some(msg) => if b2a_tx.send(msg).is_err() { break },370					None => break,371				},372				msg = a2b_rx.recv() => match msg {373					Some(msg) => if tx.send(msg).is_err() { break },374					None => break,375				},376			}377		}378	});379	(user, agent)380}381382impl Remowt {383	pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {384		let conf = russh_config::parse_home(host)?;385		let port = conf.host_config.port.unwrap_or(22);386		let hostname = conf387			.host_config388			.hostname389			.clone()390			.unwrap_or_else(|| conf.host_name.clone());391		let user = conf392			.user393			.clone()394			.unwrap_or_else(|| std::env::var("USER").unwrap_or_else(|_| "root".to_owned()));395396		let subs: Subs = Arc::new(Mutex::new(HashMap::new()));397		let mut sess = connect(398			Arc::new(Config::default()),399			(hostname.clone(), port),400			SshHandler {401				host: hostname,402				port,403				subs: subs.clone(),404			},405		)406		.await?;407408		let mut agent = AgentClient::connect_env().await?;409		let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();410		let mut authenticated = false;411		for ident in agent.request_identities().await? {412			let AgentIdentity::PublicKey { key, .. } = ident else {413				continue;414			};415			if sess416				.authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)417				.await?418				.success()419			{420				authenticated = true;421				break;422			}423		}424		ensure!(authenticated, "ssh authentication failed");425426		// All remaining session ops take `&self`; share the handle.427		let sess = Arc::new(sess);428429		let agent_path = deploy_agent(&sess, bundle).await?;430431		let remote_dir = remote_mktemp(&sess).await?;432		let primary = remote_dir.join("primary.sock");433434		let (onetx, onerx) = channel();435		subs.lock().expect("lock").insert(primary.clone(), onetx);436		sess.streamlocal_forward(primary.clone()).await?;437438		let rpc = Rpc::<BifConfig>::new(Address::User);439440		// TODO: ensure no injection is possible in the socket path.441		let cmd_chan = sess.channel_open_session().await?;442		cmd_chan443			.exec(444				true,445				format!(446					"{} real-agent --path={}",447					sh_quote(&agent_path),448					sh_quote(&primary)449				),450			)451			.await?;452453		let port = port_from_channel(454			onerx455				.await456				.map_err(|_| anyhow!("agent never opened its channel"))?,457		);458		rpc.add_direct(Address::Agent, port, Rtt(0));459460		Ok(Self {461			transport: Transport::Ssh {462				sess,463				subs,464				remote_dir,465				agent_path,466			},467			rpc,468			elevated: tokio::sync::OnceCell::new(),469			children: Mutex::new(Vec::new()),470		})471	}472473	pub async fn connect_local(agent_path: &str) -> Result<Self> {474		let (port_user, port_agent) = loopback();475		let rpc = Rpc::<BifConfig>::new(Address::User);476		let mut agent = Rpc::<BifConfig>::new(Address::Agent);477478		// Register handlers before wiring up the link (see the agent binary).479		Fs::new().register_endpoints(&mut agent);480		Systemd.register_endpoints(&mut agent);481		Pty::new().register_endpoints(&mut agent);482483		agent.add_direct(Address::User, port_agent, Rtt(0));484		rpc.add_direct(Address::Agent, port_user, Rtt(0));485486		Ok(Self {487			transport: Transport::Local {488				agent,489				agent_path: agent_path.to_owned(),490			},491			rpc,492			elevated: tokio::sync::OnceCell::new(),493			children: Mutex::new(Vec::new()),494		})495	}496497	pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {498		match &self.transport {499			Transport::Ssh { sess, .. } => Some(sess.clone()),500			Transport::Local { .. } => None,501		}502	}503504	pub fn rpc(&self) -> Rpc<BifConfig> {505		self.rpc.clone()506	}507508	pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {509		R::wrap(self.rpc.remote(Address::Agent))510	}511	pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {512		self.ensure_elevated().await?;513		Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))514	}515516	async fn ensure_elevated(&self) -> Result<()> {517		self.elevated518			.get_or_try_init(|| async {519				let port = match &self.transport {520					Transport::Ssh {521						sess, agent_path, ..522					} => {523						let (tool, flags) = detect_escalation(sess).await?;524						let ch = sess.channel_open_session().await?;525						ch.exec(true, privileged_cmd(tool, flags, agent_path, None))526							.await?;527						port_from_channel(ch)528					}529					Transport::Local { agent_path, .. } => {530						let sock = std::env::temp_dir()531							.join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4()));532						let _ = std::fs::remove_file(&sock);533						let listener = UnixListener::bind(&sock)?;534						let (tool, flags) = ESCALATORS535							.iter()536							.find(|(t, _)| find_in_path(t).is_some())537							.ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?;538						let child = tokio::process::Command::new(tool)539							.args(*flags)540							.arg(agent_path)541							.arg("real-agent")542							.arg("--privileged")543							.arg("--path")544							.arg(sock.to_str().expect("temp path is utf-8"))545							.kill_on_drop(true)546							.spawn()?;547						self.children.lock().expect("lock").push(child);548						let (stream, _) = listener.accept().await?;549						let _ = std::fs::remove_file(&sock);550						from_socket(stream)551					}552				};553				self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));554				anyhow::Ok(())555			})556			.await?;557		Ok(())558	}559560	pub async fn exec(&self, command: String) -> Result<RemoteChild> {561		let Some(sess) = self.ssh() else {562			bail!("exec should not be called on local")563		};564		let ch = sess.channel_open_session().await?;565		ch.exec(true, command).await?;566567		let (mut out_w, stdout) = tokio::io::duplex(64 * 1024);568		let (mut err_w, stderr) = tokio::io::duplex(64 * 1024);569		let (exit_tx, exit) = oneshot::channel();570571		tokio::spawn(async move {572			let mut ch = ch;573			let mut code = None;574			while let Some(msg) = ch.wait().await {575				match msg {576					ChannelMsg::Data { data } => {577						if out_w.write_all(&data).await.is_err() {578							break;579						}580					}581					ChannelMsg::ExtendedData { data, .. } => {582						if err_w.write_all(&data).await.is_err() {583							break;584						}585					}586					ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),587					_ => {}588				}589			}590			let _ = out_w.shutdown().await;591			let _ = err_w.shutdown().await;592			let _ = exit_tx.send(code);593		});594595		Ok(RemoteChild {596			stdout,597			stderr,598			exit,599		})600	}601602	pub fn serve_elevate(&self) -> Result<()> {603		let Transport::Ssh {604			sess, agent_path, ..605		} = &self.transport606		else {607			bail!("elevate should not be called on local")608		};609		let mut rpc = self.rpc.clone();610		ElevateEndpoints(SshElevator {611			sess: sess.clone(),612			rpc: self.rpc.clone().downgrade(),613			agent_path: agent_path.to_owned(),614		})615		.register_endpoints(&mut rpc);616		Ok(())617	}618619	pub fn remote_dir(&self) -> Option<&Utf8Path> {620		match &self.transport {621			Transport::Ssh { remote_dir, .. } => Some(remote_dir),622			Transport::Local { .. } => None,623		}624	}625626	pub async fn forward_socket(627		&self,628		remote_path: &Utf8Path,629	) -> Result<oneshot::Receiver<Channel<Msg>>> {630		let Transport::Ssh { sess, subs, .. } = &self.transport else {631			bail!("forward_socket should not be called on local")632		};633		let (tx, rx) = oneshot::channel();634		subs.lock()635			.expect("lock")636			.insert(remote_path.to_owned(), tx);637		sess.streamlocal_forward(remote_path.to_owned()).await?;638		Ok(rx)639	}640641	pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<Shell> {642		let Transport::Ssh { remote_dir, .. } = &self.transport else {643			bail!("open_shell should not be called on local")644		};645		let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4()));646647		let rx = self.forward_socket(&sock).await?;648		let client: PtyClient<BifConfig> = self.endpoints();649		let id = client650			.open_shell(sock, term.to_owned(), cols, rows)651			.await?652			.map_err(|e| anyhow!("agent failed to open shell: {e}"))?;653		let ch = rx654			.await655			.map_err(|_| anyhow!("agent never connected the shell socket"))?;656657		Ok(Shell {658			id,659			stream: ch.into_stream(),660			remote: self.rpc.remote(Address::Agent),661		})662	}663}664665pub struct Shell {666	pub id: ShellId,667	pub stream: ChannelStream<Msg>,668	remote: Remote<BifConfig>,669}670671impl Shell {672	pub fn resizer(&self) -> ShellResizer {673		ShellResizer {674			remote: self.remote.clone(),675			id: self.id,676		}677	}678}679680#[derive(Clone)]681pub struct ShellResizer {682	remote: Remote<BifConfig>,683	id: ShellId,684}685686impl ShellResizer {687	pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {688		PtyClient::wrap(self.remote.clone())689			.resize(self.id, cols, rows)690			.await?691			.map_err(|e| anyhow!("failed to resize remote shell: {e}"))692	}693}694695async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {696	let mut cmd_chan = sess.channel_open_session().await?;697	cmd_chan698		.exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")699		.await?;700	let mut stdout = vec![];701	loop {702		let Some(msg) = cmd_chan.wait().await else {703			bail!("unexpected channel end");704		};705		match msg {706			russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),707			russh::ChannelMsg::ExitStatus { exit_status } => {708				if exit_status != 0 {709					bail!("mktemp failed");710				}711				break;712			}713			_ => {}714		}715	}716	ensure!(stdout.ends_with(b"\n"));717	stdout.pop();718	Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))719}