git.delta.rocks / jrsonnet / refs/commits / 69498f520d8e

difftreelog

source

cmds/fleet/src/main.rs6.4 KiBsourcehistory
1#![recursion_limit = "512"]23pub(crate) mod cmds;4// pub(crate) mod command;5pub(crate) mod extra_args;67use std::{env, ffi::OsString, process::ExitCode, sync::Arc};89use anyhow::{Result, bail};10use clap::{CommandFactory, Parser};11use cmds::{12	build_systems::{BuildSystems, Deploy},13	complete::Complete,14	info::Info,15	rollback::RollbackSingle,16	secrets::Secret,17	tf::Tf,18};19use fleet_base::{host::Config, opts::FleetOpts};20use futures::{TryStreamExt, future::LocalBoxFuture, stream::FuturesUnordered};21// use host::Config;22#[cfg(feature = "indicatif")]23use human_repr::HumanCount;24#[cfg(feature = "indicatif")]25use indicatif::{ProgressState, ProgressStyle};26use nix_eval::{27	gc_register_my_thread, gc_unregister_my_thread, init_libraries, init_tokio_for_nix,28};29use tracing::{Instrument, error, info, info_span};30#[cfg(feature = "indicatif")]31use tracing_indicatif::IndicatifLayer;32use tracing_subscriber::{EnvFilter, prelude::*};3334#[derive(Parser)]35struct Prefetch {}36impl Prefetch {37	async fn run(&self, config: &Config) -> Result<()> {38		let mut prefetch_dir = config.directory.to_path_buf();39		prefetch_dir.push("prefetch");40		if !prefetch_dir.is_dir() {41			info!("nothing to prefetch: no prefetch directory");42			return Ok(());43		}44		let tasks = FuturesUnordered::new();45		for entry in std::fs::read_dir(&prefetch_dir)? {46			tasks.push(async {47				let entry = entry?;48				if !entry.metadata()?.is_file() {49					bail!("only files should exist in prefetch directory");50				}51				let span = info_span!(52					"prefetching",53					name = entry.file_name().to_string_lossy().as_ref()54				);55				let mut path = OsString::new();56				path.push("file://");57				path.push(entry.path());5859				let mut status = config.local_host().cmd("nix").await?;60				status.args(&config.nix_args);61				status.arg("store").arg("prefetch-file").arg(path);62				status.run_nix_string().instrument(span).await?;63				Ok(())64			});65		}66		tasks.try_collect::<Vec<()>>().await?;67		Ok(())68	}69}7071#[derive(Parser)]72enum Opts {73	/// Build system closures74	BuildSystems(BuildSystems),75	/// Upload and switch system closures76	Deploy(Deploy),77	/// Rollback remote machine by redeploying old generation as the new one78	RollbackSingle(RollbackSingle),79	/// Secret management80	#[clap(subcommand)]81	Secret(Secret),82	/// Upload prefetch directory to the nix store83	Prefetch(Prefetch),84	/// Config parsing85	Info(Info),86	/// Command completions87	#[clap(hide(true))]88	Complete(Complete),89	/// Compile and evaluate terranix configuration90	Tf(Tf),91}9293#[derive(Parser)]94#[clap(version, author)]95struct RootOpts {96	#[clap(flatten)]97	fleet_opts: FleetOpts,98	#[clap(subcommand)]99	command: Opts,100}101102async fn run_command(config: &Config, opts: FleetOpts, command: Opts) -> Result<()> {103	match command {104		Opts::BuildSystems(c) => c.run(config, &opts).await?,105		Opts::Deploy(d) => d.run(config, &opts).await?,106		Opts::RollbackSingle(r) => r.run(config, &opts).await?,107		Opts::Secret(s) => s.run(config, &opts).await?,108		Opts::Info(i) => i.run(config).await?,109		Opts::Prefetch(p) => p.run(config).await?,110		Opts::Tf(t) => t.run(config).await?,111		// TODO: actually parse commands before starting the async runtime112		Opts::Complete(c) => {113			tokio::task::spawn_blocking(move || c.run(RootOpts::command())).await?114		}115	};116	Ok(())117}118119fn setup_logging() {120	#[cfg(feature = "indicatif")]121	let indicatif_layer = {122		use std::time::Duration;123124		IndicatifLayer::new().with_progress_style(125			ProgressStyle::with_template(126				"{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{download_progress} {elapsed}{color_end}",127			)128				.unwrap()129				.with_key("download_progress", |state: &ProgressState, writer: &mut dyn std::fmt::Write| {130					let Some(len) = state.len() else {131						return;132					};133					let pos = state.pos();134					if pos > len {135						let _ = write!(writer, "{}", pos.human_count_bare());136					} else {137						let _ = write!(writer, "{} / {}", pos.human_count_bare(), len.human_count_bare());138					}139				})140				.with_key(141					"color_start",142					|state: &ProgressState, writer: &mut dyn std::fmt::Write| {143						let elapsed = state.elapsed();144145						if elapsed > Duration::from_secs(60) {146							// Red147							let _ = write!(writer, "\x1b[{}m", 1 + 30);148						} else if elapsed > Duration::from_secs(30) {149							// Yellow150							let _ = write!(writer, "\x1b[{}m", 3 + 30);151						}152					},153				)154				.with_key(155					"color_end",156					|state: &ProgressState, writer: &mut dyn std::fmt::Write| {157						if state.elapsed() > Duration::from_secs(30) {158							let _ = write!(writer, "\x1b[0m");159						}160					},161				),162		)163	};164165	let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));166167	let reg = tracing_subscriber::registry().with({168		let sub = tracing_subscriber::fmt::layer()169			.without_time()170			.with_target(false);171		#[cfg(feature = "indicatif")]172		let sub = sub.with_writer(indicatif_layer.get_stderr_writer());173		sub.with_filter(filter) // .without,174	});175176	if env::var_os("FLEET_OTEL").is_some() {}177178	// #[cfg(feature = "indicatif")]179	#[cfg(feature = "indicatif")]180	let reg = reg.with(indicatif_layer);181	reg.init();182}183184fn main() -> ExitCode {185	let opts = RootOpts::parse();186	if let Opts::Complete(c) = &opts.command {187		c.run(RootOpts::command());188		return ExitCode::SUCCESS;189	}190191	setup_logging();192193	init_libraries();194195	let runtime = tokio::runtime::Builder::new_multi_thread()196		.enable_all()197		.on_thread_start(|| {198			gc_register_my_thread();199		})200		.on_thread_stop(|| {201			gc_unregister_my_thread();202		})203		.build()204		.expect("failed to build runtime");205	let runtime = Arc::new(runtime);206207	init_tokio_for_nix(runtime.clone());208209	runtime.block_on(async {210		tokio::task::spawn(async move {211			if let Err(e) = main_real(opts).await {212				error!("{e:#}");213				ExitCode::FAILURE214			} else {215				ExitCode::SUCCESS216			}217		})218		.await219		.expect("primary task panicked")220	})221	// async_main(opts)222}223224async fn main_real(opts: RootOpts) -> Result<()> {225	let nix_args = std::env::var_os("NIX_ARGS")226		.map(|a| extra_args::parse_os(&a))227		.transpose()?228		.unwrap_or_default();229	let config = opts.fleet_opts.build(230		nix_args,231		matches!(opts.command, Opts::Deploy(_) | Opts::BuildSystems(_)),232	)?;233234	match run_command(&config, opts.fleet_opts, opts.command).await {235		Ok(()) => {236			config.save()?;237			Ok(())238		}239		Err(e) => {240			let _ = config.save();241			Err(e)242		}243	}244}245246#[cfg(test)]247mod tests {248	use super::*;249250	#[test]251	fn verify_command() {252		use clap::CommandFactory;253		RootOpts::command().debug_assert();254	}255}