git.delta.rocks / remowt / refs/commits / 875f55ab11c2

difftreelog

source

crates/remowt-plugin/src/host.rs3.3 KiBsourcehistory
1use std::ffi::OsStr;2use std::io;3use std::process::Stdio;4use std::sync::Mutex;56use bifrostlink::{Port, Rpc, Rtt, WeakRpc};7use bytes::{Bytes, BytesMut};8use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};9use tokio::process::{Child, ChildStdin, ChildStdout, Command};1011use remowt_link_shared::plugin::{Error, PluginEndpoints, PluginHost};12use remowt_link_shared::{Address, BifConfig};1314pub fn serve(rpc: &mut Rpc<BifConfig>) {15	let host = Host {16		me: rpc.me(),17		rpc: rpc.clone().downgrade(),18		children: Mutex::new(Vec::new()),19	};20	PluginEndpoints(host).register_endpoints(rpc);21}2223struct Host {24	me: Address,25	rpc: WeakRpc<BifConfig>,26	children: Mutex<Vec<Child>>,27}2829impl Host {30	fn spawn(&self, id: u16, path: impl AsRef<OsStr>) -> Result<(), Error> {31		let rpc = self.rpc.clone().upgrade().ok_or(Error::Gone)?;3233		let mut child = Command::new(path)34			.arg(id.to_string())35			.arg(serde_json::to_string(&self.me).expect("address serializes"))36			.stdin(Stdio::piped())37			.stdout(Stdio::piped())38			.kill_on_drop(true)39			.spawn()40			.map_err(|e| Error::Spawn(e.to_string()))?;41		let stdin = child.stdin.take().expect("stdin piped");42		let stdout = child.stdout.take().expect("stdout piped");4344		rpc.add_direct(Address::Plugin(id), child_port(stdout, stdin), Rtt(0));45		self.children.lock().expect("not poisoned").push(child);46		Ok(())47	}48}4950impl PluginHost for Host {51	async fn load_plugin(&self, id: u16, name: String) -> Result<(), Error> {52		// TODO: Right now loads plugin next to the binary...53		// But with our CA addressed schema, the plugins should be located in content-addressed subdir...54		// Maybe it should just be scrapped in favor of load_plugin_path.55		if name.is_empty() || name == "." || name == ".." || name.contains(['/', '\0']) {56			return Err(Error::BadName);57		}58		let exe = std::env::current_exe().map_err(|e| Error::Spawn(e.to_string()))?;59		let dir = exe60			.parent()61			.ok_or_else(|| Error::Spawn("primary agent has no parent directory".to_owned()))?;62		self.spawn(id, dir.join(&name))63	}6465	async fn load_plugin_path(&self, id: u16, path: String) -> Result<(), Error> {66		if path.is_empty() || path.contains('\0') {67			return Err(Error::BadName);68		}69		self.spawn(id, path)70	}71}7273fn child_port(mut stdout: ChildStdout, mut stdin: ChildStdin) -> Port {74	Port::new(|mut rx, tx| async move {75		let reader = async move {76			loop {77				let len = match stdout.read_u32().await {78					Ok(len) => len,79					Err(e) => {80						tracing::error!("plugin stdout read failed: {e}");81						break;82					}83				};84				let mut buf = BytesMut::zeroed(len as usize);85				if let Err(e) = stdout.read_exact(&mut buf).await {86					tracing::error!("plugin stdout read failed: {e}");87					break;88				}89				if tx.send(buf.freeze()).is_err() {90					break;91				}92			}93		};94		let writer = async move {95			while let Some(msg) = rx.recv().await {96				if let Err(e) = write_frame(&mut stdin, msg).await {97					tracing::error!("plugin stdin write failed: {e}");98					break;99				}100			}101		};102		tokio::join!(reader, writer);103	})104}105106async fn write_frame(stdin: &mut ChildStdin, msg: Bytes) -> io::Result<()> {107	let len = u32::try_from(msg.len())108		.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;109	stdin.write_u32(len).await?;110	stdin.write_all(&msg).await?;111	stdin.flush().await?;112	Ok(())113}