1use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};23use anyhow::{anyhow, Result};4use better_command::{Handler, NixHandler, PlainHandler};5use futures::StreamExt;6use itertools::Either;7use openssh::{OverSsh, OwningCommand, Session};8use tokio::{io::AsyncRead, process::Command, select};9use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};10use tracing::debug;1112use crate::host::EscalationStrategy;1314fn escape_bash(input: &str, out: &mut String) {15 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";16 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {17 out.push_str(input);18 return;19 }20 out.push('\'');21 for (i, v) in input.split('\'').enumerate() {22 if i != 0 {23 out.push_str("'\"'\"'");24 }25 out.push_str(v);26 }27 out.push('\'');28}29fn ostoutf8(os: impl AsRef<OsStr>) -> String {30 os.as_ref().to_str().expect("non-utf8 data").to_owned()31}3233#[derive(Clone, Debug)]34pub struct MyCommand {35 command: String,36 args: Vec<String>,37 env: Vec<(String, String)>,38 ssh_session: Option<Arc<Session>>,39 escalation: EscalationStrategy,40 escalate: bool,41}42impl MyCommand {43 pub fn new_on(44 escalation: EscalationStrategy,45 cmd: impl AsRef<OsStr>,46 session: Arc<Session>,47 ) -> Self {48 assert!(!cmd.as_ref().is_empty());49 Self {50 command: ostoutf8(cmd),51 args: vec![],52 env: vec![],53 ssh_session: Some(session),54 escalation,55 escalate: false,56 }57 }58 pub fn new(escalation: EscalationStrategy, cmd: impl AsRef<OsStr>) -> Self {59 assert!(!cmd.as_ref().is_empty());60 Self {61 command: ostoutf8(cmd),62 args: vec![],63 env: vec![],64 ssh_session: None,65 escalation,66 escalate: false,67 }68 }69 fn new_here(&self, cmd: impl AsRef<OsStr>) -> Self {70 if let Some(ssh_session) = self.ssh_session.clone() {71 Self::new_on(self.escalation, cmd, ssh_session)72 } else {73 Self::new(self.escalation, cmd)74 }75 }7677 fn into_args(self) -> Vec<String> {78 let mut out = Vec::new();79 if !self.env.is_empty() {80 out.push("env".to_owned());81 for (k, v) in self.env {82 assert!(!k.contains('='));83 out.push(format!("{k}={v}"));84 }85 }86 out.push(self.command);87 out.extend(self.args);88 out89 }9091 92 93 94 95 96 fn translate_env_into_env(self) -> Self {97 if self.env.is_empty() {98 return self;99 }100 let mut out = self.new_here("env");101 for (k, v) in self.env {102 assert!(!k.contains('='));103 out.arg(format!("{k}={v}"));104 }105 out.arg(self.command);106 out.args(self.args);107108 out109 }110 fn into_string(self) -> String {111 let mut out = String::new();112 if !self.env.is_empty() {113 out.push_str("env");114 for (k, v) in self.env {115 out.push(' ');116 assert!(!k.contains('='));117 escape_bash(&k, &mut out);118 out.push('=');119 escape_bash(&v, &mut out);120 }121 }122 if !out.is_empty() {123 out.push(' ');124 }125 escape_bash(&self.command, &mut out);126 for arg in self.args {127 out.push(' ');128 escape_bash(&arg, &mut out);129 }130 out131 }132 fn into_command(self) -> Command {133 let mut out = Command::new(self.command);134 out.args(self.args);135 for (k, v) in self.env {136 out.env(k, v);137 }138 out139 }140 fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {141 Ok(if let Some(session) = self.ssh_session.clone() {142 let cmd = self.translate_env_into_env().into_command();143 Either::Right(144 cmd.over_ssh(session)145 .map_err(|e| anyhow!("ssh error: {e}"))?,146 )147 } else {148 let cmd = self.into_command();149 Either::Left(cmd)150 })151 }152 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {153 let arg = arg.as_ref();154 self.args.push(ostoutf8(arg));155 self156 }157 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {158 let arg = arg.as_ref();159 let value = value.as_ref();160 let arg = ostoutf8(arg);161 let value = ostoutf8(value);162 self.arg(format!("{arg}={value}"));163 self164 }165 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {166 self.arg(arg);167 self.arg(value);168 self169 }170 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {171 self.env172 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));173 self174 }175 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {176 for arg in args.into_iter() {177 let arg = arg.as_ref();178 self.args.push(ostoutf8(arg));179 }180 self181 }182 pub fn sudo(mut self) -> Self {183 self.escalate = true;184 self185 }186 fn wrap_sudo_if_needed(self) -> Self {187 if !self.escalate {188 return self;189 }190 match self.escalation {191 EscalationStrategy::Su => {192 let mut out = self.new_here("su");193 out.arg("-c").arg(self.into_string());194 out195 }196 EscalationStrategy::Sudo => {197 let mut out = self.new_here("sudo");198 out.args(self.into_args());199 out200 }201 EscalationStrategy::Run0 => {202 203 let mut run0 = self.new_here("run0");204 let mut out = self.new_here("script");205206 207 run0.arg("--background=");208 run0.args(self.into_args());209210 out.arg("-q");211 out.arg("/dev/null");212 out.arg("-c");213 out.arg(run0.into_string());214 dbg!(&out);215 out216 }217 }218 }219220 pub async fn run(self) -> Result<()> {221 let str = self.clone().into_string();222 let cmd = self.wrap_sudo_if_needed().into_command_new()?;223 match cmd {224 Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,225 Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,226 };227 Ok(())228 }229 pub async fn run_string(self) -> Result<String> {230 let bytes = self.run_bytes().await?;231 Ok(String::from_utf8(bytes)?)232 }233 pub async fn run_bytes(self) -> Result<Vec<u8>> {234 let str = self.clone().into_string();235 let cmd = self.wrap_sudo_if_needed().into_command_new()?;236 let v = match cmd {237 Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,238 Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,239 };240 Ok(v)241 }242243 pub async fn run_nix_string(mut self) -> Result<String> {244 let str = self.clone().into_string();245 self.arg("--log-format").arg("internal-json");246 let cmd = self.wrap_sudo_if_needed().into_command();247 let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;248 Ok(String::from_utf8(bytes)?)249 }250 pub async fn run_nix(mut self) -> Result<()> {251 let str = self.clone().into_string();252 self.arg("--log-format").arg("internal-json");253 let mut cmd = self.wrap_sudo_if_needed().into_command();254 cmd.stdout(Stdio::inherit());255 run_nix_inner(str, cmd, &mut NixHandler::default()).await256 }257}258259struct EmptyAsyncRead;260impl AsyncRead for EmptyAsyncRead {261 fn poll_read(262 self: std::pin::Pin<&mut Self>,263 _cx: &mut std::task::Context<'_>,264 _buf: &mut tokio::io::ReadBuf<'_>,265 ) -> Poll<std::io::Result<()>> {266 Poll::Pending267 }268}269270async fn run_nix_inner_stdout(271 str: String,272 cmd: Command,273 handler: &mut dyn Handler,274) -> Result<Vec<u8>> {275 Ok(run_nix_inner_raw(str, cmd, true, handler, None)276 .await?277 .expect("has out"))278}279async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {280 let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;281 assert!(v.is_none());282 Ok(())283}284async fn run_nix_inner_stdout_ssh(285 str: String,286 cmd: OwningCommand<Arc<Session>>,287 handler: &mut dyn Handler,288) -> Result<Vec<u8>> {289 Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)290 .await?291 .expect("has out"))292}293async fn run_nix_inner_ssh(294 str: String,295 cmd: OwningCommand<Arc<Session>>,296 handler: &mut dyn Handler,297) -> Result<()> {298 let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;299 assert!(v.is_none());300 Ok(())301}302303async fn run_nix_inner_raw(304 str: String,305 mut cmd: Command,306 want_stdout: bool,307 err_handler: &mut dyn Handler,308 mut out_handler: Option<&mut dyn Handler>,309) -> Result<Option<Vec<u8>>> {310 cmd.stderr(Stdio::piped());311 cmd.stdout(Stdio::piped());312 debug!("running command {str:?} on local");313 let mut child = cmd.spawn()?;314 let mut stderr = child.stderr.take().unwrap();315 let stdout = child.stdout.take().unwrap();316 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());317 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));318 let mut ob = want_stdout319 .then(|| out.take().unwrap())320 .unwrap_or_else(|| Box::new(EmptyAsyncRead));321 let mut ol = (!want_stdout)322 .then(|| out.take().unwrap())323 .unwrap_or_else(|| Box::new(EmptyAsyncRead));324 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());325 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());326327 328329 let mut out_buf = if want_stdout { Some(vec![]) } else { None };330 loop {331 select! {332 e = err.next() => {333 if let Some(e) = e {334 let e = e?;335 err_handler.handle_line(&e);336 }337 },338 o = ob.next() => {339 if let Some(o) = o {340 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);341 }342 },343 o = ol.next() => {344 if let Some(o) = o {345 let o = o?;346 if let Some(out) = out_handler.as_mut() {347 out.handle_line(&o)348 } else {349 err_handler.handle_line(&o)350 }351 352 }353 },354 code = child.wait() => {355 let code = code?;356 if !code.success() {357 anyhow::bail!("command '{str}' failed with status {}", code);358 }359 break;360 }361 }362 }363364 Ok(out_buf)365}366async fn run_nix_inner_raw_ssh(367 str: String,368 mut cmd: OwningCommand<Arc<Session>>,369 want_stdout: bool,370 err_handler: &mut dyn Handler,371 mut out_handler: Option<&mut dyn Handler>,372) -> Result<Option<Vec<u8>>> {373 debug!("running command {str:?} over ssh");374 cmd.stderr(openssh::Stdio::piped());375 cmd.stdout(openssh::Stdio::piped());376 let mut child = cmd.spawn().await?;377 let mut stderr = child.stderr().take().unwrap();378 let stdout = child.stdout().take().unwrap();379 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());380 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));381 let mut ob = want_stdout382 .then(|| out.take().unwrap())383 .unwrap_or_else(|| Box::new(EmptyAsyncRead));384 let mut ol = (!want_stdout)385 .then(|| out.take().unwrap())386 .unwrap_or_else(|| Box::new(EmptyAsyncRead));387 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());388 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());389390 391392 let mut out_buf = if want_stdout { Some(vec![]) } else { None };393394 let mut wait_future = pin::pin!(child.wait());395 loop {396 select! {397 e = err.next() => {398 if let Some(e) = e {399 let e = e?;400 err_handler.handle_line(&e);401 }402 },403 o = ob.next() => {404 if let Some(o) = o {405 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);406 }407 },408 o = ol.next() => {409 if let Some(o) = o {410 let o = o?;411 if let Some(out) = out_handler.as_mut() {412 out.handle_line(&o)413 } else {414 err_handler.handle_line(&o)415 }416 417 }418 },419 code = &mut wait_future => {420 let code = code?;421 if !code.success() {422 anyhow::bail!("command '{str}' failed with status {}", code);423 }424 break;425 }426 }427 }428429 Ok(out_buf)430}