git.delta.rocks / jrsonnet / refs/commits / 505f82ed3097

difftreelog

source

cmds/fleet/src/host.rs18.1 KiBsourcehistory
1use std::{2	cell::{LazyCell, OnceCell},3	collections::BTreeMap,4	env::current_dir,5	ffi::{OsStr, OsString},6	fmt::Display,7	io::Write,8	ops::Deref,9	path::PathBuf,10	str::FromStr,11	sync::{Arc, Mutex, MutexGuard, OnceLock},12};1314use anyhow::{anyhow, bail, ensure, Context, Result};15use clap::Parser;16use fleet_shared::SecretData;17use nix_eval::{nix_go, nix_go_json, util::assert_warn, NixSessionPool, Value};18use nom::{19	bytes::complete::take_while1,20	character::complete::char,21	combinator::{map, opt},22	multi::separated_list1,23	sequence::{preceded, separated_pair},24};25use openssh::SessionBuilder;26use serde::de::DeserializeOwned;27use tempfile::NamedTempFile;28use tracing::error;2930use crate::{31	command::MyCommand,32	fleetdata::{FleetData, FleetSecret, FleetSharedSecret},33};3435pub struct FleetConfigInternals {36	pub local_system: String,37	pub directory: PathBuf,38	pub opts: FleetOpts,39	pub data: Mutex<FleetData>,40	pub nix_args: Vec<OsString>,41	/// fleet_config.config42	pub config_field: Value,4344	/// import nixpkgs {system = local};45	pub default_pkgs: Value,46}4748#[derive(Clone)]49pub struct Config(Arc<FleetConfigInternals>);5051impl Deref for Config {52	type Target = FleetConfigInternals;5354	fn deref(&self) -> &Self::Target {55		&self.056	}57}5859#[derive(Clone, Copy, Debug)]60pub enum EscalationStrategy {61	Sudo,62	Run0,63	Su,64}6566pub struct ConfigHost {67	config: Config,68	pub name: String,69	pub local: bool,70	pub session: OnceLock<Arc<openssh::Session>>,71	groups: OnceCell<Vec<String>>,7273	pub host_config: Option<Value>,74	pub nixos_config: OnceCell<Value>,75}76impl ConfigHost {77	pub async fn escalation_strategy(&self) -> Result<EscalationStrategy> {78		// Prefer sudo, as run0 has some gotchas with polkit79		// and too many repeating prompts.80		if let Ok(_) = self.find_in_path("sudo").await {81			return Ok(EscalationStrategy::Sudo);82		}83		if let Ok(_) = self.find_in_path("run0").await {84			return Ok(EscalationStrategy::Run0);85		}86		Ok(EscalationStrategy::Su)87	}88	// TOCTOU is possible here in case if config is changed, but this case is not handled anywhere anyway,89	// assuming getting tags always returns the same value.90	pub async fn tags(&self) -> Result<Vec<String>> {91		if let Some(v) = self.groups.get() {92			return Ok(v.clone());93		}94		let Some(host_config) = &self.host_config else {95			return Ok(vec![]);96		};97		let tags: Vec<String> = nix_go_json!(host_config.tags);9899		let _ = self.groups.set(tags.clone());100101		Ok(tags)102	}103	pub async fn nixos_config(&self) -> Result<Value> {104		if let Some(v) = self.nixos_config.get() {105			return Ok(v.clone());106		}107		let Some(host_config) = &self.host_config else {108			bail!("local host has no nixos_config");109		};110		let nixos_config = nix_go!(host_config.nixos.config);111		assert_warn("nixos config evaluation", &nixos_config).await?;112113		let _ = self.nixos_config.set(nixos_config.clone());114115		Ok(nixos_config)116	}117	async fn open_session(&self) -> Result<Arc<openssh::Session>> {118		assert!(!self.local, "do not open ssh connection to local session");119		// FIXME: TOCTOU120		if let Some(session) = &self.session.get() {121			return Ok((*session).clone());122		};123		let mut session = SessionBuilder::default();124		let session = session125			.connect(&self.name)126			.await127			.map_err(|e| anyhow!("ssh error while connecting to {}: {e}", self.name))?;128		let session = Arc::new(session);129		self.session.set(session.clone()).expect("TOCTOU happened");130		Ok(session)131	}132	pub async fn mktemp_dir(&self) -> Result<String> {133		let mut cmd = self.cmd("mktemp").await?;134		cmd.arg("-d");135		let path = cmd.run_string().await?;136		Ok(path.trim_end().to_owned())137	}138	pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {139		let mut cmd = self.cmd("cat").await?;140		cmd.arg(path);141		cmd.run_bytes().await142	}143	pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {144		let mut cmd = self.cmd("cat").await?;145		cmd.arg(path);146		cmd.run_string().await147	}148	pub async fn read_dir(&self, path: impl AsRef<OsStr>) -> Result<Vec<String>> {149		let mut cmd = self.cmd("ls").await?;150		cmd.arg(path);151		let out = cmd.run_string().await?;152		let mut lines = out.split('\n');153		if let Some(last) = lines.next_back() {154			ensure!(last.is_empty(), "output of ls should end with newline");155		}156		Ok(lines.map(ToOwned::to_owned).collect())157	}158	#[allow(dead_code)]159	pub async fn read_file_json<D: DeserializeOwned>(&self, path: impl AsRef<OsStr>) -> Result<D> {160		let text = self.read_file_text(path).await?;161		Ok(serde_json::from_str(&text)?)162	}163	pub async fn read_env(&self, env: &str) -> Result<String> {164		let mut cmd = self.cmd("printenv").await?;165		cmd.arg(env);166		Ok(cmd.run_string().await?)167	}168	pub async fn find_in_path(&self, command: &str) -> Result<String> {169		// // `which` is not a part of coreutils, and it might not exist on machine.170		// let path = self.read_env("PATH").await?;171		// // Assuming delimiter is :, we don't work with windows host, this check will be much172		// // more sophisticated in remowt backend (and quicker, since actual PATH search will be done on remote machine)173		// for ele in path.split(':') {174		// 	let test_path = format!("{ele}/{cmd}");175		// 	test -x etc176		// }177		// let mut cmd = self.cmd("printenv").await?;178		// cmd.arg(env);179		// Ok(cmd.run_string().await?)180		// Assuming this is an environment issue if which doesn't exist, will be fixed with remowt.181		let mut cmd = self182			.cmd_escalation(183				// Not used184				EscalationStrategy::Su,185				"which",186			)187			.await?;188		cmd.arg(command);189		cmd.run_string().await190	}191	pub async fn read_file_value<D: FromStr>(&self, path: impl AsRef<OsStr>) -> Result<D>192	where193		<D as FromStr>::Err: Display,194	{195		let text = self.read_file_text(path).await?;196		D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))197	}198	pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {199		self.cmd_escalation(self.escalation_strategy().await?, cmd)200			.await201	}202	pub async fn cmd_escalation(203		&self,204		escalation: EscalationStrategy,205		cmd: impl AsRef<OsStr>,206	) -> Result<MyCommand> {207		if self.local {208			Ok(MyCommand::new(escalation, cmd))209		} else {210			let session = self.open_session().await?;211			Ok(MyCommand::new_on(escalation, cmd, session))212		}213	}214215	pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {216		ensure!(data.encrypted, "secret is not encrypted");217		let mut cmd = self.cmd("fleet-install-secrets").await?;218		cmd.arg("decrypt").eqarg("--secret", data.to_string());219		let encoded = cmd220			.sudo()221			.run_string()222			.await223			.context("failed to call remote host for decrypt")?;224		let data: SecretData = encoded.parse().map_err(|e| anyhow!("{e}"))?;225		ensure!(!data.encrypted, "secret came out encrypted");226		Ok(data.data)227	}228	pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {229		ensure!(data.encrypted, "secret is not encrypted");230		let mut cmd = self.cmd("fleet-install-secrets").await?;231		cmd.arg("reencrypt").eqarg("--secret", data.to_string());232		for target in targets {233			let key = self.config.key(&target).await?;234			cmd.eqarg("--targets", key);235		}236		let encoded = cmd237			.sudo()238			.run_string()239			.await240			.context("failed to call remote host for decrypt")?;241		let data: SecretData = encoded.parse().map_err(|e| anyhow!("{e}"))?;242		ensure!(data.encrypted, "secret came out not encrypted");243		Ok(data)244	}245	/// Returns path for futureproofing, as path might change i.e on conversion to CA246	pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {247		if self.local {248			// Path is located locally, thus already trusted.249			return Ok(path.to_owned());250		}251		let mut nix = MyCommand::new(252			// Not used253			EscalationStrategy::Su,254			"nix",255		);256		nix.arg("copy")257			.arg("--substitute-on-destination")258			.comparg("--to", format!("ssh-ng://{}", self.name))259			.arg(path);260		nix.run_nix().await.context("nix copy")?;261		Ok(path.to_owned())262	}263	pub async fn systemctl_stop(&self, name: &str) -> Result<()> {264		let mut cmd = self.cmd("systemctl").await?;265		cmd.arg("stop").arg(name);266		cmd.sudo().run().await267	}268	pub async fn systemctl_start(&self, name: &str) -> Result<()> {269		let mut cmd = self.cmd("systemctl").await?;270		cmd.arg("start").arg(name);271		cmd.sudo().run().await272	}273274	pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {275		let mut cmd = self.cmd("rm").await?;276		cmd.arg("-f").arg(path);277		if sudo {278			cmd = cmd.sudo()279		}280		cmd.run().await281	}282283	pub async fn list_configured_secrets(&self) -> Result<Vec<String>> {284		let nixos = self.nixos_config().await?;285		let secrets = nix_go!(nixos.secrets);286		let mut out = Vec::new();287		for name in secrets.list_fields().await? {288			let secret = nix_go!(secrets[{ name }]);289			let is_shared: bool = nix_go_json!(secret.shared);290			if is_shared {291				continue;292			}293			out.push(name);294		}295		Ok(out)296	}297	pub async fn secret_field(&self, name: &str) -> Result<Value> {298		let nixos = self.nixos_config().await?;299		Ok(nix_go!(nixos.secrets[{ name }]))300	}301302	/// Packages for this host, resolved with nixpkgs overlays303	pub async fn pkgs(&self) -> Result<Value> {304		let Some(host_config) = &self.host_config else {305			bail!("local host has no host_config");306		};307		// TODO: Should nixos.options be cached?308		Ok(nix_go!(host_config.nixos.options._module.args.value.pkgs))309	}310}311312impl Config {313	pub async fn should_skip(&self, host: &ConfigHost) -> Result<bool> {314		if !self.opts.skip.is_empty() && self.opts.skip.iter().any(|h| h as &str == host.name) {315			return Ok(true);316		}317		if self.opts.only.is_empty() {318			return Ok(false);319		}320		let mut have_group_matches = false;321		for item in self.opts.only.iter() {322			match item {323				HostItem::Host { name, .. } if *name == host.name => {324					return Ok(false);325				}326				HostItem::Tag { .. } => {327					have_group_matches = true;328				}329				_ => {}330			}331		}332		if have_group_matches {333			let host_tags = host.tags().await?;334			for item in self.opts.only.iter() {335				match item {336					HostItem::Tag { name, .. } if host_tags.contains(name) => {337						return Ok(false);338					}339					_ => {}340				}341			}342		}343		Ok(true)344	}345	pub async fn action_attr(&self, host: &ConfigHost, attr: &str) -> Result<Option<String>> {346		if self.opts.only.is_empty() {347			return Ok(None);348		}349		let mut have_group_matches = false;350		for item in self.opts.only.iter() {351			match item {352				HostItem::Host { name, attrs }353					if *name == host.name && attrs.contains_key(attr) =>354				{355					return Ok(attrs.get(attr).cloned());356				}357				HostItem::Tag { attrs, .. } if attrs.contains_key(attr) => {358					have_group_matches = true;359				}360				_ => {}361			}362		}363		if have_group_matches {364			let host_tags = host.tags().await?;365			for item in self.opts.only.iter() {366				match item {367					HostItem::Tag { name, attrs }368						if host_tags.contains(name) && attrs.contains_key(attr) =>369					{370						return Ok(attrs.get(attr).cloned());371					}372					_ => {}373				}374			}375		}376		Ok(None)377	}378	pub fn is_local(&self, host: &str) -> bool {379		self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)380	}381382	pub fn local_host(&self) -> ConfigHost {383		ConfigHost {384			config: self.clone(),385			name: "<virtual localhost>".to_owned(),386			local: true,387			session: OnceLock::new(),388			host_config: None,389			nixos_config: OnceCell::new(),390			groups: {391				let cell = OnceCell::new();392				let _ = cell.set(vec![]);393				cell394			},395		}396	}397398	pub async fn host(&self, name: &str) -> Result<ConfigHost> {399		let config = &self.config_field;400		let host_config = nix_go!(config.hosts[{ name }]);401402		Ok(ConfigHost {403			config: self.clone(),404			name: name.to_owned(),405			local: self.is_local(name),406			session: OnceLock::new(),407			host_config: Some(host_config),408			nixos_config: OnceCell::new(),409			groups: OnceCell::new(),410		})411	}412	pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {413		let config = &self.config_field;414		let names = nix_go!(config.hosts).list_fields().await?;415		let mut out = vec![];416		for name in names {417			out.push(self.host(&name).await?);418		}419		Ok(out)420	}421	pub async fn system_config(&self, host: &str) -> Result<Value> {422		let fleet_field = &self.config_field;423		Ok(nix_go!(fleet_field.hosts[{ host }].nixos.config))424	}425426	pub(super) fn data(&self) -> MutexGuard<FleetData> {427		self.data.lock().unwrap()428	}429	pub(super) fn data_mut(&self) -> MutexGuard<FleetData> {430		self.data.lock().unwrap()431	}432	/// Shared secrets configured in fleet.nix or in flake433	pub async fn list_configured_shared(&self) -> Result<Vec<String>> {434		let config_field = &self.config_field;435		Ok(nix_go!(config_field.sharedSecrets).list_fields().await?)436	}437	/// Shared secrets configured in fleet.nix438	pub fn list_shared(&self) -> Vec<String> {439		let data = self.data();440		data.shared_secrets.keys().cloned().collect()441	}442	pub fn has_shared(&self, name: &str) -> bool {443		let data = self.data();444		data.shared_secrets.contains_key(name)445	}446	pub fn replace_shared(&self, name: String, shared: FleetSharedSecret) {447		let mut data = self.data_mut();448		data.shared_secrets.insert(name.to_owned(), shared);449	}450	pub fn remove_shared(&self, secret: &str) {451		let mut data = self.data_mut();452		data.shared_secrets.remove(secret);453	}454455	pub fn list_secrets(&self, host: &str) -> Vec<String> {456		let data = self.data();457		let Some(secrets) = data.host_secrets.get(host) else {458			return Vec::new();459		};460		secrets.keys().cloned().collect()461	}462463	pub fn has_secret(&self, host: &str, secret: &str) -> bool {464		let data = self.data();465		let Some(host_secrets) = data.host_secrets.get(host) else {466			return false;467		};468		host_secrets.contains_key(secret)469	}470	pub fn insert_secret(&self, host: &str, secret: String, value: FleetSecret) {471		let mut data = self.data_mut();472		let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();473		host_secrets.insert(secret, value);474	}475476	pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {477		let data = self.data();478		let Some(host_secrets) = data.host_secrets.get(host) else {479			bail!("no secrets for machine {host}");480		};481		let Some(secret) = host_secrets.get(secret) else {482			bail!("machine {host} has no secret {secret}");483		};484		Ok(secret.clone())485	}486	pub fn shared_secret(&self, secret: &str) -> Result<FleetSharedSecret> {487		let data = self.data();488		let Some(secret) = data.shared_secrets.get(secret) else {489			bail!("no shared secret {secret}");490		};491		Ok(secret.clone())492	}493	pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {494		let config_field = &self.config_field;495		Ok(nix_go_json!(496			config_field.sharedSecrets[{ secret }].expectedOwners497		))498	}499500	pub fn save(&self) -> Result<()> {501		let mut tempfile = NamedTempFile::new_in(self.directory.clone()).context("failed to create updated version of fleet.nix in the same directory as original.\nDo you have write access to it? Access only to the fleet.nix won't be enough, the directory is used for atomic overwrite operation.\nIt is not recommended to use fleet by root anyway, move fleet project to your home directory.")?;502		let data = nixlike::serialize(&self.data() as &FleetData)?;503		tempfile.write_all(504			format!(505				"# This file contains fleet state and shouldn't be edited by hand\n\n{}\n\n# vim: ts=2 et nowrap\n",506				data507			)508			.as_bytes(),509		)?;510		let mut fleet_data_path = self.directory.clone();511		fleet_data_path.push("fleet.nix");512		tempfile.persist(fleet_data_path)?;513		Ok(())514	}515}516517#[derive(Clone)]518enum HostItem {519	Host {520		name: String,521		attrs: BTreeMap<String, String>,522	},523	Tag {524		name: String,525		attrs: BTreeMap<String, String>,526	},527}528fn host_item_parser(input: &str) -> Result<HostItem, String> {529	fn err_to_string(err: nom::Err<nom::error::Error<&str>>) -> String {530		err.to_string()531	}532533	let (input, is_tag) = map(opt(char('@')), |c| c.is_some())(input).map_err(err_to_string)?;534	let (input, name) = map(535		take_while1(|v| v != ',' && v != '?' && v != '@'),536		str::to_owned,537	)(input)538	.map_err(err_to_string)?;539540	let kw_item = separated_pair(541		map(take_while1(|v| v != '&' && v != '='), str::to_owned),542		char('='),543		map(take_while1(|v| v != '&'), str::to_owned),544	);545	let kw = map(separated_list1(char('&'), kw_item), |vec| {546		vec.into_iter().collect::<BTreeMap<_, _>>()547	});548	let mut opt_kw = map(opt(preceded(char('?'), kw)), Option::unwrap_or_default);549550	let (input, attrs) = opt_kw(input).map_err(err_to_string)?;551552	if !input.is_empty() {553		return Err(format!("unexpected trailing input: {input:?}"));554	}555	Ok(if is_tag {556		HostItem::Tag { name, attrs }557	} else {558		HostItem::Host { name, attrs }559	})560}561562#[derive(Parser, Clone)]563pub struct FleetOpts {564	/// All hosts except those would be skipped565	#[clap(long, number_of_values = 1, value_parser = host_item_parser)]566	only: Vec<HostItem>,567568	/// Hosts to skip569	#[clap(long, number_of_values = 1)]570	skip: Vec<String>,571572	/// Host, which should be threaten as current machine573	#[clap(long)]574	pub localhost: Option<String>,575576	/// Override detected system for host, to perform builds via577	/// binfmt-declared qemu instead of trying to crosscompile578	#[clap(long, default_value = "detect")]579	pub local_system: String,580}581582impl FleetOpts {583	pub async fn build(mut self, nix_args: Vec<OsString>) -> Result<Config> {584		if self.localhost.is_none() {585			self.localhost586				.replace(hostname::get().unwrap().to_str().unwrap().to_owned());587		}588		let directory = current_dir()?;589590		let pool = NixSessionPool::new(directory.as_os_str().to_owned(), nix_args.clone()).await?;591		let root_field = pool.get().await?;592593		let builtins_field = Value::binding(root_field.clone(), "builtins").await?;594		if self.local_system == "detect" {595			self.local_system = nix_go_json!(builtins_field.currentSystem);596		}597		let local_system = self.local_system.clone();598599		let mut fleet_data_path = directory.clone();600		fleet_data_path.push("fleet.nix");601		let bytes = std::fs::read_to_string(fleet_data_path)?;602		let data: Mutex<FleetData> = nixlike::parse_str(&bytes)?;603604		let fleet_root = Value::binding(root_field, "fleetConfigurations").await?;605		let fleet_field = nix_go!(fleet_root.default({ data }));606607		let config_field = nix_go!(fleet_field.config);608609		assert_warn("fleet config evaluation", &config_field).await?;610611		let import = nix_go!(builtins_field.import);612		let overlays = nix_go!(config_field.nixpkgs.overlays);613		let nixpkgs = nix_go!(fleet_field.nixpkgs.buildUsing | import);614615		let default_pkgs = nix_go!(nixpkgs(Obj {616			overlays,617			system: { self.local_system.clone() },618		}));619620		Ok(Config(Arc::new(FleetConfigInternals {621			opts: self,622			directory,623			data,624			local_system,625			nix_args,626			config_field,627			default_pkgs,628		})))629	}630}