git.delta.rocks / remowt / refs/commits / fdfebf9c37e9

difftreelog

feat remowt-pty

oxlvzyyuYaroslav Bolyukin2026-01-25parent: #211d408.patch.diff
in: trunk

2 files changed

addedcrates/remowt-pty/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/crates/remowt-pty/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "remowt-pty"
+description = "PTY/shell endpoint for remowt"
+version.workspace = true
+edition = "2021"
+
+[dependencies]
+bifrostlink.workspace = true
+bifrostlink-macros.workspace = true
+camino = { workspace = true, features = ["serde1"] }
+nix = { workspace = true, features = ["process", "term"] }
+serde = { workspace = true, features = ["derive"] }
+thiserror = "1"
+tokio = { workspace = true, features = [
+	"net",
+	"io-util",
+	"rt",
+	"macros",
+	"process",
+	"sync",
+] }
+tracing.workspace = true
addedcrates/remowt-pty/src/lib.rsdiffbeforeafterboth
after · crates/remowt-pty/src/lib.rs
1use std::collections::HashMap;2use std::io;3use std::os::fd::{AsRawFd, OwnedFd};4use std::pin::Pin;5use std::process::Stdio;6use std::sync::atomic::{AtomicU64, Ordering};7use std::sync::{Arc, Mutex};8use std::task::{Context, Poll};910use bifrostlink::declarative::endpoints;11use bifrostlink::Config;12use camino::Utf8PathBuf;13use nix::libc;14use nix::pty::{openpty, OpenptyResult, Winsize};15use serde::{Deserialize, Serialize};16use tokio::io::unix::AsyncFd;17use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};18use tokio::net::UnixStream;19use tracing::{info, warn};2021pub type ShellId = u64;2223#[derive(Serialize, Deserialize, Debug, thiserror::Error)]24pub enum Error {25	#[error("openpty failed: {0}")]26	Open(String),27	#[error("failed to spawn shell: {0}")]28	Spawn(String),29	#[error("failed to connect to forwarded socket: {0}")]30	Connect(String),31	#[error("no shell with that id")]32	NoSuchShell,33	#[error("resize failed: {0}")]34	Resize(String),35	#[error("io error: {0}")]36	Io(String),37}3839impl From<io::Error> for Error {40	fn from(e: io::Error) -> Self {41		Error::Io(e.to_string())42	}43}4445#[derive(Clone, Default)]46pub struct Pty {47	shells: Arc<Mutex<HashMap<ShellId, OwnedFd>>>,48	next_id: Arc<AtomicU64>,49}5051impl Pty {52	pub fn new() -> Self {53		Self::default()54	}55}5657#[endpoints(ns = 7)]58impl Pty {59	#[endpoints(id = 1)]60	async fn open_shell(61		&self,62		socket_path: Utf8PathBuf,63		term: String,64		cols: u16,65		rows: u16,66	) -> Result<ShellId, Error> {67		let ws = Winsize {68			ws_row: rows,69			ws_col: cols,70			ws_xpixel: 0,71			ws_ypixel: 0,72		};73		let OpenptyResult { master, slave } =74			openpty(Some(&ws), None).map_err(|e| Error::Open(e.to_string()))?;7576		let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_owned());7778		let slave_in = slave.try_clone()?;79		let slave_out = slave.try_clone()?;80		let slave_err = slave;8182		let mut cmd = tokio::process::Command::new(&shell);83		cmd.env("TERM", &term);84		if let Ok(home) = std::env::var("HOME") {85			cmd.current_dir(home);86		}87		cmd.stdin(Stdio::from(slave_in));88		cmd.stdout(Stdio::from(slave_out));89		cmd.stderr(Stdio::from(slave_err));90		// SAFETY: only async-signal-safe calls (setsid, ioctl) before exec.91		unsafe {92			cmd.pre_exec(|| {93				nix::unistd::setsid().map_err(|e| io::Error::from_raw_os_error(e as i32))?;94				if libc::ioctl(0, libc::TIOCSCTTY as _, 0) < 0 {95					return Err(io::Error::last_os_error());96				}97				Ok(())98			});99		}100101		let mut child = cmd.spawn().map_err(|e| Error::Spawn(e.to_string()))?;102103		let resize_fd = master.try_clone()?;104		let id = self.next_id.fetch_add(1, Ordering::Relaxed);105		self.shells106			.lock()107			.expect("not poisoned")108			.insert(id, resize_fd);109110		let sock = match UnixStream::connect(&socket_path).await {111			Ok(s) => s,112			Err(e) => {113				self.shells.lock().expect("not poisoned").remove(&id);114				let _ = child.kill().await;115				return Err(Error::Connect(e.to_string()));116			}117		};118		let pty = AsyncPty::new(master)?;119120		info!(id, shell, "shell opened");121		let shells = self.shells.clone();122		tokio::spawn(async move {123			let mut pty = pty;124			let mut sock = sock;125			if let Err(e) = tokio::io::copy_bidirectional(&mut pty, &mut sock).await {126				warn!(id, "shell pump ended: {e}");127			}128			let _ = child.kill().await;129			shells.lock().expect("not poisoned").remove(&id);130			info!(id, "shell closed");131		});132133		Ok(id)134	}135136	#[endpoints(id = 2)]137	async fn resize(&self, id: ShellId, cols: u16, rows: u16) -> Result<(), Error> {138		let ws = libc::winsize {139			ws_row: rows,140			ws_col: cols,141			ws_xpixel: 0,142			ws_ypixel: 0,143		};144		let shells = self.shells.lock().expect("not poisoned");145		let fd = shells.get(&id).ok_or(Error::NoSuchShell)?;146		// SAFETY: `fd` is a live PTY master147		let rc = unsafe { libc::ioctl(fd.as_raw_fd(), libc::TIOCSWINSZ as _, &ws) };148		if rc < 0 {149			return Err(Error::Resize(io::Error::last_os_error().to_string()));150		}151		Ok(())152	}153}154155struct AsyncPty {156	fd: AsyncFd<OwnedFd>,157}158159impl AsyncPty {160	fn new(fd: OwnedFd) -> io::Result<Self> {161		let raw = fd.as_raw_fd();162		// SAFETY: standard F_GETFL/F_SETFL round-trip on a valid fd.163		unsafe {164			let flags = libc::fcntl(raw, libc::F_GETFL);165			if flags < 0 {166				return Err(io::Error::last_os_error());167			}168			if libc::fcntl(raw, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {169				return Err(io::Error::last_os_error());170			}171		}172		Ok(Self {173			fd: AsyncFd::new(fd)?,174		})175	}176}177178impl AsyncRead for AsyncPty {179	fn poll_read(180		self: Pin<&mut Self>,181		cx: &mut Context<'_>,182		buf: &mut ReadBuf<'_>,183	) -> Poll<io::Result<()>> {184		let this = self.get_mut();185		loop {186			let mut guard = match this.fd.poll_read_ready(cx) {187				Poll::Ready(Ok(g)) => g,188				Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),189				Poll::Pending => return Poll::Pending,190			};191			let unfilled = buf.initialize_unfilled();192			let res = guard.try_io(|inner| {193				let fd = inner.get_ref().as_raw_fd();194				// SAFETY: writing into `unfilled`'s own backing storage.195				let n = unsafe { libc::read(fd, unfilled.as_mut_ptr().cast(), unfilled.len()) };196				if n < 0 {197					let err = io::Error::last_os_error();198					if err.raw_os_error() == Some(libc::EIO) {199						Ok(0)200					} else {201						Err(err)202					}203				} else {204					Ok(n as usize)205				}206			});207			match res {208				Ok(Ok(n)) => {209					buf.advance(n);210					return Poll::Ready(Ok(()));211				}212				Ok(Err(e)) => return Poll::Ready(Err(e)),213				Err(_would_block) => continue,214			}215		}216	}217}218219impl AsyncWrite for AsyncPty {220	fn poll_write(221		self: Pin<&mut Self>,222		cx: &mut Context<'_>,223		buf: &[u8],224	) -> Poll<io::Result<usize>> {225		let this = self.get_mut();226		loop {227			let mut guard = match this.fd.poll_write_ready(cx) {228				Poll::Ready(Ok(g)) => g,229				Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),230				Poll::Pending => return Poll::Pending,231			};232			let res = guard.try_io(|inner| {233				let fd = inner.get_ref().as_raw_fd();234				// SAFETY: reading from `buf` for `buf.len()` bytes.235				let n = unsafe { libc::write(fd, buf.as_ptr().cast(), buf.len()) };236				if n < 0 {237					Err(io::Error::last_os_error())238				} else {239					Ok(n as usize)240				}241			});242			match res {243				Ok(r) => return Poll::Ready(r),244				Err(_would_block) => continue,245			}246		}247	}248249	fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {250		Poll::Ready(Ok(()))251	}252253	fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {254		Poll::Ready(Ok(()))255	}256}