git.delta.rocks / remowt / refs/commits / 99930b3c897c

difftreelog

source

crates/remowt-client/src/port.rs1.3 KiBsourcehistory
1use std::io;23use bifrostlink::Port;4use bytes::{Bytes, BytesMut};5use russh::{Channel, ChannelStream};6use russh::client::Msg;7use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, ReadHalf, WriteHalf};8use tokio::join;9use tracing::error;1011async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {12	let len = srx.read_u32().await?;13	let mut buf = BytesMut::zeroed(len as usize);14	srx.read_exact(&mut buf).await?;15	Ok(buf)16}17async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {18	stx.write_u32(value.len().try_into().expect("can't be larger"))19		.await?;20	stx.write_all(&value).await?;21	Ok(())22}2324pub fn channel_port(ch: Channel<Msg>) -> Port {25	Port::new(move |mut rx, tx| async move {26		let (mut srx, mut stx) = tokio::io::split(ch.into_stream());27		let srx_task = async move {28			loop {29				match read(&mut srx).await {30					Ok(buf) => {31						if tx.send(buf.freeze()).is_err() {32							break;33						}34					}35					Err(e) => {36						error!("channel read failed: {e}");37						break;38					}39				}40			}41		};42		let stx_task = async move {43			while let Some(value) = rx.recv().await {44				if let Err(e) = write(&mut stx, value).await {45					error!("channel write failed: {e}");46					break;47				}48			}49		};50		join!(srx_task, stx_task);51	})52}