1#![recursion_limit = "512"]23pub(crate) mod cmds;45pub(crate) mod extra_args;67use std::{env, ffi::OsString, process::ExitCode, sync::Arc};89use anyhow::{Result, bail};10use clap::{CommandFactory, Parser};11use cmds::{12 build_systems::{BuildSystems, Deploy},13 complete::Complete,14 info::Info,15 rollback::RollbackSingle,16 secrets::Secret,17 tf::Tf,18};19use fleet_base::{host::Config, opts::FleetOpts};20use futures::{TryStreamExt, stream::FuturesUnordered};21#[cfg(feature = "indicatif")]22use human_repr::HumanCount;23#[cfg(feature = "indicatif")]24use indicatif::{ProgressState, ProgressStyle};25use nix_eval::{26 gc_register_my_thread, gc_unregister_my_thread, init_libraries, init_tokio_for_nix,27};28use opentelemetry::trace::TracerProvider;29use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;30use opentelemetry_exporter_env::{31 OtlpBaseSettings, OtlpLogsSettings, OtlpTracesSettings, ResolvedOtlpSettings,32};33use opentelemetry_sdk::{logs::SdkLoggerProvider, trace::SdkTracerProvider};34use tracing::{Instrument, error, info, info_span};35#[cfg(feature = "indicatif")]36use tracing_indicatif::IndicatifLayer;37use tracing_subscriber::{EnvFilter, prelude::*};3839#[derive(Parser)]40struct Prefetch {}41impl Prefetch {42 async fn run(&self, config: &Config) -> Result<()> {43 let mut prefetch_dir = config.directory.to_path_buf();44 prefetch_dir.push("prefetch");45 if !prefetch_dir.is_dir() {46 info!("nothing to prefetch: no prefetch directory");47 return Ok(());48 }49 let tasks = FuturesUnordered::new();50 for entry in std::fs::read_dir(&prefetch_dir)? {51 tasks.push(async {52 let entry = entry?;53 if !entry.metadata()?.is_file() {54 bail!("only files should exist in prefetch directory");55 }56 let span = info_span!(57 "prefetching",58 name = entry.file_name().to_string_lossy().as_ref()59 );60 let mut path = OsString::new();61 path.push("file://");62 path.push(entry.path());6364 let mut status = config.local_host().cmd("nix").await?;65 status.args(&config.nix_args);66 status.arg("store").arg("prefetch-file").arg(path);67 status.run_nix_string().instrument(span).await?;68 Ok(())69 });70 }71 tasks.try_collect::<Vec<()>>().await?;72 Ok(())73 }74}7576#[derive(Parser)]77enum Opts {78 79 BuildSystems(BuildSystems),80 81 Deploy(Deploy),82 83 RollbackSingle(RollbackSingle),84 85 #[clap(subcommand)]86 Secret(Secret),87 88 Prefetch(Prefetch),89 90 Info(Info),91 92 #[clap(hide(true))]93 Complete(Complete),94 95 Tf(Tf),96}9798#[derive(Parser)]99#[clap(version, author)]100struct RootOpts {101 #[clap(flatten)]102 fleet_opts: FleetOpts,103 #[clap(subcommand)]104 command: Opts,105 #[clap(long, next_help_heading = "Telemetry", env = "OTEL_FLEET")]106 otel: bool,107 #[clap(flatten)]108 otlp_base: OtlpBaseSettings,109 #[clap(flatten)]110 otel_logs: OtlpLogsSettings,111 #[clap(flatten)]112 otel_traces: OtlpTracesSettings,113}114115async fn run_command(config: &Config, opts: FleetOpts, command: Opts) -> Result<()> {116 match command {117 Opts::BuildSystems(c) => c.run(config, &opts).await?,118 Opts::Deploy(d) => d.run(config, &opts).await?,119 Opts::RollbackSingle(r) => r.run(config, &opts).await?,120 Opts::Secret(s) => s.run(config, &opts).await?,121 Opts::Info(i) => i.run(config).await?,122 Opts::Prefetch(p) => p.run(config).await?,123 Opts::Tf(t) => t.run(config).await?,124 125 Opts::Complete(c) => {126 tokio::task::spawn_blocking(move || c.run(RootOpts::command())).await?127 }128 };129 Ok(())130}131132fn setup_logging(opts: &RootOpts) -> Result<()> {133 #[cfg(feature = "indicatif")]134 let indicatif_layer = {135 use std::time::Duration;136137 IndicatifLayer::new().with_max_progress_bars(10, Some(ProgressStyle::default_spinner()))138 .with_progress_style(139 ProgressStyle::with_template(140 "{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{download_progress} {elapsed}{color_end}",141 )142 .unwrap()143 .with_key("download_progress", |state: &ProgressState, writer: &mut dyn std::fmt::Write| {144 let Some(len) = state.len() else {145 return;146 };147 let pos = state.pos();148 if pos > len {149 let _ = write!(writer, "{}", pos.human_count_bare());150 } else {151 let _ = write!(writer, "{} / {}", pos.human_count_bare(), len.human_count_bare());152 }153 })154 .with_key(155 "color_start",156 |state: &ProgressState, writer: &mut dyn std::fmt::Write| {157 let elapsed = state.elapsed();158159 if elapsed > Duration::from_secs(60) {160 161 let _ = write!(writer, "\x1b[{}m", 1 + 30);162 } else if elapsed > Duration::from_secs(30) {163 164 let _ = write!(writer, "\x1b[{}m", 3 + 30);165 }166 },167 )168 .with_key(169 "color_end",170 |state: &ProgressState, writer: &mut dyn std::fmt::Write| {171 if state.elapsed() > Duration::from_secs(30) {172 let _ = write!(writer, "\x1b[0m");173 }174 },175 ),176 )177 };178179 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));180181 let reg = tracing_subscriber::registry().with({182 let sub = tracing_subscriber::fmt::layer()183 .without_time()184 .with_target(false);185 #[cfg(feature = "indicatif")]186 let sub = sub.with_writer(indicatif_layer.get_stderr_writer());187 sub.with_filter(filter) 188 });189190 #[cfg(feature = "indicatif")]191 let reg = reg.with(indicatif_layer);192193 if opts.otel {194 let traces = ResolvedOtlpSettings::traces(&opts.otlp_base, &opts.otel_traces)?;195 let span_exporter = traces.span_exporter()?;196 let logs = ResolvedOtlpSettings::logs(&opts.otlp_base, &opts.otel_logs)?;197 let log_exporter = logs.log_exporter()?;198199 let span_provider = SdkTracerProvider::builder()200 .with_batch_exporter(span_exporter)201 .build();202 let log_provider = SdkLoggerProvider::builder()203 .with_batch_exporter(log_exporter)204 .build();205206 let logger = OpenTelemetryTracingBridge::new(&log_provider);207 let tracer = span_provider.tracer("fleet");208209 let reg = reg210 .with(tracing_opentelemetry::layer().with_tracer(tracer))211 .with(logger);212213 reg.init();214 } else {215 reg.init();216 };217218 Ok(())219}220221fn main() -> ExitCode {222 let opts = RootOpts::parse();223 if let Opts::Complete(c) = &opts.command {224 c.run(RootOpts::command());225 return ExitCode::SUCCESS;226 }227228 if let Err(e) = setup_logging(&opts) {229 eprintln!("{e:#}");230 return ExitCode::FAILURE;231 }232233 init_libraries();234235 let runtime = tokio::runtime::Builder::new_multi_thread()236 .enable_all()237 .on_thread_start(|| {238 gc_register_my_thread();239 })240 .on_thread_stop(|| {241 gc_unregister_my_thread();242 })243 .build()244 .expect("failed to build runtime");245 let runtime = Arc::new(runtime);246247 init_tokio_for_nix(runtime.clone());248249 runtime.block_on(async {250 tokio::task::spawn(async move {251 if let Err(e) = main_real(opts).await {252 error!("{e:#}");253 ExitCode::FAILURE254 } else {255 ExitCode::SUCCESS256 }257 })258 .await259 .expect("primary task panicked")260 })261 262}263264async fn main_real(opts: RootOpts) -> Result<()> {265 let nix_args = std::env::var_os("NIX_ARGS")266 .map(|a| extra_args::parse_os(&a))267 .transpose()?268 .unwrap_or_default();269 let config = opts.fleet_opts.build(270 nix_args,271 matches!(opts.command, Opts::Deploy(_) | Opts::BuildSystems(_)),272 )?;273274 match run_command(&config, opts.fleet_opts, opts.command).await {275 Ok(()) => {276 config.save()?;277 Ok(())278 }279 Err(e) => {280 let _ = config.save();281 Err(e)282 }283 }284}285286#[cfg(test)]287mod tests {288 use super::*;289290 #[test]291 fn verify_command() {292 use clap::CommandFactory;293 RootOpts::command().debug_assert();294 }295}