git.delta.rocks / remowt / refs/commits / 6d9cf16dada2

difftreelog

source

crates/remowt-client/src/ssh_exec.rs1.9 KiBsourcehistory
1use bytes::Bytes;2use russh::client::Msg;3use russh::{Channel, ChannelMsg};4use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};5use tokio::sync::oneshot;67const BUF: usize = 64 * 1024;89pub(crate) struct SshExecChild {10	pub stdin: DuplexStream,11	pub stdout: DuplexStream,12	pub stderr: DuplexStream,13	pub exit: oneshot::Receiver<Option<u32>>,14}1516impl SshExecChild {17	/// Manage channel returned by russh exec().18	pub(crate) fn from_exec(ch: Channel<Msg>) -> Self {19		let (stdin, mut stdin_r) = tokio::io::duplex(BUF);20		let (mut out_w, stdout) = tokio::io::duplex(BUF);21		let (mut err_w, stderr) = tokio::io::duplex(BUF);22		let (exit_tx, exit) = oneshot::channel();2324		tokio::spawn(async move {25			let (mut read, write) = ch.split();2627			let stdin_pump = tokio::spawn(async move {28				let mut buf = vec![0u8; BUF];29				loop {30					match stdin_r.read(&mut buf).await {31						Ok(0) | Err(_) => break,32						Ok(n) => {33							if write34								.data_bytes(Bytes::copy_from_slice(&buf[..n]))35								.await36								.is_err()37							{38								return;39							}40						}41					}42				}43				let _ = write.eof().await;44			});4546			let mut code = None;47			while let Some(msg) = read.wait().await {48				match msg {49					ChannelMsg::Data { data } => {50						if out_w.write_all(&data).await.is_err() {51							break;52						}53					}54					ChannelMsg::ExtendedData { data, .. } => {55						if err_w.write_all(&data).await.is_err() {56							break;57						}58					}59					ChannelMsg::ExitStatus { exit_status } => code = Some(exit_status),60					_ => {}61				}62			}6364			stdin_pump.abort();65			let _ = out_w.shutdown().await;66			let _ = err_w.shutdown().await;67			let _ = exit_tx.send(code);68		});6970		SshExecChild {71			stdin,72			stdout,73			stderr,74			exit,75		}76	}7778	/// Wait for the process to finish, returning its exit status.79	pub(crate) async fn wait(self) -> Option<u32> {80		self.exit.await.ok().flatten()81	}82}