git.delta.rocks / remowt / refs/commits / 4516008b4ee2

difftreelog

refactor(remowt-ssh) switch to russh

twnkorssYaroslav Bolyukin2026-01-25parent: #745d95d.patch.diff
in: trunk

2 files changed

modifiedcmds/remowt-ssh/Cargo.tomldiffbeforeafterboth
4edition = "2021"4edition = "2021"
55
6[dependencies]6[dependencies]
7clap = { version = "4.5.16", features = ["derive"] }7clap = { workspace = true, features = ["derive"] }
8openssh = { version = "0.11.0", features = ["native-mux"] }8openssh = { workspace = true, features = ["native-mux"] }
9tracing-subscriber = "0.3.18"9tracing-subscriber.workspace = true
10bifrostlink.workspace = true10bifrostlink.workspace = true
11remowt-link-shared = { version = "0.1.0", path = "../../crates/remowt-link-shared" }11remowt-link-shared.workspace = true
12remowt-client.workspace = true
13tokio = { workspace = true, features = ["macros", "fs", "net", "io-util", "rt", "signal"] }
12tokio = { version = "1.39.3", features = ["macros"] }14nix = { workspace = true, features = ["term"] }
13anyhow = "1.0.86"15anyhow.workspace = true
14bifrostlink-ports.workspace = true16bifrostlink-ports.workspace = true
15uuid = { version = "1.10.0", features = ["v4"] }17uuid = { workspace = true, features = ["v4"] }
16tempdir = "0.3.7"18tempdir.workspace = true
17russh = { git = "https://github.com/Eugeny/russh/" }19async-trait.workspace = true
20bytes.workspace = true
18russh-config = { git = "https://github.com/Eugeny/russh/" }21tokio-stream.workspace = true
22tracing.workspace = true
23thiserror = "2.0.18"
19russh-keys = { git = "https://github.com/Eugeny/russh/" }24serde_json.workspace = true
25serde.workspace = true
20async-trait = "0.1.81"26ui-prompt.workspace = true
27russh.workspace = true
28russh-config.workspace = true
2129
modifiedcmds/remowt-ssh/src/main.rsdiffbeforeafterboth
--- a/cmds/remowt-ssh/src/main.rs
+++ b/cmds/remowt-ssh/src/main.rs
@@ -1,140 +1,199 @@
 use std::borrow::Cow;
-use std::ffi::OsString;
-use std::os::unix::ffi::OsStringExt;
+use std::env::VarError;
+use std::io;
+use std::os::fd::{AsRawFd, RawFd};
 use std::path::PathBuf;
-use std::sync::Arc;
+use std::pin::Pin;
+use std::task::{Context, Poll};
 
-use anyhow::{bail, ensure};
-use async_trait::async_trait;
-use bifrostlink::Rpc;
+use anyhow::anyhow;
 use clap::Parser;
-use remowt_link_shared::Address;
-use russh::client::{connect, Config, Handler, Session};
-use tempdir::TempDir;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use tokio::net::UnixSocket;
+use nix::libc;
+use nix::sys::termios::{self, SetArg, Termios};
+use remowt_client::editor::SshEditor;
+use remowt_client::{AgentBundle, Remowt};
+use remowt_link_shared::editor::serve_editor;
+use tokio::io::unix::AsyncFd;
+use tokio::io::{AsyncRead, ReadBuf};
+use tokio::signal::unix::{signal, SignalKind};
+use tracing::info;
+use ui_prompt::bifrost::serve_prompts;
+use ui_prompt::rofi::RofiPrompter;
+use ui_prompt::{PrependSourcePrompter, Source};
 
 #[derive(Parser)]
 struct Opts {
 	host: String,
 }
 
-struct MyHandler {
-	host: String,
-	port: u16,
-}
-#[async_trait]
-impl Handler for MyHandler {
-	type Error = russh::Error;
-	async fn check_server_key(
-		&mut self,
-		server_public_key: &russh_keys::key::PublicKey,
-	) -> Result<bool, Self::Error> {
-		Ok(russh_keys::check_known_hosts(
-			&self.host,
-			self.port,
-			&server_public_key,
-		)?)
-	}
+fn agents_dir() -> anyhow::Result<PathBuf> {
+	std::env::var_os("REMOWT_AGENTS_DIR")
+		.map(PathBuf::from)
+		.or_else(|| option_env!("REMOWT_AGENTS_DIR").map(PathBuf::from))
+		.ok_or_else(|| anyhow!("no remowt-agents bundle"))
 }
 
 #[tokio::main(flavor = "current_thread")]
 async fn main() -> anyhow::Result<()> {
-	let rpc = Rpc::<Address, remowt_link_shared::Error>::new(Address::User);
 	tracing_subscriber::fmt::init();
 	let opts = Opts::parse();
 
-	let conf = dbg!(russh_config::parse_home(&opts.host)?);
-	println!("connect");
-	let mut sess = connect(
-		Arc::new(Config {
-			..Default::default()
-		}),
-		dbg!((conf.host_name.clone(), conf.port)),
-		MyHandler {
-			host: conf.host_name,
-			port: conf.port,
+	let bundle = AgentBundle::from_dir(agents_dir()?)?;
+	let conn = Remowt::connect(&opts.host, &bundle).await?;
+	let mut rpc = conn.rpc();
+
+	serve_prompts(
+		&mut rpc,
+		PrependSourcePrompter {
+			prompter: RofiPrompter,
+			source: vec![Source(Cow::Owned(format!("ssh host: {}", opts.host)))],
+			description: "".to_owned(),
 		},
-	)
-	.await?;
-	println!("agent");
-	let mut agent = russh_keys::agent::client::AgentClient::connect_env().await?;
-	for ele in agent.request_identities().await? {
-		let (_agent, res) = sess.authenticate_future(conf.user.clone(), ele, agent).await;
-		agent = _agent;
-		if res? {
-			break;
-		}
+	);
+	if let Some(sess) = conn.ssh() {
+		serve_editor(&mut rpc, SshEditor { sess });
 	}
-	// let sess = Session::connect(opts.host, openssh::KnownHosts::Strict).await?;
 
-	let socket = UnixSocket::new_stream()?;
+	info!("entering shell");
+	run_shell(&conn).await?;
+	info!("shell ended");
 
-	println!("mktemp");
-	let mut cmd_chan = sess.channel_open_session().await?;
-	cmd_chan
-		.exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")
-		.await?;
-	let mut stdout = vec![];
-	loop {
-		let Some(msg) = cmd_chan.wait().await else {
-			bail!("unexpected channel end");
-		};
-		match msg {
-			russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),
-			russh::ChannelMsg::ExitStatus { exit_status } => {
-				if exit_status != 0 {
-					bail!("mktemp failed");
+	Ok(())
+}
+
+async fn run_shell(conn: &Remowt) -> anyhow::Result<()> {
+	let term = match std::env::var("TERM") {
+		Ok(v) => v,
+		Err(VarError::NotPresent) => "xterm-256color".to_owned(),
+		Err(e) => return Err(e.into()),
+	};
+	let (cols, rows) = term_size().unwrap_or((80, 24));
+
+	let shell = conn.open_shell(&term, cols, rows).await?;
+	let resizer = shell.resizer();
+	let stream = shell.stream;
+
+	let _raw = RawMode::enable();
+
+	if let Ok(mut winch) = signal(SignalKind::window_change()) {
+		tokio::spawn(async move {
+			while winch.recv().await.is_some() {
+				if let Some((cols, rows)) = term_size() {
+					let _ = resizer.resize(cols, rows).await;
 				}
-				break;
 			}
-			_ => {}
-		}
+		});
 	}
 
-	ensure!(stdout.ends_with(b"\n"));
-	stdout.pop();
+	let (mut from_remote, mut to_remote) = tokio::io::split(stream);
+	let mut stdin = AsyncStdin::new()?;
+	let mut stdout = tokio::io::stdout();
 
-	// Remote host is not neccessary linux, openssh crate makes incorrect assumptions here.
-	// TODO: Remove on local close.
-	let remote_dir = PathBuf::from(OsString::from_vec(stdout));
-	let remote_socket = remote_dir.join("primary.sock");
+	tokio::select! {
+		r = tokio::io::copy(&mut from_remote, &mut stdout) => { r?; }
+		_ = tokio::io::copy(&mut stdin, &mut to_remote) => {}
+	}
 
-	let local_dir = TempDir::new("remowt")?;
-	let local_socket = local_dir.path().join("primary.sock");
+	Ok(())
+}
 
-	println!("listen");
-	socket.bind(&local_socket)?;
-	let listener = socket.listen(1)?;
+struct AsyncStdin {
+	fd: AsyncFd<RawFd>,
+	original_flags: i32,
+}
 
-	eprintln!("forward socket");
+impl AsyncStdin {
+	fn new() -> io::Result<Self> {
+		let raw = libc::STDIN_FILENO;
+		// SAFETY: F_GETFL/F_SETFL round-trip on a valid fd.
+		let original_flags = unsafe { libc::fcntl(raw, libc::F_GETFL) };
+		if original_flags < 0 {
+			return Err(io::Error::last_os_error());
+		}
+		if unsafe { libc::fcntl(raw, libc::F_SETFL, original_flags | libc::O_NONBLOCK) } < 0 {
+			return Err(io::Error::last_os_error());
+		}
+		Ok(Self {
+			fd: AsyncFd::new(raw)?,
+			original_flags,
+		})
+	}
+}
 
-	let mut sock = sess
-		.channel_open_direct_streamlocal(dbg!(remote_socket.to_str().expect("path is utf-8")))
-		.await?;
+impl Drop for AsyncStdin {
+	fn drop(&mut self) {
+		// SAFETY: restoring the flags we saved on a valid fd.
+		unsafe { libc::fcntl(libc::STDIN_FILENO, libc::F_SETFL, self.original_flags) };
+	}
+}
 
-	eprintln!("wait");
-	while let Some(v) = sock.wait().await {
-		dbg!(v);
+impl AsyncRead for AsyncStdin {
+	fn poll_read(
+		self: Pin<&mut Self>,
+		cx: &mut Context<'_>,
+		buf: &mut ReadBuf<'_>,
+	) -> Poll<io::Result<()>> {
+		let this = self.get_mut();
+		loop {
+			let mut guard = match this.fd.poll_read_ready(cx) {
+				Poll::Ready(Ok(g)) => g,
+				Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
+				Poll::Pending => return Poll::Pending,
+			};
+			let unfilled = buf.initialize_unfilled();
+			let res = guard.try_io(|inner| {
+				let fd = *inner.get_ref();
+				// SAFETY: writing into `unfilled`'s own backing storage.
+				let n = unsafe { libc::read(fd, unfilled.as_mut_ptr().cast(), unfilled.len()) };
+				if n < 0 {
+					Err(io::Error::last_os_error())
+				} else {
+					Ok(n as usize)
+				}
+			});
+			match res {
+				Ok(Ok(n)) => {
+					buf.advance(n);
+					return Poll::Ready(Ok(()));
+				}
+				Ok(Err(e)) => return Poll::Ready(Err(e)),
+				Err(_would_block) => continue,
+			}
+		}
 	}
+}
+
+fn term_size() -> Option<(u16, u16)> {
+	let mut ws: libc::winsize = unsafe { std::mem::zeroed() };
+	let rc = unsafe { libc::ioctl(libc::STDIN_FILENO, libc::TIOCGWINSZ, &mut ws) };
+	if rc != 0 || ws.ws_col == 0 {
+		None
+	} else {
+		Some((ws.ws_col, ws.ws_row))
+	}
+}
 
-	eprintln!("spawn agent");
+struct RawMode {
+	original: Termios,
+}
 
-	// let _agent = sess
-	// 	.command("/home/lach/.remowt/remowt-agent")
-	// 	.arg("agent-real")
-	// 	.arg("--path")
-	// 	.arg(remote_socket.to_str().expect("path is utf-8"))
-	// 	.spawn()
-	// 	.await?;
-	//
-	// let (mut conn, _) = listener.accept().await?;
-	// let mut buf = [0u8; 13];
-	// conn.read_exact(&mut buf).await?;
-	// assert_eq!(&buf, b"REMOWT_HELLO\0");
-	// conn.write_all(b"REMOWT_EHLO\0").await?;
-	//
-	// println!("handshake complete!");
+impl RawMode {
+	fn enable() -> Option<Self> {
+		let stdin = std::io::stdin();
+		// SAFETY: trivial libc call on a borrowed fd.
+		if unsafe { libc::isatty(stdin.as_raw_fd()) } != 1 {
+			return None;
+		}
+		let original = termios::tcgetattr(&stdin).ok()?;
+		let mut raw = original.clone();
+		termios::cfmakeraw(&mut raw);
+		termios::tcsetattr(&stdin, SetArg::TCSANOW, &raw).ok()?;
+		Some(Self { original })
+	}
+}
 
-	Ok(())
+impl Drop for RawMode {
+	fn drop(&mut self) {
+		let _ = termios::tcsetattr(std::io::stdin(), SetArg::TCSANOW, &self.original);
+	}
 }