git.delta.rocks / remowt / refs/commits / 6d9cf16dada2

difftreelog

source

crates/remowt-link-shared/src/port.rs1.9 KiBsourcehistory
1use std::io;23use bifrostlink::Port;4use bytes::{Bytes, BytesMut};5use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};6use tokio::select;78pub fn child_port<R, W>(mut reader: R, mut writer: W) -> Port9where10	R: AsyncRead + Unpin + Send + 'static,11	W: AsyncWrite + Unpin + Send + 'static,12{13	Port::new(|mut rx, tx| async move {14		let read_task = async move {15			loop {16				let len = match reader.read_u32().await {17					Ok(len) => len,18					Err(e) => {19						log_read_end(&e);20						break;21					}22				};23				let mut buf = BytesMut::zeroed(len as usize);24				if let Err(e) = reader.read_exact(&mut buf).await {25					log_read_end(&e);26					break;27				}28				if tx.send(buf.freeze()).is_err() {29					break;30				}31			}32		};33		let write_task = async move {34			while let Some(msg) = rx.recv().await {35				if let Err(e) = write_frame(&mut writer, msg).await {36					log_write_end(&e);37					break;38				}39			}40		};41		select! {42			_ = read_task => {},43			_ = write_task => {},44		}45	})46}4748fn log_read_end(e: &io::Error) {49	if matches!(50		e.kind(),51		io::ErrorKind::UnexpectedEof52			| io::ErrorKind::BrokenPipe53			| io::ErrorKind::ConnectionReset54			| io::ErrorKind::ConnectionAborted55	) {56		tracing::debug!("child read ended: {e}");57	} else {58		tracing::error!("child read failed: {e}");59	}60}6162fn log_write_end(e: &io::Error) {63	if matches!(64		e.kind(),65		io::ErrorKind::BrokenPipe66			| io::ErrorKind::ConnectionReset67			| io::ErrorKind::ConnectionAborted68	) {69		tracing::debug!("child write ended: {e}");70	} else {71		tracing::error!("child write failed: {e}");72	}73}7475async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, msg: Bytes) -> io::Result<()> {76	let len = u32::try_from(msg.len())77		.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "message larger than 4GB"))?;78	writer.write_u32(len).await?;79	writer.write_all(&msg).await?;80	writer.flush().await?;81	Ok(())82}