1use std::io;23use bifrostlink::Port;4use bytes::{Bytes, BytesMut};5use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};678910pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port11where12 R: AsyncRead + Unpin + Send + 'static,13 W: AsyncWrite + Unpin + Send + 'static,14{15 Port::new(|mut rx, tx| async move {16 let read_task = async move {17 loop {18 let len = match reader.read_u32().await {19 Ok(len) => len,20 Err(e) => {21 tracing::error!("child read failed: {e}");22 break;23 }24 };25 let mut buf = BytesMut::zeroed(len as usize);26 if let Err(e) = reader.read_exact(&mut buf).await {27 tracing::error!("child read failed: {e}");28 break;29 }30 if tx.send(buf.freeze()).is_err() {31 break;32 }33 }34 };35 let write_task = async move {36 while let Some(msg) = rx.recv().await {37 if let Err(e) = write_frame(&mut writer, msg).await {38 tracing::error!("child write failed: {e}");39 break;40 }41 }42 };43 tokio::join!(read_task, write_task);44 })45}4647async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {48 let len = u32::try_from(msg.len())49 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;50 writer.write_u32(len).await?;51 writer.write_all(&msg).await?;52 writer.flush().await?;53 Ok(())54}