difftreelog
feat(client) proper support for local
in: trunk
16 files changed
.gitignorediffbeforeafterboth--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
/target
/.direnv
+/result
+/result-*
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2096,6 +2096,7 @@
"russh-config",
"serde",
"serde_json",
+ "tempfile",
"tokio",
"tracing",
"uuid",
@@ -2131,6 +2132,7 @@
"serde_json",
"thiserror",
"tokio",
+ "tracing",
]
[[package]]
@@ -2140,7 +2142,6 @@
"anyhow",
"bifrostlink",
"bifrostlink-ports",
- "bytes",
"remowt-link-shared",
"serde_json",
"tokio",
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -225,8 +225,6 @@
}
fn main() -> anyhow::Result<()> {
- // Log to stderr: `privileged-agent` uses stdout as the bifrost transport,
- // so anything written there would corrupt the stream.
tracing_subscriber::fmt()
.with_writer(std::io::stderr)
.without_time()
cmds/remowt-ssh/Cargo.tomldiffbeforeafterboth--- a/cmds/remowt-ssh/Cargo.toml
+++ b/cmds/remowt-ssh/Cargo.toml
@@ -11,8 +11,15 @@
tracing-subscriber.workspace = true
bifrostlink.workspace = true
remowt-link-shared.workspace = true
-remowt-client.workspace = true
-tokio = { workspace = true, features = ["macros", "fs", "net", "io-util", "rt", "signal"] }
+remowt-client = { workspace = true, features = ["shell"] }
+tokio = { workspace = true, features = [
+ "macros",
+ "fs",
+ "net",
+ "io-util",
+ "rt",
+ "signal",
+] }
nix = { workspace = true, features = ["term"] }
anyhow.workspace = true
bifrostlink-ports.workspace = true
cmds/remowt-ssh/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-ssh/src/main.rs
+++ b/cmds/remowt-ssh/src/main.rs
@@ -19,11 +19,14 @@
use tokio::io::unix::AsyncFd;
use tokio::io::{AsyncRead, ReadBuf};
use tokio::signal::unix::{signal, SignalKind};
-use tracing::info;
+use tracing::debug;
#[derive(Parser)]
-struct Opts {
- host: String,
+enum Opts {
+ /// Connect to remote host with remowt agent.
+ Ssh { host: String },
+ /// Connect to local host for testing the connectivity.
+ Local,
}
fn agents_dir() -> anyhow::Result<PathBuf> {
@@ -35,18 +38,27 @@
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
- tracing_subscriber::fmt::init();
+ tracing_subscriber::fmt()
+ .with_writer(std::io::stderr)
+ .without_time()
+ .init();
let opts = Opts::parse();
let bundle = AgentBundle::from_dir(agents_dir()?)?;
- let conn = Remowt::connect(&opts.host, &bundle).await?;
+ let conn = match &opts {
+ Opts::Ssh { host } => Remowt::connect(host, &bundle).await?,
+ Opts::Local => Remowt::connect_local(&bundle).await?,
+ };
let mut rpc = conn.rpc();
serve_prompts(
&mut rpc,
PrependSourcePrompter {
prompter: RofiPrompter,
- source: vec![Source(Cow::Owned(format!("ssh host: {}", opts.host)))],
+ source: match opts {
+ Opts::Ssh { host } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))],
+ Opts::Local => vec![],
+ },
description: "".to_owned(),
},
);
@@ -54,9 +66,9 @@
serve_editor(&mut rpc, SshEditor { sess });
}
- info!("entering shell");
+ debug!("entering shell");
run_shell(&conn).await?;
- info!("shell ended");
+ debug!("shell ended");
Ok(())
}
crates/remowt-client/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-client/Cargo.toml
+++ b/crates/remowt-client/Cargo.toml
@@ -16,7 +16,18 @@
remowt-link-shared.workspace = true
russh.workspace = true
russh-config.workspace = true
-tokio = { workspace = true, features = ["net", "io-util", "rt", "sync", "macros", "process"] }
+tempfile.workspace = true
+tokio = { workspace = true, features = [
+ "net",
+ "io-util",
+ "rt",
+ "sync",
+ "macros",
+ "process",
+] }
tracing.workspace = true
uuid = { workspace = true, features = ["v4"] }
-remowt-endpoints.workspace = true
+remowt-endpoints = { workspace = true, optional = true }
+
+[features]
+shell = ["dep:remowt-endpoints"]
crates/remowt-client/src/forwarded.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/forwarded.rs
@@ -0,0 +1,79 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use anyhow::{anyhow, Result};
+use camino::Utf8PathBuf;
+use russh::client::Msg;
+use russh::{Channel, ChannelStream};
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tokio::net::{UnixListener, UnixStream};
+use tokio::sync::oneshot;
+
+pub enum RemowtListener {
+ Ssh(oneshot::Receiver<Channel<Msg>>),
+ Local(UnixListener, Utf8PathBuf),
+}
+
+impl RemowtListener {
+ pub async fn accept(self) -> Result<RemowtStream> {
+ match self {
+ RemowtListener::Ssh(rx) => {
+ let ch = rx
+ .await
+ .map_err(|_| anyhow!("agent never connected the forwarded socket"))?;
+ Ok(RemowtStream::Ssh(ch.into_stream()))
+ }
+ RemowtListener::Local(listener, path) => {
+ let (stream, _) = listener.accept().await?;
+ let _ = std::fs::remove_file(&path);
+ Ok(RemowtStream::Local(stream))
+ }
+ }
+ }
+}
+
+pub enum RemowtStream {
+ Ssh(ChannelStream<Msg>),
+ Local(UnixStream),
+}
+
+impl AsyncRead for RemowtStream {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ match self.get_mut() {
+ RemowtStream::Ssh(s) => Pin::new(s).poll_read(cx, buf),
+ RemowtStream::Local(s) => Pin::new(s).poll_read(cx, buf),
+ }
+ }
+}
+
+impl AsyncWrite for RemowtStream {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ match self.get_mut() {
+ RemowtStream::Ssh(s) => Pin::new(s).poll_write(cx, buf),
+ RemowtStream::Local(s) => Pin::new(s).poll_write(cx, buf),
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ match self.get_mut() {
+ RemowtStream::Ssh(s) => Pin::new(s).poll_flush(cx),
+ RemowtStream::Local(s) => Pin::new(s).poll_flush(cx),
+ }
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ match self.get_mut() {
+ RemowtStream::Ssh(s) => Pin::new(s).poll_shutdown(cx),
+ RemowtStream::Local(s) => Pin::new(s).poll_shutdown(cx),
+ }
+ }
+}
crates/remowt-client/src/lib.rsdiffbeforeafterboth1use std::collections::HashMap;2use std::path::PathBuf;3use std::sync::{Arc, Mutex};4use std::{env, io};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc};9use bifrostlink_ports::unix_socket::from_socket;10use bytes::{Bytes, BytesMut};11use camino::{Utf8Path, Utf8PathBuf};12use remowt_endpoints::{13 fs::Fs,14 pty::{Pty, PtyClient, ShellId},15 systemd::Systemd,16};17use remowt_link_shared::plugin::PluginEndpointsClient;18use remowt_link_shared::{Address, BifConfig, ElevateEndpoints, ElevateError, Elevator};19use russh::client::{connect, Config, Handle, Handler, Msg, Session};20use russh::keys::agent::client::AgentClient;21use russh::keys::agent::AgentIdentity;22use russh::keys::check_known_hosts;23use russh::keys::ssh_key::PublicKey;24use russh::{Channel, ChannelMsg, ChannelStream};25use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf};26use tokio::join;27use tokio::net::UnixListener;28use tokio::sync::mpsc;29use tokio::sync::oneshot::{self, channel};30use tracing::error;31use uuid::Uuid;3233pub mod editor;3435type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;3637async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {38 let len = srx.read_u32().await?;39 let mut buf = BytesMut::zeroed(len as usize);40 srx.read_exact(&mut buf).await?;41 Ok(buf)42}43async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {44 stx.write_u32(value.len().try_into().expect("can't be larger"))45 .await?;46 stx.write_all(&value).await?;47 Ok(())48}4950fn sh_quote(s: impl AsRef<str>) -> String {51 format!("'{}'", s.as_ref().replace('\'', "'\\''"))52}5354const ESCALATORS: [(&str, &[&str]); 3] = [55 ("run0", &["--background=", "--pipe"]),56 ("sudo", &[]),57 ("doas", &[]),58];5960pub struct AgentBundle {61 dir: PathBuf,62 hashes: HashMap<String, String>,63}6465impl AgentBundle {66 pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {67 let dir = dir.into();68 let hashes_path = dir.join("hashes");69 let raw = std::fs::read_to_string(&hashes_path)70 .with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;71 let mut hashes = HashMap::new();72 for line in raw.lines() {73 let line = line.trim();74 if line.is_empty() {75 continue;76 }77 let (arch, hash) = line78 .split_once(char::is_whitespace)79 .ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;80 hashes.insert(arch.to_owned(), hash.trim().to_owned());81 }82 ensure!(83 !hashes.is_empty(),84 "agent bundle {} has no hashes",85 dir.display()86 );87 Ok(Self { dir, hashes })88 }8990 fn binary(&self, arch: &str) -> PathBuf {91 self.dir.join(format!("remowt-agent-{arch}"))92 }93}9495async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {96 let mut ch = sess.channel_open_session().await?;97 ch.exec(true, cmd).await?;98 let mut out = Vec::new();99 let mut code = None;100 while let Some(msg) = ch.wait().await {101 match msg {102 ChannelMsg::Data { data } => out.extend(data.as_ref()),103 ChannelMsg::ExtendedData { data, .. } => {104 error!(105 "remote stderr: {}",106 String::from_utf8_lossy(data.as_ref()).trim()107 );108 }109 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),110 _ => {}111 }112 }113 Ok((code, out))114}115116async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {117 let (code, mut out) = run(sess, cmd).await?;118 ensure!(119 code == Some(0),120 "remote command failed (exit {code:?}): {cmd}"121 );122 ensure!(out.ends_with(b"\n"));123 out.pop();124 String::from_utf8(out).context("expected utf8 output for command")125}126127async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {128 let arch = run_string_ok(sess, "uname -m").await?;129 let hash = bundle130 .hashes131 .get(&arch)132 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;133134 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")135 .await?136 .trim()137 .to_owned();138 let dir = if cache.is_empty() {139 let home = run_string_ok(sess, "echo \"$HOME\"").await?;140 ensure!(141 !home.is_empty(),142 "remote $HOME and $XDG_CACHE_HOME both empty"143 );144 Utf8PathBuf::from(home).join("cache/remowt")145 } else {146 Utf8PathBuf::from(cache).join("remowt")147 };148 let path = dir.join(hash);149150 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;151 if present != Some(0) {152 let bin = bundle.binary(&arch);153 let bytes = std::fs::read(&bin)154 .with_context(|| format!("reading agent binary {}", bin.display()))?;155 upload_agent(sess, &dir, &path, bytes).await?;156 }157 Ok(path)158}159160async fn upload_agent(161 sess: &Handle<SshHandler>,162 dir: &Utf8Path,163 path: &Utf8Path,164 bytes: Vec<u8>,165) -> Result<()> {166 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;167168 let tmp = path.join(format!("tmp.{}", Uuid::new_v4()));169 let ch = sess.channel_open_session().await?;170 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;171 ch.data_bytes(bytes).await?;172 ch.eof().await?;173 let mut ch = ch;174 let mut code = None;175 while let Some(msg) = ch.wait().await {176 match msg {177 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),178 ChannelMsg::ExtendedData { data, .. } => {179 error!(180 "agent upload: {}",181 String::from_utf8_lossy(data.as_ref()).trim()182 );183 }184 _ => {}185 }186 }187 ensure!(code == Some(0), "agent upload failed (exit {code:?})");188189 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;190 run_string_ok(191 sess,192 &format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),193 )194 .await?;195 Ok(())196}197198async fn detect_escalation(199 sess: &Handle<SshHandler>,200) -> Result<(&'static str, &'static [&'static str])> {201 for (tool, flags) in ESCALATORS {202 // `tool` is a fixed identifier (no metacharacters), safe to interpolate.203 let (code, _) = run(sess, &format!("command -v {tool}")).await?;204 if code == Some(0) {205 return Ok((tool, flags));206 }207 }208 bail!("no escalation tool (run0/sudo/doas) found on remote")209}210211fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {212 let mut parts = vec![tool.to_owned()];213 parts.extend(flags.iter().map(|f| f.to_string()));214 parts.push(sh_quote(agent_path));215 parts.push("real-agent".to_owned());216 parts.push("--privileged".to_owned());217 if let Some(p) = path {218 parts.push("--path".to_owned());219 parts.push(sh_quote(p));220 }221 parts.join(" ")222}223224fn find_in_path(name: &str) -> Option<std::path::PathBuf> {225 let path = env::var_os("PATH")?;226 env::split_paths(&path)227 .map(|dir| dir.join(name))228 .find(|p| p.is_file())229}230231fn port_from_channel(ch: Channel<Msg>) -> Port {232 Port::new(move |mut rx, tx| async move {233 let (mut srx, mut stx) = tokio::io::split(ch.into_stream());234 let srx_task = async move {235 loop {236 match read(&mut srx).await {237 Ok(buf) => {238 if tx.send(buf.freeze()).is_err() {239 break;240 }241 }242 Err(e) => {243 error!("channel read failed: {e}");244 break;245 }246 }247 }248 };249 let stx_task = async move {250 while let Some(value) = rx.recv().await {251 if let Err(e) = write(&mut stx, value).await {252 error!("channel write failed: {e}");253 break;254 }255 }256 };257 join!(srx_task, stx_task);258 })259}260261pub struct SshHandler {262 host: String,263 port: u16,264 subs: Subs,265}266impl Handler for SshHandler {267 type Error = russh::Error;268 async fn check_server_key(269 &mut self,270 server_public_key: &PublicKey,271 ) -> Result<bool, Self::Error> {272 Ok(check_known_hosts(&self.host, self.port, server_public_key)?)273 }274 async fn server_channel_open_forwarded_streamlocal(275 &mut self,276 channel: Channel<Msg>,277 socket_path: &str,278 _session: &mut Session,279 ) -> Result<(), Self::Error> {280 let Some(ch) = self281 .subs282 .lock()283 .expect("lock")284 .remove(&Utf8PathBuf::from(socket_path))285 else {286 return Err(russh::Error::WrongChannel);287 };288 let _ = ch.send(channel);289 Ok(())290 }291}292293struct SshElevator {294 sess: Arc<Handle<SshHandler>>,295 rpc: WeakRpc<BifConfig>,296 agent_path: Utf8PathBuf,297}298impl Elevator for SshElevator {299 async fn elevate(&self) -> Result<(), ElevateError> {300 let fail = |e: String| ElevateError::Failed(e);301 let (tool, flags) = detect_escalation(&self.sess)302 .await303 .map_err(|e| fail(e.to_string()))?;304 let ch = self305 .sess306 .channel_open_session()307 .await308 .map_err(|e| fail(e.to_string()))?;309 ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None))310 .await311 .map_err(|e| fail(e.to_string()))?;312 let rpc = self313 .rpc314 .clone()315 .upgrade()316 .ok_or_else(|| fail("rpc is gone".to_owned()))?;317 rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0));318 Ok(())319 }320}321322pub struct RemoteChild {323 pub stdout: DuplexStream,324 pub stderr: DuplexStream,325 pub exit: oneshot::Receiver<Option<u32>>,326}327328enum Transport {329 Ssh {330 sess: Arc<Handle<SshHandler>>,331 subs: Subs,332 remote_dir: Utf8PathBuf,333 agent_path: Utf8PathBuf,334 },335 Local {336 #[allow(dead_code)]337 agent: Rpc<BifConfig>,338 agent_path: String,339 },340}341342pub struct Remowt {343 transport: Transport,344 rpc: Rpc<BifConfig>,345 elevated: tokio::sync::OnceCell<()>,346 children: Mutex<Vec<tokio::process::Child>>,347}348349pub type RemowtRemote = Remote<BifConfig>;350351fn loopback() -> (Port, Port) {352 let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::<Bytes>();353 let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::<Bytes>();354 let user = Port::new(move |mut rx, tx| async move {355 loop {356 tokio::select! {357 msg = rx.recv() => match msg {358 Some(msg) => if a2b_tx.send(msg).is_err() { break },359 None => break,360 },361 msg = b2a_rx.recv() => match msg {362 Some(msg) => if tx.send(msg).is_err() { break },363 None => break,364 },365 }366 }367 });368 let agent = Port::new(move |mut rx, tx| async move {369 loop {370 tokio::select! {371 msg = rx.recv() => match msg {372 Some(msg) => if b2a_tx.send(msg).is_err() { break },373 None => break,374 },375 msg = a2b_rx.recv() => match msg {376 Some(msg) => if tx.send(msg).is_err() { break },377 None => break,378 },379 }380 }381 });382 (user, agent)383}384385impl Remowt {386 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {387 let conf = russh_config::parse_home(host)?;388 let port = conf.host_config.port.or(conf.port).unwrap_or(22);389 let hostname = conf390 .host_config391 .hostname392 .clone()393 .unwrap_or_else(|| conf.host_name.clone());394 let user = conf395 .user396 .clone()397 .unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));398399 let subs: Subs = Arc::new(Mutex::new(HashMap::new()));400 let mut sess = connect(401 Arc::new(Config::default()),402 (hostname.clone(), port),403 SshHandler {404 host: hostname,405 port,406 subs: subs.clone(),407 },408 )409 .await?;410411 let mut agent = AgentClient::connect_env().await?;412 let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();413 let mut authenticated = false;414 for ident in agent.request_identities().await? {415 let AgentIdentity::PublicKey { key, .. } = ident else {416 continue;417 };418 if sess419 .authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)420 .await?421 .success()422 {423 authenticated = true;424 break;425 }426 }427 ensure!(authenticated, "ssh authentication failed");428429 // All remaining session ops take `&self`; share the handle.430 let sess = Arc::new(sess);431432 let agent_path = deploy_agent(&sess, bundle).await?;433434 let remote_dir = remote_mktemp(&sess).await?;435 let primary = remote_dir.join("primary.sock");436437 let (onetx, onerx) = channel();438 subs.lock().expect("lock").insert(primary.clone(), onetx);439 sess.streamlocal_forward(primary.clone()).await?;440441 let rpc = Rpc::<BifConfig>::new(Address::User);442443 // TODO: ensure no injection is possible in the socket path.444 let cmd_chan = sess.channel_open_session().await?;445 cmd_chan446 .exec(447 true,448 format!(449 "{} real-agent --path={}",450 sh_quote(&agent_path),451 sh_quote(&primary)452 ),453 )454 .await?;455456 let port = port_from_channel(457 onerx458 .await459 .map_err(|_| anyhow!("agent never opened its channel"))?,460 );461 rpc.add_direct(Address::Agent, port, Rtt(0));462463 Ok(Self {464 transport: Transport::Ssh {465 sess,466 subs,467 remote_dir,468 agent_path,469 },470 rpc,471 elevated: tokio::sync::OnceCell::new(),472 children: Mutex::new(Vec::new()),473 })474 }475476 pub async fn connect_local(agent_path: &str) -> Result<Self> {477 let (port_user, port_agent) = loopback();478 let rpc = Rpc::<BifConfig>::new(Address::User);479 let mut agent = Rpc::<BifConfig>::new(Address::Agent);480481 // Register handlers before wiring up the link (see the agent binary).482 Fs::new().register_endpoints(&mut agent);483 Systemd.register_endpoints(&mut agent);484 Pty::new().register_endpoints(&mut agent);485486 agent.add_direct(Address::User, port_agent, Rtt(0));487 rpc.add_direct(Address::Agent, port_user, Rtt(0));488489 Ok(Self {490 transport: Transport::Local {491 agent,492 agent_path: agent_path.to_owned(),493 },494 rpc,495 elevated: tokio::sync::OnceCell::new(),496 children: Mutex::new(Vec::new()),497 })498 }499500 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {501 match &self.transport {502 Transport::Ssh { sess, .. } => Some(sess.clone()),503 Transport::Local { .. } => None,504 }505 }506507 pub fn rpc(&self) -> Rpc<BifConfig> {508 self.rpc.clone()509 }510511 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {512 R::wrap(self.rpc.remote(Address::Agent))513 }514515 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {516 let client: PluginEndpointsClient<BifConfig> = self.endpoints();517 client518 .load_plugin(id, name.to_owned())519 .await?520 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))521 }522 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {523 self.ensure_elevated().await?;524 let client: PluginEndpointsClient<BifConfig> =525 PluginEndpointsClient::wrap(self.rpc.remote(Address::AgentPrivileged));526 client527 .load_plugin_path(id, path.to_owned())528 .await?529 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))530 }531 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {532 R::wrap(self.rpc.remote(Address::Plugin(id)))533 }534 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {535 self.ensure_elevated().await?;536 Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))537 }538539 async fn ensure_elevated(&self) -> Result<()> {540 self.elevated541 .get_or_try_init(|| async {542 let port = match &self.transport {543 Transport::Ssh {544 sess, agent_path, ..545 } => {546 let (tool, flags) = detect_escalation(sess).await?;547 let ch = sess.channel_open_session().await?;548 ch.exec(true, privileged_cmd(tool, flags, agent_path, None))549 .await?;550 port_from_channel(ch)551 }552 Transport::Local { agent_path, .. } => {553 let sock = env::temp_dir()554 .join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4()));555 let _ = std::fs::remove_file(&sock);556 let listener = UnixListener::bind(&sock)?;557 let (tool, flags) = ESCALATORS558 .iter()559 .find(|(t, _)| find_in_path(t).is_some())560 .ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?;561 let child = tokio::process::Command::new(tool)562 .args(*flags)563 .arg(agent_path)564 .arg("real-agent")565 .arg("--privileged")566 .arg("--path")567 .arg(sock.to_str().expect("temp path is utf-8"))568 .kill_on_drop(true)569 .spawn()?;570 self.children.lock().expect("lock").push(child);571 let (stream, _) = listener.accept().await?;572 let _ = std::fs::remove_file(&sock);573 from_socket(stream)574 }575 };576 self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));577 anyhow::Ok(())578 })579 .await?;580 Ok(())581 }582583 pub async fn exec(&self, command: String) -> Result<RemoteChild> {584 let Some(sess) = self.ssh() else {585 bail!("exec should not be called on local")586 };587 let ch = sess.channel_open_session().await?;588 ch.exec(true, command).await?;589590 let (mut out_w, stdout) = tokio::io::duplex(64 * 1024);591 let (mut err_w, stderr) = tokio::io::duplex(64 * 1024);592 let (exit_tx, exit) = oneshot::channel();593594 tokio::spawn(async move {595 let mut ch = ch;596 let mut code = None;597 while let Some(msg) = ch.wait().await {598 match msg {599 ChannelMsg::Data { data } => {600 if out_w.write_all(&data).await.is_err() {601 break;602 }603 }604 ChannelMsg::ExtendedData { data, .. } => {605 if err_w.write_all(&data).await.is_err() {606 break;607 }608 }609 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),610 _ => {}611 }612 }613 let _ = out_w.shutdown().await;614 let _ = err_w.shutdown().await;615 let _ = exit_tx.send(code);616 });617618 Ok(RemoteChild {619 stdout,620 stderr,621 exit,622 })623 }624625 pub fn serve_elevate(&self) -> Result<()> {626 let Transport::Ssh {627 sess, agent_path, ..628 } = &self.transport629 else {630 bail!("elevate should not be called on local")631 };632 let mut rpc = self.rpc.clone();633 ElevateEndpoints(SshElevator {634 sess: sess.clone(),635 rpc: self.rpc.clone().downgrade(),636 agent_path: agent_path.to_owned(),637 })638 .register_endpoints(&mut rpc);639 Ok(())640 }641642 pub fn remote_dir(&self) -> Option<&Utf8Path> {643 match &self.transport {644 Transport::Ssh { remote_dir, .. } => Some(remote_dir),645 Transport::Local { .. } => None,646 }647 }648649 pub async fn forward_socket(650 &self,651 remote_path: &Utf8Path,652 ) -> Result<oneshot::Receiver<Channel<Msg>>> {653 let Transport::Ssh { sess, subs, .. } = &self.transport else {654 bail!("forward_socket should not be called on local")655 };656 let (tx, rx) = oneshot::channel();657 subs.lock()658 .expect("lock")659 .insert(remote_path.to_owned(), tx);660 sess.streamlocal_forward(remote_path.to_owned()).await?;661 Ok(rx)662 }663664 pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<Shell> {665 let Transport::Ssh { remote_dir, .. } = &self.transport else {666 bail!("open_shell should not be called on local")667 };668 let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4()));669670 let rx = self.forward_socket(&sock).await?;671 let client: PtyClient<BifConfig> = self.endpoints();672 let id = client673 .open_shell(sock, term.to_owned(), cols, rows)674 .await?675 .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;676 let ch = rx677 .await678 .map_err(|_| anyhow!("agent never connected the shell socket"))?;679680 Ok(Shell {681 id,682 stream: ch.into_stream(),683 remote: self.rpc.remote(Address::Agent),684 })685 }686}687688pub struct Shell {689 pub id: ShellId,690 pub stream: ChannelStream<Msg>,691 remote: Remote<BifConfig>,692}693694impl Shell {695 pub fn resizer(&self) -> ShellResizer {696 ShellResizer {697 remote: self.remote.clone(),698 id: self.id,699 }700 }701}702703#[derive(Clone)]704pub struct ShellResizer {705 remote: Remote<BifConfig>,706 id: ShellId,707}708709impl ShellResizer {710 pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {711 PtyClient::wrap(self.remote.clone())712 .resize(self.id, cols, rows)713 .await?714 .map_err(|e| anyhow!("failed to resize remote shell: {e}"))715 }716}717718async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {719 let mut cmd_chan = sess.channel_open_session().await?;720 cmd_chan721 .exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")722 .await?;723 let mut stdout = vec![];724 loop {725 let Some(msg) = cmd_chan.wait().await else {726 bail!("unexpected channel end");727 };728 match msg {729 russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),730 russh::ChannelMsg::ExitStatus { exit_status } => {731 if exit_status != 0 {732 bail!("mktemp failed");733 }734 break;735 }736 _ => {}737 }738 }739 ensure!(stdout.ends_with(b"\n"));740 stdout.pop();741 Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))742}crates/remowt-client/src/port.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/port.rs
@@ -0,0 +1,52 @@
+use std::io;
+
+use bifrostlink::Port;
+use bytes::{Bytes, BytesMut};
+use russh::{Channel, ChannelStream};
+use russh::client::Msg;
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, ReadHalf, WriteHalf};
+use tokio::join;
+use tracing::error;
+
+async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {
+ let len = srx.read_u32().await?;
+ let mut buf = BytesMut::zeroed(len as usize);
+ srx.read_exact(&mut buf).await?;
+ Ok(buf)
+}
+async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {
+ stx.write_u32(value.len().try_into().expect("can't be larger"))
+ .await?;
+ stx.write_all(&value).await?;
+ Ok(())
+}
+
+pub fn channel_port(ch: Channel<Msg>) -> Port {
+ Port::new(move |mut rx, tx| async move {
+ let (mut srx, mut stx) = tokio::io::split(ch.into_stream());
+ let srx_task = async move {
+ loop {
+ match read(&mut srx).await {
+ Ok(buf) => {
+ if tx.send(buf.freeze()).is_err() {
+ break;
+ }
+ }
+ Err(e) => {
+ error!("channel read failed: {e}");
+ break;
+ }
+ }
+ }
+ };
+ let stx_task = async move {
+ while let Some(value) = rx.recv().await {
+ if let Err(e) = write(&mut stx, value).await {
+ error!("channel write failed: {e}");
+ break;
+ }
+ }
+ };
+ join!(srx_task, stx_task);
+ })
+}
crates/remowt-client/src/shell.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/shell.rs
@@ -0,0 +1,60 @@
+use anyhow::{anyhow, Result};
+use bifrostlink::declarative::RemoteEndpoints as _;
+use bifrostlink::Remote;
+use remowt_endpoints::pty::{PtyClient, ShellId};
+use remowt_link_shared::{Address, BifConfig};
+use uuid::Uuid;
+
+use crate::forwarded::RemowtStream;
+use crate::Remowt;
+
+pub struct RemowtShell {
+ pub id: ShellId,
+ pub stream: RemowtStream,
+ remote: Remote<BifConfig>,
+}
+impl RemowtShell {
+ pub fn resizer(&self) -> RemowtShellResizer {
+ RemowtShellResizer {
+ remote: self.remote.clone(),
+ id: self.id,
+ }
+ }
+}
+
+#[derive(Clone)]
+pub struct RemowtShellResizer {
+ remote: Remote<BifConfig>,
+ id: ShellId,
+}
+
+impl RemowtShellResizer {
+ pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {
+ PtyClient::wrap(self.remote.clone())
+ .resize(self.id, cols, rows)
+ .await?
+ .map_err(|e| anyhow!("failed to resize remote shell: {e}"))
+ }
+}
+
+impl Remowt {
+ pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<RemowtShell> {
+ let sock = self
+ .runtime_dir()
+ .join(format!("remowt-shell-{}.sock", Uuid::new_v4()));
+
+ let forwarded = self.forward_socket(&sock).await?;
+ let client: PtyClient<BifConfig> = self.endpoints();
+ let id = client
+ .open_shell(sock, term.to_owned(), cols, rows)
+ .await?
+ .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;
+ let stream = forwarded.accept().await?;
+
+ Ok(RemowtShell {
+ id,
+ stream,
+ remote: self.rpc.remote(Address::Agent),
+ })
+ }
+}
crates/remowt-client/src/subprocess.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/subprocess.rs
@@ -0,0 +1,84 @@
+use bytes::Bytes;
+use russh::client::Msg;
+use russh::{Channel, ChannelMsg};
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};
+use tokio::sync::oneshot;
+
+const BUF: usize = 64 * 1024;
+
+pub struct RemowtChild {
+ pub stdin: DuplexStream,
+ pub stdout: DuplexStream,
+ pub stderr: DuplexStream,
+ pub exit: oneshot::Receiver<Option<u32>>,
+}
+
+impl RemowtChild {
+ /// Manage channel returned by russh exec().
+ pub(crate) fn from_exec(ch: Channel<Msg>) -> Self {
+ let (stdin, mut stdin_r) = tokio::io::duplex(BUF);
+ let (mut out_w, stdout) = tokio::io::duplex(BUF);
+ let (mut err_w, stderr) = tokio::io::duplex(BUF);
+ let (exit_tx, exit) = oneshot::channel();
+
+ tokio::spawn(async move {
+ let (mut read, write) = ch.split();
+
+ // Forward our stdin to the channel, signalling EOF when it closes.
+ let stdin_pump = tokio::spawn(async move {
+ let mut buf = vec![0u8; BUF];
+ loop {
+ match stdin_r.read(&mut buf).await {
+ Ok(0) | Err(_) => break,
+ Ok(n) => {
+ if write
+ .data_bytes(Bytes::copy_from_slice(&buf[..n]))
+ .await
+ .is_err()
+ {
+ return;
+ }
+ }
+ }
+ }
+ let _ = write.eof().await;
+ });
+
+ let mut code = None;
+ while let Some(msg) = read.wait().await {
+ match msg {
+ ChannelMsg::Data { data } => {
+ if out_w.write_all(&data).await.is_err() {
+ break;
+ }
+ }
+ ChannelMsg::ExtendedData { data, .. } => {
+ if err_w.write_all(&data).await.is_err() {
+ break;
+ }
+ }
+ ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
+ _ => {}
+ }
+ }
+
+ // The process is gone; stop waiting on stdin we'll never forward.
+ stdin_pump.abort();
+ let _ = out_w.shutdown().await;
+ let _ = err_w.shutdown().await;
+ let _ = exit_tx.send(code);
+ });
+
+ RemowtChild {
+ stdin,
+ stdout,
+ stderr,
+ exit,
+ }
+ }
+
+ /// Wait for the process to finish, returning its exit status.
+ pub async fn wait(self) -> Option<u32> {
+ self.exit.await.ok().flatten()
+ }
+}
crates/remowt-link-shared/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-link-shared/Cargo.toml
+++ b/crates/remowt-link-shared/Cargo.toml
@@ -11,6 +11,7 @@
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
thiserror.workspace = true
-tokio = { workspace = true, features = ["fs"] }
+tokio = { workspace = true, features = ["fs", "io-util", "macros"] }
+tracing.workspace = true
remowt-ui-prompt.workspace = true
camino = { workspace = true, features = ["serde1"] }
crates/remowt-link-shared/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-link-shared/src/lib.rs
+++ b/crates/remowt-link-shared/src/lib.rs
@@ -1,14 +1,12 @@
-use std::future::Future;
-
-use bifrostlink::declarative::endpoints;
use bifrostlink::error::{ErrorT, ListenerForYourRequestHasBeenDeadError, ResponseError};
use bifrostlink::notification;
use bifrostlink::packet::OpaquePacketWrapper;
-use bifrostlink::{AddressT, Config};
+use bifrostlink::AddressT;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
pub mod editor;
+pub mod port;
#[derive(Clone, Serialize, Hash, Eq, Debug, PartialEq, Deserialize)]
pub enum Address {
@@ -20,26 +18,6 @@
impl AddressT for Address {}
pub mod plugin;
-
-#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
-pub enum ElevateError {
- #[error("elevation failed: {0}")]
- Failed(String),
-}
-
-pub trait Elevator: Send + Sync {
- fn elevate(&self) -> impl Future<Output = Result<(), ElevateError>> + Send;
-}
-
-pub struct ElevateEndpoints<E>(pub E);
-
-#[endpoints(ns = 3)]
-impl<E: Elevator + 'static> ElevateEndpoints<E> {
- #[endpoints(id = 1)]
- async fn elevate(&self) -> Result<(), ElevateError> {
- self.0.elevate().await
- }
-}
#[derive(thiserror::Error, Debug)]
pub enum Error {
crates/remowt-link-shared/src/port.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-link-shared/src/port.rs
@@ -0,0 +1,54 @@
+use std::io;
+
+use bifrostlink::Port;
+use bytes::{Bytes, BytesMut};
+use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
+
+/// Wire a length-prefixed duplex byte stream (e.g. a child process's
+/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32`
+/// length followed by that many payload bytes.
+pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port
+where
+ R: AsyncRead + Unpin + Send + 'static,
+ W: AsyncWrite + Unpin + Send + 'static,
+{
+ Port::new(|mut rx, tx| async move {
+ let read_task = async move {
+ loop {
+ let len = match reader.read_u32().await {
+ Ok(len) => len,
+ Err(e) => {
+ tracing::error!("child read failed: {e}");
+ break;
+ }
+ };
+ let mut buf = BytesMut::zeroed(len as usize);
+ if let Err(e) = reader.read_exact(&mut buf).await {
+ tracing::error!("child read failed: {e}");
+ break;
+ }
+ if tx.send(buf.freeze()).is_err() {
+ break;
+ }
+ }
+ };
+ let write_task = async move {
+ while let Some(msg) = rx.recv().await {
+ if let Err(e) = write_frame(&mut writer, msg).await {
+ tracing::error!("child write failed: {e}");
+ break;
+ }
+ }
+ };
+ tokio::join!(read_task, write_task);
+ })
+}
+
+async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {
+ let len = u32::try_from(msg.len())
+ .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;
+ writer.write_u32(len).await?;
+ writer.write_all(&msg).await?;
+ writer.flush().await?;
+ Ok(())
+}
crates/remowt-plugin/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-plugin/Cargo.toml
+++ b/crates/remowt-plugin/Cargo.toml
@@ -9,7 +9,6 @@
anyhow.workspace = true
bifrostlink.workspace = true
bifrostlink-ports.workspace = true
-bytes.workspace = true
remowt-link-shared.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = [
crates/remowt-plugin/src/host.rsdiffbeforeafterboth--- a/crates/remowt-plugin/src/host.rs
+++ b/crates/remowt-plugin/src/host.rs
@@ -1,14 +1,12 @@
use std::ffi::OsStr;
-use std::io;
use std::process::Stdio;
use std::sync::Mutex;
-use bifrostlink::{Port, Rpc, Rtt, WeakRpc};
-use bytes::{Bytes, BytesMut};
-use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
-use tokio::process::{Child, ChildStdin, ChildStdout, Command};
+use bifrostlink::{Rpc, Rtt, WeakRpc};
+use tokio::process::{Child, Command};
use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};
+use remowt_link_shared::port::child_port;
use remowt_link_shared::{Address, BifConfig};
pub fn serve(rpc: &mut Rpc<BifConfig>) {
@@ -68,46 +66,4 @@
}
self.spawn(id, path)
}
-}
-
-fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port {
- Port::new(|mut rx, tx| async move {
- let reader = async move {
- loop {
- let len = match stdout.read_u32().await {
- Ok(len) => len,
- Err(e) => {
- tracing::error!("plugin stdout read failed: {e}");
- break;
- }
- };
- let mut buf = BytesMut::zeroed(len as usize);
- if let Err(e) = stdout.read_exact(&mut buf).await {
- tracing::error!("plugin stdout read failed: {e}");
- break;
- }
- if tx.send(buf.freeze()).is_err() {
- break;
- }
- }
- };
- let writer = async move {
- while let Some(msg) = rx.recv().await {
- if let Err(e) = write_frame(&mut stdin, msg).await {
- tracing::error!("plugin stdin write failed: {e}");
- break;
- }
- }
- };
- tokio::join!(reader, writer);
- })
-}
-
-async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> {
- let len = u32::try_from(msg.len())
- .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;
- stdin.write_u32(len).await?;
- stdin.write_all(&msg).await?;
- stdin.flush().await?;
- Ok(())
}