difftreelog
refactor perform build using nix repl
in: trunk
8 files changed
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,5 +1,7 @@
+use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
-use std::fmt::Display;
+use std::fmt::{self, Display};
+use std::path::PathBuf;
use std::process::Stdio;
use std::sync::{Arc, OnceLock};
@@ -8,7 +10,7 @@
use itertools::Itertools;
use r2d2::{Pool, PooledConnection};
use serde::de::DeserializeOwned;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;
use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
use tokio::select;
@@ -72,14 +74,20 @@
// s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)
// }
if !self.collected.is_empty() {
- bail!("{}", self.collected.iter().map(|v| {
- if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {
- let v = unindent::unindent(f.trim_start());
- v.trim().to_owned()
- } else {
- v.to_owned()
- }
- }).join("\n"));
+ bail!(
+ "{}",
+ self.collected
+ .iter()
+ .map(|v| {
+ if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {
+ let v = unindent::unindent(f.trim_start());
+ v.trim().to_owned()
+ } else {
+ v.to_owned()
+ }
+ })
+ .join("\n")
+ );
}
Ok(())
}
@@ -150,6 +158,13 @@
}
}
+struct WarnHandler;
+impl Handler for WarnHandler {
+ fn handle_line(&mut self, e: &str) {
+ warn!(target: "nix", "{e}")
+ }
+}
+
impl NixSessionInner {
async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {
let mut cmd = Command::new("nix");
@@ -174,12 +189,13 @@
stdin.flush().await?;
let nix_handler = NixHandler::default();
let mut full_delimiter = None;
+ let mut errors = vec![];
while let Some(line) = out.next().await {
let line = match line {
OutputLine::Out(o) => o,
OutputLine::Err(_e) => {
// Handle startup errors, but skip repl hello?
- //nix_handler.handle_line(&e);
+ errors.push(_e);
continue;
}
};
@@ -190,6 +206,9 @@
}
}
let Some(full_delimiter) = full_delimiter else {
+ for e in errors {
+ error!("{e}");
+ }
bail!("failed to discover delimiter");
};
let mut res = Self {
@@ -342,21 +361,93 @@
#[derive(Clone)]
pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);
+#[macro_export]
+macro_rules! nix_path {
+ (@o($o:ident) $var:ident $($tt:tt)*) => {{
+ $o.push(Index::var(stringify!($var)));
+ nix_path!(@o($o) $($tt)*);
+ }};
+ (@o($o:ident) . $var:ident $($tt:tt)*) => {{
+ $o.push(Index::attr(stringify!($var)));
+ nix_path!(@o($o) $($tt)*);
+ }};
+ (@o($o:ident) . $var:literal $($tt:tt)*) => {{
+ $o.push(Index::attr($var));
+ nix_path!(@o($o) $($tt)*);
+ }};
+ (@o($o:ident) . { $var:expr } $($tt:tt)*) => {{
+ $o.push(Index::attr($var));
+ nix_path!(@o($o) $($tt)*);
+ }};
+ (@o($o:ident) [ $var:literal ] $($tt:tt)*) => {{
+ $o.push(Index::idx($var));
+ nix_path!(@o($o) $($tt)*);
+ }};
+ (@o($o:ident) ($e:expr) $($tt:tt)*) => {
+ $o.push(Index::apply($e));
+ nix_path!(@o($o) $($tt)*);
+ };
+ (@o($o:ident)) => {};
+ ($($tt:tt)+) => {{
+ use $crate::{nix_path, better_nix_eval::Index};
+ let mut out = vec![];
+ nix_path!(@o(out) $($tt)*);
+ out
+ }}
+}
+
#[derive(Clone)]
-enum Index {
+pub enum Index {
+ Var(String),
String(String),
- // Idx(u32),
+ Apply(String),
+ Idx(u32),
}
+impl Index {
+ pub fn var(v: impl AsRef<str>) -> Self {
+ let v = v.as_ref();
+ assert!(
+ !(v.contains('.') | v.contains(' ')),
+ "bad variable name: {v}"
+ );
+ Self::Var(v.to_owned())
+ }
+ pub fn attr(v: impl AsRef<str>) -> Self {
+ Self::String(v.as_ref().to_owned())
+ }
+ pub fn idx(v: u32) -> Self {
+ Self::Idx(v)
+ }
+ pub fn apply(v: impl Serialize) -> Self {
+ let serialized = nixlike::serialize(v).expect("invalid value for apply");
+ Self::Apply(serialized)
+ }
+}
impl Display for Index {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
+ Index::Var(v) => {
+ write!(f, "{v}")
+ }
Index::String(k) => {
let v = nixlike::format_identifier(k.as_str());
write!(f, ".{v}")
}
+ Index::Apply(o) => {
+ let v = nixlike::serialize(o).map_err(|_| fmt::Error)?;
+ write!(f, "<apply>({v})")
+ }
+ Index::Idx(i) => {
+ write!(f, "[{i}]")
+ }
}
}
}
+impl fmt::Debug for Index {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "{self}")
+ }
+}
struct PathDisplay<'i>(&'i [Index]);
impl Display for PathDisplay<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -381,43 +472,49 @@
}
}
pub async fn field(session: NixSession, field: &str) -> Result<Self> {
- Self::root(session).get_field_deep([field]).await
+ Self::root(session)
+ .select([Index::var(field)])
+ .await
}
pub async fn get_json_deep<'a, V: DeserializeOwned>(
&self,
- name: impl IntoIterator<Item = &'a str>,
+ name: impl IntoIterator<Item = Index>,
) -> Result<V> {
- let field = self.get_field_deep(name).await?;
+ let field = self.select(name).await?;
field.as_json().await
}
- pub async fn get_field(&self, name: &str) -> Result<Self> {
- self.get_field_deep([name]).await
- }
- pub async fn get_field_deep<'a>(
- &self,
- name: impl IntoIterator<Item = &'a str>,
- ) -> Result<Self> {
- let mut iter = name.into_iter();
+ pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {
+ let mut name = name.into_iter();
let mut full_path = self.full_path.clone();
let mut query = if let Some(id) = self.value {
format!("sess_field_{id}")
} else {
- let first = iter.next().expect("name not empty");
- ensure!(
- !(first.contains('.') | first.contains(' ')),
- "bad name for root query: {first}"
- );
- full_path.push(Index::String(first.to_string()));
- first.to_string()
+ let first = name.next();
+ if let Some(Index::Var(i)) = first {
+ full_path.push(Index::Var(i.clone()));
+ i.clone()
+ } else {
+ panic!("first path item should be variable, got {first:?}")
+ }
};
- for v in iter {
- full_path.push(Index::String(v.to_string()));
- // Escape
- let escaped = nixlike::serialize(v)?;
- let escaped = escaped.trim();
- query.push('.');
- query.push_str(escaped);
+ for v in name {
+ full_path.push(v.clone());
+ match v {
+ Index::Var(_) => panic!("var item may only be first"),
+ Index::String(s) => {
+ let escaped = nixlike::serialize(s)?;
+ query.push('.');
+ query.push_str(escaped.trim());
+ }
+ Index::Apply(a) => {
+ query.push(' ');
+ query.push_str(&a);
+ }
+ Index::Idx(idx) => {
+ query = format!("builtins.elemAt ({query}) {idx}");
+ }
+ }
}
let vid = self
@@ -454,6 +551,28 @@
.await
.with_context(|| format!("full path: {}", PathDisplay(&self.full_path)))
}
+ pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {
+ let id = self.value.expect("can't use build on not-value");
+ let vid = self
+ .session
+ .0
+ .lock()
+ .await
+ .execute_expression_raw(&format!(":b sess_field_{id}"), &mut NixHandler::default())
+ .await?;
+ ensure!(!vid.is_empty(), "build failed");
+ let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")
+ else {
+ panic!("unexpected build output: {vid:?}");
+ };
+ let outputs = vid
+ .split('\n')
+ .filter(|v| !v.is_empty())
+ .map(|v| v.split_once(" -> ").expect("unexpected build output"))
+ .map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))
+ .collect();
+ Ok(outputs)
+ }
}
impl Drop for Field {
fn drop(&mut self) {
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -1,8 +1,10 @@
+use std::os::unix::fs::symlink;
use std::path::PathBuf;
use std::{env::current_dir, time::Duration};
use crate::command::MyCommand;
use crate::host::Config;
+use crate::nix_path;
use anyhow::{anyhow, Result};
use clap::Parser;
use itertools::Itertools;
@@ -11,15 +13,9 @@
#[derive(Parser, Clone)]
pub struct BuildSystems {
- /// Do not continue on error
- #[clap(long)]
- fail_fast: bool,
/// Disable automatic rollback
#[clap(long)]
disable_rollback: bool,
- /// Run builds as sudo
- #[clap(long)]
- privileged_build: bool,
#[clap(subcommand)]
subcommand: Subcommand,
}
@@ -294,34 +290,11 @@
async fn build_task(self, config: Config, host: String) -> Result<()> {
info!("building");
let action = Action::from(self.subcommand.clone());
- let built = {
- let dir = tempfile::tempdir()?;
- dir.path().to_owned()
- };
-
- let mut nix_build = MyCommand::new("nix");
- nix_build
- .args([
- "build",
- "--impure",
- "--json",
- // "--show-trace",
- "--no-link",
- ])
- .comparg("--out-link", &built)
- .arg(
- config.configuration_attr_name(&format!(
- "buildSystems.{}.{host}",
- action.build_attr()
- )),
- )
- .args(&config.nix_args);
-
- if self.privileged_build {
- nix_build = nix_build.sudo();
- }
-
- nix_build.run_nix().await.map_err(|e| {
+ let drv = config
+ .fleet_field
+ .select(nix_path!(.buildSystems.{action.build_attr()}.{&host}))
+ .await?;
+ let outputs = drv.build().await.map_err(|e| {
if action.build_attr() == "sdImage" {
info!("sd-image build failed");
info!("Make sure you have imported modulesPath/installer/sd-card/sd-image-<arch>[-installer].nix (For installer, you may want to check config)");
@@ -329,7 +302,9 @@
}
e
})?;
- let built = std::fs::canonicalize(built)?;
+ let out_output = outputs
+ .get("out")
+ .ok_or_else(|| anyhow!("system build should produce \"out\" output"))?;
match action {
Action::Upload { action } => {
@@ -342,7 +317,7 @@
.arg("sign")
.comparg("--key-file", "/etc/nix/private-key")
.arg("-r")
- .arg(&built);
+ .arg(out_output);
if let Err(e) = sign.sudo().run_nix().await {
warn!("Failed to sign store paths: {e}");
};
@@ -353,7 +328,7 @@
nix.arg("copy")
.arg("--substitute-on-destination")
.comparg("--to", format!("ssh-ng://{host}"))
- .arg(&built);
+ .arg(out_output);
match nix.run_nix().await {
Ok(()) => break,
Err(e) if tries < 3 => {
@@ -366,53 +341,22 @@
}
}
if let Some(action) = action {
- execute_upload(&self, &config, action, &host, built).await?
+ execute_upload(&self, &config, action, &host, out_output.clone()).await?
}
}
Action::Package(PackageAction::SdImage) => {
let mut out = current_dir()?;
out.push(format!("sd-image-{}", host));
- info!("building sd image to {:?}", out);
- let mut nix_build = MyCommand::new("nix");
- nix_build
- .args(["build", "--impure", "--no-link"])
- .comparg("--out-link", &out)
- .arg(config.configuration_attr_name(&format!("buildSystems.sdImage.{}", host,)))
- .args(&config.nix_args);
- if !self.fail_fast {
- nix_build.arg("--keep-going");
- }
- if self.privileged_build {
- nix_build = nix_build.sudo();
- }
-
- nix_build.run_nix().await?;
+ info!("linking sd image to {:?}", out);
+ symlink(out_output, out)?;
}
Action::Package(PackageAction::InstallationCd) => {
let mut out = current_dir()?;
out.push(format!("installation-cd-{}", host));
- info!("building sd image to {:?}", out);
- let mut nix_build = MyCommand::new("nix");
- nix_build
- .args(["build", "--impure", "--no-link"])
- .comparg("--out-link", &out)
- .arg(
- config.configuration_attr_name(&format!(
- "buildSystems.installationCd.{}",
- host,
- )),
- )
- .args(&config.nix_args);
- if !self.fail_fast {
- nix_build.arg("--keep-going");
- }
- if self.privileged_build {
- nix_build = nix_build.sudo();
- }
-
- nix_build.run_nix().await?;
+ info!("linking iso image to {:?}", out);
+ symlink(out_output, out)?;
}
};
Ok(())
cmds/fleet/src/cmds/info.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/info.rs
+++ b/cmds/fleet/src/cmds/info.rs
@@ -1,6 +1,7 @@
use std::collections::BTreeSet;
use crate::host::Config;
+use crate::nix_path;
use anyhow::{ensure, Result};
use clap::Parser;
@@ -38,7 +39,7 @@
if !tagged.is_empty() {
let tags: Vec<String> = config
.fleet_field
- .get_field_deep(["configuredSystems", &host.name, "config", "tags"])
+ .select(nix_path!(.configuredSystems.{&host.name}.config.tags))
.await?
.as_json()
.await?;
@@ -64,7 +65,7 @@
let host = config.system_config(&host).await?;
if external {
out.extend(
- host.get_field_deep(["network", "externalIps"])
+ host.select(nix_path!(.network.externalIps))
.await?
.as_json::<Vec<String>>()
.await?,
@@ -72,7 +73,7 @@
}
if internal {
out.extend(
- host.get_field_deep(["network", "internalIps"])
+ host.select(nix_path!(.network.internalIps))
.await?
.as_json::<Vec<String>>()
.await?,
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -1,6 +1,6 @@
use crate::{
fleetdata::{FleetSecret, FleetSharedSecret},
- host::Config,
+ host::Config, nix_path,
};
use anyhow::{bail, ensure, Context, Result};
use chrono::Utc;
@@ -339,7 +339,7 @@
let mut data = config.shared_secret(name)?;
let expected_owners: Vec<String> = config
.config_field
- .get_json_deep(["sharedSecrets", name, "expectedOwners"])
+ .get_json_deep(nix_path!(sharedSecrets.{name}.expectedOwners))
.await?;
if expected_owners.is_empty() {
warn!("secret was removed from fleet config: {name}, removing from data");
@@ -352,7 +352,7 @@
if set != expected_set {
let owner_dependent: bool = config
.config_field
- .get_json_deep(["sharedSecrets", name, "ownerDependent"])
+ .get_json_deep(nix_path!(.sharedSecrets.{name}.ownerDependent))
.await?;
if !owner_dependent {
warn!("reencrypting secret '{name}' for new owner set");
cmds/fleet/src/command.rsdiffbeforeafterboth1use std::{2 borrow::Cow,3 collections::HashMap,4 ffi::OsStr,5 process::Stdio,6 sync::{Arc, Mutex},7 task::Poll,8};910use anyhow::{anyhow, Result};11use futures::StreamExt;12use itertools::Either;13use once_cell::sync::Lazy;14use openssh::{OverSsh, Session};15use regex::Regex;16use serde::{de::Visitor, Deserialize};17use tokio::{io::AsyncRead, process::Command, select};18use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};19use tracing::{info, info_span, warn, Span};20use tracing_indicatif::span_ext::IndicatifSpanExt;2122fn escape_bash(input: &str, out: &mut String) {23 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";24 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {25 out.push_str(input);26 return;27 }28 out.push('\'');29 for (i, v) in input.split('\'').enumerate() {30 if i != 0 {31 out.push_str("'\"'\"'");32 }33 out.push_str(v);34 }35 out.push('\'');36}37fn ostoutf8(os: impl AsRef<OsStr>) -> String {38 os.as_ref().to_str().expect("non-utf8 data").to_owned()39}40#[derive(Clone)]41pub struct MyCommand {42 command: String,43 args: Vec<String>,44 env: Vec<(String, String)>,45 ssh_session: Option<Arc<Session>>,46}47impl MyCommand {48 pub fn new(cmd: impl AsRef<OsStr>) -> Self {49 assert!(!cmd.as_ref().is_empty());50 Self {51 command: ostoutf8(cmd),52 args: vec![],53 env: vec![],54 ssh_session: None,55 }56 }57 fn into_args(self) -> Vec<String> {58 let mut out = Vec::new();59 if !self.env.is_empty() {60 out.push("env".to_owned());61 for (k, v) in self.env {62 assert!(!k.contains('='));63 out.push(format!("{k}={v}"));64 }65 }66 out.push(self.command);67 out.extend(self.args);68 out69 }70 fn into_string(self) -> String {71 let mut out = String::new();72 if !self.env.is_empty() {73 out.push_str("env");74 for (k, v) in self.env {75 out.push(' ');76 assert!(!k.contains('='));77 escape_bash(&k, &mut out);78 out.push('=');79 escape_bash(&v, &mut out);80 }81 }82 if !out.is_empty() {83 out.push(' ');84 }85 escape_bash(&self.command, &mut out);86 for arg in self.args {87 out.push(' ');88 escape_bash(&arg, &mut out);89 }90 out91 }92 fn into_command(self) -> Command {93 let mut out = Command::new(self.command);94 out.args(self.args);95 for (k, v) in self.env {96 out.env(k, v);97 }98 out99 }100 fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {101 Ok(if let Some(session) = self.ssh_session.clone() {102 let cmd = self.into_command();103 Either::Right(104 cmd.over_ssh(session)105 .map_err(|e| anyhow!("ssh error: {e}"))?,106 )107 } else {108 let cmd = self.into_command();109 Either::Left(cmd)110 })111 }112 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {113 let arg = arg.as_ref();114 self.args.push(ostoutf8(arg));115 self116 }117 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {118 let arg = arg.as_ref();119 let value = value.as_ref();120 let arg = ostoutf8(arg);121 let value = ostoutf8(value);122 self.arg(format!("{arg}={value}"));123 self124 }125 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {126 self.arg(arg);127 self.arg(value);128 self129 }130 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {131 for arg in args.into_iter() {132 let arg = arg.as_ref();133 self.args.push(ostoutf8(arg));134 }135 self136 }137 pub fn sudo(self) -> Self {138 if std::env::var_os("NO_SUDO").is_some() {139 let mut out = Self::new("su");140 out.arg("-c").arg(self.into_string());141 out142 } else {143 let mut out = Self::new("sudo");144 out.args(self.into_args());145 out146 }147 }148 pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {149 let mut out = Self::new("ssh");150 out.arg(on).arg("--");151 out.arg(self.into_string());152 out153 }154 pub fn over_ssh(mut self, session: Arc<Session>) -> Self {155 self.ssh_session = Some(session);156 self157 }158159 pub async fn run(self) -> Result<()> {160 let str = self.clone().into_string();161 let cmd = self.into_command();162 run_nix_inner(str, cmd, &mut PlainHandler).await?;163 Ok(())164 }165 pub async fn run_string(self) -> Result<String> {166 let str = self.clone().into_string();167 let cmd = self.into_command();168 let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;169 Ok(v)170 }171172 pub async fn run_nix_string(self) -> Result<String> {173 let str = self.clone().into_string();174 let mut cmd = self.into_command();175 cmd.arg("--log-format").arg("internal-json");176 run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await177 }178 pub async fn run_nix(self) -> Result<()> {179 let str = self.clone().into_string();180 let mut cmd = self.into_command();181 cmd.arg("--log-format").arg("internal-json");182 cmd.stdout(Stdio::inherit());183 run_nix_inner(str, cmd, &mut NixHandler::default()).await184 }185}186187struct EmptyAsyncRead;188impl AsyncRead for EmptyAsyncRead {189 fn poll_read(190 self: std::pin::Pin<&mut Self>,191 _cx: &mut std::task::Context<'_>,192 _buf: &mut tokio::io::ReadBuf<'_>,193 ) -> Poll<std::io::Result<()>> {194 Poll::Pending195 }196}197198async fn run_nix_inner_stdout(199 str: String,200 cmd: Command,201 handler: &mut dyn Handler,202) -> Result<String> {203 Ok(run_nix_inner_raw(str, cmd, true, handler, None)204 .await?205 .expect("has out"))206}207async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {208 let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;209 assert!(v.is_none());210 Ok(())211}212213pub trait Handler: Send {214 fn handle_line(&mut self, e: &str);215}216217pub struct ClonableHandler<H>(Arc<Mutex<H>>);218impl<H> Clone for ClonableHandler<H> {219 fn clone(&self) -> Self {220 Self(self.0.clone())221 }222}223impl<H> ClonableHandler<H> {224 pub fn new(inner: H) -> Self {225 Self(Arc::new(Mutex::new(inner)))226 }227}228impl<H: Handler> Handler for ClonableHandler<H> {229 fn handle_line(&mut self, e: &str) {230 self.0.lock().unwrap().handle_line(e)231 }232}233234struct PlainHandler;235impl Handler for PlainHandler {236 fn handle_line(&mut self, e: &str) {237 info!(target: "log", "{e}");238 }239}240241pub struct NoopHandler;242impl Handler for NoopHandler {243 fn handle_line(&mut self, _e: &str) {}244}245246#[derive(Default)]247pub struct NixHandler {248 spans: HashMap<u64, Span>,249}250fn process_message(m: &str) -> Cow<'_, str> {251 static OSC_CLEANER: Lazy<Regex> =252 Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());253 OSC_CLEANER.replace_all(m, "")254}255impl Handler for NixHandler {256 fn handle_line(&mut self, e: &str) {257 if let Some(e) = e.strip_prefix("@nix ") {258 let log: NixLog = match serde_json::from_str(e) {259 Ok(l) => l,260 Err(err) => {261 warn!("failed to parse nix log line {:?}: {}", e, err);262 return;263 }264 };265 match log {266 NixLog::Msg { msg, raw_msg, .. } => {267 #[allow(clippy::nonminimal_bool)]268 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))269 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")270 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {271 if let Some(raw_msg) = raw_msg {272 if !msg.is_empty() {273 info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())274 } else {275 info!(target: "nix", "{}", raw_msg.trim_end())276 }277 } else {278 info!(target: "nix", "{}", msg.trim_end())279 }280 }281 }282 NixLog::Start {283 ref fields,284 typ,285 id,286 ..287 } if typ == 105 && !fields.is_empty() => {288 if let [LogField::String(drv), ..] = &fields[..] {289 let mut drv = drv.as_str();290 if let Some(pkg) = drv.strip_prefix("/nix/store/") {291 let mut it = pkg.splitn(2, '-');292 it.next();293 if let Some(pkg) = it.next() {294 drv = pkg;295 }296 }297 info!(target: "nix","building {}", drv);298 let span = info_span!("build", drv);299 span.pb_start();300 self.spans.insert(id, span);301 } else {302 warn!("bad build log: {:?}", log)303 }304 }305 NixLog::Start {306 ref fields,307 typ,308 id,309 ..310 } if typ == 100 && fields.len() >= 3 => {311 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =312 &fields[..]313 {314 let mut drv = drv.as_str();315316 if let Some(pkg) = drv.strip_prefix("/nix/store/") {317 let mut it = pkg.splitn(2, '-');318 it.next();319 if let Some(pkg) = it.next() {320 drv = pkg;321 }322 }323 // info!(target: "nix","copying {} {} -> {}", drv, from, to);324 let span = info_span!("copy", from, to, drv);325 span.pb_start();326 self.spans.insert(id, span);327 } else {328 warn!("bad copy log: {:?}", log)329 }330 }331 NixLog::Start { text, typ, id, .. }332 if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>333 {334 if !text.is_empty()335 && text != "querying info about missing paths"336 && text != "copying 0 paths"337 {338 let span = info_span!("job");339 span.pb_start();340 span.pb_set_message(&process_message(text.trim()));341 self.spans.insert(id, span);342 info!(target: "nix", "{}", text);343 }344 }345 NixLog::Start {346 text,347 level: 0,348 typ: 108,349 ..350 } if text.is_empty() => {351 // Cache lookup? Coupled with copy log352 }353 NixLog::Start {354 text,355 level: 4,356 typ: 109,357 ..358 } if text.starts_with("querying info about ") => {359 // Cache lookup360 }361 NixLog::Start {362 text,363 level: 4,364 typ: 101,365 ..366 } if text.starts_with("downloading ") => {367 // NAR downloading, coupled with copy log368 }369 NixLog::Start {370 text,371 level: 1,372 typ: 111,373 ..374 } if text.starts_with("waiting for a machine to build ") => {375 // Useless repeating notification about build376 }377 NixLog::Start {378 text,379 level: 3,380 typ: 111,381 ..382 } if text.starts_with("resolved derivation: ") => {383 // CA resolved384 }385 NixLog::Start {386 text,387 level: 1,388 typ: 111,389 id,390 ..391 } if text.starts_with("waiting for lock on ") => {392 let mut drv = text.strip_prefix("waiting for lock on ").unwrap();393 if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {394 drv = txt;395 }396 if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {397 drv = txt;398 }399 if let Some(txt) = drv.split("', '").next() {400 drv = txt;401 }402 if let Some(pkg) = drv.strip_prefix("/nix/store/") {403 let mut it = pkg.splitn(2, '-');404 it.next();405 if let Some(pkg) = it.next() {406 drv = pkg;407 }408 }409 let span = info_span!("waiting on drv", drv);410 span.pb_start();411 self.spans.insert(id, span);412 // Concurrent build of the same message413 }414 NixLog::Stop { id, .. } => {415 self.spans.remove(&id);416 }417 NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {418 if let Some(span) = self.spans.get(&id) {419 if let LogField::String(s) = &fields[0] {420 span.pb_set_message(&process_message(s.trim()));421 } else {422 warn!("bad fields: {fields:?}");423 }424 } else {425 warn!("unknown result id: {id} {typ} {fields:?}");426 }427 // dbg!(fields, id, typ);428 }429 NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {430 if let Some(span) = self.spans.get(&id) {431 if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =432 &fields[..4]433 {434 span.pb_set_length(*expected);435 span.pb_set_position(*done);436 } else {437 warn!("bad fields: {fields:?}");438 }439 } else {440 // warn!("unknown result id: {id} {typ} {fields:?}");441 // Unaccounted progress.442 }443 // dbg!(fields, id, typ);444 }445 NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {446 // Set phase, expected447 }448 _ => warn!("unknown log: {:?}", log),449 };450 } else {451 let e = e.trim();452 if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {453 return;454 }455 info!("{e}")456 }457 }458}459460async fn run_nix_inner_raw(461 str: String,462 mut cmd: Command,463 want_stdout: bool,464 err_handler: &mut dyn Handler,465 mut out_handler: Option<&mut dyn Handler>,466) -> Result<Option<String>> {467 cmd.stderr(Stdio::piped());468 cmd.stdout(Stdio::piped());469 let mut child = cmd.spawn()?;470 let mut stderr = child.stderr.take().unwrap();471 let stdout = child.stdout.take().unwrap();472 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());473 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));474 let mut ob = want_stdout475 .then(|| out.take().unwrap())476 .unwrap_or_else(|| Box::new(EmptyAsyncRead));477 let mut ol = (!want_stdout)478 .then(|| out.take().unwrap())479 .unwrap_or_else(|| Box::new(EmptyAsyncRead));480 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());481 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());482483 // while let Some(line) = read.next().await? {}484485 let mut out_buf = if want_stdout { Some(vec![]) } else { None };486 loop {487 select! {488 e = err.next() => {489 if let Some(e) = e {490 let e = e?;491 err_handler.handle_line(&e);492 }493 },494 o = ob.next() => {495 if let Some(o) = o {496 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);497 }498 },499 o = ol.next() => {500 if let Some(o) = o {501 let o = o?;502 if let Some(out) = out_handler.as_mut() {503 out.handle_line(&o)504 } else {505 err_handler.handle_line(&o)506 }507 // out_handler.handle_info(&o);508 }509 },510 code = child.wait() => {511 let code = code?;512 if !code.success() {513 anyhow::bail!("command '{str}' failed with status {}", code);514 }515 break;516 }517 }518 }519520 Ok(out_buf.map(String::from_utf8).transpose()?)521}522523pub trait ErrorRecorder: Send {524 /// Return true to discard message from logging525 fn push_message(&mut self, msg: &str) -> bool;526}527528#[derive(Debug)]529enum LogField {530 String(String),531 Num(u64),532}533534impl<'de> Deserialize<'de> for LogField {535 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>536 where537 D: serde::Deserializer<'de>,538 {539 struct StringOrNum;540 impl<'de> Visitor<'de> for StringOrNum {541 type Value = LogField;542543 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {544 write!(f, "string or unsigned")545 }546547 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>548 where549 E: serde::de::Error,550 {551 Ok(LogField::String(v.to_owned()))552 }553554 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>555 where556 E: serde::de::Error,557 {558 Ok(LogField::Num(v))559 }560 }561562 deserializer.deserialize_any(StringOrNum)563 }564}565566#[derive(Deserialize, Debug)]567#[serde(rename_all = "camelCase", tag = "action")]568#[allow(dead_code)]569enum NixLog {570 Msg {571 level: u32,572 msg: String,573 raw_msg: Option<String>,574 },575 Start {576 id: u64,577 level: u32,578 #[serde(default)]579 fields: Vec<LogField>,580 text: String,581 #[serde(rename = "type")]582 typ: u32,583 },584 Stop {585 id: u64,586 },587 Result {588 id: u64,589 #[serde(rename = "type")]590 typ: u32,591 #[serde(default)]592 fields: Vec<LogField>,593 },594}1use std::{2 collections::HashMap,3 ffi::OsStr,4 process::Stdio,5 sync::{Arc, Mutex},6 task::Poll,7};89use anyhow::{anyhow, Result};10use futures::StreamExt;11use itertools::Either;12use once_cell::sync::Lazy;13use openssh::{OverSsh, Session};14use regex::Regex;15use serde::{de::Visitor, Deserialize};16use tokio::{io::AsyncRead, process::Command, select};17use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};18use tracing::{info, info_span, warn, Span};19use tracing_indicatif::span_ext::IndicatifSpanExt;2021fn escape_bash(input: &str, out: &mut String) {22 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";23 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {24 out.push_str(input);25 return;26 }27 out.push('\'');28 for (i, v) in input.split('\'').enumerate() {29 if i != 0 {30 out.push_str("'\"'\"'");31 }32 out.push_str(v);33 }34 out.push('\'');35}36fn ostoutf8(os: impl AsRef<OsStr>) -> String {37 os.as_ref().to_str().expect("non-utf8 data").to_owned()38}39#[derive(Clone)]40pub struct MyCommand {41 command: String,42 args: Vec<String>,43 env: Vec<(String, String)>,44 ssh_session: Option<Arc<Session>>,45}46impl MyCommand {47 pub fn new(cmd: impl AsRef<OsStr>) -> Self {48 assert!(!cmd.as_ref().is_empty());49 Self {50 command: ostoutf8(cmd),51 args: vec![],52 env: vec![],53 ssh_session: None,54 }55 }56 fn into_args(self) -> Vec<String> {57 let mut out = Vec::new();58 if !self.env.is_empty() {59 out.push("env".to_owned());60 for (k, v) in self.env {61 assert!(!k.contains('='));62 out.push(format!("{k}={v}"));63 }64 }65 out.push(self.command);66 out.extend(self.args);67 out68 }69 fn into_string(self) -> String {70 let mut out = String::new();71 if !self.env.is_empty() {72 out.push_str("env");73 for (k, v) in self.env {74 out.push(' ');75 assert!(!k.contains('='));76 escape_bash(&k, &mut out);77 out.push('=');78 escape_bash(&v, &mut out);79 }80 }81 if !out.is_empty() {82 out.push(' ');83 }84 escape_bash(&self.command, &mut out);85 for arg in self.args {86 out.push(' ');87 escape_bash(&arg, &mut out);88 }89 out90 }91 fn into_command(self) -> Command {92 let mut out = Command::new(self.command);93 out.args(self.args);94 for (k, v) in self.env {95 out.env(k, v);96 }97 out98 }99 fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {100 Ok(if let Some(session) = self.ssh_session.clone() {101 let cmd = self.into_command();102 Either::Right(103 cmd.over_ssh(session)104 .map_err(|e| anyhow!("ssh error: {e}"))?,105 )106 } else {107 let cmd = self.into_command();108 Either::Left(cmd)109 })110 }111 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {112 let arg = arg.as_ref();113 self.args.push(ostoutf8(arg));114 self115 }116 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {117 let arg = arg.as_ref();118 let value = value.as_ref();119 let arg = ostoutf8(arg);120 let value = ostoutf8(value);121 self.arg(format!("{arg}={value}"));122 self123 }124 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {125 self.arg(arg);126 self.arg(value);127 self128 }129 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {130 for arg in args.into_iter() {131 let arg = arg.as_ref();132 self.args.push(ostoutf8(arg));133 }134 self135 }136 pub fn sudo(self) -> Self {137 if std::env::var_os("NO_SUDO").is_some() {138 let mut out = Self::new("su");139 out.arg("-c").arg(self.into_string());140 out141 } else {142 let mut out = Self::new("sudo");143 out.args(self.into_args());144 out145 }146 }147 pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {148 let mut out = Self::new("ssh");149 out.arg(on).arg("--");150 out.arg(self.into_string());151 out152 }153 pub fn over_ssh(mut self, session: Arc<Session>) -> Self {154 self.ssh_session = Some(session);155 self156 }157158 pub async fn run(self) -> Result<()> {159 let str = self.clone().into_string();160 let cmd = self.into_command();161 run_nix_inner(str, cmd, &mut PlainHandler).await?;162 Ok(())163 }164 pub async fn run_string(self) -> Result<String> {165 let str = self.clone().into_string();166 let cmd = self.into_command();167 let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;168 Ok(v)169 }170171 pub async fn run_nix_string(self) -> Result<String> {172 let str = self.clone().into_string();173 let mut cmd = self.into_command();174 cmd.arg("--log-format").arg("internal-json");175 run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await176 }177 pub async fn run_nix(self) -> Result<()> {178 let str = self.clone().into_string();179 let mut cmd = self.into_command();180 cmd.arg("--log-format").arg("internal-json");181 cmd.stdout(Stdio::inherit());182 run_nix_inner(str, cmd, &mut NixHandler::default()).await183 }184}185186struct EmptyAsyncRead;187impl AsyncRead for EmptyAsyncRead {188 fn poll_read(189 self: std::pin::Pin<&mut Self>,190 _cx: &mut std::task::Context<'_>,191 _buf: &mut tokio::io::ReadBuf<'_>,192 ) -> Poll<std::io::Result<()>> {193 Poll::Pending194 }195}196197async fn run_nix_inner_stdout(198 str: String,199 cmd: Command,200 handler: &mut dyn Handler,201) -> Result<String> {202 Ok(run_nix_inner_raw(str, cmd, true, handler, None)203 .await?204 .expect("has out"))205}206async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {207 let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;208 assert!(v.is_none());209 Ok(())210}211212pub trait Handler: Send {213 fn handle_line(&mut self, e: &str);214}215216pub struct ClonableHandler<H>(Arc<Mutex<H>>);217impl<H> Clone for ClonableHandler<H> {218 fn clone(&self) -> Self {219 Self(self.0.clone())220 }221}222impl<H> ClonableHandler<H> {223 pub fn new(inner: H) -> Self {224 Self(Arc::new(Mutex::new(inner)))225 }226}227impl<H: Handler> Handler for ClonableHandler<H> {228 fn handle_line(&mut self, e: &str) {229 self.0.lock().unwrap().handle_line(e)230 }231}232233struct PlainHandler;234impl Handler for PlainHandler {235 fn handle_line(&mut self, e: &str) {236 info!(target: "log", "{e}");237 }238}239240pub struct NoopHandler;241impl Handler for NoopHandler {242 fn handle_line(&mut self, _e: &str) {}243}244245#[derive(Default)]246pub struct NixHandler {247 spans: HashMap<u64, Span>,248}249fn process_message(m: &str) -> String {250 static OSC_CLEANER: Lazy<Regex> =251 Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());252 static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());253 let m = OSC_CLEANER.replace_all(m, "");254 // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,255 // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.256 DETABBER.replace_all(m.as_ref(), " ").to_string()257}258impl Handler for NixHandler {259 fn handle_line(&mut self, e: &str) {260 if let Some(e) = e.strip_prefix("@nix ") {261 let log: NixLog = match serde_json::from_str(e) {262 Ok(l) => l,263 Err(err) => {264 warn!("failed to parse nix log line {:?}: {}", e, err);265 return;266 }267 };268 match log {269 NixLog::Msg { msg, raw_msg, .. } => {270 #[allow(clippy::nonminimal_bool)]271 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))272 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")273 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {274 if let Some(raw_msg) = raw_msg {275 if !msg.is_empty() {276 info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())277 } else {278 info!(target: "nix", "{}", raw_msg.trim_end())279 }280 } else {281 info!(target: "nix", "{}", msg.trim_end())282 }283 }284 }285 NixLog::Start {286 ref fields,287 typ,288 id,289 ..290 } if typ == 105 && !fields.is_empty() => {291 if let [LogField::String(drv), ..] = &fields[..] {292 let mut drv = drv.as_str();293 if let Some(pkg) = drv.strip_prefix("/nix/store/") {294 let mut it = pkg.splitn(2, '-');295 it.next();296 if let Some(pkg) = it.next() {297 drv = pkg;298 }299 }300 info!(target: "nix","building {}", drv);301 let span = info_span!("build", drv);302 span.pb_start();303 self.spans.insert(id, span);304 } else {305 warn!("bad build log: {:?}", log)306 }307 }308 NixLog::Start {309 ref fields,310 typ,311 id,312 ..313 } if typ == 100 && fields.len() >= 3 => {314 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =315 &fields[..]316 {317 let mut drv = drv.as_str();318319 if let Some(pkg) = drv.strip_prefix("/nix/store/") {320 let mut it = pkg.splitn(2, '-');321 it.next();322 if let Some(pkg) = it.next() {323 drv = pkg;324 }325 }326 // info!(target: "nix","copying {} {} -> {}", drv, from, to);327 let span = info_span!("copy", from, to, drv);328 span.pb_start();329 self.spans.insert(id, span);330 } else {331 warn!("bad copy log: {:?}", log)332 }333 }334 NixLog::Start { text, typ, id, .. }335 if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>336 {337 if !text.is_empty()338 && text != "querying info about missing paths"339 && text != "copying 0 paths"340 {341 let span = info_span!("job");342 span.pb_start();343 span.pb_set_message(&process_message(text.trim()));344 self.spans.insert(id, span);345 info!(target: "nix", "{}", text);346 }347 }348 NixLog::Start {349 text,350 level: 0,351 typ: 108,352 ..353 } if text.is_empty() => {354 // Cache lookup? Coupled with copy log355 }356 NixLog::Start {357 text,358 level: 4,359 typ: 109,360 ..361 } if text.starts_with("querying info about ") => {362 // Cache lookup363 }364 NixLog::Start {365 text,366 level: 4,367 typ: 101,368 ..369 } if text.starts_with("downloading ") => {370 // NAR downloading, coupled with copy log371 }372 NixLog::Start {373 text,374 level: 1,375 typ: 111,376 ..377 } if text.starts_with("waiting for a machine to build ") => {378 // Useless repeating notification about build379 }380 NixLog::Start {381 text,382 level: 3,383 typ: 111,384 ..385 } if text.starts_with("resolved derivation: ") => {386 // CA resolved387 }388 NixLog::Start {389 text,390 level: 1,391 typ: 111,392 id,393 ..394 } if text.starts_with("waiting for lock on ") => {395 let mut drv = text.strip_prefix("waiting for lock on ").unwrap();396 if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {397 drv = txt;398 }399 if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {400 drv = txt;401 }402 if let Some(txt) = drv.split("', '").next() {403 drv = txt;404 }405 if let Some(pkg) = drv.strip_prefix("/nix/store/") {406 let mut it = pkg.splitn(2, '-');407 it.next();408 if let Some(pkg) = it.next() {409 drv = pkg;410 }411 }412 let span = info_span!("waiting on drv", drv);413 span.pb_start();414 self.spans.insert(id, span);415 // Concurrent build of the same message416 }417 NixLog::Stop { id, .. } => {418 self.spans.remove(&id);419 }420 NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {421 if let Some(span) = self.spans.get(&id) {422 if let LogField::String(s) = &fields[0] {423 span.pb_set_message(&process_message(s.trim()));424 } else {425 warn!("bad fields: {fields:?}");426 }427 } else {428 warn!("unknown result id: {id} {typ} {fields:?}");429 }430 // dbg!(fields, id, typ);431 }432 NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {433 if let Some(span) = self.spans.get(&id) {434 if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =435 &fields[..4]436 {437 span.pb_set_length(*expected);438 span.pb_set_position(*done);439 } else {440 warn!("bad fields: {fields:?}");441 }442 } else {443 // warn!("unknown result id: {id} {typ} {fields:?}");444 // Unaccounted progress.445 }446 // dbg!(fields, id, typ);447 }448 NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {449 // Set phase, expected450 }451 _ => warn!("unknown log: {:?}", log),452 };453 } else {454 let e = e.trim();455 if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {456 return;457 }458 info!("{e}")459 }460 }461}462463async fn run_nix_inner_raw(464 str: String,465 mut cmd: Command,466 want_stdout: bool,467 err_handler: &mut dyn Handler,468 mut out_handler: Option<&mut dyn Handler>,469) -> Result<Option<String>> {470 cmd.stderr(Stdio::piped());471 cmd.stdout(Stdio::piped());472 let mut child = cmd.spawn()?;473 let mut stderr = child.stderr.take().unwrap();474 let stdout = child.stdout.take().unwrap();475 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());476 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));477 let mut ob = want_stdout478 .then(|| out.take().unwrap())479 .unwrap_or_else(|| Box::new(EmptyAsyncRead));480 let mut ol = (!want_stdout)481 .then(|| out.take().unwrap())482 .unwrap_or_else(|| Box::new(EmptyAsyncRead));483 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());484 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());485486 // while let Some(line) = read.next().await? {}487488 let mut out_buf = if want_stdout { Some(vec![]) } else { None };489 loop {490 select! {491 e = err.next() => {492 if let Some(e) = e {493 let e = e?;494 err_handler.handle_line(&e);495 }496 },497 o = ob.next() => {498 if let Some(o) = o {499 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);500 }501 },502 o = ol.next() => {503 if let Some(o) = o {504 let o = o?;505 if let Some(out) = out_handler.as_mut() {506 out.handle_line(&o)507 } else {508 err_handler.handle_line(&o)509 }510 // out_handler.handle_info(&o);511 }512 },513 code = child.wait() => {514 let code = code?;515 if !code.success() {516 anyhow::bail!("command '{str}' failed with status {}", code);517 }518 break;519 }520 }521 }522523 Ok(out_buf.map(String::from_utf8).transpose()?)524}525526pub trait ErrorRecorder: Send {527 /// Return true to discard message from logging528 fn push_message(&mut self, msg: &str) -> bool;529}530531#[derive(Debug)]532enum LogField {533 String(String),534 Num(u64),535}536537impl<'de> Deserialize<'de> for LogField {538 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>539 where540 D: serde::Deserializer<'de>,541 {542 struct StringOrNum;543 impl<'de> Visitor<'de> for StringOrNum {544 type Value = LogField;545546 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {547 write!(f, "string or unsigned")548 }549550 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>551 where552 E: serde::de::Error,553 {554 Ok(LogField::String(v.to_owned()))555 }556557 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>558 where559 E: serde::de::Error,560 {561 Ok(LogField::Num(v))562 }563 }564565 deserializer.deserialize_any(StringOrNum)566 }567}568569#[derive(Deserialize, Debug)]570#[serde(rename_all = "camelCase", tag = "action")]571#[allow(dead_code)]572enum NixLog {573 Msg {574 level: u32,575 msg: String,576 raw_msg: Option<String>,577 },578 Start {579 id: u64,580 level: u32,581 #[serde(default)]582 fields: Vec<LogField>,583 text: String,584 #[serde(rename = "type")]585 typ: u32,586 },587 Stop {588 id: u64,589 },590 Result {591 id: u64,592 #[serde(rename = "type")]593 typ: u32,594 #[serde(default)]595 fields: Vec<LogField>,596 },597}cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -13,9 +13,10 @@
use tempfile::NamedTempFile;
use crate::{
- better_nix_eval::{Field, NixSessionPool},
+ better_nix_eval::{Field, Index, NixSessionPool},
command::MyCommand,
fleetdata::{FleetData, FleetSecret, FleetSharedSecret},
+ nix_path,
};
pub struct FleetConfigInternals {
@@ -24,9 +25,9 @@
pub opts: FleetOpts,
pub data: Mutex<FleetData>,
pub nix_args: Vec<OsString>,
- // fleetConfigurations.<name>
+ /// fleetConfigurations.<name>.<localSystem>
pub fleet_field: Field,
- // fleet_config.configUnchecked
+ /// fleet_config.configUnchecked
pub config_field: Field,
}
@@ -91,22 +92,12 @@
command = command.ssh(host);
}
command.run_string().await
- }
-
- pub fn configuration_attr_name(&self, name: &str) -> OsString {
- let mut str = self.directory.as_os_str().to_owned();
- str.push("#");
- str.push(&format!(
- "fleetConfigurations.default.{}.{}",
- self.local_system, name
- ));
- str
}
pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
let names = self
.fleet_field
- .get_field_deep(["configuredHosts"])
+ .select(nix_path!(.configuredHosts))
.await?
.list_fields()
.await?;
@@ -118,7 +109,7 @@
}
pub async fn system_config(&self, host: &str) -> Result<Field> {
self.fleet_field
- .get_field_deep(["configuredSystems", host, "config"])
+ .select(nix_path!(.configuredSystems.{host}.config))
.await
}
@@ -131,7 +122,7 @@
/// Shared secrets configured in fleet.nix or in flake
pub async fn list_configured_shared(&self) -> Result<Vec<String>> {
self.config_field
- .get_field("sharedSecrets")
+ .select(nix_path!(.sharedSecrets))
.await?
.list_fields()
.await
@@ -221,7 +212,7 @@
}
pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {
self.config_field
- .get_field_deep(["sharedSecrets", secret, "expectedOwners"])
+ .select(nix_path!(.sharedSecrets.{secret}.expectedOwners))
.await?
.as_json()
.await
@@ -279,7 +270,9 @@
if self.local_system == "detect" {
let builtins_field = Field::field(root_field.clone(), "builtins").await?;
- let system = builtins_field.get_field("currentSystem").await?;
+ let system = builtins_field
+ .select(nix_path!(.currentSystem))
+ .await?;
self.local_system = system.as_json().await?;
}
let local_system = self.local_system.clone();
@@ -287,9 +280,11 @@
let fleet_root = Field::field(root_field, "fleetConfigurations").await?;
let fleet_field = fleet_root
- .get_field_deep(["default", &local_system])
+ .select(nix_path!(.default.{&local_system}))
+ .await?;
+ let config_field = fleet_field
+ .select(nix_path!(.configUnchecked))
.await?;
- let config_field = fleet_field.get_field("configUnchecked").await?;
let mut fleet_data_path = directory.clone();
fleet_data_path.push("fleet.nix");
cmds/fleet/src/main.rsdiffbeforeafterboth--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -1,3 +1,4 @@
+#![recursion_limit = "512"]
#![feature(try_blocks)]
pub(crate) mod cmds;
flake.nixdiffbeforeafterboth--- a/flake.nix
+++ b/flake.nix
@@ -19,6 +19,7 @@
rustPlatform = pkgs.makeRustPlatform { cargo = rust; rustc = rust; };
in
{
+ packages = (import ./pkgs) pkgs pkgs;
devShell = (pkgs.mkShell.override { stdenv = llvmPkgs.stdenv; }) {
nativeBuildInputs = with pkgs; [
rust