git.delta.rocks / remowt / refs/commits / 4516008b4ee2

difftreelog

refactor(remowt-ssh) switch to russh

twnkorssYaroslav Bolyukin2026-01-25parent: #745d95d.patch.diff
in: trunk

2 files changed

modifiedcmds/remowt-ssh/Cargo.tomldiffbeforeafterboth
4edition = "2021"4edition = "2021"
55
6[dependencies]6[dependencies]
7clap = { version = "4.5.16", features = ["derive"] }7clap = { workspace = true, features = ["derive"] }
8openssh = { version = "0.11.0", features = ["native-mux"] }8openssh = { workspace = true, features = ["native-mux"] }
9tracing-subscriber = "0.3.18"9tracing-subscriber.workspace = true
10bifrostlink.workspace = true10bifrostlink.workspace = true
11remowt-link-shared = { version = "0.1.0", path = "../../crates/remowt-link-shared" }11remowt-link-shared.workspace = true
12remowt-client.workspace = true
13tokio = { workspace = true, features = ["macros", "fs", "net", "io-util", "rt", "signal"] }
12tokio = { version = "1.39.3", features = ["macros"] }14nix = { workspace = true, features = ["term"] }
13anyhow = "1.0.86"15anyhow.workspace = true
14bifrostlink-ports.workspace = true16bifrostlink-ports.workspace = true
15uuid = { version = "1.10.0", features = ["v4"] }17uuid = { workspace = true, features = ["v4"] }
16tempdir = "0.3.7"18tempdir.workspace = true
17russh = { git = "https://github.com/Eugeny/russh/" }19async-trait.workspace = true
20bytes.workspace = true
18russh-config = { git = "https://github.com/Eugeny/russh/" }21tokio-stream.workspace = true
22tracing.workspace = true
23thiserror = "2.0.18"
19russh-keys = { git = "https://github.com/Eugeny/russh/" }24serde_json.workspace = true
25serde.workspace = true
20async-trait = "0.1.81"26ui-prompt.workspace = true
27russh.workspace = true
28russh-config.workspace = true
2129
modifiedcmds/remowt-ssh/src/main.rsdiffbeforeafterboth
1use std::borrow::Cow;1use std::borrow::Cow;
2use std::ffi::OsString;2use std::env::VarError;
3use std::io;
3use std::os::unix::ffi::OsStringExt;4use std::os::fd::{AsRawFd, RawFd};
4use std::path::PathBuf;5use std::path::PathBuf;
5use std::sync::Arc;6use std::pin::Pin;
6
7use anyhow::{bail, ensure};7use std::task::{Context, Poll};
8
8use async_trait::async_trait;9use anyhow::anyhow;
9use bifrostlink::Rpc;
10use clap::Parser;10use clap::Parser;
11use remowt_link_shared::Address;11use nix::libc;
12use russh::client::{connect, Config, Handler, Session};12use nix::sys::termios::{self, SetArg, Termios};
13use tempdir::TempDir;13use remowt_client::editor::SshEditor;
14use remowt_client::{AgentBundle, Remowt};
15use remowt_link_shared::editor::serve_editor;
16use tokio::io::unix::AsyncFd;
14use tokio::io::{AsyncReadExt, AsyncWriteExt};17use tokio::io::{AsyncRead, ReadBuf};
15use tokio::net::UnixSocket;18use tokio::signal::unix::{signal, SignalKind};
19use tracing::info;
20use ui_prompt::bifrost::serve_prompts;
21use ui_prompt::rofi::RofiPrompter;
22use ui_prompt::{PrependSourcePrompter, Source};
1623
17#[derive(Parser)]24#[derive(Parser)]
18struct Opts {25struct Opts {
19 host: String,26 host: String,
20}27}
2128
22struct MyHandler {29fn agents_dir() -> anyhow::Result<PathBuf> {
23 host: String,30 std::env::var_os("REMOWT_AGENTS_DIR")
24 port: u16,31 .map(PathBuf::from)
32 .or_else(|| option_env!("REMOWT_AGENTS_DIR").map(PathBuf::from))
33 .ok_or_else(|| anyhow!("no remowt-agents bundle"))
25}34}
35
26#[async_trait]36#[tokio::main(flavor = "current_thread")]
27impl Handler for MyHandler {
28 type Error = russh::Error;
29 async fn check_server_key(37async fn main() -> anyhow::Result<()> {
30 &mut self,
31 server_public_key: &russh_keys::key::PublicKey,38 tracing_subscriber::fmt::init();
39 let opts = Opts::parse();
40
32 ) -> Result<bool, Self::Error> {41 let bundle = AgentBundle::from_dir(agents_dir()?)?;
33 Ok(russh_keys::check_known_hosts(42 let conn = Remowt::connect(&opts.host, &bundle).await?;
34 &self.host,43 let mut rpc = conn.rpc();
35 self.port,44
36 &server_public_key,45 serve_prompts(
37 )?)46 &mut rpc,
38 }47 PrependSourcePrompter {
39}48 prompter: RofiPrompter,
4049 source: vec![Source(Cow::Owned(format!("ssh host: {}", opts.host)))],
41#[tokio::main(flavor = "current_thread")]50 description: "".to_owned(),
51 },
52 );
53 if let Some(sess) = conn.ssh() {
54 serve_editor(&mut rpc, SshEditor { sess });
55 }
56
57 info!("entering shell");
58 run_shell(&conn).await?;
59 info!("shell ended");
60
61 Ok(())
62}
63
42async fn main() -> anyhow::Result<()> {64async fn run_shell(conn: &Remowt) -> anyhow::Result<()> {
65 let term = match std::env::var("TERM") {
66 Ok(v) => v,
67 Err(VarError::NotPresent) => "xterm-256color".to_owned(),
68 Err(e) => return Err(e.into()),
69 };
70 let (cols, rows) = term_size().unwrap_or((80, 24));
71
72 let shell = conn.open_shell(&term, cols, rows).await?;
73 let resizer = shell.resizer();
74 let stream = shell.stream;
75
76 let _raw = RawMode::enable();
77
78 if let Ok(mut winch) = signal(SignalKind::window_change()) {
79 tokio::spawn(async move {
80 while winch.recv().await.is_some() {
81 if let Some((cols, rows)) = term_size() {
82 let _ = resizer.resize(cols, rows).await;
83 }
84 }
85 });
86 }
87
88 let (mut from_remote, mut to_remote) = tokio::io::split(stream);
89 let mut stdin = AsyncStdin::new()?;
90 let mut stdout = tokio::io::stdout();
91
92 tokio::select! {
93 r = tokio::io::copy(&mut from_remote, &mut stdout) => { r?; }
94 _ = tokio::io::copy(&mut stdin, &mut to_remote) => {}
95 }
96
97 Ok(())
98}
99
100struct AsyncStdin {
43 let rpc = Rpc::<Address, remowt_link_shared::Error>::new(Address::User);101 fd: AsyncFd<RawFd>,
44 tracing_subscriber::fmt::init();102 original_flags: i32,
103}
104
105impl AsyncStdin {
106 fn new() -> io::Result<Self> {
45 let opts = Opts::parse();107 let raw = libc::STDIN_FILENO;
46108 // SAFETY: F_GETFL/F_SETFL round-trip on a valid fd.
47 let conf = dbg!(russh_config::parse_home(&opts.host)?);
48 println!("connect");
49 let mut sess = connect(109 let original_flags = unsafe { libc::fcntl(raw, libc::F_GETFL) };
50 Arc::new(Config {110 if original_flags < 0 {
51 ..Default::default()
52 }),
53 dbg!((conf.host_name.clone(), conf.port)),
54 MyHandler {
55 host: conf.host_name,
56 port: conf.port,
57 },
58 )
59 .await?;
60 println!("agent");
61 let mut agent = russh_keys::agent::client::AgentClient::connect_env().await?;111 return Err(io::Error::last_os_error());
112 }
62 for ele in agent.request_identities().await? {113 if unsafe { libc::fcntl(raw, libc::F_SETFL, original_flags | libc::O_NONBLOCK) } < 0 {
63 let (_agent, res) = sess.authenticate_future(conf.user.clone(), ele, agent).await;114 return Err(io::Error::last_os_error());
64 agent = _agent;
65 if res? {
66 break;
67 }
68 }115 }
69 // let sess = Session::connect(opts.host, openssh::KnownHosts::Strict).await?;116 Ok(Self {
70
71 let socket = UnixSocket::new_stream()?;117 fd: AsyncFd::new(raw)?,
72118 original_flags,
73 println!("mktemp");119 })
120 }
121}
122
123impl Drop for AsyncStdin {
74 let mut cmd_chan = sess.channel_open_session().await?;124 fn drop(&mut self) {
75 cmd_chan125 // SAFETY: restoring the flags we saved on a valid fd.
76 .exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")126 unsafe { libc::fcntl(libc::STDIN_FILENO, libc::F_SETFL, self.original_flags) };
77 .await?;127 }
128}
129
130impl AsyncRead for AsyncStdin {
131 fn poll_read(
132 self: Pin<&mut Self>,
133 cx: &mut Context<'_>,
134 buf: &mut ReadBuf<'_>,
135 ) -> Poll<io::Result<()>> {
78 let mut stdout = vec![];136 let this = self.get_mut();
79 loop {137 loop {
138 let mut guard = match this.fd.poll_read_ready(cx) {
139 Poll::Ready(Ok(g)) => g,
140 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
141 Poll::Pending => return Poll::Pending,
142 };
143 let unfilled = buf.initialize_unfilled();
80 let Some(msg) = cmd_chan.wait().await else {144 let res = guard.try_io(|inner| {
145 let fd = *inner.get_ref();
146 // SAFETY: writing into `unfilled`'s own backing storage.
147 let n = unsafe { libc::read(fd, unfilled.as_mut_ptr().cast(), unfilled.len()) };
148 if n < 0 {
149 Err(io::Error::last_os_error())
150 } else {
81 bail!("unexpected channel end");151 Ok(n as usize)
82 };152 }
153 });
83 match msg {154 match res {
84 russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),155 Ok(Ok(n)) => {
156 buf.advance(n);
157 return Poll::Ready(Ok(()));
158 }
85 russh::ChannelMsg::ExitStatus { exit_status } => {159 Ok(Err(e)) => return Poll::Ready(Err(e)),
86 if exit_status != 0 {
87 bail!("mktemp failed");
88 }
89 break;
90 }
91 _ => {}160 Err(_would_block) => continue,
92 }161 }
93 }162 }
163 }
164}
94165
95 ensure!(stdout.ends_with(b"\n"));166fn term_size() -> Option<(u16, u16)> {
96 stdout.pop();167 let mut ws: libc::winsize = unsafe { std::mem::zeroed() };
97
98 // Remote host is not neccessary linux, openssh crate makes incorrect assumptions here.
99 // TODO: Remove on local close.
100 let remote_dir = PathBuf::from(OsString::from_vec(stdout));168 let rc = unsafe { libc::ioctl(libc::STDIN_FILENO, libc::TIOCGWINSZ, &mut ws) };
101 let remote_socket = remote_dir.join("primary.sock");169 if rc != 0 || ws.ws_col == 0 {
102170 None
171 } else {
172 Some((ws.ws_col, ws.ws_row))
173 }
174}
175
176struct RawMode {
177 original: Termios,
178}
179
180impl RawMode {
181 fn enable() -> Option<Self> {
103 let local_dir = TempDir::new("remowt")?;182 let stdin = std::io::stdin();
183 // SAFETY: trivial libc call on a borrowed fd.
104 let local_socket = local_dir.path().join("primary.sock");184 if unsafe { libc::isatty(stdin.as_raw_fd()) } != 1 {
105185 return None;
106 println!("listen");186 }
107 socket.bind(&local_socket)?;187 let original = termios::tcgetattr(&stdin).ok()?;
108 let listener = socket.listen(1)?;188 let mut raw = original.clone();
109
110 eprintln!("forward socket");189 termios::cfmakeraw(&mut raw);
111
112 let mut sock = sess
113 .channel_open_direct_streamlocal(dbg!(remote_socket.to_str().expect("path is utf-8")))190 termios::tcsetattr(&stdin, SetArg::TCSANOW, &raw).ok()?;
114 .await?;
115
116 eprintln!("wait");191 Some(Self { original })
192 }
193}
194
195impl Drop for RawMode {
196 fn drop(&mut self) {
117 while let Some(v) = sock.wait().await {197 let _ = termios::tcsetattr(std::io::stdin(), SetArg::TCSANOW, &self.original);
118 dbg!(v);198 }
119 }199}
120
121 eprintln!("spawn agent");
122
123 // let _agent = sess
124 // .command("/home/lach/.remowt/remowt-agent")
125 // .arg("agent-real")
126 // .arg("--path")
127 // .arg(remote_socket.to_str().expect("path is utf-8"))
128 // .spawn()
129 // .await?;
130 //
131 // let (mut conn, _) = listener.accept().await?;
132 // let mut buf = [0u8; 13];
133 // conn.read_exact(&mut buf).await?;
134 // assert_eq!(&buf, b"REMOWT_HELLO\0");
135 // conn.write_all(b"REMOWT_EHLO\0").await?;
136 //
137 // println!("handshake complete!");
138
139 Ok(())
140}
141200