difftreelog
feat drain stdout/stderr after signal
in: trunk
6 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -308,9 +308,9 @@
[[package]]
name = "bifrostlink"
-version = "0.2.4"
+version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ad2d0e30a2aa432b78f41f9676572f88201d4dc73bc2b7bc90704d2e02b7d062"
+checksum = "910f9286588d13e3dbdbbc1ad4d292656e704bc93e1f41b8a13b48e3a8e95f39"
dependencies = [
"async-trait",
"async_fn_traits",
@@ -327,9 +327,9 @@
[[package]]
name = "bifrostlink-macros"
-version = "0.2.4"
+version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e2121559c45cbe89c4f8d1d741360d5b028577254f6beca053dc02332da85b43"
+checksum = "a0ea5c423c3831c523c8ef78debdf6a64e72b21ec92148a44163a4c25c05dfd0"
dependencies = [
"proc-macro2",
"quote",
@@ -338,9 +338,9 @@
[[package]]
name = "bifrostlink-ports"
-version = "0.2.4"
+version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e9395c4ccca497b0c50583e6de57aca921c046ae0c10f56030cd2c5a20db05f8"
+checksum = "9e3a9a01ec1b8bd7d44b47cd0183a1465880e241027d9f5afcb076e11704ec70"
dependencies = [
"bifrostlink",
"bytes",
@@ -1835,7 +1835,7 @@
[[package]]
name = "polkit-backend"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"clap",
@@ -2055,7 +2055,7 @@
[[package]]
name = "remowt-agent"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2083,7 +2083,7 @@
[[package]]
name = "remowt-client"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2106,7 +2106,7 @@
[[package]]
name = "remowt-endpoints"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2124,7 +2124,7 @@
[[package]]
name = "remowt-link-shared"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"bifrostlink",
"bytes",
@@ -2138,7 +2138,7 @@
[[package]]
name = "remowt-plugin"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2152,7 +2152,7 @@
[[package]]
name = "remowt-polkit-shared"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"nix",
"serde",
@@ -2161,7 +2161,7 @@
[[package]]
name = "remowt-ssh"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"async-trait",
@@ -2189,7 +2189,7 @@
[[package]]
name = "remowt-ui-prompt"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"bifrostlink",
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,18 +3,18 @@
resolver = "2"
[workspace.package]
-version = "0.1.4"
+version = "0.1.6"
license = "MIT"
edition = "2021"
repository = "https://git.delta.rocks/r/remowt"
[workspace.dependencies]
-remowt-client = { version = "0.1.3", path = "crates/remowt-client" }
-remowt-polkit-shared = { version = "0.1.3", path = "crates/polkit-shared" }
-remowt-link-shared = { version = "0.1.3", path = "crates/remowt-link-shared" }
-remowt-plugin = { version = "0.1.3", path = "crates/remowt-plugin" }
-remowt-ui-prompt = { version = "0.1.3", path = "crates/remowt-ui-prompt" }
-remowt-endpoints = { version = "0.1.3", path = "crates/remowt-endpoints" }
+remowt-client = { version = "0.1.6", path = "crates/remowt-client" }
+remowt-polkit-shared = { version = "0.1.6", path = "crates/polkit-shared" }
+remowt-link-shared = { version = "0.1.6", path = "crates/remowt-link-shared" }
+remowt-plugin = { version = "0.1.6", path = "crates/remowt-plugin" }
+remowt-ui-prompt = { version = "0.1.6", path = "crates/remowt-ui-prompt" }
+remowt-endpoints = { version = "0.1.6", path = "crates/remowt-endpoints" }
bifrostlink = "0.2.0"
bifrostlink-macros = "0.2.0"
@@ -52,7 +52,9 @@
thiserror = "2.0.18"
[profile.release]
-panic = "abort"
+panic = "unwind"
opt-level = "z"
lto = true
codegen-units = 1
+debug = "full"
+split-debuginfo = "off"
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -2,6 +2,7 @@
use std::collections::{BTreeMap, HashMap};
use std::fs::Permissions;
use std::future::pending;
+use std::io;
use std::os::unix::fs::PermissionsExt as _;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, OnceLock};
@@ -230,7 +231,7 @@
fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
- .with_writer(std::io::stderr)
+ .with_writer(io::stderr)
.without_time()
.init();
let opts = Opts::parse();
crates/remowt-client/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-client/src/lib.rs
+++ b/crates/remowt-client/src/lib.rs
@@ -17,13 +17,15 @@
use russh::keys::ssh_key::PublicKey;
use russh::Channel;
use tempfile::TempDir;
+use tokio::io::AsyncRead;
use tokio::net::UnixListener;
use tokio::sync::oneshot;
+use tokio::task::JoinHandle;
use tokio::{
fs,
- io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, DuplexStream},
+ io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader},
};
-use tracing::{debug, warn};
+use tracing::{debug, info, warn};
use uuid::Uuid;
pub mod editor;
@@ -97,7 +99,7 @@
let mut child = SshExecChild::from_exec(ch);
drop(child.stdin);
- drain_stderr(child.stderr, cmd.to_owned());
+ drain_to_tracing(child.stderr, cmd.to_owned(), true);
let mut out = Vec::new();
child.stdout.read_to_end(&mut out).await?;
@@ -317,7 +319,7 @@
.await?;
let child = SshExecChild::from_exec(cmd_chan);
- drain_stderr(child.stderr, "agent".to_owned());
+ drain_to_tracing(child.stderr, "agent".to_owned(), true);
rpc.add_direct(
Address::Agent,
child_port(child.stdout, child.stdin),
@@ -518,20 +520,33 @@
}
}
-fn drain_stderr(stream: DuplexStream, context: String) {
+pub(crate) fn drain_to_tracing(
+ stream: impl AsyncRead + Unpin + 'static + Send,
+ context: String,
+ stderr: bool,
+) -> JoinHandle<()> {
tokio::spawn(async move {
- let mut reader = BufReader::new(stream).lines();
+ let mut reader = BufReader::new(stream);
+ let mut buf = Vec::with_capacity(4096);
loop {
- match reader.next_line().await {
- Ok(Some(line)) => warn!(context = %context, "{line}"),
- Ok(None) => break,
+ buf.clear();
+ match reader.read_until(b'\n', &mut buf).await {
+ Ok(0) => break,
+ Ok(_) => {
+ let line = String::from_utf8_lossy(buf.strip_suffix(b"\n").unwrap_or(&buf));
+ if stderr {
+ warn!(context = %context, "{line}");
+ } else {
+ info!(context = %context, "{line}");
+ }
+ }
Err(e) => {
- warn!(context = %context, "stderr read failed: {e}");
+ warn!(context = %context, "child stdio read failed: {e}");
break;
}
}
}
- });
+ })
}
fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {
crates/remowt-client/src/subprocess.rsdiffbeforeafterboth1use std::pin::pin;23use anyhow::{anyhow, bail, Result};4use camino::Utf8PathBuf;5use futures::StreamExt as _;6use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};7use remowt_link_shared::BifConfig;8use serde::de::DeserializeOwned;9use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufReader};10use tokio::select;11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{debug, info, warn};1314use crate::forwarded::{RemowtListener, RemowtStream};15use crate::Remowt;1617#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]18pub enum StdioMode {19 #[default]20 Null,21 Pipe,22 Inherit,23}2425#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]26pub enum StderrMode {27 #[default]28 Null,29 Pipe,30 Inherit,31 MergeWithStdout,32}3334#[derive(Default)]35pub struct SpawnOptions {36 pub program: String,37 pub args: Vec<String>,38 pub env: Vec<(String, String)>,39 pub env_clear: bool,40 pub cwd: Option<Utf8PathBuf>,41 pub escalated: bool,42 pub stdin: StdioMode,43 pub stdout: StdioMode,44 pub stderr: StderrMode,45}4647pub struct RemowtChild {48 pub stdin: Option<RemowtStream>,49 pub stdout: Option<RemowtStream>,50 pub stderr: Option<RemowtStream>,51 id: ProcId,52 client: SubprocessClient<BifConfig>,53}5455impl RemowtChild {56 pub async fn wait(self) -> Result<Option<i32>> {57 let RemowtChild {58 stdin,59 stdout,60 stderr,61 id,62 client,63 } = self;64 drop(stdin);65 drop(stdout);66 drop(stderr);67 client68 .wait(id)69 .await?70 .map_err(|e| anyhow!("agent wait failed: {e}"))71 }7273 pub async fn kill(&self, signal: i32) -> Result<()> {74 self.client75 .kill(self.id, signal)76 .await?77 .map_err(|e| anyhow!("agent kill failed: {e}"))78 }79}8081fn needs_socket(m: StdioMode) -> bool {82 matches!(m, StdioMode::Pipe | StdioMode::Inherit)83}8485fn stderr_needs_socket(m: StderrMode) -> bool {86 matches!(m, StderrMode::Pipe | StderrMode::Inherit)87}8889impl Remowt {90 pub async fn spawn(&self, opts: SpawnOptions) -> Result<RemowtChild> {91 let SpawnOptions {92 program,93 args,94 env,95 env_clear,96 cwd,97 escalated,98 stdin,99 stdout,100 stderr,101 } = opts;102103 if matches!(stderr, StderrMode::MergeWithStdout) && !needs_socket(stdout) {104 bail!("stderr=MergeWithStdout requires stdout=Pipe or Inherit");105 }106107 let stdin_bound = if needs_socket(stdin) {108 Some(self.bind_runtime_unix("proc-stdin").await?)109 } else {110 None111 };112 let stdout_bound = if needs_socket(stdout) {113 Some(self.bind_runtime_unix("proc-stdout").await?)114 } else {115 None116 };117 let stderr_bound = if stderr_needs_socket(stderr) {118 Some(self.bind_runtime_unix("proc-stderr").await?)119 } else {120 None121 };122123 let stdin_spec = match &stdin_bound {124 Some((_, p)) => StdioSpec::Socket(p.clone()),125 None => StdioSpec::Null,126 };127 let stdout_spec = match &stdout_bound {128 Some((_, p)) => StdioSpec::Socket(p.clone()),129 None => StdioSpec::Null,130 };131 let stderr_spec = match (&stderr, &stderr_bound) {132 (StderrMode::Pipe | StderrMode::Inherit, Some((_, p))) => StderrSpec::Socket(p.clone()),133 (StderrMode::MergeWithStdout, _) => StderrSpec::MergeWithStdout,134 _ => StderrSpec::Null,135 };136137 let client: SubprocessClient<BifConfig> = if escalated {138 // Boxed to break the async-fn type cycle139 Box::pin(self.run0_endpoints::<SubprocessClient<BifConfig>>()).await?140 } else {141 self.endpoints()142 };143144 let spec = SpawnSpec {145 program: program.clone(),146 args,147 env,148 env_clear,149 cwd,150 stdin: stdin_spec,151 stdout: stdout_spec,152 stderr: stderr_spec,153 };154 let id = client155 .spawn(spec)156 .await?157 .map_err(|e| anyhow!("agent spawn failed: {e}"))?;158159 let (stdin_res, stdout_res, stderr_res) = tokio::join!(160 accept(stdin_bound),161 accept(stdout_bound),162 accept(stderr_bound),163 );164165 let stdin_stream = handle_stdin(stdin, stdin_res?, &program);166 let stdout_stream = handle_output(stdout, stdout_res?, &program, false);167 let stderr_stream = handle_output_err(stderr, stderr_res?, &program);168169 Ok(RemowtChild {170 stdin: stdin_stream,171 stdout: stdout_stream,172 stderr: stderr_stream,173 id,174 client,175 })176 }177178 pub fn cmd(&self, program: impl AsRef<str>) -> RemowtCommand {179 let program = program.as_ref().to_owned();180 RemowtCommand {181 program,182 args: vec![],183 env: vec![],184 remowt: self.clone(),185 escalated: false,186 }187 }188}189190async fn accept(b: Option<(RemowtListener, Utf8PathBuf)>) -> Result<Option<RemowtStream>> {191 match b {192 Some((l, _)) => Ok(Some(l.accept().await?)),193 None => Ok(None),194 }195}196197fn handle_stdin(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {198 match mode {199 StdioMode::Pipe => s,200 StdioMode::Inherit => {201 if let Some(s) = s {202 let program = program.to_owned();203 tokio::spawn(async move {204 let mut stdin = tokio::io::stdin();205 let mut s = s;206 if let Err(e) = tokio::io::copy(&mut stdin, &mut s).await {207 warn!(program, "stdin forward ended: {e}");208 }209 let _ = s.shutdown().await;210 });211 }212 None213 }214 StdioMode::Null => None,215 }216}217218fn handle_output(219 mode: StdioMode,220 s: Option<RemowtStream>,221 program: &str,222 is_stderr: bool,223) -> Option<RemowtStream> {224 match mode {225 StdioMode::Pipe => s,226 StdioMode::Inherit => {227 if let Some(s) = s {228 let program = program.to_owned();229 tokio::spawn(pump_to_tracing(s, program, is_stderr));230 }231 None232 }233 StdioMode::Null => None,234 }235}236237fn handle_output_err(238 mode: StderrMode,239 s: Option<RemowtStream>,240 program: &str,241) -> Option<RemowtStream> {242 match mode {243 StderrMode::Pipe => s,244 StderrMode::Inherit => {245 if let Some(s) = s {246 let program = program.to_owned();247 tokio::spawn(pump_to_tracing(s, program, true));248 }249 None250 }251 StderrMode::MergeWithStdout | StderrMode::Null => None,252 }253}254255async fn pump_to_tracing(stream: RemowtStream, program: String, is_stderr: bool) {256 let mut reader = BufReader::new(stream).lines();257 loop {258 match reader.next_line().await {259 Ok(Some(line)) => {260 if is_stderr {261 warn!(program, "{line}");262 } else {263 info!(program, "{line}");264 }265 }266 Ok(None) => break,267 Err(e) => {268 warn!(program, "child log stream error: {e}");269 break;270 }271 }272 }273}274275fn escape_bash(input: &str, out: &mut String) {276 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";277 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {278 out.push_str(input);279 return;280 }281 out.push('\'');282 for (i, v) in input.split('\'').enumerate() {283 if i != 0 {284 out.push_str("'\"'\"'");285 }286 out.push_str(v);287 }288 out.push('\'');289}290291#[derive(Clone)]292pub struct RemowtCommand {293 program: String,294 args: Vec<String>,295 env: Vec<(String, String)>,296 remowt: Remowt,297 escalated: bool,298}299300impl RemowtCommand {301 pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {302 self.args.push(arg.as_ref().to_owned());303 self304 }305 pub fn args<V: AsRef<str>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {306 for arg in args {307 self.args.push(arg.as_ref().to_owned());308 }309 self310 }311 pub fn eqarg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {312 self.args313 .push(format!("{}={}", key.as_ref(), value.as_ref()));314 self315 }316 pub fn comparg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {317 self.args.push(key.as_ref().to_owned());318 self.args.push(value.as_ref().to_owned());319 self320 }321 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {322 self.env323 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));324 self325 }326327 pub fn sudo(mut self) -> Self {328 self.escalated = true;329 self330 }331332 /// Only for display.333 fn shell_line(&self) -> String {334 let mut out = String::new();335 if self.escalated {336 out.push_str("run0 ");337 }338 if !self.env.is_empty() {339 out.push_str("env");340 for (k, v) in &self.env {341 out.push(' ');342 assert!(!k.contains('='));343 escape_bash(k, &mut out);344 out.push('=');345 escape_bash(v, &mut out);346 }347 out.push(' ');348 }349 escape_bash(&self.program, &mut out);350 for arg in &self.args {351 out.push(' ');352 escape_bash(arg, &mut out);353 }354 out355 }356357 fn into_spawn_options(self) -> (Remowt, SpawnOptions, String) {358 let line = self.shell_line();359 let opts = SpawnOptions {360 program: self.program,361 args: self.args,362 env: self.env,363 env_clear: false,364 cwd: None,365 escalated: self.escalated,366 stdin: StdioMode::Null,367 stdout: StdioMode::Pipe,368 stderr: StderrMode::Pipe,369 };370 (self.remowt, opts, line)371 }372373 pub async fn run(self) -> Result<()> {374 run_inner(self, false).await.map(|_| ())375 }376 pub async fn run_string(self) -> Result<String> {377 let bytes = run_inner(self, true).await?.expect("want_stdout");378 Ok(String::from_utf8(bytes)?)379 }380 pub async fn run_value<T: DeserializeOwned>(self) -> Result<T> {381 let s = self.run_string().await?;382 Ok(serde_json::from_str(&s)?)383 }384}385386async fn run_inner(cmd: RemowtCommand, want_stdout: bool) -> Result<Option<Vec<u8>>> {387 let (remowt, opts, line) = cmd.into_spawn_options();388 debug!("running command {line:?} over remowt");389 let program = opts.program.clone();390 let mut child = remowt.spawn(opts).await?;391 let stderr = child.stderr.take().expect("stderr=Pipe");392 let stdout = child.stdout.take().expect("stdout=Pipe");393394 let mut err = FramedRead::new(stderr, LinesCodec::new());395 let (mut out_bytes, mut out_lines) = if want_stdout {396 (Some(FramedRead::new(stdout, BytesCodec::new())), None)397 } else {398 (None, Some(FramedRead::new(stdout, LinesCodec::new())))399 };400401 let mut buf = if want_stdout { Some(Vec::new()) } else { None };402403 let mut wait = pin!(child.wait());404 let exit = loop {405 select! {406 biased;407408 Some(e) = err.next() => {409 let e = e?;410 warn!(program = %program, "{e}");411 }412 Some(o) = async { out_bytes.as_mut()?.next().await }, if want_stdout => {413 buf.as_mut().expect("want_stdout").extend_from_slice(&o?);414 }415 Some(o) = async { out_lines.as_mut()?.next().await }, if !want_stdout => {416 let o = o?;417 info!(program = %program, "{o}");418 }419 res = &mut wait => {420 break res?;421 }422 }423 };424425 match exit {426 Some(0) => Ok(buf),427 Some(c) => bail!("command '{line}' failed with status {c}"),428 None => Err(anyhow!("command '{line}' ended without an exit status")),429 }430}1use std::pin::pin;23use anyhow::{anyhow, bail, Result};4use camino::Utf8PathBuf;5use futures::StreamExt as _;6use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};7use remowt_link_shared::BifConfig;8use serde::de::DeserializeOwned;9use tokio::io::AsyncWriteExt as _;10use tokio::select;11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{debug, info, warn};1314use crate::forwarded::{RemowtListener, RemowtStream};15use crate::{drain_to_tracing, Remowt};1617#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]18pub enum StdioMode {19 #[default]20 Null,21 Pipe,22 Inherit,23}2425#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]26pub enum StderrMode {27 #[default]28 Null,29 Pipe,30 Inherit,31 MergeWithStdout,32}3334#[derive(Default)]35pub struct SpawnOptions {36 pub program: String,37 pub args: Vec<String>,38 pub env: Vec<(String, String)>,39 pub env_clear: bool,40 pub cwd: Option<Utf8PathBuf>,41 pub escalated: bool,42 pub stdin: StdioMode,43 pub stdout: StdioMode,44 pub stderr: StderrMode,45}4647pub struct RemowtChild {48 pub stdin: Option<RemowtStream>,49 pub stdout: Option<RemowtStream>,50 pub stderr: Option<RemowtStream>,51 id: ProcId,52 client: SubprocessClient<BifConfig>,53}5455impl RemowtChild {56 pub async fn wait(self) -> Result<Option<i32>> {57 let RemowtChild {58 stdin,59 stdout,60 stderr,61 id,62 client,63 } = self;64 drop(stdin);65 let drain_out = async move {66 if let Some(s) = stdout {67 drain_to_tracing(s, "<child stdout>".to_owned(), false).await;68 }69 };70 let drain_err = async move {71 if let Some(s) = stderr {72 drain_to_tracing(s, "<child stderr>".to_owned(), true).await;73 }74 };75 let wait = async move {76 client77 .wait(id)78 .await?79 .map_err(|e| anyhow!("agent wait failed: {e}"))80 };81 let (code, _, _) = tokio::join!(wait, drain_out, drain_err);82 code83 }8485 pub async fn kill(&self, signal: i32) -> Result<()> {86 self.client87 .kill(self.id, signal)88 .await?89 .map_err(|e| anyhow!("agent kill failed: {e}"))90 }91}9293fn needs_socket(m: StdioMode) -> bool {94 matches!(m, StdioMode::Pipe | StdioMode::Inherit)95}9697fn stderr_needs_socket(m: StderrMode) -> bool {98 matches!(m, StderrMode::Pipe | StderrMode::Inherit)99}100101impl Remowt {102 pub async fn spawn(&self, opts: SpawnOptions) -> Result<RemowtChild> {103 let SpawnOptions {104 program,105 args,106 env,107 env_clear,108 cwd,109 escalated,110 stdin,111 stdout,112 stderr,113 } = opts;114115 if matches!(stderr, StderrMode::MergeWithStdout) && !needs_socket(stdout) {116 bail!("stderr=MergeWithStdout requires stdout=Pipe or Inherit");117 }118119 let stdin_bound = if needs_socket(stdin) {120 Some(self.bind_runtime_unix("proc-stdin").await?)121 } else {122 None123 };124 let stdout_bound = if needs_socket(stdout) {125 Some(self.bind_runtime_unix("proc-stdout").await?)126 } else {127 None128 };129 let stderr_bound = if stderr_needs_socket(stderr) {130 Some(self.bind_runtime_unix("proc-stderr").await?)131 } else {132 None133 };134135 let stdin_spec = match &stdin_bound {136 Some((_, p)) => StdioSpec::Socket(p.clone()),137 None => StdioSpec::Null,138 };139 let stdout_spec = match &stdout_bound {140 Some((_, p)) => StdioSpec::Socket(p.clone()),141 None => StdioSpec::Null,142 };143 let stderr_spec = match (&stderr, &stderr_bound) {144 (StderrMode::Pipe | StderrMode::Inherit, Some((_, p))) => StderrSpec::Socket(p.clone()),145 (StderrMode::MergeWithStdout, _) => StderrSpec::MergeWithStdout,146 _ => StderrSpec::Null,147 };148149 let client: SubprocessClient<BifConfig> = if escalated {150 // Boxed to break the async-fn type cycle151 Box::pin(self.run0_endpoints::<SubprocessClient<BifConfig>>()).await?152 } else {153 self.endpoints()154 };155156 let spec = SpawnSpec {157 program: program.clone(),158 args,159 env,160 env_clear,161 cwd,162 stdin: stdin_spec,163 stdout: stdout_spec,164 stderr: stderr_spec,165 };166 let id = client167 .spawn(spec)168 .await?169 .map_err(|e| anyhow!("agent spawn failed: {e}"))?;170171 let (stdin_res, stdout_res, stderr_res) = tokio::join!(172 accept(stdin_bound),173 accept(stdout_bound),174 accept(stderr_bound),175 );176177 let stdin_stream = handle_stdin(stdin, stdin_res?, &program);178 let stdout_stream = handle_output(stdout, stdout_res?, &program);179 let stderr_stream = handle_output_err(stderr, stderr_res?, &program);180181 Ok(RemowtChild {182 stdin: stdin_stream,183 stdout: stdout_stream,184 stderr: stderr_stream,185 id,186 client,187 })188 }189190 pub fn cmd(&self, program: impl AsRef<str>) -> RemowtCommand {191 let program = program.as_ref().to_owned();192 RemowtCommand {193 program,194 args: vec![],195 env: vec![],196 remowt: self.clone(),197 escalated: false,198 }199 }200}201202async fn accept(b: Option<(RemowtListener, Utf8PathBuf)>) -> Result<Option<RemowtStream>> {203 match b {204 Some((l, _)) => Ok(Some(l.accept().await?)),205 None => Ok(None),206 }207}208209fn handle_stdin(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {210 match mode {211 StdioMode::Pipe => s,212 StdioMode::Inherit => {213 if let Some(s) = s {214 let program = program.to_owned();215 tokio::spawn(async move {216 let mut stdin = tokio::io::stdin();217 let mut s = s;218 if let Err(e) = tokio::io::copy(&mut stdin, &mut s).await {219 warn!(program, "stdin forward ended: {e}");220 }221 let _ = s.shutdown().await;222 });223 }224 None225 }226 StdioMode::Null => None,227 }228}229230fn handle_output(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {231 match mode {232 StdioMode::Pipe => s,233 StdioMode::Inherit => {234 if let Some(s) = s {235 let program = program.to_owned();236 tokio::spawn(drain_to_tracing(s, program, false));237 }238 None239 }240 StdioMode::Null => None,241 }242}243244fn handle_output_err(245 mode: StderrMode,246 s: Option<RemowtStream>,247 program: &str,248) -> Option<RemowtStream> {249 match mode {250 StderrMode::Pipe => s,251 StderrMode::Inherit => {252 if let Some(s) = s {253 let program = program.to_owned();254 tokio::spawn(drain_to_tracing(s, program, true));255 }256 None257 }258 StderrMode::MergeWithStdout | StderrMode::Null => None,259 }260}261262fn escape_bash(input: &str, out: &mut String) {263 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";264 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {265 out.push_str(input);266 return;267 }268 out.push('\'');269 for (i, v) in input.split('\'').enumerate() {270 if i != 0 {271 out.push_str("'\"'\"'");272 }273 out.push_str(v);274 }275 out.push('\'');276}277278#[derive(Clone)]279pub struct RemowtCommand {280 program: String,281 args: Vec<String>,282 env: Vec<(String, String)>,283 remowt: Remowt,284 escalated: bool,285}286287impl RemowtCommand {288 pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {289 self.args.push(arg.as_ref().to_owned());290 self291 }292 pub fn args<V: AsRef<str>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {293 for arg in args {294 self.args.push(arg.as_ref().to_owned());295 }296 self297 }298 pub fn eqarg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {299 self.args300 .push(format!("{}={}", key.as_ref(), value.as_ref()));301 self302 }303 pub fn comparg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {304 self.args.push(key.as_ref().to_owned());305 self.args.push(value.as_ref().to_owned());306 self307 }308 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {309 self.env310 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));311 self312 }313314 pub fn sudo(mut self) -> Self {315 self.escalated = true;316 self317 }318319 /// Only for display.320 fn shell_line(&self) -> String {321 let mut out = String::new();322 if self.escalated {323 out.push_str("run0 ");324 }325 if !self.env.is_empty() {326 out.push_str("env");327 for (k, v) in &self.env {328 out.push(' ');329 assert!(!k.contains('='));330 escape_bash(k, &mut out);331 out.push('=');332 escape_bash(v, &mut out);333 }334 out.push(' ');335 }336 escape_bash(&self.program, &mut out);337 for arg in &self.args {338 out.push(' ');339 escape_bash(arg, &mut out);340 }341 out342 }343344 fn into_spawn_options(self) -> (Remowt, SpawnOptions, String) {345 let line = self.shell_line();346 let opts = SpawnOptions {347 program: self.program,348 args: self.args,349 env: self.env,350 env_clear: false,351 cwd: None,352 escalated: self.escalated,353 stdin: StdioMode::Null,354 stdout: StdioMode::Pipe,355 stderr: StderrMode::Pipe,356 };357 (self.remowt, opts, line)358 }359360 pub async fn run(self) -> Result<()> {361 run_inner(self, false).await.map(|_| ())362 }363 pub async fn run_string(self) -> Result<String> {364 let bytes = run_inner(self, true).await?.expect("want_stdout");365 Ok(String::from_utf8(bytes)?)366 }367 pub async fn run_value<T: DeserializeOwned>(self) -> Result<T> {368 let s = self.run_string().await?;369 Ok(serde_json::from_str(&s)?)370 }371}372373async fn run_inner(cmd: RemowtCommand, want_stdout: bool) -> Result<Option<Vec<u8>>> {374 let (remowt, opts, line) = cmd.into_spawn_options();375 debug!("running command {line:?} over remowt");376 let program = opts.program.clone();377 let mut child = remowt.spawn(opts).await?;378 let stderr = child.stderr.take().expect("stderr=Pipe");379 let stdout = child.stdout.take().expect("stdout=Pipe");380381 let mut err = FramedRead::new(stderr, LinesCodec::new());382 let (mut out_bytes, mut out_lines) = if want_stdout {383 (Some(FramedRead::new(stdout, BytesCodec::new())), None)384 } else {385 (None, Some(FramedRead::new(stdout, LinesCodec::new())))386 };387388 let mut buf = if want_stdout { Some(Vec::new()) } else { None };389390 let mut wait = pin!(child.wait());391 let exit = loop {392 select! {393 biased;394395 Some(e) = err.next() => {396 let e = e?;397 warn!(program = %program, "{e}");398 }399 Some(o) = async { out_bytes.as_mut()?.next().await }, if want_stdout => {400 buf.as_mut().expect("want_stdout").extend_from_slice(&o?);401 }402 Some(o) = async { out_lines.as_mut()?.next().await }, if !want_stdout => {403 let o = o?;404 info!(program = %program, "{o}");405 }406 res = &mut wait => {407 break res?;408 }409 }410 };411412 while let Some(e) = err.next().await {413 if let Ok(line) = e {414 warn!(program = %program, "{line}");415 }416 }417 if want_stdout {418 if let Some(out_bytes) = out_bytes.as_mut() {419 while let Some(o) = out_bytes.next().await {420 if let Ok(chunk) = o {421 buf.as_mut().expect("want_stdout").extend_from_slice(&chunk);422 }423 }424 }425 } else if let Some(out_lines) = out_lines.as_mut() {426 while let Some(o) = out_lines.next().await {427 if let Ok(line) = o {428 info!(program = %program, "{line}");429 }430 }431 }432433 match exit {434 Some(0) => Ok(buf),435 Some(c) => bail!("command '{line}' failed with status {c}"),436 None => Err(anyhow!("command '{line}' ended without an exit status")),437 }438}crates/remowt-link-shared/src/port.rsdiffbeforeafterboth--- a/crates/remowt-link-shared/src/port.rs
+++ b/crates/remowt-link-shared/src/port.rs
@@ -3,10 +3,8 @@
use bifrostlink::Port;
use bytes::{Bytes, BytesMut};
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
+use tokio::select;
-/// Wire a length-prefixed duplex byte stream (e.g. a child process's
-/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32`
-/// length followed by that many payload bytes.
pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port
where
R: AsyncRead + Unpin + Send + 'static,
@@ -18,13 +16,13 @@
let len = match reader.read_u32().await {
Ok(len) => len,
Err(e) => {
- tracing::error!("child read failed: {e}");
+ log_read_end(&e);
break;
}
};
let mut buf = BytesMut::zeroed(len as usize);
if let Err(e) = reader.read_exact(&mut buf).await {
- tracing::error!("child read failed: {e}");
+ log_read_end(&e);
break;
}
if tx.send(buf.freeze()).is_err() {
@@ -35,15 +33,45 @@
let write_task = async move {
while let Some(msg) = rx.recv().await {
if let Err(e) = write_frame(&mut writer, msg).await {
- tracing::error!("child write failed: {e}");
+ log_write_end(&e);
break;
}
}
};
- tokio::join!(read_task, write_task);
+ select! {
+ _ = read_task => {},
+ _ = write_task => {},
+ }
})
}
+fn log_read_end(e: &io::Error) {
+ if matches!(
+ e.kind(),
+ io::ErrorKind::UnexpectedEof
+ | io::ErrorKind::BrokenPipe
+ | io::ErrorKind::ConnectionReset
+ | io::ErrorKind::ConnectionAborted
+ ) {
+ tracing::debug!("child read ended: {e}");
+ } else {
+ tracing::error!("child read failed: {e}");
+ }
+}
+
+fn log_write_end(e: &io::Error) {
+ if matches!(
+ e.kind(),
+ io::ErrorKind::BrokenPipe
+ | io::ErrorKind::ConnectionReset
+ | io::ErrorKind::ConnectionAborted
+ ) {
+ tracing::debug!("child write ended: {e}");
+ } else {
+ tracing::error!("child write failed: {e}");
+ }
+}
+
async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {
let len = u32::try_from(msg.len())
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;