1use std::io;23use bifrostlink::Port;4use bytes::{Bytes, BytesMut};5use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};6use tokio::select;78pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port9where10 R: AsyncRead + Unpin + Send + 'static,11 W: AsyncWrite + Unpin + Send + 'static,12{13 Port::new(|mut rx, tx| async move {14 let read_task = async move {15 loop {16 let len = match reader.read_u32().await {17 Ok(len) => len,18 Err(e) => {19 log_read_end(&e);20 break;21 }22 };23 let mut buf = BytesMut::zeroed(len as usize);24 if let Err(e) = reader.read_exact(&mut buf).await {25 log_read_end(&e);26 break;27 }28 if tx.send(buf.freeze()).is_err() {29 break;30 }31 }32 };33 let write_task = async move {34 while let Some(msg) = rx.recv().await {35 if let Err(e) = write_frame(&mut writer, msg).await {36 log_write_end(&e);37 break;38 }39 }40 };41 select! {42 _ = read_task => {},43 _ = write_task => {},44 }45 })46}4748fn log_read_end(e: &io::Error) {49 if matches!(50 e.kind(),51 io::ErrorKind::UnexpectedEof52 | io::ErrorKind::BrokenPipe53 | io::ErrorKind::ConnectionReset54 | io::ErrorKind::ConnectionAborted55 ) {56 tracing::debug!("child read ended: {e}");57 } else {58 tracing::error!("child read failed: {e}");59 }60}6162fn log_write_end(e: &io::Error) {63 if matches!(64 e.kind(),65 io::ErrorKind::BrokenPipe66 | io::ErrorKind::ConnectionReset67 | io::ErrorKind::ConnectionAborted68 ) {69 tracing::debug!("child write ended: {e}");70 } else {71 tracing::error!("child write failed: {e}");72 }73}7475async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {76 let len = u32::try_from(msg.len())77 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;78 writer.write_u32(len).await?;79 writer.write_all(&msg).await?;80 writer.flush().await?;81 Ok(())82}