difftreelog
fix privileged agent plugins
in: trunk
4 files changed
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -15,12 +15,13 @@
use remowt_link_shared::{Address, BifConfig, Fs, Pty, Systemd};
use remowt_polkit_shared::{emphasize, BackendRequest, Identity, PidDisplay};
use remowt_ui_prompt::bifrost::PromptEndpointsClient;
+use remowt_ui_prompt::rofi::RofiPrompter;
use remowt_ui_prompt::{PrependSourcePrompter, Prompter, Source};
use tokio::fs;
use tokio::net::UnixStream;
use tokio::runtime::Builder;
use tokio::task::AbortHandle;
-use tracing::{info, trace};
+use tracing::{debug, info, trace};
use zbus::fdo;
use zbus::zvariant::{OwnedValue, Str};
use zbus::{interface, proxy, Connection};
@@ -39,7 +40,7 @@
}
impl Drop for CancelTaskOnDrop {
fn drop(&mut self) {
- info!("cancel on drop");
+ debug!("cancel on drop");
if let Some(task) = self
.tasks
.lock()
@@ -121,7 +122,7 @@
identities.iter().map(|v| v.to_string()).collect();
let identity_displays: Vec<&str> =
identity_displays.iter().map(|v| v.as_str()).collect();
- info!("choose identity");
+ debug!("choose identity");
let choosen_identity = match identity_displays.len() {
0 => {
return Err(fdo::Error::AuthFailed(
@@ -140,7 +141,7 @@
.await?
}
};
- info!("identity chosen");
+ debug!("identity chosen");
let _ = write!(
description,
@@ -169,7 +170,7 @@
.lock()
.unwrap()
.insert(cookie.clone(), task.abort_handle());
- info!("abort handle stored");
+ debug!("abort handle stored");
let _ = _cancel_guard.set(CancelTaskOnDrop {
tasks: self.tasks.clone(),
handle: cookie.clone(),
@@ -182,9 +183,9 @@
/// CancelAuthentication method
async fn cancel_authentication(&self, cookie: &str) -> zbus::fdo::Result<()> {
- info!("auth cancelled");
+ debug!("auth cancelled");
if let Some(abort) = self.tasks.lock().unwrap().remove(cookie) {
- info!("abort handle found");
+ debug!("abort handle found");
abort.abort();
}
// debug!("Authentication cancled ! {cookie}");
@@ -220,6 +221,7 @@
#[arg(long)]
privileged: bool,
},
+ LocalAgent,
}
fn main() -> anyhow::Result<()> {
@@ -227,6 +229,7 @@
// so anything written there would corrupt the stream.
tracing_subscriber::fmt()
.with_writer(std::io::stderr)
+ .without_time()
.init();
let opts = Opts::parse();
@@ -237,10 +240,21 @@
prompt,
description,
} => runtime.block_on(askpass::ask(&prompt, description)),
+ Opts::LocalAgent => runtime.block_on(main_real()),
Opts::Editor { path } => runtime.block_on(editor::edit(path)),
Opts::RealAgent { path, privileged } => runtime.block_on(main_real_agent(path, privileged)),
}
}
+async fn main_real() -> anyhow::Result<()> {
+ let conn = Connection::system().await?;
+ let helper = SocketHelper {
+ fallback: SuidHelper,
+ };
+ register_auth_agent(&conn, Agent::new(helper, RofiPrompter)).await?;
+
+ let _conn = conn;
+ pending().await
+}
async fn main_real_agent(path: Option<PathBuf>, privileged: bool) -> anyhow::Result<()> {
let address = if privileged {
Address::AgentPrivileged
@@ -330,7 +344,7 @@
proxy
.register_authentication_agent(&subject, "C", OBJ_PATH)
.await?;
- info!(kind = subject.subject_kind, "registered polkit agent");
+ debug!(kind = subject.subject_kind, "registered polkit agent");
Ok(())
}
crates/remowt-plugin/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-plugin/Cargo.toml
+++ b/crates/remowt-plugin/Cargo.toml
@@ -11,6 +11,7 @@
bifrostlink-ports.workspace = true
bytes.workspace = true
remowt-link-shared.workspace = true
+serde_json.workspace = true
tokio = { workspace = true, features = [
"rt",
"net",
crates/remowt-plugin/src/host.rsdiffbeforeafterboth1use std::ffi::OsStr;2use std::io;3use std::process::Stdio;4use std::sync::Mutex;56use bifrostlink::{Port, Rpc, Rtt, WeakRpc};7use bytes::{Bytes, BytesMut};8use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};9use tokio::process::{Child, ChildStdin, ChildStdout, Command};1011use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};12use remowt_link_shared::{Address, BifConfig};1314pub fn serve(rpc: &mut Rpc<BifConfig>) {15 let host = Host {16 rpc: rpc.clone().downgrade(),17 children: Mutex::new(Vec::new()),18 };19 PluginEndpoints(host).register_endpoints(rpc);20}2122struct Host {23 rpc: WeakRpc<BifConfig>,24 children: Mutex<Vec<Child>>,25}2627impl Host {28 fn spawn(&self, id: u16, path: impl AsRef<OsStr>) -> Result<(), Error> {29 let rpc = self.rpc.clone().upgrade().ok_or(Error::Gone)?;3031 let mut child = Command::new(path)32 .arg(id.to_string())33 .stdin(Stdio::piped())34 .stdout(Stdio::piped())35 .kill_on_drop(true)36 .spawn()37 .map_err(|e| Error::Spawn(e.to_string()))?;38 let stdin = child.stdin.take().expect("stdin piped");39 let stdout = child.stdout.take().expect("stdout piped");4041 rpc.add_direct(Address::Plugin(id), child_port(stdout, stdin), Rtt(0));42 self.children.lock().expect("not poisoned").push(child);43 Ok(())44 }45}4647impl PluginHost for Host {48 async fn load_plugin(&self, id: u16, name: String) -> Result<(), Error> {49 // TODO: Right now loads plugin next to the binary...50 // But with our CA addressed schema, the plugins should be located in content-addressed subdir...51 // Maybe it should just be scrapped in favor of load_plugin_path.52 if name.is_empty() || name == "." || name == ".." || name.contains(['/', '\0']) {53 return Err(Error::BadName);54 }55 let exe = std::env::current_exe().map_err(|e| Error::Spawn(e.to_string()))?;56 let dir = exe57 .parent()58 .ok_or_else(|| Error::Spawn("primary agent has no parent directory".to_owned()))?;59 self.spawn(id, dir.join(&name))60 }6162 async fn load_plugin_path(&self, id: u16, path: String) -> Result<(), Error> {63 if path.is_empty() || path.contains('\0') {64 return Err(Error::BadName);65 }66 self.spawn(id, path)67 }68}6970fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port {71 Port::new(|mut rx, tx| async move {72 let reader = async move {73 loop {74 let len = match stdout.read_u32().await {75 Ok(len) => len,76 Err(e) => {77 tracing::error!("plugin stdout read failed: {e}");78 break;79 }80 };81 let mut buf = BytesMut::zeroed(len as usize);82 if let Err(e) = stdout.read_exact(&mut buf).await {83 tracing::error!("plugin stdout read failed: {e}");84 break;85 }86 if tx.send(buf.freeze()).is_err() {87 break;88 }89 }90 };91 let writer = async move {92 while let Some(msg) = rx.recv().await {93 if let Err(e) = write_frame(&mut stdin, msg).await {94 tracing::error!("plugin stdin write failed: {e}");95 break;96 }97 }98 };99 tokio::join!(reader, writer);100 })101}102103async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> {104 let len = u32::try_from(msg.len())105 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;106 stdin.write_u32(len).await?;107 stdin.write_all(&msg).await?;108 stdin.flush().await?;109 Ok(())110}1use std::ffi::OsStr;2use std::io;3use std::process::Stdio;4use std::sync::Mutex;56use bifrostlink::{Port, Rpc, Rtt, WeakRpc};7use bytes::{Bytes, BytesMut};8use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};9use tokio::process::{Child, ChildStdin, ChildStdout, Command};1011use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};12use remowt_link_shared::{Address, BifConfig};1314pub fn serve(rpc: &mut Rpc<BifConfig>) {15 let host = Host {16 me: rpc.me(),17 rpc: rpc.clone().downgrade(),18 children: Mutex::new(Vec::new()),19 };20 PluginEndpoints(host).register_endpoints(rpc);21}2223struct Host {24 me: Address,25 rpc: WeakRpc<BifConfig>,26 children: Mutex<Vec<Child>>,27}2829impl Host {30 fn spawn(&self, id: u16, path: impl AsRef<OsStr>) -> Result<(), Error> {31 let rpc = self.rpc.clone().upgrade().ok_or(Error::Gone)?;3233 let mut child = Command::new(path)34 .arg(id.to_string())35 .arg(serde_json::to_string(&self.me).expect("address serializes"))36 .stdin(Stdio::piped())37 .stdout(Stdio::piped())38 .kill_on_drop(true)39 .spawn()40 .map_err(|e| Error::Spawn(e.to_string()))?;41 let stdin = child.stdin.take().expect("stdin piped");42 let stdout = child.stdout.take().expect("stdout piped");4344 rpc.add_direct(Address::Plugin(id), child_port(stdout, stdin), Rtt(0));45 self.children.lock().expect("not poisoned").push(child);46 Ok(())47 }48}4950impl PluginHost for Host {51 async fn load_plugin(&self, id: u16, name: String) -> Result<(), Error> {52 // TODO: Right now loads plugin next to the binary...53 // But with our CA addressed schema, the plugins should be located in content-addressed subdir...54 // Maybe it should just be scrapped in favor of load_plugin_path.55 if name.is_empty() || name == "." || name == ".." || name.contains(['/', '\0']) {56 return Err(Error::BadName);57 }58 let exe = std::env::current_exe().map_err(|e| Error::Spawn(e.to_string()))?;59 let dir = exe60 .parent()61 .ok_or_else(|| Error::Spawn("primary agent has no parent directory".to_owned()))?;62 self.spawn(id, dir.join(&name))63 }6465 async fn load_plugin_path(&self, id: u16, path: String) -> Result<(), Error> {66 if path.is_empty() || path.contains('\0') {67 return Err(Error::BadName);68 }69 self.spawn(id, path)70 }71}7273fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port {74 Port::new(|mut rx, tx| async move {75 let reader = async move {76 loop {77 let len = match stdout.read_u32().await {78 Ok(len) => len,79 Err(e) => {80 tracing::error!("plugin stdout read failed: {e}");81 break;82 }83 };84 let mut buf = BytesMut::zeroed(len as usize);85 if let Err(e) = stdout.read_exact(&mut buf).await {86 tracing::error!("plugin stdout read failed: {e}");87 break;88 }89 if tx.send(buf.freeze()).is_err() {90 break;91 }92 }93 };94 let writer = async move {95 while let Some(msg) = rx.recv().await {96 if let Err(e) = write_frame(&mut stdin, msg).await {97 tracing::error!("plugin stdin write failed: {e}");98 break;99 }100 }101 };102 tokio::join!(reader, writer);103 })104}105106async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> {107 let len = u32::try_from(msg.len())108 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;109 stdin.write_u32(len).await?;110 stdin.write_all(&msg).await?;111 stdin.flush().await?;112 Ok(())113}crates/remowt-plugin/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-plugin/src/lib.rs
+++ b/crates/remowt-plugin/src/lib.rs
@@ -18,6 +18,13 @@
.map_err(|e| anyhow::anyhow!("invalid plugin index {arg:?}: {e}"))
}
+pub fn host_address() -> Result<Address> {
+ let arg = std::env::args()
+ .nth(2)
+ .ok_or_else(|| anyhow::anyhow!("missing host address argument"))?;
+ serde_json::from_str(&arg).map_err(|e| anyhow::anyhow!("invalid host address {arg:?}: {e}"))
+}
+
pub fn run<F>(register: F) -> Result<()>
where
F: FnOnce(&mut Rpc<BifConfig>),
@@ -27,10 +34,11 @@
.init();
let index = plugin_index()?;
+ let host = host_address()?;
let runtime = Builder::new_current_thread().enable_all().build()?;
runtime.block_on(async move {
let mut rpc = Rpc::<BifConfig>::new(Address::Plugin(index));
- rpc.add_direct(Address::Agent, from_stdio(), Rtt(0));
+ rpc.add_direct(host, from_stdio(), Rtt(0));
register(&mut rpc);
let _rpc = rpc;
pending::<Result<()>>().await