difftreelog
refactor merge well-known endpoints into a single crate
in: trunk
36 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2065,10 +2065,10 @@
"futures-util",
"nix",
"rand 0.10.1",
+ "remowt-endpoints",
"remowt-link-shared",
"remowt-plugin",
"remowt-polkit-shared",
- "remowt-pty",
"remowt-ui-prompt",
"serde",
"tempfile",
@@ -2090,6 +2090,7 @@
"bifrostlink-ports",
"bytes",
"camino",
+ "remowt-endpoints",
"remowt-link-shared",
"russh",
"russh-config",
@@ -2101,16 +2102,21 @@
]
[[package]]
-name = "remowt-fs"
+name = "remowt-endpoints"
version = "0.1.1"
dependencies = [
+ "anyhow",
"bifrostlink",
"bifrostlink-macros",
"camino",
+ "nix",
"serde",
"tempfile",
"thiserror",
"tokio",
+ "tracing",
+ "uuid",
+ "zbus",
]
[[package]]
@@ -2120,30 +2126,11 @@
"bifrostlink",
"bytes",
"camino",
- "remowt-fs",
- "remowt-pty",
- "remowt-systemd",
"remowt-ui-prompt",
"serde",
"serde_json",
- "thiserror",
- "tokio",
-]
-
-[[package]]
-name = "remowt-nix-daemon"
-version = "0.1.1"
-dependencies = [
- "anyhow",
- "bifrostlink",
- "bifrostlink-macros",
- "camino",
- "remowt-client",
- "serde",
"thiserror",
"tokio",
- "tracing",
- "uuid",
]
[[package]]
@@ -2171,20 +2158,6 @@
]
[[package]]
-name = "remowt-pty"
-version = "0.1.1"
-dependencies = [
- "bifrostlink",
- "bifrostlink-macros",
- "camino",
- "nix",
- "serde",
- "thiserror",
- "tokio",
- "tracing",
-]
-
-[[package]]
name = "remowt-ssh"
version = "0.1.1"
dependencies = [
@@ -2210,17 +2183,6 @@
"tracing",
"tracing-subscriber",
"uuid",
-]
-
-[[package]]
-name = "remowt-systemd"
-version = "0.1.1"
-dependencies = [
- "bifrostlink",
- "bifrostlink-macros",
- "serde",
- "thiserror",
- "zbus",
]
[[package]]
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,14 +9,12 @@
repository = "https://gitlab.delta.directory/iam/remowt"
[workspace.dependencies]
-remowt-fs = { version = "0.1.1", path = "crates/remowt-fs" }
-remowt-pty = { version = "0.1.1", path = "crates/remowt-pty" }
-remowt-systemd = { version = "0.1.1", path = "crates/remowt-systemd" }
remowt-client = { version = "0.1.1", path = "crates/remowt-client" }
remowt-polkit-shared = { version = "0.1.1", path = "crates/polkit-shared" }
remowt-link-shared = { version = "0.1.1", path = "crates/remowt-link-shared" }
remowt-plugin = { version = "0.1.1", path = "crates/remowt-plugin" }
-remowt-ui-prompt = { version = "0.1.1", path = "crates/ui-prompt" }
+remowt-ui-prompt = { version = "0.1.1", path = "crates/remowt-ui-prompt" }
+remowt-endpoints = { version = "0.1.1", path = "crates/remowt-endpoints" }
bifrostlink = "0.2.0"
bifrostlink-macros = "0.2.0"
cmds/polkit-dbus-helper/src/main.rsdiffbeforeafterboth--- a/cmds/polkit-dbus-helper/src/main.rs
+++ b/cmds/polkit-dbus-helper/src/main.rs
@@ -8,10 +8,10 @@
use nix::unistd::{setuid, Uid, User};
use pam_client::{Context, ConversationHandler, ErrorCode, Flag};
use remowt_polkit_shared::BackendRequest;
+use remowt_ui_prompt::dbus::DbusPrompterProxyBlocking;
+use remowt_ui_prompt::BlockingPrompter;
use tokio::task::{block_in_place, spawn_blocking};
use tracing::trace;
-use remowt_ui_prompt::dbus::DbusPrompterProxyBlocking;
-use remowt_ui_prompt::BlockingPrompter;
use zbus::fdo;
use zbus::message::Header;
use zbus::zvariant::OwnedValue;
cmds/remowt-agent/Cargo.tomldiffbeforeafterboth--- a/cmds/remowt-agent/Cargo.toml
+++ b/cmds/remowt-agent/Cargo.toml
@@ -17,7 +17,6 @@
rand.workspace = true
remowt-link-shared.workspace = true
remowt-plugin.workspace = true
-remowt-pty.workspace = true
serde = { workspace = true, features = ["derive"] }
tempfile.workspace = true
tokio = { workspace = true, features = [
@@ -36,3 +35,4 @@
uuid = { workspace = true, features = ["v4"] }
zbus = { workspace = true, features = ["tokio"] }
zbus_polkit = { workspace = true, features = ["tokio"] }
+remowt-endpoints.workspace = true
cmds/remowt-agent/src/helper/protocol.rsdiffbeforeafterboth--- a/cmds/remowt-agent/src/helper/protocol.rs
+++ b/cmds/remowt-agent/src/helper/protocol.rs
@@ -3,10 +3,10 @@
use anyhow::bail;
use futures::stream::Peekable;
use futures::StreamExt as _;
+use remowt_ui_prompt::Prompter;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt as _};
use tokio::select;
use tokio_util::codec::{FramedRead, LinesCodec};
-use remowt_ui_prompt::Prompter;
pub async fn run_conversation<R, W, P>(reader: R, mut writer: W, prompt: P) -> anyhow::Result<()>
where
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -11,8 +11,8 @@
use bifrostlink_ports::stdio::from_stdio;
use bifrostlink_ports::unix_socket::from_socket;
use clap::Parser;
-use remowt_link_shared::editor::EditorEndpointsClient;
-use remowt_link_shared::{Address, BifConfig, Fs, Pty, Systemd};
+use remowt_endpoints::{fs::Fs, pty::Pty, systemd::Systemd};
+use remowt_link_shared::{editor::EditorEndpointsClient, Address, BifConfig};
use remowt_polkit_shared::{emphasize, BackendRequest, Identity, PidDisplay};
use remowt_ui_prompt::bifrost::PromptEndpointsClient;
use remowt_ui_prompt::rofi::RofiPrompter;
cmds/remowt-ssh/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-ssh/src/main.rs
+++ b/cmds/remowt-ssh/src/main.rs
@@ -13,13 +13,13 @@
use remowt_client::editor::SshEditor;
use remowt_client::{AgentBundle, Remowt};
use remowt_link_shared::editor::serve_editor;
+use remowt_ui_prompt::bifrost::serve_prompts;
+use remowt_ui_prompt::rofi::RofiPrompter;
+use remowt_ui_prompt::{PrependSourcePrompter, Source};
use tokio::io::unix::AsyncFd;
use tokio::io::{AsyncRead, ReadBuf};
use tokio::signal::unix::{signal, SignalKind};
use tracing::info;
-use remowt_ui_prompt::bifrost::serve_prompts;
-use remowt_ui_prompt::rofi::RofiPrompter;
-use remowt_ui_prompt::{PrependSourcePrompter, Source};
#[derive(Parser)]
struct Opts {
crates/remowt-client/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-client/Cargo.toml
+++ b/crates/remowt-client/Cargo.toml
@@ -19,3 +19,4 @@
tokio = { workspace = true, features = ["net", "io-util", "rt", "sync", "macros", "process"] }
tracing.workspace = true
uuid = { workspace = true, features = ["v4"] }
+remowt-endpoints.workspace = true
crates/remowt-client/src/lib.rsdiffbeforeafterboth1use std::collections::HashMap;2use std::io;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc};9use bifrostlink_ports::unix_socket::from_socket;10use bytes::{Bytes, BytesMut};11use camino::{Utf8Path, Utf8PathBuf};12use remowt_link_shared::plugin::PluginEndpointsClient;13use remowt_link_shared::{14 Address, BifConfig, ElevateEndpoints, ElevateError, Elevator, Fs, Pty, PtyClient, ShellId,15 Systemd,16};17use russh::client::{connect, Config, Handle, Handler, Msg, Session};18use russh::keys::agent::client::AgentClient;19use russh::keys::agent::AgentIdentity;20use russh::keys::check_known_hosts;21use russh::keys::ssh_key::PublicKey;22use russh::{Channel, ChannelMsg, ChannelStream};23use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf};24use tokio::join;25use tokio::net::UnixListener;26use tokio::sync::mpsc;27use tokio::sync::oneshot::{self, channel};28use tracing::error;29use uuid::Uuid;3031pub mod editor;3233type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;3435async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {36 let len = srx.read_u32().await?;37 let mut buf = BytesMut::zeroed(len as usize);38 srx.read_exact(&mut buf).await?;39 Ok(buf)40}41async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {42 stx.write_u32(value.len().try_into().expect("can't be larger"))43 .await?;44 stx.write_all(&value).await?;45 Ok(())46}4748fn sh_quote(s: impl AsRef<str>) -> String {49 format!("'{}'", s.as_ref().replace('\'', "'\\''"))50}5152const ESCALATORS: [(&str, &[&str]); 3] = [53 ("run0", &["--background=", "--pipe"]),54 ("sudo", &[]),55 ("doas", &[]),56];5758pub struct AgentBundle {59 dir: PathBuf,60 hashes: HashMap<String, String>,61}6263impl AgentBundle {64 pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {65 let dir = dir.into();66 let hashes_path = dir.join("hashes");67 let raw = std::fs::read_to_string(&hashes_path)68 .with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;69 let mut hashes = HashMap::new();70 for line in raw.lines() {71 let line = line.trim();72 if line.is_empty() {73 continue;74 }75 let (arch, hash) = line76 .split_once(char::is_whitespace)77 .ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;78 hashes.insert(arch.to_owned(), hash.trim().to_owned());79 }80 ensure!(81 !hashes.is_empty(),82 "agent bundle {} has no hashes",83 dir.display()84 );85 Ok(Self { dir, hashes })86 }8788 fn binary(&self, arch: &str) -> PathBuf {89 self.dir.join(format!("remowt-agent-{arch}"))90 }91}9293async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {94 let mut ch = sess.channel_open_session().await?;95 ch.exec(true, cmd).await?;96 let mut out = Vec::new();97 let mut code = None;98 while let Some(msg) = ch.wait().await {99 match msg {100 ChannelMsg::Data { data } => out.extend(data.as_ref()),101 ChannelMsg::ExtendedData { data, .. } => {102 error!(103 "remote stderr: {}",104 String::from_utf8_lossy(data.as_ref()).trim()105 );106 }107 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),108 _ => {}109 }110 }111 Ok((code, out))112}113114async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {115 let (code, mut out) = run(sess, cmd).await?;116 ensure!(117 code == Some(0),118 "remote command failed (exit {code:?}): {cmd}"119 );120 ensure!(out.ends_with(b"\n"));121 out.pop();122 String::from_utf8(out).context("expected utf8 output for command")123}124125async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {126 let arch = run_string_ok(sess, "uname -m").await?;127 let hash = bundle128 .hashes129 .get(&arch)130 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;131132 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")133 .await?134 .trim()135 .to_owned();136 let dir = if cache.is_empty() {137 let home = run_string_ok(sess, "echo \"$HOME\"").await?;138 ensure!(139 !home.is_empty(),140 "remote $HOME and $XDG_CACHE_HOME both empty"141 );142 Utf8PathBuf::from(home).join("cache/remowt")143 } else {144 Utf8PathBuf::from(cache).join("remowt")145 };146 let path = dir.join(hash);147148 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;149 if present != Some(0) {150 let bin = bundle.binary(&arch);151 let bytes = std::fs::read(&bin)152 .with_context(|| format!("reading agent binary {}", bin.display()))?;153 upload_agent(sess, &dir, &path, bytes).await?;154 }155 Ok(path)156}157158async fn upload_agent(159 sess: &Handle<SshHandler>,160 dir: &Utf8Path,161 path: &Utf8Path,162 bytes: Vec<u8>,163) -> Result<()> {164 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;165166 let tmp = path.join(format!("tmp.{}", Uuid::new_v4()));167 let ch = sess.channel_open_session().await?;168 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;169 ch.data_bytes(bytes).await?;170 ch.eof().await?;171 let mut ch = ch;172 let mut code = None;173 while let Some(msg) = ch.wait().await {174 match msg {175 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),176 ChannelMsg::ExtendedData { data, .. } => {177 error!(178 "agent upload: {}",179 String::from_utf8_lossy(data.as_ref()).trim()180 );181 }182 _ => {}183 }184 }185 ensure!(code == Some(0), "agent upload failed (exit {code:?})");186187 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;188 run_string_ok(189 sess,190 &format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),191 )192 .await?;193 Ok(())194}195196async fn detect_escalation(197 sess: &Handle<SshHandler>,198) -> Result<(&'static str, &'static [&'static str])> {199 for (tool, flags) in ESCALATORS {200 // `tool` is a fixed identifier (no metacharacters), safe to interpolate.201 let (code, _) = run(sess, &format!("command -v {tool}")).await?;202 if code == Some(0) {203 return Ok((tool, flags));204 }205 }206 bail!("no escalation tool (run0/sudo/doas) found on remote")207}208209fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {210 let mut parts = vec![tool.to_owned()];211 parts.extend(flags.iter().map(|f| f.to_string()));212 parts.push(sh_quote(agent_path));213 parts.push("real-agent".to_owned());214 parts.push("--privileged".to_owned());215 if let Some(p) = path {216 parts.push("--path".to_owned());217 parts.push(sh_quote(p));218 }219 parts.join(" ")220}221222fn find_in_path(name: &str) -> Option<std::path::PathBuf> {223 let path = std::env::var_os("PATH")?;224 std::env::split_paths(&path)225 .map(|dir| dir.join(name))226 .find(|p| p.is_file())227}228229fn port_from_channel(ch: Channel<Msg>) -> Port {230 Port::new(move |mut rx, tx| async move {231 let (mut srx, mut stx) = tokio::io::split(ch.into_stream());232 let srx_task = async move {233 loop {234 match read(&mut srx).await {235 Ok(buf) => {236 if tx.send(buf.freeze()).is_err() {237 break;238 }239 }240 Err(e) => {241 error!("channel read failed: {e}");242 break;243 }244 }245 }246 };247 let stx_task = async move {248 while let Some(value) = rx.recv().await {249 if let Err(e) = write(&mut stx, value).await {250 error!("channel write failed: {e}");251 break;252 }253 }254 };255 join!(srx_task, stx_task);256 })257}258259pub struct SshHandler {260 host: String,261 port: u16,262 subs: Subs,263}264impl Handler for SshHandler {265 type Error = russh::Error;266 async fn check_server_key(267 &mut self,268 server_public_key: &PublicKey,269 ) -> Result<bool, Self::Error> {270 Ok(check_known_hosts(&self.host, self.port, server_public_key)?)271 }272 async fn server_channel_open_forwarded_streamlocal(273 &mut self,274 channel: Channel<Msg>,275 socket_path: &str,276 _session: &mut Session,277 ) -> Result<(), Self::Error> {278 let Some(ch) = self279 .subs280 .lock()281 .expect("lock")282 .remove(&Utf8PathBuf::from(socket_path))283 else {284 return Err(russh::Error::WrongChannel);285 };286 let _ = ch.send(channel);287 Ok(())288 }289}290291struct SshElevator {292 sess: Arc<Handle<SshHandler>>,293 rpc: WeakRpc<BifConfig>,294 agent_path: Utf8PathBuf,295}296impl Elevator for SshElevator {297 async fn elevate(&self) -> Result<(), ElevateError> {298 let fail = |e: String| ElevateError::Failed(e);299 let (tool, flags) = detect_escalation(&self.sess)300 .await301 .map_err(|e| fail(e.to_string()))?;302 let ch = self303 .sess304 .channel_open_session()305 .await306 .map_err(|e| fail(e.to_string()))?;307 ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None))308 .await309 .map_err(|e| fail(e.to_string()))?;310 let rpc = self311 .rpc312 .clone()313 .upgrade()314 .ok_or_else(|| fail("rpc is gone".to_owned()))?;315 rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0));316 Ok(())317 }318}319320pub struct RemoteChild {321 pub stdout: DuplexStream,322 pub stderr: DuplexStream,323 pub exit: oneshot::Receiver<Option<u32>>,324}325326enum Transport {327 Ssh {328 sess: Arc<Handle<SshHandler>>,329 subs: Subs,330 remote_dir: Utf8PathBuf,331 agent_path: Utf8PathBuf,332 },333 Local {334 #[allow(dead_code)]335 agent: Rpc<BifConfig>,336 agent_path: String,337 },338}339340pub struct Remowt {341 transport: Transport,342 rpc: Rpc<BifConfig>,343 elevated: tokio::sync::OnceCell<()>,344 children: Mutex<Vec<tokio::process::Child>>,345}346347pub type RemowtRemote = Remote<BifConfig>;348349fn loopback() -> (Port, Port) {350 let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::<Bytes>();351 let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::<Bytes>();352 let user = Port::new(move |mut rx, tx| async move {353 loop {354 tokio::select! {355 msg = rx.recv() => match msg {356 Some(msg) => if a2b_tx.send(msg).is_err() { break },357 None => break,358 },359 msg = b2a_rx.recv() => match msg {360 Some(msg) => if tx.send(msg).is_err() { break },361 None => break,362 },363 }364 }365 });366 let agent = Port::new(move |mut rx, tx| async move {367 loop {368 tokio::select! {369 msg = rx.recv() => match msg {370 Some(msg) => if b2a_tx.send(msg).is_err() { break },371 None => break,372 },373 msg = a2b_rx.recv() => match msg {374 Some(msg) => if tx.send(msg).is_err() { break },375 None => break,376 },377 }378 }379 });380 (user, agent)381}382383impl Remowt {384 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {385 let conf = russh_config::parse_home(host)?;386 let port = conf.host_config.port.unwrap_or(22);387 let hostname = conf388 .host_config389 .hostname390 .clone()391 .unwrap_or_else(|| conf.host_name.clone());392 let user = conf393 .user394 .clone()395 .unwrap_or_else(|| std::env::var("USER").unwrap_or_else(|_| "root".to_owned()));396397 let subs: Subs = Arc::new(Mutex::new(HashMap::new()));398 let mut sess = connect(399 Arc::new(Config::default()),400 (hostname.clone(), port),401 SshHandler {402 host: hostname,403 port,404 subs: subs.clone(),405 },406 )407 .await?;408409 let mut agent = AgentClient::connect_env().await?;410 let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();411 let mut authenticated = false;412 for ident in agent.request_identities().await? {413 let AgentIdentity::PublicKey { key, .. } = ident else {414 continue;415 };416 if sess417 .authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)418 .await?419 .success()420 {421 authenticated = true;422 break;423 }424 }425 ensure!(authenticated, "ssh authentication failed");426427 // All remaining session ops take `&self`; share the handle.428 let sess = Arc::new(sess);429430 let agent_path = deploy_agent(&sess, bundle).await?;431432 let remote_dir = remote_mktemp(&sess).await?;433 let primary = remote_dir.join("primary.sock");434435 let (onetx, onerx) = channel();436 subs.lock().expect("lock").insert(primary.clone(), onetx);437 sess.streamlocal_forward(primary.clone()).await?;438439 let rpc = Rpc::<BifConfig>::new(Address::User);440441 // TODO: ensure no injection is possible in the socket path.442 let cmd_chan = sess.channel_open_session().await?;443 cmd_chan444 .exec(445 true,446 format!(447 "{} real-agent --path={}",448 sh_quote(&agent_path),449 sh_quote(&primary)450 ),451 )452 .await?;453454 let port = port_from_channel(455 onerx456 .await457 .map_err(|_| anyhow!("agent never opened its channel"))?,458 );459 rpc.add_direct(Address::Agent, port, Rtt(0));460461 Ok(Self {462 transport: Transport::Ssh {463 sess,464 subs,465 remote_dir,466 agent_path,467 },468 rpc,469 elevated: tokio::sync::OnceCell::new(),470 children: Mutex::new(Vec::new()),471 })472 }473474 pub async fn connect_local(agent_path: &str) -> Result<Self> {475 let (port_user, port_agent) = loopback();476 let rpc = Rpc::<BifConfig>::new(Address::User);477 let mut agent = Rpc::<BifConfig>::new(Address::Agent);478479 // Register handlers before wiring up the link (see the agent binary).480 Fs::new().register_endpoints(&mut agent);481 Systemd.register_endpoints(&mut agent);482 Pty::new().register_endpoints(&mut agent);483484 agent.add_direct(Address::User, port_agent, Rtt(0));485 rpc.add_direct(Address::Agent, port_user, Rtt(0));486487 Ok(Self {488 transport: Transport::Local {489 agent,490 agent_path: agent_path.to_owned(),491 },492 rpc,493 elevated: tokio::sync::OnceCell::new(),494 children: Mutex::new(Vec::new()),495 })496 }497498 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {499 match &self.transport {500 Transport::Ssh { sess, .. } => Some(sess.clone()),501 Transport::Local { .. } => None,502 }503 }504505 pub fn rpc(&self) -> Rpc<BifConfig> {506 self.rpc.clone()507 }508509 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {510 R::wrap(self.rpc.remote(Address::Agent))511 }512513 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {514 let client: PluginEndpointsClient<BifConfig> = self.endpoints();515 client516 .load_plugin(id, name.to_owned())517 .await?518 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))519 }520 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {521 self.ensure_elevated().await?;522 let client: PluginEndpointsClient<BifConfig> =523 PluginEndpointsClient::wrap(self.rpc.remote(Address::AgentPrivileged));524 client525 .load_plugin_path(id, path.to_owned())526 .await?527 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))528 }529 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {530 R::wrap(self.rpc.remote(Address::Plugin(id)))531 }532 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {533 self.ensure_elevated().await?;534 Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))535 }536537 async fn ensure_elevated(&self) -> Result<()> {538 self.elevated539 .get_or_try_init(|| async {540 let port = match &self.transport {541 Transport::Ssh {542 sess, agent_path, ..543 } => {544 let (tool, flags) = detect_escalation(sess).await?;545 let ch = sess.channel_open_session().await?;546 ch.exec(true, privileged_cmd(tool, flags, agent_path, None))547 .await?;548 port_from_channel(ch)549 }550 Transport::Local { agent_path, .. } => {551 let sock = std::env::temp_dir()552 .join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4()));553 let _ = std::fs::remove_file(&sock);554 let listener = UnixListener::bind(&sock)?;555 let (tool, flags) = ESCALATORS556 .iter()557 .find(|(t, _)| find_in_path(t).is_some())558 .ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?;559 let child = tokio::process::Command::new(tool)560 .args(*flags)561 .arg(agent_path)562 .arg("real-agent")563 .arg("--privileged")564 .arg("--path")565 .arg(sock.to_str().expect("temp path is utf-8"))566 .kill_on_drop(true)567 .spawn()?;568 self.children.lock().expect("lock").push(child);569 let (stream, _) = listener.accept().await?;570 let _ = std::fs::remove_file(&sock);571 from_socket(stream)572 }573 };574 self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));575 anyhow::Ok(())576 })577 .await?;578 Ok(())579 }580581 pub async fn exec(&self, command: String) -> Result<RemoteChild> {582 let Some(sess) = self.ssh() else {583 bail!("exec should not be called on local")584 };585 let ch = sess.channel_open_session().await?;586 ch.exec(true, command).await?;587588 let (mut out_w, stdout) = tokio::io::duplex(64 * 1024);589 let (mut err_w, stderr) = tokio::io::duplex(64 * 1024);590 let (exit_tx, exit) = oneshot::channel();591592 tokio::spawn(async move {593 let mut ch = ch;594 let mut code = None;595 while let Some(msg) = ch.wait().await {596 match msg {597 ChannelMsg::Data { data } => {598 if out_w.write_all(&data).await.is_err() {599 break;600 }601 }602 ChannelMsg::ExtendedData { data, .. } => {603 if err_w.write_all(&data).await.is_err() {604 break;605 }606 }607 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),608 _ => {}609 }610 }611 let _ = out_w.shutdown().await;612 let _ = err_w.shutdown().await;613 let _ = exit_tx.send(code);614 });615616 Ok(RemoteChild {617 stdout,618 stderr,619 exit,620 })621 }622623 pub fn serve_elevate(&self) -> Result<()> {624 let Transport::Ssh {625 sess, agent_path, ..626 } = &self.transport627 else {628 bail!("elevate should not be called on local")629 };630 let mut rpc = self.rpc.clone();631 ElevateEndpoints(SshElevator {632 sess: sess.clone(),633 rpc: self.rpc.clone().downgrade(),634 agent_path: agent_path.to_owned(),635 })636 .register_endpoints(&mut rpc);637 Ok(())638 }639640 pub fn remote_dir(&self) -> Option<&Utf8Path> {641 match &self.transport {642 Transport::Ssh { remote_dir, .. } => Some(remote_dir),643 Transport::Local { .. } => None,644 }645 }646647 pub async fn forward_socket(648 &self,649 remote_path: &Utf8Path,650 ) -> Result<oneshot::Receiver<Channel<Msg>>> {651 let Transport::Ssh { sess, subs, .. } = &self.transport else {652 bail!("forward_socket should not be called on local")653 };654 let (tx, rx) = oneshot::channel();655 subs.lock()656 .expect("lock")657 .insert(remote_path.to_owned(), tx);658 sess.streamlocal_forward(remote_path.to_owned()).await?;659 Ok(rx)660 }661662 pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<Shell> {663 let Transport::Ssh { remote_dir, .. } = &self.transport else {664 bail!("open_shell should not be called on local")665 };666 let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4()));667668 let rx = self.forward_socket(&sock).await?;669 let client: PtyClient<BifConfig> = self.endpoints();670 let id = client671 .open_shell(sock, term.to_owned(), cols, rows)672 .await?673 .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;674 let ch = rx675 .await676 .map_err(|_| anyhow!("agent never connected the shell socket"))?;677678 Ok(Shell {679 id,680 stream: ch.into_stream(),681 remote: self.rpc.remote(Address::Agent),682 })683 }684}685686pub struct Shell {687 pub id: ShellId,688 pub stream: ChannelStream<Msg>,689 remote: Remote<BifConfig>,690}691692impl Shell {693 pub fn resizer(&self) -> ShellResizer {694 ShellResizer {695 remote: self.remote.clone(),696 id: self.id,697 }698 }699}700701#[derive(Clone)]702pub struct ShellResizer {703 remote: Remote<BifConfig>,704 id: ShellId,705}706707impl ShellResizer {708 pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {709 PtyClient::wrap(self.remote.clone())710 .resize(self.id, cols, rows)711 .await?712 .map_err(|e| anyhow!("failed to resize remote shell: {e}"))713 }714}715716async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {717 let mut cmd_chan = sess.channel_open_session().await?;718 cmd_chan719 .exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")720 .await?;721 let mut stdout = vec![];722 loop {723 let Some(msg) = cmd_chan.wait().await else {724 bail!("unexpected channel end");725 };726 match msg {727 russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),728 russh::ChannelMsg::ExitStatus { exit_status } => {729 if exit_status != 0 {730 bail!("mktemp failed");731 }732 break;733 }734 _ => {}735 }736 }737 ensure!(stdout.ends_with(b"\n"));738 stdout.pop();739 Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))740}1use std::collections::HashMap;2use std::path::PathBuf;3use std::sync::{Arc, Mutex};4use std::{env, io};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc};9use bifrostlink_ports::unix_socket::from_socket;10use bytes::{Bytes, BytesMut};11use camino::{Utf8Path, Utf8PathBuf};12use remowt_endpoints::{13 fs::Fs,14 pty::{Pty, PtyClient, ShellId},15 systemd::Systemd,16};17use remowt_link_shared::plugin::PluginEndpointsClient;18use remowt_link_shared::{Address, BifConfig, ElevateEndpoints, ElevateError, Elevator};19use russh::client::{connect, Config, Handle, Handler, Msg, Session};20use russh::keys::agent::client::AgentClient;21use russh::keys::agent::AgentIdentity;22use russh::keys::check_known_hosts;23use russh::keys::ssh_key::PublicKey;24use russh::{Channel, ChannelMsg, ChannelStream};25use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf};26use tokio::join;27use tokio::net::UnixListener;28use tokio::sync::mpsc;29use tokio::sync::oneshot::{self, channel};30use tracing::error;31use uuid::Uuid;3233pub mod editor;3435type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;3637async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {38 let len = srx.read_u32().await?;39 let mut buf = BytesMut::zeroed(len as usize);40 srx.read_exact(&mut buf).await?;41 Ok(buf)42}43async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {44 stx.write_u32(value.len().try_into().expect("can't be larger"))45 .await?;46 stx.write_all(&value).await?;47 Ok(())48}4950fn sh_quote(s: impl AsRef<str>) -> String {51 format!("'{}'", s.as_ref().replace('\'', "'\\''"))52}5354const ESCALATORS: [(&str, &[&str]); 3] = [55 ("run0", &["--background=", "--pipe"]),56 ("sudo", &[]),57 ("doas", &[]),58];5960pub struct AgentBundle {61 dir: PathBuf,62 hashes: HashMap<String, String>,63}6465impl AgentBundle {66 pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {67 let dir = dir.into();68 let hashes_path = dir.join("hashes");69 let raw = std::fs::read_to_string(&hashes_path)70 .with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;71 let mut hashes = HashMap::new();72 for line in raw.lines() {73 let line = line.trim();74 if line.is_empty() {75 continue;76 }77 let (arch, hash) = line78 .split_once(char::is_whitespace)79 .ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;80 hashes.insert(arch.to_owned(), hash.trim().to_owned());81 }82 ensure!(83 !hashes.is_empty(),84 "agent bundle {} has no hashes",85 dir.display()86 );87 Ok(Self { dir, hashes })88 }8990 fn binary(&self, arch: &str) -> PathBuf {91 self.dir.join(format!("remowt-agent-{arch}"))92 }93}9495async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {96 let mut ch = sess.channel_open_session().await?;97 ch.exec(true, cmd).await?;98 let mut out = Vec::new();99 let mut code = None;100 while let Some(msg) = ch.wait().await {101 match msg {102 ChannelMsg::Data { data } => out.extend(data.as_ref()),103 ChannelMsg::ExtendedData { data, .. } => {104 error!(105 "remote stderr: {}",106 String::from_utf8_lossy(data.as_ref()).trim()107 );108 }109 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),110 _ => {}111 }112 }113 Ok((code, out))114}115116async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {117 let (code, mut out) = run(sess, cmd).await?;118 ensure!(119 code == Some(0),120 "remote command failed (exit {code:?}): {cmd}"121 );122 ensure!(out.ends_with(b"\n"));123 out.pop();124 String::from_utf8(out).context("expected utf8 output for command")125}126127async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {128 let arch = run_string_ok(sess, "uname -m").await?;129 let hash = bundle130 .hashes131 .get(&arch)132 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;133134 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")135 .await?136 .trim()137 .to_owned();138 let dir = if cache.is_empty() {139 let home = run_string_ok(sess, "echo \"$HOME\"").await?;140 ensure!(141 !home.is_empty(),142 "remote $HOME and $XDG_CACHE_HOME both empty"143 );144 Utf8PathBuf::from(home).join("cache/remowt")145 } else {146 Utf8PathBuf::from(cache).join("remowt")147 };148 let path = dir.join(hash);149150 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;151 if present != Some(0) {152 let bin = bundle.binary(&arch);153 let bytes = std::fs::read(&bin)154 .with_context(|| format!("reading agent binary {}", bin.display()))?;155 upload_agent(sess, &dir, &path, bytes).await?;156 }157 Ok(path)158}159160async fn upload_agent(161 sess: &Handle<SshHandler>,162 dir: &Utf8Path,163 path: &Utf8Path,164 bytes: Vec<u8>,165) -> Result<()> {166 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;167168 let tmp = path.join(format!("tmp.{}", Uuid::new_v4()));169 let ch = sess.channel_open_session().await?;170 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;171 ch.data_bytes(bytes).await?;172 ch.eof().await?;173 let mut ch = ch;174 let mut code = None;175 while let Some(msg) = ch.wait().await {176 match msg {177 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),178 ChannelMsg::ExtendedData { data, .. } => {179 error!(180 "agent upload: {}",181 String::from_utf8_lossy(data.as_ref()).trim()182 );183 }184 _ => {}185 }186 }187 ensure!(code == Some(0), "agent upload failed (exit {code:?})");188189 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;190 run_string_ok(191 sess,192 &format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),193 )194 .await?;195 Ok(())196}197198async fn detect_escalation(199 sess: &Handle<SshHandler>,200) -> Result<(&'static str, &'static [&'static str])> {201 for (tool, flags) in ESCALATORS {202 // `tool` is a fixed identifier (no metacharacters), safe to interpolate.203 let (code, _) = run(sess, &format!("command -v {tool}")).await?;204 if code == Some(0) {205 return Ok((tool, flags));206 }207 }208 bail!("no escalation tool (run0/sudo/doas) found on remote")209}210211fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {212 let mut parts = vec![tool.to_owned()];213 parts.extend(flags.iter().map(|f| f.to_string()));214 parts.push(sh_quote(agent_path));215 parts.push("real-agent".to_owned());216 parts.push("--privileged".to_owned());217 if let Some(p) = path {218 parts.push("--path".to_owned());219 parts.push(sh_quote(p));220 }221 parts.join(" ")222}223224fn find_in_path(name: &str) -> Option<std::path::PathBuf> {225 let path = env::var_os("PATH")?;226 env::split_paths(&path)227 .map(|dir| dir.join(name))228 .find(|p| p.is_file())229}230231fn port_from_channel(ch: Channel<Msg>) -> Port {232 Port::new(move |mut rx, tx| async move {233 let (mut srx, mut stx) = tokio::io::split(ch.into_stream());234 let srx_task = async move {235 loop {236 match read(&mut srx).await {237 Ok(buf) => {238 if tx.send(buf.freeze()).is_err() {239 break;240 }241 }242 Err(e) => {243 error!("channel read failed: {e}");244 break;245 }246 }247 }248 };249 let stx_task = async move {250 while let Some(value) = rx.recv().await {251 if let Err(e) = write(&mut stx, value).await {252 error!("channel write failed: {e}");253 break;254 }255 }256 };257 join!(srx_task, stx_task);258 })259}260261pub struct SshHandler {262 host: String,263 port: u16,264 subs: Subs,265}266impl Handler for SshHandler {267 type Error = russh::Error;268 async fn check_server_key(269 &mut self,270 server_public_key: &PublicKey,271 ) -> Result<bool, Self::Error> {272 Ok(check_known_hosts(&self.host, self.port, server_public_key)?)273 }274 async fn server_channel_open_forwarded_streamlocal(275 &mut self,276 channel: Channel<Msg>,277 socket_path: &str,278 _session: &mut Session,279 ) -> Result<(), Self::Error> {280 let Some(ch) = self281 .subs282 .lock()283 .expect("lock")284 .remove(&Utf8PathBuf::from(socket_path))285 else {286 return Err(russh::Error::WrongChannel);287 };288 let _ = ch.send(channel);289 Ok(())290 }291}292293struct SshElevator {294 sess: Arc<Handle<SshHandler>>,295 rpc: WeakRpc<BifConfig>,296 agent_path: Utf8PathBuf,297}298impl Elevator for SshElevator {299 async fn elevate(&self) -> Result<(), ElevateError> {300 let fail = |e: String| ElevateError::Failed(e);301 let (tool, flags) = detect_escalation(&self.sess)302 .await303 .map_err(|e| fail(e.to_string()))?;304 let ch = self305 .sess306 .channel_open_session()307 .await308 .map_err(|e| fail(e.to_string()))?;309 ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None))310 .await311 .map_err(|e| fail(e.to_string()))?;312 let rpc = self313 .rpc314 .clone()315 .upgrade()316 .ok_or_else(|| fail("rpc is gone".to_owned()))?;317 rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0));318 Ok(())319 }320}321322pub struct RemoteChild {323 pub stdout: DuplexStream,324 pub stderr: DuplexStream,325 pub exit: oneshot::Receiver<Option<u32>>,326}327328enum Transport {329 Ssh {330 sess: Arc<Handle<SshHandler>>,331 subs: Subs,332 remote_dir: Utf8PathBuf,333 agent_path: Utf8PathBuf,334 },335 Local {336 #[allow(dead_code)]337 agent: Rpc<BifConfig>,338 agent_path: String,339 },340}341342pub struct Remowt {343 transport: Transport,344 rpc: Rpc<BifConfig>,345 elevated: tokio::sync::OnceCell<()>,346 children: Mutex<Vec<tokio::process::Child>>,347}348349pub type RemowtRemote = Remote<BifConfig>;350351fn loopback() -> (Port, Port) {352 let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::<Bytes>();353 let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::<Bytes>();354 let user = Port::new(move |mut rx, tx| async move {355 loop {356 tokio::select! {357 msg = rx.recv() => match msg {358 Some(msg) => if a2b_tx.send(msg).is_err() { break },359 None => break,360 },361 msg = b2a_rx.recv() => match msg {362 Some(msg) => if tx.send(msg).is_err() { break },363 None => break,364 },365 }366 }367 });368 let agent = Port::new(move |mut rx, tx| async move {369 loop {370 tokio::select! {371 msg = rx.recv() => match msg {372 Some(msg) => if b2a_tx.send(msg).is_err() { break },373 None => break,374 },375 msg = a2b_rx.recv() => match msg {376 Some(msg) => if tx.send(msg).is_err() { break },377 None => break,378 },379 }380 }381 });382 (user, agent)383}384385impl Remowt {386 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {387 let conf = russh_config::parse_home(host)?;388 let port = conf.host_config.port.or(conf.port).unwrap_or(22);389 let hostname = conf390 .host_config391 .hostname392 .clone()393 .unwrap_or_else(|| conf.host_name.clone());394 let user = conf395 .user396 .clone()397 .unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));398399 let subs: Subs = Arc::new(Mutex::new(HashMap::new()));400 let mut sess = connect(401 Arc::new(Config::default()),402 (hostname.clone(), port),403 SshHandler {404 host: hostname,405 port,406 subs: subs.clone(),407 },408 )409 .await?;410411 let mut agent = AgentClient::connect_env().await?;412 let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();413 let mut authenticated = false;414 for ident in agent.request_identities().await? {415 let AgentIdentity::PublicKey { key, .. } = ident else {416 continue;417 };418 if sess419 .authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)420 .await?421 .success()422 {423 authenticated = true;424 break;425 }426 }427 ensure!(authenticated, "ssh authentication failed");428429 // All remaining session ops take `&self`; share the handle.430 let sess = Arc::new(sess);431432 let agent_path = deploy_agent(&sess, bundle).await?;433434 let remote_dir = remote_mktemp(&sess).await?;435 let primary = remote_dir.join("primary.sock");436437 let (onetx, onerx) = channel();438 subs.lock().expect("lock").insert(primary.clone(), onetx);439 sess.streamlocal_forward(primary.clone()).await?;440441 let rpc = Rpc::<BifConfig>::new(Address::User);442443 // TODO: ensure no injection is possible in the socket path.444 let cmd_chan = sess.channel_open_session().await?;445 cmd_chan446 .exec(447 true,448 format!(449 "{} real-agent --path={}",450 sh_quote(&agent_path),451 sh_quote(&primary)452 ),453 )454 .await?;455456 let port = port_from_channel(457 onerx458 .await459 .map_err(|_| anyhow!("agent never opened its channel"))?,460 );461 rpc.add_direct(Address::Agent, port, Rtt(0));462463 Ok(Self {464 transport: Transport::Ssh {465 sess,466 subs,467 remote_dir,468 agent_path,469 },470 rpc,471 elevated: tokio::sync::OnceCell::new(),472 children: Mutex::new(Vec::new()),473 })474 }475476 pub async fn connect_local(agent_path: &str) -> Result<Self> {477 let (port_user, port_agent) = loopback();478 let rpc = Rpc::<BifConfig>::new(Address::User);479 let mut agent = Rpc::<BifConfig>::new(Address::Agent);480481 // Register handlers before wiring up the link (see the agent binary).482 Fs::new().register_endpoints(&mut agent);483 Systemd.register_endpoints(&mut agent);484 Pty::new().register_endpoints(&mut agent);485486 agent.add_direct(Address::User, port_agent, Rtt(0));487 rpc.add_direct(Address::Agent, port_user, Rtt(0));488489 Ok(Self {490 transport: Transport::Local {491 agent,492 agent_path: agent_path.to_owned(),493 },494 rpc,495 elevated: tokio::sync::OnceCell::new(),496 children: Mutex::new(Vec::new()),497 })498 }499500 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {501 match &self.transport {502 Transport::Ssh { sess, .. } => Some(sess.clone()),503 Transport::Local { .. } => None,504 }505 }506507 pub fn rpc(&self) -> Rpc<BifConfig> {508 self.rpc.clone()509 }510511 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {512 R::wrap(self.rpc.remote(Address::Agent))513 }514515 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {516 let client: PluginEndpointsClient<BifConfig> = self.endpoints();517 client518 .load_plugin(id, name.to_owned())519 .await?520 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))521 }522 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {523 self.ensure_elevated().await?;524 let client: PluginEndpointsClient<BifConfig> =525 PluginEndpointsClient::wrap(self.rpc.remote(Address::AgentPrivileged));526 client527 .load_plugin_path(id, path.to_owned())528 .await?529 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))530 }531 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {532 R::wrap(self.rpc.remote(Address::Plugin(id)))533 }534 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {535 self.ensure_elevated().await?;536 Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))537 }538539 async fn ensure_elevated(&self) -> Result<()> {540 self.elevated541 .get_or_try_init(|| async {542 let port = match &self.transport {543 Transport::Ssh {544 sess, agent_path, ..545 } => {546 let (tool, flags) = detect_escalation(sess).await?;547 let ch = sess.channel_open_session().await?;548 ch.exec(true, privileged_cmd(tool, flags, agent_path, None))549 .await?;550 port_from_channel(ch)551 }552 Transport::Local { agent_path, .. } => {553 let sock = env::temp_dir()554 .join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4()));555 let _ = std::fs::remove_file(&sock);556 let listener = UnixListener::bind(&sock)?;557 let (tool, flags) = ESCALATORS558 .iter()559 .find(|(t, _)| find_in_path(t).is_some())560 .ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?;561 let child = tokio::process::Command::new(tool)562 .args(*flags)563 .arg(agent_path)564 .arg("real-agent")565 .arg("--privileged")566 .arg("--path")567 .arg(sock.to_str().expect("temp path is utf-8"))568 .kill_on_drop(true)569 .spawn()?;570 self.children.lock().expect("lock").push(child);571 let (stream, _) = listener.accept().await?;572 let _ = std::fs::remove_file(&sock);573 from_socket(stream)574 }575 };576 self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));577 anyhow::Ok(())578 })579 .await?;580 Ok(())581 }582583 pub async fn exec(&self, command: String) -> Result<RemoteChild> {584 let Some(sess) = self.ssh() else {585 bail!("exec should not be called on local")586 };587 let ch = sess.channel_open_session().await?;588 ch.exec(true, command).await?;589590 let (mut out_w, stdout) = tokio::io::duplex(64 * 1024);591 let (mut err_w, stderr) = tokio::io::duplex(64 * 1024);592 let (exit_tx, exit) = oneshot::channel();593594 tokio::spawn(async move {595 let mut ch = ch;596 let mut code = None;597 while let Some(msg) = ch.wait().await {598 match msg {599 ChannelMsg::Data { data } => {600 if out_w.write_all(&data).await.is_err() {601 break;602 }603 }604 ChannelMsg::ExtendedData { data, .. } => {605 if err_w.write_all(&data).await.is_err() {606 break;607 }608 }609 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),610 _ => {}611 }612 }613 let _ = out_w.shutdown().await;614 let _ = err_w.shutdown().await;615 let _ = exit_tx.send(code);616 });617618 Ok(RemoteChild {619 stdout,620 stderr,621 exit,622 })623 }624625 pub fn serve_elevate(&self) -> Result<()> {626 let Transport::Ssh {627 sess, agent_path, ..628 } = &self.transport629 else {630 bail!("elevate should not be called on local")631 };632 let mut rpc = self.rpc.clone();633 ElevateEndpoints(SshElevator {634 sess: sess.clone(),635 rpc: self.rpc.clone().downgrade(),636 agent_path: agent_path.to_owned(),637 })638 .register_endpoints(&mut rpc);639 Ok(())640 }641642 pub fn remote_dir(&self) -> Option<&Utf8Path> {643 match &self.transport {644 Transport::Ssh { remote_dir, .. } => Some(remote_dir),645 Transport::Local { .. } => None,646 }647 }648649 pub async fn forward_socket(650 &self,651 remote_path: &Utf8Path,652 ) -> Result<oneshot::Receiver<Channel<Msg>>> {653 let Transport::Ssh { sess, subs, .. } = &self.transport else {654 bail!("forward_socket should not be called on local")655 };656 let (tx, rx) = oneshot::channel();657 subs.lock()658 .expect("lock")659 .insert(remote_path.to_owned(), tx);660 sess.streamlocal_forward(remote_path.to_owned()).await?;661 Ok(rx)662 }663664 pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<Shell> {665 let Transport::Ssh { remote_dir, .. } = &self.transport else {666 bail!("open_shell should not be called on local")667 };668 let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4()));669670 let rx = self.forward_socket(&sock).await?;671 let client: PtyClient<BifConfig> = self.endpoints();672 let id = client673 .open_shell(sock, term.to_owned(), cols, rows)674 .await?675 .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;676 let ch = rx677 .await678 .map_err(|_| anyhow!("agent never connected the shell socket"))?;679680 Ok(Shell {681 id,682 stream: ch.into_stream(),683 remote: self.rpc.remote(Address::Agent),684 })685 }686}687688pub struct Shell {689 pub id: ShellId,690 pub stream: ChannelStream<Msg>,691 remote: Remote<BifConfig>,692}693694impl Shell {695 pub fn resizer(&self) -> ShellResizer {696 ShellResizer {697 remote: self.remote.clone(),698 id: self.id,699 }700 }701}702703#[derive(Clone)]704pub struct ShellResizer {705 remote: Remote<BifConfig>,706 id: ShellId,707}708709impl ShellResizer {710 pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {711 PtyClient::wrap(self.remote.clone())712 .resize(self.id, cols, rows)713 .await?714 .map_err(|e| anyhow!("failed to resize remote shell: {e}"))715 }716}717718async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {719 let mut cmd_chan = sess.channel_open_session().await?;720 cmd_chan721 .exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")722 .await?;723 let mut stdout = vec![];724 loop {725 let Some(msg) = cmd_chan.wait().await else {726 bail!("unexpected channel end");727 };728 match msg {729 russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),730 russh::ChannelMsg::ExitStatus { exit_status } => {731 if exit_status != 0 {732 bail!("mktemp failed");733 }734 break;735 }736 _ => {}737 }738 }739 ensure!(stdout.ends_with(b"\n"));740 stdout.pop();741 Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))742}crates/remowt-endpoints/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-endpoints/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "remowt-endpoints"
+description = "Nix daemon proxy"
+version.workspace = true
+edition = "2021"
+license.workspace = true
+
+[dependencies]
+anyhow.workspace = true
+bifrostlink.workspace = true
+bifrostlink-macros.workspace = true
+camino.workspace = true
+serde = { workspace = true }
+tempfile.workspace = true
+thiserror.workspace = true
+tokio = { workspace = true, features = ["net", "io-util", "rt", "process"] }
+tracing.workspace = true
+uuid.workspace = true
+nix = { workspace = true, features = ["process", "term"] }
+zbus.workspace = true
crates/remowt-endpoints/src/fs.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-endpoints/src/fs.rs
@@ -0,0 +1,105 @@
+use std::io::ErrorKind;
+use std::str::FromStr;
+use std::sync::Mutex;
+
+use bifrostlink::declarative::endpoints;
+use bifrostlink::Config;
+use camino::Utf8PathBuf;
+use serde::{Deserialize, Serialize};
+use tempfile::TempDir;
+
+#[derive(Default)]
+pub struct Fs {
+ tempdirs: Mutex<Vec<TempDir>>,
+}
+
+impl Fs {
+ pub fn new() -> Self {
+ Self::default()
+ }
+}
+
+#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
+pub enum Error {
+ #[error("file not found")]
+ NotFound,
+ #[error("file name/contents is not utf8")]
+ InvalidUtf8,
+ #[error("unknown fs error")]
+ Unknown,
+}
+
+#[endpoints(ns = 1)]
+impl Fs {
+ #[endpoints(id = 1)]
+ async fn read_file_tiny(&self, path: Utf8PathBuf) -> Result<Vec<u8>, Error> {
+ match tokio::fs::read(path).await {
+ Ok(v) => Ok(v),
+ Err(e) if e.kind() == ErrorKind::NotFound => Err(Error::NotFound),
+ _ => Err(Error::Unknown),
+ }
+ }
+ #[endpoints(id = 2)]
+ async fn file_exists(&self, path: Utf8PathBuf) -> bool {
+ tokio::fs::try_exists(path).await.unwrap_or(false)
+ }
+ #[endpoints(id = 3)]
+ async fn read_dir_raw(&self, path: Utf8PathBuf) -> Result<Vec<Utf8PathBuf>, Error> {
+ let mut dir = match tokio::fs::read_dir(path).await {
+ Ok(dir) => dir,
+ Err(e) if e.kind() == ErrorKind::NotFound => return Err(Error::NotFound),
+ Err(_) => return Err(Error::Unknown),
+ };
+ let mut out = Vec::new();
+ while let Ok(Some(entry)) = dir.next_entry().await {
+ let name = Utf8PathBuf::try_from(entry.file_name()).map_err(|_| Error::InvalidUtf8)?;
+ out.push(name);
+ }
+ Ok(out)
+ }
+ #[endpoints(id = 4)]
+ async fn mktemp_dir_raw(&self) -> Result<Utf8PathBuf, Error> {
+ let dir = tempfile::Builder::new()
+ .prefix("remowt.")
+ .tempdir()
+ .map_err(|_| Error::Unknown)?;
+ let mut tempdirs = self.tempdirs.lock().expect("not poisoned");
+ let path = Utf8PathBuf::try_from(dir.path().to_owned()).map_err(|_| Error::InvalidUtf8);
+ tempdirs.push(dir);
+ path
+ }
+ #[endpoints(id = 5)]
+ async fn rm_file(&self, path: Utf8PathBuf) -> Result<(), Error> {
+ match tokio::fs::remove_file(path).await {
+ Ok(()) => Ok(()),
+ Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
+ Err(_) => Err(Error::Unknown),
+ }
+ }
+}
+
+impl<C: Config> FsClient<C> {
+ pub async fn read_file_text(&self, path: impl Into<Utf8PathBuf>) -> Result<String, Error> {
+ let v = self
+ .read_file_tiny(path.into())
+ .await
+ .map_err(|_| Error::Unknown)?;
+ let v = v?;
+ String::from_utf8(v).map_err(|_| Error::InvalidUtf8)
+ }
+ pub async fn read_file_value<T: FromStr>(
+ &self,
+ path: impl Into<Utf8PathBuf>,
+ ) -> Result<Result<T, T::Err>, Error> {
+ let text = self.read_file_text(path).await?;
+ Ok(T::from_str(&text))
+ }
+ pub async fn mktemp_dir(&self) -> Result<Utf8PathBuf, Error> {
+ self.mktemp_dir_raw().await.map_err(|_| Error::Unknown)?
+ }
+ pub async fn read_dir(&self, path: impl Into<Utf8PathBuf>) -> Result<Vec<Utf8PathBuf>, Error> {
+ self.read_dir_raw(path.into())
+ .await
+ .map_err(|_| Error::Unknown)?
+ }
+}
crates/remowt-endpoints/src/lib.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-endpoints/src/lib.rs
@@ -0,0 +1,4 @@
+pub mod fs;
+pub mod nix_daemon;
+pub mod pty;
+pub mod systemd;
crates/remowt-endpoints/src/nix_daemon.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-endpoints/src/nix_daemon.rs
@@ -0,0 +1,65 @@
+use std::process::Stdio;
+
+use bifrostlink::declarative::endpoints;
+use bifrostlink::Config;
+use serde::{Deserialize, Serialize};
+use std::result::Result;
+use tokio::process::Command;
+
+pub const NIX_DAEMON_SOCKET: &str = "/nix/var/nix/daemon-socket/socket";
+
+pub struct NixDaemon;
+
+#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
+pub enum Error {
+ #[error("nix daemon unavailable: {0}")]
+ DaemonUnavailable(String),
+ #[error("tunnel socket unavailable: {0}")]
+ Tunnel(String),
+}
+
+#[endpoints(ns = 4)]
+impl NixDaemon {
+ #[endpoints(id = 1)]
+ async fn connect_daemon(&self, socket: String) -> Result<(), Error> {
+ let mut daemon = tokio::net::UnixStream::connect(NIX_DAEMON_SOCKET)
+ .await
+ .map_err(|e| Error::DaemonUnavailable(e.to_string()))?;
+ let mut tunnel = tokio::net::UnixStream::connect(&socket)
+ .await
+ .map_err(|e| Error::Tunnel(e.to_string()))?;
+ tokio::spawn(async move {
+ if let Err(e) = tokio::io::copy_bidirectional(&mut daemon, &mut tunnel).await {
+ tracing::debug!("nix daemon tunnel ended: {e}");
+ }
+ });
+ Ok(())
+ }
+
+ #[endpoints(id = 2)]
+ async fn serve_store(&self, store: String, socket: String) -> Result<(), Error> {
+ let mut child = Command::new("nix-daemon")
+ .arg("--stdio")
+ .arg("--store")
+ .arg(&store)
+ .stdin(Stdio::piped())
+ .stdout(Stdio::piped())
+ .spawn()
+ .map_err(|e| Error::DaemonUnavailable(e.to_string()))?;
+ let tunnel = tokio::net::UnixStream::connect(&socket)
+ .await
+ .map_err(|e| Error::Tunnel(e.to_string()))?;
+ let mut stdin = child.stdin.take().expect("piped");
+ let mut stdout = child.stdout.take().expect("piped");
+ tokio::spawn(async move {
+ let mut tunnel = tunnel;
+ let (mut tr, mut tw) = tunnel.split();
+ let _ = tokio::join!(
+ tokio::io::copy(&mut tr, &mut stdin),
+ tokio::io::copy(&mut stdout, &mut tw),
+ );
+ let _ = child.wait().await;
+ });
+ Ok(())
+ }
+}
crates/remowt-endpoints/src/pty.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-endpoints/src/pty.rs
@@ -0,0 +1,256 @@
+use std::collections::HashMap;
+use std::io;
+use std::os::fd::{AsRawFd, OwnedFd};
+use std::pin::Pin;
+use std::process::Stdio;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+
+use bifrostlink::declarative::endpoints;
+use bifrostlink::Config;
+use camino::Utf8PathBuf;
+use nix::libc;
+use nix::pty::{openpty, OpenptyResult, Winsize};
+use serde::{Deserialize, Serialize};
+use tokio::io::unix::AsyncFd;
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tokio::net::UnixStream;
+use tracing::{info, warn};
+
+pub type ShellId = u64;
+
+#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
+pub enum Error {
+ #[error("openpty failed: {0}")]
+ Open(String),
+ #[error("failed to spawn shell: {0}")]
+ Spawn(String),
+ #[error("failed to connect to forwarded socket: {0}")]
+ Connect(String),
+ #[error("no shell with that id")]
+ NoSuchShell,
+ #[error("resize failed: {0}")]
+ Resize(String),
+ #[error("io error: {0}")]
+ Io(String),
+}
+
+impl From<io::Error> for Error {
+ fn from(e: io::Error) -> Self {
+ Error::Io(e.to_string())
+ }
+}
+
+#[derive(Clone, Default)]
+pub struct Pty {
+ shells: Arc<Mutex<HashMap<ShellId, OwnedFd>>>,
+ next_id: Arc<AtomicU64>,
+}
+
+impl Pty {
+ pub fn new() -> Self {
+ Self::default()
+ }
+}
+
+#[endpoints(ns = 7)]
+impl Pty {
+ #[endpoints(id = 1)]
+ async fn open_shell(
+ &self,
+ socket_path: Utf8PathBuf,
+ term: String,
+ cols: u16,
+ rows: u16,
+ ) -> Result<ShellId, Error> {
+ let ws = Winsize {
+ ws_row: rows,
+ ws_col: cols,
+ ws_xpixel: 0,
+ ws_ypixel: 0,
+ };
+ let OpenptyResult { master, slave } =
+ openpty(Some(&ws), None).map_err(|e| Error::Open(e.to_string()))?;
+
+ let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_owned());
+
+ let slave_in = slave.try_clone()?;
+ let slave_out = slave.try_clone()?;
+ let slave_err = slave;
+
+ let mut cmd = tokio::process::Command::new(&shell);
+ cmd.env("TERM", &term);
+ if let Ok(home) = std::env::var("HOME") {
+ cmd.current_dir(home);
+ }
+ cmd.stdin(Stdio::from(slave_in));
+ cmd.stdout(Stdio::from(slave_out));
+ cmd.stderr(Stdio::from(slave_err));
+ // SAFETY: only async-signal-safe calls (setsid, ioctl) before exec.
+ unsafe {
+ cmd.pre_exec(|| {
+ nix::unistd::setsid().map_err(|e| io::Error::from_raw_os_error(e as i32))?;
+ if libc::ioctl(0, libc::TIOCSCTTY as _, 0) < 0 {
+ return Err(io::Error::last_os_error());
+ }
+ Ok(())
+ });
+ }
+
+ let mut child = cmd.spawn().map_err(|e| Error::Spawn(e.to_string()))?;
+
+ let resize_fd = master.try_clone()?;
+ let id = self.next_id.fetch_add(1, Ordering::Relaxed);
+ self.shells
+ .lock()
+ .expect("not poisoned")
+ .insert(id, resize_fd);
+
+ let sock = match UnixStream::connect(&socket_path).await {
+ Ok(s) => s,
+ Err(e) => {
+ self.shells.lock().expect("not poisoned").remove(&id);
+ let _ = child.kill().await;
+ return Err(Error::Connect(e.to_string()));
+ }
+ };
+ let pty = AsyncPty::new(master)?;
+
+ info!(id, shell, "shell opened");
+ let shells = self.shells.clone();
+ tokio::spawn(async move {
+ let mut pty = pty;
+ let mut sock = sock;
+ if let Err(e) = tokio::io::copy_bidirectional(&mut pty, &mut sock).await {
+ warn!(id, "shell pump ended: {e}");
+ }
+ let _ = child.kill().await;
+ shells.lock().expect("not poisoned").remove(&id);
+ info!(id, "shell closed");
+ });
+
+ Ok(id)
+ }
+
+ #[endpoints(id = 2)]
+ async fn resize(&self, id: ShellId, cols: u16, rows: u16) -> Result<(), Error> {
+ let ws = libc::winsize {
+ ws_row: rows,
+ ws_col: cols,
+ ws_xpixel: 0,
+ ws_ypixel: 0,
+ };
+ let shells = self.shells.lock().expect("not poisoned");
+ let fd = shells.get(&id).ok_or(Error::NoSuchShell)?;
+ // SAFETY: `fd` is a live PTY master
+ let rc = unsafe { libc::ioctl(fd.as_raw_fd(), libc::TIOCSWINSZ as _, &ws) };
+ if rc < 0 {
+ return Err(Error::Resize(io::Error::last_os_error().to_string()));
+ }
+ Ok(())
+ }
+}
+
+struct AsyncPty {
+ fd: AsyncFd<OwnedFd>,
+}
+
+impl AsyncPty {
+ fn new(fd: OwnedFd) -> io::Result<Self> {
+ let raw = fd.as_raw_fd();
+ // SAFETY: standard F_GETFL/F_SETFL round-trip on a valid fd.
+ unsafe {
+ let flags = libc::fcntl(raw, libc::F_GETFL);
+ if flags < 0 {
+ return Err(io::Error::last_os_error());
+ }
+ if libc::fcntl(raw, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
+ return Err(io::Error::last_os_error());
+ }
+ }
+ Ok(Self {
+ fd: AsyncFd::new(fd)?,
+ })
+ }
+}
+
+impl AsyncRead for AsyncPty {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ let this = self.get_mut();
+ loop {
+ let mut guard = match this.fd.poll_read_ready(cx) {
+ Poll::Ready(Ok(g)) => g,
+ Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
+ Poll::Pending => return Poll::Pending,
+ };
+ let unfilled = buf.initialize_unfilled();
+ let res = guard.try_io(|inner| {
+ let fd = inner.get_ref().as_raw_fd();
+ // SAFETY: writing into `unfilled`'s own backing storage.
+ let n = unsafe { libc::read(fd, unfilled.as_mut_ptr().cast(), unfilled.len()) };
+ if n < 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() == Some(libc::EIO) {
+ Ok(0)
+ } else {
+ Err(err)
+ }
+ } else {
+ Ok(n as usize)
+ }
+ });
+ match res {
+ Ok(Ok(n)) => {
+ buf.advance(n);
+ return Poll::Ready(Ok(()));
+ }
+ Ok(Err(e)) => return Poll::Ready(Err(e)),
+ Err(_would_block) => continue,
+ }
+ }
+ }
+}
+
+impl AsyncWrite for AsyncPty {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ let this = self.get_mut();
+ loop {
+ let mut guard = match this.fd.poll_write_ready(cx) {
+ Poll::Ready(Ok(g)) => g,
+ Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
+ Poll::Pending => return Poll::Pending,
+ };
+ let res = guard.try_io(|inner| {
+ let fd = inner.get_ref().as_raw_fd();
+ // SAFETY: reading from `buf` for `buf.len()` bytes.
+ let n = unsafe { libc::write(fd, buf.as_ptr().cast(), buf.len()) };
+ if n < 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(n as usize)
+ }
+ });
+ match res {
+ Ok(r) => return Poll::Ready(r),
+ Err(_would_block) => continue,
+ }
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+}
crates/remowt-endpoints/src/systemd.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-endpoints/src/systemd.rs
@@ -0,0 +1,54 @@
+use bifrostlink::declarative::endpoints;
+use bifrostlink::Config;
+use serde::{Deserialize, Serialize};
+use zbus::proxy;
+use zbus::zvariant::OwnedObjectPath;
+
+pub struct Systemd;
+
+#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
+pub enum Error {
+ #[error("systemd request failed: {0}")]
+ Failed(String),
+}
+
+#[proxy(
+ interface = "org.freedesktop.systemd1.Manager",
+ default_service = "org.freedesktop.systemd1",
+ default_path = "/org/freedesktop/systemd1"
+)]
+trait Manager {
+ fn start_unit(&self, name: &str, mode: &str) -> zbus::Result<OwnedObjectPath>;
+ fn stop_unit(&self, name: &str, mode: &str) -> zbus::Result<OwnedObjectPath>;
+}
+
+async fn manager() -> Result<ManagerProxy<'static>, Error> {
+ let conn = zbus::Connection::system()
+ .await
+ .map_err(|e| Error::Failed(e.to_string()))?;
+ ManagerProxy::new(&conn)
+ .await
+ .map_err(|e| Error::Failed(e.to_string()))
+}
+
+#[endpoints(ns = 5)]
+impl Systemd {
+ #[endpoints(id = 1)]
+ async fn start(&self, unit: String) -> Result<(), Error> {
+ manager()
+ .await?
+ .start_unit(&unit, "replace")
+ .await
+ .map_err(|e| Error::Failed(e.to_string()))?;
+ Ok(())
+ }
+ #[endpoints(id = 2)]
+ async fn stop(&self, unit: String) -> Result<(), Error> {
+ manager()
+ .await?
+ .stop_unit(&unit, "replace")
+ .await
+ .map_err(|e| Error::Failed(e.to_string()))?;
+ Ok(())
+ }
+}
crates/remowt-fs/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-fs/Cargo.toml
+++ /dev/null
@@ -1,15 +0,0 @@
-[package]
-name = "remowt-fs"
-description = "Filesystem endpoint for remowt/bifrostlink"
-version.workspace = true
-edition = "2021"
-license.workspace = true
-
-[dependencies]
-bifrostlink.workspace = true
-bifrostlink-macros.workspace = true
-camino = { workspace = true, features = ["serde1"] }
-serde = { workspace = true, features = ["derive"] }
-tempfile.workspace = true
-thiserror.workspace = true
-tokio = { workspace = true, features = ["fs"] }
crates/remowt-fs/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-fs/src/lib.rs
+++ /dev/null
@@ -1,105 +0,0 @@
-use std::io::ErrorKind;
-use std::str::FromStr;
-use std::sync::Mutex;
-
-use bifrostlink::declarative::endpoints;
-use bifrostlink::Config;
-use camino::Utf8PathBuf;
-use serde::{Deserialize, Serialize};
-use tempfile::TempDir;
-
-#[derive(Default)]
-pub struct Fs {
- tempdirs: Mutex<Vec<TempDir>>,
-}
-
-impl Fs {
- pub fn new() -> Self {
- Self::default()
- }
-}
-
-#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
-pub enum Error {
- #[error("file not found")]
- NotFound,
- #[error("file name/contents is not utf8")]
- InvalidUtf8,
- #[error("unknown fs error")]
- Unknown,
-}
-
-#[endpoints(ns = 1)]
-impl Fs {
- #[endpoints(id = 1)]
- async fn read_file_tiny(&self, path: Utf8PathBuf) -> Result<Vec<u8>, Error> {
- match tokio::fs::read(path).await {
- Ok(v) => Ok(v),
- Err(e) if e.kind() == ErrorKind::NotFound => Err(Error::NotFound),
- _ => Err(Error::Unknown),
- }
- }
- #[endpoints(id = 2)]
- async fn file_exists(&self, path: Utf8PathBuf) -> bool {
- tokio::fs::try_exists(path).await.unwrap_or(false)
- }
- #[endpoints(id = 3)]
- async fn read_dir_raw(&self, path: Utf8PathBuf) -> Result<Vec<Utf8PathBuf>, Error> {
- let mut dir = match tokio::fs::read_dir(path).await {
- Ok(dir) => dir,
- Err(e) if e.kind() == ErrorKind::NotFound => return Err(Error::NotFound),
- Err(_) => return Err(Error::Unknown),
- };
- let mut out = Vec::new();
- while let Ok(Some(entry)) = dir.next_entry().await {
- let name = Utf8PathBuf::try_from(entry.file_name()).map_err(|_| Error::InvalidUtf8)?;
- out.push(name);
- }
- Ok(out)
- }
- #[endpoints(id = 4)]
- async fn mktemp_dir_raw(&self) -> Result<Utf8PathBuf, Error> {
- let dir = tempfile::Builder::new()
- .prefix("remowt.")
- .tempdir()
- .map_err(|_| Error::Unknown)?;
- let mut tempdirs = self.tempdirs.lock().expect("not poisoned");
- let path = Utf8PathBuf::try_from(dir.path().to_owned()).map_err(|_| Error::InvalidUtf8);
- tempdirs.push(dir);
- path
- }
- #[endpoints(id = 5)]
- async fn rm_file(&self, path: Utf8PathBuf) -> Result<(), Error> {
- match tokio::fs::remove_file(path).await {
- Ok(()) => Ok(()),
- Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
- Err(_) => Err(Error::Unknown),
- }
- }
-}
-
-impl<C: Config> FsClient<C> {
- pub async fn read_file_text(&self, path: impl Into<Utf8PathBuf>) -> Result<String, Error> {
- let v = self
- .read_file_tiny(path.into())
- .await
- .map_err(|_| Error::Unknown)?;
- let v = v?;
- String::from_utf8(v).map_err(|_| Error::InvalidUtf8)
- }
- pub async fn read_file_value<T: FromStr>(
- &self,
- path: impl Into<Utf8PathBuf>,
- ) -> Result<Result<T, T::Err>, Error> {
- let text = self.read_file_text(path).await?;
- Ok(T::from_str(&text))
- }
- pub async fn mktemp_dir(&self) -> Result<Utf8PathBuf, Error> {
- self.mktemp_dir_raw().await.map_err(|_| Error::Unknown)?
- }
- pub async fn read_dir(&self, path: impl Into<Utf8PathBuf>) -> Result<Vec<Utf8PathBuf>, Error> {
- self.read_dir_raw(path.into())
- .await
- .map_err(|_| Error::Unknown)?
- }
-}
crates/remowt-link-shared/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-link-shared/Cargo.toml
+++ b/crates/remowt-link-shared/Cargo.toml
@@ -12,8 +12,5 @@
serde_json.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["fs"] }
-remowt-fs.workspace = true
-remowt-systemd.workspace = true
remowt-ui-prompt.workspace = true
camino = { workspace = true, features = ["serde1"] }
-remowt-pty.workspace = true
crates/remowt-link-shared/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-link-shared/src/lib.rs
+++ b/crates/remowt-link-shared/src/lib.rs
@@ -21,10 +21,6 @@
pub mod plugin;
-pub use remowt_fs::{Error as FsError, Fs, FsClient};
-pub use remowt_pty::{Error as PtyError, Pty, PtyClient, ShellId};
-pub use remowt_systemd::{Error as SystemdError, Systemd, SystemdClient};
-
#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
pub enum ElevateError {
#[error("elevation failed: {0}")]
crates/remowt-nix-daemon/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-nix-daemon/Cargo.toml
+++ /dev/null
@@ -1,18 +0,0 @@
-[package]
-name = "remowt-nix-daemon"
-description = "Nix daemon proxy"
-version.workspace = true
-edition = "2021"
-license.workspace = true
-
-[dependencies]
-anyhow.workspace = true
-bifrostlink.workspace = true
-bifrostlink-macros.workspace = true
-camino.workspace = true
-remowt-client.workspace = true
-serde = { workspace = true }
-thiserror.workspace = true
-tokio = { workspace = true, features = ["net", "io-util", "rt", "process"] }
-tracing.workspace = true
-uuid.workspace = true
crates/remowt-nix-daemon/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-nix-daemon/src/lib.rs
+++ /dev/null
@@ -1,65 +0,0 @@
-use std::process::Stdio;
-
-use bifrostlink::declarative::endpoints;
-use bifrostlink::Config;
-use serde::{Deserialize, Serialize};
-use std::result::Result;
-use tokio::process::Command;
-
-pub const NIX_DAEMON_SOCKET: &str = "/nix/var/nix/daemon-socket/socket";
-
-pub struct NixDaemon;
-
-#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
-pub enum Error {
- #[error("nix daemon unavailable: {0}")]
- DaemonUnavailable(String),
- #[error("tunnel socket unavailable: {0}")]
- Tunnel(String),
-}
-
-#[endpoints(ns = 4)]
-impl NixDaemon {
- #[endpoints(id = 1)]
- async fn connect_daemon(&self, socket: String) -> Result<(), Error> {
- let mut daemon = tokio::net::UnixStream::connect(NIX_DAEMON_SOCKET)
- .await
- .map_err(|e| Error::DaemonUnavailable(e.to_string()))?;
- let mut tunnel = tokio::net::UnixStream::connect(&socket)
- .await
- .map_err(|e| Error::Tunnel(e.to_string()))?;
- tokio::spawn(async move {
- if let Err(e) = tokio::io::copy_bidirectional(&mut daemon, &mut tunnel).await {
- tracing::debug!("nix daemon tunnel ended: {e}");
- }
- });
- Ok(())
- }
-
- #[endpoints(id = 2)]
- async fn serve_store(&self, store: String, socket: String) -> Result<(), Error> {
- let mut child = Command::new("nix-daemon")
- .arg("--stdio")
- .arg("--store")
- .arg(&store)
- .stdin(Stdio::piped())
- .stdout(Stdio::piped())
- .spawn()
- .map_err(|e| Error::DaemonUnavailable(e.to_string()))?;
- let tunnel = tokio::net::UnixStream::connect(&socket)
- .await
- .map_err(|e| Error::Tunnel(e.to_string()))?;
- let mut stdin = child.stdin.take().expect("piped");
- let mut stdout = child.stdout.take().expect("piped");
- tokio::spawn(async move {
- let mut tunnel = tunnel;
- let (mut tr, mut tw) = tunnel.split();
- let _ = tokio::join!(
- tokio::io::copy(&mut tr, &mut stdin),
- tokio::io::copy(&mut stdout, &mut tw),
- );
- let _ = child.wait().await;
- });
- Ok(())
- }
-}
crates/remowt-plugin/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-plugin/src/lib.rs
+++ b/crates/remowt-plugin/src/lib.rs
@@ -8,7 +8,7 @@
pub mod host;
pub use bifrostlink;
-pub use remowt_link_shared::{self, Address, BifConfig, Fs, Pty, Systemd};
+pub use remowt_link_shared::{self, Address, BifConfig};
pub fn plugin_index() -> Result<u16> {
let arg = std::env::args()
crates/remowt-pty/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-pty/Cargo.toml
+++ /dev/null
@@ -1,23 +0,0 @@
-[package]
-name = "remowt-pty"
-description = "PTY/shell endpoint for remowt"
-version.workspace = true
-edition = "2021"
-license.workspace = true
-
-[dependencies]
-bifrostlink.workspace = true
-bifrostlink-macros.workspace = true
-camino = { workspace = true, features = ["serde1"] }
-nix = { workspace = true, features = ["process", "term"] }
-serde = { workspace = true, features = ["derive"] }
-thiserror.workspace = true
-tokio = { workspace = true, features = [
- "net",
- "io-util",
- "rt",
- "macros",
- "process",
- "sync",
-] }
-tracing.workspace = true
crates/remowt-pty/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-pty/src/lib.rs
+++ /dev/null
@@ -1,256 +0,0 @@
-use std::collections::HashMap;
-use std::io;
-use std::os::fd::{AsRawFd, OwnedFd};
-use std::pin::Pin;
-use std::process::Stdio;
-use std::sync::atomic::{AtomicU64, Ordering};
-use std::sync::{Arc, Mutex};
-use std::task::{Context, Poll};
-
-use bifrostlink::declarative::endpoints;
-use bifrostlink::Config;
-use camino::Utf8PathBuf;
-use nix::libc;
-use nix::pty::{openpty, OpenptyResult, Winsize};
-use serde::{Deserialize, Serialize};
-use tokio::io::unix::AsyncFd;
-use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
-use tokio::net::UnixStream;
-use tracing::{info, warn};
-
-pub type ShellId = u64;
-
-#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
-pub enum Error {
- #[error("openpty failed: {0}")]
- Open(String),
- #[error("failed to spawn shell: {0}")]
- Spawn(String),
- #[error("failed to connect to forwarded socket: {0}")]
- Connect(String),
- #[error("no shell with that id")]
- NoSuchShell,
- #[error("resize failed: {0}")]
- Resize(String),
- #[error("io error: {0}")]
- Io(String),
-}
-
-impl From<io::Error> for Error {
- fn from(e: io::Error) -> Self {
- Error::Io(e.to_string())
- }
-}
-
-#[derive(Clone, Default)]
-pub struct Pty {
- shells: Arc<Mutex<HashMap<ShellId, OwnedFd>>>,
- next_id: Arc<AtomicU64>,
-}
-
-impl Pty {
- pub fn new() -> Self {
- Self::default()
- }
-}
-
-#[endpoints(ns = 7)]
-impl Pty {
- #[endpoints(id = 1)]
- async fn open_shell(
- &self,
- socket_path: Utf8PathBuf,
- term: String,
- cols: u16,
- rows: u16,
- ) -> Result<ShellId, Error> {
- let ws = Winsize {
- ws_row: rows,
- ws_col: cols,
- ws_xpixel: 0,
- ws_ypixel: 0,
- };
- let OpenptyResult { master, slave } =
- openpty(Some(&ws), None).map_err(|e| Error::Open(e.to_string()))?;
-
- let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_owned());
-
- let slave_in = slave.try_clone()?;
- let slave_out = slave.try_clone()?;
- let slave_err = slave;
-
- let mut cmd = tokio::process::Command::new(&shell);
- cmd.env("TERM", &term);
- if let Ok(home) = std::env::var("HOME") {
- cmd.current_dir(home);
- }
- cmd.stdin(Stdio::from(slave_in));
- cmd.stdout(Stdio::from(slave_out));
- cmd.stderr(Stdio::from(slave_err));
- // SAFETY: only async-signal-safe calls (setsid, ioctl) before exec.
- unsafe {
- cmd.pre_exec(|| {
- nix::unistd::setsid().map_err(|e| io::Error::from_raw_os_error(e as i32))?;
- if libc::ioctl(0, libc::TIOCSCTTY as _, 0) < 0 {
- return Err(io::Error::last_os_error());
- }
- Ok(())
- });
- }
-
- let mut child = cmd.spawn().map_err(|e| Error::Spawn(e.to_string()))?;
-
- let resize_fd = master.try_clone()?;
- let id = self.next_id.fetch_add(1, Ordering::Relaxed);
- self.shells
- .lock()
- .expect("not poisoned")
- .insert(id, resize_fd);
-
- let sock = match UnixStream::connect(&socket_path).await {
- Ok(s) => s,
- Err(e) => {
- self.shells.lock().expect("not poisoned").remove(&id);
- let _ = child.kill().await;
- return Err(Error::Connect(e.to_string()));
- }
- };
- let pty = AsyncPty::new(master)?;
-
- info!(id, shell, "shell opened");
- let shells = self.shells.clone();
- tokio::spawn(async move {
- let mut pty = pty;
- let mut sock = sock;
- if let Err(e) = tokio::io::copy_bidirectional(&mut pty, &mut sock).await {
- warn!(id, "shell pump ended: {e}");
- }
- let _ = child.kill().await;
- shells.lock().expect("not poisoned").remove(&id);
- info!(id, "shell closed");
- });
-
- Ok(id)
- }
-
- #[endpoints(id = 2)]
- async fn resize(&self, id: ShellId, cols: u16, rows: u16) -> Result<(), Error> {
- let ws = libc::winsize {
- ws_row: rows,
- ws_col: cols,
- ws_xpixel: 0,
- ws_ypixel: 0,
- };
- let shells = self.shells.lock().expect("not poisoned");
- let fd = shells.get(&id).ok_or(Error::NoSuchShell)?;
- // SAFETY: `fd` is a live PTY master
- let rc = unsafe { libc::ioctl(fd.as_raw_fd(), libc::TIOCSWINSZ as _, &ws) };
- if rc < 0 {
- return Err(Error::Resize(io::Error::last_os_error().to_string()));
- }
- Ok(())
- }
-}
-
-struct AsyncPty {
- fd: AsyncFd<OwnedFd>,
-}
-
-impl AsyncPty {
- fn new(fd: OwnedFd) -> io::Result<Self> {
- let raw = fd.as_raw_fd();
- // SAFETY: standard F_GETFL/F_SETFL round-trip on a valid fd.
- unsafe {
- let flags = libc::fcntl(raw, libc::F_GETFL);
- if flags < 0 {
- return Err(io::Error::last_os_error());
- }
- if libc::fcntl(raw, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
- return Err(io::Error::last_os_error());
- }
- }
- Ok(Self {
- fd: AsyncFd::new(fd)?,
- })
- }
-}
-
-impl AsyncRead for AsyncPty {
- fn poll_read(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut ReadBuf<'_>,
- ) -> Poll<io::Result<()>> {
- let this = self.get_mut();
- loop {
- let mut guard = match this.fd.poll_read_ready(cx) {
- Poll::Ready(Ok(g)) => g,
- Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
- Poll::Pending => return Poll::Pending,
- };
- let unfilled = buf.initialize_unfilled();
- let res = guard.try_io(|inner| {
- let fd = inner.get_ref().as_raw_fd();
- // SAFETY: writing into `unfilled`'s own backing storage.
- let n = unsafe { libc::read(fd, unfilled.as_mut_ptr().cast(), unfilled.len()) };
- if n < 0 {
- let err = io::Error::last_os_error();
- if err.raw_os_error() == Some(libc::EIO) {
- Ok(0)
- } else {
- Err(err)
- }
- } else {
- Ok(n as usize)
- }
- });
- match res {
- Ok(Ok(n)) => {
- buf.advance(n);
- return Poll::Ready(Ok(()));
- }
- Ok(Err(e)) => return Poll::Ready(Err(e)),
- Err(_would_block) => continue,
- }
- }
- }
-}
-
-impl AsyncWrite for AsyncPty {
- fn poll_write(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- let this = self.get_mut();
- loop {
- let mut guard = match this.fd.poll_write_ready(cx) {
- Poll::Ready(Ok(g)) => g,
- Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
- Poll::Pending => return Poll::Pending,
- };
- let res = guard.try_io(|inner| {
- let fd = inner.get_ref().as_raw_fd();
- // SAFETY: reading from `buf` for `buf.len()` bytes.
- let n = unsafe { libc::write(fd, buf.as_ptr().cast(), buf.len()) };
- if n < 0 {
- Err(io::Error::last_os_error())
- } else {
- Ok(n as usize)
- }
- });
- match res {
- Ok(r) => return Poll::Ready(r),
- Err(_would_block) => continue,
- }
- }
- }
-
- fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- Poll::Ready(Ok(()))
- }
-
- fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- Poll::Ready(Ok(()))
- }
-}
crates/remowt-systemd/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-systemd/Cargo.toml
+++ /dev/null
@@ -1,13 +0,0 @@
-[package]
-name = "remowt-systemd"
-description = "systemd control endpoint for remowt/bifrostlink (over D-Bus)"
-version.workspace = true
-edition = "2021"
-license.workspace = true
-
-[dependencies]
-bifrostlink.workspace = true
-bifrostlink-macros.workspace = true
-serde = { workspace = true, features = ["derive"] }
-thiserror.workspace = true
-zbus = { workspace = true, features = ["tokio"] }
crates/remowt-systemd/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-systemd/src/lib.rs
+++ /dev/null
@@ -1,54 +0,0 @@
-use bifrostlink::declarative::endpoints;
-use bifrostlink::Config;
-use serde::{Deserialize, Serialize};
-use zbus::proxy;
-use zbus::zvariant::OwnedObjectPath;
-
-pub struct Systemd;
-
-#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
-pub enum Error {
- #[error("systemd request failed: {0}")]
- Failed(String),
-}
-
-#[proxy(
- interface = "org.freedesktop.systemd1.Manager",
- default_service = "org.freedesktop.systemd1",
- default_path = "/org/freedesktop/systemd1"
-)]
-trait Manager {
- fn start_unit(&self, name: &str, mode: &str) -> zbus::Result<OwnedObjectPath>;
- fn stop_unit(&self, name: &str, mode: &str) -> zbus::Result<OwnedObjectPath>;
-}
-
-async fn manager() -> Result<ManagerProxy<'static>, Error> {
- let conn = zbus::Connection::system()
- .await
- .map_err(|e| Error::Failed(e.to_string()))?;
- ManagerProxy::new(&conn)
- .await
- .map_err(|e| Error::Failed(e.to_string()))
-}
-
-#[endpoints(ns = 5)]
-impl Systemd {
- #[endpoints(id = 1)]
- async fn start(&self, unit: String) -> Result<(), Error> {
- manager()
- .await?
- .start_unit(&unit, "replace")
- .await
- .map_err(|e| Error::Failed(e.to_string()))?;
- Ok(())
- }
- #[endpoints(id = 2)]
- async fn stop(&self, unit: String) -> Result<(), Error> {
- manager()
- .await?
- .stop_unit(&unit, "replace")
- .await
- .map_err(|e| Error::Failed(e.to_string()))?;
- Ok(())
- }
-}
crates/remowt-ui-prompt/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-ui-prompt/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "remowt-ui-prompt"
+description = "Interactive UI prompt endpoint for remowt (D-Bus)"
+version.workspace = true
+edition = "2021"
+license.workspace = true
+
+[dependencies]
+bifrostlink.workspace = true
+bifrostlink-macros.workspace = true
+serde.workspace = true
+serde_json.workspace = true
+thiserror.workspace = true
+tokio = { workspace = true, features = ["io-util", "macros", "process", "rt"] }
+tracing.workspace = true
+zbus = { workspace = true, optional = true }
+
+[features]
+default = ["dbus"]
+dbus = ["dep:zbus"]
crates/remowt-ui-prompt/src/bifrost.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-ui-prompt/src/bifrost.rs
@@ -0,0 +1,109 @@
+use bifrostlink::{Config, Rpc};
+use bifrostlink_macros::endpoints;
+use serde::{Deserialize, Serialize};
+
+use crate::{Error, Prompter, Source};
+
+pub struct PromptEndpoints<P>(pub P);
+
+#[endpoints(ns = 2)]
+impl<P> PromptEndpoints<P>
+where
+ P: Prompter + Send + Sync + 'static,
+{
+ #[endpoints(id = 1, cancel)]
+ async fn prompt_enum(
+ &self,
+ prompt: String,
+ description: String,
+ variants: Vec<String>,
+ source: Vec<Source>,
+ ) -> Result<u32, Error> {
+ let variants: Vec<&str> = variants.iter().map(|v| v.as_str()).collect();
+ self.0
+ .prompt_enum(&prompt, &description, &variants, &source)
+ .await
+ }
+
+ #[endpoints(id = 2, cancel)]
+ async fn prompt_text(
+ &self,
+ echo: bool,
+ prompt: String,
+ description: String,
+ source: Vec<Source>,
+ ) -> Result<String, Error> {
+ self.0
+ .prompt_text(echo, &prompt, &description, &source)
+ .await
+ }
+
+ #[endpoints(id = 3, cancel)]
+ async fn display_text(
+ &self,
+ error: bool,
+ description: String,
+ source: Vec<Source>,
+ ) -> Result<(), Error> {
+ self.0.display_text(error, &description, &source).await
+ }
+}
+
+impl<C: Config> Prompter for PromptEndpointsClient<C>
+where
+ Error: ToString,
+{
+ async fn prompt_enum(
+ &self,
+ prompt: &str,
+ description: &str,
+ variants: &[&str],
+ source: &[Source],
+ ) -> crate::Result<u32> {
+ self.prompt_enum(
+ prompt.to_owned(),
+ description.to_owned(),
+ variants.iter().map(|v| (*v).to_owned()).collect(),
+ source.to_vec(),
+ )
+ .await
+ .map_err(|e| Error::Remote(e.to_string()))?
+ }
+
+ async fn prompt_text(
+ &self,
+ echo: bool,
+ prompt: &str,
+ description: &str,
+ source: &[Source],
+ ) -> crate::Result<String> {
+ self.prompt_text(
+ echo,
+ prompt.to_owned(),
+ description.to_owned(),
+ source.to_vec(),
+ )
+ .await
+ .map_err(|e| Error::Remote(e.to_string()))?
+ }
+
+ async fn display_text(
+ &self,
+ error: bool,
+ description: &str,
+ source: &[Source],
+ ) -> crate::Result<()> {
+ self.display_text(error, description.to_owned(), source.to_vec())
+ .await
+ .map_err(|e| Error::Remote(e.to_string()))?
+ }
+}
+
+pub fn serve_prompts<P, C>(rpc: &mut Rpc<C>, prompt: P)
+where
+ P: Prompter + Send + Sync + 'static,
+ C: Config,
+ C::Error: From<Error>,
+{
+ PromptEndpoints(prompt).register_endpoints(rpc);
+}
crates/remowt-ui-prompt/src/dbus.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-ui-prompt/src/dbus.rs
@@ -0,0 +1,135 @@
+use zbus::interface;
+use zbus::{fdo, proxy};
+
+use crate::Source;
+use crate::{BlockingPrompter, Result};
+use crate::{Error, Prompter};
+
+pub struct DbusPrompterInterface<P>(pub P);
+
+#[interface(name = "lach.PolkitInputHandler")]
+impl<P: Prompter + Send + Sync + 'static> DbusPrompterInterface<P> {
+ async fn prompt_radio(
+ &self,
+ prompt: &str,
+ description: &str,
+ source: Vec<Source>,
+ ) -> fdo::Result<bool> {
+ Ok(self.0.prompt_radio(prompt, description, &source).await?)
+ }
+ async fn prompt_text(
+ &self,
+ echo: bool,
+ prompt: &str,
+ description: &str,
+ source: Vec<Source>,
+ ) -> fdo::Result<String> {
+ Ok(self
+ .0
+ .prompt_text(echo, prompt, description, &source)
+ .await?)
+ }
+ async fn display_text(
+ &self,
+ error: bool,
+ description: &str,
+ source: Vec<Source>,
+ ) -> fdo::Result<()> {
+ Ok(self.0.display_text(error, description, &source).await?)
+ }
+}
+
+#[proxy(interface = "lach.PolkitInputHandler")]
+pub trait DbusPrompter {
+ async fn prompt_enum(
+ &self,
+ prompt: &str,
+ description: &str,
+ variants: &[&str],
+ source: &[Source],
+ ) -> fdo::Result<u32>;
+ async fn prompt_text(
+ &self,
+ echo: bool,
+ prompt: &str,
+ description: &str,
+ source: &[Source],
+ ) -> fdo::Result<String>;
+ async fn display_text(
+ &self,
+ error: bool,
+ description: &str,
+ source: &[Source],
+ ) -> fdo::Result<()>;
+}
+
+impl Prompter for DbusPrompterProxy<'_> {
+ async fn prompt_enum(
+ &self,
+ prompt: &str,
+ description: &str,
+ variants: &[&str],
+ source: &[Source],
+ ) -> Result<u32> {
+ Ok(self
+ .prompt_enum(prompt, description, variants, source)
+ .await?)
+ }
+
+ async fn prompt_text(
+ &self,
+ echo: bool,
+ prompt: &str,
+ description: &str,
+ source: &[Source],
+ ) -> Result<String> {
+ Ok(self.prompt_text(echo, prompt, description, source).await?)
+ }
+
+ async fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()> {
+ Ok(self.display_text(error, description, source).await?)
+ }
+}
+impl BlockingPrompter for DbusPrompterProxyBlocking<'_> {
+ fn prompt_enum(
+ &self,
+ prompt: &str,
+ description: &str,
+ variants: &[&str],
+ source: &[Source],
+ ) -> Result<u32> {
+ Ok(self.prompt_enum(prompt, description, variants, source)?)
+ }
+
+ fn prompt_text(
+ &self,
+ echo: bool,
+ prompt: &str,
+ description: &str,
+ source: &[Source],
+ ) -> Result<String> {
+ Ok(self.prompt_text(echo, prompt, description, source)?)
+ }
+
+ fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()> {
+ Ok(self.display_text(error, description, source)?)
+ }
+}
+
+impl From<fdo::Error> for Error {
+ fn from(value: fdo::Error) -> Self {
+ if matches!(value, fdo::Error::NoReply(_)) {
+ return Self::Cancel;
+ }
+ Self::InputError(format!("{value}"))
+ }
+}
+impl From<Error> for fdo::Error {
+ fn from(value: Error) -> Self {
+ match value {
+ Error::Cancel => fdo::Error::NoReply("input was cancelled".to_owned()),
+ Error::Remote(e) => fdo::Error::NoReply(format!("remote error occured: {e}")),
+ Error::InputError(e) => fdo::Error::Failed(e),
+ }
+ }
+}
crates/remowt-ui-prompt/src/lib.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-ui-prompt/src/lib.rs
@@ -0,0 +1,201 @@
+use core::fmt;
+use std::borrow::Cow;
+use std::future::Future;
+use std::result;
+
+pub mod bifrost;
+pub mod dbus;
+pub mod rofi;
+
+#[derive(thiserror::Error, Debug, serde::Serialize, serde::Deserialize)]
+pub enum Error {
+ #[error("user has cancelled input")]
+ Cancel,
+ #[error("input error: {0}")]
+ InputError(String),
+ #[error("unknown remote error: {0}")]
+ Remote(String),
+}
+
+pub type Result<T, E = Error> = result::Result<T, E>;
+
+#[cfg_attr(feature = "dbus", derive(zbus::zvariant::Type))]
+#[derive(serde::Serialize, serde::Deserialize, Clone)]
+pub struct Source(pub Cow<'static, str>);
+impl fmt::Display for Source {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "<u>{}</u>", self.0)
+ }
+}
+
+pub trait Prompter: Send + Sync {
+ fn prompt_radio(
+ &self,
+ prompt: &str,
+ description: &str,
+ source: &[Source],
+ ) -> impl Future<Output = Result<bool>> + Send {
+ let fut = self.prompt_enum(prompt, description, &["No", "Yes"], source);
+ async { fut.await.map(|v| v == 1) }
+ }
+ fn prompt_enum(
+ &self,
+ prompt: &str,
+ description: &str,
+ variants: &[&str],
+ source: &[Source],
+ ) -> impl Future<Output = Result<u32>> + Send;
+ fn prompt_text(
+ &self,
+ echo: bool,
+ prompt: &str,
+ description: &str,
+ source: &[Source],
+ ) -> impl Future<Output = Result<String>> + Send;
+ fn display_text(
+ &self,
+ error: bool,
+ description: &str,
+ source: &[Source],
+ ) -> impl Future<Output = Result<()>> + Send;
+}
+pub trait BlockingPrompter {
+ fn prompt_radio(&self, prompt: &str, description: &str, source: &[Source]) -> Result<bool> {
+ self.prompt_enum(prompt, description, &["No", "Yes"], source)
+ .map(|v| v == 1)
+ }
+ fn prompt_enum(
+ &self,
+ prompt: &str,
+ description: &str,
+ variants: &[&str],
+ source: &[Source],
+ ) -> Result<u32>;
+ fn prompt_text(
+ &self,
+ echo: bool,
+ prompt: &str,
+ description: &str,
+ source: &[Source],
+ ) -> Result<String>;
+ fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()>;
+}
+impl<P> Prompter for &P
+where
+ P: Prompter,
+{
+ fn prompt_radio(
+ &self,
+ prompt: &str,
+ description: &str,
+ source: &[Source],
+ ) -> impl Future<Output = Result<bool>> + Send {
+ (*self).prompt_radio(prompt, description, source)
+ }
+
+ fn prompt_enum(
+ &self,
+ prompt: &str,
+ description: &str,
+ variants: &[&str],
+ source: &[Source],
+ ) -> impl Future<Output = Result<u32>> + Send {
+ (*self).prompt_enum(prompt, description, variants, source)
+ }
+
+ fn prompt_text(
+ &self,
+ echo: bool,
+ prompt: &str,
+ description: &str,
+ source: &[Source],
+ ) -> impl Future<Output = Result<String>> + Send {
+ (*self).prompt_text(echo, prompt, description, source)
+ }
+
+ fn display_text(
+ &self,
+ error: bool,
+ description: &str,
+ source: &[Source],
+ ) -> impl Future<Output = Result<()>> + Send {
+ (*self).display_text(error, description, source)
+ }
+}
+
+pub struct PrependSourcePrompter<P> {
+ pub prompter: P,
+ pub source: Vec<Source>,
+ pub description: String,
+}
+impl<P> PrependSourcePrompter<P> {
+ fn source(&self, input: &[Source]) -> Vec<Source> {
+ let mut out = self.source.clone();
+ out.extend(input.iter().cloned());
+ out
+ }
+ fn description(&self, input: &str) -> String {
+ if self.description.is_empty() {
+ input.to_owned()
+ } else if input.is_empty() {
+ self.description.to_owned()
+ } else {
+ format!("{input}\n\n{}", self.description)
+ }
+ }
+}
+impl<P> Prompter for PrependSourcePrompter<P>
+where
+ P: Prompter + Sync,
+{
+ async fn prompt_radio(
+ &self,
+ prompt: &str,
+ description: &str,
+ source: &[Source],
+ ) -> Result<bool> {
+ self.prompter
+ .prompt_radio(prompt, &self.description(description), &self.source(source))
+ .await
+ }
+
+ async fn prompt_enum(
+ &self,
+ prompt: &str,
+ description: &str,
+ variants: &[&str],
+ source: &[Source],
+ ) -> Result<u32> {
+ self.prompter
+ .prompt_enum(
+ prompt,
+ &self.description(description),
+ variants,
+ &self.source(source),
+ )
+ .await
+ }
+
+ async fn prompt_text(
+ &self,
+ echo: bool,
+ prompt: &str,
+ description: &str,
+ source: &[Source],
+ ) -> Result<String> {
+ self.prompter
+ .prompt_text(
+ echo,
+ prompt,
+ &self.description(description),
+ &self.source(source),
+ )
+ .await
+ }
+
+ async fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()> {
+ self.prompter
+ .display_text(error, &self.description(description), &self.source(source))
+ .await
+ }
+}
crates/remowt-ui-prompt/src/rofi.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-ui-prompt/src/rofi.rs
@@ -0,0 +1,208 @@
+use std::process::Stdio;
+
+use tokio::io::AsyncWriteExt;
+use tokio::process::Command;
+use tracing::trace;
+
+use crate::{Error, Prompter, Result, Source};
+
+#[derive(Clone)]
+pub struct RofiPrompter;
+
+fn fixup_prompt(prompt: &str) -> &str {
+ // Rofi always appends such suffix
+ prompt.strip_suffix(": ").unwrap_or(prompt)
+}
+
+fn rofi_command() -> Command {
+ Command::new(option_env!("ROFI").unwrap_or("rofi"))
+}
+
+impl Prompter for RofiPrompter {
+ async fn prompt_enum(
+ &self,
+ prompt: &str,
+ description: &str,
+ variants: &[&str],
+ source: &[Source],
+ ) -> Result<u32> {
+ trace!("rofi radio");
+ let mut cmd = rofi_command();
+ let mesg = if source.is_empty() {
+ description.to_owned()
+ } else {
+ let mut out = format!("{description}\n\n<b>Requested on ",);
+ for (i, s) in source.iter().enumerate() {
+ if i != 0 {
+ out.push_str(" -> ");
+ }
+ out.push_str(&s.to_string());
+ }
+ out.push_str("</b>");
+ out
+ };
+ cmd.args([
+ "-dmenu",
+ "-mesg",
+ &mesg,
+ "-sync",
+ "-only-match",
+ "-p",
+ fixup_prompt(prompt),
+ "-format",
+ "i",
+ "-markup-rows",
+ ]);
+ cmd.stdin(Stdio::piped());
+ cmd.stdout(Stdio::piped());
+ cmd.kill_on_drop(true);
+ let mut child = cmd
+ .spawn()
+ .map_err(|e| Error::InputError(format!("failed to spawn rofi: {e}")))?;
+
+ let mut stdin = child.stdin.take().expect("stdin is piped");
+ for var in variants {
+ stdin
+ .write_all(var.replace('\n', " ").as_bytes())
+ .await
+ .map_err(|e| Error::InputError(format!("failed to write rofi variants: {e}")))?;
+ stdin
+ .write_all(b"\n")
+ .await
+ .map_err(|e| Error::InputError(format!("failed to write rofi variants: {e}")))?;
+ }
+ // write_all already flushes, just to be sure.
+ let _ = stdin.flush().await;
+ drop(stdin);
+
+ let out = child
+ .wait_with_output()
+ .await
+ .map_err(|e| Error::InputError(format!("failed to wait for rofi: {e}")))?;
+ let stdout = out
+ .stdout
+ .strip_suffix(b"\n")
+ .unwrap_or(&out.stdout)
+ .to_owned();
+
+ let id: u32 = String::from_utf8(stdout)
+ .map_err(|e| Error::InputError(format!("rofi produced invalid output: {e}")))?
+ .parse()
+ .map_err(|e| Error::InputError(format!("rofi produced invalid output: {e}")))?;
+ if id as usize >= variants.len() {
+ return Err(Error::InputError("invalid rofi response".to_owned()));
+ }
+
+ Ok(id)
+ }
+
+ async fn prompt_text(
+ &self,
+ echo: bool,
+ prompt: &str,
+ description: &str,
+ source: &[Source],
+ ) -> Result<String> {
+ trace!("rofi text");
+ let mut cmd = rofi_command();
+ let mesg = if source.is_empty() {
+ description.to_owned()
+ } else {
+ let mut out = format!("{description}\n\n<b>Requested on ",);
+ for (i, s) in source.iter().enumerate() {
+ if i != 0 {
+ out.push_str(" -> ");
+ }
+ out.push_str(&s.to_string());
+ }
+ out.push_str("</b>");
+ out
+ };
+ cmd.args(["-dmenu", "-mesg", &mesg, "-p", fixup_prompt(prompt)]);
+ if !echo {
+ cmd.arg("-password");
+ }
+ cmd.stdin(Stdio::null());
+ cmd.stdout(Stdio::piped());
+ cmd.kill_on_drop(true);
+ let child = cmd
+ .spawn()
+ .map_err(|e| Error::InputError(format!("failed to spawn rofi: {e}")))?;
+
+ let out = child
+ .wait_with_output()
+ .await
+ .map_err(|e| Error::InputError(format!("failed to wait for rofi: {e}")))?;
+ let stdout = out
+ .stdout
+ .strip_suffix(b"\n")
+ .unwrap_or(&out.stdout)
+ .to_owned();
+
+ Ok(String::from_utf8_lossy(&stdout).to_string())
+ }
+
+ async fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()> {
+ trace!("rofi display");
+ let mut cmd = rofi_command();
+ let mut mesg = if source.is_empty() {
+ description.to_owned()
+ } else {
+ let mut out = format!("{description}\n\n<b>Coming from ",);
+ for s in source.iter() {
+ out.push_str(&s.to_string());
+ }
+ out.push_str("</b>");
+ out
+ };
+ if error {
+ mesg.insert_str(0, "<span color=\"red\">");
+ mesg.push_str("</span>");
+ }
+ cmd.args(["-e", &mesg, "-markup"]);
+ cmd.stdin(Stdio::null());
+ cmd.stdout(Stdio::null());
+ cmd.kill_on_drop(true);
+ let mut child = cmd
+ .spawn()
+ .map_err(|e| Error::InputError(format!("failed to spawn rofi: {e}")))?;
+
+ child
+ .wait()
+ .await
+ .map_err(|e| Error::InputError(format!("failed to wait for rofi: {e}")))?;
+
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::borrow::Cow;
+
+ use crate::rofi::RofiPrompter;
+ use crate::{PrependSourcePrompter, Prompter as _, Source};
+
+ // #[tokio::test]
+ #[tokio::test]
+ #[ignore = "interactive"]
+ async fn test() {
+ let prompter = PrependSourcePrompter {
+ prompter: RofiPrompter,
+ description: "test".to_owned(),
+ source: vec![Source(Cow::Borrowed("ssh"))],
+ };
+ prompter
+ .prompt_radio("Enable", "Polkit needs access", &[])
+ .await
+ .expect("rofi");
+ prompter
+ .prompt_text(false, "Password", "Polkit needs access", &[])
+ .await
+ .expect("rofi");
+ prompter
+ .display_text(true, "Polkit needs access", &[])
+ .await
+ .expect("rofi");
+ }
+}
crates/ui-prompt/Cargo.tomldiffbeforeafterboth--- a/crates/ui-prompt/Cargo.toml
+++ /dev/null
@@ -1,20 +0,0 @@
-[package]
-name = "remowt-ui-prompt"
-description = "Interactive UI prompt endpoint for remowt (D-Bus)"
-version.workspace = true
-edition = "2021"
-license.workspace = true
-
-[dependencies]
-bifrostlink.workspace = true
-bifrostlink-macros.workspace = true
-serde.workspace = true
-serde_json.workspace = true
-thiserror.workspace = true
-tokio = { workspace = true, features = ["io-util", "macros", "process", "rt"] }
-tracing.workspace = true
-zbus = { workspace = true, optional = true }
-
-[features]
-default = ["dbus"]
-dbus = ["dep:zbus"]
crates/ui-prompt/src/bifrost.rsdiffbeforeafterboth--- a/crates/ui-prompt/src/bifrost.rs
+++ /dev/null
@@ -1,109 +0,0 @@
-use bifrostlink::{Config, Rpc};
-use bifrostlink_macros::endpoints;
-use serde::{Deserialize, Serialize};
-
-use crate::{Error, Prompter, Source};
-
-pub struct PromptEndpoints<P>(pub P);
-
-#[endpoints(ns = 2)]
-impl<P> PromptEndpoints<P>
-where
- P: Prompter + Send + Sync + 'static,
-{
- #[endpoints(id = 1, cancel)]
- async fn prompt_enum(
- &self,
- prompt: String,
- description: String,
- variants: Vec<String>,
- source: Vec<Source>,
- ) -> Result<u32, Error> {
- let variants: Vec<&str> = variants.iter().map(|v| v.as_str()).collect();
- self.0
- .prompt_enum(&prompt, &description, &variants, &source)
- .await
- }
-
- #[endpoints(id = 2, cancel)]
- async fn prompt_text(
- &self,
- echo: bool,
- prompt: String,
- description: String,
- source: Vec<Source>,
- ) -> Result<String, Error> {
- self.0
- .prompt_text(echo, &prompt, &description, &source)
- .await
- }
-
- #[endpoints(id = 3, cancel)]
- async fn display_text(
- &self,
- error: bool,
- description: String,
- source: Vec<Source>,
- ) -> Result<(), Error> {
- self.0.display_text(error, &description, &source).await
- }
-}
-
-impl<C: Config> Prompter for PromptEndpointsClient<C>
-where
- Error: ToString,
-{
- async fn prompt_enum(
- &self,
- prompt: &str,
- description: &str,
- variants: &[&str],
- source: &[Source],
- ) -> crate::Result<u32> {
- self.prompt_enum(
- prompt.to_owned(),
- description.to_owned(),
- variants.iter().map(|v| (*v).to_owned()).collect(),
- source.to_vec(),
- )
- .await
- .map_err(|e| Error::Remote(e.to_string()))?
- }
-
- async fn prompt_text(
- &self,
- echo: bool,
- prompt: &str,
- description: &str,
- source: &[Source],
- ) -> crate::Result<String> {
- self.prompt_text(
- echo,
- prompt.to_owned(),
- description.to_owned(),
- source.to_vec(),
- )
- .await
- .map_err(|e| Error::Remote(e.to_string()))?
- }
-
- async fn display_text(
- &self,
- error: bool,
- description: &str,
- source: &[Source],
- ) -> crate::Result<()> {
- self.display_text(error, description.to_owned(), source.to_vec())
- .await
- .map_err(|e| Error::Remote(e.to_string()))?
- }
-}
-
-pub fn serve_prompts<P, C>(rpc: &mut Rpc<C>, prompt: P)
-where
- P: Prompter + Send + Sync + 'static,
- C: Config,
- C::Error: From<Error>,
-{
- PromptEndpoints(prompt).register_endpoints(rpc);
-}
crates/ui-prompt/src/dbus.rsdiffbeforeafterboth--- a/crates/ui-prompt/src/dbus.rs
+++ /dev/null
@@ -1,135 +0,0 @@
-use zbus::interface;
-use zbus::{fdo, proxy};
-
-use crate::Source;
-use crate::{BlockingPrompter, Result};
-use crate::{Error, Prompter};
-
-pub struct DbusPrompterInterface<P>(pub P);
-
-#[interface(name = "lach.PolkitInputHandler")]
-impl<P: Prompter + Send + Sync + 'static> DbusPrompterInterface<P> {
- async fn prompt_radio(
- &self,
- prompt: &str,
- description: &str,
- source: Vec<Source>,
- ) -> fdo::Result<bool> {
- Ok(self.0.prompt_radio(prompt, description, &source).await?)
- }
- async fn prompt_text(
- &self,
- echo: bool,
- prompt: &str,
- description: &str,
- source: Vec<Source>,
- ) -> fdo::Result<String> {
- Ok(self
- .0
- .prompt_text(echo, prompt, description, &source)
- .await?)
- }
- async fn display_text(
- &self,
- error: bool,
- description: &str,
- source: Vec<Source>,
- ) -> fdo::Result<()> {
- Ok(self.0.display_text(error, description, &source).await?)
- }
-}
-
-#[proxy(interface = "lach.PolkitInputHandler")]
-pub trait DbusPrompter {
- async fn prompt_enum(
- &self,
- prompt: &str,
- description: &str,
- variants: &[&str],
- source: &[Source],
- ) -> fdo::Result<u32>;
- async fn prompt_text(
- &self,
- echo: bool,
- prompt: &str,
- description: &str,
- source: &[Source],
- ) -> fdo::Result<String>;
- async fn display_text(
- &self,
- error: bool,
- description: &str,
- source: &[Source],
- ) -> fdo::Result<()>;
-}
-
-impl Prompter for DbusPrompterProxy<'_> {
- async fn prompt_enum(
- &self,
- prompt: &str,
- description: &str,
- variants: &[&str],
- source: &[Source],
- ) -> Result<u32> {
- Ok(self
- .prompt_enum(prompt, description, variants, source)
- .await?)
- }
-
- async fn prompt_text(
- &self,
- echo: bool,
- prompt: &str,
- description: &str,
- source: &[Source],
- ) -> Result<String> {
- Ok(self.prompt_text(echo, prompt, description, source).await?)
- }
-
- async fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()> {
- Ok(self.display_text(error, description, source).await?)
- }
-}
-impl BlockingPrompter for DbusPrompterProxyBlocking<'_> {
- fn prompt_enum(
- &self,
- prompt: &str,
- description: &str,
- variants: &[&str],
- source: &[Source],
- ) -> Result<u32> {
- Ok(self.prompt_enum(prompt, description, variants, source)?)
- }
-
- fn prompt_text(
- &self,
- echo: bool,
- prompt: &str,
- description: &str,
- source: &[Source],
- ) -> Result<String> {
- Ok(self.prompt_text(echo, prompt, description, source)?)
- }
-
- fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()> {
- Ok(self.display_text(error, description, source)?)
- }
-}
-
-impl From<fdo::Error> for Error {
- fn from(value: fdo::Error) -> Self {
- if matches!(value, fdo::Error::NoReply(_)) {
- return Self::Cancel;
- }
- Self::InputError(format!("{value}"))
- }
-}
-impl From<Error> for fdo::Error {
- fn from(value: Error) -> Self {
- match value {
- Error::Cancel => fdo::Error::NoReply("input was cancelled".to_owned()),
- Error::Remote(e) => fdo::Error::NoReply(format!("remote error occured: {e}")),
- Error::InputError(e) => fdo::Error::Failed(e),
- }
- }
-}
crates/ui-prompt/src/lib.rsdiffbeforeafterboth--- a/crates/ui-prompt/src/lib.rs
+++ /dev/null
@@ -1,201 +0,0 @@
-use core::fmt;
-use std::borrow::Cow;
-use std::future::Future;
-use std::result;
-
-pub mod bifrost;
-pub mod dbus;
-pub mod rofi;
-
-#[derive(thiserror::Error, Debug, serde::Serialize, serde::Deserialize)]
-pub enum Error {
- #[error("user has cancelled input")]
- Cancel,
- #[error("input error: {0}")]
- InputError(String),
- #[error("unknown remote error: {0}")]
- Remote(String),
-}
-
-pub type Result<T, E = Error> = result::Result<T, E>;
-
-#[cfg_attr(feature = "dbus", derive(zbus::zvariant::Type))]
-#[derive(serde::Serialize, serde::Deserialize, Clone)]
-pub struct Source(pub Cow<'static, str>);
-impl fmt::Display for Source {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "<u>{}</u>", self.0)
- }
-}
-
-pub trait Prompter: Send + Sync {
- fn prompt_radio(
- &self,
- prompt: &str,
- description: &str,
- source: &[Source],
- ) -> impl Future<Output = Result<bool>> + Send {
- let fut = self.prompt_enum(prompt, description, &["No", "Yes"], source);
- async { fut.await.map(|v| v == 1) }
- }
- fn prompt_enum(
- &self,
- prompt: &str,
- description: &str,
- variants: &[&str],
- source: &[Source],
- ) -> impl Future<Output = Result<u32>> + Send;
- fn prompt_text(
- &self,
- echo: bool,
- prompt: &str,
- description: &str,
- source: &[Source],
- ) -> impl Future<Output = Result<String>> + Send;
- fn display_text(
- &self,
- error: bool,
- description: &str,
- source: &[Source],
- ) -> impl Future<Output = Result<()>> + Send;
-}
-pub trait BlockingPrompter {
- fn prompt_radio(&self, prompt: &str, description: &str, source: &[Source]) -> Result<bool> {
- self.prompt_enum(prompt, description, &["No", "Yes"], source)
- .map(|v| v == 1)
- }
- fn prompt_enum(
- &self,
- prompt: &str,
- description: &str,
- variants: &[&str],
- source: &[Source],
- ) -> Result<u32>;
- fn prompt_text(
- &self,
- echo: bool,
- prompt: &str,
- description: &str,
- source: &[Source],
- ) -> Result<String>;
- fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()>;
-}
-impl<P> Prompter for &P
-where
- P: Prompter,
-{
- fn prompt_radio(
- &self,
- prompt: &str,
- description: &str,
- source: &[Source],
- ) -> impl Future<Output = Result<bool>> + Send {
- (*self).prompt_radio(prompt, description, source)
- }
-
- fn prompt_enum(
- &self,
- prompt: &str,
- description: &str,
- variants: &[&str],
- source: &[Source],
- ) -> impl Future<Output = Result<u32>> + Send {
- (*self).prompt_enum(prompt, description, variants, source)
- }
-
- fn prompt_text(
- &self,
- echo: bool,
- prompt: &str,
- description: &str,
- source: &[Source],
- ) -> impl Future<Output = Result<String>> + Send {
- (*self).prompt_text(echo, prompt, description, source)
- }
-
- fn display_text(
- &self,
- error: bool,
- description: &str,
- source: &[Source],
- ) -> impl Future<Output = Result<()>> + Send {
- (*self).display_text(error, description, source)
- }
-}
-
-pub struct PrependSourcePrompter<P> {
- pub prompter: P,
- pub source: Vec<Source>,
- pub description: String,
-}
-impl<P> PrependSourcePrompter<P> {
- fn source(&self, input: &[Source]) -> Vec<Source> {
- let mut out = self.source.clone();
- out.extend(input.iter().cloned());
- out
- }
- fn description(&self, input: &str) -> String {
- if self.description.is_empty() {
- input.to_owned()
- } else if input.is_empty() {
- self.description.to_owned()
- } else {
- format!("{input}\n\n{}", self.description)
- }
- }
-}
-impl<P> Prompter for PrependSourcePrompter<P>
-where
- P: Prompter + Sync,
-{
- async fn prompt_radio(
- &self,
- prompt: &str,
- description: &str,
- source: &[Source],
- ) -> Result<bool> {
- self.prompter
- .prompt_radio(prompt, &self.description(description), &self.source(source))
- .await
- }
-
- async fn prompt_enum(
- &self,
- prompt: &str,
- description: &str,
- variants: &[&str],
- source: &[Source],
- ) -> Result<u32> {
- self.prompter
- .prompt_enum(
- prompt,
- &self.description(description),
- variants,
- &self.source(source),
- )
- .await
- }
-
- async fn prompt_text(
- &self,
- echo: bool,
- prompt: &str,
- description: &str,
- source: &[Source],
- ) -> Result<String> {
- self.prompter
- .prompt_text(
- echo,
- prompt,
- &self.description(description),
- &self.source(source),
- )
- .await
- }
-
- async fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()> {
- self.prompter
- .display_text(error, &self.description(description), &self.source(source))
- .await
- }
-}
crates/ui-prompt/src/rofi.rsdiffbeforeafterboth--- a/crates/ui-prompt/src/rofi.rs
+++ /dev/null
@@ -1,208 +0,0 @@
-use std::process::Stdio;
-
-use tokio::io::AsyncWriteExt;
-use tokio::process::Command;
-use tracing::trace;
-
-use crate::{Error, Prompter, Result, Source};
-
-#[derive(Clone)]
-pub struct RofiPrompter;
-
-fn fixup_prompt(prompt: &str) -> &str {
- // Rofi always appends such suffix
- prompt.strip_suffix(": ").unwrap_or(prompt)
-}
-
-fn rofi_command() -> Command {
- Command::new(option_env!("ROFI").unwrap_or("rofi"))
-}
-
-impl Prompter for RofiPrompter {
- async fn prompt_enum(
- &self,
- prompt: &str,
- description: &str,
- variants: &[&str],
- source: &[Source],
- ) -> Result<u32> {
- trace!("rofi radio");
- let mut cmd = rofi_command();
- let mesg = if source.is_empty() {
- description.to_owned()
- } else {
- let mut out = format!("{description}\n\n<b>Requested on ",);
- for (i, s) in source.iter().enumerate() {
- if i != 0 {
- out.push_str(" -> ");
- }
- out.push_str(&s.to_string());
- }
- out.push_str("</b>");
- out
- };
- cmd.args([
- "-dmenu",
- "-mesg",
- &mesg,
- "-sync",
- "-only-match",
- "-p",
- fixup_prompt(prompt),
- "-format",
- "i",
- "-markup-rows",
- ]);
- cmd.stdin(Stdio::piped());
- cmd.stdout(Stdio::piped());
- cmd.kill_on_drop(true);
- let mut child = cmd
- .spawn()
- .map_err(|e| Error::InputError(format!("failed to spawn rofi: {e}")))?;
-
- let mut stdin = child.stdin.take().expect("stdin is piped");
- for var in variants {
- stdin
- .write_all(var.replace('\n', " ").as_bytes())
- .await
- .map_err(|e| Error::InputError(format!("failed to write rofi variants: {e}")))?;
- stdin
- .write_all(b"\n")
- .await
- .map_err(|e| Error::InputError(format!("failed to write rofi variants: {e}")))?;
- }
- // write_all already flushes, just to be sure.
- let _ = stdin.flush().await;
- drop(stdin);
-
- let out = child
- .wait_with_output()
- .await
- .map_err(|e| Error::InputError(format!("failed to wait for rofi: {e}")))?;
- let stdout = out
- .stdout
- .strip_suffix(b"\n")
- .unwrap_or(&out.stdout)
- .to_owned();
-
- let id: u32 = String::from_utf8(stdout)
- .map_err(|e| Error::InputError(format!("rofi produced invalid output: {e}")))?
- .parse()
- .map_err(|e| Error::InputError(format!("rofi produced invalid output: {e}")))?;
- if id as usize >= variants.len() {
- return Err(Error::InputError("invalid rofi response".to_owned()));
- }
-
- Ok(id)
- }
-
- async fn prompt_text(
- &self,
- echo: bool,
- prompt: &str,
- description: &str,
- source: &[Source],
- ) -> Result<String> {
- trace!("rofi text");
- let mut cmd = rofi_command();
- let mesg = if source.is_empty() {
- description.to_owned()
- } else {
- let mut out = format!("{description}\n\n<b>Requested on ",);
- for (i, s) in source.iter().enumerate() {
- if i != 0 {
- out.push_str(" -> ");
- }
- out.push_str(&s.to_string());
- }
- out.push_str("</b>");
- out
- };
- cmd.args(["-dmenu", "-mesg", &mesg, "-p", fixup_prompt(prompt)]);
- if !echo {
- cmd.arg("-password");
- }
- cmd.stdin(Stdio::null());
- cmd.stdout(Stdio::piped());
- cmd.kill_on_drop(true);
- let child = cmd
- .spawn()
- .map_err(|e| Error::InputError(format!("failed to spawn rofi: {e}")))?;
-
- let out = child
- .wait_with_output()
- .await
- .map_err(|e| Error::InputError(format!("failed to wait for rofi: {e}")))?;
- let stdout = out
- .stdout
- .strip_suffix(b"\n")
- .unwrap_or(&out.stdout)
- .to_owned();
-
- Ok(String::from_utf8_lossy(&stdout).to_string())
- }
-
- async fn display_text(&self, error: bool, description: &str, source: &[Source]) -> Result<()> {
- trace!("rofi display");
- let mut cmd = rofi_command();
- let mut mesg = if source.is_empty() {
- description.to_owned()
- } else {
- let mut out = format!("{description}\n\n<b>Coming from ",);
- for s in source.iter() {
- out.push_str(&s.to_string());
- }
- out.push_str("</b>");
- out
- };
- if error {
- mesg.insert_str(0, "<span color=\"red\">");
- mesg.push_str("</span>");
- }
- cmd.args(["-e", &mesg, "-markup"]);
- cmd.stdin(Stdio::null());
- cmd.stdout(Stdio::null());
- cmd.kill_on_drop(true);
- let mut child = cmd
- .spawn()
- .map_err(|e| Error::InputError(format!("failed to spawn rofi: {e}")))?;
-
- child
- .wait()
- .await
- .map_err(|e| Error::InputError(format!("failed to wait for rofi: {e}")))?;
-
- Ok(())
- }
-}
-
-#[cfg(test)]
-mod tests {
- use std::borrow::Cow;
-
- use crate::rofi::RofiPrompter;
- use crate::{PrependSourcePrompter, Prompter as _, Source};
-
- // #[tokio::test]
- #[tokio::test]
- #[ignore = "interactive"]
- async fn test() {
- let prompter = PrependSourcePrompter {
- prompter: RofiPrompter,
- description: "test".to_owned(),
- source: vec![Source(Cow::Borrowed("ssh"))],
- };
- prompter
- .prompt_radio("Enable", "Polkit needs access", &[])
- .await
- .expect("rofi");
- prompter
- .prompt_text(false, "Password", "Polkit needs access", &[])
- .await
- .expect("rofi");
- prompter
- .display_text(true, "Polkit needs access", &[])
- .await
- .expect("rofi");
- }
-}