git.delta.rocks / jrsonnet / refs/commits / 493bea4220c4

difftreelog

source

cmds/fleet/src/main.rs6.3 KiBsourcehistory
1#![recursion_limit = "512"]23pub(crate) mod cmds;4// pub(crate) mod command;5pub(crate) mod extra_args;67use std::{env, ffi::OsString, process::ExitCode, sync::Arc};89use anyhow::{Result, bail};10use clap::{CommandFactory, Parser};11use cmds::{12	build_systems::{BuildSystems, Deploy},13	complete::Complete,14	info::Info,15	rollback::RollbackSingle,16	secrets::Secret,17	tf::Tf,18};19use fleet_base::{host::Config, opts::FleetOpts};20use futures::{TryStreamExt, stream::FuturesUnordered};21#[cfg(feature = "indicatif")]22use human_repr::HumanCount;23#[cfg(feature = "indicatif")]24use indicatif::{ProgressState, ProgressStyle};25use nix_eval::{26	gc_register_my_thread, gc_unregister_my_thread, init_libraries, init_tokio_for_nix,27};28use tracing::{Instrument, error, info, info_span};29#[cfg(feature = "indicatif")]30use tracing_indicatif::IndicatifLayer;31use tracing_subscriber::{EnvFilter, prelude::*};3233#[derive(Parser)]34struct Prefetch {}35impl Prefetch {36	async fn run(&self, config: &Config) -> Result<()> {37		let mut prefetch_dir = config.directory.to_path_buf();38		prefetch_dir.push("prefetch");39		if !prefetch_dir.is_dir() {40			info!("nothing to prefetch: no prefetch directory");41			return Ok(());42		}43		let tasks = FuturesUnordered::new();44		for entry in std::fs::read_dir(&prefetch_dir)? {45			tasks.push(async {46				let entry = entry?;47				if !entry.metadata()?.is_file() {48					bail!("only files should exist in prefetch directory");49				}50				let span = info_span!(51					"prefetching",52					name = entry.file_name().to_string_lossy().as_ref()53				);54				let mut path = OsString::new();55				path.push("file://");56				path.push(entry.path());5758				let mut status = config.local_host().cmd("nix").await?;59				status.args(&config.nix_args);60				status.arg("store").arg("prefetch-file").arg(path);61				status.run_nix_string().instrument(span).await?;62				Ok(())63			});64		}65		tasks.try_collect::<Vec<()>>().await?;66		Ok(())67	}68}6970#[derive(Parser)]71enum Opts {72	/// Build system closures73	BuildSystems(BuildSystems),74	/// Upload and switch system closures75	Deploy(Deploy),76	/// Rollback remote machine by redeploying old generation as the new one77	RollbackSingle(RollbackSingle),78	/// Secret management79	#[clap(subcommand)]80	Secret(Secret),81	/// Upload prefetch directory to the nix store82	Prefetch(Prefetch),83	/// Config parsing84	Info(Info),85	/// Command completions86	#[clap(hide(true))]87	Complete(Complete),88	/// Compile and evaluate terranix configuration89	Tf(Tf),90}9192#[derive(Parser)]93#[clap(version, author)]94struct RootOpts {95	#[clap(flatten)]96	fleet_opts: FleetOpts,97	#[clap(subcommand)]98	command: Opts,99}100101async fn run_command(config: &Config, opts: FleetOpts, command: Opts) -> Result<()> {102	match command {103		Opts::BuildSystems(c) => c.run(config, &opts).await?,104		Opts::Deploy(d) => d.run(config, &opts).await?,105		Opts::RollbackSingle(r) => r.run(config, &opts).await?,106		Opts::Secret(s) => s.run(config, &opts).await?,107		Opts::Info(i) => i.run(config).await?,108		Opts::Prefetch(p) => p.run(config).await?,109		Opts::Tf(t) => t.run(config).await?,110		// TODO: actually parse commands before starting the async runtime111		Opts::Complete(c) => {112			tokio::task::spawn_blocking(move || c.run(RootOpts::command())).await?113		}114	};115	Ok(())116}117118fn setup_logging() {119	#[cfg(feature = "indicatif")]120	let indicatif_layer = {121		use std::time::Duration;122123		IndicatifLayer::new().with_progress_style(124			ProgressStyle::with_template(125				"{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{download_progress} {elapsed}{color_end}",126			)127				.unwrap()128				.with_key("download_progress", |state: &ProgressState, writer: &mut dyn std::fmt::Write| {129					let Some(len) = state.len() else {130						return;131					};132					let pos = state.pos();133					if pos > len {134						let _ = write!(writer, "{}", pos.human_count_bare());135					} else {136						let _ = write!(writer, "{} / {}", pos.human_count_bare(), len.human_count_bare());137					}138				})139				.with_key(140					"color_start",141					|state: &ProgressState, writer: &mut dyn std::fmt::Write| {142						let elapsed = state.elapsed();143144						if elapsed > Duration::from_secs(60) {145							// Red146							let _ = write!(writer, "\x1b[{}m", 1 + 30);147						} else if elapsed > Duration::from_secs(30) {148							// Yellow149							let _ = write!(writer, "\x1b[{}m", 3 + 30);150						}151					},152				)153				.with_key(154					"color_end",155					|state: &ProgressState, writer: &mut dyn std::fmt::Write| {156						if state.elapsed() > Duration::from_secs(30) {157							let _ = write!(writer, "\x1b[0m");158						}159					},160				),161		)162	};163164	let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));165166	let reg = tracing_subscriber::registry().with({167		let sub = tracing_subscriber::fmt::layer()168			.without_time()169			.with_target(false);170		#[cfg(feature = "indicatif")]171		let sub = sub.with_writer(indicatif_layer.get_stderr_writer());172		sub.with_filter(filter) // .without,173	});174175	if env::var_os("FLEET_OTEL").is_some() {}176177	// #[cfg(feature = "indicatif")]178	#[cfg(feature = "indicatif")]179	let reg = reg.with(indicatif_layer);180	reg.init();181}182183fn main() -> ExitCode {184	let opts = RootOpts::parse();185	if let Opts::Complete(c) = &opts.command {186		c.run(RootOpts::command());187		return ExitCode::SUCCESS;188	}189190	setup_logging();191192	init_libraries();193194	let runtime = tokio::runtime::Builder::new_multi_thread()195		.enable_all()196		.on_thread_start(|| {197			gc_register_my_thread();198		})199		.on_thread_stop(|| {200			gc_unregister_my_thread();201		})202		.build()203		.expect("failed to build runtime");204	let runtime = Arc::new(runtime);205206	init_tokio_for_nix(runtime.clone());207208	runtime.block_on(async {209		tokio::task::spawn(async move {210			if let Err(e) = main_real(opts).await {211				error!("{e:#}");212				ExitCode::FAILURE213			} else {214				ExitCode::SUCCESS215			}216		})217		.await218		.expect("primary task panicked")219	})220	// async_main(opts)221}222223async fn main_real(opts: RootOpts) -> Result<()> {224	let nix_args = std::env::var_os("NIX_ARGS")225		.map(|a| extra_args::parse_os(&a))226		.transpose()?227		.unwrap_or_default();228	let config = opts.fleet_opts.build(229		nix_args,230		matches!(opts.command, Opts::Deploy(_) | Opts::BuildSystems(_)),231	)?;232233	match run_command(&config, opts.fleet_opts, opts.command).await {234		Ok(()) => {235			config.save()?;236			Ok(())237		}238		Err(e) => {239			let _ = config.save();240			Err(e)241		}242	}243}244245#[cfg(test)]246mod tests {247	use super::*;248249	#[test]250	fn verify_command() {251		use clap::CommandFactory;252		RootOpts::command().debug_assert();253	}254}