1use bytes::Bytes;2use russh::client::Msg;3use russh::{Channel, ChannelMsg};4use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};5use tokio::sync::oneshot;67const BUF: usize = 64 * 1024;89pub(crate) struct SshExecChild {10 pub stdin: DuplexStream,11 pub stdout: DuplexStream,12 pub stderr: DuplexStream,13 pub exit: oneshot::Receiver<Option<u32>>,14}1516impl SshExecChild {17 18 pub(crate) fn from_exec(ch: Channel<Msg>) -> Self {19 let (stdin, mut stdin_r) = tokio::io::duplex(BUF);20 let (mut out_w, stdout) = tokio::io::duplex(BUF);21 let (mut err_w, stderr) = tokio::io::duplex(BUF);22 let (exit_tx, exit) = oneshot::channel();2324 tokio::spawn(async move {25 let (mut read, write) = ch.split();2627 let stdin_pump = tokio::spawn(async move {28 let mut buf = vec![0u8; BUF];29 loop {30 match stdin_r.read(&mut buf).await {31 Ok(0) | Err(_) => break,32 Ok(n) => {33 if write34 .data_bytes(Bytes::copy_from_slice(&buf[..n]))35 .await36 .is_err()37 {38 return;39 }40 }41 }42 }43 let _ = write.eof().await;44 });4546 let mut code = None;47 while let Some(msg) = read.wait().await {48 match msg {49 ChannelMsg::Data { data } => {50 if out_w.write_all(&data).await.is_err() {51 break;52 }53 }54 ChannelMsg::ExtendedData { data, .. } => {55 if err_w.write_all(&data).await.is_err() {56 break;57 }58 }59 ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),60 _ => {}61 }62 }6364 stdin_pump.abort();65 let _ = out_w.shutdown().await;66 let _ = err_w.shutdown().await;67 let _ = exit_tx.send(code);68 });6970 SshExecChild {71 stdin,72 stdout,73 stderr,74 exit,75 }76 }7778 79 pub(crate) async fn wait(self) -> Option<u32> {80 self.exit.await.ok().flatten()81 }82}