difftreelog
refactor split build-systems and deploy commands
in: trunk
10 files changed
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,3 +5,5 @@
[workspace.dependencies]
nixlike = { path = "./crates/nixlike" }
better-command = { path = "./crates/better-command" }
+uuid = { version = "1.3.3", features = ["v4"] }
+tokio = { version = "1.33.0", features = ["fs", "rt", "macros", "sync", "time", "rt-multi-thread"] }
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -8,6 +8,7 @@
[dependencies]
nixlike.workspace = true
better-command.workspace = true
+tokio.workspace = true
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@@ -27,7 +28,6 @@
"wrap_help",
"unicode",
] }
-tokio = { version = "1.33.0", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
tokio-util = { version = "0.7.10", features = ["codec"] }
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth1//! Wrapper around nix repl, which allows to work on nix code, without relying on2//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.34use std::collections::HashMap;5use std::ffi::{OsStr, OsString};6use std::fmt::{self, Display};7use std::path::PathBuf;8use std::process::Stdio;9use std::sync::{Arc, OnceLock};1011use anyhow::{anyhow, bail, ensure, Context, Result};12use better_command::{ClonableHandler, Handler, NixHandler, NoopHandler};13use futures::StreamExt;14use itertools::Itertools;15use r2d2::{Pool, PooledConnection};16use serde::de::DeserializeOwned;17use serde::{Deserialize, Serialize};18use tokio::io::AsyncWriteExt;19use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};20use tokio::select;21use tokio::sync::{mpsc, oneshot, Mutex};22use tokio_util::codec::{FramedRead, LinesCodec};23use tracing::{debug, error, warn, Level};2425const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2627pub struct NixSessionInner {28 full_delimiter: String,29 nix_handler: ClonableHandler<NixHandler>,30 out: OutputHandler,31 stdin: ChildStdin,32 string_wrapping: (String, String),33 number_wrapping: (String, String),3435 executing_command: Arc<Mutex<()>>,3637 next_id: u32,38 free_list: Vec<u32>,39}40const TRAIN_STRING: &str = "\"TRAIN_STRING\"";41const TRAIN_NUMBER: &str = "13141516";4243#[must_use]44struct ErrorCollector<'i, H> {45 collected: Vec<String>,46 inner: &'i mut H,47}48impl<'i, H> ErrorCollector<'i, H> {49 fn new(inner: &'i mut H) -> Self {50 Self {51 collected: vec![],52 inner,53 }54 }55}56impl<H> ErrorCollector<'_, H> {57 fn handle_line_inner(&mut self, msg: &str) -> bool {58 let Some(msg) = msg.strip_prefix("@nix ") else {59 return false;60 };61 #[derive(Deserialize)]62 struct ErrorAction {63 action: String,64 level: u32,65 msg: String,66 }67 let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {68 return false;69 };70 if act.action != "msg" || act.level != 0 {71 return false;72 }73 self.collected.push(act.msg);74 true75 }76 fn finish(self) -> Result<()> {77 // fn dedent(s: String) -> String {78 // s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)79 // }80 if !self.collected.is_empty() {81 bail!(82 "{}",83 self.collected84 .iter()85 .map(|v| {86 if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {87 let v = unindent::unindent(f.trim_start());88 v.trim().to_owned()89 } else {90 v.to_owned()91 }92 })93 .join("\n")94 );95 }96 Ok(())97 }98 fn flush(self) {99 for line in self.collected {100 warn!("{line}");101 }102 }103}104impl<H: Handler> Handler for ErrorCollector<'_, H> {105 fn handle_line(&mut self, e: &str) {106 if self.handle_line_inner(e) {107 return;108 }109 self.inner.handle_line(e)110 }111}112113enum OutputLine {114 Out(String),115 Err(String),116}117struct OutputHandler {118 rx: mpsc::Receiver<OutputLine>,119 _cancel_handle: oneshot::Receiver<()>,120}121impl OutputHandler {122 fn new(out: ChildStdout, err: ChildStderr) -> Self {123 let mut out = FramedRead::new(out, LinesCodec::new());124 let mut err = FramedRead::new(err, LinesCodec::new());125 let (tx, rx) = mpsc::channel(20);126 let (mut cancelled, _cancel_handle) = oneshot::channel();127 tokio::spawn(async move {128 loop {129 select! {130 // We should receive errors earlier than synchronization131 biased;132 e = err.next() => {133 let Some(Ok(e)) = e else {134 if e.is_some() {135 error!("bad repl stderr: {e:?}");136 }137 continue;138 };139 let _ = tx.send(OutputLine::Err(e)).await;140 }141 o = out.next() => {142 let Some(Ok(o)) = o else {143 if o.is_some() {144 error!("bad repl stdout: {o:?}");145 }146 continue;147 };148 let _ = tx.send(OutputLine::Out(o)).await;149 }150 // Reader doesn't care about stdout, as this is cancelled.151 // Error still might be useful, to process leftover span closures?152 _ = cancelled.closed() => {153 break;154 }155 }156 }157 });158 Self { rx, _cancel_handle }159 }160 async fn next(&mut self) -> Option<OutputLine> {161 self.rx.recv().await162 }163}164165struct WarnHandler;166impl Handler for WarnHandler {167 fn handle_line(&mut self, e: &str) {168 warn!(target: "nix", "{e}")169 }170}171172impl NixSessionInner {173 async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {174 let mut cmd = Command::new("nix");175 cmd.arg("repl")176 .arg(flake)177 .arg("--log-format")178 .arg("internal-json");179 for arg in extra_args {180 cmd.arg(arg);181 }182 cmd.stdin(Stdio::piped());183 cmd.stdout(Stdio::piped());184 cmd.stderr(Stdio::piped());185 let cmd = cmd.spawn()?;186 let stdout = cmd.stdout.unwrap();187 let stderr = cmd.stderr.unwrap();188 let mut out = OutputHandler::new(stdout, stderr);189 let mut stdin = cmd.stdin.unwrap();190 // Standard repl hello doesn't work with internal-json logger191 stdin.write_all(REPL_DELIMITER.as_bytes()).await?;192 stdin.write_all(b"\n").await?;193 stdin.flush().await?;194 let nix_handler = NixHandler::default();195 let mut full_delimiter = None;196 let mut errors = vec![];197 while let Some(line) = out.next().await {198 let line = match line {199 OutputLine::Out(o) => o,200 OutputLine::Err(_e) => {201 // Handle startup errors, but skip repl hello?202 errors.push(_e);203 continue;204 }205 };206 if line.contains(REPL_DELIMITER) {207 debug!("discovered repl delimiter with added colors: {line}");208 full_delimiter = Some(line.to_owned());209 break;210 }211 }212 let Some(full_delimiter) = full_delimiter else {213 for e in errors {214 error!("{e}");215 }216 bail!("failed to discover delimiter");217 };218 let mut res = Self {219 full_delimiter,220 nix_handler: ClonableHandler::new(nix_handler),221 out,222 stdin,223 string_wrapping: Default::default(),224 number_wrapping: Default::default(),225226 executing_command: Arc::new(Mutex::new(())),227228 next_id: 0,229 free_list: vec![],230 };231 res.train().await?;232 Ok(res)233 }234 async fn train(&mut self) -> Result<()> {235 {236 let full_string = self237 .execute_expression_raw(TRAIN_STRING, &mut NoopHandler)238 .await?;239 let string_offset = full_string.find(TRAIN_STRING).expect("contained");240 let string_prefix = &full_string[..string_offset];241 let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];242 self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());243 }244 {245 let full_number = self246 .execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)247 .await?;248 let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");249 let number_prefix = &full_number[..number_offset];250 let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];251 self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());252 }253 Ok(())254 }255 async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {256 if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {257 let cmd_str = String::from_utf8_lossy(cmd.as_ref());258 tracing::debug!("{cmd_str}");259 };260 self.stdin.write_all(cmd.as_ref()).await?;261 self.stdin.write_all(b"\n").await?;262 Ok(())263 }264 async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {265 let mut out = String::new();266 while let Some(line) = self.out.next().await {267 let line = match line {268 OutputLine::Out(out) => out,269 OutputLine::Err(err) => {270 err_handler.handle_line(&err);271 continue;272 }273 };274 if line == self.full_delimiter {275 return Ok(out);276 }277 if !out.is_empty() {278 out.push('\n');279 }280 out.push_str(&line);281 }282 bail!("didn't reached delimiter");283 }284 async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {285 let num = self.number_wrapping.clone();286 let n = self.execute_expression_wrapping(expr, &num).await?;287 Ok(n.parse::<u64>()?)288 }289 async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {290 let num = self.string_wrapping.clone();291 let n = self.execute_expression_wrapping(expr, &num).await?;292 let str: String = serde_json::from_str(&n)?;293 Ok(str)294 }295 async fn execute_expression_to_json<V: DeserializeOwned>(296 &mut self,297 expr: impl AsRef<[u8]>,298 ) -> Result<V> {299 let mut fexpr = b"builtins.toJSON (".to_vec();300 fexpr.extend_from_slice(expr.as_ref());301 fexpr.push(b')');302 let v = self303 .execute_expression_string(fexpr)304 .await305 .context("string expression")?;306 serde_json::from_str(&v).context("json parse")307 }308 async fn execute_expression_wrapping(309 &mut self,310 expr: impl AsRef<[u8]>,311 wrapping: &(String, String),312 ) -> Result<String> {313 let mut nix_handler = self.nix_handler.clone();314 let mut collected = ErrorCollector::new(&mut nix_handler);315 let res = self.execute_expression_raw(expr, &mut collected).await?;316 if res.is_empty() {317 collected.finish()?;318 bail!("expected expression, got nothing")319 } else {320 collected.flush()321 };322 let Some(res) = res.strip_prefix(&wrapping.0) else {323 bail!("invalid type")324 };325 let Some(res) = res.strip_suffix(&wrapping.1) else {326 bail!("invalid type")327 };328 Ok(res.to_owned())329 }330 async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {331 let mut nix_handler = self.nix_handler.clone();332 let mut collected = ErrorCollector::new(&mut nix_handler);333 let v = self.execute_expression_raw(expr, &mut collected).await?;334 collected.finish()?;335 ensure!(v.is_empty(), "unexpected expression result");336 Ok(())337 }338 async fn execute_expression_raw(339 &mut self,340 expr: impl AsRef<[u8]>,341 err_handler: &mut dyn Handler,342 ) -> Result<String> {343 // Prevent two commands from being executed in parallel, messing with each other.344 let _lock = self.executing_command.clone();345 let _guard = _lock.lock().await;346347 self.send_command(expr).await?;348 // It will be echoed349 self.send_command(REPL_DELIMITER).await?;350 self.read_until_delimiter(err_handler).await351 }352 async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {353 let id = self.allocate_id();354 self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))355 .await?;356 Ok(id)357 }358359 /// Id should be immediately used360 fn allocate_id(&mut self) -> u32 {361 if let Some(free) = self.free_list.pop() {362 free363 } else {364 let v = self.next_id;365 self.next_id += 1;366 v367 }368 }369 // Nix has no way to deallocate variable, yet GC will correct everything not reachable.370 // async fn free_id(&mut self, id: u32) -> Result<()> {371 // self.execute_expression_empty(format!("sess_field_{id} = null"))372 // .await?;373 // self.free_list.push(id);374 // Ok(())375 // }376}377378#[derive(Clone)]379pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);380381#[derive(Clone)]382pub struct NixExprBuilder {383 out: String,384 used_fields: Vec<Field>,385}386impl NixExprBuilder {387 pub fn object() -> Self {388 NixExprBuilder {389 out: "{ ".to_owned(),390 used_fields: Vec::new(),391 }392 }393 pub fn string(s: &str) -> Self {394 NixExprBuilder {395 out: nixlike::serialize(s)396 .expect("no problems with serializing_string")397 .trim_end()398 .to_owned(),399 used_fields: Vec::new(),400 }401 }402 pub fn serialized(v: impl Serialize) -> Self {403 let serialized = nixlike::serialize(v).expect("invalid value for apply");404 Self {405 out: serialized.trim_end().to_owned(),406 used_fields: Vec::new(),407 }408 }409 pub fn field(f: Field) -> Self {410 Self {411 out: format!("sess_field_{}", f.0.value.expect("no value")),412 used_fields: vec![f],413 }414 }415 pub fn end_obj(&mut self) {416 self.out.push('}');417 }418 pub fn obj_key(&mut self, name: Self, value: Self) {419 self.out.push_str(r#""${"#);420 self.extend(name);421 self.out.push_str(r#"}" = "#);422 self.extend(value);423 self.out.push_str("; ");424 }425426 pub fn extend(&mut self, e: Self) {427 self.out.push_str(&e.out);428 self.used_fields.extend(e.used_fields);429 }430431 pub fn session(&self) -> NixSession {432 let mut session = None;433 for ele in &self.used_fields {434 if session.is_none() {435 session = Some(ele.0.session.clone());436 continue;437 }438 let session = &session.as_ref().expect("checked").0;439 let ele_sess = &ele.0.session.0;440 assert!(441 Arc::ptr_eq(session, ele_sess),442 "can't mix fields from different session"443 );444 }445 session.expect("expr without fields used")446 }447 pub fn index_attr(&mut self, s: &str) {448 let escaped = nixlike::serialize(s).expect("string");449 self.out.push('.');450 self.out.push_str(escaped.trim_end());451 }452}453454#[macro_export]455macro_rules! nix_expr_inner {456 //(@munch_object FIXME: value should be arbitrary nix_expr_inner input... Time to write proc-macro?457 (@obj($o:ident) $field:ident, $($tt:tt)*) => {{458 $o.obj_key(459 NixExprBuilder::string(stringify!($field)),460 NixExprBuilder::field($field),461 );462 nix_expr_inner!(@obj($o) $($tt)*);463 }};464 (@obj($o:ident) $field:ident: $v:block, $($tt:tt)*) => {{465 $o.obj_key(466 NixExprBuilder::string(stringify!($field)),467 NixExprBuilder::serialized(&$v),468 );469 nix_expr_inner!(@obj($o) $($tt)*);470 }};471 (@obj($o:ident)) => {{}};472 (Obj { $($tt:tt)* }) => {{473 use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};474 let mut out = NixExprBuilder::object();475 nix_expr_inner!(@obj(out) $($tt)*);476 out.end_obj();477 out478 }};479 (@field($o:ident) . $var:ident $($tt:tt)*) => {{480 $o.index_attr(stringify!($var));481 nix_expr_inner!(@field($o) $($tt)*);482 }};483 (@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{484 $o.push(Index::attr(&$v));485 nix_expr_inner!(@o($o) $($tt)*);486 }};487 (@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{488 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));489 nix_expr_inner!(@o($o) $($tt)*);490 }};491 (@field($o:ident) ($($var:tt)*) $($tt:tt)*) => {492 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));493 nix_expr_inner!(@o($o) $($tt)*);494 };495 (@field($o:ident)) => {};496 ($field:ident $($tt:tt)*) => {{497 use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};498 #[allow(unused_mut, reason = "might be used if indexed")]499 let mut out = NixExprBuilder::field($field.clone());500 nix_expr_inner!(@field(out) $($tt)*);501 out502 }};503 ($v:literal) => {{504 use $crate::better_nix_eval::NixExprBuilder;505 NixExprBuilder::string($v)506 }};507 ({$v:expr}) => {{508 use $crate::better_nix_eval::NixExprBuilder;509 NixExprBuilder::serialized(&$v)510 }}511}512#[macro_export]513macro_rules! nix_expr {514 ($($tt:tt)+) => {{515 use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner};516 let expr = nix_expr_inner!($($tt)+);517 Field::new(expr.session(), expr.out)518 }};519}520521#[macro_export]522macro_rules! nix_go {523 (@o($o:ident) . $var:ident $($tt:tt)*) => {{524 $o.push(Index::attr(stringify!($var)));525 nix_go!(@o($o) $($tt)*);526 }};527 (@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{528 $o.push(Index::attr(&$v));529 nix_go!(@o($o) $($tt)*);530 }};531 (@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{532 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));533 nix_go!(@o($o) $($tt)*);534 }};535 (@o($o:ident) ($($var:tt)*) $($tt:tt)*) => {536 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));537 nix_go!(@o($o) $($tt)*);538 };539 (@o($o:ident) | $($var:tt)*) => {540 $o.push(Index::Pipe($crate::nix_expr_inner!($($var)+)));541 };542 (@o($o:ident)) => {};543 ($field:ident $($tt:tt)+) => {{544 use $crate::{nix_go, better_nix_eval::Index};545 let field = $field.clone();546 let mut out = vec![];547 nix_go!(@o(out) $($tt)*);548 field.select(out).await?549 }}550}551#[macro_export]552macro_rules! nix_go_json {553 ($($tt:tt)*) => {{554 $crate::nix_go!($($tt)*).as_json().await?555 }};556}557558#[derive(Clone)]559pub enum Index {560 Var(String),561 String(String),562 Apply(String),563 Expr(NixExprBuilder),564 ExprApply(NixExprBuilder),565 Pipe(NixExprBuilder),566}567impl Index {568 pub fn var(v: impl AsRef<str>) -> Self {569 let v = v.as_ref();570 assert!(571 !(v.contains('.') | v.contains(' ')),572 "bad variable name: {v}"573 );574 Self::Var(v.to_owned())575 }576 pub fn attr(v: impl AsRef<str>) -> Self {577 Self::String(v.as_ref().to_owned())578 }579 pub fn apply(v: impl Serialize) -> Self {580 let serialized = nixlike::serialize(v).expect("invalid value for apply");581 Self::Apply(serialized.trim_end().to_owned())582 }583}584impl Display for Index {585 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {586 match self {587 Index::Var(v) => {588 write!(f, "{v}")589 }590 Index::String(k) => {591 let v = nixlike::format_identifier(k.as_str());592 write!(f, ".{v}")593 }594 Index::Apply(o) => {595 write!(f, "<apply>({o})")596 }597 Index::Expr(e) => {598 write!(f, "[{}]", e.out)599 }600 Index::ExprApply(e) => {601 write!(f, "<apply>({})", e.out)602 }603 Index::Pipe(e) => {604 write!(f, "<map>({})", e.out)605 }606 }607 }608}609impl fmt::Debug for Index {610 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {611 write!(f, "{self}")612 }613}614struct PathDisplay<'i>(&'i [Index]);615impl Display for PathDisplay<'_> {616 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {617 for i in self.0 {618 write!(f, "{i}")?;619 }620 Ok(())621 }622}623struct FieldInner {624 full_path: Option<Vec<Index>>,625 session: NixSession,626 value: Option<u32>,627}628fn context(op: &str, full_path: Option<&[Index]>, query: &str) -> String {629 if let Some(full_path) = &full_path {630 format!("on {op}, full path: {}", PathDisplay(full_path))631 } else {632 format!("query: {query:?}")633 }634}635#[derive(Clone)]636pub struct Field(Arc<FieldInner>);637impl Field {638 fn root(session: NixSession) -> Self {639 Self(Arc::new(FieldInner {640 full_path: Some(vec![]),641 session,642 value: None,643 }))644 }645 async fn new(session: NixSession, query: &str) -> Result<Self> {646 let vid = session647 .0648 .lock()649 .await650 .execute_assign(query)651 .await652 .with_context(|| context("new root", None, query))?;653 Ok(Self(Arc::new(FieldInner {654 full_path: None,655 session,656 value: Some(vid),657 })))658 }659 pub async fn field(session: NixSession, field: &str) -> Result<Self> {660 Self::root(session).select([Index::var(field)]).await661 }662 pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {663 let mut used_fields = Vec::new();664 let mut name = name.into_iter();665666 let mut full_path = self.0.full_path.clone();667 let mut query = if let Some(id) = self.0.value {668 format!("sess_field_{id}")669 } else {670 let first = name.next();671 if let Some(Index::Var(i)) = first {672 if let Some(full_path) = &mut full_path {673 full_path.push(Index::Var(i.clone()));674 }675 i.clone()676 } else {677 panic!("first path item should be variable, got {first:?}")678 }679 };680 for v in name {681 if let Some(full_path) = &mut full_path {682 full_path.push(v.clone());683 }684 match v {685 Index::Var(_) => panic!("var item may only be first"),686 Index::String(s) => {687 let escaped = nixlike::serialize(s)?;688 query.push('.');689 query.push_str(escaped.trim());690 }691 Index::Apply(a) => {692 // In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()`693 query = format!("({query} {a})");694 }695 Index::Expr(e) => {696 let index = Field::new(self.0.session.clone(), &e.out).await?;697 used_fields.push(index.clone());698 query.push('.');699 let index = format!("${{sess_field_{}}}", index.0.value.expect("value"));700 query.push_str(&index);701 }702 Index::ExprApply(e) => {703 let index = Field::new(self.0.session.clone(), &e.out).await?;704 used_fields.push(index.clone());705 query.push(' ');706 let index = format!("sess_field_{}", index.0.value.expect("value"));707 query.push_str(&index);708 query = format!("({query})");709 }710 Index::Pipe(v) => {711 let index = Field::new(self.0.session.clone(), &v.out).await?;712 used_fields.push(index.clone());713 let index = format!("sess_field_{}", index.0.value.expect("value"));714 query = format!("({index} {query})");715 }716 }717 }718719 let vid = self720 .0721 .session722 .0723 .lock()724 .await725 .execute_assign(&query)726 .await727 .with_context(|| {728 if let Some(full_path) = &full_path {729 format!("full path: {}", PathDisplay(full_path))730 } else {731 format!("query: {query:?}")732 }733 })?;734 Ok(Self(Arc::new(FieldInner {735 full_path,736 session: self.0.session.clone(),737 value: Some(vid),738 })))739 }740 pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {741 let id = self.0.value.expect("can't serialize root field");742 let query = format!("sess_field_{id}");743 self.0744 .session745 .0746 .lock()747 .await748 .execute_expression_to_json(&query)749 .await750 .with_context(|| context("as_json", self.0.full_path.as_deref(), &query))751 }752 pub async fn has_field(&self, name: &str) -> Result<bool> {753 let id = self.0.value.expect("can't list root fields");754 let key = nixlike::escape_string(name);755 let query = format!("sess_field_{id} ? {key}");756 self.0757 .session758 .0759 .lock()760 .await761 .execute_expression_to_json(&query)762 .await763 .with_context(|| context("has_field", self.0.full_path.as_deref(), &query))764 }765 pub async fn list_fields(&self) -> Result<Vec<String>> {766 let id = self.0.value.expect("can't list root fields");767 let query = format!("builtins.attrNames sess_field_{id}");768 self.0769 .session770 .0771 .lock()772 .await773 .execute_expression_to_json(&query)774 .await775 .with_context(|| context("list field", self.0.full_path.as_deref(), &query))776 }777 pub async fn type_of(&self) -> Result<String> {778 let id = self.0.value.expect("can't list root fields");779 let query = format!("builtins.typeOf sess_field_{id}");780 self.0781 .session782 .0783 .lock()784 .await785 .execute_expression_to_json(&query)786 .await787 .with_context(|| context("type_of", self.0.full_path.as_deref(), &query))788 }789 pub async fn import(&self) -> Result<Self> {790 let import = Self::new(self.0.session.clone(), "import").await?;791 Ok(nix_go!(self | import))792 }793 pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {794 let id = self.0.value.expect("can't use build on not-value");795 let query = format!(":b sess_field_{id}");796 let vid = self797 .0798 .session799 .0800 .lock()801 .await802 .execute_expression_raw(&query, &mut NixHandler::default())803 .await?;804 ensure!(805 !vid.is_empty(),806 "build failed: {}",807 context("build", self.0.full_path.as_deref(), &query),808 );809 let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")810 else {811 panic!("unexpected build output: {vid:?}");812 };813 let outputs = vid814 .split('\n')815 .filter(|v| !v.is_empty())816 .map(|v| v.split_once(" -> ").expect("unexpected build output"))817 .map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))818 .collect();819 Ok(outputs)820 }821}822impl Drop for FieldInner {823 fn drop(&mut self) {824 if let Some(id) = self.value {825 if let Ok(mut lock) = self.session.0.try_lock() {826 lock.free_list.push(id)827 }828 // Leaked829 }830 }831}832struct NixSessionPoolInner {833 flake: OsString,834 nix_args: Vec<OsString>,835}836837#[derive(Debug)]838pub struct NixPoolError(anyhow::Error);839impl From<anyhow::Error> for NixPoolError {840 fn from(value: anyhow::Error) -> Self {841 Self(value)842 }843}844impl std::error::Error for NixPoolError {}845impl std::fmt::Display for NixPoolError {846 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {847 self.0.fmt(f)848 }849}850impl r2d2::ManageConnection for NixSessionPoolInner {851 type Connection = NixSessionInner;852 type Error = NixPoolError;853 fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {854 let _v = TOKIO_RUNTIME855 .get()856 .expect("missed tokio runtime init!")857 .enter();858 Ok(futures::executor::block_on(NixSessionInner::new(859 self.flake.as_os_str(),860 self.nix_args.iter().map(OsString::as_os_str),861 ))?)862 }863864 fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {865 let _v = TOKIO_RUNTIME866 .get()867 .expect("missed tokio runtime init!")868 .enter();869 let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;870 if res != 4 {871 return Err(anyhow!("sanity check failed").into());872 };873 Ok(())874 }875876 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {877 false878 }879}880pub struct NixSessionPool(Pool<NixSessionPoolInner>);881impl NixSessionPool {882 pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {883 let inner = tokio::task::block_in_place(|| {884 r2d2::Builder::<NixSessionPoolInner>::new()885 .min_idle(Some(0))886 .build(NixSessionPoolInner { flake, nix_args })887 })?;888 Ok(Self(inner))889 }890 pub async fn get(&self) -> Result<NixSession> {891 let v = tokio::task::block_in_place(|| self.0.get())?;892 Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))893 }894}895896pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -6,34 +6,40 @@
use crate::host::{Config, ConfigHost};
use crate::nix_go;
use anyhow::{anyhow, Result};
-use clap::Parser;
+use clap::{Parser, ValueEnum};
use itertools::Itertools as _;
use tokio::{task::LocalSet, time::sleep};
use tracing::{error, field, info, info_span, warn, Instrument};
-#[derive(Parser, Clone)]
-pub struct BuildSystems {
+#[derive(Parser)]
+pub struct Deploy {
/// Disable automatic rollback
#[clap(long)]
disable_rollback: bool,
- #[clap(subcommand)]
- subcommand: Subcommand,
+ action: DeployAction,
}
-enum UploadAction {
+#[derive(ValueEnum, Clone, Copy)]
+enum DeployAction {
+ /// Upload derivation, but do not execute the update.
+ Upload,
+ /// Upload and execute the activation script, old version will be used after reboot.
Test,
+ /// Upload and set as current system profile, but do not execute activation script.
Boot,
+ /// Upload, set current profile, and execute activation script.
Switch,
}
-impl UploadAction {
- fn name(&self) -> &'static str {
+
+impl DeployAction {
+ pub(crate) fn name(&self) -> Option<&'static str> {
match self {
- UploadAction::Test => "test",
- UploadAction::Boot => "boot",
- UploadAction::Switch => "switch",
+ DeployAction::Upload => None,
+ DeployAction::Test => Some("test"),
+ DeployAction::Boot => Some("boot"),
+ DeployAction::Switch => Some("switch"),
}
}
-
pub(crate) fn should_switch_profile(&self) -> bool {
matches!(self, Self::Switch | Self::Boot)
}
@@ -42,69 +48,15 @@
}
pub(crate) fn should_schedule_rollback_run(&self) -> bool {
matches!(self, Self::Switch | Self::Test)
- }
-}
-
-enum PackageAction {
- SdImage,
- InstallationCd,
-}
-impl PackageAction {
- fn build_attr(&self) -> String {
- match self {
- PackageAction::SdImage => "sdImage".to_owned(),
- PackageAction::InstallationCd => "isoImage".to_owned(),
- }
- }
-}
-
-enum Action {
- Upload { action: Option<UploadAction> },
- Package(PackageAction),
-}
-impl Action {
- fn build_attr(&self) -> String {
- match self {
- Action::Upload { .. } => "toplevel".to_owned(),
- Action::Package(p) => p.build_attr(),
- }
}
}
-impl From<Subcommand> for Action {
- fn from(s: Subcommand) -> Self {
- match s {
- Subcommand::Upload => Self::Upload { action: None },
- Subcommand::Test => Self::Upload {
- action: Some(UploadAction::Test),
- },
- Subcommand::Boot => Self::Upload {
- action: Some(UploadAction::Boot),
- },
- Subcommand::Switch => Self::Upload {
- action: Some(UploadAction::Switch),
- },
- Subcommand::SdImage => Self::Package(PackageAction::SdImage),
- Subcommand::InstallationCd => Self::Package(PackageAction::InstallationCd),
- }
- }
-}
-
#[derive(Parser, Clone)]
-enum Subcommand {
- /// Upload, but do not switch
- Upload,
- /// Upload + switch to built system until reboot
- Test,
- /// Upload + switch to built system after reboot
- Boot,
- /// Upload + test + boot
- Switch,
-
- /// Build SD .img image
- SdImage,
- /// Build an installation cd ISO image
- InstallationCd,
+pub struct BuildSystems {
+ /// Attribute to build. Systems are deployed from "toplevel" attr, well-known used attributes
+ /// are "sdImage"/"isoImage", and your configuration may include any other build attributes.
+ #[clap(long, default_value = "toplevel")]
+ build_attr: String,
}
struct Generation {
@@ -163,11 +115,11 @@
Ok(current)
}
-async fn execute_upload(
- build: &BuildSystems,
- action: UploadAction,
+async fn deploy_task(
+ action: DeployAction,
host: &ConfigHost,
built: PathBuf,
+ disable_rollback: bool,
) -> Result<()> {
let mut failed = false;
// TODO: Lockfile, to prevent concurrent system switch?
@@ -175,7 +127,7 @@
// is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to
// unit name conflict in systemd-run
// This code is tied to rollback.nix
- if !build.disable_rollback {
+ if !disable_rollback {
let _span = info_span!("preparing").entered();
info!("preparing for rollback");
let generation = get_current_generation(host).await?;
@@ -235,13 +187,13 @@
switch_script.push("bin");
switch_script.push("switch-to-configuration");
let mut cmd = host.cmd(switch_script).in_current_span().await?;
- cmd.arg(action.name());
+ cmd.arg(action.name().expect("upload.should_activate == false"));
if let Err(e) = cmd.sudo().run().in_current_span().await {
error!("failed to activate: {e}");
failed = true;
}
}
- if !build.disable_rollback {
+ if !disable_rollback {
if failed {
info!("executing rollback");
if let Err(e) = host
@@ -280,97 +232,45 @@
Ok(())
}
-impl BuildSystems {
- async fn build_task(self, config: Config, host: String) -> Result<()> {
- info!("building");
- let host = config.host(&host).await?;
- let action = Action::from(self.subcommand.clone());
- let fleet_config = &config.config_field;
- let drv = nix_go!(
- fleet_config.hosts[{ &host.name }].nixosSystem.config.system.build[{ action.build_attr() }]
- );
- let outputs = drv.build().await.map_err(|e| {
- if action.build_attr() == "sdImage" {
+async fn build_task(config: Config, host: String, build_attr: &str) -> Result<PathBuf> {
+ info!("building");
+ let host = config.host(&host).await?;
+ // let action = Action::from(self.subcommand.clone());
+ let fleet_config = &config.config_field;
+ let drv = nix_go!(
+ fleet_config.hosts[{ &host.name }]
+ .nixosSystem
+ .config
+ .system
+ .build[{ build_attr }]
+ );
+ let outputs = drv.build().await.map_err(|e| {
+ if build_attr == "sdImage" {
info!("sd-image build failed");
info!("Make sure you have imported modulesPath/installer/sd-card/sd-image-<arch>[-installer].nix (For installer, you may want to check config)");
}
e
})?;
- let out_output = outputs
- .get("out")
- .ok_or_else(|| anyhow!("system build should produce \"out\" output"))?;
-
- match action {
- Action::Upload { action } => {
- if !config.is_local(&host.name) {
- info!("uploading system closure");
- {
- // TODO: Move to remote_derivation method.
- // Alternatively, nix store make-content-addressed can be used,
- // at least for the first deployment, to provide trusted store key.
- //
- // It is much slower, yet doesn't require root on the deployer machine.
- let mut sign = MyCommand::new("nix");
- // Private key for host machine is registered in nix-sign.nix
- sign.arg("store")
- .arg("sign")
- .comparg("--key-file", "/etc/nix/private-key")
- .arg("-r")
- .arg(out_output);
- if let Err(e) = sign.sudo().run_nix().await {
- warn!("Failed to sign store paths: {e}");
- };
- }
- let mut tries = 0;
- loop {
- match host.remote_derivation(out_output).await {
- Ok(remote) => {
- assert!(&remote == out_output, "CA derivations aren't implemented");
- break;
- }
- Err(e) if tries < 3 => {
- tries += 1;
- warn!("Copy failure ({}/3): {}", tries, e);
- sleep(Duration::from_millis(5000)).await;
- }
- Err(e) => return Err(e),
- }
- }
- }
- if let Some(action) = action {
- execute_upload(&self, action, &host, out_output.clone()).await?
- }
- }
- Action::Package(PackageAction::SdImage) => {
- let mut out = current_dir()?;
- out.push(format!("sd-image-{}", host.name));
-
- info!("linking sd image to {:?}", out);
- symlink(out_output, out)?;
- }
- Action::Package(PackageAction::InstallationCd) => {
- let mut out = current_dir()?;
- out.push(format!("installation-cd-{}", host.name));
+ let out_output = outputs
+ .get("out")
+ .ok_or_else(|| anyhow!("system build should produce \"out\" output"))?;
- info!("linking iso image to {:?}", out);
- symlink(out_output, out)?;
- }
- };
- Ok(())
- }
+ Ok(out_output.clone())
+}
+impl BuildSystems {
pub async fn run(self, config: &Config) -> Result<()> {
let hosts = config.list_hosts().await?;
let set = LocalSet::new();
- let this = &self;
+ let build_attr = self.build_attr.clone();
for host in hosts.into_iter() {
if config.should_skip(&host.name) {
continue;
}
let config = config.clone();
- let this = this.clone();
- let span = info_span!("deployment", host = field::display(&host.name));
+ let span = info_span!("build", host = field::display(&host.name));
let hostname = host.name;
+ let build_attr = build_attr.clone();
// FIXME: Since the introduction of better-nix-eval,
// due to single repl used for builds, hosts are waiting for each other to build,
// instead of building concurrently.
@@ -384,11 +284,94 @@
// multiple hosts.
set.spawn_local(
(async move {
- match this.build_task(config, hostname).await {
- Ok(_) => {}
+ let built = match build_task(config, hostname.clone(), &build_attr).await {
+ Ok(path) => path,
+ Err(e) => {
+ error!("failed to deploy host: {}", e);
+ return;
+ }
+ };
+ // TODO: Handle error
+ let mut out = current_dir().expect("cwd exists");
+ out.push(format!("built-{}", hostname));
+
+ info!("linking iso image to {:?}", out);
+ if let Err(e) = symlink(built, out) {
+ error!("failed to symlink: {e}")
+ }
+ })
+ .instrument(span),
+ );
+ }
+ set.await;
+ Ok(())
+ }
+}
+
+impl Deploy {
+ pub async fn run(self, config: &Config) -> Result<()> {
+ let hosts = config.list_hosts().await?;
+ let set = LocalSet::new();
+ for host in hosts.into_iter() {
+ if config.should_skip(&host.name) {
+ continue;
+ }
+ let config = config.clone();
+ let span = info_span!("deploy", host = field::display(&host.name));
+ let hostname = host.name.clone();
+ // FIXME: Fix repl concurrency (see build-systems)
+ set.spawn_local(
+ (async move {
+ let built = match build_task(config.clone(), hostname.clone(), "toplevel").await
+ {
+ Ok(path) => path,
Err(e) => {
- error!("failed to deploy host: {}", e)
+ error!("failed to deploy host: {}", e);
+ return;
}
+ };
+ if !config.is_local(&hostname) {
+ info!("uploading system closure");
+ {
+ // TODO: Move to remote_derivation method.
+ // Alternatively, nix store make-content-addressed can be used,
+ // at least for the first deployment, to provide trusted store key.
+ //
+ // It is much slower, yet doesn't require root on the deployer machine.
+ let mut sign = MyCommand::new("nix");
+ // Private key for host machine is registered in nix-sign.nix
+ sign.arg("store")
+ .arg("sign")
+ .comparg("--key-file", "/etc/nix/private-key")
+ .arg("-r")
+ .arg(&built);
+ if let Err(e) = sign.sudo().run_nix().await {
+ warn!("Failed to sign store paths: {e}");
+ };
+ }
+ let mut tries = 0;
+ loop {
+ match host.remote_derivation(&built).await {
+ Ok(remote) => {
+ assert!(remote == built, "CA derivations aren't implemented");
+ break;
+ }
+ Err(e) if tries < 3 => {
+ tries += 1;
+ warn!("copy failure ({}/3): {}", tries, e);
+ sleep(Duration::from_millis(5000)).await;
+ }
+ Err(e) => {
+ error!("upload failed: {e}");
+ return;
+ }
+ }
+ }
+ }
+ if let Err(e) =
+ deploy_task(self.action, &host, built, self.disable_rollback).await
+ {
+ error!("activation failed: {e}");
}
})
.instrument(span),
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,8 +7,6 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use chrono::{DateTime, Utc};
use clap::Parser;
-use futures::StreamExt;
-use itertools::Itertools;
use owo_colors::OwoColorize;
use serde::Deserialize;
use std::{
@@ -570,7 +568,7 @@
config.replace_shared(
name.to_owned(),
update_owner_set(
- &name,
+ name,
config,
data,
secret,
cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -14,7 +14,6 @@
use openssh::SessionBuilder;
use serde::de::DeserializeOwned;
use tempfile::NamedTempFile;
-use tracing::instrument;
use crate::{
better_nix_eval::{Field, NixSessionPool},
@@ -90,6 +89,7 @@
cmd.arg(path);
cmd.run_string().await
}
+ #[allow(dead_code)]
pub async fn read_file_json<D: DeserializeOwned>(&self, path: impl AsRef<OsStr>) -> Result<D> {
let text = self.read_file_text(path).await?;
Ok(serde_json::from_str(&text)?)
cmds/fleet/src/main.rsdiffbeforeafterboth--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -12,14 +12,17 @@
mod fleetdata;
use std::ffi::OsString;
-use std::io::{stderr, stdout, Write};
use std::process::exit;
use std::time::Duration;
use anyhow::{bail, Result};
use clap::Parser;
-use cmds::{build_systems::BuildSystems, info::Info, secrets::Secret};
+use cmds::{
+ build_systems::{BuildSystems, Deploy},
+ info::Info,
+ secrets::Secret,
+};
use futures::future::LocalBoxFuture;
use futures::stream::FuturesUnordered;
use futures::TryStreamExt;
@@ -73,6 +76,8 @@
enum Opts {
/// Prepare systems for deployments
BuildSystems(BuildSystems),
+
+ Deploy(Deploy),
/// Secret management
#[clap(subcommand)]
Secret(Secret),
@@ -94,6 +99,7 @@
async fn run_command(config: &Config, command: Opts) -> Result<()> {
match command {
Opts::BuildSystems(c) => c.run(config).await?,
+ Opts::Deploy(d) => d.run(config).await?,
Opts::Secret(s) => s.run(config).await?,
Opts::Info(i) => i.run(config).await?,
Opts::Prefetch(p) => p.run(config).await?,
crates/better-command/src/handler.rsdiffbeforeafterboth--- a/crates/better-command/src/handler.rs
+++ b/crates/better-command/src/handler.rs
@@ -165,7 +165,7 @@
drv = pkg;
}
}
- // info!(target: "nix","copying {} {} -> {}", drv, from, to);
+ info!(target: "nix","copying {} {} -> {}", drv, from, to);
let span = info_span!("copy", from, to, drv);
span.pb_start();
self.spans.insert(id, span);
flake.lockdiffbeforeafterboth--- a/flake.lock
+++ b/flake.lock
@@ -38,11 +38,11 @@
},
"nixpkgs": {
"locked": {
- "lastModified": 1703974965,
- "narHash": "sha256-dvZjLuAcLnv25bqStTL2ZICC5YSs8aynF5amRM+I6UM=",
+ "lastModified": 1704409229,
+ "narHash": "sha256-Vc41cRJ3trOnocovLe0zZE35pK5Lfuo/zHk0xx3CNDY=",
"owner": "nixos",
"repo": "nixpkgs",
- "rev": "9f434bd436e2bb5615827469ed651e30c26daada",
+ "rev": "786f788914f2a6e94cedf361541894e972b8fd23",
"type": "github"
},
"original": {
@@ -67,11 +67,11 @@
]
},
"locked": {
- "lastModified": 1703902408,
- "narHash": "sha256-qXdWvu+tlgNjeoz8yQMRKSom6QyRROfgpmeOhwbujqw=",
+ "lastModified": 1704075545,
+ "narHash": "sha256-L3zgOuVKhPjKsVLc3yTm2YJ6+BATyZBury7wnhyc8QU=",
"owner": "oxalica",
"repo": "rust-overlay",
- "rev": "319f57cd2c34348c55970a4bf2b35afe82088681",
+ "rev": "a0df72e106322b67e9c6e591fe870380bd0da0d5",
"type": "github"
},
"original": {
flake.nixdiffbeforeafterboth--- a/flake.nix
+++ b/flake.nix
@@ -29,7 +29,7 @@
llvmPkgs = pkgs.buildPackages.llvmPackages_11;
rust =
(pkgs.rustChannelOf {
- date = "2023-12-29";
+ date = "2024-01-01";
channel = "nightly";
})
.default