difftreelog
refactor perform build using nix repl
in: trunk
8 files changed
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,5 +1,7 @@
+use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
-use std::fmt::Display;
+use std::fmt::{self, Display};
+use std::path::PathBuf;
use std::process::Stdio;
use std::sync::{Arc, OnceLock};
@@ -8,7 +10,7 @@
use itertools::Itertools;
use r2d2::{Pool, PooledConnection};
use serde::de::DeserializeOwned;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;
use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
use tokio::select;
@@ -72,14 +74,20 @@
// s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)
// }
if !self.collected.is_empty() {
- bail!("{}", self.collected.iter().map(|v| {
- if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {
- let v = unindent::unindent(f.trim_start());
- v.trim().to_owned()
- } else {
- v.to_owned()
- }
- }).join("\n"));
+ bail!(
+ "{}",
+ self.collected
+ .iter()
+ .map(|v| {
+ if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {
+ let v = unindent::unindent(f.trim_start());
+ v.trim().to_owned()
+ } else {
+ v.to_owned()
+ }
+ })
+ .join("\n")
+ );
}
Ok(())
}
@@ -150,6 +158,13 @@
}
}
+struct WarnHandler;
+impl Handler for WarnHandler {
+ fn handle_line(&mut self, e: &str) {
+ warn!(target: "nix", "{e}")
+ }
+}
+
impl NixSessionInner {
async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {
let mut cmd = Command::new("nix");
@@ -174,12 +189,13 @@
stdin.flush().await?;
let nix_handler = NixHandler::default();
let mut full_delimiter = None;
+ let mut errors = vec![];
while let Some(line) = out.next().await {
let line = match line {
OutputLine::Out(o) => o,
OutputLine::Err(_e) => {
// Handle startup errors, but skip repl hello?
- //nix_handler.handle_line(&e);
+ errors.push(_e);
continue;
}
};
@@ -190,6 +206,9 @@
}
}
let Some(full_delimiter) = full_delimiter else {
+ for e in errors {
+ error!("{e}");
+ }
bail!("failed to discover delimiter");
};
let mut res = Self {
@@ -342,21 +361,93 @@
#[derive(Clone)]
pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);
+#[macro_export]
+macro_rules! nix_path {
+ (@o($o:ident) $var:ident $($tt:tt)*) => {{
+ $o.push(Index::var(stringify!($var)));
+ nix_path!(@o($o) $($tt)*);
+ }};
+ (@o($o:ident) . $var:ident $($tt:tt)*) => {{
+ $o.push(Index::attr(stringify!($var)));
+ nix_path!(@o($o) $($tt)*);
+ }};
+ (@o($o:ident) . $var:literal $($tt:tt)*) => {{
+ $o.push(Index::attr($var));
+ nix_path!(@o($o) $($tt)*);
+ }};
+ (@o($o:ident) . { $var:expr } $($tt:tt)*) => {{
+ $o.push(Index::attr($var));
+ nix_path!(@o($o) $($tt)*);
+ }};
+ (@o($o:ident) [ $var:literal ] $($tt:tt)*) => {{
+ $o.push(Index::idx($var));
+ nix_path!(@o($o) $($tt)*);
+ }};
+ (@o($o:ident) ($e:expr) $($tt:tt)*) => {
+ $o.push(Index::apply($e));
+ nix_path!(@o($o) $($tt)*);
+ };
+ (@o($o:ident)) => {};
+ ($($tt:tt)+) => {{
+ use $crate::{nix_path, better_nix_eval::Index};
+ let mut out = vec![];
+ nix_path!(@o(out) $($tt)*);
+ out
+ }}
+}
+
#[derive(Clone)]
-enum Index {
+pub enum Index {
+ Var(String),
String(String),
- // Idx(u32),
+ Apply(String),
+ Idx(u32),
}
+impl Index {
+ pub fn var(v: impl AsRef<str>) -> Self {
+ let v = v.as_ref();
+ assert!(
+ !(v.contains('.') | v.contains(' ')),
+ "bad variable name: {v}"
+ );
+ Self::Var(v.to_owned())
+ }
+ pub fn attr(v: impl AsRef<str>) -> Self {
+ Self::String(v.as_ref().to_owned())
+ }
+ pub fn idx(v: u32) -> Self {
+ Self::Idx(v)
+ }
+ pub fn apply(v: impl Serialize) -> Self {
+ let serialized = nixlike::serialize(v).expect("invalid value for apply");
+ Self::Apply(serialized)
+ }
+}
impl Display for Index {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
+ Index::Var(v) => {
+ write!(f, "{v}")
+ }
Index::String(k) => {
let v = nixlike::format_identifier(k.as_str());
write!(f, ".{v}")
}
+ Index::Apply(o) => {
+ let v = nixlike::serialize(o).map_err(|_| fmt::Error)?;
+ write!(f, "<apply>({v})")
+ }
+ Index::Idx(i) => {
+ write!(f, "[{i}]")
+ }
}
}
}
+impl fmt::Debug for Index {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "{self}")
+ }
+}
struct PathDisplay<'i>(&'i [Index]);
impl Display for PathDisplay<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -381,43 +472,49 @@
}
}
pub async fn field(session: NixSession, field: &str) -> Result<Self> {
- Self::root(session).get_field_deep([field]).await
+ Self::root(session)
+ .select([Index::var(field)])
+ .await
}
pub async fn get_json_deep<'a, V: DeserializeOwned>(
&self,
- name: impl IntoIterator<Item = &'a str>,
+ name: impl IntoIterator<Item = Index>,
) -> Result<V> {
- let field = self.get_field_deep(name).await?;
+ let field = self.select(name).await?;
field.as_json().await
}
- pub async fn get_field(&self, name: &str) -> Result<Self> {
- self.get_field_deep([name]).await
- }
- pub async fn get_field_deep<'a>(
- &self,
- name: impl IntoIterator<Item = &'a str>,
- ) -> Result<Self> {
- let mut iter = name.into_iter();
+ pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {
+ let mut name = name.into_iter();
let mut full_path = self.full_path.clone();
let mut query = if let Some(id) = self.value {
format!("sess_field_{id}")
} else {
- let first = iter.next().expect("name not empty");
- ensure!(
- !(first.contains('.') | first.contains(' ')),
- "bad name for root query: {first}"
- );
- full_path.push(Index::String(first.to_string()));
- first.to_string()
+ let first = name.next();
+ if let Some(Index::Var(i)) = first {
+ full_path.push(Index::Var(i.clone()));
+ i.clone()
+ } else {
+ panic!("first path item should be variable, got {first:?}")
+ }
};
- for v in iter {
- full_path.push(Index::String(v.to_string()));
- // Escape
- let escaped = nixlike::serialize(v)?;
- let escaped = escaped.trim();
- query.push('.');
- query.push_str(escaped);
+ for v in name {
+ full_path.push(v.clone());
+ match v {
+ Index::Var(_) => panic!("var item may only be first"),
+ Index::String(s) => {
+ let escaped = nixlike::serialize(s)?;
+ query.push('.');
+ query.push_str(escaped.trim());
+ }
+ Index::Apply(a) => {
+ query.push(' ');
+ query.push_str(&a);
+ }
+ Index::Idx(idx) => {
+ query = format!("builtins.elemAt ({query}) {idx}");
+ }
+ }
}
let vid = self
@@ -454,6 +551,28 @@
.await
.with_context(|| format!("full path: {}", PathDisplay(&self.full_path)))
}
+ pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {
+ let id = self.value.expect("can't use build on not-value");
+ let vid = self
+ .session
+ .0
+ .lock()
+ .await
+ .execute_expression_raw(&format!(":b sess_field_{id}"), &mut NixHandler::default())
+ .await?;
+ ensure!(!vid.is_empty(), "build failed");
+ let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")
+ else {
+ panic!("unexpected build output: {vid:?}");
+ };
+ let outputs = vid
+ .split('\n')
+ .filter(|v| !v.is_empty())
+ .map(|v| v.split_once(" -> ").expect("unexpected build output"))
+ .map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))
+ .collect();
+ Ok(outputs)
+ }
}
impl Drop for Field {
fn drop(&mut self) {
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -1,8 +1,10 @@
+use std::os::unix::fs::symlink;
use std::path::PathBuf;
use std::{env::current_dir, time::Duration};
use crate::command::MyCommand;
use crate::host::Config;
+use crate::nix_path;
use anyhow::{anyhow, Result};
use clap::Parser;
use itertools::Itertools;
@@ -11,15 +13,9 @@
#[derive(Parser, Clone)]
pub struct BuildSystems {
- /// Do not continue on error
- #[clap(long)]
- fail_fast: bool,
/// Disable automatic rollback
#[clap(long)]
disable_rollback: bool,
- /// Run builds as sudo
- #[clap(long)]
- privileged_build: bool,
#[clap(subcommand)]
subcommand: Subcommand,
}
@@ -294,34 +290,11 @@
async fn build_task(self, config: Config, host: String) -> Result<()> {
info!("building");
let action = Action::from(self.subcommand.clone());
- let built = {
- let dir = tempfile::tempdir()?;
- dir.path().to_owned()
- };
-
- let mut nix_build = MyCommand::new("nix");
- nix_build
- .args([
- "build",
- "--impure",
- "--json",
- // "--show-trace",
- "--no-link",
- ])
- .comparg("--out-link", &built)
- .arg(
- config.configuration_attr_name(&format!(
- "buildSystems.{}.{host}",
- action.build_attr()
- )),
- )
- .args(&config.nix_args);
-
- if self.privileged_build {
- nix_build = nix_build.sudo();
- }
-
- nix_build.run_nix().await.map_err(|e| {
+ let drv = config
+ .fleet_field
+ .select(nix_path!(.buildSystems.{action.build_attr()}.{&host}))
+ .await?;
+ let outputs = drv.build().await.map_err(|e| {
if action.build_attr() == "sdImage" {
info!("sd-image build failed");
info!("Make sure you have imported modulesPath/installer/sd-card/sd-image-<arch>[-installer].nix (For installer, you may want to check config)");
@@ -329,7 +302,9 @@
}
e
})?;
- let built = std::fs::canonicalize(built)?;
+ let out_output = outputs
+ .get("out")
+ .ok_or_else(|| anyhow!("system build should produce \"out\" output"))?;
match action {
Action::Upload { action } => {
@@ -342,7 +317,7 @@
.arg("sign")
.comparg("--key-file", "/etc/nix/private-key")
.arg("-r")
- .arg(&built);
+ .arg(out_output);
if let Err(e) = sign.sudo().run_nix().await {
warn!("Failed to sign store paths: {e}");
};
@@ -353,7 +328,7 @@
nix.arg("copy")
.arg("--substitute-on-destination")
.comparg("--to", format!("ssh-ng://{host}"))
- .arg(&built);
+ .arg(out_output);
match nix.run_nix().await {
Ok(()) => break,
Err(e) if tries < 3 => {
@@ -366,53 +341,22 @@
}
}
if let Some(action) = action {
- execute_upload(&self, &config, action, &host, built).await?
+ execute_upload(&self, &config, action, &host, out_output.clone()).await?
}
}
Action::Package(PackageAction::SdImage) => {
let mut out = current_dir()?;
out.push(format!("sd-image-{}", host));
- info!("building sd image to {:?}", out);
- let mut nix_build = MyCommand::new("nix");
- nix_build
- .args(["build", "--impure", "--no-link"])
- .comparg("--out-link", &out)
- .arg(config.configuration_attr_name(&format!("buildSystems.sdImage.{}", host,)))
- .args(&config.nix_args);
- if !self.fail_fast {
- nix_build.arg("--keep-going");
- }
- if self.privileged_build {
- nix_build = nix_build.sudo();
- }
-
- nix_build.run_nix().await?;
+ info!("linking sd image to {:?}", out);
+ symlink(out_output, out)?;
}
Action::Package(PackageAction::InstallationCd) => {
let mut out = current_dir()?;
out.push(format!("installation-cd-{}", host));
- info!("building sd image to {:?}", out);
- let mut nix_build = MyCommand::new("nix");
- nix_build
- .args(["build", "--impure", "--no-link"])
- .comparg("--out-link", &out)
- .arg(
- config.configuration_attr_name(&format!(
- "buildSystems.installationCd.{}",
- host,
- )),
- )
- .args(&config.nix_args);
- if !self.fail_fast {
- nix_build.arg("--keep-going");
- }
- if self.privileged_build {
- nix_build = nix_build.sudo();
- }
-
- nix_build.run_nix().await?;
+ info!("linking iso image to {:?}", out);
+ symlink(out_output, out)?;
}
};
Ok(())
cmds/fleet/src/cmds/info.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/info.rs
+++ b/cmds/fleet/src/cmds/info.rs
@@ -1,6 +1,7 @@
use std::collections::BTreeSet;
use crate::host::Config;
+use crate::nix_path;
use anyhow::{ensure, Result};
use clap::Parser;
@@ -38,7 +39,7 @@
if !tagged.is_empty() {
let tags: Vec<String> = config
.fleet_field
- .get_field_deep(["configuredSystems", &host.name, "config", "tags"])
+ .select(nix_path!(.configuredSystems.{&host.name}.config.tags))
.await?
.as_json()
.await?;
@@ -64,7 +65,7 @@
let host = config.system_config(&host).await?;
if external {
out.extend(
- host.get_field_deep(["network", "externalIps"])
+ host.select(nix_path!(.network.externalIps))
.await?
.as_json::<Vec<String>>()
.await?,
@@ -72,7 +73,7 @@
}
if internal {
out.extend(
- host.get_field_deep(["network", "internalIps"])
+ host.select(nix_path!(.network.internalIps))
.await?
.as_json::<Vec<String>>()
.await?,
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -1,6 +1,6 @@
use crate::{
fleetdata::{FleetSecret, FleetSharedSecret},
- host::Config,
+ host::Config, nix_path,
};
use anyhow::{bail, ensure, Context, Result};
use chrono::Utc;
@@ -339,7 +339,7 @@
let mut data = config.shared_secret(name)?;
let expected_owners: Vec<String> = config
.config_field
- .get_json_deep(["sharedSecrets", name, "expectedOwners"])
+ .get_json_deep(nix_path!(sharedSecrets.{name}.expectedOwners))
.await?;
if expected_owners.is_empty() {
warn!("secret was removed from fleet config: {name}, removing from data");
@@ -352,7 +352,7 @@
if set != expected_set {
let owner_dependent: bool = config
.config_field
- .get_json_deep(["sharedSecrets", name, "ownerDependent"])
+ .get_json_deep(nix_path!(.sharedSecrets.{name}.ownerDependent))
.await?;
if !owner_dependent {
warn!("reencrypting secret '{name}' for new owner set");
cmds/fleet/src/command.rsdiffbeforeafterboth1use std::{2 borrow::Cow,3 collections::HashMap,4 ffi::OsStr,5 process::Stdio,6 sync::{Arc, Mutex},7 task::Poll,8};910use anyhow::{anyhow, Result};11use futures::StreamExt;12use itertools::Either;13use once_cell::sync::Lazy;14use openssh::{OverSsh, Session};15use regex::Regex;16use serde::{de::Visitor, Deserialize};17use tokio::{io::AsyncRead, process::Command, select};18use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};19use tracing::{info, info_span, warn, Span};20use tracing_indicatif::span_ext::IndicatifSpanExt;2122fn escape_bash(input: &str, out: &mut String) {23 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";24 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {25 out.push_str(input);26 return;27 }28 out.push('\'');29 for (i, v) in input.split('\'').enumerate() {30 if i != 0 {31 out.push_str("'\"'\"'");32 }33 out.push_str(v);34 }35 out.push('\'');36}37fn ostoutf8(os: impl AsRef<OsStr>) -> String {38 os.as_ref().to_str().expect("non-utf8 data").to_owned()39}40#[derive(Clone)]41pub struct MyCommand {42 command: String,43 args: Vec<String>,44 env: Vec<(String, String)>,45 ssh_session: Option<Arc<Session>>,46}47impl MyCommand {48 pub fn new(cmd: impl AsRef<OsStr>) -> Self {49 assert!(!cmd.as_ref().is_empty());50 Self {51 command: ostoutf8(cmd),52 args: vec![],53 env: vec![],54 ssh_session: None,55 }56 }57 fn into_args(self) -> Vec<String> {58 let mut out = Vec::new();59 if !self.env.is_empty() {60 out.push("env".to_owned());61 for (k, v) in self.env {62 assert!(!k.contains('='));63 out.push(format!("{k}={v}"));64 }65 }66 out.push(self.command);67 out.extend(self.args);68 out69 }70 fn into_string(self) -> String {71 let mut out = String::new();72 if !self.env.is_empty() {73 out.push_str("env");74 for (k, v) in self.env {75 out.push(' ');76 assert!(!k.contains('='));77 escape_bash(&k, &mut out);78 out.push('=');79 escape_bash(&v, &mut out);80 }81 }82 if !out.is_empty() {83 out.push(' ');84 }85 escape_bash(&self.command, &mut out);86 for arg in self.args {87 out.push(' ');88 escape_bash(&arg, &mut out);89 }90 out91 }92 fn into_command(self) -> Command {93 let mut out = Command::new(self.command);94 out.args(self.args);95 for (k, v) in self.env {96 out.env(k, v);97 }98 out99 }100 fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {101 Ok(if let Some(session) = self.ssh_session.clone() {102 let cmd = self.into_command();103 Either::Right(104 cmd.over_ssh(session)105 .map_err(|e| anyhow!("ssh error: {e}"))?,106 )107 } else {108 let cmd = self.into_command();109 Either::Left(cmd)110 })111 }112 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {113 let arg = arg.as_ref();114 self.args.push(ostoutf8(arg));115 self116 }117 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {118 let arg = arg.as_ref();119 let value = value.as_ref();120 let arg = ostoutf8(arg);121 let value = ostoutf8(value);122 self.arg(format!("{arg}={value}"));123 self124 }125 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {126 self.arg(arg);127 self.arg(value);128 self129 }130 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {131 for arg in args.into_iter() {132 let arg = arg.as_ref();133 self.args.push(ostoutf8(arg));134 }135 self136 }137 pub fn sudo(self) -> Self {138 if std::env::var_os("NO_SUDO").is_some() {139 let mut out = Self::new("su");140 out.arg("-c").arg(self.into_string());141 out142 } else {143 let mut out = Self::new("sudo");144 out.args(self.into_args());145 out146 }147 }148 pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {149 let mut out = Self::new("ssh");150 out.arg(on).arg("--");151 out.arg(self.into_string());152 out153 }154 pub fn over_ssh(mut self, session: Arc<Session>) -> Self {155 self.ssh_session = Some(session);156 self157 }158159 pub async fn run(self) -> Result<()> {160 let str = self.clone().into_string();161 let cmd = self.into_command();162 run_nix_inner(str, cmd, &mut PlainHandler).await?;163 Ok(())164 }165 pub async fn run_string(self) -> Result<String> {166 let str = self.clone().into_string();167 let cmd = self.into_command();168 let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;169 Ok(v)170 }171172 pub async fn run_nix_string(self) -> Result<String> {173 let str = self.clone().into_string();174 let mut cmd = self.into_command();175 cmd.arg("--log-format").arg("internal-json");176 run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await177 }178 pub async fn run_nix(self) -> Result<()> {179 let str = self.clone().into_string();180 let mut cmd = self.into_command();181 cmd.arg("--log-format").arg("internal-json");182 cmd.stdout(Stdio::inherit());183 run_nix_inner(str, cmd, &mut NixHandler::default()).await184 }185}186187struct EmptyAsyncRead;188impl AsyncRead for EmptyAsyncRead {189 fn poll_read(190 self: std::pin::Pin<&mut Self>,191 _cx: &mut std::task::Context<'_>,192 _buf: &mut tokio::io::ReadBuf<'_>,193 ) -> Poll<std::io::Result<()>> {194 Poll::Pending195 }196}197198async fn run_nix_inner_stdout(199 str: String,200 cmd: Command,201 handler: &mut dyn Handler,202) -> Result<String> {203 Ok(run_nix_inner_raw(str, cmd, true, handler, None)204 .await?205 .expect("has out"))206}207async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {208 let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;209 assert!(v.is_none());210 Ok(())211}212213pub trait Handler: Send {214 fn handle_line(&mut self, e: &str);215}216217pub struct ClonableHandler<H>(Arc<Mutex<H>>);218impl<H> Clone for ClonableHandler<H> {219 fn clone(&self) -> Self {220 Self(self.0.clone())221 }222}223impl<H> ClonableHandler<H> {224 pub fn new(inner: H) -> Self {225 Self(Arc::new(Mutex::new(inner)))226 }227}228impl<H: Handler> Handler for ClonableHandler<H> {229 fn handle_line(&mut self, e: &str) {230 self.0.lock().unwrap().handle_line(e)231 }232}233234struct PlainHandler;235impl Handler for PlainHandler {236 fn handle_line(&mut self, e: &str) {237 info!(target: "log", "{e}");238 }239}240241pub struct NoopHandler;242impl Handler for NoopHandler {243 fn handle_line(&mut self, _e: &str) {}244}245246#[derive(Default)]247pub struct NixHandler {248 spans: HashMap<u64, Span>,249}250fn process_message(m: &str) -> Cow<'_, str> {251 static OSC_CLEANER: Lazy<Regex> =252 Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());253 OSC_CLEANER.replace_all(m, "")254}255impl Handler for NixHandler {256 fn handle_line(&mut self, e: &str) {257 if let Some(e) = e.strip_prefix("@nix ") {258 let log: NixLog = match serde_json::from_str(e) {259 Ok(l) => l,260 Err(err) => {261 warn!("failed to parse nix log line {:?}: {}", e, err);262 return;263 }264 };265 match log {266 NixLog::Msg { msg, raw_msg, .. } => {267 #[allow(clippy::nonminimal_bool)]268 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))269 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")270 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {271 if let Some(raw_msg) = raw_msg {272 if !msg.is_empty() {273 info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())274 } else {275 info!(target: "nix", "{}", raw_msg.trim_end())276 }277 } else {278 info!(target: "nix", "{}", msg.trim_end())279 }280 }281 }282 NixLog::Start {283 ref fields,284 typ,285 id,286 ..287 } if typ == 105 && !fields.is_empty() => {288 if let [LogField::String(drv), ..] = &fields[..] {289 let mut drv = drv.as_str();290 if let Some(pkg) = drv.strip_prefix("/nix/store/") {291 let mut it = pkg.splitn(2, '-');292 it.next();293 if let Some(pkg) = it.next() {294 drv = pkg;295 }296 }297 info!(target: "nix","building {}", drv);298 let span = info_span!("build", drv);299 span.pb_start();300 self.spans.insert(id, span);301 } else {302 warn!("bad build log: {:?}", log)303 }304 }305 NixLog::Start {306 ref fields,307 typ,308 id,309 ..310 } if typ == 100 && fields.len() >= 3 => {311 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =312 &fields[..]313 {314 let mut drv = drv.as_str();315316 if let Some(pkg) = drv.strip_prefix("/nix/store/") {317 let mut it = pkg.splitn(2, '-');318 it.next();319 if let Some(pkg) = it.next() {320 drv = pkg;321 }322 }323 // info!(target: "nix","copying {} {} -> {}", drv, from, to);324 let span = info_span!("copy", from, to, drv);325 span.pb_start();326 self.spans.insert(id, span);327 } else {328 warn!("bad copy log: {:?}", log)329 }330 }331 NixLog::Start { text, typ, id, .. }332 if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>333 {334 if !text.is_empty()335 && text != "querying info about missing paths"336 && text != "copying 0 paths"337 {338 let span = info_span!("job");339 span.pb_start();340 span.pb_set_message(&process_message(text.trim()));341 self.spans.insert(id, span);342 info!(target: "nix", "{}", text);343 }344 }345 NixLog::Start {346 text,347 level: 0,348 typ: 108,349 ..350 } if text.is_empty() => {351 // Cache lookup? Coupled with copy log352 }353 NixLog::Start {354 text,355 level: 4,356 typ: 109,357 ..358 } if text.starts_with("querying info about ") => {359 // Cache lookup360 }361 NixLog::Start {362 text,363 level: 4,364 typ: 101,365 ..366 } if text.starts_with("downloading ") => {367 // NAR downloading, coupled with copy log368 }369 NixLog::Start {370 text,371 level: 1,372 typ: 111,373 ..374 } if text.starts_with("waiting for a machine to build ") => {375 // Useless repeating notification about build376 }377 NixLog::Start {378 text,379 level: 3,380 typ: 111,381 ..382 } if text.starts_with("resolved derivation: ") => {383 // CA resolved384 }385 NixLog::Start {386 text,387 level: 1,388 typ: 111,389 id,390 ..391 } if text.starts_with("waiting for lock on ") => {392 let mut drv = text.strip_prefix("waiting for lock on ").unwrap();393 if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {394 drv = txt;395 }396 if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {397 drv = txt;398 }399 if let Some(txt) = drv.split("', '").next() {400 drv = txt;401 }402 if let Some(pkg) = drv.strip_prefix("/nix/store/") {403 let mut it = pkg.splitn(2, '-');404 it.next();405 if let Some(pkg) = it.next() {406 drv = pkg;407 }408 }409 let span = info_span!("waiting on drv", drv);410 span.pb_start();411 self.spans.insert(id, span);412 // Concurrent build of the same message413 }414 NixLog::Stop { id, .. } => {415 self.spans.remove(&id);416 }417 NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {418 if let Some(span) = self.spans.get(&id) {419 if let LogField::String(s) = &fields[0] {420 span.pb_set_message(&process_message(s.trim()));421 } else {422 warn!("bad fields: {fields:?}");423 }424 } else {425 warn!("unknown result id: {id} {typ} {fields:?}");426 }427 // dbg!(fields, id, typ);428 }429 NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {430 if let Some(span) = self.spans.get(&id) {431 if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =432 &fields[..4]433 {434 span.pb_set_length(*expected);435 span.pb_set_position(*done);436 } else {437 warn!("bad fields: {fields:?}");438 }439 } else {440 // warn!("unknown result id: {id} {typ} {fields:?}");441 // Unaccounted progress.442 }443 // dbg!(fields, id, typ);444 }445 NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {446 // Set phase, expected447 }448 _ => warn!("unknown log: {:?}", log),449 };450 } else {451 let e = e.trim();452 if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {453 return;454 }455 info!("{e}")456 }457 }458}459460async fn run_nix_inner_raw(461 str: String,462 mut cmd: Command,463 want_stdout: bool,464 err_handler: &mut dyn Handler,465 mut out_handler: Option<&mut dyn Handler>,466) -> Result<Option<String>> {467 cmd.stderr(Stdio::piped());468 cmd.stdout(Stdio::piped());469 let mut child = cmd.spawn()?;470 let mut stderr = child.stderr.take().unwrap();471 let stdout = child.stdout.take().unwrap();472 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());473 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));474 let mut ob = want_stdout475 .then(|| out.take().unwrap())476 .unwrap_or_else(|| Box::new(EmptyAsyncRead));477 let mut ol = (!want_stdout)478 .then(|| out.take().unwrap())479 .unwrap_or_else(|| Box::new(EmptyAsyncRead));480 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());481 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());482483 // while let Some(line) = read.next().await? {}484485 let mut out_buf = if want_stdout { Some(vec![]) } else { None };486 loop {487 select! {488 e = err.next() => {489 if let Some(e) = e {490 let e = e?;491 err_handler.handle_line(&e);492 }493 },494 o = ob.next() => {495 if let Some(o) = o {496 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);497 }498 },499 o = ol.next() => {500 if let Some(o) = o {501 let o = o?;502 if let Some(out) = out_handler.as_mut() {503 out.handle_line(&o)504 } else {505 err_handler.handle_line(&o)506 }507 // out_handler.handle_info(&o);508 }509 },510 code = child.wait() => {511 let code = code?;512 if !code.success() {513 anyhow::bail!("command '{str}' failed with status {}", code);514 }515 break;516 }517 }518 }519520 Ok(out_buf.map(String::from_utf8).transpose()?)521}522523pub trait ErrorRecorder: Send {524 /// Return true to discard message from logging525 fn push_message(&mut self, msg: &str) -> bool;526}527528#[derive(Debug)]529enum LogField {530 String(String),531 Num(u64),532}533534impl<'de> Deserialize<'de> for LogField {535 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>536 where537 D: serde::Deserializer<'de>,538 {539 struct StringOrNum;540 impl<'de> Visitor<'de> for StringOrNum {541 type Value = LogField;542543 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {544 write!(f, "string or unsigned")545 }546547 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>548 where549 E: serde::de::Error,550 {551 Ok(LogField::String(v.to_owned()))552 }553554 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>555 where556 E: serde::de::Error,557 {558 Ok(LogField::Num(v))559 }560 }561562 deserializer.deserialize_any(StringOrNum)563 }564}565566#[derive(Deserialize, Debug)]567#[serde(rename_all = "camelCase", tag = "action")]568#[allow(dead_code)]569enum NixLog {570 Msg {571 level: u32,572 msg: String,573 raw_msg: Option<String>,574 },575 Start {576 id: u64,577 level: u32,578 #[serde(default)]579 fields: Vec<LogField>,580 text: String,581 #[serde(rename = "type")]582 typ: u32,583 },584 Stop {585 id: u64,586 },587 Result {588 id: u64,589 #[serde(rename = "type")]590 typ: u32,591 #[serde(default)]592 fields: Vec<LogField>,593 },594}cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -13,9 +13,10 @@
use tempfile::NamedTempFile;
use crate::{
- better_nix_eval::{Field, NixSessionPool},
+ better_nix_eval::{Field, Index, NixSessionPool},
command::MyCommand,
fleetdata::{FleetData, FleetSecret, FleetSharedSecret},
+ nix_path,
};
pub struct FleetConfigInternals {
@@ -24,9 +25,9 @@
pub opts: FleetOpts,
pub data: Mutex<FleetData>,
pub nix_args: Vec<OsString>,
- // fleetConfigurations.<name>
+ /// fleetConfigurations.<name>.<localSystem>
pub fleet_field: Field,
- // fleet_config.configUnchecked
+ /// fleet_config.configUnchecked
pub config_field: Field,
}
@@ -91,22 +92,12 @@
command = command.ssh(host);
}
command.run_string().await
- }
-
- pub fn configuration_attr_name(&self, name: &str) -> OsString {
- let mut str = self.directory.as_os_str().to_owned();
- str.push("#");
- str.push(&format!(
- "fleetConfigurations.default.{}.{}",
- self.local_system, name
- ));
- str
}
pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
let names = self
.fleet_field
- .get_field_deep(["configuredHosts"])
+ .select(nix_path!(.configuredHosts))
.await?
.list_fields()
.await?;
@@ -118,7 +109,7 @@
}
pub async fn system_config(&self, host: &str) -> Result<Field> {
self.fleet_field
- .get_field_deep(["configuredSystems", host, "config"])
+ .select(nix_path!(.configuredSystems.{host}.config))
.await
}
@@ -131,7 +122,7 @@
/// Shared secrets configured in fleet.nix or in flake
pub async fn list_configured_shared(&self) -> Result<Vec<String>> {
self.config_field
- .get_field("sharedSecrets")
+ .select(nix_path!(.sharedSecrets))
.await?
.list_fields()
.await
@@ -221,7 +212,7 @@
}
pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {
self.config_field
- .get_field_deep(["sharedSecrets", secret, "expectedOwners"])
+ .select(nix_path!(.sharedSecrets.{secret}.expectedOwners))
.await?
.as_json()
.await
@@ -279,7 +270,9 @@
if self.local_system == "detect" {
let builtins_field = Field::field(root_field.clone(), "builtins").await?;
- let system = builtins_field.get_field("currentSystem").await?;
+ let system = builtins_field
+ .select(nix_path!(.currentSystem))
+ .await?;
self.local_system = system.as_json().await?;
}
let local_system = self.local_system.clone();
@@ -287,9 +280,11 @@
let fleet_root = Field::field(root_field, "fleetConfigurations").await?;
let fleet_field = fleet_root
- .get_field_deep(["default", &local_system])
+ .select(nix_path!(.default.{&local_system}))
+ .await?;
+ let config_field = fleet_field
+ .select(nix_path!(.configUnchecked))
.await?;
- let config_field = fleet_field.get_field("configUnchecked").await?;
let mut fleet_data_path = directory.clone();
fleet_data_path.push("fleet.nix");
cmds/fleet/src/main.rsdiffbeforeafterboth--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -1,3 +1,4 @@
+#![recursion_limit = "512"]
#![feature(try_blocks)]
pub(crate) mod cmds;
flake.nixdiffbeforeafterboth--- a/flake.nix
+++ b/flake.nix
@@ -19,6 +19,7 @@
rustPlatform = pkgs.makeRustPlatform { cargo = rust; rustc = rust; };
in
{
+ packages = (import ./pkgs) pkgs pkgs;
devShell = (pkgs.mkShell.override { stdenv = llvmPkgs.stdenv; }) {
nativeBuildInputs = with pkgs; [
rust