difftreelog
refactor remove shell-outs for ssh
in: trunk
15 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
[[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
"anyhow",
"async-trait",
"base64 0.21.5",
+ "better-command",
"chrono",
"clap",
"futures",
@@ -1510,9 +1523,9 @@
[[package]]
name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "opaque-debug"
@@ -1922,6 +1935,10 @@
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
name = "rnix"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
[[package]]
name = "serde"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
dependencies = [
"serde_derive",
]
@@ -2123,9 +2140,9 @@
[[package]]
name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
dependencies = [
"proc-macro2",
"quote",
@@ -2134,9 +2151,9 @@
[[package]]
name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
dependencies = [
"itoa",
"ryu",
@@ -2527,11 +2544,10 @@
[[package]]
name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
- "cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -2539,9 +2555,9 @@
[[package]]
name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
@@ -2550,9 +2566,9 @@
[[package]]
name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
@@ -2560,9 +2576,9 @@
[[package]]
name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
dependencies = [
"indicatif",
"tracing",
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
[workspace]
members = ["crates/*", "cmds/*"]
resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
edition = "2021"
[dependencies]
+nixlike.workspace = true
+better-command.workspace = true
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@@ -15,7 +17,6 @@
hostname = "0.3.1"
age-core = "0.9.0"
peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
age = { version = "0.9.2", features = ["ssh", "armor"] }
base64 = "0.21.5"
chrono = { version = "0.4.31", features = ["serde"] }
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,3 +1,6 @@
+//! Wrapper around nix repl, which allows to work on nix code, without relying on
+//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.
+
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
use std::fmt::{self, Display};
@@ -6,6 +9,7 @@
use std::sync::{Arc, OnceLock};
use anyhow::{anyhow, bail, ensure, Context, Result};
+use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};
use futures::StreamExt;
use itertools::Itertools;
use r2d2::{Pool, PooledConnection};
@@ -14,11 +18,9 @@
use tokio::io::AsyncWriteExt;
use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_util::codec::{FramedRead, LinesCodec};
use tracing::{debug, error, warn, Level};
-
-use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};
const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";
@@ -30,6 +32,8 @@
string_wrapping: (String, String),
number_wrapping: (String, String),
+ executing_command: Arc<Mutex<()>>,
+
next_id: u32,
free_list: Vec<u32>,
}
@@ -219,6 +223,8 @@
string_wrapping: Default::default(),
number_wrapping: Default::default(),
+ executing_command: Arc::new(Mutex::new(())),
+
next_id: 0,
free_list: vec![],
};
@@ -331,6 +337,10 @@
expr: impl AsRef<[u8]>,
err_handler: &mut dyn Handler,
) -> Result<String> {
+ // Prevent two commands from being executed in parallel, messing with each other.
+ let _lock = self.executing_command.clone();
+ let _guard = _lock.lock().await;
+
self.send_command(expr).await?;
// It will be echoed
self.send_command(REPL_DELIMITER).await?;
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -3,11 +3,11 @@
use std::{env::current_dir, time::Duration};
use crate::command::MyCommand;
-use crate::host::Config;
+use crate::host::{Config, ConfigHost};
use crate::nix_go;
use anyhow::{anyhow, Result};
use clap::Parser;
-use itertools::Itertools;
+use itertools::Itertools as _;
use tokio::{task::LocalSet, time::sleep};
use tracing::{error, field, info, info_span, warn, Instrument};
@@ -112,12 +112,12 @@
current: bool,
datetime: String,
}
-async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
- let mut cmd = MyCommand::new("nix-env");
+async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.arg("--list-generations");
// Sudo is required due to --list-generations acquiring lock on the profile.
- let data = config.run_string_on(host, cmd, true).await?;
+ let data = cmd.sudo().run_string().await?;
let generations = data
.split('\n')
.map(|e| e.trim())
@@ -161,25 +161,12 @@
.map_err(|_e| anyhow!("bad list-generations output"))?
.ok_or_else(|| anyhow!("failed to find generation"))?;
Ok(current)
-}
-
-async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("stop").arg(unit);
- config.run_on(host, cmd, true).await
-}
-
-async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("start").arg(unit);
- config.run_on(host, cmd, true).await
}
async fn execute_upload(
build: &BuildSystems,
- config: &Config,
action: UploadAction,
- host: &str,
+ host: &ConfigHost,
built: PathBuf,
) -> Result<()> {
let mut failed = false;
@@ -191,15 +178,15 @@
if !build.disable_rollback {
let _span = info_span!("preparing").entered();
info!("preparing for rollback");
- let generation = get_current_generation(config, host).await?;
+ let generation = get_current_generation(&host).await?;
info!(
"rollback target would be {} {}",
generation.id, generation.datetime
);
{
- let mut cmd = MyCommand::new("sh");
+ let mut cmd = host.cmd("sh").await?;
cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to set rollback marker: {e}");
failed = true;
}
@@ -215,37 +202,41 @@
// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
// Anyway, reboot will still help in this case.
if action.should_schedule_rollback_run() {
- let mut cmd = MyCommand::new("systemd-run");
+ let mut cmd = host.cmd("systemd-run").await?;
cmd.comparg("--on-active", "3min")
.comparg("--unit", "rollback-watchdog-run")
.arg("systemctl")
.arg("start")
.arg("rollback-watchdog.service");
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to schedule rollback run: {e}");
failed = true;
}
}
}
+
if action.should_switch_profile() && !failed {
info!("switching generation");
- let mut cmd = MyCommand::new("nix-env");
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.comparg("--set", &built);
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to switch generation: {e}");
failed = true;
}
}
+
+ // FIXME: Connection might be disconnected after activation run
+
if action.should_activate() && !failed {
let _span = info_span!("activating").entered();
info!("executing activation script");
let mut switch_script = built.clone();
switch_script.push("bin");
switch_script.push("switch-to-configuration");
- let mut cmd = MyCommand::new(switch_script);
+ let mut cmd = host.cmd(switch_script).await?;
cmd.arg(action.name());
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = cmd.sudo().run().in_current_span().await {
error!("failed to activate: {e}");
failed = true;
}
@@ -253,7 +244,8 @@
if !build.disable_rollback {
if failed {
info!("executing rollback");
- if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")
+ if let Err(e) = host
+ .systemctl_start("rollback-watchdog.service")
.instrument(info_span!("rollback"))
.await
{
@@ -261,27 +253,29 @@
}
} else {
info!("trying to mark upgrade as successful");
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
}
}
info!("disarming watchdog, just in case");
- if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {
+ if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
// It is ok, if there was no reboot - then timer might not be running.
}
if action.should_schedule_rollback_run() {
- if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {
+ if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
error!("failed to disarm rollback run: {e}");
}
}
- } else {
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {
- // Marker might not exist, yet better try to remove it.
- }
+ } else if let Err(_e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
+ // Marker might not exist, yet better try to remove it.
}
Ok(())
}
@@ -289,12 +283,13 @@
impl BuildSystems {
async fn build_task(self, config: Config, host: String) -> Result<()> {
info!("building");
+ let host = config.host(&host).await?;
let action = Action::from(self.subcommand.clone());
let fleet_field = &config.fleet_field;
let drv = nix_go!(
fleet_field.buildSystems(Obj {
localSystem: { config.local_system.clone() }
- })[{ action.build_attr() }][{ host }]
+ })[{ action.build_attr() }][{ &host.name }]
);
let outputs = drv.build().await.map_err(|e| {
if action.build_attr() == "sdImage" {
@@ -309,9 +304,10 @@
match action {
Action::Upload { action } => {
- if !config.is_local(&host) {
+ if !config.is_local(&host.name) {
info!("uploading system closure");
{
+ // TODO: Move to remote_derivation method.
// Alternatively, nix store make-content-addressed can be used,
// at least for the first deployment, to provide trusted store key.
//
@@ -329,13 +325,11 @@
}
let mut tries = 0;
loop {
- let mut nix = MyCommand::new("nix");
- nix.arg("copy")
- .arg("--substitute-on-destination")
- .comparg("--to", format!("ssh-ng://{host}"))
- .arg(out_output);
- match nix.run_nix().await {
- Ok(()) => break,
+ match host.remote_derivation(out_output).await {
+ Ok(remote) => {
+ assert!(&remote == out_output, "CA derivations aren't implemented");
+ break;
+ }
Err(e) if tries < 3 => {
tries += 1;
warn!("Copy failure ({}/3): {}", tries, e);
@@ -346,19 +340,19 @@
}
}
if let Some(action) = action {
- execute_upload(&self, &config, action, &host, out_output.clone()).await?
+ execute_upload(&self, action, &host, out_output.clone()).await?
}
}
Action::Package(PackageAction::SdImage) => {
let mut out = current_dir()?;
- out.push(format!("sd-image-{}", host));
+ out.push(format!("sd-image-{}", host.name));
info!("linking sd image to {:?}", out);
symlink(out_output, out)?;
}
Action::Package(PackageAction::InstallationCd) => {
let mut out = current_dir()?;
- out.push(format!("installation-cd-{}", host));
+ out.push(format!("installation-cd-{}", host.name));
info!("linking iso image to {:?}", out);
symlink(out_output, out)?;
@@ -379,6 +373,17 @@
let this = this.clone();
let span = info_span!("deployment", host = field::display(&host.name));
let hostname = host.name;
+ // FIXME: Since the introduction of better-nix-eval,
+ // due to single repl used for builds, hosts are waiting for each other to build,
+ // instead of building concurrently.
+ //
+ // Open multiple repls?
+ //
+ // Create build batcher, which will behave similar to golangs
+ // WaitGroup, and start executing once all the build tasks are scheduled?
+ // This also allows to cleanup build output, as there will be no longer
+ // "waiting for remote machine" messages in the cases when one package is needed for
+ // multiple hosts.
set.spawn_local(
(async move {
match this.build_task(config, hostname).await {
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,7 +7,7 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use chrono::{DateTime, Utc};
use clap::Parser;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
use itertools::Itertools;
use owo_colors::OwoColorize;
use std::{
@@ -404,9 +404,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(data) = secret.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, data, target_recipients)
- .await?;
+ let host = config.host(&identity_holder).await?;
+ let encrypted = host.reencrypt(data, target_recipients).await?;
secret.secret.secret = Some(encrypted);
}
@@ -481,9 +480,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(secret) = data.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, secret, target_recipients)
- .await?;
+ let host = config.host(identity_holder).await?;
+ let encrypted = host.reencrypt(secret, target_recipients).await?;
data.secret.secret = Some(encrypted);
}
cmds/fleet/src/command.rsdiffbeforeafterboth--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,23 +1,15 @@
-use std::{
- collections::HashMap,
- ffi::OsStr,
- pin,
- process::Stdio,
- sync::{Arc, Mutex},
- task::Poll,
-};
+use std::thread::sleep;
+use std::time::Duration;
+use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};
use anyhow::{anyhow, Result};
+use better_command::{Handler, NixHandler, PlainHandler};
use futures::StreamExt;
use itertools::Either;
-use once_cell::sync::Lazy;
use openssh::{OverSsh, OwningCommand, Session};
-use regex::Regex;
-use serde::{de::Visitor, Deserialize};
use tokio::{io::AsyncRead, process::Command, select};
use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
-use tracing::{info, info_span, warn, Span};
-use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tracing::{info, debug};
fn escape_bash(input: &str, out: &mut String) {
const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
@@ -87,9 +79,7 @@
return self;
}
let mut out = Self::new("env");
- if let Some(session) = self.ssh_session {
- out = out.ssh_session(session);
- }
+ out.ssh_session = self.ssh_session;
for (k, v) in self.env {
assert!(!k.contains('='));
out.arg(format!("{k}={v}"));
@@ -179,21 +169,11 @@
out
} else {
let mut out = Self::new("sudo");
+ out.ssh_session = self.ssh_session.take();
out.args(self.into_args());
out
}
}
- pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
- self.ssh_session = Some(on);
- self
- }
- pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
- let mut out = Self::new("ssh");
- out.ssh_session = self.ssh_session.take();
- out.arg(on).arg("--");
- out.arg(self.into_string());
- out
- }
pub async fn run(self) -> Result<()> {
let str = self.clone().into_string();
@@ -276,259 +256,6 @@
let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
assert!(v.is_none());
Ok(())
-}
-
-pub trait Handler: Send {
- fn handle_line(&mut self, e: &str);
-}
-
-pub struct ClonableHandler<H>(Arc<Mutex<H>>);
-impl<H> Clone for ClonableHandler<H> {
- fn clone(&self) -> Self {
- Self(self.0.clone())
- }
-}
-impl<H> ClonableHandler<H> {
- pub fn new(inner: H) -> Self {
- Self(Arc::new(Mutex::new(inner)))
- }
-}
-impl<H: Handler> Handler for ClonableHandler<H> {
- fn handle_line(&mut self, e: &str) {
- self.0.lock().unwrap().handle_line(e)
- }
-}
-
-struct PlainHandler;
-impl Handler for PlainHandler {
- fn handle_line(&mut self, e: &str) {
- info!(target: "log", "{e}");
- }
-}
-
-pub struct NoopHandler;
-impl Handler for NoopHandler {
- fn handle_line(&mut self, _e: &str) {}
-}
-
-#[derive(Default)]
-pub struct NixHandler {
- spans: HashMap<u64, Span>,
-}
-fn process_message(m: &str) -> String {
- static OSC_CLEANER: Lazy<Regex> =
- Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
- static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
- let m = OSC_CLEANER.replace_all(m, "");
- // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
- // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
- DETABBER.replace_all(m.as_ref(), " ").to_string()
-}
-impl Handler for NixHandler {
- fn handle_line(&mut self, e: &str) {
- if let Some(e) = e.strip_prefix("@nix ") {
- let log: NixLog = match serde_json::from_str(e) {
- Ok(l) => l,
- Err(err) => {
- warn!("failed to parse nix log line {:?}: {}", e, err);
- return;
- }
- };
- match log {
- NixLog::Msg { msg, raw_msg, .. } => {
- #[allow(clippy::nonminimal_bool)]
- if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
- && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
- && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
- if let Some(raw_msg) = raw_msg {
- if !msg.is_empty() {
- info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
- } else {
- info!(target: "nix", "{}", raw_msg.trim_end())
- }
- } else {
- info!(target: "nix", "{}", msg.trim_end())
- }
- }
- }
- NixLog::Start {
- ref fields,
- typ,
- id,
- ..
- } if typ == 105 && !fields.is_empty() => {
- if let [LogField::String(drv), ..] = &fields[..] {
- let mut drv = drv.as_str();
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- info!(target: "nix","building {}", drv);
- let span = info_span!("build", drv);
- span.pb_start();
- self.spans.insert(id, span);
- } else {
- warn!("bad build log: {:?}", log)
- }
- }
- NixLog::Start {
- ref fields,
- typ,
- id,
- ..
- } if typ == 100 && fields.len() >= 3 => {
- if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
- &fields[..]
- {
- let mut drv = drv.as_str();
-
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- // info!(target: "nix","copying {} {} -> {}", drv, from, to);
- let span = info_span!("copy", from, to, drv);
- span.pb_start();
- self.spans.insert(id, span);
- } else {
- warn!("bad copy log: {:?}", log)
- }
- }
- NixLog::Start { text, typ, id, .. }
- if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
- {
- if !text.is_empty()
- && text != "querying info about missing paths"
- && text != "copying 0 paths"
- // Too much spam on lazy-trees branch
- && !(text.starts_with("copying '") && text.ends_with("' to the store"))
- {
- let span = info_span!("job");
- span.pb_start();
- span.pb_set_message(&process_message(text.trim()));
- self.spans.insert(id, span);
- info!(target: "nix", "{}", text);
- }
- }
- NixLog::Start {
- text,
- level: 0,
- typ: 108,
- ..
- } if text.is_empty() => {
- // Cache lookup? Coupled with copy log
- }
- NixLog::Start {
- text,
- level: 4,
- typ: 109,
- ..
- } if text.starts_with("querying info about ") => {
- // Cache lookup
- }
- NixLog::Start {
- text,
- level: 4,
- typ: 101,
- ..
- } if text.starts_with("downloading ") => {
- // NAR downloading, coupled with copy log
- }
- NixLog::Start {
- text,
- level: 1,
- typ: 111,
- ..
- } if text.starts_with("waiting for a machine to build ") => {
- // Useless repeating notification about build
- }
- NixLog::Start {
- text,
- level: 3,
- typ: 111,
- ..
- } if text.starts_with("resolved derivation: ") => {
- // CA resolved
- }
- NixLog::Start {
- text,
- level: 1,
- typ: 111,
- id,
- ..
- } if text.starts_with("waiting for lock on ") => {
- let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
- if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
- drv = txt;
- }
- if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
- drv = txt;
- }
- if let Some(txt) = drv.split("', '").next() {
- drv = txt;
- }
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- let span = info_span!("waiting on drv", drv);
- span.pb_start();
- self.spans.insert(id, span);
- // Concurrent build of the same message
- }
- NixLog::Stop { id, .. } => {
- self.spans.remove(&id);
- }
- NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
- if let Some(span) = self.spans.get(&id) {
- if let LogField::String(s) = &fields[0] {
- span.pb_set_message(&process_message(s.trim()));
- } else {
- warn!("bad fields: {fields:?}");
- }
- } else {
- warn!("unknown result id: {id} {typ} {fields:?}");
- }
- // dbg!(fields, id, typ);
- }
- NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
- if let Some(span) = self.spans.get(&id) {
- if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
- &fields[..4]
- {
- span.pb_set_length(*expected);
- span.pb_set_position(*done);
- } else {
- warn!("bad fields: {fields:?}");
- }
- } else {
- // warn!("unknown result id: {id} {typ} {fields:?}");
- // Unaccounted progress.
- }
- // dbg!(fields, id, typ);
- }
- NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
- // Set phase, expected
- }
- _ => warn!("unknown log: {:?}", log),
- };
- } else {
- let e = e.trim();
- if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
- return;
- }
- info!("{e}")
- }
- }
}
async fn run_nix_inner_raw(
@@ -540,6 +267,7 @@
) -> Result<Option<Vec<u8>>> {
cmd.stderr(Stdio::piped());
cmd.stdout(Stdio::piped());
+ debug!("running command {cmd:?} on local");
let mut child = cmd.spawn()?;
let mut stderr = child.stderr.take().unwrap();
let stdout = child.stdout.take().unwrap();
@@ -600,6 +328,7 @@
err_handler: &mut dyn Handler,
mut out_handler: Option<&mut dyn Handler>,
) -> Result<Option<Vec<u8>>> {
+ debug!("running command {cmd:?} over ssh");
cmd.stderr(openssh::Stdio::piped());
cmd.stdout(openssh::Stdio::piped());
let mut child = cmd.spawn().await?;
@@ -656,77 +385,4 @@
}
Ok(out_buf)
-}
-
-pub trait ErrorRecorder: Send {
- /// Return true to discard message from logging
- fn push_message(&mut self, msg: &str) -> bool;
-}
-
-#[derive(Debug)]
-enum LogField {
- String(String),
- Num(u64),
-}
-
-impl<'de> Deserialize<'de> for LogField {
- fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
- where
- D: serde::Deserializer<'de>,
- {
- struct StringOrNum;
- impl<'de> Visitor<'de> for StringOrNum {
- type Value = LogField;
-
- fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "string or unsigned")
- }
-
- fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- Ok(LogField::String(v.to_owned()))
- }
-
- fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- Ok(LogField::Num(v))
- }
- }
-
- deserializer.deserialize_any(StringOrNum)
- }
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "camelCase", tag = "action")]
-#[allow(dead_code)]
-enum NixLog {
- Msg {
- level: u32,
- msg: String,
- raw_msg: Option<String>,
- },
- Start {
- id: u64,
- level: u32,
- #[serde(default)]
- fields: Vec<LogField>,
- text: String,
- #[serde(rename = "type")]
- typ: u32,
- },
- Stop {
- id: u64,
- },
- Result {
- id: u64,
- #[serde(rename = "type")]
- typ: u32,
- #[serde(default)]
- fields: Vec<LogField>,
- },
}
cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -9,7 +9,6 @@
sync::{Arc, Mutex, MutexGuard, OnceLock},
};
-use age::Recipient;
use anyhow::{anyhow, bail, Context, Result};
use clap::{ArgGroup, Parser};
use openssh::SessionBuilder;
@@ -50,10 +49,12 @@
pub struct ConfigHost {
pub name: String,
+ pub local: bool,
pub session: OnceLock<Arc<openssh::Session>>,
}
impl ConfigHost {
- pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ assert!(!self.local, "do not open ssh connection to local session");
// FIXME: TOCTOU
if let Some(session) = &self.session.get() {
return Ok((*session).clone());
@@ -96,8 +97,12 @@
D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
}
pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
- let session = self.open_session().await?;
- Ok(MyCommand::new_on(cmd, session))
+ if self.local {
+ Ok(MyCommand::new(cmd))
+ } else {
+ let session = self.open_session().await?;
+ Ok(MyCommand::new_on(cmd, session))
+ }
}
pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
@@ -110,8 +115,25 @@
.context("failed to call remote host for decrypt")?;
z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
}
+ pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
+ let mut cmd = self.cmd("fleet-install-secrets").await?;
+ cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
+ for target in targets {
+ cmd.eqarg("--targets", target);
+ }
+ let encoded = cmd
+ .sudo()
+ .run_string()
+ .await
+ .context("failed to call remote host for decrypt")?;
+ SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")
+ }
/// Returns path for futureproofing, as path might change i.e on conversion to CA
pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
+ if self.local {
+ // Path is located locally, thus already trusted.
+ return Ok(path.to_owned());
+ }
let mut nix = MyCommand::new("nix");
nix.arg("copy")
.arg("--substitute-on-destination")
@@ -120,6 +142,25 @@
nix.run_nix().await?;
Ok(path.to_owned())
}
+ pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("stop").arg(name);
+ cmd.sudo().run().await
+ }
+ pub async fn systemctl_start(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("start").arg(name);
+ cmd.sudo().run().await
+ }
+
+ pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
+ let mut cmd = self.cmd("rm").await?;
+ cmd.arg("-f").arg(path);
+ if sudo {
+ cmd = cmd.sudo()
+ }
+ cmd.run().await
+ }
}
impl Config {
@@ -134,35 +175,12 @@
}
pub fn is_local(&self, host: &str) -> bool {
self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
- }
-
- pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
- if sudo {
- command = command.sudo();
- }
- if !self.is_local(host) {
- command = command.ssh(host);
- }
- command.run().await
- }
- pub async fn run_string_on(
- &self,
- host: &str,
- mut command: MyCommand,
- sudo: bool,
- ) -> Result<String> {
- if sudo {
- command = command.sudo();
- }
- if !self.is_local(host) {
- command = command.ssh(host);
- }
- command.run_string().await
}
pub async fn host(&self, name: &str) -> Result<ConfigHost> {
Ok(ConfigHost {
name: name.to_owned(),
+ local: self.is_local(name),
session: OnceLock::new(),
})
}
@@ -172,6 +190,7 @@
let mut out = vec![];
for name in names {
out.push(ConfigHost {
+ local: self.is_local(&name),
name,
session: OnceLock::new(),
})
@@ -225,27 +244,6 @@
let mut data = self.data_mut();
let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();
host_secrets.insert(secret, value);
- }
-
- pub async fn reencrypt_on_host(
- &self,
- host: &str,
- data: SecretData,
- targets: Vec<String>,
- ) -> Result<SecretData> {
- let mut recmd = MyCommand::new("fleet-install-secrets");
- recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
- for target in targets {
- recmd.eqarg("--targets", target);
- }
- recmd = recmd.sudo().ssh(host);
- let encoded = recmd
- .run_string()
- .await
- .context("failed to call remote host for decrypt")?
- .trim()
- .to_owned();
- SecretData::decode_z85(&encoded)
}
pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
cmds/fleet/src/keys.rsdiffbeforeafterboth--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
use std::str::FromStr;
-use crate::command::MyCommand;
use crate::host::Config;
use age::Recipient;
use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
Ok(key)
} else {
warn!("Loading key for {}", host);
- let mut cmd = MyCommand::new("cat");
+ let host = self.host(host).await?;
+ let mut cmd = host.cmd("cat").await?;
cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
- let key = self.run_string_on(host, cmd, false).await?;
- self.update_key(host, key.clone());
+ let key = cmd.run_string().await?;
+ self.update_key(&host.name, key.clone());
Ok(key)
}
}
cmds/remowt-agent/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
cmds/remowt-agent/README.adocdiffbeforeafterbothno changes
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+ println!("Hello, world!");
+}
crates/better-command/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
crates/better-command/src/handler.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/handler.rs
@@ -0,0 +1,305 @@
+//! Collection of handlers, which transform program-specific stdout format to tracing
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use tracing::{Span, info, warn, info_span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+
+pub trait Handler: Send {
+ fn handle_line(&mut self, e: &str);
+}
+
+/// Handler wrapper, which can be cloned.
+pub struct ClonableHandler<H>(Arc<Mutex<H>>);
+impl<H> Clone for ClonableHandler<H> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+impl<H> ClonableHandler<H> {
+ pub fn new(inner: H) -> Self {
+ Self(Arc::new(Mutex::new(inner)))
+ }
+}
+impl<H: Handler> Handler for ClonableHandler<H> {
+ fn handle_line(&mut self, e: &str) {
+ self.0.lock().unwrap().handle_line(e)
+ }
+}
+
+/// Converts command output to tracing lines
+pub struct PlainHandler;
+impl Handler for PlainHandler {
+ fn handle_line(&mut self, e: &str) {
+ info!(target: "log", "{e}");
+ }
+}
+
+/// Ignores output
+pub struct NoopHandler;
+impl Handler for NoopHandler {
+ fn handle_line(&mut self, _e: &str) {}
+}
+
+/// Transform nix internal-json logs to tracing spans.
+#[derive(Default)]
+pub struct NixHandler {
+ spans: HashMap<u64, Span>,
+}
+#[derive(Deserialize, Debug)]
+#[serde(untagged)]
+enum LogField {
+ String(String),
+ Num(u64),
+}
+
+/// Nix internal-json log line type
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", tag = "action")]
+#[allow(dead_code)]
+enum NixLog {
+ Msg {
+ level: u32,
+ msg: String,
+ raw_msg: Option<String>,
+ },
+ Start {
+ id: u64,
+ level: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ text: String,
+ #[serde(rename = "type")]
+ typ: u32,
+ },
+ Stop {
+ id: u64,
+ },
+ Result {
+ id: u64,
+ #[serde(rename = "type")]
+ typ: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ },
+}
+fn process_message(m: &str) -> String {
+ // Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.
+ static OSC_CLEANER: Lazy<Regex> =
+ Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+ static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+ let m = OSC_CLEANER.replace_all(m, "");
+ // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+ // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+ DETABBER.replace_all(m.as_ref(), " ").to_string()
+}
+impl Handler for NixHandler {
+ fn handle_line(&mut self, e: &str) {
+ if let Some(e) = e.strip_prefix("@nix ") {
+ let log: NixLog = match serde_json::from_str(e) {
+ Ok(l) => l,
+ Err(err) => {
+ warn!("failed to parse nix log line {:?}: {}", e, err);
+ return;
+ }
+ };
+ match log {
+ NixLog::Msg { msg, raw_msg, .. } => {
+ #[allow(clippy::nonminimal_bool)]
+ if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+ && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+ && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+ if let Some(raw_msg) = raw_msg {
+ if !msg.is_empty() {
+ info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
+ } else {
+ info!(target: "nix", "{}", raw_msg.trim_end())
+ }
+ } else {
+ info!(target: "nix", "{}", msg.trim_end())
+ }
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 105 && !fields.is_empty() => {
+ if let [LogField::String(drv), ..] = &fields[..] {
+ let mut drv = drv.as_str();
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ info!(target: "nix","building {}", drv);
+ let span = info_span!("build", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad build log: {:?}", log)
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 100 && fields.len() >= 3 => {
+ if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
+ &fields[..]
+ {
+ let mut drv = drv.as_str();
+
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ // info!(target: "nix","copying {} {} -> {}", drv, from, to);
+ let span = info_span!("copy", from, to, drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad copy log: {:?}", log)
+ }
+ }
+ NixLog::Start { text, typ, id, .. }
+ if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
+ {
+ if !text.is_empty()
+ && text != "querying info about missing paths"
+ && text != "copying 0 paths"
+ // Too much spam on lazy-trees branch
+ && !(text.starts_with("copying '") && text.ends_with("' to the store"))
+ {
+ let span = info_span!("job");
+ span.pb_start();
+ span.pb_set_message(&process_message(text.trim()));
+ self.spans.insert(id, span);
+ info!(target: "nix", "{}", text);
+ }
+ }
+ NixLog::Start {
+ text,
+ level: 0,
+ typ: 108,
+ ..
+ } if text.is_empty() => {
+ // Cache lookup? Coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 109,
+ ..
+ } if text.starts_with("querying info about ") => {
+ // Cache lookup
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 101,
+ ..
+ } if text.starts_with("downloading ") => {
+ // NAR downloading, coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ ..
+ } if text.starts_with("waiting for a machine to build ") => {
+ // Useless repeating notification about build
+ }
+ NixLog::Start {
+ text,
+ level: 3,
+ typ: 111,
+ ..
+ } if text.starts_with("resolved derivation: ") => {
+ // CA resolved
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ id,
+ ..
+ } if text.starts_with("waiting for lock on ") => {
+ let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
+ if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.split("', '").next() {
+ drv = txt;
+ }
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ let span = info_span!("waiting on drv", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ // Concurrent build of the same message
+ }
+ NixLog::Stop { id, .. } => {
+ self.spans.remove(&id);
+ }
+ NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
+ if let Some(span) = self.spans.get(&id) {
+ if let LogField::String(s) = &fields[0] {
+ span.pb_set_message(&process_message(s.trim()));
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ warn!("unknown result id: {id} {typ} {fields:?}");
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
+ if let Some(span) = self.spans.get(&id) {
+ if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
+ &fields[..4]
+ {
+ span.pb_set_length(*expected);
+ span.pb_set_position(*done);
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ // warn!("unknown result id: {id} {typ} {fields:?}");
+ // Unaccounted progress.
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
+ // Set phase, expected
+ }
+ _ => warn!("unknown log: {:?}", log),
+ };
+ } else {
+ let e = e.trim();
+ if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
+ return;
+ }
+ info!("{e}")
+ }
+ }
+}
crates/better-command/src/lib.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn it_works() {
+ let result = add(2, 2);
+ assert_eq!(result, 4);
+ }
+}