git.delta.rocks / remowt / refs/commits / 4516008b4ee2

difftreelog

refactor(remowt-ssh) switch to russh

twnkorssYaroslav Bolyukin2026-01-25parent: #745d95d.patch.diff
in: trunk

2 files changed

modifiedcmds/remowt-ssh/Cargo.tomldiffbeforeafterboth
--- a/cmds/remowt-ssh/Cargo.toml
+++ b/cmds/remowt-ssh/Cargo.toml
@@ -4,17 +4,25 @@
 edition = "2021"
 
 [dependencies]
-clap = { version = "4.5.16", features = ["derive"] }
-openssh = { version = "0.11.0", features = ["native-mux"] }
-tracing-subscriber = "0.3.18"
+clap = { workspace = true, features = ["derive"] }
+openssh = { workspace = true, features = ["native-mux"] }
+tracing-subscriber.workspace = true
 bifrostlink.workspace = true
-remowt-link-shared = { version = "0.1.0", path = "../../crates/remowt-link-shared" }
-tokio = { version = "1.39.3", features = ["macros"] }
-anyhow = "1.0.86"
+remowt-link-shared.workspace = true
+remowt-client.workspace = true
+tokio = { workspace = true, features = ["macros", "fs", "net", "io-util", "rt", "signal"] }
+nix = { workspace = true, features = ["term"] }
+anyhow.workspace = true
 bifrostlink-ports.workspace = true
-uuid = { version = "1.10.0", features = ["v4"] }
-tempdir = "0.3.7"
-russh = { git = "https://github.com/Eugeny/russh/" }
-russh-config = { git = "https://github.com/Eugeny/russh/" }
-russh-keys = { git = "https://github.com/Eugeny/russh/" }
-async-trait = "0.1.81"
+uuid = { workspace = true, features = ["v4"] }
+tempdir.workspace = true
+async-trait.workspace = true
+bytes.workspace = true
+tokio-stream.workspace = true
+tracing.workspace = true
+thiserror = "2.0.18"
+serde_json.workspace = true
+serde.workspace = true
+ui-prompt.workspace = true
+russh.workspace = true
+russh-config.workspace = true
modifiedcmds/remowt-ssh/src/main.rsdiffbeforeafterboth
before · cmds/remowt-ssh/src/main.rs
1use std::borrow::Cow;2use std::ffi::OsString;3use std::os::unix::ffi::OsStringExt;4use std::path::PathBuf;5use std::sync::Arc;67use anyhow::{bail, ensure};8use async_trait::async_trait;9use bifrostlink::Rpc;10use clap::Parser;11use remowt_link_shared::Address;12use russh::client::{connect, Config, Handler, Session};13use tempdir::TempDir;14use tokio::io::{AsyncReadExt, AsyncWriteExt};15use tokio::net::UnixSocket;1617#[derive(Parser)]18struct Opts {19	host: String,20}2122struct MyHandler {23	host: String,24	port: u16,25}26#[async_trait]27impl Handler for MyHandler {28	type Error = russh::Error;29	async fn check_server_key(30		&mut self,31		server_public_key: &russh_keys::key::PublicKey,32	) -> Result<bool, Self::Error> {33		Ok(russh_keys::check_known_hosts(34			&self.host,35			self.port,36			&server_public_key,37		)?)38	}39}4041#[tokio::main(flavor = "current_thread")]42async fn main() -> anyhow::Result<()> {43	let rpc = Rpc::<Address, remowt_link_shared::Error>::new(Address::User);44	tracing_subscriber::fmt::init();45	let opts = Opts::parse();4647	let conf = dbg!(russh_config::parse_home(&opts.host)?);48	println!("connect");49	let mut sess = connect(50		Arc::new(Config {51			..Default::default()52		}),53		dbg!((conf.host_name.clone(), conf.port)),54		MyHandler {55			host: conf.host_name,56			port: conf.port,57		},58	)59	.await?;60	println!("agent");61	let mut agent = russh_keys::agent::client::AgentClient::connect_env().await?;62	for ele in agent.request_identities().await? {63		let (_agent, res) = sess.authenticate_future(conf.user.clone(), ele, agent).await;64		agent = _agent;65		if res? {66			break;67		}68	}69	// let sess = Session::connect(opts.host, openssh::KnownHosts::Strict).await?;7071	let socket = UnixSocket::new_stream()?;7273	println!("mktemp");74	let mut cmd_chan = sess.channel_open_session().await?;75	cmd_chan76		.exec(true, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir")77		.await?;78	let mut stdout = vec![];79	loop {80		let Some(msg) = cmd_chan.wait().await else {81			bail!("unexpected channel end");82		};83		match msg {84			russh::ChannelMsg::Data { data } => stdout.extend(data.as_ref()),85			russh::ChannelMsg::ExitStatus { exit_status } => {86				if exit_status != 0 {87					bail!("mktemp failed");88				}89				break;90			}91			_ => {}92		}93	}9495	ensure!(stdout.ends_with(b"\n"));96	stdout.pop();9798	// Remote host is not neccessary linux, openssh crate makes incorrect assumptions here.99	// TODO: Remove on local close.100	let remote_dir = PathBuf::from(OsString::from_vec(stdout));101	let remote_socket = remote_dir.join("primary.sock");102103	let local_dir = TempDir::new("remowt")?;104	let local_socket = local_dir.path().join("primary.sock");105106	println!("listen");107	socket.bind(&local_socket)?;108	let listener = socket.listen(1)?;109110	eprintln!("forward socket");111112	let mut sock = sess113		.channel_open_direct_streamlocal(dbg!(remote_socket.to_str().expect("path is utf-8")))114		.await?;115116	eprintln!("wait");117	while let Some(v) = sock.wait().await {118		dbg!(v);119	}120121	eprintln!("spawn agent");122123	// let _agent = sess124	// 	.command("/home/lach/.remowt/remowt-agent")125	// 	.arg("agent-real")126	// 	.arg("--path")127	// 	.arg(remote_socket.to_str().expect("path is utf-8"))128	// 	.spawn()129	// 	.await?;130	//131	// let (mut conn, _) = listener.accept().await?;132	// let mut buf = [0u8; 13];133	// conn.read_exact(&mut buf).await?;134	// assert_eq!(&buf, b"REMOWT_HELLO\0");135	// conn.write_all(b"REMOWT_EHLO\0").await?;136	//137	// println!("handshake complete!");138139	Ok(())140}
after · cmds/remowt-ssh/src/main.rs
1use std::borrow::Cow;2use std::env::VarError;3use std::io;4use std::os::fd::{AsRawFd, RawFd};5use std::path::PathBuf;6use std::pin::Pin;7use std::task::{Context, Poll};89use anyhow::anyhow;10use clap::Parser;11use nix::libc;12use nix::sys::termios::{self, SetArg, Termios};13use remowt_client::editor::SshEditor;14use remowt_client::{AgentBundle, Remowt};15use remowt_link_shared::editor::serve_editor;16use tokio::io::unix::AsyncFd;17use tokio::io::{AsyncRead, ReadBuf};18use tokio::signal::unix::{signal, SignalKind};19use tracing::info;20use ui_prompt::bifrost::serve_prompts;21use ui_prompt::rofi::RofiPrompter;22use ui_prompt::{PrependSourcePrompter, Source};2324#[derive(Parser)]25struct Opts {26	host: String,27}2829fn agents_dir() -> anyhow::Result<PathBuf> {30	std::env::var_os("REMOWT_AGENTS_DIR")31		.map(PathBuf::from)32		.or_else(|| option_env!("REMOWT_AGENTS_DIR").map(PathBuf::from))33		.ok_or_else(|| anyhow!("no remowt-agents bundle"))34}3536#[tokio::main(flavor = "current_thread")]37async fn main() -> anyhow::Result<()> {38	tracing_subscriber::fmt::init();39	let opts = Opts::parse();4041	let bundle = AgentBundle::from_dir(agents_dir()?)?;42	let conn = Remowt::connect(&opts.host, &bundle).await?;43	let mut rpc = conn.rpc();4445	serve_prompts(46		&mut rpc,47		PrependSourcePrompter {48			prompter: RofiPrompter,49			source: vec![Source(Cow::Owned(format!("ssh host: {}", opts.host)))],50			description: "".to_owned(),51		},52	);53	if let Some(sess) = conn.ssh() {54		serve_editor(&mut rpc, SshEditor { sess });55	}5657	info!("entering shell");58	run_shell(&conn).await?;59	info!("shell ended");6061	Ok(())62}6364async fn run_shell(conn: &Remowt) -> anyhow::Result<()> {65	let term = match std::env::var("TERM") {66		Ok(v) => v,67		Err(VarError::NotPresent) => "xterm-256color".to_owned(),68		Err(e) => return Err(e.into()),69	};70	let (cols, rows) = term_size().unwrap_or((80, 24));7172	let shell = conn.open_shell(&term, cols, rows).await?;73	let resizer = shell.resizer();74	let stream = shell.stream;7576	let _raw = RawMode::enable();7778	if let Ok(mut winch) = signal(SignalKind::window_change()) {79		tokio::spawn(async move {80			while winch.recv().await.is_some() {81				if let Some((cols, rows)) = term_size() {82					let _ = resizer.resize(cols, rows).await;83				}84			}85		});86	}8788	let (mut from_remote, mut to_remote) = tokio::io::split(stream);89	let mut stdin = AsyncStdin::new()?;90	let mut stdout = tokio::io::stdout();9192	tokio::select! {93		r = tokio::io::copy(&mut from_remote, &mut stdout) => { r?; }94		_ = tokio::io::copy(&mut stdin, &mut to_remote) => {}95	}9697	Ok(())98}99100struct AsyncStdin {101	fd: AsyncFd<RawFd>,102	original_flags: i32,103}104105impl AsyncStdin {106	fn new() -> io::Result<Self> {107		let raw = libc::STDIN_FILENO;108		// SAFETY: F_GETFL/F_SETFL round-trip on a valid fd.109		let original_flags = unsafe { libc::fcntl(raw, libc::F_GETFL) };110		if original_flags < 0 {111			return Err(io::Error::last_os_error());112		}113		if unsafe { libc::fcntl(raw, libc::F_SETFL, original_flags | libc::O_NONBLOCK) } < 0 {114			return Err(io::Error::last_os_error());115		}116		Ok(Self {117			fd: AsyncFd::new(raw)?,118			original_flags,119		})120	}121}122123impl Drop for AsyncStdin {124	fn drop(&mut self) {125		// SAFETY: restoring the flags we saved on a valid fd.126		unsafe { libc::fcntl(libc::STDIN_FILENO, libc::F_SETFL, self.original_flags) };127	}128}129130impl AsyncRead for AsyncStdin {131	fn poll_read(132		self: Pin<&mut Self>,133		cx: &mut Context<'_>,134		buf: &mut ReadBuf<'_>,135	) -> Poll<io::Result<()>> {136		let this = self.get_mut();137		loop {138			let mut guard = match this.fd.poll_read_ready(cx) {139				Poll::Ready(Ok(g)) => g,140				Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),141				Poll::Pending => return Poll::Pending,142			};143			let unfilled = buf.initialize_unfilled();144			let res = guard.try_io(|inner| {145				let fd = *inner.get_ref();146				// SAFETY: writing into `unfilled`'s own backing storage.147				let n = unsafe { libc::read(fd, unfilled.as_mut_ptr().cast(), unfilled.len()) };148				if n < 0 {149					Err(io::Error::last_os_error())150				} else {151					Ok(n as usize)152				}153			});154			match res {155				Ok(Ok(n)) => {156					buf.advance(n);157					return Poll::Ready(Ok(()));158				}159				Ok(Err(e)) => return Poll::Ready(Err(e)),160				Err(_would_block) => continue,161			}162		}163	}164}165166fn term_size() -> Option<(u16, u16)> {167	let mut ws: libc::winsize = unsafe { std::mem::zeroed() };168	let rc = unsafe { libc::ioctl(libc::STDIN_FILENO, libc::TIOCGWINSZ, &mut ws) };169	if rc != 0 || ws.ws_col == 0 {170		None171	} else {172		Some((ws.ws_col, ws.ws_row))173	}174}175176struct RawMode {177	original: Termios,178}179180impl RawMode {181	fn enable() -> Option<Self> {182		let stdin = std::io::stdin();183		// SAFETY: trivial libc call on a borrowed fd.184		if unsafe { libc::isatty(stdin.as_raw_fd()) } != 1 {185			return None;186		}187		let original = termios::tcgetattr(&stdin).ok()?;188		let mut raw = original.clone();189		termios::cfmakeraw(&mut raw);190		termios::tcsetattr(&stdin, SetArg::TCSANOW, &raw).ok()?;191		Some(Self { original })192	}193}194195impl Drop for RawMode {196	fn drop(&mut self) {197		let _ = termios::tcsetattr(std::io::stdin(), SetArg::TCSANOW, &self.original);198	}199}