difftreelog
feat(client) proper support for local
in: trunk
16 files changed
.gitignorediffbeforeafterboth--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
/target
/.direnv
+/result
+/result-*
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2096,6 +2096,7 @@
"russh-config",
"serde",
"serde_json",
+ "tempfile",
"tokio",
"tracing",
"uuid",
@@ -2131,6 +2132,7 @@
"serde_json",
"thiserror",
"tokio",
+ "tracing",
]
[[package]]
@@ -2140,7 +2142,6 @@
"anyhow",
"bifrostlink",
"bifrostlink-ports",
- "bytes",
"remowt-link-shared",
"serde_json",
"tokio",
cmds/remowt-agent/src/main.rsdiffbeforeafterboth1use std::borrow::Cow;2use std::collections::{BTreeMap, HashMap};3use std::fs::Permissions;4use std::future::pending;5use std::os::unix::fs::PermissionsExt as _;6use std::path::PathBuf;7use std::sync::{Arc, Mutex, OnceLock};89use bifrostlink::declarative::RemoteEndpoints;10use bifrostlink::Rpc;11use bifrostlink_ports::stdio::from_stdio;12use bifrostlink_ports::unix_socket::from_socket;13use clap::Parser;14use remowt_endpoints::{fs::Fs, pty::Pty, systemd::Systemd};15use remowt_link_shared::{editor::EditorEndpointsClient, Address, BifConfig};16use remowt_polkit_shared::{emphasize, BackendRequest, Identity, PidDisplay};17use remowt_ui_prompt::bifrost::PromptEndpointsClient;18use remowt_ui_prompt::rofi::RofiPrompter;19use remowt_ui_prompt::{PrependSourcePrompter, Prompter, Source};20use tokio::fs;21use tokio::net::UnixStream;22use tokio::runtime::Builder;23use tokio::task::AbortHandle;24use tracing::{debug, info, trace};25use zbus::fdo;26use zbus::zvariant::{OwnedValue, Str};27use zbus::{interface, proxy, Connection};28use zbus_polkit::policykit1::Subject;2930use self::helper::{Helper, SocketHelper, SuidHelper};3132pub mod askpass;33pub mod bus;34pub mod editor;35pub mod helper;3637struct CancelTaskOnDrop {38 tasks: Arc<Mutex<HashMap<String, AbortHandle>>>,39 handle: String,40}41impl Drop for CancelTaskOnDrop {42 fn drop(&mut self) {43 debug!("cancel on drop");44 if let Some(task) = self45 .tasks46 .lock()47 .expect("not poisoned")48 .remove(&self.handle)49 {50 task.abort();51 }52 }53}5455struct Agent<H, P> {56 tasks: Arc<Mutex<HashMap<String, AbortHandle>>>,57 helper: H,58 prompter: P,59}60impl<H, P> Agent<H, P> {61 fn new(helper: H, prompter: P) -> Self {62 Agent {63 tasks: Arc::new(Mutex::new(HashMap::new())),64 helper,65 prompter,66 }67 }68}6970#[interface(name = "org.freedesktop.PolicyKit1.AuthenticationAgent")]71impl<H, P> Agent<H, P>72where73 H: Helper + Clone + Send + Sync + 'static,74 P: Prompter + Clone + Send + Sync + 'static,75{76 /// BeginAuthentication method77 #[allow(clippy::too_many_arguments)]78 async fn begin_authentication(79 &self,80 action_id: String,81 message: String,82 _icon_name: String,83 mut details: BTreeMap<String, String>,84 cookie: String,85 identities: Vec<Identity>,86 ) -> zbus::fdo::Result<()> {87 use std::fmt::Write;88 info!("begin auth");89 let _cancel_guard = Arc::new(OnceLock::new());90 let task = {91 let helper = self.helper.clone();92 let prompter = self.prompter.clone();93 let cookie = cookie.clone();94 let _cancel_guard = _cancel_guard.clone();95 tokio::task::spawn(async move {96 let _cancel_guard = _cancel_guard.clone();97 trace!("conversation task");98 let mut description = format!("{message}\n\n<b>Action id:</b> {action_id}",);99 if let Some(subject) = details.remove("polkit.caller-pid") {100 let _ = write!(description, "\n<b>Caller:</b> ");101 if let Ok(pid) = subject.parse::<u32>() {102 let _ = write!(description, "{}", PidDisplay(pid));103 } else {104 let _ = write!(description, "{}", emphasize("invalid pid"));105 }106 }107 if let Some(subject) = details.remove("polkit.subject-pid") {108 let _ = write!(description, "\n<b>Subject:</b> ");109 if let Ok(pid) = subject.parse::<u32>() {110 let _ = write!(description, "{}", PidDisplay(pid));111 } else {112 let _ = write!(description, "{}", emphasize("invalid pid"));113 }114 }115 let mut prompter = PrependSourcePrompter {116 source: vec![Source(Cow::Borrowed("polkit agent"))],117 description: description.clone(),118 prompter,119 };120121 let identity_displays: Vec<String> =122 identities.iter().map(|v| v.to_string()).collect();123 let identity_displays: Vec<&str> =124 identity_displays.iter().map(|v| v.as_str()).collect();125 debug!("choose identity");126 let choosen_identity = match identity_displays.len() {127 0 => {128 return Err(fdo::Error::AuthFailed(129 "no identity to authenticate as".to_owned(),130 ))131 }132 1 => 0,133 _ => {134 prompter135 .prompt_enum(136 "Identity",137 "Select identity to use for polkit authorization",138 &identity_displays,139 &[],140 )141 .await?142 }143 };144 debug!("identity chosen");145146 let _ = write!(147 description,148 "\n<b>Identity:</b> {}",149 identities[choosen_identity as usize]150 );151 prompter.description = description;152153 prompter.source.push(Source(Cow::Borrowed("polkit daemon")));154155 helper156 .help_me(157 &cookie,158 prompter,159 identities[choosen_identity as usize].clone(),160 )161 .await162 .map_err(|e| fdo::Error::Failed(e.to_string()))?;163 // let connection = Connection::system().await?;164 // let helper = PolkitHelperProxy::new(&connection).await?;165166 Ok(())167 })168 };169 self.tasks170 .lock()171 .unwrap()172 .insert(cookie.clone(), task.abort_handle());173 debug!("abort handle stored");174 let _ = _cancel_guard.set(CancelTaskOnDrop {175 tasks: self.tasks.clone(),176 handle: cookie.clone(),177 });178179 let _ = task.await;180181 Ok(())182 }183184 /// CancelAuthentication method185 async fn cancel_authentication(&self, cookie: &str) -> zbus::fdo::Result<()> {186 debug!("auth cancelled");187 if let Some(abort) = self.tasks.lock().unwrap().remove(cookie) {188 debug!("abort handle found");189 abort.abort();190 }191 // debug!("Authentication cancled ! {cookie}");192 Ok(())193 }194}195196const OBJ_PATH: &str = "/org/freedesktop/PolicyKit1/AuthenticationAgent";197198#[proxy(199 interface = "lach.PolkitHelper",200 default_service = "lach.polkit.helper1",201 default_path = "/lach/PolkitHelper"202)]203trait PolkitHelper {204 fn init_conversation(&self, request: BackendRequest) -> zbus::Result<()>;205}206207#[derive(Parser)]208enum Opts {209 AskPass {210 prompt: String,211 description: String,212 },213 Editor {214 /// Argument to nvim215 path: String,216 },217 RealAgent {218 #[arg(long)]219 path: Option<PathBuf>,220 /// Expect own address to be AgentPrivileged, skip installing polkit agent221 #[arg(long)]222 privileged: bool,223 },224 LocalAgent,225}226227fn main() -> anyhow::Result<()> {228 // Log to stderr: `privileged-agent` uses stdout as the bifrost transport,229 // so anything written there would corrupt the stream.230 tracing_subscriber::fmt()231 .with_writer(std::io::stderr)232 .without_time()233 .init();234 let opts = Opts::parse();235236 let runtime = Builder::new_current_thread().enable_all().build()?;237238 match opts {239 Opts::AskPass {240 prompt,241 description,242 } => runtime.block_on(askpass::ask(&prompt, description)),243 Opts::LocalAgent => runtime.block_on(main_real()),244 Opts::Editor { path } => runtime.block_on(editor::edit(path)),245 Opts::RealAgent { path, privileged } => runtime.block_on(main_real_agent(path, privileged)),246 }247}248async fn main_real() -> anyhow::Result<()> {249 let conn = Connection::system().await?;250 let helper = SocketHelper {251 fallback: SuidHelper,252 };253 register_auth_agent(&conn, Agent::new(helper, RofiPrompter)).await?;254255 let _conn = conn;256 pending().await257}258async fn main_real_agent(path: Option<PathBuf>, privileged: bool) -> anyhow::Result<()> {259 let address = if privileged {260 Address::AgentPrivileged261 } else {262 Address::Agent263 };264 let mut rpc = Rpc::<BifConfig>::new(address);265266 Fs::new().register_endpoints(&mut rpc);267 Systemd.register_endpoints(&mut rpc);268 Pty::new().register_endpoints(&mut rpc);269270 remowt_plugin::host::serve(&mut rpc);271272 let user_prompter = PromptEndpointsClient::wrap(rpc.remote(Address::User));273 let editor_client = EditorEndpointsClient::wrap(rpc.remote(Address::User));274275 let bus = bus::spawn().await?;276 askpass::serve(&bus.conn, user_prompter.clone()).await?;277 editor::serve(&bus.conn, editor_client).await?;278279 let helpers = tempfile::Builder::new().prefix("remowt-path.").tempdir()?;280 let exe = std::env::current_exe()?;281 let askpass_helper = helpers.path().join("remowt-askpass");282 let editor_helper = helpers.path().join("remowt-editor");283 {284 let script = format!(285 "#!/bin/sh\nexec {} ask-pass \"password\" \"$1\"\n",286 sh_quote(&exe.to_string_lossy())287 );288 fs::write(&askpass_helper, script).await?;289 fs::set_permissions(&askpass_helper, Permissions::from_mode(0o755)).await?;290 }291 {292 let script = format!(293 "#!/bin/sh\nexec {} editor \"$1\"\n",294 sh_quote(&exe.to_string_lossy())295 );296 fs::write(&editor_helper, script).await?;297 fs::set_permissions(&editor_helper, Permissions::from_mode(0o755)).await?;298 }299300 // Safety: Hoping tokio own threads won't read any of those...301 unsafe {302 prepend_path(helpers.path());303 std::env::set_var("SUDO_ASKPASS", &askpass_helper);304 std::env::set_var("SSH_ASKPASS", &askpass_helper);305 std::env::set_var("SSH_ASKPASS_REQUIRE", "force");306 std::env::set_var("EDITOR", &editor_helper);307 std::env::set_var("VISUAL", &editor_helper);308 std::env::set_var("DBUS_SESSION_BUS_ADDRESS", &bus.address);309 }310311 let port = match path {312 Some(path) => from_socket(UnixStream::connect(path).await?),313 None => from_stdio(),314 };315 rpc.add_direct(Address::User, port, bifrostlink::Rtt(0));316317 let polkit_conn = if !privileged {318 // The unprivileged agent doubles as a polkit authentication agent so319 // `run0` (e.g. our own elevation) routes its prompt to the User over320 // bifrost instead of failing on a tty-less session.321 let conn = Connection::system().await?;322 let helper = SocketHelper {323 fallback: SuidHelper,324 };325 register_auth_agent(&conn, Agent::new(helper, user_prompter)).await?;326 Some(conn)327 } else {328 None329 };330331 let _keep_alive = (bus, helpers, polkit_conn);332 pending().await333}334335async fn register_auth_agent<H, P>(conn: &Connection, agent: Agent<H, P>) -> anyhow::Result<()>336where337 H: Helper + Clone + Send + Sync + 'static,338 P: Prompter + Clone + Send + Sync + 'static,339{340 let proxy = zbus_polkit::policykit1::AuthorityProxy::new(conn).await?;341 conn.object_server().at(OBJ_PATH, agent).await?;342343 let subject = auth_agent_subject()?;344 proxy345 .register_authentication_agent(&subject, "C", OBJ_PATH)346 .await?;347 debug!(kind = subject.subject_kind, "registered polkit agent");348 Ok(())349}350351fn auth_agent_subject() -> anyhow::Result<Subject> {352 let mut details = HashMap::new();353 if let Ok(session_id) = std::env::var("XDG_SESSION_ID") {354 let val: OwnedValue = Str::from(session_id).into();355 details.insert("session-id".to_string(), val);356 return Ok(Subject {357 subject_kind: "unix-session".to_string(),358 subject_details: details,359 });360 }361362 details.insert("pid".to_string(), OwnedValue::from(std::process::id()));363 Ok(Subject {364 subject_kind: "unix-process".to_string(),365 subject_details: details,366 })367}368369fn sh_quote(s: &str) -> String {370 format!("'{}'", s.replace('\'', "'\\''"))371}372373/// Prepend `dir` to the process `PATH`.374///375/// # SAFETY376///377/// Same as `set_var`378unsafe fn prepend_path(dir: &std::path::Path) {379 let value = match std::env::var_os("PATH") {380 Some(existing) => {381 let mut v = dir.as_os_str().to_owned();382 v.push(":");383 v.push(existing);384 v385 }386 None => dir.as_os_str().to_owned(),387 };388 unsafe {389 std::env::set_var("PATH", value);390 }391}1use std::borrow::Cow;2use std::collections::{BTreeMap, HashMap};3use std::fs::Permissions;4use std::future::pending;5use std::os::unix::fs::PermissionsExt as _;6use std::path::PathBuf;7use std::sync::{Arc, Mutex, OnceLock};89use bifrostlink::declarative::RemoteEndpoints;10use bifrostlink::Rpc;11use bifrostlink_ports::stdio::from_stdio;12use bifrostlink_ports::unix_socket::from_socket;13use clap::Parser;14use remowt_endpoints::{fs::Fs, pty::Pty, systemd::Systemd};15use remowt_link_shared::{editor::EditorEndpointsClient, Address, BifConfig};16use remowt_polkit_shared::{emphasize, BackendRequest, Identity, PidDisplay};17use remowt_ui_prompt::bifrost::PromptEndpointsClient;18use remowt_ui_prompt::rofi::RofiPrompter;19use remowt_ui_prompt::{PrependSourcePrompter, Prompter, Source};20use tokio::fs;21use tokio::net::UnixStream;22use tokio::runtime::Builder;23use tokio::task::AbortHandle;24use tracing::{debug, info, trace};25use zbus::fdo;26use zbus::zvariant::{OwnedValue, Str};27use zbus::{interface, proxy, Connection};28use zbus_polkit::policykit1::Subject;2930use self::helper::{Helper, SocketHelper, SuidHelper};3132pub mod askpass;33pub mod bus;34pub mod editor;35pub mod helper;3637struct CancelTaskOnDrop {38 tasks: Arc<Mutex<HashMap<String, AbortHandle>>>,39 handle: String,40}41impl Drop for CancelTaskOnDrop {42 fn drop(&mut self) {43 debug!("cancel on drop");44 if let Some(task) = self45 .tasks46 .lock()47 .expect("not poisoned")48 .remove(&self.handle)49 {50 task.abort();51 }52 }53}5455struct Agent<H, P> {56 tasks: Arc<Mutex<HashMap<String, AbortHandle>>>,57 helper: H,58 prompter: P,59}60impl<H, P> Agent<H, P> {61 fn new(helper: H, prompter: P) -> Self {62 Agent {63 tasks: Arc::new(Mutex::new(HashMap::new())),64 helper,65 prompter,66 }67 }68}6970#[interface(name = "org.freedesktop.PolicyKit1.AuthenticationAgent")]71impl<H, P> Agent<H, P>72where73 H: Helper + Clone + Send + Sync + 'static,74 P: Prompter + Clone + Send + Sync + 'static,75{76 /// BeginAuthentication method77 #[allow(clippy::too_many_arguments)]78 async fn begin_authentication(79 &self,80 action_id: String,81 message: String,82 _icon_name: String,83 mut details: BTreeMap<String, String>,84 cookie: String,85 identities: Vec<Identity>,86 ) -> zbus::fdo::Result<()> {87 use std::fmt::Write;88 info!("begin auth");89 let _cancel_guard = Arc::new(OnceLock::new());90 let task = {91 let helper = self.helper.clone();92 let prompter = self.prompter.clone();93 let cookie = cookie.clone();94 let _cancel_guard = _cancel_guard.clone();95 tokio::task::spawn(async move {96 let _cancel_guard = _cancel_guard.clone();97 trace!("conversation task");98 let mut description = format!("{message}\n\n<b>Action id:</b> {action_id}",);99 if let Some(subject) = details.remove("polkit.caller-pid") {100 let _ = write!(description, "\n<b>Caller:</b> ");101 if let Ok(pid) = subject.parse::<u32>() {102 let _ = write!(description, "{}", PidDisplay(pid));103 } else {104 let _ = write!(description, "{}", emphasize("invalid pid"));105 }106 }107 if let Some(subject) = details.remove("polkit.subject-pid") {108 let _ = write!(description, "\n<b>Subject:</b> ");109 if let Ok(pid) = subject.parse::<u32>() {110 let _ = write!(description, "{}", PidDisplay(pid));111 } else {112 let _ = write!(description, "{}", emphasize("invalid pid"));113 }114 }115 let mut prompter = PrependSourcePrompter {116 source: vec![Source(Cow::Borrowed("polkit agent"))],117 description: description.clone(),118 prompter,119 };120121 let identity_displays: Vec<String> =122 identities.iter().map(|v| v.to_string()).collect();123 let identity_displays: Vec<&str> =124 identity_displays.iter().map(|v| v.as_str()).collect();125 debug!("choose identity");126 let choosen_identity = match identity_displays.len() {127 0 => {128 return Err(fdo::Error::AuthFailed(129 "no identity to authenticate as".to_owned(),130 ))131 }132 1 => 0,133 _ => {134 prompter135 .prompt_enum(136 "Identity",137 "Select identity to use for polkit authorization",138 &identity_displays,139 &[],140 )141 .await?142 }143 };144 debug!("identity chosen");145146 let _ = write!(147 description,148 "\n<b>Identity:</b> {}",149 identities[choosen_identity as usize]150 );151 prompter.description = description;152153 prompter.source.push(Source(Cow::Borrowed("polkit daemon")));154155 helper156 .help_me(157 &cookie,158 prompter,159 identities[choosen_identity as usize].clone(),160 )161 .await162 .map_err(|e| fdo::Error::Failed(e.to_string()))?;163 // let connection = Connection::system().await?;164 // let helper = PolkitHelperProxy::new(&connection).await?;165166 Ok(())167 })168 };169 self.tasks170 .lock()171 .unwrap()172 .insert(cookie.clone(), task.abort_handle());173 debug!("abort handle stored");174 let _ = _cancel_guard.set(CancelTaskOnDrop {175 tasks: self.tasks.clone(),176 handle: cookie.clone(),177 });178179 let _ = task.await;180181 Ok(())182 }183184 /// CancelAuthentication method185 async fn cancel_authentication(&self, cookie: &str) -> zbus::fdo::Result<()> {186 debug!("auth cancelled");187 if let Some(abort) = self.tasks.lock().unwrap().remove(cookie) {188 debug!("abort handle found");189 abort.abort();190 }191 // debug!("Authentication cancled ! {cookie}");192 Ok(())193 }194}195196const OBJ_PATH: &str = "/org/freedesktop/PolicyKit1/AuthenticationAgent";197198#[proxy(199 interface = "lach.PolkitHelper",200 default_service = "lach.polkit.helper1",201 default_path = "/lach/PolkitHelper"202)]203trait PolkitHelper {204 fn init_conversation(&self, request: BackendRequest) -> zbus::Result<()>;205}206207#[derive(Parser)]208enum Opts {209 AskPass {210 prompt: String,211 description: String,212 },213 Editor {214 /// Argument to nvim215 path: String,216 },217 RealAgent {218 #[arg(long)]219 path: Option<PathBuf>,220 /// Expect own address to be AgentPrivileged, skip installing polkit agent221 #[arg(long)]222 privileged: bool,223 },224 LocalAgent,225}226227fn main() -> anyhow::Result<()> {228 tracing_subscriber::fmt()229 .with_writer(std::io::stderr)230 .without_time()231 .init();232 let opts = Opts::parse();233234 let runtime = Builder::new_current_thread().enable_all().build()?;235236 match opts {237 Opts::AskPass {238 prompt,239 description,240 } => runtime.block_on(askpass::ask(&prompt, description)),241 Opts::LocalAgent => runtime.block_on(main_real()),242 Opts::Editor { path } => runtime.block_on(editor::edit(path)),243 Opts::RealAgent { path, privileged } => runtime.block_on(main_real_agent(path, privileged)),244 }245}246async fn main_real() -> anyhow::Result<()> {247 let conn = Connection::system().await?;248 let helper = SocketHelper {249 fallback: SuidHelper,250 };251 register_auth_agent(&conn, Agent::new(helper, RofiPrompter)).await?;252253 let _conn = conn;254 pending().await255}256async fn main_real_agent(path: Option<PathBuf>, privileged: bool) -> anyhow::Result<()> {257 let address = if privileged {258 Address::AgentPrivileged259 } else {260 Address::Agent261 };262 let mut rpc = Rpc::<BifConfig>::new(address);263264 Fs::new().register_endpoints(&mut rpc);265 Systemd.register_endpoints(&mut rpc);266 Pty::new().register_endpoints(&mut rpc);267268 remowt_plugin::host::serve(&mut rpc);269270 let user_prompter = PromptEndpointsClient::wrap(rpc.remote(Address::User));271 let editor_client = EditorEndpointsClient::wrap(rpc.remote(Address::User));272273 let bus = bus::spawn().await?;274 askpass::serve(&bus.conn, user_prompter.clone()).await?;275 editor::serve(&bus.conn, editor_client).await?;276277 let helpers = tempfile::Builder::new().prefix("remowt-path.").tempdir()?;278 let exe = std::env::current_exe()?;279 let askpass_helper = helpers.path().join("remowt-askpass");280 let editor_helper = helpers.path().join("remowt-editor");281 {282 let script = format!(283 "#!/bin/sh\nexec {} ask-pass \"password\" \"$1\"\n",284 sh_quote(&exe.to_string_lossy())285 );286 fs::write(&askpass_helper, script).await?;287 fs::set_permissions(&askpass_helper, Permissions::from_mode(0o755)).await?;288 }289 {290 let script = format!(291 "#!/bin/sh\nexec {} editor \"$1\"\n",292 sh_quote(&exe.to_string_lossy())293 );294 fs::write(&editor_helper, script).await?;295 fs::set_permissions(&editor_helper, Permissions::from_mode(0o755)).await?;296 }297298 // Safety: Hoping tokio own threads won't read any of those...299 unsafe {300 prepend_path(helpers.path());301 std::env::set_var("SUDO_ASKPASS", &askpass_helper);302 std::env::set_var("SSH_ASKPASS", &askpass_helper);303 std::env::set_var("SSH_ASKPASS_REQUIRE", "force");304 std::env::set_var("EDITOR", &editor_helper);305 std::env::set_var("VISUAL", &editor_helper);306 std::env::set_var("DBUS_SESSION_BUS_ADDRESS", &bus.address);307 }308309 let port = match path {310 Some(path) => from_socket(UnixStream::connect(path).await?),311 None => from_stdio(),312 };313 rpc.add_direct(Address::User, port, bifrostlink::Rtt(0));314315 let polkit_conn = if !privileged {316 // The unprivileged agent doubles as a polkit authentication agent so317 // `run0` (e.g. our own elevation) routes its prompt to the User over318 // bifrost instead of failing on a tty-less session.319 let conn = Connection::system().await?;320 let helper = SocketHelper {321 fallback: SuidHelper,322 };323 register_auth_agent(&conn, Agent::new(helper, user_prompter)).await?;324 Some(conn)325 } else {326 None327 };328329 let _keep_alive = (bus, helpers, polkit_conn);330 pending().await331}332333async fn register_auth_agent<H, P>(conn: &Connection, agent: Agent<H, P>) -> anyhow::Result<()>334where335 H: Helper + Clone + Send + Sync + 'static,336 P: Prompter + Clone + Send + Sync + 'static,337{338 let proxy = zbus_polkit::policykit1::AuthorityProxy::new(conn).await?;339 conn.object_server().at(OBJ_PATH, agent).await?;340341 let subject = auth_agent_subject()?;342 proxy343 .register_authentication_agent(&subject, "C", OBJ_PATH)344 .await?;345 debug!(kind = subject.subject_kind, "registered polkit agent");346 Ok(())347}348349fn auth_agent_subject() -> anyhow::Result<Subject> {350 let mut details = HashMap::new();351 if let Ok(session_id) = std::env::var("XDG_SESSION_ID") {352 let val: OwnedValue = Str::from(session_id).into();353 details.insert("session-id".to_string(), val);354 return Ok(Subject {355 subject_kind: "unix-session".to_string(),356 subject_details: details,357 });358 }359360 details.insert("pid".to_string(), OwnedValue::from(std::process::id()));361 Ok(Subject {362 subject_kind: "unix-process".to_string(),363 subject_details: details,364 })365}366367fn sh_quote(s: &str) -> String {368 format!("'{}'", s.replace('\'', "'\\''"))369}370371/// Prepend `dir` to the process `PATH`.372///373/// # SAFETY374///375/// Same as `set_var`376unsafe fn prepend_path(dir: &std::path::Path) {377 let value = match std::env::var_os("PATH") {378 Some(existing) => {379 let mut v = dir.as_os_str().to_owned();380 v.push(":");381 v.push(existing);382 v383 }384 None => dir.as_os_str().to_owned(),385 };386 unsafe {387 std::env::set_var("PATH", value);388 }389}cmds/remowt-ssh/Cargo.tomldiffbeforeafterboth--- a/cmds/remowt-ssh/Cargo.toml
+++ b/cmds/remowt-ssh/Cargo.toml
@@ -11,8 +11,15 @@
tracing-subscriber.workspace = true
bifrostlink.workspace = true
remowt-link-shared.workspace = true
-remowt-client.workspace = true
-tokio = { workspace = true, features = ["macros", "fs", "net", "io-util", "rt", "signal"] }
+remowt-client = { workspace = true, features = ["shell"] }
+tokio = { workspace = true, features = [
+ "macros",
+ "fs",
+ "net",
+ "io-util",
+ "rt",
+ "signal",
+] }
nix = { workspace = true, features = ["term"] }
anyhow.workspace = true
bifrostlink-ports.workspace = true
cmds/remowt-ssh/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-ssh/src/main.rs
+++ b/cmds/remowt-ssh/src/main.rs
@@ -19,11 +19,14 @@
use tokio::io::unix::AsyncFd;
use tokio::io::{AsyncRead, ReadBuf};
use tokio::signal::unix::{signal, SignalKind};
-use tracing::info;
+use tracing::debug;
#[derive(Parser)]
-struct Opts {
- host: String,
+enum Opts {
+ /// Connect to remote host with remowt agent.
+ Ssh { host: String },
+ /// Connect to local host for testing the connectivity.
+ Local,
}
fn agents_dir() -> anyhow::Result<PathBuf> {
@@ -35,18 +38,27 @@
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
- tracing_subscriber::fmt::init();
+ tracing_subscriber::fmt()
+ .with_writer(std::io::stderr)
+ .without_time()
+ .init();
let opts = Opts::parse();
let bundle = AgentBundle::from_dir(agents_dir()?)?;
- let conn = Remowt::connect(&opts.host, &bundle).await?;
+ let conn = match &opts {
+ Opts::Ssh { host } => Remowt::connect(host, &bundle).await?,
+ Opts::Local => Remowt::connect_local(&bundle).await?,
+ };
let mut rpc = conn.rpc();
serve_prompts(
&mut rpc,
PrependSourcePrompter {
prompter: RofiPrompter,
- source: vec![Source(Cow::Owned(format!("ssh host: {}", opts.host)))],
+ source: match opts {
+ Opts::Ssh { host } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))],
+ Opts::Local => vec![],
+ },
description: "".to_owned(),
},
);
@@ -54,9 +66,9 @@
serve_editor(&mut rpc, SshEditor { sess });
}
- info!("entering shell");
+ debug!("entering shell");
run_shell(&conn).await?;
- info!("shell ended");
+ debug!("shell ended");
Ok(())
}
crates/remowt-client/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-client/Cargo.toml
+++ b/crates/remowt-client/Cargo.toml
@@ -16,7 +16,18 @@
remowt-link-shared.workspace = true
russh.workspace = true
russh-config.workspace = true
-tokio = { workspace = true, features = ["net", "io-util", "rt", "sync", "macros", "process"] }
+tempfile.workspace = true
+tokio = { workspace = true, features = [
+ "net",
+ "io-util",
+ "rt",
+ "sync",
+ "macros",
+ "process",
+] }
tracing.workspace = true
uuid = { workspace = true, features = ["v4"] }
-remowt-endpoints.workspace = true
+remowt-endpoints = { workspace = true, optional = true }
+
+[features]
+shell = ["dep:remowt-endpoints"]
crates/remowt-client/src/forwarded.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/forwarded.rs
@@ -0,0 +1,79 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use anyhow::{anyhow, Result};
+use camino::Utf8PathBuf;
+use russh::client::Msg;
+use russh::{Channel, ChannelStream};
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tokio::net::{UnixListener, UnixStream};
+use tokio::sync::oneshot;
+
+pub enum RemowtListener {
+ Ssh(oneshot::Receiver<Channel<Msg>>),
+ Local(UnixListener, Utf8PathBuf),
+}
+
+impl RemowtListener {
+ pub async fn accept(self) -> Result<RemowtStream> {
+ match self {
+ RemowtListener::Ssh(rx) => {
+ let ch = rx
+ .await
+ .map_err(|_| anyhow!("agent never connected the forwarded socket"))?;
+ Ok(RemowtStream::Ssh(ch.into_stream()))
+ }
+ RemowtListener::Local(listener, path) => {
+ let (stream, _) = listener.accept().await?;
+ let _ = std::fs::remove_file(&path);
+ Ok(RemowtStream::Local(stream))
+ }
+ }
+ }
+}
+
+pub enum RemowtStream {
+ Ssh(ChannelStream<Msg>),
+ Local(UnixStream),
+}
+
+impl AsyncRead for RemowtStream {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ match self.get_mut() {
+ RemowtStream::Ssh(s) => Pin::new(s).poll_read(cx, buf),
+ RemowtStream::Local(s) => Pin::new(s).poll_read(cx, buf),
+ }
+ }
+}
+
+impl AsyncWrite for RemowtStream {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ match self.get_mut() {
+ RemowtStream::Ssh(s) => Pin::new(s).poll_write(cx, buf),
+ RemowtStream::Local(s) => Pin::new(s).poll_write(cx, buf),
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ match self.get_mut() {
+ RemowtStream::Ssh(s) => Pin::new(s).poll_flush(cx),
+ RemowtStream::Local(s) => Pin::new(s).poll_flush(cx),
+ }
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ match self.get_mut() {
+ RemowtStream::Ssh(s) => Pin::new(s).poll_shutdown(cx),
+ RemowtStream::Local(s) => Pin::new(s).poll_shutdown(cx),
+ }
+ }
+}
crates/remowt-client/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-client/src/lib.rs
+++ b/crates/remowt-client/src/lib.rs
@@ -1,61 +1,53 @@
use std::collections::HashMap;
+use std::env;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
-use std::{env, io};
use anyhow::{anyhow, bail, ensure, Context as _, Result};
use bifrostlink::declarative::RemoteEndpoints;
-use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc};
+use bifrostlink::{Remote, Rpc, Rtt};
use bifrostlink_ports::unix_socket::from_socket;
-use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
-use remowt_endpoints::{
- fs::Fs,
- pty::{Pty, PtyClient, ShellId},
- systemd::Systemd,
-};
use remowt_link_shared::plugin::PluginEndpointsClient;
-use remowt_link_shared::{Address, BifConfig, ElevateEndpoints, ElevateError, Elevator};
+use remowt_link_shared::port::child_port;
+use remowt_link_shared::{Address, BifConfig};
use russh::client::{connect, Config, Handle, Handler, Msg, Session};
use russh::keys::agent::client::AgentClient;
use russh::keys::agent::AgentIdentity;
use russh::keys::check_known_hosts;
use russh::keys::ssh_key::PublicKey;
-use russh::{Channel, ChannelMsg, ChannelStream};
-use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf};
-use tokio::join;
+use russh::Channel;
+use tempfile::TempDir;
use tokio::net::UnixListener;
-use tokio::sync::mpsc;
use tokio::sync::oneshot::{self, channel};
-use tracing::error;
+use tokio::{
+ fs,
+ io::{AsyncReadExt as _, AsyncWriteExt as _},
+};
+use tracing::{debug, error};
use uuid::Uuid;
+use self::port::channel_port;
+use self::subprocess::RemowtChild;
+
pub mod editor;
+mod forwarded;
+mod port;
+#[cfg(feature = "shell")]
+mod shell;
+mod subprocess;
-type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;
+pub use forwarded::{RemowtListener, RemowtStream};
+#[cfg(feature = "shell")]
+pub use shell::{RemowtShell, RemowtShellResizer};
-async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {
- let len = srx.read_u32().await?;
- let mut buf = BytesMut::zeroed(len as usize);
- srx.read_exact(&mut buf).await?;
- Ok(buf)
-}
-async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {
- stx.write_u32(value.len().try_into().expect("can't be larger"))
- .await?;
- stx.write_all(&value).await?;
- Ok(())
-}
+type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;
fn sh_quote(s: impl AsRef<str>) -> String {
format!("'{}'", s.as_ref().replace('\'', "'\\''"))
}
-const ESCALATORS: [(&str, &[&str]); 3] = [
- ("run0", &["--background=", "--pipe"]),
- ("sudo", &[]),
- ("doas", &[]),
-];
+const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];
pub struct AgentBundle {
dir: PathBuf,
@@ -90,26 +82,36 @@
fn binary(&self, arch: &str) -> PathBuf {
self.dir.join(format!("remowt-agent-{arch}"))
}
+
+ fn local_binary(&self) -> Result<PathBuf> {
+ let arch = env::consts::ARCH;
+ let path = self.binary(arch);
+ ensure!(
+ path.is_file(),
+ "no local remowt-agent build for arch {arch} in bundle {}",
+ self.dir.display()
+ );
+ Ok(path)
+ }
}
async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {
- let mut ch = sess.channel_open_session().await?;
+ let ch = sess.channel_open_session().await?;
ch.exec(true, cmd).await?;
+
+ let mut child = RemowtChild::from_exec(ch);
+ drop(child.stdin);
+
let mut out = Vec::new();
- let mut code = None;
- while let Some(msg) = ch.wait().await {
- match msg {
- ChannelMsg::Data { data } => out.extend(data.as_ref()),
- ChannelMsg::ExtendedData { data, .. } => {
- error!(
- "remote stderr: {}",
- String::from_utf8_lossy(data.as_ref()).trim()
- );
- }
- ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
- _ => {}
- }
+ let mut err = Vec::new();
+ tokio::try_join!(
+ child.stdout.read_to_end(&mut out),
+ child.stderr.read_to_end(&mut err),
+ )?;
+ if !err.is_empty() {
+ error!("remote stderr: {}", String::from_utf8_lossy(&err).trim());
}
+ let code = child.exit.await.ok().flatten();
Ok((code, out))
}
@@ -119,21 +121,27 @@
code == Some(0),
"remote command failed (exit {code:?}): {cmd}"
);
- ensure!(out.ends_with(b"\n"));
- out.pop();
+ if !out.is_empty() {
+ ensure!(
+ out.ends_with(b"\n"),
+ "remote command was not newline-terminated: {cmd}: {out:?}"
+ );
+ out.pop();
+ }
String::from_utf8(out).context("expected utf8 output for command")
}
async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {
+ debug!("uname -a");
let arch = run_string_ok(sess, "uname -m").await?;
let hash = bundle
.hashes
.get(&arch)
.ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;
+ debug!("get dir");
let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")
.await?
- .trim()
.to_owned();
let dir = if cache.is_empty() {
let home = run_string_ok(sess, "echo \"$HOME\"").await?;
@@ -141,17 +149,21 @@
!home.is_empty(),
"remote $HOME and $XDG_CACHE_HOME both empty"
);
- Utf8PathBuf::from(home).join("cache/remowt")
+ Utf8PathBuf::from(home).join(".cache/remowt")
} else {
Utf8PathBuf::from(cache).join("remowt")
};
let path = dir.join(hash);
+ debug!("presence");
let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;
if present != Some(0) {
let bin = bundle.binary(&arch);
- let bytes = std::fs::read(&bin)
+ debug!("read");
+ let bytes = fs::read(&bin)
+ .await
.with_context(|| format!("reading agent binary {}", bin.display()))?;
+ debug!("upload");
upload_agent(sess, &dir, &path, bytes).await?;
}
Ok(path)
@@ -163,29 +175,29 @@
path: &Utf8Path,
bytes: Vec<u8>,
) -> Result<()> {
+ debug!("mkdirp");
run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;
- let tmp = path.join(format!("tmp.{}", Uuid::new_v4()));
+ let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));
let ch = sess.channel_open_session().await?;
+ debug!("cat");
ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;
- ch.data_bytes(bytes).await?;
- ch.eof().await?;
- let mut ch = ch;
- let mut code = None;
- while let Some(msg) = ch.wait().await {
- match msg {
- ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
- ChannelMsg::ExtendedData { data, .. } => {
- error!(
- "agent upload: {}",
- String::from_utf8_lossy(data.as_ref()).trim()
- );
- }
- _ => {}
- }
- }
+
+ let mut child = RemowtChild::from_exec(ch);
+ child
+ .stdin
+ .write_all(&bytes)
+ .await
+ .context("sending agent binary")?;
+ child
+ .stdin
+ .shutdown()
+ .await
+ .context("sending agent binary")?;
+ let code = child.wait().await;
ensure!(code == Some(0), "agent upload failed (exit {code:?})");
+ debug!("chmod");
run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;
run_string_ok(
sess,
@@ -205,7 +217,7 @@
return Ok((tool, flags));
}
}
- bail!("no escalation tool (run0/sudo/doas) found on remote")
+ bail!("no escalation tool found on remote")
}
fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {
@@ -226,36 +238,6 @@
env::split_paths(&path)
.map(|dir| dir.join(name))
.find(|p| p.is_file())
-}
-
-fn port_from_channel(ch: Channel<Msg>) -> Port {
- Port::new(move |mut rx, tx| async move {
- let (mut srx, mut stx) = tokio::io::split(ch.into_stream());
- let srx_task = async move {
- loop {
- match read(&mut srx).await {
- Ok(buf) => {
- if tx.send(buf.freeze()).is_err() {
- break;
- }
- }
- Err(e) => {
- error!("channel read failed: {e}");
- break;
- }
- }
- }
- };
- let stx_task = async move {
- while let Some(value) = rx.recv().await {
- if let Err(e) = write(&mut stx, value).await {
- error!("channel write failed: {e}");
- break;
- }
- }
- };
- join!(srx_task, stx_task);
- })
}
pub struct SshHandler {
@@ -286,56 +268,20 @@
return Err(russh::Error::WrongChannel);
};
let _ = ch.send(channel);
- Ok(())
- }
-}
-
-struct SshElevator {
- sess: Arc<Handle<SshHandler>>,
- rpc: WeakRpc<BifConfig>,
- agent_path: Utf8PathBuf,
-}
-impl Elevator for SshElevator {
- async fn elevate(&self) -> Result<(), ElevateError> {
- let fail = |e: String| ElevateError::Failed(e);
- let (tool, flags) = detect_escalation(&self.sess)
- .await
- .map_err(|e| fail(e.to_string()))?;
- let ch = self
- .sess
- .channel_open_session()
- .await
- .map_err(|e| fail(e.to_string()))?;
- ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None))
- .await
- .map_err(|e| fail(e.to_string()))?;
- let rpc = self
- .rpc
- .clone()
- .upgrade()
- .ok_or_else(|| fail("rpc is gone".to_owned()))?;
- rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0));
Ok(())
}
}
-pub struct RemoteChild {
- pub stdout: DuplexStream,
- pub stderr: DuplexStream,
- pub exit: oneshot::Receiver<Option<u32>>,
-}
-
enum Transport {
Ssh {
sess: Arc<Handle<SshHandler>>,
subs: Subs,
- remote_dir: Utf8PathBuf,
+ runtime_dir: Utf8PathBuf,
agent_path: Utf8PathBuf,
},
Local {
- #[allow(dead_code)]
- agent: Rpc<BifConfig>,
- agent_path: String,
+ agent_path: PathBuf,
+ runtime_dir: Utf8PathBuf,
},
}
@@ -344,44 +290,11 @@
rpc: Rpc<BifConfig>,
elevated: tokio::sync::OnceCell<()>,
children: Mutex<Vec<tokio::process::Child>>,
+ _runtime_tmp: Option<TempDir>,
}
pub type RemowtRemote = Remote<BifConfig>;
-fn loopback() -> (Port, Port) {
- let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::<Bytes>();
- let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::<Bytes>();
- let user = Port::new(move |mut rx, tx| async move {
- loop {
- tokio::select! {
- msg = rx.recv() => match msg {
- Some(msg) => if a2b_tx.send(msg).is_err() { break },
- None => break,
- },
- msg = b2a_rx.recv() => match msg {
- Some(msg) => if tx.send(msg).is_err() { break },
- None => break,
- },
- }
- }
- });
- let agent = Port::new(move |mut rx, tx| async move {
- loop {
- tokio::select! {
- msg = rx.recv() => match msg {
- Some(msg) => if b2a_tx.send(msg).is_err() { break },
- None => break,
- },
- msg = a2b_rx.recv() => match msg {
- Some(msg) => if tx.send(msg).is_err() { break },
- None => break,
- },
- }
- }
- });
- (user, agent)
-}
-
impl Remowt {
pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {
let conf = russh_config::parse_home(host)?;
@@ -426,13 +339,14 @@
}
ensure!(authenticated, "ssh authentication failed");
- // All remaining session ops take `&self`; share the handle.
let sess = Arc::new(sess);
+ debug!("deploying agent");
let agent_path = deploy_agent(&sess, bundle).await?;
- let remote_dir = remote_mktemp(&sess).await?;
- let primary = remote_dir.join("primary.sock");
+ debug!("runtime dir");
+ let runtime_dir = remote_runtime_dir(&sess).await?;
+ let primary = runtime_dir.join(format!("remowt-{}.sock", Uuid::new_v4()));
let (onetx, onerx) = channel();
subs.lock().expect("lock").insert(primary.clone(), onetx);
@@ -442,6 +356,7 @@
// TODO: ensure no injection is possible in the socket path.
let cmd_chan = sess.channel_open_session().await?;
+ debug!("starting agent");
cmd_chan
.exec(
true,
@@ -453,7 +368,7 @@
)
.await?;
- let port = port_from_channel(
+ let port = channel_port(
onerx
.await
.map_err(|_| anyhow!("agent never opened its channel"))?,
@@ -464,36 +379,42 @@
transport: Transport::Ssh {
sess,
subs,
- remote_dir,
+ runtime_dir,
agent_path,
},
rpc,
elevated: tokio::sync::OnceCell::new(),
children: Mutex::new(Vec::new()),
+ _runtime_tmp: None,
})
}
- pub async fn connect_local(agent_path: &str) -> Result<Self> {
- let (port_user, port_agent) = loopback();
+ pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {
+ let agent_path = bundle.local_binary()?;
+ let mut child = tokio::process::Command::new(&agent_path)
+ .arg("real-agent")
+ .stdin(std::process::Stdio::piped())
+ .stdout(std::process::Stdio::piped())
+ .kill_on_drop(true)
+ .spawn()
+ .with_context(|| format!("spawning agent binary {}", agent_path.display()))?;
+ let stdin = child.stdin.take().expect("stdin piped");
+ let stdout = child.stdout.take().expect("stdout piped");
+
let rpc = Rpc::<BifConfig>::new(Address::User);
- let mut agent = Rpc::<BifConfig>::new(Address::Agent);
+ rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));
- // Register handlers before wiring up the link (see the agent binary).
- Fs::new().register_endpoints(&mut agent);
- Systemd.register_endpoints(&mut agent);
- Pty::new().register_endpoints(&mut agent);
+ let (runtime_dir, runtime_tmp) = local_runtime_dir()?;
- agent.add_direct(Address::User, port_agent, Rtt(0));
- rpc.add_direct(Address::Agent, port_user, Rtt(0));
-
Ok(Self {
transport: Transport::Local {
- agent,
- agent_path: agent_path.to_owned(),
+ agent_path,
+ runtime_dir,
},
rpc,
elevated: tokio::sync::OnceCell::new(),
- children: Mutex::new(Vec::new()),
+ children: Mutex::new(vec![child]),
+ _runtime_tmp: runtime_tmp,
})
}
@@ -547,24 +468,25 @@
let ch = sess.channel_open_session().await?;
ch.exec(true, privileged_cmd(tool, flags, agent_path, None))
.await?;
- port_from_channel(ch)
+ channel_port(ch)
}
Transport::Local { agent_path, .. } => {
- let sock = env::temp_dir()
- .join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4()));
+ let sock = self
+ .runtime_dir()
+ .join(format!("remowt-priv-{}.sock", Uuid::new_v4()));
let _ = std::fs::remove_file(&sock);
let listener = UnixListener::bind(&sock)?;
let (tool, flags) = ESCALATORS
.iter()
.find(|(t, _)| find_in_path(t).is_some())
- .ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?;
+ .ok_or_else(|| anyhow!("no escalation tool found"))?;
let child = tokio::process::Command::new(tool)
.args(*flags)
.arg(agent_path)
.arg("real-agent")
.arg("--privileged")
.arg("--path")
- .arg(sock.to_str().expect("temp path is utf-8"))
+ .arg(sock.as_str())
.kill_on_drop(true)
.spawn()?;
self.children.lock().expect("lock").push(child);
@@ -580,138 +502,63 @@
Ok(())
}
- pub async fn exec(&self, command: String) -> Result<RemoteChild> {
+ pub async fn exec(&self, command: String) -> Result<RemowtChild> {
let Some(sess) = self.ssh() else {
bail!("exec should not be called on local")
};
let ch = sess.channel_open_session().await?;
ch.exec(true, command).await?;
-
- let (mut out_w, stdout) = tokio::io::duplex(64 * 1024);
- let (mut err_w, stderr) = tokio::io::duplex(64 * 1024);
- let (exit_tx, exit) = oneshot::channel();
-
- tokio::spawn(async move {
- let mut ch = ch;
- let mut code = None;
- while let Some(msg) = ch.wait().await {
- match msg {
- ChannelMsg::Data { data } => {
- if out_w.write_all(&data).await.is_err() {
- break;
- }
- }
- ChannelMsg::ExtendedData { data, .. } => {
- if err_w.write_all(&data).await.is_err() {
- break;
- }
- }
- ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
- _ => {}
- }
- }
- let _ = out_w.shutdown().await;
- let _ = err_w.shutdown().await;
- let _ = exit_tx.send(code);
- });
-
- Ok(RemoteChild {
- stdout,
- stderr,
- exit,
- })
+ Ok(RemowtChild::from_exec(ch))
}
- pub fn serve_elevate(&self) -> Result<()> {
- let Transport::Ssh {
- sess, agent_path, ..
- } = &self.transport
- else {
- bail!("elevate should not be called on local")
- };
- let mut rpc = self.rpc.clone();
- ElevateEndpoints(SshElevator {
- sess: sess.clone(),
- rpc: self.rpc.clone().downgrade(),
- agent_path: agent_path.to_owned(),
- })
- .register_endpoints(&mut rpc);
- Ok(())
+ fn runtime_dir(&self) -> Utf8PathBuf {
+ match &self.transport {
+ Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),
+ Transport::Local { runtime_dir, .. } => runtime_dir.clone(),
+ }
}
- pub fn remote_dir(&self) -> Option<&Utf8Path> {
+ pub async fn forward_socket(&self, path: &Utf8Path) -> Result<RemowtListener> {
match &self.transport {
- Transport::Ssh { remote_dir, .. } => Some(remote_dir),
- Transport::Local { .. } => None,
+ Transport::Ssh { sess, subs, .. } => {
+ let (tx, rx) = oneshot::channel();
+ subs.lock().expect("lock").insert(path.to_owned(), tx);
+ sess.streamlocal_forward(path.to_owned()).await?;
+ Ok(RemowtListener::Ssh(rx))
+ }
+ Transport::Local { .. } => {
+ let _ = std::fs::remove_file(path);
+ Ok(RemowtListener::Local(
+ UnixListener::bind(path)?,
+ path.to_owned(),
+ ))
+ }
}
- }
-
- pub async fn forward_socket(
- &self,
- remote_path: &Utf8Path,
- ) -> Result<oneshot::Receiver<Channel<Msg>>> {
- let Transport::Ssh { sess, subs, .. } = &self.transport else {
- bail!("forward_socket should not be called on local")
- };
- let (tx, rx) = oneshot::channel();
- subs.lock()
- .expect("lock")
- .insert(remote_path.to_owned(), tx);
- sess.streamlocal_forward(remote_path.to_owned()).await?;
- Ok(rx)
- }
-
- pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<Shell> {
- let Transport::Ssh { remote_dir, .. } = &self.transport else {
- bail!("open_shell should not be called on local")
- };
- let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4()));
-
- let rx = self.forward_socket(&sock).await?;
- let client: PtyClient<BifConfig> = self.endpoints();
- let id = client
- .open_shell(sock, term.to_owned(), cols, rows)
- .await?
- .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;
- let ch = rx
- .await
- .map_err(|_| anyhow!("agent never connected the shell socket"))?;
-
- Ok(Shell {
- id,
- stream: ch.into_stream(),
- remote: self.rpc.remote(Address::Agent),
- })
}
}
-pub struct Shell {
- pub id: ShellId,
- pub stream: ChannelStream<Msg>,
- remote: Remote<BifConfig>,
-}
-
-impl Shell {
- pub fn resizer(&self) -> ShellResizer {
- ShellResizer {
- remote: self.remote.clone(),
- id: self.id,
+fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {
+ if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {
+ if !dir.is_empty() {
+ return Ok((Utf8PathBuf::from(dir), None));
}
}
-}
-
-#[derive(Clone)]
-pub struct ShellResizer {
- remote: Remote<BifConfig>,
- id: ShellId,
+ let tmp = tempfile::Builder::new()
+ .prefix("remowt.")
+ .rand_bytes(12)
+ .tempdir()?;
+ let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())
+ .map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;
+ Ok((dir, Some(tmp)))
}
-impl ShellResizer {
- pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {
- PtyClient::wrap(self.remote.clone())
- .resize(self.id, cols, rows)
- .await?
- .map_err(|e| anyhow!("failed to resize remote shell: {e}"))
+async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {
+ let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;
+ let dir = dir.trim();
+ if dir.is_empty() {
+ remote_mktemp(sess).await
+ } else {
+ Ok(Utf8PathBuf::from(dir))
}
}
crates/remowt-client/src/port.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/port.rs
@@ -0,0 +1,52 @@
+use std::io;
+
+use bifrostlink::Port;
+use bytes::{Bytes, BytesMut};
+use russh::{Channel, ChannelStream};
+use russh::client::Msg;
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, ReadHalf, WriteHalf};
+use tokio::join;
+use tracing::error;
+
+async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {
+ let len = srx.read_u32().await?;
+ let mut buf = BytesMut::zeroed(len as usize);
+ srx.read_exact(&mut buf).await?;
+ Ok(buf)
+}
+async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {
+ stx.write_u32(value.len().try_into().expect("can't be larger"))
+ .await?;
+ stx.write_all(&value).await?;
+ Ok(())
+}
+
+pub fn channel_port(ch: Channel<Msg>) -> Port {
+ Port::new(move |mut rx, tx| async move {
+ let (mut srx, mut stx) = tokio::io::split(ch.into_stream());
+ let srx_task = async move {
+ loop {
+ match read(&mut srx).await {
+ Ok(buf) => {
+ if tx.send(buf.freeze()).is_err() {
+ break;
+ }
+ }
+ Err(e) => {
+ error!("channel read failed: {e}");
+ break;
+ }
+ }
+ }
+ };
+ let stx_task = async move {
+ while let Some(value) = rx.recv().await {
+ if let Err(e) = write(&mut stx, value).await {
+ error!("channel write failed: {e}");
+ break;
+ }
+ }
+ };
+ join!(srx_task, stx_task);
+ })
+}
crates/remowt-client/src/shell.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/shell.rs
@@ -0,0 +1,60 @@
+use anyhow::{anyhow, Result};
+use bifrostlink::declarative::RemoteEndpoints as _;
+use bifrostlink::Remote;
+use remowt_endpoints::pty::{PtyClient, ShellId};
+use remowt_link_shared::{Address, BifConfig};
+use uuid::Uuid;
+
+use crate::forwarded::RemowtStream;
+use crate::Remowt;
+
+pub struct RemowtShell {
+ pub id: ShellId,
+ pub stream: RemowtStream,
+ remote: Remote<BifConfig>,
+}
+impl RemowtShell {
+ pub fn resizer(&self) -> RemowtShellResizer {
+ RemowtShellResizer {
+ remote: self.remote.clone(),
+ id: self.id,
+ }
+ }
+}
+
+#[derive(Clone)]
+pub struct RemowtShellResizer {
+ remote: Remote<BifConfig>,
+ id: ShellId,
+}
+
+impl RemowtShellResizer {
+ pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {
+ PtyClient::wrap(self.remote.clone())
+ .resize(self.id, cols, rows)
+ .await?
+ .map_err(|e| anyhow!("failed to resize remote shell: {e}"))
+ }
+}
+
+impl Remowt {
+ pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<RemowtShell> {
+ let sock = self
+ .runtime_dir()
+ .join(format!("remowt-shell-{}.sock", Uuid::new_v4()));
+
+ let forwarded = self.forward_socket(&sock).await?;
+ let client: PtyClient<BifConfig> = self.endpoints();
+ let id = client
+ .open_shell(sock, term.to_owned(), cols, rows)
+ .await?
+ .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;
+ let stream = forwarded.accept().await?;
+
+ Ok(RemowtShell {
+ id,
+ stream,
+ remote: self.rpc.remote(Address::Agent),
+ })
+ }
+}
crates/remowt-client/src/subprocess.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/subprocess.rs
@@ -0,0 +1,84 @@
+use bytes::Bytes;
+use russh::client::Msg;
+use russh::{Channel, ChannelMsg};
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};
+use tokio::sync::oneshot;
+
+const BUF: usize = 64 * 1024;
+
+pub struct RemowtChild {
+ pub stdin: DuplexStream,
+ pub stdout: DuplexStream,
+ pub stderr: DuplexStream,
+ pub exit: oneshot::Receiver<Option<u32>>,
+}
+
+impl RemowtChild {
+ /// Manage channel returned by russh exec().
+ pub(crate) fn from_exec(ch: Channel<Msg>) -> Self {
+ let (stdin, mut stdin_r) = tokio::io::duplex(BUF);
+ let (mut out_w, stdout) = tokio::io::duplex(BUF);
+ let (mut err_w, stderr) = tokio::io::duplex(BUF);
+ let (exit_tx, exit) = oneshot::channel();
+
+ tokio::spawn(async move {
+ let (mut read, write) = ch.split();
+
+ // Forward our stdin to the channel, signalling EOF when it closes.
+ let stdin_pump = tokio::spawn(async move {
+ let mut buf = vec![0u8; BUF];
+ loop {
+ match stdin_r.read(&mut buf).await {
+ Ok(0) | Err(_) => break,
+ Ok(n) => {
+ if write
+ .data_bytes(Bytes::copy_from_slice(&buf[..n]))
+ .await
+ .is_err()
+ {
+ return;
+ }
+ }
+ }
+ }
+ let _ = write.eof().await;
+ });
+
+ let mut code = None;
+ while let Some(msg) = read.wait().await {
+ match msg {
+ ChannelMsg::Data { data } => {
+ if out_w.write_all(&data).await.is_err() {
+ break;
+ }
+ }
+ ChannelMsg::ExtendedData { data, .. } => {
+ if err_w.write_all(&data).await.is_err() {
+ break;
+ }
+ }
+ ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
+ _ => {}
+ }
+ }
+
+ // The process is gone; stop waiting on stdin we'll never forward.
+ stdin_pump.abort();
+ let _ = out_w.shutdown().await;
+ let _ = err_w.shutdown().await;
+ let _ = exit_tx.send(code);
+ });
+
+ RemowtChild {
+ stdin,
+ stdout,
+ stderr,
+ exit,
+ }
+ }
+
+ /// Wait for the process to finish, returning its exit status.
+ pub async fn wait(self) -> Option<u32> {
+ self.exit.await.ok().flatten()
+ }
+}
crates/remowt-link-shared/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-link-shared/Cargo.toml
+++ b/crates/remowt-link-shared/Cargo.toml
@@ -11,6 +11,7 @@
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
thiserror.workspace = true
-tokio = { workspace = true, features = ["fs"] }
+tokio = { workspace = true, features = ["fs", "io-util", "macros"] }
+tracing.workspace = true
remowt-ui-prompt.workspace = true
camino = { workspace = true, features = ["serde1"] }
crates/remowt-link-shared/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-link-shared/src/lib.rs
+++ b/crates/remowt-link-shared/src/lib.rs
@@ -1,14 +1,12 @@
-use std::future::Future;
-
-use bifrostlink::declarative::endpoints;
use bifrostlink::error::{ErrorT, ListenerForYourRequestHasBeenDeadError, ResponseError};
use bifrostlink::notification;
use bifrostlink::packet::OpaquePacketWrapper;
-use bifrostlink::{AddressT, Config};
+use bifrostlink::AddressT;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
pub mod editor;
+pub mod port;
#[derive(Clone, Serialize, Hash, Eq, Debug, PartialEq, Deserialize)]
pub enum Address {
@@ -20,26 +18,6 @@
impl AddressT for Address {}
pub mod plugin;
-
-#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
-pub enum ElevateError {
- #[error("elevation failed: {0}")]
- Failed(String),
-}
-
-pub trait Elevator: Send + Sync {
- fn elevate(&self) -> impl Future<Output = Result<(), ElevateError>> + Send;
-}
-
-pub struct ElevateEndpoints<E>(pub E);
-
-#[endpoints(ns = 3)]
-impl<E: Elevator + 'static> ElevateEndpoints<E> {
- #[endpoints(id = 1)]
- async fn elevate(&self) -> Result<(), ElevateError> {
- self.0.elevate().await
- }
-}
#[derive(thiserror::Error, Debug)]
pub enum Error {
crates/remowt-link-shared/src/port.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-link-shared/src/port.rs
@@ -0,0 +1,54 @@
+use std::io;
+
+use bifrostlink::Port;
+use bytes::{Bytes, BytesMut};
+use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
+
+/// Wire a length-prefixed duplex byte stream (e.g. a child process's
+/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32`
+/// length followed by that many payload bytes.
+pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port
+where
+ R: AsyncRead + Unpin + Send + 'static,
+ W: AsyncWrite + Unpin + Send + 'static,
+{
+ Port::new(|mut rx, tx| async move {
+ let read_task = async move {
+ loop {
+ let len = match reader.read_u32().await {
+ Ok(len) => len,
+ Err(e) => {
+ tracing::error!("child read failed: {e}");
+ break;
+ }
+ };
+ let mut buf = BytesMut::zeroed(len as usize);
+ if let Err(e) = reader.read_exact(&mut buf).await {
+ tracing::error!("child read failed: {e}");
+ break;
+ }
+ if tx.send(buf.freeze()).is_err() {
+ break;
+ }
+ }
+ };
+ let write_task = async move {
+ while let Some(msg) = rx.recv().await {
+ if let Err(e) = write_frame(&mut writer, msg).await {
+ tracing::error!("child write failed: {e}");
+ break;
+ }
+ }
+ };
+ tokio::join!(read_task, write_task);
+ })
+}
+
+async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {
+ let len = u32::try_from(msg.len())
+ .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;
+ writer.write_u32(len).await?;
+ writer.write_all(&msg).await?;
+ writer.flush().await?;
+ Ok(())
+}
crates/remowt-plugin/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-plugin/Cargo.toml
+++ b/crates/remowt-plugin/Cargo.toml
@@ -9,7 +9,6 @@
anyhow.workspace = true
bifrostlink.workspace = true
bifrostlink-ports.workspace = true
-bytes.workspace = true
remowt-link-shared.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = [
crates/remowt-plugin/src/host.rsdiffbeforeafterboth--- a/crates/remowt-plugin/src/host.rs
+++ b/crates/remowt-plugin/src/host.rs
@@ -1,14 +1,12 @@
use std::ffi::OsStr;
-use std::io;
use std::process::Stdio;
use std::sync::Mutex;
-use bifrostlink::{Port, Rpc, Rtt, WeakRpc};
-use bytes::{Bytes, BytesMut};
-use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
-use tokio::process::{Child, ChildStdin, ChildStdout, Command};
+use bifrostlink::{Rpc, Rtt, WeakRpc};
+use tokio::process::{Child, Command};
use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};
+use remowt_link_shared::port::child_port;
use remowt_link_shared::{Address, BifConfig};
pub fn serve(rpc: &mut Rpc<BifConfig>) {
@@ -68,46 +66,4 @@
}
self.spawn(id, path)
}
-}
-
-fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port {
- Port::new(|mut rx, tx| async move {
- let reader = async move {
- loop {
- let len = match stdout.read_u32().await {
- Ok(len) => len,
- Err(e) => {
- tracing::error!("plugin stdout read failed: {e}");
- break;
- }
- };
- let mut buf = BytesMut::zeroed(len as usize);
- if let Err(e) = stdout.read_exact(&mut buf).await {
- tracing::error!("plugin stdout read failed: {e}");
- break;
- }
- if tx.send(buf.freeze()).is_err() {
- break;
- }
- }
- };
- let writer = async move {
- while let Some(msg) = rx.recv().await {
- if let Err(e) = write_frame(&mut stdin, msg).await {
- tracing::error!("plugin stdin write failed: {e}");
- break;
- }
- }
- };
- tokio::join!(reader, writer);
- })
-}
-
-async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> {
- let len = u32::try_from(msg.len())
- .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;
- stdin.write_u32(len).await?;
- stdin.write_all(&msg).await?;
- stdin.flush().await?;
- Ok(())
}