1use std::ffi::OsStr;2use std::io;3use std::process::Stdio;4use std::sync::Mutex;56use bifrostlink::{Port, Rpc, Rtt, WeakRpc};7use bytes::{Bytes, BytesMut};8use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};9use tokio::process::{Child, ChildStdin, ChildStdout, Command};1011use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};12use remowt_link_shared::{Address, BifConfig};1314pub fn serve(rpc: &mut Rpc<BifConfig>) {15 let host = Host {16 me: rpc.me(),17 rpc: rpc.clone().downgrade(),18 children: Mutex::new(Vec::new()),19 };20 PluginEndpoints(host).register_endpoints(rpc);21}2223struct Host {24 me: Address,25 rpc: WeakRpc<BifConfig>,26 children: Mutex<Vec<Child>>,27}2829impl Host {30 fn spawn(&self, id: u16, path: impl AsRef<OsStr>) -> Result<(), Error> {31 let rpc = self.rpc.clone().upgrade().ok_or(Error::Gone)?;3233 let mut child = Command::new(path)34 .arg(id.to_string())35 .arg(serde_json::to_string(&self.me).expect("address serializes"))36 .stdin(Stdio::piped())37 .stdout(Stdio::piped())38 .kill_on_drop(true)39 .spawn()40 .map_err(|e| Error::Spawn(e.to_string()))?;41 let stdin = child.stdin.take().expect("stdin piped");42 let stdout = child.stdout.take().expect("stdout piped");4344 rpc.add_direct(Address::Plugin(id), child_port(stdout, stdin), Rtt(0));45 self.children.lock().expect("not poisoned").push(child);46 Ok(())47 }48}4950impl PluginHost for Host {51 async fn load_plugin(&self, id: u16, name: String) -> Result<(), Error> {52 53 54 55 if name.is_empty() || name == "." || name == ".." || name.contains(['/', '\0']) {56 return Err(Error::BadName);57 }58 let exe = std::env::current_exe().map_err(|e| Error::Spawn(e.to_string()))?;59 let dir = exe60 .parent()61 .ok_or_else(|| Error::Spawn("primary agent has no parent directory".to_owned()))?;62 self.spawn(id, dir.join(&name))63 }6465 async fn load_plugin_path(&self, id: u16, path: String) -> Result<(), Error> {66 if path.is_empty() || path.contains('\0') {67 return Err(Error::BadName);68 }69 self.spawn(id, path)70 }71}7273fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port {74 Port::new(|mut rx, tx| async move {75 let reader = async move {76 loop {77 let len = match stdout.read_u32().await {78 Ok(len) => len,79 Err(e) => {80 tracing::error!("plugin stdout read failed: {e}");81 break;82 }83 };84 let mut buf = BytesMut::zeroed(len as usize);85 if let Err(e) = stdout.read_exact(&mut buf).await {86 tracing::error!("plugin stdout read failed: {e}");87 break;88 }89 if tx.send(buf.freeze()).is_err() {90 break;91 }92 }93 };94 let writer = async move {95 while let Some(msg) = rx.recv().await {96 if let Err(e) = write_frame(&mut stdin, msg).await {97 tracing::error!("plugin stdin write failed: {e}");98 break;99 }100 }101 };102 tokio::join!(reader, writer);103 })104}105106async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> {107 let len = u32::try_from(msg.len())108 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;109 stdin.write_u32(len).await?;110 stdin.write_all(&msg).await?;111 stdin.flush().await?;112 Ok(())113}