difftreelog
feat remowt-pty
in: trunk
2 files changed
crates/remowt-pty/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/crates/remowt-pty/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "remowt-pty"
+description = "PTY/shell endpoint for remowt"
+version.workspace = true
+edition = "2021"
+
+[dependencies]
+bifrostlink.workspace = true
+bifrostlink-macros.workspace = true
+camino = { workspace = true, features = ["serde1"] }
+nix = { workspace = true, features = ["process", "term"] }
+serde = { workspace = true, features = ["derive"] }
+thiserror = "1"
+tokio = { workspace = true, features = [
+ "net",
+ "io-util",
+ "rt",
+ "macros",
+ "process",
+ "sync",
+] }
+tracing.workspace = true
crates/remowt-pty/src/lib.rsdiffbeforeafterboth1use std::collections::HashMap;2use std::io;3use std::os::fd::{AsRawFd, OwnedFd};4use std::pin::Pin;5use std::process::Stdio;6use std::sync::atomic::{AtomicU64, Ordering};7use std::sync::{Arc, Mutex};8use std::task::{Context, Poll};910use bifrostlink::declarative::endpoints;11use bifrostlink::Config;12use camino::Utf8PathBuf;13use nix::libc;14use nix::pty::{openpty, OpenptyResult, Winsize};15use serde::{Deserialize, Serialize};16use tokio::io::unix::AsyncFd;17use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};18use tokio::net::UnixStream;19use tracing::{info, warn};2021pub type ShellId = u64;2223#[derive(Serialize, Deserialize, Debug, thiserror::Error)]24pub enum Error {25 #[error("openpty failed: {0}")]26 Open(String),27 #[error("failed to spawn shell: {0}")]28 Spawn(String),29 #[error("failed to connect to forwarded socket: {0}")]30 Connect(String),31 #[error("no shell with that id")]32 NoSuchShell,33 #[error("resize failed: {0}")]34 Resize(String),35 #[error("io error: {0}")]36 Io(String),37}3839impl From<io::Error> for Error {40 fn from(e: io::Error) -> Self {41 Error::Io(e.to_string())42 }43}4445#[derive(Clone, Default)]46pub struct Pty {47 shells: Arc<Mutex<HashMap<ShellId, OwnedFd>>>,48 next_id: Arc<AtomicU64>,49}5051impl Pty {52 pub fn new() -> Self {53 Self::default()54 }55}5657#[endpoints(ns = 7)]58impl Pty {59 #[endpoints(id = 1)]60 async fn open_shell(61 &self,62 socket_path: Utf8PathBuf,63 term: String,64 cols: u16,65 rows: u16,66 ) -> Result<ShellId, Error> {67 let ws = Winsize {68 ws_row: rows,69 ws_col: cols,70 ws_xpixel: 0,71 ws_ypixel: 0,72 };73 let OpenptyResult { master, slave } =74 openpty(Some(&ws), None).map_err(|e| Error::Open(e.to_string()))?;7576 let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_owned());7778 let slave_in = slave.try_clone()?;79 let slave_out = slave.try_clone()?;80 let slave_err = slave;8182 let mut cmd = tokio::process::Command::new(&shell);83 cmd.env("TERM", &term);84 if let Ok(home) = std::env::var("HOME") {85 cmd.current_dir(home);86 }87 cmd.stdin(Stdio::from(slave_in));88 cmd.stdout(Stdio::from(slave_out));89 cmd.stderr(Stdio::from(slave_err));90 // SAFETY: only async-signal-safe calls (setsid, ioctl) before exec.91 unsafe {92 cmd.pre_exec(|| {93 nix::unistd::setsid().map_err(|e| io::Error::from_raw_os_error(e as i32))?;94 if libc::ioctl(0, libc::TIOCSCTTY as _, 0) < 0 {95 return Err(io::Error::last_os_error());96 }97 Ok(())98 });99 }100101 let mut child = cmd.spawn().map_err(|e| Error::Spawn(e.to_string()))?;102103 let resize_fd = master.try_clone()?;104 let id = self.next_id.fetch_add(1, Ordering::Relaxed);105 self.shells106 .lock()107 .expect("not poisoned")108 .insert(id, resize_fd);109110 let sock = match UnixStream::connect(&socket_path).await {111 Ok(s) => s,112 Err(e) => {113 self.shells.lock().expect("not poisoned").remove(&id);114 let _ = child.kill().await;115 return Err(Error::Connect(e.to_string()));116 }117 };118 let pty = AsyncPty::new(master)?;119120 info!(id, shell, "shell opened");121 let shells = self.shells.clone();122 tokio::spawn(async move {123 let mut pty = pty;124 let mut sock = sock;125 if let Err(e) = tokio::io::copy_bidirectional(&mut pty, &mut sock).await {126 warn!(id, "shell pump ended: {e}");127 }128 let _ = child.kill().await;129 shells.lock().expect("not poisoned").remove(&id);130 info!(id, "shell closed");131 });132133 Ok(id)134 }135136 #[endpoints(id = 2)]137 async fn resize(&self, id: ShellId, cols: u16, rows: u16) -> Result<(), Error> {138 let ws = libc::winsize {139 ws_row: rows,140 ws_col: cols,141 ws_xpixel: 0,142 ws_ypixel: 0,143 };144 let shells = self.shells.lock().expect("not poisoned");145 let fd = shells.get(&id).ok_or(Error::NoSuchShell)?;146 // SAFETY: `fd` is a live PTY master147 let rc = unsafe { libc::ioctl(fd.as_raw_fd(), libc::TIOCSWINSZ as _, &ws) };148 if rc < 0 {149 return Err(Error::Resize(io::Error::last_os_error().to_string()));150 }151 Ok(())152 }153}154155struct AsyncPty {156 fd: AsyncFd<OwnedFd>,157}158159impl AsyncPty {160 fn new(fd: OwnedFd) -> io::Result<Self> {161 let raw = fd.as_raw_fd();162 // SAFETY: standard F_GETFL/F_SETFL round-trip on a valid fd.163 unsafe {164 let flags = libc::fcntl(raw, libc::F_GETFL);165 if flags < 0 {166 return Err(io::Error::last_os_error());167 }168 if libc::fcntl(raw, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {169 return Err(io::Error::last_os_error());170 }171 }172 Ok(Self {173 fd: AsyncFd::new(fd)?,174 })175 }176}177178impl AsyncRead for AsyncPty {179 fn poll_read(180 self: Pin<&mut Self>,181 cx: &mut Context<'_>,182 buf: &mut ReadBuf<'_>,183 ) -> Poll<io::Result<()>> {184 let this = self.get_mut();185 loop {186 let mut guard = match this.fd.poll_read_ready(cx) {187 Poll::Ready(Ok(g)) => g,188 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),189 Poll::Pending => return Poll::Pending,190 };191 let unfilled = buf.initialize_unfilled();192 let res = guard.try_io(|inner| {193 let fd = inner.get_ref().as_raw_fd();194 // SAFETY: writing into `unfilled`'s own backing storage.195 let n = unsafe { libc::read(fd, unfilled.as_mut_ptr().cast(), unfilled.len()) };196 if n < 0 {197 let err = io::Error::last_os_error();198 if err.raw_os_error() == Some(libc::EIO) {199 Ok(0)200 } else {201 Err(err)202 }203 } else {204 Ok(n as usize)205 }206 });207 match res {208 Ok(Ok(n)) => {209 buf.advance(n);210 return Poll::Ready(Ok(()));211 }212 Ok(Err(e)) => return Poll::Ready(Err(e)),213 Err(_would_block) => continue,214 }215 }216 }217}218219impl AsyncWrite for AsyncPty {220 fn poll_write(221 self: Pin<&mut Self>,222 cx: &mut Context<'_>,223 buf: &[u8],224 ) -> Poll<io::Result<usize>> {225 let this = self.get_mut();226 loop {227 let mut guard = match this.fd.poll_write_ready(cx) {228 Poll::Ready(Ok(g)) => g,229 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),230 Poll::Pending => return Poll::Pending,231 };232 let res = guard.try_io(|inner| {233 let fd = inner.get_ref().as_raw_fd();234 // SAFETY: reading from `buf` for `buf.len()` bytes.235 let n = unsafe { libc::write(fd, buf.as_ptr().cast(), buf.len()) };236 if n < 0 {237 Err(io::Error::last_os_error())238 } else {239 Ok(n as usize)240 }241 });242 match res {243 Ok(r) => return Poll::Ready(r),244 Err(_would_block) => continue,245 }246 }247 }248249 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {250 Poll::Ready(Ok(()))251 }252253 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {254 Poll::Ready(Ok(()))255 }256}