git.delta.rocks / jrsonnet / refs/heads / trunk

difftreelog

source

cmds/fleet/src/main.rs7.7 KiBsourcehistory
1#![recursion_limit = "512"]23pub(crate) mod cmds;4// pub(crate) mod command;5pub(crate) mod extra_args;67use std::{env, ffi::OsString, process::ExitCode, sync::Arc};89use anyhow::{Result, bail};10use clap::{CommandFactory, Parser};11use cmds::{12	build_systems::{BuildSystems, Deploy},13	complete::Complete,14	info::Info,15	rollback::RollbackSingle,16	secrets::Secret,17	tf::Tf,18};19use fleet_base::{host::Config, opts::FleetOpts};20use futures::{TryStreamExt, stream::FuturesUnordered};21#[cfg(feature = "indicatif")]22use human_repr::HumanCount;23#[cfg(feature = "indicatif")]24use indicatif::{ProgressState, ProgressStyle};25use nix_eval::{26	gc_register_my_thread, gc_unregister_my_thread, init_libraries, init_tokio_for_nix,27};28use opentelemetry::trace::TracerProvider;29use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;30use opentelemetry_exporter_env::{31	OtlpBaseSettings, OtlpLogsSettings, OtlpTracesSettings, ResolvedOtlpSettings,32};33use opentelemetry_sdk::{logs::SdkLoggerProvider, trace::SdkTracerProvider};34use tracing::{Instrument, error, info, info_span};35#[cfg(feature = "indicatif")]36use tracing_indicatif::IndicatifLayer;37use tracing_subscriber::{EnvFilter, prelude::*};3839#[derive(Parser)]40struct Prefetch {}41impl Prefetch {42	async fn run(&self, config: &Config) -> Result<()> {43		let mut prefetch_dir = config.directory.to_path_buf();44		prefetch_dir.push("prefetch");45		if !prefetch_dir.is_dir() {46			info!("nothing to prefetch: no prefetch directory");47			return Ok(());48		}49		let tasks = FuturesUnordered::new();50		for entry in std::fs::read_dir(&prefetch_dir)? {51			tasks.push(async {52				let entry = entry?;53				if !entry.metadata()?.is_file() {54					bail!("only files should exist in prefetch directory");55				}56				let span = info_span!(57					"prefetching",58					name = entry.file_name().to_string_lossy().as_ref()59				);60				let mut path = OsString::new();61				path.push("file://");62				path.push(entry.path());6364				let mut status = config.local_host().cmd("nix").await?;65				status.args(&config.nix_args);66				status.arg("store").arg("prefetch-file").arg(path);67				status.run_nix_string().instrument(span).await?;68				Ok(())69			});70		}71		tasks.try_collect::<Vec<()>>().await?;72		Ok(())73	}74}7576#[derive(Parser)]77enum Opts {78	/// Build system closures79	BuildSystems(BuildSystems),80	/// Upload and switch system closures81	Deploy(Deploy),82	/// Rollback remote machine by redeploying old generation as the new one83	RollbackSingle(RollbackSingle),84	/// Secret management85	#[clap(subcommand)]86	Secret(Secret),87	/// Upload prefetch directory to the nix store88	Prefetch(Prefetch),89	/// Config parsing90	Info(Info),91	/// Command completions92	#[clap(hide(true))]93	Complete(Complete),94	/// Compile and evaluate terranix configuration95	Tf(Tf),96}9798#[derive(Parser)]99#[clap(version, author)]100struct RootOpts {101	#[clap(flatten)]102	fleet_opts: FleetOpts,103	#[clap(subcommand)]104	command: Opts,105	#[clap(long, next_help_heading = "Telemetry", env = "OTEL_FLEET")]106	otel: bool,107	#[clap(flatten)]108	otlp_base: OtlpBaseSettings,109	#[clap(flatten)]110	otel_logs: OtlpLogsSettings,111	#[clap(flatten)]112	otel_traces: OtlpTracesSettings,113}114115async fn run_command(config: &Config, opts: FleetOpts, command: Opts) -> Result<()> {116	match command {117		Opts::BuildSystems(c) => c.run(config, &opts).await?,118		Opts::Deploy(d) => d.run(config, &opts).await?,119		Opts::RollbackSingle(r) => r.run(config, &opts).await?,120		Opts::Secret(s) => s.run(config, &opts).await?,121		Opts::Info(i) => i.run(config).await?,122		Opts::Prefetch(p) => p.run(config).await?,123		Opts::Tf(t) => t.run(config).await?,124		// TODO: actually parse commands before starting the async runtime125		Opts::Complete(c) => {126			tokio::task::spawn_blocking(move || c.run(RootOpts::command())).await?127		}128	};129	Ok(())130}131132fn setup_logging(opts: &RootOpts) -> Result<()> {133	#[cfg(feature = "indicatif")]134	let indicatif_layer = {135		use std::time::Duration;136137		IndicatifLayer::new().with_max_progress_bars(10, Some(ProgressStyle::default_spinner()))138			.with_progress_style(139			ProgressStyle::with_template(140				"{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{download_progress} {elapsed}{color_end}",141			)142				.unwrap()143				.with_key("download_progress", |state: &ProgressState, writer: &mut dyn std::fmt::Write| {144					let Some(len) = state.len() else {145						return;146					};147					let pos = state.pos();148					if pos > len {149						let _ = write!(writer, "{}", pos.human_count_bare());150					} else {151						let _ = write!(writer, "{} / {}", pos.human_count_bare(), len.human_count_bare());152					}153				})154				.with_key(155					"color_start",156					|state: &ProgressState, writer: &mut dyn std::fmt::Write| {157						let elapsed = state.elapsed();158159						if elapsed > Duration::from_secs(60) {160							// Red161							let _ = write!(writer, "\x1b[{}m", 1 + 30);162						} else if elapsed > Duration::from_secs(30) {163							// Yellow164							let _ = write!(writer, "\x1b[{}m", 3 + 30);165						}166					},167				)168				.with_key(169					"color_end",170					|state: &ProgressState, writer: &mut dyn std::fmt::Write| {171						if state.elapsed() > Duration::from_secs(30) {172							let _ = write!(writer, "\x1b[0m");173						}174					},175				),176		)177	};178179	let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));180181	let reg = tracing_subscriber::registry().with({182		let sub = tracing_subscriber::fmt::layer()183			.without_time()184			.with_target(false);185		#[cfg(feature = "indicatif")]186		let sub = sub.with_writer(indicatif_layer.get_stderr_writer());187		sub.with_filter(filter) // .without,188	});189190	#[cfg(feature = "indicatif")]191	let reg = reg.with(indicatif_layer);192193	if opts.otel {194		let traces = ResolvedOtlpSettings::traces(&opts.otlp_base, &opts.otel_traces)?;195		let span_exporter = traces.span_exporter()?;196		let logs = ResolvedOtlpSettings::logs(&opts.otlp_base, &opts.otel_logs)?;197		let log_exporter = logs.log_exporter()?;198199		let span_provider = SdkTracerProvider::builder()200			.with_batch_exporter(span_exporter)201			.build();202		let log_provider = SdkLoggerProvider::builder()203			.with_batch_exporter(log_exporter)204			.build();205206		let logger = OpenTelemetryTracingBridge::new(&log_provider);207		let tracer = span_provider.tracer("fleet");208209		let reg = reg210			.with(tracing_opentelemetry::layer().with_tracer(tracer))211			.with(logger);212213		reg.init();214	} else {215		reg.init();216	};217218	Ok(())219}220221fn main() -> ExitCode {222	let opts = RootOpts::parse();223	if let Opts::Complete(c) = &opts.command {224		c.run(RootOpts::command());225		return ExitCode::SUCCESS;226	}227228	if let Err(e) = setup_logging(&opts) {229		eprintln!("{e:#}");230		return ExitCode::FAILURE;231	}232233	init_libraries();234235	let runtime = tokio::runtime::Builder::new_multi_thread()236		.enable_all()237		.on_thread_start(|| {238			gc_register_my_thread();239		})240		.on_thread_stop(|| {241			gc_unregister_my_thread();242		})243		.build()244		.expect("failed to build runtime");245	let runtime = Arc::new(runtime);246247	init_tokio_for_nix(runtime.clone());248249	runtime.block_on(async {250		tokio::task::spawn(async move {251			if let Err(e) = main_real(opts).await {252				error!("{e:#}");253				ExitCode::FAILURE254			} else {255				ExitCode::SUCCESS256			}257		})258		.await259		.expect("primary task panicked")260	})261	// async_main(opts)262}263264async fn main_real(opts: RootOpts) -> Result<()> {265	let nix_args = std::env::var_os("NIX_ARGS")266		.map(|a| extra_args::parse_os(&a))267		.transpose()?268		.unwrap_or_default();269	let config = opts.fleet_opts.build(270		nix_args,271		matches!(opts.command, Opts::Deploy(_) | Opts::BuildSystems(_)),272	)?;273274	match run_command(&config, opts.fleet_opts, opts.command).await {275		Ok(()) => {276			config.save()?;277			Ok(())278		}279		Err(e) => {280			let _ = config.save();281			Err(e)282		}283	}284}285286#[cfg(test)]287mod tests {288	use super::*;289290	#[test]291	fn verify_command() {292		use clap::CommandFactory;293		RootOpts::command().debug_assert();294	}295}