git.delta.rocks / jrsonnet / refs/commits / dfbdb4ac5bb1

difftreelog

source

crates/fleet-base/src/command.rs11.2 KiBsourcehistory
1use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};23use anyhow::{anyhow, Result};4use better_command::{Handler, NixHandler, PlainHandler};5use futures::StreamExt;6use itertools::Either;7use openssh::{OverSsh, OwningCommand, Session};8use tokio::{io::AsyncRead, process::Command, select};9use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};10use tracing::debug;1112use crate::host::EscalationStrategy;1314fn escape_bash(input: &str, out: &mut String) {15	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";16	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {17		out.push_str(input);18		return;19	}20	out.push('\'');21	for (i, v) in input.split('\'').enumerate() {22		if i != 0 {23			out.push_str("'\"'\"'");24		}25		out.push_str(v);26	}27	out.push('\'');28}29fn ostoutf8(os: impl AsRef<OsStr>) -> String {30	os.as_ref().to_str().expect("non-utf8 data").to_owned()31}3233#[derive(Clone, Debug)]34pub struct MyCommand {35	command: String,36	args: Vec<String>,37	env: Vec<(String, String)>,38	ssh_session: Option<Arc<Session>>,39	escalation: EscalationStrategy,40	escalate: bool,41}42impl MyCommand {43	pub fn new_on(44		escalation: EscalationStrategy,45		cmd: impl AsRef<OsStr>,46		session: Arc<Session>,47	) -> Self {48		assert!(!cmd.as_ref().is_empty());49		Self {50			command: ostoutf8(cmd),51			args: vec![],52			env: vec![],53			ssh_session: Some(session),54			escalation,55			escalate: false,56		}57	}58	pub fn new(escalation: EscalationStrategy, cmd: impl AsRef<OsStr>) -> Self {59		assert!(!cmd.as_ref().is_empty());60		Self {61			command: ostoutf8(cmd),62			args: vec![],63			env: vec![],64			ssh_session: None,65			escalation,66			escalate: false,67		}68	}69	fn new_here(&self, cmd: impl AsRef<OsStr>) -> Self {70		if let Some(ssh_session) = self.ssh_session.clone() {71			Self::new_on(self.escalation, cmd, ssh_session)72		} else {73			Self::new(self.escalation, cmd)74		}75	}7677	fn into_args(self) -> Vec<String> {78		let mut out = Vec::new();79		if !self.env.is_empty() {80			out.push("env".to_owned());81			for (k, v) in self.env {82				assert!(!k.contains('='));83				out.push(format!("{k}={v}"));84			}85		}86		out.push(self.command);87		out.extend(self.args);88		out89	}9091	/// Translates environment variables into env command execution.92	/// Required for ssh, as ssh don't allow to send environment variables (at least by default).93	///94	/// FIXME: Insecure, as arguments might be seen by other users on the same machine.95	/// Figure out some way to transfer environment using stdio?96	fn translate_env_into_env(self) -> Self {97		if self.env.is_empty() {98			return self;99		}100		let mut out = self.new_here("env");101		for (k, v) in self.env {102			assert!(!k.contains('='));103			out.arg(format!("{k}={v}"));104		}105		out.arg(self.command);106		out.args(self.args);107108		out109	}110	fn into_string(self) -> String {111		let mut out = String::new();112		if !self.env.is_empty() {113			out.push_str("env");114			for (k, v) in self.env {115				out.push(' ');116				assert!(!k.contains('='));117				escape_bash(&k, &mut out);118				out.push('=');119				escape_bash(&v, &mut out);120			}121		}122		if !out.is_empty() {123			out.push(' ');124		}125		escape_bash(&self.command, &mut out);126		for arg in self.args {127			out.push(' ');128			escape_bash(&arg, &mut out);129		}130		out131	}132	fn into_command(self) -> Command {133		let mut out = Command::new(self.command);134		out.args(self.args);135		for (k, v) in self.env {136			out.env(k, v);137		}138		out139	}140	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {141		Ok(if let Some(session) = self.ssh_session.clone() {142			let cmd = self.translate_env_into_env().into_command();143			Either::Right(144				cmd.over_ssh(session)145					.map_err(|e| anyhow!("ssh error: {e}"))?,146			)147		} else {148			let cmd = self.into_command();149			Either::Left(cmd)150		})151	}152	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {153		let arg = arg.as_ref();154		self.args.push(ostoutf8(arg));155		self156	}157	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {158		let arg = arg.as_ref();159		let value = value.as_ref();160		let arg = ostoutf8(arg);161		let value = ostoutf8(value);162		self.arg(format!("{arg}={value}"));163		self164	}165	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {166		self.arg(arg);167		self.arg(value);168		self169	}170	pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {171		self.env172			.push((name.as_ref().to_owned(), value.as_ref().to_owned()));173		self174	}175	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {176		for arg in args.into_iter() {177			let arg = arg.as_ref();178			self.args.push(ostoutf8(arg));179		}180		self181	}182	pub fn sudo(mut self) -> Self {183		self.escalate = true;184		self185	}186	fn wrap_sudo_if_needed(self) -> Self {187		if !self.escalate {188			return self;189		}190		match self.escalation {191			EscalationStrategy::Su => {192				let mut out = self.new_here("su");193				out.arg("-c").arg(self.into_string());194				out195			}196			EscalationStrategy::Sudo => {197				let mut out = self.new_here("sudo");198				out.args(self.into_args());199				out200			}201			EscalationStrategy::Run0 => {202				// run0 wants interactive authentication by default.203				let mut run0 = self.new_here("run0");204				let mut out = self.new_here("script");205206				// Red backgrounds messes with fleet formatting207				run0.arg("--background=");208				run0.args(self.into_args());209210				out.arg("-q");211				out.arg("/dev/null");212				out.arg("-c");213				out.arg(run0.into_string());214				dbg!(&out);215				out216			}217		}218	}219220	pub async fn run(self) -> Result<()> {221		let str = self.clone().into_string();222		let cmd = self.wrap_sudo_if_needed().into_command_new()?;223		match cmd {224			Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,225			Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,226		};227		Ok(())228	}229	pub async fn run_string(self) -> Result<String> {230		let bytes = self.run_bytes().await?;231		Ok(String::from_utf8(bytes)?)232	}233	pub async fn run_bytes(self) -> Result<Vec<u8>> {234		let str = self.clone().into_string();235		let cmd = self.wrap_sudo_if_needed().into_command_new()?;236		let v = match cmd {237			Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,238			Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,239		};240		Ok(v)241	}242243	pub async fn run_nix_string(mut self) -> Result<String> {244		let str = self.clone().into_string();245		self.arg("--log-format").arg("internal-json");246		let cmd = self.wrap_sudo_if_needed().into_command();247		let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;248		Ok(String::from_utf8(bytes)?)249	}250	pub async fn run_nix(mut self) -> Result<()> {251		let str = self.clone().into_string();252		self.arg("--log-format").arg("internal-json");253		let mut cmd = self.wrap_sudo_if_needed().into_command();254		cmd.stdout(Stdio::inherit());255		run_nix_inner(str, cmd, &mut NixHandler::default()).await256	}257}258259struct EmptyAsyncRead;260impl AsyncRead for EmptyAsyncRead {261	fn poll_read(262		self: std::pin::Pin<&mut Self>,263		_cx: &mut std::task::Context<'_>,264		_buf: &mut tokio::io::ReadBuf<'_>,265	) -> Poll<std::io::Result<()>> {266		Poll::Pending267	}268}269270async fn run_nix_inner_stdout(271	str: String,272	cmd: Command,273	handler: &mut dyn Handler,274) -> Result<Vec<u8>> {275	Ok(run_nix_inner_raw(str, cmd, true, handler, None)276		.await?277		.expect("has out"))278}279async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {280	let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;281	assert!(v.is_none());282	Ok(())283}284async fn run_nix_inner_stdout_ssh(285	str: String,286	cmd: OwningCommand<Arc<Session>>,287	handler: &mut dyn Handler,288) -> Result<Vec<u8>> {289	Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)290		.await?291		.expect("has out"))292}293async fn run_nix_inner_ssh(294	str: String,295	cmd: OwningCommand<Arc<Session>>,296	handler: &mut dyn Handler,297) -> Result<()> {298	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;299	assert!(v.is_none());300	Ok(())301}302303async fn run_nix_inner_raw(304	str: String,305	mut cmd: Command,306	want_stdout: bool,307	err_handler: &mut dyn Handler,308	mut out_handler: Option<&mut dyn Handler>,309) -> Result<Option<Vec<u8>>> {310	cmd.stderr(Stdio::piped());311	cmd.stdout(Stdio::piped());312	debug!("running command {str:?} on local");313	let mut child = cmd.spawn()?;314	let mut stderr = child.stderr.take().unwrap();315	let stdout = child.stdout.take().unwrap();316	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());317	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));318	let mut ob = want_stdout319		.then(|| out.take().unwrap())320		.unwrap_or_else(|| Box::new(EmptyAsyncRead));321	let mut ol = (!want_stdout)322		.then(|| out.take().unwrap())323		.unwrap_or_else(|| Box::new(EmptyAsyncRead));324	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());325	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());326327	// while let Some(line) = read.next().await? {}328329	let mut out_buf = if want_stdout { Some(vec![]) } else { None };330	loop {331		select! {332			e = err.next() => {333				if let Some(e) = e {334					let e = e?;335					err_handler.handle_line(&e);336				}337			},338			o = ob.next() => {339				if let Some(o) = o {340					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);341				}342			},343			o = ol.next() => {344				if let Some(o) = o {345					let o = o?;346					if let Some(out) = out_handler.as_mut() {347						out.handle_line(&o)348					} else {349						err_handler.handle_line(&o)350					}351					// out_handler.handle_info(&o);352				}353			},354			code = child.wait() => {355				let code = code?;356				if !code.success() {357					anyhow::bail!("command '{str}' failed with status {}", code);358				}359				break;360			}361		}362	}363364	Ok(out_buf)365}366async fn run_nix_inner_raw_ssh(367	str: String,368	mut cmd: OwningCommand<Arc<Session>>,369	want_stdout: bool,370	err_handler: &mut dyn Handler,371	mut out_handler: Option<&mut dyn Handler>,372) -> Result<Option<Vec<u8>>> {373	debug!("running command {str:?} over ssh");374	cmd.stderr(openssh::Stdio::piped());375	cmd.stdout(openssh::Stdio::piped());376	let mut child = cmd.spawn().await?;377	let mut stderr = child.stderr().take().unwrap();378	let stdout = child.stdout().take().unwrap();379	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());380	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));381	let mut ob = want_stdout382		.then(|| out.take().unwrap())383		.unwrap_or_else(|| Box::new(EmptyAsyncRead));384	let mut ol = (!want_stdout)385		.then(|| out.take().unwrap())386		.unwrap_or_else(|| Box::new(EmptyAsyncRead));387	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());388	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());389390	// while let Some(line) = read.next().await? {}391392	let mut out_buf = if want_stdout { Some(vec![]) } else { None };393394	let mut wait_future = pin::pin!(child.wait());395	loop {396		select! {397			e = err.next() => {398				if let Some(e) = e {399					let e = e?;400					err_handler.handle_line(&e);401				}402			},403			o = ob.next() => {404				if let Some(o) = o {405					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);406				}407			},408			o = ol.next() => {409				if let Some(o) = o {410					let o = o?;411					if let Some(out) = out_handler.as_mut() {412						out.handle_line(&o)413					} else {414						err_handler.handle_line(&o)415					}416					// out_handler.handle_info(&o);417				}418			},419			code = &mut wait_future => {420				let code = code?;421				if !code.success() {422					anyhow::bail!("command '{str}' failed with status {}", code);423				}424				break;425			}426		}427	}428429	Ok(out_buf)430}