difftreelog
feat subprocess through agent
in: trunk
15 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -308,9 +308,9 @@
[[package]]
name = "bifrostlink"
-version = "0.2.2"
+version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4bd6d4c7cf73270cbf6846a42361d409df23262d276ff15110e787067d89ad1d"
+checksum = "704657867d2107831c57edd363726440c86675464b5fc113702854e17062cafc"
dependencies = [
"async-trait",
"async_fn_traits",
@@ -327,9 +327,9 @@
[[package]]
name = "bifrostlink-macros"
-version = "0.2.2"
+version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "329813d3e34c7e65638480cefbb09d1e1dc47fa5dbf0f5f0cf8d9eee424f4f19"
+checksum = "158308eb569b467c0116680f79d0ecc389f4d540f6d5a0c9279bfe79b1cd5bdb"
dependencies = [
"proc-macro2",
"quote",
@@ -338,9 +338,9 @@
[[package]]
name = "bifrostlink-ports"
-version = "0.2.2"
+version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d6683eb0d4366b762fc45c0eb14e384d3554a23bc1ba3f1246368900d41e3700"
+checksum = "7612993f0bd8bc6a71867461266567212a35a716b2a5aef5f9967ab08c891782"
dependencies = [
"bifrostlink",
"bytes",
@@ -1835,7 +1835,7 @@
[[package]]
name = "polkit-backend"
-version = "0.1.1"
+version = "0.1.3"
dependencies = [
"anyhow",
"clap",
@@ -2055,7 +2055,7 @@
[[package]]
name = "remowt-agent"
-version = "0.1.1"
+version = "0.1.3"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2083,13 +2083,14 @@
[[package]]
name = "remowt-client"
-version = "0.1.1"
+version = "0.1.3"
dependencies = [
"anyhow",
"bifrostlink",
"bifrostlink-ports",
"bytes",
"camino",
+ "futures",
"remowt-endpoints",
"remowt-link-shared",
"russh",
@@ -2098,13 +2099,14 @@
"serde_json",
"tempfile",
"tokio",
+ "tokio-util",
"tracing",
"uuid",
]
[[package]]
name = "remowt-endpoints"
-version = "0.1.1"
+version = "0.1.3"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2122,7 +2124,7 @@
[[package]]
name = "remowt-link-shared"
-version = "0.1.1"
+version = "0.1.3"
dependencies = [
"bifrostlink",
"bytes",
@@ -2136,7 +2138,7 @@
[[package]]
name = "remowt-plugin"
-version = "0.1.1"
+version = "0.1.3"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2150,7 +2152,7 @@
[[package]]
name = "remowt-polkit-shared"
-version = "0.1.1"
+version = "0.1.3"
dependencies = [
"nix",
"serde",
@@ -2159,7 +2161,7 @@
[[package]]
name = "remowt-ssh"
-version = "0.1.1"
+version = "0.1.3"
dependencies = [
"anyhow",
"async-trait",
@@ -2187,7 +2189,7 @@
[[package]]
name = "remowt-ui-prompt"
-version = "0.1.1"
+version = "0.1.3"
dependencies = [
"bifrostlink",
"bifrostlink-macros",
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,18 +3,18 @@
resolver = "2"
[workspace.package]
-version = "0.1.1"
+version = "0.1.3"
license = "MIT"
edition = "2021"
-repository = "https://gitlab.delta.directory/iam/remowt"
+repository = "https://git.delta.rocks/r/remowt"
[workspace.dependencies]
-remowt-client = { version = "0.1.1", path = "crates/remowt-client" }
-remowt-polkit-shared = { version = "0.1.1", path = "crates/polkit-shared" }
-remowt-link-shared = { version = "0.1.1", path = "crates/remowt-link-shared" }
-remowt-plugin = { version = "0.1.1", path = "crates/remowt-plugin" }
-remowt-ui-prompt = { version = "0.1.1", path = "crates/remowt-ui-prompt" }
-remowt-endpoints = { version = "0.1.1", path = "crates/remowt-endpoints" }
+remowt-client = { version = "0.1.3", path = "crates/remowt-client" }
+remowt-polkit-shared = { version = "0.1.3", path = "crates/polkit-shared" }
+remowt-link-shared = { version = "0.1.3", path = "crates/remowt-link-shared" }
+remowt-plugin = { version = "0.1.3", path = "crates/remowt-plugin" }
+remowt-ui-prompt = { version = "0.1.3", path = "crates/remowt-ui-prompt" }
+remowt-endpoints = { version = "0.1.3", path = "crates/remowt-endpoints" }
bifrostlink = "0.2.0"
bifrostlink-macros = "0.2.0"
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -11,7 +11,7 @@
use bifrostlink_ports::stdio::from_stdio;
use bifrostlink_ports::unix_socket::from_socket;
use clap::Parser;
-use remowt_endpoints::{fs::Fs, pty::Pty, systemd::Systemd};
+use remowt_endpoints::{fs::Fs, nix_daemon::NixDaemon, pty::Pty, subprocess::Subprocess, systemd::Systemd};
use remowt_link_shared::{editor::EditorEndpointsClient, Address, BifConfig};
use remowt_polkit_shared::{emphasize, BackendRequest, Identity, PidDisplay};
use remowt_ui_prompt::bifrost::PromptEndpointsClient;
@@ -21,7 +21,7 @@
use tokio::net::UnixStream;
use tokio::runtime::Builder;
use tokio::task::AbortHandle;
-use tracing::{debug, info, trace};
+use tracing::{debug, trace};
use zbus::fdo;
use zbus::zvariant::{OwnedValue, Str};
use zbus::{interface, proxy, Connection};
@@ -85,7 +85,7 @@
identities: Vec<Identity>,
) -> zbus::fdo::Result<()> {
use std::fmt::Write;
- info!("begin auth");
+ debug!("begin auth");
let _cancel_guard = Arc::new(OnceLock::new());
let task = {
let helper = self.helper.clone();
@@ -220,6 +220,8 @@
/// Expect own address to be AgentPrivileged, skip installing polkit agent
#[arg(long)]
privileged: bool,
+ #[arg(long)]
+ local: bool,
},
LocalAgent,
}
@@ -240,7 +242,11 @@
} => runtime.block_on(askpass::ask(&prompt, description)),
Opts::LocalAgent => runtime.block_on(main_real()),
Opts::Editor { path } => runtime.block_on(editor::edit(path)),
- Opts::RealAgent { path, privileged } => runtime.block_on(main_real_agent(path, privileged)),
+ Opts::RealAgent {
+ path,
+ privileged,
+ local,
+ } => runtime.block_on(main_real_agent(path, privileged, local)),
}
}
async fn main_real() -> anyhow::Result<()> {
@@ -253,7 +259,11 @@
let _conn = conn;
pending().await
}
-async fn main_real_agent(path: Option<PathBuf>, privileged: bool) -> anyhow::Result<()> {
+async fn main_real_agent(
+ path: Option<PathBuf>,
+ privileged: bool,
+ local: bool,
+) -> anyhow::Result<()> {
let address = if privileged {
Address::AgentPrivileged
} else {
@@ -264,6 +274,8 @@
Fs::new().register_endpoints(&mut rpc);
Systemd.register_endpoints(&mut rpc);
Pty::new().register_endpoints(&mut rpc);
+ Subprocess::new().register_endpoints(&mut rpc);
+ NixDaemon.register_endpoints(&mut rpc);
remowt_plugin::host::serve(&mut rpc);
@@ -312,7 +324,7 @@
};
rpc.add_direct(Address::User, port, bifrostlink::Rtt(0));
- let polkit_conn = if !privileged {
+ let polkit_conn = if !privileged && !local {
// The unprivileged agent doubles as a polkit authentication agent so
// `run0` (e.g. our own elevation) routes its prompt to the User over
// bifrost instead of failing on a tty-less session.
cmds/remowt-ssh/Cargo.tomldiffbeforeafterboth--- a/cmds/remowt-ssh/Cargo.toml
+++ b/cmds/remowt-ssh/Cargo.toml
@@ -11,7 +11,7 @@
tracing-subscriber.workspace = true
bifrostlink.workspace = true
remowt-link-shared.workspace = true
-remowt-client = { workspace = true, features = ["shell"] }
+remowt-client.workspace = true
tokio = { workspace = true, features = [
"macros",
"fs",
cmds/remowt-ssh/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-ssh/src/main.rs
+++ b/cmds/remowt-ssh/src/main.rs
@@ -24,9 +24,16 @@
#[derive(Parser)]
enum Opts {
/// Connect to remote host with remowt agent.
- Ssh { host: String },
+ Ssh {
+ host: String,
+ #[arg(long)]
+ escalate: bool,
+ },
/// Connect to local host for testing the connectivity.
- Local,
+ Local {
+ #[arg(long)]
+ escalate: bool,
+ },
}
fn agents_dir() -> anyhow::Result<PathBuf> {
@@ -45,9 +52,9 @@
let opts = Opts::parse();
let bundle = AgentBundle::from_dir(agents_dir()?)?;
- let conn = match &opts {
- Opts::Ssh { host } => Remowt::connect(host, &bundle).await?,
- Opts::Local => Remowt::connect_local(&bundle).await?,
+ let (conn, escalate) = match &opts {
+ Opts::Ssh { host, escalate } => (Remowt::connect(host, &bundle).await?, *escalate),
+ Opts::Local { escalate } => (Remowt::connect_local(&bundle).await?, *escalate),
};
let mut rpc = conn.rpc();
@@ -56,8 +63,8 @@
PrependSourcePrompter {
prompter: RofiPrompter,
source: match opts {
- Opts::Ssh { host } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))],
- Opts::Local => vec![],
+ Opts::Ssh { host, .. } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))],
+ Opts::Local { .. } => vec![],
},
description: "".to_owned(),
},
@@ -67,13 +74,13 @@
}
debug!("entering shell");
- run_shell(&conn).await?;
+ run_shell(&conn, escalate).await?;
debug!("shell ended");
Ok(())
}
-async fn run_shell(conn: &Remowt) -> anyhow::Result<()> {
+async fn run_shell(conn: &Remowt, escalate: bool) -> anyhow::Result<()> {
let term = match std::env::var("TERM") {
Ok(v) => v,
Err(VarError::NotPresent) => "xterm-256color".to_owned(),
@@ -81,7 +88,7 @@
};
let (cols, rows) = term_size().unwrap_or((80, 24));
- let shell = conn.open_shell(&term, cols, rows).await?;
+ let shell = conn.open_shell(&term, cols, rows, escalate).await?;
let resizer = shell.resizer();
let stream = shell.stream;
crates/remowt-client/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-client/Cargo.toml
+++ b/crates/remowt-client/Cargo.toml
@@ -27,7 +27,6 @@
] }
tracing.workspace = true
uuid = { workspace = true, features = ["v4"] }
-remowt-endpoints = { workspace = true, optional = true }
-
-[features]
-shell = ["dep:remowt-endpoints"]
+remowt-endpoints.workspace = true
+tokio-util = { workspace = true, features = ["codec"] }
+futures.workspace = true
crates/remowt-client/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-client/src/lib.rs
+++ b/crates/remowt-client/src/lib.rs
@@ -6,7 +6,6 @@
use anyhow::{anyhow, bail, ensure, Context as _, Result};
use bifrostlink::declarative::RemoteEndpoints;
use bifrostlink::{Remote, Rpc, Rtt};
-use bifrostlink_ports::unix_socket::from_socket;
use camino::{Utf8Path, Utf8PathBuf};
use remowt_link_shared::plugin::PluginEndpointsClient;
use remowt_link_shared::port::child_port;
@@ -19,26 +18,23 @@
use russh::Channel;
use tempfile::TempDir;
use tokio::net::UnixListener;
-use tokio::sync::oneshot::{self, channel};
+use tokio::sync::oneshot;
use tokio::{
fs,
- io::{AsyncReadExt as _, AsyncWriteExt as _},
+ io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, DuplexStream},
};
-use tracing::{debug, error};
+use tracing::{debug, warn};
use uuid::Uuid;
-
-use self::port::channel_port;
-use self::subprocess::RemowtChild;
pub mod editor;
mod forwarded;
-mod port;
-#[cfg(feature = "shell")]
mod shell;
+mod ssh_exec;
mod subprocess;
+use self::ssh_exec::SshExecChild;
+pub use self::subprocess::{RemowtChild, SpawnOptions, StderrMode, StdioMode};
pub use forwarded::{RemowtListener, RemowtStream};
-#[cfg(feature = "shell")]
pub use shell::{RemowtShell, RemowtShellResizer};
type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;
@@ -99,18 +95,12 @@
let ch = sess.channel_open_session().await?;
ch.exec(true, cmd).await?;
- let mut child = RemowtChild::from_exec(ch);
+ let mut child = SshExecChild::from_exec(ch);
drop(child.stdin);
+ drain_stderr(child.stderr, cmd.to_owned());
let mut out = Vec::new();
- let mut err = Vec::new();
- tokio::try_join!(
- child.stdout.read_to_end(&mut out),
- child.stderr.read_to_end(&mut err),
- )?;
- if !err.is_empty() {
- error!("remote stderr: {}", String::from_utf8_lossy(&err).trim());
- }
+ child.stdout.read_to_end(&mut out).await?;
let code = child.exit.await.ok().flatten();
Ok((code, out))
}
@@ -140,9 +130,7 @@
.ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;
debug!("get dir");
- let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")
- .await?
- .to_owned();
+ let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"").await?;
let dir = if cache.is_empty() {
let home = run_string_ok(sess, "echo \"$HOME\"").await?;
ensure!(
@@ -183,7 +171,7 @@
debug!("cat");
ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;
- let mut child = RemowtChild::from_exec(ch);
+ let mut child = SshExecChild::from_exec(ch);
child
.stdin
.write_all(&bytes)
@@ -205,39 +193,6 @@
)
.await?;
Ok(())
-}
-
-async fn detect_escalation(
- sess: &Handle<SshHandler>,
-) -> Result<(&'static str, &'static [&'static str])> {
- for (tool, flags) in ESCALATORS {
- // `tool` is a fixed identifier (no metacharacters), safe to interpolate.
- let (code, _) = run(sess, &format!("command -v {tool}")).await?;
- if code == Some(0) {
- return Ok((tool, flags));
- }
- }
- bail!("no escalation tool found on remote")
-}
-
-fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {
- let mut parts = vec![tool.to_owned()];
- parts.extend(flags.iter().map(|f| f.to_string()));
- parts.push(sh_quote(agent_path));
- parts.push("real-agent".to_owned());
- parts.push("--privileged".to_owned());
- if let Some(p) = path {
- parts.push("--path".to_owned());
- parts.push(sh_quote(p));
- }
- parts.join(" ")
-}
-
-fn find_in_path(name: &str) -> Option<std::path::PathBuf> {
- let path = env::var_os("PATH")?;
- env::split_paths(&path)
- .map(|dir| dir.join(name))
- .find(|p| p.is_file())
}
pub struct SshHandler {
@@ -285,17 +240,23 @@
},
}
-pub struct Remowt {
+struct RemowtInner {
transport: Transport,
rpc: Rpc<BifConfig>,
elevated: tokio::sync::OnceCell<()>,
+ #[allow(dead_code)]
children: Mutex<Vec<tokio::process::Child>>,
_runtime_tmp: Option<TempDir>,
}
+#[derive(Clone)]
+pub struct Remowt(Arc<RemowtInner>);
+
pub type RemowtRemote = Remote<BifConfig>;
impl Remowt {
+ /// Connect to the remote host over ssh, detect the architecture and deploy the required
+ /// agent binary.
pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {
let conf = russh_config::parse_home(host)?;
let port = conf.host_config.port.or(conf.port).unwrap_or(22);
@@ -346,36 +307,24 @@
debug!("runtime dir");
let runtime_dir = remote_runtime_dir(&sess).await?;
- let primary = runtime_dir.join(format!("remowt-{}.sock", Uuid::new_v4()));
- let (onetx, onerx) = channel();
- subs.lock().expect("lock").insert(primary.clone(), onetx);
- sess.streamlocal_forward(primary.clone()).await?;
-
let rpc = Rpc::<BifConfig>::new(Address::User);
- // TODO: ensure no injection is possible in the socket path.
let cmd_chan = sess.channel_open_session().await?;
debug!("starting agent");
cmd_chan
- .exec(
- true,
- format!(
- "{} real-agent --path={}",
- sh_quote(&agent_path),
- sh_quote(&primary)
- ),
- )
+ .exec(true, format!("{} real-agent", sh_quote(&agent_path)))
.await?;
- let port = channel_port(
- onerx
- .await
- .map_err(|_| anyhow!("agent never opened its channel"))?,
+ let child = SshExecChild::from_exec(cmd_chan);
+ drain_stderr(child.stderr, "agent".to_owned());
+ rpc.add_direct(
+ Address::Agent,
+ child_port(child.stdout, child.stdin),
+ Rtt(0),
);
- rpc.add_direct(Address::Agent, port, Rtt(0));
- Ok(Self {
+ Ok(Self(Arc::new(RemowtInner {
transport: Transport::Ssh {
sess,
subs,
@@ -386,13 +335,15 @@
elevated: tokio::sync::OnceCell::new(),
children: Mutex::new(Vec::new()),
_runtime_tmp: None,
- })
+ })))
}
+ /// "Connect" to the local machine's agent, by starting the agent binary locally.
pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {
let agent_path = bundle.local_binary()?;
let mut child = tokio::process::Command::new(&agent_path)
.arg("real-agent")
+ .arg("--local")
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.kill_on_drop(true)
@@ -406,7 +357,7 @@
let (runtime_dir, runtime_tmp) = local_runtime_dir()?;
- Ok(Self {
+ Ok(Self(Arc::new(RemowtInner {
transport: Transport::Local {
agent_path,
runtime_dir,
@@ -415,22 +366,19 @@
elevated: tokio::sync::OnceCell::new(),
children: Mutex::new(vec![child]),
_runtime_tmp: runtime_tmp,
- })
+ })))
}
+ /// Get the handle to the underlying russh session handle.
pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {
- match &self.transport {
+ match &self.0.transport {
Transport::Ssh { sess, .. } => Some(sess.clone()),
Transport::Local { .. } => None,
}
}
pub fn rpc(&self) -> Rpc<BifConfig> {
- self.rpc.clone()
- }
-
- pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {
- R::wrap(self.rpc.remote(Address::Agent))
+ self.0.rpc.clone()
}
pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {
@@ -441,85 +389,118 @@
.map_err(|e| anyhow!("agent failed to load plugin: {e}"))
}
pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {
- self.ensure_elevated().await?;
+ self.ensure_escalated().await?;
let client: PluginEndpointsClient<BifConfig> =
- PluginEndpointsClient::wrap(self.rpc.remote(Address::AgentPrivileged));
+ PluginEndpointsClient::wrap(self.0.rpc.remote(Address::AgentPrivileged));
client
.load_plugin_path(id, path.to_owned())
.await?
.map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))
}
pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {
- R::wrap(self.rpc.remote(Address::Plugin(id)))
+ R::wrap(self.0.rpc.remote(Address::Plugin(id)))
}
+
+ pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {
+ R::wrap(self.0.rpc.remote(Address::Agent))
+ }
pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {
- self.ensure_elevated().await?;
- Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))
+ self.ensure_escalated().await?;
+ Ok(R::wrap(self.0.rpc.remote(Address::AgentPrivileged)))
}
- async fn ensure_elevated(&self) -> Result<()> {
- self.elevated
+ async fn ensure_escalated(&self) -> Result<()> {
+ self.0
+ .elevated
.get_or_try_init(|| async {
- let port = match &self.transport {
- Transport::Ssh {
- sess, agent_path, ..
- } => {
- let (tool, flags) = detect_escalation(sess).await?;
- let ch = sess.channel_open_session().await?;
- ch.exec(true, privileged_cmd(tool, flags, agent_path, None))
- .await?;
- channel_port(ch)
- }
- Transport::Local { agent_path, .. } => {
- let sock = self
- .runtime_dir()
- .join(format!("remowt-priv-{}.sock", Uuid::new_v4()));
- let _ = std::fs::remove_file(&sock);
- let listener = UnixListener::bind(&sock)?;
- let (tool, flags) = ESCALATORS
- .iter()
- .find(|(t, _)| find_in_path(t).is_some())
- .ok_or_else(|| anyhow!("no escalation tool found"))?;
- let child = tokio::process::Command::new(tool)
- .args(*flags)
- .arg(agent_path)
- .arg("real-agent")
- .arg("--privileged")
- .arg("--path")
- .arg(sock.as_str())
- .kill_on_drop(true)
- .spawn()?;
- self.children.lock().expect("lock").push(child);
- let (stream, _) = listener.accept().await?;
- let _ = std::fs::remove_file(&sock);
- from_socket(stream)
- }
+ let (agent_path, local) = match &self.0.transport {
+ Transport::Ssh { agent_path, .. } => (agent_path.as_str().to_owned(), false),
+ Transport::Local { agent_path, .. } => (
+ agent_path
+ .to_str()
+ .ok_or_else(|| anyhow!("local agent path is not utf-8"))?
+ .to_owned(),
+ true,
+ ),
};
- self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));
+
+ let (tool, flags) = self.detect_escalation().await?;
+ let mut args: Vec<String> = flags.iter().map(|f| (*f).to_owned()).collect();
+ args.push(agent_path);
+ args.push("real-agent".to_owned());
+ args.push("--privileged".to_owned());
+ if local {
+ args.push("--local".to_owned());
+ }
+
+ let child = self
+ .spawn(SpawnOptions {
+ program: tool.to_owned(),
+ args,
+ stdin: StdioMode::Pipe,
+ stdout: StdioMode::Pipe,
+ stderr: StderrMode::Inherit,
+ ..Default::default()
+ })
+ .await
+ .context("spawning privileged agent")?;
+
+ let stdin = child
+ .stdin
+ .ok_or_else(|| anyhow!("privileged agent stdin missing"))?;
+ let stdout = child
+ .stdout
+ .ok_or_else(|| anyhow!("privileged agent stdout missing"))?;
+
+ let port = child_port(stdout, stdin);
+ self.0
+ .rpc
+ .add_direct(Address::AgentPrivileged, port, Rtt(0));
anyhow::Ok(())
})
.await?;
Ok(())
}
- pub async fn exec(&self, command: String) -> Result<RemowtChild> {
- let Some(sess) = self.ssh() else {
- bail!("exec should not be called on local")
- };
- let ch = sess.channel_open_session().await?;
- ch.exec(true, command).await?;
- Ok(RemowtChild::from_exec(ch))
+ async fn detect_escalation(&self) -> Result<(&'static str, &'static [&'static str])> {
+ for (tool, flags) in ESCALATORS {
+ let probe = self
+ .spawn(SpawnOptions {
+ program: (*tool).to_owned(),
+ args: vec!["--version".to_owned()],
+ stdout: StdioMode::Null,
+ stderr: StderrMode::Null,
+ ..Default::default()
+ })
+ .await;
+ if let Ok(child) = probe {
+ let _ = child.wait().await;
+ return Ok((tool, flags));
+ }
+ }
+ bail!("no escalation tool found")
}
- fn runtime_dir(&self) -> Utf8PathBuf {
- match &self.transport {
+ /// XDG_RUNTIME_DIR on the remote machine.
+ pub fn runtime_dir(&self) -> Utf8PathBuf {
+ match &self.0.transport {
Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),
Transport::Local { runtime_dir, .. } => runtime_dir.clone(),
}
}
- pub async fn forward_socket(&self, path: &Utf8Path) -> Result<RemowtListener> {
- match &self.transport {
+ /// Bind unix listener socket on the remote machine with auto-generated path, returning the path.
+ pub async fn bind_runtime_unix(&self, hint: &str) -> Result<(RemowtListener, Utf8PathBuf)> {
+ let sock = self
+ .runtime_dir()
+ .join(format!("remowt-{hint}-{}.sock", Uuid::new_v4()));
+ let listener = self.bind_unix(&sock).await?;
+ Ok((listener, sock))
+ }
+
+ /// Bind unix listener socket on the remote machine on the specified path.
+ pub async fn bind_unix(&self, path: &Utf8Path) -> Result<RemowtListener> {
+ match &self.0.transport {
Transport::Ssh { sess, subs, .. } => {
let (tx, rx) = oneshot::channel();
subs.lock().expect("lock").insert(path.to_owned(), tx);
@@ -537,6 +518,22 @@
}
}
+fn drain_stderr(stream: DuplexStream, context: String) {
+ tokio::spawn(async move {
+ let mut reader = BufReader::new(stream).lines();
+ loop {
+ match reader.next_line().await {
+ Ok(Some(line)) => warn!(context = %context, "{line}"),
+ Ok(None) => break,
+ Err(e) => {
+ warn!(context = %context, "stderr read failed: {e}");
+ break;
+ }
+ }
+ }
+ });
+}
+
fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {
if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {
if !dir.is_empty() {
@@ -556,34 +553,9 @@
let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;
let dir = dir.trim();
if dir.is_empty() {
- remote_mktemp(sess).await
+ let tmp = run_string_ok(sess, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir").await?;
+ Ok(Utf8PathBuf::from(tmp))
} else {
Ok(Utf8PathBuf::from(dir))
- }
-}
-
-async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {
- let mut cmd_chan = sess.channel_open_session().await?;
- cmd_chan
- .exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")
- .await?;
- let mut stdout = vec![];
- loop {
- let Some(msg) = cmd_chan.wait().await else {
- bail!("unexpected channel end");
- };
- match msg {
- russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),
- russh::ChannelMsg::ExitStatus { exit_status } => {
- if exit_status != 0 {
- bail!("mktemp failed");
- }
- break;
- }
- _ => {}
- }
}
- ensure!(stdout.ends_with(b"\n"));
- stdout.pop();
- Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))
}
crates/remowt-client/src/port.rsdiffbeforeafterboth--- a/crates/remowt-client/src/port.rs
+++ /dev/null
@@ -1,52 +0,0 @@
-use std::io;
-
-use bifrostlink::Port;
-use bytes::{Bytes, BytesMut};
-use russh::{Channel, ChannelStream};
-use russh::client::Msg;
-use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, ReadHalf, WriteHalf};
-use tokio::join;
-use tracing::error;
-
-async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {
- let len = srx.read_u32().await?;
- let mut buf = BytesMut::zeroed(len as usize);
- srx.read_exact(&mut buf).await?;
- Ok(buf)
-}
-async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {
- stx.write_u32(value.len().try_into().expect("can't be larger"))
- .await?;
- stx.write_all(&value).await?;
- Ok(())
-}
-
-pub fn channel_port(ch: Channel<Msg>) -> Port {
- Port::new(move |mut rx, tx| async move {
- let (mut srx, mut stx) = tokio::io::split(ch.into_stream());
- let srx_task = async move {
- loop {
- match read(&mut srx).await {
- Ok(buf) => {
- if tx.send(buf.freeze()).is_err() {
- break;
- }
- }
- Err(e) => {
- error!("channel read failed: {e}");
- break;
- }
- }
- }
- };
- let stx_task = async move {
- while let Some(value) = rx.recv().await {
- if let Err(e) = write(&mut stx, value).await {
- error!("channel write failed: {e}");
- break;
- }
- }
- };
- join!(srx_task, stx_task);
- })
-}
crates/remowt-client/src/shell.rsdiffbeforeafterboth--- a/crates/remowt-client/src/shell.rs
+++ b/crates/remowt-client/src/shell.rs
@@ -3,7 +3,6 @@
use bifrostlink::Remote;
use remowt_endpoints::pty::{PtyClient, ShellId};
use remowt_link_shared::{Address, BifConfig};
-use uuid::Uuid;
use crate::forwarded::RemowtStream;
use crate::Remowt;
@@ -38,15 +37,21 @@
}
impl Remowt {
- pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<RemowtShell> {
- let sock = self
- .runtime_dir()
- .join(format!("remowt-shell-{}.sock", Uuid::new_v4()));
-
- let forwarded = self.forward_socket(&sock).await?;
- let client: PtyClient<BifConfig> = self.endpoints();
+ pub async fn open_shell(
+ &self,
+ term: &str,
+ cols: u16,
+ rows: u16,
+ escalate: bool,
+ ) -> Result<RemowtShell> {
+ let (forwarded, path) = self.bind_runtime_unix("shell").await?;
+ let client: PtyClient<BifConfig> = if escalate {
+ self.run0_endpoints().await?
+ } else {
+ self.endpoints()
+ };
let id = client
- .open_shell(sock, term.to_owned(), cols, rows)
+ .open_shell(path, term.to_owned(), cols, rows)
.await?
.map_err(|e| anyhow!("agent failed to open shell: {e}"))?;
let stream = forwarded.accept().await?;
@@ -54,7 +59,7 @@
Ok(RemowtShell {
id,
stream,
- remote: self.rpc.remote(Address::Agent),
+ remote: self.0.rpc.remote(Address::Agent),
})
}
}
crates/remowt-client/src/ssh_exec.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/ssh_exec.rs
@@ -0,0 +1,82 @@
+use bytes::Bytes;
+use russh::client::Msg;
+use russh::{Channel, ChannelMsg};
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};
+use tokio::sync::oneshot;
+
+const BUF: usize = 64 * 1024;
+
+pub(crate) struct SshExecChild {
+ pub stdin: DuplexStream,
+ pub stdout: DuplexStream,
+ pub stderr: DuplexStream,
+ pub exit: oneshot::Receiver<Option<u32>>,
+}
+
+impl SshExecChild {
+ /// Manage channel returned by russh exec().
+ pub(crate) fn from_exec(ch: Channel<Msg>) -> Self {
+ let (stdin, mut stdin_r) = tokio::io::duplex(BUF);
+ let (mut out_w, stdout) = tokio::io::duplex(BUF);
+ let (mut err_w, stderr) = tokio::io::duplex(BUF);
+ let (exit_tx, exit) = oneshot::channel();
+
+ tokio::spawn(async move {
+ let (mut read, write) = ch.split();
+
+ let stdin_pump = tokio::spawn(async move {
+ let mut buf = vec![0u8; BUF];
+ loop {
+ match stdin_r.read(&mut buf).await {
+ Ok(0) | Err(_) => break,
+ Ok(n) => {
+ if write
+ .data_bytes(Bytes::copy_from_slice(&buf[..n]))
+ .await
+ .is_err()
+ {
+ return;
+ }
+ }
+ }
+ }
+ let _ = write.eof().await;
+ });
+
+ let mut code = None;
+ while let Some(msg) = read.wait().await {
+ match msg {
+ ChannelMsg::Data { data } => {
+ if out_w.write_all(&data).await.is_err() {
+ break;
+ }
+ }
+ ChannelMsg::ExtendedData { data, .. } => {
+ if err_w.write_all(&data).await.is_err() {
+ break;
+ }
+ }
+ ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
+ _ => {}
+ }
+ }
+
+ stdin_pump.abort();
+ let _ = out_w.shutdown().await;
+ let _ = err_w.shutdown().await;
+ let _ = exit_tx.send(code);
+ });
+
+ SshExecChild {
+ stdin,
+ stdout,
+ stderr,
+ exit,
+ }
+ }
+
+ /// Wait for the process to finish, returning its exit status.
+ pub(crate) async fn wait(self) -> Option<u32> {
+ self.exit.await.ok().flatten()
+ }
+}
crates/remowt-client/src/subprocess.rsdiffbeforeafterboth--- a/crates/remowt-client/src/subprocess.rs
+++ b/crates/remowt-client/src/subprocess.rs
@@ -1,84 +1,430 @@
-use bytes::Bytes;
-use russh::client::Msg;
-use russh::{Channel, ChannelMsg};
-use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};
-use tokio::sync::oneshot;
+use std::pin::pin;
+
+use anyhow::{anyhow, bail, Result};
+use camino::Utf8PathBuf;
+use futures::StreamExt as _;
+use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};
+use remowt_link_shared::BifConfig;
+use serde::de::DeserializeOwned;
+use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufReader};
+use tokio::select;
+use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
+use tracing::{debug, info, warn};
+
+use crate::forwarded::{RemowtListener, RemowtStream};
+use crate::Remowt;
+
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
+pub enum StdioMode {
+ #[default]
+ Null,
+ Pipe,
+ Inherit,
+}
+
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
+pub enum StderrMode {
+ #[default]
+ Null,
+ Pipe,
+ Inherit,
+ MergeWithStdout,
+}
-const BUF: usize = 64 * 1024;
+#[derive(Default)]
+pub struct SpawnOptions {
+ pub program: String,
+ pub args: Vec<String>,
+ pub env: Vec<(String, String)>,
+ pub env_clear: bool,
+ pub cwd: Option<Utf8PathBuf>,
+ pub escalated: bool,
+ pub stdin: StdioMode,
+ pub stdout: StdioMode,
+ pub stderr: StderrMode,
+}
pub struct RemowtChild {
- pub stdin: DuplexStream,
- pub stdout: DuplexStream,
- pub stderr: DuplexStream,
- pub exit: oneshot::Receiver<Option<u32>>,
+ pub stdin: Option<RemowtStream>,
+ pub stdout: Option<RemowtStream>,
+ pub stderr: Option<RemowtStream>,
+ id: ProcId,
+ client: SubprocessClient<BifConfig>,
}
impl RemowtChild {
- /// Manage channel returned by russh exec().
- pub(crate) fn from_exec(ch: Channel<Msg>) -> Self {
- let (stdin, mut stdin_r) = tokio::io::duplex(BUF);
- let (mut out_w, stdout) = tokio::io::duplex(BUF);
- let (mut err_w, stderr) = tokio::io::duplex(BUF);
- let (exit_tx, exit) = oneshot::channel();
+ pub async fn wait(self) -> Result<Option<i32>> {
+ let RemowtChild {
+ stdin,
+ stdout,
+ stderr,
+ id,
+ client,
+ } = self;
+ drop(stdin);
+ drop(stdout);
+ drop(stderr);
+ client
+ .wait(id)
+ .await?
+ .map_err(|e| anyhow!("agent wait failed: {e}"))
+ }
+
+ pub async fn kill(&self, signal: i32) -> Result<()> {
+ self.client
+ .kill(self.id, signal)
+ .await?
+ .map_err(|e| anyhow!("agent kill failed: {e}"))
+ }
+}
+
+fn needs_socket(m: StdioMode) -> bool {
+ matches!(m, StdioMode::Pipe | StdioMode::Inherit)
+}
+
+fn stderr_needs_socket(m: StderrMode) -> bool {
+ matches!(m, StderrMode::Pipe | StderrMode::Inherit)
+}
+
+impl Remowt {
+ pub async fn spawn(&self, opts: SpawnOptions) -> Result<RemowtChild> {
+ let SpawnOptions {
+ program,
+ args,
+ env,
+ env_clear,
+ cwd,
+ escalated,
+ stdin,
+ stdout,
+ stderr,
+ } = opts;
+
+ if matches!(stderr, StderrMode::MergeWithStdout) && !needs_socket(stdout) {
+ bail!("stderr=MergeWithStdout requires stdout=Pipe or Inherit");
+ }
+
+ let stdin_bound = if needs_socket(stdin) {
+ Some(self.bind_runtime_unix("proc-stdin").await?)
+ } else {
+ None
+ };
+ let stdout_bound = if needs_socket(stdout) {
+ Some(self.bind_runtime_unix("proc-stdout").await?)
+ } else {
+ None
+ };
+ let stderr_bound = if stderr_needs_socket(stderr) {
+ Some(self.bind_runtime_unix("proc-stderr").await?)
+ } else {
+ None
+ };
+
+ let stdin_spec = match &stdin_bound {
+ Some((_, p)) => StdioSpec::Socket(p.clone()),
+ None => StdioSpec::Null,
+ };
+ let stdout_spec = match &stdout_bound {
+ Some((_, p)) => StdioSpec::Socket(p.clone()),
+ None => StdioSpec::Null,
+ };
+ let stderr_spec = match (&stderr, &stderr_bound) {
+ (StderrMode::Pipe | StderrMode::Inherit, Some((_, p))) => StderrSpec::Socket(p.clone()),
+ (StderrMode::MergeWithStdout, _) => StderrSpec::MergeWithStdout,
+ _ => StderrSpec::Null,
+ };
- tokio::spawn(async move {
- let (mut read, write) = ch.split();
+ let client: SubprocessClient<BifConfig> = if escalated {
+ // Boxed to break the async-fn type cycle
+ Box::pin(self.run0_endpoints::<SubprocessClient<BifConfig>>()).await?
+ } else {
+ self.endpoints()
+ };
- // Forward our stdin to the channel, signalling EOF when it closes.
- let stdin_pump = tokio::spawn(async move {
- let mut buf = vec![0u8; BUF];
- loop {
- match stdin_r.read(&mut buf).await {
- Ok(0) | Err(_) => break,
- Ok(n) => {
- if write
- .data_bytes(Bytes::copy_from_slice(&buf[..n]))
- .await
- .is_err()
- {
- return;
- }
- }
+ let spec = SpawnSpec {
+ program: program.clone(),
+ args,
+ env,
+ env_clear,
+ cwd,
+ stdin: stdin_spec,
+ stdout: stdout_spec,
+ stderr: stderr_spec,
+ };
+ let id = client
+ .spawn(spec)
+ .await?
+ .map_err(|e| anyhow!("agent spawn failed: {e}"))?;
+
+ let (stdin_res, stdout_res, stderr_res) = tokio::join!(
+ accept(stdin_bound),
+ accept(stdout_bound),
+ accept(stderr_bound),
+ );
+
+ let stdin_stream = handle_stdin(stdin, stdin_res?, &program);
+ let stdout_stream = handle_output(stdout, stdout_res?, &program, false);
+ let stderr_stream = handle_output_err(stderr, stderr_res?, &program);
+
+ Ok(RemowtChild {
+ stdin: stdin_stream,
+ stdout: stdout_stream,
+ stderr: stderr_stream,
+ id,
+ client,
+ })
+ }
+
+ pub fn cmd(&self, program: impl AsRef<str>) -> RemowtCommand {
+ let program = program.as_ref().to_owned();
+ RemowtCommand {
+ program,
+ args: vec![],
+ env: vec![],
+ remowt: self.clone(),
+ escalated: false,
+ }
+ }
+}
+
+async fn accept(b: Option<(RemowtListener, Utf8PathBuf)>) -> Result<Option<RemowtStream>> {
+ match b {
+ Some((l, _)) => Ok(Some(l.accept().await?)),
+ None => Ok(None),
+ }
+}
+
+fn handle_stdin(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {
+ match mode {
+ StdioMode::Pipe => s,
+ StdioMode::Inherit => {
+ if let Some(s) = s {
+ let program = program.to_owned();
+ tokio::spawn(async move {
+ let mut stdin = tokio::io::stdin();
+ let mut s = s;
+ if let Err(e) = tokio::io::copy(&mut stdin, &mut s).await {
+ warn!(program, "stdin forward ended: {e}");
}
- }
- let _ = write.eof().await;
- });
+ let _ = s.shutdown().await;
+ });
+ }
+ None
+ }
+ StdioMode::Null => None,
+ }
+}
+
+fn handle_output(
+ mode: StdioMode,
+ s: Option<RemowtStream>,
+ program: &str,
+ is_stderr: bool,
+) -> Option<RemowtStream> {
+ match mode {
+ StdioMode::Pipe => s,
+ StdioMode::Inherit => {
+ if let Some(s) = s {
+ let program = program.to_owned();
+ tokio::spawn(pump_to_tracing(s, program, is_stderr));
+ }
+ None
+ }
+ StdioMode::Null => None,
+ }
+}
+
+fn handle_output_err(
+ mode: StderrMode,
+ s: Option<RemowtStream>,
+ program: &str,
+) -> Option<RemowtStream> {
+ match mode {
+ StderrMode::Pipe => s,
+ StderrMode::Inherit => {
+ if let Some(s) = s {
+ let program = program.to_owned();
+ tokio::spawn(pump_to_tracing(s, program, true));
+ }
+ None
+ }
+ StderrMode::MergeWithStdout | StderrMode::Null => None,
+ }
+}
- let mut code = None;
- while let Some(msg) = read.wait().await {
- match msg {
- ChannelMsg::Data { data } => {
- if out_w.write_all(&data).await.is_err() {
- break;
- }
- }
- ChannelMsg::ExtendedData { data, .. } => {
- if err_w.write_all(&data).await.is_err() {
- break;
- }
- }
- ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
- _ => {}
+async fn pump_to_tracing(stream: RemowtStream, program: String, is_stderr: bool) {
+ let mut reader = BufReader::new(stream).lines();
+ loop {
+ match reader.next_line().await {
+ Ok(Some(line)) => {
+ if is_stderr {
+ warn!(program, "{line}");
+ } else {
+ info!(program, "{line}");
}
}
+ Ok(None) => break,
+ Err(e) => {
+ warn!(program, "child log stream error: {e}");
+ break;
+ }
+ }
+ }
+}
+
+fn escape_bash(input: &str, out: &mut String) {
+ const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
+ if input.chars().all(|c| !TO_ESCAPE.contains(c)) {
+ out.push_str(input);
+ return;
+ }
+ out.push('\'');
+ for (i, v) in input.split('\'').enumerate() {
+ if i != 0 {
+ out.push_str("'\"'\"'");
+ }
+ out.push_str(v);
+ }
+ out.push('\'');
+}
+
+#[derive(Clone)]
+pub struct RemowtCommand {
+ program: String,
+ args: Vec<String>,
+ env: Vec<(String, String)>,
+ remowt: Remowt,
+ escalated: bool,
+}
+
+impl RemowtCommand {
+ pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {
+ self.args.push(arg.as_ref().to_owned());
+ self
+ }
+ pub fn args<V: AsRef<str>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
+ for arg in args {
+ self.args.push(arg.as_ref().to_owned());
+ }
+ self
+ }
+ pub fn eqarg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
+ self.args
+ .push(format!("{}={}", key.as_ref(), value.as_ref()));
+ self
+ }
+ pub fn comparg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
+ self.args.push(key.as_ref().to_owned());
+ self.args.push(value.as_ref().to_owned());
+ self
+ }
+ pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
+ self.env
+ .push((name.as_ref().to_owned(), value.as_ref().to_owned()));
+ self
+ }
- // The process is gone; stop waiting on stdin we'll never forward.
- stdin_pump.abort();
- let _ = out_w.shutdown().await;
- let _ = err_w.shutdown().await;
- let _ = exit_tx.send(code);
- });
+ pub fn sudo(mut self) -> Self {
+ self.escalated = true;
+ self
+ }
- RemowtChild {
- stdin,
- stdout,
- stderr,
- exit,
+ /// Only for display.
+ fn shell_line(&self) -> String {
+ let mut out = String::new();
+ if self.escalated {
+ out.push_str("run0 ");
+ }
+ if !self.env.is_empty() {
+ out.push_str("env");
+ for (k, v) in &self.env {
+ out.push(' ');
+ assert!(!k.contains('='));
+ escape_bash(k, &mut out);
+ out.push('=');
+ escape_bash(v, &mut out);
+ }
+ out.push(' ');
}
+ escape_bash(&self.program, &mut out);
+ for arg in &self.args {
+ out.push(' ');
+ escape_bash(arg, &mut out);
+ }
+ out
}
- /// Wait for the process to finish, returning its exit status.
- pub async fn wait(self) -> Option<u32> {
- self.exit.await.ok().flatten()
+ fn into_spawn_options(self) -> (Remowt, SpawnOptions, String) {
+ let line = self.shell_line();
+ let opts = SpawnOptions {
+ program: self.program,
+ args: self.args,
+ env: self.env,
+ env_clear: false,
+ cwd: None,
+ escalated: self.escalated,
+ stdin: StdioMode::Null,
+ stdout: StdioMode::Pipe,
+ stderr: StderrMode::Pipe,
+ };
+ (self.remowt, opts, line)
+ }
+
+ pub async fn run(self) -> Result<()> {
+ run_inner(self, false).await.map(|_| ())
+ }
+ pub async fn run_string(self) -> Result<String> {
+ let bytes = run_inner(self, true).await?.expect("want_stdout");
+ Ok(String::from_utf8(bytes)?)
+ }
+ pub async fn run_value<T: DeserializeOwned>(self) -> Result<T> {
+ let s = self.run_string().await?;
+ Ok(serde_json::from_str(&s)?)
+ }
+}
+
+async fn run_inner(cmd: RemowtCommand, want_stdout: bool) -> Result<Option<Vec<u8>>> {
+ let (remowt, opts, line) = cmd.into_spawn_options();
+ debug!("running command {line:?} over remowt");
+ let program = opts.program.clone();
+ let mut child = remowt.spawn(opts).await?;
+ let stderr = child.stderr.take().expect("stderr=Pipe");
+ let stdout = child.stdout.take().expect("stdout=Pipe");
+
+ let mut err = FramedRead::new(stderr, LinesCodec::new());
+ let (mut out_bytes, mut out_lines) = if want_stdout {
+ (Some(FramedRead::new(stdout, BytesCodec::new())), None)
+ } else {
+ (None, Some(FramedRead::new(stdout, LinesCodec::new())))
+ };
+
+ let mut buf = if want_stdout { Some(Vec::new()) } else { None };
+
+ let mut wait = pin!(child.wait());
+ let exit = loop {
+ select! {
+ biased;
+
+ Some(e) = err.next() => {
+ let e = e?;
+ warn!(program = %program, "{e}");
+ }
+ Some(o) = async { out_bytes.as_mut()?.next().await }, if want_stdout => {
+ buf.as_mut().expect("want_stdout").extend_from_slice(&o?);
+ }
+ Some(o) = async { out_lines.as_mut()?.next().await }, if !want_stdout => {
+ let o = o?;
+ info!(program = %program, "{o}");
+ }
+ res = &mut wait => {
+ break res?;
+ }
+ }
+ };
+
+ match exit {
+ Some(0) => Ok(buf),
+ Some(c) => bail!("command '{line}' failed with status {c}"),
+ None => Err(anyhow!("command '{line}' ended without an exit status")),
}
}
crates/remowt-endpoints/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-endpoints/Cargo.toml
+++ b/crates/remowt-endpoints/Cargo.toml
@@ -16,5 +16,5 @@
tokio = { workspace = true, features = ["net", "io-util", "rt", "process"] }
tracing.workspace = true
uuid.workspace = true
-nix = { workspace = true, features = ["process", "term"] }
+nix = { workspace = true, features = ["process", "signal", "term"] }
zbus.workspace = true
crates/remowt-endpoints/src/lib.rsdiffbeforeafterbothcrates/remowt-endpoints/src/pty.rsdiffbeforeafterboth--- a/crates/remowt-endpoints/src/pty.rs
+++ b/crates/remowt-endpoints/src/pty.rs
@@ -16,7 +16,7 @@
use tokio::io::unix::AsyncFd;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::UnixStream;
-use tracing::{info, warn};
+use tracing::{debug, info, warn};
pub type ShellId = u64;
@@ -117,7 +117,7 @@
};
let pty = AsyncPty::new(master)?;
- info!(id, shell, "shell opened");
+ debug!(id, shell, "shell opened");
let shells = self.shells.clone();
tokio::spawn(async move {
let mut pty = pty;
crates/remowt-endpoints/src/subprocess.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-endpoints/src/subprocess.rs
@@ -0,0 +1,278 @@
+use std::collections::HashMap;
+use std::io;
+use std::process::Stdio;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, Mutex};
+
+use bifrostlink::declarative::endpoints;
+use bifrostlink::Config;
+use camino::Utf8PathBuf;
+use nix::sys::signal::{self, Signal};
+use nix::unistd::Pid;
+use serde::{Deserialize, Serialize};
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
+use tokio::net::UnixStream;
+use tokio::process::{ChildStderr, ChildStdout, Command};
+use tokio::sync::{mpsc, watch};
+use tracing::{debug, warn};
+
+pub type ProcId = u64;
+
+#[derive(Serialize, Deserialize, Debug)]
+pub enum StdioSpec {
+ Null,
+ Socket(Utf8PathBuf),
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub enum StderrSpec {
+ Null,
+ Socket(Utf8PathBuf),
+ MergeWithStdout,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct SpawnSpec {
+ pub program: String,
+ pub args: Vec<String>,
+ pub env: Vec<(String, String)>,
+ pub env_clear: bool,
+ pub cwd: Option<Utf8PathBuf>,
+ pub stdin: StdioSpec,
+ pub stdout: StdioSpec,
+ pub stderr: StderrSpec,
+}
+
+#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
+pub enum Error {
+ #[error("spawn failed: {0}")]
+ Spawn(String),
+ #[error("connect to forwarded socket failed: {0}")]
+ Connect(String),
+ #[error("no process with that id")]
+ NoSuchProcess,
+ #[error("MergeWithStdout requires stdout=Socket")]
+ BadMerge,
+ #[error("invalid signal: {0}")]
+ BadSignal(i32),
+ #[error("kill failed: {0}")]
+ Kill(String),
+ #[error("io error: {0}")]
+ Io(String),
+}
+
+impl From<io::Error> for Error {
+ fn from(e: io::Error) -> Self {
+ Error::Io(e.to_string())
+ }
+}
+
+struct ChildState {
+ pid: u32,
+ exit_rx: watch::Receiver<Option<Option<i32>>>,
+}
+
+#[derive(Clone, Default)]
+pub struct Subprocess {
+ children: Arc<Mutex<HashMap<ProcId, ChildState>>>,
+ next_id: Arc<AtomicU64>,
+}
+
+impl Subprocess {
+ pub fn new() -> Self {
+ Self::default()
+ }
+}
+
+#[endpoints(ns = 10)]
+impl Subprocess {
+ #[endpoints(id = 1)]
+ async fn spawn(&self, spec: SpawnSpec) -> Result<ProcId, Error> {
+ let SpawnSpec {
+ program,
+ args,
+ env,
+ env_clear,
+ cwd,
+ stdin,
+ stdout,
+ stderr,
+ } = spec;
+
+ if matches!(stderr, StderrSpec::MergeWithStdout) && !matches!(stdout, StdioSpec::Socket(_))
+ {
+ return Err(Error::BadMerge);
+ }
+
+ let mut cmd = Command::new(&program);
+ cmd.args(&args);
+ if env_clear {
+ cmd.env_clear();
+ }
+ for (k, v) in &env {
+ cmd.env(k, v);
+ }
+ if let Some(cwd) = &cwd {
+ cmd.current_dir(cwd);
+ }
+ cmd.stdin(match &stdin {
+ StdioSpec::Socket(_) => Stdio::piped(),
+ StdioSpec::Null => Stdio::null(),
+ });
+ cmd.stdout(match &stdout {
+ StdioSpec::Socket(_) => Stdio::piped(),
+ StdioSpec::Null => Stdio::null(),
+ });
+ cmd.stderr(match &stderr {
+ StderrSpec::Socket(_) | StderrSpec::MergeWithStdout => Stdio::piped(),
+ StderrSpec::Null => Stdio::null(),
+ });
+ cmd.kill_on_drop(false);
+
+ let mut child = cmd.spawn().map_err(|e| Error::Spawn(e.to_string()))?;
+ let pid = child
+ .id()
+ .ok_or_else(|| Error::Spawn("child exited before pid available".to_owned()))?;
+
+ if let StdioSpec::Socket(path) = &stdin {
+ let sock = UnixStream::connect(path)
+ .await
+ .map_err(|e| Error::Connect(e.to_string()))?;
+ let mut stdin_w = child.stdin.take().expect("piped");
+ tokio::spawn(async move {
+ let (mut sr, _) = tokio::io::split(sock);
+ let _ = tokio::io::copy(&mut sr, &mut stdin_w).await;
+ let _ = stdin_w.shutdown().await;
+ });
+ }
+
+ let stdout_handle = child.stdout.take();
+ let stderr_handle = child.stderr.take();
+
+ match (&stdout, &stderr, stdout_handle, stderr_handle) {
+ (StdioSpec::Socket(out_path), StderrSpec::MergeWithStdout, Some(out), Some(err)) => {
+ let sock = UnixStream::connect(out_path)
+ .await
+ .map_err(|e| Error::Connect(e.to_string()))?;
+ tokio::spawn(merge_to_sock(out, err, sock));
+ }
+ (StdioSpec::Socket(out_path), _, Some(out), err_opt) => {
+ let sock = UnixStream::connect(out_path)
+ .await
+ .map_err(|e| Error::Connect(e.to_string()))?;
+ tokio::spawn(pump_to_sock(out, sock));
+ if let (StderrSpec::Socket(err_path), Some(err)) = (&stderr, err_opt) {
+ let err_sock = UnixStream::connect(err_path)
+ .await
+ .map_err(|e| Error::Connect(e.to_string()))?;
+ tokio::spawn(pump_to_sock(err, err_sock));
+ }
+ }
+ (StdioSpec::Null, StderrSpec::Socket(err_path), _, Some(err)) => {
+ let sock = UnixStream::connect(err_path)
+ .await
+ .map_err(|e| Error::Connect(e.to_string()))?;
+ tokio::spawn(pump_to_sock(err, sock));
+ }
+ _ => {}
+ }
+
+ let (exit_tx, exit_rx) = watch::channel(None);
+ let id = self.next_id.fetch_add(1, Ordering::Relaxed);
+ self.children
+ .lock()
+ .expect("not poisoned")
+ .insert(id, ChildState { pid, exit_rx });
+
+ debug!(id, pid, program, "subprocess spawned");
+ tokio::spawn(async move {
+ let result = child.wait().await;
+ let code = match result {
+ Ok(status) => status.code(),
+ Err(e) => {
+ warn!(id, "child.wait failed: {e}");
+ None
+ }
+ };
+ let _ = exit_tx.send(Some(code));
+ });
+
+ Ok(id)
+ }
+
+ #[endpoints(id = 2)]
+ async fn wait(&self, id: ProcId) -> Result<Option<i32>, Error> {
+ let mut rx = {
+ let map = self.children.lock().expect("not poisoned");
+ let entry = map.get(&id).ok_or(Error::NoSuchProcess)?;
+ entry.exit_rx.clone()
+ };
+ rx.wait_for(|v| v.is_some())
+ .await
+ .map_err(|_| Error::Io("exit channel closed".to_owned()))?;
+ let code = rx.borrow().flatten();
+ self.children.lock().expect("not poisoned").remove(&id);
+ Ok(code)
+ }
+
+ #[endpoints(id = 3)]
+ async fn kill(&self, id: ProcId, signal: i32) -> Result<(), Error> {
+ let pid = {
+ let map = self.children.lock().expect("not poisoned");
+ let entry = map.get(&id).ok_or(Error::NoSuchProcess)?;
+ entry.pid
+ };
+ let sig = Signal::try_from(signal).map_err(|_| Error::BadSignal(signal))?;
+ signal::kill(Pid::from_raw(pid as i32), sig).map_err(|e| Error::Kill(e.to_string()))?;
+ Ok(())
+ }
+}
+
+async fn pump_to_sock<R>(mut from: R, sock: UnixStream)
+where
+ R: tokio::io::AsyncRead + Unpin,
+{
+ let (_, mut sw) = tokio::io::split(sock);
+ let _ = tokio::io::copy(&mut from, &mut sw).await;
+ let _ = sw.shutdown().await;
+}
+
+async fn merge_to_sock(mut stdout: ChildStdout, mut stderr: ChildStderr, sock: UnixStream) {
+ let (_, mut sw) = tokio::io::split(sock);
+ let (tx, mut rx) = mpsc::channel::<Vec<u8>>(64);
+ let tx_out = tx.clone();
+ let out_pump = tokio::spawn(async move {
+ let mut buf = vec![0u8; 4096];
+ loop {
+ match stdout.read(&mut buf).await {
+ Ok(0) | Err(_) => break,
+ Ok(n) => {
+ if tx_out.send(buf[..n].to_vec()).await.is_err() {
+ break;
+ }
+ }
+ }
+ }
+ });
+ let err_pump = tokio::spawn(async move {
+ let mut buf = vec![0u8; 4096];
+ loop {
+ match stderr.read(&mut buf).await {
+ Ok(0) | Err(_) => break,
+ Ok(n) => {
+ if tx.send(buf[..n].to_vec()).await.is_err() {
+ break;
+ }
+ }
+ }
+ }
+ });
+ while let Some(chunk) = rx.recv().await {
+ if sw.write_all(&chunk).await.is_err() {
+ break;
+ }
+ }
+ let _ = out_pump.await;
+ let _ = err_pump.await;
+ let _ = sw.shutdown().await;
+}