difftreelog
refactor shell abstraction
in: trunk
6 files changed
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -472,7 +472,7 @@
($field:ident $($tt:tt)*) => {{
use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};
#[allow(unused_mut, reason = "might be used if indexed")]
- let mut out = NixExprBuilder::field($field);
+ let mut out = NixExprBuilder::field($field.clone());
nix_expr_inner!(@field(out) $($tt)*);
out
}};
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth1use std::os::unix::fs::symlink;2use std::path::PathBuf;3use std::{env::current_dir, time::Duration};45use crate::command::MyCommand;6use crate::host::Config;7use crate::nix_go;8use anyhow::{anyhow, Result};9use clap::Parser;10use itertools::Itertools;11use tokio::{task::LocalSet, time::sleep};12use tracing::{error, field, info, info_span, warn, Instrument};1314#[derive(Parser, Clone)]15pub struct BuildSystems {16 /// Disable automatic rollback17 #[clap(long)]18 disable_rollback: bool,19 #[clap(subcommand)]20 subcommand: Subcommand,21}2223enum UploadAction {24 Test,25 Boot,26 Switch,27}28impl UploadAction {29 fn name(&self) -> &'static str {30 match self {31 UploadAction::Test => "test",32 UploadAction::Boot => "boot",33 UploadAction::Switch => "switch",34 }35 }3637 pub(crate) fn should_switch_profile(&self) -> bool {38 matches!(self, Self::Switch | Self::Boot)39 }40 pub(crate) fn should_activate(&self) -> bool {41 matches!(self, Self::Switch | Self::Test)42 }43 pub(crate) fn should_schedule_rollback_run(&self) -> bool {44 matches!(self, Self::Switch | Self::Test)45 }46}4748enum PackageAction {49 SdImage,50 InstallationCd,51}52impl PackageAction {53 fn build_attr(&self) -> String {54 match self {55 PackageAction::SdImage => "sdImage".to_owned(),56 PackageAction::InstallationCd => "installationCd".to_owned(),57 }58 }59}6061enum Action {62 Upload { action: Option<UploadAction> },63 Package(PackageAction),64}65impl Action {66 fn build_attr(&self) -> String {67 match self {68 Action::Upload { .. } => "toplevel".to_owned(),69 Action::Package(p) => p.build_attr(),70 }71 }72}7374impl From<Subcommand> for Action {75 fn from(s: Subcommand) -> Self {76 match s {77 Subcommand::Upload => Self::Upload { action: None },78 Subcommand::Test => Self::Upload {79 action: Some(UploadAction::Test),80 },81 Subcommand::Boot => Self::Upload {82 action: Some(UploadAction::Boot),83 },84 Subcommand::Switch => Self::Upload {85 action: Some(UploadAction::Switch),86 },87 Subcommand::SdImage => Self::Package(PackageAction::SdImage),88 Subcommand::InstallationCd => Self::Package(PackageAction::InstallationCd),89 }90 }91}9293#[derive(Parser, Clone)]94enum Subcommand {95 /// Upload, but do not switch96 Upload,97 /// Upload + switch to built system until reboot98 Test,99 /// Upload + switch to built system after reboot100 Boot,101 /// Upload + test + boot102 Switch,103104 /// Build SD .img image105 SdImage,106 /// Build an installation cd ISO image107 InstallationCd,108}109110struct Generation {111 id: u32,112 current: bool,113 datetime: String,114}115async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {116 let mut cmd = MyCommand::new("nix-env");117 cmd.comparg("--profile", "/nix/var/nix/profiles/system")118 .arg("--list-generations");119 // Sudo is required due to --list-generations acquiring lock on the profile.120 let data = config.run_string_on(host, cmd, true).await?;121 let generations = data122 .split('\n')123 .map(|e| e.trim())124 .filter(|&l| !l.is_empty())125 .filter_map(|g| {126 let gen: Option<Generation> = try {127 let mut parts = g.split_whitespace();128 let id = parts.next()?;129 let id: u32 = id.parse().ok()?;130 let date = parts.next()?;131 let time = parts.next()?;132 let current = if let Some(current) = parts.next() {133 if current == "(current)" {134 Some(true)135 } else {136 None137 }138 } else {139 Some(false)140 };141 let current = current?;142 if parts.next().is_some() {143 warn!("unexpected text after generation: {g}");144 }145 Generation {146 id,147 current,148 datetime: format!("{date} {time}"),149 }150 };151 if gen.is_none() {152 warn!("bad generation: {g}")153 }154 gen155 })156 .collect::<Vec<_>>();157 let current = generations158 .into_iter()159 .filter(|g| g.current)160 .at_most_one()161 .map_err(|_e| anyhow!("bad list-generations output"))?162 .ok_or_else(|| anyhow!("failed to find generation"))?;163 Ok(current)164}165166async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {167 let mut cmd = MyCommand::new("systemctl");168 cmd.arg("stop").arg(unit);169 config.run_on(host, cmd, true).await170}171172async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {173 let mut cmd = MyCommand::new("systemctl");174 cmd.arg("start").arg(unit);175 config.run_on(host, cmd, true).await176}177178async fn execute_upload(179 build: &BuildSystems,180 config: &Config,181 action: UploadAction,182 host: &str,183 built: PathBuf,184) -> Result<()> {185 let mut failed = false;186 // TODO: Lockfile, to prevent concurrent system switch?187 // TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback188 // is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to189 // unit name conflict in systemd-run190 // This code is tied to rollback.nix191 if !build.disable_rollback {192 let _span = info_span!("preparing").entered();193 info!("preparing for rollback");194 let generation = get_current_generation(config, host).await?;195 info!(196 "rollback target would be {} {}",197 generation.id, generation.datetime198 );199 {200 let mut cmd = MyCommand::new("sh");201 cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));202 if let Err(e) = config.run_on(host, cmd, true).await {203 error!("failed to set rollback marker: {e}");204 failed = true;205 }206 }207 // Activation script also starts rollback-watchdog.timer, however, it is possible that it won't be started.208 // Kicking it on manually will work best.209 //210 // There wouldn't be conflict, because here we trigger start of the primary service, and systemd will211 // only allow one instance of it.212213 // TODO: We should also watch how this process is going.214 // After running this command, we have less than 3 minutes to deploy everything,215 // if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.216 // Anyway, reboot will still help in this case.217 if action.should_schedule_rollback_run() {218 let mut cmd = MyCommand::new("systemd-run");219 cmd.comparg("--on-active", "3min")220 .comparg("--unit", "rollback-watchdog-run")221 .arg("systemctl")222 .arg("start")223 .arg("rollback-watchdog.service");224 if let Err(e) = config.run_on(host, cmd, true).await {225 error!("failed to schedule rollback run: {e}");226 failed = true;227 }228 }229 }230 if action.should_switch_profile() && !failed {231 info!("switching generation");232 let mut cmd = MyCommand::new("nix-env");233 cmd.comparg("--profile", "/nix/var/nix/profiles/system")234 .comparg("--set", &built);235 if let Err(e) = config.run_on(host, cmd, true).await {236 error!("failed to switch generation: {e}");237 failed = true;238 }239 }240 if action.should_activate() && !failed {241 let _span = info_span!("activating").entered();242 info!("executing activation script");243 let mut switch_script = built.clone();244 switch_script.push("bin");245 switch_script.push("switch-to-configuration");246 let mut cmd = MyCommand::new(switch_script);247 cmd.arg(action.name());248 if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {249 error!("failed to activate: {e}");250 failed = true;251 }252 }253 if !build.disable_rollback {254 if failed {255 info!("executing rollback");256 if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")257 .instrument(info_span!("rollback"))258 .await259 {260 error!("failed to trigger rollback: {e}")261 }262 } else {263 info!("trying to mark upgrade as successful");264 let mut cmd = MyCommand::new("rm");265 cmd.arg("-f").arg("/etc/fleet_rollback_marker");266 if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {267 error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")268 }269 }270 info!("disarming watchdog, just in case");271 if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {272 // It is ok, if there was no reboot - then timer might not be running.273 }274 if action.should_schedule_rollback_run() {275 if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {276 error!("failed to disarm rollback run: {e}");277 }278 }279 } else {280 let mut cmd = MyCommand::new("rm");281 cmd.arg("-f").arg("/etc/fleet_rollback_marker");282 if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {283 // Marker might not exist, yet better try to remove it.284 }285 }286 Ok(())287}288289impl BuildSystems {290 async fn build_task(self, config: Config, host: String) -> Result<()> {291 info!("building");292 let action = Action::from(self.subcommand.clone());293 let fleet_field = &config.fleet_field;294 let drv = nix_go!(fleet_field.buildSystems(Obj {295 localSystem: { config.local_system.clone() }296 }));297 let outputs = drv.build().await.map_err(|e| {298 if action.build_attr() == "sdImage" {299 info!("sd-image build failed");300 info!("Make sure you have imported modulesPath/installer/sd-card/sd-image-<arch>[-installer].nix (For installer, you may want to check config)");301 }302 e303 })?;304 let out_output = outputs305 .get("out")306 .ok_or_else(|| anyhow!("system build should produce \"out\" output"))?;307308 match action {309 Action::Upload { action } => {310 if !config.is_local(&host) {311 info!("uploading system closure");312 {313 // Alternatively, nix store make-content-addressed can be used,314 // at least for the first deployment, to provide trusted store key.315 //316 // It is much slower, yet doesn't require root on the deployer machine.317 let mut sign = MyCommand::new("nix");318 // Private key for host machine is registered in nix-sign.nix319 sign.arg("store")320 .arg("sign")321 .comparg("--key-file", "/etc/nix/private-key")322 .arg("-r")323 .arg(out_output);324 if let Err(e) = sign.sudo().run_nix().await {325 warn!("Failed to sign store paths: {e}");326 };327 }328 let mut tries = 0;329 loop {330 let mut nix = MyCommand::new("nix");331 nix.arg("copy")332 .arg("--substitute-on-destination")333 .comparg("--to", format!("ssh-ng://{host}"))334 .arg(out_output);335 match nix.run_nix().await {336 Ok(()) => break,337 Err(e) if tries < 3 => {338 tries += 1;339 warn!("Copy failure ({}/3): {}", tries, e);340 sleep(Duration::from_millis(5000)).await;341 }342 Err(e) => return Err(e),343 }344 }345 }346 if let Some(action) = action {347 execute_upload(&self, &config, action, &host, out_output.clone()).await?348 }349 }350 Action::Package(PackageAction::SdImage) => {351 let mut out = current_dir()?;352 out.push(format!("sd-image-{}", host));353354 info!("linking sd image to {:?}", out);355 symlink(out_output, out)?;356 }357 Action::Package(PackageAction::InstallationCd) => {358 let mut out = current_dir()?;359 out.push(format!("installation-cd-{}", host));360361 info!("linking iso image to {:?}", out);362 symlink(out_output, out)?;363 }364 };365 Ok(())366 }367368 pub async fn run(self, config: &Config) -> Result<()> {369 let hosts = config.list_hosts().await?;370 let set = LocalSet::new();371 let this = &self;372 for host in hosts.into_iter() {373 if config.should_skip(&host.name) {374 continue;375 }376 let config = config.clone();377 let this = this.clone();378 let span = info_span!("deployment", host = field::display(&host.name));379 let hostname = host.name;380 set.spawn_local(381 (async move {382 match this.build_task(config, hostname).await {383 Ok(_) => {}384 Err(e) => {385 error!("failed to deploy host: {}", e)386 }387 }388 })389 .instrument(span),390 );391 }392 set.await;393 Ok(())394 }395}cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -1,4 +1,5 @@
use crate::{
+ command::MyCommand,
fleetdata::{FleetSecret, FleetSharedSecret},
host::Config,
nix_go, nix_go_json,
@@ -12,6 +13,7 @@
collections::HashSet,
io::{self, Cursor, Read},
path::PathBuf,
+ sync::Arc,
};
use tabled::{Table, Tabled};
use tokio::fs::read_to_string;
@@ -97,8 +99,9 @@
Secret::InvokeGenerator => {
let config_field = &config.config_unchecked_field;
- let generate_impure =
- nix_go!(config_field.sharedSecrets["kube-apiserver.pem"].generateImpure);
+ let secret =
+ nix_go!(config_field.configUnchecked.sharedSecrets["kube-apiserver.pem"]);
+ let generate_impure = nix_go!(secret.generateImpure);
let on = nix_go!(generate_impure.on);
let call_package = nix_go!(
config_field.buildableSystems(Obj {
@@ -106,13 +109,62 @@
})[on]
.config
.nixpkgs
- .pkgs
+ .resolvedPkgs
.callPackage
);
- let generator = nix_go!(call_package(generate_impure.generator));
- let built = generator.build().await?;
- // .as_json().await?;
- dbg!(&built);
+ let generator = nix_go!(call_package(generate_impure.generator)(Obj {}));
+ let built = &generator.build().await?["out"];
+ let mut nix = MyCommand::new("nix");
+ let on: String = on.as_json().await?;
+ nix.arg("copy")
+ .arg("--substitute-on-destination")
+ .comparg("--to", format!("ssh-ng://{on}"))
+ .arg(built);
+ nix.run_nix().await?;
+
+ let session = config.host(&on).await?;
+
+ let owners: Vec<String> = nix_go_json!(secret.expectedOwners);
+ dbg!(&owners);
+
+ let mut recipients = String::new();
+ for owner in owners {
+ let key = config.key(&owner).await?;
+ recipients.push_str(&format!("-r \"{key}\" "));
+ }
+ recipients.push_str("-e");
+
+ // FIXME: security: created directory might be accessible to other users
+ // This shouldn't be much of a concern, as data is encrypted right after creation, yet
+ // still better to have.
+ let tempdir = session.mktemp_dir().await?;
+
+ let mut gen = session.cmd(built).await?;
+ gen.env("rageArgs", recipients).env("out", &tempdir);
+ gen.run().await?;
+
+ {
+ let marker = session.read_file_text(format!("{tempdir}/marker")).await?;
+ ensure!(marker == "SUCCESS", "generation not succeeded");
+ }
+
+ let public = session
+ .read_file_bin(format!("{tempdir}/public"))
+ .await
+ .ok();
+ let secret = session
+ .read_file_bin(format!("{tempdir}/secret"))
+ .await
+ .ok();
+ if let Some(secret) = &secret {
+ ensure!(
+ age::Decryptor::new(Cursor::new(&secret)).is_ok(),
+ "builder produced non-encrypted value as secret, this is highly insecure"
+ );
+ }
+ dbg!(&secret);
+ // // .as_json().await?;
+ // dbg!(&built);
}
Secret::ForceKeys => {
for host in config.list_hosts().await? {
@@ -249,7 +301,8 @@
if secret.secret.is_empty() {
bail!("no secret {name}");
}
- let data = config.decrypt_on_host(&machine, secret.secret).await?;
+ let host = config.host(&machine).await?;
+ let data = host.decrypt(secret.secret).await?;
if plaintext {
let s = String::from_utf8(data).context("output is not utf8")?;
print!("{s}");
cmds/fleet/src/command.rsdiffbeforeafterboth--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,6 +1,7 @@
use std::{
collections::HashMap,
ffi::OsStr,
+ pin,
process::Stdio,
sync::{Arc, Mutex},
task::Poll,
@@ -10,7 +11,7 @@
use futures::StreamExt;
use itertools::Either;
use once_cell::sync::Lazy;
-use openssh::{OverSsh, Session};
+use openssh::{OverSsh, OwningCommand, Session};
use regex::Regex;
use serde::{de::Visitor, Deserialize};
use tokio::{io::AsyncRead, process::Command, select};
@@ -44,6 +45,15 @@
ssh_session: Option<Arc<Session>>,
}
impl MyCommand {
+ pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {
+ assert!(!cmd.as_ref().is_empty());
+ Self {
+ command: ostoutf8(cmd),
+ args: vec![],
+ env: vec![],
+ ssh_session: Some(session),
+ }
+ }
pub fn new(cmd: impl AsRef<OsStr>) -> Self {
assert!(!cmd.as_ref().is_empty());
Self {
@@ -66,6 +76,29 @@
out.extend(self.args);
out
}
+
+ /// Translates environment variables into env command execution.
+ /// Required for ssh, as ssh don't allow to send environment variables (at least by default).
+ ///
+ /// FIXME: Insecure, as arguments might be seen by other users on the same machine.
+ /// Figure out some way to transfer environment using stdio?
+ fn translate_env_into_env(self) -> Self {
+ if self.env.is_empty() {
+ return self;
+ }
+ let mut out = Self::new("env");
+ if let Some(session) = self.ssh_session {
+ out = out.ssh_session(session);
+ }
+ for (k, v) in self.env {
+ assert!(!k.contains('='));
+ out.arg(format!("{k}={v}"));
+ }
+ out.arg(self.command);
+ out.args(self.args);
+
+ out
+ }
fn into_string(self) -> String {
let mut out = String::new();
if !self.env.is_empty() {
@@ -98,7 +131,7 @@
}
fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {
Ok(if let Some(session) = self.ssh_session.clone() {
- let cmd = self.into_command();
+ let cmd = self.translate_env_into_env().into_command();
Either::Right(
cmd.over_ssh(session)
.map_err(|e| anyhow!("ssh error: {e}"))?,
@@ -126,6 +159,11 @@
self.arg(value);
self
}
+ pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
+ self.env
+ .push((name.as_ref().to_owned(), value.as_ref().to_owned()));
+ self
+ }
pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
for arg in args.into_iter() {
let arg = arg.as_ref();
@@ -133,9 +171,10 @@
}
self
}
- pub fn sudo(self) -> Self {
+ pub fn sudo(mut self) -> Self {
if std::env::var_os("NO_SUDO").is_some() {
let mut out = Self::new("su");
+ out.ssh_session = self.ssh_session.take();
out.arg("-c").arg(self.into_string());
out
} else {
@@ -144,27 +183,38 @@
out
}
}
- pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {
+ pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
+ self.ssh_session = Some(on);
+ self
+ }
+ pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
let mut out = Self::new("ssh");
+ out.ssh_session = self.ssh_session.take();
out.arg(on).arg("--");
out.arg(self.into_string());
out
}
- pub fn over_ssh(mut self, session: Arc<Session>) -> Self {
- self.ssh_session = Some(session);
- self
- }
pub async fn run(self) -> Result<()> {
let str = self.clone().into_string();
- let cmd = self.into_command();
- run_nix_inner(str, cmd, &mut PlainHandler).await?;
+ let cmd = self.into_command_new()?;
+ match cmd {
+ Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,
+ Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,
+ };
Ok(())
}
pub async fn run_string(self) -> Result<String> {
+ let bytes = self.run_bytes().await?;
+ Ok(String::from_utf8(bytes)?)
+ }
+ pub async fn run_bytes(self) -> Result<Vec<u8>> {
let str = self.clone().into_string();
- let cmd = self.into_command();
- let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;
+ let cmd = self.into_command_new()?;
+ let v = match cmd {
+ Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,
+ Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,
+ };
Ok(v)
}
@@ -172,7 +222,8 @@
let str = self.clone().into_string();
let mut cmd = self.into_command();
cmd.arg("--log-format").arg("internal-json");
- run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await
+ let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;
+ Ok(String::from_utf8(bytes)?)
}
pub async fn run_nix(self) -> Result<()> {
let str = self.clone().into_string();
@@ -198,7 +249,7 @@
str: String,
cmd: Command,
handler: &mut dyn Handler,
-) -> Result<String> {
+) -> Result<Vec<u8>> {
Ok(run_nix_inner_raw(str, cmd, true, handler, None)
.await?
.expect("has out"))
@@ -208,6 +259,24 @@
assert!(v.is_none());
Ok(())
}
+async fn run_nix_inner_stdout_ssh(
+ str: String,
+ cmd: OwningCommand<Arc<Session>>,
+ handler: &mut dyn Handler,
+) -> Result<Vec<u8>> {
+ Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)
+ .await?
+ .expect("has out"))
+}
+async fn run_nix_inner_ssh(
+ str: String,
+ cmd: OwningCommand<Arc<Session>>,
+ handler: &mut dyn Handler,
+) -> Result<()> {
+ let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
+ assert!(v.is_none());
+ Ok(())
+}
pub trait Handler: Send {
fn handle_line(&mut self, e: &str);
@@ -468,7 +537,7 @@
want_stdout: bool,
err_handler: &mut dyn Handler,
mut out_handler: Option<&mut dyn Handler>,
-) -> Result<Option<String>> {
+) -> Result<Option<Vec<u8>>> {
cmd.stderr(Stdio::piped());
cmd.stdout(Stdio::piped());
let mut child = cmd.spawn()?;
@@ -522,7 +591,71 @@
}
}
- Ok(out_buf.map(String::from_utf8).transpose()?)
+ Ok(out_buf)
+}
+async fn run_nix_inner_raw_ssh(
+ str: String,
+ mut cmd: OwningCommand<Arc<Session>>,
+ want_stdout: bool,
+ err_handler: &mut dyn Handler,
+ mut out_handler: Option<&mut dyn Handler>,
+) -> Result<Option<Vec<u8>>> {
+ cmd.stderr(openssh::Stdio::piped());
+ cmd.stdout(openssh::Stdio::piped());
+ let mut child = cmd.spawn().await?;
+ let mut stderr = child.stderr().take().unwrap();
+ let stdout = child.stdout().take().unwrap();
+ let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
+ let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));
+ let mut ob = want_stdout
+ .then(|| out.take().unwrap())
+ .unwrap_or_else(|| Box::new(EmptyAsyncRead));
+ let mut ol = (!want_stdout)
+ .then(|| out.take().unwrap())
+ .unwrap_or_else(|| Box::new(EmptyAsyncRead));
+ let mut ob = FramedRead::new(&mut ob, BytesCodec::new());
+ let mut ol = FramedRead::new(&mut ol, LinesCodec::new());
+
+ // while let Some(line) = read.next().await? {}
+
+ let mut out_buf = if want_stdout { Some(vec![]) } else { None };
+
+ let mut wait_future = pin::pin!(child.wait());
+ loop {
+ select! {
+ e = err.next() => {
+ if let Some(e) = e {
+ let e = e?;
+ err_handler.handle_line(&e);
+ }
+ },
+ o = ob.next() => {
+ if let Some(o) = o {
+ out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);
+ }
+ },
+ o = ol.next() => {
+ if let Some(o) = o {
+ let o = o?;
+ if let Some(out) = out_handler.as_mut() {
+ out.handle_line(&o)
+ } else {
+ err_handler.handle_line(&o)
+ }
+ // out_handler.handle_info(&o);
+ }
+ },
+ code = &mut wait_future => {
+ let code = code?;
+ if !code.success() {
+ anyhow::bail!("command '{str}' failed with status {}", code);
+ }
+ break;
+ }
+ }
+ }
+
+ Ok(out_buf)
}
pub trait ErrorRecorder: Send {
cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -1,10 +1,10 @@
use std::{
env::current_dir,
- ffi::OsString,
+ ffi::{OsStr, OsString},
io::Write,
ops::Deref,
path::PathBuf,
- sync::{Arc, Mutex, MutexGuard},
+ sync::{Arc, Mutex, MutexGuard, OnceLock},
};
use anyhow::{anyhow, bail, Context, Result};
@@ -46,16 +46,55 @@
pub struct ConfigHost {
pub name: String,
+ pub session: OnceLock<Arc<openssh::Session>>,
}
impl ConfigHost {
- async fn open_session(&self) -> Result<openssh::Session> {
- let mut session = SessionBuilder::default();
+ pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ // FIXME: TOCTOU
+ if let Some(session) = &self.session.get() {
+ return Ok((*session).clone());
+ };
+ let session = SessionBuilder::default();
- session
+ let session = session
.connect(&self.name)
.await
- .map_err(|e| anyhow!("ssh error: {e}"))
+ .map_err(|e| anyhow!("ssh error: {e}"))?;
+ let session = Arc::new(session);
+ self.session.set(session.clone()).expect("TOCTOU happened");
+ Ok(session)
+ }
+ pub async fn mktemp_dir(&self) -> Result<String> {
+ let mut cmd = self.cmd("mktemp").await?;
+ cmd.arg("-d");
+ let path = cmd.run_string().await?;
+ Ok(path.trim_end().to_owned())
}
+ pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {
+ let mut cmd = self.cmd("cat").await?;
+ cmd.arg(path);
+ cmd.run_bytes().await
+ }
+ pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {
+ let mut cmd = self.cmd("cat").await?;
+ cmd.arg(path);
+ cmd.run_string().await
+ }
+ pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
+ let session = self.open_session().await?;
+ Ok(MyCommand::new_on(cmd, session))
+ }
+
+ pub async fn decrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> {
+ let mut cmd = self.cmd("fleet-install-secrets").await?;
+ cmd.arg("decrypt").eqarg("--secret", z85::encode(&data));
+ let encoded = cmd
+ .sudo()
+ .run_string()
+ .await
+ .context("failed to call remote host for decrypt")?;
+ z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
+ }
}
impl Config {
@@ -96,12 +135,21 @@
command.run_string().await
}
+ pub async fn host(&self, name: &str) -> Result<ConfigHost> {
+ Ok(ConfigHost {
+ name: name.to_owned(),
+ session: OnceLock::new(),
+ })
+ }
pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
let fleet_field = &self.fleet_field;
let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;
let mut out = vec![];
for name in names {
- out.push(ConfigHost { name })
+ out.push(ConfigHost {
+ name,
+ session: OnceLock::new(),
+ })
}
Ok(out)
}
@@ -152,19 +200,6 @@
host_secrets.insert(secret, value);
}
- pub async fn decrypt_on_host(&self, host: &str, data: Vec<u8>) -> Result<Vec<u8>> {
- let data = z85::encode(&data);
- let mut cmd = MyCommand::new("fleet-install-secrets");
- cmd.arg("decrypt").eqarg("--secret", data);
- cmd = cmd.sudo().ssh(host);
- let encoded = cmd
- .run_string()
- .await
- .context("failed to call remote host for decrypt")?
- .trim()
- .to_owned();
- z85::decode(encoded).context("bad encoded data? outdated host?")
- }
pub async fn reencrypt_on_host(
&self,
host: &str,
nixos/meta.nixdiffbeforeafterboth--- a/nixos/meta.nix
+++ b/nixos/meta.nix
@@ -1,11 +1,18 @@
-{ lib, ... }:
-with lib;
{
+ lib,
+ pkgs,
+ ...
+}:
+with lib; {
options = with types; {
+ nixpkgs.resolvedPkgs = mkOption {
+ type = types.pkgs // {description = "nixpkgs.pkgs";};
+ description = "Value of pkgs";
+ };
tags = mkOption {
type = listOf str;
description = "Host tags";
- default = [ ];
+ default = [];
};
network = mkOption {
type = submodule {
@@ -13,12 +20,12 @@
internalIps = mkOption {
type = listOf str;
description = "Internal ips";
- default = [ ];
+ default = [];
};
externalIps = mkOption {
type = listOf str;
description = "External ips";
- default = [ ];
+ default = [];
};
};
};
@@ -29,7 +36,8 @@
};
};
config = {
- tags = [ "all" ];
- network = { };
+ tags = ["all"];
+ network = {};
+ nixpkgs.resolvedPkgs = pkgs;
};
}