1use std::pin::pin;23use anyhow::{anyhow, bail, Result};4use camino::Utf8PathBuf;5use futures::StreamExt as _;6use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};7use remowt_link_shared::BifConfig;8use serde::de::DeserializeOwned;9use tokio::io::AsyncWriteExt as _;10use tokio::select;11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{debug, info, warn};1314use crate::forwarded::{RemowtListener, RemowtStream};15use crate::{drain_to_tracing, Remowt};1617#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]18pub enum StdioMode {19 #[default]20 Null,21 Pipe,22 Inherit,23}2425#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]26pub enum StderrMode {27 #[default]28 Null,29 Pipe,30 Inherit,31 MergeWithStdout,32}3334#[derive(Default)]35pub struct SpawnOptions {36 pub program: String,37 pub args: Vec<String>,38 pub env: Vec<(String, String)>,39 pub env_clear: bool,40 pub cwd: Option<Utf8PathBuf>,41 pub escalated: bool,42 pub stdin: StdioMode,43 pub stdout: StdioMode,44 pub stderr: StderrMode,45}4647pub struct RemowtChild {48 pub stdin: Option<RemowtStream>,49 pub stdout: Option<RemowtStream>,50 pub stderr: Option<RemowtStream>,51 id: ProcId,52 client: SubprocessClient<BifConfig>,53}5455impl RemowtChild {56 pub async fn wait(self) -> Result<Option<i32>> {57 let RemowtChild {58 stdin,59 stdout,60 stderr,61 id,62 client,63 } = self;64 drop(stdin);65 let drain_out = async move {66 if let Some(s) = stdout {67 drain_to_tracing(s, "<child stdout>".to_owned(), false).await;68 }69 };70 let drain_err = async move {71 if let Some(s) = stderr {72 drain_to_tracing(s, "<child stderr>".to_owned(), true).await;73 }74 };75 let wait = async move {76 client77 .wait(id)78 .await?79 .map_err(|e| anyhow!("agent wait failed: {e}"))80 };81 let (code, _, _) = tokio::join!(wait, drain_out, drain_err);82 code83 }8485 pub async fn kill(&self, signal: i32) -> Result<()> {86 self.client87 .kill(self.id, signal)88 .await?89 .map_err(|e| anyhow!("agent kill failed: {e}"))90 }91}9293fn needs_socket(m: StdioMode) -> bool {94 matches!(m, StdioMode::Pipe | StdioMode::Inherit)95}9697fn stderr_needs_socket(m: StderrMode) -> bool {98 matches!(m, StderrMode::Pipe | StderrMode::Inherit)99}100101impl Remowt {102 pub async fn spawn(&self, opts: SpawnOptions) -> Result<RemowtChild> {103 let SpawnOptions {104 program,105 args,106 env,107 env_clear,108 cwd,109 escalated,110 stdin,111 stdout,112 stderr,113 } = opts;114115 if matches!(stderr, StderrMode::MergeWithStdout) && !needs_socket(stdout) {116 bail!("stderr=MergeWithStdout requires stdout=Pipe or Inherit");117 }118119 let stdin_bound = if needs_socket(stdin) {120 Some(self.bind_runtime_unix("proc-stdin").await?)121 } else {122 None123 };124 let stdout_bound = if needs_socket(stdout) {125 Some(self.bind_runtime_unix("proc-stdout").await?)126 } else {127 None128 };129 let stderr_bound = if stderr_needs_socket(stderr) {130 Some(self.bind_runtime_unix("proc-stderr").await?)131 } else {132 None133 };134135 let stdin_spec = match &stdin_bound {136 Some((_, p)) => StdioSpec::Socket(p.clone()),137 None => StdioSpec::Null,138 };139 let stdout_spec = match &stdout_bound {140 Some((_, p)) => StdioSpec::Socket(p.clone()),141 None => StdioSpec::Null,142 };143 let stderr_spec = match (&stderr, &stderr_bound) {144 (StderrMode::Pipe | StderrMode::Inherit, Some((_, p))) => StderrSpec::Socket(p.clone()),145 (StderrMode::MergeWithStdout, _) => StderrSpec::MergeWithStdout,146 _ => StderrSpec::Null,147 };148149 let client: SubprocessClient<BifConfig> = if escalated {150 151 Box::pin(self.run0_endpoints::<SubprocessClient<BifConfig>>()).await?152 } else {153 self.endpoints()154 };155156 let spec = SpawnSpec {157 program: program.clone(),158 args,159 env,160 env_clear,161 cwd,162 stdin: stdin_spec,163 stdout: stdout_spec,164 stderr: stderr_spec,165 };166 let id = client167 .spawn(spec)168 .await?169 .map_err(|e| anyhow!("agent spawn failed: {e}"))?;170171 let (stdin_res, stdout_res, stderr_res) = tokio::join!(172 accept(stdin_bound),173 accept(stdout_bound),174 accept(stderr_bound),175 );176177 let stdin_stream = handle_stdin(stdin, stdin_res?, &program);178 let stdout_stream = handle_output(stdout, stdout_res?, &program);179 let stderr_stream = handle_output_err(stderr, stderr_res?, &program);180181 Ok(RemowtChild {182 stdin: stdin_stream,183 stdout: stdout_stream,184 stderr: stderr_stream,185 id,186 client,187 })188 }189190 pub fn cmd(&self, program: impl AsRef<str>) -> RemowtCommand {191 let program = program.as_ref().to_owned();192 RemowtCommand {193 program,194 args: vec![],195 env: vec![],196 remowt: self.clone(),197 escalated: false,198 }199 }200}201202async fn accept(b: Option<(RemowtListener, Utf8PathBuf)>) -> Result<Option<RemowtStream>> {203 match b {204 Some((l, _)) => Ok(Some(l.accept().await?)),205 None => Ok(None),206 }207}208209fn handle_stdin(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {210 match mode {211 StdioMode::Pipe => s,212 StdioMode::Inherit => {213 if let Some(s) = s {214 let program = program.to_owned();215 tokio::spawn(async move {216 let mut stdin = tokio::io::stdin();217 let mut s = s;218 if let Err(e) = tokio::io::copy(&mut stdin, &mut s).await {219 warn!(program, "stdin forward ended: {e}");220 }221 let _ = s.shutdown().await;222 });223 }224 None225 }226 StdioMode::Null => None,227 }228}229230fn handle_output(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {231 match mode {232 StdioMode::Pipe => s,233 StdioMode::Inherit => {234 if let Some(s) = s {235 let program = program.to_owned();236 tokio::spawn(drain_to_tracing(s, program, false));237 }238 None239 }240 StdioMode::Null => None,241 }242}243244fn handle_output_err(245 mode: StderrMode,246 s: Option<RemowtStream>,247 program: &str,248) -> Option<RemowtStream> {249 match mode {250 StderrMode::Pipe => s,251 StderrMode::Inherit => {252 if let Some(s) = s {253 let program = program.to_owned();254 tokio::spawn(drain_to_tracing(s, program, true));255 }256 None257 }258 StderrMode::MergeWithStdout | StderrMode::Null => None,259 }260}261262fn escape_bash(input: &str, out: &mut String) {263 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";264 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {265 out.push_str(input);266 return;267 }268 out.push('\'');269 for (i, v) in input.split('\'').enumerate() {270 if i != 0 {271 out.push_str("'\"'\"'");272 }273 out.push_str(v);274 }275 out.push('\'');276}277278#[derive(Clone)]279pub struct RemowtCommand {280 program: String,281 args: Vec<String>,282 env: Vec<(String, String)>,283 remowt: Remowt,284 escalated: bool,285}286287impl RemowtCommand {288 pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {289 self.args.push(arg.as_ref().to_owned());290 self291 }292 pub fn args<V: AsRef<str>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {293 for arg in args {294 self.args.push(arg.as_ref().to_owned());295 }296 self297 }298 pub fn eqarg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {299 self.args300 .push(format!("{}={}", key.as_ref(), value.as_ref()));301 self302 }303 pub fn comparg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {304 self.args.push(key.as_ref().to_owned());305 self.args.push(value.as_ref().to_owned());306 self307 }308 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {309 self.env310 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));311 self312 }313314 pub fn sudo(mut self) -> Self {315 self.escalated = true;316 self317 }318319 320 fn shell_line(&self) -> String {321 let mut out = String::new();322 if self.escalated {323 out.push_str("run0 ");324 }325 if !self.env.is_empty() {326 out.push_str("env");327 for (k, v) in &self.env {328 out.push(' ');329 assert!(!k.contains('='));330 escape_bash(k, &mut out);331 out.push('=');332 escape_bash(v, &mut out);333 }334 out.push(' ');335 }336 escape_bash(&self.program, &mut out);337 for arg in &self.args {338 out.push(' ');339 escape_bash(arg, &mut out);340 }341 out342 }343344 fn into_spawn_options(self) -> (Remowt, SpawnOptions, String) {345 let line = self.shell_line();346 let opts = SpawnOptions {347 program: self.program,348 args: self.args,349 env: self.env,350 env_clear: false,351 cwd: None,352 escalated: self.escalated,353 stdin: StdioMode::Null,354 stdout: StdioMode::Pipe,355 stderr: StderrMode::Pipe,356 };357 (self.remowt, opts, line)358 }359360 pub async fn run(self) -> Result<()> {361 run_inner(self, false).await.map(|_| ())362 }363 pub async fn run_string(self) -> Result<String> {364 let bytes = run_inner(self, true).await?.expect("want_stdout");365 Ok(String::from_utf8(bytes)?)366 }367 pub async fn run_value<T: DeserializeOwned>(self) -> Result<T> {368 let s = self.run_string().await?;369 Ok(serde_json::from_str(&s)?)370 }371}372373async fn run_inner(cmd: RemowtCommand, want_stdout: bool) -> Result<Option<Vec<u8>>> {374 let (remowt, opts, line) = cmd.into_spawn_options();375 debug!("running command {line:?} over remowt");376 let program = opts.program.clone();377 let mut child = remowt.spawn(opts).await?;378 let stderr = child.stderr.take().expect("stderr=Pipe");379 let stdout = child.stdout.take().expect("stdout=Pipe");380381 let mut err = FramedRead::new(stderr, LinesCodec::new());382 let (mut out_bytes, mut out_lines) = if want_stdout {383 (Some(FramedRead::new(stdout, BytesCodec::new())), None)384 } else {385 (None, Some(FramedRead::new(stdout, LinesCodec::new())))386 };387388 let mut buf = if want_stdout { Some(Vec::new()) } else { None };389390 let mut wait = pin!(child.wait());391 let exit = loop {392 select! {393 biased;394395 Some(e) = err.next() => {396 let e = e?;397 warn!(program = %program, "{e}");398 }399 Some(o) = async { out_bytes.as_mut()?.next().await }, if want_stdout => {400 buf.as_mut().expect("want_stdout").extend_from_slice(&o?);401 }402 Some(o) = async { out_lines.as_mut()?.next().await }, if !want_stdout => {403 let o = o?;404 info!(program = %program, "{o}");405 }406 res = &mut wait => {407 break res?;408 }409 }410 };411412 while let Some(e) = err.next().await {413 if let Ok(line) = e {414 warn!(program = %program, "{line}");415 }416 }417 if want_stdout {418 if let Some(out_bytes) = out_bytes.as_mut() {419 while let Some(o) = out_bytes.next().await {420 if let Ok(chunk) = o {421 buf.as_mut().expect("want_stdout").extend_from_slice(&chunk);422 }423 }424 }425 } else if let Some(out_lines) = out_lines.as_mut() {426 while let Some(o) = out_lines.next().await {427 if let Ok(line) = o {428 info!(program = %program, "{line}");429 }430 }431 }432433 match exit {434 Some(0) => Ok(buf),435 Some(c) => bail!("command '{line}' failed with status {c}"),436 None => Err(anyhow!("command '{line}' ended without an exit status")),437 }438}