difftreelog
feat(client) proper support for local
in: trunk
16 files changed
.gitignorediffbeforeafterboth--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
/target
/.direnv
+/result
+/result-*
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2096,6 +2096,7 @@
"russh-config",
"serde",
"serde_json",
+ "tempfile",
"tokio",
"tracing",
"uuid",
@@ -2131,6 +2132,7 @@
"serde_json",
"thiserror",
"tokio",
+ "tracing",
]
[[package]]
@@ -2140,7 +2142,6 @@
"anyhow",
"bifrostlink",
"bifrostlink-ports",
- "bytes",
"remowt-link-shared",
"serde_json",
"tokio",
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -225,8 +225,6 @@
}
fn main() -> anyhow::Result<()> {
- // Log to stderr: `privileged-agent` uses stdout as the bifrost transport,
- // so anything written there would corrupt the stream.
tracing_subscriber::fmt()
.with_writer(std::io::stderr)
.without_time()
cmds/remowt-ssh/Cargo.tomldiffbeforeafterboth--- a/cmds/remowt-ssh/Cargo.toml
+++ b/cmds/remowt-ssh/Cargo.toml
@@ -11,8 +11,15 @@
tracing-subscriber.workspace = true
bifrostlink.workspace = true
remowt-link-shared.workspace = true
-remowt-client.workspace = true
-tokio = { workspace = true, features = ["macros", "fs", "net", "io-util", "rt", "signal"] }
+remowt-client = { workspace = true, features = ["shell"] }
+tokio = { workspace = true, features = [
+ "macros",
+ "fs",
+ "net",
+ "io-util",
+ "rt",
+ "signal",
+] }
nix = { workspace = true, features = ["term"] }
anyhow.workspace = true
bifrostlink-ports.workspace = true
cmds/remowt-ssh/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-ssh/src/main.rs
+++ b/cmds/remowt-ssh/src/main.rs
@@ -19,11 +19,14 @@
use tokio::io::unix::AsyncFd;
use tokio::io::{AsyncRead, ReadBuf};
use tokio::signal::unix::{signal, SignalKind};
-use tracing::info;
+use tracing::debug;
#[derive(Parser)]
-struct Opts {
- host: String,
+enum Opts {
+ /// Connect to remote host with remowt agent.
+ Ssh { host: String },
+ /// Connect to local host for testing the connectivity.
+ Local,
}
fn agents_dir() -> anyhow::Result<PathBuf> {
@@ -35,18 +38,27 @@
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
- tracing_subscriber::fmt::init();
+ tracing_subscriber::fmt()
+ .with_writer(std::io::stderr)
+ .without_time()
+ .init();
let opts = Opts::parse();
let bundle = AgentBundle::from_dir(agents_dir()?)?;
- let conn = Remowt::connect(&opts.host, &bundle).await?;
+ let conn = match &opts {
+ Opts::Ssh { host } => Remowt::connect(host, &bundle).await?,
+ Opts::Local => Remowt::connect_local(&bundle).await?,
+ };
let mut rpc = conn.rpc();
serve_prompts(
&mut rpc,
PrependSourcePrompter {
prompter: RofiPrompter,
- source: vec![Source(Cow::Owned(format!("ssh host: {}", opts.host)))],
+ source: match opts {
+ Opts::Ssh { host } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))],
+ Opts::Local => vec![],
+ },
description: "".to_owned(),
},
);
@@ -54,9 +66,9 @@
serve_editor(&mut rpc, SshEditor { sess });
}
- info!("entering shell");
+ debug!("entering shell");
run_shell(&conn).await?;
- info!("shell ended");
+ debug!("shell ended");
Ok(())
}
crates/remowt-client/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-client/Cargo.toml
+++ b/crates/remowt-client/Cargo.toml
@@ -16,7 +16,18 @@
remowt-link-shared.workspace = true
russh.workspace = true
russh-config.workspace = true
-tokio = { workspace = true, features = ["net", "io-util", "rt", "sync", "macros", "process"] }
+tempfile.workspace = true
+tokio = { workspace = true, features = [
+ "net",
+ "io-util",
+ "rt",
+ "sync",
+ "macros",
+ "process",
+] }
tracing.workspace = true
uuid = { workspace = true, features = ["v4"] }
-remowt-endpoints.workspace = true
+remowt-endpoints = { workspace = true, optional = true }
+
+[features]
+shell = ["dep:remowt-endpoints"]
crates/remowt-client/src/forwarded.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/forwarded.rs
@@ -0,0 +1,79 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use anyhow::{anyhow, Result};
+use camino::Utf8PathBuf;
+use russh::client::Msg;
+use russh::{Channel, ChannelStream};
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tokio::net::{UnixListener, UnixStream};
+use tokio::sync::oneshot;
+
+pub enum RemowtListener {
+ Ssh(oneshot::Receiver<Channel<Msg>>),
+ Local(UnixListener, Utf8PathBuf),
+}
+
+impl RemowtListener {
+ pub async fn accept(self) -> Result<RemowtStream> {
+ match self {
+ RemowtListener::Ssh(rx) => {
+ let ch = rx
+ .await
+ .map_err(|_| anyhow!("agent never connected the forwarded socket"))?;
+ Ok(RemowtStream::Ssh(ch.into_stream()))
+ }
+ RemowtListener::Local(listener, path) => {
+ let (stream, _) = listener.accept().await?;
+ let _ = std::fs::remove_file(&path);
+ Ok(RemowtStream::Local(stream))
+ }
+ }
+ }
+}
+
+pub enum RemowtStream {
+ Ssh(ChannelStream<Msg>),
+ Local(UnixStream),
+}
+
+impl AsyncRead for RemowtStream {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ match self.get_mut() {
+ RemowtStream::Ssh(s) => Pin::new(s).poll_read(cx, buf),
+ RemowtStream::Local(s) => Pin::new(s).poll_read(cx, buf),
+ }
+ }
+}
+
+impl AsyncWrite for RemowtStream {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ match self.get_mut() {
+ RemowtStream::Ssh(s) => Pin::new(s).poll_write(cx, buf),
+ RemowtStream::Local(s) => Pin::new(s).poll_write(cx, buf),
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ match self.get_mut() {
+ RemowtStream::Ssh(s) => Pin::new(s).poll_flush(cx),
+ RemowtStream::Local(s) => Pin::new(s).poll_flush(cx),
+ }
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ match self.get_mut() {
+ RemowtStream::Ssh(s) => Pin::new(s).poll_shutdown(cx),
+ RemowtStream::Local(s) => Pin::new(s).poll_shutdown(cx),
+ }
+ }
+}
crates/remowt-client/src/lib.rsdiffbeforeafterboth1use std::collections::HashMap;2use std::env;3use std::path::PathBuf;4use std::sync::{Arc, Mutex};56use anyhow::{anyhow, bail, ensure, Context as _, Result};7use bifrostlink::declarative::RemoteEndpoints;8use bifrostlink::{Remote, Rpc, Rtt};9use bifrostlink_ports::unix_socket::from_socket;10use camino::{Utf8Path, Utf8PathBuf};11use remowt_link_shared::plugin::PluginEndpointsClient;12use remowt_link_shared::port::child_port;13use remowt_link_shared::{Address, BifConfig};14use russh::client::{connect, Config, Handle, Handler, Msg, Session};15use russh::keys::agent::client::AgentClient;16use russh::keys::agent::AgentIdentity;17use russh::keys::check_known_hosts;18use russh::keys::ssh_key::PublicKey;19use russh::Channel;20use tempfile::TempDir;21use tokio::net::UnixListener;22use tokio::sync::oneshot::{self, channel};23use tokio::{24 fs,25 io::{AsyncReadExt as _, AsyncWriteExt as _},26};27use tracing::{debug, error};28use uuid::Uuid;2930use self::port::channel_port;31use self::subprocess::RemowtChild;3233pub mod editor;34mod forwarded;35mod port;36#[cfg(feature = "shell")]37mod shell;38mod subprocess;3940pub use forwarded::{RemowtListener, RemowtStream};41#[cfg(feature = "shell")]42pub use shell::{RemowtShell, RemowtShellResizer};4344type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;4546fn sh_quote(s: impl AsRef<str>) -> String {47 format!("'{}'", s.as_ref().replace('\'', "'\\''"))48}4950const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];5152pub struct AgentBundle {53 dir: PathBuf,54 hashes: HashMap<String, String>,55}5657impl AgentBundle {58 pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {59 let dir = dir.into();60 let hashes_path = dir.join("hashes");61 let raw = std::fs::read_to_string(&hashes_path)62 .with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;63 let mut hashes = HashMap::new();64 for line in raw.lines() {65 let line = line.trim();66 if line.is_empty() {67 continue;68 }69 let (arch, hash) = line70 .split_once(char::is_whitespace)71 .ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;72 hashes.insert(arch.to_owned(), hash.trim().to_owned());73 }74 ensure!(75 !hashes.is_empty(),76 "agent bundle {} has no hashes",77 dir.display()78 );79 Ok(Self { dir, hashes })80 }8182 fn binary(&self, arch: &str) -> PathBuf {83 self.dir.join(format!("remowt-agent-{arch}"))84 }8586 fn local_binary(&self) -> Result<PathBuf> {87 let arch = env::consts::ARCH;88 let path = self.binary(arch);89 ensure!(90 path.is_file(),91 "no local remowt-agent build for arch {arch} in bundle {}",92 self.dir.display()93 );94 Ok(path)95 }96}9798async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {99 let ch = sess.channel_open_session().await?;100 ch.exec(true, cmd).await?;101102 let mut child = RemowtChild::from_exec(ch);103 drop(child.stdin);104105 let mut out = Vec::new();106 let mut err = Vec::new();107 tokio::try_join!(108 child.stdout.read_to_end(&mut out),109 child.stderr.read_to_end(&mut err),110 )?;111 if !err.is_empty() {112 error!("remote stderr: {}", String::from_utf8_lossy(&err).trim());113 }114 let code = child.exit.await.ok().flatten();115 Ok((code, out))116}117118async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {119 let (code, mut out) = run(sess, cmd).await?;120 ensure!(121 code == Some(0),122 "remote command failed (exit {code:?}): {cmd}"123 );124 if !out.is_empty() {125 ensure!(126 out.ends_with(b"\n"),127 "remote command was not newline-terminated: {cmd}: {out:?}"128 );129 out.pop();130 }131 String::from_utf8(out).context("expected utf8 output for command")132}133134async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {135 debug!("uname -a");136 let arch = run_string_ok(sess, "uname -m").await?;137 let hash = bundle138 .hashes139 .get(&arch)140 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;141142 debug!("get dir");143 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")144 .await?145 .to_owned();146 let dir = if cache.is_empty() {147 let home = run_string_ok(sess, "echo \"$HOME\"").await?;148 ensure!(149 !home.is_empty(),150 "remote $HOME and $XDG_CACHE_HOME both empty"151 );152 Utf8PathBuf::from(home).join(".cache/remowt")153 } else {154 Utf8PathBuf::from(cache).join("remowt")155 };156 let path = dir.join(hash);157158 debug!("presence");159 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;160 if present != Some(0) {161 let bin = bundle.binary(&arch);162 debug!("read");163 let bytes = fs::read(&bin)164 .await165 .with_context(|| format!("reading agent binary {}", bin.display()))?;166 debug!("upload");167 upload_agent(sess, &dir, &path, bytes).await?;168 }169 Ok(path)170}171172async fn upload_agent(173 sess: &Handle<SshHandler>,174 dir: &Utf8Path,175 path: &Utf8Path,176 bytes: Vec<u8>,177) -> Result<()> {178 debug!("mkdirp");179 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;180181 let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));182 let ch = sess.channel_open_session().await?;183 debug!("cat");184 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;185186 let mut child = RemowtChild::from_exec(ch);187 child188 .stdin189 .write_all(&bytes)190 .await191 .context("sending agent binary")?;192 child193 .stdin194 .shutdown()195 .await196 .context("sending agent binary")?;197 let code = child.wait().await;198 ensure!(code == Some(0), "agent upload failed (exit {code:?})");199200 debug!("chmod");201 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;202 run_string_ok(203 sess,204 &format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),205 )206 .await?;207 Ok(())208}209210async fn detect_escalation(211 sess: &Handle<SshHandler>,212) -> Result<(&'static str, &'static [&'static str])> {213 for (tool, flags) in ESCALATORS {214 // `tool` is a fixed identifier (no metacharacters), safe to interpolate.215 let (code, _) = run(sess, &format!("command -v {tool}")).await?;216 if code == Some(0) {217 return Ok((tool, flags));218 }219 }220 bail!("no escalation tool found on remote")221}222223fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {224 let mut parts = vec![tool.to_owned()];225 parts.extend(flags.iter().map(|f| f.to_string()));226 parts.push(sh_quote(agent_path));227 parts.push("real-agent".to_owned());228 parts.push("--privileged".to_owned());229 if let Some(p) = path {230 parts.push("--path".to_owned());231 parts.push(sh_quote(p));232 }233 parts.join(" ")234}235236fn find_in_path(name: &str) -> Option<std::path::PathBuf> {237 let path = env::var_os("PATH")?;238 env::split_paths(&path)239 .map(|dir| dir.join(name))240 .find(|p| p.is_file())241}242243pub struct SshHandler {244 host: String,245 port: u16,246 subs: Subs,247}248impl Handler for SshHandler {249 type Error = russh::Error;250 async fn check_server_key(251 &mut self,252 server_public_key: &PublicKey,253 ) -> Result<bool, Self::Error> {254 Ok(check_known_hosts(&self.host, self.port, server_public_key)?)255 }256 async fn server_channel_open_forwarded_streamlocal(257 &mut self,258 channel: Channel<Msg>,259 socket_path: &str,260 _session: &mut Session,261 ) -> Result<(), Self::Error> {262 let Some(ch) = self263 .subs264 .lock()265 .expect("lock")266 .remove(&Utf8PathBuf::from(socket_path))267 else {268 return Err(russh::Error::WrongChannel);269 };270 let _ = ch.send(channel);271 Ok(())272 }273}274275enum Transport {276 Ssh {277 sess: Arc<Handle<SshHandler>>,278 subs: Subs,279 runtime_dir: Utf8PathBuf,280 agent_path: Utf8PathBuf,281 },282 Local {283 agent_path: PathBuf,284 runtime_dir: Utf8PathBuf,285 },286}287288pub struct Remowt {289 transport: Transport,290 rpc: Rpc<BifConfig>,291 elevated: tokio::sync::OnceCell<()>,292 children: Mutex<Vec<tokio::process::Child>>,293 _runtime_tmp: Option<TempDir>,294}295296pub type RemowtRemote = Remote<BifConfig>;297298impl Remowt {299 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {300 let conf = russh_config::parse_home(host)?;301 let port = conf.host_config.port.or(conf.port).unwrap_or(22);302 let hostname = conf303 .host_config304 .hostname305 .clone()306 .unwrap_or_else(|| conf.host_name.clone());307 let user = conf308 .user309 .clone()310 .unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));311312 let subs: Subs = Arc::new(Mutex::new(HashMap::new()));313 let mut sess = connect(314 Arc::new(Config::default()),315 (hostname.clone(), port),316 SshHandler {317 host: hostname,318 port,319 subs: subs.clone(),320 },321 )322 .await?;323324 let mut agent = AgentClient::connect_env().await?;325 let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();326 let mut authenticated = false;327 for ident in agent.request_identities().await? {328 let AgentIdentity::PublicKey { key, .. } = ident else {329 continue;330 };331 if sess332 .authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)333 .await?334 .success()335 {336 authenticated = true;337 break;338 }339 }340 ensure!(authenticated, "ssh authentication failed");341342 let sess = Arc::new(sess);343344 debug!("deploying agent");345 let agent_path = deploy_agent(&sess, bundle).await?;346347 debug!("runtime dir");348 let runtime_dir = remote_runtime_dir(&sess).await?;349 let primary = runtime_dir.join(format!("remowt-{}.sock", Uuid::new_v4()));350351 let (onetx, onerx) = channel();352 subs.lock().expect("lock").insert(primary.clone(), onetx);353 sess.streamlocal_forward(primary.clone()).await?;354355 let rpc = Rpc::<BifConfig>::new(Address::User);356357 // TODO: ensure no injection is possible in the socket path.358 let cmd_chan = sess.channel_open_session().await?;359 debug!("starting agent");360 cmd_chan361 .exec(362 true,363 format!(364 "{} real-agent --path={}",365 sh_quote(&agent_path),366 sh_quote(&primary)367 ),368 )369 .await?;370371 let port = channel_port(372 onerx373 .await374 .map_err(|_| anyhow!("agent never opened its channel"))?,375 );376 rpc.add_direct(Address::Agent, port, Rtt(0));377378 Ok(Self {379 transport: Transport::Ssh {380 sess,381 subs,382 runtime_dir,383 agent_path,384 },385 rpc,386 elevated: tokio::sync::OnceCell::new(),387 children: Mutex::new(Vec::new()),388 _runtime_tmp: None,389 })390 }391392 pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {393 let agent_path = bundle.local_binary()?;394 let mut child = tokio::process::Command::new(&agent_path)395 .arg("real-agent")396 .stdin(std::process::Stdio::piped())397 .stdout(std::process::Stdio::piped())398 .kill_on_drop(true)399 .spawn()400 .with_context(|| format!("spawning agent binary {}", agent_path.display()))?;401 let stdin = child.stdin.take().expect("stdin piped");402 let stdout = child.stdout.take().expect("stdout piped");403404 let rpc = Rpc::<BifConfig>::new(Address::User);405 rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));406407 let (runtime_dir, runtime_tmp) = local_runtime_dir()?;408409 Ok(Self {410 transport: Transport::Local {411 agent_path,412 runtime_dir,413 },414 rpc,415 elevated: tokio::sync::OnceCell::new(),416 children: Mutex::new(vec![child]),417 _runtime_tmp: runtime_tmp,418 })419 }420421 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {422 match &self.transport {423 Transport::Ssh { sess, .. } => Some(sess.clone()),424 Transport::Local { .. } => None,425 }426 }427428 pub fn rpc(&self) -> Rpc<BifConfig> {429 self.rpc.clone()430 }431432 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {433 R::wrap(self.rpc.remote(Address::Agent))434 }435436 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {437 let client: PluginEndpointsClient<BifConfig> = self.endpoints();438 client439 .load_plugin(id, name.to_owned())440 .await?441 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))442 }443 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {444 self.ensure_elevated().await?;445 let client: PluginEndpointsClient<BifConfig> =446 PluginEndpointsClient::wrap(self.rpc.remote(Address::AgentPrivileged));447 client448 .load_plugin_path(id, path.to_owned())449 .await?450 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))451 }452 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {453 R::wrap(self.rpc.remote(Address::Plugin(id)))454 }455 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {456 self.ensure_elevated().await?;457 Ok(R::wrap(self.rpc.remote(Address::AgentPrivileged)))458 }459460 async fn ensure_elevated(&self) -> Result<()> {461 self.elevated462 .get_or_try_init(|| async {463 let port = match &self.transport {464 Transport::Ssh {465 sess, agent_path, ..466 } => {467 let (tool, flags) = detect_escalation(sess).await?;468 let ch = sess.channel_open_session().await?;469 ch.exec(true, privileged_cmd(tool, flags, agent_path, None))470 .await?;471 channel_port(ch)472 }473 Transport::Local { agent_path, .. } => {474 let sock = self475 .runtime_dir()476 .join(format!("remowt-priv-{}.sock", Uuid::new_v4()));477 let _ = std::fs::remove_file(&sock);478 let listener = UnixListener::bind(&sock)?;479 let (tool, flags) = ESCALATORS480 .iter()481 .find(|(t, _)| find_in_path(t).is_some())482 .ok_or_else(|| anyhow!("no escalation tool found"))?;483 let child = tokio::process::Command::new(tool)484 .args(*flags)485 .arg(agent_path)486 .arg("real-agent")487 .arg("--privileged")488 .arg("--path")489 .arg(sock.as_str())490 .kill_on_drop(true)491 .spawn()?;492 self.children.lock().expect("lock").push(child);493 let (stream, _) = listener.accept().await?;494 let _ = std::fs::remove_file(&sock);495 from_socket(stream)496 }497 };498 self.rpc.add_direct(Address::AgentPrivileged, port, Rtt(0));499 anyhow::Ok(())500 })501 .await?;502 Ok(())503 }504505 pub async fn exec(&self, command: String) -> Result<RemowtChild> {506 let Some(sess) = self.ssh() else {507 bail!("exec should not be called on local")508 };509 let ch = sess.channel_open_session().await?;510 ch.exec(true, command).await?;511 Ok(RemowtChild::from_exec(ch))512 }513514 fn runtime_dir(&self) -> Utf8PathBuf {515 match &self.transport {516 Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),517 Transport::Local { runtime_dir, .. } => runtime_dir.clone(),518 }519 }520521 pub async fn forward_socket(&self, path: &Utf8Path) -> Result<RemowtListener> {522 match &self.transport {523 Transport::Ssh { sess, subs, .. } => {524 let (tx, rx) = oneshot::channel();525 subs.lock().expect("lock").insert(path.to_owned(), tx);526 sess.streamlocal_forward(path.to_owned()).await?;527 Ok(RemowtListener::Ssh(rx))528 }529 Transport::Local { .. } => {530 let _ = std::fs::remove_file(path);531 Ok(RemowtListener::Local(532 UnixListener::bind(path)?,533 path.to_owned(),534 ))535 }536 }537 }538}539540fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {541 if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {542 if !dir.is_empty() {543 return Ok((Utf8PathBuf::from(dir), None));544 }545 }546 let tmp = tempfile::Builder::new()547 .prefix("remowt.")548 .rand_bytes(12)549 .tempdir()?;550 let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())551 .map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;552 Ok((dir, Some(tmp)))553}554555async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {556 let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;557 let dir = dir.trim();558 if dir.is_empty() {559 remote_mktemp(sess).await560 } else {561 Ok(Utf8PathBuf::from(dir))562 }563}564565async fn remote_mktemp(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {566 let mut cmd_chan = sess.channel_open_session().await?;567 cmd_chan568 .exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")569 .await?;570 let mut stdout = vec![];571 loop {572 let Some(msg) = cmd_chan.wait().await else {573 bail!("unexpected channel end");574 };575 match msg {576 russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),577 russh::ChannelMsg::ExitStatus { exit_status } => {578 if exit_status != 0 {579 bail!("mktemp failed");580 }581 break;582 }583 _ => {}584 }585 }586 ensure!(stdout.ends_with(b"\n"));587 stdout.pop();588 Ok(Utf8PathBuf::from(String::from_utf8(stdout)?))589}crates/remowt-client/src/port.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/port.rs
@@ -0,0 +1,52 @@
+use std::io;
+
+use bifrostlink::Port;
+use bytes::{Bytes, BytesMut};
+use russh::{Channel, ChannelStream};
+use russh::client::Msg;
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, ReadHalf, WriteHalf};
+use tokio::join;
+use tracing::error;
+
+async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {
+ let len = srx.read_u32().await?;
+ let mut buf = BytesMut::zeroed(len as usize);
+ srx.read_exact(&mut buf).await?;
+ Ok(buf)
+}
+async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {
+ stx.write_u32(value.len().try_into().expect("can't be larger"))
+ .await?;
+ stx.write_all(&value).await?;
+ Ok(())
+}
+
+pub fn channel_port(ch: Channel<Msg>) -> Port {
+ Port::new(move |mut rx, tx| async move {
+ let (mut srx, mut stx) = tokio::io::split(ch.into_stream());
+ let srx_task = async move {
+ loop {
+ match read(&mut srx).await {
+ Ok(buf) => {
+ if tx.send(buf.freeze()).is_err() {
+ break;
+ }
+ }
+ Err(e) => {
+ error!("channel read failed: {e}");
+ break;
+ }
+ }
+ }
+ };
+ let stx_task = async move {
+ while let Some(value) = rx.recv().await {
+ if let Err(e) = write(&mut stx, value).await {
+ error!("channel write failed: {e}");
+ break;
+ }
+ }
+ };
+ join!(srx_task, stx_task);
+ })
+}
crates/remowt-client/src/shell.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/shell.rs
@@ -0,0 +1,60 @@
+use anyhow::{anyhow, Result};
+use bifrostlink::declarative::RemoteEndpoints as _;
+use bifrostlink::Remote;
+use remowt_endpoints::pty::{PtyClient, ShellId};
+use remowt_link_shared::{Address, BifConfig};
+use uuid::Uuid;
+
+use crate::forwarded::RemowtStream;
+use crate::Remowt;
+
+pub struct RemowtShell {
+ pub id: ShellId,
+ pub stream: RemowtStream,
+ remote: Remote<BifConfig>,
+}
+impl RemowtShell {
+ pub fn resizer(&self) -> RemowtShellResizer {
+ RemowtShellResizer {
+ remote: self.remote.clone(),
+ id: self.id,
+ }
+ }
+}
+
+#[derive(Clone)]
+pub struct RemowtShellResizer {
+ remote: Remote<BifConfig>,
+ id: ShellId,
+}
+
+impl RemowtShellResizer {
+ pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {
+ PtyClient::wrap(self.remote.clone())
+ .resize(self.id, cols, rows)
+ .await?
+ .map_err(|e| anyhow!("failed to resize remote shell: {e}"))
+ }
+}
+
+impl Remowt {
+ pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<RemowtShell> {
+ let sock = self
+ .runtime_dir()
+ .join(format!("remowt-shell-{}.sock", Uuid::new_v4()));
+
+ let forwarded = self.forward_socket(&sock).await?;
+ let client: PtyClient<BifConfig> = self.endpoints();
+ let id = client
+ .open_shell(sock, term.to_owned(), cols, rows)
+ .await?
+ .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;
+ let stream = forwarded.accept().await?;
+
+ Ok(RemowtShell {
+ id,
+ stream,
+ remote: self.rpc.remote(Address::Agent),
+ })
+ }
+}
crates/remowt-client/src/subprocess.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/subprocess.rs
@@ -0,0 +1,84 @@
+use bytes::Bytes;
+use russh::client::Msg;
+use russh::{Channel, ChannelMsg};
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};
+use tokio::sync::oneshot;
+
+const BUF: usize = 64 * 1024;
+
+pub struct RemowtChild {
+ pub stdin: DuplexStream,
+ pub stdout: DuplexStream,
+ pub stderr: DuplexStream,
+ pub exit: oneshot::Receiver<Option<u32>>,
+}
+
+impl RemowtChild {
+ /// Manage channel returned by russh exec().
+ pub(crate) fn from_exec(ch: Channel<Msg>) -> Self {
+ let (stdin, mut stdin_r) = tokio::io::duplex(BUF);
+ let (mut out_w, stdout) = tokio::io::duplex(BUF);
+ let (mut err_w, stderr) = tokio::io::duplex(BUF);
+ let (exit_tx, exit) = oneshot::channel();
+
+ tokio::spawn(async move {
+ let (mut read, write) = ch.split();
+
+ // Forward our stdin to the channel, signalling EOF when it closes.
+ let stdin_pump = tokio::spawn(async move {
+ let mut buf = vec![0u8; BUF];
+ loop {
+ match stdin_r.read(&mut buf).await {
+ Ok(0) | Err(_) => break,
+ Ok(n) => {
+ if write
+ .data_bytes(Bytes::copy_from_slice(&buf[..n]))
+ .await
+ .is_err()
+ {
+ return;
+ }
+ }
+ }
+ }
+ let _ = write.eof().await;
+ });
+
+ let mut code = None;
+ while let Some(msg) = read.wait().await {
+ match msg {
+ ChannelMsg::Data { data } => {
+ if out_w.write_all(&data).await.is_err() {
+ break;
+ }
+ }
+ ChannelMsg::ExtendedData { data, .. } => {
+ if err_w.write_all(&data).await.is_err() {
+ break;
+ }
+ }
+ ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
+ _ => {}
+ }
+ }
+
+ // The process is gone; stop waiting on stdin we'll never forward.
+ stdin_pump.abort();
+ let _ = out_w.shutdown().await;
+ let _ = err_w.shutdown().await;
+ let _ = exit_tx.send(code);
+ });
+
+ RemowtChild {
+ stdin,
+ stdout,
+ stderr,
+ exit,
+ }
+ }
+
+ /// Wait for the process to finish, returning its exit status.
+ pub async fn wait(self) -> Option<u32> {
+ self.exit.await.ok().flatten()
+ }
+}
crates/remowt-link-shared/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-link-shared/Cargo.toml
+++ b/crates/remowt-link-shared/Cargo.toml
@@ -11,6 +11,7 @@
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
thiserror.workspace = true
-tokio = { workspace = true, features = ["fs"] }
+tokio = { workspace = true, features = ["fs", "io-util", "macros"] }
+tracing.workspace = true
remowt-ui-prompt.workspace = true
camino = { workspace = true, features = ["serde1"] }
crates/remowt-link-shared/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-link-shared/src/lib.rs
+++ b/crates/remowt-link-shared/src/lib.rs
@@ -1,14 +1,12 @@
-use std::future::Future;
-
-use bifrostlink::declarative::endpoints;
use bifrostlink::error::{ErrorT, ListenerForYourRequestHasBeenDeadError, ResponseError};
use bifrostlink::notification;
use bifrostlink::packet::OpaquePacketWrapper;
-use bifrostlink::{AddressT, Config};
+use bifrostlink::AddressT;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
pub mod editor;
+pub mod port;
#[derive(Clone, Serialize, Hash, Eq, Debug, PartialEq, Deserialize)]
pub enum Address {
@@ -20,26 +18,6 @@
impl AddressT for Address {}
pub mod plugin;
-
-#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
-pub enum ElevateError {
- #[error("elevation failed: {0}")]
- Failed(String),
-}
-
-pub trait Elevator: Send + Sync {
- fn elevate(&self) -> impl Future<Output = Result<(), ElevateError>> + Send;
-}
-
-pub struct ElevateEndpoints<E>(pub E);
-
-#[endpoints(ns = 3)]
-impl<E: Elevator + 'static> ElevateEndpoints<E> {
- #[endpoints(id = 1)]
- async fn elevate(&self) -> Result<(), ElevateError> {
- self.0.elevate().await
- }
-}
#[derive(thiserror::Error, Debug)]
pub enum Error {
crates/remowt-link-shared/src/port.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-link-shared/src/port.rs
@@ -0,0 +1,54 @@
+use std::io;
+
+use bifrostlink::Port;
+use bytes::{Bytes, BytesMut};
+use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
+
+/// Wire a length-prefixed duplex byte stream (e.g. a child process's
+/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32`
+/// length followed by that many payload bytes.
+pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port
+where
+ R: AsyncRead + Unpin + Send + 'static,
+ W: AsyncWrite + Unpin + Send + 'static,
+{
+ Port::new(|mut rx, tx| async move {
+ let read_task = async move {
+ loop {
+ let len = match reader.read_u32().await {
+ Ok(len) => len,
+ Err(e) => {
+ tracing::error!("child read failed: {e}");
+ break;
+ }
+ };
+ let mut buf = BytesMut::zeroed(len as usize);
+ if let Err(e) = reader.read_exact(&mut buf).await {
+ tracing::error!("child read failed: {e}");
+ break;
+ }
+ if tx.send(buf.freeze()).is_err() {
+ break;
+ }
+ }
+ };
+ let write_task = async move {
+ while let Some(msg) = rx.recv().await {
+ if let Err(e) = write_frame(&mut writer, msg).await {
+ tracing::error!("child write failed: {e}");
+ break;
+ }
+ }
+ };
+ tokio::join!(read_task, write_task);
+ })
+}
+
+async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {
+ let len = u32::try_from(msg.len())
+ .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;
+ writer.write_u32(len).await?;
+ writer.write_all(&msg).await?;
+ writer.flush().await?;
+ Ok(())
+}
crates/remowt-plugin/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-plugin/Cargo.toml
+++ b/crates/remowt-plugin/Cargo.toml
@@ -9,7 +9,6 @@
anyhow.workspace = true
bifrostlink.workspace = true
bifrostlink-ports.workspace = true
-bytes.workspace = true
remowt-link-shared.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = [
crates/remowt-plugin/src/host.rsdiffbeforeafterboth--- a/crates/remowt-plugin/src/host.rs
+++ b/crates/remowt-plugin/src/host.rs
@@ -1,14 +1,12 @@
use std::ffi::OsStr;
-use std::io;
use std::process::Stdio;
use std::sync::Mutex;
-use bifrostlink::{Port, Rpc, Rtt, WeakRpc};
-use bytes::{Bytes, BytesMut};
-use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
-use tokio::process::{Child, ChildStdin, ChildStdout, Command};
+use bifrostlink::{Rpc, Rtt, WeakRpc};
+use tokio::process::{Child, Command};
use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};
+use remowt_link_shared::port::child_port;
use remowt_link_shared::{Address, BifConfig};
pub fn serve(rpc: &mut Rpc<BifConfig>) {
@@ -68,46 +66,4 @@
}
self.spawn(id, path)
}
-}
-
-fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port {
- Port::new(|mut rx, tx| async move {
- let reader = async move {
- loop {
- let len = match stdout.read_u32().await {
- Ok(len) => len,
- Err(e) => {
- tracing::error!("plugin stdout read failed: {e}");
- break;
- }
- };
- let mut buf = BytesMut::zeroed(len as usize);
- if let Err(e) = stdout.read_exact(&mut buf).await {
- tracing::error!("plugin stdout read failed: {e}");
- break;
- }
- if tx.send(buf.freeze()).is_err() {
- break;
- }
- }
- };
- let writer = async move {
- while let Some(msg) = rx.recv().await {
- if let Err(e) = write_frame(&mut stdin, msg).await {
- tracing::error!("plugin stdin write failed: {e}");
- break;
- }
- }
- };
- tokio::join!(reader, writer);
- })
-}
-
-async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> {
- let len = u32::try_from(msg.len())
- .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;
- stdin.write_u32(len).await?;
- stdin.write_all(&msg).await?;
- stdin.flush().await?;
- Ok(())
}