git.delta.rocks / remowt / refs/commits / 6d9cf16dada2

difftreelog

feat drain stdout/stderr after signal

vmkslqpuYaroslav Bolyukin4 days agoparent: #eadb0bb.patch.diff
in: trunk

6 files changed

modifiedCargo.lockdiffbeforeafterboth
308308
309[[package]]309[[package]]
310name = "bifrostlink"310name = "bifrostlink"
311version = "0.2.4"311version = "0.2.5"
312source = "registry+https://github.com/rust-lang/crates.io-index"312source = "registry+https://github.com/rust-lang/crates.io-index"
313checksum = "ad2d0e30a2aa432b78f41f9676572f88201d4dc73bc2b7bc90704d2e02b7d062"313checksum = "910f9286588d13e3dbdbbc1ad4d292656e704bc93e1f41b8a13b48e3a8e95f39"
314dependencies = [314dependencies = [
315 "async-trait",315 "async-trait",
316 "async_fn_traits",316 "async_fn_traits",
327327
328[[package]]328[[package]]
329name = "bifrostlink-macros"329name = "bifrostlink-macros"
330version = "0.2.4"330version = "0.2.5"
331source = "registry+https://github.com/rust-lang/crates.io-index"331source = "registry+https://github.com/rust-lang/crates.io-index"
332checksum = "e2121559c45cbe89c4f8d1d741360d5b028577254f6beca053dc02332da85b43"332checksum = "a0ea5c423c3831c523c8ef78debdf6a64e72b21ec92148a44163a4c25c05dfd0"
333dependencies = [333dependencies = [
334 "proc-macro2",334 "proc-macro2",
335 "quote",335 "quote",
338338
339[[package]]339[[package]]
340name = "bifrostlink-ports"340name = "bifrostlink-ports"
341version = "0.2.4"341version = "0.2.5"
342source = "registry+https://github.com/rust-lang/crates.io-index"342source = "registry+https://github.com/rust-lang/crates.io-index"
343checksum = "e9395c4ccca497b0c50583e6de57aca921c046ae0c10f56030cd2c5a20db05f8"343checksum = "9e3a9a01ec1b8bd7d44b47cd0183a1465880e241027d9f5afcb076e11704ec70"
344dependencies = [344dependencies = [
345 "bifrostlink",345 "bifrostlink",
346 "bytes",346 "bytes",
18351835
1836[[package]]1836[[package]]
1837name = "polkit-backend"1837name = "polkit-backend"
1838version = "0.1.4"1838version = "0.1.6"
1839dependencies = [1839dependencies = [
1840 "anyhow",1840 "anyhow",
1841 "clap",1841 "clap",
20552055
2056[[package]]2056[[package]]
2057name = "remowt-agent"2057name = "remowt-agent"
2058version = "0.1.4"2058version = "0.1.6"
2059dependencies = [2059dependencies = [
2060 "anyhow",2060 "anyhow",
2061 "bifrostlink",2061 "bifrostlink",
20832083
2084[[package]]2084[[package]]
2085name = "remowt-client"2085name = "remowt-client"
2086version = "0.1.4"2086version = "0.1.6"
2087dependencies = [2087dependencies = [
2088 "anyhow",2088 "anyhow",
2089 "bifrostlink",2089 "bifrostlink",
21062106
2107[[package]]2107[[package]]
2108name = "remowt-endpoints"2108name = "remowt-endpoints"
2109version = "0.1.4"2109version = "0.1.6"
2110dependencies = [2110dependencies = [
2111 "anyhow",2111 "anyhow",
2112 "bifrostlink",2112 "bifrostlink",
21242124
2125[[package]]2125[[package]]
2126name = "remowt-link-shared"2126name = "remowt-link-shared"
2127version = "0.1.4"2127version = "0.1.6"
2128dependencies = [2128dependencies = [
2129 "bifrostlink",2129 "bifrostlink",
2130 "bytes",2130 "bytes",
21382138
2139[[package]]2139[[package]]
2140name = "remowt-plugin"2140name = "remowt-plugin"
2141version = "0.1.4"2141version = "0.1.6"
2142dependencies = [2142dependencies = [
2143 "anyhow",2143 "anyhow",
2144 "bifrostlink",2144 "bifrostlink",
21522152
2153[[package]]2153[[package]]
2154name = "remowt-polkit-shared"2154name = "remowt-polkit-shared"
2155version = "0.1.4"2155version = "0.1.6"
2156dependencies = [2156dependencies = [
2157 "nix",2157 "nix",
2158 "serde",2158 "serde",
21612161
2162[[package]]2162[[package]]
2163name = "remowt-ssh"2163name = "remowt-ssh"
2164version = "0.1.4"2164version = "0.1.6"
2165dependencies = [2165dependencies = [
2166 "anyhow",2166 "anyhow",
2167 "async-trait",2167 "async-trait",
21892189
2190[[package]]2190[[package]]
2191name = "remowt-ui-prompt"2191name = "remowt-ui-prompt"
2192version = "0.1.4"2192version = "0.1.6"
2193dependencies = [2193dependencies = [
2194 "anyhow",2194 "anyhow",
2195 "bifrostlink",2195 "bifrostlink",
modifiedCargo.tomldiffbeforeafterboth
3resolver = "2"3resolver = "2"
44
5[workspace.package]5[workspace.package]
6version = "0.1.4"6version = "0.1.6"
7license = "MIT"7license = "MIT"
8edition = "2021"8edition = "2021"
9repository = "https://git.delta.rocks/r/remowt"9repository = "https://git.delta.rocks/r/remowt"
1010
11[workspace.dependencies]11[workspace.dependencies]
12remowt-client = { version = "0.1.3", path = "crates/remowt-client" }12remowt-client = { version = "0.1.6", path = "crates/remowt-client" }
13remowt-polkit-shared = { version = "0.1.3", path = "crates/polkit-shared" }13remowt-polkit-shared = { version = "0.1.6", path = "crates/polkit-shared" }
14remowt-link-shared = { version = "0.1.3", path = "crates/remowt-link-shared" }14remowt-link-shared = { version = "0.1.6", path = "crates/remowt-link-shared" }
15remowt-plugin = { version = "0.1.3", path = "crates/remowt-plugin" }15remowt-plugin = { version = "0.1.6", path = "crates/remowt-plugin" }
16remowt-ui-prompt = { version = "0.1.3", path = "crates/remowt-ui-prompt" }16remowt-ui-prompt = { version = "0.1.6", path = "crates/remowt-ui-prompt" }
17remowt-endpoints = { version = "0.1.3", path = "crates/remowt-endpoints" }17remowt-endpoints = { version = "0.1.6", path = "crates/remowt-endpoints" }
1818
19bifrostlink = "0.2.0"19bifrostlink = "0.2.0"
20bifrostlink-macros = "0.2.0"20bifrostlink-macros = "0.2.0"
52thiserror = "2.0.18"52thiserror = "2.0.18"
5353
54[profile.release]54[profile.release]
55panic = "abort"55panic = "unwind"
56opt-level = "z"56opt-level = "z"
57lto = true57lto = true
58codegen-units = 158codegen-units = 1
59debug = "full"
60split-debuginfo = "off"
5961
modifiedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
2use std::collections::{BTreeMap, HashMap};2use std::collections::{BTreeMap, HashMap};
3use std::fs::Permissions;3use std::fs::Permissions;
4use std::future::pending;4use std::future::pending;
5use std::io;
5use std::os::unix::fs::PermissionsExt as _;6use std::os::unix::fs::PermissionsExt as _;
6use std::path::PathBuf;7use std::path::PathBuf;
7use std::sync::{Arc, Mutex, OnceLock};8use std::sync::{Arc, Mutex, OnceLock};
230231
231fn main() -> anyhow::Result<()> {232fn main() -> anyhow::Result<()> {
232 tracing_subscriber::fmt()233 tracing_subscriber::fmt()
233 .with_writer(std::io::stderr)234 .with_writer(io::stderr)
234 .without_time()235 .without_time()
235 .init();236 .init();
236 let opts = Opts::parse();237 let opts = Opts::parse();
modifiedcrates/remowt-client/src/lib.rsdiffbeforeafterboth
17use russh::keys::ssh_key::PublicKey;17use russh::keys::ssh_key::PublicKey;
18use russh::Channel;18use russh::Channel;
19use tempfile::TempDir;19use tempfile::TempDir;
20use tokio::io::AsyncRead;
20use tokio::net::UnixListener;21use tokio::net::UnixListener;
21use tokio::sync::oneshot;22use tokio::sync::oneshot;
23use tokio::task::JoinHandle;
22use tokio::{24use tokio::{
23 fs,25 fs,
24 io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, DuplexStream},26 io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader},
25};27};
26use tracing::{debug, warn};28use tracing::{debug, info, warn};
27use uuid::Uuid;29use uuid::Uuid;
2830
29pub mod editor;31pub mod editor;
9799
98 let mut child = SshExecChild::from_exec(ch);100 let mut child = SshExecChild::from_exec(ch);
99 drop(child.stdin);101 drop(child.stdin);
100 drain_stderr(child.stderr, cmd.to_owned());102 drain_to_tracing(child.stderr, cmd.to_owned(), true);
101103
102 let mut out = Vec::new();104 let mut out = Vec::new();
103 child.stdout.read_to_end(&mut out).await?;105 child.stdout.read_to_end(&mut out).await?;
317 .await?;319 .await?;
318320
319 let child = SshExecChild::from_exec(cmd_chan);321 let child = SshExecChild::from_exec(cmd_chan);
320 drain_stderr(child.stderr, "agent".to_owned());322 drain_to_tracing(child.stderr, "agent".to_owned(), true);
321 rpc.add_direct(323 rpc.add_direct(
322 Address::Agent,324 Address::Agent,
323 child_port(child.stdout, child.stdin),325 child_port(child.stdout, child.stdin),
518 }520 }
519}521}
520522
521fn drain_stderr(stream: DuplexStream, context: String) {523pub(crate) fn drain_to_tracing(
524 stream: impl AsyncRead + Unpin + 'static + Send,
525 context: String,
526 stderr: bool,
527) -> JoinHandle<()> {
522 tokio::spawn(async move {528 tokio::spawn(async move {
523 let mut reader = BufReader::new(stream).lines();529 let mut reader = BufReader::new(stream);
530 let mut buf = Vec::with_capacity(4096);
524 loop {531 loop {
532 buf.clear();
525 match reader.next_line().await {533 match reader.read_until(b'\n', &mut buf).await {
534 Ok(0) => break,
526 Ok(Some(line)) => warn!(context = %context, "{line}"),535 Ok(_) => {
527 Ok(None) => break,536 let line = String::from_utf8_lossy(buf.strip_suffix(b"\n").unwrap_or(&buf));
537 if stderr {
538 warn!(context = %context, "{line}");
539 } else {
540 info!(context = %context, "{line}");
541 }
542 }
528 Err(e) => {543 Err(e) => {
529 warn!(context = %context, "stderr read failed: {e}");544 warn!(context = %context, "child stdio read failed: {e}");
530 break;545 break;
531 }546 }
532 }547 }
533 }548 }
534 });549 })
535}550}
536551
537fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {552fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {
modifiedcrates/remowt-client/src/subprocess.rsdiffbeforeafterboth
6use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};6use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};
7use remowt_link_shared::BifConfig;7use remowt_link_shared::BifConfig;
8use serde::de::DeserializeOwned;8use serde::de::DeserializeOwned;
9use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufReader};9use tokio::io::AsyncWriteExt as _;
10use tokio::select;10use tokio::select;
11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
12use tracing::{debug, info, warn};12use tracing::{debug, info, warn};
1313
14use crate::forwarded::{RemowtListener, RemowtStream};14use crate::forwarded::{RemowtListener, RemowtStream};
15use crate::Remowt;15use crate::{drain_to_tracing, Remowt};
1616
17#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]17#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
18pub enum StdioMode {18pub enum StdioMode {
62 client,62 client,
63 } = self;63 } = self;
64 drop(stdin);64 drop(stdin);
65 let drain_out = async move {
65 drop(stdout);66 if let Some(s) = stdout {
67 drain_to_tracing(s, "<child stdout>".to_owned(), false).await;
68 }
69 };
70 let drain_err = async move {
66 drop(stderr);71 if let Some(s) = stderr {
72 drain_to_tracing(s, "<child stderr>".to_owned(), true).await;
73 }
74 };
75 let wait = async move {
67 client76 client
68 .wait(id)77 .wait(id)
69 .await?78 .await?
70 .map_err(|e| anyhow!("agent wait failed: {e}"))79 .map_err(|e| anyhow!("agent wait failed: {e}"))
80 };
81 let (code, _, _) = tokio::join!(wait, drain_out, drain_err);
82 code
71 }83 }
7284
73 pub async fn kill(&self, signal: i32) -> Result<()> {85 pub async fn kill(&self, signal: i32) -> Result<()> {
163 );175 );
164176
165 let stdin_stream = handle_stdin(stdin, stdin_res?, &program);177 let stdin_stream = handle_stdin(stdin, stdin_res?, &program);
166 let stdout_stream = handle_output(stdout, stdout_res?, &program, false);178 let stdout_stream = handle_output(stdout, stdout_res?, &program);
167 let stderr_stream = handle_output_err(stderr, stderr_res?, &program);179 let stderr_stream = handle_output_err(stderr, stderr_res?, &program);
168180
169 Ok(RemowtChild {181 Ok(RemowtChild {
218fn handle_output(230fn handle_output(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {
219 mode: StdioMode,
220 s: Option<RemowtStream>,
221 program: &str,
222 is_stderr: bool,
223) -> Option<RemowtStream> {
224 match mode {231 match mode {
225 StdioMode::Pipe => s,232 StdioMode::Pipe => s,
226 StdioMode::Inherit => {233 StdioMode::Inherit => {
227 if let Some(s) = s {234 if let Some(s) = s {
228 let program = program.to_owned();235 let program = program.to_owned();
229 tokio::spawn(pump_to_tracing(s, program, is_stderr));236 tokio::spawn(drain_to_tracing(s, program, false));
230 }237 }
231 None238 None
232 }239 }
244 StderrMode::Inherit => {251 StderrMode::Inherit => {
245 if let Some(s) = s {252 if let Some(s) = s {
246 let program = program.to_owned();253 let program = program.to_owned();
247 tokio::spawn(pump_to_tracing(s, program, true));254 tokio::spawn(drain_to_tracing(s, program, true));
248 }255 }
249 None256 None
250 }257 }
251 StderrMode::MergeWithStdout | StderrMode::Null => None,258 StderrMode::MergeWithStdout | StderrMode::Null => None,
252 }259 }
253}260}
254
255async fn pump_to_tracing(stream: RemowtStream, program: String, is_stderr: bool) {
256 let mut reader = BufReader::new(stream).lines();
257 loop {
258 match reader.next_line().await {
259 Ok(Some(line)) => {
260 if is_stderr {
261 warn!(program, "{line}");
262 } else {
263 info!(program, "{line}");
264 }
265 }
266 Ok(None) => break,
267 Err(e) => {
268 warn!(program, "child log stream error: {e}");
269 break;
270 }
271 }
272 }
273}
274261
275fn escape_bash(input: &str, out: &mut String) {262fn escape_bash(input: &str, out: &mut String) {
276 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";263 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
422 }409 }
423 };410 };
411
412 while let Some(e) = err.next().await {
413 if let Ok(line) = e {
414 warn!(program = %program, "{line}");
415 }
416 }
417 if want_stdout {
418 if let Some(out_bytes) = out_bytes.as_mut() {
419 while let Some(o) = out_bytes.next().await {
420 if let Ok(chunk) = o {
421 buf.as_mut().expect("want_stdout").extend_from_slice(&chunk);
422 }
423 }
424 }
425 } else if let Some(out_lines) = out_lines.as_mut() {
426 while let Some(o) = out_lines.next().await {
427 if let Ok(line) = o {
428 info!(program = %program, "{line}");
429 }
430 }
431 }
424432
425 match exit {433 match exit {
426 Some(0) => Ok(buf),434 Some(0) => Ok(buf),
modifiedcrates/remowt-link-shared/src/port.rsdiffbeforeafterboth
3use bifrostlink::Port;3use bifrostlink::Port;
4use bytes::{Bytes, BytesMut};4use bytes::{Bytes, BytesMut};
5use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};5use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
66use tokio::select;
7/// Wire a length-prefixed duplex byte stream (e.g. a child process's7
8/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32`
9/// length followed by that many payload bytes.
10pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port8pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port
11where9where
12 R: AsyncRead + Unpin + Send + 'static,10 R: AsyncRead + Unpin + Send + 'static,
18 let len = match reader.read_u32().await {16 let len = match reader.read_u32().await {
19 Ok(len) => len,17 Ok(len) => len,
20 Err(e) => {18 Err(e) => {
21 tracing::error!("child read failed: {e}");19 log_read_end(&e);
22 break;20 break;
23 }21 }
24 };22 };
25 let mut buf = BytesMut::zeroed(len as usize);23 let mut buf = BytesMut::zeroed(len as usize);
26 if let Err(e) = reader.read_exact(&mut buf).await {24 if let Err(e) = reader.read_exact(&mut buf).await {
27 tracing::error!("child read failed: {e}");25 log_read_end(&e);
28 break;26 break;
29 }27 }
30 if tx.send(buf.freeze()).is_err() {28 if tx.send(buf.freeze()).is_err() {
35 let write_task = async move {33 let write_task = async move {
36 while let Some(msg) = rx.recv().await {34 while let Some(msg) = rx.recv().await {
37 if let Err(e) = write_frame(&mut writer, msg).await {35 if let Err(e) = write_frame(&mut writer, msg).await {
38 tracing::error!("child write failed: {e}");36 log_write_end(&e);
39 break;37 break;
40 }38 }
41 }39 }
42 };40 };
43 tokio::join!(read_task, write_task);41 select! {
42 _ = read_task => {},
43 _ = write_task => {},
44 }
44 })45 })
45}46}
47
48fn log_read_end(e: &io::Error) {
49 if matches!(
50 e.kind(),
51 io::ErrorKind::UnexpectedEof
52 | io::ErrorKind::BrokenPipe
53 | io::ErrorKind::ConnectionReset
54 | io::ErrorKind::ConnectionAborted
55 ) {
56 tracing::debug!("child read ended: {e}");
57 } else {
58 tracing::error!("child read failed: {e}");
59 }
60}
61
62fn log_write_end(e: &io::Error) {
63 if matches!(
64 e.kind(),
65 io::ErrorKind::BrokenPipe
66 | io::ErrorKind::ConnectionReset
67 | io::ErrorKind::ConnectionAborted
68 ) {
69 tracing::debug!("child write ended: {e}");
70 } else {
71 tracing::error!("child write failed: {e}");
72 }
73}
4674
47async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {75async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {
48 let len = u32::try_from(msg.len())76 let len = u32::try_from(msg.len())