git.delta.rocks / jrsonnet / refs/commits / 7c6930a6bff0

difftreelog

source

cmds/fleet/src/command.rs10.3 KiBsourcehistory
1use std::thread::sleep;2use std::time::Duration;3use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};45use anyhow::{anyhow, Result};6use better_command::{Handler, NixHandler, PlainHandler};7use futures::StreamExt;8use itertools::Either;9use openssh::{OverSsh, OwningCommand, Session};10use tokio::{io::AsyncRead, process::Command, select};11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{info, debug};1314fn escape_bash(input: &str, out: &mut String) {15	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";16	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {17		out.push_str(input);18		return;19	}20	out.push('\'');21	for (i, v) in input.split('\'').enumerate() {22		if i != 0 {23			out.push_str("'\"'\"'");24		}25		out.push_str(v);26	}27	out.push('\'');28}29fn ostoutf8(os: impl AsRef<OsStr>) -> String {30	os.as_ref().to_str().expect("non-utf8 data").to_owned()31}32#[derive(Clone)]33pub struct MyCommand {34	command: String,35	args: Vec<String>,36	env: Vec<(String, String)>,37	ssh_session: Option<Arc<Session>>,38}39impl MyCommand {40	pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {41		assert!(!cmd.as_ref().is_empty());42		Self {43			command: ostoutf8(cmd),44			args: vec![],45			env: vec![],46			ssh_session: Some(session),47		}48	}49	pub fn new(cmd: impl AsRef<OsStr>) -> Self {50		assert!(!cmd.as_ref().is_empty());51		Self {52			command: ostoutf8(cmd),53			args: vec![],54			env: vec![],55			ssh_session: None,56		}57	}58	fn into_args(self) -> Vec<String> {59		let mut out = Vec::new();60		if !self.env.is_empty() {61			out.push("env".to_owned());62			for (k, v) in self.env {63				assert!(!k.contains('='));64				out.push(format!("{k}={v}"));65			}66		}67		out.push(self.command);68		out.extend(self.args);69		out70	}7172	/// Translates environment variables into env command execution.73	/// Required for ssh, as ssh don't allow to send environment variables (at least by default).74	///75	/// FIXME: Insecure, as arguments might be seen by other users on the same machine.76	/// Figure out some way to transfer environment using stdio?77	fn translate_env_into_env(self) -> Self {78		if self.env.is_empty() {79			return self;80		}81		let mut out = Self::new("env");82		out.ssh_session = self.ssh_session;83		for (k, v) in self.env {84			assert!(!k.contains('='));85			out.arg(format!("{k}={v}"));86		}87		out.arg(self.command);88		out.args(self.args);8990		out91	}92	fn into_string(self) -> String {93		let mut out = String::new();94		if !self.env.is_empty() {95			out.push_str("env");96			for (k, v) in self.env {97				out.push(' ');98				assert!(!k.contains('='));99				escape_bash(&k, &mut out);100				out.push('=');101				escape_bash(&v, &mut out);102			}103		}104		if !out.is_empty() {105			out.push(' ');106		}107		escape_bash(&self.command, &mut out);108		for arg in self.args {109			out.push(' ');110			escape_bash(&arg, &mut out);111		}112		out113	}114	fn into_command(self) -> Command {115		let mut out = Command::new(self.command);116		out.args(self.args);117		for (k, v) in self.env {118			out.env(k, v);119		}120		out121	}122	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {123		Ok(if let Some(session) = self.ssh_session.clone() {124			let cmd = self.translate_env_into_env().into_command();125			Either::Right(126				cmd.over_ssh(session)127					.map_err(|e| anyhow!("ssh error: {e}"))?,128			)129		} else {130			let cmd = self.into_command();131			Either::Left(cmd)132		})133	}134	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {135		let arg = arg.as_ref();136		self.args.push(ostoutf8(arg));137		self138	}139	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {140		let arg = arg.as_ref();141		let value = value.as_ref();142		let arg = ostoutf8(arg);143		let value = ostoutf8(value);144		self.arg(format!("{arg}={value}"));145		self146	}147	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {148		self.arg(arg);149		self.arg(value);150		self151	}152	pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {153		self.env154			.push((name.as_ref().to_owned(), value.as_ref().to_owned()));155		self156	}157	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {158		for arg in args.into_iter() {159			let arg = arg.as_ref();160			self.args.push(ostoutf8(arg));161		}162		self163	}164	pub fn sudo(mut self) -> Self {165		if std::env::var_os("NO_SUDO").is_some() {166			let mut out = Self::new("su");167			out.ssh_session = self.ssh_session.take();168			out.arg("-c").arg(self.into_string());169			out170		} else {171			let mut out = Self::new("sudo");172			out.ssh_session = self.ssh_session.take();173			out.args(self.into_args());174			out175		}176	}177178	pub async fn run(self) -> Result<()> {179		let str = self.clone().into_string();180		let cmd = self.into_command_new()?;181		match cmd {182			Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,183			Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,184		};185		Ok(())186	}187	pub async fn run_string(self) -> Result<String> {188		let bytes = self.run_bytes().await?;189		Ok(String::from_utf8(bytes)?)190	}191	pub async fn run_bytes(self) -> Result<Vec<u8>> {192		let str = self.clone().into_string();193		let cmd = self.into_command_new()?;194		let v = match cmd {195			Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,196			Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,197		};198		Ok(v)199	}200201	pub async fn run_nix_string(self) -> Result<String> {202		let str = self.clone().into_string();203		let mut cmd = self.into_command();204		cmd.arg("--log-format").arg("internal-json");205		let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;206		Ok(String::from_utf8(bytes)?)207	}208	pub async fn run_nix(self) -> Result<()> {209		let str = self.clone().into_string();210		let mut cmd = self.into_command();211		cmd.arg("--log-format").arg("internal-json");212		cmd.stdout(Stdio::inherit());213		run_nix_inner(str, cmd, &mut NixHandler::default()).await214	}215}216217struct EmptyAsyncRead;218impl AsyncRead for EmptyAsyncRead {219	fn poll_read(220		self: std::pin::Pin<&mut Self>,221		_cx: &mut std::task::Context<'_>,222		_buf: &mut tokio::io::ReadBuf<'_>,223	) -> Poll<std::io::Result<()>> {224		Poll::Pending225	}226}227228async fn run_nix_inner_stdout(229	str: String,230	cmd: Command,231	handler: &mut dyn Handler,232) -> Result<Vec<u8>> {233	Ok(run_nix_inner_raw(str, cmd, true, handler, None)234		.await?235		.expect("has out"))236}237async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {238	let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;239	assert!(v.is_none());240	Ok(())241}242async fn run_nix_inner_stdout_ssh(243	str: String,244	cmd: OwningCommand<Arc<Session>>,245	handler: &mut dyn Handler,246) -> Result<Vec<u8>> {247	Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)248		.await?249		.expect("has out"))250}251async fn run_nix_inner_ssh(252	str: String,253	cmd: OwningCommand<Arc<Session>>,254	handler: &mut dyn Handler,255) -> Result<()> {256	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;257	assert!(v.is_none());258	Ok(())259}260261async fn run_nix_inner_raw(262	str: String,263	mut cmd: Command,264	want_stdout: bool,265	err_handler: &mut dyn Handler,266	mut out_handler: Option<&mut dyn Handler>,267) -> Result<Option<Vec<u8>>> {268	cmd.stderr(Stdio::piped());269	cmd.stdout(Stdio::piped());270	debug!("running command {cmd:?} on local");271	let mut child = cmd.spawn()?;272	let mut stderr = child.stderr.take().unwrap();273	let stdout = child.stdout.take().unwrap();274	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());275	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));276	let mut ob = want_stdout277		.then(|| out.take().unwrap())278		.unwrap_or_else(|| Box::new(EmptyAsyncRead));279	let mut ol = (!want_stdout)280		.then(|| out.take().unwrap())281		.unwrap_or_else(|| Box::new(EmptyAsyncRead));282	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());283	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());284285	// while let Some(line) = read.next().await? {}286287	let mut out_buf = if want_stdout { Some(vec![]) } else { None };288	loop {289		select! {290			e = err.next() => {291				if let Some(e) = e {292					let e = e?;293					err_handler.handle_line(&e);294				}295			},296			o = ob.next() => {297				if let Some(o) = o {298					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);299				}300			},301			o = ol.next() => {302				if let Some(o) = o {303					let o = o?;304					if let Some(out) = out_handler.as_mut() {305						out.handle_line(&o)306					} else {307						err_handler.handle_line(&o)308					}309					// out_handler.handle_info(&o);310				}311			},312			code = child.wait() => {313				let code = code?;314				if !code.success() {315					anyhow::bail!("command '{str}' failed with status {}", code);316				}317				break;318			}319		}320	}321322	Ok(out_buf)323}324async fn run_nix_inner_raw_ssh(325	str: String,326	mut cmd: OwningCommand<Arc<Session>>,327	want_stdout: bool,328	err_handler: &mut dyn Handler,329	mut out_handler: Option<&mut dyn Handler>,330) -> Result<Option<Vec<u8>>> {331	debug!("running command {cmd:?} over ssh");332	cmd.stderr(openssh::Stdio::piped());333	cmd.stdout(openssh::Stdio::piped());334	let mut child = cmd.spawn().await?;335	let mut stderr = child.stderr().take().unwrap();336	let stdout = child.stdout().take().unwrap();337	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());338	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));339	let mut ob = want_stdout340		.then(|| out.take().unwrap())341		.unwrap_or_else(|| Box::new(EmptyAsyncRead));342	let mut ol = (!want_stdout)343		.then(|| out.take().unwrap())344		.unwrap_or_else(|| Box::new(EmptyAsyncRead));345	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());346	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());347348	// while let Some(line) = read.next().await? {}349350	let mut out_buf = if want_stdout { Some(vec![]) } else { None };351352	let mut wait_future = pin::pin!(child.wait());353	loop {354		select! {355			e = err.next() => {356				if let Some(e) = e {357					let e = e?;358					err_handler.handle_line(&e);359				}360			},361			o = ob.next() => {362				if let Some(o) = o {363					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);364				}365			},366			o = ol.next() => {367				if let Some(o) = o {368					let o = o?;369					if let Some(out) = out_handler.as_mut() {370						out.handle_line(&o)371					} else {372						err_handler.handle_line(&o)373					}374					// out_handler.handle_info(&o);375				}376			},377			code = &mut wait_future => {378				let code = code?;379				if !code.success() {380					anyhow::bail!("command '{str}' failed with status {}", code);381				}382				break;383			}384		}385	}386387	Ok(out_buf)388}