difftreelog
feat drain stdout/stderr after signal
in: trunk
6 files changed
Cargo.lockdiffbeforeafterboth308308309[[package]]309[[package]]310name = "bifrostlink"310name = "bifrostlink"311version = "0.2.4"311version = "0.2.5"312source = "registry+https://github.com/rust-lang/crates.io-index"312source = "registry+https://github.com/rust-lang/crates.io-index"313checksum = "ad2d0e30a2aa432b78f41f9676572f88201d4dc73bc2b7bc90704d2e02b7d062"313checksum = "910f9286588d13e3dbdbbc1ad4d292656e704bc93e1f41b8a13b48e3a8e95f39"314dependencies = [314dependencies = [315 "async-trait",315 "async-trait",316 "async_fn_traits",316 "async_fn_traits",327327328[[package]]328[[package]]329name = "bifrostlink-macros"329name = "bifrostlink-macros"330version = "0.2.4"330version = "0.2.5"331source = "registry+https://github.com/rust-lang/crates.io-index"331source = "registry+https://github.com/rust-lang/crates.io-index"332checksum = "e2121559c45cbe89c4f8d1d741360d5b028577254f6beca053dc02332da85b43"332checksum = "a0ea5c423c3831c523c8ef78debdf6a64e72b21ec92148a44163a4c25c05dfd0"333dependencies = [333dependencies = [334 "proc-macro2",334 "proc-macro2",335 "quote",335 "quote",338338339[[package]]339[[package]]340name = "bifrostlink-ports"340name = "bifrostlink-ports"341version = "0.2.4"341version = "0.2.5"342source = "registry+https://github.com/rust-lang/crates.io-index"342source = "registry+https://github.com/rust-lang/crates.io-index"343checksum = "e9395c4ccca497b0c50583e6de57aca921c046ae0c10f56030cd2c5a20db05f8"343checksum = "9e3a9a01ec1b8bd7d44b47cd0183a1465880e241027d9f5afcb076e11704ec70"344dependencies = [344dependencies = [345 "bifrostlink",345 "bifrostlink",346 "bytes",346 "bytes",183518351836[[package]]1836[[package]]1837name = "polkit-backend"1837name = "polkit-backend"1838version = "0.1.4"1838version = "0.1.6"1839dependencies = [1839dependencies = [1840 "anyhow",1840 "anyhow",1841 "clap",1841 "clap",205520552056[[package]]2056[[package]]2057name = "remowt-agent"2057name = "remowt-agent"2058version = "0.1.4"2058version = "0.1.6"2059dependencies = [2059dependencies = [2060 "anyhow",2060 "anyhow",2061 "bifrostlink",2061 "bifrostlink",208320832084[[package]]2084[[package]]2085name = "remowt-client"2085name = "remowt-client"2086version = "0.1.4"2086version = "0.1.6"2087dependencies = [2087dependencies = [2088 "anyhow",2088 "anyhow",2089 "bifrostlink",2089 "bifrostlink",210621062107[[package]]2107[[package]]2108name = "remowt-endpoints"2108name = "remowt-endpoints"2109version = "0.1.4"2109version = "0.1.6"2110dependencies = [2110dependencies = [2111 "anyhow",2111 "anyhow",2112 "bifrostlink",2112 "bifrostlink",212421242125[[package]]2125[[package]]2126name = "remowt-link-shared"2126name = "remowt-link-shared"2127version = "0.1.4"2127version = "0.1.6"2128dependencies = [2128dependencies = [2129 "bifrostlink",2129 "bifrostlink",2130 "bytes",2130 "bytes",213821382139[[package]]2139[[package]]2140name = "remowt-plugin"2140name = "remowt-plugin"2141version = "0.1.4"2141version = "0.1.6"2142dependencies = [2142dependencies = [2143 "anyhow",2143 "anyhow",2144 "bifrostlink",2144 "bifrostlink",215221522153[[package]]2153[[package]]2154name = "remowt-polkit-shared"2154name = "remowt-polkit-shared"2155version = "0.1.4"2155version = "0.1.6"2156dependencies = [2156dependencies = [2157 "nix",2157 "nix",2158 "serde",2158 "serde",216121612162[[package]]2162[[package]]2163name = "remowt-ssh"2163name = "remowt-ssh"2164version = "0.1.4"2164version = "0.1.6"2165dependencies = [2165dependencies = [2166 "anyhow",2166 "anyhow",2167 "async-trait",2167 "async-trait",218921892190[[package]]2190[[package]]2191name = "remowt-ui-prompt"2191name = "remowt-ui-prompt"2192version = "0.1.4"2192version = "0.1.6"2193dependencies = [2193dependencies = [2194 "anyhow",2194 "anyhow",2195 "bifrostlink",2195 "bifrostlink",Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,18 +3,18 @@
resolver = "2"
[workspace.package]
-version = "0.1.4"
+version = "0.1.6"
license = "MIT"
edition = "2021"
repository = "https://git.delta.rocks/r/remowt"
[workspace.dependencies]
-remowt-client = { version = "0.1.3", path = "crates/remowt-client" }
-remowt-polkit-shared = { version = "0.1.3", path = "crates/polkit-shared" }
-remowt-link-shared = { version = "0.1.3", path = "crates/remowt-link-shared" }
-remowt-plugin = { version = "0.1.3", path = "crates/remowt-plugin" }
-remowt-ui-prompt = { version = "0.1.3", path = "crates/remowt-ui-prompt" }
-remowt-endpoints = { version = "0.1.3", path = "crates/remowt-endpoints" }
+remowt-client = { version = "0.1.6", path = "crates/remowt-client" }
+remowt-polkit-shared = { version = "0.1.6", path = "crates/polkit-shared" }
+remowt-link-shared = { version = "0.1.6", path = "crates/remowt-link-shared" }
+remowt-plugin = { version = "0.1.6", path = "crates/remowt-plugin" }
+remowt-ui-prompt = { version = "0.1.6", path = "crates/remowt-ui-prompt" }
+remowt-endpoints = { version = "0.1.6", path = "crates/remowt-endpoints" }
bifrostlink = "0.2.0"
bifrostlink-macros = "0.2.0"
@@ -52,7 +52,9 @@
thiserror = "2.0.18"
[profile.release]
-panic = "abort"
+panic = "unwind"
opt-level = "z"
lto = true
codegen-units = 1
+debug = "full"
+split-debuginfo = "off"
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -2,6 +2,7 @@
use std::collections::{BTreeMap, HashMap};
use std::fs::Permissions;
use std::future::pending;
+use std::io;
use std::os::unix::fs::PermissionsExt as _;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, OnceLock};
@@ -230,7 +231,7 @@
fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
- .with_writer(std::io::stderr)
+ .with_writer(io::stderr)
.without_time()
.init();
let opts = Opts::parse();
crates/remowt-client/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-client/src/lib.rs
+++ b/crates/remowt-client/src/lib.rs
@@ -17,13 +17,15 @@
use russh::keys::ssh_key::PublicKey;
use russh::Channel;
use tempfile::TempDir;
+use tokio::io::AsyncRead;
use tokio::net::UnixListener;
use tokio::sync::oneshot;
+use tokio::task::JoinHandle;
use tokio::{
fs,
- io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, DuplexStream},
+ io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader},
};
-use tracing::{debug, warn};
+use tracing::{debug, info, warn};
use uuid::Uuid;
pub mod editor;
@@ -97,7 +99,7 @@
let mut child = SshExecChild::from_exec(ch);
drop(child.stdin);
- drain_stderr(child.stderr, cmd.to_owned());
+ drain_to_tracing(child.stderr, cmd.to_owned(), true);
let mut out = Vec::new();
child.stdout.read_to_end(&mut out).await?;
@@ -317,7 +319,7 @@
.await?;
let child = SshExecChild::from_exec(cmd_chan);
- drain_stderr(child.stderr, "agent".to_owned());
+ drain_to_tracing(child.stderr, "agent".to_owned(), true);
rpc.add_direct(
Address::Agent,
child_port(child.stdout, child.stdin),
@@ -518,20 +520,33 @@
}
}
-fn drain_stderr(stream: DuplexStream, context: String) {
+pub(crate) fn drain_to_tracing(
+ stream: impl AsyncRead + Unpin + 'static + Send,
+ context: String,
+ stderr: bool,
+) -> JoinHandle<()> {
tokio::spawn(async move {
- let mut reader = BufReader::new(stream).lines();
+ let mut reader = BufReader::new(stream);
+ let mut buf = Vec::with_capacity(4096);
loop {
- match reader.next_line().await {
- Ok(Some(line)) => warn!(context = %context, "{line}"),
- Ok(None) => break,
+ buf.clear();
+ match reader.read_until(b'\n', &mut buf).await {
+ Ok(0) => break,
+ Ok(_) => {
+ let line = String::from_utf8_lossy(buf.strip_suffix(b"\n").unwrap_or(&buf));
+ if stderr {
+ warn!(context = %context, "{line}");
+ } else {
+ info!(context = %context, "{line}");
+ }
+ }
Err(e) => {
- warn!(context = %context, "stderr read failed: {e}");
+ warn!(context = %context, "child stdio read failed: {e}");
break;
}
}
}
- });
+ })
}
fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {
crates/remowt-client/src/subprocess.rsdiffbeforeafterboth--- a/crates/remowt-client/src/subprocess.rs
+++ b/crates/remowt-client/src/subprocess.rs
@@ -6,13 +6,13 @@
use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};
use remowt_link_shared::BifConfig;
use serde::de::DeserializeOwned;
-use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufReader};
+use tokio::io::AsyncWriteExt as _;
use tokio::select;
use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
use tracing::{debug, info, warn};
use crate::forwarded::{RemowtListener, RemowtStream};
-use crate::Remowt;
+use crate::{drain_to_tracing, Remowt};
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum StdioMode {
@@ -62,12 +62,24 @@
client,
} = self;
drop(stdin);
- drop(stdout);
- drop(stderr);
- client
- .wait(id)
- .await?
- .map_err(|e| anyhow!("agent wait failed: {e}"))
+ let drain_out = async move {
+ if let Some(s) = stdout {
+ drain_to_tracing(s, "<child stdout>".to_owned(), false).await;
+ }
+ };
+ let drain_err = async move {
+ if let Some(s) = stderr {
+ drain_to_tracing(s, "<child stderr>".to_owned(), true).await;
+ }
+ };
+ let wait = async move {
+ client
+ .wait(id)
+ .await?
+ .map_err(|e| anyhow!("agent wait failed: {e}"))
+ };
+ let (code, _, _) = tokio::join!(wait, drain_out, drain_err);
+ code
}
pub async fn kill(&self, signal: i32) -> Result<()> {
@@ -163,7 +175,7 @@
);
let stdin_stream = handle_stdin(stdin, stdin_res?, &program);
- let stdout_stream = handle_output(stdout, stdout_res?, &program, false);
+ let stdout_stream = handle_output(stdout, stdout_res?, &program);
let stderr_stream = handle_output_err(stderr, stderr_res?, &program);
Ok(RemowtChild {
@@ -215,18 +227,13 @@
}
}
-fn handle_output(
- mode: StdioMode,
- s: Option<RemowtStream>,
- program: &str,
- is_stderr: bool,
-) -> Option<RemowtStream> {
+fn handle_output(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {
match mode {
StdioMode::Pipe => s,
StdioMode::Inherit => {
if let Some(s) = s {
let program = program.to_owned();
- tokio::spawn(pump_to_tracing(s, program, is_stderr));
+ tokio::spawn(drain_to_tracing(s, program, false));
}
None
}
@@ -244,7 +251,7 @@
StderrMode::Inherit => {
if let Some(s) = s {
let program = program.to_owned();
- tokio::spawn(pump_to_tracing(s, program, true));
+ tokio::spawn(drain_to_tracing(s, program, true));
}
None
}
@@ -252,26 +259,6 @@
}
}
-async fn pump_to_tracing(stream: RemowtStream, program: String, is_stderr: bool) {
- let mut reader = BufReader::new(stream).lines();
- loop {
- match reader.next_line().await {
- Ok(Some(line)) => {
- if is_stderr {
- warn!(program, "{line}");
- } else {
- info!(program, "{line}");
- }
- }
- Ok(None) => break,
- Err(e) => {
- warn!(program, "child log stream error: {e}");
- break;
- }
- }
- }
-}
-
fn escape_bash(input: &str, out: &mut String) {
const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
if input.chars().all(|c| !TO_ESCAPE.contains(c)) {
@@ -422,6 +409,27 @@
}
};
+ while let Some(e) = err.next().await {
+ if let Ok(line) = e {
+ warn!(program = %program, "{line}");
+ }
+ }
+ if want_stdout {
+ if let Some(out_bytes) = out_bytes.as_mut() {
+ while let Some(o) = out_bytes.next().await {
+ if let Ok(chunk) = o {
+ buf.as_mut().expect("want_stdout").extend_from_slice(&chunk);
+ }
+ }
+ }
+ } else if let Some(out_lines) = out_lines.as_mut() {
+ while let Some(o) = out_lines.next().await {
+ if let Ok(line) = o {
+ info!(program = %program, "{line}");
+ }
+ }
+ }
+
match exit {
Some(0) => Ok(buf),
Some(c) => bail!("command '{line}' failed with status {c}"),
crates/remowt-link-shared/src/port.rsdiffbeforeafterboth--- a/crates/remowt-link-shared/src/port.rs
+++ b/crates/remowt-link-shared/src/port.rs
@@ -3,10 +3,8 @@
use bifrostlink::Port;
use bytes::{Bytes, BytesMut};
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
+use tokio::select;
-/// Wire a length-prefixed duplex byte stream (e.g. a child process's
-/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32`
-/// length followed by that many payload bytes.
pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port
where
R: AsyncRead + Unpin + Send + 'static,
@@ -18,13 +16,13 @@
let len = match reader.read_u32().await {
Ok(len) => len,
Err(e) => {
- tracing::error!("child read failed: {e}");
+ log_read_end(&e);
break;
}
};
let mut buf = BytesMut::zeroed(len as usize);
if let Err(e) = reader.read_exact(&mut buf).await {
- tracing::error!("child read failed: {e}");
+ log_read_end(&e);
break;
}
if tx.send(buf.freeze()).is_err() {
@@ -35,15 +33,45 @@
let write_task = async move {
while let Some(msg) = rx.recv().await {
if let Err(e) = write_frame(&mut writer, msg).await {
- tracing::error!("child write failed: {e}");
+ log_write_end(&e);
break;
}
}
};
- tokio::join!(read_task, write_task);
+ select! {
+ _ = read_task => {},
+ _ = write_task => {},
+ }
})
}
+fn log_read_end(e: &io::Error) {
+ if matches!(
+ e.kind(),
+ io::ErrorKind::UnexpectedEof
+ | io::ErrorKind::BrokenPipe
+ | io::ErrorKind::ConnectionReset
+ | io::ErrorKind::ConnectionAborted
+ ) {
+ tracing::debug!("child read ended: {e}");
+ } else {
+ tracing::error!("child read failed: {e}");
+ }
+}
+
+fn log_write_end(e: &io::Error) {
+ if matches!(
+ e.kind(),
+ io::ErrorKind::BrokenPipe
+ | io::ErrorKind::ConnectionReset
+ | io::ErrorKind::ConnectionAborted
+ ) {
+ tracing::debug!("child write ended: {e}");
+ } else {
+ tracing::error!("child write failed: {e}");
+ }
+}
+
async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {
let len = u32::try_from(msg.len())
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;