difftreelog
feat(client) proper support for local
in: trunk
16 files changed
.gitignorediffbeforeafterboth--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
/target
/.direnv
+/result
+/result-*
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2096,6 +2096,7 @@
"russh-config",
"serde",
"serde_json",
+ "tempfile",
"tokio",
"tracing",
"uuid",
@@ -2131,6 +2132,7 @@
"serde_json",
"thiserror",
"tokio",
+ "tracing",
]
[[package]]
@@ -2140,7 +2142,6 @@
"anyhow",
"bifrostlink",
"bifrostlink-ports",
- "bytes",
"remowt-link-shared",
"serde_json",
"tokio",
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-agent/src/main.rs
+++ b/cmds/remowt-agent/src/main.rs
@@ -225,8 +225,6 @@
}
fn main() -> anyhow::Result<()> {
- // Log to stderr: `privileged-agent` uses stdout as the bifrost transport,
- // so anything written there would corrupt the stream.
tracing_subscriber::fmt()
.with_writer(std::io::stderr)
.without_time()
cmds/remowt-ssh/Cargo.tomldiffbeforeafterboth--- a/cmds/remowt-ssh/Cargo.toml
+++ b/cmds/remowt-ssh/Cargo.toml
@@ -11,8 +11,15 @@
tracing-subscriber.workspace = true
bifrostlink.workspace = true
remowt-link-shared.workspace = true
-remowt-client.workspace = true
-tokio = { workspace = true, features = ["macros", "fs", "net", "io-util", "rt", "signal"] }
+remowt-client = { workspace = true, features = ["shell"] }
+tokio = { workspace = true, features = [
+ "macros",
+ "fs",
+ "net",
+ "io-util",
+ "rt",
+ "signal",
+] }
nix = { workspace = true, features = ["term"] }
anyhow.workspace = true
bifrostlink-ports.workspace = true
cmds/remowt-ssh/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-ssh/src/main.rs
+++ b/cmds/remowt-ssh/src/main.rs
@@ -19,11 +19,14 @@
use tokio::io::unix::AsyncFd;
use tokio::io::{AsyncRead, ReadBuf};
use tokio::signal::unix::{signal, SignalKind};
-use tracing::info;
+use tracing::debug;
#[derive(Parser)]
-struct Opts {
- host: String,
+enum Opts {
+ /// Connect to remote host with remowt agent.
+ Ssh { host: String },
+ /// Connect to local host for testing the connectivity.
+ Local,
}
fn agents_dir() -> anyhow::Result<PathBuf> {
@@ -35,18 +38,27 @@
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
- tracing_subscriber::fmt::init();
+ tracing_subscriber::fmt()
+ .with_writer(std::io::stderr)
+ .without_time()
+ .init();
let opts = Opts::parse();
let bundle = AgentBundle::from_dir(agents_dir()?)?;
- let conn = Remowt::connect(&opts.host, &bundle).await?;
+ let conn = match &opts {
+ Opts::Ssh { host } => Remowt::connect(host, &bundle).await?,
+ Opts::Local => Remowt::connect_local(&bundle).await?,
+ };
let mut rpc = conn.rpc();
serve_prompts(
&mut rpc,
PrependSourcePrompter {
prompter: RofiPrompter,
- source: vec![Source(Cow::Owned(format!("ssh host: {}", opts.host)))],
+ source: match opts {
+ Opts::Ssh { host } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))],
+ Opts::Local => vec![],
+ },
description: "".to_owned(),
},
);
@@ -54,9 +66,9 @@
serve_editor(&mut rpc, SshEditor { sess });
}
- info!("entering shell");
+ debug!("entering shell");
run_shell(&conn).await?;
- info!("shell ended");
+ debug!("shell ended");
Ok(())
}
crates/remowt-client/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-client/Cargo.toml
+++ b/crates/remowt-client/Cargo.toml
@@ -16,7 +16,18 @@
remowt-link-shared.workspace = true
russh.workspace = true
russh-config.workspace = true
-tokio = { workspace = true, features = ["net", "io-util", "rt", "sync", "macros", "process"] }
+tempfile.workspace = true
+tokio = { workspace = true, features = [
+ "net",
+ "io-util",
+ "rt",
+ "sync",
+ "macros",
+ "process",
+] }
tracing.workspace = true
uuid = { workspace = true, features = ["v4"] }
-remowt-endpoints.workspace = true
+remowt-endpoints = { workspace = true, optional = true }
+
+[features]
+shell = ["dep:remowt-endpoints"]
crates/remowt-client/src/forwarded.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/forwarded.rs
@@ -0,0 +1,79 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use anyhow::{anyhow, Result};
+use camino::Utf8PathBuf;
+use russh::client::Msg;
+use russh::{Channel, ChannelStream};
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tokio::net::{UnixListener, UnixStream};
+use tokio::sync::oneshot;
+
+pub enum RemowtListener {
+ Ssh(oneshot::Receiver<Channel<Msg>>),
+ Local(UnixListener, Utf8PathBuf),
+}
+
+impl RemowtListener {
+ pub async fn accept(self) -> Result<RemowtStream> {
+ match self {
+ RemowtListener::Ssh(rx) => {
+ let ch = rx
+ .await
+ .map_err(|_| anyhow!("agent never connected the forwarded socket"))?;
+ Ok(RemowtStream::Ssh(ch.into_stream()))
+ }
+ RemowtListener::Local(listener, path) => {
+ let (stream, _) = listener.accept().await?;
+ let _ = std::fs::remove_file(&path);
+ Ok(RemowtStream::Local(stream))
+ }
+ }
+ }
+}
+
+pub enum RemowtStream {
+ Ssh(ChannelStream<Msg>),
+ Local(UnixStream),
+}
+
+impl AsyncRead for RemowtStream {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ match self.get_mut() {
+ RemowtStream::Ssh(s) => Pin::new(s).poll_read(cx, buf),
+ RemowtStream::Local(s) => Pin::new(s).poll_read(cx, buf),
+ }
+ }
+}
+
+impl AsyncWrite for RemowtStream {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ match self.get_mut() {
+ RemowtStream::Ssh(s) => Pin::new(s).poll_write(cx, buf),
+ RemowtStream::Local(s) => Pin::new(s).poll_write(cx, buf),
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ match self.get_mut() {
+ RemowtStream::Ssh(s) => Pin::new(s).poll_flush(cx),
+ RemowtStream::Local(s) => Pin::new(s).poll_flush(cx),
+ }
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ match self.get_mut() {
+ RemowtStream::Ssh(s) => Pin::new(s).poll_shutdown(cx),
+ RemowtStream::Local(s) => Pin::new(s).poll_shutdown(cx),
+ }
+ }
+}
crates/remowt-client/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-client/src/lib.rs
+++ b/crates/remowt-client/src/lib.rs
@@ -1,61 +1,53 @@
use std::collections::HashMap;
+use std::env;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
-use std::{env, io};
use anyhow::{anyhow, bail, ensure, Context as _, Result};
use bifrostlink::declarative::RemoteEndpoints;
-use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc};
+use bifrostlink::{Remote, Rpc, Rtt};
use bifrostlink_ports::unix_socket::from_socket;
-use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
-use remowt_endpoints::{
- fs::Fs,
- pty::{Pty, PtyClient, ShellId},
- systemd::Systemd,
-};
use remowt_link_shared::plugin::PluginEndpointsClient;
-use remowt_link_shared::{Address, BifConfig, ElevateEndpoints, ElevateError, Elevator};
+use remowt_link_shared::port::child_port;
+use remowt_link_shared::{Address, BifConfig};
use russh::client::{connect, Config, Handle, Handler, Msg, Session};
use russh::keys::agent::client::AgentClient;
use russh::keys::agent::AgentIdentity;
use russh::keys::check_known_hosts;
use russh::keys::ssh_key::PublicKey;
-use russh::{Channel, ChannelMsg, ChannelStream};
-use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf};
-use tokio::join;
+use russh::Channel;
+use tempfile::TempDir;
use tokio::net::UnixListener;
-use tokio::sync::mpsc;
use tokio::sync::oneshot::{self, channel};
-use tracing::error;
+use tokio::{
+ fs,
+ io::{AsyncReadExt as _, AsyncWriteExt as _},
+};
+use tracing::{debug, error};
use uuid::Uuid;
+use self::port::channel_port;
+use self::subprocess::RemowtChild;
+
pub mod editor;
+mod forwarded;
+mod port;
+#[cfg(feature = "shell")]
+mod shell;
+mod subprocess;
-type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;
+pub use forwarded::{RemowtListener, RemowtStream};
+#[cfg(feature = "shell")]
+pub use shell::{RemowtShell, RemowtShellResizer};
-async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {
- let len = srx.read_u32().await?;
- let mut buf = BytesMut::zeroed(len as usize);
- srx.read_exact(&mut buf).await?;
- Ok(buf)
-}
-async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {
- stx.write_u32(value.len().try_into().expect("can't be larger"))
- .await?;
- stx.write_all(&value).await?;
- Ok(())
-}
+type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;
fn sh_quote(s: impl AsRef<str>) -> String {
format!("'{}'", s.as_ref().replace('\'', "'\\''"))
}
-const ESCALATORS: [(&str, &[&str]); 3] = [
- ("run0", &["--background=", "--pipe"]),
- ("sudo", &[]),
- ("doas", &[]),
-];
+const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];
pub struct AgentBundle {
dir: PathBuf,
@@ -90,26 +82,36 @@
fn binary(&self, arch: &str) -> PathBuf {
self.dir.join(format!("remowt-agent-{arch}"))
}
+
+ fn local_binary(&self) -> Result<PathBuf> {
+ let arch = env::consts::ARCH;
+ let path = self.binary(arch);
+ ensure!(
+ path.is_file(),
+ "no local remowt-agent build for arch {arch} in bundle {}",
+ self.dir.display()
+ );
+ Ok(path)
+ }
}
async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {
- let mut ch = sess.channel_open_session().await?;
+ let ch = sess.channel_open_session().await?;
ch.exec(true, cmd).await?;
+
+ let mut child = RemowtChild::from_exec(ch);
+ drop(child.stdin);
+
let mut out = Vec::new();
- let mut code = None;
- while let Some(msg) = ch.wait().await {
- match msg {
- ChannelMsg::Data { data } => out.extend(data.as_ref()),
- ChannelMsg::ExtendedData { data, .. } => {
- error!(
- "remote stderr: {}",
- String::from_utf8_lossy(data.as_ref()).trim()
- );
- }
- ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
- _ => {}
- }
+ let mut err = Vec::new();
+ tokio::try_join!(
+ child.stdout.read_to_end(&mut out),
+ child.stderr.read_to_end(&mut err),
+ )?;
+ if !err.is_empty() {
+ error!("remote stderr: {}", String::from_utf8_lossy(&err).trim());
}
+ let code = child.exit.await.ok().flatten();
Ok((code, out))
}
@@ -119,21 +121,27 @@
code == Some(0),
"remote command failed (exit {code:?}): {cmd}"
);
- ensure!(out.ends_with(b"\n"));
- out.pop();
+ if !out.is_empty() {
+ ensure!(
+ out.ends_with(b"\n"),
+ "remote command was not newline-terminated: {cmd}: {out:?}"
+ );
+ out.pop();
+ }
String::from_utf8(out).context("expected utf8 output for command")
}
async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {
+ debug!("uname -a");
let arch = run_string_ok(sess, "uname -m").await?;
let hash = bundle
.hashes
.get(&arch)
.ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;
+ debug!("get dir");
let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")
.await?
- .trim()
.to_owned();
let dir = if cache.is_empty() {
let home = run_string_ok(sess, "echo \"$HOME\"").await?;
@@ -141,17 +149,21 @@
!home.is_empty(),
"remote $HOME and $XDG_CACHE_HOME both empty"
);
- Utf8PathBuf::from(home).join("cache/remowt")
+ Utf8PathBuf::from(home).join(".cache/remowt")
} else {
Utf8PathBuf::from(cache).join("remowt")
};
let path = dir.join(hash);
+ debug!("presence");
let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;
if present != Some(0) {
let bin = bundle.binary(&arch);
- let bytes = std::fs::read(&bin)
+ debug!("read");
+ let bytes = fs::read(&bin)
+ .await
.with_context(|| format!("reading agent binary {}", bin.display()))?;
+ debug!("upload");
upload_agent(sess, &dir, &path, bytes).await?;
}
Ok(path)
@@ -163,29 +175,29 @@
path: &Utf8Path,
bytes: Vec<u8>,
) -> Result<()> {
+ debug!("mkdirp");
run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;
- let tmp = path.join(format!("tmp.{}", Uuid::new_v4()));
+ let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));
let ch = sess.channel_open_session().await?;
+ debug!("cat");
ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;
- ch.data_bytes(bytes).await?;
- ch.eof().await?;
- let mut ch = ch;
- let mut code = None;
- while let Some(msg) = ch.wait().await {
- match msg {
- ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
- ChannelMsg::ExtendedData { data, .. } => {
- error!(
- "agent upload: {}",
- String::from_utf8_lossy(data.as_ref()).trim()
- );
- }
- _ => {}
- }
- }
+
+ let mut child = RemowtChild::from_exec(ch);
+ child
+ .stdin
+ .write_all(&bytes)
+ .await
+ .context("sending agent binary")?;
+ child
+ .stdin
+ .shutdown()
+ .await
+ .context("sending agent binary")?;
+ let code = child.wait().await;
ensure!(code == Some(0), "agent upload failed (exit {code:?})");
+ debug!("chmod");
run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;
run_string_ok(
sess,
@@ -205,7 +217,7 @@
return Ok((tool, flags));
}
}
- bail!("no escalation tool (run0/sudo/doas) found on remote")
+ bail!("no escalation tool found on remote")
}
fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {
@@ -226,36 +238,6 @@
env::split_paths(&path)
.map(|dir| dir.join(name))
.find(|p| p.is_file())
-}
-
-fn port_from_channel(ch: Channel<Msg>) -> Port {
- Port::new(move |mut rx, tx| async move {
- let (mut srx, mut stx) = tokio::io::split(ch.into_stream());
- let srx_task = async move {
- loop {
- match read(&mut srx).await {
- Ok(buf) => {
- if tx.send(buf.freeze()).is_err() {
- break;
- }
- }
- Err(e) => {
- error!("channel read failed: {e}");
- break;
- }
- }
- }
- };
- let stx_task = async move {
- while let Some(value) = rx.recv().await {
- if let Err(e) = write(&mut stx, value).await {
- error!("channel write failed: {e}");
- break;
- }
- }
- };
- join!(srx_task, stx_task);
- })
}
pub struct SshHandler {
@@ -286,56 +268,20 @@
return Err(russh::Error::WrongChannel);
};
let _ = ch.send(channel);
- Ok(())
- }
-}
-
-struct SshElevator {
- sess: Arc<Handle<SshHandler>>,
- rpc: WeakRpc<BifConfig>,
- agent_path: Utf8PathBuf,
-}
-impl Elevator for SshElevator {
- async fn elevate(&self) -> Result<(), ElevateError> {
- let fail = |e: String| ElevateError::Failed(e);
- let (tool, flags) = detect_escalation(&self.sess)
- .await
- .map_err(|e| fail(e.to_string()))?;
- let ch = self
- .sess
- .channel_open_session()
- .await
- .map_err(|e| fail(e.to_string()))?;
- ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None))
- .await
- .map_err(|e| fail(e.to_string()))?;
- let rpc = self
- .rpc
- .clone()
- .upgrade()
- .ok_or_else(|| fail("rpc is gone".to_owned()))?;
- rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0));
Ok(())
}
}
-pub struct RemoteChild {
- pub stdout: DuplexStream,
- pub stderr: DuplexStream,
- pub exit: oneshot::Receiver<Option<u32>>,
-}
-
enum Transport {
Ssh {
sess: Arc<Handle<SshHandler>>,
subs: Subs,
- remote_dir: Utf8PathBuf,
+ runtime_dir: Utf8PathBuf,
agent_path: Utf8PathBuf,
},
Local {
- #[allow(dead_code)]
- agent: Rpc<BifConfig>,
- agent_path: String,
+ agent_path: PathBuf,
+ runtime_dir: Utf8PathBuf,
},
}
@@ -344,44 +290,11 @@
rpc: Rpc<BifConfig>,
elevated: tokio::sync::OnceCell<()>,
children: Mutex<Vec<tokio::process::Child>>,
+ _runtime_tmp: Option<TempDir>,
}
pub type RemowtRemote = Remote<BifConfig>;
-fn loopback() -> (Port, Port) {
- let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::<Bytes>();
- let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::<Bytes>();
- let user = Port::new(move |mut rx, tx| async move {
- loop {
- tokio::select! {
- msg = rx.recv() => match msg {
- Some(msg) => if a2b_tx.send(msg).is_err() { break },
- None => break,
- },
- msg = b2a_rx.recv() => match msg {
- Some(msg) => if tx.send(msg).is_err() { break },
- None => break,
- },
- }
- }
- });
- let agent = Port::new(move |mut rx, tx| async move {
- loop {
- tokio::select! {
- msg = rx.recv() => match msg {
- Some(msg) => if b2a_tx.send(msg).is_err() { break },
- None => break,
- },
- msg = a2b_rx.recv() => match msg {
- Some(msg) => if tx.send(msg).is_err() { break },
- None => break,
- },
- }
- }
- });
- (user, agent)
-}
-
impl Remowt {
pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {
let conf = russh_config::parse_home(host)?;
@@ -426,13 +339,14 @@
}
ensure!(authenticated, "ssh authentication failed");
- // All remaining session ops take `&self`; share the handle.
let sess = Arc::new(sess);
+ debug!("deploying agent");
let agent_path = deploy_agent(&sess, bundle).await?;
- let remote_dir = remote_mktemp(&sess).await?;
- let primary = remote_dir.join("primary.sock");
+ debug!("runtime dir");
+ let runtime_dir = remote_runtime_dir(&sess).await?;
+ let primary = runtime_dir.join(format!("remowt-{}.sock", Uuid::new_v4()));
let (onetx, onerx) = channel();
subs.lock().expect("lock").insert(primary.clone(), onetx);
@@ -442,6 +356,7 @@
// TODO: ensure no injection is possible in the socket path.
let cmd_chan = sess.channel_open_session().await?;
+ debug!("starting agent");
cmd_chan
.exec(
true,
@@ -453,7 +368,7 @@
)
.await?;
- let port = port_from_channel(
+ let port = channel_port(
onerx
.await
.map_err(|_| anyhow!("agent never opened its channel"))?,
@@ -464,36 +379,42 @@
transport: Transport::Ssh {
sess,
subs,
- remote_dir,
+ runtime_dir,
agent_path,
},
rpc,
elevated: tokio::sync::OnceCell::new(),
children: Mutex::new(Vec::new()),
+ _runtime_tmp: None,
})
}
- pub async fn connect_local(agent_path: &str) -> Result<Self> {
- let (port_user, port_agent) = loopback();
+ pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {
+ let agent_path = bundle.local_binary()?;
+ let mut child = tokio::process::Command::new(&agent_path)
+ .arg("real-agent")
+ .stdin(std::process::Stdio::piped())
+ .stdout(std::process::Stdio::piped())
+ .kill_on_drop(true)
+ .spawn()
+ .with_context(|| format!("spawning agent binary {}", agent_path.display()))?;
+ let stdin = child.stdin.take().expect("stdin piped");
+ let stdout = child.stdout.take().expect("stdout piped");
+
let rpc = Rpc::<BifConfig>::new(Address::User);
- let mut agent = Rpc::<BifConfig>::new(Address::Agent);
+ rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));
- // Register handlers before wiring up the link (see the agent binary).
- Fs::new().register_endpoints(&mut agent);
- Systemd.register_endpoints(&mut agent);
- Pty::new().register_endpoints(&mut agent);
+ let (runtime_dir, runtime_tmp) = local_runtime_dir()?;
- agent.add_direct(Address::User, port_agent, Rtt(0));
- rpc.add_direct(Address::Agent, port_user, Rtt(0));
-
Ok(Self {
transport: Transport::Local {
- agent,
- agent_path: agent_path.to_owned(),
+ agent_path,
+ runtime_dir,
},
rpc,
elevated: tokio::sync::OnceCell::new(),
- children: Mutex::new(Vec::new()),
+ children: Mutex::new(vec![child]),
+ _runtime_tmp: runtime_tmp,
})
}
@@ -547,24 +468,25 @@
let ch = sess.channel_open_session().await?;
ch.exec(true, privileged_cmd(tool, flags, agent_path, None))
.await?;
- port_from_channel(ch)
+ channel_port(ch)
}
Transport::Local { agent_path, .. } => {
- let sock = env::temp_dir()
- .join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4()));
+ let sock = self
+ .runtime_dir()
+ .join(format!("remowt-priv-{}.sock", Uuid::new_v4()));
let _ = std::fs::remove_file(&sock);
let listener = UnixListener::bind(&sock)?;
let (tool, flags) = ESCALATORS
.iter()
.find(|(t, _)| find_in_path(t).is_some())
- .ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?;
+ .ok_or_else(|| anyhow!("no escalation tool found"))?;
let child = tokio::process::Command::new(tool)
.args(*flags)
.arg(agent_path)
.arg("real-agent")
.arg("--privileged")
.arg("--path")
- .arg(sock.to_str().expect("temp path is utf-8"))
+ .arg(sock.as_str())
.kill_on_drop(true)
.spawn()?;
self.children.lock().expect("lock").push(child);
@@ -580,138 +502,63 @@
Ok(())
}
- pub async fn exec(&self, command: String) -> Result<RemoteChild> {
+ pub async fn exec(&self, command: String) -> Result<RemowtChild> {
let Some(sess) = self.ssh() else {
bail!("exec should not be called on local")
};
let ch = sess.channel_open_session().await?;
ch.exec(true, command).await?;
-
- let (mut out_w, stdout) = tokio::io::duplex(64 * 1024);
- let (mut err_w, stderr) = tokio::io::duplex(64 * 1024);
- let (exit_tx, exit) = oneshot::channel();
-
- tokio::spawn(async move {
- let mut ch = ch;
- let mut code = None;
- while let Some(msg) = ch.wait().await {
- match msg {
- ChannelMsg::Data { data } => {
- if out_w.write_all(&data).await.is_err() {
- break;
- }
- }
- ChannelMsg::ExtendedData { data, .. } => {
- if err_w.write_all(&data).await.is_err() {
- break;
- }
- }
- ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
- _ => {}
- }
- }
- let _ = out_w.shutdown().await;
- let _ = err_w.shutdown().await;
- let _ = exit_tx.send(code);
- });
-
- Ok(RemoteChild {
- stdout,
- stderr,
- exit,
- })
+ Ok(RemowtChild::from_exec(ch))
}
- pub fn serve_elevate(&self) -> Result<()> {
- let Transport::Ssh {
- sess, agent_path, ..
- } = &self.transport
- else {
- bail!("elevate should not be called on local")
- };
- let mut rpc = self.rpc.clone();
- ElevateEndpoints(SshElevator {
- sess: sess.clone(),
- rpc: self.rpc.clone().downgrade(),
- agent_path: agent_path.to_owned(),
- })
- .register_endpoints(&mut rpc);
- Ok(())
+ fn runtime_dir(&self) -> Utf8PathBuf {
+ match &self.transport {
+ Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),
+ Transport::Local { runtime_dir, .. } => runtime_dir.clone(),
+ }
}
- pub fn remote_dir(&self) -> Option<&Utf8Path> {
+ pub async fn forward_socket(&self, path: &Utf8Path) -> Result<RemowtListener> {
match &self.transport {
- Transport::Ssh { remote_dir, .. } => Some(remote_dir),
- Transport::Local { .. } => None,
+ Transport::Ssh { sess, subs, .. } => {
+ let (tx, rx) = oneshot::channel();
+ subs.lock().expect("lock").insert(path.to_owned(), tx);
+ sess.streamlocal_forward(path.to_owned()).await?;
+ Ok(RemowtListener::Ssh(rx))
+ }
+ Transport::Local { .. } => {
+ let _ = std::fs::remove_file(path);
+ Ok(RemowtListener::Local(
+ UnixListener::bind(path)?,
+ path.to_owned(),
+ ))
+ }
}
- }
-
- pub async fn forward_socket(
- &self,
- remote_path: &Utf8Path,
- ) -> Result<oneshot::Receiver<Channel<Msg>>> {
- let Transport::Ssh { sess, subs, .. } = &self.transport else {
- bail!("forward_socket should not be called on local")
- };
- let (tx, rx) = oneshot::channel();
- subs.lock()
- .expect("lock")
- .insert(remote_path.to_owned(), tx);
- sess.streamlocal_forward(remote_path.to_owned()).await?;
- Ok(rx)
- }
-
- pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<Shell> {
- let Transport::Ssh { remote_dir, .. } = &self.transport else {
- bail!("open_shell should not be called on local")
- };
- let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4()));
-
- let rx = self.forward_socket(&sock).await?;
- let client: PtyClient<BifConfig> = self.endpoints();
- let id = client
- .open_shell(sock, term.to_owned(), cols, rows)
- .await?
- .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;
- let ch = rx
- .await
- .map_err(|_| anyhow!("agent never connected the shell socket"))?;
-
- Ok(Shell {
- id,
- stream: ch.into_stream(),
- remote: self.rpc.remote(Address::Agent),
- })
}
}
-pub struct Shell {
- pub id: ShellId,
- pub stream: ChannelStream<Msg>,
- remote: Remote<BifConfig>,
-}
-
-impl Shell {
- pub fn resizer(&self) -> ShellResizer {
- ShellResizer {
- remote: self.remote.clone(),
- id: self.id,
+fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {
+ if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {
+ if !dir.is_empty() {
+ return Ok((Utf8PathBuf::from(dir), None));
}
}
-}
-
-#[derive(Clone)]
-pub struct ShellResizer {
- remote: Remote<BifConfig>,
- id: ShellId,
+ let tmp = tempfile::Builder::new()
+ .prefix("remowt.")
+ .rand_bytes(12)
+ .tempdir()?;
+ let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())
+ .map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;
+ Ok((dir, Some(tmp)))
}
-impl ShellResizer {
- pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {
- PtyClient::wrap(self.remote.clone())
- .resize(self.id, cols, rows)
- .await?
- .map_err(|e| anyhow!("failed to resize remote shell: {e}"))
+async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {
+ let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;
+ let dir = dir.trim();
+ if dir.is_empty() {
+ remote_mktemp(sess).await
+ } else {
+ Ok(Utf8PathBuf::from(dir))
}
}
crates/remowt-client/src/port.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/port.rs
@@ -0,0 +1,52 @@
+use std::io;
+
+use bifrostlink::Port;
+use bytes::{Bytes, BytesMut};
+use russh::{Channel, ChannelStream};
+use russh::client::Msg;
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, ReadHalf, WriteHalf};
+use tokio::join;
+use tracing::error;
+
+async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {
+ let len = srx.read_u32().await?;
+ let mut buf = BytesMut::zeroed(len as usize);
+ srx.read_exact(&mut buf).await?;
+ Ok(buf)
+}
+async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {
+ stx.write_u32(value.len().try_into().expect("can't be larger"))
+ .await?;
+ stx.write_all(&value).await?;
+ Ok(())
+}
+
+pub fn channel_port(ch: Channel<Msg>) -> Port {
+ Port::new(move |mut rx, tx| async move {
+ let (mut srx, mut stx) = tokio::io::split(ch.into_stream());
+ let srx_task = async move {
+ loop {
+ match read(&mut srx).await {
+ Ok(buf) => {
+ if tx.send(buf.freeze()).is_err() {
+ break;
+ }
+ }
+ Err(e) => {
+ error!("channel read failed: {e}");
+ break;
+ }
+ }
+ }
+ };
+ let stx_task = async move {
+ while let Some(value) = rx.recv().await {
+ if let Err(e) = write(&mut stx, value).await {
+ error!("channel write failed: {e}");
+ break;
+ }
+ }
+ };
+ join!(srx_task, stx_task);
+ })
+}
crates/remowt-client/src/shell.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/shell.rs
@@ -0,0 +1,60 @@
+use anyhow::{anyhow, Result};
+use bifrostlink::declarative::RemoteEndpoints as _;
+use bifrostlink::Remote;
+use remowt_endpoints::pty::{PtyClient, ShellId};
+use remowt_link_shared::{Address, BifConfig};
+use uuid::Uuid;
+
+use crate::forwarded::RemowtStream;
+use crate::Remowt;
+
+pub struct RemowtShell {
+ pub id: ShellId,
+ pub stream: RemowtStream,
+ remote: Remote<BifConfig>,
+}
+impl RemowtShell {
+ pub fn resizer(&self) -> RemowtShellResizer {
+ RemowtShellResizer {
+ remote: self.remote.clone(),
+ id: self.id,
+ }
+ }
+}
+
+#[derive(Clone)]
+pub struct RemowtShellResizer {
+ remote: Remote<BifConfig>,
+ id: ShellId,
+}
+
+impl RemowtShellResizer {
+ pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {
+ PtyClient::wrap(self.remote.clone())
+ .resize(self.id, cols, rows)
+ .await?
+ .map_err(|e| anyhow!("failed to resize remote shell: {e}"))
+ }
+}
+
+impl Remowt {
+ pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<RemowtShell> {
+ let sock = self
+ .runtime_dir()
+ .join(format!("remowt-shell-{}.sock", Uuid::new_v4()));
+
+ let forwarded = self.forward_socket(&sock).await?;
+ let client: PtyClient<BifConfig> = self.endpoints();
+ let id = client
+ .open_shell(sock, term.to_owned(), cols, rows)
+ .await?
+ .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;
+ let stream = forwarded.accept().await?;
+
+ Ok(RemowtShell {
+ id,
+ stream,
+ remote: self.rpc.remote(Address::Agent),
+ })
+ }
+}
crates/remowt-client/src/subprocess.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-client/src/subprocess.rs
@@ -0,0 +1,84 @@
+use bytes::Bytes;
+use russh::client::Msg;
+use russh::{Channel, ChannelMsg};
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};
+use tokio::sync::oneshot;
+
+const BUF: usize = 64 * 1024;
+
+pub struct RemowtChild {
+ pub stdin: DuplexStream,
+ pub stdout: DuplexStream,
+ pub stderr: DuplexStream,
+ pub exit: oneshot::Receiver<Option<u32>>,
+}
+
+impl RemowtChild {
+ /// Manage channel returned by russh exec().
+ pub(crate) fn from_exec(ch: Channel<Msg>) -> Self {
+ let (stdin, mut stdin_r) = tokio::io::duplex(BUF);
+ let (mut out_w, stdout) = tokio::io::duplex(BUF);
+ let (mut err_w, stderr) = tokio::io::duplex(BUF);
+ let (exit_tx, exit) = oneshot::channel();
+
+ tokio::spawn(async move {
+ let (mut read, write) = ch.split();
+
+ // Forward our stdin to the channel, signalling EOF when it closes.
+ let stdin_pump = tokio::spawn(async move {
+ let mut buf = vec![0u8; BUF];
+ loop {
+ match stdin_r.read(&mut buf).await {
+ Ok(0) | Err(_) => break,
+ Ok(n) => {
+ if write
+ .data_bytes(Bytes::copy_from_slice(&buf[..n]))
+ .await
+ .is_err()
+ {
+ return;
+ }
+ }
+ }
+ }
+ let _ = write.eof().await;
+ });
+
+ let mut code = None;
+ while let Some(msg) = read.wait().await {
+ match msg {
+ ChannelMsg::Data { data } => {
+ if out_w.write_all(&data).await.is_err() {
+ break;
+ }
+ }
+ ChannelMsg::ExtendedData { data, .. } => {
+ if err_w.write_all(&data).await.is_err() {
+ break;
+ }
+ }
+ ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
+ _ => {}
+ }
+ }
+
+ // The process is gone; stop waiting on stdin we'll never forward.
+ stdin_pump.abort();
+ let _ = out_w.shutdown().await;
+ let _ = err_w.shutdown().await;
+ let _ = exit_tx.send(code);
+ });
+
+ RemowtChild {
+ stdin,
+ stdout,
+ stderr,
+ exit,
+ }
+ }
+
+ /// Wait for the process to finish, returning its exit status.
+ pub async fn wait(self) -> Option<u32> {
+ self.exit.await.ok().flatten()
+ }
+}
crates/remowt-link-shared/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-link-shared/Cargo.toml
+++ b/crates/remowt-link-shared/Cargo.toml
@@ -11,6 +11,7 @@
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
thiserror.workspace = true
-tokio = { workspace = true, features = ["fs"] }
+tokio = { workspace = true, features = ["fs", "io-util", "macros"] }
+tracing.workspace = true
remowt-ui-prompt.workspace = true
camino = { workspace = true, features = ["serde1"] }
crates/remowt-link-shared/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-link-shared/src/lib.rs
+++ b/crates/remowt-link-shared/src/lib.rs
@@ -1,14 +1,12 @@
-use std::future::Future;
-
-use bifrostlink::declarative::endpoints;
use bifrostlink::error::{ErrorT, ListenerForYourRequestHasBeenDeadError, ResponseError};
use bifrostlink::notification;
use bifrostlink::packet::OpaquePacketWrapper;
-use bifrostlink::{AddressT, Config};
+use bifrostlink::AddressT;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
pub mod editor;
+pub mod port;
#[derive(Clone, Serialize, Hash, Eq, Debug, PartialEq, Deserialize)]
pub enum Address {
@@ -20,26 +18,6 @@
impl AddressT for Address {}
pub mod plugin;
-
-#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
-pub enum ElevateError {
- #[error("elevation failed: {0}")]
- Failed(String),
-}
-
-pub trait Elevator: Send + Sync {
- fn elevate(&self) -> impl Future<Output = Result<(), ElevateError>> + Send;
-}
-
-pub struct ElevateEndpoints<E>(pub E);
-
-#[endpoints(ns = 3)]
-impl<E: Elevator + 'static> ElevateEndpoints<E> {
- #[endpoints(id = 1)]
- async fn elevate(&self) -> Result<(), ElevateError> {
- self.0.elevate().await
- }
-}
#[derive(thiserror::Error, Debug)]
pub enum Error {
crates/remowt-link-shared/src/port.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-link-shared/src/port.rs
@@ -0,0 +1,54 @@
+use std::io;
+
+use bifrostlink::Port;
+use bytes::{Bytes, BytesMut};
+use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
+
+/// Wire a length-prefixed duplex byte stream (e.g. a child process's
+/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32`
+/// length followed by that many payload bytes.
+pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port
+where
+ R: AsyncRead + Unpin + Send + 'static,
+ W: AsyncWrite + Unpin + Send + 'static,
+{
+ Port::new(|mut rx, tx| async move {
+ let read_task = async move {
+ loop {
+ let len = match reader.read_u32().await {
+ Ok(len) => len,
+ Err(e) => {
+ tracing::error!("child read failed: {e}");
+ break;
+ }
+ };
+ let mut buf = BytesMut::zeroed(len as usize);
+ if let Err(e) = reader.read_exact(&mut buf).await {
+ tracing::error!("child read failed: {e}");
+ break;
+ }
+ if tx.send(buf.freeze()).is_err() {
+ break;
+ }
+ }
+ };
+ let write_task = async move {
+ while let Some(msg) = rx.recv().await {
+ if let Err(e) = write_frame(&mut writer, msg).await {
+ tracing::error!("child write failed: {e}");
+ break;
+ }
+ }
+ };
+ tokio::join!(read_task, write_task);
+ })
+}
+
+async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {
+ let len = u32::try_from(msg.len())
+ .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;
+ writer.write_u32(len).await?;
+ writer.write_all(&msg).await?;
+ writer.flush().await?;
+ Ok(())
+}
crates/remowt-plugin/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-plugin/Cargo.toml
+++ b/crates/remowt-plugin/Cargo.toml
@@ -9,7 +9,6 @@
anyhow.workspace = true
bifrostlink.workspace = true
bifrostlink-ports.workspace = true
-bytes.workspace = true
remowt-link-shared.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = [
crates/remowt-plugin/src/host.rsdiffbeforeafterboth1use std::ffi::OsStr;1use std::ffi::OsStr;2use std::io;3use std::process::Stdio;2use std::process::Stdio;4use std::sync::Mutex;3use std::sync::Mutex;546use bifrostlink::{Port, Rpc, Rtt, WeakRpc};5use bifrostlink::{Rpc, Rtt, WeakRpc};7use bytes::{Bytes, BytesMut};8use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};9use tokio::process::{Child, ChildStdin, ChildStdout, Command};6use tokio::process::{Child, Command};10711use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};8use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};9use remowt_link_shared::port::child_port;12use remowt_link_shared::{Address, BifConfig};10use remowt_link_shared::{Address, BifConfig};131114pub fn serve(rpc: &mut Rpc<BifConfig>) {12pub fn serve(rpc: &mut Rpc<BifConfig>) {70 }68 }71}69}7273fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port {74 Port::new(|mut rx, tx| async move {75 let reader = async move {76 loop {77 let len = match stdout.read_u32().await {78 Ok(len) => len,79 Err(e) => {80 tracing::error!("plugin stdout read failed: {e}");81 break;82 }83 };84 let mut buf = BytesMut::zeroed(len as usize);85 if let Err(e) = stdout.read_exact(&mut buf).await {86 tracing::error!("plugin stdout read failed: {e}");87 break;88 }89 if tx.send(buf.freeze()).is_err() {90 break;91 }92 }93 };94 let writer = async move {95 while let Some(msg) = rx.recv().await {96 if let Err(e) = write_frame(&mut stdin, msg).await {97 tracing::error!("plugin stdin write failed: {e}");98 break;99 }100 }101 };102 tokio::join!(reader, writer);103 })104}105106async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> {107 let len = u32::try_from(msg.len())108 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;109 stdin.write_u32(len).await?;110 stdin.write_all(&msg).await?;111 stdin.flush().await?;112 Ok(())113}11470