git.delta.rocks / remowt / refs/commits / 113f1e5ab113

difftreelog

fix privileged agent plugins

tyzsxvmrYaroslav Bolyukin2026-06-12parent: #62007a6.patch.diff
in: trunk

4 files changed

modifiedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -15,12 +15,13 @@
 use remowt_link_shared::{Address, BifConfig, Fs, Pty, Systemd};
 use remowt_polkit_shared::{emphasize, BackendRequest, Identity, PidDisplay};
 use remowt_ui_prompt::bifrost::PromptEndpointsClient;
+use remowt_ui_prompt::rofi::RofiPrompter;
 use remowt_ui_prompt::{PrependSourcePrompter, Prompter, Source};
 use tokio::fs;
 use tokio::net::UnixStream;
 use tokio::runtime::Builder;
 use tokio::task::AbortHandle;
-use tracing::{info, trace};
+use tracing::{debug, info, trace};
 use zbus::fdo;
 use zbus::zvariant::{OwnedValue, Str};
 use zbus::{interface, proxy, Connection};
@@ -39,7 +40,7 @@
 }
 impl Drop for CancelTaskOnDrop {
 	fn drop(&mut self) {
-		info!("cancel on drop");
+		debug!("cancel on drop");
 		if let Some(task) = self
 			.tasks
 			.lock()
@@ -121,7 +122,7 @@
 					identities.iter().map(|v| v.to_string()).collect();
 				let identity_displays: Vec<&str> =
 					identity_displays.iter().map(|v| v.as_str()).collect();
-				info!("choose identity");
+				debug!("choose identity");
 				let choosen_identity = match identity_displays.len() {
 					0 => {
 						return Err(fdo::Error::AuthFailed(
@@ -140,7 +141,7 @@
 							.await?
 					}
 				};
-				info!("identity chosen");
+				debug!("identity chosen");
 
 				let _ = write!(
 					description,
@@ -169,7 +170,7 @@
 			.lock()
 			.unwrap()
 			.insert(cookie.clone(), task.abort_handle());
-		info!("abort handle stored");
+		debug!("abort handle stored");
 		let _ = _cancel_guard.set(CancelTaskOnDrop {
 			tasks: self.tasks.clone(),
 			handle: cookie.clone(),
@@ -182,9 +183,9 @@
 
 	/// CancelAuthentication method
 	async fn cancel_authentication(&self, cookie: &str) -> zbus::fdo::Result<()> {
-		info!("auth cancelled");
+		debug!("auth cancelled");
 		if let Some(abort) = self.tasks.lock().unwrap().remove(cookie) {
-			info!("abort handle found");
+			debug!("abort handle found");
 			abort.abort();
 		}
 		// debug!("Authentication cancled ! {cookie}");
@@ -220,6 +221,7 @@
 		#[arg(long)]
 		privileged: bool,
 	},
+	LocalAgent,
 }
 
 fn main() -> anyhow::Result<()> {
@@ -227,6 +229,7 @@
 	// so anything written there would corrupt the stream.
 	tracing_subscriber::fmt()
 		.with_writer(std::io::stderr)
+		.without_time()
 		.init();
 	let opts = Opts::parse();
 
@@ -237,10 +240,21 @@
 			prompt,
 			description,
 		} => runtime.block_on(askpass::ask(&prompt, description)),
+		Opts::LocalAgent => runtime.block_on(main_real()),
 		Opts::Editor { path } => runtime.block_on(editor::edit(path)),
 		Opts::RealAgent { path, privileged } => runtime.block_on(main_real_agent(path, privileged)),
 	}
 }
+async fn main_real() -> anyhow::Result<()> {
+	let conn = Connection::system().await?;
+	let helper = SocketHelper {
+		fallback: SuidHelper,
+	};
+	register_auth_agent(&conn, Agent::new(helper, RofiPrompter)).await?;
+
+	let _conn = conn;
+	pending().await
+}
 async fn main_real_agent(path: Option<PathBuf>, privileged: bool) -> anyhow::Result<()> {
 	let address = if privileged {
 		Address::AgentPrivileged
@@ -330,7 +344,7 @@
 	proxy
 		.register_authentication_agent(&subject, "C", OBJ_PATH)
 		.await?;
-	info!(kind = subject.subject_kind, "registered polkit agent");
+	debug!(kind = subject.subject_kind, "registered polkit agent");
 	Ok(())
 }
 
modifiedcrates/remowt-plugin/Cargo.tomldiffbeforeafterboth
--- a/crates/remowt-plugin/Cargo.toml
+++ b/crates/remowt-plugin/Cargo.toml
@@ -11,6 +11,7 @@
 bifrostlink-ports.workspace = true
 bytes.workspace = true
 remowt-link-shared.workspace = true
+serde_json.workspace = true
 tokio = { workspace = true, features = [
 	"rt",
 	"net",
modifiedcrates/remowt-plugin/src/host.rsdiffbeforeafterboth
before · crates/remowt-plugin/src/host.rs
1use std::ffi::OsStr;2use std::io;3use std::process::Stdio;4use std::sync::Mutex;56use bifrostlink::{Port, Rpc, Rtt, WeakRpc};7use bytes::{Bytes, BytesMut};8use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};9use tokio::process::{Child, ChildStdin, ChildStdout, Command};1011use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};12use remowt_link_shared::{Address, BifConfig};1314pub fn serve(rpc: &mut Rpc<BifConfig>) {15	let host = Host {16		rpc: rpc.clone().downgrade(),17		children: Mutex::new(Vec::new()),18	};19	PluginEndpoints(host).register_endpoints(rpc);20}2122struct Host {23	rpc: WeakRpc<BifConfig>,24	children: Mutex<Vec<Child>>,25}2627impl Host {28	fn spawn(&self, id: u16, path: impl AsRef<OsStr>) -> Result<(), Error> {29		let rpc = self.rpc.clone().upgrade().ok_or(Error::Gone)?;3031		let mut child = Command::new(path)32			.arg(id.to_string())33			.stdin(Stdio::piped())34			.stdout(Stdio::piped())35			.kill_on_drop(true)36			.spawn()37			.map_err(|e| Error::Spawn(e.to_string()))?;38		let stdin = child.stdin.take().expect("stdin piped");39		let stdout = child.stdout.take().expect("stdout piped");4041		rpc.add_direct(Address::Plugin(id), child_port(stdout, stdin), Rtt(0));42		self.children.lock().expect("not poisoned").push(child);43		Ok(())44	}45}4647impl PluginHost for Host {48	async fn load_plugin(&self, id: u16, name: String) -> Result<(), Error> {49		// TODO: Right now loads plugin next to the binary...50		// But with our CA addressed schema, the plugins should be located in content-addressed subdir...51		// Maybe it should just be scrapped in favor of load_plugin_path.52		if name.is_empty() || name == "." || name == ".." || name.contains(['/', '\0']) {53			return Err(Error::BadName);54		}55		let exe = std::env::current_exe().map_err(|e| Error::Spawn(e.to_string()))?;56		let dir = exe57			.parent()58			.ok_or_else(|| Error::Spawn("primary agent has no parent directory".to_owned()))?;59		self.spawn(id, dir.join(&name))60	}6162	async fn load_plugin_path(&self, id: u16, path: String) -> Result<(), Error> {63		if path.is_empty() || path.contains('\0') {64			return Err(Error::BadName);65		}66		self.spawn(id, path)67	}68}6970fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port {71	Port::new(|mut rx, tx| async move {72		let reader = async move {73			loop {74				let len = match stdout.read_u32().await {75					Ok(len) => len,76					Err(e) => {77						tracing::error!("plugin stdout read failed: {e}");78						break;79					}80				};81				let mut buf = BytesMut::zeroed(len as usize);82				if let Err(e) = stdout.read_exact(&mut buf).await {83					tracing::error!("plugin stdout read failed: {e}");84					break;85				}86				if tx.send(buf.freeze()).is_err() {87					break;88				}89			}90		};91		let writer = async move {92			while let Some(msg) = rx.recv().await {93				if let Err(e) = write_frame(&mut stdin, msg).await {94					tracing::error!("plugin stdin write failed: {e}");95					break;96				}97			}98		};99		tokio::join!(reader, writer);100	})101}102103async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> {104	let len = u32::try_from(msg.len())105		.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;106	stdin.write_u32(len).await?;107	stdin.write_all(&msg).await?;108	stdin.flush().await?;109	Ok(())110}
after · crates/remowt-plugin/src/host.rs
1use std::ffi::OsStr;2use std::io;3use std::process::Stdio;4use std::sync::Mutex;56use bifrostlink::{Port, Rpc, Rtt, WeakRpc};7use bytes::{Bytes, BytesMut};8use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};9use tokio::process::{Child, ChildStdin, ChildStdout, Command};1011use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};12use remowt_link_shared::{Address, BifConfig};1314pub fn serve(rpc: &mut Rpc<BifConfig>) {15	let host = Host {16		me: rpc.me(),17		rpc: rpc.clone().downgrade(),18		children: Mutex::new(Vec::new()),19	};20	PluginEndpoints(host).register_endpoints(rpc);21}2223struct Host {24	me: Address,25	rpc: WeakRpc<BifConfig>,26	children: Mutex<Vec<Child>>,27}2829impl Host {30	fn spawn(&self, id: u16, path: impl AsRef<OsStr>) -> Result<(), Error> {31		let rpc = self.rpc.clone().upgrade().ok_or(Error::Gone)?;3233		let mut child = Command::new(path)34			.arg(id.to_string())35			.arg(serde_json::to_string(&self.me).expect("address serializes"))36			.stdin(Stdio::piped())37			.stdout(Stdio::piped())38			.kill_on_drop(true)39			.spawn()40			.map_err(|e| Error::Spawn(e.to_string()))?;41		let stdin = child.stdin.take().expect("stdin piped");42		let stdout = child.stdout.take().expect("stdout piped");4344		rpc.add_direct(Address::Plugin(id), child_port(stdout, stdin), Rtt(0));45		self.children.lock().expect("not poisoned").push(child);46		Ok(())47	}48}4950impl PluginHost for Host {51	async fn load_plugin(&self, id: u16, name: String) -> Result<(), Error> {52		// TODO: Right now loads plugin next to the binary...53		// But with our CA addressed schema, the plugins should be located in content-addressed subdir...54		// Maybe it should just be scrapped in favor of load_plugin_path.55		if name.is_empty() || name == "." || name == ".." || name.contains(['/', '\0']) {56			return Err(Error::BadName);57		}58		let exe = std::env::current_exe().map_err(|e| Error::Spawn(e.to_string()))?;59		let dir = exe60			.parent()61			.ok_or_else(|| Error::Spawn("primary agent has no parent directory".to_owned()))?;62		self.spawn(id, dir.join(&name))63	}6465	async fn load_plugin_path(&self, id: u16, path: String) -> Result<(), Error> {66		if path.is_empty() || path.contains('\0') {67			return Err(Error::BadName);68		}69		self.spawn(id, path)70	}71}7273fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port {74	Port::new(|mut rx, tx| async move {75		let reader = async move {76			loop {77				let len = match stdout.read_u32().await {78					Ok(len) => len,79					Err(e) => {80						tracing::error!("plugin stdout read failed: {e}");81						break;82					}83				};84				let mut buf = BytesMut::zeroed(len as usize);85				if let Err(e) = stdout.read_exact(&mut buf).await {86					tracing::error!("plugin stdout read failed: {e}");87					break;88				}89				if tx.send(buf.freeze()).is_err() {90					break;91				}92			}93		};94		let writer = async move {95			while let Some(msg) = rx.recv().await {96				if let Err(e) = write_frame(&mut stdin, msg).await {97					tracing::error!("plugin stdin write failed: {e}");98					break;99				}100			}101		};102		tokio::join!(reader, writer);103	})104}105106async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> {107	let len = u32::try_from(msg.len())108		.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;109	stdin.write_u32(len).await?;110	stdin.write_all(&msg).await?;111	stdin.flush().await?;112	Ok(())113}
modifiedcrates/remowt-plugin/src/lib.rsdiffbeforeafterboth
--- a/crates/remowt-plugin/src/lib.rs
+++ b/crates/remowt-plugin/src/lib.rs
@@ -18,6 +18,13 @@
 		.map_err(|e| anyhow::anyhow!("invalid plugin index {arg:?}: {e}"))
 }
 
+pub fn host_address() -> Result<Address> {
+	let arg = std::env::args()
+		.nth(2)
+		.ok_or_else(|| anyhow::anyhow!("missing host address argument"))?;
+	serde_json::from_str(&arg).map_err(|e| anyhow::anyhow!("invalid host address {arg:?}: {e}"))
+}
+
 pub fn run<F>(register: F) -> Result<()>
 where
 	F: FnOnce(&mut Rpc<BifConfig>),
@@ -27,10 +34,11 @@
 		.init();
 
 	let index = plugin_index()?;
+	let host = host_address()?;
 	let runtime = Builder::new_current_thread().enable_all().build()?;
 	runtime.block_on(async move {
 		let mut rpc = Rpc::<BifConfig>::new(Address::Plugin(index));
-		rpc.add_direct(Address::Agent, from_stdio(), Rtt(0));
+		rpc.add_direct(host, from_stdio(), Rtt(0));
 		register(&mut rpc);
 		let _rpc = rpc;
 		pending::<Result<()>>().await