difftreelog
refactor remove shell-outs for ssh
in: trunk
15 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
[[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
"anyhow",
"async-trait",
"base64 0.21.5",
+ "better-command",
"chrono",
"clap",
"futures",
@@ -1510,9 +1523,9 @@
[[package]]
name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "opaque-debug"
@@ -1922,6 +1935,10 @@
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
name = "rnix"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
[[package]]
name = "serde"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
dependencies = [
"serde_derive",
]
@@ -2123,9 +2140,9 @@
[[package]]
name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
dependencies = [
"proc-macro2",
"quote",
@@ -2134,9 +2151,9 @@
[[package]]
name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
dependencies = [
"itoa",
"ryu",
@@ -2527,11 +2544,10 @@
[[package]]
name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
- "cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -2539,9 +2555,9 @@
[[package]]
name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
@@ -2550,9 +2566,9 @@
[[package]]
name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
@@ -2560,9 +2576,9 @@
[[package]]
name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
dependencies = [
"indicatif",
"tracing",
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
[workspace]
members = ["crates/*", "cmds/*"]
resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
edition = "2021"
[dependencies]
+nixlike.workspace = true
+better-command.workspace = true
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@@ -15,7 +17,6 @@
hostname = "0.3.1"
age-core = "0.9.0"
peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
age = { version = "0.9.2", features = ["ssh", "armor"] }
base64 = "0.21.5"
chrono = { version = "0.4.31", features = ["serde"] }
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,3 +1,6 @@
+//! Wrapper around nix repl, which allows to work on nix code, without relying on
+//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.
+
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
use std::fmt::{self, Display};
@@ -6,6 +9,7 @@
use std::sync::{Arc, OnceLock};
use anyhow::{anyhow, bail, ensure, Context, Result};
+use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};
use futures::StreamExt;
use itertools::Itertools;
use r2d2::{Pool, PooledConnection};
@@ -14,11 +18,9 @@
use tokio::io::AsyncWriteExt;
use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_util::codec::{FramedRead, LinesCodec};
use tracing::{debug, error, warn, Level};
-
-use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};
const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";
@@ -30,6 +32,8 @@
string_wrapping: (String, String),
number_wrapping: (String, String),
+ executing_command: Arc<Mutex<()>>,
+
next_id: u32,
free_list: Vec<u32>,
}
@@ -219,6 +223,8 @@
string_wrapping: Default::default(),
number_wrapping: Default::default(),
+ executing_command: Arc::new(Mutex::new(())),
+
next_id: 0,
free_list: vec![],
};
@@ -331,6 +337,10 @@
expr: impl AsRef<[u8]>,
err_handler: &mut dyn Handler,
) -> Result<String> {
+ // Prevent two commands from being executed in parallel, messing with each other.
+ let _lock = self.executing_command.clone();
+ let _guard = _lock.lock().await;
+
self.send_command(expr).await?;
// It will be echoed
self.send_command(REPL_DELIMITER).await?;
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -3,11 +3,11 @@
use std::{env::current_dir, time::Duration};
use crate::command::MyCommand;
-use crate::host::Config;
+use crate::host::{Config, ConfigHost};
use crate::nix_go;
use anyhow::{anyhow, Result};
use clap::Parser;
-use itertools::Itertools;
+use itertools::Itertools as _;
use tokio::{task::LocalSet, time::sleep};
use tracing::{error, field, info, info_span, warn, Instrument};
@@ -112,12 +112,12 @@
current: bool,
datetime: String,
}
-async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
- let mut cmd = MyCommand::new("nix-env");
+async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.arg("--list-generations");
// Sudo is required due to --list-generations acquiring lock on the profile.
- let data = config.run_string_on(host, cmd, true).await?;
+ let data = cmd.sudo().run_string().await?;
let generations = data
.split('\n')
.map(|e| e.trim())
@@ -161,25 +161,12 @@
.map_err(|_e| anyhow!("bad list-generations output"))?
.ok_or_else(|| anyhow!("failed to find generation"))?;
Ok(current)
-}
-
-async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("stop").arg(unit);
- config.run_on(host, cmd, true).await
-}
-
-async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("start").arg(unit);
- config.run_on(host, cmd, true).await
}
async fn execute_upload(
build: &BuildSystems,
- config: &Config,
action: UploadAction,
- host: &str,
+ host: &ConfigHost,
built: PathBuf,
) -> Result<()> {
let mut failed = false;
@@ -191,15 +178,15 @@
if !build.disable_rollback {
let _span = info_span!("preparing").entered();
info!("preparing for rollback");
- let generation = get_current_generation(config, host).await?;
+ let generation = get_current_generation(&host).await?;
info!(
"rollback target would be {} {}",
generation.id, generation.datetime
);
{
- let mut cmd = MyCommand::new("sh");
+ let mut cmd = host.cmd("sh").await?;
cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to set rollback marker: {e}");
failed = true;
}
@@ -215,37 +202,41 @@
// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
// Anyway, reboot will still help in this case.
if action.should_schedule_rollback_run() {
- let mut cmd = MyCommand::new("systemd-run");
+ let mut cmd = host.cmd("systemd-run").await?;
cmd.comparg("--on-active", "3min")
.comparg("--unit", "rollback-watchdog-run")
.arg("systemctl")
.arg("start")
.arg("rollback-watchdog.service");
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to schedule rollback run: {e}");
failed = true;
}
}
}
+
if action.should_switch_profile() && !failed {
info!("switching generation");
- let mut cmd = MyCommand::new("nix-env");
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.comparg("--set", &built);
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to switch generation: {e}");
failed = true;
}
}
+
+ // FIXME: Connection might be disconnected after activation run
+
if action.should_activate() && !failed {
let _span = info_span!("activating").entered();
info!("executing activation script");
let mut switch_script = built.clone();
switch_script.push("bin");
switch_script.push("switch-to-configuration");
- let mut cmd = MyCommand::new(switch_script);
+ let mut cmd = host.cmd(switch_script).await?;
cmd.arg(action.name());
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = cmd.sudo().run().in_current_span().await {
error!("failed to activate: {e}");
failed = true;
}
@@ -253,7 +244,8 @@
if !build.disable_rollback {
if failed {
info!("executing rollback");
- if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")
+ if let Err(e) = host
+ .systemctl_start("rollback-watchdog.service")
.instrument(info_span!("rollback"))
.await
{
@@ -261,27 +253,29 @@
}
} else {
info!("trying to mark upgrade as successful");
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
}
}
info!("disarming watchdog, just in case");
- if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {
+ if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
// It is ok, if there was no reboot - then timer might not be running.
}
if action.should_schedule_rollback_run() {
- if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {
+ if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
error!("failed to disarm rollback run: {e}");
}
}
- } else {
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {
- // Marker might not exist, yet better try to remove it.
- }
+ } else if let Err(_e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
+ // Marker might not exist, yet better try to remove it.
}
Ok(())
}
@@ -289,12 +283,13 @@
impl BuildSystems {
async fn build_task(self, config: Config, host: String) -> Result<()> {
info!("building");
+ let host = config.host(&host).await?;
let action = Action::from(self.subcommand.clone());
let fleet_field = &config.fleet_field;
let drv = nix_go!(
fleet_field.buildSystems(Obj {
localSystem: { config.local_system.clone() }
- })[{ action.build_attr() }][{ host }]
+ })[{ action.build_attr() }][{ &host.name }]
);
let outputs = drv.build().await.map_err(|e| {
if action.build_attr() == "sdImage" {
@@ -309,9 +304,10 @@
match action {
Action::Upload { action } => {
- if !config.is_local(&host) {
+ if !config.is_local(&host.name) {
info!("uploading system closure");
{
+ // TODO: Move to remote_derivation method.
// Alternatively, nix store make-content-addressed can be used,
// at least for the first deployment, to provide trusted store key.
//
@@ -329,13 +325,11 @@
}
let mut tries = 0;
loop {
- let mut nix = MyCommand::new("nix");
- nix.arg("copy")
- .arg("--substitute-on-destination")
- .comparg("--to", format!("ssh-ng://{host}"))
- .arg(out_output);
- match nix.run_nix().await {
- Ok(()) => break,
+ match host.remote_derivation(out_output).await {
+ Ok(remote) => {
+ assert!(&remote == out_output, "CA derivations aren't implemented");
+ break;
+ }
Err(e) if tries < 3 => {
tries += 1;
warn!("Copy failure ({}/3): {}", tries, e);
@@ -346,19 +340,19 @@
}
}
if let Some(action) = action {
- execute_upload(&self, &config, action, &host, out_output.clone()).await?
+ execute_upload(&self, action, &host, out_output.clone()).await?
}
}
Action::Package(PackageAction::SdImage) => {
let mut out = current_dir()?;
- out.push(format!("sd-image-{}", host));
+ out.push(format!("sd-image-{}", host.name));
info!("linking sd image to {:?}", out);
symlink(out_output, out)?;
}
Action::Package(PackageAction::InstallationCd) => {
let mut out = current_dir()?;
- out.push(format!("installation-cd-{}", host));
+ out.push(format!("installation-cd-{}", host.name));
info!("linking iso image to {:?}", out);
symlink(out_output, out)?;
@@ -379,6 +373,17 @@
let this = this.clone();
let span = info_span!("deployment", host = field::display(&host.name));
let hostname = host.name;
+ // FIXME: Since the introduction of better-nix-eval,
+ // due to single repl used for builds, hosts are waiting for each other to build,
+ // instead of building concurrently.
+ //
+ // Open multiple repls?
+ //
+ // Create build batcher, which will behave similar to golangs
+ // WaitGroup, and start executing once all the build tasks are scheduled?
+ // This also allows to cleanup build output, as there will be no longer
+ // "waiting for remote machine" messages in the cases when one package is needed for
+ // multiple hosts.
set.spawn_local(
(async move {
match this.build_task(config, hostname).await {
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,7 +7,7 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use chrono::{DateTime, Utc};
use clap::Parser;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
use itertools::Itertools;
use owo_colors::OwoColorize;
use std::{
@@ -404,9 +404,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(data) = secret.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, data, target_recipients)
- .await?;
+ let host = config.host(&identity_holder).await?;
+ let encrypted = host.reencrypt(data, target_recipients).await?;
secret.secret.secret = Some(encrypted);
}
@@ -481,9 +480,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(secret) = data.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, secret, target_recipients)
- .await?;
+ let host = config.host(identity_holder).await?;
+ let encrypted = host.reencrypt(secret, target_recipients).await?;
data.secret.secret = Some(encrypted);
}
cmds/fleet/src/command.rsdiffbeforeafterboth1use std::{1use std::thread::sleep;2 collections::HashMap,2use std::time::Duration;3 ffi::OsStr,3use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};4 pin,5 process::Stdio,6 sync::{Arc, Mutex},7 task::Poll,8};9410use anyhow::{anyhow, Result};5use anyhow::{anyhow, Result};6use better_command::{Handler, NixHandler, PlainHandler};11use futures::StreamExt;7use futures::StreamExt;12use itertools::Either;8use itertools::Either;13use once_cell::sync::Lazy;14use openssh::{OverSsh, OwningCommand, Session};9use openssh::{OverSsh, OwningCommand, Session};15use regex::Regex;16use serde::{de::Visitor, Deserialize};17use tokio::{io::AsyncRead, process::Command, select};10use tokio::{io::AsyncRead, process::Command, select};18use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};19use tracing::{info, info_span, warn, Span};12use tracing::{info, debug};20use tracing_indicatif::span_ext::IndicatifSpanExt;211322fn escape_bash(input: &str, out: &mut String) {14fn escape_bash(input: &str, out: &mut String) {23 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";15 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";87 return self;79 return self;88 }80 }89 let mut out = Self::new("env");81 let mut out = Self::new("env");90 if let Some(session) = self.ssh_session {82 out.ssh_session = self.ssh_session;91 out = out.ssh_session(session);92 }93 for (k, v) in self.env {83 for (k, v) in self.env {94 assert!(!k.contains('='));84 assert!(!k.contains('='));95 out.arg(format!("{k}={v}"));85 out.arg(format!("{k}={v}"));179 out169 out180 } else {170 } else {181 let mut out = Self::new("sudo");171 let mut out = Self::new("sudo");172 out.ssh_session = self.ssh_session.take();182 out.args(self.into_args());173 out.args(self.into_args());183 out174 out184 }175 }185 }176 }186 pub fn ssh_session(mut self, on: Arc<Session>) -> Self {187 self.ssh_session = Some(on);188 self189 }190 pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {191 let mut out = Self::new("ssh");192 out.ssh_session = self.ssh_session.take();193 out.arg(on).arg("--");194 out.arg(self.into_string());195 out196 }197177198 pub async fn run(self) -> Result<()> {178 pub async fn run(self) -> Result<()> {199 let str = self.clone().into_string();179 let str = self.clone().into_string();278 Ok(())258 Ok(())279}259}280260281pub trait Handler: Send {282 fn handle_line(&mut self, e: &str);283}284285pub struct ClonableHandler<H>(Arc<Mutex<H>>);286impl<H> Clone for ClonableHandler<H> {287 fn clone(&self) -> Self {288 Self(self.0.clone())289 }290}291impl<H> ClonableHandler<H> {292 pub fn new(inner: H) -> Self {293 Self(Arc::new(Mutex::new(inner)))294 }295}296impl<H: Handler> Handler for ClonableHandler<H> {297 fn handle_line(&mut self, e: &str) {298 self.0.lock().unwrap().handle_line(e)299 }300}301302struct PlainHandler;303impl Handler for PlainHandler {304 fn handle_line(&mut self, e: &str) {305 info!(target: "log", "{e}");306 }307}308309pub struct NoopHandler;310impl Handler for NoopHandler {311 fn handle_line(&mut self, _e: &str) {}312}313314#[derive(Default)]315pub struct NixHandler {316 spans: HashMap<u64, Span>,317}318fn process_message(m: &str) -> String {319 static OSC_CLEANER: Lazy<Regex> =320 Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());321 static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());322 let m = OSC_CLEANER.replace_all(m, "");323 // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,324 // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.325 DETABBER.replace_all(m.as_ref(), " ").to_string()326}327impl Handler for NixHandler {328 fn handle_line(&mut self, e: &str) {329 if let Some(e) = e.strip_prefix("@nix ") {330 let log: NixLog = match serde_json::from_str(e) {331 Ok(l) => l,332 Err(err) => {333 warn!("failed to parse nix log line {:?}: {}", e, err);334 return;335 }336 };337 match log {338 NixLog::Msg { msg, raw_msg, .. } => {339 #[allow(clippy::nonminimal_bool)]340 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))341 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")342 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {343 if let Some(raw_msg) = raw_msg {344 if !msg.is_empty() {345 info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())346 } else {347 info!(target: "nix", "{}", raw_msg.trim_end())348 }349 } else {350 info!(target: "nix", "{}", msg.trim_end())351 }352 }353 }354 NixLog::Start {355 ref fields,356 typ,357 id,358 ..359 } if typ == 105 && !fields.is_empty() => {360 if let [LogField::String(drv), ..] = &fields[..] {361 let mut drv = drv.as_str();362 if let Some(pkg) = drv.strip_prefix("/nix/store/") {363 let mut it = pkg.splitn(2, '-');364 it.next();365 if let Some(pkg) = it.next() {366 drv = pkg;367 }368 }369 info!(target: "nix","building {}", drv);370 let span = info_span!("build", drv);371 span.pb_start();372 self.spans.insert(id, span);373 } else {374 warn!("bad build log: {:?}", log)375 }376 }377 NixLog::Start {378 ref fields,379 typ,380 id,381 ..382 } if typ == 100 && fields.len() >= 3 => {383 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =384 &fields[..]385 {386 let mut drv = drv.as_str();387388 if let Some(pkg) = drv.strip_prefix("/nix/store/") {389 let mut it = pkg.splitn(2, '-');390 it.next();391 if let Some(pkg) = it.next() {392 drv = pkg;393 }394 }395 // info!(target: "nix","copying {} {} -> {}", drv, from, to);396 let span = info_span!("copy", from, to, drv);397 span.pb_start();398 self.spans.insert(id, span);399 } else {400 warn!("bad copy log: {:?}", log)401 }402 }403 NixLog::Start { text, typ, id, .. }404 if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>405 {406 if !text.is_empty()407 && text != "querying info about missing paths"408 && text != "copying 0 paths"409 // Too much spam on lazy-trees branch410 && !(text.starts_with("copying '") && text.ends_with("' to the store"))411 {412 let span = info_span!("job");413 span.pb_start();414 span.pb_set_message(&process_message(text.trim()));415 self.spans.insert(id, span);416 info!(target: "nix", "{}", text);417 }418 }419 NixLog::Start {420 text,421 level: 0,422 typ: 108,423 ..424 } if text.is_empty() => {425 // Cache lookup? Coupled with copy log426 }427 NixLog::Start {428 text,429 level: 4,430 typ: 109,431 ..432 } if text.starts_with("querying info about ") => {433 // Cache lookup434 }435 NixLog::Start {436 text,437 level: 4,438 typ: 101,439 ..440 } if text.starts_with("downloading ") => {441 // NAR downloading, coupled with copy log442 }443 NixLog::Start {444 text,445 level: 1,446 typ: 111,447 ..448 } if text.starts_with("waiting for a machine to build ") => {449 // Useless repeating notification about build450 }451 NixLog::Start {452 text,453 level: 3,454 typ: 111,455 ..456 } if text.starts_with("resolved derivation: ") => {457 // CA resolved458 }459 NixLog::Start {460 text,461 level: 1,462 typ: 111,463 id,464 ..465 } if text.starts_with("waiting for lock on ") => {466 let mut drv = text.strip_prefix("waiting for lock on ").unwrap();467 if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {468 drv = txt;469 }470 if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {471 drv = txt;472 }473 if let Some(txt) = drv.split("', '").next() {474 drv = txt;475 }476 if let Some(pkg) = drv.strip_prefix("/nix/store/") {477 let mut it = pkg.splitn(2, '-');478 it.next();479 if let Some(pkg) = it.next() {480 drv = pkg;481 }482 }483 let span = info_span!("waiting on drv", drv);484 span.pb_start();485 self.spans.insert(id, span);486 // Concurrent build of the same message487 }488 NixLog::Stop { id, .. } => {489 self.spans.remove(&id);490 }491 NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {492 if let Some(span) = self.spans.get(&id) {493 if let LogField::String(s) = &fields[0] {494 span.pb_set_message(&process_message(s.trim()));495 } else {496 warn!("bad fields: {fields:?}");497 }498 } else {499 warn!("unknown result id: {id} {typ} {fields:?}");500 }501 // dbg!(fields, id, typ);502 }503 NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {504 if let Some(span) = self.spans.get(&id) {505 if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =506 &fields[..4]507 {508 span.pb_set_length(*expected);509 span.pb_set_position(*done);510 } else {511 warn!("bad fields: {fields:?}");512 }513 } else {514 // warn!("unknown result id: {id} {typ} {fields:?}");515 // Unaccounted progress.516 }517 // dbg!(fields, id, typ);518 }519 NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {520 // Set phase, expected521 }522 _ => warn!("unknown log: {:?}", log),523 };524 } else {525 let e = e.trim();526 if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {527 return;528 }529 info!("{e}")530 }531 }532}533534async fn run_nix_inner_raw(261async fn run_nix_inner_raw(535 str: String,262 str: String,540) -> Result<Option<Vec<u8>>> {267) -> Result<Option<Vec<u8>>> {541 cmd.stderr(Stdio::piped());268 cmd.stderr(Stdio::piped());542 cmd.stdout(Stdio::piped());269 cmd.stdout(Stdio::piped());270 debug!("running command {cmd:?} on local");543 let mut child = cmd.spawn()?;271 let mut child = cmd.spawn()?;544 let mut stderr = child.stderr.take().unwrap();272 let mut stderr = child.stderr.take().unwrap();545 let stdout = child.stdout.take().unwrap();273 let stdout = child.stdout.take().unwrap();600 err_handler: &mut dyn Handler,328 err_handler: &mut dyn Handler,601 mut out_handler: Option<&mut dyn Handler>,329 mut out_handler: Option<&mut dyn Handler>,602) -> Result<Option<Vec<u8>>> {330) -> Result<Option<Vec<u8>>> {331 debug!("running command {cmd:?} over ssh");603 cmd.stderr(openssh::Stdio::piped());332 cmd.stderr(openssh::Stdio::piped());604 cmd.stdout(openssh::Stdio::piped());333 cmd.stdout(openssh::Stdio::piped());605 let mut child = cmd.spawn().await?;334 let mut child = cmd.spawn().await?;656 }385 }657386658 Ok(out_buf)387 Ok(out_buf)659}660661pub trait ErrorRecorder: Send {662 /// Return true to discard message from logging663 fn push_message(&mut self, msg: &str) -> bool;664}665666#[derive(Debug)]667enum LogField {668 String(String),669 Num(u64),670}671672impl<'de> Deserialize<'de> for LogField {673 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>674 where675 D: serde::Deserializer<'de>,676 {677 struct StringOrNum;678 impl<'de> Visitor<'de> for StringOrNum {679 type Value = LogField;680681 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {682 write!(f, "string or unsigned")683 }684685 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>686 where687 E: serde::de::Error,688 {689 Ok(LogField::String(v.to_owned()))690 }691692 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>693 where694 E: serde::de::Error,695 {696 Ok(LogField::Num(v))697 }698 }699700 deserializer.deserialize_any(StringOrNum)701 }702}703704#[derive(Deserialize, Debug)]705#[serde(rename_all = "camelCase", tag = "action")]706#[allow(dead_code)]707enum NixLog {708 Msg {709 level: u32,710 msg: String,711 raw_msg: Option<String>,712 },713 Start {714 id: u64,715 level: u32,716 #[serde(default)]717 fields: Vec<LogField>,718 text: String,719 #[serde(rename = "type")]720 typ: u32,721 },722 Stop {723 id: u64,724 },725 Result {726 id: u64,727 #[serde(rename = "type")]728 typ: u32,729 #[serde(default)]730 fields: Vec<LogField>,731 },732}388}733389cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -9,7 +9,6 @@
sync::{Arc, Mutex, MutexGuard, OnceLock},
};
-use age::Recipient;
use anyhow::{anyhow, bail, Context, Result};
use clap::{ArgGroup, Parser};
use openssh::SessionBuilder;
@@ -50,10 +49,12 @@
pub struct ConfigHost {
pub name: String,
+ pub local: bool,
pub session: OnceLock<Arc<openssh::Session>>,
}
impl ConfigHost {
- pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ assert!(!self.local, "do not open ssh connection to local session");
// FIXME: TOCTOU
if let Some(session) = &self.session.get() {
return Ok((*session).clone());
@@ -96,8 +97,12 @@
D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
}
pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
- let session = self.open_session().await?;
- Ok(MyCommand::new_on(cmd, session))
+ if self.local {
+ Ok(MyCommand::new(cmd))
+ } else {
+ let session = self.open_session().await?;
+ Ok(MyCommand::new_on(cmd, session))
+ }
}
pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
@@ -110,8 +115,25 @@
.context("failed to call remote host for decrypt")?;
z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
}
+ pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
+ let mut cmd = self.cmd("fleet-install-secrets").await?;
+ cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
+ for target in targets {
+ cmd.eqarg("--targets", target);
+ }
+ let encoded = cmd
+ .sudo()
+ .run_string()
+ .await
+ .context("failed to call remote host for decrypt")?;
+ SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")
+ }
/// Returns path for futureproofing, as path might change i.e on conversion to CA
pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
+ if self.local {
+ // Path is located locally, thus already trusted.
+ return Ok(path.to_owned());
+ }
let mut nix = MyCommand::new("nix");
nix.arg("copy")
.arg("--substitute-on-destination")
@@ -120,6 +142,25 @@
nix.run_nix().await?;
Ok(path.to_owned())
}
+ pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("stop").arg(name);
+ cmd.sudo().run().await
+ }
+ pub async fn systemctl_start(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("start").arg(name);
+ cmd.sudo().run().await
+ }
+
+ pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
+ let mut cmd = self.cmd("rm").await?;
+ cmd.arg("-f").arg(path);
+ if sudo {
+ cmd = cmd.sudo()
+ }
+ cmd.run().await
+ }
}
impl Config {
@@ -134,35 +175,12 @@
}
pub fn is_local(&self, host: &str) -> bool {
self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
- }
-
- pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
- if sudo {
- command = command.sudo();
- }
- if !self.is_local(host) {
- command = command.ssh(host);
- }
- command.run().await
- }
- pub async fn run_string_on(
- &self,
- host: &str,
- mut command: MyCommand,
- sudo: bool,
- ) -> Result<String> {
- if sudo {
- command = command.sudo();
- }
- if !self.is_local(host) {
- command = command.ssh(host);
- }
- command.run_string().await
}
pub async fn host(&self, name: &str) -> Result<ConfigHost> {
Ok(ConfigHost {
name: name.to_owned(),
+ local: self.is_local(name),
session: OnceLock::new(),
})
}
@@ -172,6 +190,7 @@
let mut out = vec![];
for name in names {
out.push(ConfigHost {
+ local: self.is_local(&name),
name,
session: OnceLock::new(),
})
@@ -225,27 +244,6 @@
let mut data = self.data_mut();
let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();
host_secrets.insert(secret, value);
- }
-
- pub async fn reencrypt_on_host(
- &self,
- host: &str,
- data: SecretData,
- targets: Vec<String>,
- ) -> Result<SecretData> {
- let mut recmd = MyCommand::new("fleet-install-secrets");
- recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
- for target in targets {
- recmd.eqarg("--targets", target);
- }
- recmd = recmd.sudo().ssh(host);
- let encoded = recmd
- .run_string()
- .await
- .context("failed to call remote host for decrypt")?
- .trim()
- .to_owned();
- SecretData::decode_z85(&encoded)
}
pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
cmds/fleet/src/keys.rsdiffbeforeafterboth--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
use std::str::FromStr;
-use crate::command::MyCommand;
use crate::host::Config;
use age::Recipient;
use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
Ok(key)
} else {
warn!("Loading key for {}", host);
- let mut cmd = MyCommand::new("cat");
+ let host = self.host(host).await?;
+ let mut cmd = host.cmd("cat").await?;
cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
- let key = self.run_string_on(host, cmd, false).await?;
- self.update_key(host, key.clone());
+ let key = cmd.run_string().await?;
+ self.update_key(&host.name, key.clone());
Ok(key)
}
}
cmds/remowt-agent/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
cmds/remowt-agent/README.adocdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/README.adoc
@@ -0,0 +1,16 @@
+= Remowt agent
+
+Working with remote machine programmatically is not always easy.
+
+Sure, you have ssh, sftp, and that kind of fancy stuff, but what about minimal distributions, routers?
+
+Well, sftp can be replaced with FISH... But what if remote machine isn't accessible over ssh at all? What if the only communication channel you have is uart?
+
+What if remote host has not enough tools to implement the functionality you need?
+
+Remowt is intended to solve this in a way similar to how some RAT toolkits (I.e metasploit) do - you inject minimal agent, setup some communication channel to it (stdio perhaps?), and then you deploy payloads on it, and the payloads perform the actual work.
+
+== Non-targets
+
+Minimal executable size:: As long as it transferred only once, it shouldn't be a problem to keep it a reasonable size.
+Be stealthy:: As it solves the problem almost the same way as metasploit, it is possible to use it as something bad, but this is not the remowt intended purpose, and never will be.
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+ println!("Hello, world!");
+}
crates/better-command/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
crates/better-command/src/handler.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/handler.rs
@@ -0,0 +1,305 @@
+//! Collection of handlers, which transform program-specific stdout format to tracing
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use tracing::{Span, info, warn, info_span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+
+pub trait Handler: Send {
+ fn handle_line(&mut self, e: &str);
+}
+
+/// Handler wrapper, which can be cloned.
+pub struct ClonableHandler<H>(Arc<Mutex<H>>);
+impl<H> Clone for ClonableHandler<H> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+impl<H> ClonableHandler<H> {
+ pub fn new(inner: H) -> Self {
+ Self(Arc::new(Mutex::new(inner)))
+ }
+}
+impl<H: Handler> Handler for ClonableHandler<H> {
+ fn handle_line(&mut self, e: &str) {
+ self.0.lock().unwrap().handle_line(e)
+ }
+}
+
+/// Converts command output to tracing lines
+pub struct PlainHandler;
+impl Handler for PlainHandler {
+ fn handle_line(&mut self, e: &str) {
+ info!(target: "log", "{e}");
+ }
+}
+
+/// Ignores output
+pub struct NoopHandler;
+impl Handler for NoopHandler {
+ fn handle_line(&mut self, _e: &str) {}
+}
+
+/// Transform nix internal-json logs to tracing spans.
+#[derive(Default)]
+pub struct NixHandler {
+ spans: HashMap<u64, Span>,
+}
+#[derive(Deserialize, Debug)]
+#[serde(untagged)]
+enum LogField {
+ String(String),
+ Num(u64),
+}
+
+/// Nix internal-json log line type
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", tag = "action")]
+#[allow(dead_code)]
+enum NixLog {
+ Msg {
+ level: u32,
+ msg: String,
+ raw_msg: Option<String>,
+ },
+ Start {
+ id: u64,
+ level: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ text: String,
+ #[serde(rename = "type")]
+ typ: u32,
+ },
+ Stop {
+ id: u64,
+ },
+ Result {
+ id: u64,
+ #[serde(rename = "type")]
+ typ: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ },
+}
+fn process_message(m: &str) -> String {
+ // Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.
+ static OSC_CLEANER: Lazy<Regex> =
+ Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+ static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+ let m = OSC_CLEANER.replace_all(m, "");
+ // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+ // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+ DETABBER.replace_all(m.as_ref(), " ").to_string()
+}
+impl Handler for NixHandler {
+ fn handle_line(&mut self, e: &str) {
+ if let Some(e) = e.strip_prefix("@nix ") {
+ let log: NixLog = match serde_json::from_str(e) {
+ Ok(l) => l,
+ Err(err) => {
+ warn!("failed to parse nix log line {:?}: {}", e, err);
+ return;
+ }
+ };
+ match log {
+ NixLog::Msg { msg, raw_msg, .. } => {
+ #[allow(clippy::nonminimal_bool)]
+ if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+ && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+ && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+ if let Some(raw_msg) = raw_msg {
+ if !msg.is_empty() {
+ info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
+ } else {
+ info!(target: "nix", "{}", raw_msg.trim_end())
+ }
+ } else {
+ info!(target: "nix", "{}", msg.trim_end())
+ }
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 105 && !fields.is_empty() => {
+ if let [LogField::String(drv), ..] = &fields[..] {
+ let mut drv = drv.as_str();
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ info!(target: "nix","building {}", drv);
+ let span = info_span!("build", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad build log: {:?}", log)
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 100 && fields.len() >= 3 => {
+ if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
+ &fields[..]
+ {
+ let mut drv = drv.as_str();
+
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ // info!(target: "nix","copying {} {} -> {}", drv, from, to);
+ let span = info_span!("copy", from, to, drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad copy log: {:?}", log)
+ }
+ }
+ NixLog::Start { text, typ, id, .. }
+ if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
+ {
+ if !text.is_empty()
+ && text != "querying info about missing paths"
+ && text != "copying 0 paths"
+ // Too much spam on lazy-trees branch
+ && !(text.starts_with("copying '") && text.ends_with("' to the store"))
+ {
+ let span = info_span!("job");
+ span.pb_start();
+ span.pb_set_message(&process_message(text.trim()));
+ self.spans.insert(id, span);
+ info!(target: "nix", "{}", text);
+ }
+ }
+ NixLog::Start {
+ text,
+ level: 0,
+ typ: 108,
+ ..
+ } if text.is_empty() => {
+ // Cache lookup? Coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 109,
+ ..
+ } if text.starts_with("querying info about ") => {
+ // Cache lookup
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 101,
+ ..
+ } if text.starts_with("downloading ") => {
+ // NAR downloading, coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ ..
+ } if text.starts_with("waiting for a machine to build ") => {
+ // Useless repeating notification about build
+ }
+ NixLog::Start {
+ text,
+ level: 3,
+ typ: 111,
+ ..
+ } if text.starts_with("resolved derivation: ") => {
+ // CA resolved
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ id,
+ ..
+ } if text.starts_with("waiting for lock on ") => {
+ let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
+ if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.split("', '").next() {
+ drv = txt;
+ }
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ let span = info_span!("waiting on drv", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ // Concurrent build of the same message
+ }
+ NixLog::Stop { id, .. } => {
+ self.spans.remove(&id);
+ }
+ NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
+ if let Some(span) = self.spans.get(&id) {
+ if let LogField::String(s) = &fields[0] {
+ span.pb_set_message(&process_message(s.trim()));
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ warn!("unknown result id: {id} {typ} {fields:?}");
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
+ if let Some(span) = self.spans.get(&id) {
+ if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
+ &fields[..4]
+ {
+ span.pb_set_length(*expected);
+ span.pb_set_position(*done);
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ // warn!("unknown result id: {id} {typ} {fields:?}");
+ // Unaccounted progress.
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
+ // Set phase, expected
+ }
+ _ => warn!("unknown log: {:?}", log),
+ };
+ } else {
+ let e = e.trim();
+ if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
+ return;
+ }
+ info!("{e}")
+ }
+ }
+}
crates/better-command/src/lib.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn it_works() {
+ let result = add(2, 2);
+ assert_eq!(result, 4);
+ }
+}