difftreelog
refactor remove shell-outs for ssh
in: trunk
15 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
[[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
"anyhow",
"async-trait",
"base64 0.21.5",
+ "better-command",
"chrono",
"clap",
"futures",
@@ -1510,9 +1523,9 @@
[[package]]
name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "opaque-debug"
@@ -1922,6 +1935,10 @@
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
name = "rnix"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
[[package]]
name = "serde"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
dependencies = [
"serde_derive",
]
@@ -2123,9 +2140,9 @@
[[package]]
name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
dependencies = [
"proc-macro2",
"quote",
@@ -2134,9 +2151,9 @@
[[package]]
name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
dependencies = [
"itoa",
"ryu",
@@ -2527,11 +2544,10 @@
[[package]]
name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
- "cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -2539,9 +2555,9 @@
[[package]]
name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
@@ -2550,9 +2566,9 @@
[[package]]
name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
@@ -2560,9 +2576,9 @@
[[package]]
name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
dependencies = [
"indicatif",
"tracing",
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
[workspace]
members = ["crates/*", "cmds/*"]
resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
edition = "2021"
[dependencies]
+nixlike.workspace = true
+better-command.workspace = true
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@@ -15,7 +17,6 @@
hostname = "0.3.1"
age-core = "0.9.0"
peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
age = { version = "0.9.2", features = ["ssh", "armor"] }
base64 = "0.21.5"
chrono = { version = "0.4.31", features = ["serde"] }
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,3 +1,6 @@
+//! Wrapper around nix repl, which allows to work on nix code, without relying on
+//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.
+
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
use std::fmt::{self, Display};
@@ -6,6 +9,7 @@
use std::sync::{Arc, OnceLock};
use anyhow::{anyhow, bail, ensure, Context, Result};
+use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};
use futures::StreamExt;
use itertools::Itertools;
use r2d2::{Pool, PooledConnection};
@@ -14,11 +18,9 @@
use tokio::io::AsyncWriteExt;
use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_util::codec::{FramedRead, LinesCodec};
use tracing::{debug, error, warn, Level};
-
-use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};
const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";
@@ -30,6 +32,8 @@
string_wrapping: (String, String),
number_wrapping: (String, String),
+ executing_command: Arc<Mutex<()>>,
+
next_id: u32,
free_list: Vec<u32>,
}
@@ -219,6 +223,8 @@
string_wrapping: Default::default(),
number_wrapping: Default::default(),
+ executing_command: Arc::new(Mutex::new(())),
+
next_id: 0,
free_list: vec![],
};
@@ -331,6 +337,10 @@
expr: impl AsRef<[u8]>,
err_handler: &mut dyn Handler,
) -> Result<String> {
+ // Prevent two commands from being executed in parallel, messing with each other.
+ let _lock = self.executing_command.clone();
+ let _guard = _lock.lock().await;
+
self.send_command(expr).await?;
// It will be echoed
self.send_command(REPL_DELIMITER).await?;
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth3use std::{env::current_dir, time::Duration};3use std::{env::current_dir, time::Duration};445use crate::command::MyCommand;5use crate::command::MyCommand;6use crate::host::Config;6use crate::host::{Config, ConfigHost};7use crate::nix_go;7use crate::nix_go;8use anyhow::{anyhow, Result};8use anyhow::{anyhow, Result};9use clap::Parser;9use clap::Parser;10use itertools::Itertools;10use itertools::Itertools as _;11use tokio::{task::LocalSet, time::sleep};11use tokio::{task::LocalSet, time::sleep};12use tracing::{error, field, info, info_span, warn, Instrument};12use tracing::{error, field, info, info_span, warn, Instrument};1313112 current: bool,112 current: bool,113 datetime: String,113 datetime: String,114}114}115async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {115async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {116 let mut cmd = MyCommand::new("nix-env");116 let mut cmd = host.cmd("nix-env").await?;117 cmd.comparg("--profile", "/nix/var/nix/profiles/system")117 cmd.comparg("--profile", "/nix/var/nix/profiles/system")118 .arg("--list-generations");118 .arg("--list-generations");119 // Sudo is required due to --list-generations acquiring lock on the profile.119 // Sudo is required due to --list-generations acquiring lock on the profile.120 let data = config.run_string_on(host, cmd, true).await?;120 let data = cmd.sudo().run_string().await?;121 let generations = data121 let generations = data122 .split('\n')122 .split('\n')123 .map(|e| e.trim())123 .map(|e| e.trim())163 Ok(current)163 Ok(current)164}164}165166async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {167 let mut cmd = MyCommand::new("systemctl");168 cmd.arg("stop").arg(unit);169 config.run_on(host, cmd, true).await170}171172async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {173 let mut cmd = MyCommand::new("systemctl");174 cmd.arg("start").arg(unit);175 config.run_on(host, cmd, true).await176}177165178async fn execute_upload(166async fn execute_upload(179 build: &BuildSystems,167 build: &BuildSystems,180 config: &Config,181 action: UploadAction,168 action: UploadAction,182 host: &str,169 host: &ConfigHost,183 built: PathBuf,170 built: PathBuf,184) -> Result<()> {171) -> Result<()> {185 let mut failed = false;172 let mut failed = false;191 if !build.disable_rollback {178 if !build.disable_rollback {192 let _span = info_span!("preparing").entered();179 let _span = info_span!("preparing").entered();193 info!("preparing for rollback");180 info!("preparing for rollback");194 let generation = get_current_generation(config, host).await?;181 let generation = get_current_generation(&host).await?;195 info!(182 info!(196 "rollback target would be {} {}",183 "rollback target would be {} {}",197 generation.id, generation.datetime184 generation.id, generation.datetime198 );185 );199 {186 {200 let mut cmd = MyCommand::new("sh");187 let mut cmd = host.cmd("sh").await?;201 cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));188 cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));202 if let Err(e) = config.run_on(host, cmd, true).await {189 if let Err(e) = cmd.sudo().run().await {203 error!("failed to set rollback marker: {e}");190 error!("failed to set rollback marker: {e}");204 failed = true;191 failed = true;205 }192 }215 // if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.202 // if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.216 // Anyway, reboot will still help in this case.203 // Anyway, reboot will still help in this case.217 if action.should_schedule_rollback_run() {204 if action.should_schedule_rollback_run() {218 let mut cmd = MyCommand::new("systemd-run");205 let mut cmd = host.cmd("systemd-run").await?;219 cmd.comparg("--on-active", "3min")206 cmd.comparg("--on-active", "3min")220 .comparg("--unit", "rollback-watchdog-run")207 .comparg("--unit", "rollback-watchdog-run")221 .arg("systemctl")208 .arg("systemctl")222 .arg("start")209 .arg("start")223 .arg("rollback-watchdog.service");210 .arg("rollback-watchdog.service");224 if let Err(e) = config.run_on(host, cmd, true).await {211 if let Err(e) = cmd.sudo().run().await {225 error!("failed to schedule rollback run: {e}");212 error!("failed to schedule rollback run: {e}");226 failed = true;213 failed = true;227 }214 }228 }215 }229 }216 }217230 if action.should_switch_profile() && !failed {218 if action.should_switch_profile() && !failed {231 info!("switching generation");219 info!("switching generation");232 let mut cmd = MyCommand::new("nix-env");220 let mut cmd = host.cmd("nix-env").await?;233 cmd.comparg("--profile", "/nix/var/nix/profiles/system")221 cmd.comparg("--profile", "/nix/var/nix/profiles/system")234 .comparg("--set", &built);222 .comparg("--set", &built);235 if let Err(e) = config.run_on(host, cmd, true).await {223 if let Err(e) = cmd.sudo().run().await {236 error!("failed to switch generation: {e}");224 error!("failed to switch generation: {e}");237 failed = true;225 failed = true;238 }226 }239 }227 }228229 // FIXME: Connection might be disconnected after activation run230240 if action.should_activate() && !failed {231 if action.should_activate() && !failed {241 let _span = info_span!("activating").entered();232 let _span = info_span!("activating").entered();242 info!("executing activation script");233 info!("executing activation script");243 let mut switch_script = built.clone();234 let mut switch_script = built.clone();244 switch_script.push("bin");235 switch_script.push("bin");245 switch_script.push("switch-to-configuration");236 switch_script.push("switch-to-configuration");246 let mut cmd = MyCommand::new(switch_script);237 let mut cmd = host.cmd(switch_script).await?;247 cmd.arg(action.name());238 cmd.arg(action.name());248 if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {239 if let Err(e) = cmd.sudo().run().in_current_span().await {249 error!("failed to activate: {e}");240 error!("failed to activate: {e}");250 failed = true;241 failed = true;251 }242 }252 }243 }253 if !build.disable_rollback {244 if !build.disable_rollback {254 if failed {245 if failed {255 info!("executing rollback");246 info!("executing rollback");256 if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")247 if let Err(e) = host248 .systemctl_start("rollback-watchdog.service")257 .instrument(info_span!("rollback"))249 .instrument(info_span!("rollback"))258 .await250 .await259 {251 {260 error!("failed to trigger rollback: {e}")252 error!("failed to trigger rollback: {e}")261 }253 }262 } else {254 } else {263 info!("trying to mark upgrade as successful");255 info!("trying to mark upgrade as successful");264 let mut cmd = MyCommand::new("rm");265 cmd.arg("-f").arg("/etc/fleet_rollback_marker");266 if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {256 if let Err(e) = host257 .rm_file("/etc/fleet_rollback_marker", true)258 .in_current_span()259 .await260 {267 error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")261 error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")268 }262 }269 }263 }270 info!("disarming watchdog, just in case");264 info!("disarming watchdog, just in case");271 if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {265 if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {272 // It is ok, if there was no reboot - then timer might not be running.266 // It is ok, if there was no reboot - then timer might not be running.273 }267 }274 if action.should_schedule_rollback_run() {268 if action.should_schedule_rollback_run() {275 if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {269 if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {276 error!("failed to disarm rollback run: {e}");270 error!("failed to disarm rollback run: {e}");277 }271 }278 }272 }279 } else {273 } else if let Err(_e) = host280 let mut cmd = MyCommand::new("rm");281 cmd.arg("-f").arg("/etc/fleet_rollback_marker");282 if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {274 .rm_file("/etc/fleet_rollback_marker", true)275 .in_current_span()276 .await277 {283 // Marker might not exist, yet better try to remove it.278 // Marker might not exist, yet better try to remove it.284 }279 }285 }286 Ok(())280 Ok(())287}281}288282289impl BuildSystems {283impl BuildSystems {290 async fn build_task(self, config: Config, host: String) -> Result<()> {284 async fn build_task(self, config: Config, host: String) -> Result<()> {291 info!("building");285 info!("building");286 let host = config.host(&host).await?;292 let action = Action::from(self.subcommand.clone());287 let action = Action::from(self.subcommand.clone());293 let fleet_field = &config.fleet_field;288 let fleet_field = &config.fleet_field;294 let drv = nix_go!(289 let drv = nix_go!(295 fleet_field.buildSystems(Obj {290 fleet_field.buildSystems(Obj {296 localSystem: { config.local_system.clone() }291 localSystem: { config.local_system.clone() }297 })[{ action.build_attr() }][{ host }]292 })[{ action.build_attr() }][{ &host.name }]298 );293 );299 let outputs = drv.build().await.map_err(|e| {294 let outputs = drv.build().await.map_err(|e| {300 if action.build_attr() == "sdImage" {295 if action.build_attr() == "sdImage" {309304310 match action {305 match action {311 Action::Upload { action } => {306 Action::Upload { action } => {312 if !config.is_local(&host) {307 if !config.is_local(&host.name) {313 info!("uploading system closure");308 info!("uploading system closure");314 {309 {310 // TODO: Move to remote_derivation method.315 // Alternatively, nix store make-content-addressed can be used,311 // Alternatively, nix store make-content-addressed can be used,316 // at least for the first deployment, to provide trusted store key.312 // at least for the first deployment, to provide trusted store key.317 //313 //329 }325 }330 let mut tries = 0;326 let mut tries = 0;331 loop {327 loop {332 let mut nix = MyCommand::new("nix");333 nix.arg("copy")334 .arg("--substitute-on-destination")335 .comparg("--to", format!("ssh-ng://{host}"))336 .arg(out_output);337 match nix.run_nix().await {328 match host.remote_derivation(out_output).await {338 Ok(()) => break,329 Ok(remote) => {330 assert!(&remote == out_output, "CA derivations aren't implemented");331 break;332 }339 Err(e) if tries < 3 => {333 Err(e) if tries < 3 => {340 tries += 1;334 tries += 1;341 warn!("Copy failure ({}/3): {}", tries, e);335 warn!("Copy failure ({}/3): {}", tries, e);346 }340 }347 }341 }348 if let Some(action) = action {342 if let Some(action) = action {349 execute_upload(&self, &config, action, &host, out_output.clone()).await?343 execute_upload(&self, action, &host, out_output.clone()).await?350 }344 }351 }345 }352 Action::Package(PackageAction::SdImage) => {346 Action::Package(PackageAction::SdImage) => {353 let mut out = current_dir()?;347 let mut out = current_dir()?;354 out.push(format!("sd-image-{}", host));348 out.push(format!("sd-image-{}", host.name));355349356 info!("linking sd image to {:?}", out);350 info!("linking sd image to {:?}", out);357 symlink(out_output, out)?;351 symlink(out_output, out)?;358 }352 }359 Action::Package(PackageAction::InstallationCd) => {353 Action::Package(PackageAction::InstallationCd) => {360 let mut out = current_dir()?;354 let mut out = current_dir()?;361 out.push(format!("installation-cd-{}", host));355 out.push(format!("installation-cd-{}", host.name));362356363 info!("linking iso image to {:?}", out);357 info!("linking iso image to {:?}", out);364 symlink(out_output, out)?;358 symlink(out_output, out)?;379 let this = this.clone();373 let this = this.clone();380 let span = info_span!("deployment", host = field::display(&host.name));374 let span = info_span!("deployment", host = field::display(&host.name));381 let hostname = host.name;375 let hostname = host.name;376 // FIXME: Since the introduction of better-nix-eval,377 // due to single repl used for builds, hosts are waiting for each other to build,378 // instead of building concurrently.379 //380 // Open multiple repls?381 //382 // Create build batcher, which will behave similar to golangs383 // WaitGroup, and start executing once all the build tasks are scheduled?384 // This also allows to cleanup build output, as there will be no longer385 // "waiting for remote machine" messages in the cases when one package is needed for386 // multiple hosts.382 set.spawn_local(387 set.spawn_local(383 (async move {388 (async move {384 match this.build_task(config, hostname).await {389 match this.build_task(config, hostname).await {cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,7 +7,7 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use chrono::{DateTime, Utc};
use clap::Parser;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
use itertools::Itertools;
use owo_colors::OwoColorize;
use std::{
@@ -404,9 +404,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(data) = secret.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, data, target_recipients)
- .await?;
+ let host = config.host(&identity_holder).await?;
+ let encrypted = host.reencrypt(data, target_recipients).await?;
secret.secret.secret = Some(encrypted);
}
@@ -481,9 +480,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(secret) = data.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, secret, target_recipients)
- .await?;
+ let host = config.host(identity_holder).await?;
+ let encrypted = host.reencrypt(secret, target_recipients).await?;
data.secret.secret = Some(encrypted);
}
cmds/fleet/src/command.rsdiffbeforeafterboth--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,23 +1,15 @@
-use std::{
- collections::HashMap,
- ffi::OsStr,
- pin,
- process::Stdio,
- sync::{Arc, Mutex},
- task::Poll,
-};
+use std::thread::sleep;
+use std::time::Duration;
+use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};
use anyhow::{anyhow, Result};
+use better_command::{Handler, NixHandler, PlainHandler};
use futures::StreamExt;
use itertools::Either;
-use once_cell::sync::Lazy;
use openssh::{OverSsh, OwningCommand, Session};
-use regex::Regex;
-use serde::{de::Visitor, Deserialize};
use tokio::{io::AsyncRead, process::Command, select};
use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
-use tracing::{info, info_span, warn, Span};
-use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tracing::{info, debug};
fn escape_bash(input: &str, out: &mut String) {
const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
@@ -87,9 +79,7 @@
return self;
}
let mut out = Self::new("env");
- if let Some(session) = self.ssh_session {
- out = out.ssh_session(session);
- }
+ out.ssh_session = self.ssh_session;
for (k, v) in self.env {
assert!(!k.contains('='));
out.arg(format!("{k}={v}"));
@@ -179,21 +169,11 @@
out
} else {
let mut out = Self::new("sudo");
+ out.ssh_session = self.ssh_session.take();
out.args(self.into_args());
out
}
}
- pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
- self.ssh_session = Some(on);
- self
- }
- pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
- let mut out = Self::new("ssh");
- out.ssh_session = self.ssh_session.take();
- out.arg(on).arg("--");
- out.arg(self.into_string());
- out
- }
pub async fn run(self) -> Result<()> {
let str = self.clone().into_string();
@@ -276,259 +256,6 @@
let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
assert!(v.is_none());
Ok(())
-}
-
-pub trait Handler: Send {
- fn handle_line(&mut self, e: &str);
-}
-
-pub struct ClonableHandler<H>(Arc<Mutex<H>>);
-impl<H> Clone for ClonableHandler<H> {
- fn clone(&self) -> Self {
- Self(self.0.clone())
- }
-}
-impl<H> ClonableHandler<H> {
- pub fn new(inner: H) -> Self {
- Self(Arc::new(Mutex::new(inner)))
- }
-}
-impl<H: Handler> Handler for ClonableHandler<H> {
- fn handle_line(&mut self, e: &str) {
- self.0.lock().unwrap().handle_line(e)
- }
-}
-
-struct PlainHandler;
-impl Handler for PlainHandler {
- fn handle_line(&mut self, e: &str) {
- info!(target: "log", "{e}");
- }
-}
-
-pub struct NoopHandler;
-impl Handler for NoopHandler {
- fn handle_line(&mut self, _e: &str) {}
-}
-
-#[derive(Default)]
-pub struct NixHandler {
- spans: HashMap<u64, Span>,
-}
-fn process_message(m: &str) -> String {
- static OSC_CLEANER: Lazy<Regex> =
- Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
- static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
- let m = OSC_CLEANER.replace_all(m, "");
- // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
- // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
- DETABBER.replace_all(m.as_ref(), " ").to_string()
-}
-impl Handler for NixHandler {
- fn handle_line(&mut self, e: &str) {
- if let Some(e) = e.strip_prefix("@nix ") {
- let log: NixLog = match serde_json::from_str(e) {
- Ok(l) => l,
- Err(err) => {
- warn!("failed to parse nix log line {:?}: {}", e, err);
- return;
- }
- };
- match log {
- NixLog::Msg { msg, raw_msg, .. } => {
- #[allow(clippy::nonminimal_bool)]
- if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
- && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
- && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
- if let Some(raw_msg) = raw_msg {
- if !msg.is_empty() {
- info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
- } else {
- info!(target: "nix", "{}", raw_msg.trim_end())
- }
- } else {
- info!(target: "nix", "{}", msg.trim_end())
- }
- }
- }
- NixLog::Start {
- ref fields,
- typ,
- id,
- ..
- } if typ == 105 && !fields.is_empty() => {
- if let [LogField::String(drv), ..] = &fields[..] {
- let mut drv = drv.as_str();
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- info!(target: "nix","building {}", drv);
- let span = info_span!("build", drv);
- span.pb_start();
- self.spans.insert(id, span);
- } else {
- warn!("bad build log: {:?}", log)
- }
- }
- NixLog::Start {
- ref fields,
- typ,
- id,
- ..
- } if typ == 100 && fields.len() >= 3 => {
- if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
- &fields[..]
- {
- let mut drv = drv.as_str();
-
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- // info!(target: "nix","copying {} {} -> {}", drv, from, to);
- let span = info_span!("copy", from, to, drv);
- span.pb_start();
- self.spans.insert(id, span);
- } else {
- warn!("bad copy log: {:?}", log)
- }
- }
- NixLog::Start { text, typ, id, .. }
- if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
- {
- if !text.is_empty()
- && text != "querying info about missing paths"
- && text != "copying 0 paths"
- // Too much spam on lazy-trees branch
- && !(text.starts_with("copying '") && text.ends_with("' to the store"))
- {
- let span = info_span!("job");
- span.pb_start();
- span.pb_set_message(&process_message(text.trim()));
- self.spans.insert(id, span);
- info!(target: "nix", "{}", text);
- }
- }
- NixLog::Start {
- text,
- level: 0,
- typ: 108,
- ..
- } if text.is_empty() => {
- // Cache lookup? Coupled with copy log
- }
- NixLog::Start {
- text,
- level: 4,
- typ: 109,
- ..
- } if text.starts_with("querying info about ") => {
- // Cache lookup
- }
- NixLog::Start {
- text,
- level: 4,
- typ: 101,
- ..
- } if text.starts_with("downloading ") => {
- // NAR downloading, coupled with copy log
- }
- NixLog::Start {
- text,
- level: 1,
- typ: 111,
- ..
- } if text.starts_with("waiting for a machine to build ") => {
- // Useless repeating notification about build
- }
- NixLog::Start {
- text,
- level: 3,
- typ: 111,
- ..
- } if text.starts_with("resolved derivation: ") => {
- // CA resolved
- }
- NixLog::Start {
- text,
- level: 1,
- typ: 111,
- id,
- ..
- } if text.starts_with("waiting for lock on ") => {
- let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
- if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
- drv = txt;
- }
- if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
- drv = txt;
- }
- if let Some(txt) = drv.split("', '").next() {
- drv = txt;
- }
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- let span = info_span!("waiting on drv", drv);
- span.pb_start();
- self.spans.insert(id, span);
- // Concurrent build of the same message
- }
- NixLog::Stop { id, .. } => {
- self.spans.remove(&id);
- }
- NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
- if let Some(span) = self.spans.get(&id) {
- if let LogField::String(s) = &fields[0] {
- span.pb_set_message(&process_message(s.trim()));
- } else {
- warn!("bad fields: {fields:?}");
- }
- } else {
- warn!("unknown result id: {id} {typ} {fields:?}");
- }
- // dbg!(fields, id, typ);
- }
- NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
- if let Some(span) = self.spans.get(&id) {
- if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
- &fields[..4]
- {
- span.pb_set_length(*expected);
- span.pb_set_position(*done);
- } else {
- warn!("bad fields: {fields:?}");
- }
- } else {
- // warn!("unknown result id: {id} {typ} {fields:?}");
- // Unaccounted progress.
- }
- // dbg!(fields, id, typ);
- }
- NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
- // Set phase, expected
- }
- _ => warn!("unknown log: {:?}", log),
- };
- } else {
- let e = e.trim();
- if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
- return;
- }
- info!("{e}")
- }
- }
}
async fn run_nix_inner_raw(
@@ -540,6 +267,7 @@
) -> Result<Option<Vec<u8>>> {
cmd.stderr(Stdio::piped());
cmd.stdout(Stdio::piped());
+ debug!("running command {cmd:?} on local");
let mut child = cmd.spawn()?;
let mut stderr = child.stderr.take().unwrap();
let stdout = child.stdout.take().unwrap();
@@ -600,6 +328,7 @@
err_handler: &mut dyn Handler,
mut out_handler: Option<&mut dyn Handler>,
) -> Result<Option<Vec<u8>>> {
+ debug!("running command {cmd:?} over ssh");
cmd.stderr(openssh::Stdio::piped());
cmd.stdout(openssh::Stdio::piped());
let mut child = cmd.spawn().await?;
@@ -656,77 +385,4 @@
}
Ok(out_buf)
-}
-
-pub trait ErrorRecorder: Send {
- /// Return true to discard message from logging
- fn push_message(&mut self, msg: &str) -> bool;
-}
-
-#[derive(Debug)]
-enum LogField {
- String(String),
- Num(u64),
-}
-
-impl<'de> Deserialize<'de> for LogField {
- fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
- where
- D: serde::Deserializer<'de>,
- {
- struct StringOrNum;
- impl<'de> Visitor<'de> for StringOrNum {
- type Value = LogField;
-
- fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "string or unsigned")
- }
-
- fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- Ok(LogField::String(v.to_owned()))
- }
-
- fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- Ok(LogField::Num(v))
- }
- }
-
- deserializer.deserialize_any(StringOrNum)
- }
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "camelCase", tag = "action")]
-#[allow(dead_code)]
-enum NixLog {
- Msg {
- level: u32,
- msg: String,
- raw_msg: Option<String>,
- },
- Start {
- id: u64,
- level: u32,
- #[serde(default)]
- fields: Vec<LogField>,
- text: String,
- #[serde(rename = "type")]
- typ: u32,
- },
- Stop {
- id: u64,
- },
- Result {
- id: u64,
- #[serde(rename = "type")]
- typ: u32,
- #[serde(default)]
- fields: Vec<LogField>,
- },
}
cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -9,7 +9,6 @@
sync::{Arc, Mutex, MutexGuard, OnceLock},
};
-use age::Recipient;
use anyhow::{anyhow, bail, Context, Result};
use clap::{ArgGroup, Parser};
use openssh::SessionBuilder;
@@ -50,10 +49,12 @@
pub struct ConfigHost {
pub name: String,
+ pub local: bool,
pub session: OnceLock<Arc<openssh::Session>>,
}
impl ConfigHost {
- pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ assert!(!self.local, "do not open ssh connection to local session");
// FIXME: TOCTOU
if let Some(session) = &self.session.get() {
return Ok((*session).clone());
@@ -96,8 +97,12 @@
D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
}
pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
- let session = self.open_session().await?;
- Ok(MyCommand::new_on(cmd, session))
+ if self.local {
+ Ok(MyCommand::new(cmd))
+ } else {
+ let session = self.open_session().await?;
+ Ok(MyCommand::new_on(cmd, session))
+ }
}
pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
@@ -110,8 +115,25 @@
.context("failed to call remote host for decrypt")?;
z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
}
+ pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
+ let mut cmd = self.cmd("fleet-install-secrets").await?;
+ cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
+ for target in targets {
+ cmd.eqarg("--targets", target);
+ }
+ let encoded = cmd
+ .sudo()
+ .run_string()
+ .await
+ .context("failed to call remote host for decrypt")?;
+ SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")
+ }
/// Returns path for futureproofing, as path might change i.e on conversion to CA
pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
+ if self.local {
+ // Path is located locally, thus already trusted.
+ return Ok(path.to_owned());
+ }
let mut nix = MyCommand::new("nix");
nix.arg("copy")
.arg("--substitute-on-destination")
@@ -120,6 +142,25 @@
nix.run_nix().await?;
Ok(path.to_owned())
}
+ pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("stop").arg(name);
+ cmd.sudo().run().await
+ }
+ pub async fn systemctl_start(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("start").arg(name);
+ cmd.sudo().run().await
+ }
+
+ pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
+ let mut cmd = self.cmd("rm").await?;
+ cmd.arg("-f").arg(path);
+ if sudo {
+ cmd = cmd.sudo()
+ }
+ cmd.run().await
+ }
}
impl Config {
@@ -134,35 +175,12 @@
}
pub fn is_local(&self, host: &str) -> bool {
self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
- }
-
- pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
- if sudo {
- command = command.sudo();
- }
- if !self.is_local(host) {
- command = command.ssh(host);
- }
- command.run().await
- }
- pub async fn run_string_on(
- &self,
- host: &str,
- mut command: MyCommand,
- sudo: bool,
- ) -> Result<String> {
- if sudo {
- command = command.sudo();
- }
- if !self.is_local(host) {
- command = command.ssh(host);
- }
- command.run_string().await
}
pub async fn host(&self, name: &str) -> Result<ConfigHost> {
Ok(ConfigHost {
name: name.to_owned(),
+ local: self.is_local(name),
session: OnceLock::new(),
})
}
@@ -172,6 +190,7 @@
let mut out = vec![];
for name in names {
out.push(ConfigHost {
+ local: self.is_local(&name),
name,
session: OnceLock::new(),
})
@@ -225,27 +244,6 @@
let mut data = self.data_mut();
let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();
host_secrets.insert(secret, value);
- }
-
- pub async fn reencrypt_on_host(
- &self,
- host: &str,
- data: SecretData,
- targets: Vec<String>,
- ) -> Result<SecretData> {
- let mut recmd = MyCommand::new("fleet-install-secrets");
- recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
- for target in targets {
- recmd.eqarg("--targets", target);
- }
- recmd = recmd.sudo().ssh(host);
- let encoded = recmd
- .run_string()
- .await
- .context("failed to call remote host for decrypt")?
- .trim()
- .to_owned();
- SecretData::decode_z85(&encoded)
}
pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
cmds/fleet/src/keys.rsdiffbeforeafterboth--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
use std::str::FromStr;
-use crate::command::MyCommand;
use crate::host::Config;
use age::Recipient;
use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
Ok(key)
} else {
warn!("Loading key for {}", host);
- let mut cmd = MyCommand::new("cat");
+ let host = self.host(host).await?;
+ let mut cmd = host.cmd("cat").await?;
cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
- let key = self.run_string_on(host, cmd, false).await?;
- self.update_key(host, key.clone());
+ let key = cmd.run_string().await?;
+ self.update_key(&host.name, key.clone());
Ok(key)
}
}
cmds/remowt-agent/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
cmds/remowt-agent/README.adocdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/README.adoc
@@ -0,0 +1,16 @@
+= Remowt agent
+
+Working with remote machine programmatically is not always easy.
+
+Sure, you have ssh, sftp, and that kind of fancy stuff, but what about minimal distributions, routers?
+
+Well, sftp can be replaced with FISH... But what if remote machine isn't accessible over ssh at all? What if the only communication channel you have is uart?
+
+What if remote host has not enough tools to implement the functionality you need?
+
+Remowt is intended to solve this in a way similar to how some RAT toolkits (I.e metasploit) do - you inject minimal agent, setup some communication channel to it (stdio perhaps?), and then you deploy payloads on it, and the payloads perform the actual work.
+
+== Non-targets
+
+Minimal executable size:: As long as it transferred only once, it shouldn't be a problem to keep it a reasonable size.
+Be stealthy:: As it solves the problem almost the same way as metasploit, it is possible to use it as something bad, but this is not the remowt intended purpose, and never will be.
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+ println!("Hello, world!");
+}
crates/better-command/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
crates/better-command/src/handler.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/handler.rs
@@ -0,0 +1,305 @@
+//! Collection of handlers, which transform program-specific stdout format to tracing
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use tracing::{Span, info, warn, info_span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+
+pub trait Handler: Send {
+ fn handle_line(&mut self, e: &str);
+}
+
+/// Handler wrapper, which can be cloned.
+pub struct ClonableHandler<H>(Arc<Mutex<H>>);
+impl<H> Clone for ClonableHandler<H> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+impl<H> ClonableHandler<H> {
+ pub fn new(inner: H) -> Self {
+ Self(Arc::new(Mutex::new(inner)))
+ }
+}
+impl<H: Handler> Handler for ClonableHandler<H> {
+ fn handle_line(&mut self, e: &str) {
+ self.0.lock().unwrap().handle_line(e)
+ }
+}
+
+/// Converts command output to tracing lines
+pub struct PlainHandler;
+impl Handler for PlainHandler {
+ fn handle_line(&mut self, e: &str) {
+ info!(target: "log", "{e}");
+ }
+}
+
+/// Ignores output
+pub struct NoopHandler;
+impl Handler for NoopHandler {
+ fn handle_line(&mut self, _e: &str) {}
+}
+
+/// Transform nix internal-json logs to tracing spans.
+#[derive(Default)]
+pub struct NixHandler {
+ spans: HashMap<u64, Span>,
+}
+#[derive(Deserialize, Debug)]
+#[serde(untagged)]
+enum LogField {
+ String(String),
+ Num(u64),
+}
+
+/// Nix internal-json log line type
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", tag = "action")]
+#[allow(dead_code)]
+enum NixLog {
+ Msg {
+ level: u32,
+ msg: String,
+ raw_msg: Option<String>,
+ },
+ Start {
+ id: u64,
+ level: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ text: String,
+ #[serde(rename = "type")]
+ typ: u32,
+ },
+ Stop {
+ id: u64,
+ },
+ Result {
+ id: u64,
+ #[serde(rename = "type")]
+ typ: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ },
+}
+fn process_message(m: &str) -> String {
+ // Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.
+ static OSC_CLEANER: Lazy<Regex> =
+ Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+ static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+ let m = OSC_CLEANER.replace_all(m, "");
+ // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+ // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+ DETABBER.replace_all(m.as_ref(), " ").to_string()
+}
+impl Handler for NixHandler {
+ fn handle_line(&mut self, e: &str) {
+ if let Some(e) = e.strip_prefix("@nix ") {
+ let log: NixLog = match serde_json::from_str(e) {
+ Ok(l) => l,
+ Err(err) => {
+ warn!("failed to parse nix log line {:?}: {}", e, err);
+ return;
+ }
+ };
+ match log {
+ NixLog::Msg { msg, raw_msg, .. } => {
+ #[allow(clippy::nonminimal_bool)]
+ if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+ && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+ && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+ if let Some(raw_msg) = raw_msg {
+ if !msg.is_empty() {
+ info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
+ } else {
+ info!(target: "nix", "{}", raw_msg.trim_end())
+ }
+ } else {
+ info!(target: "nix", "{}", msg.trim_end())
+ }
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 105 && !fields.is_empty() => {
+ if let [LogField::String(drv), ..] = &fields[..] {
+ let mut drv = drv.as_str();
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ info!(target: "nix","building {}", drv);
+ let span = info_span!("build", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad build log: {:?}", log)
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 100 && fields.len() >= 3 => {
+ if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
+ &fields[..]
+ {
+ let mut drv = drv.as_str();
+
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ // info!(target: "nix","copying {} {} -> {}", drv, from, to);
+ let span = info_span!("copy", from, to, drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad copy log: {:?}", log)
+ }
+ }
+ NixLog::Start { text, typ, id, .. }
+ if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
+ {
+ if !text.is_empty()
+ && text != "querying info about missing paths"
+ && text != "copying 0 paths"
+ // Too much spam on lazy-trees branch
+ && !(text.starts_with("copying '") && text.ends_with("' to the store"))
+ {
+ let span = info_span!("job");
+ span.pb_start();
+ span.pb_set_message(&process_message(text.trim()));
+ self.spans.insert(id, span);
+ info!(target: "nix", "{}", text);
+ }
+ }
+ NixLog::Start {
+ text,
+ level: 0,
+ typ: 108,
+ ..
+ } if text.is_empty() => {
+ // Cache lookup? Coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 109,
+ ..
+ } if text.starts_with("querying info about ") => {
+ // Cache lookup
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 101,
+ ..
+ } if text.starts_with("downloading ") => {
+ // NAR downloading, coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ ..
+ } if text.starts_with("waiting for a machine to build ") => {
+ // Useless repeating notification about build
+ }
+ NixLog::Start {
+ text,
+ level: 3,
+ typ: 111,
+ ..
+ } if text.starts_with("resolved derivation: ") => {
+ // CA resolved
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ id,
+ ..
+ } if text.starts_with("waiting for lock on ") => {
+ let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
+ if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.split("', '").next() {
+ drv = txt;
+ }
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ let span = info_span!("waiting on drv", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ // Concurrent build of the same message
+ }
+ NixLog::Stop { id, .. } => {
+ self.spans.remove(&id);
+ }
+ NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
+ if let Some(span) = self.spans.get(&id) {
+ if let LogField::String(s) = &fields[0] {
+ span.pb_set_message(&process_message(s.trim()));
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ warn!("unknown result id: {id} {typ} {fields:?}");
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
+ if let Some(span) = self.spans.get(&id) {
+ if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
+ &fields[..4]
+ {
+ span.pb_set_length(*expected);
+ span.pb_set_position(*done);
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ // warn!("unknown result id: {id} {typ} {fields:?}");
+ // Unaccounted progress.
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
+ // Set phase, expected
+ }
+ _ => warn!("unknown log: {:?}", log),
+ };
+ } else {
+ let e = e.trim();
+ if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
+ return;
+ }
+ info!("{e}")
+ }
+ }
+}
crates/better-command/src/lib.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn it_works() {
+ let result = add(2, 2);
+ assert_eq!(result, 4);
+ }
+}