difftreelog
feat subprocess through agent
in: trunk
15 files changed
Cargo.lockdiffbeforeafterboth308308309[[package]]309[[package]]310name = "bifrostlink"310name = "bifrostlink"311version = "0.2.2"311version = "0.2.3"312source = "registry+https://github.com/rust-lang/crates.io-index"312source = "registry+https://github.com/rust-lang/crates.io-index"313checksum = "4bd6d4c7cf73270cbf6846a42361d409df23262d276ff15110e787067d89ad1d"313checksum = "704657867d2107831c57edd363726440c86675464b5fc113702854e17062cafc"314dependencies = [314dependencies = [315 "async-trait",315 "async-trait",316 "async_fn_traits",316 "async_fn_traits",327327328[[package]]328[[package]]329name = "bifrostlink-macros"329name = "bifrostlink-macros"330version = "0.2.2"330version = "0.2.3"331source = "registry+https://github.com/rust-lang/crates.io-index"331source = "registry+https://github.com/rust-lang/crates.io-index"332checksum = "329813d3e34c7e65638480cefbb09d1e1dc47fa5dbf0f5f0cf8d9eee424f4f19"332checksum = "158308eb569b467c0116680f79d0ecc389f4d540f6d5a0c9279bfe79b1cd5bdb"333dependencies = [333dependencies = [334 "proc-macro2",334 "proc-macro2",335 "quote",335 "quote",338338339[[package]]339[[package]]340name = "bifrostlink-ports"340name = "bifrostlink-ports"341version = "0.2.2"341version = "0.2.3"342source = "registry+https://github.com/rust-lang/crates.io-index"342source = "registry+https://github.com/rust-lang/crates.io-index"343checksum = "d6683eb0d4366b762fc45c0eb14e384d3554a23bc1ba3f1246368900d41e3700"343checksum = "7612993f0bd8bc6a71867461266567212a35a716b2a5aef5f9967ab08c891782"344dependencies = [344dependencies = [345 "bifrostlink",345 "bifrostlink",346 "bytes",346 "bytes",183518351836[[package]]1836[[package]]1837name = "polkit-backend"1837name = "polkit-backend"1838version = "0.1.1"1838version = "0.1.3"1839dependencies = [1839dependencies = [1840 "anyhow",1840 "anyhow",1841 "clap",1841 "clap",205520552056[[package]]2056[[package]]2057name = "remowt-agent"2057name = "remowt-agent"2058version = "0.1.1"2058version = "0.1.3"2059dependencies = [2059dependencies = [2060 "anyhow",2060 "anyhow",2061 "bifrostlink",2061 "bifrostlink",208320832084[[package]]2084[[package]]2085name = "remowt-client"2085name = "remowt-client"2086version = "0.1.1"2086version = "0.1.3"2087dependencies = [2087dependencies = [2088 "anyhow",2088 "anyhow",2089 "bifrostlink",2089 "bifrostlink",2090 "bifrostlink-ports",2090 "bifrostlink-ports",2091 "bytes",2091 "bytes",2092 "camino",2092 "camino",2093 "futures",2093 "remowt-endpoints",2094 "remowt-endpoints",2094 "remowt-link-shared",2095 "remowt-link-shared",2095 "russh",2096 "russh",2098 "serde_json",2099 "serde_json",2099 "tempfile",2100 "tempfile",2100 "tokio",2101 "tokio",2102 "tokio-util",2101 "tracing",2103 "tracing",2102 "uuid",2104 "uuid",2103]2105]210421062105[[package]]2107[[package]]2106name = "remowt-endpoints"2108name = "remowt-endpoints"2107version = "0.1.1"2109version = "0.1.3"2108dependencies = [2110dependencies = [2109 "anyhow",2111 "anyhow",2110 "bifrostlink",2112 "bifrostlink",212221242123[[package]]2125[[package]]2124name = "remowt-link-shared"2126name = "remowt-link-shared"2125version = "0.1.1"2127version = "0.1.3"2126dependencies = [2128dependencies = [2127 "bifrostlink",2129 "bifrostlink",2128 "bytes",2130 "bytes",213621382137[[package]]2139[[package]]2138name = "remowt-plugin"2140name = "remowt-plugin"2139version = "0.1.1"2141version = "0.1.3"2140dependencies = [2142dependencies = [2141 "anyhow",2143 "anyhow",2142 "bifrostlink",2144 "bifrostlink",215021522151[[package]]2153[[package]]2152name = "remowt-polkit-shared"2154name = "remowt-polkit-shared"2153version = "0.1.1"2155version = "0.1.3"2154dependencies = [2156dependencies = [2155 "nix",2157 "nix",2156 "serde",2158 "serde",215921612160[[package]]2162[[package]]2161name = "remowt-ssh"2163name = "remowt-ssh"2162version = "0.1.1"2164version = "0.1.3"2163dependencies = [2165dependencies = [2164 "anyhow",2166 "anyhow",2165 "async-trait",2167 "async-trait",218721892188[[package]]2190[[package]]2189name = "remowt-ui-prompt"2191name = "remowt-ui-prompt"2190version = "0.1.1"2192version = "0.1.3"2191dependencies = [2193dependencies = [2192 "bifrostlink",2194 "bifrostlink",2193 "bifrostlink-macros",2195 "bifrostlink-macros",Cargo.tomldiffbeforeafterboth3resolver = "2"3resolver = "2"445[workspace.package]5[workspace.package]6version = "0.1.1"6version = "0.1.3"7license = "MIT"7license = "MIT"8edition = "2021"8edition = "2021"9repository = "https://gitlab.delta.directory/iam/remowt"9repository = "https://git.delta.rocks/r/remowt"101011[workspace.dependencies]11[workspace.dependencies]12remowt-client = { version = "0.1.1", path = "crates/remowt-client" }12remowt-client = { version = "0.1.3", path = "crates/remowt-client" }13remowt-polkit-shared = { version = "0.1.1", path = "crates/polkit-shared" }13remowt-polkit-shared = { version = "0.1.3", path = "crates/polkit-shared" }14remowt-link-shared = { version = "0.1.1", path = "crates/remowt-link-shared" }14remowt-link-shared = { version = "0.1.3", path = "crates/remowt-link-shared" }15remowt-plugin = { version = "0.1.1", path = "crates/remowt-plugin" }15remowt-plugin = { version = "0.1.3", path = "crates/remowt-plugin" }16remowt-ui-prompt = { version = "0.1.1", path = "crates/remowt-ui-prompt" }16remowt-ui-prompt = { version = "0.1.3", path = "crates/remowt-ui-prompt" }17remowt-endpoints = { version = "0.1.1", path = "crates/remowt-endpoints" }17remowt-endpoints = { version = "0.1.3", path = "crates/remowt-endpoints" }181819bifrostlink = "0.2.0"19bifrostlink = "0.2.0"20bifrostlink-macros = "0.2.0"20bifrostlink-macros = "0.2.0"cmds/remowt-agent/src/main.rsdiffbeforeafterboth11use bifrostlink_ports::stdio::from_stdio;11use bifrostlink_ports::stdio::from_stdio;12use bifrostlink_ports::unix_socket::from_socket;12use bifrostlink_ports::unix_socket::from_socket;13use clap::Parser;13use clap::Parser;14use remowt_endpoints::{fs::Fs, pty::Pty, systemd::Systemd};14use remowt_endpoints::{fs::Fs, nix_daemon::NixDaemon, pty::Pty, subprocess::Subprocess, systemd::Systemd};15use remowt_link_shared::{editor::EditorEndpointsClient, Address, BifConfig};15use remowt_link_shared::{editor::EditorEndpointsClient, Address, BifConfig};16use remowt_polkit_shared::{emphasize, BackendRequest, Identity, PidDisplay};16use remowt_polkit_shared::{emphasize, BackendRequest, Identity, PidDisplay};17use remowt_ui_prompt::bifrost::PromptEndpointsClient;17use remowt_ui_prompt::bifrost::PromptEndpointsClient;21use tokio::net::UnixStream;21use tokio::net::UnixStream;22use tokio::runtime::Builder;22use tokio::runtime::Builder;23use tokio::task::AbortHandle;23use tokio::task::AbortHandle;24use tracing::{debug, info, trace};24use tracing::{debug, trace};25use zbus::fdo;25use zbus::fdo;26use zbus::zvariant::{OwnedValue, Str};26use zbus::zvariant::{OwnedValue, Str};27use zbus::{interface, proxy, Connection};27use zbus::{interface, proxy, Connection};85 identities: Vec<Identity>,85 identities: Vec<Identity>,86 ) -> zbus::fdo::Result<()> {86 ) -> zbus::fdo::Result<()> {87 use std::fmt::Write;87 use std::fmt::Write;88 info!("begin auth");88 debug!("begin auth");89 let _cancel_guard = Arc::new(OnceLock::new());89 let _cancel_guard = Arc::new(OnceLock::new());90 let task = {90 let task = {91 let helper = self.helper.clone();91 let helper = self.helper.clone();220 /// Expect own address to be AgentPrivileged, skip installing polkit agent220 /// Expect own address to be AgentPrivileged, skip installing polkit agent221 #[arg(long)]221 #[arg(long)]222 privileged: bool,222 privileged: bool,223 #[arg(long)]224 local: bool,223 },225 },224 LocalAgent,226 LocalAgent,225}227}242 Opts::Editor { path } => runtime.block_on(editor::edit(path)),244 Opts::Editor { path } => runtime.block_on(editor::edit(path)),243 Opts::RealAgent { path, privileged } => runtime.block_on(main_real_agent(path, privileged)),245 Opts::RealAgent {246 path,247 privileged,248 local,249 } => runtime.block_on(main_real_agent(path, privileged, local)),244 }250 }245}251}246async fn main_real() -> anyhow::Result<()> {252async fn main_real() -> anyhow::Result<()> {255}261}256async fn main_real_agent(path: Option<PathBuf>, privileged: bool) -> anyhow::Result<()> {262async fn main_real_agent(263 path: Option<PathBuf>,264 privileged: bool,265 local: bool,266) -> anyhow::Result<()> {257 let address = if privileged {267 let address = if privileged {258 Address::AgentPrivileged268 Address::AgentPrivileged264 Fs::new().register_endpoints(&mut rpc);274 Fs::new().register_endpoints(&mut rpc);265 Systemd.register_endpoints(&mut rpc);275 Systemd.register_endpoints(&mut rpc);266 Pty::new().register_endpoints(&mut rpc);276 Pty::new().register_endpoints(&mut rpc);277 Subprocess::new().register_endpoints(&mut rpc);278 NixDaemon.register_endpoints(&mut rpc);267279268 remowt_plugin::host::serve(&mut rpc);280 remowt_plugin::host::serve(&mut rpc);269281312 };324 };313 rpc.add_direct(Address::User, port, bifrostlink::Rtt(0));325 rpc.add_direct(Address::User, port, bifrostlink::Rtt(0));314326315 let polkit_conn = if !privileged {327 let polkit_conn = if !privileged && !local {316 // The unprivileged agent doubles as a polkit authentication agent so328 // The unprivileged agent doubles as a polkit authentication agent so317 // `run0` (e.g. our own elevation) routes its prompt to the User over329 // `run0` (e.g. our own elevation) routes its prompt to the User over318 // bifrost instead of failing on a tty-less session.330 // bifrost instead of failing on a tty-less session.cmds/remowt-ssh/Cargo.tomldiffbeforeafterboth11tracing-subscriber.workspace = true11tracing-subscriber.workspace = true12bifrostlink.workspace = true12bifrostlink.workspace = true13remowt-link-shared.workspace = true13remowt-link-shared.workspace = true14remowt-client = { workspace = true, features = ["shell"] }14remowt-client.workspace = true15tokio = { workspace = true, features = [15tokio = { workspace = true, features = [16 "macros",16 "macros",17 "fs",17 "fs",cmds/remowt-ssh/src/main.rsdiffbeforeafterboth25enum Opts {25enum Opts {26 /// Connect to remote host with remowt agent.26 /// Connect to remote host with remowt agent.27 Ssh { host: String },27 Ssh {28 host: String,29 #[arg(long)]30 escalate: bool,31 },28 /// Connect to local host for testing the connectivity.32 /// Connect to local host for testing the connectivity.29 Local,33 Local {34 #[arg(long)]35 escalate: bool,36 },30}37}313832fn agents_dir() -> anyhow::Result<PathBuf> {39fn agents_dir() -> anyhow::Result<PathBuf> {45 let opts = Opts::parse();52 let opts = Opts::parse();465347 let bundle = AgentBundle::from_dir(agents_dir()?)?;54 let bundle = AgentBundle::from_dir(agents_dir()?)?;48 let conn = match &opts {55 let (conn, escalate) = match &opts {49 Opts::Ssh { host } => Remowt::connect(host, &bundle).await?,56 Opts::Ssh { host, escalate } => (Remowt::connect(host, &bundle).await?, *escalate),50 Opts::Local => Remowt::connect_local(&bundle).await?,57 Opts::Local { escalate } => (Remowt::connect_local(&bundle).await?, *escalate),51 };58 };52 let mut rpc = conn.rpc();59 let mut rpc = conn.rpc();536056 PrependSourcePrompter {63 PrependSourcePrompter {57 prompter: RofiPrompter,64 prompter: RofiPrompter,58 source: match opts {65 source: match opts {59 Opts::Ssh { host } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))],66 Opts::Ssh { host, .. } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))],60 Opts::Local => vec![],67 Opts::Local { .. } => vec![],61 },68 },62 description: "".to_owned(),69 description: "".to_owned(),63 },70 },67 }74 }687569 debug!("entering shell");76 debug!("entering shell");70 run_shell(&conn).await?;77 run_shell(&conn, escalate).await?;71 debug!("shell ended");78 debug!("shell ended");727973 Ok(())80 Ok(())74}81}758276async fn run_shell(conn: &Remowt) -> anyhow::Result<()> {83async fn run_shell(conn: &Remowt, escalate: bool) -> anyhow::Result<()> {77 let term = match std::env::var("TERM") {84 let term = match std::env::var("TERM") {78 Ok(v) => v,85 Ok(v) => v,79 Err(VarError::NotPresent) => "xterm-256color".to_owned(),86 Err(VarError::NotPresent) => "xterm-256color".to_owned(),80 Err(e) => return Err(e.into()),87 Err(e) => return Err(e.into()),81 };88 };82 let (cols, rows) = term_size().unwrap_or((80, 24));89 let (cols, rows) = term_size().unwrap_or((80, 24));839084 let shell = conn.open_shell(&term, cols, rows).await?;91 let shell = conn.open_shell(&term, cols, rows, escalate).await?;85 let resizer = shell.resizer();92 let resizer = shell.resizer();86 let stream = shell.stream;93 let stream = shell.stream;8794crates/remowt-client/Cargo.tomldiffbeforeafterboth27] }27] }28tracing.workspace = true28tracing.workspace = true29uuid = { workspace = true, features = ["v4"] }29uuid = { workspace = true, features = ["v4"] }30remowt-endpoints = { workspace = true, optional = true }30remowt-endpoints.workspace = true3131tokio-util = { workspace = true, features = ["codec"] }32[features]32futures.workspace = true33shell = ["dep:remowt-endpoints"]3433crates/remowt-client/src/lib.rsdiffbeforeafterboth6use anyhow::{anyhow, bail, ensure, Context as _, Result};6use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Remote, Rpc, Rtt};8use bifrostlink::{Remote, Rpc, Rtt};9use bifrostlink_ports::unix_socket::from_socket;10use camino::{Utf8Path, Utf8PathBuf};9use camino::{Utf8Path, Utf8PathBuf};11use remowt_link_shared::plugin::PluginEndpointsClient;10use remowt_link_shared::plugin::PluginEndpointsClient;12use remowt_link_shared::port::child_port;11use remowt_link_shared::port::child_port;19use russh::Channel;18use russh::Channel;20use tempfile::TempDir;19use tempfile::TempDir;21use tokio::net::UnixListener;20use tokio::net::UnixListener;22use tokio::sync::oneshot::{self, channel};21use tokio::sync::oneshot;23use tokio::{22use tokio::{24 fs,23 fs,25 io::{AsyncReadExt as _, AsyncWriteExt as _},24 io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, DuplexStream},26};25};27use tracing::{debug, error};26use tracing::{debug, warn};28use uuid::Uuid;27use uuid::Uuid;292830use self::port::channel_port;31use self::subprocess::RemowtChild;3233pub mod editor;29pub mod editor;34mod forwarded;30mod forwarded;35mod port;36#[cfg(feature = "shell")]37mod shell;31mod shell;32mod ssh_exec;38mod subprocess;33mod subprocess;393435use self::ssh_exec::SshExecChild;36pub use self::subprocess::{RemowtChild, SpawnOptions, StderrMode, StdioMode};40pub use forwarded::{RemowtListener, RemowtStream};37pub use forwarded::{RemowtListener, RemowtStream};41#[cfg(feature = "shell")]42pub use shell::{RemowtShell, RemowtShellResizer};38pub use shell::{RemowtShell, RemowtShellResizer};433944type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;40type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;99 let ch = sess.channel_open_session().await?;95 let ch = sess.channel_open_session().await?;100 ch.exec(true, cmd).await?;96 ch.exec(true, cmd).await?;10197102 let mut child = RemowtChild::from_exec(ch);98 let mut child = SshExecChild::from_exec(ch);103 drop(child.stdin);99 drop(child.stdin);100 drain_stderr(child.stderr, cmd.to_owned());104101105 let mut out = Vec::new();102 let mut out = Vec::new();106 let mut err = Vec::new();103 child.stdout.read_to_end(&mut out).await?;107 tokio::try_join!(108 child.stdout.read_to_end(&mut out),109 child.stderr.read_to_end(&mut err),110 )?;111 if !err.is_empty() {112 error!("remote stderr: {}", String::from_utf8_lossy(&err).trim());113 }114 let code = child.exit.await.ok().flatten();104 let code = child.exit.await.ok().flatten();115 Ok((code, out))105 Ok((code, out))116}106}140 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;130 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;141131142 debug!("get dir");132 debug!("get dir");143 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")133 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"").await?;144 .await?145 .to_owned();146 let dir = if cache.is_empty() {134 let dir = if cache.is_empty() {147 let home = run_string_ok(sess, "echo \"$HOME\"").await?;135 let home = run_string_ok(sess, "echo \"$HOME\"").await?;148 ensure!(136 ensure!(183 debug!("cat");171 debug!("cat");184 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;172 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;185173186 let mut child = RemowtChild::from_exec(ch);174 let mut child = SshExecChild::from_exec(ch);187 child175 child188 .stdin176 .stdin189 .write_all(&bytes)177 .write_all(&bytes)207 Ok(())195 Ok(())208}196}209197210async fn detect_escalation(211 sess: &Handle<SshHandler>,212) -> Result<(&'static str, &'static [&'static str])> {213 for (tool, flags) in ESCALATORS {214 // `tool` is a fixed identifier (no metacharacters), safe to interpolate.215 let (code, _) = run(sess, &format!("command -v {tool}")).await?;216 if code == Some(0) {217 return Ok((tool, flags));218 }219 }220 bail!("no escalation tool found on remote")221}222223fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {224 let mut parts = vec![tool.to_owned()];225 parts.extend(flags.iter().map(|f| f.to_string()));226 parts.push(sh_quote(agent_path));227 parts.push("real-agent".to_owned());228 parts.push("--privileged".to_owned());229 if let Some(p) = path {230 parts.push("--path".to_owned());231 parts.push(sh_quote(p));232 }233 parts.join(" ")234}235236fn find_in_path(name: &str) -> Option<std::path::PathBuf> {237 let path = env::var_os("PATH")?;238 env::split_paths(&path)239 .map(|dir| dir.join(name))240 .find(|p| p.is_file())241}242243pub struct SshHandler {198pub struct SshHandler {244 host: String,199 host: String,245 port: u16,200 port: u16,285 },240 },286}241}287242288pub struct Remowt {243struct RemowtInner {289 transport: Transport,244 transport: Transport,290 rpc: Rpc<BifConfig>,245 rpc: Rpc<BifConfig>,291 elevated: tokio::sync::OnceCell<()>,246 elevated: tokio::sync::OnceCell<()>,247 #[allow(dead_code)]292 children: Mutex<Vec<tokio::process::Child>>,248 children: Mutex<Vec<tokio::process::Child>>,293 _runtime_tmp: Option<TempDir>,249 _runtime_tmp: Option<TempDir>,294}250}295251252#[derive(Clone)]253pub struct Remowt(Arc<RemowtInner>);254296pub type RemowtRemote = Remote<BifConfig>;255pub type RemowtRemote = Remote<BifConfig>;297256298impl Remowt {257impl Remowt {258 /// Connect to the remote host over ssh, detect the architecture and deploy the required259 /// agent binary.299 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {260 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {300 let conf = russh_config::parse_home(host)?;261 let conf = russh_config::parse_home(host)?;301 let port = conf.host_config.port.or(conf.port).unwrap_or(22);262 let port = conf.host_config.port.or(conf.port).unwrap_or(22);346307347 debug!("runtime dir");308 debug!("runtime dir");348 let runtime_dir = remote_runtime_dir(&sess).await?;309 let runtime_dir = remote_runtime_dir(&sess).await?;349 let primary = runtime_dir.join(format!("remowt-{}.sock", Uuid::new_v4()));350310351 let (onetx, onerx) = channel();352 subs.lock().expect("lock").insert(primary.clone(), onetx);353 sess.streamlocal_forward(primary.clone()).await?;354355 let rpc = Rpc::<BifConfig>::new(Address::User);311 let rpc = Rpc::<BifConfig>::new(Address::User);356312357 // TODO: ensure no injection is possible in the socket path.358 let cmd_chan = sess.channel_open_session().await?;313 let cmd_chan = sess.channel_open_session().await?;359 debug!("starting agent");314 debug!("starting agent");360 cmd_chan315 cmd_chan361 .exec(316 .exec(true, format!("{} real-agent", sh_quote(&agent_path)))362 true,363 format!(364 "{} real-agent --path={}",365 sh_quote(&agent_path),366 sh_quote(&primary)367 ),368 )369 .await?;317 .await?;370318371 let port = channel_port(319 let child = SshExecChild::from_exec(cmd_chan);372 onerx320 drain_stderr(child.stderr, "agent".to_owned());321 rpc.add_direct(373 .await322 Address::Agent,323 child_port(child.stdout, child.stdin),374 .map_err(|_| anyhow!("agent never opened its channel"))?,324 Rtt(0),375 );325 );376 rpc.add_direct(Address::Agent, port, Rtt(0));377326378 Ok(Self {327 Ok(Self(Arc::new(RemowtInner {379 transport: Transport::Ssh {328 transport: Transport::Ssh {380 sess,329 sess,381 subs,330 subs,386 elevated: tokio::sync::OnceCell::new(),335 elevated: tokio::sync::OnceCell::new(),387 children: Mutex::new(Vec::new()),336 children: Mutex::new(Vec::new()),388 _runtime_tmp: None,337 _runtime_tmp: None,389 })338 })))390 }339 }391340341 /// "Connect" to the local machine's agent, by starting the agent binary locally.392 pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {342 pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {393 let agent_path = bundle.local_binary()?;343 let agent_path = bundle.local_binary()?;394 let mut child = tokio::process::Command::new(&agent_path)344 let mut child = tokio::process::Command::new(&agent_path)395 .arg("real-agent")345 .arg("real-agent")346 .arg("--local")396 .stdin(std::process::Stdio::piped())347 .stdin(std::process::Stdio::piped())397 .stdout(std::process::Stdio::piped())348 .stdout(std::process::Stdio::piped())398 .kill_on_drop(true)349 .kill_on_drop(true)406357407 let (runtime_dir, runtime_tmp) = local_runtime_dir()?;358 let (runtime_dir, runtime_tmp) = local_runtime_dir()?;408359409 Ok(Self {360 Ok(Self(Arc::new(RemowtInner {410 transport: Transport::Local {361 transport: Transport::Local {411 agent_path,362 agent_path,412 runtime_dir,363 runtime_dir,415 elevated: tokio::sync::OnceCell::new(),366 elevated: tokio::sync::OnceCell::new(),416 children: Mutex::new(vec![child]),367 children: Mutex::new(vec![child]),417 _runtime_tmp: runtime_tmp,368 _runtime_tmp: runtime_tmp,418 })369 })))419 }370 }420371372 /// Get the handle to the underlying russh session handle.421 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {373 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {422 match &self.transport {374 match &self.0.transport {423 Transport::Ssh { sess, .. } => Some(sess.clone()),375 Transport::Ssh { sess, .. } => Some(sess.clone()),424 Transport::Local { .. } => None,376 Transport::Local { .. } => None,425 }377 }426 }378 }427379428 pub fn rpc(&self) -> Rpc<BifConfig> {380 pub fn rpc(&self) -> Rpc<BifConfig> {429 self.rpc.clone()381 self.0.rpc.clone()430 }382 }431383432 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {433 R::wrap(self.rpc.remote(Address::Agent))434 }435436 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {384 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {437 let client: PluginEndpointsClient<BifConfig> = self.endpoints();385 let client: PluginEndpointsClient<BifConfig> = self.endpoints();438 client386 client441 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))389 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))442 }390 }443 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {391 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {444 self.ensure_elevated().await?;392 self.ensure_escalated().await?;445 let client: PluginEndpointsClient<BifConfig> =393 let client: PluginEndpointsClient<BifConfig> =446 PluginEndpointsClient::wrap(self.rpc.remote(Address::AgentPrivileged));394 PluginEndpointsClient::wrap(self.0.rpc.remote(Address::AgentPrivileged));447 client395 client448 .load_plugin_path(id, path.to_owned())396 .load_plugin_path(id, path.to_owned())449 .await?397 .await?450 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))398 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))451 }399 }452 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {400 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {453 R::wrap(self.rpc.remote(Address::Plugin(id)))401 R::wrap(self.0.rpc.remote(Address::Plugin(id)))454 }402 }403404 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {405 R::wrap(self.0.rpc.remote(Address::Agent))406 }455 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {407 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {456 self.ensure_elevated().await?;408 self.ensure_escalated().await?;457 Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))409 Ok(R::wrap(self.0.rpc.remote(Address::AgentPrivileged)))458 }410 }459411460 async fn ensure_elevated(&self) -> Result<()> {412 async fn ensure_escalated(&self) -> Result<()> {461 self.elevated413 self.0414 .elevated462 .get_or_try_init(|| async {415 .get_or_try_init(|| async {463 let port = match &self.transport {416 let (agent_path, local) = match &self.0.transport {464 Transport::Ssh {417 Transport::Ssh { agent_path, .. } => (agent_path.as_str().to_owned(), false),465 sess, agent_path, ..466 } => {467 let (tool, flags) = detect_escalation(sess).await?;468 let ch = sess.channel_open_session().await?;469 ch.exec(true, privileged_cmd(tool, flags, agent_path, None))470 .await?;418 Transport::Local { agent_path, .. } => (471 channel_port(ch)472 }473 Transport::Local { agent_path, .. } => {474 let sock = self475 .runtime_dir()476 .join(format!("remowt-priv-{}.sock", Uuid::new_v4()));419 agent_path477 let _ = std::fs::remove_file(&sock);420 .to_str()478 let listener = UnixListener::bind(&sock)?;479 let (tool, flags) = ESCALATORS480 .iter()481 .find(|(t, _)| find_in_path(t).is_some())421 .ok_or_else(|| anyhow!("local agent path is not utf-8"))?482 .ok_or_else(|| anyhow!("no escalation tool found"))?;483 let child = tokio::process::Command::new(tool)484 .args(*flags)485 .arg(agent_path)486 .arg("real-agent")487 .arg("--privileged")488 .arg("--path")422 .to_owned(),489 .arg(sock.as_str())423 true,490 .kill_on_drop(true)491 .spawn()?;492 self.children.lock().expect("lock").push(child);493 let (stream, _) = listener.accept().await?;494 let _ = std::fs::remove_file(&sock);424 ),495 from_socket(stream)496 }497 };425 };426498 self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));427 let (tool, flags) = self.detect_escalation().await?;428 let mut args: Vec<String> = flags.iter().map(|f| (*f).to_owned()).collect();429 args.push(agent_path);430 args.push("real-agent".to_owned());431 args.push("--privileged".to_owned());432 if local {433 args.push("--local".to_owned());434 }435436 let child = self437 .spawn(SpawnOptions {438 program: tool.to_owned(),439 args,440 stdin: StdioMode::Pipe,441 stdout: StdioMode::Pipe,442 stderr: StderrMode::Inherit,443 ..Default::default()444 })445 .await446 .context("spawning privileged agent")?;447448 let stdin = child449 .stdin450 .ok_or_else(|| anyhow!("privileged agent stdin missing"))?;451 let stdout = child452 .stdout453 .ok_or_else(|| anyhow!("privileged agent stdout missing"))?;454455 let port = child_port(stdout, stdin);456 self.0457 .rpc458 .add_direct(Address::AgentPrivileged, port, Rtt(0));499 anyhow::Ok(())459 anyhow::Ok(())500 })460 })501 .await?;461 .await?;502 Ok(())462 Ok(())503 }463 }504464505 pub async fn exec(&self, command: String) -> Result<RemowtChild> {465 async fn detect_escalation(&self) -> Result<(&'static str, &'static [&'static str])> {506 let Some(sess) = self.ssh() else {466 for (tool, flags) in ESCALATORS {467 let probe = self468 .spawn(SpawnOptions {507 bail!("exec should not be called on local")469 program: (*tool).to_owned(),470 args: vec!["--version".to_owned()],471 stdout: StdioMode::Null,472 stderr: StderrMode::Null,473 ..Default::default()508 };474 })475 .await;509 let ch = sess.channel_open_session().await?;476 if let Ok(child) = probe {477 let _ = child.wait().await;510 ch.exec(true, command).await?;478 return Ok((tool, flags));511 Ok(RemowtChild::from_exec(ch))479 }480 }481 bail!("no escalation tool found")512 }482 }513483514 fn runtime_dir(&self) -> Utf8PathBuf {484 /// XDG_RUNTIME_DIR on the remote machine.485 pub fn runtime_dir(&self) -> Utf8PathBuf {515 match &self.transport {486 match &self.0.transport {516 Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),487 Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),517 Transport::Local { runtime_dir, .. } => runtime_dir.clone(),488 Transport::Local { runtime_dir, .. } => runtime_dir.clone(),518 }489 }519 }490 }520491521 pub async fn forward_socket(&self, path: &Utf8Path) -> Result<RemowtListener> {492 /// Bind unix listener socket on the remote machine with auto-generated path, returning the path.493 pub async fn bind_runtime_unix(&self, hint: &str) -> Result<(RemowtListener, Utf8PathBuf)> {494 let sock = self495 .runtime_dir()496 .join(format!("remowt-{hint}-{}.sock", Uuid::new_v4()));497 let listener = self.bind_unix(&sock).await?;498 Ok((listener, sock))499 }500501 /// Bind unix listener socket on the remote machine on the specified path.502 pub async fn bind_unix(&self, path: &Utf8Path) -> Result<RemowtListener> {522 match &self.transport {503 match &self.0.transport {523 Transport::Ssh { sess, subs, .. } => {504 Transport::Ssh { sess, subs, .. } => {524 let (tx, rx) = oneshot::channel();505 let (tx, rx) = oneshot::channel();525 subs.lock().expect("lock").insert(path.to_owned(), tx);506 subs.lock().expect("lock").insert(path.to_owned(), tx);537 }518 }538}519}539520521fn drain_stderr(stream: DuplexStream, context: String) {522 tokio::spawn(async move {523 let mut reader = BufReader::new(stream).lines();524 loop {525 match reader.next_line().await {526 Ok(Some(line)) => warn!(context = %context, "{line}"),527 Ok(None) => break,528 Err(e) => {529 warn!(context = %context, "stderr read failed: {e}");530 break;531 }532 }533 }534 });535}536540fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {537fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {541 if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {538 if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {542 if !dir.is_empty() {539 if !dir.is_empty() {556 let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;553 let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;557 let dir = dir.trim();554 let dir = dir.trim();558 if dir.is_empty() {555 if dir.is_empty() {559 remote_mktemp(sess).await556 let tmp = run_string_ok(sess, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir").await?;557 Ok(Utf8PathBuf::from(tmp))560 } else {558 } else {561 Ok(Utf8PathBuf::from(dir))559 Ok(Utf8PathBuf::from(dir))562 }563}564565async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {566 let mut cmd_chan = sess.channel_open_session().await?;567 cmd_chan568 .exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")569 .await?;570 let mut stdout = vec![];571 loop {572 let Some(msg) = cmd_chan.wait().await else {573 bail!("unexpected channel end");574 };575 match msg {576 russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),577 russh::ChannelMsg::ExitStatus { exit_status } => {578 if exit_status != 0 {579 bail!("mktemp failed");580 }581 break;582 }583 _ => {}584 }585 }560 }586 ensure!(stdout.ends_with(b"\n"));587 stdout.pop();588 Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))589}561}590562crates/remowt-client/src/port.rsdiffbeforeafterbothno changes
crates/remowt-client/src/shell.rsdiffbeforeafterboth3use bifrostlink::Remote;3use bifrostlink::Remote;4use remowt_endpoints::pty::{PtyClient, ShellId};4use remowt_endpoints::pty::{PtyClient, ShellId};5use remowt_link_shared::{Address, BifConfig};5use remowt_link_shared::{Address, BifConfig};6use uuid::Uuid;768use crate::forwarded::RemowtStream;7use crate::forwarded::RemowtStream;9use crate::Remowt;8use crate::Remowt;393840impl Remowt {39impl Remowt {41 pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<RemowtShell> {40 pub async fn open_shell(42 let sock = self41 &self,43 .runtime_dir()42 term: &str,44 .join(format!("remowt-shell-{}.sock", Uuid::new_v4()));43 cols: u16,4544 rows: u16,45 escalate: bool,46 ) -> Result<RemowtShell> {46 let forwarded = self.forward_socket(&sock).await?;47 let (forwarded, path) = self.bind_runtime_unix("shell").await?;47 let client: PtyClient<BifConfig> = self.endpoints();48 let client: PtyClient<BifConfig> = if escalate {49 self.run0_endpoints().await?50 } else {51 self.endpoints()52 };48 let id = client53 let id = client49 .open_shell(sock, term.to_owned(), cols, rows)54 .open_shell(path, term.to_owned(), cols, rows)50 .await?55 .await?51 .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;56 .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;52 let stream = forwarded.accept().await?;57 let stream = forwarded.accept().await?;535854 Ok(RemowtShell {59 Ok(RemowtShell {55 id,60 id,56 stream,61 stream,57 remote: self.rpc.remote(Address::Agent),62 remote: self.0.rpc.remote(Address::Agent),58 })63 })59 }64 }60}65}crates/remowt-client/src/ssh_exec.rsdiffbeforeafterbothno changes
crates/remowt-client/src/subprocess.rsdiffbeforeafterboth1use bytes::Bytes;1use std::pin::pin;2use russh::client::Msg;3use russh::{Channel, ChannelMsg};4use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};5use tokio::sync::oneshot;627const BUF: usize = 64 * 1024;3use anyhow::{anyhow, bail, Result};4use camino::Utf8PathBuf;5use futures::StreamExt as _;6use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};7use remowt_link_shared::BifConfig;8use serde::de::DeserializeOwned;9use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufReader};10use tokio::select;11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{debug, info, warn};81314use crate::forwarded::{RemowtListener, RemowtStream};15use crate::Remowt;1617#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]18pub enum StdioMode {19 #[default]20 Null,21 Pipe,22 Inherit,23}2425#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]26pub enum StderrMode {27 #[default]28 Null,29 Pipe,30 Inherit,31 MergeWithStdout,32}3334#[derive(Default)]35pub struct SpawnOptions {36 pub program: String,37 pub args: Vec<String>,38 pub env: Vec<(String, String)>,39 pub env_clear: bool,40 pub cwd: Option<Utf8PathBuf>,41 pub escalated: bool,42 pub stdin: StdioMode,43 pub stdout: StdioMode,44 pub stderr: StderrMode,45}469pub struct RemowtChild {47pub struct RemowtChild {10 pub stdin: DuplexStream,48 pub stdin: Option<RemowtStream>,11 pub stdout: DuplexStream,49 pub stdout: Option<RemowtStream>,12 pub stderr: DuplexStream,50 pub stderr: Option<RemowtStream>,13 pub exit: oneshot::Receiver<Option<u32>>,51 id: ProcId,52 client: SubprocessClient<BifConfig>,14}53}155416impl RemowtChild {55impl RemowtChild {17 /// Manage channel returned by russh exec().56 pub async fn wait(self) -> Result<Option<i32>> {18 pub(crate) fn from_exec(ch: Channel<Msg>) -> Self {19 let (stdin, mut stdin_r) = tokio::io::duplex(BUF);57 let RemowtChild {58 stdin,59 stdout,60 stderr,61 id,62 client,63 } = self;64 drop(stdin);20 let (mut out_w, stdout) = tokio::io::duplex(BUF);65 drop(stdout);21 let (mut err_w, stderr) = tokio::io::duplex(BUF);66 drop(stderr);67 client68 .wait(id)22 let (exit_tx, exit) = oneshot::channel();69 .await?70 .map_err(|e| anyhow!("agent wait failed: {e}"))71 }237224 tokio::spawn(async move {73 pub async fn kill(&self, signal: i32) -> Result<()> {25 let (mut read, write) = ch.split();74 self.client75 .kill(self.id, signal)76 .await?77 .map_err(|e| anyhow!("agent kill failed: {e}"))78 }79}268027 // Forward our stdin to the channel, signalling EOF when it closes.81fn needs_socket(m: StdioMode) -> bool {28 let stdin_pump = tokio::spawn(async move {82 matches!(m, StdioMode::Pipe | StdioMode::Inherit)29 let mut buf = vec![0u8; BUF];83}30 loop {8431 match stdin_r.read(&mut buf).await {85fn stderr_needs_socket(m: StderrMode) -> bool {32 Ok(0) | Err(_) => break,86 matches!(m, StderrMode::Pipe | StderrMode::Inherit)33 Ok(n) => {87}34 if write8835 .data_bytes(Bytes::copy_from_slice(&buf[..n]))89impl Remowt {36 .await90 pub async fn spawn(&self, opts: SpawnOptions) -> Result<RemowtChild> {37 .is_err()91 let SpawnOptions {38 {92 program,39 return;93 args,40 }94 env,41 }95 env_clear,96 cwd,97 escalated,98 stdin,99 stdout,100 stderr,101 } = opts;102103 if matches!(stderr, StderrMode::MergeWithStdout) && !needs_socket(stdout) {104 bail!("stderr=MergeWithStdout requires stdout=Pipe or Inherit");105 }106107 let stdin_bound = if needs_socket(stdin) {108 Some(self.bind_runtime_unix("proc-stdin").await?)109 } else {110 None111 };112 let stdout_bound = if needs_socket(stdout) {113 Some(self.bind_runtime_unix("proc-stdout").await?)114 } else {115 None116 };117 let stderr_bound = if stderr_needs_socket(stderr) {118 Some(self.bind_runtime_unix("proc-stderr").await?)119 } else {120 None121 };122123 let stdin_spec = match &stdin_bound {124 Some((_, p)) => StdioSpec::Socket(p.clone()),125 None => StdioSpec::Null,126 };127 let stdout_spec = match &stdout_bound {128 Some((_, p)) => StdioSpec::Socket(p.clone()),129 None => StdioSpec::Null,130 };131 let stderr_spec = match (&stderr, &stderr_bound) {132 (StderrMode::Pipe | StderrMode::Inherit, Some((_, p))) => StderrSpec::Socket(p.clone()),133 (StderrMode::MergeWithStdout, _) => StderrSpec::MergeWithStdout,134 _ => StderrSpec::Null,135 };136137 let client: SubprocessClient<BifConfig> = if escalated {138 // Boxed to break the async-fn type cycle139 Box::pin(self.run0_endpoints::<SubprocessClient<BifConfig>>()).await?140 } else {141 self.endpoints()142 };143144 let spec = SpawnSpec {145 program: program.clone(),146 args,147 env,148 env_clear,149 cwd,150 stdin: stdin_spec,151 stdout: stdout_spec,152 stderr: stderr_spec,153 };154 let id = client155 .spawn(spec)156 .await?157 .map_err(|e| anyhow!("agent spawn failed: {e}"))?;158159 let (stdin_res, stdout_res, stderr_res) = tokio::join!(160 accept(stdin_bound),161 accept(stdout_bound),162 accept(stderr_bound),163 );164165 let stdin_stream = handle_stdin(stdin, stdin_res?, &program);166 let stdout_stream = handle_output(stdout, stdout_res?, &program, false);167 let stderr_stream = handle_output_err(stderr, stderr_res?, &program);168169 Ok(RemowtChild {170 stdin: stdin_stream,171 stdout: stdout_stream,172 stderr: stderr_stream,173 id,174 client,175 })176 }177178 pub fn cmd(&self, program: impl AsRef<str>) -> RemowtCommand {179 let program = program.as_ref().to_owned();180 RemowtCommand {181 program,182 args: vec![],183 env: vec![],184 remowt: self.clone(),185 escalated: false,186 }187 }188}189190async fn accept(b: Option<(RemowtListener, Utf8PathBuf)>) -> Result<Option<RemowtStream>> {191 match b {192 Some((l, _)) => Ok(Some(l.accept().await?)),193 None => Ok(None),194 }195}196197fn handle_stdin(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {198 match mode {199 StdioMode::Pipe => s,200 StdioMode::Inherit => {201 if let Some(s) = s {202 let program = program.to_owned();203 tokio::spawn(async move {204 let mut stdin = tokio::io::stdin();205 let mut s = s;206 if let Err(e) = tokio::io::copy(&mut stdin, &mut s).await {207 warn!(program, "stdin forward ended: {e}");42 }208 }43 }209 let _ = s.shutdown().await;44 let _ = write.eof().await;45 });210 });211 }212 None213 }214 StdioMode::Null => None,215 }216}46217218fn handle_output(47 let mut code = None;219 mode: StdioMode,220 s: Option<RemowtStream>,221 program: &str,222 is_stderr: bool,223) -> Option<RemowtStream> {224 match mode {225 StdioMode::Pipe => s,48 while let Some(msg) = read.wait().await {226 StdioMode::Inherit => {227 if let Some(s) = s {228 let program = program.to_owned();49 match msg {229 tokio::spawn(pump_to_tracing(s, program, is_stderr));50 ChannelMsg::Data { data } => {230 }231 None232 }233 StdioMode::Null => None,234 }235}236237fn handle_output_err(238 mode: StderrMode,239 s: Option<RemowtStream>,240 program: &str,241) -> Option<RemowtStream> {242 match mode {243 StderrMode::Pipe => s,51 if out_w.write_all(&data).await.is_err() {244 StderrMode::Inherit => {245 if let Some(s) = s {52 break;246 let program = program.to_owned();247 tokio::spawn(pump_to_tracing(s, program, true));53 }248 }54 }249 None250 }55 ChannelMsg::ExtendedData { data, .. } => {251 StderrMode::MergeWithStdout | StderrMode::Null => None,252 }253}254255async fn pump_to_tracing(stream: RemowtStream, program: String, is_stderr: bool) {56 if err_w.write_all(&data).await.is_err() {256 let mut reader = BufReader::new(stream).lines();257 loop {258 match reader.next_line().await {57 break;259 Ok(Some(line)) => {58 }260 if is_stderr {59 }261 warn!(program, "{line}");60 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),262 } else {61 _ => {}263 info!(program, "{line}");62 }264 }63 }265 }266 Ok(None) => break,267 Err(e) => {268 warn!(program, "child log stream error: {e}");269 break;270 }271 }272 }273}6427465 // The process is gone; stop waiting on stdin we'll never forward.275fn escape_bash(input: &str, out: &mut String) {276 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";277 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {66 stdin_pump.abort();278 out.push_str(input);67 let _ = out_w.shutdown().await;279 return;280 }281 out.push('\'');282 for (i, v) in input.split('\'').enumerate() {68 let _ = err_w.shutdown().await;283 if i != 0 {284 out.push_str("'\"'\"'");69 let _ = exit_tx.send(code);285 }286 out.push_str(v);70 });287 }288 out.push('\'');289}71290291#[derive(Clone)]72 RemowtChild {292pub struct RemowtCommand {73 stdin,293 program: String,294 args: Vec<String>,295 env: Vec<(String, String)>,74 stdout,296 remowt: Remowt,297 escalated: bool,298}299300impl RemowtCommand {301 pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {75 stderr,302 self.args.push(arg.as_ref().to_owned());303 self304 }305 pub fn args<V: AsRef<str>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {76 exit,306 for arg in args {307 self.args.push(arg.as_ref().to_owned());77 }308 }309 self78 }310 }311 pub fn eqarg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {312 self.args313 .push(format!("{}={}", key.as_ref(), value.as_ref()));314 self315 }316 pub fn comparg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {317 self.args.push(key.as_ref().to_owned());318 self.args.push(value.as_ref().to_owned());319 self320 }321 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {322 self.env323 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));324 self325 }326327 pub fn sudo(mut self) -> Self {328 self.escalated = true;329 self330 }331332 /// Only for display.333 fn shell_line(&self) -> String {334 let mut out = String::new();335 if self.escalated {336 out.push_str("run0 ");337 }338 if !self.env.is_empty() {339 out.push_str("env");340 for (k, v) in &self.env {341 out.push(' ');342 assert!(!k.contains('='));343 escape_bash(k, &mut out);344 out.push('=');345 escape_bash(v, &mut out);346 }347 out.push(' ');348 }349 escape_bash(&self.program, &mut out);350 for arg in &self.args {351 out.push(' ');352 escape_bash(arg, &mut out);353 }354 out355 }356357 fn into_spawn_options(self) -> (Remowt, SpawnOptions, String) {358 let line = self.shell_line();359 let opts = SpawnOptions {360 program: self.program,361 args: self.args,362 env: self.env,363 env_clear: false,364 cwd: None,365 escalated: self.escalated,366 stdin: StdioMode::Null,367 stdout: StdioMode::Pipe,368 stderr: StderrMode::Pipe,369 };370 (self.remowt, opts, line)371 }372373 pub async fn run(self) -> Result<()> {374 run_inner(self, false).await.map(|_| ())375 }376 pub async fn run_string(self) -> Result<String> {377 let bytes = run_inner(self, true).await?.expect("want_stdout");378 Ok(String::from_utf8(bytes)?)379 }380 pub async fn run_value<T: DeserializeOwned>(self) -> Result<T> {381 let s = self.run_string().await?;382 Ok(serde_json::from_str(&s)?)383 }384}385386async fn run_inner(cmd: RemowtCommand, want_stdout: bool) -> Result<Option<Vec<u8>>> {387 let (remowt, opts, line) = cmd.into_spawn_options();388 debug!("running command {line:?} over remowt");389 let program = opts.program.clone();390 let mut child = remowt.spawn(opts).await?;391 let stderr = child.stderr.take().expect("stderr=Pipe");392 let stdout = child.stdout.take().expect("stdout=Pipe");393394 let mut err = FramedRead::new(stderr, LinesCodec::new());395 let (mut out_bytes, mut out_lines) = if want_stdout {396 (Some(FramedRead::new(stdout, BytesCodec::new())), None)397 } else {398 (None, Some(FramedRead::new(stdout, LinesCodec::new())))399 };400401 let mut buf = if want_stdout { Some(Vec::new()) } else { None };402403 let mut wait = pin!(child.wait());404 let exit = loop {405 select! {406 biased;407408 Some(e) = err.next() => {409 let e = e?;410 warn!(program = %program, "{e}");411 }412 Some(o) = async { out_bytes.as_mut()?.next().await }, if want_stdout => {413 buf.as_mut().expect("want_stdout").extend_from_slice(&o?);414 }415 Some(o) = async { out_lines.as_mut()?.next().await }, if !want_stdout => {416 let o = o?;417 info!(program = %program, "{o}");418 }419 res = &mut wait => {420 break res?;421 }422 }423 };7942480 /// Wait for the process to finish, returning its exit status.425 match exit {426 Some(0) => Ok(buf),427 Some(c) => bail!("command '{line}' failed with status {c}"),81 pub async fn wait(self) -> Option<u32> {428 None => Err(anyhow!("command '{line}' ended without an exit status")),82 self.exit.await.ok().flatten()83 }429 }84}430}85431crates/remowt-endpoints/Cargo.tomldiffbeforeafterboth16tokio = { workspace = true, features = ["net", "io-util", "rt", "process"] }16tokio = { workspace = true, features = ["net", "io-util", "rt", "process"] }17tracing.workspace = true17tracing.workspace = true18uuid.workspace = true18uuid.workspace = true19nix = { workspace = true, features = ["process", "term"] }19nix = { workspace = true, features = ["process", "signal", "term"] }20zbus.workspace = true20zbus.workspace = true2121crates/remowt-endpoints/src/lib.rsdiffbeforeafterboth1pub mod fs;1pub mod fs;2pub mod nix_daemon;2pub mod nix_daemon;3pub mod pty;3pub mod pty;4pub mod subprocess;4pub mod systemd;5pub mod systemd;56crates/remowt-endpoints/src/pty.rsdiffbeforeafterboth16use tokio::io::unix::AsyncFd;16use tokio::io::unix::AsyncFd;17use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};17use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};18use tokio::net::UnixStream;18use tokio::net::UnixStream;19use tracing::{info, warn};19use tracing::{debug, info, warn};202021pub type ShellId = u64;21pub type ShellId = u64;2222117 };117 };118 let pty = AsyncPty::new(master)?;118 let pty = AsyncPty::new(master)?;119119120 info!(id, shell, "shell opened");120 debug!(id, shell, "shell opened");121 let shells = self.shells.clone();121 let shells = self.shells.clone();122 tokio::spawn(async move {122 tokio::spawn(async move {123 let mut pty = pty;123 let mut pty = pty;crates/remowt-endpoints/src/subprocess.rsdiffbeforeafterbothno changes