1use std::io;23use bifrostlink::Port;4use bytes::{Bytes, BytesMut};5use russh::{Channel, ChannelStream};6use russh::client::Msg;7use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, ReadHalf, WriteHalf};8use tokio::join;9use tracing::error;1011async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {12 let len = srx.read_u32().await?;13 let mut buf = BytesMut::zeroed(len as usize);14 srx.read_exact(&mut buf).await?;15 Ok(buf)16}17async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {18 stx.write_u32(value.len().try_into().expect("can't be larger"))19 .await?;20 stx.write_all(&value).await?;21 Ok(())22}2324pub fn channel_port(ch: Channel<Msg>) -> Port {25 Port::new(move |mut rx, tx| async move {26 let (mut srx, mut stx) = tokio::io::split(ch.into_stream());27 let srx_task = async move {28 loop {29 match read(&mut srx).await {30 Ok(buf) => {31 if tx.send(buf.freeze()).is_err() {32 break;33 }34 }35 Err(e) => {36 error!("channel read failed: {e}");37 break;38 }39 }40 }41 };42 let stx_task = async move {43 while let Some(value) = rx.recv().await {44 if let Err(e) = write(&mut stx, value).await {45 error!("channel write failed: {e}");46 break;47 }48 }49 };50 join!(srx_task, stx_task);51 })52}