From 7e5126608cefb37dfb62c6a79f4090569d7449ce Mon Sep 17 00:00:00 2001 From: Yaroslav Bolyukin Date: Sat, 13 Jun 2026 00:15:24 +0000 Subject: [PATCH] feat(client): proper support for local --- --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ /target /.direnv +/result +/result-* --- a/Cargo.lock +++ b/Cargo.lock @@ -2096,6 +2096,7 @@ "russh-config", "serde", "serde_json", + "tempfile", "tokio", "tracing", "uuid", @@ -2131,6 +2132,7 @@ "serde_json", "thiserror", "tokio", + "tracing", ] [[package]] @@ -2140,7 +2142,6 @@ "anyhow", "bifrostlink", "bifrostlink-ports", - "bytes", "remowt-link-shared", "serde_json", "tokio", --- a/cmds/remowt-agent/src/main.rs +++ b/cmds/remowt-agent/src/main.rs @@ -225,8 +225,6 @@ } fn main() -> anyhow::Result<()> { - // Log to stderr: `privileged-agent` uses stdout as the bifrost transport, - // so anything written there would corrupt the stream. tracing_subscriber::fmt() .with_writer(std::io::stderr) .without_time() --- a/cmds/remowt-ssh/Cargo.toml +++ b/cmds/remowt-ssh/Cargo.toml @@ -11,8 +11,15 @@ tracing-subscriber.workspace = true bifrostlink.workspace = true remowt-link-shared.workspace = true -remowt-client.workspace = true -tokio = { workspace = true, features = ["macros", "fs", "net", "io-util", "rt", "signal"] } +remowt-client = { workspace = true, features = ["shell"] } +tokio = { workspace = true, features = [ + "macros", + "fs", + "net", + "io-util", + "rt", + "signal", +] } nix = { workspace = true, features = ["term"] } anyhow.workspace = true bifrostlink-ports.workspace = true --- a/cmds/remowt-ssh/src/main.rs +++ b/cmds/remowt-ssh/src/main.rs @@ -19,11 +19,14 @@ use tokio::io::unix::AsyncFd; use tokio::io::{AsyncRead, ReadBuf}; use tokio::signal::unix::{signal, SignalKind}; -use tracing::info; +use tracing::debug; #[derive(Parser)] -struct Opts { - host: String, +enum Opts { + /// Connect to remote host with remowt agent. + Ssh { host: String }, + /// Connect to local host for testing the connectivity. + Local, } fn agents_dir() -> anyhow::Result { @@ -35,18 +38,27 @@ #[tokio::main(flavor = "current_thread")] async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt::init(); + tracing_subscriber::fmt() + .with_writer(std::io::stderr) + .without_time() + .init(); let opts = Opts::parse(); let bundle = AgentBundle::from_dir(agents_dir()?)?; - let conn = Remowt::connect(&opts.host, &bundle).await?; + let conn = match &opts { + Opts::Ssh { host } => Remowt::connect(host, &bundle).await?, + Opts::Local => Remowt::connect_local(&bundle).await?, + }; let mut rpc = conn.rpc(); serve_prompts( &mut rpc, PrependSourcePrompter { prompter: RofiPrompter, - source: vec![Source(Cow::Owned(format!("ssh host: {}", opts.host)))], + source: match opts { + Opts::Ssh { host } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))], + Opts::Local => vec![], + }, description: "".to_owned(), }, ); @@ -54,9 +66,9 @@ serve_editor(&mut rpc, SshEditor { sess }); } - info!("entering shell"); + debug!("entering shell"); run_shell(&conn).await?; - info!("shell ended"); + debug!("shell ended"); Ok(()) } --- a/crates/remowt-client/Cargo.toml +++ b/crates/remowt-client/Cargo.toml @@ -16,7 +16,18 @@ remowt-link-shared.workspace = true russh.workspace = true russh-config.workspace = true -tokio = { workspace = true, features = ["net", "io-util", "rt", "sync", "macros", "process"] } +tempfile.workspace = true +tokio = { workspace = true, features = [ + "net", + "io-util", + "rt", + "sync", + "macros", + "process", +] } tracing.workspace = true uuid = { workspace = true, features = ["v4"] } -remowt-endpoints.workspace = true +remowt-endpoints = { workspace = true, optional = true } + +[features] +shell = ["dep:remowt-endpoints"] --- /dev/null +++ b/crates/remowt-client/src/forwarded.rs @@ -0,0 +1,79 @@ +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use anyhow::{anyhow, Result}; +use camino::Utf8PathBuf; +use russh::client::Msg; +use russh::{Channel, ChannelStream}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::net::{UnixListener, UnixStream}; +use tokio::sync::oneshot; + +pub enum RemowtListener { + Ssh(oneshot::Receiver>), + Local(UnixListener, Utf8PathBuf), +} + +impl RemowtListener { + pub async fn accept(self) -> Result { + match self { + RemowtListener::Ssh(rx) => { + let ch = rx + .await + .map_err(|_| anyhow!("agent never connected the forwarded socket"))?; + Ok(RemowtStream::Ssh(ch.into_stream())) + } + RemowtListener::Local(listener, path) => { + let (stream, _) = listener.accept().await?; + let _ = std::fs::remove_file(&path); + Ok(RemowtStream::Local(stream)) + } + } + } +} + +pub enum RemowtStream { + Ssh(ChannelStream), + Local(UnixStream), +} + +impl AsyncRead for RemowtStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + match self.get_mut() { + RemowtStream::Ssh(s) => Pin::new(s).poll_read(cx, buf), + RemowtStream::Local(s) => Pin::new(s).poll_read(cx, buf), + } + } +} + +impl AsyncWrite for RemowtStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match self.get_mut() { + RemowtStream::Ssh(s) => Pin::new(s).poll_write(cx, buf), + RemowtStream::Local(s) => Pin::new(s).poll_write(cx, buf), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.get_mut() { + RemowtStream::Ssh(s) => Pin::new(s).poll_flush(cx), + RemowtStream::Local(s) => Pin::new(s).poll_flush(cx), + } + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.get_mut() { + RemowtStream::Ssh(s) => Pin::new(s).poll_shutdown(cx), + RemowtStream::Local(s) => Pin::new(s).poll_shutdown(cx), + } + } +} --- a/crates/remowt-client/src/lib.rs +++ b/crates/remowt-client/src/lib.rs @@ -1,61 +1,53 @@ use std::collections::HashMap; +use std::env; use std::path::PathBuf; use std::sync::{Arc, Mutex}; -use std::{env, io}; use anyhow::{anyhow, bail, ensure, Context as _, Result}; use bifrostlink::declarative::RemoteEndpoints; -use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc}; +use bifrostlink::{Remote, Rpc, Rtt}; use bifrostlink_ports::unix_socket::from_socket; -use bytes::{Bytes, BytesMut}; use camino::{Utf8Path, Utf8PathBuf}; -use remowt_endpoints::{ - fs::Fs, - pty::{Pty, PtyClient, ShellId}, - systemd::Systemd, -}; use remowt_link_shared::plugin::PluginEndpointsClient; -use remowt_link_shared::{Address, BifConfig, ElevateEndpoints, ElevateError, Elevator}; +use remowt_link_shared::port::child_port; +use remowt_link_shared::{Address, BifConfig}; use russh::client::{connect, Config, Handle, Handler, Msg, Session}; use russh::keys::agent::client::AgentClient; use russh::keys::agent::AgentIdentity; use russh::keys::check_known_hosts; use russh::keys::ssh_key::PublicKey; -use russh::{Channel, ChannelMsg, ChannelStream}; -use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf}; -use tokio::join; +use russh::Channel; +use tempfile::TempDir; use tokio::net::UnixListener; -use tokio::sync::mpsc; use tokio::sync::oneshot::{self, channel}; -use tracing::error; +use tokio::{ + fs, + io::{AsyncReadExt as _, AsyncWriteExt as _}, +}; +use tracing::{debug, error}; use uuid::Uuid; +use self::port::channel_port; +use self::subprocess::RemowtChild; + pub mod editor; +mod forwarded; +mod port; +#[cfg(feature = "shell")] +mod shell; +mod subprocess; -type Subs = Arc>>>>; +pub use forwarded::{RemowtListener, RemowtStream}; +#[cfg(feature = "shell")] +pub use shell::{RemowtShell, RemowtShellResizer}; -async fn read(srx: &mut ReadHalf>) -> io::Result { - let len = srx.read_u32().await?; - let mut buf = BytesMut::zeroed(len as usize); - srx.read_exact(&mut buf).await?; - Ok(buf) -} -async fn write(stx: &mut WriteHalf>, value: Bytes) -> io::Result<()> { - stx.write_u32(value.len().try_into().expect("can't be larger")) - .await?; - stx.write_all(&value).await?; - Ok(()) -} +type Subs = Arc>>>>; fn sh_quote(s: impl AsRef) -> String { format!("'{}'", s.as_ref().replace('\'', "'\\''")) } -const ESCALATORS: [(&str, &[&str]); 3] = [ - ("run0", &["--background=", "--pipe"]), - ("sudo", &[]), - ("doas", &[]), -]; +const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])]; pub struct AgentBundle { dir: PathBuf, @@ -90,26 +82,36 @@ fn binary(&self, arch: &str) -> PathBuf { self.dir.join(format!("remowt-agent-{arch}")) } + + fn local_binary(&self) -> Result { + let arch = env::consts::ARCH; + let path = self.binary(arch); + ensure!( + path.is_file(), + "no local remowt-agent build for arch {arch} in bundle {}", + self.dir.display() + ); + Ok(path) + } } async fn run(sess: &Handle, cmd: &str) -> Result<(Option, Vec)> { - let mut ch = sess.channel_open_session().await?; + let ch = sess.channel_open_session().await?; ch.exec(true, cmd).await?; + + let mut child = RemowtChild::from_exec(ch); + drop(child.stdin); + let mut out = Vec::new(); - let mut code = None; - while let Some(msg) = ch.wait().await { - match msg { - ChannelMsg::Data { data } => out.extend(data.as_ref()), - ChannelMsg::ExtendedData { data, .. } => { - error!( - "remote stderr: {}", - String::from_utf8_lossy(data.as_ref()).trim() - ); - } - ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status), - _ => {} - } + let mut err = Vec::new(); + tokio::try_join!( + child.stdout.read_to_end(&mut out), + child.stderr.read_to_end(&mut err), + )?; + if !err.is_empty() { + error!("remote stderr: {}", String::from_utf8_lossy(&err).trim()); } + let code = child.exit.await.ok().flatten(); Ok((code, out)) } @@ -119,21 +121,27 @@ code == Some(0), "remote command failed (exit {code:?}): {cmd}" ); - ensure!(out.ends_with(b"\n")); - out.pop(); + if !out.is_empty() { + ensure!( + out.ends_with(b"\n"), + "remote command was not newline-terminated: {cmd}: {out:?}" + ); + out.pop(); + } String::from_utf8(out).context("expected utf8 output for command") } async fn deploy_agent(sess: &Handle, bundle: &AgentBundle) -> Result { + debug!("uname -a"); let arch = run_string_ok(sess, "uname -m").await?; let hash = bundle .hashes .get(&arch) .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?; + debug!("get dir"); let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"") .await? - .trim() .to_owned(); let dir = if cache.is_empty() { let home = run_string_ok(sess, "echo \"$HOME\"").await?; @@ -141,17 +149,21 @@ !home.is_empty(), "remote $HOME and $XDG_CACHE_HOME both empty" ); - Utf8PathBuf::from(home).join("cache/remowt") + Utf8PathBuf::from(home).join(".cache/remowt") } else { Utf8PathBuf::from(cache).join("remowt") }; let path = dir.join(hash); + debug!("presence"); let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?; if present != Some(0) { let bin = bundle.binary(&arch); - let bytes = std::fs::read(&bin) + debug!("read"); + let bytes = fs::read(&bin) + .await .with_context(|| format!("reading agent binary {}", bin.display()))?; + debug!("upload"); upload_agent(sess, &dir, &path, bytes).await?; } Ok(path) @@ -163,29 +175,29 @@ path: &Utf8Path, bytes: Vec, ) -> Result<()> { + debug!("mkdirp"); run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?; - let tmp = path.join(format!("tmp.{}", Uuid::new_v4())); + let tmp = dir.join(format!("tmp.{}", Uuid::new_v4())); let ch = sess.channel_open_session().await?; + debug!("cat"); ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?; - ch.data_bytes(bytes).await?; - ch.eof().await?; - let mut ch = ch; - let mut code = None; - while let Some(msg) = ch.wait().await { - match msg { - ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status), - ChannelMsg::ExtendedData { data, .. } => { - error!( - "agent upload: {}", - String::from_utf8_lossy(data.as_ref()).trim() - ); - } - _ => {} - } - } + + let mut child = RemowtChild::from_exec(ch); + child + .stdin + .write_all(&bytes) + .await + .context("sending agent binary")?; + child + .stdin + .shutdown() + .await + .context("sending agent binary")?; + let code = child.wait().await; ensure!(code == Some(0), "agent upload failed (exit {code:?})"); + debug!("chmod"); run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?; run_string_ok( sess, @@ -205,7 +217,7 @@ return Ok((tool, flags)); } } - bail!("no escalation tool (run0/sudo/doas) found on remote") + bail!("no escalation tool found on remote") } fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String { @@ -226,36 +238,6 @@ env::split_paths(&path) .map(|dir| dir.join(name)) .find(|p| p.is_file()) -} - -fn port_from_channel(ch: Channel) -> Port { - Port::new(move |mut rx, tx| async move { - let (mut srx, mut stx) = tokio::io::split(ch.into_stream()); - let srx_task = async move { - loop { - match read(&mut srx).await { - Ok(buf) => { - if tx.send(buf.freeze()).is_err() { - break; - } - } - Err(e) => { - error!("channel read failed: {e}"); - break; - } - } - } - }; - let stx_task = async move { - while let Some(value) = rx.recv().await { - if let Err(e) = write(&mut stx, value).await { - error!("channel write failed: {e}"); - break; - } - } - }; - join!(srx_task, stx_task); - }) } pub struct SshHandler { @@ -286,56 +268,20 @@ return Err(russh::Error::WrongChannel); }; let _ = ch.send(channel); - Ok(()) - } -} - -struct SshElevator { - sess: Arc>, - rpc: WeakRpc, - agent_path: Utf8PathBuf, -} -impl Elevator for SshElevator { - async fn elevate(&self) -> Result<(), ElevateError> { - let fail = |e: String| ElevateError::Failed(e); - let (tool, flags) = detect_escalation(&self.sess) - .await - .map_err(|e| fail(e.to_string()))?; - let ch = self - .sess - .channel_open_session() - .await - .map_err(|e| fail(e.to_string()))?; - ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None)) - .await - .map_err(|e| fail(e.to_string()))?; - let rpc = self - .rpc - .clone() - .upgrade() - .ok_or_else(|| fail("rpc is gone".to_owned()))?; - rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0)); Ok(()) } } -pub struct RemoteChild { - pub stdout: DuplexStream, - pub stderr: DuplexStream, - pub exit: oneshot::Receiver>, -} - enum Transport { Ssh { sess: Arc>, subs: Subs, - remote_dir: Utf8PathBuf, + runtime_dir: Utf8PathBuf, agent_path: Utf8PathBuf, }, Local { - #[allow(dead_code)] - agent: Rpc, - agent_path: String, + agent_path: PathBuf, + runtime_dir: Utf8PathBuf, }, } @@ -344,44 +290,11 @@ rpc: Rpc, elevated: tokio::sync::OnceCell<()>, children: Mutex>, + _runtime_tmp: Option, } pub type RemowtRemote = Remote; -fn loopback() -> (Port, Port) { - let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::(); - let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::(); - let user = Port::new(move |mut rx, tx| async move { - loop { - tokio::select! { - msg = rx.recv() => match msg { - Some(msg) => if a2b_tx.send(msg).is_err() { break }, - None => break, - }, - msg = b2a_rx.recv() => match msg { - Some(msg) => if tx.send(msg).is_err() { break }, - None => break, - }, - } - } - }); - let agent = Port::new(move |mut rx, tx| async move { - loop { - tokio::select! { - msg = rx.recv() => match msg { - Some(msg) => if b2a_tx.send(msg).is_err() { break }, - None => break, - }, - msg = a2b_rx.recv() => match msg { - Some(msg) => if tx.send(msg).is_err() { break }, - None => break, - }, - } - } - }); - (user, agent) -} - impl Remowt { pub async fn connect(host: &str, bundle: &AgentBundle) -> Result { let conf = russh_config::parse_home(host)?; @@ -426,13 +339,14 @@ } ensure!(authenticated, "ssh authentication failed"); - // All remaining session ops take `&self`; share the handle. let sess = Arc::new(sess); + debug!("deploying agent"); let agent_path = deploy_agent(&sess, bundle).await?; - let remote_dir = remote_mktemp(&sess).await?; - let primary = remote_dir.join("primary.sock"); + debug!("runtime dir"); + let runtime_dir = remote_runtime_dir(&sess).await?; + let primary = runtime_dir.join(format!("remowt-{}.sock", Uuid::new_v4())); let (onetx, onerx) = channel(); subs.lock().expect("lock").insert(primary.clone(), onetx); @@ -442,6 +356,7 @@ // TODO: ensure no injection is possible in the socket path. let cmd_chan = sess.channel_open_session().await?; + debug!("starting agent"); cmd_chan .exec( true, @@ -453,7 +368,7 @@ ) .await?; - let port = port_from_channel( + let port = channel_port( onerx .await .map_err(|_| anyhow!("agent never opened its channel"))?, @@ -464,36 +379,42 @@ transport: Transport::Ssh { sess, subs, - remote_dir, + runtime_dir, agent_path, }, rpc, elevated: tokio::sync::OnceCell::new(), children: Mutex::new(Vec::new()), + _runtime_tmp: None, }) } - pub async fn connect_local(agent_path: &str) -> Result { - let (port_user, port_agent) = loopback(); + pub async fn connect_local(bundle: &AgentBundle) -> Result { + let agent_path = bundle.local_binary()?; + let mut child = tokio::process::Command::new(&agent_path) + .arg("real-agent") + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .kill_on_drop(true) + .spawn() + .with_context(|| format!("spawning agent binary {}", agent_path.display()))?; + let stdin = child.stdin.take().expect("stdin piped"); + let stdout = child.stdout.take().expect("stdout piped"); + let rpc = Rpc::::new(Address::User); - let mut agent = Rpc::::new(Address::Agent); + rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0)); - // Register handlers before wiring up the link (see the agent binary). - Fs::new().register_endpoints(&mut agent); - Systemd.register_endpoints(&mut agent); - Pty::new().register_endpoints(&mut agent); + let (runtime_dir, runtime_tmp) = local_runtime_dir()?; - agent.add_direct(Address::User, port_agent, Rtt(0)); - rpc.add_direct(Address::Agent, port_user, Rtt(0)); - Ok(Self { transport: Transport::Local { - agent, - agent_path: agent_path.to_owned(), + agent_path, + runtime_dir, }, rpc, elevated: tokio::sync::OnceCell::new(), - children: Mutex::new(Vec::new()), + children: Mutex::new(vec![child]), + _runtime_tmp: runtime_tmp, }) } @@ -547,24 +468,25 @@ let ch = sess.channel_open_session().await?; ch.exec(true, privileged_cmd(tool, flags, agent_path, None)) .await?; - port_from_channel(ch) + channel_port(ch) } Transport::Local { agent_path, .. } => { - let sock = env::temp_dir() - .join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4())); + let sock = self + .runtime_dir() + .join(format!("remowt-priv-{}.sock", Uuid::new_v4())); let _ = std::fs::remove_file(&sock); let listener = UnixListener::bind(&sock)?; let (tool, flags) = ESCALATORS .iter() .find(|(t, _)| find_in_path(t).is_some()) - .ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?; + .ok_or_else(|| anyhow!("no escalation tool found"))?; let child = tokio::process::Command::new(tool) .args(*flags) .arg(agent_path) .arg("real-agent") .arg("--privileged") .arg("--path") - .arg(sock.to_str().expect("temp path is utf-8")) + .arg(sock.as_str()) .kill_on_drop(true) .spawn()?; self.children.lock().expect("lock").push(child); @@ -580,138 +502,63 @@ Ok(()) } - pub async fn exec(&self, command: String) -> Result { + pub async fn exec(&self, command: String) -> Result { let Some(sess) = self.ssh() else { bail!("exec should not be called on local") }; let ch = sess.channel_open_session().await?; ch.exec(true, command).await?; - - let (mut out_w, stdout) = tokio::io::duplex(64 * 1024); - let (mut err_w, stderr) = tokio::io::duplex(64 * 1024); - let (exit_tx, exit) = oneshot::channel(); - - tokio::spawn(async move { - let mut ch = ch; - let mut code = None; - while let Some(msg) = ch.wait().await { - match msg { - ChannelMsg::Data { data } => { - if out_w.write_all(&data).await.is_err() { - break; - } - } - ChannelMsg::ExtendedData { data, .. } => { - if err_w.write_all(&data).await.is_err() { - break; - } - } - ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status), - _ => {} - } - } - let _ = out_w.shutdown().await; - let _ = err_w.shutdown().await; - let _ = exit_tx.send(code); - }); - - Ok(RemoteChild { - stdout, - stderr, - exit, - }) + Ok(RemowtChild::from_exec(ch)) } - pub fn serve_elevate(&self) -> Result<()> { - let Transport::Ssh { - sess, agent_path, .. - } = &self.transport - else { - bail!("elevate should not be called on local") - }; - let mut rpc = self.rpc.clone(); - ElevateEndpoints(SshElevator { - sess: sess.clone(), - rpc: self.rpc.clone().downgrade(), - agent_path: agent_path.to_owned(), - }) - .register_endpoints(&mut rpc); - Ok(()) + fn runtime_dir(&self) -> Utf8PathBuf { + match &self.transport { + Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(), + Transport::Local { runtime_dir, .. } => runtime_dir.clone(), + } } - pub fn remote_dir(&self) -> Option<&Utf8Path> { + pub async fn forward_socket(&self, path: &Utf8Path) -> Result { match &self.transport { - Transport::Ssh { remote_dir, .. } => Some(remote_dir), - Transport::Local { .. } => None, + Transport::Ssh { sess, subs, .. } => { + let (tx, rx) = oneshot::channel(); + subs.lock().expect("lock").insert(path.to_owned(), tx); + sess.streamlocal_forward(path.to_owned()).await?; + Ok(RemowtListener::Ssh(rx)) + } + Transport::Local { .. } => { + let _ = std::fs::remove_file(path); + Ok(RemowtListener::Local( + UnixListener::bind(path)?, + path.to_owned(), + )) + } } - } - - pub async fn forward_socket( - &self, - remote_path: &Utf8Path, - ) -> Result>> { - let Transport::Ssh { sess, subs, .. } = &self.transport else { - bail!("forward_socket should not be called on local") - }; - let (tx, rx) = oneshot::channel(); - subs.lock() - .expect("lock") - .insert(remote_path.to_owned(), tx); - sess.streamlocal_forward(remote_path.to_owned()).await?; - Ok(rx) - } - - pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result { - let Transport::Ssh { remote_dir, .. } = &self.transport else { - bail!("open_shell should not be called on local") - }; - let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4())); - - let rx = self.forward_socket(&sock).await?; - let client: PtyClient = self.endpoints(); - let id = client - .open_shell(sock, term.to_owned(), cols, rows) - .await? - .map_err(|e| anyhow!("agent failed to open shell: {e}"))?; - let ch = rx - .await - .map_err(|_| anyhow!("agent never connected the shell socket"))?; - - Ok(Shell { - id, - stream: ch.into_stream(), - remote: self.rpc.remote(Address::Agent), - }) } } -pub struct Shell { - pub id: ShellId, - pub stream: ChannelStream, - remote: Remote, -} - -impl Shell { - pub fn resizer(&self) -> ShellResizer { - ShellResizer { - remote: self.remote.clone(), - id: self.id, +fn local_runtime_dir() -> Result<(Utf8PathBuf, Option)> { + if let Ok(dir) = env::var("XDG_RUNTIME_DIR") { + if !dir.is_empty() { + return Ok((Utf8PathBuf::from(dir), None)); } } -} - -#[derive(Clone)] -pub struct ShellResizer { - remote: Remote, - id: ShellId, + let tmp = tempfile::Builder::new() + .prefix("remowt.") + .rand_bytes(12) + .tempdir()?; + let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned()) + .map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?; + Ok((dir, Some(tmp))) } -impl ShellResizer { - pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> { - PtyClient::wrap(self.remote.clone()) - .resize(self.id, cols, rows) - .await? - .map_err(|e| anyhow!("failed to resize remote shell: {e}")) +async fn remote_runtime_dir(sess: &Handle) -> Result { + let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?; + let dir = dir.trim(); + if dir.is_empty() { + remote_mktemp(sess).await + } else { + Ok(Utf8PathBuf::from(dir)) } } --- /dev/null +++ b/crates/remowt-client/src/port.rs @@ -0,0 +1,52 @@ +use std::io; + +use bifrostlink::Port; +use bytes::{Bytes, BytesMut}; +use russh::{Channel, ChannelStream}; +use russh::client::Msg; +use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, ReadHalf, WriteHalf}; +use tokio::join; +use tracing::error; + +async fn read(srx: &mut ReadHalf>) -> io::Result { + let len = srx.read_u32().await?; + let mut buf = BytesMut::zeroed(len as usize); + srx.read_exact(&mut buf).await?; + Ok(buf) +} +async fn write(stx: &mut WriteHalf>, value: Bytes) -> io::Result<()> { + stx.write_u32(value.len().try_into().expect("can't be larger")) + .await?; + stx.write_all(&value).await?; + Ok(()) +} + +pub fn channel_port(ch: Channel) -> Port { + Port::new(move |mut rx, tx| async move { + let (mut srx, mut stx) = tokio::io::split(ch.into_stream()); + let srx_task = async move { + loop { + match read(&mut srx).await { + Ok(buf) => { + if tx.send(buf.freeze()).is_err() { + break; + } + } + Err(e) => { + error!("channel read failed: {e}"); + break; + } + } + } + }; + let stx_task = async move { + while let Some(value) = rx.recv().await { + if let Err(e) = write(&mut stx, value).await { + error!("channel write failed: {e}"); + break; + } + } + }; + join!(srx_task, stx_task); + }) +} --- /dev/null +++ b/crates/remowt-client/src/shell.rs @@ -0,0 +1,60 @@ +use anyhow::{anyhow, Result}; +use bifrostlink::declarative::RemoteEndpoints as _; +use bifrostlink::Remote; +use remowt_endpoints::pty::{PtyClient, ShellId}; +use remowt_link_shared::{Address, BifConfig}; +use uuid::Uuid; + +use crate::forwarded::RemowtStream; +use crate::Remowt; + +pub struct RemowtShell { + pub id: ShellId, + pub stream: RemowtStream, + remote: Remote, +} +impl RemowtShell { + pub fn resizer(&self) -> RemowtShellResizer { + RemowtShellResizer { + remote: self.remote.clone(), + id: self.id, + } + } +} + +#[derive(Clone)] +pub struct RemowtShellResizer { + remote: Remote, + id: ShellId, +} + +impl RemowtShellResizer { + pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> { + PtyClient::wrap(self.remote.clone()) + .resize(self.id, cols, rows) + .await? + .map_err(|e| anyhow!("failed to resize remote shell: {e}")) + } +} + +impl Remowt { + pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result { + let sock = self + .runtime_dir() + .join(format!("remowt-shell-{}.sock", Uuid::new_v4())); + + let forwarded = self.forward_socket(&sock).await?; + let client: PtyClient = self.endpoints(); + let id = client + .open_shell(sock, term.to_owned(), cols, rows) + .await? + .map_err(|e| anyhow!("agent failed to open shell: {e}"))?; + let stream = forwarded.accept().await?; + + Ok(RemowtShell { + id, + stream, + remote: self.rpc.remote(Address::Agent), + }) + } +} --- /dev/null +++ b/crates/remowt-client/src/subprocess.rs @@ -0,0 +1,84 @@ +use bytes::Bytes; +use russh::client::Msg; +use russh::{Channel, ChannelMsg}; +use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream}; +use tokio::sync::oneshot; + +const BUF: usize = 64 * 1024; + +pub struct RemowtChild { + pub stdin: DuplexStream, + pub stdout: DuplexStream, + pub stderr: DuplexStream, + pub exit: oneshot::Receiver>, +} + +impl RemowtChild { + /// Manage channel returned by russh exec(). + pub(crate) fn from_exec(ch: Channel) -> Self { + let (stdin, mut stdin_r) = tokio::io::duplex(BUF); + let (mut out_w, stdout) = tokio::io::duplex(BUF); + let (mut err_w, stderr) = tokio::io::duplex(BUF); + let (exit_tx, exit) = oneshot::channel(); + + tokio::spawn(async move { + let (mut read, write) = ch.split(); + + // Forward our stdin to the channel, signalling EOF when it closes. + let stdin_pump = tokio::spawn(async move { + let mut buf = vec![0u8; BUF]; + loop { + match stdin_r.read(&mut buf).await { + Ok(0) | Err(_) => break, + Ok(n) => { + if write + .data_bytes(Bytes::copy_from_slice(&buf[..n])) + .await + .is_err() + { + return; + } + } + } + } + let _ = write.eof().await; + }); + + let mut code = None; + while let Some(msg) = read.wait().await { + match msg { + ChannelMsg::Data { data } => { + if out_w.write_all(&data).await.is_err() { + break; + } + } + ChannelMsg::ExtendedData { data, .. } => { + if err_w.write_all(&data).await.is_err() { + break; + } + } + ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status), + _ => {} + } + } + + // The process is gone; stop waiting on stdin we'll never forward. + stdin_pump.abort(); + let _ = out_w.shutdown().await; + let _ = err_w.shutdown().await; + let _ = exit_tx.send(code); + }); + + RemowtChild { + stdin, + stdout, + stderr, + exit, + } + } + + /// Wait for the process to finish, returning its exit status. + pub async fn wait(self) -> Option { + self.exit.await.ok().flatten() + } +} --- a/crates/remowt-link-shared/Cargo.toml +++ b/crates/remowt-link-shared/Cargo.toml @@ -11,6 +11,7 @@ serde = { workspace = true, features = ["derive"] } serde_json.workspace = true thiserror.workspace = true -tokio = { workspace = true, features = ["fs"] } +tokio = { workspace = true, features = ["fs", "io-util", "macros"] } +tracing.workspace = true remowt-ui-prompt.workspace = true camino = { workspace = true, features = ["serde1"] } --- a/crates/remowt-link-shared/src/lib.rs +++ b/crates/remowt-link-shared/src/lib.rs @@ -1,14 +1,12 @@ -use std::future::Future; - -use bifrostlink::declarative::endpoints; use bifrostlink::error::{ErrorT, ListenerForYourRequestHasBeenDeadError, ResponseError}; use bifrostlink::notification; use bifrostlink::packet::OpaquePacketWrapper; -use bifrostlink::{AddressT, Config}; +use bifrostlink::AddressT; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; pub mod editor; +pub mod port; #[derive(Clone, Serialize, Hash, Eq, Debug, PartialEq, Deserialize)] pub enum Address { @@ -20,26 +18,6 @@ impl AddressT for Address {} pub mod plugin; - -#[derive(Serialize, Deserialize, Debug, thiserror::Error)] -pub enum ElevateError { - #[error("elevation failed: {0}")] - Failed(String), -} - -pub trait Elevator: Send + Sync { - fn elevate(&self) -> impl Future> + Send; -} - -pub struct ElevateEndpoints(pub E); - -#[endpoints(ns = 3)] -impl ElevateEndpoints { - #[endpoints(id = 1)] - async fn elevate(&self) -> Result<(), ElevateError> { - self.0.elevate().await - } -} #[derive(thiserror::Error, Debug)] pub enum Error { --- /dev/null +++ b/crates/remowt-link-shared/src/port.rs @@ -0,0 +1,54 @@ +use std::io; + +use bifrostlink::Port; +use bytes::{Bytes, BytesMut}; +use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _}; + +/// Wire a length-prefixed duplex byte stream (e.g. a child process's +/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32` +/// length followed by that many payload bytes. +pub fn child_port(mut reader: R, mut writer: W) -> Port +where + R: AsyncRead + Unpin + Send + 'static, + W: AsyncWrite + Unpin + Send + 'static, +{ + Port::new(|mut rx, tx| async move { + let read_task = async move { + loop { + let len = match reader.read_u32().await { + Ok(len) => len, + Err(e) => { + tracing::error!("child read failed: {e}"); + break; + } + }; + let mut buf = BytesMut::zeroed(len as usize); + if let Err(e) = reader.read_exact(&mut buf).await { + tracing::error!("child read failed: {e}"); + break; + } + if tx.send(buf.freeze()).is_err() { + break; + } + } + }; + let write_task = async move { + while let Some(msg) = rx.recv().await { + if let Err(e) = write_frame(&mut writer, msg).await { + tracing::error!("child write failed: {e}"); + break; + } + } + }; + tokio::join!(read_task, write_task); + }) +} + +async fn write_frame(writer: &mut W, msg: Bytes) -> io::Result<()> { + let len = u32::try_from(msg.len()) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?; + writer.write_u32(len).await?; + writer.write_all(&msg).await?; + writer.flush().await?; + Ok(()) +} --- a/crates/remowt-plugin/Cargo.toml +++ b/crates/remowt-plugin/Cargo.toml @@ -9,7 +9,6 @@ anyhow.workspace = true bifrostlink.workspace = true bifrostlink-ports.workspace = true -bytes.workspace = true remowt-link-shared.workspace = true serde_json.workspace = true tokio = { workspace = true, features = [ --- a/crates/remowt-plugin/src/host.rs +++ b/crates/remowt-plugin/src/host.rs @@ -1,14 +1,12 @@ use std::ffi::OsStr; -use std::io; use std::process::Stdio; use std::sync::Mutex; -use bifrostlink::{Port, Rpc, Rtt, WeakRpc}; -use bytes::{Bytes, BytesMut}; -use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; -use tokio::process::{Child, ChildStdin, ChildStdout, Command}; +use bifrostlink::{Rpc, Rtt, WeakRpc}; +use tokio::process::{Child, Command}; use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost}; +use remowt_link_shared::port::child_port; use remowt_link_shared::{Address, BifConfig}; pub fn serve(rpc: &mut Rpc) { @@ -68,46 +66,4 @@ } self.spawn(id, path) } -} - -fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port { - Port::new(|mut rx, tx| async move { - let reader = async move { - loop { - let len = match stdout.read_u32().await { - Ok(len) => len, - Err(e) => { - tracing::error!("plugin stdout read failed: {e}"); - break; - } - }; - let mut buf = BytesMut::zeroed(len as usize); - if let Err(e) = stdout.read_exact(&mut buf).await { - tracing::error!("plugin stdout read failed: {e}"); - break; - } - if tx.send(buf.freeze()).is_err() { - break; - } - } - }; - let writer = async move { - while let Some(msg) = rx.recv().await { - if let Err(e) = write_frame(&mut stdin, msg).await { - tracing::error!("plugin stdin write failed: {e}"); - break; - } - } - }; - tokio::join!(reader, writer); - }) -} - -async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> { - let len = u32::try_from(msg.len()) - .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?; - stdin.write_u32(len).await?; - stdin.write_all(&msg).await?; - stdin.flush().await?; - Ok(()) } -- gitstuff