git.delta.rocks / remowt / refs/commits / 751b8f1ca8aa

difftreelog

feat subprocess through agent

kwlonlsvYaroslav Bolyukin5 days agoparent: #99930b3.patch.diff
in: trunk

15 files changed

modifiedCargo.lockdiffbeforeafterboth
308308
309[[package]]309[[package]]
310name = "bifrostlink"310name = "bifrostlink"
311version = "0.2.2"311version = "0.2.3"
312source = "registry+https://github.com/rust-lang/crates.io-index"312source = "registry+https://github.com/rust-lang/crates.io-index"
313checksum = "4bd6d4c7cf73270cbf6846a42361d409df23262d276ff15110e787067d89ad1d"313checksum = "704657867d2107831c57edd363726440c86675464b5fc113702854e17062cafc"
314dependencies = [314dependencies = [
315 "async-trait",315 "async-trait",
316 "async_fn_traits",316 "async_fn_traits",
327327
328[[package]]328[[package]]
329name = "bifrostlink-macros"329name = "bifrostlink-macros"
330version = "0.2.2"330version = "0.2.3"
331source = "registry+https://github.com/rust-lang/crates.io-index"331source = "registry+https://github.com/rust-lang/crates.io-index"
332checksum = "329813d3e34c7e65638480cefbb09d1e1dc47fa5dbf0f5f0cf8d9eee424f4f19"332checksum = "158308eb569b467c0116680f79d0ecc389f4d540f6d5a0c9279bfe79b1cd5bdb"
333dependencies = [333dependencies = [
334 "proc-macro2",334 "proc-macro2",
335 "quote",335 "quote",
338338
339[[package]]339[[package]]
340name = "bifrostlink-ports"340name = "bifrostlink-ports"
341version = "0.2.2"341version = "0.2.3"
342source = "registry+https://github.com/rust-lang/crates.io-index"342source = "registry+https://github.com/rust-lang/crates.io-index"
343checksum = "d6683eb0d4366b762fc45c0eb14e384d3554a23bc1ba3f1246368900d41e3700"343checksum = "7612993f0bd8bc6a71867461266567212a35a716b2a5aef5f9967ab08c891782"
344dependencies = [344dependencies = [
345 "bifrostlink",345 "bifrostlink",
346 "bytes",346 "bytes",
18351835
1836[[package]]1836[[package]]
1837name = "polkit-backend"1837name = "polkit-backend"
1838version = "0.1.1"1838version = "0.1.3"
1839dependencies = [1839dependencies = [
1840 "anyhow",1840 "anyhow",
1841 "clap",1841 "clap",
20552055
2056[[package]]2056[[package]]
2057name = "remowt-agent"2057name = "remowt-agent"
2058version = "0.1.1"2058version = "0.1.3"
2059dependencies = [2059dependencies = [
2060 "anyhow",2060 "anyhow",
2061 "bifrostlink",2061 "bifrostlink",
20832083
2084[[package]]2084[[package]]
2085name = "remowt-client"2085name = "remowt-client"
2086version = "0.1.1"2086version = "0.1.3"
2087dependencies = [2087dependencies = [
2088 "anyhow",2088 "anyhow",
2089 "bifrostlink",2089 "bifrostlink",
2090 "bifrostlink-ports",2090 "bifrostlink-ports",
2091 "bytes",2091 "bytes",
2092 "camino",2092 "camino",
2093 "futures",
2093 "remowt-endpoints",2094 "remowt-endpoints",
2094 "remowt-link-shared",2095 "remowt-link-shared",
2095 "russh",2096 "russh",
2098 "serde_json",2099 "serde_json",
2099 "tempfile",2100 "tempfile",
2100 "tokio",2101 "tokio",
2102 "tokio-util",
2101 "tracing",2103 "tracing",
2102 "uuid",2104 "uuid",
2103]2105]
21042106
2105[[package]]2107[[package]]
2106name = "remowt-endpoints"2108name = "remowt-endpoints"
2107version = "0.1.1"2109version = "0.1.3"
2108dependencies = [2110dependencies = [
2109 "anyhow",2111 "anyhow",
2110 "bifrostlink",2112 "bifrostlink",
21222124
2123[[package]]2125[[package]]
2124name = "remowt-link-shared"2126name = "remowt-link-shared"
2125version = "0.1.1"2127version = "0.1.3"
2126dependencies = [2128dependencies = [
2127 "bifrostlink",2129 "bifrostlink",
2128 "bytes",2130 "bytes",
21362138
2137[[package]]2139[[package]]
2138name = "remowt-plugin"2140name = "remowt-plugin"
2139version = "0.1.1"2141version = "0.1.3"
2140dependencies = [2142dependencies = [
2141 "anyhow",2143 "anyhow",
2142 "bifrostlink",2144 "bifrostlink",
21502152
2151[[package]]2153[[package]]
2152name = "remowt-polkit-shared"2154name = "remowt-polkit-shared"
2153version = "0.1.1"2155version = "0.1.3"
2154dependencies = [2156dependencies = [
2155 "nix",2157 "nix",
2156 "serde",2158 "serde",
21592161
2160[[package]]2162[[package]]
2161name = "remowt-ssh"2163name = "remowt-ssh"
2162version = "0.1.1"2164version = "0.1.3"
2163dependencies = [2165dependencies = [
2164 "anyhow",2166 "anyhow",
2165 "async-trait",2167 "async-trait",
21872189
2188[[package]]2190[[package]]
2189name = "remowt-ui-prompt"2191name = "remowt-ui-prompt"
2190version = "0.1.1"2192version = "0.1.3"
2191dependencies = [2193dependencies = [
2192 "bifrostlink",2194 "bifrostlink",
2193 "bifrostlink-macros",2195 "bifrostlink-macros",
modifiedCargo.tomldiffbeforeafterboth
3resolver = "2"3resolver = "2"
44
5[workspace.package]5[workspace.package]
6version = "0.1.1"6version = "0.1.3"
7license = "MIT"7license = "MIT"
8edition = "2021"8edition = "2021"
9repository = "https://gitlab.delta.directory/iam/remowt"9repository = "https://git.delta.rocks/r/remowt"
1010
11[workspace.dependencies]11[workspace.dependencies]
12remowt-client = { version = "0.1.1", path = "crates/remowt-client" }12remowt-client = { version = "0.1.3", path = "crates/remowt-client" }
13remowt-polkit-shared = { version = "0.1.1", path = "crates/polkit-shared" }13remowt-polkit-shared = { version = "0.1.3", path = "crates/polkit-shared" }
14remowt-link-shared = { version = "0.1.1", path = "crates/remowt-link-shared" }14remowt-link-shared = { version = "0.1.3", path = "crates/remowt-link-shared" }
15remowt-plugin = { version = "0.1.1", path = "crates/remowt-plugin" }15remowt-plugin = { version = "0.1.3", path = "crates/remowt-plugin" }
16remowt-ui-prompt = { version = "0.1.1", path = "crates/remowt-ui-prompt" }16remowt-ui-prompt = { version = "0.1.3", path = "crates/remowt-ui-prompt" }
17remowt-endpoints = { version = "0.1.1", path = "crates/remowt-endpoints" }17remowt-endpoints = { version = "0.1.3", path = "crates/remowt-endpoints" }
1818
19bifrostlink = "0.2.0"19bifrostlink = "0.2.0"
20bifrostlink-macros = "0.2.0"20bifrostlink-macros = "0.2.0"
modifiedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
11use bifrostlink_ports::stdio::from_stdio;11use bifrostlink_ports::stdio::from_stdio;
12use bifrostlink_ports::unix_socket::from_socket;12use bifrostlink_ports::unix_socket::from_socket;
13use clap::Parser;13use clap::Parser;
14use remowt_endpoints::{fs::Fs, pty::Pty, systemd::Systemd};14use remowt_endpoints::{fs::Fs, nix_daemon::NixDaemon, pty::Pty, subprocess::Subprocess, systemd::Systemd};
15use remowt_link_shared::{editor::EditorEndpointsClient, Address, BifConfig};15use remowt_link_shared::{editor::EditorEndpointsClient, Address, BifConfig};
16use remowt_polkit_shared::{emphasize, BackendRequest, Identity, PidDisplay};16use remowt_polkit_shared::{emphasize, BackendRequest, Identity, PidDisplay};
17use remowt_ui_prompt::bifrost::PromptEndpointsClient;17use remowt_ui_prompt::bifrost::PromptEndpointsClient;
21use tokio::net::UnixStream;21use tokio::net::UnixStream;
22use tokio::runtime::Builder;22use tokio::runtime::Builder;
23use tokio::task::AbortHandle;23use tokio::task::AbortHandle;
24use tracing::{debug, info, trace};24use tracing::{debug, trace};
25use zbus::fdo;25use zbus::fdo;
26use zbus::zvariant::{OwnedValue, Str};26use zbus::zvariant::{OwnedValue, Str};
27use zbus::{interface, proxy, Connection};27use zbus::{interface, proxy, Connection};
85 identities: Vec<Identity>,85 identities: Vec<Identity>,
86 ) -> zbus::fdo::Result<()> {86 ) -> zbus::fdo::Result<()> {
87 use std::fmt::Write;87 use std::fmt::Write;
88 info!("begin auth");88 debug!("begin auth");
89 let _cancel_guard = Arc::new(OnceLock::new());89 let _cancel_guard = Arc::new(OnceLock::new());
90 let task = {90 let task = {
91 let helper = self.helper.clone();91 let helper = self.helper.clone();
220 /// Expect own address to be AgentPrivileged, skip installing polkit agent220 /// Expect own address to be AgentPrivileged, skip installing polkit agent
221 #[arg(long)]221 #[arg(long)]
222 privileged: bool,222 privileged: bool,
223 #[arg(long)]
224 local: bool,
223 },225 },
224 LocalAgent,226 LocalAgent,
225}227}
242 Opts::Editor { path } => runtime.block_on(editor::edit(path)),244 Opts::Editor { path } => runtime.block_on(editor::edit(path)),
243 Opts::RealAgent { path, privileged } => runtime.block_on(main_real_agent(path, privileged)),245 Opts::RealAgent {
246 path,
247 privileged,
248 local,
249 } => runtime.block_on(main_real_agent(path, privileged, local)),
244 }250 }
245}251}
246async fn main_real() -> anyhow::Result<()> {252async fn main_real() -> anyhow::Result<()> {
255}261}
256async fn main_real_agent(path: Option<PathBuf>, privileged: bool) -> anyhow::Result<()> {262async fn main_real_agent(
263 path: Option<PathBuf>,
264 privileged: bool,
265 local: bool,
266) -> anyhow::Result<()> {
257 let address = if privileged {267 let address = if privileged {
258 Address::AgentPrivileged268 Address::AgentPrivileged
264 Fs::new().register_endpoints(&mut rpc);274 Fs::new().register_endpoints(&mut rpc);
265 Systemd.register_endpoints(&mut rpc);275 Systemd.register_endpoints(&mut rpc);
266 Pty::new().register_endpoints(&mut rpc);276 Pty::new().register_endpoints(&mut rpc);
277 Subprocess::new().register_endpoints(&mut rpc);
278 NixDaemon.register_endpoints(&mut rpc);
267279
268 remowt_plugin::host::serve(&mut rpc);280 remowt_plugin::host::serve(&mut rpc);
269281
312 };324 };
313 rpc.add_direct(Address::User, port, bifrostlink::Rtt(0));325 rpc.add_direct(Address::User, port, bifrostlink::Rtt(0));
314326
315 let polkit_conn = if !privileged {327 let polkit_conn = if !privileged && !local {
316 // The unprivileged agent doubles as a polkit authentication agent so328 // The unprivileged agent doubles as a polkit authentication agent so
317 // `run0` (e.g. our own elevation) routes its prompt to the User over329 // `run0` (e.g. our own elevation) routes its prompt to the User over
318 // bifrost instead of failing on a tty-less session.330 // bifrost instead of failing on a tty-less session.
modifiedcmds/remowt-ssh/Cargo.tomldiffbeforeafterboth
11tracing-subscriber.workspace = true11tracing-subscriber.workspace = true
12bifrostlink.workspace = true12bifrostlink.workspace = true
13remowt-link-shared.workspace = true13remowt-link-shared.workspace = true
14remowt-client = { workspace = true, features = ["shell"] }14remowt-client.workspace = true
15tokio = { workspace = true, features = [15tokio = { workspace = true, features = [
16 "macros",16 "macros",
17 "fs",17 "fs",
modifiedcmds/remowt-ssh/src/main.rsdiffbeforeafterboth
25enum Opts {25enum Opts {
26 /// Connect to remote host with remowt agent.26 /// Connect to remote host with remowt agent.
27 Ssh { host: String },27 Ssh {
28 host: String,
29 #[arg(long)]
30 escalate: bool,
31 },
28 /// Connect to local host for testing the connectivity.32 /// Connect to local host for testing the connectivity.
29 Local,33 Local {
34 #[arg(long)]
35 escalate: bool,
36 },
30}37}
3138
32fn agents_dir() -> anyhow::Result<PathBuf> {39fn agents_dir() -> anyhow::Result<PathBuf> {
45 let opts = Opts::parse();52 let opts = Opts::parse();
4653
47 let bundle = AgentBundle::from_dir(agents_dir()?)?;54 let bundle = AgentBundle::from_dir(agents_dir()?)?;
48 let conn = match &opts {55 let (conn, escalate) = match &opts {
49 Opts::Ssh { host } => Remowt::connect(host, &bundle).await?,56 Opts::Ssh { host, escalate } => (Remowt::connect(host, &bundle).await?, *escalate),
50 Opts::Local => Remowt::connect_local(&bundle).await?,57 Opts::Local { escalate } => (Remowt::connect_local(&bundle).await?, *escalate),
51 };58 };
52 let mut rpc = conn.rpc();59 let mut rpc = conn.rpc();
5360
56 PrependSourcePrompter {63 PrependSourcePrompter {
57 prompter: RofiPrompter,64 prompter: RofiPrompter,
58 source: match opts {65 source: match opts {
59 Opts::Ssh { host } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))],66 Opts::Ssh { host, .. } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))],
60 Opts::Local => vec![],67 Opts::Local { .. } => vec![],
61 },68 },
62 description: "".to_owned(),69 description: "".to_owned(),
63 },70 },
67 }74 }
6875
69 debug!("entering shell");76 debug!("entering shell");
70 run_shell(&conn).await?;77 run_shell(&conn, escalate).await?;
71 debug!("shell ended");78 debug!("shell ended");
7279
73 Ok(())80 Ok(())
74}81}
7582
76async fn run_shell(conn: &Remowt) -> anyhow::Result<()> {83async fn run_shell(conn: &Remowt, escalate: bool) -> anyhow::Result<()> {
77 let term = match std::env::var("TERM") {84 let term = match std::env::var("TERM") {
78 Ok(v) => v,85 Ok(v) => v,
79 Err(VarError::NotPresent) => "xterm-256color".to_owned(),86 Err(VarError::NotPresent) => "xterm-256color".to_owned(),
80 Err(e) => return Err(e.into()),87 Err(e) => return Err(e.into()),
81 };88 };
82 let (cols, rows) = term_size().unwrap_or((80, 24));89 let (cols, rows) = term_size().unwrap_or((80, 24));
8390
84 let shell = conn.open_shell(&term, cols, rows).await?;91 let shell = conn.open_shell(&term, cols, rows, escalate).await?;
85 let resizer = shell.resizer();92 let resizer = shell.resizer();
86 let stream = shell.stream;93 let stream = shell.stream;
8794
modifiedcrates/remowt-client/Cargo.tomldiffbeforeafterboth
27] }27] }
28tracing.workspace = true28tracing.workspace = true
29uuid = { workspace = true, features = ["v4"] }29uuid = { workspace = true, features = ["v4"] }
30remowt-endpoints = { workspace = true, optional = true }30remowt-endpoints.workspace = true
3131tokio-util = { workspace = true, features = ["codec"] }
32[features]32futures.workspace = true
33shell = ["dep:remowt-endpoints"]
3433
modifiedcrates/remowt-client/src/lib.rsdiffbeforeafterboth
6use anyhow::{anyhow, bail, ensure, Context as _, Result};6use anyhow::{anyhow, bail, ensure, Context as _, Result};
7use bifrostlink::declarative::RemoteEndpoints;7use bifrostlink::declarative::RemoteEndpoints;
8use bifrostlink::{Remote, Rpc, Rtt};8use bifrostlink::{Remote, Rpc, Rtt};
9use bifrostlink_ports::unix_socket::from_socket;
10use camino::{Utf8Path, Utf8PathBuf};9use camino::{Utf8Path, Utf8PathBuf};
11use remowt_link_shared::plugin::PluginEndpointsClient;10use remowt_link_shared::plugin::PluginEndpointsClient;
12use remowt_link_shared::port::child_port;11use remowt_link_shared::port::child_port;
19use russh::Channel;18use russh::Channel;
20use tempfile::TempDir;19use tempfile::TempDir;
21use tokio::net::UnixListener;20use tokio::net::UnixListener;
22use tokio::sync::oneshot::{self, channel};21use tokio::sync::oneshot;
23use tokio::{22use tokio::{
24 fs,23 fs,
25 io::{AsyncReadExt as _, AsyncWriteExt as _},24 io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, DuplexStream},
26};25};
27use tracing::{debug, error};26use tracing::{debug, warn};
28use uuid::Uuid;27use uuid::Uuid;
2928
30use self::port::channel_port;
31use self::subprocess::RemowtChild;
32
33pub mod editor;29pub mod editor;
34mod forwarded;30mod forwarded;
35mod port;
36#[cfg(feature = "shell")]
37mod shell;31mod shell;
32mod ssh_exec;
38mod subprocess;33mod subprocess;
3934
35use self::ssh_exec::SshExecChild;
36pub use self::subprocess::{RemowtChild, SpawnOptions, StderrMode, StdioMode};
40pub use forwarded::{RemowtListener, RemowtStream};37pub use forwarded::{RemowtListener, RemowtStream};
41#[cfg(feature = "shell")]
42pub use shell::{RemowtShell, RemowtShellResizer};38pub use shell::{RemowtShell, RemowtShellResizer};
4339
44type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;40type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;
99 let ch = sess.channel_open_session().await?;95 let ch = sess.channel_open_session().await?;
100 ch.exec(true, cmd).await?;96 ch.exec(true, cmd).await?;
10197
102 let mut child = RemowtChild::from_exec(ch);98 let mut child = SshExecChild::from_exec(ch);
103 drop(child.stdin);99 drop(child.stdin);
100 drain_stderr(child.stderr, cmd.to_owned());
104101
105 let mut out = Vec::new();102 let mut out = Vec::new();
106 let mut err = Vec::new();103 child.stdout.read_to_end(&mut out).await?;
107 tokio::try_join!(
108 child.stdout.read_to_end(&mut out),
109 child.stderr.read_to_end(&mut err),
110 )?;
111 if !err.is_empty() {
112 error!("remote stderr: {}", String::from_utf8_lossy(&err).trim());
113 }
114 let code = child.exit.await.ok().flatten();104 let code = child.exit.await.ok().flatten();
115 Ok((code, out))105 Ok((code, out))
116}106}
140 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;130 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;
141131
142 debug!("get dir");132 debug!("get dir");
143 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")133 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"").await?;
144 .await?
145 .to_owned();
146 let dir = if cache.is_empty() {134 let dir = if cache.is_empty() {
147 let home = run_string_ok(sess, "echo \"$HOME\"").await?;135 let home = run_string_ok(sess, "echo \"$HOME\"").await?;
148 ensure!(136 ensure!(
183 debug!("cat");171 debug!("cat");
184 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;172 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;
185173
186 let mut child = RemowtChild::from_exec(ch);174 let mut child = SshExecChild::from_exec(ch);
187 child175 child
188 .stdin176 .stdin
189 .write_all(&bytes)177 .write_all(&bytes)
207 Ok(())195 Ok(())
208}196}
209197
210async fn detect_escalation(
211 sess: &Handle<SshHandler>,
212) -> Result<(&'static str, &'static [&'static str])> {
213 for (tool, flags) in ESCALATORS {
214 // `tool` is a fixed identifier (no metacharacters), safe to interpolate.
215 let (code, _) = run(sess, &format!("command -v {tool}")).await?;
216 if code == Some(0) {
217 return Ok((tool, flags));
218 }
219 }
220 bail!("no escalation tool found on remote")
221}
222
223fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {
224 let mut parts = vec![tool.to_owned()];
225 parts.extend(flags.iter().map(|f| f.to_string()));
226 parts.push(sh_quote(agent_path));
227 parts.push("real-agent".to_owned());
228 parts.push("--privileged".to_owned());
229 if let Some(p) = path {
230 parts.push("--path".to_owned());
231 parts.push(sh_quote(p));
232 }
233 parts.join(" ")
234}
235
236fn find_in_path(name: &str) -> Option<std::path::PathBuf> {
237 let path = env::var_os("PATH")?;
238 env::split_paths(&path)
239 .map(|dir| dir.join(name))
240 .find(|p| p.is_file())
241}
242
243pub struct SshHandler {198pub struct SshHandler {
244 host: String,199 host: String,
245 port: u16,200 port: u16,
285 },240 },
286}241}
287242
288pub struct Remowt {243struct RemowtInner {
289 transport: Transport,244 transport: Transport,
290 rpc: Rpc<BifConfig>,245 rpc: Rpc<BifConfig>,
291 elevated: tokio::sync::OnceCell<()>,246 elevated: tokio::sync::OnceCell<()>,
247 #[allow(dead_code)]
292 children: Mutex<Vec<tokio::process::Child>>,248 children: Mutex<Vec<tokio::process::Child>>,
293 _runtime_tmp: Option<TempDir>,249 _runtime_tmp: Option<TempDir>,
294}250}
295251
252#[derive(Clone)]
253pub struct Remowt(Arc<RemowtInner>);
254
296pub type RemowtRemote = Remote<BifConfig>;255pub type RemowtRemote = Remote<BifConfig>;
297256
298impl Remowt {257impl Remowt {
258 /// Connect to the remote host over ssh, detect the architecture and deploy the required
259 /// agent binary.
299 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {260 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {
300 let conf = russh_config::parse_home(host)?;261 let conf = russh_config::parse_home(host)?;
301 let port = conf.host_config.port.or(conf.port).unwrap_or(22);262 let port = conf.host_config.port.or(conf.port).unwrap_or(22);
346307
347 debug!("runtime dir");308 debug!("runtime dir");
348 let runtime_dir = remote_runtime_dir(&sess).await?;309 let runtime_dir = remote_runtime_dir(&sess).await?;
349 let primary = runtime_dir.join(format!("remowt-{}.sock", Uuid::new_v4()));
350310
351 let (onetx, onerx) = channel();
352 subs.lock().expect("lock").insert(primary.clone(), onetx);
353 sess.streamlocal_forward(primary.clone()).await?;
354
355 let rpc = Rpc::<BifConfig>::new(Address::User);311 let rpc = Rpc::<BifConfig>::new(Address::User);
356312
357 // TODO: ensure no injection is possible in the socket path.
358 let cmd_chan = sess.channel_open_session().await?;313 let cmd_chan = sess.channel_open_session().await?;
359 debug!("starting agent");314 debug!("starting agent");
360 cmd_chan315 cmd_chan
361 .exec(316 .exec(true, format!("{} real-agent", sh_quote(&agent_path)))
362 true,
363 format!(
364 "{} real-agent --path={}",
365 sh_quote(&agent_path),
366 sh_quote(&primary)
367 ),
368 )
369 .await?;317 .await?;
370318
371 let port = channel_port(319 let child = SshExecChild::from_exec(cmd_chan);
372 onerx320 drain_stderr(child.stderr, "agent".to_owned());
321 rpc.add_direct(
373 .await322 Address::Agent,
323 child_port(child.stdout, child.stdin),
374 .map_err(|_| anyhow!("agent never opened its channel"))?,324 Rtt(0),
375 );325 );
376 rpc.add_direct(Address::Agent, port, Rtt(0));
377326
378 Ok(Self {327 Ok(Self(Arc::new(RemowtInner {
379 transport: Transport::Ssh {328 transport: Transport::Ssh {
380 sess,329 sess,
381 subs,330 subs,
386 elevated: tokio::sync::OnceCell::new(),335 elevated: tokio::sync::OnceCell::new(),
387 children: Mutex::new(Vec::new()),336 children: Mutex::new(Vec::new()),
388 _runtime_tmp: None,337 _runtime_tmp: None,
389 })338 })))
390 }339 }
391340
341 /// "Connect" to the local machine's agent, by starting the agent binary locally.
392 pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {342 pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {
393 let agent_path = bundle.local_binary()?;343 let agent_path = bundle.local_binary()?;
394 let mut child = tokio::process::Command::new(&agent_path)344 let mut child = tokio::process::Command::new(&agent_path)
395 .arg("real-agent")345 .arg("real-agent")
346 .arg("--local")
396 .stdin(std::process::Stdio::piped())347 .stdin(std::process::Stdio::piped())
397 .stdout(std::process::Stdio::piped())348 .stdout(std::process::Stdio::piped())
398 .kill_on_drop(true)349 .kill_on_drop(true)
406357
407 let (runtime_dir, runtime_tmp) = local_runtime_dir()?;358 let (runtime_dir, runtime_tmp) = local_runtime_dir()?;
408359
409 Ok(Self {360 Ok(Self(Arc::new(RemowtInner {
410 transport: Transport::Local {361 transport: Transport::Local {
411 agent_path,362 agent_path,
412 runtime_dir,363 runtime_dir,
415 elevated: tokio::sync::OnceCell::new(),366 elevated: tokio::sync::OnceCell::new(),
416 children: Mutex::new(vec![child]),367 children: Mutex::new(vec![child]),
417 _runtime_tmp: runtime_tmp,368 _runtime_tmp: runtime_tmp,
418 })369 })))
419 }370 }
420371
372 /// Get the handle to the underlying russh session handle.
421 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {373 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {
422 match &self.transport {374 match &self.0.transport {
423 Transport::Ssh { sess, .. } => Some(sess.clone()),375 Transport::Ssh { sess, .. } => Some(sess.clone()),
424 Transport::Local { .. } => None,376 Transport::Local { .. } => None,
425 }377 }
426 }378 }
427379
428 pub fn rpc(&self) -> Rpc<BifConfig> {380 pub fn rpc(&self) -> Rpc<BifConfig> {
429 self.rpc.clone()381 self.0.rpc.clone()
430 }382 }
431383
432 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {
433 R::wrap(self.rpc.remote(Address::Agent))
434 }
435
436 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {384 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {
437 let client: PluginEndpointsClient<BifConfig> = self.endpoints();385 let client: PluginEndpointsClient<BifConfig> = self.endpoints();
438 client386 client
441 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))389 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))
442 }390 }
443 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {391 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {
444 self.ensure_elevated().await?;392 self.ensure_escalated().await?;
445 let client: PluginEndpointsClient<BifConfig> =393 let client: PluginEndpointsClient<BifConfig> =
446 PluginEndpointsClient::wrap(self.rpc.remote(Address::AgentPrivileged));394 PluginEndpointsClient::wrap(self.0.rpc.remote(Address::AgentPrivileged));
447 client395 client
448 .load_plugin_path(id, path.to_owned())396 .load_plugin_path(id, path.to_owned())
449 .await?397 .await?
450 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))398 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))
451 }399 }
452 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {400 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {
453 R::wrap(self.rpc.remote(Address::Plugin(id)))401 R::wrap(self.0.rpc.remote(Address::Plugin(id)))
454 }402 }
403
404 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {
405 R::wrap(self.0.rpc.remote(Address::Agent))
406 }
455 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {407 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {
456 self.ensure_elevated().await?;408 self.ensure_escalated().await?;
457 Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))409 Ok(R::wrap(self.0.rpc.remote(Address::AgentPrivileged)))
458 }410 }
459411
460 async fn ensure_elevated(&self) -> Result<()> {412 async fn ensure_escalated(&self) -> Result<()> {
461 self.elevated413 self.0
414 .elevated
462 .get_or_try_init(|| async {415 .get_or_try_init(|| async {
463 let port = match &self.transport {416 let (agent_path, local) = match &self.0.transport {
464 Transport::Ssh {417 Transport::Ssh { agent_path, .. } => (agent_path.as_str().to_owned(), false),
465 sess, agent_path, ..
466 } => {
467 let (tool, flags) = detect_escalation(sess).await?;
468 let ch = sess.channel_open_session().await?;
469 ch.exec(true, privileged_cmd(tool, flags, agent_path, None))
470 .await?;418 Transport::Local { agent_path, .. } => (
471 channel_port(ch)
472 }
473 Transport::Local { agent_path, .. } => {
474 let sock = self
475 .runtime_dir()
476 .join(format!("remowt-priv-{}.sock", Uuid::new_v4()));419 agent_path
477 let _ = std::fs::remove_file(&sock);420 .to_str()
478 let listener = UnixListener::bind(&sock)?;
479 let (tool, flags) = ESCALATORS
480 .iter()
481 .find(|(t, _)| find_in_path(t).is_some())421 .ok_or_else(|| anyhow!("local agent path is not utf-8"))?
482 .ok_or_else(|| anyhow!("no escalation tool found"))?;
483 let child = tokio::process::Command::new(tool)
484 .args(*flags)
485 .arg(agent_path)
486 .arg("real-agent")
487 .arg("--privileged")
488 .arg("--path")422 .to_owned(),
489 .arg(sock.as_str())423 true,
490 .kill_on_drop(true)
491 .spawn()?;
492 self.children.lock().expect("lock").push(child);
493 let (stream, _) = listener.accept().await?;
494 let _ = std::fs::remove_file(&sock);424 ),
495 from_socket(stream)
496 }
497 };425 };
426
498 self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));427 let (tool, flags) = self.detect_escalation().await?;
428 let mut args: Vec<String> = flags.iter().map(|f| (*f).to_owned()).collect();
429 args.push(agent_path);
430 args.push("real-agent".to_owned());
431 args.push("--privileged".to_owned());
432 if local {
433 args.push("--local".to_owned());
434 }
435
436 let child = self
437 .spawn(SpawnOptions {
438 program: tool.to_owned(),
439 args,
440 stdin: StdioMode::Pipe,
441 stdout: StdioMode::Pipe,
442 stderr: StderrMode::Inherit,
443 ..Default::default()
444 })
445 .await
446 .context("spawning privileged agent")?;
447
448 let stdin = child
449 .stdin
450 .ok_or_else(|| anyhow!("privileged agent stdin missing"))?;
451 let stdout = child
452 .stdout
453 .ok_or_else(|| anyhow!("privileged agent stdout missing"))?;
454
455 let port = child_port(stdout, stdin);
456 self.0
457 .rpc
458 .add_direct(Address::AgentPrivileged, port, Rtt(0));
499 anyhow::Ok(())459 anyhow::Ok(())
500 })460 })
501 .await?;461 .await?;
502 Ok(())462 Ok(())
503 }463 }
504464
505 pub async fn exec(&self, command: String) -> Result<RemowtChild> {465 async fn detect_escalation(&self) -> Result<(&'static str, &'static [&'static str])> {
506 let Some(sess) = self.ssh() else {466 for (tool, flags) in ESCALATORS {
467 let probe = self
468 .spawn(SpawnOptions {
507 bail!("exec should not be called on local")469 program: (*tool).to_owned(),
470 args: vec!["--version".to_owned()],
471 stdout: StdioMode::Null,
472 stderr: StderrMode::Null,
473 ..Default::default()
508 };474 })
475 .await;
509 let ch = sess.channel_open_session().await?;476 if let Ok(child) = probe {
477 let _ = child.wait().await;
510 ch.exec(true, command).await?;478 return Ok((tool, flags));
511 Ok(RemowtChild::from_exec(ch))479 }
480 }
481 bail!("no escalation tool found")
512 }482 }
513483
514 fn runtime_dir(&self) -> Utf8PathBuf {484 /// XDG_RUNTIME_DIR on the remote machine.
485 pub fn runtime_dir(&self) -> Utf8PathBuf {
515 match &self.transport {486 match &self.0.transport {
516 Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),487 Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),
517 Transport::Local { runtime_dir, .. } => runtime_dir.clone(),488 Transport::Local { runtime_dir, .. } => runtime_dir.clone(),
518 }489 }
519 }490 }
520491
521 pub async fn forward_socket(&self, path: &Utf8Path) -> Result<RemowtListener> {492 /// Bind unix listener socket on the remote machine with auto-generated path, returning the path.
493 pub async fn bind_runtime_unix(&self, hint: &str) -> Result<(RemowtListener, Utf8PathBuf)> {
494 let sock = self
495 .runtime_dir()
496 .join(format!("remowt-{hint}-{}.sock", Uuid::new_v4()));
497 let listener = self.bind_unix(&sock).await?;
498 Ok((listener, sock))
499 }
500
501 /// Bind unix listener socket on the remote machine on the specified path.
502 pub async fn bind_unix(&self, path: &Utf8Path) -> Result<RemowtListener> {
522 match &self.transport {503 match &self.0.transport {
523 Transport::Ssh { sess, subs, .. } => {504 Transport::Ssh { sess, subs, .. } => {
524 let (tx, rx) = oneshot::channel();505 let (tx, rx) = oneshot::channel();
525 subs.lock().expect("lock").insert(path.to_owned(), tx);506 subs.lock().expect("lock").insert(path.to_owned(), tx);
537 }518 }
538}519}
539520
521fn drain_stderr(stream: DuplexStream, context: String) {
522 tokio::spawn(async move {
523 let mut reader = BufReader::new(stream).lines();
524 loop {
525 match reader.next_line().await {
526 Ok(Some(line)) => warn!(context = %context, "{line}"),
527 Ok(None) => break,
528 Err(e) => {
529 warn!(context = %context, "stderr read failed: {e}");
530 break;
531 }
532 }
533 }
534 });
535}
536
540fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {537fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {
541 if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {538 if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {
542 if !dir.is_empty() {539 if !dir.is_empty() {
556 let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;553 let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;
557 let dir = dir.trim();554 let dir = dir.trim();
558 if dir.is_empty() {555 if dir.is_empty() {
559 remote_mktemp(sess).await556 let tmp = run_string_ok(sess, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir").await?;
557 Ok(Utf8PathBuf::from(tmp))
560 } else {558 } else {
561 Ok(Utf8PathBuf::from(dir))559 Ok(Utf8PathBuf::from(dir))
562 }
563}
564
565async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {
566 let mut cmd_chan = sess.channel_open_session().await?;
567 cmd_chan
568 .exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")
569 .await?;
570 let mut stdout = vec![];
571 loop {
572 let Some(msg) = cmd_chan.wait().await else {
573 bail!("unexpected channel end");
574 };
575 match msg {
576 russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),
577 russh::ChannelMsg::ExitStatus { exit_status } => {
578 if exit_status != 0 {
579 bail!("mktemp failed");
580 }
581 break;
582 }
583 _ => {}
584 }
585 }560 }
586 ensure!(stdout.ends_with(b"\n"));
587 stdout.pop();
588 Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))
589}561}
590562
deletedcrates/remowt-client/src/port.rsdiffbeforeafterboth

no changes

modifiedcrates/remowt-client/src/shell.rsdiffbeforeafterboth
3use bifrostlink::Remote;3use bifrostlink::Remote;
4use remowt_endpoints::pty::{PtyClient, ShellId};4use remowt_endpoints::pty::{PtyClient, ShellId};
5use remowt_link_shared::{Address, BifConfig};5use remowt_link_shared::{Address, BifConfig};
6use uuid::Uuid;
76
8use crate::forwarded::RemowtStream;7use crate::forwarded::RemowtStream;
9use crate::Remowt;8use crate::Remowt;
3938
40impl Remowt {39impl Remowt {
41 pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<RemowtShell> {40 pub async fn open_shell(
42 let sock = self41 &self,
43 .runtime_dir()42 term: &str,
44 .join(format!("remowt-shell-{}.sock", Uuid::new_v4()));43 cols: u16,
4544 rows: u16,
45 escalate: bool,
46 ) -> Result<RemowtShell> {
46 let forwarded = self.forward_socket(&sock).await?;47 let (forwarded, path) = self.bind_runtime_unix("shell").await?;
47 let client: PtyClient<BifConfig> = self.endpoints();48 let client: PtyClient<BifConfig> = if escalate {
49 self.run0_endpoints().await?
50 } else {
51 self.endpoints()
52 };
48 let id = client53 let id = client
49 .open_shell(sock, term.to_owned(), cols, rows)54 .open_shell(path, term.to_owned(), cols, rows)
50 .await?55 .await?
51 .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;56 .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;
52 let stream = forwarded.accept().await?;57 let stream = forwarded.accept().await?;
5358
54 Ok(RemowtShell {59 Ok(RemowtShell {
55 id,60 id,
56 stream,61 stream,
57 remote: self.rpc.remote(Address::Agent),62 remote: self.0.rpc.remote(Address::Agent),
58 })63 })
59 }64 }
60}65}
addedcrates/remowt-client/src/ssh_exec.rsdiffbeforeafterboth

no changes

modifiedcrates/remowt-client/src/subprocess.rsdiffbeforeafterboth
1use bytes::Bytes;1use std::pin::pin;
2use russh::client::Msg;
3use russh::{Channel, ChannelMsg};
4use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};
5use tokio::sync::oneshot;
62
7const BUF: usize = 64 * 1024;3use anyhow::{anyhow, bail, Result};
4use camino::Utf8PathBuf;
5use futures::StreamExt as _;
6use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};
7use remowt_link_shared::BifConfig;
8use serde::de::DeserializeOwned;
9use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufReader};
10use tokio::select;
11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
12use tracing::{debug, info, warn};
813
14use crate::forwarded::{RemowtListener, RemowtStream};
15use crate::Remowt;
16
17#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
18pub enum StdioMode {
19 #[default]
20 Null,
21 Pipe,
22 Inherit,
23}
24
25#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
26pub enum StderrMode {
27 #[default]
28 Null,
29 Pipe,
30 Inherit,
31 MergeWithStdout,
32}
33
34#[derive(Default)]
35pub struct SpawnOptions {
36 pub program: String,
37 pub args: Vec<String>,
38 pub env: Vec<(String, String)>,
39 pub env_clear: bool,
40 pub cwd: Option<Utf8PathBuf>,
41 pub escalated: bool,
42 pub stdin: StdioMode,
43 pub stdout: StdioMode,
44 pub stderr: StderrMode,
45}
46
9pub struct RemowtChild {47pub struct RemowtChild {
10 pub stdin: DuplexStream,48 pub stdin: Option<RemowtStream>,
11 pub stdout: DuplexStream,49 pub stdout: Option<RemowtStream>,
12 pub stderr: DuplexStream,50 pub stderr: Option<RemowtStream>,
13 pub exit: oneshot::Receiver<Option<u32>>,51 id: ProcId,
52 client: SubprocessClient<BifConfig>,
14}53}
1554
16impl RemowtChild {55impl RemowtChild {
17 /// Manage channel returned by russh exec().56 pub async fn wait(self) -> Result<Option<i32>> {
18 pub(crate) fn from_exec(ch: Channel<Msg>) -> Self {
19 let (stdin, mut stdin_r) = tokio::io::duplex(BUF);57 let RemowtChild {
58 stdin,
59 stdout,
60 stderr,
61 id,
62 client,
63 } = self;
64 drop(stdin);
20 let (mut out_w, stdout) = tokio::io::duplex(BUF);65 drop(stdout);
21 let (mut err_w, stderr) = tokio::io::duplex(BUF);66 drop(stderr);
67 client
68 .wait(id)
22 let (exit_tx, exit) = oneshot::channel();69 .await?
70 .map_err(|e| anyhow!("agent wait failed: {e}"))
71 }
2372
24 tokio::spawn(async move {73 pub async fn kill(&self, signal: i32) -> Result<()> {
25 let (mut read, write) = ch.split();74 self.client
75 .kill(self.id, signal)
76 .await?
77 .map_err(|e| anyhow!("agent kill failed: {e}"))
78 }
79}
2680
27 // Forward our stdin to the channel, signalling EOF when it closes.81fn needs_socket(m: StdioMode) -> bool {
28 let stdin_pump = tokio::spawn(async move {82 matches!(m, StdioMode::Pipe | StdioMode::Inherit)
29 let mut buf = vec![0u8; BUF];83}
30 loop {84
31 match stdin_r.read(&mut buf).await {85fn stderr_needs_socket(m: StderrMode) -> bool {
32 Ok(0) | Err(_) => break,86 matches!(m, StderrMode::Pipe | StderrMode::Inherit)
33 Ok(n) => {87}
34 if write88
35 .data_bytes(Bytes::copy_from_slice(&buf[..n]))89impl Remowt {
36 .await90 pub async fn spawn(&self, opts: SpawnOptions) -> Result<RemowtChild> {
37 .is_err()91 let SpawnOptions {
38 {92 program,
39 return;93 args,
40 }94 env,
41 }95 env_clear,
96 cwd,
97 escalated,
98 stdin,
99 stdout,
100 stderr,
101 } = opts;
102
103 if matches!(stderr, StderrMode::MergeWithStdout) && !needs_socket(stdout) {
104 bail!("stderr=MergeWithStdout requires stdout=Pipe or Inherit");
105 }
106
107 let stdin_bound = if needs_socket(stdin) {
108 Some(self.bind_runtime_unix("proc-stdin").await?)
109 } else {
110 None
111 };
112 let stdout_bound = if needs_socket(stdout) {
113 Some(self.bind_runtime_unix("proc-stdout").await?)
114 } else {
115 None
116 };
117 let stderr_bound = if stderr_needs_socket(stderr) {
118 Some(self.bind_runtime_unix("proc-stderr").await?)
119 } else {
120 None
121 };
122
123 let stdin_spec = match &stdin_bound {
124 Some((_, p)) => StdioSpec::Socket(p.clone()),
125 None => StdioSpec::Null,
126 };
127 let stdout_spec = match &stdout_bound {
128 Some((_, p)) => StdioSpec::Socket(p.clone()),
129 None => StdioSpec::Null,
130 };
131 let stderr_spec = match (&stderr, &stderr_bound) {
132 (StderrMode::Pipe | StderrMode::Inherit, Some((_, p))) => StderrSpec::Socket(p.clone()),
133 (StderrMode::MergeWithStdout, _) => StderrSpec::MergeWithStdout,
134 _ => StderrSpec::Null,
135 };
136
137 let client: SubprocessClient<BifConfig> = if escalated {
138 // Boxed to break the async-fn type cycle
139 Box::pin(self.run0_endpoints::<SubprocessClient<BifConfig>>()).await?
140 } else {
141 self.endpoints()
142 };
143
144 let spec = SpawnSpec {
145 program: program.clone(),
146 args,
147 env,
148 env_clear,
149 cwd,
150 stdin: stdin_spec,
151 stdout: stdout_spec,
152 stderr: stderr_spec,
153 };
154 let id = client
155 .spawn(spec)
156 .await?
157 .map_err(|e| anyhow!("agent spawn failed: {e}"))?;
158
159 let (stdin_res, stdout_res, stderr_res) = tokio::join!(
160 accept(stdin_bound),
161 accept(stdout_bound),
162 accept(stderr_bound),
163 );
164
165 let stdin_stream = handle_stdin(stdin, stdin_res?, &program);
166 let stdout_stream = handle_output(stdout, stdout_res?, &program, false);
167 let stderr_stream = handle_output_err(stderr, stderr_res?, &program);
168
169 Ok(RemowtChild {
170 stdin: stdin_stream,
171 stdout: stdout_stream,
172 stderr: stderr_stream,
173 id,
174 client,
175 })
176 }
177
178 pub fn cmd(&self, program: impl AsRef<str>) -> RemowtCommand {
179 let program = program.as_ref().to_owned();
180 RemowtCommand {
181 program,
182 args: vec![],
183 env: vec![],
184 remowt: self.clone(),
185 escalated: false,
186 }
187 }
188}
189
190async fn accept(b: Option<(RemowtListener, Utf8PathBuf)>) -> Result<Option<RemowtStream>> {
191 match b {
192 Some((l, _)) => Ok(Some(l.accept().await?)),
193 None => Ok(None),
194 }
195}
196
197fn handle_stdin(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {
198 match mode {
199 StdioMode::Pipe => s,
200 StdioMode::Inherit => {
201 if let Some(s) = s {
202 let program = program.to_owned();
203 tokio::spawn(async move {
204 let mut stdin = tokio::io::stdin();
205 let mut s = s;
206 if let Err(e) = tokio::io::copy(&mut stdin, &mut s).await {
207 warn!(program, "stdin forward ended: {e}");
42 }208 }
43 }209 let _ = s.shutdown().await;
44 let _ = write.eof().await;
45 });210 });
211 }
212 None
213 }
214 StdioMode::Null => None,
215 }
216}
46217
218fn handle_output(
47 let mut code = None;219 mode: StdioMode,
220 s: Option<RemowtStream>,
221 program: &str,
222 is_stderr: bool,
223) -> Option<RemowtStream> {
224 match mode {
225 StdioMode::Pipe => s,
48 while let Some(msg) = read.wait().await {226 StdioMode::Inherit => {
227 if let Some(s) = s {
228 let program = program.to_owned();
49 match msg {229 tokio::spawn(pump_to_tracing(s, program, is_stderr));
50 ChannelMsg::Data { data } => {230 }
231 None
232 }
233 StdioMode::Null => None,
234 }
235}
236
237fn handle_output_err(
238 mode: StderrMode,
239 s: Option<RemowtStream>,
240 program: &str,
241) -> Option<RemowtStream> {
242 match mode {
243 StderrMode::Pipe => s,
51 if out_w.write_all(&data).await.is_err() {244 StderrMode::Inherit => {
245 if let Some(s) = s {
52 break;246 let program = program.to_owned();
247 tokio::spawn(pump_to_tracing(s, program, true));
53 }248 }
54 }249 None
250 }
55 ChannelMsg::ExtendedData { data, .. } => {251 StderrMode::MergeWithStdout | StderrMode::Null => None,
252 }
253}
254
255async fn pump_to_tracing(stream: RemowtStream, program: String, is_stderr: bool) {
56 if err_w.write_all(&data).await.is_err() {256 let mut reader = BufReader::new(stream).lines();
257 loop {
258 match reader.next_line().await {
57 break;259 Ok(Some(line)) => {
58 }260 if is_stderr {
59 }261 warn!(program, "{line}");
60 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),262 } else {
61 _ => {}263 info!(program, "{line}");
62 }264 }
63 }265 }
266 Ok(None) => break,
267 Err(e) => {
268 warn!(program, "child log stream error: {e}");
269 break;
270 }
271 }
272 }
273}
64274
65 // The process is gone; stop waiting on stdin we'll never forward.275fn escape_bash(input: &str, out: &mut String) {
276 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
277 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {
66 stdin_pump.abort();278 out.push_str(input);
67 let _ = out_w.shutdown().await;279 return;
280 }
281 out.push('\'');
282 for (i, v) in input.split('\'').enumerate() {
68 let _ = err_w.shutdown().await;283 if i != 0 {
284 out.push_str("'\"'\"'");
69 let _ = exit_tx.send(code);285 }
286 out.push_str(v);
70 });287 }
288 out.push('\'');
289}
71290
291#[derive(Clone)]
72 RemowtChild {292pub struct RemowtCommand {
73 stdin,293 program: String,
294 args: Vec<String>,
295 env: Vec<(String, String)>,
74 stdout,296 remowt: Remowt,
297 escalated: bool,
298}
299
300impl RemowtCommand {
301 pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {
75 stderr,302 self.args.push(arg.as_ref().to_owned());
303 self
304 }
305 pub fn args<V: AsRef<str>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
76 exit,306 for arg in args {
307 self.args.push(arg.as_ref().to_owned());
77 }308 }
309 self
78 }310 }
311 pub fn eqarg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
312 self.args
313 .push(format!("{}={}", key.as_ref(), value.as_ref()));
314 self
315 }
316 pub fn comparg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
317 self.args.push(key.as_ref().to_owned());
318 self.args.push(value.as_ref().to_owned());
319 self
320 }
321 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
322 self.env
323 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));
324 self
325 }
326
327 pub fn sudo(mut self) -> Self {
328 self.escalated = true;
329 self
330 }
331
332 /// Only for display.
333 fn shell_line(&self) -> String {
334 let mut out = String::new();
335 if self.escalated {
336 out.push_str("run0 ");
337 }
338 if !self.env.is_empty() {
339 out.push_str("env");
340 for (k, v) in &self.env {
341 out.push(' ');
342 assert!(!k.contains('='));
343 escape_bash(k, &mut out);
344 out.push('=');
345 escape_bash(v, &mut out);
346 }
347 out.push(' ');
348 }
349 escape_bash(&self.program, &mut out);
350 for arg in &self.args {
351 out.push(' ');
352 escape_bash(arg, &mut out);
353 }
354 out
355 }
356
357 fn into_spawn_options(self) -> (Remowt, SpawnOptions, String) {
358 let line = self.shell_line();
359 let opts = SpawnOptions {
360 program: self.program,
361 args: self.args,
362 env: self.env,
363 env_clear: false,
364 cwd: None,
365 escalated: self.escalated,
366 stdin: StdioMode::Null,
367 stdout: StdioMode::Pipe,
368 stderr: StderrMode::Pipe,
369 };
370 (self.remowt, opts, line)
371 }
372
373 pub async fn run(self) -> Result<()> {
374 run_inner(self, false).await.map(|_| ())
375 }
376 pub async fn run_string(self) -> Result<String> {
377 let bytes = run_inner(self, true).await?.expect("want_stdout");
378 Ok(String::from_utf8(bytes)?)
379 }
380 pub async fn run_value<T: DeserializeOwned>(self) -> Result<T> {
381 let s = self.run_string().await?;
382 Ok(serde_json::from_str(&s)?)
383 }
384}
385
386async fn run_inner(cmd: RemowtCommand, want_stdout: bool) -> Result<Option<Vec<u8>>> {
387 let (remowt, opts, line) = cmd.into_spawn_options();
388 debug!("running command {line:?} over remowt");
389 let program = opts.program.clone();
390 let mut child = remowt.spawn(opts).await?;
391 let stderr = child.stderr.take().expect("stderr=Pipe");
392 let stdout = child.stdout.take().expect("stdout=Pipe");
393
394 let mut err = FramedRead::new(stderr, LinesCodec::new());
395 let (mut out_bytes, mut out_lines) = if want_stdout {
396 (Some(FramedRead::new(stdout, BytesCodec::new())), None)
397 } else {
398 (None, Some(FramedRead::new(stdout, LinesCodec::new())))
399 };
400
401 let mut buf = if want_stdout { Some(Vec::new()) } else { None };
402
403 let mut wait = pin!(child.wait());
404 let exit = loop {
405 select! {
406 biased;
407
408 Some(e) = err.next() => {
409 let e = e?;
410 warn!(program = %program, "{e}");
411 }
412 Some(o) = async { out_bytes.as_mut()?.next().await }, if want_stdout => {
413 buf.as_mut().expect("want_stdout").extend_from_slice(&o?);
414 }
415 Some(o) = async { out_lines.as_mut()?.next().await }, if !want_stdout => {
416 let o = o?;
417 info!(program = %program, "{o}");
418 }
419 res = &mut wait => {
420 break res?;
421 }
422 }
423 };
79424
80 /// Wait for the process to finish, returning its exit status.425 match exit {
426 Some(0) => Ok(buf),
427 Some(c) => bail!("command '{line}' failed with status {c}"),
81 pub async fn wait(self) -> Option<u32> {428 None => Err(anyhow!("command '{line}' ended without an exit status")),
82 self.exit.await.ok().flatten()
83 }429 }
84}430}
85431
modifiedcrates/remowt-endpoints/Cargo.tomldiffbeforeafterboth
16tokio = { workspace = true, features = ["net", "io-util", "rt", "process"] }16tokio = { workspace = true, features = ["net", "io-util", "rt", "process"] }
17tracing.workspace = true17tracing.workspace = true
18uuid.workspace = true18uuid.workspace = true
19nix = { workspace = true, features = ["process", "term"] }19nix = { workspace = true, features = ["process", "signal", "term"] }
20zbus.workspace = true20zbus.workspace = true
2121
modifiedcrates/remowt-endpoints/src/lib.rsdiffbeforeafterboth
1pub mod fs;1pub mod fs;
2pub mod nix_daemon;2pub mod nix_daemon;
3pub mod pty;3pub mod pty;
4pub mod subprocess;
4pub mod systemd;5pub mod systemd;
56
modifiedcrates/remowt-endpoints/src/pty.rsdiffbeforeafterboth
16use tokio::io::unix::AsyncFd;16use tokio::io::unix::AsyncFd;
17use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};17use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
18use tokio::net::UnixStream;18use tokio::net::UnixStream;
19use tracing::{info, warn};19use tracing::{debug, info, warn};
2020
21pub type ShellId = u64;21pub type ShellId = u64;
2222
117 };117 };
118 let pty = AsyncPty::new(master)?;118 let pty = AsyncPty::new(master)?;
119119
120 info!(id, shell, "shell opened");120 debug!(id, shell, "shell opened");
121 let shells = self.shells.clone();121 let shells = self.shells.clone();
122 tokio::spawn(async move {122 tokio::spawn(async move {
123 let mut pty = pty;123 let mut pty = pty;
addedcrates/remowt-endpoints/src/subprocess.rsdiffbeforeafterboth

no changes