difftreelog
refactor(remowt-ssh) switch to russh
in: trunk
2 files changed
cmds/remowt-ssh/Cargo.tomldiffbeforeafterboth4edition = "2021"4edition = "2021"556[dependencies]6[dependencies]7clap = { version = "4.5.16", features = ["derive"] }7clap = { workspace = true, features = ["derive"] }8openssh = { version = "0.11.0", features = ["native-mux"] }8openssh = { workspace = true, features = ["native-mux"] }9tracing-subscriber = "0.3.18"9tracing-subscriber.workspace = true10bifrostlink.workspace = true10bifrostlink.workspace = true11remowt-link-shared = { version = "0.1.0", path = "../../crates/remowt-link-shared" }11remowt-link-shared.workspace = true12remowt-client.workspace = true13tokio = { workspace = true, features = ["macros", "fs", "net", "io-util", "rt", "signal"] }12tokio = { version = "1.39.3", features = ["macros"] }14nix = { workspace = true, features = ["term"] }13anyhow = "1.0.86"15anyhow.workspace = true14bifrostlink-ports.workspace = true16bifrostlink-ports.workspace = true15uuid = { version = "1.10.0", features = ["v4"] }17uuid = { workspace = true, features = ["v4"] }16tempdir = "0.3.7"18tempdir.workspace = true17russh = { git = "https://github.com/Eugeny/russh/" }19async-trait.workspace = true20bytes.workspace = true18russh-config = { git = "https://github.com/Eugeny/russh/" }21tokio-stream.workspace = true22tracing.workspace = true23thiserror = "2.0.18"19russh-keys = { git = "https://github.com/Eugeny/russh/" }24serde_json.workspace = true25serde.workspace = true20async-trait = "0.1.81"26ui-prompt.workspace = true27russh.workspace = true28russh-config.workspace = true2129cmds/remowt-ssh/src/main.rsdiffbeforeafterboth1use std::borrow::Cow;1use std::borrow::Cow;2use std::ffi::OsString;2use std::env::VarError;3use std::io;3use std::os::unix::ffi::OsStringExt;4use std::os::fd::{AsRawFd, RawFd};4use std::path::PathBuf;5use std::path::PathBuf;5use std::sync::Arc;6use std::pin::Pin;67use anyhow::{bail, ensure};7use std::task::{Context, Poll};88use async_trait::async_trait;9use anyhow::anyhow;9use bifrostlink::Rpc;10use clap::Parser;10use clap::Parser;11use remowt_link_shared::Address;11use nix::libc;12use russh::client::{connect, Config, Handler, Session};12use nix::sys::termios::{self, SetArg, Termios};13use tempdir::TempDir;13use remowt_client::editor::SshEditor;14use remowt_client::{AgentBundle, Remowt};15use remowt_link_shared::editor::serve_editor;16use tokio::io::unix::AsyncFd;14use tokio::io::{AsyncReadExt, AsyncWriteExt};17use tokio::io::{AsyncRead, ReadBuf};15use tokio::net::UnixSocket;18use tokio::signal::unix::{signal, SignalKind};19use tracing::info;20use ui_prompt::bifrost::serve_prompts;21use ui_prompt::rofi::RofiPrompter;22use ui_prompt::{PrependSourcePrompter, Source};162317#[derive(Parser)]24#[derive(Parser)]18struct Opts {25struct Opts {19 host: String,26 host: String,20}27}212822struct MyHandler {29fn agents_dir() -> anyhow::Result<PathBuf> {23 host: String,30 std::env::var_os("REMOWT_AGENTS_DIR")24 port: u16,31 .map(PathBuf::from)32 .or_else(|| option_env!("REMOWT_AGENTS_DIR").map(PathBuf::from))33 .ok_or_else(|| anyhow!("no remowt-agents bundle"))25}34}3526#[async_trait]36#[tokio::main(flavor = "current_thread")]27impl Handler for MyHandler {28 type Error = russh::Error;29 async fn check_server_key(37async fn main() -> anyhow::Result<()> {30 &mut self,31 server_public_key: &russh_keys::key::PublicKey,38 tracing_subscriber::fmt::init();39 let opts = Opts::parse();4032 ) -> Result<bool, Self::Error> {41 let bundle = AgentBundle::from_dir(agents_dir()?)?;33 Ok(russh_keys::check_known_hosts(42 let conn = Remowt::connect(&opts.host, &bundle).await?;34 &self.host,43 let mut rpc = conn.rpc();35 self.port,4436 &server_public_key,45 serve_prompts(37 )?)46 &mut rpc,38 }47 PrependSourcePrompter {39}48 prompter: RofiPrompter,4049 source: vec![Source(Cow::Owned(format!("ssh host: {}", opts.host)))],41#[tokio::main(flavor = "current_thread")]50 description: "".to_owned(),51 },52 );53 if let Some(sess) = conn.ssh() {54 serve_editor(&mut rpc, SshEditor { sess });55 }5657 info!("entering shell");58 run_shell(&conn).await?;59 info!("shell ended");6061 Ok(())62}6342async fn main() -> anyhow::Result<()> {64async fn run_shell(conn: &Remowt) -> anyhow::Result<()> {65 let term = match std::env::var("TERM") {66 Ok(v) => v,67 Err(VarError::NotPresent) => "xterm-256color".to_owned(),68 Err(e) => return Err(e.into()),69 };70 let (cols, rows) = term_size().unwrap_or((80, 24));7172 let shell = conn.open_shell(&term, cols, rows).await?;73 let resizer = shell.resizer();74 let stream = shell.stream;7576 let _raw = RawMode::enable();7778 if let Ok(mut winch) = signal(SignalKind::window_change()) {79 tokio::spawn(async move {80 while winch.recv().await.is_some() {81 if let Some((cols, rows)) = term_size() {82 let _ = resizer.resize(cols, rows).await;83 }84 }85 });86 }8788 let (mut from_remote, mut to_remote) = tokio::io::split(stream);89 let mut stdin = AsyncStdin::new()?;90 let mut stdout = tokio::io::stdout();9192 tokio::select! {93 r = tokio::io::copy(&mut from_remote, &mut stdout) => { r?; }94 _ = tokio::io::copy(&mut stdin, &mut to_remote) => {}95 }9697 Ok(())98}99100struct AsyncStdin {43 let rpc = Rpc::<Address, remowt_link_shared::Error>::new(Address::User);101 fd: AsyncFd<RawFd>,44 tracing_subscriber::fmt::init();102 original_flags: i32,103}104105impl AsyncStdin {106 fn new() -> io::Result<Self> {45 let opts = Opts::parse();107 let raw = libc::STDIN_FILENO;46108 // SAFETY: F_GETFL/F_SETFL round-trip on a valid fd.47 let conf = dbg!(russh_config::parse_home(&opts.host)?);48 println!("connect");49 let mut sess = connect(109 let original_flags = unsafe { libc::fcntl(raw, libc::F_GETFL) };50 Arc::new(Config {110 if original_flags < 0 {51 ..Default::default()52 }),53 dbg!((conf.host_name.clone(), conf.port)),54 MyHandler {55 host: conf.host_name,56 port: conf.port,57 },58 )59 .await?;60 println!("agent");61 let mut agent = russh_keys::agent::client::AgentClient::connect_env().await?;111 return Err(io::Error::last_os_error());112 }62 for ele in agent.request_identities().await? {113 if unsafe { libc::fcntl(raw, libc::F_SETFL, original_flags | libc::O_NONBLOCK) } < 0 {63 let (_agent, res) = sess.authenticate_future(conf.user.clone(), ele, agent).await;114 return Err(io::Error::last_os_error());64 agent = _agent;65 if res? {66 break;67 }68 }115 }69 // let sess = Session::connect(opts.host, openssh::KnownHosts::Strict).await?;116 Ok(Self {7071 let socket = UnixSocket::new_stream()?;117 fd: AsyncFd::new(raw)?,72118 original_flags,73 println!("mktemp");119 })120 }121}122123impl Drop for AsyncStdin {74 let mut cmd_chan = sess.channel_open_session().await?;124 fn drop(&mut self) {75 cmd_chan125 // SAFETY: restoring the flags we saved on a valid fd.76 .exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")126 unsafe { libc::fcntl(libc::STDIN_FILENO, libc::F_SETFL, self.original_flags) };77 .await?;127 }128}129130impl AsyncRead for AsyncStdin {131 fn poll_read(132 self: Pin<&mut Self>,133 cx: &mut Context<'_>,134 buf: &mut ReadBuf<'_>,135 ) -> Poll<io::Result<()>> {78 let mut stdout = vec![];136 let this = self.get_mut();79 loop {137 loop {138 let mut guard = match this.fd.poll_read_ready(cx) {139 Poll::Ready(Ok(g)) => g,140 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),141 Poll::Pending => return Poll::Pending,142 };143 let unfilled = buf.initialize_unfilled();80 let Some(msg) = cmd_chan.wait().await else {144 let res = guard.try_io(|inner| {145 let fd = *inner.get_ref();146 // SAFETY: writing into `unfilled`'s own backing storage.147 let n = unsafe { libc::read(fd, unfilled.as_mut_ptr().cast(), unfilled.len()) };148 if n < 0 {149 Err(io::Error::last_os_error())150 } else {81 bail!("unexpected channel end");151 Ok(n as usize)82 };152 }153 });83 match msg {154 match res {84 russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),155 Ok(Ok(n)) => {156 buf.advance(n);157 return Poll::Ready(Ok(()));158 }85 russh::ChannelMsg::ExitStatus { exit_status } => {159 Ok(Err(e)) => return Poll::Ready(Err(e)),86 if exit_status != 0 {87 bail!("mktemp failed");88 }89 break;90 }91 _ => {}160 Err(_would_block) => continue,92 }161 }93 }162 }163 }164}9416595 ensure!(stdout.ends_with(b"\n"));166fn term_size() -> Option<(u16, u16)> {96 stdout.pop();167 let mut ws: libc::winsize = unsafe { std::mem::zeroed() };9798 // Remote host is not neccessary linux, openssh crate makes incorrect assumptions here.99 // TODO: Remove on local close.100 let remote_dir = PathBuf::from(OsString::from_vec(stdout));168 let rc = unsafe { libc::ioctl(libc::STDIN_FILENO, libc::TIOCGWINSZ, &mut ws) };101 let remote_socket = remote_dir.join("primary.sock");169 if rc != 0 || ws.ws_col == 0 {102170 None171 } else {172 Some((ws.ws_col, ws.ws_row))173 }174}175176struct RawMode {177 original: Termios,178}179180impl RawMode {181 fn enable() -> Option<Self> {103 let local_dir = TempDir::new("remowt")?;182 let stdin = std::io::stdin();183 // SAFETY: trivial libc call on a borrowed fd.104 let local_socket = local_dir.path().join("primary.sock");184 if unsafe { libc::isatty(stdin.as_raw_fd()) } != 1 {105185 return None;106 println!("listen");186 }107 socket.bind(&local_socket)?;187 let original = termios::tcgetattr(&stdin).ok()?;108 let listener = socket.listen(1)?;188 let mut raw = original.clone();109110 eprintln!("forward socket");189 termios::cfmakeraw(&mut raw);111112 let mut sock = sess113 .channel_open_direct_streamlocal(dbg!(remote_socket.to_str().expect("path is utf-8")))190 termios::tcsetattr(&stdin, SetArg::TCSANOW, &raw).ok()?;114 .await?;115116 eprintln!("wait");191 Some(Self { original })192 }193}194195impl Drop for RawMode {196 fn drop(&mut self) {117 while let Some(v) = sock.wait().await {197 let _ = termios::tcsetattr(std::io::stdin(), SetArg::TCSANOW, &self.original);118 dbg!(v);198 }119 }199}120121 eprintln!("spawn agent");122123 // let _agent = sess124 // .command("/home/lach/.remowt/remowt-agent")125 // .arg("agent-real")126 // .arg("--path")127 // .arg(remote_socket.to_str().expect("path is utf-8"))128 // .spawn()129 // .await?;130 //131 // let (mut conn, _) = listener.accept().await?;132 // let mut buf = [0u8; 13];133 // conn.read_exact(&mut buf).await?;134 // assert_eq!(&buf, b"REMOWT_HELLO\0");135 // conn.write_all(b"REMOWT_EHLO\0").await?;136 //137 // println!("handshake complete!");138139 Ok(())140}141200