difftreelog
feat drain stdout/stderr after signal
in: trunk
6 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -308,9 +308,9 @@
[[package]]
name = "bifrostlink"
-version = "0.2.4"
+version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ad2d0e30a2aa432b78f41f9676572f88201d4dc73bc2b7bc90704d2e02b7d062"
+checksum = "910f9286588d13e3dbdbbc1ad4d292656e704bc93e1f41b8a13b48e3a8e95f39"
dependencies = [
"async-trait",
"async_fn_traits",
@@ -327,9 +327,9 @@
[[package]]
name = "bifrostlink-macros"
-version = "0.2.4"
+version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e2121559c45cbe89c4f8d1d741360d5b028577254f6beca053dc02332da85b43"
+checksum = "a0ea5c423c3831c523c8ef78debdf6a64e72b21ec92148a44163a4c25c05dfd0"
dependencies = [
"proc-macro2",
"quote",
@@ -338,9 +338,9 @@
[[package]]
name = "bifrostlink-ports"
-version = "0.2.4"
+version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e9395c4ccca497b0c50583e6de57aca921c046ae0c10f56030cd2c5a20db05f8"
+checksum = "9e3a9a01ec1b8bd7d44b47cd0183a1465880e241027d9f5afcb076e11704ec70"
dependencies = [
"bifrostlink",
"bytes",
@@ -1835,7 +1835,7 @@
[[package]]
name = "polkit-backend"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"clap",
@@ -2055,7 +2055,7 @@
[[package]]
name = "remowt-agent"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2083,7 +2083,7 @@
[[package]]
name = "remowt-client"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2106,7 +2106,7 @@
[[package]]
name = "remowt-endpoints"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2124,7 +2124,7 @@
[[package]]
name = "remowt-link-shared"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"bifrostlink",
"bytes",
@@ -2138,7 +2138,7 @@
[[package]]
name = "remowt-plugin"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2152,7 +2152,7 @@
[[package]]
name = "remowt-polkit-shared"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"nix",
"serde",
@@ -2161,7 +2161,7 @@
[[package]]
name = "remowt-ssh"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"async-trait",
@@ -2189,7 +2189,7 @@
[[package]]
name = "remowt-ui-prompt"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"bifrostlink",
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,18 +3,18 @@
resolver = "2"
[workspace.package]
-version = "0.1.4"
+version = "0.1.6"
license = "MIT"
edition = "2021"
repository = "https://git.delta.rocks/r/remowt"
[workspace.dependencies]
-remowt-client = { version = "0.1.3", path = "crates/remowt-client" }
-remowt-polkit-shared = { version = "0.1.3", path = "crates/polkit-shared" }
-remowt-link-shared = { version = "0.1.3", path = "crates/remowt-link-shared" }
-remowt-plugin = { version = "0.1.3", path = "crates/remowt-plugin" }
-remowt-ui-prompt = { version = "0.1.3", path = "crates/remowt-ui-prompt" }
-remowt-endpoints = { version = "0.1.3", path = "crates/remowt-endpoints" }
+remowt-client = { version = "0.1.6", path = "crates/remowt-client" }
+remowt-polkit-shared = { version = "0.1.6", path = "crates/polkit-shared" }
+remowt-link-shared = { version = "0.1.6", path = "crates/remowt-link-shared" }
+remowt-plugin = { version = "0.1.6", path = "crates/remowt-plugin" }
+remowt-ui-prompt = { version = "0.1.6", path = "crates/remowt-ui-prompt" }
+remowt-endpoints = { version = "0.1.6", path = "crates/remowt-endpoints" }
bifrostlink = "0.2.0"
bifrostlink-macros = "0.2.0"
@@ -52,7 +52,9 @@
thiserror = "2.0.18"
[profile.release]
-panic = "abort"
+panic = "unwind"
opt-level = "z"
lto = true
codegen-units = 1
+debug = "full"
+split-debuginfo = "off"
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -2,6 +2,7 @@
use std::collections::{BTreeMap, HashMap};
use std::fs::Permissions;
use std::future::pending;
+use std::io;
use std::os::unix::fs::PermissionsExt as _;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, OnceLock};
@@ -230,7 +231,7 @@
fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
- .with_writer(std::io::stderr)
+ .with_writer(io::stderr)
.without_time()
.init();
let opts = Opts::parse();
crates/remowt-client/src/lib.rsdiffbeforeafterboth1use std::collections::HashMap;2use std::env;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Remote, Rpc, Rtt};9use camino::{Utf8Path, Utf8PathBuf};10use remowt_link_shared::plugin::PluginEndpointsClient;11use remowt_link_shared::port::child_port;12use remowt_link_shared::{Address, BifConfig};13use russh::client::{connect, Config, Handle, Handler, Msg, Session};14use russh::keys::agent::client::AgentClient;15use russh::keys::agent::AgentIdentity;16use russh::keys::check_known_hosts;17use russh::keys::ssh_key::PublicKey;18use russh::Channel;19use tempfile::TempDir;20use tokio::net::UnixListener;21use tokio::sync::oneshot;22use tokio::{23 fs,24 io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, DuplexStream},25};26use tracing::{debug, warn};27use uuid::Uuid;2829pub mod editor;30mod forwarded;31mod shell;32mod ssh_exec;33mod subprocess;3435use self::ssh_exec::SshExecChild;36pub use self::subprocess::{RemowtChild, SpawnOptions, StderrMode, StdioMode};37pub use forwarded::{RemowtListener, RemowtStream};38pub use shell::{RemowtShell, RemowtShellResizer};3940type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;4142fn sh_quote(s: impl AsRef<str>) -> String {43 format!("'{}'", s.as_ref().replace('\'', "'\\''"))44}4546const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];4748pub struct AgentBundle {49 dir: PathBuf,50 hashes: HashMap<String, String>,51}5253impl AgentBundle {54 pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {55 let dir = dir.into();56 let hashes_path = dir.join("hashes");57 let raw = std::fs::read_to_string(&hashes_path)58 .with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;59 let mut hashes = HashMap::new();60 for line in raw.lines() {61 let line = line.trim();62 if line.is_empty() {63 continue;64 }65 let (arch, hash) = line66 .split_once(char::is_whitespace)67 .ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;68 hashes.insert(arch.to_owned(), hash.trim().to_owned());69 }70 ensure!(71 !hashes.is_empty(),72 "agent bundle {} has no hashes",73 dir.display()74 );75 Ok(Self { dir, hashes })76 }7778 fn binary(&self, arch: &str) -> PathBuf {79 self.dir.join(format!("remowt-agent-{arch}"))80 }8182 fn local_binary(&self) -> Result<PathBuf> {83 let arch = env::consts::ARCH;84 let path = self.binary(arch);85 ensure!(86 path.is_file(),87 "no local remowt-agent build for arch {arch} in bundle {}",88 self.dir.display()89 );90 Ok(path)91 }92}9394async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {95 let ch = sess.channel_open_session().await?;96 ch.exec(true, cmd).await?;9798 let mut child = SshExecChild::from_exec(ch);99 drop(child.stdin);100 drain_stderr(child.stderr, cmd.to_owned());101102 let mut out = Vec::new();103 child.stdout.read_to_end(&mut out).await?;104 let code = child.exit.await.ok().flatten();105 Ok((code, out))106}107108async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {109 let (code, mut out) = run(sess, cmd).await?;110 ensure!(111 code == Some(0),112 "remote command failed (exit {code:?}): {cmd}"113 );114 if !out.is_empty() {115 ensure!(116 out.ends_with(b"\n"),117 "remote command was not newline-terminated: {cmd}: {out:?}"118 );119 out.pop();120 }121 String::from_utf8(out).context("expected utf8 output for command")122}123124async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {125 debug!("uname -a");126 let arch = run_string_ok(sess, "uname -m").await?;127 let hash = bundle128 .hashes129 .get(&arch)130 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;131132 debug!("get dir");133 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"").await?;134 let dir = if cache.is_empty() {135 let home = run_string_ok(sess, "echo \"$HOME\"").await?;136 ensure!(137 !home.is_empty(),138 "remote $HOME and $XDG_CACHE_HOME both empty"139 );140 Utf8PathBuf::from(home).join(".cache/remowt")141 } else {142 Utf8PathBuf::from(cache).join("remowt")143 };144 let path = dir.join(hash);145146 debug!("presence");147 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;148 if present != Some(0) {149 let bin = bundle.binary(&arch);150 debug!("read");151 let bytes = fs::read(&bin)152 .await153 .with_context(|| format!("reading agent binary {}", bin.display()))?;154 debug!("upload");155 upload_agent(sess, &dir, &path, bytes).await?;156 }157 Ok(path)158}159160async fn upload_agent(161 sess: &Handle<SshHandler>,162 dir: &Utf8Path,163 path: &Utf8Path,164 bytes: Vec<u8>,165) -> Result<()> {166 debug!("mkdirp");167 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;168169 let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));170 let ch = sess.channel_open_session().await?;171 debug!("cat");172 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;173174 let mut child = SshExecChild::from_exec(ch);175 child176 .stdin177 .write_all(&bytes)178 .await179 .context("sending agent binary")?;180 child181 .stdin182 .shutdown()183 .await184 .context("sending agent binary")?;185 let code = child.wait().await;186 ensure!(code == Some(0), "agent upload failed (exit {code:?})");187188 debug!("chmod");189 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;190 run_string_ok(191 sess,192 &format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),193 )194 .await?;195 Ok(())196}197198pub struct SshHandler {199 host: String,200 port: u16,201 subs: Subs,202}203impl Handler for SshHandler {204 type Error = russh::Error;205 async fn check_server_key(206 &mut self,207 server_public_key: &PublicKey,208 ) -> Result<bool, Self::Error> {209 Ok(check_known_hosts(&self.host, self.port, server_public_key)?)210 }211 async fn server_channel_open_forwarded_streamlocal(212 &mut self,213 channel: Channel<Msg>,214 socket_path: &str,215 _session: &mut Session,216 ) -> Result<(), Self::Error> {217 let Some(ch) = self218 .subs219 .lock()220 .expect("lock")221 .remove(&Utf8PathBuf::from(socket_path))222 else {223 return Err(russh::Error::WrongChannel);224 };225 let _ = ch.send(channel);226 Ok(())227 }228}229230enum Transport {231 Ssh {232 sess: Arc<Handle<SshHandler>>,233 subs: Subs,234 runtime_dir: Utf8PathBuf,235 agent_path: Utf8PathBuf,236 },237 Local {238 agent_path: PathBuf,239 runtime_dir: Utf8PathBuf,240 },241}242243struct RemowtInner {244 transport: Transport,245 rpc: Rpc<BifConfig>,246 elevated: tokio::sync::OnceCell<()>,247 #[allow(dead_code)]248 children: Mutex<Vec<tokio::process::Child>>,249 _runtime_tmp: Option<TempDir>,250}251252#[derive(Clone)]253pub struct Remowt(Arc<RemowtInner>);254255pub type RemowtRemote = Remote<BifConfig>;256257impl Remowt {258 /// Connect to the remote host over ssh, detect the architecture and deploy the required259 /// agent binary.260 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {261 let conf = russh_config::parse_home(host)?;262 let port = conf.host_config.port.or(conf.port).unwrap_or(22);263 let hostname = conf264 .host_config265 .hostname266 .clone()267 .unwrap_or_else(|| conf.host_name.clone());268 let user = conf269 .user270 .clone()271 .unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));272273 let subs: Subs = Arc::new(Mutex::new(HashMap::new()));274 let mut sess = connect(275 Arc::new(Config::default()),276 (hostname.clone(), port),277 SshHandler {278 host: hostname,279 port,280 subs: subs.clone(),281 },282 )283 .await?;284285 let mut agent = AgentClient::connect_env().await?;286 let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();287 let mut authenticated = false;288 for ident in agent.request_identities().await? {289 let AgentIdentity::PublicKey { key, .. } = ident else {290 continue;291 };292 if sess293 .authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)294 .await?295 .success()296 {297 authenticated = true;298 break;299 }300 }301 ensure!(authenticated, "ssh authentication failed");302303 let sess = Arc::new(sess);304305 debug!("deploying agent");306 let agent_path = deploy_agent(&sess, bundle).await?;307308 debug!("runtime dir");309 let runtime_dir = remote_runtime_dir(&sess).await?;310311 let rpc = Rpc::<BifConfig>::new(Address::User);312313 let cmd_chan = sess.channel_open_session().await?;314 debug!("starting agent");315 cmd_chan316 .exec(true, format!("{} real-agent", sh_quote(&agent_path)))317 .await?;318319 let child = SshExecChild::from_exec(cmd_chan);320 drain_stderr(child.stderr, "agent".to_owned());321 rpc.add_direct(322 Address::Agent,323 child_port(child.stdout, child.stdin),324 Rtt(0),325 );326327 Ok(Self(Arc::new(RemowtInner {328 transport: Transport::Ssh {329 sess,330 subs,331 runtime_dir,332 agent_path,333 },334 rpc,335 elevated: tokio::sync::OnceCell::new(),336 children: Mutex::new(Vec::new()),337 _runtime_tmp: None,338 })))339 }340341 /// "Connect" to the local machine's agent, by starting the agent binary locally.342 pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {343 let agent_path = bundle.local_binary()?;344 let mut child = tokio::process::Command::new(&agent_path)345 .arg("real-agent")346 .arg("--local")347 .stdin(std::process::Stdio::piped())348 .stdout(std::process::Stdio::piped())349 .kill_on_drop(true)350 .spawn()351 .with_context(|| format!("spawning agent binary {}", agent_path.display()))?;352 let stdin = child.stdin.take().expect("stdin piped");353 let stdout = child.stdout.take().expect("stdout piped");354355 let rpc = Rpc::<BifConfig>::new(Address::User);356 rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));357358 let (runtime_dir, runtime_tmp) = local_runtime_dir()?;359360 Ok(Self(Arc::new(RemowtInner {361 transport: Transport::Local {362 agent_path,363 runtime_dir,364 },365 rpc,366 elevated: tokio::sync::OnceCell::new(),367 children: Mutex::new(vec![child]),368 _runtime_tmp: runtime_tmp,369 })))370 }371372 /// Get the handle to the underlying russh session handle.373 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {374 match &self.0.transport {375 Transport::Ssh { sess, .. } => Some(sess.clone()),376 Transport::Local { .. } => None,377 }378 }379380 pub fn rpc(&self) -> Rpc<BifConfig> {381 self.0.rpc.clone()382 }383384 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {385 let client: PluginEndpointsClient<BifConfig> = self.endpoints();386 client387 .load_plugin(id, name.to_owned())388 .await?389 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))390 }391 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {392 self.ensure_escalated().await?;393 let client: PluginEndpointsClient<BifConfig> =394 PluginEndpointsClient::wrap(self.0.rpc.remote(Address::AgentPrivileged));395 client396 .load_plugin_path(id, path.to_owned())397 .await?398 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))399 }400 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {401 R::wrap(self.0.rpc.remote(Address::Plugin(id)))402 }403404 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {405 R::wrap(self.0.rpc.remote(Address::Agent))406 }407 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {408 self.ensure_escalated().await?;409 Ok(R::wrap(self.0.rpc.remote(Address::AgentPrivileged)))410 }411412 async fn ensure_escalated(&self) -> Result<()> {413 self.0414 .elevated415 .get_or_try_init(|| async {416 let (agent_path, local) = match &self.0.transport {417 Transport::Ssh { agent_path, .. } => (agent_path.as_str().to_owned(), false),418 Transport::Local { agent_path, .. } => (419 agent_path420 .to_str()421 .ok_or_else(|| anyhow!("local agent path is not utf-8"))?422 .to_owned(),423 true,424 ),425 };426427 let (tool, flags) = self.detect_escalation().await?;428 let mut args: Vec<String> = flags.iter().map(|f| (*f).to_owned()).collect();429 args.push(agent_path);430 args.push("real-agent".to_owned());431 args.push("--privileged".to_owned());432 if local {433 args.push("--local".to_owned());434 }435436 let child = self437 .spawn(SpawnOptions {438 program: tool.to_owned(),439 args,440 stdin: StdioMode::Pipe,441 stdout: StdioMode::Pipe,442 stderr: StderrMode::Inherit,443 ..Default::default()444 })445 .await446 .context("spawning privileged agent")?;447448 let stdin = child449 .stdin450 .ok_or_else(|| anyhow!("privileged agent stdin missing"))?;451 let stdout = child452 .stdout453 .ok_or_else(|| anyhow!("privileged agent stdout missing"))?;454455 let port = child_port(stdout, stdin);456 self.0457 .rpc458 .add_direct(Address::AgentPrivileged, port, Rtt(0));459 anyhow::Ok(())460 })461 .await?;462 Ok(())463 }464465 async fn detect_escalation(&self) -> Result<(&'static str, &'static [&'static str])> {466 for (tool, flags) in ESCALATORS {467 let probe = self468 .spawn(SpawnOptions {469 program: (*tool).to_owned(),470 args: vec!["--version".to_owned()],471 stdout: StdioMode::Null,472 stderr: StderrMode::Null,473 ..Default::default()474 })475 .await;476 if let Ok(child) = probe {477 let _ = child.wait().await;478 return Ok((tool, flags));479 }480 }481 bail!("no escalation tool found")482 }483484 /// XDG_RUNTIME_DIR on the remote machine.485 pub fn runtime_dir(&self) -> Utf8PathBuf {486 match &self.0.transport {487 Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),488 Transport::Local { runtime_dir, .. } => runtime_dir.clone(),489 }490 }491492 /// Bind unix listener socket on the remote machine with auto-generated path, returning the path.493 pub async fn bind_runtime_unix(&self, hint: &str) -> Result<(RemowtListener, Utf8PathBuf)> {494 let sock = self495 .runtime_dir()496 .join(format!("remowt-{hint}-{}.sock", Uuid::new_v4()));497 let listener = self.bind_unix(&sock).await?;498 Ok((listener, sock))499 }500501 /// Bind unix listener socket on the remote machine on the specified path.502 pub async fn bind_unix(&self, path: &Utf8Path) -> Result<RemowtListener> {503 match &self.0.transport {504 Transport::Ssh { sess, subs, .. } => {505 let (tx, rx) = oneshot::channel();506 subs.lock().expect("lock").insert(path.to_owned(), tx);507 sess.streamlocal_forward(path.to_owned()).await?;508 Ok(RemowtListener::Ssh(rx))509 }510 Transport::Local { .. } => {511 let _ = std::fs::remove_file(path);512 Ok(RemowtListener::Local(513 UnixListener::bind(path)?,514 path.to_owned(),515 ))516 }517 }518 }519}520521fn drain_stderr(stream: DuplexStream, context: String) {522 tokio::spawn(async move {523 let mut reader = BufReader::new(stream).lines();524 loop {525 match reader.next_line().await {526 Ok(Some(line)) => warn!(context = %context, "{line}"),527 Ok(None) => break,528 Err(e) => {529 warn!(context = %context, "stderr read failed: {e}");530 break;531 }532 }533 }534 });535}536537fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {538 if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {539 if !dir.is_empty() {540 return Ok((Utf8PathBuf::from(dir), None));541 }542 }543 let tmp = tempfile::Builder::new()544 .prefix("remowt.")545 .rand_bytes(12)546 .tempdir()?;547 let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())548 .map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;549 Ok((dir, Some(tmp)))550}551552async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {553 let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;554 let dir = dir.trim();555 if dir.is_empty() {556 let tmp = run_string_ok(sess, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir").await?;557 Ok(Utf8PathBuf::from(tmp))558 } else {559 Ok(Utf8PathBuf::from(dir))560 }561}1use std::collections::HashMap;2use std::env;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Remote, Rpc, Rtt};9use camino::{Utf8Path, Utf8PathBuf};10use remowt_link_shared::plugin::PluginEndpointsClient;11use remowt_link_shared::port::child_port;12use remowt_link_shared::{Address, BifConfig};13use russh::client::{connect, Config, Handle, Handler, Msg, Session};14use russh::keys::agent::client::AgentClient;15use russh::keys::agent::AgentIdentity;16use russh::keys::check_known_hosts;17use russh::keys::ssh_key::PublicKey;18use russh::Channel;19use tempfile::TempDir;20use tokio::io::AsyncRead;21use tokio::net::UnixListener;22use tokio::sync::oneshot;23use tokio::task::JoinHandle;24use tokio::{25 fs,26 io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader},27};28use tracing::{debug, info, warn};29use uuid::Uuid;3031pub mod editor;32mod forwarded;33mod shell;34mod ssh_exec;35mod subprocess;3637use self::ssh_exec::SshExecChild;38pub use self::subprocess::{RemowtChild, SpawnOptions, StderrMode, StdioMode};39pub use forwarded::{RemowtListener, RemowtStream};40pub use shell::{RemowtShell, RemowtShellResizer};4142type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;4344fn sh_quote(s: impl AsRef<str>) -> String {45 format!("'{}'", s.as_ref().replace('\'', "'\\''"))46}4748const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];4950pub struct AgentBundle {51 dir: PathBuf,52 hashes: HashMap<String, String>,53}5455impl AgentBundle {56 pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {57 let dir = dir.into();58 let hashes_path = dir.join("hashes");59 let raw = std::fs::read_to_string(&hashes_path)60 .with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;61 let mut hashes = HashMap::new();62 for line in raw.lines() {63 let line = line.trim();64 if line.is_empty() {65 continue;66 }67 let (arch, hash) = line68 .split_once(char::is_whitespace)69 .ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;70 hashes.insert(arch.to_owned(), hash.trim().to_owned());71 }72 ensure!(73 !hashes.is_empty(),74 "agent bundle {} has no hashes",75 dir.display()76 );77 Ok(Self { dir, hashes })78 }7980 fn binary(&self, arch: &str) -> PathBuf {81 self.dir.join(format!("remowt-agent-{arch}"))82 }8384 fn local_binary(&self) -> Result<PathBuf> {85 let arch = env::consts::ARCH;86 let path = self.binary(arch);87 ensure!(88 path.is_file(),89 "no local remowt-agent build for arch {arch} in bundle {}",90 self.dir.display()91 );92 Ok(path)93 }94}9596async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {97 let ch = sess.channel_open_session().await?;98 ch.exec(true, cmd).await?;99100 let mut child = SshExecChild::from_exec(ch);101 drop(child.stdin);102 drain_to_tracing(child.stderr, cmd.to_owned(), true);103104 let mut out = Vec::new();105 child.stdout.read_to_end(&mut out).await?;106 let code = child.exit.await.ok().flatten();107 Ok((code, out))108}109110async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {111 let (code, mut out) = run(sess, cmd).await?;112 ensure!(113 code == Some(0),114 "remote command failed (exit {code:?}): {cmd}"115 );116 if !out.is_empty() {117 ensure!(118 out.ends_with(b"\n"),119 "remote command was not newline-terminated: {cmd}: {out:?}"120 );121 out.pop();122 }123 String::from_utf8(out).context("expected utf8 output for command")124}125126async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {127 debug!("uname -a");128 let arch = run_string_ok(sess, "uname -m").await?;129 let hash = bundle130 .hashes131 .get(&arch)132 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;133134 debug!("get dir");135 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"").await?;136 let dir = if cache.is_empty() {137 let home = run_string_ok(sess, "echo \"$HOME\"").await?;138 ensure!(139 !home.is_empty(),140 "remote $HOME and $XDG_CACHE_HOME both empty"141 );142 Utf8PathBuf::from(home).join(".cache/remowt")143 } else {144 Utf8PathBuf::from(cache).join("remowt")145 };146 let path = dir.join(hash);147148 debug!("presence");149 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;150 if present != Some(0) {151 let bin = bundle.binary(&arch);152 debug!("read");153 let bytes = fs::read(&bin)154 .await155 .with_context(|| format!("reading agent binary {}", bin.display()))?;156 debug!("upload");157 upload_agent(sess, &dir, &path, bytes).await?;158 }159 Ok(path)160}161162async fn upload_agent(163 sess: &Handle<SshHandler>,164 dir: &Utf8Path,165 path: &Utf8Path,166 bytes: Vec<u8>,167) -> Result<()> {168 debug!("mkdirp");169 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;170171 let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));172 let ch = sess.channel_open_session().await?;173 debug!("cat");174 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;175176 let mut child = SshExecChild::from_exec(ch);177 child178 .stdin179 .write_all(&bytes)180 .await181 .context("sending agent binary")?;182 child183 .stdin184 .shutdown()185 .await186 .context("sending agent binary")?;187 let code = child.wait().await;188 ensure!(code == Some(0), "agent upload failed (exit {code:?})");189190 debug!("chmod");191 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;192 run_string_ok(193 sess,194 &format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),195 )196 .await?;197 Ok(())198}199200pub struct SshHandler {201 host: String,202 port: u16,203 subs: Subs,204}205impl Handler for SshHandler {206 type Error = russh::Error;207 async fn check_server_key(208 &mut self,209 server_public_key: &PublicKey,210 ) -> Result<bool, Self::Error> {211 Ok(check_known_hosts(&self.host, self.port, server_public_key)?)212 }213 async fn server_channel_open_forwarded_streamlocal(214 &mut self,215 channel: Channel<Msg>,216 socket_path: &str,217 _session: &mut Session,218 ) -> Result<(), Self::Error> {219 let Some(ch) = self220 .subs221 .lock()222 .expect("lock")223 .remove(&Utf8PathBuf::from(socket_path))224 else {225 return Err(russh::Error::WrongChannel);226 };227 let _ = ch.send(channel);228 Ok(())229 }230}231232enum Transport {233 Ssh {234 sess: Arc<Handle<SshHandler>>,235 subs: Subs,236 runtime_dir: Utf8PathBuf,237 agent_path: Utf8PathBuf,238 },239 Local {240 agent_path: PathBuf,241 runtime_dir: Utf8PathBuf,242 },243}244245struct RemowtInner {246 transport: Transport,247 rpc: Rpc<BifConfig>,248 elevated: tokio::sync::OnceCell<()>,249 #[allow(dead_code)]250 children: Mutex<Vec<tokio::process::Child>>,251 _runtime_tmp: Option<TempDir>,252}253254#[derive(Clone)]255pub struct Remowt(Arc<RemowtInner>);256257pub type RemowtRemote = Remote<BifConfig>;258259impl Remowt {260 /// Connect to the remote host over ssh, detect the architecture and deploy the required261 /// agent binary.262 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {263 let conf = russh_config::parse_home(host)?;264 let port = conf.host_config.port.or(conf.port).unwrap_or(22);265 let hostname = conf266 .host_config267 .hostname268 .clone()269 .unwrap_or_else(|| conf.host_name.clone());270 let user = conf271 .user272 .clone()273 .unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));274275 let subs: Subs = Arc::new(Mutex::new(HashMap::new()));276 let mut sess = connect(277 Arc::new(Config::default()),278 (hostname.clone(), port),279 SshHandler {280 host: hostname,281 port,282 subs: subs.clone(),283 },284 )285 .await?;286287 let mut agent = AgentClient::connect_env().await?;288 let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();289 let mut authenticated = false;290 for ident in agent.request_identities().await? {291 let AgentIdentity::PublicKey { key, .. } = ident else {292 continue;293 };294 if sess295 .authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)296 .await?297 .success()298 {299 authenticated = true;300 break;301 }302 }303 ensure!(authenticated, "ssh authentication failed");304305 let sess = Arc::new(sess);306307 debug!("deploying agent");308 let agent_path = deploy_agent(&sess, bundle).await?;309310 debug!("runtime dir");311 let runtime_dir = remote_runtime_dir(&sess).await?;312313 let rpc = Rpc::<BifConfig>::new(Address::User);314315 let cmd_chan = sess.channel_open_session().await?;316 debug!("starting agent");317 cmd_chan318 .exec(true, format!("{} real-agent", sh_quote(&agent_path)))319 .await?;320321 let child = SshExecChild::from_exec(cmd_chan);322 drain_to_tracing(child.stderr, "agent".to_owned(), true);323 rpc.add_direct(324 Address::Agent,325 child_port(child.stdout, child.stdin),326 Rtt(0),327 );328329 Ok(Self(Arc::new(RemowtInner {330 transport: Transport::Ssh {331 sess,332 subs,333 runtime_dir,334 agent_path,335 },336 rpc,337 elevated: tokio::sync::OnceCell::new(),338 children: Mutex::new(Vec::new()),339 _runtime_tmp: None,340 })))341 }342343 /// "Connect" to the local machine's agent, by starting the agent binary locally.344 pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {345 let agent_path = bundle.local_binary()?;346 let mut child = tokio::process::Command::new(&agent_path)347 .arg("real-agent")348 .arg("--local")349 .stdin(std::process::Stdio::piped())350 .stdout(std::process::Stdio::piped())351 .kill_on_drop(true)352 .spawn()353 .with_context(|| format!("spawning agent binary {}", agent_path.display()))?;354 let stdin = child.stdin.take().expect("stdin piped");355 let stdout = child.stdout.take().expect("stdout piped");356357 let rpc = Rpc::<BifConfig>::new(Address::User);358 rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));359360 let (runtime_dir, runtime_tmp) = local_runtime_dir()?;361362 Ok(Self(Arc::new(RemowtInner {363 transport: Transport::Local {364 agent_path,365 runtime_dir,366 },367 rpc,368 elevated: tokio::sync::OnceCell::new(),369 children: Mutex::new(vec![child]),370 _runtime_tmp: runtime_tmp,371 })))372 }373374 /// Get the handle to the underlying russh session handle.375 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {376 match &self.0.transport {377 Transport::Ssh { sess, .. } => Some(sess.clone()),378 Transport::Local { .. } => None,379 }380 }381382 pub fn rpc(&self) -> Rpc<BifConfig> {383 self.0.rpc.clone()384 }385386 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {387 let client: PluginEndpointsClient<BifConfig> = self.endpoints();388 client389 .load_plugin(id, name.to_owned())390 .await?391 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))392 }393 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {394 self.ensure_escalated().await?;395 let client: PluginEndpointsClient<BifConfig> =396 PluginEndpointsClient::wrap(self.0.rpc.remote(Address::AgentPrivileged));397 client398 .load_plugin_path(id, path.to_owned())399 .await?400 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))401 }402 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {403 R::wrap(self.0.rpc.remote(Address::Plugin(id)))404 }405406 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {407 R::wrap(self.0.rpc.remote(Address::Agent))408 }409 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {410 self.ensure_escalated().await?;411 Ok(R::wrap(self.0.rpc.remote(Address::AgentPrivileged)))412 }413414 async fn ensure_escalated(&self) -> Result<()> {415 self.0416 .elevated417 .get_or_try_init(|| async {418 let (agent_path, local) = match &self.0.transport {419 Transport::Ssh { agent_path, .. } => (agent_path.as_str().to_owned(), false),420 Transport::Local { agent_path, .. } => (421 agent_path422 .to_str()423 .ok_or_else(|| anyhow!("local agent path is not utf-8"))?424 .to_owned(),425 true,426 ),427 };428429 let (tool, flags) = self.detect_escalation().await?;430 let mut args: Vec<String> = flags.iter().map(|f| (*f).to_owned()).collect();431 args.push(agent_path);432 args.push("real-agent".to_owned());433 args.push("--privileged".to_owned());434 if local {435 args.push("--local".to_owned());436 }437438 let child = self439 .spawn(SpawnOptions {440 program: tool.to_owned(),441 args,442 stdin: StdioMode::Pipe,443 stdout: StdioMode::Pipe,444 stderr: StderrMode::Inherit,445 ..Default::default()446 })447 .await448 .context("spawning privileged agent")?;449450 let stdin = child451 .stdin452 .ok_or_else(|| anyhow!("privileged agent stdin missing"))?;453 let stdout = child454 .stdout455 .ok_or_else(|| anyhow!("privileged agent stdout missing"))?;456457 let port = child_port(stdout, stdin);458 self.0459 .rpc460 .add_direct(Address::AgentPrivileged, port, Rtt(0));461 anyhow::Ok(())462 })463 .await?;464 Ok(())465 }466467 async fn detect_escalation(&self) -> Result<(&'static str, &'static [&'static str])> {468 for (tool, flags) in ESCALATORS {469 let probe = self470 .spawn(SpawnOptions {471 program: (*tool).to_owned(),472 args: vec!["--version".to_owned()],473 stdout: StdioMode::Null,474 stderr: StderrMode::Null,475 ..Default::default()476 })477 .await;478 if let Ok(child) = probe {479 let _ = child.wait().await;480 return Ok((tool, flags));481 }482 }483 bail!("no escalation tool found")484 }485486 /// XDG_RUNTIME_DIR on the remote machine.487 pub fn runtime_dir(&self) -> Utf8PathBuf {488 match &self.0.transport {489 Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),490 Transport::Local { runtime_dir, .. } => runtime_dir.clone(),491 }492 }493494 /// Bind unix listener socket on the remote machine with auto-generated path, returning the path.495 pub async fn bind_runtime_unix(&self, hint: &str) -> Result<(RemowtListener, Utf8PathBuf)> {496 let sock = self497 .runtime_dir()498 .join(format!("remowt-{hint}-{}.sock", Uuid::new_v4()));499 let listener = self.bind_unix(&sock).await?;500 Ok((listener, sock))501 }502503 /// Bind unix listener socket on the remote machine on the specified path.504 pub async fn bind_unix(&self, path: &Utf8Path) -> Result<RemowtListener> {505 match &self.0.transport {506 Transport::Ssh { sess, subs, .. } => {507 let (tx, rx) = oneshot::channel();508 subs.lock().expect("lock").insert(path.to_owned(), tx);509 sess.streamlocal_forward(path.to_owned()).await?;510 Ok(RemowtListener::Ssh(rx))511 }512 Transport::Local { .. } => {513 let _ = std::fs::remove_file(path);514 Ok(RemowtListener::Local(515 UnixListener::bind(path)?,516 path.to_owned(),517 ))518 }519 }520 }521}522523pub(crate) fn drain_to_tracing(524 stream: impl AsyncRead + Unpin + 'static + Send,525 context: String,526 stderr: bool,527) -> JoinHandle<()> {528 tokio::spawn(async move {529 let mut reader = BufReader::new(stream);530 let mut buf = Vec::with_capacity(4096);531 loop {532 buf.clear();533 match reader.read_until(b'\n', &mut buf).await {534 Ok(0) => break,535 Ok(_) => {536 let line = String::from_utf8_lossy(buf.strip_suffix(b"\n").unwrap_or(&buf));537 if stderr {538 warn!(context = %context, "{line}");539 } else {540 info!(context = %context, "{line}");541 }542 }543 Err(e) => {544 warn!(context = %context, "child stdio read failed: {e}");545 break;546 }547 }548 }549 })550}551552fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {553 if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {554 if !dir.is_empty() {555 return Ok((Utf8PathBuf::from(dir), None));556 }557 }558 let tmp = tempfile::Builder::new()559 .prefix("remowt.")560 .rand_bytes(12)561 .tempdir()?;562 let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())563 .map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;564 Ok((dir, Some(tmp)))565}566567async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {568 let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;569 let dir = dir.trim();570 if dir.is_empty() {571 let tmp = run_string_ok(sess, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir").await?;572 Ok(Utf8PathBuf::from(tmp))573 } else {574 Ok(Utf8PathBuf::from(dir))575 }576}crates/remowt-client/src/subprocess.rsdiffbeforeafterboth--- a/crates/remowt-client/src/subprocess.rs
+++ b/crates/remowt-client/src/subprocess.rs
@@ -6,13 +6,13 @@
use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};
use remowt_link_shared::BifConfig;
use serde::de::DeserializeOwned;
-use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufReader};
+use tokio::io::AsyncWriteExt as _;
use tokio::select;
use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
use tracing::{debug, info, warn};
use crate::forwarded::{RemowtListener, RemowtStream};
-use crate::Remowt;
+use crate::{drain_to_tracing, Remowt};
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum StdioMode {
@@ -62,12 +62,24 @@
client,
} = self;
drop(stdin);
- drop(stdout);
- drop(stderr);
- client
- .wait(id)
- .await?
- .map_err(|e| anyhow!("agent wait failed: {e}"))
+ let drain_out = async move {
+ if let Some(s) = stdout {
+ drain_to_tracing(s, "<child stdout>".to_owned(), false).await;
+ }
+ };
+ let drain_err = async move {
+ if let Some(s) = stderr {
+ drain_to_tracing(s, "<child stderr>".to_owned(), true).await;
+ }
+ };
+ let wait = async move {
+ client
+ .wait(id)
+ .await?
+ .map_err(|e| anyhow!("agent wait failed: {e}"))
+ };
+ let (code, _, _) = tokio::join!(wait, drain_out, drain_err);
+ code
}
pub async fn kill(&self, signal: i32) -> Result<()> {
@@ -163,7 +175,7 @@
);
let stdin_stream = handle_stdin(stdin, stdin_res?, &program);
- let stdout_stream = handle_output(stdout, stdout_res?, &program, false);
+ let stdout_stream = handle_output(stdout, stdout_res?, &program);
let stderr_stream = handle_output_err(stderr, stderr_res?, &program);
Ok(RemowtChild {
@@ -215,18 +227,13 @@
}
}
-fn handle_output(
- mode: StdioMode,
- s: Option<RemowtStream>,
- program: &str,
- is_stderr: bool,
-) -> Option<RemowtStream> {
+fn handle_output(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {
match mode {
StdioMode::Pipe => s,
StdioMode::Inherit => {
if let Some(s) = s {
let program = program.to_owned();
- tokio::spawn(pump_to_tracing(s, program, is_stderr));
+ tokio::spawn(drain_to_tracing(s, program, false));
}
None
}
@@ -244,7 +251,7 @@
StderrMode::Inherit => {
if let Some(s) = s {
let program = program.to_owned();
- tokio::spawn(pump_to_tracing(s, program, true));
+ tokio::spawn(drain_to_tracing(s, program, true));
}
None
}
@@ -252,26 +259,6 @@
}
}
-async fn pump_to_tracing(stream: RemowtStream, program: String, is_stderr: bool) {
- let mut reader = BufReader::new(stream).lines();
- loop {
- match reader.next_line().await {
- Ok(Some(line)) => {
- if is_stderr {
- warn!(program, "{line}");
- } else {
- info!(program, "{line}");
- }
- }
- Ok(None) => break,
- Err(e) => {
- warn!(program, "child log stream error: {e}");
- break;
- }
- }
- }
-}
-
fn escape_bash(input: &str, out: &mut String) {
const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
if input.chars().all(|c| !TO_ESCAPE.contains(c)) {
@@ -422,6 +409,27 @@
}
};
+ while let Some(e) = err.next().await {
+ if let Ok(line) = e {
+ warn!(program = %program, "{line}");
+ }
+ }
+ if want_stdout {
+ if let Some(out_bytes) = out_bytes.as_mut() {
+ while let Some(o) = out_bytes.next().await {
+ if let Ok(chunk) = o {
+ buf.as_mut().expect("want_stdout").extend_from_slice(&chunk);
+ }
+ }
+ }
+ } else if let Some(out_lines) = out_lines.as_mut() {
+ while let Some(o) = out_lines.next().await {
+ if let Ok(line) = o {
+ info!(program = %program, "{line}");
+ }
+ }
+ }
+
match exit {
Some(0) => Ok(buf),
Some(c) => bail!("command '{line}' failed with status {c}"),
crates/remowt-link-shared/src/port.rsdiffbeforeafterboth--- a/crates/remowt-link-shared/src/port.rs
+++ b/crates/remowt-link-shared/src/port.rs
@@ -3,10 +3,8 @@
use bifrostlink::Port;
use bytes::{Bytes, BytesMut};
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
+use tokio::select;
-/// Wire a length-prefixed duplex byte stream (e.g. a child process's
-/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32`
-/// length followed by that many payload bytes.
pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port
where
R: AsyncRead + Unpin + Send + 'static,
@@ -18,13 +16,13 @@
let len = match reader.read_u32().await {
Ok(len) => len,
Err(e) => {
- tracing::error!("child read failed: {e}");
+ log_read_end(&e);
break;
}
};
let mut buf = BytesMut::zeroed(len as usize);
if let Err(e) = reader.read_exact(&mut buf).await {
- tracing::error!("child read failed: {e}");
+ log_read_end(&e);
break;
}
if tx.send(buf.freeze()).is_err() {
@@ -35,15 +33,45 @@
let write_task = async move {
while let Some(msg) = rx.recv().await {
if let Err(e) = write_frame(&mut writer, msg).await {
- tracing::error!("child write failed: {e}");
+ log_write_end(&e);
break;
}
}
};
- tokio::join!(read_task, write_task);
+ select! {
+ _ = read_task => {},
+ _ = write_task => {},
+ }
})
}
+fn log_read_end(e: &io::Error) {
+ if matches!(
+ e.kind(),
+ io::ErrorKind::UnexpectedEof
+ | io::ErrorKind::BrokenPipe
+ | io::ErrorKind::ConnectionReset
+ | io::ErrorKind::ConnectionAborted
+ ) {
+ tracing::debug!("child read ended: {e}");
+ } else {
+ tracing::error!("child read failed: {e}");
+ }
+}
+
+fn log_write_end(e: &io::Error) {
+ if matches!(
+ e.kind(),
+ io::ErrorKind::BrokenPipe
+ | io::ErrorKind::ConnectionReset
+ | io::ErrorKind::ConnectionAborted
+ ) {
+ tracing::debug!("child write ended: {e}");
+ } else {
+ tracing::error!("child write failed: {e}");
+ }
+}
+
async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {
let len = u32::try_from(msg.len())
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;