git.delta.rocks / jrsonnet / refs/commits / 7c6930a6bff0

difftreelog

refactor remove shell-outs for ssh

Yaroslav Bolyukin2023-12-29parent: #904d121.patch.diff
in: trunk

15 files changed

modifiedCargo.lockdiffbeforeafterboth
318source = "registry+https://github.com/rust-lang/crates.io-index"318source = "registry+https://github.com/rust-lang/crates.io-index"
319checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"319checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
320
321[[package]]
322name = "better-command"
323version = "0.1.0"
324dependencies = [
325 "once_cell",
326 "regex",
327 "serde",
328 "serde_json",
329 "tracing",
330 "tracing-indicatif",
331]
320332
321[[package]]333[[package]]
322name = "bitflags"334name = "bitflags"
748 "anyhow",760 "anyhow",
749 "async-trait",761 "async-trait",
750 "base64 0.21.5",762 "base64 0.21.5",
763 "better-command",
751 "chrono",764 "chrono",
752 "clap",765 "clap",
753 "futures",766 "futures",
15101523
1511[[package]]1524[[package]]
1512name = "once_cell"1525name = "once_cell"
1513version = "1.18.0"1526version = "1.19.0"
1514source = "registry+https://github.com/rust-lang/crates.io-index"1527source = "registry+https://github.com/rust-lang/crates.io-index"
1515checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"1528checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
15161529
1517[[package]]1530[[package]]
1518name = "opaque-debug"1531name = "opaque-debug"
1921source = "registry+https://github.com/rust-lang/crates.io-index"1934source = "registry+https://github.com/rust-lang/crates.io-index"
1922checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"1935checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
1936
1937[[package]]
1938name = "remowt-agent"
1939version = "0.1.0"
19231940
1924[[package]]1941[[package]]
1925name = "rnix"1942name = "rnix"
21052122
2106[[package]]2123[[package]]
2107name = "serde"2124name = "serde"
2108version = "1.0.190"2125version = "1.0.193"
2109source = "registry+https://github.com/rust-lang/crates.io-index"2126source = "registry+https://github.com/rust-lang/crates.io-index"
2110checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"2127checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
2111dependencies = [2128dependencies = [
2112 "serde_derive",2129 "serde_derive",
2113]2130]
21232140
2124[[package]]2141[[package]]
2125name = "serde_derive"2142name = "serde_derive"
2126version = "1.0.190"2143version = "1.0.193"
2127source = "registry+https://github.com/rust-lang/crates.io-index"2144source = "registry+https://github.com/rust-lang/crates.io-index"
2128checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"2145checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
2129dependencies = [2146dependencies = [
2130 "proc-macro2",2147 "proc-macro2",
2131 "quote",2148 "quote",
21342151
2135[[package]]2152[[package]]
2136name = "serde_json"2153name = "serde_json"
2137version = "1.0.107"2154version = "1.0.108"
2138source = "registry+https://github.com/rust-lang/crates.io-index"2155source = "registry+https://github.com/rust-lang/crates.io-index"
2139checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"2156checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
2140dependencies = [2157dependencies = [
2141 "itoa",2158 "itoa",
2142 "ryu",2159 "ryu",
25272544
2528[[package]]2545[[package]]
2529name = "tracing"2546name = "tracing"
2530version = "0.1.37"2547version = "0.1.40"
2531source = "registry+https://github.com/rust-lang/crates.io-index"2548source = "registry+https://github.com/rust-lang/crates.io-index"
2532checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"2549checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
2533dependencies = [2550dependencies = [
2534 "cfg-if",
2535 "pin-project-lite",2551 "pin-project-lite",
2536 "tracing-attributes",2552 "tracing-attributes",
2537 "tracing-core",2553 "tracing-core",
2538]2554]
25392555
2540[[package]]2556[[package]]
2541name = "tracing-attributes"2557name = "tracing-attributes"
2542version = "0.1.26"2558version = "0.1.27"
2543source = "registry+https://github.com/rust-lang/crates.io-index"2559source = "registry+https://github.com/rust-lang/crates.io-index"
2544checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"2560checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
2545dependencies = [2561dependencies = [
2546 "proc-macro2",2562 "proc-macro2",
2547 "quote",2563 "quote",
25502566
2551[[package]]2567[[package]]
2552name = "tracing-core"2568name = "tracing-core"
2553version = "0.1.31"2569version = "0.1.32"
2554source = "registry+https://github.com/rust-lang/crates.io-index"2570source = "registry+https://github.com/rust-lang/crates.io-index"
2555checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"2571checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
2556dependencies = [2572dependencies = [
2557 "once_cell",2573 "once_cell",
2558 "valuable",2574 "valuable",
2559]2575]
25602576
2561[[package]]2577[[package]]
2562name = "tracing-indicatif"2578name = "tracing-indicatif"
2563version = "0.3.5"2579version = "0.3.6"
2564source = "registry+https://github.com/rust-lang/crates.io-index"2580source = "registry+https://github.com/rust-lang/crates.io-index"
2565checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"2581checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
2566dependencies = [2582dependencies = [
2567 "indicatif",2583 "indicatif",
2568 "tracing",2584 "tracing",
modifiedCargo.tomldiffbeforeafterboth
2members = ["crates/*", "cmds/*"]2members = ["crates/*", "cmds/*"]
3resolver = "2"3resolver = "2"
4
5[workspace.dependencies]
6nixlike = { path = "./crates/nixlike" }
7better-command = { path = "./crates/better-command" }
48
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
6edition = "2021"6edition = "2021"
77
8[dependencies]8[dependencies]
9nixlike.workspace = true
10better-command.workspace = true
9anyhow = "1.0"11anyhow = "1.0"
10serde = { version = "1.0", features = ["derive"] }12serde = { version = "1.0", features = ["derive"] }
11serde_json = "1.0"13serde_json = "1.0"
15hostname = "0.3.1"17hostname = "0.3.1"
16age-core = "0.9.0"18age-core = "0.9.0"
17peg = "0.8.2"19peg = "0.8.2"
18nixlike = { path = "../../crates/nixlike" }
19age = { version = "0.9.2", features = ["ssh", "armor"] }20age = { version = "0.9.2", features = ["ssh", "armor"] }
20base64 = "0.21.5"21base64 = "0.21.5"
21chrono = { version = "0.4.31", features = ["serde"] }22chrono = { version = "0.4.31", features = ["serde"] }
modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
1//! Wrapper around nix repl, which allows to work on nix code, without relying on
2//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.
3
1use std::collections::HashMap;4use std::collections::HashMap;
2use std::ffi::{OsStr, OsString};5use std::ffi::{OsStr, OsString};
6use std::sync::{Arc, OnceLock};9use std::sync::{Arc, OnceLock};
710
8use anyhow::{anyhow, bail, ensure, Context, Result};11use anyhow::{anyhow, bail, ensure, Context, Result};
12use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};
9use futures::StreamExt;13use futures::StreamExt;
10use itertools::Itertools;14use itertools::Itertools;
11use r2d2::{Pool, PooledConnection};15use r2d2::{Pool, PooledConnection};
14use tokio::io::AsyncWriteExt;18use tokio::io::AsyncWriteExt;
15use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};19use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
16use tokio::select;20use tokio::select;
17use tokio::sync::{mpsc, oneshot};21use tokio::sync::{mpsc, oneshot, Mutex};
18use tokio_util::codec::{FramedRead, LinesCodec};22use tokio_util::codec::{FramedRead, LinesCodec};
19use tracing::{debug, error, warn, Level};23use tracing::{debug, error, warn, Level};
20
21use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};
2224
23const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";25const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";
2426
30 string_wrapping: (String, String),32 string_wrapping: (String, String),
31 number_wrapping: (String, String),33 number_wrapping: (String, String),
34
35 executing_command: Arc<Mutex<()>>,
3236
33 next_id: u32,37 next_id: u32,
34 free_list: Vec<u32>,38 free_list: Vec<u32>,
219 string_wrapping: Default::default(),223 string_wrapping: Default::default(),
220 number_wrapping: Default::default(),224 number_wrapping: Default::default(),
225
226 executing_command: Arc::new(Mutex::new(())),
221227
222 next_id: 0,228 next_id: 0,
223 free_list: vec![],229 free_list: vec![],
331 expr: impl AsRef<[u8]>,337 expr: impl AsRef<[u8]>,
332 err_handler: &mut dyn Handler,338 err_handler: &mut dyn Handler,
333 ) -> Result<String> {339 ) -> Result<String> {
340 // Prevent two commands from being executed in parallel, messing with each other.
341 let _lock = self.executing_command.clone();
342 let _guard = _lock.lock().await;
343
334 self.send_command(expr).await?;344 self.send_command(expr).await?;
335 // It will be echoed345 // It will be echoed
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
3use std::{env::current_dir, time::Duration};3use std::{env::current_dir, time::Duration};
44
5use crate::command::MyCommand;5use crate::command::MyCommand;
6use crate::host::Config;6use crate::host::{Config, ConfigHost};
7use crate::nix_go;7use crate::nix_go;
8use anyhow::{anyhow, Result};8use anyhow::{anyhow, Result};
9use clap::Parser;9use clap::Parser;
10use itertools::Itertools;10use itertools::Itertools as _;
11use tokio::{task::LocalSet, time::sleep};11use tokio::{task::LocalSet, time::sleep};
12use tracing::{error, field, info, info_span, warn, Instrument};12use tracing::{error, field, info, info_span, warn, Instrument};
1313
112 current: bool,112 current: bool,
113 datetime: String,113 datetime: String,
114}114}
115async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {115async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
116 let mut cmd = MyCommand::new("nix-env");116 let mut cmd = host.cmd("nix-env").await?;
117 cmd.comparg("--profile", "/nix/var/nix/profiles/system")117 cmd.comparg("--profile", "/nix/var/nix/profiles/system")
118 .arg("--list-generations");118 .arg("--list-generations");
119 // Sudo is required due to --list-generations acquiring lock on the profile.119 // Sudo is required due to --list-generations acquiring lock on the profile.
120 let data = config.run_string_on(host, cmd, true).await?;120 let data = cmd.sudo().run_string().await?;
121 let generations = data121 let generations = data
122 .split('\n')122 .split('\n')
123 .map(|e| e.trim())123 .map(|e| e.trim())
163 Ok(current)163 Ok(current)
164}164}
165
166async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
167 let mut cmd = MyCommand::new("systemctl");
168 cmd.arg("stop").arg(unit);
169 config.run_on(host, cmd, true).await
170}
171
172async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
173 let mut cmd = MyCommand::new("systemctl");
174 cmd.arg("start").arg(unit);
175 config.run_on(host, cmd, true).await
176}
177165
178async fn execute_upload(166async fn execute_upload(
179 build: &BuildSystems,167 build: &BuildSystems,
180 config: &Config,
181 action: UploadAction,168 action: UploadAction,
182 host: &str,169 host: &ConfigHost,
183 built: PathBuf,170 built: PathBuf,
184) -> Result<()> {171) -> Result<()> {
185 let mut failed = false;172 let mut failed = false;
191 if !build.disable_rollback {178 if !build.disable_rollback {
192 let _span = info_span!("preparing").entered();179 let _span = info_span!("preparing").entered();
193 info!("preparing for rollback");180 info!("preparing for rollback");
194 let generation = get_current_generation(config, host).await?;181 let generation = get_current_generation(&host).await?;
195 info!(182 info!(
196 "rollback target would be {} {}",183 "rollback target would be {} {}",
197 generation.id, generation.datetime184 generation.id, generation.datetime
198 );185 );
199 {186 {
200 let mut cmd = MyCommand::new("sh");187 let mut cmd = host.cmd("sh").await?;
201 cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));188 cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
202 if let Err(e) = config.run_on(host, cmd, true).await {189 if let Err(e) = cmd.sudo().run().await {
203 error!("failed to set rollback marker: {e}");190 error!("failed to set rollback marker: {e}");
204 failed = true;191 failed = true;
205 }192 }
215 // if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.202 // if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
216 // Anyway, reboot will still help in this case.203 // Anyway, reboot will still help in this case.
217 if action.should_schedule_rollback_run() {204 if action.should_schedule_rollback_run() {
218 let mut cmd = MyCommand::new("systemd-run");205 let mut cmd = host.cmd("systemd-run").await?;
219 cmd.comparg("--on-active", "3min")206 cmd.comparg("--on-active", "3min")
220 .comparg("--unit", "rollback-watchdog-run")207 .comparg("--unit", "rollback-watchdog-run")
221 .arg("systemctl")208 .arg("systemctl")
222 .arg("start")209 .arg("start")
223 .arg("rollback-watchdog.service");210 .arg("rollback-watchdog.service");
224 if let Err(e) = config.run_on(host, cmd, true).await {211 if let Err(e) = cmd.sudo().run().await {
225 error!("failed to schedule rollback run: {e}");212 error!("failed to schedule rollback run: {e}");
226 failed = true;213 failed = true;
227 }214 }
228 }215 }
229 }216 }
217
230 if action.should_switch_profile() && !failed {218 if action.should_switch_profile() && !failed {
231 info!("switching generation");219 info!("switching generation");
232 let mut cmd = MyCommand::new("nix-env");220 let mut cmd = host.cmd("nix-env").await?;
233 cmd.comparg("--profile", "/nix/var/nix/profiles/system")221 cmd.comparg("--profile", "/nix/var/nix/profiles/system")
234 .comparg("--set", &built);222 .comparg("--set", &built);
235 if let Err(e) = config.run_on(host, cmd, true).await {223 if let Err(e) = cmd.sudo().run().await {
236 error!("failed to switch generation: {e}");224 error!("failed to switch generation: {e}");
237 failed = true;225 failed = true;
238 }226 }
239 }227 }
228
229 // FIXME: Connection might be disconnected after activation run
230
240 if action.should_activate() && !failed {231 if action.should_activate() && !failed {
241 let _span = info_span!("activating").entered();232 let _span = info_span!("activating").entered();
242 info!("executing activation script");233 info!("executing activation script");
243 let mut switch_script = built.clone();234 let mut switch_script = built.clone();
244 switch_script.push("bin");235 switch_script.push("bin");
245 switch_script.push("switch-to-configuration");236 switch_script.push("switch-to-configuration");
246 let mut cmd = MyCommand::new(switch_script);237 let mut cmd = host.cmd(switch_script).await?;
247 cmd.arg(action.name());238 cmd.arg(action.name());
248 if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {239 if let Err(e) = cmd.sudo().run().in_current_span().await {
249 error!("failed to activate: {e}");240 error!("failed to activate: {e}");
250 failed = true;241 failed = true;
251 }242 }
252 }243 }
253 if !build.disable_rollback {244 if !build.disable_rollback {
254 if failed {245 if failed {
255 info!("executing rollback");246 info!("executing rollback");
256 if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")247 if let Err(e) = host
248 .systemctl_start("rollback-watchdog.service")
257 .instrument(info_span!("rollback"))249 .instrument(info_span!("rollback"))
258 .await250 .await
259 {251 {
260 error!("failed to trigger rollback: {e}")252 error!("failed to trigger rollback: {e}")
261 }253 }
262 } else {254 } else {
263 info!("trying to mark upgrade as successful");255 info!("trying to mark upgrade as successful");
264 let mut cmd = MyCommand::new("rm");
265 cmd.arg("-f").arg("/etc/fleet_rollback_marker");
266 if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {256 if let Err(e) = host
257 .rm_file("/etc/fleet_rollback_marker", true)
258 .in_current_span()
259 .await
260 {
267 error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")261 error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
268 }262 }
269 }263 }
270 info!("disarming watchdog, just in case");264 info!("disarming watchdog, just in case");
271 if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {265 if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
272 // It is ok, if there was no reboot - then timer might not be running.266 // It is ok, if there was no reboot - then timer might not be running.
273 }267 }
274 if action.should_schedule_rollback_run() {268 if action.should_schedule_rollback_run() {
275 if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {269 if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
276 error!("failed to disarm rollback run: {e}");270 error!("failed to disarm rollback run: {e}");
277 }271 }
278 }272 }
279 } else {273 } else if let Err(_e) = host
280 let mut cmd = MyCommand::new("rm");
281 cmd.arg("-f").arg("/etc/fleet_rollback_marker");
282 if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {274 .rm_file("/etc/fleet_rollback_marker", true)
275 .in_current_span()
276 .await
277 {
283 // Marker might not exist, yet better try to remove it.278 // Marker might not exist, yet better try to remove it.
284 }279 }
285 }
286 Ok(())280 Ok(())
287}281}
288282
289impl BuildSystems {283impl BuildSystems {
290 async fn build_task(self, config: Config, host: String) -> Result<()> {284 async fn build_task(self, config: Config, host: String) -> Result<()> {
291 info!("building");285 info!("building");
286 let host = config.host(&host).await?;
292 let action = Action::from(self.subcommand.clone());287 let action = Action::from(self.subcommand.clone());
293 let fleet_field = &config.fleet_field;288 let fleet_field = &config.fleet_field;
294 let drv = nix_go!(289 let drv = nix_go!(
295 fleet_field.buildSystems(Obj {290 fleet_field.buildSystems(Obj {
296 localSystem: { config.local_system.clone() }291 localSystem: { config.local_system.clone() }
297 })[{ action.build_attr() }][{ host }]292 })[{ action.build_attr() }][{ &host.name }]
298 );293 );
299 let outputs = drv.build().await.map_err(|e| {294 let outputs = drv.build().await.map_err(|e| {
300 if action.build_attr() == "sdImage" {295 if action.build_attr() == "sdImage" {
309304
310 match action {305 match action {
311 Action::Upload { action } => {306 Action::Upload { action } => {
312 if !config.is_local(&host) {307 if !config.is_local(&host.name) {
313 info!("uploading system closure");308 info!("uploading system closure");
314 {309 {
310 // TODO: Move to remote_derivation method.
315 // Alternatively, nix store make-content-addressed can be used,311 // Alternatively, nix store make-content-addressed can be used,
316 // at least for the first deployment, to provide trusted store key.312 // at least for the first deployment, to provide trusted store key.
317 //313 //
329 }325 }
330 let mut tries = 0;326 let mut tries = 0;
331 loop {327 loop {
332 let mut nix = MyCommand::new("nix");
333 nix.arg("copy")
334 .arg("--substitute-on-destination")
335 .comparg("--to", format!("ssh-ng://{host}"))
336 .arg(out_output);
337 match nix.run_nix().await {328 match host.remote_derivation(out_output).await {
338 Ok(()) => break,329 Ok(remote) => {
330 assert!(&remote == out_output, "CA derivations aren't implemented");
331 break;
332 }
339 Err(e) if tries < 3 => {333 Err(e) if tries < 3 => {
340 tries += 1;334 tries += 1;
341 warn!("Copy failure ({}/3): {}", tries, e);335 warn!("Copy failure ({}/3): {}", tries, e);
346 }340 }
347 }341 }
348 if let Some(action) = action {342 if let Some(action) = action {
349 execute_upload(&self, &config, action, &host, out_output.clone()).await?343 execute_upload(&self, action, &host, out_output.clone()).await?
350 }344 }
351 }345 }
352 Action::Package(PackageAction::SdImage) => {346 Action::Package(PackageAction::SdImage) => {
353 let mut out = current_dir()?;347 let mut out = current_dir()?;
354 out.push(format!("sd-image-{}", host));348 out.push(format!("sd-image-{}", host.name));
355349
356 info!("linking sd image to {:?}", out);350 info!("linking sd image to {:?}", out);
357 symlink(out_output, out)?;351 symlink(out_output, out)?;
358 }352 }
359 Action::Package(PackageAction::InstallationCd) => {353 Action::Package(PackageAction::InstallationCd) => {
360 let mut out = current_dir()?;354 let mut out = current_dir()?;
361 out.push(format!("installation-cd-{}", host));355 out.push(format!("installation-cd-{}", host.name));
362356
363 info!("linking iso image to {:?}", out);357 info!("linking iso image to {:?}", out);
364 symlink(out_output, out)?;358 symlink(out_output, out)?;
379 let this = this.clone();373 let this = this.clone();
380 let span = info_span!("deployment", host = field::display(&host.name));374 let span = info_span!("deployment", host = field::display(&host.name));
381 let hostname = host.name;375 let hostname = host.name;
376 // FIXME: Since the introduction of better-nix-eval,
377 // due to single repl used for builds, hosts are waiting for each other to build,
378 // instead of building concurrently.
379 //
380 // Open multiple repls?
381 //
382 // Create build batcher, which will behave similar to golangs
383 // WaitGroup, and start executing once all the build tasks are scheduled?
384 // This also allows to cleanup build output, as there will be no longer
385 // "waiting for remote machine" messages in the cases when one package is needed for
386 // multiple hosts.
382 set.spawn_local(387 set.spawn_local(
383 (async move {388 (async move {
384 match this.build_task(config, hostname).await {389 match this.build_task(config, hostname).await {
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
7use anyhow::{anyhow, bail, ensure, Context, Result};7use anyhow::{anyhow, bail, ensure, Context, Result};
8use chrono::{DateTime, Utc};8use chrono::{DateTime, Utc};
9use clap::Parser;9use clap::Parser;
10use futures::{StreamExt, TryStreamExt};10use futures::StreamExt;
11use itertools::Itertools;11use itertools::Itertools;
12use owo_colors::OwoColorize;12use owo_colors::OwoColorize;
13use std::{13use std::{
404 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;404 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
405405
406 if let Some(data) = secret.secret.secret {406 if let Some(data) = secret.secret.secret {
407 let host = config.host(&identity_holder).await?;
407 let encrypted = config408 let encrypted = host.reencrypt(data, target_recipients).await?;
408 .reencrypt_on_host(identity_holder, data, target_recipients)
409 .await?;
410 secret.secret.secret = Some(encrypted);409 secret.secret.secret = Some(encrypted);
411 }410 }
481 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;480 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
482481
483 if let Some(secret) = data.secret.secret {482 if let Some(secret) = data.secret.secret {
483 let host = config.host(identity_holder).await?;
484 let encrypted = config484 let encrypted = host.reencrypt(secret, target_recipients).await?;
485 .reencrypt_on_host(identity_holder, secret, target_recipients)
486 .await?;
487485
488 data.secret.secret = Some(encrypted);486 data.secret.secret = Some(encrypted);
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
1use std::{1use std::thread::sleep;
2 collections::HashMap,2use std::time::Duration;
3 ffi::OsStr,3use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};
4 pin,
5 process::Stdio,
6 sync::{Arc, Mutex},
7 task::Poll,
8};
94
10use anyhow::{anyhow, Result};5use anyhow::{anyhow, Result};
6use better_command::{Handler, NixHandler, PlainHandler};
11use futures::StreamExt;7use futures::StreamExt;
12use itertools::Either;8use itertools::Either;
13use once_cell::sync::Lazy;
14use openssh::{OverSsh, OwningCommand, Session};9use openssh::{OverSsh, OwningCommand, Session};
15use regex::Regex;
16use serde::{de::Visitor, Deserialize};
17use tokio::{io::AsyncRead, process::Command, select};10use tokio::{io::AsyncRead, process::Command, select};
18use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
19use tracing::{info, info_span, warn, Span};12use tracing::{info, debug};
20use tracing_indicatif::span_ext::IndicatifSpanExt;
2113
22fn escape_bash(input: &str, out: &mut String) {14fn escape_bash(input: &str, out: &mut String) {
23 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";15 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
87 return self;79 return self;
88 }80 }
89 let mut out = Self::new("env");81 let mut out = Self::new("env");
90 if let Some(session) = self.ssh_session {82 out.ssh_session = self.ssh_session;
91 out = out.ssh_session(session);
92 }
93 for (k, v) in self.env {83 for (k, v) in self.env {
94 assert!(!k.contains('='));84 assert!(!k.contains('='));
95 out.arg(format!("{k}={v}"));85 out.arg(format!("{k}={v}"));
179 out169 out
180 } else {170 } else {
181 let mut out = Self::new("sudo");171 let mut out = Self::new("sudo");
172 out.ssh_session = self.ssh_session.take();
182 out.args(self.into_args());173 out.args(self.into_args());
183 out174 out
184 }175 }
185 }176 }
186 pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
187 self.ssh_session = Some(on);
188 self
189 }
190 pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
191 let mut out = Self::new("ssh");
192 out.ssh_session = self.ssh_session.take();
193 out.arg(on).arg("--");
194 out.arg(self.into_string());
195 out
196 }
197177
198 pub async fn run(self) -> Result<()> {178 pub async fn run(self) -> Result<()> {
199 let str = self.clone().into_string();179 let str = self.clone().into_string();
278 Ok(())258 Ok(())
279}259}
280260
281pub trait Handler: Send {
282 fn handle_line(&mut self, e: &str);
283}
284
285pub struct ClonableHandler<H>(Arc<Mutex<H>>);
286impl<H> Clone for ClonableHandler<H> {
287 fn clone(&self) -> Self {
288 Self(self.0.clone())
289 }
290}
291impl<H> ClonableHandler<H> {
292 pub fn new(inner: H) -> Self {
293 Self(Arc::new(Mutex::new(inner)))
294 }
295}
296impl<H: Handler> Handler for ClonableHandler<H> {
297 fn handle_line(&mut self, e: &str) {
298 self.0.lock().unwrap().handle_line(e)
299 }
300}
301
302struct PlainHandler;
303impl Handler for PlainHandler {
304 fn handle_line(&mut self, e: &str) {
305 info!(target: "log", "{e}");
306 }
307}
308
309pub struct NoopHandler;
310impl Handler for NoopHandler {
311 fn handle_line(&mut self, _e: &str) {}
312}
313
314#[derive(Default)]
315pub struct NixHandler {
316 spans: HashMap<u64, Span>,
317}
318fn process_message(m: &str) -> String {
319 static OSC_CLEANER: Lazy<Regex> =
320 Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
321 static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
322 let m = OSC_CLEANER.replace_all(m, "");
323 // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
324 // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
325 DETABBER.replace_all(m.as_ref(), " ").to_string()
326}
327impl Handler for NixHandler {
328 fn handle_line(&mut self, e: &str) {
329 if let Some(e) = e.strip_prefix("@nix ") {
330 let log: NixLog = match serde_json::from_str(e) {
331 Ok(l) => l,
332 Err(err) => {
333 warn!("failed to parse nix log line {:?}: {}", e, err);
334 return;
335 }
336 };
337 match log {
338 NixLog::Msg { msg, raw_msg, .. } => {
339 #[allow(clippy::nonminimal_bool)]
340 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
341 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
342 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
343 if let Some(raw_msg) = raw_msg {
344 if !msg.is_empty() {
345 info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
346 } else {
347 info!(target: "nix", "{}", raw_msg.trim_end())
348 }
349 } else {
350 info!(target: "nix", "{}", msg.trim_end())
351 }
352 }
353 }
354 NixLog::Start {
355 ref fields,
356 typ,
357 id,
358 ..
359 } if typ == 105 && !fields.is_empty() => {
360 if let [LogField::String(drv), ..] = &fields[..] {
361 let mut drv = drv.as_str();
362 if let Some(pkg) = drv.strip_prefix("/nix/store/") {
363 let mut it = pkg.splitn(2, '-');
364 it.next();
365 if let Some(pkg) = it.next() {
366 drv = pkg;
367 }
368 }
369 info!(target: "nix","building {}", drv);
370 let span = info_span!("build", drv);
371 span.pb_start();
372 self.spans.insert(id, span);
373 } else {
374 warn!("bad build log: {:?}", log)
375 }
376 }
377 NixLog::Start {
378 ref fields,
379 typ,
380 id,
381 ..
382 } if typ == 100 && fields.len() >= 3 => {
383 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
384 &fields[..]
385 {
386 let mut drv = drv.as_str();
387
388 if let Some(pkg) = drv.strip_prefix("/nix/store/") {
389 let mut it = pkg.splitn(2, '-');
390 it.next();
391 if let Some(pkg) = it.next() {
392 drv = pkg;
393 }
394 }
395 // info!(target: "nix","copying {} {} -> {}", drv, from, to);
396 let span = info_span!("copy", from, to, drv);
397 span.pb_start();
398 self.spans.insert(id, span);
399 } else {
400 warn!("bad copy log: {:?}", log)
401 }
402 }
403 NixLog::Start { text, typ, id, .. }
404 if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
405 {
406 if !text.is_empty()
407 && text != "querying info about missing paths"
408 && text != "copying 0 paths"
409 // Too much spam on lazy-trees branch
410 && !(text.starts_with("copying '") && text.ends_with("' to the store"))
411 {
412 let span = info_span!("job");
413 span.pb_start();
414 span.pb_set_message(&process_message(text.trim()));
415 self.spans.insert(id, span);
416 info!(target: "nix", "{}", text);
417 }
418 }
419 NixLog::Start {
420 text,
421 level: 0,
422 typ: 108,
423 ..
424 } if text.is_empty() => {
425 // Cache lookup? Coupled with copy log
426 }
427 NixLog::Start {
428 text,
429 level: 4,
430 typ: 109,
431 ..
432 } if text.starts_with("querying info about ") => {
433 // Cache lookup
434 }
435 NixLog::Start {
436 text,
437 level: 4,
438 typ: 101,
439 ..
440 } if text.starts_with("downloading ") => {
441 // NAR downloading, coupled with copy log
442 }
443 NixLog::Start {
444 text,
445 level: 1,
446 typ: 111,
447 ..
448 } if text.starts_with("waiting for a machine to build ") => {
449 // Useless repeating notification about build
450 }
451 NixLog::Start {
452 text,
453 level: 3,
454 typ: 111,
455 ..
456 } if text.starts_with("resolved derivation: ") => {
457 // CA resolved
458 }
459 NixLog::Start {
460 text,
461 level: 1,
462 typ: 111,
463 id,
464 ..
465 } if text.starts_with("waiting for lock on ") => {
466 let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
467 if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
468 drv = txt;
469 }
470 if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
471 drv = txt;
472 }
473 if let Some(txt) = drv.split("', '").next() {
474 drv = txt;
475 }
476 if let Some(pkg) = drv.strip_prefix("/nix/store/") {
477 let mut it = pkg.splitn(2, '-');
478 it.next();
479 if let Some(pkg) = it.next() {
480 drv = pkg;
481 }
482 }
483 let span = info_span!("waiting on drv", drv);
484 span.pb_start();
485 self.spans.insert(id, span);
486 // Concurrent build of the same message
487 }
488 NixLog::Stop { id, .. } => {
489 self.spans.remove(&id);
490 }
491 NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
492 if let Some(span) = self.spans.get(&id) {
493 if let LogField::String(s) = &fields[0] {
494 span.pb_set_message(&process_message(s.trim()));
495 } else {
496 warn!("bad fields: {fields:?}");
497 }
498 } else {
499 warn!("unknown result id: {id} {typ} {fields:?}");
500 }
501 // dbg!(fields, id, typ);
502 }
503 NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
504 if let Some(span) = self.spans.get(&id) {
505 if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
506 &fields[..4]
507 {
508 span.pb_set_length(*expected);
509 span.pb_set_position(*done);
510 } else {
511 warn!("bad fields: {fields:?}");
512 }
513 } else {
514 // warn!("unknown result id: {id} {typ} {fields:?}");
515 // Unaccounted progress.
516 }
517 // dbg!(fields, id, typ);
518 }
519 NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
520 // Set phase, expected
521 }
522 _ => warn!("unknown log: {:?}", log),
523 };
524 } else {
525 let e = e.trim();
526 if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
527 return;
528 }
529 info!("{e}")
530 }
531 }
532}
533
534async fn run_nix_inner_raw(261async fn run_nix_inner_raw(
535 str: String,262 str: String,
540) -> Result<Option<Vec<u8>>> {267) -> Result<Option<Vec<u8>>> {
541 cmd.stderr(Stdio::piped());268 cmd.stderr(Stdio::piped());
542 cmd.stdout(Stdio::piped());269 cmd.stdout(Stdio::piped());
270 debug!("running command {cmd:?} on local");
543 let mut child = cmd.spawn()?;271 let mut child = cmd.spawn()?;
544 let mut stderr = child.stderr.take().unwrap();272 let mut stderr = child.stderr.take().unwrap();
545 let stdout = child.stdout.take().unwrap();273 let stdout = child.stdout.take().unwrap();
600 err_handler: &mut dyn Handler,328 err_handler: &mut dyn Handler,
601 mut out_handler: Option<&mut dyn Handler>,329 mut out_handler: Option<&mut dyn Handler>,
602) -> Result<Option<Vec<u8>>> {330) -> Result<Option<Vec<u8>>> {
331 debug!("running command {cmd:?} over ssh");
603 cmd.stderr(openssh::Stdio::piped());332 cmd.stderr(openssh::Stdio::piped());
604 cmd.stdout(openssh::Stdio::piped());333 cmd.stdout(openssh::Stdio::piped());
605 let mut child = cmd.spawn().await?;334 let mut child = cmd.spawn().await?;
656 }385 }
657386
658 Ok(out_buf)387 Ok(out_buf)
659}
660
661pub trait ErrorRecorder: Send {
662 /// Return true to discard message from logging
663 fn push_message(&mut self, msg: &str) -> bool;
664}
665
666#[derive(Debug)]
667enum LogField {
668 String(String),
669 Num(u64),
670}
671
672impl<'de> Deserialize<'de> for LogField {
673 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
674 where
675 D: serde::Deserializer<'de>,
676 {
677 struct StringOrNum;
678 impl<'de> Visitor<'de> for StringOrNum {
679 type Value = LogField;
680
681 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
682 write!(f, "string or unsigned")
683 }
684
685 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
686 where
687 E: serde::de::Error,
688 {
689 Ok(LogField::String(v.to_owned()))
690 }
691
692 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
693 where
694 E: serde::de::Error,
695 {
696 Ok(LogField::Num(v))
697 }
698 }
699
700 deserializer.deserialize_any(StringOrNum)
701 }
702}
703
704#[derive(Deserialize, Debug)]
705#[serde(rename_all = "camelCase", tag = "action")]
706#[allow(dead_code)]
707enum NixLog {
708 Msg {
709 level: u32,
710 msg: String,
711 raw_msg: Option<String>,
712 },
713 Start {
714 id: u64,
715 level: u32,
716 #[serde(default)]
717 fields: Vec<LogField>,
718 text: String,
719 #[serde(rename = "type")]
720 typ: u32,
721 },
722 Stop {
723 id: u64,
724 },
725 Result {
726 id: u64,
727 #[serde(rename = "type")]
728 typ: u32,
729 #[serde(default)]
730 fields: Vec<LogField>,
731 },
732}388}
733389
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
9 sync::{Arc, Mutex, MutexGuard, OnceLock},9 sync::{Arc, Mutex, MutexGuard, OnceLock},
10};10};
1111
12use age::Recipient;
13use anyhow::{anyhow, bail, Context, Result};12use anyhow::{anyhow, bail, Context, Result};
14use clap::{ArgGroup, Parser};13use clap::{ArgGroup, Parser};
15use openssh::SessionBuilder;14use openssh::SessionBuilder;
5049
51pub struct ConfigHost {50pub struct ConfigHost {
52 pub name: String,51 pub name: String,
52 pub local: bool,
53 pub session: OnceLock<Arc<openssh::Session>>,53 pub session: OnceLock<Arc<openssh::Session>>,
54}54}
55impl ConfigHost {55impl ConfigHost {
56 pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {56 async fn open_session(&self) -> Result<Arc<openssh::Session>> {
57 assert!(!self.local, "do not open ssh connection to local session");
57 // FIXME: TOCTOU58 // FIXME: TOCTOU
58 if let Some(session) = &self.session.get() {59 if let Some(session) = &self.session.get() {
59 return Ok((*session).clone());60 return Ok((*session).clone());
96 D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))97 D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
97 }98 }
98 pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {99 pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
100 if self.local {
101 Ok(MyCommand::new(cmd))
102 } else {
99 let session = self.open_session().await?;103 let session = self.open_session().await?;
100 Ok(MyCommand::new_on(cmd, session))104 Ok(MyCommand::new_on(cmd, session))
101 }105 }
106 }
102107
103 pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {108 pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
110 .context("failed to call remote host for decrypt")?;115 .context("failed to call remote host for decrypt")?;
111 z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")116 z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
112 }117 }
118 pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
119 let mut cmd = self.cmd("fleet-install-secrets").await?;
120 cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
121 for target in targets {
122 cmd.eqarg("--targets", target);
123 }
124 let encoded = cmd
125 .sudo()
126 .run_string()
127 .await
128 .context("failed to call remote host for decrypt")?;
129 SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")
130 }
113 /// Returns path for futureproofing, as path might change i.e on conversion to CA131 /// Returns path for futureproofing, as path might change i.e on conversion to CA
114 pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {132 pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
133 if self.local {
134 // Path is located locally, thus already trusted.
135 return Ok(path.to_owned());
136 }
115 let mut nix = MyCommand::new("nix");137 let mut nix = MyCommand::new("nix");
116 nix.arg("copy")138 nix.arg("copy")
117 .arg("--substitute-on-destination")139 .arg("--substitute-on-destination")
120 nix.run_nix().await?;142 nix.run_nix().await?;
121 Ok(path.to_owned())143 Ok(path.to_owned())
122 }144 }
145 pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
146 let mut cmd = self.cmd("systemctl").await?;
147 cmd.arg("stop").arg(name);
148 cmd.sudo().run().await
149 }
150 pub async fn systemctl_start(&self, name: &str) -> Result<()> {
151 let mut cmd = self.cmd("systemctl").await?;
152 cmd.arg("start").arg(name);
153 cmd.sudo().run().await
154 }
155
156 pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
157 let mut cmd = self.cmd("rm").await?;
158 cmd.arg("-f").arg(path);
159 if sudo {
160 cmd = cmd.sudo()
161 }
162 cmd.run().await
163 }
123}164}
124165
125impl Config {166impl Config {
136 self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)177 self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
137 }178 }
138
139 pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
140 if sudo {
141 command = command.sudo();
142 }
143 if !self.is_local(host) {
144 command = command.ssh(host);
145 }
146 command.run().await
147 }
148 pub async fn run_string_on(
149 &self,
150 host: &str,
151 mut command: MyCommand,
152 sudo: bool,
153 ) -> Result<String> {
154 if sudo {
155 command = command.sudo();
156 }
157 if !self.is_local(host) {
158 command = command.ssh(host);
159 }
160 command.run_string().await
161 }
162179
163 pub async fn host(&self, name: &str) -> Result<ConfigHost> {180 pub async fn host(&self, name: &str) -> Result<ConfigHost> {
164 Ok(ConfigHost {181 Ok(ConfigHost {
165 name: name.to_owned(),182 name: name.to_owned(),
183 local: self.is_local(name),
166 session: OnceLock::new(),184 session: OnceLock::new(),
167 })185 })
168 }186 }
172 let mut out = vec![];190 let mut out = vec![];
173 for name in names {191 for name in names {
174 out.push(ConfigHost {192 out.push(ConfigHost {
193 local: self.is_local(&name),
175 name,194 name,
176 session: OnceLock::new(),195 session: OnceLock::new(),
177 })196 })
227 host_secrets.insert(secret, value);246 host_secrets.insert(secret, value);
228 }247 }
229
230 pub async fn reencrypt_on_host(
231 &self,
232 host: &str,
233 data: SecretData,
234 targets: Vec<String>,
235 ) -> Result<SecretData> {
236 let mut recmd = MyCommand::new("fleet-install-secrets");
237 recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
238 for target in targets {
239 recmd.eqarg("--targets", target);
240 }
241 recmd = recmd.sudo().ssh(host);
242 let encoded = recmd
243 .run_string()
244 .await
245 .context("failed to call remote host for decrypt")?
246 .trim()
247 .to_owned();
248 SecretData::decode_z85(&encoded)
249 }
250248
251 pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {249 pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
252 let data = self.data();250 let data = self.data();
modifiedcmds/fleet/src/keys.rsdiffbeforeafterboth
1use std::str::FromStr;1use std::str::FromStr;
22
3use crate::command::MyCommand;
4use crate::host::Config;3use crate::host::Config;
5use age::Recipient;4use age::Recipient;
6use anyhow::{anyhow, Result};5use anyhow::{anyhow, Result};
30 Ok(key)29 Ok(key)
31 } else {30 } else {
32 warn!("Loading key for {}", host);31 warn!("Loading key for {}", host);
32 let host = self.host(host).await?;
33 let mut cmd = MyCommand::new("cat");33 let mut cmd = host.cmd("cat").await?;
34 cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");34 cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
35 let key = self.run_string_on(host, cmd, false).await?;35 let key = cmd.run_string().await?;
36 self.update_key(host, key.clone());36 self.update_key(&host.name, key.clone());
37 Ok(key)37 Ok(key)
38 }38 }
39 }39 }
addedcmds/remowt-agent/Cargo.tomldiffbeforeafterboth

no changes

addedcmds/remowt-agent/README.adocdiffbeforeafterboth

no changes

addedcmds/remowt-agent/src/main.rsdiffbeforeafterboth

no changes

addedcrates/better-command/Cargo.tomldiffbeforeafterboth

no changes

addedcrates/better-command/src/handler.rsdiffbeforeafterboth

no changes

addedcrates/better-command/src/lib.rsdiffbeforeafterboth

no changes