git.delta.rocks / remowt / refs/commits / eadb0bb3c19e

difftreelog

source

crates/remowt-link-shared/src/port.rs1.5 KiBsourcehistory
1use std::io;23use bifrostlink::Port;4use bytes::{Bytes, BytesMut};5use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};67/// Wire a length-prefixed duplex byte stream (e.g. a child process's8/// stdout/stdin) into a bifrost [`Port`]. Each frame is a big-endian `u32`9/// length followed by that many payload bytes.10pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port11where12	R: AsyncRead + Unpin + Send + 'static,13	W: AsyncWrite + Unpin + Send + 'static,14{15	Port::new(|mut rx, tx| async move {16		let read_task = async move {17			loop {18				let len = match reader.read_u32().await {19					Ok(len) => len,20					Err(e) => {21						tracing::error!("child read failed: {e}");22						break;23					}24				};25				let mut buf = BytesMut::zeroed(len as usize);26				if let Err(e) = reader.read_exact(&mut buf).await {27					tracing::error!("child read failed: {e}");28					break;29				}30				if tx.send(buf.freeze()).is_err() {31					break;32				}33			}34		};35		let write_task = async move {36			while let Some(msg) = rx.recv().await {37				if let Err(e) = write_frame(&mut writer, msg).await {38					tracing::error!("child write failed: {e}");39					break;40				}41			}42		};43		tokio::join!(read_task, write_task);44	})45}4647async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {48	let len = u32::try_from(msg.len())49		.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;50	writer.write_u32(len).await?;51	writer.write_all(&msg).await?;52	writer.flush().await?;53	Ok(())54}