difftreelog
refactor remove shell-outs for ssh
in: trunk
15 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
[[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
"anyhow",
"async-trait",
"base64 0.21.5",
+ "better-command",
"chrono",
"clap",
"futures",
@@ -1510,9 +1523,9 @@
[[package]]
name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "opaque-debug"
@@ -1922,6 +1935,10 @@
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
name = "rnix"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
[[package]]
name = "serde"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
dependencies = [
"serde_derive",
]
@@ -2123,9 +2140,9 @@
[[package]]
name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
dependencies = [
"proc-macro2",
"quote",
@@ -2134,9 +2151,9 @@
[[package]]
name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
dependencies = [
"itoa",
"ryu",
@@ -2527,11 +2544,10 @@
[[package]]
name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
- "cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -2539,9 +2555,9 @@
[[package]]
name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
@@ -2550,9 +2566,9 @@
[[package]]
name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
@@ -2560,9 +2576,9 @@
[[package]]
name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
dependencies = [
"indicatif",
"tracing",
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
[workspace]
members = ["crates/*", "cmds/*"]
resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
edition = "2021"
[dependencies]
+nixlike.workspace = true
+better-command.workspace = true
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@@ -15,7 +17,6 @@
hostname = "0.3.1"
age-core = "0.9.0"
peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
age = { version = "0.9.2", features = ["ssh", "armor"] }
base64 = "0.21.5"
chrono = { version = "0.4.31", features = ["serde"] }
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,3 +1,6 @@
+//! Wrapper around nix repl, which allows to work on nix code, without relying on
+//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.
+
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
use std::fmt::{self, Display};
@@ -6,6 +9,7 @@
use std::sync::{Arc, OnceLock};
use anyhow::{anyhow, bail, ensure, Context, Result};
+use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};
use futures::StreamExt;
use itertools::Itertools;
use r2d2::{Pool, PooledConnection};
@@ -14,11 +18,9 @@
use tokio::io::AsyncWriteExt;
use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_util::codec::{FramedRead, LinesCodec};
use tracing::{debug, error, warn, Level};
-
-use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};
const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";
@@ -30,6 +32,8 @@
string_wrapping: (String, String),
number_wrapping: (String, String),
+ executing_command: Arc<Mutex<()>>,
+
next_id: u32,
free_list: Vec<u32>,
}
@@ -219,6 +223,8 @@
string_wrapping: Default::default(),
number_wrapping: Default::default(),
+ executing_command: Arc::new(Mutex::new(())),
+
next_id: 0,
free_list: vec![],
};
@@ -331,6 +337,10 @@
expr: impl AsRef<[u8]>,
err_handler: &mut dyn Handler,
) -> Result<String> {
+ // Prevent two commands from being executed in parallel, messing with each other.
+ let _lock = self.executing_command.clone();
+ let _guard = _lock.lock().await;
+
self.send_command(expr).await?;
// It will be echoed
self.send_command(REPL_DELIMITER).await?;
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -3,11 +3,11 @@
use std::{env::current_dir, time::Duration};
use crate::command::MyCommand;
-use crate::host::Config;
+use crate::host::{Config, ConfigHost};
use crate::nix_go;
use anyhow::{anyhow, Result};
use clap::Parser;
-use itertools::Itertools;
+use itertools::Itertools as _;
use tokio::{task::LocalSet, time::sleep};
use tracing::{error, field, info, info_span, warn, Instrument};
@@ -112,12 +112,12 @@
current: bool,
datetime: String,
}
-async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
- let mut cmd = MyCommand::new("nix-env");
+async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.arg("--list-generations");
// Sudo is required due to --list-generations acquiring lock on the profile.
- let data = config.run_string_on(host, cmd, true).await?;
+ let data = cmd.sudo().run_string().await?;
let generations = data
.split('\n')
.map(|e| e.trim())
@@ -161,25 +161,12 @@
.map_err(|_e| anyhow!("bad list-generations output"))?
.ok_or_else(|| anyhow!("failed to find generation"))?;
Ok(current)
-}
-
-async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("stop").arg(unit);
- config.run_on(host, cmd, true).await
-}
-
-async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("start").arg(unit);
- config.run_on(host, cmd, true).await
}
async fn execute_upload(
build: &BuildSystems,
- config: &Config,
action: UploadAction,
- host: &str,
+ host: &ConfigHost,
built: PathBuf,
) -> Result<()> {
let mut failed = false;
@@ -191,15 +178,15 @@
if !build.disable_rollback {
let _span = info_span!("preparing").entered();
info!("preparing for rollback");
- let generation = get_current_generation(config, host).await?;
+ let generation = get_current_generation(&host).await?;
info!(
"rollback target would be {} {}",
generation.id, generation.datetime
);
{
- let mut cmd = MyCommand::new("sh");
+ let mut cmd = host.cmd("sh").await?;
cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to set rollback marker: {e}");
failed = true;
}
@@ -215,37 +202,41 @@
// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
// Anyway, reboot will still help in this case.
if action.should_schedule_rollback_run() {
- let mut cmd = MyCommand::new("systemd-run");
+ let mut cmd = host.cmd("systemd-run").await?;
cmd.comparg("--on-active", "3min")
.comparg("--unit", "rollback-watchdog-run")
.arg("systemctl")
.arg("start")
.arg("rollback-watchdog.service");
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to schedule rollback run: {e}");
failed = true;
}
}
}
+
if action.should_switch_profile() && !failed {
info!("switching generation");
- let mut cmd = MyCommand::new("nix-env");
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.comparg("--set", &built);
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to switch generation: {e}");
failed = true;
}
}
+
+ // FIXME: Connection might be disconnected after activation run
+
if action.should_activate() && !failed {
let _span = info_span!("activating").entered();
info!("executing activation script");
let mut switch_script = built.clone();
switch_script.push("bin");
switch_script.push("switch-to-configuration");
- let mut cmd = MyCommand::new(switch_script);
+ let mut cmd = host.cmd(switch_script).await?;
cmd.arg(action.name());
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = cmd.sudo().run().in_current_span().await {
error!("failed to activate: {e}");
failed = true;
}
@@ -253,7 +244,8 @@
if !build.disable_rollback {
if failed {
info!("executing rollback");
- if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")
+ if let Err(e) = host
+ .systemctl_start("rollback-watchdog.service")
.instrument(info_span!("rollback"))
.await
{
@@ -261,27 +253,29 @@
}
} else {
info!("trying to mark upgrade as successful");
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
}
}
info!("disarming watchdog, just in case");
- if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {
+ if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
// It is ok, if there was no reboot - then timer might not be running.
}
if action.should_schedule_rollback_run() {
- if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {
+ if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
error!("failed to disarm rollback run: {e}");
}
}
- } else {
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {
- // Marker might not exist, yet better try to remove it.
- }
+ } else if let Err(_e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
+ // Marker might not exist, yet better try to remove it.
}
Ok(())
}
@@ -289,12 +283,13 @@
impl BuildSystems {
async fn build_task(self, config: Config, host: String) -> Result<()> {
info!("building");
+ let host = config.host(&host).await?;
let action = Action::from(self.subcommand.clone());
let fleet_field = &config.fleet_field;
let drv = nix_go!(
fleet_field.buildSystems(Obj {
localSystem: { config.local_system.clone() }
- })[{ action.build_attr() }][{ host }]
+ })[{ action.build_attr() }][{ &host.name }]
);
let outputs = drv.build().await.map_err(|e| {
if action.build_attr() == "sdImage" {
@@ -309,9 +304,10 @@
match action {
Action::Upload { action } => {
- if !config.is_local(&host) {
+ if !config.is_local(&host.name) {
info!("uploading system closure");
{
+ // TODO: Move to remote_derivation method.
// Alternatively, nix store make-content-addressed can be used,
// at least for the first deployment, to provide trusted store key.
//
@@ -329,13 +325,11 @@
}
let mut tries = 0;
loop {
- let mut nix = MyCommand::new("nix");
- nix.arg("copy")
- .arg("--substitute-on-destination")
- .comparg("--to", format!("ssh-ng://{host}"))
- .arg(out_output);
- match nix.run_nix().await {
- Ok(()) => break,
+ match host.remote_derivation(out_output).await {
+ Ok(remote) => {
+ assert!(&remote == out_output, "CA derivations aren't implemented");
+ break;
+ }
Err(e) if tries < 3 => {
tries += 1;
warn!("Copy failure ({}/3): {}", tries, e);
@@ -346,19 +340,19 @@
}
}
if let Some(action) = action {
- execute_upload(&self, &config, action, &host, out_output.clone()).await?
+ execute_upload(&self, action, &host, out_output.clone()).await?
}
}
Action::Package(PackageAction::SdImage) => {
let mut out = current_dir()?;
- out.push(format!("sd-image-{}", host));
+ out.push(format!("sd-image-{}", host.name));
info!("linking sd image to {:?}", out);
symlink(out_output, out)?;
}
Action::Package(PackageAction::InstallationCd) => {
let mut out = current_dir()?;
- out.push(format!("installation-cd-{}", host));
+ out.push(format!("installation-cd-{}", host.name));
info!("linking iso image to {:?}", out);
symlink(out_output, out)?;
@@ -379,6 +373,17 @@
let this = this.clone();
let span = info_span!("deployment", host = field::display(&host.name));
let hostname = host.name;
+ // FIXME: Since the introduction of better-nix-eval,
+ // due to single repl used for builds, hosts are waiting for each other to build,
+ // instead of building concurrently.
+ //
+ // Open multiple repls?
+ //
+ // Create build batcher, which will behave similar to golangs
+ // WaitGroup, and start executing once all the build tasks are scheduled?
+ // This also allows to cleanup build output, as there will be no longer
+ // "waiting for remote machine" messages in the cases when one package is needed for
+ // multiple hosts.
set.spawn_local(
(async move {
match this.build_task(config, hostname).await {
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth1use crate::{2 better_nix_eval::Field,3 fleetdata::{FleetSecret, FleetSharedSecret, SecretData},4 host::Config,5 nix_go, nix_go_json,6};7use anyhow::{anyhow, bail, ensure, Context, Result};8use chrono::{DateTime, Utc};9use clap::Parser;10use futures::{StreamExt, TryStreamExt};11use itertools::Itertools;12use owo_colors::OwoColorize;13use std::{14 collections::HashSet,15 io::{self, Cursor, Read},16 path::PathBuf,17};18use tabled::{Table, Tabled};19use tokio::fs::read_to_string;20use tracing::{info, info_span, warn};2122#[derive(Parser)]23pub enum Secret {24 /// Force load host keys for all defined hosts25 ForceKeys,26 /// Add secret, data should be provided in stdin27 AddShared {28 /// Secret name29 name: String,30 /// Secret owners31 machines: Vec<String>,32 /// Override secret if already present33 #[clap(long)]34 force: bool,35 /// Secret public part36 #[clap(long)]37 public: Option<String>,38 /// Load public part from specified file39 #[clap(long)]40 public_file: Option<PathBuf>,4142 /// Create a notification on secret expiration43 #[clap(long)]44 expires_at: Option<DateTime<Utc>>,4546 /// Secret with this name already exists, override its value while keeping the same owners.47 #[clap(long)]48 re_add: bool,49 },50 /// Add secret, data should be provided in stdin51 Add {52 /// Secret name53 name: String,54 /// Secret owners55 machine: String,56 /// Override secret if already present57 #[clap(long)]58 force: bool,59 #[clap(long)]60 public: Option<String>,61 #[clap(long)]62 public_file: Option<PathBuf>,63 },64 /// Read secret from remote host, requires sudo on said host65 Read {66 name: String,67 machine: String,68 #[clap(long)]69 plaintext: bool,70 },71 UpdateShared {72 name: String,7374 #[clap(long)]75 machines: Option<Vec<String>>,7677 #[clap(long)]78 add_machines: Vec<String>,79 #[clap(long)]80 remove_machines: Vec<String>,8182 /// Which host should we use to decrypt83 #[clap(long)]84 prefer_identities: Vec<String>,85 },86 Regenerate {87 /// Which host should we use to decrypt, in case if reencryption is required, without88 /// regeneration89 #[clap(long)]90 prefer_identities: Vec<String>,91 },92 List {},93}9495async fn generate_shared(96 config: &Config,97 display_name: &str,98 secret: Field,99) -> Result<FleetSharedSecret> {100 Ok(if secret.has_field("generateImpure").await? {101 let config_field = &config.config_unchecked_field;102 let generate = nix_go!(secret.generateImpure);103 let owners: Vec<String> = nix_go_json!(secret.expectedOwners);104105 let on: String = nix_go_json!(generate.on);106 let call_package = nix_go!(107 config_field.buildableSystems(Obj {108 localSystem: { config.local_system.clone() }109 })[{ on }]110 .config111 .nixpkgs112 .resolvedPkgs113 .callPackage114 );115116 let host = config.host(&on).await?;117118 let generator = nix_go!(call_package(generate.generator)(Obj {}));119 let generator = generator.build().await?;120 let generator = generator121 .get("out")122 .ok_or_else(|| anyhow!("missing generateImpure out"))?;123 let generator = host.remote_derivation(generator).await?;124125 let mut recipients = String::new();126 for owner in &owners {127 let key = config.key(owner).await?;128 recipients.push_str(&format!("-r \"{key}\" "));129 }130 recipients.push_str("-e");131132 let out = host.mktemp_dir().await?;133134 let mut gen = host.cmd(generator).await?;135 gen.env("rageArgs", recipients).env("out", &out);136 gen.run().await?;137138 {139 let marker = host.read_file_text(format!("{out}/marker")).await?;140 ensure!(marker == "SUCCESS", "generation not succeeded");141 }142143 let public = host.read_file_text(format!("{out}/public")).await.ok();144 let secret = host.read_file_bin(format!("{out}/secret")).await.ok();145 if let Some(secret) = &secret {146 ensure!(147 age::Decryptor::new(Cursor::new(&secret)).is_ok(),148 "builder produced non-encrypted value as secret, this is highly insecure"149 );150 }151152 let created_at = host.read_file_value(format!("{out}/created_at")).await?;153 let expires_at = host.read_file_value(format!("{out}/expires_at")).await.ok();154155 FleetSharedSecret {156 owners,157 secret: FleetSecret {158 created_at,159 expires_at,160 public,161 secret: secret.map(SecretData),162 },163 }164 } else {165 bail!("no generator defined for {display_name}")166 })167}168169async fn parse_public(170 public: Option<String>,171 public_file: Option<PathBuf>,172) -> Result<Option<String>> {173 Ok(match (public, public_file) {174 (Some(v), None) => Some(v),175 (None, Some(v)) => Some(read_to_string(v).await?),176 (Some(_), Some(_)) => {177 bail!("only public or public_file should be set")178 }179 (None, None) => None,180 })181}182183fn parse_machines(184 initial: Vec<String>,185 machines: Option<Vec<String>>,186 mut add_machines: Vec<String>,187 mut remove_machines: Vec<String>,188) -> Result<Vec<String>> {189 if machines.is_none() && add_machines.is_empty() && remove_machines.is_empty() {190 bail!("no operation");191 }192193 let initial_machines = initial.clone();194 let mut target_machines = initial;195 info!("Currently encrypted for {initial_machines:?}");196197 // ensure!(machines.is_some() || !add_machines.is_empty() || )198 if let Some(machines) = machines {199 ensure!(200 add_machines.is_empty() && remove_machines.is_empty(),201 "can't combine --machines and --add-machines/--remove-machines"202 );203 let target = initial_machines.iter().collect::<HashSet<_>>();204 let source = machines.iter().collect::<HashSet<_>>();205 for removed in target.difference(&source) {206 remove_machines.push((*removed).clone());207 }208 for added in source.difference(&target) {209 add_machines.push((*added).clone());210 }211 }212213 for machine in &remove_machines {214 let mut removed = false;215 while let Some(pos) = target_machines.iter().position(|m| m == machine) {216 target_machines.swap_remove(pos);217 removed = true;218 }219 if !removed {220 warn!("secret is not enabled for {machine}");221 }222 }223 for machine in &add_machines {224 if target_machines.iter().any(|m| m == machine) {225 warn!("secret is already added to {machine}");226 } else {227 target_machines.push(machine.to_owned());228 }229 }230 if !remove_machines.is_empty() {231 // TODO: maybe force secret regeneration?232 // Not that useful without revokation.233 warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");234 }235 Ok(target_machines)236}237impl Secret {238 pub async fn run(self, config: &Config) -> Result<()> {239 match self {240 Secret::ForceKeys => {241 for host in config.list_hosts().await? {242 if config.should_skip(&host.name) {243 continue;244 }245 config.key(&host.name).await?;246 }247 }248 Secret::AddShared {249 mut machines,250 name,251 force,252 public,253 public_file,254 expires_at,255 re_add,256 } => {257 let exists = config.has_shared(&name);258 if exists && !force && !re_add {259 bail!("secret already defined");260 }261 if re_add {262 // Fixme: use clap to limit this usage263 ensure!(!force, "--force and --readd are not compatible");264 ensure!(exists, "secret doesn't exists");265 ensure!(266 machines.is_empty(),267 "you can't use machines argument for --readd"268 );269 let shared = config.shared_secret(&name)?;270 machines = shared.owners;271 }272273 let recipients = config274 .recipients(&machines.iter().map(String::as_str).collect_vec())275 .await?;276277 let secret = {278 let mut input = vec![];279 io::stdin().read_to_end(&mut input)?;280281 if input.is_empty() {282 None283 } else {284 Some(285 SecretData::encrypt(recipients, input)286 .ok_or_else(|| anyhow!("no recipients provided"))?,287 )288 }289 };290 let public = parse_public(public, public_file).await?;291 config.replace_shared(292 name,293 FleetSharedSecret {294 owners: machines,295 secret: FleetSecret {296 created_at: Utc::now(),297 expires_at,298 secret,299 public,300 },301 },302 );303 }304 Secret::Add {305 machine,306 name,307 force,308 public,309 public_file,310 } => {311 let recipient = config.recipient(&machine).await?;312313 let secret = {314 let mut input = vec![];315 io::stdin().read_to_end(&mut input)?;316 if input.is_empty() {317 bail!("no data provided")318 }319320 Some(SecretData::encrypt(vec![recipient], input).expect("recipient provided"))321 };322323 if config.has_secret(&machine, &name) && !force {324 bail!("secret already defined");325 }326 let public = parse_public(public, public_file).await?;327328 config.insert_secret(329 &machine,330 name,331 FleetSecret {332 created_at: Utc::now(),333 expires_at: None,334 secret,335 public,336 },337 );338 }339 #[allow(clippy::await_holding_refcell_ref)]340 Secret::Read {341 name,342 machine,343 plaintext,344 } => {345 let secret = config.host_secret(&machine, &name)?;346 let Some(secret) = secret.secret else {347 bail!("no secret {name}");348 };349 let host = config.host(&machine).await?;350 let data = host.decrypt(secret).await?;351 if plaintext {352 let s = String::from_utf8(data).context("output is not utf8")?;353 print!("{s}");354 } else {355 println!("{}", z85::encode(&data));356 }357 }358 Secret::UpdateShared {359 name,360 machines,361 add_machines,362 remove_machines,363 prefer_identities,364 } => {365 let mut secret = config.shared_secret(&name)?;366 if secret.secret.secret.is_none() {367 bail!("no secret");368 }369370 let initial_machines = secret.owners.clone();371 let target_machines = parse_machines(372 initial_machines.clone(),373 machines,374 add_machines,375 remove_machines,376 )?;377378 if target_machines.is_empty() {379 info!("no machines left for secret, removing it");380 config.remove_shared(&name);381 return Ok(());382 }383384 if target_machines == initial_machines {385 warn!("secret owners are already correct");386 return Ok(());387 }388389 let identity_holder = if !prefer_identities.is_empty() {390 prefer_identities391 .iter()392 .find(|i| initial_machines.iter().any(|s| s == *i))393 } else {394 secret.owners.first()395 };396 let Some(identity_holder) = identity_holder else {397 bail!("no available holder found");398 };399 let target_recipients = futures::stream::iter(&target_machines)400 .then(|m| async { config.key(m).await })401 .collect::<Vec<_>>()402 .await;403 let target_recipients =404 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;405406 if let Some(data) = secret.secret.secret {407 let encrypted = config408 .reencrypt_on_host(identity_holder, data, target_recipients)409 .await?;410 secret.secret.secret = Some(encrypted);411 }412413 secret.owners = target_machines;414 config.replace_shared(name, secret);415 }416 Secret::Regenerate { prefer_identities } => {417 {418 let expected_shared_set = config419 .list_configured_shared()420 .await?421 .into_iter()422 .collect::<HashSet<_>>();423 let shared_set = config.list_shared().into_iter().collect::<HashSet<_>>();424 for removed in expected_shared_set.difference(&shared_set) {425 info!("generating secret: {removed}");426 let config_field = &config.config_unchecked_field;427 let config_field = nix_go!(config_field.configUnchecked);428 let secret = nix_go!(config_field.sharedSecrets[{ removed }]);429 let shared = generate_shared(config, removed, secret).await?;430 config.replace_shared(removed.to_string(), shared)431 }432 }433 let mut to_remove = Vec::new();434 for name in &config.list_shared() {435 info!("updating secret: {name}");436 let mut data = config.shared_secret(name)?;437 let config_field = &config.config_unchecked_field;438 let config_field = nix_go!(config_field.configUnchecked);439 let expected_owners: Vec<String> =440 nix_go_json!(config_field.sharedSecrets[{ name }].expectedOwners);441 if expected_owners.is_empty() {442 warn!("secret was removed from fleet config: {name}, removing from data");443 to_remove.push(name.to_string());444 continue;445 }446 let set = data.owners.iter().collect::<HashSet<_>>();447 let expected_set = expected_owners.iter().collect::<HashSet<_>>();448 let should_remove = set.difference(&expected_set).next().is_some();449 if set == expected_set {450 info!("secret data is ok");451 continue;452 }453454 let secret = nix_go!(config_field.sharedSecrets[{ name }]);455 let owner_dependent: bool = nix_go_json!(secret.ownerDependent);456 let regenerate_on_remove: bool = nix_go_json!(secret.regenerateOnOwnerRemoved);457 #[allow(clippy::nonminimal_bool)]458 if !owner_dependent && !(should_remove && regenerate_on_remove) {459 warn!("reencrypting secret '{name}' for new owner set");460 // TODO: force regeneration461 if should_remove {462 warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");463 }464465 let identity_holder = if !prefer_identities.is_empty() {466 prefer_identities467 .iter()468 .find(|i| data.owners.iter().any(|s| s == *i))469 } else {470 data.owners.first()471 };472 let Some(identity_holder) = identity_holder else {473 bail!("no available holder found");474 };475476 let target_recipients = futures::stream::iter(&expected_owners)477 .then(|m| async { config.key(m).await })478 .collect::<Vec<_>>()479 .await;480 let target_recipients =481 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;482483 if let Some(secret) = data.secret.secret {484 let encrypted = config485 .reencrypt_on_host(identity_holder, secret, target_recipients)486 .await?;487488 data.secret.secret = Some(encrypted);489 }490 data.owners = expected_owners;491 config.replace_shared(name.to_owned(), data);492 } else {493 let shared = generate_shared(config, name, secret).await?;494 config.replace_shared(name.to_owned(), shared)495 }496 }497 for k in to_remove {498 config.remove_shared(&k);499 }500 }501 Secret::List {} => {502 let _span = info_span!("loading secrets").entered();503 let configured = config.list_configured_shared().await?;504 #[derive(Tabled)]505 struct SecretDisplay {506 #[tabled(rename = "Name")]507 name: String,508 #[tabled(rename = "Owners")]509 owners: String,510 }511 let mut table = vec![];512 for name in configured.iter().cloned() {513 let config = config.clone();514 let expected_owners = config.shared_secret_expected_owners(&name).await?;515 let data = config.shared_secret(&name)?;516 let owners = data517 .owners518 .iter()519 .map(|o| {520 if expected_owners.contains(o) {521 o.green().to_string()522 } else {523 o.red().to_string()524 }525 })526 .collect::<Vec<_>>();527 table.push(SecretDisplay {528 owners: owners.join(", "),529 name,530 })531 }532 info!("loaded\n{}", Table::new(table).to_string())533 }534 }535 Ok(())536 }537}1use crate::{2 better_nix_eval::Field,3 fleetdata::{FleetSecret, FleetSharedSecret, SecretData},4 host::Config,5 nix_go, nix_go_json,6};7use anyhow::{anyhow, bail, ensure, Context, Result};8use chrono::{DateTime, Utc};9use clap::Parser;10use futures::StreamExt;11use itertools::Itertools;12use owo_colors::OwoColorize;13use std::{14 collections::HashSet,15 io::{self, Cursor, Read},16 path::PathBuf,17};18use tabled::{Table, Tabled};19use tokio::fs::read_to_string;20use tracing::{info, info_span, warn};2122#[derive(Parser)]23pub enum Secret {24 /// Force load host keys for all defined hosts25 ForceKeys,26 /// Add secret, data should be provided in stdin27 AddShared {28 /// Secret name29 name: String,30 /// Secret owners31 machines: Vec<String>,32 /// Override secret if already present33 #[clap(long)]34 force: bool,35 /// Secret public part36 #[clap(long)]37 public: Option<String>,38 /// Load public part from specified file39 #[clap(long)]40 public_file: Option<PathBuf>,4142 /// Create a notification on secret expiration43 #[clap(long)]44 expires_at: Option<DateTime<Utc>>,4546 /// Secret with this name already exists, override its value while keeping the same owners.47 #[clap(long)]48 re_add: bool,49 },50 /// Add secret, data should be provided in stdin51 Add {52 /// Secret name53 name: String,54 /// Secret owners55 machine: String,56 /// Override secret if already present57 #[clap(long)]58 force: bool,59 #[clap(long)]60 public: Option<String>,61 #[clap(long)]62 public_file: Option<PathBuf>,63 },64 /// Read secret from remote host, requires sudo on said host65 Read {66 name: String,67 machine: String,68 #[clap(long)]69 plaintext: bool,70 },71 UpdateShared {72 name: String,7374 #[clap(long)]75 machines: Option<Vec<String>>,7677 #[clap(long)]78 add_machines: Vec<String>,79 #[clap(long)]80 remove_machines: Vec<String>,8182 /// Which host should we use to decrypt83 #[clap(long)]84 prefer_identities: Vec<String>,85 },86 Regenerate {87 /// Which host should we use to decrypt, in case if reencryption is required, without88 /// regeneration89 #[clap(long)]90 prefer_identities: Vec<String>,91 },92 List {},93}9495async fn generate_shared(96 config: &Config,97 display_name: &str,98 secret: Field,99) -> Result<FleetSharedSecret> {100 Ok(if secret.has_field("generateImpure").await? {101 let config_field = &config.config_unchecked_field;102 let generate = nix_go!(secret.generateImpure);103 let owners: Vec<String> = nix_go_json!(secret.expectedOwners);104105 let on: String = nix_go_json!(generate.on);106 let call_package = nix_go!(107 config_field.buildableSystems(Obj {108 localSystem: { config.local_system.clone() }109 })[{ on }]110 .config111 .nixpkgs112 .resolvedPkgs113 .callPackage114 );115116 let host = config.host(&on).await?;117118 let generator = nix_go!(call_package(generate.generator)(Obj {}));119 let generator = generator.build().await?;120 let generator = generator121 .get("out")122 .ok_or_else(|| anyhow!("missing generateImpure out"))?;123 let generator = host.remote_derivation(generator).await?;124125 let mut recipients = String::new();126 for owner in &owners {127 let key = config.key(owner).await?;128 recipients.push_str(&format!("-r \"{key}\" "));129 }130 recipients.push_str("-e");131132 let out = host.mktemp_dir().await?;133134 let mut gen = host.cmd(generator).await?;135 gen.env("rageArgs", recipients).env("out", &out);136 gen.run().await?;137138 {139 let marker = host.read_file_text(format!("{out}/marker")).await?;140 ensure!(marker == "SUCCESS", "generation not succeeded");141 }142143 let public = host.read_file_text(format!("{out}/public")).await.ok();144 let secret = host.read_file_bin(format!("{out}/secret")).await.ok();145 if let Some(secret) = &secret {146 ensure!(147 age::Decryptor::new(Cursor::new(&secret)).is_ok(),148 "builder produced non-encrypted value as secret, this is highly insecure"149 );150 }151152 let created_at = host.read_file_value(format!("{out}/created_at")).await?;153 let expires_at = host.read_file_value(format!("{out}/expires_at")).await.ok();154155 FleetSharedSecret {156 owners,157 secret: FleetSecret {158 created_at,159 expires_at,160 public,161 secret: secret.map(SecretData),162 },163 }164 } else {165 bail!("no generator defined for {display_name}")166 })167}168169async fn parse_public(170 public: Option<String>,171 public_file: Option<PathBuf>,172) -> Result<Option<String>> {173 Ok(match (public, public_file) {174 (Some(v), None) => Some(v),175 (None, Some(v)) => Some(read_to_string(v).await?),176 (Some(_), Some(_)) => {177 bail!("only public or public_file should be set")178 }179 (None, None) => None,180 })181}182183fn parse_machines(184 initial: Vec<String>,185 machines: Option<Vec<String>>,186 mut add_machines: Vec<String>,187 mut remove_machines: Vec<String>,188) -> Result<Vec<String>> {189 if machines.is_none() && add_machines.is_empty() && remove_machines.is_empty() {190 bail!("no operation");191 }192193 let initial_machines = initial.clone();194 let mut target_machines = initial;195 info!("Currently encrypted for {initial_machines:?}");196197 // ensure!(machines.is_some() || !add_machines.is_empty() || )198 if let Some(machines) = machines {199 ensure!(200 add_machines.is_empty() && remove_machines.is_empty(),201 "can't combine --machines and --add-machines/--remove-machines"202 );203 let target = initial_machines.iter().collect::<HashSet<_>>();204 let source = machines.iter().collect::<HashSet<_>>();205 for removed in target.difference(&source) {206 remove_machines.push((*removed).clone());207 }208 for added in source.difference(&target) {209 add_machines.push((*added).clone());210 }211 }212213 for machine in &remove_machines {214 let mut removed = false;215 while let Some(pos) = target_machines.iter().position(|m| m == machine) {216 target_machines.swap_remove(pos);217 removed = true;218 }219 if !removed {220 warn!("secret is not enabled for {machine}");221 }222 }223 for machine in &add_machines {224 if target_machines.iter().any(|m| m == machine) {225 warn!("secret is already added to {machine}");226 } else {227 target_machines.push(machine.to_owned());228 }229 }230 if !remove_machines.is_empty() {231 // TODO: maybe force secret regeneration?232 // Not that useful without revokation.233 warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");234 }235 Ok(target_machines)236}237impl Secret {238 pub async fn run(self, config: &Config) -> Result<()> {239 match self {240 Secret::ForceKeys => {241 for host in config.list_hosts().await? {242 if config.should_skip(&host.name) {243 continue;244 }245 config.key(&host.name).await?;246 }247 }248 Secret::AddShared {249 mut machines,250 name,251 force,252 public,253 public_file,254 expires_at,255 re_add,256 } => {257 let exists = config.has_shared(&name);258 if exists && !force && !re_add {259 bail!("secret already defined");260 }261 if re_add {262 // Fixme: use clap to limit this usage263 ensure!(!force, "--force and --readd are not compatible");264 ensure!(exists, "secret doesn't exists");265 ensure!(266 machines.is_empty(),267 "you can't use machines argument for --readd"268 );269 let shared = config.shared_secret(&name)?;270 machines = shared.owners;271 }272273 let recipients = config274 .recipients(&machines.iter().map(String::as_str).collect_vec())275 .await?;276277 let secret = {278 let mut input = vec![];279 io::stdin().read_to_end(&mut input)?;280281 if input.is_empty() {282 None283 } else {284 Some(285 SecretData::encrypt(recipients, input)286 .ok_or_else(|| anyhow!("no recipients provided"))?,287 )288 }289 };290 let public = parse_public(public, public_file).await?;291 config.replace_shared(292 name,293 FleetSharedSecret {294 owners: machines,295 secret: FleetSecret {296 created_at: Utc::now(),297 expires_at,298 secret,299 public,300 },301 },302 );303 }304 Secret::Add {305 machine,306 name,307 force,308 public,309 public_file,310 } => {311 let recipient = config.recipient(&machine).await?;312313 let secret = {314 let mut input = vec![];315 io::stdin().read_to_end(&mut input)?;316 if input.is_empty() {317 bail!("no data provided")318 }319320 Some(SecretData::encrypt(vec![recipient], input).expect("recipient provided"))321 };322323 if config.has_secret(&machine, &name) && !force {324 bail!("secret already defined");325 }326 let public = parse_public(public, public_file).await?;327328 config.insert_secret(329 &machine,330 name,331 FleetSecret {332 created_at: Utc::now(),333 expires_at: None,334 secret,335 public,336 },337 );338 }339 #[allow(clippy::await_holding_refcell_ref)]340 Secret::Read {341 name,342 machine,343 plaintext,344 } => {345 let secret = config.host_secret(&machine, &name)?;346 let Some(secret) = secret.secret else {347 bail!("no secret {name}");348 };349 let host = config.host(&machine).await?;350 let data = host.decrypt(secret).await?;351 if plaintext {352 let s = String::from_utf8(data).context("output is not utf8")?;353 print!("{s}");354 } else {355 println!("{}", z85::encode(&data));356 }357 }358 Secret::UpdateShared {359 name,360 machines,361 add_machines,362 remove_machines,363 prefer_identities,364 } => {365 let mut secret = config.shared_secret(&name)?;366 if secret.secret.secret.is_none() {367 bail!("no secret");368 }369370 let initial_machines = secret.owners.clone();371 let target_machines = parse_machines(372 initial_machines.clone(),373 machines,374 add_machines,375 remove_machines,376 )?;377378 if target_machines.is_empty() {379 info!("no machines left for secret, removing it");380 config.remove_shared(&name);381 return Ok(());382 }383384 if target_machines == initial_machines {385 warn!("secret owners are already correct");386 return Ok(());387 }388389 let identity_holder = if !prefer_identities.is_empty() {390 prefer_identities391 .iter()392 .find(|i| initial_machines.iter().any(|s| s == *i))393 } else {394 secret.owners.first()395 };396 let Some(identity_holder) = identity_holder else {397 bail!("no available holder found");398 };399 let target_recipients = futures::stream::iter(&target_machines)400 .then(|m| async { config.key(m).await })401 .collect::<Vec<_>>()402 .await;403 let target_recipients =404 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;405406 if let Some(data) = secret.secret.secret {407 let host = config.host(&identity_holder).await?;408 let encrypted = host.reencrypt(data, target_recipients).await?;409 secret.secret.secret = Some(encrypted);410 }411412 secret.owners = target_machines;413 config.replace_shared(name, secret);414 }415 Secret::Regenerate { prefer_identities } => {416 {417 let expected_shared_set = config418 .list_configured_shared()419 .await?420 .into_iter()421 .collect::<HashSet<_>>();422 let shared_set = config.list_shared().into_iter().collect::<HashSet<_>>();423 for removed in expected_shared_set.difference(&shared_set) {424 info!("generating secret: {removed}");425 let config_field = &config.config_unchecked_field;426 let config_field = nix_go!(config_field.configUnchecked);427 let secret = nix_go!(config_field.sharedSecrets[{ removed }]);428 let shared = generate_shared(config, removed, secret).await?;429 config.replace_shared(removed.to_string(), shared)430 }431 }432 let mut to_remove = Vec::new();433 for name in &config.list_shared() {434 info!("updating secret: {name}");435 let mut data = config.shared_secret(name)?;436 let config_field = &config.config_unchecked_field;437 let config_field = nix_go!(config_field.configUnchecked);438 let expected_owners: Vec<String> =439 nix_go_json!(config_field.sharedSecrets[{ name }].expectedOwners);440 if expected_owners.is_empty() {441 warn!("secret was removed from fleet config: {name}, removing from data");442 to_remove.push(name.to_string());443 continue;444 }445 let set = data.owners.iter().collect::<HashSet<_>>();446 let expected_set = expected_owners.iter().collect::<HashSet<_>>();447 let should_remove = set.difference(&expected_set).next().is_some();448 if set == expected_set {449 info!("secret data is ok");450 continue;451 }452453 let secret = nix_go!(config_field.sharedSecrets[{ name }]);454 let owner_dependent: bool = nix_go_json!(secret.ownerDependent);455 let regenerate_on_remove: bool = nix_go_json!(secret.regenerateOnOwnerRemoved);456 #[allow(clippy::nonminimal_bool)]457 if !owner_dependent && !(should_remove && regenerate_on_remove) {458 warn!("reencrypting secret '{name}' for new owner set");459 // TODO: force regeneration460 if should_remove {461 warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");462 }463464 let identity_holder = if !prefer_identities.is_empty() {465 prefer_identities466 .iter()467 .find(|i| data.owners.iter().any(|s| s == *i))468 } else {469 data.owners.first()470 };471 let Some(identity_holder) = identity_holder else {472 bail!("no available holder found");473 };474475 let target_recipients = futures::stream::iter(&expected_owners)476 .then(|m| async { config.key(m).await })477 .collect::<Vec<_>>()478 .await;479 let target_recipients =480 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;481482 if let Some(secret) = data.secret.secret {483 let host = config.host(identity_holder).await?;484 let encrypted = host.reencrypt(secret, target_recipients).await?;485486 data.secret.secret = Some(encrypted);487 }488 data.owners = expected_owners;489 config.replace_shared(name.to_owned(), data);490 } else {491 let shared = generate_shared(config, name, secret).await?;492 config.replace_shared(name.to_owned(), shared)493 }494 }495 for k in to_remove {496 config.remove_shared(&k);497 }498 }499 Secret::List {} => {500 let _span = info_span!("loading secrets").entered();501 let configured = config.list_configured_shared().await?;502 #[derive(Tabled)]503 struct SecretDisplay {504 #[tabled(rename = "Name")]505 name: String,506 #[tabled(rename = "Owners")]507 owners: String,508 }509 let mut table = vec![];510 for name in configured.iter().cloned() {511 let config = config.clone();512 let expected_owners = config.shared_secret_expected_owners(&name).await?;513 let data = config.shared_secret(&name)?;514 let owners = data515 .owners516 .iter()517 .map(|o| {518 if expected_owners.contains(o) {519 o.green().to_string()520 } else {521 o.red().to_string()522 }523 })524 .collect::<Vec<_>>();525 table.push(SecretDisplay {526 owners: owners.join(", "),527 name,528 })529 }530 info!("loaded\n{}", Table::new(table).to_string())531 }532 }533 Ok(())534 }535}cmds/fleet/src/command.rsdiffbeforeafterboth--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,23 +1,15 @@
-use std::{
- collections::HashMap,
- ffi::OsStr,
- pin,
- process::Stdio,
- sync::{Arc, Mutex},
- task::Poll,
-};
+use std::thread::sleep;
+use std::time::Duration;
+use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};
use anyhow::{anyhow, Result};
+use better_command::{Handler, NixHandler, PlainHandler};
use futures::StreamExt;
use itertools::Either;
-use once_cell::sync::Lazy;
use openssh::{OverSsh, OwningCommand, Session};
-use regex::Regex;
-use serde::{de::Visitor, Deserialize};
use tokio::{io::AsyncRead, process::Command, select};
use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
-use tracing::{info, info_span, warn, Span};
-use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tracing::{info, debug};
fn escape_bash(input: &str, out: &mut String) {
const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
@@ -87,9 +79,7 @@
return self;
}
let mut out = Self::new("env");
- if let Some(session) = self.ssh_session {
- out = out.ssh_session(session);
- }
+ out.ssh_session = self.ssh_session;
for (k, v) in self.env {
assert!(!k.contains('='));
out.arg(format!("{k}={v}"));
@@ -179,21 +169,11 @@
out
} else {
let mut out = Self::new("sudo");
+ out.ssh_session = self.ssh_session.take();
out.args(self.into_args());
out
}
}
- pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
- self.ssh_session = Some(on);
- self
- }
- pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
- let mut out = Self::new("ssh");
- out.ssh_session = self.ssh_session.take();
- out.arg(on).arg("--");
- out.arg(self.into_string());
- out
- }
pub async fn run(self) -> Result<()> {
let str = self.clone().into_string();
@@ -276,259 +256,6 @@
let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
assert!(v.is_none());
Ok(())
-}
-
-pub trait Handler: Send {
- fn handle_line(&mut self, e: &str);
-}
-
-pub struct ClonableHandler<H>(Arc<Mutex<H>>);
-impl<H> Clone for ClonableHandler<H> {
- fn clone(&self) -> Self {
- Self(self.0.clone())
- }
-}
-impl<H> ClonableHandler<H> {
- pub fn new(inner: H) -> Self {
- Self(Arc::new(Mutex::new(inner)))
- }
-}
-impl<H: Handler> Handler for ClonableHandler<H> {
- fn handle_line(&mut self, e: &str) {
- self.0.lock().unwrap().handle_line(e)
- }
-}
-
-struct PlainHandler;
-impl Handler for PlainHandler {
- fn handle_line(&mut self, e: &str) {
- info!(target: "log", "{e}");
- }
-}
-
-pub struct NoopHandler;
-impl Handler for NoopHandler {
- fn handle_line(&mut self, _e: &str) {}
-}
-
-#[derive(Default)]
-pub struct NixHandler {
- spans: HashMap<u64, Span>,
-}
-fn process_message(m: &str) -> String {
- static OSC_CLEANER: Lazy<Regex> =
- Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
- static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
- let m = OSC_CLEANER.replace_all(m, "");
- // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
- // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
- DETABBER.replace_all(m.as_ref(), " ").to_string()
-}
-impl Handler for NixHandler {
- fn handle_line(&mut self, e: &str) {
- if let Some(e) = e.strip_prefix("@nix ") {
- let log: NixLog = match serde_json::from_str(e) {
- Ok(l) => l,
- Err(err) => {
- warn!("failed to parse nix log line {:?}: {}", e, err);
- return;
- }
- };
- match log {
- NixLog::Msg { msg, raw_msg, .. } => {
- #[allow(clippy::nonminimal_bool)]
- if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
- && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
- && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
- if let Some(raw_msg) = raw_msg {
- if !msg.is_empty() {
- info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
- } else {
- info!(target: "nix", "{}", raw_msg.trim_end())
- }
- } else {
- info!(target: "nix", "{}", msg.trim_end())
- }
- }
- }
- NixLog::Start {
- ref fields,
- typ,
- id,
- ..
- } if typ == 105 && !fields.is_empty() => {
- if let [LogField::String(drv), ..] = &fields[..] {
- let mut drv = drv.as_str();
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- info!(target: "nix","building {}", drv);
- let span = info_span!("build", drv);
- span.pb_start();
- self.spans.insert(id, span);
- } else {
- warn!("bad build log: {:?}", log)
- }
- }
- NixLog::Start {
- ref fields,
- typ,
- id,
- ..
- } if typ == 100 && fields.len() >= 3 => {
- if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
- &fields[..]
- {
- let mut drv = drv.as_str();
-
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- // info!(target: "nix","copying {} {} -> {}", drv, from, to);
- let span = info_span!("copy", from, to, drv);
- span.pb_start();
- self.spans.insert(id, span);
- } else {
- warn!("bad copy log: {:?}", log)
- }
- }
- NixLog::Start { text, typ, id, .. }
- if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
- {
- if !text.is_empty()
- && text != "querying info about missing paths"
- && text != "copying 0 paths"
- // Too much spam on lazy-trees branch
- && !(text.starts_with("copying '") && text.ends_with("' to the store"))
- {
- let span = info_span!("job");
- span.pb_start();
- span.pb_set_message(&process_message(text.trim()));
- self.spans.insert(id, span);
- info!(target: "nix", "{}", text);
- }
- }
- NixLog::Start {
- text,
- level: 0,
- typ: 108,
- ..
- } if text.is_empty() => {
- // Cache lookup? Coupled with copy log
- }
- NixLog::Start {
- text,
- level: 4,
- typ: 109,
- ..
- } if text.starts_with("querying info about ") => {
- // Cache lookup
- }
- NixLog::Start {
- text,
- level: 4,
- typ: 101,
- ..
- } if text.starts_with("downloading ") => {
- // NAR downloading, coupled with copy log
- }
- NixLog::Start {
- text,
- level: 1,
- typ: 111,
- ..
- } if text.starts_with("waiting for a machine to build ") => {
- // Useless repeating notification about build
- }
- NixLog::Start {
- text,
- level: 3,
- typ: 111,
- ..
- } if text.starts_with("resolved derivation: ") => {
- // CA resolved
- }
- NixLog::Start {
- text,
- level: 1,
- typ: 111,
- id,
- ..
- } if text.starts_with("waiting for lock on ") => {
- let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
- if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
- drv = txt;
- }
- if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
- drv = txt;
- }
- if let Some(txt) = drv.split("', '").next() {
- drv = txt;
- }
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- let span = info_span!("waiting on drv", drv);
- span.pb_start();
- self.spans.insert(id, span);
- // Concurrent build of the same message
- }
- NixLog::Stop { id, .. } => {
- self.spans.remove(&id);
- }
- NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
- if let Some(span) = self.spans.get(&id) {
- if let LogField::String(s) = &fields[0] {
- span.pb_set_message(&process_message(s.trim()));
- } else {
- warn!("bad fields: {fields:?}");
- }
- } else {
- warn!("unknown result id: {id} {typ} {fields:?}");
- }
- // dbg!(fields, id, typ);
- }
- NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
- if let Some(span) = self.spans.get(&id) {
- if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
- &fields[..4]
- {
- span.pb_set_length(*expected);
- span.pb_set_position(*done);
- } else {
- warn!("bad fields: {fields:?}");
- }
- } else {
- // warn!("unknown result id: {id} {typ} {fields:?}");
- // Unaccounted progress.
- }
- // dbg!(fields, id, typ);
- }
- NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
- // Set phase, expected
- }
- _ => warn!("unknown log: {:?}", log),
- };
- } else {
- let e = e.trim();
- if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
- return;
- }
- info!("{e}")
- }
- }
}
async fn run_nix_inner_raw(
@@ -540,6 +267,7 @@
) -> Result<Option<Vec<u8>>> {
cmd.stderr(Stdio::piped());
cmd.stdout(Stdio::piped());
+ debug!("running command {cmd:?} on local");
let mut child = cmd.spawn()?;
let mut stderr = child.stderr.take().unwrap();
let stdout = child.stdout.take().unwrap();
@@ -600,6 +328,7 @@
err_handler: &mut dyn Handler,
mut out_handler: Option<&mut dyn Handler>,
) -> Result<Option<Vec<u8>>> {
+ debug!("running command {cmd:?} over ssh");
cmd.stderr(openssh::Stdio::piped());
cmd.stdout(openssh::Stdio::piped());
let mut child = cmd.spawn().await?;
@@ -656,77 +385,4 @@
}
Ok(out_buf)
-}
-
-pub trait ErrorRecorder: Send {
- /// Return true to discard message from logging
- fn push_message(&mut self, msg: &str) -> bool;
-}
-
-#[derive(Debug)]
-enum LogField {
- String(String),
- Num(u64),
-}
-
-impl<'de> Deserialize<'de> for LogField {
- fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
- where
- D: serde::Deserializer<'de>,
- {
- struct StringOrNum;
- impl<'de> Visitor<'de> for StringOrNum {
- type Value = LogField;
-
- fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "string or unsigned")
- }
-
- fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- Ok(LogField::String(v.to_owned()))
- }
-
- fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- Ok(LogField::Num(v))
- }
- }
-
- deserializer.deserialize_any(StringOrNum)
- }
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "camelCase", tag = "action")]
-#[allow(dead_code)]
-enum NixLog {
- Msg {
- level: u32,
- msg: String,
- raw_msg: Option<String>,
- },
- Start {
- id: u64,
- level: u32,
- #[serde(default)]
- fields: Vec<LogField>,
- text: String,
- #[serde(rename = "type")]
- typ: u32,
- },
- Stop {
- id: u64,
- },
- Result {
- id: u64,
- #[serde(rename = "type")]
- typ: u32,
- #[serde(default)]
- fields: Vec<LogField>,
- },
}
cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -9,7 +9,6 @@
sync::{Arc, Mutex, MutexGuard, OnceLock},
};
-use age::Recipient;
use anyhow::{anyhow, bail, Context, Result};
use clap::{ArgGroup, Parser};
use openssh::SessionBuilder;
@@ -50,10 +49,12 @@
pub struct ConfigHost {
pub name: String,
+ pub local: bool,
pub session: OnceLock<Arc<openssh::Session>>,
}
impl ConfigHost {
- pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ assert!(!self.local, "do not open ssh connection to local session");
// FIXME: TOCTOU
if let Some(session) = &self.session.get() {
return Ok((*session).clone());
@@ -96,8 +97,12 @@
D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
}
pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
- let session = self.open_session().await?;
- Ok(MyCommand::new_on(cmd, session))
+ if self.local {
+ Ok(MyCommand::new(cmd))
+ } else {
+ let session = self.open_session().await?;
+ Ok(MyCommand::new_on(cmd, session))
+ }
}
pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
@@ -110,8 +115,25 @@
.context("failed to call remote host for decrypt")?;
z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
}
+ pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
+ let mut cmd = self.cmd("fleet-install-secrets").await?;
+ cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
+ for target in targets {
+ cmd.eqarg("--targets", target);
+ }
+ let encoded = cmd
+ .sudo()
+ .run_string()
+ .await
+ .context("failed to call remote host for decrypt")?;
+ SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")
+ }
/// Returns path for futureproofing, as path might change i.e on conversion to CA
pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
+ if self.local {
+ // Path is located locally, thus already trusted.
+ return Ok(path.to_owned());
+ }
let mut nix = MyCommand::new("nix");
nix.arg("copy")
.arg("--substitute-on-destination")
@@ -120,6 +142,25 @@
nix.run_nix().await?;
Ok(path.to_owned())
}
+ pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("stop").arg(name);
+ cmd.sudo().run().await
+ }
+ pub async fn systemctl_start(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("start").arg(name);
+ cmd.sudo().run().await
+ }
+
+ pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
+ let mut cmd = self.cmd("rm").await?;
+ cmd.arg("-f").arg(path);
+ if sudo {
+ cmd = cmd.sudo()
+ }
+ cmd.run().await
+ }
}
impl Config {
@@ -134,35 +175,12 @@
}
pub fn is_local(&self, host: &str) -> bool {
self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
- }
-
- pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
- if sudo {
- command = command.sudo();
- }
- if !self.is_local(host) {
- command = command.ssh(host);
- }
- command.run().await
- }
- pub async fn run_string_on(
- &self,
- host: &str,
- mut command: MyCommand,
- sudo: bool,
- ) -> Result<String> {
- if sudo {
- command = command.sudo();
- }
- if !self.is_local(host) {
- command = command.ssh(host);
- }
- command.run_string().await
}
pub async fn host(&self, name: &str) -> Result<ConfigHost> {
Ok(ConfigHost {
name: name.to_owned(),
+ local: self.is_local(name),
session: OnceLock::new(),
})
}
@@ -172,6 +190,7 @@
let mut out = vec![];
for name in names {
out.push(ConfigHost {
+ local: self.is_local(&name),
name,
session: OnceLock::new(),
})
@@ -225,27 +244,6 @@
let mut data = self.data_mut();
let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();
host_secrets.insert(secret, value);
- }
-
- pub async fn reencrypt_on_host(
- &self,
- host: &str,
- data: SecretData,
- targets: Vec<String>,
- ) -> Result<SecretData> {
- let mut recmd = MyCommand::new("fleet-install-secrets");
- recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
- for target in targets {
- recmd.eqarg("--targets", target);
- }
- recmd = recmd.sudo().ssh(host);
- let encoded = recmd
- .run_string()
- .await
- .context("failed to call remote host for decrypt")?
- .trim()
- .to_owned();
- SecretData::decode_z85(&encoded)
}
pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
cmds/fleet/src/keys.rsdiffbeforeafterboth--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
use std::str::FromStr;
-use crate::command::MyCommand;
use crate::host::Config;
use age::Recipient;
use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
Ok(key)
} else {
warn!("Loading key for {}", host);
- let mut cmd = MyCommand::new("cat");
+ let host = self.host(host).await?;
+ let mut cmd = host.cmd("cat").await?;
cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
- let key = self.run_string_on(host, cmd, false).await?;
- self.update_key(host, key.clone());
+ let key = cmd.run_string().await?;
+ self.update_key(&host.name, key.clone());
Ok(key)
}
}
cmds/remowt-agent/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
cmds/remowt-agent/README.adocdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/README.adoc
@@ -0,0 +1,16 @@
+= Remowt agent
+
+Working with remote machine programmatically is not always easy.
+
+Sure, you have ssh, sftp, and that kind of fancy stuff, but what about minimal distributions, routers?
+
+Well, sftp can be replaced with FISH... But what if remote machine isn't accessible over ssh at all? What if the only communication channel you have is uart?
+
+What if remote host has not enough tools to implement the functionality you need?
+
+Remowt is intended to solve this in a way similar to how some RAT toolkits (I.e metasploit) do - you inject minimal agent, setup some communication channel to it (stdio perhaps?), and then you deploy payloads on it, and the payloads perform the actual work.
+
+== Non-targets
+
+Minimal executable size:: As long as it transferred only once, it shouldn't be a problem to keep it a reasonable size.
+Be stealthy:: As it solves the problem almost the same way as metasploit, it is possible to use it as something bad, but this is not the remowt intended purpose, and never will be.
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+ println!("Hello, world!");
+}
crates/better-command/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
crates/better-command/src/handler.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/handler.rs
@@ -0,0 +1,305 @@
+//! Collection of handlers, which transform program-specific stdout format to tracing
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use tracing::{Span, info, warn, info_span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+
+pub trait Handler: Send {
+ fn handle_line(&mut self, e: &str);
+}
+
+/// Handler wrapper, which can be cloned.
+pub struct ClonableHandler<H>(Arc<Mutex<H>>);
+impl<H> Clone for ClonableHandler<H> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+impl<H> ClonableHandler<H> {
+ pub fn new(inner: H) -> Self {
+ Self(Arc::new(Mutex::new(inner)))
+ }
+}
+impl<H: Handler> Handler for ClonableHandler<H> {
+ fn handle_line(&mut self, e: &str) {
+ self.0.lock().unwrap().handle_line(e)
+ }
+}
+
+/// Converts command output to tracing lines
+pub struct PlainHandler;
+impl Handler for PlainHandler {
+ fn handle_line(&mut self, e: &str) {
+ info!(target: "log", "{e}");
+ }
+}
+
+/// Ignores output
+pub struct NoopHandler;
+impl Handler for NoopHandler {
+ fn handle_line(&mut self, _e: &str) {}
+}
+
+/// Transform nix internal-json logs to tracing spans.
+#[derive(Default)]
+pub struct NixHandler {
+ spans: HashMap<u64, Span>,
+}
+#[derive(Deserialize, Debug)]
+#[serde(untagged)]
+enum LogField {
+ String(String),
+ Num(u64),
+}
+
+/// Nix internal-json log line type
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", tag = "action")]
+#[allow(dead_code)]
+enum NixLog {
+ Msg {
+ level: u32,
+ msg: String,
+ raw_msg: Option<String>,
+ },
+ Start {
+ id: u64,
+ level: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ text: String,
+ #[serde(rename = "type")]
+ typ: u32,
+ },
+ Stop {
+ id: u64,
+ },
+ Result {
+ id: u64,
+ #[serde(rename = "type")]
+ typ: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ },
+}
+fn process_message(m: &str) -> String {
+ // Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.
+ static OSC_CLEANER: Lazy<Regex> =
+ Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+ static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+ let m = OSC_CLEANER.replace_all(m, "");
+ // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+ // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+ DETABBER.replace_all(m.as_ref(), " ").to_string()
+}
+impl Handler for NixHandler {
+ fn handle_line(&mut self, e: &str) {
+ if let Some(e) = e.strip_prefix("@nix ") {
+ let log: NixLog = match serde_json::from_str(e) {
+ Ok(l) => l,
+ Err(err) => {
+ warn!("failed to parse nix log line {:?}: {}", e, err);
+ return;
+ }
+ };
+ match log {
+ NixLog::Msg { msg, raw_msg, .. } => {
+ #[allow(clippy::nonminimal_bool)]
+ if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+ && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+ && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+ if let Some(raw_msg) = raw_msg {
+ if !msg.is_empty() {
+ info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
+ } else {
+ info!(target: "nix", "{}", raw_msg.trim_end())
+ }
+ } else {
+ info!(target: "nix", "{}", msg.trim_end())
+ }
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 105 && !fields.is_empty() => {
+ if let [LogField::String(drv), ..] = &fields[..] {
+ let mut drv = drv.as_str();
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ info!(target: "nix","building {}", drv);
+ let span = info_span!("build", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad build log: {:?}", log)
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 100 && fields.len() >= 3 => {
+ if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
+ &fields[..]
+ {
+ let mut drv = drv.as_str();
+
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ // info!(target: "nix","copying {} {} -> {}", drv, from, to);
+ let span = info_span!("copy", from, to, drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad copy log: {:?}", log)
+ }
+ }
+ NixLog::Start { text, typ, id, .. }
+ if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
+ {
+ if !text.is_empty()
+ && text != "querying info about missing paths"
+ && text != "copying 0 paths"
+ // Too much spam on lazy-trees branch
+ && !(text.starts_with("copying '") && text.ends_with("' to the store"))
+ {
+ let span = info_span!("job");
+ span.pb_start();
+ span.pb_set_message(&process_message(text.trim()));
+ self.spans.insert(id, span);
+ info!(target: "nix", "{}", text);
+ }
+ }
+ NixLog::Start {
+ text,
+ level: 0,
+ typ: 108,
+ ..
+ } if text.is_empty() => {
+ // Cache lookup? Coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 109,
+ ..
+ } if text.starts_with("querying info about ") => {
+ // Cache lookup
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 101,
+ ..
+ } if text.starts_with("downloading ") => {
+ // NAR downloading, coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ ..
+ } if text.starts_with("waiting for a machine to build ") => {
+ // Useless repeating notification about build
+ }
+ NixLog::Start {
+ text,
+ level: 3,
+ typ: 111,
+ ..
+ } if text.starts_with("resolved derivation: ") => {
+ // CA resolved
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ id,
+ ..
+ } if text.starts_with("waiting for lock on ") => {
+ let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
+ if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.split("', '").next() {
+ drv = txt;
+ }
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ let span = info_span!("waiting on drv", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ // Concurrent build of the same message
+ }
+ NixLog::Stop { id, .. } => {
+ self.spans.remove(&id);
+ }
+ NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
+ if let Some(span) = self.spans.get(&id) {
+ if let LogField::String(s) = &fields[0] {
+ span.pb_set_message(&process_message(s.trim()));
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ warn!("unknown result id: {id} {typ} {fields:?}");
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
+ if let Some(span) = self.spans.get(&id) {
+ if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
+ &fields[..4]
+ {
+ span.pb_set_length(*expected);
+ span.pb_set_position(*done);
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ // warn!("unknown result id: {id} {typ} {fields:?}");
+ // Unaccounted progress.
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
+ // Set phase, expected
+ }
+ _ => warn!("unknown log: {:?}", log),
+ };
+ } else {
+ let e = e.trim();
+ if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
+ return;
+ }
+ info!("{e}")
+ }
+ }
+}
crates/better-command/src/lib.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn it_works() {
+ let result = add(2, 2);
+ assert_eq!(result, 4);
+ }
+}