git.delta.rocks / jrsonnet / refs/commits / 3972fee37ee3

difftreelog

source

crates/fleet-base/src/host.rs14.8 KiBsourcehistory
1use std::{2	cell::OnceCell,3	collections::BTreeSet,4	ffi::{OsStr, OsString},5	fmt::Display,6	io::Write,7	ops::Deref,8	path::PathBuf,9	str::FromStr,10	sync::{Arc, Mutex, MutexGuard, OnceLock},11};1213use anyhow::{anyhow, bail, ensure, Context, Result};14use fleet_shared::SecretData;15use nix_eval::{nix_go, nix_go_json, util::assert_warn, NixSession, Value};16use openssh::SessionBuilder;17use serde::de::DeserializeOwned;18use tempfile::NamedTempFile;1920use crate::{21	command::MyCommand,22	fleetdata::{FleetData, FleetSecret, FleetSharedSecret},23};2425pub struct FleetConfigInternals {26	pub local_system: String,27	pub directory: PathBuf,28	pub data: Mutex<FleetData>,29	pub nix_args: Vec<OsString>,30	/// fleet_config.config31	pub config_field: Value,32	// TODO: Remove with connectivity refactor33	pub localhost: String,3435	/// import nixpkgs {system = local};36	pub default_pkgs: Value,37	pub nixpkgs: Value,3839	pub nix_session: NixSession,40}4142// TODO: Make field not pub43#[derive(Clone)]44pub struct Config(pub Arc<FleetConfigInternals>);4546impl Deref for Config {47	type Target = FleetConfigInternals;4849	fn deref(&self) -> &Self::Target {50		&self.051	}52}5354#[derive(Clone, Copy, Debug)]55pub enum EscalationStrategy {56	Sudo,57	Run0,58	Su,59}6061pub struct ConfigHost {62	config: Config,63	pub name: String,64	groups: OnceCell<Vec<String>>,6566	pub host_config: Option<Value>,67	pub nixos_config: OnceCell<Value>,68	pub pkgs_override: Option<Value>,6970	// TODO: Move command helpers away with connectivity refactor71	pub local: bool,72	pub session: OnceLock<Arc<openssh::Session>>,73}74// TODO: Move command helpers away with connectivity refactor75impl ConfigHost {76	pub async fn escalation_strategy(&self) -> Result<EscalationStrategy> {77		// Prefer sudo, as run0 has some gotchas with polkit78		// and too many repeating prompts.79		if (self.find_in_path("sudo").await).is_ok() {80			return Ok(EscalationStrategy::Sudo);81		}82		if (self.find_in_path("run0").await).is_ok() {83			return Ok(EscalationStrategy::Run0);84		}85		Ok(EscalationStrategy::Su)86	}87	async fn open_session(&self) -> Result<Arc<openssh::Session>> {88		assert!(!self.local, "do not open ssh connection to local session");89		// FIXME: TOCTOU90		if let Some(session) = &self.session.get() {91			return Ok((*session).clone());92		};93		let session = SessionBuilder::default();94		let session = session95			.connect(&self.name)96			.await97			.map_err(|e| anyhow!("ssh error while connecting to {}: {e}", self.name))?;98		let session = Arc::new(session);99		self.session.set(session.clone()).expect("TOCTOU happened");100		Ok(session)101	}102	pub async fn mktemp_dir(&self) -> Result<String> {103		let mut cmd = self.cmd("mktemp").await?;104		cmd.arg("-d");105		let path = cmd.run_string().await?;106		Ok(path.trim_end().to_owned())107	}108	pub async fn file_exists(&self, path: impl AsRef<OsStr>) -> Result<bool> {109		let mut cmd = self.cmd("sh").await?;110		cmd.arg("-c")111			.arg("test -e \"$1\" && echo true || echo false")112			.arg("_")113			.arg(path);114		Ok(cmd.run_value().await?)115	}116	pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {117		let mut cmd = self.cmd("cat").await?;118		cmd.arg(path);119		cmd.run_bytes().await120	}121	pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {122		let mut cmd = self.cmd("cat").await?;123		cmd.arg(path);124		cmd.run_string().await125	}126	pub async fn read_dir(&self, path: impl AsRef<OsStr>) -> Result<Vec<String>> {127		let mut cmd = self.cmd("ls").await?;128		cmd.arg(path);129		let out = cmd.run_string().await?;130		let mut lines = out.split('\n');131		if let Some(last) = lines.next_back() {132			ensure!(last.is_empty(), "output of ls should end with newline");133		}134		Ok(lines.map(ToOwned::to_owned).collect())135	}136	#[allow(dead_code)]137	pub async fn read_file_json<D: DeserializeOwned>(&self, path: impl AsRef<OsStr>) -> Result<D> {138		let text = self.read_file_text(path).await?;139		Ok(serde_json::from_str(&text)?)140	}141	pub async fn read_env(&self, env: &str) -> Result<String> {142		let mut cmd = self.cmd("printenv").await?;143		cmd.arg(env);144		cmd.run_string().await145	}146	pub async fn find_in_path(&self, command: &str) -> Result<String> {147		// // `which` is not a part of coreutils, and it might not exist on machine.148		// let path = self.read_env("PATH").await?;149		// // Assuming delimiter is :, we don't work with windows host, this check will be much150		// // more sophisticated in remowt backend (and quicker, since actual PATH search will be done on remote machine)151		// for ele in path.split(':') {152		// 	let test_path = format!("{ele}/{cmd}");153		// 	test -x etc154		// }155		// let mut cmd = self.cmd("printenv").await?;156		// cmd.arg(env);157		// Ok(cmd.run_string().await?)158		// Assuming this is an environment issue if which doesn't exist, will be fixed with remowt.159		let mut cmd = self160			.cmd_escalation(161				// Not used162				EscalationStrategy::Su,163				"which",164			)165			.await?;166		cmd.arg(command);167		cmd.run_string().await168	}169	pub async fn read_file_value<D: FromStr>(&self, path: impl AsRef<OsStr>) -> Result<D>170	where171		<D as FromStr>::Err: Display,172	{173		let text = self.read_file_text(path).await?;174		D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))175	}176	pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {177		self.cmd_escalation(self.escalation_strategy().await?, cmd)178			.await179	}180	pub async fn cmd_escalation(181		&self,182		escalation: EscalationStrategy,183		cmd: impl AsRef<OsStr>,184	) -> Result<MyCommand> {185		if self.local {186			Ok(MyCommand::new(escalation, cmd))187		} else {188			let session = self.open_session().await?;189			Ok(MyCommand::new_on(escalation, cmd, session))190		}191	}192193	pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {194		ensure!(data.encrypted, "secret is not encrypted");195		let mut cmd = self.cmd("fleet-install-secrets").await?;196		cmd.arg("decrypt").eqarg("--secret", data.to_string());197		let encoded = cmd198			.sudo()199			.run_string()200			.await201			.context("failed to call remote host for decrypt")?;202		let data: SecretData = encoded.parse().map_err(|e| anyhow!("{e}"))?;203		ensure!(!data.encrypted, "secret came out encrypted");204		Ok(data.data)205	}206	pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {207		ensure!(data.encrypted, "secret is not encrypted");208		let mut cmd = self.cmd("fleet-install-secrets").await?;209		cmd.arg("reencrypt").eqarg("--secret", data.to_string());210		for target in targets {211			let key = self.config.key(&target).await?;212			cmd.eqarg("--targets", key);213		}214		let encoded = cmd215			.sudo()216			.run_string()217			.await218			.context("failed to call remote host for decrypt")?;219		let data: SecretData = encoded.parse().map_err(|e| anyhow!("{e}"))?;220		ensure!(data.encrypted, "secret came out not encrypted");221		Ok(data)222	}223	/// Returns path for futureproofing, as path might change i.e on conversion to CA224	pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {225		if self.local {226			// Path is located locally, thus already trusted.227			return Ok(path.to_owned());228		}229		let mut nix = MyCommand::new(230			// Not used231			EscalationStrategy::Su,232			"nix",233		);234		nix.arg("copy")235			.arg("--substitute-on-destination")236			.comparg("--to", format!("ssh-ng://{}", self.name))237			.arg(path);238		nix.run_nix().await.context("nix copy")?;239		Ok(path.to_owned())240	}241	pub async fn systemctl_stop(&self, name: &str) -> Result<()> {242		let mut cmd = self.cmd("systemctl").await?;243		cmd.arg("stop").arg(name);244		cmd.sudo().run().await245	}246	pub async fn systemctl_start(&self, name: &str) -> Result<()> {247		let mut cmd = self.cmd("systemctl").await?;248		cmd.arg("start").arg(name);249		cmd.sudo().run().await250	}251252	pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {253		let mut cmd = self.cmd("rm").await?;254		cmd.arg("-f").arg(path);255		if sudo {256			cmd = cmd.sudo()257		}258		cmd.run().await259	}260}261impl ConfigHost {262	// TOCTOU is possible here in case if config is changed, but this case is not handled anywhere anyway,263	// assuming getting tags always returns the same value.264	pub async fn tags(&self) -> Result<Vec<String>> {265		if let Some(v) = self.groups.get() {266			return Ok(v.clone());267		}268		let Some(host_config) = &self.host_config else {269			return Ok(vec![]);270		};271		let tags: Vec<String> = nix_go_json!(host_config.tags);272273		let _ = self.groups.set(tags.clone());274275		Ok(tags)276	}277	pub async fn nixos_config(&self) -> Result<Value> {278		if let Some(v) = self.nixos_config.get() {279			return Ok(v.clone());280		}281		let Some(host_config) = &self.host_config else {282			bail!("local host has no nixos_config");283		};284		let nixos_config = nix_go!(host_config.nixos.config);285		assert_warn("nixos config evaluation", &nixos_config).await?;286287		let _ = self.nixos_config.set(nixos_config.clone());288289		Ok(nixos_config)290	}291292	pub async fn list_configured_secrets(&self) -> Result<Vec<String>> {293		let nixos = self.nixos_config().await?;294		let secrets = nix_go!(nixos.secrets);295		let mut out = Vec::new();296		for name in secrets.list_fields().await? {297			let secret = nix_go!(secrets[{ name }]);298			let is_shared: bool = nix_go_json!(secret.shared);299			if is_shared {300				continue;301			}302			out.push(name);303		}304		Ok(out)305	}306	pub async fn secret_field(&self, name: &str) -> Result<Value> {307		let nixos = self.nixos_config().await?;308		Ok(nix_go!(nixos.secrets[{ name }]))309	}310311	/// Packages for this host, resolved with nixpkgs overlays312	pub async fn pkgs(&self) -> Result<Value> {313		if let Some(value) = &self.pkgs_override {314			return Ok(value.clone());315		}316		let Some(host_config) = &self.host_config else {317			bail!("local host has no host_config");318		};319		// TODO: Should nixos.options be cached?320		Ok(nix_go!(host_config.nixos.options._module.args.value.pkgs))321	}322}323324impl Config {325	pub async fn tagged_hostnames(&self, tag: &str) -> Result<Vec<String>> {326		let config = &self.config_field;327		let tagged: Vec<String> = nix_go_json!(config.taggedWith[{ tag }]);328		Ok(tagged)329	}330	pub async fn expand_owner_set(&self, owners: Vec<String>) -> Result<BTreeSet<String>> {331		let mut out = BTreeSet::new();332		for owner in owners {333			if let Some(tag) = owner.strip_prefix('@') {334				let hosts = self.tagged_hostnames(tag).await?;335				out.extend(hosts);336			} else {337				out.insert(owner);338			}339		}340		Ok(out)341	}342	pub fn local_host(&self) -> ConfigHost {343		ConfigHost {344			config: self.clone(),345			name: "<virtual localhost>".to_owned(),346			host_config: None,347			nixos_config: OnceCell::new(),348			groups: {349				let cell = OnceCell::new();350				let _ = cell.set(vec![]);351				cell352			},353			pkgs_override: Some(self.default_pkgs.clone()),354355			local: true,356			session: OnceLock::new(),357		}358	}359360	pub async fn host(&self, name: &str) -> Result<ConfigHost> {361		let config = &self.config_field;362		let host_config = nix_go!(config.hosts[{ name }]);363364		Ok(ConfigHost {365			config: self.clone(),366			name: name.to_owned(),367			host_config: Some(host_config),368			nixos_config: OnceCell::new(),369			groups: OnceCell::new(),370			pkgs_override: None,371372			// TODO: Remove with connectivit refactor373			local: self.localhost == name,374			session: OnceLock::new(),375		})376	}377	pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {378		let config = &self.config_field;379		let names = nix_go!(config.hosts).list_fields().await?;380		let mut out = vec![];381		for name in names {382			out.push(self.host(&name).await?);383		}384		Ok(out)385	}386	// TODO: Replace usages with .host().nixos_config387	pub async fn system_config(&self, host: &str) -> Result<Value> {388		let fleet_field = &self.config_field;389		Ok(nix_go!(fleet_field.hosts[{ host }].nixos.config))390	}391392	/// Shared secrets configured in fleet.nix or in flake393	pub async fn list_configured_shared(&self) -> Result<Vec<String>> {394		let config_field = &self.config_field;395		Ok(nix_go!(config_field.sharedSecrets).list_fields().await?)396	}397	/// Shared secrets configured in fleet.nix398	pub fn list_shared(&self) -> Vec<String> {399		let data = self.data();400		data.shared_secrets.keys().cloned().collect()401	}402	pub fn has_shared(&self, name: &str) -> bool {403		let data = self.data();404		data.shared_secrets.contains_key(name)405	}406	pub fn replace_shared(&self, name: String, shared: FleetSharedSecret) {407		let mut data = self.data_mut();408		data.shared_secrets.insert(name.to_owned(), shared);409	}410	pub fn remove_shared(&self, secret: &str) {411		let mut data = self.data_mut();412		data.shared_secrets.remove(secret);413	}414415	pub fn list_secrets(&self, host: &str) -> Vec<String> {416		let data = self.data();417		let Some(secrets) = data.host_secrets.get(host) else {418			return Vec::new();419		};420		secrets.keys().cloned().collect()421	}422423	pub fn has_secret(&self, host: &str, secret: &str) -> bool {424		let data = self.data();425		let Some(host_secrets) = data.host_secrets.get(host) else {426			return false;427		};428		host_secrets.contains_key(secret)429	}430	pub fn insert_secret(&self, host: &str, secret: String, value: FleetSecret) {431		let mut data = self.data_mut();432		let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();433		host_secrets.insert(secret, value);434	}435436	pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {437		let data = self.data();438		let Some(host_secrets) = data.host_secrets.get(host) else {439			bail!("no secrets for machine {host}");440		};441		let Some(secret) = host_secrets.get(secret) else {442			bail!("machine {host} has no secret {secret}");443		};444		Ok(secret.clone())445	}446	pub fn shared_secret(&self, secret: &str) -> Result<FleetSharedSecret> {447		let data = self.data();448		let Some(secret) = data.shared_secrets.get(secret) else {449			bail!("no shared secret {secret}");450		};451		Ok(secret.clone())452	}453	pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {454		let config_field = &self.config_field;455		Ok(nix_go_json!(456			config_field.sharedSecrets[{ secret }].expectedOwners457		))458	}459460	// TODO: Should this be something modifiable from other processes?461	// E.g terraform provider might want to update FleetData (e.g secrets),462	// and current implementation assumes only one process holds current fleet.nix463	// Given that it is no longer needs to be a file for nix evaluation,464	// maybe it can be a .nix file for persistence, but accessible only465	// thru some shared state controller? Might it be stored in terraform466	// state provider?467	pub fn data(&self) -> MutexGuard<FleetData> {468		self.data.lock().unwrap()469	}470	pub fn data_mut(&self) -> MutexGuard<FleetData> {471		self.data.lock().unwrap()472	}473	pub fn save(&self) -> Result<()> {474		let mut tempfile = NamedTempFile::new_in(self.directory.clone()).context("failed to create updated version of fleet.nix in the same directory as original.\nDo you have write access to it? Access only to the fleet.nix won't be enough, the directory is used for atomic overwrite operation.\nIt is not recommended to use fleet by root anyway, move fleet project to your home directory.")?;475		let data = nixlike::serialize(&self.data() as &FleetData)?;476		tempfile.write_all(477			format!(478				"# This file contains fleet state and shouldn't be edited by hand\n\n{}\n\n# vim: ts=2 et nowrap\n",479				data480			)481			.as_bytes(),482		)?;483		let mut fleet_data_path = self.directory.clone();484		fleet_data_path.push("fleet.nix");485		tempfile.persist(fleet_data_path)?;486		Ok(())487	}488}