git.delta.rocks / jrsonnet / refs/commits / 7e2e5c591e04

difftreelog

refactor more repl abstractions

Yaroslav Bolyukin2023-12-27parent: #624fe7e.patch.diff
in: trunk

11 files changed

modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
before · cmds/fleet/src/better_nix_eval.rs
1use std::collections::HashMap;2use std::ffi::{OsStr, OsString};3use std::fmt::{self, Display};4use std::path::PathBuf;5use std::process::Stdio;6use std::sync::{Arc, OnceLock};78use anyhow::{anyhow, bail, ensure, Context, Result};9use futures::StreamExt;10use itertools::Itertools;11use r2d2::{Pool, PooledConnection};12use serde::de::DeserializeOwned;13use serde::{Deserialize, Serialize};14use tokio::io::AsyncWriteExt;15use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};16use tokio::select;17use tokio::sync::{mpsc, oneshot};18use tokio_util::codec::{FramedRead, LinesCodec};19use tracing::{debug, error, warn, Level};2021use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};2223const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2425pub struct NixSessionInner {26	full_delimiter: String,27	nix_handler: ClonableHandler<NixHandler>,28	out: OutputHandler,29	stdin: ChildStdin,30	string_wrapping: (String, String),31	number_wrapping: (String, String),3233	next_id: u32,34	free_list: Vec<u32>,35}36const TRAIN_STRING: &str = "\"TRAIN_STRING\"";37const TRAIN_NUMBER: &str = "13141516";3839#[must_use]40struct ErrorCollector<'i, H> {41	collected: Vec<String>,42	inner: &'i mut H,43}44impl<'i, H> ErrorCollector<'i, H> {45	fn new(inner: &'i mut H) -> Self {46		Self {47			collected: vec![],48			inner,49		}50	}51}52impl<H> ErrorCollector<'_, H> {53	fn handle_line_inner(&mut self, msg: &str) -> bool {54		let Some(msg) = msg.strip_prefix("@nix ") else {55			return false;56		};57		#[derive(Deserialize)]58		struct ErrorAction {59			action: String,60			level: u32,61			msg: String,62		}63		let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {64			return false;65		};66		if act.action != "msg" || act.level != 0 {67			return false;68		}69		self.collected.push(act.msg);70		true71	}72	fn finish(self) -> Result<()> {73		// fn dedent(s: String) -> String {74		// 	s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)75		// }76		if !self.collected.is_empty() {77			bail!(78				"{}",79				self.collected80					.iter()81					.map(|v| {82						if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {83							let v = unindent::unindent(f.trim_start());84							v.trim().to_owned()85						} else {86							v.to_owned()87						}88					})89					.join("\n")90			);91		}92		Ok(())93	}94	fn flush(self) {95		for line in self.collected {96			warn!("{line}");97		}98	}99}100impl<H: Handler> Handler for ErrorCollector<'_, H> {101	fn handle_line(&mut self, e: &str) {102		if self.handle_line_inner(e) {103			return;104		}105		self.inner.handle_line(e)106	}107}108109enum OutputLine {110	Out(String),111	Err(String),112}113struct OutputHandler {114	rx: mpsc::Receiver<OutputLine>,115	_cancel_handle: oneshot::Receiver<()>,116}117impl OutputHandler {118	fn new(out: ChildStdout, err: ChildStderr) -> Self {119		let mut out = FramedRead::new(out, LinesCodec::new());120		let mut err = FramedRead::new(err, LinesCodec::new());121		let (tx, rx) = mpsc::channel(20);122		let (mut cancelled, _cancel_handle) = oneshot::channel();123		tokio::spawn(async move {124			loop {125				select! {126					// We should receive errors earlier than synchronization127					biased;128					e = err.next() => {129						let Some(Ok(e)) = e else {130							if e.is_some() {131								error!("bad repl stderr: {e:?}");132							}133							continue;134						};135						let _ = tx.send(OutputLine::Err(e)).await;136					}137					o = out.next() => {138						let Some(Ok(o)) = o else {139							if o.is_some() {140								error!("bad repl stdout: {o:?}");141							}142							continue;143						};144						let _ = tx.send(OutputLine::Out(o)).await;145					}146					// Reader doesn't care about stdout, as this is cancelled.147					// Error still might be useful, to process leftover span closures?148					_ = cancelled.closed() => {149						break;150					}151				}152			}153		});154		Self { rx, _cancel_handle }155	}156	async fn next(&mut self) -> Option<OutputLine> {157		self.rx.recv().await158	}159}160161struct WarnHandler;162impl Handler for WarnHandler {163	fn handle_line(&mut self, e: &str) {164		warn!(target: "nix", "{e}")165	}166}167168impl NixSessionInner {169	async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {170		let mut cmd = Command::new("nix");171		cmd.arg("repl")172			.arg(flake)173			.arg("--log-format")174			.arg("internal-json");175		for arg in extra_args {176			cmd.arg(arg);177		}178		cmd.stdin(Stdio::piped());179		cmd.stdout(Stdio::piped());180		cmd.stderr(Stdio::piped());181		let cmd = cmd.spawn()?;182		let stdout = cmd.stdout.unwrap();183		let stderr = cmd.stderr.unwrap();184		let mut out = OutputHandler::new(stdout, stderr);185		let mut stdin = cmd.stdin.unwrap();186		// Standard repl hello doesn't work with internal-json logger187		stdin.write_all(REPL_DELIMITER.as_bytes()).await?;188		stdin.write_all(b"\n").await?;189		stdin.flush().await?;190		let nix_handler = NixHandler::default();191		let mut full_delimiter = None;192		let mut errors = vec![];193		while let Some(line) = out.next().await {194			let line = match line {195				OutputLine::Out(o) => o,196				OutputLine::Err(_e) => {197					// Handle startup errors, but skip repl hello?198					errors.push(_e);199					continue;200				}201			};202			if line.contains(REPL_DELIMITER) {203				debug!("discovered repl delimiter with added colors: {line}");204				full_delimiter = Some(line.to_owned());205				break;206			}207		}208		let Some(full_delimiter) = full_delimiter else {209			for e in errors {210				error!("{e}");211			}212			bail!("failed to discover delimiter");213		};214		let mut res = Self {215			full_delimiter,216			nix_handler: ClonableHandler::new(nix_handler),217			out,218			stdin,219			string_wrapping: Default::default(),220			number_wrapping: Default::default(),221222			next_id: 0,223			free_list: vec![],224		};225		res.train().await?;226		Ok(res)227	}228	async fn train(&mut self) -> Result<()> {229		{230			let full_string = self231				.execute_expression_raw(TRAIN_STRING, &mut NoopHandler)232				.await?;233			let string_offset = full_string.find(TRAIN_STRING).expect("contained");234			let string_prefix = &full_string[..string_offset];235			let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];236			self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());237		}238		{239			let full_number = self240				.execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)241				.await?;242			let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");243			let number_prefix = &full_number[..number_offset];244			let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];245			self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());246		}247		Ok(())248	}249	async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {250		if tracing::enabled!(Level::DEBUG) {251			let cmd_str = String::from_utf8_lossy(cmd.as_ref());252			tracing::debug!("{cmd_str}");253		};254		self.stdin.write_all(cmd.as_ref()).await?;255		self.stdin.write_all(b"\n").await?;256		Ok(())257	}258	async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {259		let mut out = String::new();260		while let Some(line) = self.out.next().await {261			let line = match line {262				OutputLine::Out(out) => out,263				OutputLine::Err(err) => {264					err_handler.handle_line(&err);265					continue;266				}267			};268			if line == self.full_delimiter {269				return Ok(out);270			}271			if !out.is_empty() {272				out.push('\n');273			}274			out.push_str(&line);275		}276		bail!("didn't reached delimiter");277	}278	async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {279		let num = self.number_wrapping.clone();280		let n = self.execute_expression_wrapping(expr, &num).await?;281		Ok(n.parse::<u64>()?)282	}283	async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {284		let num = self.string_wrapping.clone();285		let n = self.execute_expression_wrapping(expr, &num).await?;286		let str: String = serde_json::from_str(&n)?;287		Ok(str)288	}289	async fn execute_expression_to_json<V: DeserializeOwned>(290		&mut self,291		expr: impl AsRef<[u8]>,292	) -> Result<V> {293		let mut fexpr = b"builtins.toJSON (".to_vec();294		fexpr.extend_from_slice(expr.as_ref());295		fexpr.push(b')');296		let v = self.execute_expression_string(fexpr).await?;297		Ok(serde_json::from_str(&v)?)298	}299	async fn execute_expression_wrapping(300		&mut self,301		expr: impl AsRef<[u8]>,302		wrapping: &(String, String),303	) -> Result<String> {304		let mut nix_handler = self.nix_handler.clone();305		let mut collected = ErrorCollector::new(&mut nix_handler);306		let res = self.execute_expression_raw(expr, &mut collected).await?;307		if res.is_empty() {308			collected.finish()?;309			bail!("expected expression, got nothing")310		} else {311			collected.flush()312		};313		let Some(res) = res.strip_prefix(&wrapping.0) else {314			bail!("invalid type")315		};316		let Some(res) = res.strip_suffix(&wrapping.1) else {317			bail!("invalid type")318		};319		Ok(res.to_owned())320	}321	async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {322		let mut nix_handler = self.nix_handler.clone();323		let mut collected = ErrorCollector::new(&mut nix_handler);324		let v = self.execute_expression_raw(expr, &mut collected).await?;325		collected.finish()?;326		ensure!(v.is_empty(), "unexpected expression result");327		Ok(())328	}329	async fn execute_expression_raw(330		&mut self,331		expr: impl AsRef<[u8]>,332		err_handler: &mut dyn Handler,333	) -> Result<String> {334		self.send_command(expr).await?;335		// It will be echoed336		self.send_command(REPL_DELIMITER).await?;337		self.read_until_delimiter(err_handler).await338	}339	async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {340		let id = self.allocate_id();341		self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))342			.await?;343		Ok(id)344	}345346	/// Id should be immediately used347	fn allocate_id(&mut self) -> u32 {348		if let Some(free) = self.free_list.pop() {349			free350		} else {351			let v = self.next_id;352			self.next_id += 1;353			v354		}355	}356	// Nix has no way to deallocate variable, yet GC will correct everything not reachable.357	// async fn free_id(&mut self, id: u32) -> Result<()> {358	// 	self.execute_expression_empty(format!("sess_field_{id} = null"))359	// 		.await?;360	// 	self.free_list.push(id);361	// 	Ok(())362	// }363}364365#[derive(Clone)]366pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);367368#[macro_export]369macro_rules! nix_path {370	(@o($o:ident) $var:ident $($tt:tt)*) => {{371		$o.push(Index::var(stringify!($var)));372		nix_path!(@o($o) $($tt)*);373	}};374	(@o($o:ident) . $var:ident $($tt:tt)*) => {{375		$o.push(Index::attr(stringify!($var)));376		nix_path!(@o($o) $($tt)*);377	}};378	(@o($o:ident) . $var:literal $($tt:tt)*) => {{379		$o.push(Index::attr($var));380		nix_path!(@o($o) $($tt)*);381	}};382	(@o($o:ident) . { $var:expr } $($tt:tt)*) => {{383		$o.push(Index::attr($var));384		nix_path!(@o($o) $($tt)*);385	}};386	(@o($o:ident) [ $var:literal ] $($tt:tt)*) => {{387		$o.push(Index::idx($var));388		nix_path!(@o($o) $($tt)*);389	}};390	(@o($o:ident) ($e:expr) $($tt:tt)*) => {391		$o.push(Index::apply($e));392		nix_path!(@o($o) $($tt)*);393	};394	(@o($o:ident)) => {};395	($($tt:tt)+) => {{396		use $crate::{nix_path, better_nix_eval::Index};397		let mut out = vec![];398		nix_path!(@o(out) $($tt)*);399		out400	}}401}402403#[derive(Clone)]404pub enum Index {405	Var(String),406	String(String),407	Apply(String),408	Idx(u32),409}410impl Index {411	pub fn var(v: impl AsRef<str>) -> Self {412		let v = v.as_ref();413		assert!(414			!(v.contains('.') | v.contains(' ')),415			"bad variable name: {v}"416		);417		Self::Var(v.to_owned())418	}419	pub fn attr(v: impl AsRef<str>) -> Self {420		Self::String(v.as_ref().to_owned())421	}422	pub fn idx(v: u32) -> Self {423		Self::Idx(v)424	}425	pub fn apply(v: impl Serialize) -> Self {426		let serialized = nixlike::serialize(v).expect("invalid value for apply");427		Self::Apply(serialized.trim_end().to_owned())428	}429}430impl Display for Index {431	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {432		match self {433			Index::Var(v) => {434				write!(f, "{v}")435			}436			Index::String(k) => {437				let v = nixlike::format_identifier(k.as_str());438				write!(f, ".{v}")439			}440			Index::Apply(o) => {441				write!(f, "<apply>({o})")442			}443			Index::Idx(i) => {444				write!(f, "[{i}]")445			}446		}447	}448}449impl fmt::Debug for Index {450	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {451		write!(f, "{self}")452	}453}454struct PathDisplay<'i>(&'i [Index]);455impl Display for PathDisplay<'_> {456	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {457		for i in self.0 {458			write!(f, "{i}")?;459		}460		Ok(())461	}462}463pub struct Field {464	full_path: Vec<Index>,465	session: NixSession,466	value: Option<u32>,467}468impl Field {469	fn root(session: NixSession) -> Self {470		Self {471			full_path: vec![],472			session,473			value: None,474		}475	}476	pub async fn field(session: NixSession, field: &str) -> Result<Self> {477		Self::root(session)478			.select([Index::var(field)])479			.await480	}481	pub async fn get_json_deep<'a, V: DeserializeOwned>(482		&self,483		name: impl IntoIterator<Item = Index>,484	) -> Result<V> {485		let field = self.select(name).await?;486		field.as_json().await487	}488	pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {489		let mut name = name.into_iter();490491		let mut full_path = self.full_path.clone();492		let mut query = if let Some(id) = self.value {493			format!("sess_field_{id}")494		} else {495			let first = name.next();496			if let Some(Index::Var(i)) = first {497				full_path.push(Index::Var(i.clone()));498				i.clone()499			} else {500				panic!("first path item should be variable, got {first:?}")501			}502		};503		for v in name {504			full_path.push(v.clone());505			match v {506				Index::Var(_) => panic!("var item may only be first"),507				Index::String(s) => {508					let escaped = nixlike::serialize(s)?;509					query.push('.');510					query.push_str(escaped.trim());511				}512				Index::Apply(a) => {513					// In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()`514					query = format!("({query} {a})");515				}516				Index::Idx(idx) => {517					query = format!("builtins.elemAt ({query}) {idx}");518				}519			}520		}521522		let vid = self523			.session524			.0525			.lock()526			.await527			.execute_assign(&query)528			.await529			.with_context(|| format!("full path: {}", PathDisplay(&full_path)))?;530		Ok(Self {531			full_path,532			session: self.session.clone(),533			value: Some(vid),534		})535	}536	pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {537		let id = self.value.expect("can't serialize root field");538		self.session539			.0540			.lock()541			.await542			.execute_expression_to_json(&format!("sess_field_{id}"))543			.await544			.with_context(|| format!("full path: {}", PathDisplay(&self.full_path)))545	}546	pub async fn list_fields(&self) -> Result<Vec<String>> {547		let id = self.value.expect("can't list root fields");548		self.session549			.0550			.lock()551			.await552			.execute_expression_to_json(&format!("builtins.attrNames sess_field_{id}"))553			.await554			.with_context(|| format!("full path: {}", PathDisplay(&self.full_path)))555	}556	pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {557		let id = self.value.expect("can't use build on not-value");558		let vid = self559			.session560			.0561			.lock()562			.await563			.execute_expression_raw(&format!(":b sess_field_{id}"), &mut NixHandler::default())564			.await?;565		ensure!(!vid.is_empty(), "build failed: {}", PathDisplay(&self.full_path));566		let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")567		else {568			panic!("unexpected build output: {vid:?}");569		};570		let outputs = vid571			.split('\n')572			.filter(|v| !v.is_empty())573			.map(|v| v.split_once(" -> ").expect("unexpected build output"))574			.map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))575			.collect();576		Ok(outputs)577	}578}579impl Drop for Field {580	fn drop(&mut self) {581		if let Some(id) = self.value {582			if let Ok(mut lock) = self.session.0.try_lock() {583				lock.free_list.push(id)584			}585			// Leaked586		}587	}588}589struct NixSessionPoolInner {590	flake: OsString,591	nix_args: Vec<OsString>,592}593594#[derive(Debug)]595pub struct NixPoolError(anyhow::Error);596impl From<anyhow::Error> for NixPoolError {597	fn from(value: anyhow::Error) -> Self {598		Self(value)599	}600}601impl std::error::Error for NixPoolError {}602impl std::fmt::Display for NixPoolError {603	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {604		self.0.fmt(f)605	}606}607impl r2d2::ManageConnection for NixSessionPoolInner {608	type Connection = NixSessionInner;609	type Error = NixPoolError;610	fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {611		let _v = TOKIO_RUNTIME612			.get()613			.expect("missed tokio runtime init!")614			.enter();615		Ok(futures::executor::block_on(NixSessionInner::new(616			self.flake.as_os_str(),617			self.nix_args.iter().map(OsString::as_os_str),618		))?)619	}620621	fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {622		let _v = TOKIO_RUNTIME623			.get()624			.expect("missed tokio runtime init!")625			.enter();626		let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;627		if res != 4 {628			return Err(anyhow!("sanity check failed").into());629		};630		Ok(())631	}632633	fn has_broken(&self, _conn: &mut Self::Connection) -> bool {634		false635	}636}637pub struct NixSessionPool(Pool<NixSessionPoolInner>);638impl NixSessionPool {639	pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {640		let inner = tokio::task::block_in_place(|| {641			r2d2::Builder::<NixSessionPoolInner>::new()642				.min_idle(Some(0))643				.build(NixSessionPoolInner { flake, nix_args })644		})?;645		Ok(Self(inner))646	}647	pub async fn get(&self) -> Result<NixSession> {648		let v = tokio::task::block_in_place(|| self.0.get())?;649		Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))650	}651}652653pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -4,8 +4,8 @@
 
 use crate::command::MyCommand;
 use crate::host::Config;
-use crate::nix_path;
-use anyhow::{anyhow, Result, Context};
+use crate::nix_go;
+use anyhow::{anyhow, Result};
 use clap::Parser;
 use itertools::Itertools;
 use tokio::{task::LocalSet, time::sleep};
@@ -290,12 +290,10 @@
 	async fn build_task(self, config: Config, host: String) -> Result<()> {
 		info!("building");
 		let action = Action::from(self.subcommand.clone());
-		let drv = config
-			.fleet_field
-			.select(nix_path!(.buildSystems((serde_json::json!({
-				"localSystem": config.local_system.clone(),
-			}))).{action.build_attr()}.{&host}))
-			.await.context("system attribute")?;
+		let fleet_field = &config.fleet_field;
+		let drv = nix_go!(fleet_field.buildSystems(Obj {
+			localSystem: { config.local_system.clone() }
+		}));
 		let outputs = drv.build().await.map_err(|e| {
 			if action.build_attr() == "sdImage" {
 				info!("sd-image build failed");
modifiedcmds/fleet/src/cmds/info.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/info.rs
+++ b/cmds/fleet/src/cmds/info.rs
@@ -1,7 +1,7 @@
 use std::collections::BTreeSet;
 
 use crate::host::Config;
-use crate::nix_path;
+use crate::nix_go_json;
 use anyhow::{ensure, Result};
 use clap::Parser;
 
@@ -37,12 +37,9 @@
 			InfoCmd::ListHosts { ref tagged } => {
 				'host: for host in config.list_hosts().await? {
 					if !tagged.is_empty() {
-						let tags: Vec<String> = config
-							.fleet_field
-							.select(nix_path!(.configuredSystems.{&host.name}.config.tags))
-							.await?
-							.as_json()
-							.await?;
+						let fleet_field = &config.fleet_field;
+						let tags: Vec<String> =
+							nix_go_json!(fleet_field.configuredSystems[{ host.name }].config.tags);
 						for tag in tagged {
 							if !tags.contains(tag) {
 								continue 'host;
@@ -64,20 +61,12 @@
 				let mut out = <BTreeSet<String>>::new();
 				let host = config.system_config(&host).await?;
 				if external {
-					out.extend(
-						host.select(nix_path!(.network.externalIps))
-							.await?
-							.as_json::<Vec<String>>()
-							.await?,
-					);
+					let data: Vec<String> = nix_go_json!(host.network.externalIps);
+					out.extend(data);
 				}
 				if internal {
-					out.extend(
-						host.select(nix_path!(.network.internalIps))
-							.await?
-							.as_json::<Vec<String>>()
-							.await?,
-					);
+					let data: Vec<String> = nix_go_json!(host.network.internalIps);
+					out.extend(data);
 				}
 				for ip in out {
 					data.push(ip);
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -1,9 +1,10 @@
 use crate::{
 	fleetdata::{FleetSecret, FleetSharedSecret},
-	host::Config, nix_path,
+	host::Config,
+	nix_go, nix_go_json,
 };
-use anyhow::{bail, ensure, Context, Result};
-use chrono::Utc;
+use anyhow::{anyhow, bail, ensure, Context, Result};
+use chrono::{DateTime, Utc};
 use clap::Parser;
 use futures::{StreamExt, TryStreamExt};
 use owo_colors::OwoColorize;
@@ -17,8 +18,8 @@
 use tracing::{error, info, info_span, warn};
 
 #[derive(Parser)]
-pub enum Secrets {
-	/// Force load keys for all defined hosts
+pub enum Secret {
+	/// Force load host keys for all defined hosts
 	ForceKeys,
 	/// Add secret, data should be provided in stdin
 	AddShared {
@@ -29,14 +30,20 @@
 		/// Override secret if already present
 		#[clap(long)]
 		force: bool,
+		/// Secret public part
 		#[clap(long)]
 		public: Option<String>,
+		/// Load public part from specified file
 		#[clap(long)]
 		public_file: Option<PathBuf>,
 
+		/// Create a notification on secret expiration
+		#[clap(long)]
+		expires_at: Option<DateTime<Utc>>,
+
 		/// Secret with this name already exists, override its value while keeping the same owners.
 		#[clap(long)]
-		readd: bool,
+		re_add: bool,
 	},
 	/// Add secret, data should be provided in stdin
 	Add {
@@ -81,12 +88,33 @@
 		prefer_identities: Vec<String>,
 	},
 	List {},
+	InvokeGenerator,
 }
 
-impl Secrets {
+impl Secret {
 	pub async fn run(self, config: &Config) -> Result<()> {
 		match self {
-			Secrets::ForceKeys => {
+			Secret::InvokeGenerator => {
+				let config_field = &config.config_unchecked_field;
+
+				let generate_impure =
+					nix_go!(config_field.sharedSecrets["kube-apiserver.pem"].generateImpure);
+				let on = nix_go!(generate_impure.on);
+				let call_package = nix_go!(
+					config_field.buildableSystems(Obj {
+						localSystem: { config.local_system.clone() }
+					})[on]
+						.config
+						.nixpkgs
+						.pkgs
+						.callPackage
+				);
+				let generator = nix_go!(call_package(generate_impure.generator));
+				let built = generator.build().await?;
+				// .as_json().await?;
+				dbg!(&built);
+			}
+			Secret::ForceKeys => {
 				for host in config.list_hosts().await? {
 					if config.should_skip(&host.name) {
 						continue;
@@ -94,19 +122,20 @@
 					config.key(&host.name).await?;
 				}
 			}
-			Secrets::AddShared {
+			Secret::AddShared {
 				mut machines,
 				name,
 				force,
 				public,
 				public_file,
-				readd,
+				expires_at,
+				re_add,
 			} => {
 				let exists = config.has_shared(&name);
-				if exists && !force && !readd {
+				if exists && !force && !re_add {
 					bail!("secret already defined");
 				}
-				if readd {
+				if re_add {
 					// Fixme: use clap to limit this usage
 					ensure!(!force, "--force and --readd are not compatible");
 					ensure!(exists, "secret doesn't exists");
@@ -137,7 +166,7 @@
 							.map(|r| Box::new(r) as Box<dyn age::Recipient + Send>)
 							.collect();
 						let mut encryptor = age::Encryptor::with_recipients(recipients)
-							.expect("recipients provided")
+							.ok_or_else(|| anyhow!("no recipients provided"))?
 							.wrap_output(&mut encrypted)?;
 						io::copy(&mut Cursor::new(input), &mut encryptor)?;
 						encryptor.finish()?;
@@ -150,7 +179,7 @@
 						owners: machines,
 						secret: FleetSecret {
 							created_at: Utc::now(),
-							expires_at: None,
+							expires_at,
 							secret,
 							public: match (public, public_file) {
 								(Some(v), None) => Some(v),
@@ -164,7 +193,7 @@
 					},
 				);
 			}
-			Secrets::Add {
+			Secret::Add {
 				machine,
 				name,
 				force,
@@ -211,7 +240,7 @@
 			}
 			// TODO: Instead of using sudo, decode secret on remote machine
 			#[allow(clippy::await_holding_refcell_ref)]
-			Secrets::Read {
+			Secret::Read {
 				name,
 				machine,
 				plaintext,
@@ -228,7 +257,7 @@
 					println!("{}", z85::encode(&data));
 				}
 			}
-			Secrets::UpdateShared {
+			Secret::UpdateShared {
 				name,
 				machines,
 				mut add_machines,
@@ -321,7 +350,7 @@
 				secret.secret.secret = encrypted;
 				config.replace_shared(name, secret);
 			}
-			Secrets::Regenerate { prefer_identities } => {
+			Secret::Regenerate { prefer_identities } => {
 				{
 					let expected_shared_set = config
 						.list_configured_shared()
@@ -337,10 +366,9 @@
 				for name in &config.list_shared() {
 					info!("updating secret: {name}");
 					let mut data = config.shared_secret(name)?;
-					let expected_owners: Vec<String> = config
-						.config_field
-						.get_json_deep(nix_path!(sharedSecrets.{name}.expectedOwners))
-						.await?;
+					let config_field = &config.config_field;
+					let expected_owners: Vec<String> =
+						nix_go_json!(config_field.sharedSecrets[{ name }].expectedOwners);
 					if expected_owners.is_empty() {
 						warn!("secret was removed from fleet config: {name}, removing from data");
 						to_remove.push(name.to_string());
@@ -350,10 +378,8 @@
 					let expected_set = expected_owners.iter().collect::<HashSet<_>>();
 					let should_remove = set.difference(&expected_set).next().is_some();
 					if set != expected_set {
-						let owner_dependent: bool = config
-							.config_field
-							.get_json_deep(nix_path!(.sharedSecrets.{name}.ownerDependent))
-							.await?;
+						let owner_dependent: bool =
+							nix_go_json!(config_field.sharedSecrets[{ name }].ownerDependent);
 						if !owner_dependent {
 							warn!("reencrypting secret '{name}' for new owner set");
 							// TODO: force regeneration
@@ -401,7 +427,7 @@
 					config.remove_shared(&k);
 				}
 			}
-			Secrets::List {} => {
+			Secret::List {} => {
 				let _span = info_span!("loading secrets").entered();
 				let configured = config.list_configured_shared().await?;
 				#[derive(Tabled)]
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -337,6 +337,8 @@
 					if !text.is_empty()
 						&& text != "querying info about missing paths"
 						&& text != "copying 0 paths"
+						// Too much spam on lazy-trees branch
+						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))
 					{
 						let span = info_span!("job");
 						span.pb_start();
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -16,7 +16,7 @@
 	better_nix_eval::{Field, NixSessionPool},
 	command::MyCommand,
 	fleetdata::{FleetData, FleetSecret, FleetSharedSecret},
-	nix_path,
+	nix_go, nix_go_json,
 };
 
 pub struct FleetConfigInternals {
@@ -29,6 +29,8 @@
 	pub fleet_field: Field,
 	/// fleet_config.configUnchecked
 	pub config_field: Field,
+	/// fleet_config.unchecked
+	pub config_unchecked_field: Field,
 }
 
 #[derive(Clone)]
@@ -95,12 +97,8 @@
 	}
 
 	pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
-		let names = self
-			.fleet_field
-			.select(nix_path!(.configuredHosts))
-			.await?
-			.list_fields()
-			.await?;
+		let fleet_field = &self.fleet_field;
+		let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;
 		let mut out = vec![];
 		for name in names {
 			out.push(ConfigHost { name })
@@ -108,9 +106,8 @@
 		Ok(out)
 	}
 	pub async fn system_config(&self, host: &str) -> Result<Field> {
-		self.fleet_field
-			.select(nix_path!(.configuredSystems.{host}.config))
-			.await
+		let fleet_field = &self.fleet_field;
+		Ok(nix_go!(fleet_field.configuredSystems[{ host }].config))
 	}
 
 	pub(super) fn data(&self) -> MutexGuard<FleetData> {
@@ -121,11 +118,8 @@
 	}
 	/// Shared secrets configured in fleet.nix or in flake
 	pub async fn list_configured_shared(&self) -> Result<Vec<String>> {
-		self.config_field
-			.select(nix_path!(.sharedSecrets))
-			.await?
-			.list_fields()
-			.await
+		let config_field = &self.config_field;
+		nix_go!(config_field.sharedSecrets).list_fields().await
 	}
 	/// Shared secrets configured in fleet.nix
 	pub fn list_shared(&self) -> Vec<String> {
@@ -211,11 +205,10 @@
 		Ok(secret.clone())
 	}
 	pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {
-		self.config_field
-			.select(nix_path!(.sharedSecrets.{secret}.expectedOwners))
-			.await?
-			.as_json()
-			.await
+		let config_field = &self.config_field;
+		Ok(nix_go_json!(
+			config_field.sharedSecrets[{ secret }].expectedOwners
+		))
 	}
 
 	pub fn save(&self) -> Result<()> {
@@ -269,21 +262,15 @@
 
 		if self.local_system == "detect" {
 			let builtins_field = Field::field(root_field.clone(), "builtins").await?;
-			let system = builtins_field
-				.select(nix_path!(.currentSystem))
-				.await?;
-			self.local_system = system.as_json().await?;
+			self.local_system = nix_go_json!(builtins_field.currentSystem);
 		}
 		let local_system = self.local_system.clone();
 
 		let fleet_root = Field::field(root_field, "fleetConfigurations").await?;
 
-		let fleet_field = fleet_root
-			.select(nix_path!(.default))
-			.await?;
-		let config_field = fleet_field
-			.select(nix_path!(.configUnchecked))
-			.await?;
+		let fleet_field = nix_go!(fleet_root.default);
+		let config_field = nix_go!(fleet_field.configUnchecked);
+		let config_unchecked_field = nix_go!(fleet_field.unchecked);
 
 		let mut fleet_data_path = directory.clone();
 		fleet_data_path.push("fleet.nix");
@@ -298,6 +285,7 @@
 			nix_args,
 			fleet_field,
 			config_field,
+			config_unchecked_field,
 		})))
 	}
 }
modifiedcmds/fleet/src/main.rsdiffbeforeafterboth
--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -1,5 +1,5 @@
 #![recursion_limit = "512"]
-#![feature(try_blocks)]
+#![feature(try_blocks, lint_reasons)]
 
 pub(crate) mod cmds;
 pub(crate) mod command;
@@ -17,7 +17,7 @@
 use anyhow::{bail, Result};
 use clap::Parser;
 
-use cmds::{build_systems::BuildSystems, info::Info, secrets::Secrets};
+use cmds::{build_systems::BuildSystems, info::Info, secrets::Secret};
 use futures::future::LocalBoxFuture;
 use futures::stream::FuturesUnordered;
 use futures::TryStreamExt;
@@ -73,7 +73,7 @@
 	BuildSystems(BuildSystems),
 	/// Secret management
 	#[clap(subcommand)]
-	Secrets(Secrets),
+	Secret(Secret),
 	/// Upload prefetch directory to the nix store
 	Prefetch(Prefetch),
 	/// Config parsing
@@ -92,7 +92,7 @@
 async fn run_command(config: &Config, command: Opts) -> Result<()> {
 	match command {
 		Opts::BuildSystems(c) => c.run(config).await?,
-		Opts::Secrets(s) => s.run(config).await?,
+		Opts::Secret(s) => s.run(config).await?,
 		Opts::Info(i) => i.run(config).await?,
 		Opts::Prefetch(p) => p.run(config).await?,
 	};
modifiedflake.lockdiffbeforeafterboth
--- a/flake.lock
+++ b/flake.lock
@@ -5,11 +5,11 @@
         "systems": "systems"
       },
       "locked": {
-        "lastModified": 1694529238,
-        "narHash": "sha256-zsNZZGTGnMOf9YpHKJqMSsa0dXbfmxeoJ7xHlrt+xmY=",
+        "lastModified": 1701680307,
+        "narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=",
         "owner": "numtide",
         "repo": "flake-utils",
-        "rev": "ff7b65b44d01cf9ba6a71320833626af21126384",
+        "rev": "4022d587cbbfd70fe950c1e2083a02621806a725",
         "type": "github"
       },
       "original": {
@@ -38,11 +38,11 @@
     },
     "nixpkgs": {
       "locked": {
-        "lastModified": 1698350982,
-        "narHash": "sha256-zoEV8Ad3bOAejp0ys/mOpaHSWrzK+GupZwGGYfuWuEY=",
+        "lastModified": 1703705939,
+        "narHash": "sha256-9s2Ep3NyRDj9HUgfv2TQUwQEanRUAmeXkvKIr/o1XbY=",
         "owner": "nixos",
         "repo": "nixpkgs",
-        "rev": "dd83f9de26ff7c0326468b659ea4729fa5cf6262",
+        "rev": "1ada32da4ba24d7310653c9ac54888bee463f455",
         "type": "github"
       },
       "original": {
@@ -67,11 +67,11 @@
         ]
       },
       "locked": {
-        "lastModified": 1698199907,
-        "narHash": "sha256-n8RtHBIb0rLuYs4RDehW6mj6r6Yam/ODY1af/VCcurw=",
+        "lastModified": 1703643208,
+        "narHash": "sha256-UL4KO8JxnD5rOycwHqBAf84lExF1/VnYMDC7b/wpPDU=",
         "owner": "oxalica",
         "repo": "rust-overlay",
-        "rev": "22b8d29fd22cfaa2c311e0d6fd8a0ed9c2a1152b",
+        "rev": "ce117f3e0de8262be8cd324ee6357775228687cf",
         "type": "github"
       },
       "original": {
modifiedflake.nixdiffbeforeafterboth
--- a/flake.nix
+++ b/flake.nix
@@ -3,35 +3,52 @@
 
   inputs = {
     nixpkgs.url = "github:nixos/nixpkgs/master";
-    rust-overlay = { url = "github:oxalica/rust-overlay"; inputs.nixpkgs.follows = "nixpkgs"; };
-    flake-utils = { url = "github:numtide/flake-utils"; };
+    rust-overlay = {
+      url = "github:oxalica/rust-overlay";
+      inputs.nixpkgs.follows = "nixpkgs";
+    };
+    flake-utils = {url = "github:numtide/flake-utils";};
   };
-  outputs = { self, rust-overlay, flake-utils, nixpkgs }: with nixpkgs.lib; rec {
-    lib = import ./lib { inherit flake-utils; };
-  } // flake-utils.lib.eachDefaultSystem (system:
-    let
-      pkgs = import nixpkgs
-        {
-          inherit system; overlays = [ (import rust-overlay) ];
-        };
-      llvmPkgs = pkgs.buildPackages.llvmPackages_11;
-      rust = (pkgs.rustChannelOf { date = "2023-10-20"; channel = "nightly"; }).default.override { extensions = [ "rust-src" "rust-analyzer" ]; };
-      rustPlatform = pkgs.makeRustPlatform { cargo = rust; rustc = rust; };
-    in
-    {
-		packages = (import ./pkgs) pkgs pkgs;
-      devShell = (pkgs.mkShell.override { stdenv = llvmPkgs.stdenv; }) {
-        nativeBuildInputs = with pkgs; [
-          rust
-          lld
-          cargo-edit
-          cargo-udeps
-          cargo-fuzz
+  outputs = {
+    self,
+    rust-overlay,
+    flake-utils,
+    nixpkgs,
+  }:
+    with nixpkgs.lib;
+      {
+        lib = import ./lib {inherit flake-utils;};
+      }
+      // flake-utils.lib.eachDefaultSystem (system: let
+        pkgs =
+          import nixpkgs
+          {
+            inherit system;
+            overlays = [(import rust-overlay)];
+          };
+        llvmPkgs = pkgs.buildPackages.llvmPackages_11;
+        rust =
+          (pkgs.rustChannelOf {
+            date = "2023-12-26";
+            channel = "nightly";
+          })
+          .default
+          .override {extensions = ["rust-src" "rust-analyzer"];};
+      in {
+        packages = (import ./pkgs) pkgs pkgs;
+        devShell = (pkgs.mkShell.override {stdenv = llvmPkgs.stdenv;}) {
+          nativeBuildInputs = with pkgs; [
+            rust
+            lld
+            cargo-edit
+            cargo-udeps
+            cargo-fuzz
+            cargo-watch
 
-          pkg-config
-          openssl
-          bacon
-        ];
-      };
-    });
+            pkg-config
+            openssl
+            bacon
+          ];
+        };
+      });
 }
modifiedlib/default.nixdiffbeforeafterboth
--- a/lib/default.nix
+++ b/lib/default.nix
@@ -10,80 +10,99 @@
     fleetLib = import ./fleetLib.nix {
       inherit nixpkgs hostNames;
     };
-  in
-    let
-      withData = data: rec {
-        root = nixpkgs.lib.evalModules {
-          modules = (import ../modules/fleet/_modules.nix) ++ [config data];
-          specialArgs = {
-            inherit nixpkgs fleetLib;
-          };
-        };
-        failedAssertions = map (x: x.message) (nixpkgs.lib.filter (x: !x.assertion) root.config.assertions);
-        rootAssertWarn =
-          if failedAssertions != []
-          then throw "Failed assertions:\n${nixpkgs.lib.concatStringsSep "\n" (map (x: "- ${x}") failedAssertions)}"
-          else nixpkgs.lib.showWarnings root.config.warnings root;
-        configuredHosts = rootAssertWarn.config.hosts;
-        configuredSecrets = rootAssertWarn.config.secrets;
-        configuredSystems = configuredSystemsWithExtraModules [];
-        configuredSystemsWithExtraModules = extraModules:
-          nixpkgs.lib.listToAttrs (
-            map
-            (
-              name: {
-                inherit name;
-                value = nixpkgs.lib.nixosSystem {
-                  system = configuredHosts.${name}.system;
-                  modules = configuredHosts.${name}.modules ++ extraModules;
-                  specialArgs = {
-                    inherit fleetLib;
-                    fleet = fleetLib.hostsToAttrs (host: configuredSystems.${host}.config);
-                  };
+  in let
+    root = nixpkgs.lib.evalModules {
+      modules = (import ../modules/fleet/_modules.nix) ++ [config data];
+      specialArgs = {
+        inherit nixpkgs fleetLib;
+      };
+    };
+    failedAssertions = map (x: x.message) (nixpkgs.lib.filter (x: !x.assertion) root.config.assertions);
+    checkedRoot =
+      if failedAssertions != []
+      then throw "Fleet failed assertions:\n${nixpkgs.lib.concatStringsSep "\n" (map (x: "- ${x}") failedAssertions)}"
+      else nixpkgs.lib.showWarnings root.config.warnings root;
+    withData = {
+      root,
+      data,
+    }: rec {
+      configuredHosts = root.config.hosts;
+      configuredUncheckedHosts = root.config.hosts;
+      configuredSystems = configuredSystemsWithExtraModules [];
+      configuredSystemsWithExtraModules = extraModules:
+        nixpkgs.lib.listToAttrs (
+          map
+          (
+            name: {
+              inherit name;
+              value = nixpkgs.lib.nixosSystem {
+                system = configuredHosts.${name}.system;
+                modules = configuredHosts.${name}.modules ++ extraModules;
+                specialArgs = {
+                  inherit fleetLib;
+                  fleet = fleetLib.hostsToAttrs (host: configuredSystems.${host}.config);
                 };
-              }
-            )
-            (builtins.attrNames rootAssertWarn.config.hosts)
-          );
-        buildSystems = {localSystem}: let
-          buildConfigurationModule = {config, ...}: {
-            # Equivalent to nixpkgs.localSystem
-            # nixpkgs.system = localSystem;
-            nixpkgs.buildPlatform.system = localSystem;
-          };
-        in {
-          toplevel = builtins.mapAttrs (_name: value: value.config.system.build.toplevel) (configuredSystemsWithExtraModules [
-            buildConfigurationModule
-            ({...}: {
-              buildTarget = "toplevel";
-            })
-          ]);
-          sdImage = builtins.mapAttrs (_name: value: value.config.system.build.sdImage) (configuredSystemsWithExtraModules [
-            buildConfigurationModule
-            #(nixpkgs + "/nixos/modules/installer/sd-card/sd-image-aarch64-installer.nix")
-            ({...}: {
-              buildTarget = "sd-image";
-            })
-          ]);
-          installationCd = builtins.mapAttrs (_name: value: value.config.system.build.isoImage) (configuredSystemsWithExtraModules [
-            buildConfigurationModule
-            (nixpkgs + "/nixos/modules/installer/cd-dvd/installation-cd-minimal.nix")
-            ({lib, ...}: {
-              buildTarget = "installation-cd";
-              # Needed for https://github.com/NixOS/nixpkgs/issues/58959
-              boot.supportedFilesystems = lib.mkForce ["btrfs" "reiserfs" "vfat" "f2fs" "xfs" "ntfs" "cifs"];
-            })
-          ]);
+              };
+            }
+          )
+          (builtins.attrNames root.config.hosts)
+        );
+      buildableSystems = {localSystem}: let
+        buildConfigurationModule = {config, ...}: {
+          # Equivalent to nixpkgs.localSystem
+          # nixpkgs.system = localSystem;
+          nixpkgs.buildPlatform.system = localSystem;
+        };
+      in
+        configuredSystemsWithExtraModules [
+          buildConfigurationModule
+        ];
+      buildSystems = {localSystem}: let
+        buildConfigurationModule = {config, ...}: {
+          # Equivalent to nixpkgs.localSystem
+          # nixpkgs.system = localSystem;
+          nixpkgs.buildPlatform.system = localSystem;
         };
-        configUnchecked = root.config;
-      };
-      defaultData = withData data;
-    in rec {
-      inherit (defaultData) configuredHosts configuredSecrets configuredSystems buildSystems configUnchecked;
-      injectData = data: let
-        injectedData = withData data;
       in {
-        inherit (injectedData) configuredHosts configuredSecrets configuredSystems buildSystems configUnchecked;
+        toplevel = builtins.mapAttrs (_name: value: value.config.system.build.toplevel) (configuredSystemsWithExtraModules [
+          buildConfigurationModule
+          ({...}: {
+            buildTarget = "toplevel";
+          })
+        ]);
+        sdImage = builtins.mapAttrs (_name: value: value.config.system.build.sdImage) (configuredSystemsWithExtraModules [
+          buildConfigurationModule
+          #(nixpkgs + "/nixos/modules/installer/sd-card/sd-image-aarch64-installer.nix")
+          ({...}: {
+            buildTarget = "sd-image";
+          })
+        ]);
+        installationCd = builtins.mapAttrs (_name: value: value.config.system.build.isoImage) (configuredSystemsWithExtraModules [
+          buildConfigurationModule
+          (nixpkgs + "/nixos/modules/installer/cd-dvd/installation-cd-minimal.nix")
+          ({lib, ...}: {
+            buildTarget = "installation-cd";
+            # Needed for https://github.com/NixOS/nixpkgs/issues/58959
+            boot.supportedFilesystems = lib.mkForce ["btrfs" "reiserfs" "vfat" "f2fs" "xfs" "ntfs" "cifs"];
+          })
+        ]);
       };
+      configUnchecked = root.config;
+    };
+    defaultData = withData {
+      inherit data;
+      root = checkedRoot;
+    };
+    uncheckedData = withData {inherit data root;};
+  in rec {
+    inherit (defaultData) configuredHosts configuredSystems buildSystems configUnchecked buildableSystems;
+    unchecked = {
+      inherit (uncheckedData) configuredHosts configuredSystems buildSystems configUnchecked buildableSystems;
+    };
+    injectData = data: let
+      injectedData = withData data;
+    in {
+      inherit (injectedData) configuredHosts configuredSystems buildSystems configUnchecked;
     };
+  };
 }
modifiedmodules/fleet/secrets.nixdiffbeforeafterboth
--- a/modules/fleet/secrets.nix
+++ b/modules/fleet/secrets.nix
@@ -15,6 +15,9 @@
         type = bool;
         description = "Is this secret owner-dependent, and needs to be regenerated on ownership set change, or it may be just reencrypted";
       };
+      generateImpure = mkOption {
+        type = unspecified;
+      };
       generator = mkOption {
         type = nullOr (submodule {
           packages = mkOption {