git.delta.rocks / remowt / refs/commits / 7e5126608cef

difftreelog

feat(client) proper support for local

uzyqskstYaroslav Bolyukin6 days agoparent: #600e6ed.patch.diff
in: trunk

16 files changed

modified.gitignorediffbeforeafterboth
1/target1/target
2/.direnv2/.direnv
3/result
4/result-*
35
modifiedCargo.lockdiffbeforeafterboth
2096 "russh-config",2096 "russh-config",
2097 "serde",2097 "serde",
2098 "serde_json",2098 "serde_json",
2099 "tempfile",
2099 "tokio",2100 "tokio",
2100 "tracing",2101 "tracing",
2101 "uuid",2102 "uuid",
2131 "serde_json",2132 "serde_json",
2132 "thiserror",2133 "thiserror",
2133 "tokio",2134 "tokio",
2135 "tracing",
2134]2136]
21352137
2136[[package]]2138[[package]]
2140 "anyhow",2142 "anyhow",
2141 "bifrostlink",2143 "bifrostlink",
2142 "bifrostlink-ports",2144 "bifrostlink-ports",
2143 "bytes",
2144 "remowt-link-shared",2145 "remowt-link-shared",
2145 "serde_json",2146 "serde_json",
2146 "tokio",2147 "tokio",
modifiedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
225}225}
226226
227fn main() -> anyhow::Result<()> {227fn main() -> anyhow::Result<()> {
228 // Log to stderr: `privileged-agent` uses stdout as the bifrost transport,
229 // so anything written there would corrupt the stream.
230 tracing_subscriber::fmt()228 tracing_subscriber::fmt()
231 .with_writer(std::io::stderr)229 .with_writer(std::io::stderr)
232 .without_time()230 .without_time()
modifiedcmds/remowt-ssh/Cargo.tomldiffbeforeafterboth
11tracing-subscriber.workspace = true11tracing-subscriber.workspace = true
12bifrostlink.workspace = true12bifrostlink.workspace = true
13remowt-link-shared.workspace = true13remowt-link-shared.workspace = true
14remowt-client.workspace = true14remowt-client = { workspace = true, features = ["shell"] }
15tokio = { workspace = true, features = ["macros", "fs", "net", "io-util", "rt", "signal"] }15tokio = { workspace = true, features = [
16 "macros",
17 "fs",
18 "net",
19 "io-util",
20 "rt",
21 "signal",
22] }
16nix = { workspace = true, features = ["term"] }23nix = { workspace = true, features = ["term"] }
17anyhow.workspace = true24anyhow.workspace = true
modifiedcmds/remowt-ssh/src/main.rsdiffbeforeafterboth
19use tokio::io::unix::AsyncFd;19use tokio::io::unix::AsyncFd;
20use tokio::io::{AsyncRead, ReadBuf};20use tokio::io::{AsyncRead, ReadBuf};
21use tokio::signal::unix::{signal, SignalKind};21use tokio::signal::unix::{signal, SignalKind};
22use tracing::info;22use tracing::debug;
2323
24#[derive(Parser)]24#[derive(Parser)]
25struct Opts {25enum Opts {
26 /// Connect to remote host with remowt agent.
26 host: String,27 Ssh { host: String },
27}28 /// Connect to local host for testing the connectivity.
29 Local,
30}
2831
29fn agents_dir() -> anyhow::Result<PathBuf> {32fn agents_dir() -> anyhow::Result<PathBuf> {
3538
36#[tokio::main(flavor = "current_thread")]39#[tokio::main(flavor = "current_thread")]
37async fn main() -> anyhow::Result<()> {40async fn main() -> anyhow::Result<()> {
38 tracing_subscriber::fmt::init();41 tracing_subscriber::fmt()
42 .with_writer(std::io::stderr)
43 .without_time()
44 .init();
39 let opts = Opts::parse();45 let opts = Opts::parse();
4046
41 let bundle = AgentBundle::from_dir(agents_dir()?)?;47 let bundle = AgentBundle::from_dir(agents_dir()?)?;
42 let conn = Remowt::connect(&opts.host, &bundle).await?;48 let conn = match &opts {
49 Opts::Ssh { host } => Remowt::connect(host, &bundle).await?,
50 Opts::Local => Remowt::connect_local(&bundle).await?,
51 };
43 let mut rpc = conn.rpc();52 let mut rpc = conn.rpc();
4453
45 serve_prompts(54 serve_prompts(
46 &mut rpc,55 &mut rpc,
47 PrependSourcePrompter {56 PrependSourcePrompter {
48 prompter: RofiPrompter,57 prompter: RofiPrompter,
49 source: vec![Source(Cow::Owned(format!("ssh host: {}", opts.host)))],58 source: match opts {
59 Opts::Ssh { host } => vec![Source(Cow::Owned(format!("ssh host: {}", host)))],
60 Opts::Local => vec![],
61 },
50 description: "".to_owned(),62 description: "".to_owned(),
51 },63 },
52 );64 );
53 if let Some(sess) = conn.ssh() {65 if let Some(sess) = conn.ssh() {
54 serve_editor(&mut rpc, SshEditor { sess });66 serve_editor(&mut rpc, SshEditor { sess });
55 }67 }
5668
57 info!("entering shell");69 debug!("entering shell");
58 run_shell(&conn).await?;70 run_shell(&conn).await?;
59 info!("shell ended");71 debug!("shell ended");
6072
61 Ok(())73 Ok(())
62}74}
modifiedcrates/remowt-client/Cargo.tomldiffbeforeafterboth
16remowt-link-shared.workspace = true16remowt-link-shared.workspace = true
17russh.workspace = true17russh.workspace = true
18russh-config.workspace = true18russh-config.workspace = true
19tempfile.workspace = true
19tokio = { workspace = true, features = ["net", "io-util", "rt", "sync", "macros", "process"] }20tokio = { workspace = true, features = [
21 "net",
22 "io-util",
23 "rt",
24 "sync",
25 "macros",
26 "process",
27] }
20tracing.workspace = true28tracing.workspace = true
21uuid = { workspace = true, features = ["v4"] }29uuid = { workspace = true, features = ["v4"] }
22remowt-endpoints.workspace = true30remowt-endpoints = { workspace = true, optional = true }
31
32[features]
33shell = ["dep:remowt-endpoints"]
2334
addedcrates/remowt-client/src/forwarded.rsdiffbeforeafterboth

no changes

modifiedcrates/remowt-client/src/lib.rsdiffbeforeafterboth
1use std::collections::HashMap;1use std::collections::HashMap;
2use std::env;
2use std::path::PathBuf;3use std::path::PathBuf;
3use std::sync::{Arc, Mutex};4use std::sync::{Arc, Mutex};
4use std::{env, io};
55
6use anyhow::{anyhow, bail, ensure, Context as _, Result};6use anyhow::{anyhow, bail, ensure, Context as _, Result};
7use bifrostlink::declarative::RemoteEndpoints;7use bifrostlink::declarative::RemoteEndpoints;
8use bifrostlink::{Port, Remote, Rpc, Rtt, WeakRpc};8use bifrostlink::{Remote, Rpc, Rtt};
9use bifrostlink_ports::unix_socket::from_socket;9use bifrostlink_ports::unix_socket::from_socket;
10use bytes::{Bytes, BytesMut};
11use camino::{Utf8Path, Utf8PathBuf};10use camino::{Utf8Path, Utf8PathBuf};
12use remowt_endpoints::{
13 fs::Fs,
14 pty::{Pty, PtyClient, ShellId},
15 systemd::Systemd,
16};
17use remowt_link_shared::plugin::PluginEndpointsClient;11use remowt_link_shared::plugin::PluginEndpointsClient;
18use remowt_link_shared::{Address, BifConfig, ElevateEndpoints, ElevateError, Elevator};12use remowt_link_shared::port::child_port;
13use remowt_link_shared::{Address, BifConfig};
19use russh::client::{connect, Config, Handle, Handler, Msg, Session};14use russh::client::{connect, Config, Handle, Handler, Msg, Session};
20use russh::keys::agent::client::AgentClient;15use russh::keys::agent::client::AgentClient;
21use russh::keys::agent::AgentIdentity;16use russh::keys::agent::AgentIdentity;
22use russh::keys::check_known_hosts;17use russh::keys::check_known_hosts;
23use russh::keys::ssh_key::PublicKey;18use russh::keys::ssh_key::PublicKey;
24use russh::{Channel, ChannelMsg, ChannelStream};19use russh::Channel;
25use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream, ReadHalf, WriteHalf};
26use tokio::join;20use tempfile::TempDir;
27use tokio::net::UnixListener;21use tokio::net::UnixListener;
28use tokio::sync::mpsc;
29use tokio::sync::oneshot::{self, channel};22use tokio::sync::oneshot::{self, channel};
30use tracing::error;23use tokio::{
24 fs,
25 io::{AsyncReadExt as _, AsyncWriteExt as _},
26};
27use tracing::{debug, error};
31use uuid::Uuid;28use uuid::Uuid;
3229
30use self::port::channel_port;
31use self::subprocess::RemowtChild;
32
33pub mod editor;33pub mod editor;
34mod forwarded;
35mod port;
36#[cfg(feature = "shell")]
37mod shell;
38mod subprocess;
3439
40pub use forwarded::{RemowtListener, RemowtStream};
41#[cfg(feature = "shell")]
42pub use shell::{RemowtShell, RemowtShellResizer};
43
35type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;44type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;
3645
37async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {
38 let len = srx.read_u32().await?;
39 let mut buf = BytesMut::zeroed(len as usize);
40 srx.read_exact(&mut buf).await?;
41 Ok(buf)
42}
43async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {
44 stx.write_u32(value.len().try_into().expect("can't be larger"))
45 .await?;
46 stx.write_all(&value).await?;
47 Ok(())
48}
49
50fn sh_quote(s: impl AsRef<str>) -> String {46fn sh_quote(s: impl AsRef<str>) -> String {
51 format!("'{}'", s.as_ref().replace('\'', "'\\''"))47 format!("'{}'", s.as_ref().replace('\'', "'\\''"))
52}48}
5349
54const ESCALATORS: [(&str, &[&str]); 3] = [50const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];
55 ("run0", &["--background=", "--pipe"]),
56 ("sudo", &[]),
57 ("doas", &[]),
58];
5951
60pub struct AgentBundle {52pub struct AgentBundle {
61 dir: PathBuf,53 dir: PathBuf,
90 fn binary(&self, arch: &str) -> PathBuf {82 fn binary(&self, arch: &str) -> PathBuf {
91 self.dir.join(format!("remowt-agent-{arch}"))83 self.dir.join(format!("remowt-agent-{arch}"))
92 }84 }
85
86 fn local_binary(&self) -> Result<PathBuf> {
87 let arch = env::consts::ARCH;
88 let path = self.binary(arch);
89 ensure!(
90 path.is_file(),
91 "no local remowt-agent build for arch {arch} in bundle {}",
92 self.dir.display()
93 );
94 Ok(path)
95 }
93}96}
9497
95async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {98async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {
96 let mut ch = sess.channel_open_session().await?;99 let ch = sess.channel_open_session().await?;
97 ch.exec(true, cmd).await?;100 ch.exec(true, cmd).await?;
101
102 let mut child = RemowtChild::from_exec(ch);
103 drop(child.stdin);
104
98 let mut out = Vec::new();105 let mut out = Vec::new();
99 let mut code = None;106 let mut err = Vec::new();
100 while let Some(msg) = ch.wait().await {107 tokio::try_join!(
101 match msg {108 child.stdout.read_to_end(&mut out),
102 ChannelMsg::Data { data } => out.extend(data.as_ref()),109 child.stderr.read_to_end(&mut err),
103 ChannelMsg::ExtendedData { data, .. } => {110 )?;
111 if !err.is_empty() {
104 error!(112 error!("remote stderr: {}", String::from_utf8_lossy(&err).trim());
105 "remote stderr: {}",
106 String::from_utf8_lossy(data.as_ref()).trim()
107 );
108 }
109 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
110 _ => {}
111 }
112 }113 }
114 let code = child.exit.await.ok().flatten();
113 Ok((code, out))115 Ok((code, out))
114}116}
115117
119 code == Some(0),121 code == Some(0),
120 "remote command failed (exit {code:?}): {cmd}"122 "remote command failed (exit {code:?}): {cmd}"
121 );123 );
122 ensure!(out.ends_with(b"\n"));124 if !out.is_empty() {
125 ensure!(
126 out.ends_with(b"\n"),
127 "remote command was not newline-terminated: {cmd}: {out:?}"
128 );
123 out.pop();129 out.pop();
130 }
124 String::from_utf8(out).context("expected utf8 output for command")131 String::from_utf8(out).context("expected utf8 output for command")
125}132}
126133
127async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {134async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {
135 debug!("uname -a");
128 let arch = run_string_ok(sess, "uname -m").await?;136 let arch = run_string_ok(sess, "uname -m").await?;
129 let hash = bundle137 let hash = bundle
130 .hashes138 .hashes
131 .get(&arch)139 .get(&arch)
132 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;140 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;
133141
142 debug!("get dir");
134 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")143 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"")
135 .await?144 .await?
136 .trim()
137 .to_owned();145 .to_owned();
138 let dir = if cache.is_empty() {146 let dir = if cache.is_empty() {
139 let home = run_string_ok(sess, "echo \"$HOME\"").await?;147 let home = run_string_ok(sess, "echo \"$HOME\"").await?;
140 ensure!(148 ensure!(
141 !home.is_empty(),149 !home.is_empty(),
142 "remote $HOME and $XDG_CACHE_HOME both empty"150 "remote $HOME and $XDG_CACHE_HOME both empty"
143 );151 );
144 Utf8PathBuf::from(home).join("cache/remowt")152 Utf8PathBuf::from(home).join(".cache/remowt")
145 } else {153 } else {
146 Utf8PathBuf::from(cache).join("remowt")154 Utf8PathBuf::from(cache).join("remowt")
147 };155 };
148 let path = dir.join(hash);156 let path = dir.join(hash);
149157
158 debug!("presence");
150 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;159 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;
151 if present != Some(0) {160 if present != Some(0) {
152 let bin = bundle.binary(&arch);161 let bin = bundle.binary(&arch);
153 let bytes = std::fs::read(&bin)162 debug!("read");
163 let bytes = fs::read(&bin)
164 .await
154 .with_context(|| format!("reading agent binary {}", bin.display()))?;165 .with_context(|| format!("reading agent binary {}", bin.display()))?;
166 debug!("upload");
155 upload_agent(sess, &dir, &path, bytes).await?;167 upload_agent(sess, &dir, &path, bytes).await?;
156 }168 }
157 Ok(path)169 Ok(path)
163 path: &Utf8Path,175 path: &Utf8Path,
164 bytes: Vec<u8>,176 bytes: Vec<u8>,
165) -> Result<()> {177) -> Result<()> {
178 debug!("mkdirp");
166 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;179 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;
167180
168 let tmp = path.join(format!("tmp.{}", Uuid::new_v4()));181 let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));
169 let ch = sess.channel_open_session().await?;182 let ch = sess.channel_open_session().await?;
183 debug!("cat");
170 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;184 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;
171 ch.data_bytes(bytes).await?;185
172 ch.eof().await?;186 let mut child = RemowtChild::from_exec(ch);
173 let mut ch = ch;
174 let mut code = None;187 child
175 while let Some(msg) = ch.wait().await {188 .stdin
176 match msg {189 .write_all(&bytes)
177 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
178 ChannelMsg::ExtendedData { data, .. } => {190 .await
179 error!(191 .context("sending agent binary")?;
180 "agent upload: {}",
181 String::from_utf8_lossy(data.as_ref()).trim()192 child
193 .stdin
194 .shutdown()
182 );195 .await
196 .context("sending agent binary")?;
183 }197 let code = child.wait().await;
184 _ => {}
185 }
186 }
187 ensure!(code == Some(0), "agent upload failed (exit {code:?})");198 ensure!(code == Some(0), "agent upload failed (exit {code:?})");
188199
200 debug!("chmod");
189 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;201 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;
190 run_string_ok(202 run_string_ok(
191 sess,203 sess,
205 return Ok((tool, flags));217 return Ok((tool, flags));
206 }218 }
207 }219 }
208 bail!("no escalation tool (run0/sudo/doas) found on remote")220 bail!("no escalation tool found on remote")
209}221}
210222
211fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {223fn privileged_cmd(tool: &str, flags: &[&str], agent_path: &Utf8Path, path: Option<&str>) -> String {
228 .find(|p| p.is_file())240 .find(|p| p.is_file())
229}241}
230242
231fn port_from_channel(ch: Channel<Msg>) -> Port {
232 Port::new(move |mut rx, tx| async move {
233 let (mut srx, mut stx) = tokio::io::split(ch.into_stream());
234 let srx_task = async move {
235 loop {
236 match read(&mut srx).await {
237 Ok(buf) => {
238 if tx.send(buf.freeze()).is_err() {
239 break;
240 }
241 }
242 Err(e) => {
243 error!("channel read failed: {e}");
244 break;
245 }
246 }
247 }
248 };
249 let stx_task = async move {
250 while let Some(value) = rx.recv().await {
251 if let Err(e) = write(&mut stx, value).await {
252 error!("channel write failed: {e}");
253 break;
254 }
255 }
256 };
257 join!(srx_task, stx_task);
258 })
259}
260
261pub struct SshHandler {243pub struct SshHandler {
262 host: String,244 host: String,
263 port: u16,245 port: u16,
290 }272 }
291}273}
292274
293struct SshElevator {
294 sess: Arc<Handle<SshHandler>>,
295 rpc: WeakRpc<BifConfig>,
296 agent_path: Utf8PathBuf,
297}
298impl Elevator for SshElevator {
299 async fn elevate(&self) -> Result<(), ElevateError> {
300 let fail = |e: String| ElevateError::Failed(e);
301 let (tool, flags) = detect_escalation(&self.sess)
302 .await
303 .map_err(|e| fail(e.to_string()))?;
304 let ch = self
305 .sess
306 .channel_open_session()
307 .await
308 .map_err(|e| fail(e.to_string()))?;
309 ch.exec(true, privileged_cmd(tool, flags, &self.agent_path, None))
310 .await
311 .map_err(|e| fail(e.to_string()))?;
312 let rpc = self
313 .rpc
314 .clone()
315 .upgrade()
316 .ok_or_else(|| fail("rpc is gone".to_owned()))?;
317 rpc.add_direct(Address::AgentPrivileged, port_from_channel(ch), Rtt(0));
318 Ok(())
319 }
320}
321
322pub struct RemoteChild {
323 pub stdout: DuplexStream,
324 pub stderr: DuplexStream,
325 pub exit: oneshot::Receiver<Option<u32>>,
326}
327
328enum Transport {275enum Transport {
329 Ssh {276 Ssh {
330 sess: Arc<Handle<SshHandler>>,277 sess: Arc<Handle<SshHandler>>,
331 subs: Subs,278 subs: Subs,
332 remote_dir: Utf8PathBuf,279 runtime_dir: Utf8PathBuf,
333 agent_path: Utf8PathBuf,280 agent_path: Utf8PathBuf,
334 },281 },
335 Local {282 Local {
336 #[allow(dead_code)]283 agent_path: PathBuf,
337 agent: Rpc<BifConfig>,
338 agent_path: String,284 runtime_dir: Utf8PathBuf,
339 },285 },
340}286}
341287
344 rpc: Rpc<BifConfig>,290 rpc: Rpc<BifConfig>,
345 elevated: tokio::sync::OnceCell<()>,291 elevated: tokio::sync::OnceCell<()>,
346 children: Mutex<Vec<tokio::process::Child>>,292 children: Mutex<Vec<tokio::process::Child>>,
293 _runtime_tmp: Option<TempDir>,
347}294}
348295
349pub type RemowtRemote = Remote<BifConfig>;296pub type RemowtRemote = Remote<BifConfig>;
350297
351fn loopback() -> (Port, Port) {
352 let (a2b_tx, mut a2b_rx) = mpsc::unbounded_channel::<Bytes>();
353 let (b2a_tx, mut b2a_rx) = mpsc::unbounded_channel::<Bytes>();
354 let user = Port::new(move |mut rx, tx| async move {
355 loop {
356 tokio::select! {
357 msg = rx.recv() => match msg {
358 Some(msg) => if a2b_tx.send(msg).is_err() { break },
359 None => break,
360 },
361 msg = b2a_rx.recv() => match msg {
362 Some(msg) => if tx.send(msg).is_err() { break },
363 None => break,
364 },
365 }
366 }
367 });
368 let agent = Port::new(move |mut rx, tx| async move {
369 loop {
370 tokio::select! {
371 msg = rx.recv() => match msg {
372 Some(msg) => if b2a_tx.send(msg).is_err() { break },
373 None => break,
374 },
375 msg = a2b_rx.recv() => match msg {
376 Some(msg) => if tx.send(msg).is_err() { break },
377 None => break,
378 },
379 }
380 }
381 });
382 (user, agent)
383}
384
385impl Remowt {298impl Remowt {
386 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {299 pub async fn connect(host: &str, bundle: &AgentBundle) -> Result<Self> {
387 let conf = russh_config::parse_home(host)?;300 let conf = russh_config::parse_home(host)?;
426 }339 }
427 ensure!(authenticated, "ssh authentication failed");340 ensure!(authenticated, "ssh authentication failed");
428341
429 // All remaining session ops take `&self`; share the handle.
430 let sess = Arc::new(sess);342 let sess = Arc::new(sess);
431343
344 debug!("deploying agent");
432 let agent_path = deploy_agent(&sess, bundle).await?;345 let agent_path = deploy_agent(&sess, bundle).await?;
433346
434 let remote_dir = remote_mktemp(&sess).await?;347 debug!("runtime dir");
348 let runtime_dir = remote_runtime_dir(&sess).await?;
435 let primary = remote_dir.join("primary.sock");349 let primary = runtime_dir.join(format!("remowt-{}.sock", Uuid::new_v4()));
436350
437 let (onetx, onerx) = channel();351 let (onetx, onerx) = channel();
438 subs.lock().expect("lock").insert(primary.clone(), onetx);352 subs.lock().expect("lock").insert(primary.clone(), onetx);
442356
443 // TODO: ensure no injection is possible in the socket path.357 // TODO: ensure no injection is possible in the socket path.
444 let cmd_chan = sess.channel_open_session().await?;358 let cmd_chan = sess.channel_open_session().await?;
359 debug!("starting agent");
445 cmd_chan360 cmd_chan
446 .exec(361 .exec(
447 true,362 true,
453 )368 )
454 .await?;369 .await?;
455370
456 let port = port_from_channel(371 let port = channel_port(
457 onerx372 onerx
458 .await373 .await
459 .map_err(|_| anyhow!("agent never opened its channel"))?,374 .map_err(|_| anyhow!("agent never opened its channel"))?,
464 transport: Transport::Ssh {379 transport: Transport::Ssh {
465 sess,380 sess,
466 subs,381 subs,
467 remote_dir,382 runtime_dir,
468 agent_path,383 agent_path,
469 },384 },
470 rpc,385 rpc,
471 elevated: tokio::sync::OnceCell::new(),386 elevated: tokio::sync::OnceCell::new(),
472 children: Mutex::new(Vec::new()),387 children: Mutex::new(Vec::new()),
388 _runtime_tmp: None,
473 })389 })
474 }390 }
475391
476 pub async fn connect_local(agent_path: &str) -> Result<Self> {392 pub async fn connect_local(bundle: &AgentBundle) -> Result<Self> {
477 let (port_user, port_agent) = loopback();393 let agent_path = bundle.local_binary()?;
394 let mut child = tokio::process::Command::new(&agent_path)
395 .arg("real-agent")
396 .stdin(std::process::Stdio::piped())
397 .stdout(std::process::Stdio::piped())
398 .kill_on_drop(true)
399 .spawn()
400 .with_context(|| format!("spawning agent binary {}", agent_path.display()))?;
401 let stdin = child.stdin.take().expect("stdin piped");
402 let stdout = child.stdout.take().expect("stdout piped");
403
478 let rpc = Rpc::<BifConfig>::new(Address::User);404 let rpc = Rpc::<BifConfig>::new(Address::User);
479 let mut agent = Rpc::<BifConfig>::new(Address::Agent);405 rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));
480406
481 // Register handlers before wiring up the link (see the agent binary).407 let (runtime_dir, runtime_tmp) = local_runtime_dir()?;
482 Fs::new().register_endpoints(&mut agent);
483 Systemd.register_endpoints(&mut agent);
484 Pty::new().register_endpoints(&mut agent);
485408
486 agent.add_direct(Address::User, port_agent, Rtt(0));
487 rpc.add_direct(Address::Agent, port_user, Rtt(0));
488
489 Ok(Self {409 Ok(Self {
490 transport: Transport::Local {410 transport: Transport::Local {
491 agent,411 agent_path,
492 agent_path: agent_path.to_owned(),412 runtime_dir,
493 },413 },
494 rpc,414 rpc,
495 elevated: tokio::sync::OnceCell::new(),415 elevated: tokio::sync::OnceCell::new(),
496 children: Mutex::new(Vec::new()),416 children: Mutex::new(vec![child]),
417 _runtime_tmp: runtime_tmp,
497 })418 })
498 }419 }
499420
547 let ch = sess.channel_open_session().await?;468 let ch = sess.channel_open_session().await?;
548 ch.exec(true, privileged_cmd(tool, flags, agent_path, None))469 ch.exec(true, privileged_cmd(tool, flags, agent_path, None))
549 .await?;470 .await?;
550 port_from_channel(ch)471 channel_port(ch)
551 }472 }
552 Transport::Local { agent_path, .. } => {473 Transport::Local { agent_path, .. } => {
553 let sock = env::temp_dir()474 let sock = self
475 .runtime_dir()
554 .join(format!("remowt-priv-{}.sock", uuid::Uuid::new_v4()));476 .join(format!("remowt-priv-{}.sock", Uuid::new_v4()));
555 let _ = std::fs::remove_file(&sock);477 let _ = std::fs::remove_file(&sock);
556 let listener = UnixListener::bind(&sock)?;478 let listener = UnixListener::bind(&sock)?;
557 let (tool, flags) = ESCALATORS479 let (tool, flags) = ESCALATORS
558 .iter()480 .iter()
559 .find(|(t, _)| find_in_path(t).is_some())481 .find(|(t, _)| find_in_path(t).is_some())
560 .ok_or_else(|| anyhow!("no escalation tool (run0/sudo/doas) found"))?;482 .ok_or_else(|| anyhow!("no escalation tool found"))?;
561 let child = tokio::process::Command::new(tool)483 let child = tokio::process::Command::new(tool)
562 .args(*flags)484 .args(*flags)
563 .arg(agent_path)485 .arg(agent_path)
564 .arg("real-agent")486 .arg("real-agent")
565 .arg("--privileged")487 .arg("--privileged")
566 .arg("--path")488 .arg("--path")
567 .arg(sock.to_str().expect("temp path is utf-8"))489 .arg(sock.as_str())
568 .kill_on_drop(true)490 .kill_on_drop(true)
569 .spawn()?;491 .spawn()?;
570 self.children.lock().expect("lock").push(child);492 self.children.lock().expect("lock").push(child);
580 Ok(())502 Ok(())
581 }503 }
582504
583 pub async fn exec(&self, command: String) -> Result<RemoteChild> {505 pub async fn exec(&self, command: String) -> Result<RemowtChild> {
584 let Some(sess) = self.ssh() else {506 let Some(sess) = self.ssh() else {
585 bail!("exec should not be called on local")507 bail!("exec should not be called on local")
586 };508 };
587 let ch = sess.channel_open_session().await?;509 let ch = sess.channel_open_session().await?;
588 ch.exec(true, command).await?;510 ch.exec(true, command).await?;
589
590 let (mut out_w, stdout) = tokio::io::duplex(64 * 1024);511 Ok(RemowtChild::from_exec(ch))
591 let (mut err_w, stderr) = tokio::io::duplex(64 * 1024);
592 let (exit_tx, exit) = oneshot::channel();
593
594 tokio::spawn(async move {
595 let mut ch = ch;
596 let mut code = None;
597 while let Some(msg) = ch.wait().await {
598 match msg {
599 ChannelMsg::Data { data } => {
600 if out_w.write_all(&data).await.is_err() {
601 break;
602 }
603 }
604 ChannelMsg::ExtendedData { data, .. } => {
605 if err_w.write_all(&data).await.is_err() {
606 break;
607 }
608 }
609 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),
610 _ => {}
611 }
612 }
613 let _ = out_w.shutdown().await;
614 let _ = err_w.shutdown().await;
615 let _ = exit_tx.send(code);
616 });
617
618 Ok(RemoteChild {
619 stdout,
620 stderr,
621 exit,
622 })
623 }512 }
624513
625 pub fn serve_elevate(&self) -> Result<()> {514 fn runtime_dir(&self) -> Utf8PathBuf {
626 let Transport::Ssh {515 match &self.transport {
627 sess, agent_path, ..
628 } = &self.transport
629 else {
630 bail!("elevate should not be called on local")516 Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),
631 };
632 let mut rpc = self.rpc.clone();
633 ElevateEndpoints(SshElevator {517 Transport::Local { runtime_dir, .. } => runtime_dir.clone(),
634 sess: sess.clone(),
635 rpc: self.rpc.clone().downgrade(),
636 agent_path: agent_path.to_owned(),
637 })518 }
638 .register_endpoints(&mut rpc);
639 Ok(())
640 }519 }
641520
642 pub fn remote_dir(&self) -> Option<&Utf8Path> {521 pub async fn forward_socket(&self, path: &Utf8Path) -> Result<RemowtListener> {
643 match &self.transport {522 match &self.transport {
644 Transport::Ssh { remote_dir, .. } => Some(remote_dir),523 Transport::Ssh { sess, subs, .. } => {
524 let (tx, rx) = oneshot::channel();
525 subs.lock().expect("lock").insert(path.to_owned(), tx);
645 Transport::Local { .. } => None,526 sess.streamlocal_forward(path.to_owned()).await?;
527 Ok(RemowtListener::Ssh(rx))
528 }
529 Transport::Local { .. } => {
530 let _ = std::fs::remove_file(path);
531 Ok(RemowtListener::Local(
532 UnixListener::bind(path)?,
533 path.to_owned(),
534 ))
535 }
646 }536 }
647 }537 }
648
649 pub async fn forward_socket(
650 &self,
651 remote_path: &Utf8Path,
652 ) -> Result<oneshot::Receiver<Channel<Msg>>> {
653 let Transport::Ssh { sess, subs, .. } = &self.transport else {
654 bail!("forward_socket should not be called on local")
655 };
656 let (tx, rx) = oneshot::channel();
657 subs.lock()
658 .expect("lock")
659 .insert(remote_path.to_owned(), tx);
660 sess.streamlocal_forward(remote_path.to_owned()).await?;
661 Ok(rx)
662 }
663
664 pub async fn open_shell(&self, term: &str, cols: u16, rows: u16) -> Result<Shell> {
665 let Transport::Ssh { remote_dir, .. } = &self.transport else {
666 bail!("open_shell should not be called on local")
667 };
668 let sock = remote_dir.join(format!("shell-{}.sock", uuid::Uuid::new_v4()));
669
670 let rx = self.forward_socket(&sock).await?;
671 let client: PtyClient<BifConfig> = self.endpoints();
672 let id = client
673 .open_shell(sock, term.to_owned(), cols, rows)
674 .await?
675 .map_err(|e| anyhow!("agent failed to open shell: {e}"))?;
676 let ch = rx
677 .await
678 .map_err(|_| anyhow!("agent never connected the shell socket"))?;
679
680 Ok(Shell {
681 id,
682 stream: ch.into_stream(),
683 remote: self.rpc.remote(Address::Agent),
684 })
685 }
686}538}
687539
688pub struct Shell {540fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {
689 pub id: ShellId,
690 pub stream: ChannelStream<Msg>,
691 remote: Remote<BifConfig>,
692}
693
694impl Shell {
695 pub fn resizer(&self) -> ShellResizer {541 if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {
696 ShellResizer {542 if !dir.is_empty() {
697 remote: self.remote.clone(),543 return Ok((Utf8PathBuf::from(dir), None));
698 id: self.id,
699 }544 }
700 }545 }
546 let tmp = tempfile::Builder::new()
547 .prefix("remowt.")
548 .rand_bytes(12)
549 .tempdir()?;
550 let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())
551 .map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;
552 Ok((dir, Some(tmp)))
701}553}
702554
703#[derive(Clone)]
704pub struct ShellResizer {555async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {
705 remote: Remote<BifConfig>,
706 id: ShellId,
707}
708
709impl ShellResizer {
710 pub async fn resize(&self, cols: u16, rows: u16) -> Result<()> {556 let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;
711 PtyClient::wrap(self.remote.clone())557 let dir = dir.trim();
712 .resize(self.id, cols, rows)558 if dir.is_empty() {
713 .await?559 remote_mktemp(sess).await
714 .map_err(|e| anyhow!("failed to resize remote shell: {e}"))560 } else {
561 Ok(Utf8PathBuf::from(dir))
715 }562 }
716}563}
717564
addedcrates/remowt-client/src/port.rsdiffbeforeafterboth

no changes

addedcrates/remowt-client/src/shell.rsdiffbeforeafterboth

no changes

addedcrates/remowt-client/src/subprocess.rsdiffbeforeafterboth

no changes

modifiedcrates/remowt-link-shared/Cargo.tomldiffbeforeafterboth
11serde = { workspace = true, features = ["derive"] }11serde = { workspace = true, features = ["derive"] }
12serde_json.workspace = true12serde_json.workspace = true
13thiserror.workspace = true13thiserror.workspace = true
14tokio = { workspace = true, features = ["fs"] }14tokio = { workspace = true, features = ["fs", "io-util", "macros"] }
15tracing.workspace = true
15remowt-ui-prompt.workspace = true16remowt-ui-prompt.workspace = true
16camino = { workspace = true, features = ["serde1"] }17camino = { workspace = true, features = ["serde1"] }
1718
modifiedcrates/remowt-link-shared/src/lib.rsdiffbeforeafterboth
1use std::future::Future;
2
3use bifrostlink::declarative::endpoints;
4use bifrostlink::error::{ErrorT, ListenerForYourRequestHasBeenDeadError, ResponseError};1use bifrostlink::error::{ErrorT, ListenerForYourRequestHasBeenDeadError, ResponseError};
5use bifrostlink::notification;2use bifrostlink::notification;
6use bifrostlink::packet::OpaquePacketWrapper;3use bifrostlink::packet::OpaquePacketWrapper;
7use bifrostlink::{AddressT, Config};4use bifrostlink::AddressT;
8use serde::de::DeserializeOwned;5use serde::de::DeserializeOwned;
9use serde::{Deserialize, Serialize};6use serde::{Deserialize, Serialize};
107
11pub mod editor;8pub mod editor;
9pub mod port;
1210
13#[derive(Clone, Serialize, Hash, Eq, Debug, PartialEq, Deserialize)]11#[derive(Clone, Serialize, Hash, Eq, Debug, PartialEq, Deserialize)]
14pub enum Address {12pub enum Address {
2119
22pub mod plugin;20pub mod plugin;
23
24#[derive(Serialize, Deserialize, Debug, thiserror::Error)]
25pub enum ElevateError {
26 #[error("elevation failed: {0}")]
27 Failed(String),
28}
29
30pub trait Elevator: Send + Sync {
31 fn elevate(&self) -> impl Future<Output = Result<(), ElevateError>> + Send;
32}
33
34pub struct ElevateEndpoints<E>(pub E);
35
36#[endpoints(ns = 3)]
37impl<E: Elevator + 'static> ElevateEndpoints<E> {
38 #[endpoints(id = 1)]
39 async fn elevate(&self) -> Result<(), ElevateError> {
40 self.0.elevate().await
41 }
42}
4321
44#[derive(thiserror::Error, Debug)]22#[derive(thiserror::Error, Debug)]
45pub enum Error {23pub enum Error {
addedcrates/remowt-link-shared/src/port.rsdiffbeforeafterboth

no changes

modifiedcrates/remowt-plugin/Cargo.tomldiffbeforeafterboth
9anyhow.workspace = true9anyhow.workspace = true
10bifrostlink.workspace = true10bifrostlink.workspace = true
11bifrostlink-ports.workspace = true11bifrostlink-ports.workspace = true
12bytes.workspace = true
13remowt-link-shared.workspace = true12remowt-link-shared.workspace = true
14serde_json.workspace = true13serde_json.workspace = true
15tokio = { workspace = true, features = [14tokio = { workspace = true, features = [
modifiedcrates/remowt-plugin/src/host.rsdiffbeforeafterboth
1use std::ffi::OsStr;1use std::ffi::OsStr;
2use std::io;
3use std::process::Stdio;2use std::process::Stdio;
4use std::sync::Mutex;3use std::sync::Mutex;
54
6use bifrostlink::{Port, Rpc, Rtt, WeakRpc};5use bifrostlink::{Rpc, Rtt, WeakRpc};
7use bytes::{Bytes, BytesMut};
8use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
9use tokio::process::{Child, ChildStdin, ChildStdout, Command};6use tokio::process::{Child, Command};
107
11use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};8use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};
9use remowt_link_shared::port::child_port;
12use remowt_link_shared::{Address, BifConfig};10use remowt_link_shared::{Address, BifConfig};
1311
14pub fn serve(rpc: &mut Rpc<BifConfig>) {12pub fn serve(rpc: &mut Rpc<BifConfig>) {
70 }68 }
71}69}
72
73fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port {
74 Port::new(|mut rx, tx| async move {
75 let reader = async move {
76 loop {
77 let len = match stdout.read_u32().await {
78 Ok(len) => len,
79 Err(e) => {
80 tracing::error!("plugin stdout read failed: {e}");
81 break;
82 }
83 };
84 let mut buf = BytesMut::zeroed(len as usize);
85 if let Err(e) = stdout.read_exact(&mut buf).await {
86 tracing::error!("plugin stdout read failed: {e}");
87 break;
88 }
89 if tx.send(buf.freeze()).is_err() {
90 break;
91 }
92 }
93 };
94 let writer = async move {
95 while let Some(msg) = rx.recv().await {
96 if let Err(e) = write_frame(&mut stdin, msg).await {
97 tracing::error!("plugin stdin write failed: {e}");
98 break;
99 }
100 }
101 };
102 tokio::join!(reader, writer);
103 })
104}
105
106async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> {
107 let len = u32::try_from(msg.len())
108 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;
109 stdin.write_u32(len).await?;
110 stdin.write_all(&msg).await?;
111 stdin.flush().await?;
112 Ok(())
113}
11470