difftreelog
refactor remove shell-outs for ssh
in: trunk
15 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
[[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
"anyhow",
"async-trait",
"base64 0.21.5",
+ "better-command",
"chrono",
"clap",
"futures",
@@ -1510,9 +1523,9 @@
[[package]]
name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "opaque-debug"
@@ -1922,6 +1935,10 @@
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
name = "rnix"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
[[package]]
name = "serde"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
dependencies = [
"serde_derive",
]
@@ -2123,9 +2140,9 @@
[[package]]
name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
dependencies = [
"proc-macro2",
"quote",
@@ -2134,9 +2151,9 @@
[[package]]
name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
dependencies = [
"itoa",
"ryu",
@@ -2527,11 +2544,10 @@
[[package]]
name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
- "cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -2539,9 +2555,9 @@
[[package]]
name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
@@ -2550,9 +2566,9 @@
[[package]]
name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
@@ -2560,9 +2576,9 @@
[[package]]
name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
dependencies = [
"indicatif",
"tracing",
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
[workspace]
members = ["crates/*", "cmds/*"]
resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
edition = "2021"
[dependencies]
+nixlike.workspace = true
+better-command.workspace = true
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@@ -15,7 +17,6 @@
hostname = "0.3.1"
age-core = "0.9.0"
peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
age = { version = "0.9.2", features = ["ssh", "armor"] }
base64 = "0.21.5"
chrono = { version = "0.4.31", features = ["serde"] }
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,3 +1,6 @@
+//! Wrapper around nix repl, which allows to work on nix code, without relying on
+//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.
+
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
use std::fmt::{self, Display};
@@ -6,6 +9,7 @@
use std::sync::{Arc, OnceLock};
use anyhow::{anyhow, bail, ensure, Context, Result};
+use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};
use futures::StreamExt;
use itertools::Itertools;
use r2d2::{Pool, PooledConnection};
@@ -14,11 +18,9 @@
use tokio::io::AsyncWriteExt;
use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_util::codec::{FramedRead, LinesCodec};
use tracing::{debug, error, warn, Level};
-
-use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};
const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";
@@ -30,6 +32,8 @@
string_wrapping: (String, String),
number_wrapping: (String, String),
+ executing_command: Arc<Mutex<()>>,
+
next_id: u32,
free_list: Vec<u32>,
}
@@ -219,6 +223,8 @@
string_wrapping: Default::default(),
number_wrapping: Default::default(),
+ executing_command: Arc::new(Mutex::new(())),
+
next_id: 0,
free_list: vec![],
};
@@ -331,6 +337,10 @@
expr: impl AsRef<[u8]>,
err_handler: &mut dyn Handler,
) -> Result<String> {
+ // Prevent two commands from being executed in parallel, messing with each other.
+ let _lock = self.executing_command.clone();
+ let _guard = _lock.lock().await;
+
self.send_command(expr).await?;
// It will be echoed
self.send_command(REPL_DELIMITER).await?;
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -3,11 +3,11 @@
use std::{env::current_dir, time::Duration};
use crate::command::MyCommand;
-use crate::host::Config;
+use crate::host::{Config, ConfigHost};
use crate::nix_go;
use anyhow::{anyhow, Result};
use clap::Parser;
-use itertools::Itertools;
+use itertools::Itertools as _;
use tokio::{task::LocalSet, time::sleep};
use tracing::{error, field, info, info_span, warn, Instrument};
@@ -112,12 +112,12 @@
current: bool,
datetime: String,
}
-async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
- let mut cmd = MyCommand::new("nix-env");
+async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.arg("--list-generations");
// Sudo is required due to --list-generations acquiring lock on the profile.
- let data = config.run_string_on(host, cmd, true).await?;
+ let data = cmd.sudo().run_string().await?;
let generations = data
.split('\n')
.map(|e| e.trim())
@@ -161,25 +161,12 @@
.map_err(|_e| anyhow!("bad list-generations output"))?
.ok_or_else(|| anyhow!("failed to find generation"))?;
Ok(current)
-}
-
-async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("stop").arg(unit);
- config.run_on(host, cmd, true).await
-}
-
-async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("start").arg(unit);
- config.run_on(host, cmd, true).await
}
async fn execute_upload(
build: &BuildSystems,
- config: &Config,
action: UploadAction,
- host: &str,
+ host: &ConfigHost,
built: PathBuf,
) -> Result<()> {
let mut failed = false;
@@ -191,15 +178,15 @@
if !build.disable_rollback {
let _span = info_span!("preparing").entered();
info!("preparing for rollback");
- let generation = get_current_generation(config, host).await?;
+ let generation = get_current_generation(&host).await?;
info!(
"rollback target would be {} {}",
generation.id, generation.datetime
);
{
- let mut cmd = MyCommand::new("sh");
+ let mut cmd = host.cmd("sh").await?;
cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to set rollback marker: {e}");
failed = true;
}
@@ -215,37 +202,41 @@
// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
// Anyway, reboot will still help in this case.
if action.should_schedule_rollback_run() {
- let mut cmd = MyCommand::new("systemd-run");
+ let mut cmd = host.cmd("systemd-run").await?;
cmd.comparg("--on-active", "3min")
.comparg("--unit", "rollback-watchdog-run")
.arg("systemctl")
.arg("start")
.arg("rollback-watchdog.service");
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to schedule rollback run: {e}");
failed = true;
}
}
}
+
if action.should_switch_profile() && !failed {
info!("switching generation");
- let mut cmd = MyCommand::new("nix-env");
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.comparg("--set", &built);
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to switch generation: {e}");
failed = true;
}
}
+
+ // FIXME: Connection might be disconnected after activation run
+
if action.should_activate() && !failed {
let _span = info_span!("activating").entered();
info!("executing activation script");
let mut switch_script = built.clone();
switch_script.push("bin");
switch_script.push("switch-to-configuration");
- let mut cmd = MyCommand::new(switch_script);
+ let mut cmd = host.cmd(switch_script).await?;
cmd.arg(action.name());
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = cmd.sudo().run().in_current_span().await {
error!("failed to activate: {e}");
failed = true;
}
@@ -253,7 +244,8 @@
if !build.disable_rollback {
if failed {
info!("executing rollback");
- if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")
+ if let Err(e) = host
+ .systemctl_start("rollback-watchdog.service")
.instrument(info_span!("rollback"))
.await
{
@@ -261,27 +253,29 @@
}
} else {
info!("trying to mark upgrade as successful");
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
}
}
info!("disarming watchdog, just in case");
- if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {
+ if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
// It is ok, if there was no reboot - then timer might not be running.
}
if action.should_schedule_rollback_run() {
- if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {
+ if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
error!("failed to disarm rollback run: {e}");
}
}
- } else {
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {
- // Marker might not exist, yet better try to remove it.
- }
+ } else if let Err(_e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
+ // Marker might not exist, yet better try to remove it.
}
Ok(())
}
@@ -289,12 +283,13 @@
impl BuildSystems {
async fn build_task(self, config: Config, host: String) -> Result<()> {
info!("building");
+ let host = config.host(&host).await?;
let action = Action::from(self.subcommand.clone());
let fleet_field = &config.fleet_field;
let drv = nix_go!(
fleet_field.buildSystems(Obj {
localSystem: { config.local_system.clone() }
- })[{ action.build_attr() }][{ host }]
+ })[{ action.build_attr() }][{ &host.name }]
);
let outputs = drv.build().await.map_err(|e| {
if action.build_attr() == "sdImage" {
@@ -309,9 +304,10 @@
match action {
Action::Upload { action } => {
- if !config.is_local(&host) {
+ if !config.is_local(&host.name) {
info!("uploading system closure");
{
+ // TODO: Move to remote_derivation method.
// Alternatively, nix store make-content-addressed can be used,
// at least for the first deployment, to provide trusted store key.
//
@@ -329,13 +325,11 @@
}
let mut tries = 0;
loop {
- let mut nix = MyCommand::new("nix");
- nix.arg("copy")
- .arg("--substitute-on-destination")
- .comparg("--to", format!("ssh-ng://{host}"))
- .arg(out_output);
- match nix.run_nix().await {
- Ok(()) => break,
+ match host.remote_derivation(out_output).await {
+ Ok(remote) => {
+ assert!(&remote == out_output, "CA derivations aren't implemented");
+ break;
+ }
Err(e) if tries < 3 => {
tries += 1;
warn!("Copy failure ({}/3): {}", tries, e);
@@ -346,19 +340,19 @@
}
}
if let Some(action) = action {
- execute_upload(&self, &config, action, &host, out_output.clone()).await?
+ execute_upload(&self, action, &host, out_output.clone()).await?
}
}
Action::Package(PackageAction::SdImage) => {
let mut out = current_dir()?;
- out.push(format!("sd-image-{}", host));
+ out.push(format!("sd-image-{}", host.name));
info!("linking sd image to {:?}", out);
symlink(out_output, out)?;
}
Action::Package(PackageAction::InstallationCd) => {
let mut out = current_dir()?;
- out.push(format!("installation-cd-{}", host));
+ out.push(format!("installation-cd-{}", host.name));
info!("linking iso image to {:?}", out);
symlink(out_output, out)?;
@@ -379,6 +373,17 @@
let this = this.clone();
let span = info_span!("deployment", host = field::display(&host.name));
let hostname = host.name;
+ // FIXME: Since the introduction of better-nix-eval,
+ // due to single repl used for builds, hosts are waiting for each other to build,
+ // instead of building concurrently.
+ //
+ // Open multiple repls?
+ //
+ // Create build batcher, which will behave similar to golangs
+ // WaitGroup, and start executing once all the build tasks are scheduled?
+ // This also allows to cleanup build output, as there will be no longer
+ // "waiting for remote machine" messages in the cases when one package is needed for
+ // multiple hosts.
set.spawn_local(
(async move {
match this.build_task(config, hostname).await {
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,7 +7,7 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use chrono::{DateTime, Utc};
use clap::Parser;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
use itertools::Itertools;
use owo_colors::OwoColorize;
use std::{
@@ -404,9 +404,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(data) = secret.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, data, target_recipients)
- .await?;
+ let host = config.host(&identity_holder).await?;
+ let encrypted = host.reencrypt(data, target_recipients).await?;
secret.secret.secret = Some(encrypted);
}
@@ -481,9 +480,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(secret) = data.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, secret, target_recipients)
- .await?;
+ let host = config.host(identity_holder).await?;
+ let encrypted = host.reencrypt(secret, target_recipients).await?;
data.secret.secret = Some(encrypted);
}
cmds/fleet/src/command.rsdiffbeforeafterboth1use std::{2 collections::HashMap,3 ffi::OsStr,4 pin,5 process::Stdio,6 sync::{Arc, Mutex},7 task::Poll,8};910use anyhow::{anyhow, Result};11use futures::StreamExt;12use itertools::Either;13use once_cell::sync::Lazy;14use openssh::{OverSsh, OwningCommand, Session};15use regex::Regex;16use serde::{de::Visitor, Deserialize};17use tokio::{io::AsyncRead, process::Command, select};18use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};19use tracing::{info, info_span, warn, Span};20use tracing_indicatif::span_ext::IndicatifSpanExt;2122fn escape_bash(input: &str, out: &mut String) {23 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";24 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {25 out.push_str(input);26 return;27 }28 out.push('\'');29 for (i, v) in input.split('\'').enumerate() {30 if i != 0 {31 out.push_str("'\"'\"'");32 }33 out.push_str(v);34 }35 out.push('\'');36}37fn ostoutf8(os: impl AsRef<OsStr>) -> String {38 os.as_ref().to_str().expect("non-utf8 data").to_owned()39}40#[derive(Clone)]41pub struct MyCommand {42 command: String,43 args: Vec<String>,44 env: Vec<(String, String)>,45 ssh_session: Option<Arc<Session>>,46}47impl MyCommand {48 pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {49 assert!(!cmd.as_ref().is_empty());50 Self {51 command: ostoutf8(cmd),52 args: vec![],53 env: vec![],54 ssh_session: Some(session),55 }56 }57 pub fn new(cmd: impl AsRef<OsStr>) -> Self {58 assert!(!cmd.as_ref().is_empty());59 Self {60 command: ostoutf8(cmd),61 args: vec![],62 env: vec![],63 ssh_session: None,64 }65 }66 fn into_args(self) -> Vec<String> {67 let mut out = Vec::new();68 if !self.env.is_empty() {69 out.push("env".to_owned());70 for (k, v) in self.env {71 assert!(!k.contains('='));72 out.push(format!("{k}={v}"));73 }74 }75 out.push(self.command);76 out.extend(self.args);77 out78 }7980 /// Translates environment variables into env command execution.81 /// Required for ssh, as ssh don't allow to send environment variables (at least by default).82 ///83 /// FIXME: Insecure, as arguments might be seen by other users on the same machine.84 /// Figure out some way to transfer environment using stdio?85 fn translate_env_into_env(self) -> Self {86 if self.env.is_empty() {87 return self;88 }89 let mut out = Self::new("env");90 if let Some(session) = self.ssh_session {91 out = out.ssh_session(session);92 }93 for (k, v) in self.env {94 assert!(!k.contains('='));95 out.arg(format!("{k}={v}"));96 }97 out.arg(self.command);98 out.args(self.args);99100 out101 }102 fn into_string(self) -> String {103 let mut out = String::new();104 if !self.env.is_empty() {105 out.push_str("env");106 for (k, v) in self.env {107 out.push(' ');108 assert!(!k.contains('='));109 escape_bash(&k, &mut out);110 out.push('=');111 escape_bash(&v, &mut out);112 }113 }114 if !out.is_empty() {115 out.push(' ');116 }117 escape_bash(&self.command, &mut out);118 for arg in self.args {119 out.push(' ');120 escape_bash(&arg, &mut out);121 }122 out123 }124 fn into_command(self) -> Command {125 let mut out = Command::new(self.command);126 out.args(self.args);127 for (k, v) in self.env {128 out.env(k, v);129 }130 out131 }132 fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {133 Ok(if let Some(session) = self.ssh_session.clone() {134 let cmd = self.translate_env_into_env().into_command();135 Either::Right(136 cmd.over_ssh(session)137 .map_err(|e| anyhow!("ssh error: {e}"))?,138 )139 } else {140 let cmd = self.into_command();141 Either::Left(cmd)142 })143 }144 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {145 let arg = arg.as_ref();146 self.args.push(ostoutf8(arg));147 self148 }149 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {150 let arg = arg.as_ref();151 let value = value.as_ref();152 let arg = ostoutf8(arg);153 let value = ostoutf8(value);154 self.arg(format!("{arg}={value}"));155 self156 }157 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {158 self.arg(arg);159 self.arg(value);160 self161 }162 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {163 self.env164 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));165 self166 }167 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {168 for arg in args.into_iter() {169 let arg = arg.as_ref();170 self.args.push(ostoutf8(arg));171 }172 self173 }174 pub fn sudo(mut self) -> Self {175 if std::env::var_os("NO_SUDO").is_some() {176 let mut out = Self::new("su");177 out.ssh_session = self.ssh_session.take();178 out.arg("-c").arg(self.into_string());179 out180 } else {181 let mut out = Self::new("sudo");182 out.args(self.into_args());183 out184 }185 }186 pub fn ssh_session(mut self, on: Arc<Session>) -> Self {187 self.ssh_session = Some(on);188 self189 }190 pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {191 let mut out = Self::new("ssh");192 out.ssh_session = self.ssh_session.take();193 out.arg(on).arg("--");194 out.arg(self.into_string());195 out196 }197198 pub async fn run(self) -> Result<()> {199 let str = self.clone().into_string();200 let cmd = self.into_command_new()?;201 match cmd {202 Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,203 Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,204 };205 Ok(())206 }207 pub async fn run_string(self) -> Result<String> {208 let bytes = self.run_bytes().await?;209 Ok(String::from_utf8(bytes)?)210 }211 pub async fn run_bytes(self) -> Result<Vec<u8>> {212 let str = self.clone().into_string();213 let cmd = self.into_command_new()?;214 let v = match cmd {215 Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,216 Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,217 };218 Ok(v)219 }220221 pub async fn run_nix_string(self) -> Result<String> {222 let str = self.clone().into_string();223 let mut cmd = self.into_command();224 cmd.arg("--log-format").arg("internal-json");225 let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;226 Ok(String::from_utf8(bytes)?)227 }228 pub async fn run_nix(self) -> Result<()> {229 let str = self.clone().into_string();230 let mut cmd = self.into_command();231 cmd.arg("--log-format").arg("internal-json");232 cmd.stdout(Stdio::inherit());233 run_nix_inner(str, cmd, &mut NixHandler::default()).await234 }235}236237struct EmptyAsyncRead;238impl AsyncRead for EmptyAsyncRead {239 fn poll_read(240 self: std::pin::Pin<&mut Self>,241 _cx: &mut std::task::Context<'_>,242 _buf: &mut tokio::io::ReadBuf<'_>,243 ) -> Poll<std::io::Result<()>> {244 Poll::Pending245 }246}247248async fn run_nix_inner_stdout(249 str: String,250 cmd: Command,251 handler: &mut dyn Handler,252) -> Result<Vec<u8>> {253 Ok(run_nix_inner_raw(str, cmd, true, handler, None)254 .await?255 .expect("has out"))256}257async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {258 let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;259 assert!(v.is_none());260 Ok(())261}262async fn run_nix_inner_stdout_ssh(263 str: String,264 cmd: OwningCommand<Arc<Session>>,265 handler: &mut dyn Handler,266) -> Result<Vec<u8>> {267 Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)268 .await?269 .expect("has out"))270}271async fn run_nix_inner_ssh(272 str: String,273 cmd: OwningCommand<Arc<Session>>,274 handler: &mut dyn Handler,275) -> Result<()> {276 let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;277 assert!(v.is_none());278 Ok(())279}280281pub trait Handler: Send {282 fn handle_line(&mut self, e: &str);283}284285pub struct ClonableHandler<H>(Arc<Mutex<H>>);286impl<H> Clone for ClonableHandler<H> {287 fn clone(&self) -> Self {288 Self(self.0.clone())289 }290}291impl<H> ClonableHandler<H> {292 pub fn new(inner: H) -> Self {293 Self(Arc::new(Mutex::new(inner)))294 }295}296impl<H: Handler> Handler for ClonableHandler<H> {297 fn handle_line(&mut self, e: &str) {298 self.0.lock().unwrap().handle_line(e)299 }300}301302struct PlainHandler;303impl Handler for PlainHandler {304 fn handle_line(&mut self, e: &str) {305 info!(target: "log", "{e}");306 }307}308309pub struct NoopHandler;310impl Handler for NoopHandler {311 fn handle_line(&mut self, _e: &str) {}312}313314#[derive(Default)]315pub struct NixHandler {316 spans: HashMap<u64, Span>,317}318fn process_message(m: &str) -> String {319 static OSC_CLEANER: Lazy<Regex> =320 Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());321 static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());322 let m = OSC_CLEANER.replace_all(m, "");323 // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,324 // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.325 DETABBER.replace_all(m.as_ref(), " ").to_string()326}327impl Handler for NixHandler {328 fn handle_line(&mut self, e: &str) {329 if let Some(e) = e.strip_prefix("@nix ") {330 let log: NixLog = match serde_json::from_str(e) {331 Ok(l) => l,332 Err(err) => {333 warn!("failed to parse nix log line {:?}: {}", e, err);334 return;335 }336 };337 match log {338 NixLog::Msg { msg, raw_msg, .. } => {339 #[allow(clippy::nonminimal_bool)]340 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))341 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")342 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {343 if let Some(raw_msg) = raw_msg {344 if !msg.is_empty() {345 info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())346 } else {347 info!(target: "nix", "{}", raw_msg.trim_end())348 }349 } else {350 info!(target: "nix", "{}", msg.trim_end())351 }352 }353 }354 NixLog::Start {355 ref fields,356 typ,357 id,358 ..359 } if typ == 105 && !fields.is_empty() => {360 if let [LogField::String(drv), ..] = &fields[..] {361 let mut drv = drv.as_str();362 if let Some(pkg) = drv.strip_prefix("/nix/store/") {363 let mut it = pkg.splitn(2, '-');364 it.next();365 if let Some(pkg) = it.next() {366 drv = pkg;367 }368 }369 info!(target: "nix","building {}", drv);370 let span = info_span!("build", drv);371 span.pb_start();372 self.spans.insert(id, span);373 } else {374 warn!("bad build log: {:?}", log)375 }376 }377 NixLog::Start {378 ref fields,379 typ,380 id,381 ..382 } if typ == 100 && fields.len() >= 3 => {383 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =384 &fields[..]385 {386 let mut drv = drv.as_str();387388 if let Some(pkg) = drv.strip_prefix("/nix/store/") {389 let mut it = pkg.splitn(2, '-');390 it.next();391 if let Some(pkg) = it.next() {392 drv = pkg;393 }394 }395 // info!(target: "nix","copying {} {} -> {}", drv, from, to);396 let span = info_span!("copy", from, to, drv);397 span.pb_start();398 self.spans.insert(id, span);399 } else {400 warn!("bad copy log: {:?}", log)401 }402 }403 NixLog::Start { text, typ, id, .. }404 if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>405 {406 if !text.is_empty()407 && text != "querying info about missing paths"408 && text != "copying 0 paths"409 // Too much spam on lazy-trees branch410 && !(text.starts_with("copying '") && text.ends_with("' to the store"))411 {412 let span = info_span!("job");413 span.pb_start();414 span.pb_set_message(&process_message(text.trim()));415 self.spans.insert(id, span);416 info!(target: "nix", "{}", text);417 }418 }419 NixLog::Start {420 text,421 level: 0,422 typ: 108,423 ..424 } if text.is_empty() => {425 // Cache lookup? Coupled with copy log426 }427 NixLog::Start {428 text,429 level: 4,430 typ: 109,431 ..432 } if text.starts_with("querying info about ") => {433 // Cache lookup434 }435 NixLog::Start {436 text,437 level: 4,438 typ: 101,439 ..440 } if text.starts_with("downloading ") => {441 // NAR downloading, coupled with copy log442 }443 NixLog::Start {444 text,445 level: 1,446 typ: 111,447 ..448 } if text.starts_with("waiting for a machine to build ") => {449 // Useless repeating notification about build450 }451 NixLog::Start {452 text,453 level: 3,454 typ: 111,455 ..456 } if text.starts_with("resolved derivation: ") => {457 // CA resolved458 }459 NixLog::Start {460 text,461 level: 1,462 typ: 111,463 id,464 ..465 } if text.starts_with("waiting for lock on ") => {466 let mut drv = text.strip_prefix("waiting for lock on ").unwrap();467 if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {468 drv = txt;469 }470 if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {471 drv = txt;472 }473 if let Some(txt) = drv.split("', '").next() {474 drv = txt;475 }476 if let Some(pkg) = drv.strip_prefix("/nix/store/") {477 let mut it = pkg.splitn(2, '-');478 it.next();479 if let Some(pkg) = it.next() {480 drv = pkg;481 }482 }483 let span = info_span!("waiting on drv", drv);484 span.pb_start();485 self.spans.insert(id, span);486 // Concurrent build of the same message487 }488 NixLog::Stop { id, .. } => {489 self.spans.remove(&id);490 }491 NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {492 if let Some(span) = self.spans.get(&id) {493 if let LogField::String(s) = &fields[0] {494 span.pb_set_message(&process_message(s.trim()));495 } else {496 warn!("bad fields: {fields:?}");497 }498 } else {499 warn!("unknown result id: {id} {typ} {fields:?}");500 }501 // dbg!(fields, id, typ);502 }503 NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {504 if let Some(span) = self.spans.get(&id) {505 if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =506 &fields[..4]507 {508 span.pb_set_length(*expected);509 span.pb_set_position(*done);510 } else {511 warn!("bad fields: {fields:?}");512 }513 } else {514 // warn!("unknown result id: {id} {typ} {fields:?}");515 // Unaccounted progress.516 }517 // dbg!(fields, id, typ);518 }519 NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {520 // Set phase, expected521 }522 _ => warn!("unknown log: {:?}", log),523 };524 } else {525 let e = e.trim();526 if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {527 return;528 }529 info!("{e}")530 }531 }532}533534async fn run_nix_inner_raw(535 str: String,536 mut cmd: Command,537 want_stdout: bool,538 err_handler: &mut dyn Handler,539 mut out_handler: Option<&mut dyn Handler>,540) -> Result<Option<Vec<u8>>> {541 cmd.stderr(Stdio::piped());542 cmd.stdout(Stdio::piped());543 let mut child = cmd.spawn()?;544 let mut stderr = child.stderr.take().unwrap();545 let stdout = child.stdout.take().unwrap();546 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());547 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));548 let mut ob = want_stdout549 .then(|| out.take().unwrap())550 .unwrap_or_else(|| Box::new(EmptyAsyncRead));551 let mut ol = (!want_stdout)552 .then(|| out.take().unwrap())553 .unwrap_or_else(|| Box::new(EmptyAsyncRead));554 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());555 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());556557 // while let Some(line) = read.next().await? {}558559 let mut out_buf = if want_stdout { Some(vec![]) } else { None };560 loop {561 select! {562 e = err.next() => {563 if let Some(e) = e {564 let e = e?;565 err_handler.handle_line(&e);566 }567 },568 o = ob.next() => {569 if let Some(o) = o {570 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);571 }572 },573 o = ol.next() => {574 if let Some(o) = o {575 let o = o?;576 if let Some(out) = out_handler.as_mut() {577 out.handle_line(&o)578 } else {579 err_handler.handle_line(&o)580 }581 // out_handler.handle_info(&o);582 }583 },584 code = child.wait() => {585 let code = code?;586 if !code.success() {587 anyhow::bail!("command '{str}' failed with status {}", code);588 }589 break;590 }591 }592 }593594 Ok(out_buf)595}596async fn run_nix_inner_raw_ssh(597 str: String,598 mut cmd: OwningCommand<Arc<Session>>,599 want_stdout: bool,600 err_handler: &mut dyn Handler,601 mut out_handler: Option<&mut dyn Handler>,602) -> Result<Option<Vec<u8>>> {603 cmd.stderr(openssh::Stdio::piped());604 cmd.stdout(openssh::Stdio::piped());605 let mut child = cmd.spawn().await?;606 let mut stderr = child.stderr().take().unwrap();607 let stdout = child.stdout().take().unwrap();608 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());609 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));610 let mut ob = want_stdout611 .then(|| out.take().unwrap())612 .unwrap_or_else(|| Box::new(EmptyAsyncRead));613 let mut ol = (!want_stdout)614 .then(|| out.take().unwrap())615 .unwrap_or_else(|| Box::new(EmptyAsyncRead));616 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());617 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());618619 // while let Some(line) = read.next().await? {}620621 let mut out_buf = if want_stdout { Some(vec![]) } else { None };622623 let mut wait_future = pin::pin!(child.wait());624 loop {625 select! {626 e = err.next() => {627 if let Some(e) = e {628 let e = e?;629 err_handler.handle_line(&e);630 }631 },632 o = ob.next() => {633 if let Some(o) = o {634 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);635 }636 },637 o = ol.next() => {638 if let Some(o) = o {639 let o = o?;640 if let Some(out) = out_handler.as_mut() {641 out.handle_line(&o)642 } else {643 err_handler.handle_line(&o)644 }645 // out_handler.handle_info(&o);646 }647 },648 code = &mut wait_future => {649 let code = code?;650 if !code.success() {651 anyhow::bail!("command '{str}' failed with status {}", code);652 }653 break;654 }655 }656 }657658 Ok(out_buf)659}660661pub trait ErrorRecorder: Send {662 /// Return true to discard message from logging663 fn push_message(&mut self, msg: &str) -> bool;664}665666#[derive(Debug)]667enum LogField {668 String(String),669 Num(u64),670}671672impl<'de> Deserialize<'de> for LogField {673 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>674 where675 D: serde::Deserializer<'de>,676 {677 struct StringOrNum;678 impl<'de> Visitor<'de> for StringOrNum {679 type Value = LogField;680681 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {682 write!(f, "string or unsigned")683 }684685 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>686 where687 E: serde::de::Error,688 {689 Ok(LogField::String(v.to_owned()))690 }691692 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>693 where694 E: serde::de::Error,695 {696 Ok(LogField::Num(v))697 }698 }699700 deserializer.deserialize_any(StringOrNum)701 }702}703704#[derive(Deserialize, Debug)]705#[serde(rename_all = "camelCase", tag = "action")]706#[allow(dead_code)]707enum NixLog {708 Msg {709 level: u32,710 msg: String,711 raw_msg: Option<String>,712 },713 Start {714 id: u64,715 level: u32,716 #[serde(default)]717 fields: Vec<LogField>,718 text: String,719 #[serde(rename = "type")]720 typ: u32,721 },722 Stop {723 id: u64,724 },725 Result {726 id: u64,727 #[serde(rename = "type")]728 typ: u32,729 #[serde(default)]730 fields: Vec<LogField>,731 },732}cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -9,7 +9,6 @@
sync::{Arc, Mutex, MutexGuard, OnceLock},
};
-use age::Recipient;
use anyhow::{anyhow, bail, Context, Result};
use clap::{ArgGroup, Parser};
use openssh::SessionBuilder;
@@ -50,10 +49,12 @@
pub struct ConfigHost {
pub name: String,
+ pub local: bool,
pub session: OnceLock<Arc<openssh::Session>>,
}
impl ConfigHost {
- pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ assert!(!self.local, "do not open ssh connection to local session");
// FIXME: TOCTOU
if let Some(session) = &self.session.get() {
return Ok((*session).clone());
@@ -96,8 +97,12 @@
D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
}
pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
- let session = self.open_session().await?;
- Ok(MyCommand::new_on(cmd, session))
+ if self.local {
+ Ok(MyCommand::new(cmd))
+ } else {
+ let session = self.open_session().await?;
+ Ok(MyCommand::new_on(cmd, session))
+ }
}
pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
@@ -110,8 +115,25 @@
.context("failed to call remote host for decrypt")?;
z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
}
+ pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
+ let mut cmd = self.cmd("fleet-install-secrets").await?;
+ cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
+ for target in targets {
+ cmd.eqarg("--targets", target);
+ }
+ let encoded = cmd
+ .sudo()
+ .run_string()
+ .await
+ .context("failed to call remote host for decrypt")?;
+ SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")
+ }
/// Returns path for futureproofing, as path might change i.e on conversion to CA
pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
+ if self.local {
+ // Path is located locally, thus already trusted.
+ return Ok(path.to_owned());
+ }
let mut nix = MyCommand::new("nix");
nix.arg("copy")
.arg("--substitute-on-destination")
@@ -120,6 +142,25 @@
nix.run_nix().await?;
Ok(path.to_owned())
}
+ pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("stop").arg(name);
+ cmd.sudo().run().await
+ }
+ pub async fn systemctl_start(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("start").arg(name);
+ cmd.sudo().run().await
+ }
+
+ pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
+ let mut cmd = self.cmd("rm").await?;
+ cmd.arg("-f").arg(path);
+ if sudo {
+ cmd = cmd.sudo()
+ }
+ cmd.run().await
+ }
}
impl Config {
@@ -134,35 +175,12 @@
}
pub fn is_local(&self, host: &str) -> bool {
self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
- }
-
- pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
- if sudo {
- command = command.sudo();
- }
- if !self.is_local(host) {
- command = command.ssh(host);
- }
- command.run().await
- }
- pub async fn run_string_on(
- &self,
- host: &str,
- mut command: MyCommand,
- sudo: bool,
- ) -> Result<String> {
- if sudo {
- command = command.sudo();
- }
- if !self.is_local(host) {
- command = command.ssh(host);
- }
- command.run_string().await
}
pub async fn host(&self, name: &str) -> Result<ConfigHost> {
Ok(ConfigHost {
name: name.to_owned(),
+ local: self.is_local(name),
session: OnceLock::new(),
})
}
@@ -172,6 +190,7 @@
let mut out = vec![];
for name in names {
out.push(ConfigHost {
+ local: self.is_local(&name),
name,
session: OnceLock::new(),
})
@@ -225,27 +244,6 @@
let mut data = self.data_mut();
let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();
host_secrets.insert(secret, value);
- }
-
- pub async fn reencrypt_on_host(
- &self,
- host: &str,
- data: SecretData,
- targets: Vec<String>,
- ) -> Result<SecretData> {
- let mut recmd = MyCommand::new("fleet-install-secrets");
- recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
- for target in targets {
- recmd.eqarg("--targets", target);
- }
- recmd = recmd.sudo().ssh(host);
- let encoded = recmd
- .run_string()
- .await
- .context("failed to call remote host for decrypt")?
- .trim()
- .to_owned();
- SecretData::decode_z85(&encoded)
}
pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
cmds/fleet/src/keys.rsdiffbeforeafterboth--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
use std::str::FromStr;
-use crate::command::MyCommand;
use crate::host::Config;
use age::Recipient;
use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
Ok(key)
} else {
warn!("Loading key for {}", host);
- let mut cmd = MyCommand::new("cat");
+ let host = self.host(host).await?;
+ let mut cmd = host.cmd("cat").await?;
cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
- let key = self.run_string_on(host, cmd, false).await?;
- self.update_key(host, key.clone());
+ let key = cmd.run_string().await?;
+ self.update_key(&host.name, key.clone());
Ok(key)
}
}
cmds/remowt-agent/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
cmds/remowt-agent/README.adocdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/README.adoc
@@ -0,0 +1,16 @@
+= Remowt agent
+
+Working with remote machine programmatically is not always easy.
+
+Sure, you have ssh, sftp, and that kind of fancy stuff, but what about minimal distributions, routers?
+
+Well, sftp can be replaced with FISH... But what if remote machine isn't accessible over ssh at all? What if the only communication channel you have is uart?
+
+What if remote host has not enough tools to implement the functionality you need?
+
+Remowt is intended to solve this in a way similar to how some RAT toolkits (I.e metasploit) do - you inject minimal agent, setup some communication channel to it (stdio perhaps?), and then you deploy payloads on it, and the payloads perform the actual work.
+
+== Non-targets
+
+Minimal executable size:: As long as it transferred only once, it shouldn't be a problem to keep it a reasonable size.
+Be stealthy:: As it solves the problem almost the same way as metasploit, it is possible to use it as something bad, but this is not the remowt intended purpose, and never will be.
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+ println!("Hello, world!");
+}
crates/better-command/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
crates/better-command/src/handler.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/handler.rs
@@ -0,0 +1,305 @@
+//! Collection of handlers, which transform program-specific stdout format to tracing
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use tracing::{Span, info, warn, info_span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+
+pub trait Handler: Send {
+ fn handle_line(&mut self, e: &str);
+}
+
+/// Handler wrapper, which can be cloned.
+pub struct ClonableHandler<H>(Arc<Mutex<H>>);
+impl<H> Clone for ClonableHandler<H> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+impl<H> ClonableHandler<H> {
+ pub fn new(inner: H) -> Self {
+ Self(Arc::new(Mutex::new(inner)))
+ }
+}
+impl<H: Handler> Handler for ClonableHandler<H> {
+ fn handle_line(&mut self, e: &str) {
+ self.0.lock().unwrap().handle_line(e)
+ }
+}
+
+/// Converts command output to tracing lines
+pub struct PlainHandler;
+impl Handler for PlainHandler {
+ fn handle_line(&mut self, e: &str) {
+ info!(target: "log", "{e}");
+ }
+}
+
+/// Ignores output
+pub struct NoopHandler;
+impl Handler for NoopHandler {
+ fn handle_line(&mut self, _e: &str) {}
+}
+
+/// Transform nix internal-json logs to tracing spans.
+#[derive(Default)]
+pub struct NixHandler {
+ spans: HashMap<u64, Span>,
+}
+#[derive(Deserialize, Debug)]
+#[serde(untagged)]
+enum LogField {
+ String(String),
+ Num(u64),
+}
+
+/// Nix internal-json log line type
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", tag = "action")]
+#[allow(dead_code)]
+enum NixLog {
+ Msg {
+ level: u32,
+ msg: String,
+ raw_msg: Option<String>,
+ },
+ Start {
+ id: u64,
+ level: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ text: String,
+ #[serde(rename = "type")]
+ typ: u32,
+ },
+ Stop {
+ id: u64,
+ },
+ Result {
+ id: u64,
+ #[serde(rename = "type")]
+ typ: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ },
+}
+fn process_message(m: &str) -> String {
+ // Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.
+ static OSC_CLEANER: Lazy<Regex> =
+ Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+ static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+ let m = OSC_CLEANER.replace_all(m, "");
+ // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+ // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+ DETABBER.replace_all(m.as_ref(), " ").to_string()
+}
+impl Handler for NixHandler {
+ fn handle_line(&mut self, e: &str) {
+ if let Some(e) = e.strip_prefix("@nix ") {
+ let log: NixLog = match serde_json::from_str(e) {
+ Ok(l) => l,
+ Err(err) => {
+ warn!("failed to parse nix log line {:?}: {}", e, err);
+ return;
+ }
+ };
+ match log {
+ NixLog::Msg { msg, raw_msg, .. } => {
+ #[allow(clippy::nonminimal_bool)]
+ if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+ && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+ && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+ if let Some(raw_msg) = raw_msg {
+ if !msg.is_empty() {
+ info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
+ } else {
+ info!(target: "nix", "{}", raw_msg.trim_end())
+ }
+ } else {
+ info!(target: "nix", "{}", msg.trim_end())
+ }
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 105 && !fields.is_empty() => {
+ if let [LogField::String(drv), ..] = &fields[..] {
+ let mut drv = drv.as_str();
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ info!(target: "nix","building {}", drv);
+ let span = info_span!("build", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad build log: {:?}", log)
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 100 && fields.len() >= 3 => {
+ if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
+ &fields[..]
+ {
+ let mut drv = drv.as_str();
+
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ // info!(target: "nix","copying {} {} -> {}", drv, from, to);
+ let span = info_span!("copy", from, to, drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad copy log: {:?}", log)
+ }
+ }
+ NixLog::Start { text, typ, id, .. }
+ if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
+ {
+ if !text.is_empty()
+ && text != "querying info about missing paths"
+ && text != "copying 0 paths"
+ // Too much spam on lazy-trees branch
+ && !(text.starts_with("copying '") && text.ends_with("' to the store"))
+ {
+ let span = info_span!("job");
+ span.pb_start();
+ span.pb_set_message(&process_message(text.trim()));
+ self.spans.insert(id, span);
+ info!(target: "nix", "{}", text);
+ }
+ }
+ NixLog::Start {
+ text,
+ level: 0,
+ typ: 108,
+ ..
+ } if text.is_empty() => {
+ // Cache lookup? Coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 109,
+ ..
+ } if text.starts_with("querying info about ") => {
+ // Cache lookup
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 101,
+ ..
+ } if text.starts_with("downloading ") => {
+ // NAR downloading, coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ ..
+ } if text.starts_with("waiting for a machine to build ") => {
+ // Useless repeating notification about build
+ }
+ NixLog::Start {
+ text,
+ level: 3,
+ typ: 111,
+ ..
+ } if text.starts_with("resolved derivation: ") => {
+ // CA resolved
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ id,
+ ..
+ } if text.starts_with("waiting for lock on ") => {
+ let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
+ if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.split("', '").next() {
+ drv = txt;
+ }
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ let span = info_span!("waiting on drv", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ // Concurrent build of the same message
+ }
+ NixLog::Stop { id, .. } => {
+ self.spans.remove(&id);
+ }
+ NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
+ if let Some(span) = self.spans.get(&id) {
+ if let LogField::String(s) = &fields[0] {
+ span.pb_set_message(&process_message(s.trim()));
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ warn!("unknown result id: {id} {typ} {fields:?}");
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
+ if let Some(span) = self.spans.get(&id) {
+ if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
+ &fields[..4]
+ {
+ span.pb_set_length(*expected);
+ span.pb_set_position(*done);
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ // warn!("unknown result id: {id} {typ} {fields:?}");
+ // Unaccounted progress.
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
+ // Set phase, expected
+ }
+ _ => warn!("unknown log: {:?}", log),
+ };
+ } else {
+ let e = e.trim();
+ if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
+ return;
+ }
+ info!("{e}")
+ }
+ }
+}
crates/better-command/src/lib.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn it_works() {
+ let result = add(2, 2);
+ assert_eq!(result, 4);
+ }
+}