difftreelog
feat(client) proper support for local
in: trunk
16 files changed
.gitignorediffbeforeafterboth1/target1/target2/.direnv2/.direnv3/result4/result-*35Cargo.lockdiffbeforeafterboth2096 "russh-config",2096 "russh-config",2097 "serde",2097 "serde",2098 "serde_json",2098 "serde_json",2099 "tempfile",2099 "tokio",2100 "tokio",2100 "tracing",2101 "tracing",2101 "uuid",2102 "uuid",2131 "serde_json",2132 "serde_json",2132 "thiserror",2133 "thiserror",2133 "tokio",2134 "tokio",2135 "tracing",2134]2136]213521372136[[package]]2138[[package]]2140 "anyhow",2142 "anyhow",2141 "bifrostlink",2143 "bifrostlink",2142 "bifrostlink-ports",2144 "bifrostlink-ports",2143 "bytes",2144 "remowt-link-shared",2145 "remowt-link-shared",2145 "serde_json",2146 "serde_json",2146 "tokio",2147 "tokio",cmds/remowt-agent/src/main.rsdiffbeforeafterboth225}225}226226227fn main() -> anyhow::Result<()> {227fn main() -> anyhow::Result<()> {228 // Log to stderr: `privileged-agent` uses stdout as the bifrost transport,229 // so anything written there would corrupt the stream.230 tracing_subscriber::fmt()228 tracing_subscriber::fmt()231 .with_writer(std::io::stderr)229 .with_writer(std::io::stderr)232 .without_time()230 .without_time()cmds/remowt-ssh/Cargo.tomldiffbeforeafterboth11tracing-subscriber.workspace = true11tracing-subscriber.workspace = true12bifrostlink.workspace = true12bifrostlink.workspace = true13remowt-link-shared.workspace = true13remowt-link-shared.workspace = true14remowt-client.workspace = true14remowt-client = { workspace = true, features = ["shell"] }15tokio = { workspace = true, features = ["macros", "fs", "net", "io-util", "rt", "signal"] }15tokio = { workspace = true, features = [16 "macros",17 "fs",18 "net",19 "io-util",20 "rt",21 "signal",22] }16nix = { workspace = true, features = ["term"] }23nix = { workspace = true, features = ["term"] }17anyhow.workspace = true24anyhow.workspace = truecmds/remowt-ssh/src/main.rsdiffbeforeafterboth19use tokio::io::unix::AsyncFd;19use tokio::io::unix::AsyncFd;20use tokio::io::{AsyncRead, ReadBuf};20use tokio::io::{AsyncRead, ReadBuf};21use tokio::signal::unix::{signal, SignalKind};21use tokio::signal::unix::{signal, SignalKind};22use tracing::info;22use tracing::debug;232324#[derive(Parser)]24#[derive(Parser)]25struct Opts {25enum Opts {26 /// Connect to remote host with remowt agent.26 host: String,27 Ssh { host: String },27}28 /// Connect to local host for testing the connectivity.29 Local,30}283129fn agents_dir() -> anyhow::Result<PathBuf> {32fn agents_dir() -> anyhow::Result<PathBuf> {353836#[tokio::main(flavor = "current_thread")]39#[tokio::main(flavor = "current_thread")]37async fn main() -> anyhow::Result<()> {40async fn main() -> anyhow::Result<()> {38 tracing_subscriber::fmt::init();41 tracing_subscriber::fmt()42 .with_writer(std::io::stderr)43 .without_time()44 .init();39 let opts = Opts::parse();45 let opts = Opts::parse();404641 let bundle = AgentBundle::from_dir(agents_dir()?)?;47 let bundle = AgentBundle::from_dir(agents_dir()?)?;42 let conn = Remowt::connect(&opts.host, &bundle).await?;48 let conn = match &opts {49 Opts::Ssh { host } => Remowt::connect(host, &bundle).await?,50 Opts::Local => Remowt::connect_local(&bundle).await?,51 };43 let mut rpc = conn.rpc();52 let mut rpc = conn.rpc();445345 serve_prompts(54 serve_prompts(46 &mut rpc,55 &mut rpc,47 PrependSourcePrompter {56 PrependSourcePrompter {48 prompter: RofiPrompter,57 prompter: RofiPrompter,49 source: vec![Source(Cow::Owned(format!("ssh host: {}", opts.host)))],58 source: match opts {59 Opts::Ssh { host } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))],60 Opts::Local => vec![],61 },50 description: "".to_owned(),62 description: "".to_owned(),51 },63 },52 );64 );53 if let Some(sess) = conn.ssh() {65 if let Some(sess) = conn.ssh() {54 serve_editor(&mut rpc, SshEditor { sess });66 serve_editor(&mut rpc, SshEditor { sess });55 }67 }566857 info!("entering shell");69 debug!("entering shell");58 run_shell(&conn).await?;70 run_shell(&conn).await?;59 info!("shell ended");71 debug!("shell ended");607261 Ok(())73 Ok(())62}74}crates/remowt-client/Cargo.tomldiffbeforeafterboth16remowt-link-shared.workspace = true16remowt-link-shared.workspace = true17russh.workspace = true17russh.workspace = true18russh-config.workspace = true18russh-config.workspace = true19tempfile.workspace = true19tokio = { workspace = true, features = ["net", "io-util", "rt", "sync", "macros", "process"] }20tokio = { workspace = true, features = [21 "net",22 "io-util",23 "rt",24 "sync",25 "macros",26 "process",27] }20tracing.workspace = true28tracing.workspace = true21uuid = { workspace = true, features = ["v4"] }29uuid = { workspace = true, features = ["v4"] }22remowt-endpoints.workspace = true30remowt-endpoints = { workspace = true, optional = true }3132[features]33shell = ["dep:remowt-endpoints"]2334crates/remowt-client/src/forwarded.rsdiffbeforeafterbothno changes
crates/remowt-client/src/lib.rsdiffbeforeafterboth1use std::collections::HashMap;1use std::collections::HashMap;2use std::env;2use std::path::PathBuf;3use std::path::PathBuf;3use std::sync::{Arc, Mutex};4use std::sync::{Arc, Mutex};4use std::{env, io};556use anyhow::{anyhow, bail, ensure, Context as _, Result};6use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc};8use bifrostlink::{Remote, Rpc, Rtt};9use bifrostlink_ports::unix_socket::from_socket;9use bifrostlink_ports::unix_socket::from_socket;10use bytes::{Bytes, BytesMut};11use camino::{Utf8Path, Utf8PathBuf};10use camino::{Utf8Path, Utf8PathBuf};12use remowt_endpoints::{13 fs::Fs,14 pty::{Pty, PtyClient, ShellId},15 systemd::Systemd,16};17use remowt_link_shared::plugin::PluginEndpointsClient;11use remowt_link_shared::plugin::PluginEndpointsClient;18use remowt_link_shared::{Address, BifConfig, ElevateEndpoints, ElevateError, Elevator};12use remowt_link_shared::port::child_port;13use remowt_link_shared::{Address, BifConfig};19use russh::client::{connect, Config, Handle, Handler, Msg, Session};14use russh::client::{connect, Config, Handle, Handler, Msg, Session};20use russh::keys::agent::client::AgentClient;15use russh::keys::agent::client::AgentClient;21use russh::keys::agent::AgentIdentity;16use russh::keys::agent::AgentIdentity;22use russh::keys::check_known_hosts;17use russh::keys::check_known_hosts;23use russh::keys::ssh_key::PublicKey;18use russh::keys::ssh_key::PublicKey;24use russh::{Channel, ChannelMsg, ChannelStream};19use russh::Channel;25use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf};26use tokio::join;20use tempfile::TempDir;27use tokio::net::UnixListener;21use tokio::net::UnixListener;28use tokio::sync::mpsc;29use tokio::sync::oneshot::{self, channel};22use tokio::sync::oneshot::{self, channel};30use tracing::error;23use tokio::{24 fs,25 io::{AsyncReadExt as _, AsyncWriteExt as _},26};27use tracing::{debug, error};31use uuid::Uuid;28use uuid::Uuid;322930use self::port::channel_port;31use self::subprocess::RemowtChild;3233pub mod editor;33pub mod editor;34mod forwarded;35mod port;36#[cfg(feature = "shell")]37mod shell;38mod subprocess;343940pub use forwarded::{RemowtListener, RemowtStream};41#[cfg(feature = "shell")]42pub use shell::{RemowtShell, RemowtShellResizer};4335type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;44type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;364537async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {38 let len = srx.read_u32().await?;39 let mut buf = BytesMut::zeroed(len as usize);40 srx.read_exact(&mut buf).await?;41 Ok(buf)42}43async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {44 stx.write_u32(value.len().try_into().expect("can't be larger"))45 .await?;46 stx.write_all(&value).await?;47 Ok(())48}4950fn sh_quote(s: impl AsRef<str>) -> String {46fn sh_quote(s: impl AsRef<str>) -> String {51 format!("'{}'", s.as_ref().replace('\'', "'\\''"))47 format!("'{}'", s.as_ref().replace('\'', "'\\''"))52}48}534954const ESCALATORS: [(&str, &[&str]); 3] = [50const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];55 ("run0", &["--background=", "--pipe"]),56 ("sudo", &[]),57 ("doas", &[]),58];595160pub struct AgentBundle {52pub struct AgentBundle {61 dir: PathBuf,53 dir: PathBuf,90 fn binary(&self, arch: &str) -> PathBuf {82 fn binary(&self, arch: &str) -> PathBuf {91 self.dir.join(format!("remowt-agent-{arch}"))83 self.dir.join(format!("remowt-agent-{arch}"))92 }84 }8586 fn local_binary(&self) -> Result<PathBuf> {87 let arch = env::consts::ARCH;88 let path = self.binary(arch);89 ensure!(90 path.is_file(),91 "no local remowt-agent build for arch {arch} in bundle {}",92 self.dir.display()93 );94 Ok(path)95 }93}96}949795async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {98async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {96 let mut ch = sess.channel_open_session().await?;99 let ch = sess.channel_open_session().await?;97 ch.exec(true, cmd).await?;100 ch.exec(true, cmd).await?;101102 let mut child = RemowtChild::from_exec(ch);103 drop(child.stdin);10498 let mut out = Vec::new();105 let mut out = Vec::new();99 let mut code = None;106 let mut err = Vec::new();100 while let Some(msg) = ch.wait().await {107 tokio::try_join!(101 match msg {108 child.stdout.read_to_end(&mut out),102 ChannelMsg::Data { data } => out.extend(data.as_ref()),109 child.stderr.read_to_end(&mut err),103 ChannelMsg::ExtendedData { data, .. } => {110 )?;111 if !err.is_empty() {104 error!(112 error!("remote stderr: {}", String::from_utf8_lossy(&err).trim());105 "remote stderr: {}",106 String::from_utf8_lossy(data.as_ref()).trim()107 );108 }109 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),110 _ => {}111 }112 }113 }114 let code = child.exit.await.ok().flatten();113 Ok((code, out))115 Ok((code, out))114}116}115117119 code == Some(0),121 code == Some(0),120 "remote command failed (exit {code:?}): {cmd}"122 "remote command failed (exit {code:?}): {cmd}"121 );123 );122 ensure!(out.ends_with(b"\n"));124 if !out.is_empty() {125 ensure!(126 out.ends_with(b"\n"),127 "remote command was not newline-terminated: {cmd}: {out:?}"128 );123 out.pop();129 out.pop();130 }124 String::from_utf8(out).context("expected utf8 output for command")131 String::from_utf8(out).context("expected utf8 output for command")125}132}126133127async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {134async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {135 debug!("uname -a");128 let arch = run_string_ok(sess, "uname -m").await?;136 let arch = run_string_ok(sess, "uname -m").await?;129 let hash = bundle137 let hash = bundle130 .hashes138 .hashes131 .get(&arch)139 .get(&arch)132 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;140 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;133141142 debug!("get dir");134 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")143 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")135 .await?144 .await?136 .trim()137 .to_owned();145 .to_owned();138 let dir = if cache.is_empty() {146 let dir = if cache.is_empty() {139 let home = run_string_ok(sess, "echo \"$HOME\"").await?;147 let home = run_string_ok(sess, "echo \"$HOME\"").await?;140 ensure!(148 ensure!(141 !home.is_empty(),149 !home.is_empty(),142 "remote $HOME and $XDG_CACHE_HOME both empty"150 "remote $HOME and $XDG_CACHE_HOME both empty"143 );151 );144 Utf8PathBuf::from(home).join("cache/remowt")152 Utf8PathBuf::from(home).join(".cache/remowt")145 } else {153 } else {146 Utf8PathBuf::from(cache).join("remowt")154 Utf8PathBuf::from(cache).join("remowt")147 };155 };148 let path = dir.join(hash);156 let path = dir.join(hash);149157158 debug!("presence");150 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;159 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;151 if present != Some(0) {160 if present != Some(0) {152 let bin = bundle.binary(&arch);161 let bin = bundle.binary(&arch);153 let bytes = std::fs::read(&bin)162 debug!("read");163 let bytes = fs::read(&bin)164 .await154 .with_context(|| format!("reading agent binary {}", bin.display()))?;165 .with_context(|| format!("reading agent binary {}", bin.display()))?;166 debug!("upload");155 upload_agent(sess, &dir, &path, bytes).await?;167 upload_agent(sess, &dir, &path, bytes).await?;156 }168 }157 Ok(path)169 Ok(path)163 path: &Utf8Path,175 path: &Utf8Path,164 bytes: Vec<u8>,176 bytes: Vec<u8>,165) -> Result<()> {177) -> Result<()> {178 debug!("mkdirp");166 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;179 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;167180168 let tmp = path.join(format!("tmp.{}", Uuid::new_v4()));181 let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));169 let ch = sess.channel_open_session().await?;182 let ch = sess.channel_open_session().await?;183 debug!("cat");170 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;184 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;171 ch.data_bytes(bytes).await?;185172 ch.eof().await?;186 let mut child = RemowtChild::from_exec(ch);173 let mut ch = ch;174 let mut code = None;187 child175 while let Some(msg) = ch.wait().await {188 .stdin176 match msg {189 .write_all(&bytes)177 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),178 ChannelMsg::ExtendedData { data, .. } => {190 .await179 error!(191 .context("sending agent binary")?;180 "agent upload: {}",181 String::from_utf8_lossy(data.as_ref()).trim()192 child193 .stdin194 .shutdown()182 );195 .await196 .context("sending agent binary")?;183 }197 let code = child.wait().await;184 _ => {}185 }186 }187 ensure!(code == Some(0), "agent upload failed (exit {code:?})");198 ensure!(code == Some(0), "agent upload failed (exit {code:?})");188199200 debug!("chmod");189 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;201 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;190 run_string_ok(202 run_string_ok(191 sess,203 sess,205 return Ok((tool, flags));217 return Ok((tool, flags));206 }218 }207 }219 }208 bail!("no escalation tool (run0/sudo/doas) found on remote")220 bail!("no escalation tool found on remote")209}221}210222211fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {223fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {228 .find(|p| p.is_file())240 .find(|p| p.is_file())229}241}230242231fn port_from_channel(ch: Channel<Msg>) -> Port {232 Port::new(move |mut rx, tx| async move {233 let (mut srx, mut stx) = tokio::io::split(ch.into_stream());234 let srx_task = async move {235 loop {236 match read(&mut srx).await {237 Ok(buf) => {238 if tx.send(buf.freeze()).is_err() {239 break;240 }241 }242 Err(e) => {243 error!("channel read failed: {e}");244 break;245 }246 }247 }248 };249 let stx_task = async move {250 while let Some(value) = rx.recv().await {251 if let Err(e) = write(&mut stx, value).await {252 error!("channel write failed: {e}");253 break;254 }255 }256 };257 join!(srx_task, stx_task);258 })259}260261pub struct SshHandler {243pub struct SshHandler {262 host: String,244 host: String,263 port: u16,245 port: u16,290 }272 }291}273}292274293struct SshElevator {294 sess: Arc<Handle<SshHandler>>,295 rpc: WeakRpc<BifConfig>,296 agent_path: Utf8PathBuf,297}298impl Elevator for SshElevator {299 async fn elevate(&self) -> Result<(), ElevateError> {300 let fail = |e: String| ElevateError::Failed(e);301 let (tool, flags) = detect_escalation(&self.sess)302 .await303 .map_err(|e| fail(e.to_string()))?;304 let ch = self305 .sess306 .channel_open_session()307 .await308 .map_err(|e| fail(e.to_string()))?;309 ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None))310 .await311 .map_err(|e| fail(e.to_string()))?;312 let rpc = self313 .rpc314 .clone()315 .upgrade()316 .ok_or_else(|| fail("rpc is gone".to_owned()))?;317 rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0));318 Ok(())319 }320}321322pub struct RemoteChild {323 pub stdout: DuplexStream,324 pub stderr: DuplexStream,325 pub exit: oneshot::Receiver<Option<u32>>,326}327328enum Transport {275enum Transport {329 Ssh {276 Ssh {330 sess: Arc<Handle<SshHandler>>,277 sess: Arc<Handle<SshHandler>>,331 subs: Subs,278 subs: Subs,332 remote_dir: Utf8PathBuf,279 runtime_dir: Utf8PathBuf,333 agent_path: Utf8PathBuf,280 agent_path: Utf8PathBuf,334 },281 },335 Local {282 Local {336 #[allow(dead_code)]283 agent_path: PathBuf,337 agent: Rpc<BifConfig>,338 agent_path: String,284 runtime_dir: Utf8PathBuf,339 },285 },340}286}341287344 rpc: Rpc<BifConfig>,290 rpc: Rpc<BifConfig>,345 elevated: tokio::sync::OnceCell<()>,291 elevated: tokio::sync::OnceCell<()>,346 children: Mutex<Vec<tokio::process::Child>>,292 children: Mutex<Vec<tokio::process::Child>>,293 _runtime_tmp: Option<TempDir>,347}294}348295349pub type RemowtRemote = Remote<BifConfig>;296pub type RemowtRemote = Remote<BifConfig>;350297351fn loopback() -> (Port, Port) {352 let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::<Bytes>();353 let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::<Bytes>();354 let user = Port::new(move |mut rx, tx| async move {355 loop {356 tokio::select! {357 msg = rx.recv() => match msg {358 Some(msg) => if a2b_tx.send(msg).is_err() { break },359 None => break,360 },361 msg = b2a_rx.recv() => match msg {362 Some(msg) => if tx.send(msg).is_err() { break },363 None => break,364 },365 }366 }367 });368 let agent = Port::new(move |mut rx, tx| async move {369 loop {370 tokio::select! {371 msg = rx.recv() => match msg {372 Some(msg) => if b2a_tx.send(msg).is_err() { break },373 None => break,374 },375 msg = a2b_rx.recv() => match msg {376 Some(msg) => if tx.send(msg).is_err() { break },377 None => break,378 },379 }380 }381 });382 (user, agent)383}384385impl Remowt {298impl Remowt {386 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {299 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {387 let conf = russh_config::parse_home(host)?;300 let conf = russh_config::parse_home(host)?;426 }339 }427 ensure!(authenticated, "ssh authentication failed");340 ensure!(authenticated, "ssh authentication failed");428341429 // All remaining session ops take `&self`; share the handle.430 let sess = Arc::new(sess);342 let sess = Arc::new(sess);431343344 debug!("deploying agent");432 let agent_path = deploy_agent(&sess, bundle).await?;345 let agent_path = deploy_agent(&sess, bundle).await?;433346434 let remote_dir = remote_mktemp(&sess).await?;347 debug!("runtime dir");348 let runtime_dir = remote_runtime_dir(&sess).await?;435 let primary = remote_dir.join("primary.sock");349 let primary = runtime_dir.join(format!("remowt-{}.sock", Uuid::new_v4()));436350437 let (onetx, onerx) = channel();351 let (onetx, onerx) = channel();438 subs.lock().expect("lock").insert(primary.clone(), onetx);352 subs.lock().expect("lock").insert(primary.clone(), onetx);442356443 // TODO: ensure no injection is possible in the socket path.357 // TODO: ensure no injection is possible in the socket path.444 let cmd_chan = sess.channel_open_session().await?;358 let cmd_chan = sess.channel_open_session().await?;359 debug!("starting agent");445 cmd_chan360 cmd_chan446 .exec(361 .exec(447 true,362 true,453 )368 )454 .await?;369 .await?;455370456 let port = port_from_channel(371 let port = channel_port(457 onerx372 onerx458 .await373 .await459 .map_err(|_| anyhow!("agent never opened its channel"))?,374 .map_err(|_| anyhow!("agent never opened its channel"))?,464 transport: Transport::Ssh {379 transport: Transport::Ssh {465 sess,380 sess,466 subs,381 subs,467 remote_dir,382 runtime_dir,468 agent_path,383 agent_path,469 },384 },470 rpc,385 rpc,471 elevated: tokio::sync::OnceCell::new(),386 elevated: tokio::sync::OnceCell::new(),472 children: Mutex::new(Vec::new()),387 children: Mutex::new(Vec::new()),388 _runtime_tmp: None,473 })389 })474 }390 }475391476 pub async fn connect_local(agent_path: &str) -> Result<Self> {392 pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {477 let (port_user, port_agent) = loopback();393 let agent_path = bundle.local_binary()?;394 let mut child = tokio::process::Command::new(&agent_path)395 .arg("real-agent")396 .stdin(std::process::Stdio::piped())397 .stdout(std::process::Stdio::piped())398 .kill_on_drop(true)399 .spawn()400 .with_context(|| format!("spawning agent binary {}", agent_path.display()))?;401 let stdin = child.stdin.take().expect("stdin piped");402 let stdout = child.stdout.take().expect("stdout piped");403478 let rpc = Rpc::<BifConfig>::new(Address::User);404 let rpc = Rpc::<BifConfig>::new(Address::User);479 let mut agent = Rpc::<BifConfig>::new(Address::Agent);405 rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));480406481 // Register handlers before wiring up the link (see the agent binary).407 let (runtime_dir, runtime_tmp) = local_runtime_dir()?;482 Fs::new().register_endpoints(&mut agent);483 Systemd.register_endpoints(&mut agent);484 Pty::new().register_endpoints(&mut agent);485408486 agent.add_direct(Address::User, port_agent, Rtt(0));487 rpc.add_direct(Address::Agent, port_user, Rtt(0));488489 Ok(Self {409 Ok(Self {490 transport: Transport::Local {410 transport: Transport::Local {491 agent,411 agent_path,492 agent_path: agent_path.to_owned(),412 runtime_dir,493 },413 },494 rpc,414 rpc,495 elevated: tokio::sync::OnceCell::new(),415 elevated: tokio::sync::OnceCell::new(),496 children: Mutex::new(Vec::new()),416 children: Mutex::new(vec![child]),417 _runtime_tmp: runtime_tmp,497 })418 })498 }419 }499420547 let ch = sess.channel_open_session().await?;468 let ch = sess.channel_open_session().await?;548 ch.exec(true, privileged_cmd(tool, flags, agent_path, None))469 ch.exec(true, privileged_cmd(tool, flags, agent_path, None))549 .await?;470 .await?;550 port_from_channel(ch)471 channel_port(ch)551 }472 }552 Transport::Local { agent_path, .. } => {473 Transport::Local { agent_path, .. } => {553 let sock = env::temp_dir()474 let sock = self475 .runtime_dir()554 .join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4()));476 .join(format!("remowt-priv-{}.sock", Uuid::new_v4()));555 let _ = std::fs::remove_file(&sock);477 let _ = std::fs::remove_file(&sock);556 let listener = UnixListener::bind(&sock)?;478 let listener = UnixListener::bind(&sock)?;557 let (tool, flags) = ESCALATORS479 let (tool, flags) = ESCALATORS558 .iter()480 .iter()559 .find(|(t, _)| find_in_path(t).is_some())481 .find(|(t, _)| find_in_path(t).is_some())560 .ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?;482 .ok_or_else(|| anyhow!("no escalation tool found"))?;561 let child = tokio::process::Command::new(tool)483 let child = tokio::process::Command::new(tool)562 .args(*flags)484 .args(*flags)563 .arg(agent_path)485 .arg(agent_path)564 .arg("real-agent")486 .arg("real-agent")565 .arg("--privileged")487 .arg("--privileged")566 .arg("--path")488 .arg("--path")567 .arg(sock.to_str().expect("temp path is utf-8"))489 .arg(sock.as_str())568 .kill_on_drop(true)490 .kill_on_drop(true)569 .spawn()?;491 .spawn()?;570 self.children.lock().expect("lock").push(child);492 self.children.lock().expect("lock").push(child);580 Ok(())502 Ok(())581 }503 }582504583 pub async fn exec(&self, command: String) -> Result<RemoteChild> {505 pub async fn exec(&self, command: String) -> Result<RemowtChild> {584 let Some(sess) = self.ssh() else {506 let Some(sess) = self.ssh() else {585 bail!("exec should not be called on local")507 bail!("exec should not be called on local")586 };508 };587 let ch = sess.channel_open_session().await?;509 let ch = sess.channel_open_session().await?;588 ch.exec(true, command).await?;510 ch.exec(true, command).await?;589590 let (mut out_w, stdout) = tokio::io::duplex(64 * 1024);511 Ok(RemowtChild::from_exec(ch))591 let (mut err_w, stderr) = tokio::io::duplex(64 * 1024);592 let (exit_tx, exit) = oneshot::channel();593594 tokio::spawn(async move {595 let mut ch = ch;596 let mut code = None;597 while let Some(msg) = ch.wait().await {598 match msg {599 ChannelMsg::Data { data } => {600 if out_w.write_all(&data).await.is_err() {601 break;602 }603 }604 ChannelMsg::ExtendedData { data, .. } => {605 if err_w.write_all(&data).await.is_err() {606 break;607 }608 }609 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),610 _ => {}611 }612 }613 let _ = out_w.shutdown().await;614 let _ = err_w.shutdown().await;615 let _ = exit_tx.send(code);616 });617618 Ok(RemoteChild {619 stdout,620 stderr,621 exit,622 })623 }512 }624513625 pub fn serve_elevate(&self) -> Result<()> {514 fn runtime_dir(&self) -> Utf8PathBuf {626 let Transport::Ssh {515 match &self.transport {627 sess, agent_path, ..628 } = &self.transport629 else {630 bail!("elevate should not be called on local")516 Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),631 };632 let mut rpc = self.rpc.clone();633 ElevateEndpoints(SshElevator {517 Transport::Local { runtime_dir, .. } => runtime_dir.clone(),634 sess: sess.clone(),635 rpc: self.rpc.clone().downgrade(),636 agent_path: agent_path.to_owned(),637 })518 }638 .register_endpoints(&mut rpc);639 Ok(())640 }519 }641520642 pub fn remote_dir(&self) -> Option<&Utf8Path> {521 pub async fn forward_socket(&self, path: &Utf8Path) -> Result<RemowtListener> {643 match &self.transport {522 match &self.transport {644 Transport::Ssh { remote_dir, .. } => Some(remote_dir),523 Transport::Ssh { sess, subs, .. } => {524 let (tx, rx) = oneshot::channel();525 subs.lock().expect("lock").insert(path.to_owned(), tx);645 Transport::Local { .. } => None,526 sess.streamlocal_forward(path.to_owned()).await?;527 Ok(RemowtListener::Ssh(rx))528 }529 Transport::Local { .. } => {530 let _ = std::fs::remove_file(path);531 Ok(RemowtListener::Local(532 UnixListener::bind(path)?,533 path.to_owned(),534 ))535 }646 }536 }647 }537 }648649 pub async fn forward_socket(650 &self,651 remote_path: &Utf8Path,652 ) -> Result<oneshot::Receiver<Channel<Msg>>> {653 let Transport::Ssh { sess, subs, .. } = &self.transport else {654 bail!("forward_socket should not be called on local")655 };656 let (tx, rx) = oneshot::channel();657 subs.lock()658 .expect("lock")659 .insert(remote_path.to_owned(), tx);660 sess.streamlocal_forward(remote_path.to_owned()).await?;661 Ok(rx)662 }663664 pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<Shell> {665 let Transport::Ssh { remote_dir, .. } = &self.transport else {666 bail!("open_shell should not be called on local")667 };668 let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4()));669670 let rx = self.forward_socket(&sock).await?;671 let client: PtyClient<BifConfig> = self.endpoints();672 let id = client673 .open_shell(sock, term.to_owned(), cols, rows)674 .await?675 .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;676 let ch = rx677 .await678 .map_err(|_| anyhow!("agent never connected the shell socket"))?;679680 Ok(Shell {681 id,682 stream: ch.into_stream(),683 remote: self.rpc.remote(Address::Agent),684 })685 }686}538}687539688pub struct Shell {540fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {689 pub id: ShellId,690 pub stream: ChannelStream<Msg>,691 remote: Remote<BifConfig>,692}693694impl Shell {695 pub fn resizer(&self) -> ShellResizer {541 if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {696 ShellResizer {542 if !dir.is_empty() {697 remote: self.remote.clone(),543 return Ok((Utf8PathBuf::from(dir), None));698 id: self.id,699 }544 }700 }545 }546 let tmp = tempfile::Builder::new()547 .prefix("remowt.")548 .rand_bytes(12)549 .tempdir()?;550 let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())551 .map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;552 Ok((dir, Some(tmp)))701}553}702554703#[derive(Clone)]704pub struct ShellResizer {555async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {705 remote: Remote<BifConfig>,706 id: ShellId,707}708709impl ShellResizer {710 pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {556 let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;711 PtyClient::wrap(self.remote.clone())557 let dir = dir.trim();712 .resize(self.id, cols, rows)558 if dir.is_empty() {713 .await?559 remote_mktemp(sess).await714 .map_err(|e| anyhow!("failed to resize remote shell: {e}"))560 } else {561 Ok(Utf8PathBuf::from(dir))715 }562 }716}563}717564crates/remowt-client/src/port.rsdiffbeforeafterbothno changes
crates/remowt-client/src/shell.rsdiffbeforeafterbothno changes
crates/remowt-client/src/subprocess.rsdiffbeforeafterbothno changes
crates/remowt-link-shared/Cargo.tomldiffbeforeafterboth11serde = { workspace = true, features = ["derive"] }11serde = { workspace = true, features = ["derive"] }12serde_json.workspace = true12serde_json.workspace = true13thiserror.workspace = true13thiserror.workspace = true14tokio = { workspace = true, features = ["fs"] }14tokio = { workspace = true, features = ["fs", "io-util", "macros"] }15tracing.workspace = true15remowt-ui-prompt.workspace = true16remowt-ui-prompt.workspace = true16camino = { workspace = true, features = ["serde1"] }17camino = { workspace = true, features = ["serde1"] }1718crates/remowt-link-shared/src/lib.rsdiffbeforeafterboth1use std::future::Future;23use bifrostlink::declarative::endpoints;4use bifrostlink::error::{ErrorT, ListenerForYourRequestHasBeenDeadError, ResponseError};1use bifrostlink::error::{ErrorT, ListenerForYourRequestHasBeenDeadError, ResponseError};5use bifrostlink::notification;2use bifrostlink::notification;6use bifrostlink::packet::OpaquePacketWrapper;3use bifrostlink::packet::OpaquePacketWrapper;7use bifrostlink::{AddressT, Config};4use bifrostlink::AddressT;8use serde::de::DeserializeOwned;5use serde::de::DeserializeOwned;9use serde::{Deserialize, Serialize};6use serde::{Deserialize, Serialize};10711pub mod editor;8pub mod editor;9pub mod port;121013#[derive(Clone, Serialize, Hash, Eq, Debug, PartialEq, Deserialize)]11#[derive(Clone, Serialize, Hash, Eq, Debug, PartialEq, Deserialize)]14pub enum Address {12pub enum Address {211922pub mod plugin;20pub mod plugin;2324#[derive(Serialize, Deserialize, Debug, thiserror::Error)]25pub enum ElevateError {26 #[error("elevation failed: {0}")]27 Failed(String),28}2930pub trait Elevator: Send + Sync {31 fn elevate(&self) -> impl Future<Output = Result<(), ElevateError>> + Send;32}3334pub struct ElevateEndpoints<E>(pub E);3536#[endpoints(ns = 3)]37impl<E: Elevator + 'static> ElevateEndpoints<E> {38 #[endpoints(id = 1)]39 async fn elevate(&self) -> Result<(), ElevateError> {40 self.0.elevate().await41 }42}432144#[derive(thiserror::Error, Debug)]22#[derive(thiserror::Error, Debug)]45pub enum Error {23pub enum Error {crates/remowt-link-shared/src/port.rsdiffbeforeafterbothno changes
crates/remowt-plugin/Cargo.tomldiffbeforeafterboth9anyhow.workspace = true9anyhow.workspace = true10bifrostlink.workspace = true10bifrostlink.workspace = true11bifrostlink-ports.workspace = true11bifrostlink-ports.workspace = true12bytes.workspace = true13remowt-link-shared.workspace = true12remowt-link-shared.workspace = true14serde_json.workspace = true13serde_json.workspace = true15tokio = { workspace = true, features = [14tokio = { workspace = true, features = [crates/remowt-plugin/src/host.rsdiffbeforeafterboth1use std::ffi::OsStr;1use std::ffi::OsStr;2use std::io;3use std::process::Stdio;2use std::process::Stdio;4use std::sync::Mutex;3use std::sync::Mutex;546use bifrostlink::{Port, Rpc, Rtt, WeakRpc};5use bifrostlink::{Rpc, Rtt, WeakRpc};7use bytes::{Bytes, BytesMut};8use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};9use tokio::process::{Child, ChildStdin, ChildStdout, Command};6use tokio::process::{Child, Command};10711use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};8use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};9use remowt_link_shared::port::child_port;12use remowt_link_shared::{Address, BifConfig};10use remowt_link_shared::{Address, BifConfig};131114pub fn serve(rpc: &mut Rpc<BifConfig>) {12pub fn serve(rpc: &mut Rpc<BifConfig>) {70 }68 }71}69}7273fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port {74 Port::new(|mut rx, tx| async move {75 let reader = async move {76 loop {77 let len = match stdout.read_u32().await {78 Ok(len) => len,79 Err(e) => {80 tracing::error!("plugin stdout read failed: {e}");81 break;82 }83 };84 let mut buf = BytesMut::zeroed(len as usize);85 if let Err(e) = stdout.read_exact(&mut buf).await {86 tracing::error!("plugin stdout read failed: {e}");87 break;88 }89 if tx.send(buf.freeze()).is_err() {90 break;91 }92 }93 };94 let writer = async move {95 while let Some(msg) = rx.recv().await {96 if let Err(e) = write_frame(&mut stdin, msg).await {97 tracing::error!("plugin stdin write failed: {e}");98 break;99 }100 }101 };102 tokio::join!(reader, writer);103 })104}105106async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> {107 let len = u32::try_from(msg.len())108 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;109 stdin.write_u32(len).await?;110 stdin.write_all(&msg).await?;111 stdin.flush().await?;112 Ok(())113}11470