difftreelog
refactor remote command management
in: trunk
8 files changed
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -1,9 +1,10 @@
-use std::{env::current_dir, process::Stdio, time::Duration};
+use std::{env::current_dir, time::Duration};
-use crate::{command::CommandExt, host::Config};
+use crate::command::MyCommand;
+use crate::host::Config;
use anyhow::Result;
use clap::Parser;
-use tokio::{process::Command, task::LocalSet, time::sleep};
+use tokio::{task::LocalSet, time::sleep};
use tracing::{error, field, info, info_span, warn, Instrument};
#[derive(Parser, Clone)]
@@ -33,6 +34,9 @@
}
pub(crate) fn should_switch_profile(&self) -> bool {
+ matches!(self, Self::Switch | Self::Boot)
+ }
+ pub(crate) fn should_activate(&self) -> bool {
matches!(self, Self::Switch | Self::Test)
}
}
@@ -108,13 +112,7 @@
dir.path().to_owned()
};
- let mut nix_build = if self.privileged_build {
- let mut out = Command::new("sudo");
- out.arg("nix");
- out
- } else {
- Command::new("nix")
- };
+ let mut nix_build = MyCommand::new("nix");
nix_build
.args([
"build",
@@ -122,9 +120,8 @@
"--json",
// "--show-trace",
"--no-link",
- "--out-link",
])
- .arg(&built)
+ .comparg("--out-link", &built)
.arg(
config.configuration_attr_name(&format!(
"buildSystems.{}.{host}",
@@ -133,6 +130,10 @@
)
.args(&config.nix_args);
+ if self.privileged_build {
+ nix_build = nix_build.sudo();
+ }
+
nix_build.run_nix().await.map_err(|e| {
if action.build_attr() == "sdImage" {
info!("sd-image build failed");
@@ -149,14 +150,11 @@
info!("uploading system closure");
let mut tries = 0;
loop {
- match Command::new("nix")
- .args(["copy", "--to"])
- .arg(format!("ssh://root@{}", host))
- .arg(&built)
- .inherit_stdio()
- .run_nix()
- .await
- {
+ let mut nix = MyCommand::new("nix");
+ nix.arg("copy")
+ .comparg("--to", format!("ssh://root@{host}"))
+ .arg(&built);
+ match nix.run_nix().await {
Ok(()) => break,
Err(e) if tries < 3 => {
tries += 1;
@@ -170,24 +168,20 @@
if let Some(action) = action {
if action.should_switch_profile() {
info!("switching generation");
- config
- .command_on(&host, "nix-env", true)
- .args(["-p", "/nix/var/nix/profiles/system", "--set"])
- .arg(&built)
- .inherit_stdio()
- .run()
- .await?;
+ let mut cmd = MyCommand::new("nix-env");
+ cmd.comparg("--profile", "/nix/var/nix/profiles/system")
+ .comparg("--set", &built);
+ config.run_on(&host, cmd, true).await?;
+ }
+ if action.should_activate() {
+ info!("executing activation script");
+ let mut switch_script = built.clone();
+ switch_script.push("bin");
+ switch_script.push("switch-to-configuration");
+ let mut cmd = MyCommand::new(switch_script);
+ cmd.arg(action.name());
+ config.run_on(&host, cmd, true).await?;
}
- info!("executing activation script");
- let mut switch_script = built.clone();
- switch_script.push("bin");
- switch_script.push("switch-to-configuration");
- config
- .command_on(&host, switch_script, true)
- .arg(action.name())
- .stdout(Stdio::inherit())
- .run()
- .await?;
}
}
Action::Package(PackageAction::SdImage) => {
@@ -195,39 +189,30 @@
out.push(format!("sd-image-{}", host));
info!("building sd image to {:?}", out);
- let mut nix_build = if self.privileged_build {
- let mut out = Command::new("sudo");
- out.arg("nix");
- out
- } else {
- Command::new("nix")
- };
+ let mut nix_build = MyCommand::new("nix");
nix_build
- .args(["build", "--impure", "--no-link", "--out-link"])
- .arg(&out)
+ .args(["build", "--impure", "--no-link"])
+ .comparg("--out-link", &out)
.arg(config.configuration_attr_name(&format!("buildSystems.sdImage.{}", host,)))
.args(&config.nix_args);
if !self.fail_fast {
nix_build.arg("--keep-going");
}
+ if self.privileged_build {
+ nix_build = nix_build.sudo();
+ }
- nix_build.inherit_stdio().run_nix().await?;
+ nix_build.run_nix().await?;
}
Action::Package(PackageAction::InstallationCd) => {
let mut out = current_dir()?;
out.push(format!("installation-cd-{}", host));
info!("building sd image to {:?}", out);
- let mut nix_build = if self.privileged_build {
- let mut out = Command::new("sudo");
- out.arg("nix");
- out
- } else {
- Command::new("nix")
- };
+ let mut nix_build = MyCommand::new("nix");
nix_build
- .args(["build", "--impure", "--no-link", "--out-link"])
- .arg(&out)
+ .args(["build", "--impure", "--no-link"])
+ .comparg("--out-link", &out)
.arg(
config.configuration_attr_name(&format!(
"buildSystems.installationCd.{}",
@@ -238,8 +223,11 @@
if !self.fail_fast {
nix_build.arg("--keep-going");
}
+ if self.privileged_build {
+ nix_build = nix_build.sudo();
+ }
- nix_build.inherit_stdio().run_nix().await?;
+ nix_build.run_nix().await?;
}
};
Ok(())
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth1use crate::{2 fleetdata::{FleetSecret, FleetSharedSecret},3 host::Config,4};5use anyhow::{bail, ensure, Context, Result};6use clap::Parser;7use futures::{StreamExt, TryStreamExt};8use std::{9 collections::HashSet,10 io::{self, Cursor, Read},11 path::PathBuf,12};13use tokio::fs::read_to_string;14use tracing::{info, warn, error};1516#[derive(Parser)]17pub enum Secrets {18 /// Force load keys for all defined hosts19 ForceKeys,20 /// Add secret, data should be provided in stdin21 AddShared {22 /// Secret name23 name: String,24 /// Secret owners25 machines: Vec<String>,26 /// Override secret if already present27 #[clap(long)]28 force: bool,29 #[clap(long)]30 public: Option<String>,31 #[clap(long)]32 public_file: Option<PathBuf>,33 },34 /// Add secret, data should be provided in stdin35 Add {36 /// Secret name37 name: String,38 /// Secret owners39 machine: String,40 /// Override secret if already present41 #[clap(long)]42 force: bool,43 #[clap(long)]44 public: Option<String>,45 #[clap(long)]46 public_file: Option<PathBuf>,47 },48 /// Read secret from remote host, requires sudo on said host49 Read {50 name: String,51 machine: String,52 #[clap(long)]53 plaintext: bool,54 },55 UpdateShared {56 name: String,5758 #[clap(long)]59 machines: Option<Vec<String>>,6061 #[clap(long)]62 add_machines: Vec<String>,63 #[clap(long)]64 remove_machines: Vec<String>,6566 /// Which host should we use to decrypt67 #[clap(long)]68 prefer_identities: Vec<String>,69 },70 Regenerate {71 /// Which host should we use to decrypt, in case if reencryption is required, without72 /// regeneration73 #[clap(long)]74 prefer_identities: Vec<String>,75 },76}7778impl Secrets {79 pub async fn run(self, config: &Config) -> Result<()> {80 match self {81 Secrets::ForceKeys => {82 for host in config.list_hosts().await? {83 if config.should_skip(&host) {84 continue;85 }86 config.key(&host).await?;87 }88 }89 Secrets::AddShared {90 machines,91 name,92 force,93 public,94 public_file,95 } => {96 let recipients = futures::stream::iter(machines.iter())97 .then(|m| config.recipient(m))98 .try_collect::<Vec<_>>()99 .await?;100101 let secret = {102 let mut input = vec![];103 io::stdin().read_to_end(&mut input)?;104105 if input.is_empty() {106 input107 } else {108 let mut encrypted = vec![];109 let recipients = recipients110 .iter()111 .cloned()112 .map(|r| Box::new(r) as Box<dyn age::Recipient + Send>)113 .collect();114 let mut encryptor = age::Encryptor::with_recipients(recipients)115 .expect("recipients provided")116 .wrap_output(&mut encrypted)?;117 io::copy(&mut Cursor::new(input), &mut encryptor)?;118 encryptor.finish()?;119 encrypted120 }121 };122123 if config.has_shared(&name) && !force {124 bail!("secret already defined");125 }126 config.replace_shared(127 name,128 FleetSharedSecret {129 owners: machines,130 secret: FleetSecret {131 expire_at: None,132 secret,133 public: match (public, public_file) {134 (Some(v), None) => Some(v),135 (None, Some(v)) => Some(read_to_string(v).await?),136 (Some(_), Some(_)) => {137 bail!("only public or public_file should be set")138 }139 (None, None) => None,140 },141 },142 },143 );144 }145 Secrets::Add {146 machine,147 name,148 force,149 public,150 public_file,151 } => {152 let recipient = config.recipient(&machine).await?;153154 let secret = {155 let mut input = vec![];156 io::stdin().read_to_end(&mut input)?;157 if input.is_empty() {158 bail!("no data provided")159 }160161 let mut encrypted = vec![];162 let recipient = Box::new(recipient) as Box<dyn age::Recipient + Send>;163 let mut encryptor = age::Encryptor::with_recipients(vec![recipient])164 .expect("recipients provided")165 .wrap_output(&mut encrypted)?;166 io::copy(&mut Cursor::new(input), &mut encryptor)?;167 encryptor.finish()?;168 encrypted169 };170171 if config.has_secret(&machine, &name) && !force {172 bail!("secret already defined");173 }174 config.insert_secret(175 &machine,176 name,177 FleetSecret {178 expire_at: None,179 secret,180 public: match (public, public_file) {181 (Some(v), None) => Some(v),182 (None, Some(v)) => Some(std::fs::read_to_string(v)?),183 (Some(_), Some(_)) => bail!("only public or public_file should be set"),184 (None, None) => None,185 },186 },187 );188 }189 // TODO: Instead of using sudo, decode secret on remote machine190 #[allow(clippy::await_holding_refcell_ref)]191 Secrets::Read {192 name,193 machine,194 plaintext,195 } => {196 let secret = config.host_secret(&machine, &name)?;197 if secret.secret.is_empty() {198 bail!("no secret {name}");199 }200 let data = config.decrypt_on_host(&machine, secret.secret).await?;201 if plaintext {202 let s = String::from_utf8(data).context("output is not utf8")?;203 print!("{s}");204 } else {205 println!("{}", z85::encode(&data));206 }207 }208 Secrets::UpdateShared {209 name,210 machines,211 mut add_machines,212 mut remove_machines,213 prefer_identities,214 } => {215 if machines.is_none() && add_machines.is_empty() && remove_machines.is_empty() {216 bail!("no operation");217 }218219 let mut secret = config.shared_secret(&name)?;220 if secret.secret.secret.is_empty() {221 bail!("no secret");222 }223224 let initial_machines = secret.owners.clone();225 let mut target_machines = secret.owners.clone();226 info!("Currently encrypted for {initial_machines:?}");227228 // ensure!(machines.is_some() || !add_machines.is_empty() || )229 if let Some(machines) = machines {230 ensure!(231 add_machines.is_empty() && remove_machines.is_empty(),232 "can't combine --machines and --add-machines/--remove-machines"233 );234 let target = initial_machines.iter().collect::<HashSet<_>>();235 let source = machines.iter().collect::<HashSet<_>>();236 for removed in target.difference(&source) {237 remove_machines.push((*removed).clone());238 }239 for added in source.difference(&target) {240 add_machines.push((*added).clone());241 }242 }243244 for machine in &remove_machines {245 let mut removed = false;246 while let Some(pos) = target_machines.iter().position(|m| m == machine) {247 target_machines.swap_remove(pos);248 removed = true;249 }250 if !removed {251 warn!("secret is not enabled for {machine}");252 }253 }254 for machine in &add_machines {255 if target_machines.iter().any(|m| m == machine) {256 warn!("secret is already added to {machine}");257 } else {258 target_machines.push(machine.to_owned());259 }260 }261 if !remove_machines.is_empty() {262 warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");263 }264265 if target_machines.is_empty() {266 info!("no machines left for secret, removing it");267 config.remove_shared(&name);268 return Ok(());269 }270271 if target_machines == initial_machines {272 warn!("secret owners are already correct");273 return Ok(());274 }275276 let identity_holder = if !prefer_identities.is_empty() {277 prefer_identities278 .iter()279 .find(|i| initial_machines.iter().any(|s| s == *i))280 } else {281 secret.owners.first()282 };283 let Some(identity_holder) = identity_holder else {284 bail!("no available holder found");285 };286 let target_recipients = futures::stream::iter(&target_machines)287 .then(|m| async { config.key(m).await })288 .collect::<Vec<_>>()289 .await;290 let target_recipients =291 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;292293 let encrypted = config294 .reencrypt_on_host(&identity_holder, secret.secret.secret, target_recipients)295 .await?;296297 secret.owners = target_machines;298 secret.secret.secret = encrypted;299 config.replace_shared(name, secret);300 }301 Secrets::Regenerate { prefer_identities } => {302 {303 let expected_shared_set =304 config.shared_config_attr_names("sharedSecrets").await?;305 let expected_shared_set = expected_shared_set.iter().collect::<HashSet<_>>();306 let shared_set = config.list_shared();307 let shared_set = shared_set.iter().collect::<HashSet<_>>();308 for removed in expected_shared_set.difference(&shared_set) {309 warn!("secret needs to be generated: {removed}")310 }311 }312 let mut to_remove = Vec::new();313 for name in &config.list_shared() {314 info!("updating secret: {name}");315 let mut data = config.shared_secret(name)?;316 let expected_owners: Vec<String> = config317 .shared_config_attr(&format!("sharedSecrets.\"{name}\".expectedOwners"))318 .await?;319 if expected_owners.is_empty() {320 warn!("secret was removed from fleet config: {name}, removing from data");321 to_remove.push(name.to_string());322 continue;323 }324 let set = data.owners.iter().collect::<HashSet<_>>();325 let expected_set = expected_owners.iter().collect::<HashSet<_>>();326 let should_remove = set.difference(&expected_set).next().is_some();327 if set != expected_set {328 let owner_dependent: bool = config329 .shared_config_attr(&format!("sharedSecrets.\"{name}\".ownerDependent"))330 .await?;331 if !owner_dependent {332 warn!("reencrypting secret '{name}' for new owner set");333 // TODO: force regeneration334 if should_remove {335 warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");336 }337338 let identity_holder = if !prefer_identities.is_empty() {339 prefer_identities340 .iter()341 .find(|i| data.owners.iter().any(|s| s == *i))342 } else {343 data.owners.first()344 };345 let Some(identity_holder) = identity_holder else {346 bail!("no available holder found");347 };348349 let target_recipients = futures::stream::iter(&expected_owners)350 .then(|m| async { config.key(m).await })351 .collect::<Vec<_>>()352 .await;353 let target_recipients =354 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;355356 let encrypted = config357 .reencrypt_on_host(358 &identity_holder,359 data.secret.secret,360 target_recipients,361 )362 .await?;363364 data.secret.secret = encrypted;365 data.owners = expected_owners;366 config.replace_shared(name.to_owned(), data);367 } else if let Some(generator) = config368 .shared_config_attr::<Option<String>>(&format!("sharedSecrets.\"{name}\".generator"))369 .await?370 {371 todo!("regenerate secret {name} with {generator}");372 } else {373 error!("secret '{name}' should be regenerated manually");374 }375 } else {376 info!("secret data is ok")377 }378 }379 for k in to_remove {380 config.remove_shared(&k);381 }382 }383 }384 Ok(())385 }386}cmds/fleet/src/command.rsdiffbeforeafterboth--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,4 +1,4 @@
-use std::{ffi::OsStr, process::Stdio};
+use std::{ffi::OsStr, process::Stdio, task::Poll};
use anyhow::{Context, Result};
use async_trait::async_trait;
@@ -7,20 +7,292 @@
de::{DeserializeOwned, Visitor},
Deserialize,
};
-use tokio::{process::Command, select};
+use tokio::{io::AsyncRead, process::Command, select};
use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
use tracing::{info, warn};
+fn escape_bash(input: &str, out: &mut String) {
+ const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
+ if input.chars().all(|c| !TO_ESCAPE.contains(c)) {
+ out.push_str(input);
+ return;
+ }
+ out.push('\'');
+ for (i, v) in input.split('\'').enumerate() {
+ if i != 0 {
+ out.push_str("'\"'\"'");
+ }
+ out.push_str(v);
+ }
+ out.push('\'');
+}
+fn ostoutf8(os: impl AsRef<OsStr>) -> String {
+ os.as_ref().to_str().expect("non-utf8 data").to_owned()
+}
+#[derive(Clone)]
+pub struct MyCommand {
+ command: String,
+ args: Vec<String>,
+ env: Vec<(String, String)>,
+}
+impl MyCommand {
+ pub fn new(cmd: impl AsRef<OsStr>) -> Self {
+ assert!(!cmd.as_ref().is_empty());
+ Self {
+ command: ostoutf8(cmd),
+ args: vec![],
+ env: vec![],
+ }
+ }
+ fn into_args(self) -> Vec<String> {
+ let mut out = Vec::new();
+ if !self.env.is_empty() {
+ out.push("env".to_owned());
+ for (k, v) in self.env {
+ assert!(!k.contains("="));
+ out.push(format!("{k}={v}"));
+ }
+ }
+ out.push(self.command);
+ out.extend(self.args.into_iter());
+ out
+ }
+ fn into_string(self) -> String {
+ let mut out = String::new();
+ if !self.env.is_empty() {
+ out.push_str("env");
+ for (k, v) in self.env {
+ out.push(' ');
+ assert!(!k.contains("="));
+ escape_bash(&k, &mut out);
+ out.push('=');
+ escape_bash(&v, &mut out);
+ }
+ }
+ if !out.is_empty() {
+ out.push(' ');
+ }
+ escape_bash(&self.command, &mut out);
+ for arg in self.args {
+ out.push(' ');
+ escape_bash(&arg, &mut out);
+ }
+ out
+ }
+ fn into_command(self) -> Command {
+ let mut out = Command::new(self.command);
+ out.args(self.args);
+ for (k, v) in self.env {
+ out.env(k, v);
+ }
+ out
+ }
+ pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {
+ let arg = arg.as_ref();
+ self.args.push(ostoutf8(arg));
+ self
+ }
+ pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {
+ let arg = arg.as_ref();
+ let value = value.as_ref();
+ let arg = ostoutf8(arg);
+ let value = ostoutf8(value);
+ self.arg(format!("{arg}={value}"));
+ self
+ }
+ pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {
+ self.arg(arg);
+ self.arg(value);
+ self
+ }
+ pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
+ for arg in args.into_iter() {
+ let arg = arg.as_ref();
+ self.args.push(ostoutf8(arg));
+ }
+ self
+ }
+ pub fn sudo(self) -> Self {
+ let mut out = Self::new("sudo");
+ out.args(self.into_args());
+ out
+ }
+ pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {
+ let mut out = Self::new("ssh");
+ out.arg(on).arg("--");
+ out.arg(self.into_string());
+ out
+ }
+
+ pub async fn run(self) -> Result<()> {
+ let str = self.clone().into_string();
+ info!("running {str}");
+ let mut cmd = self.into_command();
+ cmd.inherit_stdio();
+ let out = cmd.spawn()?.wait_with_output().await?;
+ if !out.status.success() {
+ anyhow::bail!("command '{}' failed with status {}", str, out.status);
+ }
+ Ok(())
+ }
+ pub async fn run_string(self) -> Result<String> {
+ let str = self.clone().into_string();
+ info!("running {str}");
+ let mut cmd = self.into_command();
+ cmd.inherit_stdio();
+ cmd.stdout(Stdio::piped());
+ let out = cmd.spawn()?.wait_with_output().await?;
+ if !out.status.success() {
+ anyhow::bail!("command '{}' failed with status {}", str, out.status);
+ }
+ Ok(String::from_utf8(out.stdout)?)
+ }
+ pub async fn run_nix_json<T: DeserializeOwned>(self) -> Result<T> {
+ let str = self.run_nix_string().await?;
+ serde_json::from_str(&str).with_context(|| format!("{:?}", str))
+ }
+
+ pub async fn run_nix_string(self) -> Result<String> {
+ let str = self.clone().into_string();
+ let mut cmd = self.into_command();
+ cmd.stdout(Stdio::piped());
+ run_nix_inner(str, cmd).await.map(|v| v.unwrap())
+ }
+ pub async fn run_nix(self) -> Result<()> {
+ let str = self.clone().into_string();
+ let mut cmd = self.into_command();
+ cmd.stdout(Stdio::inherit());
+ run_nix_inner(str, cmd).await.map(|v| {
+ assert!(v.is_none());
+ })
+ }
+}
+
+struct EmptyAsyncRead;
+impl AsyncRead for EmptyAsyncRead {
+ fn poll_read(
+ self: std::pin::Pin<&mut Self>,
+ _cx: &mut std::task::Context<'_>,
+ _buf: &mut tokio::io::ReadBuf<'_>,
+ ) -> Poll<std::io::Result<()>> {
+ Poll::Pending
+ }
+}
+
+async fn run_nix_inner(str: String, mut cmd: Command) -> Result<Option<String>> {
+ info!("running {str}");
+ cmd.arg("--log-format").arg("internal-json");
+ cmd.stderr(Stdio::piped());
+ let mut child = cmd.spawn()?;
+ let mut stderr = child.stderr.take().unwrap();
+ let stdout = child.stdout.take();
+ let wants_stdout = stdout.is_some();
+ let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
+ let mut out: Box<dyn AsyncRead + Unpin> = stdout
+ .map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin>)
+ .unwrap_or_else(|| Box::new(EmptyAsyncRead));
+ let mut out = FramedRead::new(&mut out, BytesCodec::new());
+
+ // while let Some(line) = read.next().await? {}
+
+ let mut out_buf = if wants_stdout { Some(vec![]) } else { None };
+ loop {
+ select! {
+ e = err.next() => {
+ if let Some(e) = e {
+ let e = e?;
+ if let Some(e) = e.strip_prefix("@nix ") {
+
+ let log: NixLog = match serde_json::from_str(e) {
+ Ok(l) => l,
+ Err(err) => {
+ warn!("failed to parse nix log line {:?}: {}", e, err);
+ continue;
+ },
+ };
+ match log {
+ NixLog::Msg { msg, raw_msg, .. } => {
+ if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+ && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+ && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+ if let Some(raw_msg) = raw_msg {
+ info!(target: "nix", "{raw_msg}\n{msg}")
+ }else {
+ info!(target: "nix", "{msg}")
+
+ }
+ }
+ },
+ NixLog::Start { ref fields, typ, .. } if typ == 105 && !fields.is_empty() => {
+ if let [LogField::String(drv), ..] = &fields[..] {
+ info!(target: "nix","building {}", drv)
+ } else {
+ warn!("bad build log: {:?}", log)
+ }
+ },
+ NixLog::Start { ref fields, typ, .. } if typ == 100 && fields.len() >= 3 => {
+ if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] = &fields[..] {
+ info!(target: "nix","copying {} {} -> {}", drv, from, to)
+ } else {
+ warn!("bad copy log: {:?}", log)
+ }
+ },
+ NixLog::Start { text, typ, .. } if typ == 0 || typ == 102 || typ == 103 || typ == 104 => {
+ if !text.is_empty() && text != "querying info about missing paths" && text != "copying 0 paths" {
+ info!(target: "nix", "{}", text)
+ }
+ },
+ NixLog::Start { text, level: 0, typ: 108, .. } if text.is_empty() => {
+ // Cache lookup? Coupled with copy log
+ },
+ NixLog::Start { text, level: 4, typ: 109, .. } if text.starts_with("querying info about ") => {
+ // Cache lookup
+ }
+ NixLog::Start { text, level: 4, typ: 101, .. } if text.starts_with("downloading ") => {
+ // NAR downloading, coupled with copy log
+ }
+ NixLog::Start { text, level: 1, typ: 111, .. } if text.starts_with("waiting for a machine to build ") => {
+ // Useless repeating notification about build
+ }
+ NixLog::Start { text, level: 3, typ: 111, .. } if text.starts_with("resolved derivation: ") => {
+ // CA resolved
+ }
+ NixLog::Stop { .. } => {},
+ NixLog::Result { .. } => {},
+ _ => warn!("unknown log: {:?}", log)
+ };
+ } else {
+ warn!(target="nix","unknown: {}", e)
+ }
+ }
+ },
+ o = out.next() => {
+ if let Some(o) = o {
+ out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);
+ }
+ },
+ code = child.wait() => {
+ let code = code?;
+ if !code.success() {
+ anyhow::bail!("command '{str}' failed with status {}", code);
+ }
+ break;
+ }
+ }
+ }
+
+ Ok(out_buf.map(String::from_utf8).transpose()?)
+}
+
#[async_trait]
pub trait CommandExt {
- async fn run_nix(&mut self) -> Result<()>;
- async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T>;
- async fn run_nix_string(&mut self) -> Result<String>;
- async fn run(&mut self) -> Result<()>;
- async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T>;
- async fn run_string(&mut self) -> Result<String>;
+ // async fn run_nix(&mut self) -> Result<()>;
+ // async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T>;
+ // async fn run_nix_string(&mut self) -> Result<String>;
+ // async fn run(&mut self) -> Result<()>;
+ // async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T>;
+ // async fn run_string(&mut self) -> Result<String>;
fn inherit_stdio(&mut self) -> &mut Self;
- fn ssh_on(host: impl AsRef<OsStr>, command: impl AsRef<OsStr>) -> Self;
}
#[derive(Debug)]
@@ -91,170 +363,9 @@
#[async_trait]
impl CommandExt for Command {
- async fn run_nix(&mut self) -> Result<()> {
- self.run_nix_string().await.map(|_| ())
- }
- async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T> {
- let str = self.run_nix_string().await?;
- serde_json::from_str(&str).with_context(|| format!("{:?}", str))
- }
-
- async fn run_nix_string(&mut self) -> Result<String> {
- self.arg("--log-format").arg("internal-json");
- self.stderr(Stdio::piped());
- self.stdout(Stdio::piped());
- let mut child = self.spawn()?;
- let mut stderr = child.stderr.take().unwrap();
- let mut stdout = child.stdout.take().unwrap();
- let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
- let mut out = FramedRead::new(&mut stdout, BytesCodec::new());
-
- // while let Some(line) = read.next().await? {}
-
- let mut out_buf = vec![];
- loop {
- select! {
- e = err.next() => {
- if let Some(e) = e {
- let e = e?;
- if let Some(e) = e.strip_prefix("@nix ") {
-
- let log: NixLog = match serde_json::from_str(e) {
- Ok(l) => l,
- Err(err) => {
- warn!("failed to parse nix log line {:?}: {}", e, err);
- continue;
- },
- };
- match log {
- NixLog::Msg { msg, raw_msg, .. } => {
- if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
- && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
- && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
- if let Some(raw_msg) = raw_msg {
- info!(target: "nix", "{raw_msg}\n{msg}")
- }else {
- info!(target: "nix", "{msg}")
-
- }
- }
- },
- NixLog::Start { ref fields, typ, .. } if typ == 105 && !fields.is_empty() => {
- if let [LogField::String(drv), ..] = &fields[..] {
- info!(target: "nix","building {}", drv)
- } else {
- warn!("bad build log: {:?}", log)
- }
- },
- NixLog::Start { ref fields, typ, .. } if typ == 100 && fields.len() >= 3 => {
- if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] = &fields[..] {
- info!(target: "nix","copying {} {} -> {}", drv, from, to)
- } else {
- warn!("bad copy log: {:?}", log)
- }
- },
- NixLog::Start { text, typ, .. } if typ == 0 || typ == 102 || typ == 103 || typ == 104 => {
- if !text.is_empty() && text != "querying info about missing paths" && text != "copying 0 paths" {
- info!(target: "nix", "{}", text)
- }
- },
- NixLog::Start { text, level: 0, typ: 108, .. } if text.is_empty() => {
- // Cache lookup? Coupled with copy log
- },
- NixLog::Start { text, level: 4, typ: 109, .. } if text.starts_with("querying info about ") => {
- // Cache lookup
- }
- NixLog::Start { text, level: 4, typ: 101, .. } if text.starts_with("downloading ") => {
- // NAR downloading, coupled with copy log
- }
- NixLog::Start { text, level: 1, typ: 111, .. } if text.starts_with("waiting for a machine to build ") => {
- // Useless repeating notification about build
- }
- NixLog::Start { text, level: 3, typ: 111, .. } if text.starts_with("resolved derivation: ") => {
- // CA resolved
- }
- NixLog::Stop { .. } => {},
- NixLog::Result { .. } => {},
- _ => warn!("unknown log: {:?}", log)
- };
- } else {
- warn!(target="nix","unknown: {}", e)
- }
- }
- },
- o = out.next() => {
- if let Some(o) = o {
- out_buf.extend_from_slice(&o?);
- }
- },
- code = child.wait() => {
- let code = code?;
- if !code.success() {
- anyhow::bail!("command ({:?}) failed with status {}", self, code);
- }
- break;
- }
- }
- }
-
- Ok(String::from_utf8(out_buf)?)
- }
-
fn inherit_stdio(&mut self) -> &mut Self {
self.stderr(Stdio::inherit());
+ self.stdout(Stdio::inherit());
self
- }
-
- async fn run(&mut self) -> Result<()> {
- self.stderr(Stdio::piped());
- self.stdout(Stdio::piped());
- let mut child = self.spawn()?;
- let mut stderr = child.stderr.take().unwrap();
- let mut stdout = child.stdout.take().unwrap();
- let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
- let mut out = FramedRead::new(&mut stdout, LinesCodec::new());
- loop {
- select! {
- e = err.next() => {
- if let Some(e) = e {
- warn!("{}", e?);
- }
- },
- o = out.next() => {
- if let Some(o) = o {
- info!("{}", o?);
- }
- },
- code = child.wait() => {
- let code = code?;
- if !code.success() {
- anyhow::bail!("command ({:?}) failed with status {}", self, code);
- }
- break;
- }
- }
- }
- Ok(())
- }
-
- async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T> {
- let str = self.run_string().await?;
- serde_json::from_str(&str).with_context(|| format!("{:?}", str))
- }
-
- async fn run_string(&mut self) -> Result<String> {
- self.inherit_stdio();
- self.stdout(Stdio::piped());
- let out = self.spawn()?.wait_with_output().await?;
- if !out.status.success() {
- anyhow::bail!("command ({:?}) failed with status {}", self, out.status);
- }
- Ok(String::from_utf8(out.stdout)?)
- }
-
- fn ssh_on(host: impl AsRef<OsStr>, command: impl AsRef<OsStr>) -> Self {
- let mut cmd = Command::new("ssh");
- cmd.arg(host).arg("--").arg(command);
- cmd
}
}
cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -1,7 +1,7 @@
use std::{
cell::{Ref, RefCell, RefMut},
env::current_dir,
- ffi::{OsStr, OsString},
+ ffi::OsString,
io::Write,
ops::Deref,
path::PathBuf,
@@ -12,10 +12,9 @@
use clap::{ArgGroup, Parser};
use serde::de::DeserializeOwned;
use tempfile::NamedTempFile;
-use tokio::process::Command;
use crate::{
- command::CommandExt,
+ command::MyCommand,
fleetdata::{FleetData, FleetSecret, FleetSharedSecret},
};
@@ -52,24 +51,24 @@
self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
}
- pub fn command_on(&self, host: &str, program: impl AsRef<OsStr>, sudo: bool) -> Command {
- if self.is_local(host) {
- if sudo {
- let mut cmd = Command::new("sudo");
- cmd.arg(program);
- cmd
- } else {
- Command::new(program)
- }
- } else {
- let mut cmd = Command::new("ssh");
- cmd.arg(host).arg("--");
- if sudo {
- cmd.arg("sudo");
- }
- cmd.arg(program);
- cmd
+ pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
+ if sudo {
+ command = command.sudo();
+ }
+ if !self.is_local(host) {
+ command = command.ssh(host);
+ }
+ command.run().await
+ }
+ #[must_use]
+ pub async fn run_string_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<String> {
+ if sudo {
+ command = command.sudo();
+ }
+ if !self.is_local(host) {
+ command = command.ssh(host);
}
+ command.run_string().await
}
pub fn configuration_attr_name(&self, name: &str) -> OsString {
@@ -83,36 +82,36 @@
}
pub async fn list_hosts(&self) -> Result<Vec<String>> {
- Command::new("nix")
- .arg("eval")
+ let mut cmd = MyCommand::new("nix");
+ cmd.arg("eval")
.arg(self.configuration_attr_name("configuredHosts"))
.args(["--apply", "builtins.attrNames", "--json", "--show-trace"])
- .args(&self.nix_args)
- .run_nix_json()
+ .args(&self.nix_args);
+ cmd.run_nix_json()
.await
}
pub async fn shared_config_attr<T: DeserializeOwned>(&self, attr: &str) -> Result<T> {
- Command::new("nix")
- .arg("eval")
+ let mut cmd = MyCommand::new("nix");
+ cmd.arg("eval")
.arg(self.configuration_attr_name(&format!("configUnchecked.{}", attr)))
.args(["--json", "--show-trace"])
- .args(&self.nix_args)
- .run_nix_json()
+ .args(&self.nix_args);
+ cmd.run_nix_json()
.await
}
pub async fn shared_config_attr_names(&self, attr: &str) -> Result<Vec<String>> {
- Command::new("nix")
- .arg("eval")
+ let mut cmd = MyCommand::new("nix");
+ cmd.arg("eval")
.arg(self.configuration_attr_name(&format!("configUnchecked.{}", attr)))
.args(["--apply", "builtins.attrNames"])
.args(["--json", "--show-trace"])
- .args(&self.nix_args)
- .run_nix_json()
+ .args(&self.nix_args);
+ cmd.run_nix_json()
.await
}
pub async fn config_attr<T: DeserializeOwned>(&self, host: &str, attr: &str) -> Result<T> {
- Command::new("nix")
- .arg("eval")
+ let mut cmd = MyCommand::new("nix");
+ cmd.arg("eval")
.arg(
self.configuration_attr_name(&format!(
"configuredSystems.{}.config.{}",
@@ -120,8 +119,8 @@
)),
)
.args(["--json", "--show-trace"])
- .args(&self.nix_args)
- .run_nix_json()
+ .args(&self.nix_args);
+ cmd.run_nix_json()
.await
}
@@ -171,23 +170,20 @@
pub async fn decrypt_on_host(&self, host: &str, data: Vec<u8>) -> Result<Vec<u8>>{
let data = z85::encode(&data);
- let encoded = self.command_on(host, "fleet-install-secrets", true)
- .arg("decrypt")
- .arg("--secret")
- .arg(data).run_string().await.context("failed to call remote host for decrypt")?.trim().to_owned();
+ let mut cmd = MyCommand::new("fleet-install-secrets");
+ cmd.arg("decrypt").eqarg("--secret", data);
+ cmd = cmd.sudo().ssh(host);
+ let encoded = cmd.run_string().await.context("failed to call remote host for decrypt")?.trim().to_owned();
Ok(z85::decode(encoded).context("bad encoded data? outdated host?")?)
}
pub async fn reencrypt_on_host(&self, host: &str, data: Vec<u8>, targets: Vec<String>) -> Result<Vec<u8>>{
let data = z85::encode(&data);
- let mut recmd = self.command_on(host, "fleet-install-secrets", true);
- recmd
- .arg("reencrypt")
- .arg("--secret")
- .arg(format!("\"{}\"", data.replace('$', "\\$")));
+ let mut recmd = MyCommand::new("fleet-install-secrets");
+ recmd.arg("reencrypt").eqarg("--secret",data);
for target in targets {
- recmd.arg("--targets");
- recmd.arg(format!("\"{target}\""));
+ recmd.eqarg("--targets", target);
}
+ recmd = recmd.sudo().ssh(host);
let encoded = recmd.run_string().await.context("failed to call remote host for decrypt")?.trim().to_owned();
Ok(z85::decode(encoded).context("bad encoded data? outdated host?")?)
}
cmds/fleet/src/keys.rsdiffbeforeafterboth--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,7 @@
use std::str::FromStr;
-use crate::{command::CommandExt, host::Config};
+use crate::command::MyCommand;
+use crate::host::Config;
use anyhow::{anyhow, Result};
use tracing::warn;
@@ -26,11 +27,9 @@
Ok(key)
} else {
warn!("Loading key for {}", host);
- let key = self
- .command_on(host, "cat", false)
- .arg("/etc/ssh/ssh_host_ed25519_key.pub")
- .run_string()
- .await?;
+ let mut cmd = MyCommand::new("cat");
+ cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
+ let key = self.run_string_on(host, cmd, false).await?;
self.update_key(host, key.clone());
Ok(key)
}
cmds/install-secrets/src/main.rsdiffbeforeafterboth--- a/cmds/install-secrets/src/main.rs
+++ b/cmds/install-secrets/src/main.rs
@@ -250,7 +250,7 @@
if plaintext {
let s = String::from_utf8(decrypted).context("output is not utf8")?;
- print!("{}", s);
+ print!("{s}");
} else {
println!("{}", SecretWrapper(decrypted));
}
modules/fleet/secrets.nixdiffbeforeafterboth--- a/modules/fleet/secrets.nix
+++ b/modules/fleet/secrets.nix
@@ -2,15 +2,6 @@
let
sharedSecret = with types; {
options = {
- owners = mkOption {
- type = listOf str;
- description = ''
- For which owners this secret is currently encrypted,
- if not matches expectedOwners - then this secret is considered outdated, and
- should be regenerated/reencrypted
- '';
- default = [ ];
- };
expectedOwners = mkOption {
type = listOf str;
description = ''
@@ -25,12 +16,38 @@
description = "Is this secret owner-dependent, and needs to be regenerated on ownership set change, or it may be just reencrypted";
};
generator = mkOption {
- type = nullOr package;
- description = ''
- Derivation to execute for secret generation
+ type = nullOr (submodule {
+ packages = mkOption {
+ type = attrsOf package;
+ description = ''
+ Derivation to execute for shared secret generation (key = system).
+ This derivation should produce directory, with exactly two files:
+ - publicData
+ - encryptedSecretData
+
+ If null - secret value may only be created manually.
+ '';
+ };
+ expectedData = mkOption {
+ type = types.unspecified;
+ description = "Data expected to be used for secret generation, if doesn't match specified - secret should be regenerated";
+ };
+ dependencies = mkOption {
+ type = listOf str;
+ description = ''
+ List of secrets, on which this secret depends.
- If null - may only be created manually
- '';
+ During generation, generator command will be ran on host, which already has specified secrets generated.
+ '';
+ default = [];
+ };
+ data = mkOption {
+ type = types.unspecified;
+ description = "Data used for secret generation. Imported from fleet.nix";
+ default = null;
+ internal = true;
+ };
+ });
default = null;
};
expireIn = mkOption {
@@ -38,15 +55,28 @@
description = "Time in hours, in which this secret should be regenerated";
default = null;
};
+
+ owners = mkOption {
+ type = listOf str;
+ description = ''
+ For which owners this secret is currently encrypted,
+ if not matches expectedOwners - then this secret is considered outdated, and
+ should be regenerated/reencrypted.
+
+ Imported from fleet.nix
+ '';
+ default = [ ];
+ };
public = mkOption {
type = nullOr str;
- description = "Secret public data";
+ description = "Secret public data. Imported from fleet.nix";
default = null;
};
secret = mkOption {
type = nullOr str;
- description = "Encrypted secret data";
+ description = "Encrypted secret data. Imported from fleet.nix";
default = null;
+ internal = true;
};
};
};
nixos/secrets.nixdiffbeforeafterboth--- a/nixos/secrets.nix
+++ b/nixos/secrets.nix
@@ -5,14 +5,14 @@
let
sysConfig = config;
secretType = types.submodule ({ config, ... }: {
- config = rec {
- stableSecretPath = mkOptionDefault "/run/secrets/secret-stable-${config._module.args.name}";
- secretPath = mkOptionDefault "/run/secrets/secret-${config.secretHash}-${config._module.args.name}";
- secretHash = mkOptionDefault (if config.secret != null then (builtins.hashString "sha1" config.secret) else "<missingno>");
+ config = let secretName = config._module.args.name; in rec {
+ stableSecretPath = mkOptionDefault "/run/secrets/secret-stable-${secretName}";
+ secretPath = mkOptionDefault "/run/secrets/secret-${config.secretHash}-${secretName}";
+ secretHash = mkOptionDefault (if config.secret != null then (builtins.hashString "sha1" config.secret) else throw "secret is not defined for secret ${secretName}");
- stablePublicPath = mkOptionDefault "/run/secrets/public-stable-${config._module.args.name}";
- publicPath = mkOptionDefault "/run/secrets/public-${config.publicHash}-${config._module.args.name}";
- publicHash = mkOptionDefault (if config.public != null then (builtins.hashString "sha1" config.public) else "<missingno>");
+ stablePublicPath = mkOptionDefault "/run/secrets/public-stable-${secretName}";
+ publicPath = mkOptionDefault "/run/secrets/public-${config.publicHash}-${secretName}";
+ publicHash = mkOptionDefault (if config.public != null then (builtins.hashString "sha1" config.public) else throw "public is not defined for secret ${secretName}");
};
options = {
public = mkOption {
@@ -77,7 +77,13 @@
});
secretsFile = pkgs.writeTextFile {
name = "secrets.json";
- text = builtins.toJSON config.secrets;
+ text = builtins.toJSON (mapAttrs (_: value: rec {
+ inherit (value) group mode owner secret public;
+ publicPath = if public != null then value.publicPath else "/missingno";
+ stablePublicPath = if public != null then value.stablePublicPath else "/missingno";
+ secretPath = if secret != null then value.secretPath else "/missingno";
+ stableSecretPath = if secret != null then value.stableSecretPath else "/missingno";
+ }) config.secrets);
};
in
{