difftreelog
refactor(remowt-ssh) switch to russh
in: trunk
2 files changed
cmds/remowt-ssh/Cargo.tomldiffbeforeafterboth4edition = "2021"4edition = "2021"556[dependencies]6[dependencies]7clap = { version = "4.5.16", features = ["derive"] }7clap = { workspace = true, features = ["derive"] }8openssh = { version = "0.11.0", features = ["native-mux"] }8openssh = { workspace = true, features = ["native-mux"] }9tracing-subscriber = "0.3.18"9tracing-subscriber.workspace = true10bifrostlink.workspace = true10bifrostlink.workspace = true11remowt-link-shared = { version = "0.1.0", path = "../../crates/remowt-link-shared" }11remowt-link-shared.workspace = true12remowt-client.workspace = true13tokio = { workspace = true, features = ["macros", "fs", "net", "io-util", "rt", "signal"] }12tokio = { version = "1.39.3", features = ["macros"] }14nix = { workspace = true, features = ["term"] }13anyhow = "1.0.86"15anyhow.workspace = true14bifrostlink-ports.workspace = true16bifrostlink-ports.workspace = true15uuid = { version = "1.10.0", features = ["v4"] }17uuid = { workspace = true, features = ["v4"] }16tempdir = "0.3.7"18tempdir.workspace = true17russh = { git = "https://github.com/Eugeny/russh/" }19async-trait.workspace = true20bytes.workspace = true18russh-config = { git = "https://github.com/Eugeny/russh/" }21tokio-stream.workspace = true22tracing.workspace = true23thiserror = "2.0.18"19russh-keys = { git = "https://github.com/Eugeny/russh/" }24serde_json.workspace = true25serde.workspace = true20async-trait = "0.1.81"26ui-prompt.workspace = true27russh.workspace = true28russh-config.workspace = true2129cmds/remowt-ssh/src/main.rsdiffbeforeafterboth--- a/cmds/remowt-ssh/src/main.rs
+++ b/cmds/remowt-ssh/src/main.rs
@@ -1,140 +1,199 @@
use std::borrow::Cow;
-use std::ffi::OsString;
-use std::os::unix::ffi::OsStringExt;
+use std::env::VarError;
+use std::io;
+use std::os::fd::{AsRawFd, RawFd};
use std::path::PathBuf;
-use std::sync::Arc;
+use std::pin::Pin;
+use std::task::{Context, Poll};
-use anyhow::{bail, ensure};
-use async_trait::async_trait;
-use bifrostlink::Rpc;
+use anyhow::anyhow;
use clap::Parser;
-use remowt_link_shared::Address;
-use russh::client::{connect, Config, Handler, Session};
-use tempdir::TempDir;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use tokio::net::UnixSocket;
+use nix::libc;
+use nix::sys::termios::{self, SetArg, Termios};
+use remowt_client::editor::SshEditor;
+use remowt_client::{AgentBundle, Remowt};
+use remowt_link_shared::editor::serve_editor;
+use tokio::io::unix::AsyncFd;
+use tokio::io::{AsyncRead, ReadBuf};
+use tokio::signal::unix::{signal, SignalKind};
+use tracing::info;
+use ui_prompt::bifrost::serve_prompts;
+use ui_prompt::rofi::RofiPrompter;
+use ui_prompt::{PrependSourcePrompter, Source};
#[derive(Parser)]
struct Opts {
host: String,
}
-struct MyHandler {
- host: String,
- port: u16,
-}
-#[async_trait]
-impl Handler for MyHandler {
- type Error = russh::Error;
- async fn check_server_key(
- &mut self,
- server_public_key: &russh_keys::key::PublicKey,
- ) -> Result<bool, Self::Error> {
- Ok(russh_keys::check_known_hosts(
- &self.host,
- self.port,
- &server_public_key,
- )?)
- }
+fn agents_dir() -> anyhow::Result<PathBuf> {
+ std::env::var_os("REMOWT_AGENTS_DIR")
+ .map(PathBuf::from)
+ .or_else(|| option_env!("REMOWT_AGENTS_DIR").map(PathBuf::from))
+ .ok_or_else(|| anyhow!("no remowt-agents bundle"))
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
- let rpc = Rpc::<Address, remowt_link_shared::Error>::new(Address::User);
tracing_subscriber::fmt::init();
let opts = Opts::parse();
- let conf = dbg!(russh_config::parse_home(&opts.host)?);
- println!("connect");
- let mut sess = connect(
- Arc::new(Config {
- ..Default::default()
- }),
- dbg!((conf.host_name.clone(), conf.port)),
- MyHandler {
- host: conf.host_name,
- port: conf.port,
+ let bundle = AgentBundle::from_dir(agents_dir()?)?;
+ let conn = Remowt::connect(&opts.host, &bundle).await?;
+ let mut rpc = conn.rpc();
+
+ serve_prompts(
+ &mut rpc,
+ PrependSourcePrompter {
+ prompter: RofiPrompter,
+ source: vec![Source(Cow::Owned(format!("ssh host: {}", opts.host)))],
+ description: "".to_owned(),
},
- )
- .await?;
- println!("agent");
- let mut agent = russh_keys::agent::client::AgentClient::connect_env().await?;
- for ele in agent.request_identities().await? {
- let (_agent, res) = sess.authenticate_future(conf.user.clone(), ele, agent).await;
- agent = _agent;
- if res? {
- break;
- }
+ );
+ if let Some(sess) = conn.ssh() {
+ serve_editor(&mut rpc, SshEditor { sess });
}
- // let sess = Session::connect(opts.host, openssh::KnownHosts::Strict).await?;
- let socket = UnixSocket::new_stream()?;
+ info!("entering shell");
+ run_shell(&conn).await?;
+ info!("shell ended");
- println!("mktemp");
- let mut cmd_chan = sess.channel_open_session().await?;
- cmd_chan
- .exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")
- .await?;
- let mut stdout = vec![];
- loop {
- let Some(msg) = cmd_chan.wait().await else {
- bail!("unexpected channel end");
- };
- match msg {
- russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),
- russh::ChannelMsg::ExitStatus { exit_status } => {
- if exit_status != 0 {
- bail!("mktemp failed");
+ Ok(())
+}
+
+async fn run_shell(conn: &Remowt) -> anyhow::Result<()> {
+ let term = match std::env::var("TERM") {
+ Ok(v) => v,
+ Err(VarError::NotPresent) => "xterm-256color".to_owned(),
+ Err(e) => return Err(e.into()),
+ };
+ let (cols, rows) = term_size().unwrap_or((80, 24));
+
+ let shell = conn.open_shell(&term, cols, rows).await?;
+ let resizer = shell.resizer();
+ let stream = shell.stream;
+
+ let _raw = RawMode::enable();
+
+ if let Ok(mut winch) = signal(SignalKind::window_change()) {
+ tokio::spawn(async move {
+ while winch.recv().await.is_some() {
+ if let Some((cols, rows)) = term_size() {
+ let _ = resizer.resize(cols, rows).await;
}
- break;
}
- _ => {}
- }
+ });
}
- ensure!(stdout.ends_with(b"\n"));
- stdout.pop();
+ let (mut from_remote, mut to_remote) = tokio::io::split(stream);
+ let mut stdin = AsyncStdin::new()?;
+ let mut stdout = tokio::io::stdout();
- // Remote host is not neccessary linux, openssh crate makes incorrect assumptions here.
- // TODO: Remove on local close.
- let remote_dir = PathBuf::from(OsString::from_vec(stdout));
- let remote_socket = remote_dir.join("primary.sock");
+ tokio::select! {
+ r = tokio::io::copy(&mut from_remote, &mut stdout) => { r?; }
+ _ = tokio::io::copy(&mut stdin, &mut to_remote) => {}
+ }
- let local_dir = TempDir::new("remowt")?;
- let local_socket = local_dir.path().join("primary.sock");
+ Ok(())
+}
- println!("listen");
- socket.bind(&local_socket)?;
- let listener = socket.listen(1)?;
+struct AsyncStdin {
+ fd: AsyncFd<RawFd>,
+ original_flags: i32,
+}
- eprintln!("forward socket");
+impl AsyncStdin {
+ fn new() -> io::Result<Self> {
+ let raw = libc::STDIN_FILENO;
+ // SAFETY: F_GETFL/F_SETFL round-trip on a valid fd.
+ let original_flags = unsafe { libc::fcntl(raw, libc::F_GETFL) };
+ if original_flags < 0 {
+ return Err(io::Error::last_os_error());
+ }
+ if unsafe { libc::fcntl(raw, libc::F_SETFL, original_flags | libc::O_NONBLOCK) } < 0 {
+ return Err(io::Error::last_os_error());
+ }
+ Ok(Self {
+ fd: AsyncFd::new(raw)?,
+ original_flags,
+ })
+ }
+}
- let mut sock = sess
- .channel_open_direct_streamlocal(dbg!(remote_socket.to_str().expect("path is utf-8")))
- .await?;
+impl Drop for AsyncStdin {
+ fn drop(&mut self) {
+ // SAFETY: restoring the flags we saved on a valid fd.
+ unsafe { libc::fcntl(libc::STDIN_FILENO, libc::F_SETFL, self.original_flags) };
+ }
+}
- eprintln!("wait");
- while let Some(v) = sock.wait().await {
- dbg!(v);
+impl AsyncRead for AsyncStdin {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ let this = self.get_mut();
+ loop {
+ let mut guard = match this.fd.poll_read_ready(cx) {
+ Poll::Ready(Ok(g)) => g,
+ Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
+ Poll::Pending => return Poll::Pending,
+ };
+ let unfilled = buf.initialize_unfilled();
+ let res = guard.try_io(|inner| {
+ let fd = *inner.get_ref();
+ // SAFETY: writing into `unfilled`'s own backing storage.
+ let n = unsafe { libc::read(fd, unfilled.as_mut_ptr().cast(), unfilled.len()) };
+ if n < 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(n as usize)
+ }
+ });
+ match res {
+ Ok(Ok(n)) => {
+ buf.advance(n);
+ return Poll::Ready(Ok(()));
+ }
+ Ok(Err(e)) => return Poll::Ready(Err(e)),
+ Err(_would_block) => continue,
+ }
+ }
}
+}
+
+fn term_size() -> Option<(u16, u16)> {
+ let mut ws: libc::winsize = unsafe { std::mem::zeroed() };
+ let rc = unsafe { libc::ioctl(libc::STDIN_FILENO, libc::TIOCGWINSZ, &mut ws) };
+ if rc != 0 || ws.ws_col == 0 {
+ None
+ } else {
+ Some((ws.ws_col, ws.ws_row))
+ }
+}
- eprintln!("spawn agent");
+struct RawMode {
+ original: Termios,
+}
- // let _agent = sess
- // .command("/home/lach/.remowt/remowt-agent")
- // .arg("agent-real")
- // .arg("--path")
- // .arg(remote_socket.to_str().expect("path is utf-8"))
- // .spawn()
- // .await?;
- //
- // let (mut conn, _) = listener.accept().await?;
- // let mut buf = [0u8; 13];
- // conn.read_exact(&mut buf).await?;
- // assert_eq!(&buf, b"REMOWT_HELLO\0");
- // conn.write_all(b"REMOWT_EHLO\0").await?;
- //
- // println!("handshake complete!");
+impl RawMode {
+ fn enable() -> Option<Self> {
+ let stdin = std::io::stdin();
+ // SAFETY: trivial libc call on a borrowed fd.
+ if unsafe { libc::isatty(stdin.as_raw_fd()) } != 1 {
+ return None;
+ }
+ let original = termios::tcgetattr(&stdin).ok()?;
+ let mut raw = original.clone();
+ termios::cfmakeraw(&mut raw);
+ termios::tcsetattr(&stdin, SetArg::TCSANOW, &raw).ok()?;
+ Some(Self { original })
+ }
+}
- Ok(())
+impl Drop for RawMode {
+ fn drop(&mut self) {
+ let _ = termios::tcsetattr(std::io::stdin(), SetArg::TCSANOW, &self.original);
+ }
}