From 4516008b4ee2b846a5a37ef318b084caddc39227 Mon Sep 17 00:00:00 2001 From: Yaroslav Bolyukin Date: Sun, 25 Jan 2026 09:12:28 +0000 Subject: [PATCH] refactor(remowt-ssh): switch to russh --- --- a/cmds/remowt-ssh/Cargo.toml +++ b/cmds/remowt-ssh/Cargo.toml @@ -4,17 +4,25 @@ edition = "2021" [dependencies] -clap = { version = "4.5.16", features = ["derive"] } -openssh = { version = "0.11.0", features = ["native-mux"] } -tracing-subscriber = "0.3.18" +clap = { workspace = true, features = ["derive"] } +openssh = { workspace = true, features = ["native-mux"] } +tracing-subscriber.workspace = true bifrostlink.workspace = true -remowt-link-shared = { version = "0.1.0", path = "../../crates/remowt-link-shared" } -tokio = { version = "1.39.3", features = ["macros"] } -anyhow = "1.0.86" +remowt-link-shared.workspace = true +remowt-client.workspace = true +tokio = { workspace = true, features = ["macros", "fs", "net", "io-util", "rt", "signal"] } +nix = { workspace = true, features = ["term"] } +anyhow.workspace = true bifrostlink-ports.workspace = true -uuid = { version = "1.10.0", features = ["v4"] } -tempdir = "0.3.7" -russh = { git = "https://github.com/Eugeny/russh/" } -russh-config = { git = "https://github.com/Eugeny/russh/" } -russh-keys = { git = "https://github.com/Eugeny/russh/" } -async-trait = "0.1.81" +uuid = { workspace = true, features = ["v4"] } +tempdir.workspace = true +async-trait.workspace = true +bytes.workspace = true +tokio-stream.workspace = true +tracing.workspace = true +thiserror = "2.0.18" +serde_json.workspace = true +serde.workspace = true +ui-prompt.workspace = true +russh.workspace = true +russh-config.workspace = true --- a/cmds/remowt-ssh/src/main.rs +++ b/cmds/remowt-ssh/src/main.rs @@ -1,140 +1,199 @@ use std::borrow::Cow; -use std::ffi::OsString; -use std::os::unix::ffi::OsStringExt; +use std::env::VarError; +use std::io; +use std::os::fd::{AsRawFd, RawFd}; use std::path::PathBuf; -use std::sync::Arc; +use std::pin::Pin; +use std::task::{Context, Poll}; -use anyhow::{bail, ensure}; -use async_trait::async_trait; -use bifrostlink::Rpc; +use anyhow::anyhow; use clap::Parser; -use remowt_link_shared::Address; -use russh::client::{connect, Config, Handler, Session}; -use tempdir::TempDir; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::UnixSocket; +use nix::libc; +use nix::sys::termios::{self, SetArg, Termios}; +use remowt_client::editor::SshEditor; +use remowt_client::{AgentBundle, Remowt}; +use remowt_link_shared::editor::serve_editor; +use tokio::io::unix::AsyncFd; +use tokio::io::{AsyncRead, ReadBuf}; +use tokio::signal::unix::{signal, SignalKind}; +use tracing::info; +use ui_prompt::bifrost::serve_prompts; +use ui_prompt::rofi::RofiPrompter; +use ui_prompt::{PrependSourcePrompter, Source}; #[derive(Parser)] struct Opts { host: String, } -struct MyHandler { - host: String, - port: u16, -} -#[async_trait] -impl Handler for MyHandler { - type Error = russh::Error; - async fn check_server_key( - &mut self, - server_public_key: &russh_keys::key::PublicKey, - ) -> Result { - Ok(russh_keys::check_known_hosts( - &self.host, - self.port, - &server_public_key, - )?) - } +fn agents_dir() -> anyhow::Result { + std::env::var_os("REMOWT_AGENTS_DIR") + .map(PathBuf::from) + .or_else(|| option_env!("REMOWT_AGENTS_DIR").map(PathBuf::from)) + .ok_or_else(|| anyhow!("no remowt-agents bundle")) } #[tokio::main(flavor = "current_thread")] async fn main() -> anyhow::Result<()> { - let rpc = Rpc::::new(Address::User); tracing_subscriber::fmt::init(); let opts = Opts::parse(); - let conf = dbg!(russh_config::parse_home(&opts.host)?); - println!("connect"); - let mut sess = connect( - Arc::new(Config { - ..Default::default() - }), - dbg!((conf.host_name.clone(), conf.port)), - MyHandler { - host: conf.host_name, - port: conf.port, + let bundle = AgentBundle::from_dir(agents_dir()?)?; + let conn = Remowt::connect(&opts.host, &bundle).await?; + let mut rpc = conn.rpc(); + + serve_prompts( + &mut rpc, + PrependSourcePrompter { + prompter: RofiPrompter, + source: vec![Source(Cow::Owned(format!("ssh host: {}", opts.host)))], + description: "".to_owned(), }, - ) - .await?; - println!("agent"); - let mut agent = russh_keys::agent::client::AgentClient::connect_env().await?; - for ele in agent.request_identities().await? { - let (_agent, res) = sess.authenticate_future(conf.user.clone(), ele, agent).await; - agent = _agent; - if res? { - break; - } + ); + if let Some(sess) = conn.ssh() { + serve_editor(&mut rpc, SshEditor { sess }); } - // let sess = Session::connect(opts.host, openssh::KnownHosts::Strict).await?; - let socket = UnixSocket::new_stream()?; + info!("entering shell"); + run_shell(&conn).await?; + info!("shell ended"); - println!("mktemp"); - let mut cmd_chan = sess.channel_open_session().await?; - cmd_chan - .exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir") - .await?; - let mut stdout = vec![]; - loop { - let Some(msg) = cmd_chan.wait().await else { - bail!("unexpected channel end"); - }; - match msg { - russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()), - russh::ChannelMsg::ExitStatus { exit_status } => { - if exit_status != 0 { - bail!("mktemp failed"); + Ok(()) +} + +async fn run_shell(conn: &Remowt) -> anyhow::Result<()> { + let term = match std::env::var("TERM") { + Ok(v) => v, + Err(VarError::NotPresent) => "xterm-256color".to_owned(), + Err(e) => return Err(e.into()), + }; + let (cols, rows) = term_size().unwrap_or((80, 24)); + + let shell = conn.open_shell(&term, cols, rows).await?; + let resizer = shell.resizer(); + let stream = shell.stream; + + let _raw = RawMode::enable(); + + if let Ok(mut winch) = signal(SignalKind::window_change()) { + tokio::spawn(async move { + while winch.recv().await.is_some() { + if let Some((cols, rows)) = term_size() { + let _ = resizer.resize(cols, rows).await; } - break; } - _ => {} - } + }); } - ensure!(stdout.ends_with(b"\n")); - stdout.pop(); + let (mut from_remote, mut to_remote) = tokio::io::split(stream); + let mut stdin = AsyncStdin::new()?; + let mut stdout = tokio::io::stdout(); - // Remote host is not neccessary linux, openssh crate makes incorrect assumptions here. - // TODO: Remove on local close. - let remote_dir = PathBuf::from(OsString::from_vec(stdout)); - let remote_socket = remote_dir.join("primary.sock"); + tokio::select! { + r = tokio::io::copy(&mut from_remote, &mut stdout) => { r?; } + _ = tokio::io::copy(&mut stdin, &mut to_remote) => {} + } - let local_dir = TempDir::new("remowt")?; - let local_socket = local_dir.path().join("primary.sock"); + Ok(()) +} - println!("listen"); - socket.bind(&local_socket)?; - let listener = socket.listen(1)?; +struct AsyncStdin { + fd: AsyncFd, + original_flags: i32, +} - eprintln!("forward socket"); +impl AsyncStdin { + fn new() -> io::Result { + let raw = libc::STDIN_FILENO; + // SAFETY: F_GETFL/F_SETFL round-trip on a valid fd. + let original_flags = unsafe { libc::fcntl(raw, libc::F_GETFL) }; + if original_flags < 0 { + return Err(io::Error::last_os_error()); + } + if unsafe { libc::fcntl(raw, libc::F_SETFL, original_flags | libc::O_NONBLOCK) } < 0 { + return Err(io::Error::last_os_error()); + } + Ok(Self { + fd: AsyncFd::new(raw)?, + original_flags, + }) + } +} - let mut sock = sess - .channel_open_direct_streamlocal(dbg!(remote_socket.to_str().expect("path is utf-8"))) - .await?; +impl Drop for AsyncStdin { + fn drop(&mut self) { + // SAFETY: restoring the flags we saved on a valid fd. + unsafe { libc::fcntl(libc::STDIN_FILENO, libc::F_SETFL, self.original_flags) }; + } +} - eprintln!("wait"); - while let Some(v) = sock.wait().await { - dbg!(v); +impl AsyncRead for AsyncStdin { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let this = self.get_mut(); + loop { + let mut guard = match this.fd.poll_read_ready(cx) { + Poll::Ready(Ok(g)) => g, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => return Poll::Pending, + }; + let unfilled = buf.initialize_unfilled(); + let res = guard.try_io(|inner| { + let fd = *inner.get_ref(); + // SAFETY: writing into `unfilled`'s own backing storage. + let n = unsafe { libc::read(fd, unfilled.as_mut_ptr().cast(), unfilled.len()) }; + if n < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(n as usize) + } + }); + match res { + Ok(Ok(n)) => { + buf.advance(n); + return Poll::Ready(Ok(())); + } + Ok(Err(e)) => return Poll::Ready(Err(e)), + Err(_would_block) => continue, + } + } } +} + +fn term_size() -> Option<(u16, u16)> { + let mut ws: libc::winsize = unsafe { std::mem::zeroed() }; + let rc = unsafe { libc::ioctl(libc::STDIN_FILENO, libc::TIOCGWINSZ, &mut ws) }; + if rc != 0 || ws.ws_col == 0 { + None + } else { + Some((ws.ws_col, ws.ws_row)) + } +} - eprintln!("spawn agent"); +struct RawMode { + original: Termios, +} - // let _agent = sess - // .command("/home/lach/.remowt/remowt-agent") - // .arg("agent-real") - // .arg("--path") - // .arg(remote_socket.to_str().expect("path is utf-8")) - // .spawn() - // .await?; - // - // let (mut conn, _) = listener.accept().await?; - // let mut buf = [0u8; 13]; - // conn.read_exact(&mut buf).await?; - // assert_eq!(&buf, b"REMOWT_HELLO\0"); - // conn.write_all(b"REMOWT_EHLO\0").await?; - // - // println!("handshake complete!"); +impl RawMode { + fn enable() -> Option { + let stdin = std::io::stdin(); + // SAFETY: trivial libc call on a borrowed fd. + if unsafe { libc::isatty(stdin.as_raw_fd()) } != 1 { + return None; + } + let original = termios::tcgetattr(&stdin).ok()?; + let mut raw = original.clone(); + termios::cfmakeraw(&mut raw); + termios::tcsetattr(&stdin, SetArg::TCSANOW, &raw).ok()?; + Some(Self { original }) + } +} - Ok(()) +impl Drop for RawMode { + fn drop(&mut self) { + let _ = termios::tcsetattr(std::io::stdin(), SetArg::TCSANOW, &self.original); + } } -- gitstuff