git.delta.rocks / jrsonnet / refs/commits / eee08d567ee4

difftreelog

refactor remote command management

Yaroslav Bolyukin2023-06-09parent: #837e795.patch.diff
in: trunk

8 files changed

modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
1use std::{env::current_dir, process::Stdio, time::Duration};1use std::{env::current_dir, time::Duration};
22
3use crate::command::MyCommand;
3use crate::{command::CommandExt, host::Config};4use crate::host::Config;
4use anyhow::Result;5use anyhow::Result;
5use clap::Parser;6use clap::Parser;
6use tokio::{process::Command, task::LocalSet, time::sleep};7use tokio::{task::LocalSet, time::sleep};
7use tracing::{error, field, info, info_span, warn, Instrument};8use tracing::{error, field, info, info_span, warn, Instrument};
89
9#[derive(Parser, Clone)]10#[derive(Parser, Clone)]
32 }33 }
33 }34 }
3435
35 pub(crate) fn should_switch_profile(&self) -> bool {36 pub(crate) fn should_switch_profile(&self) -> bool {
37 matches!(self, Self::Switch | Self::Boot)
38 }
39 pub(crate) fn should_activate(&self) -> bool {
36 matches!(self, Self::Switch | Self::Test)40 matches!(self, Self::Switch | Self::Test)
37 }41 }
38}42}
108 dir.path().to_owned()112 dir.path().to_owned()
109 };113 };
110114
111 let mut nix_build = if self.privileged_build {115 let mut nix_build = MyCommand::new("nix");
112 let mut out = Command::new("sudo");
113 out.arg("nix");
114 out
115 } else {
116 Command::new("nix")
117 };
118 nix_build116 nix_build
119 .args([117 .args([
120 "build",118 "build",
121 "--impure",119 "--impure",
122 "--json",120 "--json",
123 // "--show-trace",121 // "--show-trace",
124 "--no-link",122 "--no-link",
125 "--out-link",
126 ])123 ])
127 .arg(&built)124 .comparg("--out-link", &built)
128 .arg(125 .arg(
129 config.configuration_attr_name(&format!(126 config.configuration_attr_name(&format!(
130 "buildSystems.{}.{host}",127 "buildSystems.{}.{host}",
133 )130 )
134 .args(&config.nix_args);131 .args(&config.nix_args);
132
133 if self.privileged_build {
134 nix_build = nix_build.sudo();
135 }
135136
136 nix_build.run_nix().await.map_err(|e| {137 nix_build.run_nix().await.map_err(|e| {
137 if action.build_attr() == "sdImage" {138 if action.build_attr() == "sdImage" {
149 info!("uploading system closure");150 info!("uploading system closure");
150 let mut tries = 0;151 let mut tries = 0;
151 loop {152 loop {
153 let mut nix = MyCommand::new("nix");
154 nix.arg("copy")
155 .comparg("--to", format!("ssh://root@{host}"))
156 .arg(&built);
152 match Command::new("nix")157 match nix.run_nix().await {
153 .args(["copy", "--to"])
154 .arg(format!("ssh://root@{}", host))
155 .arg(&built)
170 if let Some(action) = action {168 if let Some(action) = action {
171 if action.should_switch_profile() {169 if action.should_switch_profile() {
172 info!("switching generation");170 info!("switching generation");
171 let mut cmd = MyCommand::new("nix-env");
172 cmd.comparg("--profile", "/nix/var/nix/profiles/system")
173 .comparg("--set", &built);
173 config174 config.run_on(&host, cmd, true).await?;
174 .command_on(&host, "nix-env", true)
175 .args(["-p", "/nix/var/nix/profiles/system", "--set"])
176 .arg(&built)
177 .inherit_stdio()
178 .run()
179 .await?;
180 }175 }
176 if action.should_activate() {
181 info!("executing activation script");177 info!("executing activation script");
182 let mut switch_script = built.clone();178 let mut switch_script = built.clone();
183 switch_script.push("bin");179 switch_script.push("bin");
184 switch_script.push("switch-to-configuration");180 switch_script.push("switch-to-configuration");
181 let mut cmd = MyCommand::new(switch_script);
182 cmd.arg(action.name());
185 config183 config.run_on(&host, cmd, true).await?;
186 .command_on(&host, switch_script, true)184 }
187 .arg(action.name())
188 .stdout(Stdio::inherit())
189 .run()
190 .await?;
191 }185 }
192 }186 }
193 Action::Package(PackageAction::SdImage) => {187 Action::Package(PackageAction::SdImage) => {
194 let mut out = current_dir()?;188 let mut out = current_dir()?;
195 out.push(format!("sd-image-{}", host));189 out.push(format!("sd-image-{}", host));
196190
197 info!("building sd image to {:?}", out);191 info!("building sd image to {:?}", out);
198 let mut nix_build = if self.privileged_build {192 let mut nix_build = MyCommand::new("nix");
199 let mut out = Command::new("sudo");
200 out.arg("nix");
201 out
202 } else {
203 Command::new("nix")
204 };
205 nix_build193 nix_build
206 .args(["build", "--impure", "--no-link", "--out-link"])194 .args(["build", "--impure", "--no-link"])
207 .arg(&out)195 .comparg("--out-link", &out)
208 .arg(config.configuration_attr_name(&format!("buildSystems.sdImage.{}", host,)))196 .arg(config.configuration_attr_name(&format!("buildSystems.sdImage.{}", host,)))
209 .args(&config.nix_args);197 .args(&config.nix_args);
210 if !self.fail_fast {198 if !self.fail_fast {
211 nix_build.arg("--keep-going");199 nix_build.arg("--keep-going");
212 }200 }
201 if self.privileged_build {
202 nix_build = nix_build.sudo();
203 }
213204
214 nix_build.inherit_stdio().run_nix().await?;205 nix_build.run_nix().await?;
215 }206 }
216 Action::Package(PackageAction::InstallationCd) => {207 Action::Package(PackageAction::InstallationCd) => {
217 let mut out = current_dir()?;208 let mut out = current_dir()?;
218 out.push(format!("installation-cd-{}", host));209 out.push(format!("installation-cd-{}", host));
219210
220 info!("building sd image to {:?}", out);211 info!("building sd image to {:?}", out);
221 let mut nix_build = if self.privileged_build {212 let mut nix_build = MyCommand::new("nix");
222 let mut out = Command::new("sudo");
223 out.arg("nix");
224 out
225 } else {
226 Command::new("nix")
227 };
228 nix_build213 nix_build
229 .args(["build", "--impure", "--no-link", "--out-link"])214 .args(["build", "--impure", "--no-link"])
230 .arg(&out)215 .comparg("--out-link", &out)
231 .arg(216 .arg(
232 config.configuration_attr_name(&format!(217 config.configuration_attr_name(&format!(
233 "buildSystems.installationCd.{}",218 "buildSystems.installationCd.{}",
238 if !self.fail_fast {223 if !self.fail_fast {
239 nix_build.arg("--keep-going");224 nix_build.arg("--keep-going");
240 }225 }
226 if self.privileged_build {
227 nix_build = nix_build.sudo();
228 }
241229
242 nix_build.inherit_stdio().run_nix().await?;230 nix_build.run_nix().await?;
243 }231 }
244 };232 };
245 Ok(())233 Ok(())
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
2 fleetdata::{FleetSecret, FleetSharedSecret},2 fleetdata::{FleetSecret, FleetSharedSecret},
3 host::Config,3 host::Config,
4};4};
5use age::{Decryptor, Encryptor};
6use anyhow::{bail, ensure, Context, Result};5use anyhow::{bail, ensure, Context, Result};
7use clap::Parser;6use clap::Parser;
8use futures::{StreamExt, TryStreamExt};7use futures::{StreamExt, TryStreamExt};
9use std::{8use std::{
10 collections::HashSet,9 collections::HashSet,
11 io::{self, Cursor, Read, Write},10 io::{self, Cursor, Read},
12 iter,
13 path::PathBuf,11 path::PathBuf,
14};12};
15use tokio::fs::read_to_string;13use tokio::fs::read_to_string;
16use tracing::{info, warn};14use tracing::{info, warn, error};
1715
18#[derive(Parser)]16#[derive(Parser)]
19pub enum Secrets {17pub enum Secrets {
313 }311 }
314 let mut to_remove = Vec::new();312 let mut to_remove = Vec::new();
315 for name in &config.list_shared() {313 for name in &config.list_shared() {
314 info!("updating secret: {name}");
316 let mut data = config.shared_secret(name)?;315 let mut data = config.shared_secret(name)?;
317 let expected_owners: Vec<String> = config316 let expected_owners: Vec<String> = config
318 .shared_config_attr(&format!("sharedSecrets.\"{name}\".expectedOwners"))317 .shared_config_attr(&format!("sharedSecrets.\"{name}\".expectedOwners"))
326 let expected_set = expected_owners.iter().collect::<HashSet<_>>();325 let expected_set = expected_owners.iter().collect::<HashSet<_>>();
327 let should_remove = set.difference(&expected_set).next().is_some();326 let should_remove = set.difference(&expected_set).next().is_some();
328 if set != expected_set {327 if set != expected_set {
329 warn!("reconfiguring owners for {name}");
330 let generator: Option<String> = config328 let owner_dependent: bool = config
331 .shared_config_attr(&format!("sharedSecrets.\"{name}\".generator"))329 .shared_config_attr(&format!("sharedSecrets.\"{name}\".ownerDependent"))
332 .await?;330 .await?;
333 // TODO: if !.owner_dependent
334 if let Some(str) = generator {331 if !owner_dependent {
335 todo!("regenerate")332 warn!("reencrypting secret '{name}' for new owner set");
336 } else {333 // TODO: force regeneration
337 if should_remove {334 if should_remove {
338 warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");335 warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");
339 }336 }
367 data.secret.secret = encrypted;364 data.secret.secret = encrypted;
368 data.owners = expected_owners;365 data.owners = expected_owners;
369 config.replace_shared(name.to_owned(), data);366 config.replace_shared(name.to_owned(), data);
370 }367 } else if let Some(generator) = config
368 .shared_config_attr::<Option<String>>(&format!("sharedSecrets.\"{name}\".generator"))
369 .await?
370 {
371 todo!("regenerate secret {name} with {generator}");
372 } else {
373 error!("secret '{name}' should be regenerated manually");
374 }
371 }375 } else {
376 info!("secret data is ok")
377 }
372 }378 }
373 for k in to_remove {379 for k in to_remove {
374 config.remove_shared(&k);380 config.remove_shared(&k);
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
1use std::{ffi::OsStr, process::Stdio};1use std::{ffi::OsStr, process::Stdio, task::Poll};
22
3use anyhow::{Context, Result};3use anyhow::{Context, Result};
4use async_trait::async_trait;4use async_trait::async_trait;
7 de::{DeserializeOwned, Visitor},7 de::{DeserializeOwned, Visitor},
8 Deserialize,8 Deserialize,
9};9};
10use tokio::{process::Command, select};10use tokio::{io::AsyncRead, process::Command, select};
11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
12use tracing::{info, warn};12use tracing::{info, warn};
1313
14fn escape_bash(input: &str, out: &mut String) {
15 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
16 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {
17 out.push_str(input);
18 return;
19 }
20 out.push('\'');
21 for (i, v) in input.split('\'').enumerate() {
22 if i != 0 {
23 out.push_str("'\"'\"'");
24 }
25 out.push_str(v);
26 }
27 out.push('\'');
28}
29fn ostoutf8(os: impl AsRef<OsStr>) -> String {
30 os.as_ref().to_str().expect("non-utf8 data").to_owned()
31}
32#[derive(Clone)]
33pub struct MyCommand {
34 command: String,
35 args: Vec<String>,
36 env: Vec<(String, String)>,
37}
38impl MyCommand {
39 pub fn new(cmd: impl AsRef<OsStr>) -> Self {
40 assert!(!cmd.as_ref().is_empty());
41 Self {
42 command: ostoutf8(cmd),
43 args: vec![],
14#[async_trait]44 env: vec![],
45 }
46 }
47 fn into_args(self) -> Vec<String> {
48 let mut out = Vec::new();
49 if !self.env.is_empty() {
50 out.push("env".to_owned());
51 for (k, v) in self.env {
52 assert!(!k.contains("="));
53 out.push(format!("{k}={v}"));
54 }
55 }
56 out.push(self.command);
57 out.extend(self.args.into_iter());
58 out
59 }
60 fn into_string(self) -> String {
61 let mut out = String::new();
62 if !self.env.is_empty() {
63 out.push_str("env");
64 for (k, v) in self.env {
65 out.push(' ');
66 assert!(!k.contains("="));
67 escape_bash(&k, &mut out);
68 out.push('=');
69 escape_bash(&v, &mut out);
70 }
71 }
72 if !out.is_empty() {
73 out.push(' ');
74 }
75 escape_bash(&self.command, &mut out);
76 for arg in self.args {
77 out.push(' ');
78 escape_bash(&arg, &mut out);
79 }
80 out
81 }
82 fn into_command(self) -> Command {
83 let mut out = Command::new(self.command);
84 out.args(self.args);
85 for (k, v) in self.env {
86 out.env(k, v);
87 }
88 out
89 }
15pub trait CommandExt {90 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {
91 let arg = arg.as_ref();
92 self.args.push(ostoutf8(arg));
93 self
94 }
16 async fn run_nix(&mut self) -> Result<()>;95 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {
96 let arg = arg.as_ref();
97 let value = value.as_ref();
98 let arg = ostoutf8(arg);
99 let value = ostoutf8(value);
100 self.arg(format!("{arg}={value}"));
101 self
102 }
103 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {
104 self.arg(arg);
105 self.arg(value);
106 self
107 }
17 async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T>;108 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
109 for arg in args.into_iter() {
110 let arg = arg.as_ref();
111 self.args.push(ostoutf8(arg));
112 }
113 self
114 }
115 pub fn sudo(self) -> Self {
116 let mut out = Self::new("sudo");
117 out.args(self.into_args());
118 out
119 }
120 pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {
121 let mut out = Self::new("ssh");
122 out.arg(on).arg("--");
123 out.arg(self.into_string());
124 out
125 }
126
18 async fn run_nix_string(&mut self) -> Result<String>;127 pub async fn run(self) -> Result<()> {
128 let str = self.clone().into_string();
129 info!("running {str}");
130 let mut cmd = self.into_command();
131 cmd.inherit_stdio();
132 let out = cmd.spawn()?.wait_with_output().await?;
133 if !out.status.success() {
134 anyhow::bail!("command '{}' failed with status {}", str, out.status);
135 }
136 Ok(())
137 }
19 async fn run(&mut self) -> Result<()>;138 pub async fn run_string(self) -> Result<String> {
139 let str = self.clone().into_string();
140 info!("running {str}");
141 let mut cmd = self.into_command();
142 cmd.inherit_stdio();
143 cmd.stdout(Stdio::piped());
144 let out = cmd.spawn()?.wait_with_output().await?;
145 if !out.status.success() {
146 anyhow::bail!("command '{}' failed with status {}", str, out.status);
147 }
148 Ok(String::from_utf8(out.stdout)?)
149 }
20 async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T>;150 pub async fn run_nix_json<T: DeserializeOwned>(self) -> Result<T> {
151 let str = self.run_nix_string().await?;
152 serde_json::from_str(&str).with_context(|| format!("{:?}", str))
153 }
154
21 async fn run_string(&mut self) -> Result<String>;155 pub async fn run_nix_string(self) -> Result<String> {
156 let str = self.clone().into_string();
157 let mut cmd = self.into_command();
158 cmd.stdout(Stdio::piped());
159 run_nix_inner(str, cmd).await.map(|v| v.unwrap())
160 }
22 fn inherit_stdio(&mut self) -> &mut Self;161 pub async fn run_nix(self) -> Result<()> {
23 fn ssh_on(host: impl AsRef<OsStr>, command: impl AsRef<OsStr>) -> Self;162 let str = self.clone().into_string();
24}163 let mut cmd = self.into_command();
164 cmd.stdout(Stdio::inherit());
165 run_nix_inner(str, cmd).await.map(|v| {
166 assert!(v.is_none());
167 })
168 }
169}
170
171struct EmptyAsyncRead;
172impl AsyncRead for EmptyAsyncRead {
173 fn poll_read(
174 self: std::pin::Pin<&mut Self>,
175 _cx: &mut std::task::Context<'_>,
176 _buf: &mut tokio::io::ReadBuf<'_>,
177 ) -> Poll<std::io::Result<()>> {
178 Poll::Pending
179 }
180}
181
182async fn run_nix_inner(str: String, mut cmd: Command) -> Result<Option<String>> {
183 info!("running {str}");
184 cmd.arg("--log-format").arg("internal-json");
185 cmd.stderr(Stdio::piped());
186 let mut child = cmd.spawn()?;
187 let mut stderr = child.stderr.take().unwrap();
188 let stdout = child.stdout.take();
189 let wants_stdout = stdout.is_some();
190 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
191 let mut out: Box<dyn AsyncRead + Unpin> = stdout
192 .map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin>)
193 .unwrap_or_else(|| Box::new(EmptyAsyncRead));
194 let mut out = FramedRead::new(&mut out, BytesCodec::new());
195
196 // while let Some(line) = read.next().await? {}
197
198 let mut out_buf = if wants_stdout { Some(vec![]) } else { None };
199 loop {
200 select! {
201 e = err.next() => {
202 if let Some(e) = e {
203 let e = e?;
204 if let Some(e) = e.strip_prefix("@nix ") {
205
206 let log: NixLog = match serde_json::from_str(e) {
207 Ok(l) => l,
208 Err(err) => {
209 warn!("failed to parse nix log line {:?}: {}", e, err);
210 continue;
211 },
212 };
213 match log {
214 NixLog::Msg { msg, raw_msg, .. } => {
215 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
216 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
217 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
218 if let Some(raw_msg) = raw_msg {
219 info!(target: "nix", "{raw_msg}\n{msg}")
220 }else {
221 info!(target: "nix", "{msg}")
222
223 }
224 }
225 },
226 NixLog::Start { ref fields, typ, .. } if typ == 105 && !fields.is_empty() => {
227 if let [LogField::String(drv), ..] = &fields[..] {
228 info!(target: "nix","building {}", drv)
229 } else {
230 warn!("bad build log: {:?}", log)
231 }
232 },
233 NixLog::Start { ref fields, typ, .. } if typ == 100 && fields.len() >= 3 => {
234 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] = &fields[..] {
235 info!(target: "nix","copying {} {} -> {}", drv, from, to)
236 } else {
237 warn!("bad copy log: {:?}", log)
238 }
239 },
240 NixLog::Start { text, typ, .. } if typ == 0 || typ == 102 || typ == 103 || typ == 104 => {
241 if !text.is_empty() && text != "querying info about missing paths" && text != "copying 0 paths" {
242 info!(target: "nix", "{}", text)
243 }
244 },
245 NixLog::Start { text, level: 0, typ: 108, .. } if text.is_empty() => {
246 // Cache lookup? Coupled with copy log
247 },
248 NixLog::Start { text, level: 4, typ: 109, .. } if text.starts_with("querying info about ") => {
249 // Cache lookup
250 }
251 NixLog::Start { text, level: 4, typ: 101, .. } if text.starts_with("downloading ") => {
252 // NAR downloading, coupled with copy log
253 }
254 NixLog::Start { text, level: 1, typ: 111, .. } if text.starts_with("waiting for a machine to build ") => {
255 // Useless repeating notification about build
256 }
257 NixLog::Start { text, level: 3, typ: 111, .. } if text.starts_with("resolved derivation: ") => {
258 // CA resolved
259 }
260 NixLog::Stop { .. } => {},
261 NixLog::Result { .. } => {},
262 _ => warn!("unknown log: {:?}", log)
263 };
264 } else {
265 warn!(target="nix","unknown: {}", e)
266 }
267 }
268 },
269 o = out.next() => {
270 if let Some(o) = o {
271 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);
272 }
273 },
274 code = child.wait() => {
275 let code = code?;
276 if !code.success() {
277 anyhow::bail!("command '{str}' failed with status {}", code);
278 }
279 break;
280 }
281 }
282 }
283
284 Ok(out_buf.map(String::from_utf8).transpose()?)
285}
286
287#[async_trait]
288pub trait CommandExt {
289 // async fn run_nix(&mut self) -> Result<()>;
290 // async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T>;
291 // async fn run_nix_string(&mut self) -> Result<String>;
292 // async fn run(&mut self) -> Result<()>;
293 // async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T>;
294 // async fn run_string(&mut self) -> Result<String>;
295 fn inherit_stdio(&mut self) -> &mut Self;
296}
25297
26#[derive(Debug)]298#[derive(Debug)]
27enum LogField {299enum LogField {
91363
92#[async_trait]364#[async_trait]
93impl CommandExt for Command {365impl CommandExt for Command {
94 async fn run_nix(&mut self) -> Result<()> {366 fn inherit_stdio(&mut self) -> &mut Self {
95 self.run_nix_string().await.map(|_| ())
96 }
97 async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T> {
98 let str = self.run_nix_string().await?;
99 serde_json::from_str(&str).with_context(|| format!("{:?}", str))
100 }
101
102 async fn run_nix_string(&mut self) -> Result<String> {
103 self.arg("--log-format").arg("internal-json");
104 self.stderr(Stdio::piped());
105 self.stdout(Stdio::piped());
106 let mut child = self.spawn()?;
107 let mut stderr = child.stderr.take().unwrap();
108 let mut stdout = child.stdout.take().unwrap();
109 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
110 let mut out = FramedRead::new(&mut stdout, BytesCodec::new());
111
112 // while let Some(line) = read.next().await? {}
113
114 let mut out_buf = vec![];
115 loop {
116 select! {
117 e = err.next() => {
118 if let Some(e) = e {
119 let e = e?;
120 if let Some(e) = e.strip_prefix("@nix ") {
121
122 let log: NixLog = match serde_json::from_str(e) {
123 Ok(l) => l,
124 Err(err) => {
125 warn!("failed to parse nix log line {:?}: {}", e, err);
126 continue;
127 },
128 };
129 match log {
130 NixLog::Msg { msg, raw_msg, .. } => {
131 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
132 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
133 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
134 if let Some(raw_msg) = raw_msg {
135 info!(target: "nix", "{raw_msg}\n{msg}")
136 }else {
137 info!(target: "nix", "{msg}")
138
139 }
140 }
141 },
142 NixLog::Start { ref fields, typ, .. } if typ == 105 && !fields.is_empty() => {
143 if let [LogField::String(drv), ..] = &fields[..] {
144 info!(target: "nix","building {}", drv)
145 } else {
146 warn!("bad build log: {:?}", log)
147 }
148 },
149 NixLog::Start { ref fields, typ, .. } if typ == 100 && fields.len() >= 3 => {
150 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] = &fields[..] {
151 info!(target: "nix","copying {} {} -> {}", drv, from, to)
152 } else {
153 warn!("bad copy log: {:?}", log)
154 }
155 },
156 NixLog::Start { text, typ, .. } if typ == 0 || typ == 102 || typ == 103 || typ == 104 => {
157 if !text.is_empty() && text != "querying info about missing paths" && text != "copying 0 paths" {
158 info!(target: "nix", "{}", text)
159 }
160 },
161 NixLog::Start { text, level: 0, typ: 108, .. } if text.is_empty() => {
162 // Cache lookup? Coupled with copy log
163 },
164 NixLog::Start { text, level: 4, typ: 109, .. } if text.starts_with("querying info about ") => {
165 // Cache lookup
166 }
167 NixLog::Start { text, level: 4, typ: 101, .. } if text.starts_with("downloading ") => {
168 // NAR downloading, coupled with copy log
169 }
170 NixLog::Start { text, level: 1, typ: 111, .. } if text.starts_with("waiting for a machine to build ") => {
171 // Useless repeating notification about build
172 }
173 NixLog::Start { text, level: 3, typ: 111, .. } if text.starts_with("resolved derivation: ") => {
174 // CA resolved
175 }
176 NixLog::Stop { .. } => {},
177 NixLog::Result { .. } => {},
178 _ => warn!("unknown log: {:?}", log)
179 };
180 } else {
181 warn!(target="nix","unknown: {}", e)
182 }
183 }
184 },
185 o = out.next() => {
186 if let Some(o) = o {
187 out_buf.extend_from_slice(&o?);
188 }
189 },
190 code = child.wait() => {
191 let code = code?;
192 if !code.success() {
193 anyhow::bail!("command ({:?}) failed with status {}", self, code);
194 }
195 break;
196 }
197 }
198 }
199
200 Ok(String::from_utf8(out_buf)?)
201 }
202
203 fn inherit_stdio(&mut self) -> &mut Self {
204 self.stderr(Stdio::inherit());
205 self
206 }
207
208 async fn run(&mut self) -> Result<()> {
209 self.stderr(Stdio::piped());367 self.stderr(Stdio::inherit());
210 self.stdout(Stdio::piped());368 self.stdout(Stdio::inherit());
211 let mut child = self.spawn()?;369 self
212 let mut stderr = child.stderr.take().unwrap();
213 let mut stdout = child.stdout.take().unwrap();
214 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
215 let mut out = FramedRead::new(&mut stdout, LinesCodec::new());
216 loop {
217 select! {
218 e = err.next() => {
219 if let Some(e) = e {
220 warn!("{}", e?);
221 }
222 },
223 o = out.next() => {
224 if let Some(o) = o {
225 info!("{}", o?);
226 }
227 },
228 code = child.wait() => {
229 let code = code?;
230 if !code.success() {
231 anyhow::bail!("command ({:?}) failed with status {}", self, code);
232 }
233 break;
234 }
235 }
236 }
237 Ok(())
238 }370 }
239
240 async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T> {
241 let str = self.run_string().await?;
242 serde_json::from_str(&str).with_context(|| format!("{:?}", str))
243 }
244
245 async fn run_string(&mut self) -> Result<String> {
246 self.inherit_stdio();
247 self.stdout(Stdio::piped());
248 let out = self.spawn()?.wait_with_output().await?;
249 if !out.status.success() {
250 anyhow::bail!("command ({:?}) failed with status {}", self, out.status);
251 }
252 Ok(String::from_utf8(out.stdout)?)
253 }
254
255 fn ssh_on(host: impl AsRef<OsStr>, command: impl AsRef<OsStr>) -> Self {
256 let mut cmd = Command::new("ssh");
257 cmd.arg(host).arg("--").arg(command);
258 cmd
259 }
260}371}
261372
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
1use std::{1use std::{
2 cell::{Ref, RefCell, RefMut},2 cell::{Ref, RefCell, RefMut},
3 env::current_dir,3 env::current_dir,
4 ffi::{OsStr, OsString},4 ffi::OsString,
5 io::Write,5 io::Write,
6 ops::Deref,6 ops::Deref,
7 path::PathBuf,7 path::PathBuf,
12use clap::{ArgGroup, Parser};12use clap::{ArgGroup, Parser};
13use serde::de::DeserializeOwned;13use serde::de::DeserializeOwned;
14use tempfile::NamedTempFile;14use tempfile::NamedTempFile;
15use tokio::process::Command;
1615
17use crate::{16use crate::{
18 command::CommandExt,17 command::MyCommand,
19 fleetdata::{FleetData, FleetSecret, FleetSharedSecret},18 fleetdata::{FleetData, FleetSecret, FleetSharedSecret},
20};19};
2120
52 self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)51 self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
53 }52 }
5453
55 pub fn command_on(&self, host: &str, program: impl AsRef<OsStr>, sudo: bool) -> Command {54 pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
55 if sudo {
56 command = command.sudo();
57 }
58 if !self.is_local(host) {
59 command = command.ssh(host);
60 }
61 command.run().await
62 }
63 #[must_use]
56 if self.is_local(host) {64 pub async fn run_string_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<String> {
57 if sudo {
58 let mut cmd = Command::new("sudo");
59 cmd.arg(program);
60 cmd
61 } else {
62 Command::new(program)
63 }
64 } else {
65 let mut cmd = Command::new("ssh");
66 cmd.arg(host).arg("--");
67 if sudo {65 if sudo {
68 cmd.arg("sudo");66 command = command.sudo();
69 }67 }
68 if !self.is_local(host) {
70 cmd.arg(program);69 command = command.ssh(host);
71 cmd70 }
71 command.run_string().await
72 }72 }
73 }
7473
75 pub fn configuration_attr_name(&self, name: &str) -> OsString {74 pub fn configuration_attr_name(&self, name: &str) -> OsString {
76 let mut str = self.directory.as_os_str().to_owned();75 let mut str = self.directory.as_os_str().to_owned();
83 }82 }
8483
85 pub async fn list_hosts(&self) -> Result<Vec<String>> {84 pub async fn list_hosts(&self) -> Result<Vec<String>> {
86 Command::new("nix")85 let mut cmd = MyCommand::new("nix");
87 .arg("eval")86 cmd.arg("eval")
88 .arg(self.configuration_attr_name("configuredHosts"))87 .arg(self.configuration_attr_name("configuredHosts"))
89 .args(["--apply", "builtins.attrNames", "--json", "--show-trace"])
90 .args(&self.nix_args)88 .args(["--apply", "builtins.attrNames", "--json", "--show-trace"])
91 .run_nix_json()89 .args(&self.nix_args);
92 .await90 cmd.run_nix_json()
91 .await
93 }92 }
94 pub async fn shared_config_attr<T: DeserializeOwned>(&self, attr: &str) -> Result<T> {93 pub async fn shared_config_attr<T: DeserializeOwned>(&self, attr: &str) -> Result<T> {
95 Command::new("nix")94 let mut cmd = MyCommand::new("nix");
96 .arg("eval")95 cmd.arg("eval")
97 .arg(self.configuration_attr_name(&format!("configUnchecked.{}", attr)))96 .arg(self.configuration_attr_name(&format!("configUnchecked.{}", attr)))
98 .args(["--json", "--show-trace"])
99 .args(&self.nix_args)97 .args(["--json", "--show-trace"])
100 .run_nix_json()98 .args(&self.nix_args);
101 .await99 cmd.run_nix_json()
100 .await
102 }101 }
103 pub async fn shared_config_attr_names(&self, attr: &str) -> Result<Vec<String>> {102 pub async fn shared_config_attr_names(&self, attr: &str) -> Result<Vec<String>> {
104 Command::new("nix")103 let mut cmd = MyCommand::new("nix");
105 .arg("eval")104 cmd.arg("eval")
106 .arg(self.configuration_attr_name(&format!("configUnchecked.{}", attr)))105 .arg(self.configuration_attr_name(&format!("configUnchecked.{}", attr)))
107 .args(["--apply", "builtins.attrNames"])
108 .args(["--json", "--show-trace"])106 .args(["--apply", "builtins.attrNames"])
109 .args(&self.nix_args)107 .args(["--json", "--show-trace"])
110 .run_nix_json()108 .args(&self.nix_args);
111 .await109 cmd.run_nix_json()
110 .await
112 }111 }
113 pub async fn config_attr<T: DeserializeOwned>(&self, host: &str, attr: &str) -> Result<T> {112 pub async fn config_attr<T: DeserializeOwned>(&self, host: &str, attr: &str) -> Result<T> {
114 Command::new("nix")113 let mut cmd = MyCommand::new("nix");
115 .arg("eval")114 cmd.arg("eval")
116 .arg(115 .arg(
117 self.configuration_attr_name(&format!(116 self.configuration_attr_name(&format!(
118 "configuredSystems.{}.config.{}",117 "configuredSystems.{}.config.{}",
119 host, attr118 host, attr
120 )),119 )),
121 )120 )
122 .args(["--json", "--show-trace"])
123 .args(&self.nix_args)121 .args(["--json", "--show-trace"])
124 .run_nix_json()122 .args(&self.nix_args);
125 .await123 cmd.run_nix_json()
124 .await
126 }125 }
127126
128 pub(super) fn data(&self) -> Ref<FleetData> {127 pub(super) fn data(&self) -> Ref<FleetData> {
171170
172 pub async fn decrypt_on_host(&self, host: &str, data: Vec<u8>) -> Result<Vec<u8>>{171 pub async fn decrypt_on_host(&self, host: &str, data: Vec<u8>) -> Result<Vec<u8>>{
173 let data = z85::encode(&data);172 let data = z85::encode(&data);
173 let mut cmd = MyCommand::new("fleet-install-secrets");
174 cmd.arg("decrypt").eqarg("--secret", data);
175 cmd = cmd.sudo().ssh(host);
174 let encoded = self.command_on(host, "fleet-install-secrets", true)176 let encoded = cmd.run_string().await.context("failed to call remote host for decrypt")?.trim().to_owned();
175 .arg("decrypt")
176 .arg("--secret")
177 .arg(data).run_string().await.context("failed to call remote host for decrypt")?.trim().to_owned();
178 Ok(z85::decode(encoded).context("bad encoded data? outdated host?")?)177 Ok(z85::decode(encoded).context("bad encoded data? outdated host?")?)
179 }178 }
180 pub async fn reencrypt_on_host(&self, host: &str, data: Vec<u8>, targets: Vec<String>) -> Result<Vec<u8>>{179 pub async fn reencrypt_on_host(&self, host: &str, data: Vec<u8>, targets: Vec<String>) -> Result<Vec<u8>>{
181 let data = z85::encode(&data);180 let data = z85::encode(&data);
182 let mut recmd = self.command_on(host, "fleet-install-secrets", true);181 let mut recmd = MyCommand::new("fleet-install-secrets");
183 recmd182 recmd.arg("reencrypt").eqarg("--secret",data);
184 .arg("reencrypt")
185 .arg("--secret")
186 .arg(format!("\"{}\"", data.replace('$', "\\$")));
187 for target in targets {183 for target in targets {
188 recmd.arg("--targets");184 recmd.eqarg("--targets", target);
189 recmd.arg(format!("\"{target}\""));
190 }185 }
186 recmd = recmd.sudo().ssh(host);
191 let encoded = recmd.run_string().await.context("failed to call remote host for decrypt")?.trim().to_owned();187 let encoded = recmd.run_string().await.context("failed to call remote host for decrypt")?.trim().to_owned();
192 Ok(z85::decode(encoded).context("bad encoded data? outdated host?")?)188 Ok(z85::decode(encoded).context("bad encoded data? outdated host?")?)
193 }189 }
modifiedcmds/fleet/src/keys.rsdiffbeforeafterboth
1use std::str::FromStr;1use std::str::FromStr;
22
3use crate::command::MyCommand;
3use crate::{command::CommandExt, host::Config};4use crate::host::Config;
4use anyhow::{anyhow, Result};5use anyhow::{anyhow, Result};
5use tracing::warn;6use tracing::warn;
67
26 Ok(key)27 Ok(key)
27 } else {28 } else {
28 warn!("Loading key for {}", host);29 warn!("Loading key for {}", host);
30 let mut cmd = MyCommand::new("cat");
31 cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
29 let key = self32 let key = self.run_string_on(host, cmd, false).await?;
30 .command_on(host, "cat", false)
31 .arg("/etc/ssh/ssh_host_ed25519_key.pub")
32 .run_string()
33 .await?;
modifiedcmds/install-secrets/src/main.rsdiffbeforeafterboth
250250
251 if plaintext {251 if plaintext {
252 let s = String::from_utf8(decrypted).context("output is not utf8")?;252 let s = String::from_utf8(decrypted).context("output is not utf8")?;
253 print!("{}", s);253 print!("{s}");
254 } else {254 } else {
255 println!("{}", SecretWrapper(decrypted));255 println!("{}", SecretWrapper(decrypted));
256 }256 }
modifiedmodules/fleet/secrets.nixdiffbeforeafterboth
2let2let
3 sharedSecret = with types; {3 sharedSecret = with types; {
4 options = {4 options = {
5 owners = mkOption {
6 type = listOf str;
7 description = ''
8 For which owners this secret is currently encrypted,
9 if not matches expectedOwners - then this secret is considered outdated, and
10 should be regenerated/reencrypted
11 '';
12 default = [ ];
13 };
14 expectedOwners = mkOption {5 expectedOwners = mkOption {
15 type = listOf str;6 type = listOf str;
16 description = ''7 description = ''
25 description = "Is this secret owner-dependent, and needs to be regenerated on ownership set change, or it may be just reencrypted";16 description = "Is this secret owner-dependent, and needs to be regenerated on ownership set change, or it may be just reencrypted";
26 };17 };
27 generator = mkOption {18 generator = mkOption {
28 type = nullOr package;19 type = nullOr (submodule {
29 description = ''20 packages = mkOption {
30 Derivation to execute for secret generation21 type = attrsOf package;
3122 description = ''
32 If null - may only be created manually23 Derivation to execute for shared secret generation (key = system).
33 '';24 This derivation should produce directory, with exactly two files:
25 - publicData
26 - encryptedSecretData
27
28 If null - secret value may only be created manually.
29 '';
30 };
31 expectedData = mkOption {
32 type = types.unspecified;
33 description = "Data expected to be used for secret generation, if doesn't match specified - secret should be regenerated";
34 };
35 dependencies = mkOption {
36 type = listOf str;
37 description = ''
38 List of secrets, on which this secret depends.
39
40 During generation, generator command will be ran on host, which already has specified secrets generated.
41 '';
42 default = [];
43 };
44 data = mkOption {
45 type = types.unspecified;
46 description = "Data used for secret generation. Imported from fleet.nix";
47 default = null;
48 internal = true;
49 };
50 });
34 default = null;51 default = null;
35 };52 };
36 expireIn = mkOption {53 expireIn = mkOption {
39 default = null;56 default = null;
40 };57 };
58
59 owners = mkOption {
60 type = listOf str;
61 description = ''
62 For which owners this secret is currently encrypted,
63 if not matches expectedOwners - then this secret is considered outdated, and
64 should be regenerated/reencrypted.
65
66 Imported from fleet.nix
67 '';
68 default = [ ];
69 };
41 public = mkOption {70 public = mkOption {
42 type = nullOr str;71 type = nullOr str;
43 description = "Secret public data";72 description = "Secret public data. Imported from fleet.nix";
44 default = null;73 default = null;
45 };74 };
46 secret = mkOption {75 secret = mkOption {
47 type = nullOr str;76 type = nullOr str;
48 description = "Encrypted secret data";77 description = "Encrypted secret data. Imported from fleet.nix";
49 default = null;78 default = null;
79 internal = true;
50 };80 };
51 };81 };
52 };82 };
modifiednixos/secrets.nixdiffbeforeafterboth
5let5let
6 sysConfig = config;6 sysConfig = config;
7 secretType = types.submodule ({ config, ... }: {7 secretType = types.submodule ({ config, ... }: {
8 config = rec {8 config = let secretName = config._module.args.name; in rec {
9 stableSecretPath = mkOptionDefault "/run/secrets/secret-stable-${config._module.args.name}";9 stableSecretPath = mkOptionDefault "/run/secrets/secret-stable-${secretName}";
10 secretPath = mkOptionDefault "/run/secrets/secret-${config.secretHash}-${config._module.args.name}";10 secretPath = mkOptionDefault "/run/secrets/secret-${config.secretHash}-${secretName}";
11 secretHash = mkOptionDefault (if config.secret != null then (builtins.hashString "sha1" config.secret) else "<missingno>");11 secretHash = mkOptionDefault (if config.secret != null then (builtins.hashString "sha1" config.secret) else throw "secret is not defined for secret ${secretName}");
1212
13 stablePublicPath = mkOptionDefault "/run/secrets/public-stable-${config._module.args.name}";13 stablePublicPath = mkOptionDefault "/run/secrets/public-stable-${secretName}";
14 publicPath = mkOptionDefault "/run/secrets/public-${config.publicHash}-${config._module.args.name}";14 publicPath = mkOptionDefault "/run/secrets/public-${config.publicHash}-${secretName}";
15 publicHash = mkOptionDefault (if config.public != null then (builtins.hashString "sha1" config.public) else "<missingno>");15 publicHash = mkOptionDefault (if config.public != null then (builtins.hashString "sha1" config.public) else throw "public is not defined for secret ${secretName}");
16 };16 };
17 options = {17 options = {
18 public = mkOption {18 public = mkOption {
77 });77 });
78 secretsFile = pkgs.writeTextFile {78 secretsFile = pkgs.writeTextFile {
79 name = "secrets.json";79 name = "secrets.json";
80 text = builtins.toJSON config.secrets;80 text = builtins.toJSON (mapAttrs (_: value: rec {
81 inherit (value) group mode owner secret public;
82 publicPath = if public != null then value.publicPath else "/missingno";
83 stablePublicPath = if public != null then value.stablePublicPath else "/missingno";
84 secretPath = if secret != null then value.secretPath else "/missingno";
85 stableSecretPath = if secret != null then value.stableSecretPath else "/missingno";
86 }) config.secrets);
81 };87 };
82in88in
83{89{