difftreelog
refactor remove shell-outs for ssh
in: trunk
15 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
[[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
"anyhow",
"async-trait",
"base64 0.21.5",
+ "better-command",
"chrono",
"clap",
"futures",
@@ -1510,9 +1523,9 @@
[[package]]
name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "opaque-debug"
@@ -1922,6 +1935,10 @@
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
name = "rnix"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
[[package]]
name = "serde"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
dependencies = [
"serde_derive",
]
@@ -2123,9 +2140,9 @@
[[package]]
name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
dependencies = [
"proc-macro2",
"quote",
@@ -2134,9 +2151,9 @@
[[package]]
name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
dependencies = [
"itoa",
"ryu",
@@ -2527,11 +2544,10 @@
[[package]]
name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
- "cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -2539,9 +2555,9 @@
[[package]]
name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
@@ -2550,9 +2566,9 @@
[[package]]
name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
@@ -2560,9 +2576,9 @@
[[package]]
name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
dependencies = [
"indicatif",
"tracing",
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
[workspace]
members = ["crates/*", "cmds/*"]
resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
edition = "2021"
[dependencies]
+nixlike.workspace = true
+better-command.workspace = true
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@@ -15,7 +17,6 @@
hostname = "0.3.1"
age-core = "0.9.0"
peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
age = { version = "0.9.2", features = ["ssh", "armor"] }
base64 = "0.21.5"
chrono = { version = "0.4.31", features = ["serde"] }
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,3 +1,6 @@
+//! Wrapper around nix repl, which allows to work on nix code, without relying on
+//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.
+
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
use std::fmt::{self, Display};
@@ -6,6 +9,7 @@
use std::sync::{Arc, OnceLock};
use anyhow::{anyhow, bail, ensure, Context, Result};
+use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};
use futures::StreamExt;
use itertools::Itertools;
use r2d2::{Pool, PooledConnection};
@@ -14,11 +18,9 @@
use tokio::io::AsyncWriteExt;
use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_util::codec::{FramedRead, LinesCodec};
use tracing::{debug, error, warn, Level};
-
-use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};
const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";
@@ -30,6 +32,8 @@
string_wrapping: (String, String),
number_wrapping: (String, String),
+ executing_command: Arc<Mutex<()>>,
+
next_id: u32,
free_list: Vec<u32>,
}
@@ -219,6 +223,8 @@
string_wrapping: Default::default(),
number_wrapping: Default::default(),
+ executing_command: Arc::new(Mutex::new(())),
+
next_id: 0,
free_list: vec![],
};
@@ -331,6 +337,10 @@
expr: impl AsRef<[u8]>,
err_handler: &mut dyn Handler,
) -> Result<String> {
+ // Prevent two commands from being executed in parallel, messing with each other.
+ let _lock = self.executing_command.clone();
+ let _guard = _lock.lock().await;
+
self.send_command(expr).await?;
// It will be echoed
self.send_command(REPL_DELIMITER).await?;
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -3,11 +3,11 @@
use std::{env::current_dir, time::Duration};
use crate::command::MyCommand;
-use crate::host::Config;
+use crate::host::{Config, ConfigHost};
use crate::nix_go;
use anyhow::{anyhow, Result};
use clap::Parser;
-use itertools::Itertools;
+use itertools::Itertools as _;
use tokio::{task::LocalSet, time::sleep};
use tracing::{error, field, info, info_span, warn, Instrument};
@@ -112,12 +112,12 @@
current: bool,
datetime: String,
}
-async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
- let mut cmd = MyCommand::new("nix-env");
+async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.arg("--list-generations");
// Sudo is required due to --list-generations acquiring lock on the profile.
- let data = config.run_string_on(host, cmd, true).await?;
+ let data = cmd.sudo().run_string().await?;
let generations = data
.split('\n')
.map(|e| e.trim())
@@ -161,25 +161,12 @@
.map_err(|_e| anyhow!("bad list-generations output"))?
.ok_or_else(|| anyhow!("failed to find generation"))?;
Ok(current)
-}
-
-async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("stop").arg(unit);
- config.run_on(host, cmd, true).await
-}
-
-async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("start").arg(unit);
- config.run_on(host, cmd, true).await
}
async fn execute_upload(
build: &BuildSystems,
- config: &Config,
action: UploadAction,
- host: &str,
+ host: &ConfigHost,
built: PathBuf,
) -> Result<()> {
let mut failed = false;
@@ -191,15 +178,15 @@
if !build.disable_rollback {
let _span = info_span!("preparing").entered();
info!("preparing for rollback");
- let generation = get_current_generation(config, host).await?;
+ let generation = get_current_generation(&host).await?;
info!(
"rollback target would be {} {}",
generation.id, generation.datetime
);
{
- let mut cmd = MyCommand::new("sh");
+ let mut cmd = host.cmd("sh").await?;
cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to set rollback marker: {e}");
failed = true;
}
@@ -215,37 +202,41 @@
// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
// Anyway, reboot will still help in this case.
if action.should_schedule_rollback_run() {
- let mut cmd = MyCommand::new("systemd-run");
+ let mut cmd = host.cmd("systemd-run").await?;
cmd.comparg("--on-active", "3min")
.comparg("--unit", "rollback-watchdog-run")
.arg("systemctl")
.arg("start")
.arg("rollback-watchdog.service");
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to schedule rollback run: {e}");
failed = true;
}
}
}
+
if action.should_switch_profile() && !failed {
info!("switching generation");
- let mut cmd = MyCommand::new("nix-env");
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.comparg("--set", &built);
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to switch generation: {e}");
failed = true;
}
}
+
+ // FIXME: Connection might be disconnected after activation run
+
if action.should_activate() && !failed {
let _span = info_span!("activating").entered();
info!("executing activation script");
let mut switch_script = built.clone();
switch_script.push("bin");
switch_script.push("switch-to-configuration");
- let mut cmd = MyCommand::new(switch_script);
+ let mut cmd = host.cmd(switch_script).await?;
cmd.arg(action.name());
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = cmd.sudo().run().in_current_span().await {
error!("failed to activate: {e}");
failed = true;
}
@@ -253,7 +244,8 @@
if !build.disable_rollback {
if failed {
info!("executing rollback");
- if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")
+ if let Err(e) = host
+ .systemctl_start("rollback-watchdog.service")
.instrument(info_span!("rollback"))
.await
{
@@ -261,27 +253,29 @@
}
} else {
info!("trying to mark upgrade as successful");
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
}
}
info!("disarming watchdog, just in case");
- if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {
+ if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
// It is ok, if there was no reboot - then timer might not be running.
}
if action.should_schedule_rollback_run() {
- if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {
+ if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
error!("failed to disarm rollback run: {e}");
}
}
- } else {
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {
- // Marker might not exist, yet better try to remove it.
- }
+ } else if let Err(_e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
+ // Marker might not exist, yet better try to remove it.
}
Ok(())
}
@@ -289,12 +283,13 @@
impl BuildSystems {
async fn build_task(self, config: Config, host: String) -> Result<()> {
info!("building");
+ let host = config.host(&host).await?;
let action = Action::from(self.subcommand.clone());
let fleet_field = &config.fleet_field;
let drv = nix_go!(
fleet_field.buildSystems(Obj {
localSystem: { config.local_system.clone() }
- })[{ action.build_attr() }][{ host }]
+ })[{ action.build_attr() }][{ &host.name }]
);
let outputs = drv.build().await.map_err(|e| {
if action.build_attr() == "sdImage" {
@@ -309,9 +304,10 @@
match action {
Action::Upload { action } => {
- if !config.is_local(&host) {
+ if !config.is_local(&host.name) {
info!("uploading system closure");
{
+ // TODO: Move to remote_derivation method.
// Alternatively, nix store make-content-addressed can be used,
// at least for the first deployment, to provide trusted store key.
//
@@ -329,13 +325,11 @@
}
let mut tries = 0;
loop {
- let mut nix = MyCommand::new("nix");
- nix.arg("copy")
- .arg("--substitute-on-destination")
- .comparg("--to", format!("ssh-ng://{host}"))
- .arg(out_output);
- match nix.run_nix().await {
- Ok(()) => break,
+ match host.remote_derivation(out_output).await {
+ Ok(remote) => {
+ assert!(&remote == out_output, "CA derivations aren't implemented");
+ break;
+ }
Err(e) if tries < 3 => {
tries += 1;
warn!("Copy failure ({}/3): {}", tries, e);
@@ -346,19 +340,19 @@
}
}
if let Some(action) = action {
- execute_upload(&self, &config, action, &host, out_output.clone()).await?
+ execute_upload(&self, action, &host, out_output.clone()).await?
}
}
Action::Package(PackageAction::SdImage) => {
let mut out = current_dir()?;
- out.push(format!("sd-image-{}", host));
+ out.push(format!("sd-image-{}", host.name));
info!("linking sd image to {:?}", out);
symlink(out_output, out)?;
}
Action::Package(PackageAction::InstallationCd) => {
let mut out = current_dir()?;
- out.push(format!("installation-cd-{}", host));
+ out.push(format!("installation-cd-{}", host.name));
info!("linking iso image to {:?}", out);
symlink(out_output, out)?;
@@ -379,6 +373,17 @@
let this = this.clone();
let span = info_span!("deployment", host = field::display(&host.name));
let hostname = host.name;
+ // FIXME: Since the introduction of better-nix-eval,
+ // due to single repl used for builds, hosts are waiting for each other to build,
+ // instead of building concurrently.
+ //
+ // Open multiple repls?
+ //
+ // Create build batcher, which will behave similar to golangs
+ // WaitGroup, and start executing once all the build tasks are scheduled?
+ // This also allows to cleanup build output, as there will be no longer
+ // "waiting for remote machine" messages in the cases when one package is needed for
+ // multiple hosts.
set.spawn_local(
(async move {
match this.build_task(config, hostname).await {
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,7 +7,7 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use chrono::{DateTime, Utc};
use clap::Parser;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
use itertools::Itertools;
use owo_colors::OwoColorize;
use std::{
@@ -404,9 +404,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(data) = secret.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, data, target_recipients)
- .await?;
+ let host = config.host(&identity_holder).await?;
+ let encrypted = host.reencrypt(data, target_recipients).await?;
secret.secret.secret = Some(encrypted);
}
@@ -481,9 +480,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(secret) = data.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, secret, target_recipients)
- .await?;
+ let host = config.host(identity_holder).await?;
+ let encrypted = host.reencrypt(secret, target_recipients).await?;
data.secret.secret = Some(encrypted);
}
cmds/fleet/src/command.rsdiffbeforeafterboth--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,23 +1,15 @@
-use std::{
- collections::HashMap,
- ffi::OsStr,
- pin,
- process::Stdio,
- sync::{Arc, Mutex},
- task::Poll,
-};
+use std::thread::sleep;
+use std::time::Duration;
+use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};
use anyhow::{anyhow, Result};
+use better_command::{Handler, NixHandler, PlainHandler};
use futures::StreamExt;
use itertools::Either;
-use once_cell::sync::Lazy;
use openssh::{OverSsh, OwningCommand, Session};
-use regex::Regex;
-use serde::{de::Visitor, Deserialize};
use tokio::{io::AsyncRead, process::Command, select};
use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
-use tracing::{info, info_span, warn, Span};
-use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tracing::{info, debug};
fn escape_bash(input: &str, out: &mut String) {
const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
@@ -87,9 +79,7 @@
return self;
}
let mut out = Self::new("env");
- if let Some(session) = self.ssh_session {
- out = out.ssh_session(session);
- }
+ out.ssh_session = self.ssh_session;
for (k, v) in self.env {
assert!(!k.contains('='));
out.arg(format!("{k}={v}"));
@@ -179,21 +169,11 @@
out
} else {
let mut out = Self::new("sudo");
+ out.ssh_session = self.ssh_session.take();
out.args(self.into_args());
out
}
}
- pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
- self.ssh_session = Some(on);
- self
- }
- pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
- let mut out = Self::new("ssh");
- out.ssh_session = self.ssh_session.take();
- out.arg(on).arg("--");
- out.arg(self.into_string());
- out
- }
pub async fn run(self) -> Result<()> {
let str = self.clone().into_string();
@@ -276,259 +256,6 @@
let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
assert!(v.is_none());
Ok(())
-}
-
-pub trait Handler: Send {
- fn handle_line(&mut self, e: &str);
-}
-
-pub struct ClonableHandler<H>(Arc<Mutex<H>>);
-impl<H> Clone for ClonableHandler<H> {
- fn clone(&self) -> Self {
- Self(self.0.clone())
- }
-}
-impl<H> ClonableHandler<H> {
- pub fn new(inner: H) -> Self {
- Self(Arc::new(Mutex::new(inner)))
- }
-}
-impl<H: Handler> Handler for ClonableHandler<H> {
- fn handle_line(&mut self, e: &str) {
- self.0.lock().unwrap().handle_line(e)
- }
-}
-
-struct PlainHandler;
-impl Handler for PlainHandler {
- fn handle_line(&mut self, e: &str) {
- info!(target: "log", "{e}");
- }
-}
-
-pub struct NoopHandler;
-impl Handler for NoopHandler {
- fn handle_line(&mut self, _e: &str) {}
-}
-
-#[derive(Default)]
-pub struct NixHandler {
- spans: HashMap<u64, Span>,
-}
-fn process_message(m: &str) -> String {
- static OSC_CLEANER: Lazy<Regex> =
- Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
- static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
- let m = OSC_CLEANER.replace_all(m, "");
- // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
- // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
- DETABBER.replace_all(m.as_ref(), " ").to_string()
-}
-impl Handler for NixHandler {
- fn handle_line(&mut self, e: &str) {
- if let Some(e) = e.strip_prefix("@nix ") {
- let log: NixLog = match serde_json::from_str(e) {
- Ok(l) => l,
- Err(err) => {
- warn!("failed to parse nix log line {:?}: {}", e, err);
- return;
- }
- };
- match log {
- NixLog::Msg { msg, raw_msg, .. } => {
- #[allow(clippy::nonminimal_bool)]
- if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
- && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
- && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
- if let Some(raw_msg) = raw_msg {
- if !msg.is_empty() {
- info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
- } else {
- info!(target: "nix", "{}", raw_msg.trim_end())
- }
- } else {
- info!(target: "nix", "{}", msg.trim_end())
- }
- }
- }
- NixLog::Start {
- ref fields,
- typ,
- id,
- ..
- } if typ == 105 && !fields.is_empty() => {
- if let [LogField::String(drv), ..] = &fields[..] {
- let mut drv = drv.as_str();
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- info!(target: "nix","building {}", drv);
- let span = info_span!("build", drv);
- span.pb_start();
- self.spans.insert(id, span);
- } else {
- warn!("bad build log: {:?}", log)
- }
- }
- NixLog::Start {
- ref fields,
- typ,
- id,
- ..
- } if typ == 100 && fields.len() >= 3 => {
- if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
- &fields[..]
- {
- let mut drv = drv.as_str();
-
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- // info!(target: "nix","copying {} {} -> {}", drv, from, to);
- let span = info_span!("copy", from, to, drv);
- span.pb_start();
- self.spans.insert(id, span);
- } else {
- warn!("bad copy log: {:?}", log)
- }
- }
- NixLog::Start { text, typ, id, .. }
- if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
- {
- if !text.is_empty()
- && text != "querying info about missing paths"
- && text != "copying 0 paths"
- // Too much spam on lazy-trees branch
- && !(text.starts_with("copying '") && text.ends_with("' to the store"))
- {
- let span = info_span!("job");
- span.pb_start();
- span.pb_set_message(&process_message(text.trim()));
- self.spans.insert(id, span);
- info!(target: "nix", "{}", text);
- }
- }
- NixLog::Start {
- text,
- level: 0,
- typ: 108,
- ..
- } if text.is_empty() => {
- // Cache lookup? Coupled with copy log
- }
- NixLog::Start {
- text,
- level: 4,
- typ: 109,
- ..
- } if text.starts_with("querying info about ") => {
- // Cache lookup
- }
- NixLog::Start {
- text,
- level: 4,
- typ: 101,
- ..
- } if text.starts_with("downloading ") => {
- // NAR downloading, coupled with copy log
- }
- NixLog::Start {
- text,
- level: 1,
- typ: 111,
- ..
- } if text.starts_with("waiting for a machine to build ") => {
- // Useless repeating notification about build
- }
- NixLog::Start {
- text,
- level: 3,
- typ: 111,
- ..
- } if text.starts_with("resolved derivation: ") => {
- // CA resolved
- }
- NixLog::Start {
- text,
- level: 1,
- typ: 111,
- id,
- ..
- } if text.starts_with("waiting for lock on ") => {
- let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
- if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
- drv = txt;
- }
- if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
- drv = txt;
- }
- if let Some(txt) = drv.split("', '").next() {
- drv = txt;
- }
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- let span = info_span!("waiting on drv", drv);
- span.pb_start();
- self.spans.insert(id, span);
- // Concurrent build of the same message
- }
- NixLog::Stop { id, .. } => {
- self.spans.remove(&id);
- }
- NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
- if let Some(span) = self.spans.get(&id) {
- if let LogField::String(s) = &fields[0] {
- span.pb_set_message(&process_message(s.trim()));
- } else {
- warn!("bad fields: {fields:?}");
- }
- } else {
- warn!("unknown result id: {id} {typ} {fields:?}");
- }
- // dbg!(fields, id, typ);
- }
- NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
- if let Some(span) = self.spans.get(&id) {
- if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
- &fields[..4]
- {
- span.pb_set_length(*expected);
- span.pb_set_position(*done);
- } else {
- warn!("bad fields: {fields:?}");
- }
- } else {
- // warn!("unknown result id: {id} {typ} {fields:?}");
- // Unaccounted progress.
- }
- // dbg!(fields, id, typ);
- }
- NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
- // Set phase, expected
- }
- _ => warn!("unknown log: {:?}", log),
- };
- } else {
- let e = e.trim();
- if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
- return;
- }
- info!("{e}")
- }
- }
}
async fn run_nix_inner_raw(
@@ -540,6 +267,7 @@
) -> Result<Option<Vec<u8>>> {
cmd.stderr(Stdio::piped());
cmd.stdout(Stdio::piped());
+ debug!("running command {cmd:?} on local");
let mut child = cmd.spawn()?;
let mut stderr = child.stderr.take().unwrap();
let stdout = child.stdout.take().unwrap();
@@ -600,6 +328,7 @@
err_handler: &mut dyn Handler,
mut out_handler: Option<&mut dyn Handler>,
) -> Result<Option<Vec<u8>>> {
+ debug!("running command {cmd:?} over ssh");
cmd.stderr(openssh::Stdio::piped());
cmd.stdout(openssh::Stdio::piped());
let mut child = cmd.spawn().await?;
@@ -656,77 +385,4 @@
}
Ok(out_buf)
-}
-
-pub trait ErrorRecorder: Send {
- /// Return true to discard message from logging
- fn push_message(&mut self, msg: &str) -> bool;
-}
-
-#[derive(Debug)]
-enum LogField {
- String(String),
- Num(u64),
-}
-
-impl<'de> Deserialize<'de> for LogField {
- fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
- where
- D: serde::Deserializer<'de>,
- {
- struct StringOrNum;
- impl<'de> Visitor<'de> for StringOrNum {
- type Value = LogField;
-
- fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "string or unsigned")
- }
-
- fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- Ok(LogField::String(v.to_owned()))
- }
-
- fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- Ok(LogField::Num(v))
- }
- }
-
- deserializer.deserialize_any(StringOrNum)
- }
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "camelCase", tag = "action")]
-#[allow(dead_code)]
-enum NixLog {
- Msg {
- level: u32,
- msg: String,
- raw_msg: Option<String>,
- },
- Start {
- id: u64,
- level: u32,
- #[serde(default)]
- fields: Vec<LogField>,
- text: String,
- #[serde(rename = "type")]
- typ: u32,
- },
- Stop {
- id: u64,
- },
- Result {
- id: u64,
- #[serde(rename = "type")]
- typ: u32,
- #[serde(default)]
- fields: Vec<LogField>,
- },
}
cmds/fleet/src/host.rsdiffbeforeafterboth1use std::{2 env::current_dir,3 ffi::{OsStr, OsString},4 fmt::Display,5 io::Write,6 ops::Deref,7 path::PathBuf,8 str::FromStr,9 sync::{Arc, Mutex, MutexGuard, OnceLock},10};1112use age::Recipient;13use anyhow::{anyhow, bail, Context, Result};14use clap::{ArgGroup, Parser};15use openssh::SessionBuilder;16use serde::de::DeserializeOwned;17use tempfile::NamedTempFile;1819use crate::{20 better_nix_eval::{Field, NixSessionPool},21 command::MyCommand,22 fleetdata::{FleetData, FleetSecret, FleetSharedSecret, SecretData},23 nix_go, nix_go_json,24};2526pub struct FleetConfigInternals {27 pub local_system: String,28 pub directory: PathBuf,29 pub opts: FleetOpts,30 pub data: Mutex<FleetData>,31 pub nix_args: Vec<OsString>,32 /// fleetConfigurations.<name>.<localSystem>33 pub fleet_field: Field,34 /// fleet_config.configUnchecked35 pub config_field: Field,36 /// fleet_config.unchecked37 pub config_unchecked_field: Field,38}3940#[derive(Clone)]41pub struct Config(Arc<FleetConfigInternals>);4243impl Deref for Config {44 type Target = FleetConfigInternals;4546 fn deref(&self) -> &Self::Target {47 &self.048 }49}5051pub struct ConfigHost {52 pub name: String,53 pub session: OnceLock<Arc<openssh::Session>>,54}55impl ConfigHost {56 pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {57 // FIXME: TOCTOU58 if let Some(session) = &self.session.get() {59 return Ok((*session).clone());60 };61 let session = SessionBuilder::default();6263 let session = session64 .connect(&self.name)65 .await66 .map_err(|e| anyhow!("ssh error: {e}"))?;67 let session = Arc::new(session);68 self.session.set(session.clone()).expect("TOCTOU happened");69 Ok(session)70 }71 pub async fn mktemp_dir(&self) -> Result<String> {72 let mut cmd = self.cmd("mktemp").await?;73 cmd.arg("-d");74 let path = cmd.run_string().await?;75 Ok(path.trim_end().to_owned())76 }77 pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {78 let mut cmd = self.cmd("cat").await?;79 cmd.arg(path);80 cmd.run_bytes().await81 }82 pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {83 let mut cmd = self.cmd("cat").await?;84 cmd.arg(path);85 cmd.run_string().await86 }87 pub async fn read_file_json<D: DeserializeOwned>(&self, path: impl AsRef<OsStr>) -> Result<D> {88 let text = self.read_file_text(path).await?;89 Ok(serde_json::from_str(&text)?)90 }91 pub async fn read_file_value<D: FromStr>(&self, path: impl AsRef<OsStr>) -> Result<D>92 where93 <D as FromStr>::Err: Display,94 {95 let text = self.read_file_text(path).await?;96 D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))97 }98 pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {99 let session = self.open_session().await?;100 Ok(MyCommand::new_on(cmd, session))101 }102103 pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {104 let mut cmd = self.cmd("fleet-install-secrets").await?;105 cmd.arg("decrypt").eqarg("--secret", data.encode_z85());106 let encoded = cmd107 .sudo()108 .run_string()109 .await110 .context("failed to call remote host for decrypt")?;111 z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")112 }113 /// Returns path for futureproofing, as path might change i.e on conversion to CA114 pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {115 let mut nix = MyCommand::new("nix");116 nix.arg("copy")117 .arg("--substitute-on-destination")118 .comparg("--to", format!("ssh-ng://{}", self.name))119 .arg(path);120 nix.run_nix().await?;121 Ok(path.to_owned())122 }123}124125impl Config {126 pub fn should_skip(&self, host: &str) -> bool {127 if !self.opts.skip.is_empty() {128 self.opts.skip.iter().any(|h| h as &str == host)129 } else if !self.opts.only.is_empty() {130 !self.opts.only.iter().any(|h| h as &str == host)131 } else {132 false133 }134 }135 pub fn is_local(&self, host: &str) -> bool {136 self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)137 }138139 pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {140 if sudo {141 command = command.sudo();142 }143 if !self.is_local(host) {144 command = command.ssh(host);145 }146 command.run().await147 }148 pub async fn run_string_on(149 &self,150 host: &str,151 mut command: MyCommand,152 sudo: bool,153 ) -> Result<String> {154 if sudo {155 command = command.sudo();156 }157 if !self.is_local(host) {158 command = command.ssh(host);159 }160 command.run_string().await161 }162163 pub async fn host(&self, name: &str) -> Result<ConfigHost> {164 Ok(ConfigHost {165 name: name.to_owned(),166 session: OnceLock::new(),167 })168 }169 pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {170 let fleet_field = &self.fleet_field;171 let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;172 let mut out = vec![];173 for name in names {174 out.push(ConfigHost {175 name,176 session: OnceLock::new(),177 })178 }179 Ok(out)180 }181 pub async fn system_config(&self, host: &str) -> Result<Field> {182 let fleet_field = &self.fleet_field;183 Ok(nix_go!(fleet_field.configuredSystems[{ host }].config))184 }185186 pub(super) fn data(&self) -> MutexGuard<FleetData> {187 self.data.lock().unwrap()188 }189 pub(super) fn data_mut(&self) -> MutexGuard<FleetData> {190 self.data.lock().unwrap()191 }192 /// Shared secrets configured in fleet.nix or in flake193 pub async fn list_configured_shared(&self) -> Result<Vec<String>> {194 let config_field = &self.config_unchecked_field;195 nix_go!(config_field.configUnchecked.sharedSecrets)196 .list_fields()197 .await198 }199 /// Shared secrets configured in fleet.nix200 pub fn list_shared(&self) -> Vec<String> {201 let data = self.data();202 data.shared_secrets.keys().cloned().collect()203 }204 pub fn has_shared(&self, name: &str) -> bool {205 let data = self.data();206 data.shared_secrets.contains_key(name)207 }208 pub fn replace_shared(&self, name: String, shared: FleetSharedSecret) {209 let mut data = self.data_mut();210 data.shared_secrets.insert(name.to_owned(), shared);211 }212 pub fn remove_shared(&self, secret: &str) {213 let mut data = self.data_mut();214 data.shared_secrets.remove(secret);215 }216217 pub fn has_secret(&self, host: &str, secret: &str) -> bool {218 let data = self.data();219 let Some(host_secrets) = data.host_secrets.get(host) else {220 return false;221 };222 host_secrets.contains_key(secret)223 }224 pub fn insert_secret(&self, host: &str, secret: String, value: FleetSecret) {225 let mut data = self.data_mut();226 let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();227 host_secrets.insert(secret, value);228 }229230 pub async fn reencrypt_on_host(231 &self,232 host: &str,233 data: SecretData,234 targets: Vec<String>,235 ) -> Result<SecretData> {236 let mut recmd = MyCommand::new("fleet-install-secrets");237 recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());238 for target in targets {239 recmd.eqarg("--targets", target);240 }241 recmd = recmd.sudo().ssh(host);242 let encoded = recmd243 .run_string()244 .await245 .context("failed to call remote host for decrypt")?246 .trim()247 .to_owned();248 SecretData::decode_z85(&encoded)249 }250251 pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {252 let data = self.data();253 let Some(host_secrets) = data.host_secrets.get(host) else {254 bail!("no secrets for machine {host}");255 };256 let Some(secret) = host_secrets.get(secret) else {257 bail!("machine {host} has no secret {secret}");258 };259 Ok(secret.clone())260 }261 pub fn shared_secret(&self, secret: &str) -> Result<FleetSharedSecret> {262 let data = self.data();263 let Some(secret) = data.shared_secrets.get(secret) else {264 bail!("no shared secret {secret}");265 };266 Ok(secret.clone())267 }268 pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {269 let config_field = &self.config_unchecked_field;270 Ok(nix_go_json!(271 config_field.configUnchecked.sharedSecrets[{ secret }].expectedOwners272 ))273 }274275 pub fn save(&self) -> Result<()> {276 let mut tempfile = NamedTempFile::new_in(self.directory.clone())?;277 let data = nixlike::serialize(&self.data() as &FleetData)?;278 tempfile.write_all(279 format!(280 "# This file contains fleet state and shouldn't be edited by hand\n\n{}\n\n# vim: ts=2 et nowrap\n",281 data282 )283 .as_bytes(),284 )?;285 let mut fleet_data_path = self.directory.clone();286 fleet_data_path.push("fleet.nix");287 tempfile.persist(fleet_data_path)?;288 Ok(())289 }290}291292#[derive(Parser, Clone)]293#[clap(group = ArgGroup::new("target_hosts"))]294pub struct FleetOpts {295 /// All hosts except those would be skipped296 #[clap(long, number_of_values = 1, group = "target_hosts")]297 only: Vec<String>,298299 /// Hosts to skip300 #[clap(long, number_of_values = 1, group = "target_hosts")]301 skip: Vec<String>,302303 /// Host, which should be threaten as current machine304 #[clap(long)]305 pub localhost: Option<String>,306307 /// Override detected system for host, to perform builds via308 /// binfmt-declared qemu instead of trying to crosscompile309 #[clap(long, default_value = "detect")]310 pub local_system: String,311}312313impl FleetOpts {314 pub async fn build(mut self, nix_args: Vec<OsString>) -> Result<Config> {315 if self.localhost.is_none() {316 self.localhost317 .replace(hostname::get().unwrap().to_str().unwrap().to_owned());318 }319 let directory = current_dir()?;320321 let pool = NixSessionPool::new(directory.as_os_str().to_owned(), nix_args.clone()).await?;322 let root_field = pool.get().await?;323324 if self.local_system == "detect" {325 let builtins_field = Field::field(root_field.clone(), "builtins").await?;326 self.local_system = nix_go_json!(builtins_field.currentSystem);327 }328 let local_system = self.local_system.clone();329330 let fleet_root = Field::field(root_field, "fleetConfigurations").await?;331332 let fleet_field = nix_go!(fleet_root.default);333 let config_field = nix_go!(fleet_field.configUnchecked);334 let config_unchecked_field = nix_go!(fleet_field.unchecked);335336 let mut fleet_data_path = directory.clone();337 fleet_data_path.push("fleet.nix");338 let bytes = std::fs::read_to_string(fleet_data_path)?;339 let data = nixlike::parse_str(&bytes)?;340341 Ok(Config(Arc::new(FleetConfigInternals {342 opts: self,343 directory,344 data,345 local_system,346 nix_args,347 fleet_field,348 config_field,349 config_unchecked_field,350 })))351 }352}1use std::{2 env::current_dir,3 ffi::{OsStr, OsString},4 fmt::Display,5 io::Write,6 ops::Deref,7 path::PathBuf,8 str::FromStr,9 sync::{Arc, Mutex, MutexGuard, OnceLock},10};1112use anyhow::{anyhow, bail, Context, Result};13use clap::{ArgGroup, Parser};14use openssh::SessionBuilder;15use serde::de::DeserializeOwned;16use tempfile::NamedTempFile;1718use crate::{19 better_nix_eval::{Field, NixSessionPool},20 command::MyCommand,21 fleetdata::{FleetData, FleetSecret, FleetSharedSecret, SecretData},22 nix_go, nix_go_json,23};2425pub struct FleetConfigInternals {26 pub local_system: String,27 pub directory: PathBuf,28 pub opts: FleetOpts,29 pub data: Mutex<FleetData>,30 pub nix_args: Vec<OsString>,31 /// fleetConfigurations.<name>.<localSystem>32 pub fleet_field: Field,33 /// fleet_config.configUnchecked34 pub config_field: Field,35 /// fleet_config.unchecked36 pub config_unchecked_field: Field,37}3839#[derive(Clone)]40pub struct Config(Arc<FleetConfigInternals>);4142impl Deref for Config {43 type Target = FleetConfigInternals;4445 fn deref(&self) -> &Self::Target {46 &self.047 }48}4950pub struct ConfigHost {51 pub name: String,52 pub local: bool,53 pub session: OnceLock<Arc<openssh::Session>>,54}55impl ConfigHost {56 async fn open_session(&self) -> Result<Arc<openssh::Session>> {57 assert!(!self.local, "do not open ssh connection to local session");58 // FIXME: TOCTOU59 if let Some(session) = &self.session.get() {60 return Ok((*session).clone());61 };62 let session = SessionBuilder::default();6364 let session = session65 .connect(&self.name)66 .await67 .map_err(|e| anyhow!("ssh error: {e}"))?;68 let session = Arc::new(session);69 self.session.set(session.clone()).expect("TOCTOU happened");70 Ok(session)71 }72 pub async fn mktemp_dir(&self) -> Result<String> {73 let mut cmd = self.cmd("mktemp").await?;74 cmd.arg("-d");75 let path = cmd.run_string().await?;76 Ok(path.trim_end().to_owned())77 }78 pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {79 let mut cmd = self.cmd("cat").await?;80 cmd.arg(path);81 cmd.run_bytes().await82 }83 pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {84 let mut cmd = self.cmd("cat").await?;85 cmd.arg(path);86 cmd.run_string().await87 }88 pub async fn read_file_json<D: DeserializeOwned>(&self, path: impl AsRef<OsStr>) -> Result<D> {89 let text = self.read_file_text(path).await?;90 Ok(serde_json::from_str(&text)?)91 }92 pub async fn read_file_value<D: FromStr>(&self, path: impl AsRef<OsStr>) -> Result<D>93 where94 <D as FromStr>::Err: Display,95 {96 let text = self.read_file_text(path).await?;97 D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))98 }99 pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {100 if self.local {101 Ok(MyCommand::new(cmd))102 } else {103 let session = self.open_session().await?;104 Ok(MyCommand::new_on(cmd, session))105 }106 }107108 pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {109 let mut cmd = self.cmd("fleet-install-secrets").await?;110 cmd.arg("decrypt").eqarg("--secret", data.encode_z85());111 let encoded = cmd112 .sudo()113 .run_string()114 .await115 .context("failed to call remote host for decrypt")?;116 z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")117 }118 pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {119 let mut cmd = self.cmd("fleet-install-secrets").await?;120 cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());121 for target in targets {122 cmd.eqarg("--targets", target);123 }124 let encoded = cmd125 .sudo()126 .run_string()127 .await128 .context("failed to call remote host for decrypt")?;129 SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")130 }131 /// Returns path for futureproofing, as path might change i.e on conversion to CA132 pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {133 if self.local {134 // Path is located locally, thus already trusted.135 return Ok(path.to_owned());136 }137 let mut nix = MyCommand::new("nix");138 nix.arg("copy")139 .arg("--substitute-on-destination")140 .comparg("--to", format!("ssh-ng://{}", self.name))141 .arg(path);142 nix.run_nix().await?;143 Ok(path.to_owned())144 }145 pub async fn systemctl_stop(&self, name: &str) -> Result<()> {146 let mut cmd = self.cmd("systemctl").await?;147 cmd.arg("stop").arg(name);148 cmd.sudo().run().await149 }150 pub async fn systemctl_start(&self, name: &str) -> Result<()> {151 let mut cmd = self.cmd("systemctl").await?;152 cmd.arg("start").arg(name);153 cmd.sudo().run().await154 }155156 pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {157 let mut cmd = self.cmd("rm").await?;158 cmd.arg("-f").arg(path);159 if sudo {160 cmd = cmd.sudo()161 }162 cmd.run().await163 }164}165166impl Config {167 pub fn should_skip(&self, host: &str) -> bool {168 if !self.opts.skip.is_empty() {169 self.opts.skip.iter().any(|h| h as &str == host)170 } else if !self.opts.only.is_empty() {171 !self.opts.only.iter().any(|h| h as &str == host)172 } else {173 false174 }175 }176 pub fn is_local(&self, host: &str) -> bool {177 self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)178 }179180 pub async fn host(&self, name: &str) -> Result<ConfigHost> {181 Ok(ConfigHost {182 name: name.to_owned(),183 local: self.is_local(name),184 session: OnceLock::new(),185 })186 }187 pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {188 let fleet_field = &self.fleet_field;189 let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;190 let mut out = vec![];191 for name in names {192 out.push(ConfigHost {193 local: self.is_local(&name),194 name,195 session: OnceLock::new(),196 })197 }198 Ok(out)199 }200 pub async fn system_config(&self, host: &str) -> Result<Field> {201 let fleet_field = &self.fleet_field;202 Ok(nix_go!(fleet_field.configuredSystems[{ host }].config))203 }204205 pub(super) fn data(&self) -> MutexGuard<FleetData> {206 self.data.lock().unwrap()207 }208 pub(super) fn data_mut(&self) -> MutexGuard<FleetData> {209 self.data.lock().unwrap()210 }211 /// Shared secrets configured in fleet.nix or in flake212 pub async fn list_configured_shared(&self) -> Result<Vec<String>> {213 let config_field = &self.config_unchecked_field;214 nix_go!(config_field.configUnchecked.sharedSecrets)215 .list_fields()216 .await217 }218 /// Shared secrets configured in fleet.nix219 pub fn list_shared(&self) -> Vec<String> {220 let data = self.data();221 data.shared_secrets.keys().cloned().collect()222 }223 pub fn has_shared(&self, name: &str) -> bool {224 let data = self.data();225 data.shared_secrets.contains_key(name)226 }227 pub fn replace_shared(&self, name: String, shared: FleetSharedSecret) {228 let mut data = self.data_mut();229 data.shared_secrets.insert(name.to_owned(), shared);230 }231 pub fn remove_shared(&self, secret: &str) {232 let mut data = self.data_mut();233 data.shared_secrets.remove(secret);234 }235236 pub fn has_secret(&self, host: &str, secret: &str) -> bool {237 let data = self.data();238 let Some(host_secrets) = data.host_secrets.get(host) else {239 return false;240 };241 host_secrets.contains_key(secret)242 }243 pub fn insert_secret(&self, host: &str, secret: String, value: FleetSecret) {244 let mut data = self.data_mut();245 let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();246 host_secrets.insert(secret, value);247 }248249 pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {250 let data = self.data();251 let Some(host_secrets) = data.host_secrets.get(host) else {252 bail!("no secrets for machine {host}");253 };254 let Some(secret) = host_secrets.get(secret) else {255 bail!("machine {host} has no secret {secret}");256 };257 Ok(secret.clone())258 }259 pub fn shared_secret(&self, secret: &str) -> Result<FleetSharedSecret> {260 let data = self.data();261 let Some(secret) = data.shared_secrets.get(secret) else {262 bail!("no shared secret {secret}");263 };264 Ok(secret.clone())265 }266 pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {267 let config_field = &self.config_unchecked_field;268 Ok(nix_go_json!(269 config_field.configUnchecked.sharedSecrets[{ secret }].expectedOwners270 ))271 }272273 pub fn save(&self) -> Result<()> {274 let mut tempfile = NamedTempFile::new_in(self.directory.clone())?;275 let data = nixlike::serialize(&self.data() as &FleetData)?;276 tempfile.write_all(277 format!(278 "# This file contains fleet state and shouldn't be edited by hand\n\n{}\n\n# vim: ts=2 et nowrap\n",279 data280 )281 .as_bytes(),282 )?;283 let mut fleet_data_path = self.directory.clone();284 fleet_data_path.push("fleet.nix");285 tempfile.persist(fleet_data_path)?;286 Ok(())287 }288}289290#[derive(Parser, Clone)]291#[clap(group = ArgGroup::new("target_hosts"))]292pub struct FleetOpts {293 /// All hosts except those would be skipped294 #[clap(long, number_of_values = 1, group = "target_hosts")]295 only: Vec<String>,296297 /// Hosts to skip298 #[clap(long, number_of_values = 1, group = "target_hosts")]299 skip: Vec<String>,300301 /// Host, which should be threaten as current machine302 #[clap(long)]303 pub localhost: Option<String>,304305 /// Override detected system for host, to perform builds via306 /// binfmt-declared qemu instead of trying to crosscompile307 #[clap(long, default_value = "detect")]308 pub local_system: String,309}310311impl FleetOpts {312 pub async fn build(mut self, nix_args: Vec<OsString>) -> Result<Config> {313 if self.localhost.is_none() {314 self.localhost315 .replace(hostname::get().unwrap().to_str().unwrap().to_owned());316 }317 let directory = current_dir()?;318319 let pool = NixSessionPool::new(directory.as_os_str().to_owned(), nix_args.clone()).await?;320 let root_field = pool.get().await?;321322 if self.local_system == "detect" {323 let builtins_field = Field::field(root_field.clone(), "builtins").await?;324 self.local_system = nix_go_json!(builtins_field.currentSystem);325 }326 let local_system = self.local_system.clone();327328 let fleet_root = Field::field(root_field, "fleetConfigurations").await?;329330 let fleet_field = nix_go!(fleet_root.default);331 let config_field = nix_go!(fleet_field.configUnchecked);332 let config_unchecked_field = nix_go!(fleet_field.unchecked);333334 let mut fleet_data_path = directory.clone();335 fleet_data_path.push("fleet.nix");336 let bytes = std::fs::read_to_string(fleet_data_path)?;337 let data = nixlike::parse_str(&bytes)?;338339 Ok(Config(Arc::new(FleetConfigInternals {340 opts: self,341 directory,342 data,343 local_system,344 nix_args,345 fleet_field,346 config_field,347 config_unchecked_field,348 })))349 }350}cmds/fleet/src/keys.rsdiffbeforeafterboth--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
use std::str::FromStr;
-use crate::command::MyCommand;
use crate::host::Config;
use age::Recipient;
use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
Ok(key)
} else {
warn!("Loading key for {}", host);
- let mut cmd = MyCommand::new("cat");
+ let host = self.host(host).await?;
+ let mut cmd = host.cmd("cat").await?;
cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
- let key = self.run_string_on(host, cmd, false).await?;
- self.update_key(host, key.clone());
+ let key = cmd.run_string().await?;
+ self.update_key(&host.name, key.clone());
Ok(key)
}
}
cmds/remowt-agent/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
cmds/remowt-agent/README.adocdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/README.adoc
@@ -0,0 +1,16 @@
+= Remowt agent
+
+Working with remote machine programmatically is not always easy.
+
+Sure, you have ssh, sftp, and that kind of fancy stuff, but what about minimal distributions, routers?
+
+Well, sftp can be replaced with FISH... But what if remote machine isn't accessible over ssh at all? What if the only communication channel you have is uart?
+
+What if remote host has not enough tools to implement the functionality you need?
+
+Remowt is intended to solve this in a way similar to how some RAT toolkits (I.e metasploit) do - you inject minimal agent, setup some communication channel to it (stdio perhaps?), and then you deploy payloads on it, and the payloads perform the actual work.
+
+== Non-targets
+
+Minimal executable size:: As long as it transferred only once, it shouldn't be a problem to keep it a reasonable size.
+Be stealthy:: As it solves the problem almost the same way as metasploit, it is possible to use it as something bad, but this is not the remowt intended purpose, and never will be.
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+ println!("Hello, world!");
+}
crates/better-command/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
crates/better-command/src/handler.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/handler.rs
@@ -0,0 +1,305 @@
+//! Collection of handlers, which transform program-specific stdout format to tracing
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use tracing::{Span, info, warn, info_span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+
+pub trait Handler: Send {
+ fn handle_line(&mut self, e: &str);
+}
+
+/// Handler wrapper, which can be cloned.
+pub struct ClonableHandler<H>(Arc<Mutex<H>>);
+impl<H> Clone for ClonableHandler<H> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+impl<H> ClonableHandler<H> {
+ pub fn new(inner: H) -> Self {
+ Self(Arc::new(Mutex::new(inner)))
+ }
+}
+impl<H: Handler> Handler for ClonableHandler<H> {
+ fn handle_line(&mut self, e: &str) {
+ self.0.lock().unwrap().handle_line(e)
+ }
+}
+
+/// Converts command output to tracing lines
+pub struct PlainHandler;
+impl Handler for PlainHandler {
+ fn handle_line(&mut self, e: &str) {
+ info!(target: "log", "{e}");
+ }
+}
+
+/// Ignores output
+pub struct NoopHandler;
+impl Handler for NoopHandler {
+ fn handle_line(&mut self, _e: &str) {}
+}
+
+/// Transform nix internal-json logs to tracing spans.
+#[derive(Default)]
+pub struct NixHandler {
+ spans: HashMap<u64, Span>,
+}
+#[derive(Deserialize, Debug)]
+#[serde(untagged)]
+enum LogField {
+ String(String),
+ Num(u64),
+}
+
+/// Nix internal-json log line type
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", tag = "action")]
+#[allow(dead_code)]
+enum NixLog {
+ Msg {
+ level: u32,
+ msg: String,
+ raw_msg: Option<String>,
+ },
+ Start {
+ id: u64,
+ level: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ text: String,
+ #[serde(rename = "type")]
+ typ: u32,
+ },
+ Stop {
+ id: u64,
+ },
+ Result {
+ id: u64,
+ #[serde(rename = "type")]
+ typ: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ },
+}
+fn process_message(m: &str) -> String {
+ // Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.
+ static OSC_CLEANER: Lazy<Regex> =
+ Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+ static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+ let m = OSC_CLEANER.replace_all(m, "");
+ // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+ // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+ DETABBER.replace_all(m.as_ref(), " ").to_string()
+}
+impl Handler for NixHandler {
+ fn handle_line(&mut self, e: &str) {
+ if let Some(e) = e.strip_prefix("@nix ") {
+ let log: NixLog = match serde_json::from_str(e) {
+ Ok(l) => l,
+ Err(err) => {
+ warn!("failed to parse nix log line {:?}: {}", e, err);
+ return;
+ }
+ };
+ match log {
+ NixLog::Msg { msg, raw_msg, .. } => {
+ #[allow(clippy::nonminimal_bool)]
+ if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+ && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+ && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+ if let Some(raw_msg) = raw_msg {
+ if !msg.is_empty() {
+ info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
+ } else {
+ info!(target: "nix", "{}", raw_msg.trim_end())
+ }
+ } else {
+ info!(target: "nix", "{}", msg.trim_end())
+ }
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 105 && !fields.is_empty() => {
+ if let [LogField::String(drv), ..] = &fields[..] {
+ let mut drv = drv.as_str();
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ info!(target: "nix","building {}", drv);
+ let span = info_span!("build", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad build log: {:?}", log)
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 100 && fields.len() >= 3 => {
+ if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
+ &fields[..]
+ {
+ let mut drv = drv.as_str();
+
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ // info!(target: "nix","copying {} {} -> {}", drv, from, to);
+ let span = info_span!("copy", from, to, drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad copy log: {:?}", log)
+ }
+ }
+ NixLog::Start { text, typ, id, .. }
+ if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
+ {
+ if !text.is_empty()
+ && text != "querying info about missing paths"
+ && text != "copying 0 paths"
+ // Too much spam on lazy-trees branch
+ && !(text.starts_with("copying '") && text.ends_with("' to the store"))
+ {
+ let span = info_span!("job");
+ span.pb_start();
+ span.pb_set_message(&process_message(text.trim()));
+ self.spans.insert(id, span);
+ info!(target: "nix", "{}", text);
+ }
+ }
+ NixLog::Start {
+ text,
+ level: 0,
+ typ: 108,
+ ..
+ } if text.is_empty() => {
+ // Cache lookup? Coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 109,
+ ..
+ } if text.starts_with("querying info about ") => {
+ // Cache lookup
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 101,
+ ..
+ } if text.starts_with("downloading ") => {
+ // NAR downloading, coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ ..
+ } if text.starts_with("waiting for a machine to build ") => {
+ // Useless repeating notification about build
+ }
+ NixLog::Start {
+ text,
+ level: 3,
+ typ: 111,
+ ..
+ } if text.starts_with("resolved derivation: ") => {
+ // CA resolved
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ id,
+ ..
+ } if text.starts_with("waiting for lock on ") => {
+ let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
+ if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.split("', '").next() {
+ drv = txt;
+ }
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ let span = info_span!("waiting on drv", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ // Concurrent build of the same message
+ }
+ NixLog::Stop { id, .. } => {
+ self.spans.remove(&id);
+ }
+ NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
+ if let Some(span) = self.spans.get(&id) {
+ if let LogField::String(s) = &fields[0] {
+ span.pb_set_message(&process_message(s.trim()));
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ warn!("unknown result id: {id} {typ} {fields:?}");
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
+ if let Some(span) = self.spans.get(&id) {
+ if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
+ &fields[..4]
+ {
+ span.pb_set_length(*expected);
+ span.pb_set_position(*done);
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ // warn!("unknown result id: {id} {typ} {fields:?}");
+ // Unaccounted progress.
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
+ // Set phase, expected
+ }
+ _ => warn!("unknown log: {:?}", log),
+ };
+ } else {
+ let e = e.trim();
+ if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
+ return;
+ }
+ info!("{e}")
+ }
+ }
+}
crates/better-command/src/lib.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn it_works() {
+ let result = add(2, 2);
+ assert_eq!(result, 4);
+ }
+}