git.delta.rocks / jrsonnet / refs/commits / cf89cc01f4f6

difftreelog

source

cmds/fleet/src/main.rs6.3 KiBsourcehistory
1#![recursion_limit = "512"]23pub(crate) mod cmds;4// pub(crate) mod command;5pub(crate) mod extra_args;67use std::{env, ffi::OsString, process::ExitCode};89use anyhow::{Result, bail};10use clap::{CommandFactory, Parser};11use cmds::{12	build_systems::{BuildSystems, Deploy},13	complete::Complete,14	info::Info,15	rollback::RollbackSingle,16	secrets::Secret,17	tf::Tf,18};19use fleet_base::{host::Config, opts::FleetOpts};20use futures::{TryStreamExt, future::LocalBoxFuture, stream::FuturesUnordered};21// use host::Config;22#[cfg(feature = "indicatif")]23use human_repr::HumanCount;24#[cfg(feature = "indicatif")]25use indicatif::{ProgressState, ProgressStyle};26use nix_eval::{gc_register_my_thread, gc_unregister_my_thread, init_libraries};27use tracing::{Instrument, error, info, info_span};28#[cfg(feature = "indicatif")]29use tracing_indicatif::IndicatifLayer;30use tracing_subscriber::{EnvFilter, fmt::format::Format, prelude::*};3132#[derive(Parser)]33struct Prefetch {}34impl Prefetch {35	async fn run(&self, config: &Config) -> Result<()> {36		let mut prefetch_dir = config.directory.to_path_buf();37		prefetch_dir.push("prefetch");38		if !prefetch_dir.is_dir() {39			info!("nothing to prefetch: no prefetch directory");40			return Ok(());41		}42		let tasks = <FuturesUnordered<LocalBoxFuture<Result<()>>>>::new();43		for entry in std::fs::read_dir(&prefetch_dir)? {44			tasks.push(Box::pin(async {45				let entry = entry?;46				if !entry.metadata()?.is_file() {47					bail!("only files should exist in prefetch directory");48				}49				let span = info_span!(50					"prefetching",51					name = entry.file_name().to_string_lossy().as_ref()52				);53				let mut path = OsString::new();54				path.push("file://");55				path.push(entry.path());5657				let mut status = config.local_host().cmd("nix").await?;58				status.args(&config.nix_args);59				status.arg("store").arg("prefetch-file").arg(path);60				status.run_nix_string().instrument(span).await?;61				Ok(())62			}));63		}64		tasks.try_collect::<Vec<()>>().await?;65		Ok(())66	}67}6869#[derive(Parser)]70enum Opts {71	/// Build system closures72	BuildSystems(BuildSystems),73	/// Upload and switch system closures74	Deploy(Deploy),75	/// Rollback remote machine by redeploying old generation as the new one76	RollbackSingle(RollbackSingle),77	/// Secret management78	#[clap(subcommand)]79	Secret(Secret),80	/// Upload prefetch directory to the nix store81	Prefetch(Prefetch),82	/// Config parsing83	Info(Info),84	/// Command completions85	#[clap(hide(true))]86	Complete(Complete),87	/// Compile and evaluate terranix configuration88	Tf(Tf),89}9091#[derive(Parser)]92#[clap(version, author)]93struct RootOpts {94	#[clap(flatten)]95	fleet_opts: FleetOpts,96	#[clap(subcommand)]97	command: Opts,98}99100async fn run_command(config: &Config, opts: FleetOpts, command: Opts) -> Result<()> {101	match command {102		Opts::BuildSystems(c) => c.run(config, &opts).await?,103		Opts::Deploy(d) => d.run(config, &opts).await?,104		Opts::RollbackSingle(r) => r.run(config, &opts).await?,105		Opts::Secret(s) => s.run(config, &opts).await?,106		Opts::Info(i) => i.run(config).await?,107		Opts::Prefetch(p) => p.run(config).await?,108		Opts::Tf(t) => t.run(config).await?,109		// TODO: actually parse commands before starting the async runtime110		Opts::Complete(c) => {111			tokio::task::spawn_blocking(move || c.run(RootOpts::command())).await?112		}113	};114	Ok(())115}116117fn setup_logging() {118	#[cfg(feature = "indicatif")]119	let indicatif_layer = {120		use std::time::Duration;121122		IndicatifLayer::new().with_progress_style(123			ProgressStyle::with_template(124				"{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{download_progress} {elapsed}{color_end}",125			)126				.unwrap()127				.with_key("download_progress", |state: &ProgressState, writer: &mut dyn std::fmt::Write| {128					let Some(len) = state.len() else {129						return;130					};131					let pos = state.pos();132					if pos > len {133						let _ = write!(writer, "{}", pos.human_count_bare());134					} else {135						let _ = write!(writer, "{} / {}", pos.human_count_bare(), len.human_count_bare());136					}137				})138				.with_key(139					"color_start",140					|state: &ProgressState, writer: &mut dyn std::fmt::Write| {141						let elapsed = state.elapsed();142143						if elapsed > Duration::from_secs(60) {144							// Red145							let _ = write!(writer, "\x1b[{}m", 1 + 30);146						} else if elapsed > Duration::from_secs(30) {147							// Yellow148							let _ = write!(writer, "\x1b[{}m", 3 + 30);149						}150					},151				)152				.with_key(153					"color_end",154					|state: &ProgressState, writer: &mut dyn std::fmt::Write| {155						if state.elapsed() > Duration::from_secs(30) {156							let _ = write!(writer, "\x1b[0m");157						}158					},159				),160		)161	};162163	let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));164165	let reg = tracing_subscriber::registry().with({166		let sub = tracing_subscriber::fmt::layer()167			.without_time()168			.with_target(false);169		#[cfg(feature = "indicatif")]170		let sub = sub.with_writer(indicatif_layer.get_stderr_writer());171		sub.with_filter(filter) // .without,172	});173174	if env::var_os("FLEET_OTEL").is_some() {}175176	// #[cfg(feature = "indicatif")]177	#[cfg(feature = "indicatif")]178	let reg = reg.with(indicatif_layer);179	reg.init();180}181182fn main() -> ExitCode {183	let opts = RootOpts::parse();184	if let Opts::Complete(c) = &opts.command {185		c.run(RootOpts::command());186		return ExitCode::SUCCESS;187	}188189	setup_logging();190191	init_libraries();192193	tokio::runtime::Builder::new_multi_thread()194		.enable_all()195		.on_thread_start(|| {196			gc_register_my_thread();197		})198		.on_thread_stop(|| {199			gc_unregister_my_thread();200		})201		.build()202		.expect("failed to build runtime")203		.block_on(async {204			if let Err(e) = main_real(opts).await {205				error!("{e:#}");206				ExitCode::FAILURE207			} else {208				ExitCode::SUCCESS209			}210		})211	// async_main(opts)212}213214async fn main_real(opts: RootOpts) -> Result<()> {215	let nix_args = std::env::var_os("NIX_ARGS")216		.map(|a| extra_args::parse_os(&a))217		.transpose()?218		.unwrap_or_default();219	let config = opts220		.fleet_opts221		.build(222			nix_args,223			matches!(opts.command, Opts::Deploy(_) | Opts::BuildSystems(_)),224		)225		.await?;226227	match run_command(&config, opts.fleet_opts, opts.command).await {228		Ok(()) => {229			config.save()?;230			Ok(())231		}232		Err(e) => {233			let _ = config.save();234			Err(e)235		}236	}237}238239#[cfg(test)]240mod tests {241	use super::*;242243	#[test]244	fn verify_command() {245		use clap::CommandFactory;246		RootOpts::command().debug_assert();247	}248}