difftreelog
refactor remove shell-outs for ssh
in: trunk
15 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
[[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
"anyhow",
"async-trait",
"base64 0.21.5",
+ "better-command",
"chrono",
"clap",
"futures",
@@ -1510,9 +1523,9 @@
[[package]]
name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "opaque-debug"
@@ -1922,6 +1935,10 @@
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
name = "rnix"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
[[package]]
name = "serde"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
dependencies = [
"serde_derive",
]
@@ -2123,9 +2140,9 @@
[[package]]
name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
dependencies = [
"proc-macro2",
"quote",
@@ -2134,9 +2151,9 @@
[[package]]
name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
dependencies = [
"itoa",
"ryu",
@@ -2527,11 +2544,10 @@
[[package]]
name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
- "cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -2539,9 +2555,9 @@
[[package]]
name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
@@ -2550,9 +2566,9 @@
[[package]]
name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
@@ -2560,9 +2576,9 @@
[[package]]
name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
dependencies = [
"indicatif",
"tracing",
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
[workspace]
members = ["crates/*", "cmds/*"]
resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
edition = "2021"
[dependencies]
+nixlike.workspace = true
+better-command.workspace = true
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@@ -15,7 +17,6 @@
hostname = "0.3.1"
age-core = "0.9.0"
peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
age = { version = "0.9.2", features = ["ssh", "armor"] }
base64 = "0.21.5"
chrono = { version = "0.4.31", features = ["serde"] }
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,3 +1,6 @@
+//! Wrapper around nix repl, which allows to work on nix code, without relying on
+//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.
+
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
use std::fmt::{self, Display};
@@ -6,6 +9,7 @@
use std::sync::{Arc, OnceLock};
use anyhow::{anyhow, bail, ensure, Context, Result};
+use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};
use futures::StreamExt;
use itertools::Itertools;
use r2d2::{Pool, PooledConnection};
@@ -14,11 +18,9 @@
use tokio::io::AsyncWriteExt;
use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_util::codec::{FramedRead, LinesCodec};
use tracing::{debug, error, warn, Level};
-
-use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};
const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";
@@ -30,6 +32,8 @@
string_wrapping: (String, String),
number_wrapping: (String, String),
+ executing_command: Arc<Mutex<()>>,
+
next_id: u32,
free_list: Vec<u32>,
}
@@ -219,6 +223,8 @@
string_wrapping: Default::default(),
number_wrapping: Default::default(),
+ executing_command: Arc::new(Mutex::new(())),
+
next_id: 0,
free_list: vec![],
};
@@ -331,6 +337,10 @@
expr: impl AsRef<[u8]>,
err_handler: &mut dyn Handler,
) -> Result<String> {
+ // Prevent two commands from being executed in parallel, messing with each other.
+ let _lock = self.executing_command.clone();
+ let _guard = _lock.lock().await;
+
self.send_command(expr).await?;
// It will be echoed
self.send_command(REPL_DELIMITER).await?;
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -3,11 +3,11 @@
use std::{env::current_dir, time::Duration};
use crate::command::MyCommand;
-use crate::host::Config;
+use crate::host::{Config, ConfigHost};
use crate::nix_go;
use anyhow::{anyhow, Result};
use clap::Parser;
-use itertools::Itertools;
+use itertools::Itertools as _;
use tokio::{task::LocalSet, time::sleep};
use tracing::{error, field, info, info_span, warn, Instrument};
@@ -112,12 +112,12 @@
current: bool,
datetime: String,
}
-async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
- let mut cmd = MyCommand::new("nix-env");
+async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.arg("--list-generations");
// Sudo is required due to --list-generations acquiring lock on the profile.
- let data = config.run_string_on(host, cmd, true).await?;
+ let data = cmd.sudo().run_string().await?;
let generations = data
.split('\n')
.map(|e| e.trim())
@@ -161,25 +161,12 @@
.map_err(|_e| anyhow!("bad list-generations output"))?
.ok_or_else(|| anyhow!("failed to find generation"))?;
Ok(current)
-}
-
-async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("stop").arg(unit);
- config.run_on(host, cmd, true).await
-}
-
-async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("start").arg(unit);
- config.run_on(host, cmd, true).await
}
async fn execute_upload(
build: &BuildSystems,
- config: &Config,
action: UploadAction,
- host: &str,
+ host: &ConfigHost,
built: PathBuf,
) -> Result<()> {
let mut failed = false;
@@ -191,15 +178,15 @@
if !build.disable_rollback {
let _span = info_span!("preparing").entered();
info!("preparing for rollback");
- let generation = get_current_generation(config, host).await?;
+ let generation = get_current_generation(&host).await?;
info!(
"rollback target would be {} {}",
generation.id, generation.datetime
);
{
- let mut cmd = MyCommand::new("sh");
+ let mut cmd = host.cmd("sh").await?;
cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to set rollback marker: {e}");
failed = true;
}
@@ -215,37 +202,41 @@
// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
// Anyway, reboot will still help in this case.
if action.should_schedule_rollback_run() {
- let mut cmd = MyCommand::new("systemd-run");
+ let mut cmd = host.cmd("systemd-run").await?;
cmd.comparg("--on-active", "3min")
.comparg("--unit", "rollback-watchdog-run")
.arg("systemctl")
.arg("start")
.arg("rollback-watchdog.service");
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to schedule rollback run: {e}");
failed = true;
}
}
}
+
if action.should_switch_profile() && !failed {
info!("switching generation");
- let mut cmd = MyCommand::new("nix-env");
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.comparg("--set", &built);
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to switch generation: {e}");
failed = true;
}
}
+
+ // FIXME: Connection might be disconnected after activation run
+
if action.should_activate() && !failed {
let _span = info_span!("activating").entered();
info!("executing activation script");
let mut switch_script = built.clone();
switch_script.push("bin");
switch_script.push("switch-to-configuration");
- let mut cmd = MyCommand::new(switch_script);
+ let mut cmd = host.cmd(switch_script).await?;
cmd.arg(action.name());
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = cmd.sudo().run().in_current_span().await {
error!("failed to activate: {e}");
failed = true;
}
@@ -253,7 +244,8 @@
if !build.disable_rollback {
if failed {
info!("executing rollback");
- if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")
+ if let Err(e) = host
+ .systemctl_start("rollback-watchdog.service")
.instrument(info_span!("rollback"))
.await
{
@@ -261,27 +253,29 @@
}
} else {
info!("trying to mark upgrade as successful");
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
}
}
info!("disarming watchdog, just in case");
- if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {
+ if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
// It is ok, if there was no reboot - then timer might not be running.
}
if action.should_schedule_rollback_run() {
- if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {
+ if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
error!("failed to disarm rollback run: {e}");
}
}
- } else {
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {
- // Marker might not exist, yet better try to remove it.
- }
+ } else if let Err(_e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
+ // Marker might not exist, yet better try to remove it.
}
Ok(())
}
@@ -289,12 +283,13 @@
impl BuildSystems {
async fn build_task(self, config: Config, host: String) -> Result<()> {
info!("building");
+ let host = config.host(&host).await?;
let action = Action::from(self.subcommand.clone());
let fleet_field = &config.fleet_field;
let drv = nix_go!(
fleet_field.buildSystems(Obj {
localSystem: { config.local_system.clone() }
- })[{ action.build_attr() }][{ host }]
+ })[{ action.build_attr() }][{ &host.name }]
);
let outputs = drv.build().await.map_err(|e| {
if action.build_attr() == "sdImage" {
@@ -309,9 +304,10 @@
match action {
Action::Upload { action } => {
- if !config.is_local(&host) {
+ if !config.is_local(&host.name) {
info!("uploading system closure");
{
+ // TODO: Move to remote_derivation method.
// Alternatively, nix store make-content-addressed can be used,
// at least for the first deployment, to provide trusted store key.
//
@@ -329,13 +325,11 @@
}
let mut tries = 0;
loop {
- let mut nix = MyCommand::new("nix");
- nix.arg("copy")
- .arg("--substitute-on-destination")
- .comparg("--to", format!("ssh-ng://{host}"))
- .arg(out_output);
- match nix.run_nix().await {
- Ok(()) => break,
+ match host.remote_derivation(out_output).await {
+ Ok(remote) => {
+ assert!(&remote == out_output, "CA derivations aren't implemented");
+ break;
+ }
Err(e) if tries < 3 => {
tries += 1;
warn!("Copy failure ({}/3): {}", tries, e);
@@ -346,19 +340,19 @@
}
}
if let Some(action) = action {
- execute_upload(&self, &config, action, &host, out_output.clone()).await?
+ execute_upload(&self, action, &host, out_output.clone()).await?
}
}
Action::Package(PackageAction::SdImage) => {
let mut out = current_dir()?;
- out.push(format!("sd-image-{}", host));
+ out.push(format!("sd-image-{}", host.name));
info!("linking sd image to {:?}", out);
symlink(out_output, out)?;
}
Action::Package(PackageAction::InstallationCd) => {
let mut out = current_dir()?;
- out.push(format!("installation-cd-{}", host));
+ out.push(format!("installation-cd-{}", host.name));
info!("linking iso image to {:?}", out);
symlink(out_output, out)?;
@@ -379,6 +373,17 @@
let this = this.clone();
let span = info_span!("deployment", host = field::display(&host.name));
let hostname = host.name;
+ // FIXME: Since the introduction of better-nix-eval,
+ // due to single repl used for builds, hosts are waiting for each other to build,
+ // instead of building concurrently.
+ //
+ // Open multiple repls?
+ //
+ // Create build batcher, which will behave similar to golangs
+ // WaitGroup, and start executing once all the build tasks are scheduled?
+ // This also allows to cleanup build output, as there will be no longer
+ // "waiting for remote machine" messages in the cases when one package is needed for
+ // multiple hosts.
set.spawn_local(
(async move {
match this.build_task(config, hostname).await {
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,7 +7,7 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use chrono::{DateTime, Utc};
use clap::Parser;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
use itertools::Itertools;
use owo_colors::OwoColorize;
use std::{
@@ -404,9 +404,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(data) = secret.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, data, target_recipients)
- .await?;
+ let host = config.host(&identity_holder).await?;
+ let encrypted = host.reencrypt(data, target_recipients).await?;
secret.secret.secret = Some(encrypted);
}
@@ -481,9 +480,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(secret) = data.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, secret, target_recipients)
- .await?;
+ let host = config.host(identity_holder).await?;
+ let encrypted = host.reencrypt(secret, target_recipients).await?;
data.secret.secret = Some(encrypted);
}
cmds/fleet/src/command.rsdiffbeforeafterboth--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,23 +1,15 @@
-use std::{
- collections::HashMap,
- ffi::OsStr,
- pin,
- process::Stdio,
- sync::{Arc, Mutex},
- task::Poll,
-};
+use std::thread::sleep;
+use std::time::Duration;
+use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};
use anyhow::{anyhow, Result};
+use better_command::{Handler, NixHandler, PlainHandler};
use futures::StreamExt;
use itertools::Either;
-use once_cell::sync::Lazy;
use openssh::{OverSsh, OwningCommand, Session};
-use regex::Regex;
-use serde::{de::Visitor, Deserialize};
use tokio::{io::AsyncRead, process::Command, select};
use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
-use tracing::{info, info_span, warn, Span};
-use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tracing::{info, debug};
fn escape_bash(input: &str, out: &mut String) {
const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
@@ -87,9 +79,7 @@
return self;
}
let mut out = Self::new("env");
- if let Some(session) = self.ssh_session {
- out = out.ssh_session(session);
- }
+ out.ssh_session = self.ssh_session;
for (k, v) in self.env {
assert!(!k.contains('='));
out.arg(format!("{k}={v}"));
@@ -179,21 +169,11 @@
out
} else {
let mut out = Self::new("sudo");
+ out.ssh_session = self.ssh_session.take();
out.args(self.into_args());
out
}
}
- pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
- self.ssh_session = Some(on);
- self
- }
- pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
- let mut out = Self::new("ssh");
- out.ssh_session = self.ssh_session.take();
- out.arg(on).arg("--");
- out.arg(self.into_string());
- out
- }
pub async fn run(self) -> Result<()> {
let str = self.clone().into_string();
@@ -276,259 +256,6 @@
let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
assert!(v.is_none());
Ok(())
-}
-
-pub trait Handler: Send {
- fn handle_line(&mut self, e: &str);
-}
-
-pub struct ClonableHandler<H>(Arc<Mutex<H>>);
-impl<H> Clone for ClonableHandler<H> {
- fn clone(&self) -> Self {
- Self(self.0.clone())
- }
-}
-impl<H> ClonableHandler<H> {
- pub fn new(inner: H) -> Self {
- Self(Arc::new(Mutex::new(inner)))
- }
-}
-impl<H: Handler> Handler for ClonableHandler<H> {
- fn handle_line(&mut self, e: &str) {
- self.0.lock().unwrap().handle_line(e)
- }
-}
-
-struct PlainHandler;
-impl Handler for PlainHandler {
- fn handle_line(&mut self, e: &str) {
- info!(target: "log", "{e}");
- }
-}
-
-pub struct NoopHandler;
-impl Handler for NoopHandler {
- fn handle_line(&mut self, _e: &str) {}
-}
-
-#[derive(Default)]
-pub struct NixHandler {
- spans: HashMap<u64, Span>,
-}
-fn process_message(m: &str) -> String {
- static OSC_CLEANER: Lazy<Regex> =
- Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
- static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
- let m = OSC_CLEANER.replace_all(m, "");
- // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
- // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
- DETABBER.replace_all(m.as_ref(), " ").to_string()
-}
-impl Handler for NixHandler {
- fn handle_line(&mut self, e: &str) {
- if let Some(e) = e.strip_prefix("@nix ") {
- let log: NixLog = match serde_json::from_str(e) {
- Ok(l) => l,
- Err(err) => {
- warn!("failed to parse nix log line {:?}: {}", e, err);
- return;
- }
- };
- match log {
- NixLog::Msg { msg, raw_msg, .. } => {
- #[allow(clippy::nonminimal_bool)]
- if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
- && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
- && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
- if let Some(raw_msg) = raw_msg {
- if !msg.is_empty() {
- info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
- } else {
- info!(target: "nix", "{}", raw_msg.trim_end())
- }
- } else {
- info!(target: "nix", "{}", msg.trim_end())
- }
- }
- }
- NixLog::Start {
- ref fields,
- typ,
- id,
- ..
- } if typ == 105 && !fields.is_empty() => {
- if let [LogField::String(drv), ..] = &fields[..] {
- let mut drv = drv.as_str();
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- info!(target: "nix","building {}", drv);
- let span = info_span!("build", drv);
- span.pb_start();
- self.spans.insert(id, span);
- } else {
- warn!("bad build log: {:?}", log)
- }
- }
- NixLog::Start {
- ref fields,
- typ,
- id,
- ..
- } if typ == 100 && fields.len() >= 3 => {
- if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
- &fields[..]
- {
- let mut drv = drv.as_str();
-
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- // info!(target: "nix","copying {} {} -> {}", drv, from, to);
- let span = info_span!("copy", from, to, drv);
- span.pb_start();
- self.spans.insert(id, span);
- } else {
- warn!("bad copy log: {:?}", log)
- }
- }
- NixLog::Start { text, typ, id, .. }
- if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
- {
- if !text.is_empty()
- && text != "querying info about missing paths"
- && text != "copying 0 paths"
- // Too much spam on lazy-trees branch
- && !(text.starts_with("copying '") && text.ends_with("' to the store"))
- {
- let span = info_span!("job");
- span.pb_start();
- span.pb_set_message(&process_message(text.trim()));
- self.spans.insert(id, span);
- info!(target: "nix", "{}", text);
- }
- }
- NixLog::Start {
- text,
- level: 0,
- typ: 108,
- ..
- } if text.is_empty() => {
- // Cache lookup? Coupled with copy log
- }
- NixLog::Start {
- text,
- level: 4,
- typ: 109,
- ..
- } if text.starts_with("querying info about ") => {
- // Cache lookup
- }
- NixLog::Start {
- text,
- level: 4,
- typ: 101,
- ..
- } if text.starts_with("downloading ") => {
- // NAR downloading, coupled with copy log
- }
- NixLog::Start {
- text,
- level: 1,
- typ: 111,
- ..
- } if text.starts_with("waiting for a machine to build ") => {
- // Useless repeating notification about build
- }
- NixLog::Start {
- text,
- level: 3,
- typ: 111,
- ..
- } if text.starts_with("resolved derivation: ") => {
- // CA resolved
- }
- NixLog::Start {
- text,
- level: 1,
- typ: 111,
- id,
- ..
- } if text.starts_with("waiting for lock on ") => {
- let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
- if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
- drv = txt;
- }
- if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
- drv = txt;
- }
- if let Some(txt) = drv.split("', '").next() {
- drv = txt;
- }
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- let span = info_span!("waiting on drv", drv);
- span.pb_start();
- self.spans.insert(id, span);
- // Concurrent build of the same message
- }
- NixLog::Stop { id, .. } => {
- self.spans.remove(&id);
- }
- NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
- if let Some(span) = self.spans.get(&id) {
- if let LogField::String(s) = &fields[0] {
- span.pb_set_message(&process_message(s.trim()));
- } else {
- warn!("bad fields: {fields:?}");
- }
- } else {
- warn!("unknown result id: {id} {typ} {fields:?}");
- }
- // dbg!(fields, id, typ);
- }
- NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
- if let Some(span) = self.spans.get(&id) {
- if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
- &fields[..4]
- {
- span.pb_set_length(*expected);
- span.pb_set_position(*done);
- } else {
- warn!("bad fields: {fields:?}");
- }
- } else {
- // warn!("unknown result id: {id} {typ} {fields:?}");
- // Unaccounted progress.
- }
- // dbg!(fields, id, typ);
- }
- NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
- // Set phase, expected
- }
- _ => warn!("unknown log: {:?}", log),
- };
- } else {
- let e = e.trim();
- if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
- return;
- }
- info!("{e}")
- }
- }
}
async fn run_nix_inner_raw(
@@ -540,6 +267,7 @@
) -> Result<Option<Vec<u8>>> {
cmd.stderr(Stdio::piped());
cmd.stdout(Stdio::piped());
+ debug!("running command {cmd:?} on local");
let mut child = cmd.spawn()?;
let mut stderr = child.stderr.take().unwrap();
let stdout = child.stdout.take().unwrap();
@@ -600,6 +328,7 @@
err_handler: &mut dyn Handler,
mut out_handler: Option<&mut dyn Handler>,
) -> Result<Option<Vec<u8>>> {
+ debug!("running command {cmd:?} over ssh");
cmd.stderr(openssh::Stdio::piped());
cmd.stdout(openssh::Stdio::piped());
let mut child = cmd.spawn().await?;
@@ -656,77 +385,4 @@
}
Ok(out_buf)
-}
-
-pub trait ErrorRecorder: Send {
- /// Return true to discard message from logging
- fn push_message(&mut self, msg: &str) -> bool;
-}
-
-#[derive(Debug)]
-enum LogField {
- String(String),
- Num(u64),
-}
-
-impl<'de> Deserialize<'de> for LogField {
- fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
- where
- D: serde::Deserializer<'de>,
- {
- struct StringOrNum;
- impl<'de> Visitor<'de> for StringOrNum {
- type Value = LogField;
-
- fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "string or unsigned")
- }
-
- fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- Ok(LogField::String(v.to_owned()))
- }
-
- fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- Ok(LogField::Num(v))
- }
- }
-
- deserializer.deserialize_any(StringOrNum)
- }
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "camelCase", tag = "action")]
-#[allow(dead_code)]
-enum NixLog {
- Msg {
- level: u32,
- msg: String,
- raw_msg: Option<String>,
- },
- Start {
- id: u64,
- level: u32,
- #[serde(default)]
- fields: Vec<LogField>,
- text: String,
- #[serde(rename = "type")]
- typ: u32,
- },
- Stop {
- id: u64,
- },
- Result {
- id: u64,
- #[serde(rename = "type")]
- typ: u32,
- #[serde(default)]
- fields: Vec<LogField>,
- },
}
cmds/fleet/src/host.rsdiffbeforeafterboth1use std::{2 env::current_dir,3 ffi::{OsStr, OsString},4 fmt::Display,5 io::Write,6 ops::Deref,7 path::PathBuf,8 str::FromStr,9 sync::{Arc, Mutex, MutexGuard, OnceLock},10};1112use age::Recipient;13use anyhow::{anyhow, bail, Context, Result};14use clap::{ArgGroup, Parser};15use openssh::SessionBuilder;16use serde::de::DeserializeOwned;17use tempfile::NamedTempFile;1819use crate::{20 better_nix_eval::{Field, NixSessionPool},21 command::MyCommand,22 fleetdata::{FleetData, FleetSecret, FleetSharedSecret, SecretData},23 nix_go, nix_go_json,24};2526pub struct FleetConfigInternals {27 pub local_system: String,28 pub directory: PathBuf,29 pub opts: FleetOpts,30 pub data: Mutex<FleetData>,31 pub nix_args: Vec<OsString>,32 /// fleetConfigurations.<name>.<localSystem>33 pub fleet_field: Field,34 /// fleet_config.configUnchecked35 pub config_field: Field,36 /// fleet_config.unchecked37 pub config_unchecked_field: Field,38}3940#[derive(Clone)]41pub struct Config(Arc<FleetConfigInternals>);4243impl Deref for Config {44 type Target = FleetConfigInternals;4546 fn deref(&self) -> &Self::Target {47 &self.048 }49}5051pub struct ConfigHost {52 pub name: String,53 pub session: OnceLock<Arc<openssh::Session>>,54}55impl ConfigHost {56 pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {57 // FIXME: TOCTOU58 if let Some(session) = &self.session.get() {59 return Ok((*session).clone());60 };61 let session = SessionBuilder::default();6263 let session = session64 .connect(&self.name)65 .await66 .map_err(|e| anyhow!("ssh error: {e}"))?;67 let session = Arc::new(session);68 self.session.set(session.clone()).expect("TOCTOU happened");69 Ok(session)70 }71 pub async fn mktemp_dir(&self) -> Result<String> {72 let mut cmd = self.cmd("mktemp").await?;73 cmd.arg("-d");74 let path = cmd.run_string().await?;75 Ok(path.trim_end().to_owned())76 }77 pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {78 let mut cmd = self.cmd("cat").await?;79 cmd.arg(path);80 cmd.run_bytes().await81 }82 pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {83 let mut cmd = self.cmd("cat").await?;84 cmd.arg(path);85 cmd.run_string().await86 }87 pub async fn read_file_json<D: DeserializeOwned>(&self, path: impl AsRef<OsStr>) -> Result<D> {88 let text = self.read_file_text(path).await?;89 Ok(serde_json::from_str(&text)?)90 }91 pub async fn read_file_value<D: FromStr>(&self, path: impl AsRef<OsStr>) -> Result<D>92 where93 <D as FromStr>::Err: Display,94 {95 let text = self.read_file_text(path).await?;96 D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))97 }98 pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {99 let session = self.open_session().await?;100 Ok(MyCommand::new_on(cmd, session))101 }102103 pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {104 let mut cmd = self.cmd("fleet-install-secrets").await?;105 cmd.arg("decrypt").eqarg("--secret", data.encode_z85());106 let encoded = cmd107 .sudo()108 .run_string()109 .await110 .context("failed to call remote host for decrypt")?;111 z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")112 }113 /// Returns path for futureproofing, as path might change i.e on conversion to CA114 pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {115 let mut nix = MyCommand::new("nix");116 nix.arg("copy")117 .arg("--substitute-on-destination")118 .comparg("--to", format!("ssh-ng://{}", self.name))119 .arg(path);120 nix.run_nix().await?;121 Ok(path.to_owned())122 }123}124125impl Config {126 pub fn should_skip(&self, host: &str) -> bool {127 if !self.opts.skip.is_empty() {128 self.opts.skip.iter().any(|h| h as &str == host)129 } else if !self.opts.only.is_empty() {130 !self.opts.only.iter().any(|h| h as &str == host)131 } else {132 false133 }134 }135 pub fn is_local(&self, host: &str) -> bool {136 self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)137 }138139 pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {140 if sudo {141 command = command.sudo();142 }143 if !self.is_local(host) {144 command = command.ssh(host);145 }146 command.run().await147 }148 pub async fn run_string_on(149 &self,150 host: &str,151 mut command: MyCommand,152 sudo: bool,153 ) -> Result<String> {154 if sudo {155 command = command.sudo();156 }157 if !self.is_local(host) {158 command = command.ssh(host);159 }160 command.run_string().await161 }162163 pub async fn host(&self, name: &str) -> Result<ConfigHost> {164 Ok(ConfigHost {165 name: name.to_owned(),166 session: OnceLock::new(),167 })168 }169 pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {170 let fleet_field = &self.fleet_field;171 let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;172 let mut out = vec![];173 for name in names {174 out.push(ConfigHost {175 name,176 session: OnceLock::new(),177 })178 }179 Ok(out)180 }181 pub async fn system_config(&self, host: &str) -> Result<Field> {182 let fleet_field = &self.fleet_field;183 Ok(nix_go!(fleet_field.configuredSystems[{ host }].config))184 }185186 pub(super) fn data(&self) -> MutexGuard<FleetData> {187 self.data.lock().unwrap()188 }189 pub(super) fn data_mut(&self) -> MutexGuard<FleetData> {190 self.data.lock().unwrap()191 }192 /// Shared secrets configured in fleet.nix or in flake193 pub async fn list_configured_shared(&self) -> Result<Vec<String>> {194 let config_field = &self.config_unchecked_field;195 nix_go!(config_field.configUnchecked.sharedSecrets)196 .list_fields()197 .await198 }199 /// Shared secrets configured in fleet.nix200 pub fn list_shared(&self) -> Vec<String> {201 let data = self.data();202 data.shared_secrets.keys().cloned().collect()203 }204 pub fn has_shared(&self, name: &str) -> bool {205 let data = self.data();206 data.shared_secrets.contains_key(name)207 }208 pub fn replace_shared(&self, name: String, shared: FleetSharedSecret) {209 let mut data = self.data_mut();210 data.shared_secrets.insert(name.to_owned(), shared);211 }212 pub fn remove_shared(&self, secret: &str) {213 let mut data = self.data_mut();214 data.shared_secrets.remove(secret);215 }216217 pub fn has_secret(&self, host: &str, secret: &str) -> bool {218 let data = self.data();219 let Some(host_secrets) = data.host_secrets.get(host) else {220 return false;221 };222 host_secrets.contains_key(secret)223 }224 pub fn insert_secret(&self, host: &str, secret: String, value: FleetSecret) {225 let mut data = self.data_mut();226 let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();227 host_secrets.insert(secret, value);228 }229230 pub async fn reencrypt_on_host(231 &self,232 host: &str,233 data: SecretData,234 targets: Vec<String>,235 ) -> Result<SecretData> {236 let mut recmd = MyCommand::new("fleet-install-secrets");237 recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());238 for target in targets {239 recmd.eqarg("--targets", target);240 }241 recmd = recmd.sudo().ssh(host);242 let encoded = recmd243 .run_string()244 .await245 .context("failed to call remote host for decrypt")?246 .trim()247 .to_owned();248 SecretData::decode_z85(&encoded)249 }250251 pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {252 let data = self.data();253 let Some(host_secrets) = data.host_secrets.get(host) else {254 bail!("no secrets for machine {host}");255 };256 let Some(secret) = host_secrets.get(secret) else {257 bail!("machine {host} has no secret {secret}");258 };259 Ok(secret.clone())260 }261 pub fn shared_secret(&self, secret: &str) -> Result<FleetSharedSecret> {262 let data = self.data();263 let Some(secret) = data.shared_secrets.get(secret) else {264 bail!("no shared secret {secret}");265 };266 Ok(secret.clone())267 }268 pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {269 let config_field = &self.config_unchecked_field;270 Ok(nix_go_json!(271 config_field.configUnchecked.sharedSecrets[{ secret }].expectedOwners272 ))273 }274275 pub fn save(&self) -> Result<()> {276 let mut tempfile = NamedTempFile::new_in(self.directory.clone())?;277 let data = nixlike::serialize(&self.data() as &FleetData)?;278 tempfile.write_all(279 format!(280 "# This file contains fleet state and shouldn't be edited by hand\n\n{}\n\n# vim: ts=2 et nowrap\n",281 data282 )283 .as_bytes(),284 )?;285 let mut fleet_data_path = self.directory.clone();286 fleet_data_path.push("fleet.nix");287 tempfile.persist(fleet_data_path)?;288 Ok(())289 }290}291292#[derive(Parser, Clone)]293#[clap(group = ArgGroup::new("target_hosts"))]294pub struct FleetOpts {295 /// All hosts except those would be skipped296 #[clap(long, number_of_values = 1, group = "target_hosts")]297 only: Vec<String>,298299 /// Hosts to skip300 #[clap(long, number_of_values = 1, group = "target_hosts")]301 skip: Vec<String>,302303 /// Host, which should be threaten as current machine304 #[clap(long)]305 pub localhost: Option<String>,306307 /// Override detected system for host, to perform builds via308 /// binfmt-declared qemu instead of trying to crosscompile309 #[clap(long, default_value = "detect")]310 pub local_system: String,311}312313impl FleetOpts {314 pub async fn build(mut self, nix_args: Vec<OsString>) -> Result<Config> {315 if self.localhost.is_none() {316 self.localhost317 .replace(hostname::get().unwrap().to_str().unwrap().to_owned());318 }319 let directory = current_dir()?;320321 let pool = NixSessionPool::new(directory.as_os_str().to_owned(), nix_args.clone()).await?;322 let root_field = pool.get().await?;323324 if self.local_system == "detect" {325 let builtins_field = Field::field(root_field.clone(), "builtins").await?;326 self.local_system = nix_go_json!(builtins_field.currentSystem);327 }328 let local_system = self.local_system.clone();329330 let fleet_root = Field::field(root_field, "fleetConfigurations").await?;331332 let fleet_field = nix_go!(fleet_root.default);333 let config_field = nix_go!(fleet_field.configUnchecked);334 let config_unchecked_field = nix_go!(fleet_field.unchecked);335336 let mut fleet_data_path = directory.clone();337 fleet_data_path.push("fleet.nix");338 let bytes = std::fs::read_to_string(fleet_data_path)?;339 let data = nixlike::parse_str(&bytes)?;340341 Ok(Config(Arc::new(FleetConfigInternals {342 opts: self,343 directory,344 data,345 local_system,346 nix_args,347 fleet_field,348 config_field,349 config_unchecked_field,350 })))351 }352}cmds/fleet/src/keys.rsdiffbeforeafterboth--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
use std::str::FromStr;
-use crate::command::MyCommand;
use crate::host::Config;
use age::Recipient;
use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
Ok(key)
} else {
warn!("Loading key for {}", host);
- let mut cmd = MyCommand::new("cat");
+ let host = self.host(host).await?;
+ let mut cmd = host.cmd("cat").await?;
cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
- let key = self.run_string_on(host, cmd, false).await?;
- self.update_key(host, key.clone());
+ let key = cmd.run_string().await?;
+ self.update_key(&host.name, key.clone());
Ok(key)
}
}
cmds/remowt-agent/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
cmds/remowt-agent/README.adocdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/README.adoc
@@ -0,0 +1,16 @@
+= Remowt agent
+
+Working with remote machine programmatically is not always easy.
+
+Sure, you have ssh, sftp, and that kind of fancy stuff, but what about minimal distributions, routers?
+
+Well, sftp can be replaced with FISH... But what if remote machine isn't accessible over ssh at all? What if the only communication channel you have is uart?
+
+What if remote host has not enough tools to implement the functionality you need?
+
+Remowt is intended to solve this in a way similar to how some RAT toolkits (I.e metasploit) do - you inject minimal agent, setup some communication channel to it (stdio perhaps?), and then you deploy payloads on it, and the payloads perform the actual work.
+
+== Non-targets
+
+Minimal executable size:: As long as it transferred only once, it shouldn't be a problem to keep it a reasonable size.
+Be stealthy:: As it solves the problem almost the same way as metasploit, it is possible to use it as something bad, but this is not the remowt intended purpose, and never will be.
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+ println!("Hello, world!");
+}
crates/better-command/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
crates/better-command/src/handler.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/handler.rs
@@ -0,0 +1,305 @@
+//! Collection of handlers, which transform program-specific stdout format to tracing
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use tracing::{Span, info, warn, info_span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+
+pub trait Handler: Send {
+ fn handle_line(&mut self, e: &str);
+}
+
+/// Handler wrapper, which can be cloned.
+pub struct ClonableHandler<H>(Arc<Mutex<H>>);
+impl<H> Clone for ClonableHandler<H> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+impl<H> ClonableHandler<H> {
+ pub fn new(inner: H) -> Self {
+ Self(Arc::new(Mutex::new(inner)))
+ }
+}
+impl<H: Handler> Handler for ClonableHandler<H> {
+ fn handle_line(&mut self, e: &str) {
+ self.0.lock().unwrap().handle_line(e)
+ }
+}
+
+/// Converts command output to tracing lines
+pub struct PlainHandler;
+impl Handler for PlainHandler {
+ fn handle_line(&mut self, e: &str) {
+ info!(target: "log", "{e}");
+ }
+}
+
+/// Ignores output
+pub struct NoopHandler;
+impl Handler for NoopHandler {
+ fn handle_line(&mut self, _e: &str) {}
+}
+
+/// Transform nix internal-json logs to tracing spans.
+#[derive(Default)]
+pub struct NixHandler {
+ spans: HashMap<u64, Span>,
+}
+#[derive(Deserialize, Debug)]
+#[serde(untagged)]
+enum LogField {
+ String(String),
+ Num(u64),
+}
+
+/// Nix internal-json log line type
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", tag = "action")]
+#[allow(dead_code)]
+enum NixLog {
+ Msg {
+ level: u32,
+ msg: String,
+ raw_msg: Option<String>,
+ },
+ Start {
+ id: u64,
+ level: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ text: String,
+ #[serde(rename = "type")]
+ typ: u32,
+ },
+ Stop {
+ id: u64,
+ },
+ Result {
+ id: u64,
+ #[serde(rename = "type")]
+ typ: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ },
+}
+fn process_message(m: &str) -> String {
+ // Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.
+ static OSC_CLEANER: Lazy<Regex> =
+ Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+ static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+ let m = OSC_CLEANER.replace_all(m, "");
+ // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+ // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+ DETABBER.replace_all(m.as_ref(), " ").to_string()
+}
+impl Handler for NixHandler {
+ fn handle_line(&mut self, e: &str) {
+ if let Some(e) = e.strip_prefix("@nix ") {
+ let log: NixLog = match serde_json::from_str(e) {
+ Ok(l) => l,
+ Err(err) => {
+ warn!("failed to parse nix log line {:?}: {}", e, err);
+ return;
+ }
+ };
+ match log {
+ NixLog::Msg { msg, raw_msg, .. } => {
+ #[allow(clippy::nonminimal_bool)]
+ if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+ && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+ && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+ if let Some(raw_msg) = raw_msg {
+ if !msg.is_empty() {
+ info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
+ } else {
+ info!(target: "nix", "{}", raw_msg.trim_end())
+ }
+ } else {
+ info!(target: "nix", "{}", msg.trim_end())
+ }
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 105 && !fields.is_empty() => {
+ if let [LogField::String(drv), ..] = &fields[..] {
+ let mut drv = drv.as_str();
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ info!(target: "nix","building {}", drv);
+ let span = info_span!("build", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad build log: {:?}", log)
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 100 && fields.len() >= 3 => {
+ if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
+ &fields[..]
+ {
+ let mut drv = drv.as_str();
+
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ // info!(target: "nix","copying {} {} -> {}", drv, from, to);
+ let span = info_span!("copy", from, to, drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad copy log: {:?}", log)
+ }
+ }
+ NixLog::Start { text, typ, id, .. }
+ if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
+ {
+ if !text.is_empty()
+ && text != "querying info about missing paths"
+ && text != "copying 0 paths"
+ // Too much spam on lazy-trees branch
+ && !(text.starts_with("copying '") && text.ends_with("' to the store"))
+ {
+ let span = info_span!("job");
+ span.pb_start();
+ span.pb_set_message(&process_message(text.trim()));
+ self.spans.insert(id, span);
+ info!(target: "nix", "{}", text);
+ }
+ }
+ NixLog::Start {
+ text,
+ level: 0,
+ typ: 108,
+ ..
+ } if text.is_empty() => {
+ // Cache lookup? Coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 109,
+ ..
+ } if text.starts_with("querying info about ") => {
+ // Cache lookup
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 101,
+ ..
+ } if text.starts_with("downloading ") => {
+ // NAR downloading, coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ ..
+ } if text.starts_with("waiting for a machine to build ") => {
+ // Useless repeating notification about build
+ }
+ NixLog::Start {
+ text,
+ level: 3,
+ typ: 111,
+ ..
+ } if text.starts_with("resolved derivation: ") => {
+ // CA resolved
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ id,
+ ..
+ } if text.starts_with("waiting for lock on ") => {
+ let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
+ if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.split("', '").next() {
+ drv = txt;
+ }
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ let span = info_span!("waiting on drv", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ // Concurrent build of the same message
+ }
+ NixLog::Stop { id, .. } => {
+ self.spans.remove(&id);
+ }
+ NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
+ if let Some(span) = self.spans.get(&id) {
+ if let LogField::String(s) = &fields[0] {
+ span.pb_set_message(&process_message(s.trim()));
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ warn!("unknown result id: {id} {typ} {fields:?}");
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
+ if let Some(span) = self.spans.get(&id) {
+ if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
+ &fields[..4]
+ {
+ span.pb_set_length(*expected);
+ span.pb_set_position(*done);
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ // warn!("unknown result id: {id} {typ} {fields:?}");
+ // Unaccounted progress.
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
+ // Set phase, expected
+ }
+ _ => warn!("unknown log: {:?}", log),
+ };
+ } else {
+ let e = e.trim();
+ if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
+ return;
+ }
+ info!("{e}")
+ }
+ }
+}
crates/better-command/src/lib.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn it_works() {
+ let result = add(2, 2);
+ assert_eq!(result, 4);
+ }
+}