difftreelog
feat drain stdout/stderr after signal
in: trunk
6 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -308,9 +308,9 @@
[[package]]
name = "bifrostlink"
-version = "0.2.4"
+version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ad2d0e30a2aa432b78f41f9676572f88201d4dc73bc2b7bc90704d2e02b7d062"
+checksum = "910f9286588d13e3dbdbbc1ad4d292656e704bc93e1f41b8a13b48e3a8e95f39"
dependencies = [
"async-trait",
"async_fn_traits",
@@ -327,9 +327,9 @@
[[package]]
name = "bifrostlink-macros"
-version = "0.2.4"
+version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e2121559c45cbe89c4f8d1d741360d5b028577254f6beca053dc02332da85b43"
+checksum = "a0ea5c423c3831c523c8ef78debdf6a64e72b21ec92148a44163a4c25c05dfd0"
dependencies = [
"proc-macro2",
"quote",
@@ -338,9 +338,9 @@
[[package]]
name = "bifrostlink-ports"
-version = "0.2.4"
+version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e9395c4ccca497b0c50583e6de57aca921c046ae0c10f56030cd2c5a20db05f8"
+checksum = "9e3a9a01ec1b8bd7d44b47cd0183a1465880e241027d9f5afcb076e11704ec70"
dependencies = [
"bifrostlink",
"bytes",
@@ -1835,7 +1835,7 @@
[[package]]
name = "polkit-backend"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"clap",
@@ -2055,7 +2055,7 @@
[[package]]
name = "remowt-agent"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2083,7 +2083,7 @@
[[package]]
name = "remowt-client"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2106,7 +2106,7 @@
[[package]]
name = "remowt-endpoints"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2124,7 +2124,7 @@
[[package]]
name = "remowt-link-shared"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"bifrostlink",
"bytes",
@@ -2138,7 +2138,7 @@
[[package]]
name = "remowt-plugin"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"bifrostlink",
@@ -2152,7 +2152,7 @@
[[package]]
name = "remowt-polkit-shared"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"nix",
"serde",
@@ -2161,7 +2161,7 @@
[[package]]
name = "remowt-ssh"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"async-trait",
@@ -2189,7 +2189,7 @@
[[package]]
name = "remowt-ui-prompt"
-version = "0.1.4"
+version = "0.1.6"
dependencies = [
"anyhow",
"bifrostlink",
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,18 +3,18 @@
resolver = "2"
[workspace.package]
-version = "0.1.4"
+version = "0.1.6"
license = "MIT"
edition = "2021"
repository = "https://git.delta.rocks/r/remowt"
[workspace.dependencies]
-remowt-client = { version = "0.1.3", path = "crates/remowt-client" }
-remowt-polkit-shared = { version = "0.1.3", path = "crates/polkit-shared" }
-remowt-link-shared = { version = "0.1.3", path = "crates/remowt-link-shared" }
-remowt-plugin = { version = "0.1.3", path = "crates/remowt-plugin" }
-remowt-ui-prompt = { version = "0.1.3", path = "crates/remowt-ui-prompt" }
-remowt-endpoints = { version = "0.1.3", path = "crates/remowt-endpoints" }
+remowt-client = { version = "0.1.6", path = "crates/remowt-client" }
+remowt-polkit-shared = { version = "0.1.6", path = "crates/polkit-shared" }
+remowt-link-shared = { version = "0.1.6", path = "crates/remowt-link-shared" }
+remowt-plugin = { version = "0.1.6", path = "crates/remowt-plugin" }
+remowt-ui-prompt = { version = "0.1.6", path = "crates/remowt-ui-prompt" }
+remowt-endpoints = { version = "0.1.6", path = "crates/remowt-endpoints" }
bifrostlink = "0.2.0"
bifrostlink-macros = "0.2.0"
@@ -52,7 +52,9 @@
thiserror = "2.0.18"
[profile.release]
-panic = "abort"
+panic = "unwind"
opt-level = "z"
lto = true
codegen-units = 1
+debug = "full"
+split-debuginfo = "off"
cmds/remowt-agent/src/main.rsdiffbeforeafterboth1use std::borrow::Cow;2use std::collections::{BTreeMap, HashMap};3use std::fs::Permissions;4use std::future::pending;5use std::os::unix::fs::PermissionsExt as _;6use std::path::PathBuf;7use std::sync::{Arc, Mutex, OnceLock};89use bifrostlink::declarative::RemoteEndpoints;10use bifrostlink::Rpc;11use bifrostlink_ports::stdio::from_stdio;12use bifrostlink_ports::unix_socket::from_socket;13use clap::Parser;14use remowt_endpoints::{15 fs::Fs, nix_daemon::NixDaemon, pty::Pty, subprocess::Subprocess, systemd::Systemd,16};17use remowt_link_shared::{editor::EditorEndpointsClient, Address, BifConfig};18use remowt_polkit_shared::{emphasize, BackendRequest, Identity, PidDisplay};19use remowt_ui_prompt::bifrost::PromptEndpointsClient;20use remowt_ui_prompt::rofi::RofiPrompter;21use remowt_ui_prompt::{PrependSourcePrompter, Prompter, Source};22use tokio::fs;23use tokio::net::UnixStream;24use tokio::runtime::Builder;25use tokio::task::AbortHandle;26use tracing::{debug, trace};27use zbus::fdo;28use zbus::zvariant::{OwnedValue, Str};29use zbus::{interface, proxy, Connection};30use zbus_polkit::policykit1::Subject;3132use self::helper::{Helper, SocketHelper, SuidHelper};3334pub mod askpass;35pub mod bus;36pub mod editor;37pub mod helper;3839struct CancelTaskOnDrop {40 tasks: Arc<Mutex<HashMap<String, AbortHandle>>>,41 handle: String,42}43impl Drop for CancelTaskOnDrop {44 fn drop(&mut self) {45 debug!("cancel on drop");46 if let Some(task) = self47 .tasks48 .lock()49 .expect("not poisoned")50 .remove(&self.handle)51 {52 task.abort();53 }54 }55}5657struct Agent<H, P> {58 tasks: Arc<Mutex<HashMap<String, AbortHandle>>>,59 helper: H,60 prompter: P,61}62impl<H, P> Agent<H, P> {63 fn new(helper: H, prompter: P) -> Self {64 Agent {65 tasks: Arc::new(Mutex::new(HashMap::new())),66 helper,67 prompter,68 }69 }70}7172#[interface(name = "org.freedesktop.PolicyKit1.AuthenticationAgent")]73impl<H, P> Agent<H, P>74where75 H: Helper + Clone + Send + Sync + 'static,76 P: Prompter + Clone + Send + Sync + 'static,77{78 /// BeginAuthentication method79 #[allow(clippy::too_many_arguments)]80 async fn begin_authentication(81 &self,82 action_id: String,83 message: String,84 _icon_name: String,85 mut details: BTreeMap<String, String>,86 cookie: String,87 identities: Vec<Identity>,88 ) -> zbus::fdo::Result<()> {89 use std::fmt::Write;90 debug!("begin auth");91 let _cancel_guard = Arc::new(OnceLock::new());92 let task = {93 let helper = self.helper.clone();94 let prompter = self.prompter.clone();95 let cookie = cookie.clone();96 let _cancel_guard = _cancel_guard.clone();97 tokio::task::spawn(async move {98 let _cancel_guard = _cancel_guard.clone();99 trace!("conversation task");100 let mut description = format!("{message}\n\n<b>Action id:</b> {action_id}",);101 if let Some(subject) = details.remove("polkit.caller-pid") {102 let _ = write!(description, "\n<b>Caller:</b> ");103 if let Ok(pid) = subject.parse::<u32>() {104 let _ = write!(description, "{}", PidDisplay(pid));105 } else {106 let _ = write!(description, "{}", emphasize("invalid pid"));107 }108 }109 if let Some(subject) = details.remove("polkit.subject-pid") {110 let _ = write!(description, "\n<b>Subject:</b> ");111 if let Ok(pid) = subject.parse::<u32>() {112 let _ = write!(description, "{}", PidDisplay(pid));113 } else {114 let _ = write!(description, "{}", emphasize("invalid pid"));115 }116 }117 let mut prompter = PrependSourcePrompter {118 source: vec![Source(Cow::Borrowed("polkit agent"))],119 description: description.clone(),120 prompter,121 };122123 let identity_displays: Vec<String> =124 identities.iter().map(|v| v.to_string()).collect();125 let identity_displays: Vec<&str> =126 identity_displays.iter().map(|v| v.as_str()).collect();127 debug!("choose identity");128 let choosen_identity = match identity_displays.len() {129 0 => {130 return Err(fdo::Error::AuthFailed(131 "no identity to authenticate as".to_owned(),132 ))133 }134 1 => 0,135 _ => {136 prompter137 .prompt_enum(138 "Identity",139 "Select identity to use for polkit authorization",140 &identity_displays,141 &[],142 )143 .await?144 }145 };146 debug!("identity chosen");147148 let _ = write!(149 description,150 "\n<b>Identity:</b> {}",151 identities[choosen_identity as usize]152 );153 prompter.description = description;154155 prompter.source.push(Source(Cow::Borrowed("polkit daemon")));156157 helper158 .help_me(159 &cookie,160 prompter,161 identities[choosen_identity as usize].clone(),162 )163 .await164 .map_err(|e| fdo::Error::Failed(e.to_string()))?;165 // let connection = Connection::system().await?;166 // let helper = PolkitHelperProxy::new(&connection).await?;167168 Ok(())169 })170 };171 self.tasks172 .lock()173 .unwrap()174 .insert(cookie.clone(), task.abort_handle());175 debug!("abort handle stored");176 let _ = _cancel_guard.set(CancelTaskOnDrop {177 tasks: self.tasks.clone(),178 handle: cookie.clone(),179 });180181 let _ = task.await;182183 Ok(())184 }185186 /// CancelAuthentication method187 async fn cancel_authentication(&self, cookie: &str) -> zbus::fdo::Result<()> {188 debug!("auth cancelled");189 if let Some(abort) = self.tasks.lock().unwrap().remove(cookie) {190 debug!("abort handle found");191 abort.abort();192 }193 // debug!("Authentication cancled ! {cookie}");194 Ok(())195 }196}197198const OBJ_PATH: &str = "/org/freedesktop/PolicyKit1/AuthenticationAgent";199200#[proxy(201 interface = "lach.PolkitHelper",202 default_service = "lach.polkit.helper1",203 default_path = "/lach/PolkitHelper"204)]205trait PolkitHelper {206 fn init_conversation(&self, request: BackendRequest) -> zbus::Result<()>;207}208209#[derive(Parser)]210enum Opts {211 AskPass {212 prompt: String,213 description: String,214 },215 Editor {216 /// Argument to nvim217 path: String,218 },219 RealAgent {220 #[arg(long)]221 path: Option<PathBuf>,222 /// Expect own address to be AgentPrivileged, skip installing polkit agent223 #[arg(long)]224 privileged: bool,225 #[arg(long)]226 local: bool,227 },228 LocalAgent,229}230231fn main() -> anyhow::Result<()> {232 tracing_subscriber::fmt()233 .with_writer(std::io::stderr)234 .without_time()235 .init();236 let opts = Opts::parse();237238 let runtime = Builder::new_current_thread().enable_all().build()?;239240 match opts {241 Opts::AskPass {242 prompt,243 description,244 } => runtime.block_on(askpass::ask(&prompt, description)),245 Opts::LocalAgent => runtime.block_on(main_real()),246 Opts::Editor { path } => runtime.block_on(editor::edit(path)),247 Opts::RealAgent {248 path,249 privileged,250 local,251 } => runtime.block_on(main_real_agent(path, privileged, local)),252 }253}254async fn main_real() -> anyhow::Result<()> {255 let system_conn = Connection::system().await?;256 let helper = SocketHelper {257 fallback: SuidHelper,258 };259 register_auth_agent(&system_conn, Agent::new(helper, RofiPrompter)).await?;260261 let session_conn = Connection::session().await?;262 askpass::serve(&session_conn, RofiPrompter).await?;263264 let _keep_alive = (system_conn, session_conn);265 pending().await266}267async fn main_real_agent(268 path: Option<PathBuf>,269 privileged: bool,270 local: bool,271) -> anyhow::Result<()> {272 let address = if privileged {273 Address::AgentPrivileged274 } else {275 Address::Agent276 };277 let mut rpc = Rpc::<BifConfig>::new(address);278279 Fs::new().register_endpoints(&mut rpc);280 Systemd.register_endpoints(&mut rpc);281 Pty::new().register_endpoints(&mut rpc);282 Subprocess::new().register_endpoints(&mut rpc);283 NixDaemon.register_endpoints(&mut rpc);284285 remowt_plugin::host::serve(&mut rpc);286287 let user_prompter = PromptEndpointsClient::wrap(rpc.remote(Address::User));288 let editor_client = EditorEndpointsClient::wrap(rpc.remote(Address::User));289290 let bus = bus::spawn().await?;291 askpass::serve(&bus.conn, user_prompter.clone()).await?;292 editor::serve(&bus.conn, editor_client).await?;293294 let helpers = tempfile::Builder::new().prefix("remowt-path.").tempdir()?;295 let exe = std::env::current_exe()?;296 let askpass_helper = helpers.path().join("remowt-askpass");297 let editor_helper = helpers.path().join("remowt-editor");298 {299 let script = format!(300 "#!/bin/sh\nexec {} ask-pass \"password\" \"$1\"\n",301 sh_quote(&exe.to_string_lossy())302 );303 fs::write(&askpass_helper, script).await?;304 fs::set_permissions(&askpass_helper, Permissions::from_mode(0o755)).await?;305 }306 {307 let script = format!(308 "#!/bin/sh\nexec {} editor \"$1\"\n",309 sh_quote(&exe.to_string_lossy())310 );311 fs::write(&editor_helper, script).await?;312 fs::set_permissions(&editor_helper, Permissions::from_mode(0o755)).await?;313 }314315 // Safety: Hoping tokio own threads won't read any of those...316 unsafe {317 prepend_path(helpers.path());318 std::env::set_var("SUDO_ASKPASS", &askpass_helper);319 std::env::set_var("SSH_ASKPASS", &askpass_helper);320 std::env::set_var("SSH_ASKPASS_REQUIRE", "force");321 std::env::set_var("EDITOR", &editor_helper);322 std::env::set_var("VISUAL", &editor_helper);323 std::env::set_var("DBUS_SESSION_BUS_ADDRESS", &bus.address);324 }325326 let port = match path {327 Some(path) => from_socket(UnixStream::connect(path).await?),328 None => from_stdio(),329 };330 rpc.add_direct(Address::User, port, bifrostlink::Rtt(0));331332 let polkit_conn = if !privileged && !local {333 // The unprivileged agent doubles as a polkit authentication agent so334 // `run0` (e.g. our own elevation) routes its prompt to the User over335 // bifrost instead of failing on a tty-less session.336 let conn = Connection::system().await?;337 let helper = SocketHelper {338 fallback: SuidHelper,339 };340 register_auth_agent(&conn, Agent::new(helper, user_prompter)).await?;341 Some(conn)342 } else {343 None344 };345346 let _keep_alive = (bus, helpers, polkit_conn);347 pending().await348}349350async fn register_auth_agent<H, P>(conn: &Connection, agent: Agent<H, P>) -> anyhow::Result<()>351where352 H: Helper + Clone + Send + Sync + 'static,353 P: Prompter + Clone + Send + Sync + 'static,354{355 let proxy = zbus_polkit::policykit1::AuthorityProxy::new(conn).await?;356 conn.object_server().at(OBJ_PATH, agent).await?;357358 let subject = auth_agent_subject()?;359 proxy360 .register_authentication_agent(&subject, "C", OBJ_PATH)361 .await?;362 debug!(kind = subject.subject_kind, "registered polkit agent");363 Ok(())364}365366fn auth_agent_subject() -> anyhow::Result<Subject> {367 let mut details = HashMap::new();368 if let Ok(session_id) = std::env::var("XDG_SESSION_ID") {369 let val: OwnedValue = Str::from(session_id).into();370 details.insert("session-id".to_string(), val);371 return Ok(Subject {372 subject_kind: "unix-session".to_string(),373 subject_details: details,374 });375 }376377 details.insert("pid".to_string(), OwnedValue::from(std::process::id()));378 Ok(Subject {379 subject_kind: "unix-process".to_string(),380 subject_details: details,381 })382}383384fn sh_quote(s: &str) -> String {385 format!("'{}'", s.replace('\'', "'\\''"))386}387388/// Prepend `dir` to the process `PATH`.389///390/// # SAFETY391///392/// Same as `set_var`393unsafe fn prepend_path(dir: &std::path::Path) {394 let value = match std::env::var_os("PATH") {395 Some(existing) => {396 let mut v = dir.as_os_str().to_owned();397 v.push(":");398 v.push(existing);399 v400 }401 None => dir.as_os_str().to_owned(),402 };403 unsafe {404 std::env::set_var("PATH", value);405 }406}1use std::borrow::Cow;2use std::collections::{BTreeMap, HashMap};3use std::fs::Permissions;4use std::future::pending;5use std::io;6use std::os::unix::fs::PermissionsExt as _;7use std::path::PathBuf;8use std::sync::{Arc, Mutex, OnceLock};910use bifrostlink::declarative::RemoteEndpoints;11use bifrostlink::Rpc;12use bifrostlink_ports::stdio::from_stdio;13use bifrostlink_ports::unix_socket::from_socket;14use clap::Parser;15use remowt_endpoints::{16 fs::Fs, nix_daemon::NixDaemon, pty::Pty, subprocess::Subprocess, systemd::Systemd,17};18use remowt_link_shared::{editor::EditorEndpointsClient, Address, BifConfig};19use remowt_polkit_shared::{emphasize, BackendRequest, Identity, PidDisplay};20use remowt_ui_prompt::bifrost::PromptEndpointsClient;21use remowt_ui_prompt::rofi::RofiPrompter;22use remowt_ui_prompt::{PrependSourcePrompter, Prompter, Source};23use tokio::fs;24use tokio::net::UnixStream;25use tokio::runtime::Builder;26use tokio::task::AbortHandle;27use tracing::{debug, trace};28use zbus::fdo;29use zbus::zvariant::{OwnedValue, Str};30use zbus::{interface, proxy, Connection};31use zbus_polkit::policykit1::Subject;3233use self::helper::{Helper, SocketHelper, SuidHelper};3435pub mod askpass;36pub mod bus;37pub mod editor;38pub mod helper;3940struct CancelTaskOnDrop {41 tasks: Arc<Mutex<HashMap<String, AbortHandle>>>,42 handle: String,43}44impl Drop for CancelTaskOnDrop {45 fn drop(&mut self) {46 debug!("cancel on drop");47 if let Some(task) = self48 .tasks49 .lock()50 .expect("not poisoned")51 .remove(&self.handle)52 {53 task.abort();54 }55 }56}5758struct Agent<H, P> {59 tasks: Arc<Mutex<HashMap<String, AbortHandle>>>,60 helper: H,61 prompter: P,62}63impl<H, P> Agent<H, P> {64 fn new(helper: H, prompter: P) -> Self {65 Agent {66 tasks: Arc::new(Mutex::new(HashMap::new())),67 helper,68 prompter,69 }70 }71}7273#[interface(name = "org.freedesktop.PolicyKit1.AuthenticationAgent")]74impl<H, P> Agent<H, P>75where76 H: Helper + Clone + Send + Sync + 'static,77 P: Prompter + Clone + Send + Sync + 'static,78{79 /// BeginAuthentication method80 #[allow(clippy::too_many_arguments)]81 async fn begin_authentication(82 &self,83 action_id: String,84 message: String,85 _icon_name: String,86 mut details: BTreeMap<String, String>,87 cookie: String,88 identities: Vec<Identity>,89 ) -> zbus::fdo::Result<()> {90 use std::fmt::Write;91 debug!("begin auth");92 let _cancel_guard = Arc::new(OnceLock::new());93 let task = {94 let helper = self.helper.clone();95 let prompter = self.prompter.clone();96 let cookie = cookie.clone();97 let _cancel_guard = _cancel_guard.clone();98 tokio::task::spawn(async move {99 let _cancel_guard = _cancel_guard.clone();100 trace!("conversation task");101 let mut description = format!("{message}\n\n<b>Action id:</b> {action_id}",);102 if let Some(subject) = details.remove("polkit.caller-pid") {103 let _ = write!(description, "\n<b>Caller:</b> ");104 if let Ok(pid) = subject.parse::<u32>() {105 let _ = write!(description, "{}", PidDisplay(pid));106 } else {107 let _ = write!(description, "{}", emphasize("invalid pid"));108 }109 }110 if let Some(subject) = details.remove("polkit.subject-pid") {111 let _ = write!(description, "\n<b>Subject:</b> ");112 if let Ok(pid) = subject.parse::<u32>() {113 let _ = write!(description, "{}", PidDisplay(pid));114 } else {115 let _ = write!(description, "{}", emphasize("invalid pid"));116 }117 }118 let mut prompter = PrependSourcePrompter {119 source: vec![Source(Cow::Borrowed("polkit agent"))],120 description: description.clone(),121 prompter,122 };123124 let identity_displays: Vec<String> =125 identities.iter().map(|v| v.to_string()).collect();126 let identity_displays: Vec<&str> =127 identity_displays.iter().map(|v| v.as_str()).collect();128 debug!("choose identity");129 let choosen_identity = match identity_displays.len() {130 0 => {131 return Err(fdo::Error::AuthFailed(132 "no identity to authenticate as".to_owned(),133 ))134 }135 1 => 0,136 _ => {137 prompter138 .prompt_enum(139 "Identity",140 "Select identity to use for polkit authorization",141 &identity_displays,142 &[],143 )144 .await?145 }146 };147 debug!("identity chosen");148149 let _ = write!(150 description,151 "\n<b>Identity:</b> {}",152 identities[choosen_identity as usize]153 );154 prompter.description = description;155156 prompter.source.push(Source(Cow::Borrowed("polkit daemon")));157158 helper159 .help_me(160 &cookie,161 prompter,162 identities[choosen_identity as usize].clone(),163 )164 .await165 .map_err(|e| fdo::Error::Failed(e.to_string()))?;166 // let connection = Connection::system().await?;167 // let helper = PolkitHelperProxy::new(&connection).await?;168169 Ok(())170 })171 };172 self.tasks173 .lock()174 .unwrap()175 .insert(cookie.clone(), task.abort_handle());176 debug!("abort handle stored");177 let _ = _cancel_guard.set(CancelTaskOnDrop {178 tasks: self.tasks.clone(),179 handle: cookie.clone(),180 });181182 let _ = task.await;183184 Ok(())185 }186187 /// CancelAuthentication method188 async fn cancel_authentication(&self, cookie: &str) -> zbus::fdo::Result<()> {189 debug!("auth cancelled");190 if let Some(abort) = self.tasks.lock().unwrap().remove(cookie) {191 debug!("abort handle found");192 abort.abort();193 }194 // debug!("Authentication cancled ! {cookie}");195 Ok(())196 }197}198199const OBJ_PATH: &str = "/org/freedesktop/PolicyKit1/AuthenticationAgent";200201#[proxy(202 interface = "lach.PolkitHelper",203 default_service = "lach.polkit.helper1",204 default_path = "/lach/PolkitHelper"205)]206trait PolkitHelper {207 fn init_conversation(&self, request: BackendRequest) -> zbus::Result<()>;208}209210#[derive(Parser)]211enum Opts {212 AskPass {213 prompt: String,214 description: String,215 },216 Editor {217 /// Argument to nvim218 path: String,219 },220 RealAgent {221 #[arg(long)]222 path: Option<PathBuf>,223 /// Expect own address to be AgentPrivileged, skip installing polkit agent224 #[arg(long)]225 privileged: bool,226 #[arg(long)]227 local: bool,228 },229 LocalAgent,230}231232fn main() -> anyhow::Result<()> {233 tracing_subscriber::fmt()234 .with_writer(io::stderr)235 .without_time()236 .init();237 let opts = Opts::parse();238239 let runtime = Builder::new_current_thread().enable_all().build()?;240241 match opts {242 Opts::AskPass {243 prompt,244 description,245 } => runtime.block_on(askpass::ask(&prompt, description)),246 Opts::LocalAgent => runtime.block_on(main_real()),247 Opts::Editor { path } => runtime.block_on(editor::edit(path)),248 Opts::RealAgent {249 path,250 privileged,251 local,252 } => runtime.block_on(main_real_agent(path, privileged, local)),253 }254}255async fn main_real() -> anyhow::Result<()> {256 let system_conn = Connection::system().await?;257 let helper = SocketHelper {258 fallback: SuidHelper,259 };260 register_auth_agent(&system_conn, Agent::new(helper, RofiPrompter)).await?;261262 let session_conn = Connection::session().await?;263 askpass::serve(&session_conn, RofiPrompter).await?;264265 let _keep_alive = (system_conn, session_conn);266 pending().await267}268async fn main_real_agent(269 path: Option<PathBuf>,270 privileged: bool,271 local: bool,272) -> anyhow::Result<()> {273 let address = if privileged {274 Address::AgentPrivileged275 } else {276 Address::Agent277 };278 let mut rpc = Rpc::<BifConfig>::new(address);279280 Fs::new().register_endpoints(&mut rpc);281 Systemd.register_endpoints(&mut rpc);282 Pty::new().register_endpoints(&mut rpc);283 Subprocess::new().register_endpoints(&mut rpc);284 NixDaemon.register_endpoints(&mut rpc);285286 remowt_plugin::host::serve(&mut rpc);287288 let user_prompter = PromptEndpointsClient::wrap(rpc.remote(Address::User));289 let editor_client = EditorEndpointsClient::wrap(rpc.remote(Address::User));290291 let bus = bus::spawn().await?;292 askpass::serve(&bus.conn, user_prompter.clone()).await?;293 editor::serve(&bus.conn, editor_client).await?;294295 let helpers = tempfile::Builder::new().prefix("remowt-path.").tempdir()?;296 let exe = std::env::current_exe()?;297 let askpass_helper = helpers.path().join("remowt-askpass");298 let editor_helper = helpers.path().join("remowt-editor");299 {300 let script = format!(301 "#!/bin/sh\nexec {} ask-pass \"password\" \"$1\"\n",302 sh_quote(&exe.to_string_lossy())303 );304 fs::write(&askpass_helper, script).await?;305 fs::set_permissions(&askpass_helper, Permissions::from_mode(0o755)).await?;306 }307 {308 let script = format!(309 "#!/bin/sh\nexec {} editor \"$1\"\n",310 sh_quote(&exe.to_string_lossy())311 );312 fs::write(&editor_helper, script).await?;313 fs::set_permissions(&editor_helper, Permissions::from_mode(0o755)).await?;314 }315316 // Safety: Hoping tokio own threads won't read any of those...317 unsafe {318 prepend_path(helpers.path());319 std::env::set_var("SUDO_ASKPASS", &askpass_helper);320 std::env::set_var("SSH_ASKPASS", &askpass_helper);321 std::env::set_var("SSH_ASKPASS_REQUIRE", "force");322 std::env::set_var("EDITOR", &editor_helper);323 std::env::set_var("VISUAL", &editor_helper);324 std::env::set_var("DBUS_SESSION_BUS_ADDRESS", &bus.address);325 }326327 let port = match path {328 Some(path) => from_socket(UnixStream::connect(path).await?),329 None => from_stdio(),330 };331 rpc.add_direct(Address::User, port, bifrostlink::Rtt(0));332333 let polkit_conn = if !privileged && !local {334 // The unprivileged agent doubles as a polkit authentication agent so335 // `run0` (e.g. our own elevation) routes its prompt to the User over336 // bifrost instead of failing on a tty-less session.337 let conn = Connection::system().await?;338 let helper = SocketHelper {339 fallback: SuidHelper,340 };341 register_auth_agent(&conn, Agent::new(helper, user_prompter)).await?;342 Some(conn)343 } else {344 None345 };346347 let _keep_alive = (bus, helpers, polkit_conn);348 pending().await349}350351async fn register_auth_agent<H, P>(conn: &Connection, agent: Agent<H, P>) -> anyhow::Result<()>352where353 H: Helper + Clone + Send + Sync + 'static,354 P: Prompter + Clone + Send + Sync + 'static,355{356 let proxy = zbus_polkit::policykit1::AuthorityProxy::new(conn).await?;357 conn.object_server().at(OBJ_PATH, agent).await?;358359 let subject = auth_agent_subject()?;360 proxy361 .register_authentication_agent(&subject, "C", OBJ_PATH)362 .await?;363 debug!(kind = subject.subject_kind, "registered polkit agent");364 Ok(())365}366367fn auth_agent_subject() -> anyhow::Result<Subject> {368 let mut details = HashMap::new();369 if let Ok(session_id) = std::env::var("XDG_SESSION_ID") {370 let val: OwnedValue = Str::from(session_id).into();371 details.insert("session-id".to_string(), val);372 return Ok(Subject {373 subject_kind: "unix-session".to_string(),374 subject_details: details,375 });376 }377378 details.insert("pid".to_string(), OwnedValue::from(std::process::id()));379 Ok(Subject {380 subject_kind: "unix-process".to_string(),381 subject_details: details,382 })383}384385fn sh_quote(s: &str) -> String {386 format!("'{}'", s.replace('\'', "'\\''"))387}388389/// Prepend `dir` to the process `PATH`.390///391/// # SAFETY392///393/// Same as `set_var`394unsafe fn prepend_path(dir: &std::path::Path) {395 let value = match std::env::var_os("PATH") {396 Some(existing) => {397 let mut v = dir.as_os_str().to_owned();398 v.push(":");399 v.push(existing);400 v401 }402 None => dir.as_os_str().to_owned(),403 };404 unsafe {405 std::env::set_var("PATH", value);406 }407}crates/remowt-client/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-client/src/lib.rs
+++ b/crates/remowt-client/src/lib.rs
@@ -17,13 +17,15 @@
use russh::keys::ssh_key::PublicKey;
use russh::Channel;
use tempfile::TempDir;
+use tokio::io::AsyncRead;
use tokio::net::UnixListener;
use tokio::sync::oneshot;
+use tokio::task::JoinHandle;
use tokio::{
fs,
- io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, DuplexStream},
+ io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader},
};
-use tracing::{debug, warn};
+use tracing::{debug, info, warn};
use uuid::Uuid;
pub mod editor;
@@ -97,7 +99,7 @@
let mut child = SshExecChild::from_exec(ch);
drop(child.stdin);
- drain_stderr(child.stderr, cmd.to_owned());
+ drain_to_tracing(child.stderr, cmd.to_owned(), true);
let mut out = Vec::new();
child.stdout.read_to_end(&mut out).await?;
@@ -317,7 +319,7 @@
.await?;
let child = SshExecChild::from_exec(cmd_chan);
- drain_stderr(child.stderr, "agent".to_owned());
+ drain_to_tracing(child.stderr, "agent".to_owned(), true);
rpc.add_direct(
Address::Agent,
child_port(child.stdout, child.stdin),
@@ -518,20 +520,33 @@
}
}
-fn drain_stderr(stream: DuplexStream, context: String) {
+pub(crate) fn drain_to_tracing(
+ stream: impl AsyncRead + Unpin + 'static + Send,
+ context: String,
+ stderr: bool,
+) -> JoinHandle<()> {
tokio::spawn(async move {
- let mut reader = BufReader::new(stream).lines();
+ let mut reader = BufReader::new(stream);
+ let mut buf = Vec::with_capacity(4096);
loop {
- match reader.next_line().await {
- Ok(Some(line)) => warn!(context = %context, "{line}"),
- Ok(None) => break,
+ buf.clear();
+ match reader.read_until(b'\n', &mut buf).await {
+ Ok(0) => break,
+ Ok(_) => {
+ let line = String::from_utf8_lossy(buf.strip_suffix(b"\n").unwrap_or(&buf));
+ if stderr {
+ warn!(context = %context, "{line}");
+ } else {
+ info!(context = %context, "{line}");
+ }
+ }
Err(e) => {
- warn!(context = %context, "stderr read failed: {e}");
+ warn!(context = %context, "child stdio read failed: {e}");
break;
}
}
}
- });
+ })
}
fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {
crates/remowt-client/src/subprocess.rsdiffbeforeafterboth--- a/crates/remowt-client/src/subprocess.rs
+++ b/crates/remowt-client/src/subprocess.rs
@@ -6,13 +6,13 @@
use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};
use remowt_link_shared::BifConfig;
use serde::de::DeserializeOwned;
-use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _, BufReader};
+use tokio::io::AsyncWriteExt as _;
use tokio::select;
use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
use tracing::{debug, info, warn};
use crate::forwarded::{RemowtListener, RemowtStream};
-use crate::Remowt;
+use crate::{drain_to_tracing, Remowt};
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum StdioMode {
@@ -62,12 +62,24 @@
client,
} = self;
drop(stdin);
- drop(stdout);
- drop(stderr);
- client
- .wait(id)
- .await?
- .map_err(|e| anyhow!("agent wait failed: {e}"))
+ let drain_out = async move {
+ if let Some(s) = stdout {
+ drain_to_tracing(s, "<child stdout>".to_owned(), false).await;
+ }
+ };
+ let drain_err = async move {
+ if let Some(s) = stderr {
+ drain_to_tracing(s, "<child stderr>".to_owned(), true).await;
+ }
+ };
+ let wait = async move {
+ client
+ .wait(id)
+ .await?
+ .map_err(|e| anyhow!("agent wait failed: {e}"))
+ };
+ let (code, _, _) = tokio::join!(wait, drain_out, drain_err);
+ code
}
pub async fn kill(&self, signal: i32) -> Result<()> {
@@ -163,7 +175,7 @@
);
let stdin_stream = handle_stdin(stdin, stdin_res?, &program);
- let stdout_stream = handle_output(stdout, stdout_res?, &program, false);
+ let stdout_stream = handle_output(stdout, stdout_res?, &program);
let stderr_stream = handle_output_err(stderr, stderr_res?, &program);
Ok(RemowtChild {
@@ -215,18 +227,13 @@
}
}
-fn handle_output(
- mode: StdioMode,
- s: Option<RemowtStream>,
- program: &str,
- is_stderr: bool,
-) -> Option<RemowtStream> {
+fn handle_output(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {
match mode {
StdioMode::Pipe => s,
StdioMode::Inherit => {
if let Some(s) = s {
let program = program.to_owned();
- tokio::spawn(pump_to_tracing(s, program, is_stderr));
+ tokio::spawn(drain_to_tracing(s, program, false));
}
None
}
@@ -244,7 +251,7 @@
StderrMode::Inherit => {
if let Some(s) = s {
let program = program.to_owned();
- tokio::spawn(pump_to_tracing(s, program, true));
+ tokio::spawn(drain_to_tracing(s, program, true));
}
None
}
@@ -252,26 +259,6 @@
}
}
-async fn pump_to_tracing(stream: RemowtStream, program: String, is_stderr: bool) {
- let mut reader = BufReader::new(stream).lines();
- loop {
- match reader.next_line().await {
- Ok(Some(line)) => {
- if is_stderr {
- warn!(program, "{line}");
- } else {
- info!(program, "{line}");
- }
- }
- Ok(None) => break,
- Err(e) => {
- warn!(program, "child log stream error: {e}");
- break;
- }
- }
- }
-}
-
fn escape_bash(input: &str, out: &mut String) {
const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
if input.chars().all(|c| !TO_ESCAPE.contains(c)) {
@@ -422,6 +409,27 @@
}
};
+ while let Some(e) = err.next().await {
+ if let Ok(line) = e {
+ warn!(program = %program, "{line}");
+ }
+ }
+ if want_stdout {
+ if let Some(out_bytes) = out_bytes.as_mut() {
+ while let Some(o) = out_bytes.next().await {
+ if let Ok(chunk) = o {
+ buf.as_mut().expect("want_stdout").extend_from_slice(&chunk);
+ }
+ }
+ }
+ } else if let Some(out_lines) = out_lines.as_mut() {
+ while let Some(o) = out_lines.next().await {
+ if let Ok(line) = o {
+ info!(program = %program, "{line}");
+ }
+ }
+ }
+
match exit {
Some(0) => Ok(buf),
Some(c) => bail!("command '{line}' failed with status {c}"),
crates/remowt-link-shared/src/port.rsdiffbeforeafterboth--- a/crates/remowt-link-shared/src/port.rs
+++ b/crates/remowt-link-shared/src/port.rs
@@ -3,10 +3,8 @@
use bifrostlink::Port;
use bytes::{Bytes, BytesMut};
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
+use tokio::select;
-/// Wire a length-prefixed duplex byte stream (e.g. a child process's
-/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32`
-/// length followed by that many payload bytes.
pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port
where
R: AsyncRead + Unpin + Send + 'static,
@@ -18,13 +16,13 @@
let len = match reader.read_u32().await {
Ok(len) => len,
Err(e) => {
- tracing::error!("child read failed: {e}");
+ log_read_end(&e);
break;
}
};
let mut buf = BytesMut::zeroed(len as usize);
if let Err(e) = reader.read_exact(&mut buf).await {
- tracing::error!("child read failed: {e}");
+ log_read_end(&e);
break;
}
if tx.send(buf.freeze()).is_err() {
@@ -35,15 +33,45 @@
let write_task = async move {
while let Some(msg) = rx.recv().await {
if let Err(e) = write_frame(&mut writer, msg).await {
- tracing::error!("child write failed: {e}");
+ log_write_end(&e);
break;
}
}
};
- tokio::join!(read_task, write_task);
+ select! {
+ _ = read_task => {},
+ _ = write_task => {},
+ }
})
}
+fn log_read_end(e: &io::Error) {
+ if matches!(
+ e.kind(),
+ io::ErrorKind::UnexpectedEof
+ | io::ErrorKind::BrokenPipe
+ | io::ErrorKind::ConnectionReset
+ | io::ErrorKind::ConnectionAborted
+ ) {
+ tracing::debug!("child read ended: {e}");
+ } else {
+ tracing::error!("child read failed: {e}");
+ }
+}
+
+fn log_write_end(e: &io::Error) {
+ if matches!(
+ e.kind(),
+ io::ErrorKind::BrokenPipe
+ | io::ErrorKind::ConnectionReset
+ | io::ErrorKind::ConnectionAborted
+ ) {
+ tracing::debug!("child write ended: {e}");
+ } else {
+ tracing::error!("child write failed: {e}");
+ }
+}
+
async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {
let len = u32::try_from(msg.len())
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;