1use std::thread::sleep;2use std::time::Duration;3use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};45use anyhow::{anyhow, Result};6use better_command::{Handler, NixHandler, PlainHandler};7use futures::StreamExt;8use itertools::Either;9use openssh::{OverSsh, OwningCommand, Session};10use tokio::{io::AsyncRead, process::Command, select};11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{info, debug};1314fn escape_bash(input: &str, out: &mut String) {15 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";16 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {17 out.push_str(input);18 return;19 }20 out.push('\'');21 for (i, v) in input.split('\'').enumerate() {22 if i != 0 {23 out.push_str("'\"'\"'");24 }25 out.push_str(v);26 }27 out.push('\'');28}29fn ostoutf8(os: impl AsRef<OsStr>) -> String {30 os.as_ref().to_str().expect("non-utf8 data").to_owned()31}32#[derive(Clone)]33pub struct MyCommand {34 command: String,35 args: Vec<String>,36 env: Vec<(String, String)>,37 ssh_session: Option<Arc<Session>>,38}39impl MyCommand {40 pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {41 assert!(!cmd.as_ref().is_empty());42 Self {43 command: ostoutf8(cmd),44 args: vec![],45 env: vec![],46 ssh_session: Some(session),47 }48 }49 pub fn new(cmd: impl AsRef<OsStr>) -> Self {50 assert!(!cmd.as_ref().is_empty());51 Self {52 command: ostoutf8(cmd),53 args: vec![],54 env: vec![],55 ssh_session: None,56 }57 }58 fn into_args(self) -> Vec<String> {59 let mut out = Vec::new();60 if !self.env.is_empty() {61 out.push("env".to_owned());62 for (k, v) in self.env {63 assert!(!k.contains('='));64 out.push(format!("{k}={v}"));65 }66 }67 out.push(self.command);68 out.extend(self.args);69 out70 }7172 73 74 75 76 77 fn translate_env_into_env(self) -> Self {78 if self.env.is_empty() {79 return self;80 }81 let mut out = Self::new("env");82 out.ssh_session = self.ssh_session;83 for (k, v) in self.env {84 assert!(!k.contains('='));85 out.arg(format!("{k}={v}"));86 }87 out.arg(self.command);88 out.args(self.args);8990 out91 }92 fn into_string(self) -> String {93 let mut out = String::new();94 if !self.env.is_empty() {95 out.push_str("env");96 for (k, v) in self.env {97 out.push(' ');98 assert!(!k.contains('='));99 escape_bash(&k, &mut out);100 out.push('=');101 escape_bash(&v, &mut out);102 }103 }104 if !out.is_empty() {105 out.push(' ');106 }107 escape_bash(&self.command, &mut out);108 for arg in self.args {109 out.push(' ');110 escape_bash(&arg, &mut out);111 }112 out113 }114 fn into_command(self) -> Command {115 let mut out = Command::new(self.command);116 out.args(self.args);117 for (k, v) in self.env {118 out.env(k, v);119 }120 out121 }122 fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {123 Ok(if let Some(session) = self.ssh_session.clone() {124 let cmd = self.translate_env_into_env().into_command();125 Either::Right(126 cmd.over_ssh(session)127 .map_err(|e| anyhow!("ssh error: {e}"))?,128 )129 } else {130 let cmd = self.into_command();131 Either::Left(cmd)132 })133 }134 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {135 let arg = arg.as_ref();136 self.args.push(ostoutf8(arg));137 self138 }139 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {140 let arg = arg.as_ref();141 let value = value.as_ref();142 let arg = ostoutf8(arg);143 let value = ostoutf8(value);144 self.arg(format!("{arg}={value}"));145 self146 }147 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {148 self.arg(arg);149 self.arg(value);150 self151 }152 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {153 self.env154 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));155 self156 }157 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {158 for arg in args.into_iter() {159 let arg = arg.as_ref();160 self.args.push(ostoutf8(arg));161 }162 self163 }164 pub fn sudo(mut self) -> Self {165 if std::env::var_os("NO_SUDO").is_some() {166 let mut out = Self::new("su");167 out.ssh_session = self.ssh_session.take();168 out.arg("-c").arg(self.into_string());169 out170 } else {171 let mut out = Self::new("sudo");172 out.ssh_session = self.ssh_session.take();173 out.args(self.into_args());174 out175 }176 }177178 pub async fn run(self) -> Result<()> {179 let str = self.clone().into_string();180 let cmd = self.into_command_new()?;181 match cmd {182 Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,183 Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,184 };185 Ok(())186 }187 pub async fn run_string(self) -> Result<String> {188 let bytes = self.run_bytes().await?;189 Ok(String::from_utf8(bytes)?)190 }191 pub async fn run_bytes(self) -> Result<Vec<u8>> {192 let str = self.clone().into_string();193 let cmd = self.into_command_new()?;194 let v = match cmd {195 Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,196 Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,197 };198 Ok(v)199 }200201 pub async fn run_nix_string(self) -> Result<String> {202 let str = self.clone().into_string();203 let mut cmd = self.into_command();204 cmd.arg("--log-format").arg("internal-json");205 let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;206 Ok(String::from_utf8(bytes)?)207 }208 pub async fn run_nix(self) -> Result<()> {209 let str = self.clone().into_string();210 let mut cmd = self.into_command();211 cmd.arg("--log-format").arg("internal-json");212 cmd.stdout(Stdio::inherit());213 run_nix_inner(str, cmd, &mut NixHandler::default()).await214 }215}216217struct EmptyAsyncRead;218impl AsyncRead for EmptyAsyncRead {219 fn poll_read(220 self: std::pin::Pin<&mut Self>,221 _cx: &mut std::task::Context<'_>,222 _buf: &mut tokio::io::ReadBuf<'_>,223 ) -> Poll<std::io::Result<()>> {224 Poll::Pending225 }226}227228async fn run_nix_inner_stdout(229 str: String,230 cmd: Command,231 handler: &mut dyn Handler,232) -> Result<Vec<u8>> {233 Ok(run_nix_inner_raw(str, cmd, true, handler, None)234 .await?235 .expect("has out"))236}237async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {238 let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;239 assert!(v.is_none());240 Ok(())241}242async fn run_nix_inner_stdout_ssh(243 str: String,244 cmd: OwningCommand<Arc<Session>>,245 handler: &mut dyn Handler,246) -> Result<Vec<u8>> {247 Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)248 .await?249 .expect("has out"))250}251async fn run_nix_inner_ssh(252 str: String,253 cmd: OwningCommand<Arc<Session>>,254 handler: &mut dyn Handler,255) -> Result<()> {256 let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;257 assert!(v.is_none());258 Ok(())259}260261async fn run_nix_inner_raw(262 str: String,263 mut cmd: Command,264 want_stdout: bool,265 err_handler: &mut dyn Handler,266 mut out_handler: Option<&mut dyn Handler>,267) -> Result<Option<Vec<u8>>> {268 cmd.stderr(Stdio::piped());269 cmd.stdout(Stdio::piped());270 debug!("running command {cmd:?} on local");271 let mut child = cmd.spawn()?;272 let mut stderr = child.stderr.take().unwrap();273 let stdout = child.stdout.take().unwrap();274 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());275 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));276 let mut ob = want_stdout277 .then(|| out.take().unwrap())278 .unwrap_or_else(|| Box::new(EmptyAsyncRead));279 let mut ol = (!want_stdout)280 .then(|| out.take().unwrap())281 .unwrap_or_else(|| Box::new(EmptyAsyncRead));282 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());283 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());284285 286287 let mut out_buf = if want_stdout { Some(vec![]) } else { None };288 loop {289 select! {290 e = err.next() => {291 if let Some(e) = e {292 let e = e?;293 err_handler.handle_line(&e);294 }295 },296 o = ob.next() => {297 if let Some(o) = o {298 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);299 }300 },301 o = ol.next() => {302 if let Some(o) = o {303 let o = o?;304 if let Some(out) = out_handler.as_mut() {305 out.handle_line(&o)306 } else {307 err_handler.handle_line(&o)308 }309 310 }311 },312 code = child.wait() => {313 let code = code?;314 if !code.success() {315 anyhow::bail!("command '{str}' failed with status {}", code);316 }317 break;318 }319 }320 }321322 Ok(out_buf)323}324async fn run_nix_inner_raw_ssh(325 str: String,326 mut cmd: OwningCommand<Arc<Session>>,327 want_stdout: bool,328 err_handler: &mut dyn Handler,329 mut out_handler: Option<&mut dyn Handler>,330) -> Result<Option<Vec<u8>>> {331 debug!("running command {cmd:?} over ssh");332 cmd.stderr(openssh::Stdio::piped());333 cmd.stdout(openssh::Stdio::piped());334 let mut child = cmd.spawn().await?;335 let mut stderr = child.stderr().take().unwrap();336 let stdout = child.stdout().take().unwrap();337 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());338 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));339 let mut ob = want_stdout340 .then(|| out.take().unwrap())341 .unwrap_or_else(|| Box::new(EmptyAsyncRead));342 let mut ol = (!want_stdout)343 .then(|| out.take().unwrap())344 .unwrap_or_else(|| Box::new(EmptyAsyncRead));345 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());346 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());347348 349350 let mut out_buf = if want_stdout { Some(vec![]) } else { None };351352 let mut wait_future = pin::pin!(child.wait());353 loop {354 select! {355 e = err.next() => {356 if let Some(e) = e {357 let e = e?;358 err_handler.handle_line(&e);359 }360 },361 o = ob.next() => {362 if let Some(o) = o {363 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);364 }365 },366 o = ol.next() => {367 if let Some(o) = o {368 let o = o?;369 if let Some(out) = out_handler.as_mut() {370 out.handle_line(&o)371 } else {372 err_handler.handle_line(&o)373 }374 375 }376 },377 code = &mut wait_future => {378 let code = code?;379 if !code.success() {380 anyhow::bail!("command '{str}' failed with status {}", code);381 }382 break;383 }384 }385 }386387 Ok(out_buf)388}