difftreelog
refactor prepare for decoupling fleet-cli from fleet-data-storage
in: trunk
26 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -215,23 +215,98 @@
checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
[[package]]
+name = "async-stream"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
+dependencies = [
+ "async-stream-impl",
+ "futures-core",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "async-stream-impl"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.76",
+]
+
+[[package]]
name = "async-trait"
-version = "0.1.80"
+version = "0.1.81"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca"
+checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.66",
+ "syn 2.0.76",
]
[[package]]
+name = "atomic-waker"
+version = "1.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
+
+[[package]]
name = "autocfg"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
[[package]]
+name = "axum"
+version = "0.7.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf"
+dependencies = [
+ "async-trait",
+ "axum-core",
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body",
+ "http-body-util",
+ "itoa",
+ "matchit",
+ "memchr",
+ "mime",
+ "percent-encoding",
+ "pin-project-lite",
+ "rustversion",
+ "serde",
+ "sync_wrapper 1.0.1",
+ "tower",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
+name = "axum-core"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body",
+ "http-body-util",
+ "mime",
+ "pin-project-lite",
+ "rustversion",
+ "sync_wrapper 0.1.2",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
name = "backtrace"
version = "0.3.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -482,7 +557,7 @@
"heck 0.5.0",
"proc-macro2",
"quote",
- "syn 2.0.66",
+ "syn 2.0.76",
]
[[package]]
@@ -617,7 +692,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.66",
+ "syn 2.0.76",
]
[[package]]
@@ -673,7 +748,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.66",
+ "syn 2.0.76",
]
[[package]]
@@ -762,6 +837,12 @@
]
[[package]]
+name = "fixedbitset"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
+
+[[package]]
name = "fleet"
version = "0.2.0"
dependencies = [
@@ -776,6 +857,7 @@
"clap",
"clap_complete",
"crossterm",
+ "fleet-base",
"fleet-shared",
"futures",
"hostname",
@@ -785,7 +867,7 @@
"nix-eval",
"nixlike",
"nom",
- "openssh",
+ "openssh 0.10.4",
"owo-colors",
"peg",
"regex",
@@ -803,6 +885,31 @@
]
[[package]]
+name = "fleet-base"
+version = "0.1.0"
+dependencies = [
+ "age",
+ "anyhow",
+ "better-command",
+ "chrono",
+ "clap",
+ "fleet-shared",
+ "futures",
+ "hostname",
+ "itertools",
+ "nix-eval",
+ "nixlike",
+ "nom",
+ "openssh 0.11.0",
+ "serde",
+ "serde_json",
+ "tempfile",
+ "tokio",
+ "tokio-util",
+ "tracing",
+]
+
+[[package]]
name = "fleet-generator-helper"
version = "0.1.0"
dependencies = [
@@ -949,7 +1056,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.66",
+ "syn 2.0.76",
]
[[package]]
@@ -1020,6 +1127,25 @@
checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd"
[[package]]
+name = "h2"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205"
+dependencies = [
+ "atomic-waker",
+ "bytes",
+ "fnv",
+ "futures-core",
+ "futures-sink",
+ "http",
+ "indexmap 2.2.6",
+ "slab",
+ "tokio",
+ "tokio-util",
+ "tracing",
+]
+
+[[package]]
name = "hashbrown"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1027,6 +1153,12 @@
[[package]]
name = "hashbrown"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
+
+[[package]]
+name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
@@ -1085,12 +1217,112 @@
]
[[package]]
+name = "http"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
+dependencies = [
+ "bytes",
+ "fnv",
+ "itoa",
+]
+
+[[package]]
+name = "http-body"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
+dependencies = [
+ "bytes",
+ "http",
+]
+
+[[package]]
+name = "http-body-util"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
+dependencies = [
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "httparse"
+version = "1.9.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0fcc0b4a115bf80b728eb8ea024ad5bd707b615bfed49e0665b6e0f86fd082d9"
+
+[[package]]
+name = "httpdate"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
+
+[[package]]
name = "human-repr"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f58b778a5761513caf593693f8951c97a5b610841e754788400f32102eefdff1"
[[package]]
+name = "hyper"
+version = "1.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-util",
+ "h2",
+ "http",
+ "http-body",
+ "httparse",
+ "httpdate",
+ "itoa",
+ "pin-project-lite",
+ "smallvec",
+ "tokio",
+ "want",
+]
+
+[[package]]
+name = "hyper-timeout"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793"
+dependencies = [
+ "hyper",
+ "hyper-util",
+ "pin-project-lite",
+ "tokio",
+ "tower-service",
+]
+
+[[package]]
+name = "hyper-util"
+version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-util",
+ "http",
+ "http-body",
+ "hyper",
+ "pin-project-lite",
+ "socket2",
+ "tokio",
+ "tower",
+ "tower-service",
+ "tracing",
+]
+
+[[package]]
name = "i18n-config"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1142,7 +1374,7 @@
"proc-macro2",
"quote",
"strsim 0.10.0",
- "syn 2.0.66",
+ "syn 2.0.76",
"unic-langid",
]
@@ -1156,7 +1388,7 @@
"i18n-config",
"proc-macro2",
"quote",
- "syn 2.0.66",
+ "syn 2.0.76",
]
[[package]]
@@ -1184,6 +1416,16 @@
[[package]]
name = "indexmap"
+version = "1.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
+dependencies = [
+ "autocfg",
+ "hashbrown 0.12.3",
+]
+
+[[package]]
+name = "indexmap"
version = "2.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26"
@@ -1303,7 +1545,7 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
dependencies = [
- "spin",
+ "spin 0.5.2",
]
[[package]]
@@ -1366,6 +1608,12 @@
]
[[package]]
+name = "matchit"
+version = "0.7.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
+
+[[package]]
name = "memchr"
version = "2.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1390,6 +1638,12 @@
]
[[package]]
+name = "mime"
+version = "0.3.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
+
+[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1417,6 +1671,12 @@
]
[[package]]
+name = "multimap"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
+
+[[package]]
name = "nix"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1588,6 +1848,20 @@
]
[[package]]
+name = "openssh"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0f27389e5da64700a3efb7f925e442f824f6e3d4b1c27f75e115a92ad3aecbb1"
+dependencies = [
+ "libc",
+ "once_cell",
+ "shell-escape",
+ "tempfile",
+ "thiserror",
+ "tokio",
+]
+
+[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1637,6 +1911,12 @@
]
[[package]]
+name = "paste"
+version = "1.0.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
+
+[[package]]
name = "pbkdf2"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1674,6 +1954,32 @@
checksum = "e3aeb8f54c078314c2065ee649a7241f46b9d8e418e1a9581ba0546657d7aa3a"
[[package]]
+name = "pem"
+version = "3.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8e459365e590736a54c3fa561947c84837534b8e9af6fc5bf781307e82658fae"
+dependencies = [
+ "base64 0.22.1",
+ "serde",
+]
+
+[[package]]
+name = "percent-encoding"
+version = "2.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
+
+[[package]]
+name = "petgraph"
+version = "0.6.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db"
+dependencies = [
+ "fixedbitset",
+ "indexmap 2.2.6",
+]
+
+[[package]]
name = "pin-project"
version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1690,7 +1996,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.66",
+ "syn 2.0.76",
]
[[package]]
@@ -1774,6 +2080,16 @@
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
+name = "prettyplease"
+version = "0.2.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "479cf940fbbb3426c32c5d5176f62ad57549a0bb84773423ba8be9d089f5faba"
+dependencies = [
+ "proc-macro2",
+ "syn 2.0.76",
+]
+
+[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1807,6 +2123,59 @@
]
[[package]]
+name = "prost"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc"
+dependencies = [
+ "bytes",
+ "prost-derive",
+]
+
+[[package]]
+name = "prost-build"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1"
+dependencies = [
+ "bytes",
+ "heck 0.5.0",
+ "itertools",
+ "log",
+ "multimap",
+ "once_cell",
+ "petgraph",
+ "prettyplease",
+ "prost",
+ "prost-types",
+ "regex",
+ "syn 2.0.76",
+ "tempfile",
+]
+
+[[package]]
+name = "prost-derive"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca"
+dependencies = [
+ "anyhow",
+ "itertools",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.76",
+]
+
+[[package]]
+name = "prost-types"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2"
+dependencies = [
+ "prost",
+]
+
+[[package]]
name = "quote"
version = "1.0.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1857,6 +2226,19 @@
]
[[package]]
+name = "rcgen"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "54077e1872c46788540de1ea3d7f4ccb1983d12f9aa909b234468676c1a36779"
+dependencies = [
+ "pem",
+ "ring",
+ "rustls-pki-types",
+ "time",
+ "yasna",
+]
+
+[[package]]
name = "redox_syscall"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1910,8 +2292,41 @@
checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56"
[[package]]
-name = "remowt-fs"
-version = "0.1.0"
+name = "ring"
+version = "0.17.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d"
+dependencies = [
+ "cc",
+ "cfg-if",
+ "getrandom",
+ "libc",
+ "spin 0.9.8",
+ "untrusted",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "rmp"
+version = "0.8.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "228ed7c16fa39782c3b3468e974aec2795e9089153cd08ee2e9aefb3613334c4"
+dependencies = [
+ "byteorder",
+ "num-traits",
+ "paste",
+]
+
+[[package]]
+name = "rmp-serde"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "52e599a477cf9840e92f2cde9a7189e67b42c57532749bf90aea6ec10facd4db"
+dependencies = [
+ "byteorder",
+ "rmp",
+ "serde",
+]
[[package]]
name = "rnix"
@@ -1989,7 +2404,7 @@
"proc-macro2",
"quote",
"rust-embed-utils",
- "syn 2.0.66",
+ "syn 2.0.76",
"walkdir",
]
@@ -2038,6 +2453,54 @@
]
[[package]]
+name = "rustls"
+version = "0.23.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044"
+dependencies = [
+ "log",
+ "once_cell",
+ "ring",
+ "rustls-pki-types",
+ "rustls-webpki",
+ "subtle",
+ "zeroize",
+]
+
+[[package]]
+name = "rustls-pemfile"
+version = "2.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425"
+dependencies = [
+ "base64 0.22.1",
+ "rustls-pki-types",
+]
+
+[[package]]
+name = "rustls-pki-types"
+version = "1.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0"
+
+[[package]]
+name = "rustls-webpki"
+version = "0.102.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "84678086bd54edf2b415183ed7a94d0efb049f1b646a33e22a36f3794be6ae56"
+dependencies = [
+ "ring",
+ "rustls-pki-types",
+ "untrusted",
+]
+
+[[package]]
+name = "rustversion"
+version = "1.0.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6"
+
+[[package]]
name = "ryu"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2136,6 +2599,15 @@
]
[[package]]
+name = "serde_bytes"
+version = "0.11.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "387cc504cb06bb40a96c8e04e951fe01854cf6bc921053c954e4a606d9675c6a"
+dependencies = [
+ "serde",
+]
+
+[[package]]
name = "serde_derive"
version = "1.0.203"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2143,16 +2615,17 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.66",
+ "syn 2.0.76",
]
[[package]]
name = "serde_json"
-version = "1.0.117"
+version = "1.0.127"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3"
+checksum = "8043c06d9f82bd7271361ed64f415fe5e12a77fdb52e573e7f06a516dea329ad"
dependencies = [
"itoa",
+ "memchr",
"ryu",
"serde",
]
@@ -2279,6 +2752,12 @@
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
+name = "spin"
+version = "0.9.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
+
+[[package]]
name = "spki"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2329,9 +2808,9 @@
[[package]]
name = "syn"
-version = "2.0.66"
+version = "2.0.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c42f3f41a2de00b01c0aaad383c5a45241efc8b2d1eda5661812fda5f3cdcff5"
+checksum = "578e081a14e0cefc3279b0472138c513f37b41a08d5a3cca9b6e4e8ceb6cd525"
dependencies = [
"proc-macro2",
"quote",
@@ -2339,6 +2818,18 @@
]
[[package]]
+name = "sync_wrapper"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
+
+[[package]]
+name = "sync_wrapper"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394"
+
+[[package]]
name = "tabled"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2385,12 +2876,51 @@
]
[[package]]
+name = "terraform-provider-fleet"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "serde",
+ "tf-provider",
+ "tokio",
+]
+
+[[package]]
name = "text-size"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f18aa187839b2bdb1ad2fa35ead8c4c2976b64e4363c386d45ac0f7ee85c9233"
[[package]]
+name = "tf-provider"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d80ea2e5f9f54717952d199888aab7e607dc99275ec5221f1259ce7a5f55f5a6"
+dependencies = [
+ "anyhow",
+ "async-stream",
+ "async-trait",
+ "base64 0.22.1",
+ "futures",
+ "prost",
+ "rcgen",
+ "rmp-serde",
+ "serde",
+ "serde_bytes",
+ "serde_json",
+ "time",
+ "tokio",
+ "tokio-stream",
+ "tokio-util",
+ "tonic",
+ "tonic-build",
+ "tower-http",
+ "tracing",
+ "tracing-subscriber",
+]
+
+[[package]]
name = "thiserror"
version = "1.0.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2407,7 +2937,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.66",
+ "syn 2.0.76",
]
[[package]]
@@ -2485,7 +3015,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.66",
+ "syn 2.0.76",
]
[[package]]
@@ -2499,6 +3029,29 @@
]
[[package]]
+name = "tokio-rustls"
+version = "0.26.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4"
+dependencies = [
+ "rustls",
+ "rustls-pki-types",
+ "tokio",
+]
+
+[[package]]
+name = "tokio-stream"
+version = "0.1.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af"
+dependencies = [
+ "futures-core",
+ "pin-project-lite",
+ "tokio",
+ "tokio-util",
+]
+
+[[package]]
name = "tokio-util"
version = "0.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2547,7 +3100,7 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c127785850e8c20836d49732ae6abfa47616e60bf9d9f57c43c250361a9db96c"
dependencies = [
- "indexmap",
+ "indexmap 2.2.6",
"serde",
"serde_spanned",
"toml_datetime",
@@ -2555,6 +3108,100 @@
]
[[package]]
+name = "tonic"
+version = "0.12.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c6f6ba989e4b2c58ae83d862d3a3e27690b6e3ae630d0deb59f3697f32aa88ad"
+dependencies = [
+ "async-stream",
+ "async-trait",
+ "axum",
+ "base64 0.22.1",
+ "bytes",
+ "h2",
+ "http",
+ "http-body",
+ "http-body-util",
+ "hyper",
+ "hyper-timeout",
+ "hyper-util",
+ "percent-encoding",
+ "pin-project",
+ "prost",
+ "rustls-pemfile",
+ "socket2",
+ "tokio",
+ "tokio-rustls",
+ "tokio-stream",
+ "tower",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+]
+
+[[package]]
+name = "tonic-build"
+version = "0.12.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fe4ee8877250136bd7e3d2331632810a4df4ea5e004656990d8d66d2f5ee8a67"
+dependencies = [
+ "prettyplease",
+ "proc-macro2",
+ "prost-build",
+ "quote",
+ "syn 2.0.76",
+]
+
+[[package]]
+name = "tower"
+version = "0.4.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
+dependencies = [
+ "futures-core",
+ "futures-util",
+ "indexmap 1.9.3",
+ "pin-project",
+ "pin-project-lite",
+ "rand",
+ "slab",
+ "tokio",
+ "tokio-util",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+]
+
+[[package]]
+name = "tower-http"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5"
+dependencies = [
+ "bitflags",
+ "bytes",
+ "http",
+ "http-body",
+ "http-body-util",
+ "pin-project-lite",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+]
+
+[[package]]
+name = "tower-layer"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
+
+[[package]]
+name = "tower-service"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
+
+[[package]]
name = "tracing"
version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2573,7 +3220,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.66",
+ "syn 2.0.76",
]
[[package]]
@@ -2610,6 +3257,16 @@
]
[[package]]
+name = "tracing-serde"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1"
+dependencies = [
+ "serde",
+ "tracing-core",
+]
+
+[[package]]
name = "tracing-subscriber"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2619,15 +3276,24 @@
"nu-ansi-term",
"once_cell",
"regex",
+ "serde",
+ "serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
+ "tracing-serde",
]
[[package]]
+name = "try-lock"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
+
+[[package]]
name = "type-map"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2705,6 +3371,12 @@
]
[[package]]
+name = "untrusted"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
+
+[[package]]
name = "utf8parse"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2766,6 +3438,15 @@
]
[[package]]
+name = "want"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e"
+dependencies = [
+ "try-lock",
+]
+
+[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2792,7 +3473,7 @@
"once_cell",
"proc-macro2",
"quote",
- "syn 2.0.66",
+ "syn 2.0.76",
"wasm-bindgen-shared",
]
@@ -2814,7 +3495,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.66",
+ "syn 2.0.76",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@@ -3036,6 +3717,15 @@
]
[[package]]
+name = "yasna"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd"
+dependencies = [
+ "time",
+]
+
+[[package]]
name = "z85"
version = "3.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3058,5 +3748,5 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.66",
+ "syn 2.0.76",
]
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -45,6 +45,7 @@
indicatif = { version = "0.17", optional = true }
nix-eval.workspace = true
nom = "7.1.3"
+fleet-base = { version = "0.1.0", path = "../../crates/fleet-base" }
[features]
# Not quite stable
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -2,15 +2,14 @@
use anyhow::{anyhow, Result};
use clap::{Parser, ValueEnum};
+use fleet_base::{
+ host::{Config, ConfigHost},
+ opts::FleetOpts,
+};
use itertools::Itertools as _;
use nix_eval::nix_go;
use tokio::{task::LocalSet, time::sleep};
use tracing::{error, field, info, info_span, warn, Instrument};
-
-use crate::{
- command::MyCommand,
- host::{Config, ConfigHost},
-};
#[derive(Parser)]
pub struct Deploy {
@@ -253,7 +252,6 @@
info!("building");
let host = config.host(&host).await?;
// let action = Action::from(self.subcommand.clone());
- let fleet_config = &config.config_field;
let nixos = host.nixos_config().await?;
let drv = nix_go!(nixos.system.build[{ build_attr }]);
let outputs = drv.build().await.inspect_err(|_| {
@@ -270,12 +268,12 @@
}
impl BuildSystems {
- pub async fn run(self, config: &Config) -> Result<()> {
+ pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
let hosts = config.list_hosts().await?;
let set = LocalSet::new();
let build_attr = self.build_attr.clone();
for host in hosts.into_iter() {
- if config.should_skip(&host).await? {
+ if opts.should_skip(&host).await? {
continue;
}
let config = config.clone();
@@ -320,17 +318,18 @@
}
impl Deploy {
- pub async fn run(self, config: &Config) -> Result<()> {
+ pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
let hosts = config.list_hosts().await?;
let set = LocalSet::new();
for host in hosts.into_iter() {
- if config.should_skip(&host).await? {
+ if opts.should_skip(&host).await? {
continue;
}
let config = config.clone();
let span = info_span!("deploy", host = field::display(&host.name));
let hostname = host.name.clone();
let local_host = config.local_host();
+ let opts = opts.clone();
// FIXME: Fix repl concurrency (see build-systems)
set.spawn_local(
(async move {
@@ -342,7 +341,7 @@
return;
}
};
- if !config.is_local(&hostname) {
+ if !opts.is_local(&hostname) {
info!("uploading system closure");
{
// TODO: Move to remote_derivation method.
@@ -387,7 +386,7 @@
self.action,
&host,
built,
- if let Ok(v) = config.action_attr(&host, "specialisation").await {
+ if let Ok(v) = opts.action_attr(&host, "specialisation").await {
v
} else {
error!("unreachable? failed to get specialization");
cmds/fleet/src/cmds/info.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/info.rs
+++ b/cmds/fleet/src/cmds/info.rs
@@ -2,9 +2,8 @@
use anyhow::{ensure, Result};
use clap::Parser;
+use fleet_base::host::Config;
use nix_eval::nix_go_json;
-
-use crate::host::Config;
#[derive(Parser)]
pub struct Info {
@@ -39,8 +38,7 @@
'host: for host in config.list_hosts().await? {
if !tagged.is_empty() {
let config = &config.config_field;
- let tags: Vec<String> =
- nix_go_json!(config.hosts[{ host.name }].tags);
+ let tags: Vec<String> = nix_go_json!(config.hosts[{ host.name }].tags);
for tag in tagged {
if !tags.contains(tag) {
continue 'host;
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -1,6 +1,5 @@
use std::{
collections::{BTreeMap, BTreeSet, HashSet},
- ffi::OsString,
io::{self, stdin, stdout, Read, Write},
path::PathBuf,
};
@@ -8,21 +7,19 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use chrono::{DateTime, Utc};
use clap::Parser;
-use crossterm::{terminal, tty::IsTty};
+use fleet_base::{
+ fleetdata::{encrypt_secret_data, FleetSecret, FleetSecretPart, FleetSharedSecret},
+ host::Config,
+ opts::FleetOpts,
+};
use fleet_shared::SecretData;
-use itertools::Itertools;
use nix_eval::{nix_go, nix_go_json, Value};
use owo_colors::OwoColorize;
use serde::Deserialize;
use tabled::{Table, Tabled};
-use tokio::{fs::read, process::Command};
+use tokio::fs::read;
use tracing::{error, info, info_span, warn, Instrument};
-use crate::{
- fleetdata::{encrypt_secret_data, FleetSecret, FleetSecretPart, FleetSharedSecret},
- host::Config,
-};
-
#[derive(Parser)]
pub enum Secret {
/// Force load host keys for all defined hosts
@@ -432,11 +429,11 @@
Ok(target_machines)
}
impl Secret {
- pub async fn run(self, config: &Config) -> Result<()> {
+ pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
match self {
Secret::ForceKeys => {
for host in config.list_hosts().await? {
- if config.should_skip(&host).await? {
+ if opts.should_skip(&host).await? {
continue;
}
config.key(&host.name).await?;
@@ -639,7 +636,7 @@
}
}
for host in config.list_hosts().await? {
- if config.should_skip(&host).await? {
+ if opts.should_skip(&host).await? {
continue;
}
@@ -757,6 +754,7 @@
}
}
+/*
async fn edit_temp_file(
builder: tempfile::Builder<'_, '_>,
r: Vec<u8>,
@@ -835,3 +833,4 @@
// Ok((success, abs_path))
}
+*/
cmds/fleet/src/cmds/tf.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/tf.rs
+++ b/cmds/fleet/src/cmds/tf.rs
@@ -1,22 +1,67 @@
-use anyhow::Result;
+use std::{
+ collections::{BTreeMap, HashMap},
+ path::PathBuf,
+};
+
+use anyhow::{bail, Context, Result};
use clap::Parser;
-use nix_eval::nix_go_json;
+use fleet_base::host::Config;
+use nix_eval::nix_go;
+use serde::Deserialize;
use serde_json::Value;
-use tokio::fs::write;
-use tracing::info;
+use tokio::{fs::copy, process::Command};
-use crate::host::Config;
+#[derive(Deserialize)]
+pub struct TfData {
+ // Dummy
+ #[allow(dead_code)]
+ managed: bool,
+ // Host => Data
+ #[serde(default)]
+ #[serde(skip_serializing_if = "BTreeMap::is_empty")]
+ pub hosts: BTreeMap<String, Value>,
+}
#[derive(Parser)]
-pub struct Tf;
+pub enum Tf {
+ /// Generate fleet.tf.json file for running terraform.
+ Generate,
+ /// Fetch data from terraform to fleet.
+ Refresh,
+}
impl Tf {
pub async fn run(&self, config: &Config) -> Result<()> {
- let system = &config.local_system;
- let config = &config.config_field;
- let data: Value = nix_go_json!(config.tf({ system }).config);
- let str = serde_json::to_string_pretty(&data)?;
+ match self {
+ Tf::Generate => {
+ let system = &config.local_system;
+ let config = &config.config_field;
+ let data: HashMap<String, PathBuf> = nix_go!(config.tf({ system })).build().await?;
+ let data = &data["out"];
+
+ copy(data, "fleet.tf.json").await?;
+ }
+ Tf::Refresh => {
+ let cmd = Command::new("terraform").arg("refresh").status().await?;
+ if !cmd.success() {
+ bail!("terraform refresh failed")
+ }
- write("fleet.tf.json", str.as_bytes()).await?;
+ let data = Command::new("terraform")
+ .arg("output")
+ .arg("-json")
+ .arg("fleet")
+ .output()
+ .await?;
+ let tf_data: TfData = serde_json::from_slice(&data.stdout)
+ .context("failed to parse terraform fleet output")?;
+
+ let mut data = config.data();
+ data.extra.insert(
+ "terraformHosts".to_owned(),
+ serde_json::to_value(tf_data.hosts).expect("should be valid extra"),
+ );
+ }
+ }
Ok(())
}
cmds/fleet/src/command.rsdiffbeforeafterboth--- a/cmds/fleet/src/command.rs
+++ /dev/null
@@ -1,430 +0,0 @@
-use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};
-
-use anyhow::{anyhow, Result};
-use better_command::{Handler, NixHandler, PlainHandler};
-use futures::StreamExt;
-use itertools::Either;
-use openssh::{OverSsh, OwningCommand, Session};
-use tokio::{io::AsyncRead, process::Command, select};
-use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
-use tracing::debug;
-
-use crate::host::EscalationStrategy;
-
-fn escape_bash(input: &str, out: &mut String) {
- const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
- if input.chars().all(|c| !TO_ESCAPE.contains(c)) {
- out.push_str(input);
- return;
- }
- out.push('\'');
- for (i, v) in input.split('\'').enumerate() {
- if i != 0 {
- out.push_str("'\"'\"'");
- }
- out.push_str(v);
- }
- out.push('\'');
-}
-fn ostoutf8(os: impl AsRef<OsStr>) -> String {
- os.as_ref().to_str().expect("non-utf8 data").to_owned()
-}
-
-#[derive(Clone, Debug)]
-pub struct MyCommand {
- command: String,
- args: Vec<String>,
- env: Vec<(String, String)>,
- ssh_session: Option<Arc<Session>>,
- escalation: EscalationStrategy,
- escalate: bool,
-}
-impl MyCommand {
- pub fn new_on(
- escalation: EscalationStrategy,
- cmd: impl AsRef<OsStr>,
- session: Arc<Session>,
- ) -> Self {
- assert!(!cmd.as_ref().is_empty());
- Self {
- command: ostoutf8(cmd),
- args: vec![],
- env: vec![],
- ssh_session: Some(session),
- escalation,
- escalate: false,
- }
- }
- pub fn new(escalation: EscalationStrategy, cmd: impl AsRef<OsStr>) -> Self {
- assert!(!cmd.as_ref().is_empty());
- Self {
- command: ostoutf8(cmd),
- args: vec![],
- env: vec![],
- ssh_session: None,
- escalation,
- escalate: false,
- }
- }
- fn new_here(&self, cmd: impl AsRef<OsStr>) -> Self {
- if let Some(ssh_session) = self.ssh_session.clone() {
- Self::new_on(self.escalation, cmd, ssh_session)
- } else {
- Self::new(self.escalation, cmd)
- }
- }
-
- fn into_args(self) -> Vec<String> {
- let mut out = Vec::new();
- if !self.env.is_empty() {
- out.push("env".to_owned());
- for (k, v) in self.env {
- assert!(!k.contains('='));
- out.push(format!("{k}={v}"));
- }
- }
- out.push(self.command);
- out.extend(self.args);
- out
- }
-
- /// Translates environment variables into env command execution.
- /// Required for ssh, as ssh don't allow to send environment variables (at least by default).
- ///
- /// FIXME: Insecure, as arguments might be seen by other users on the same machine.
- /// Figure out some way to transfer environment using stdio?
- fn translate_env_into_env(self) -> Self {
- if self.env.is_empty() {
- return self;
- }
- let mut out = self.new_here("env");
- for (k, v) in self.env {
- assert!(!k.contains('='));
- out.arg(format!("{k}={v}"));
- }
- out.arg(self.command);
- out.args(self.args);
-
- out
- }
- fn into_string(self) -> String {
- let mut out = String::new();
- if !self.env.is_empty() {
- out.push_str("env");
- for (k, v) in self.env {
- out.push(' ');
- assert!(!k.contains('='));
- escape_bash(&k, &mut out);
- out.push('=');
- escape_bash(&v, &mut out);
- }
- }
- if !out.is_empty() {
- out.push(' ');
- }
- escape_bash(&self.command, &mut out);
- for arg in self.args {
- out.push(' ');
- escape_bash(&arg, &mut out);
- }
- out
- }
- fn into_command(self) -> Command {
- let mut out = Command::new(self.command);
- out.args(self.args);
- for (k, v) in self.env {
- out.env(k, v);
- }
- out
- }
- fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {
- Ok(if let Some(session) = self.ssh_session.clone() {
- let cmd = self.translate_env_into_env().into_command();
- Either::Right(
- cmd.over_ssh(session)
- .map_err(|e| anyhow!("ssh error: {e}"))?,
- )
- } else {
- let cmd = self.into_command();
- Either::Left(cmd)
- })
- }
- pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {
- let arg = arg.as_ref();
- self.args.push(ostoutf8(arg));
- self
- }
- pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {
- let arg = arg.as_ref();
- let value = value.as_ref();
- let arg = ostoutf8(arg);
- let value = ostoutf8(value);
- self.arg(format!("{arg}={value}"));
- self
- }
- pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {
- self.arg(arg);
- self.arg(value);
- self
- }
- pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
- self.env
- .push((name.as_ref().to_owned(), value.as_ref().to_owned()));
- self
- }
- pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
- for arg in args.into_iter() {
- let arg = arg.as_ref();
- self.args.push(ostoutf8(arg));
- }
- self
- }
- pub fn sudo(mut self) -> Self {
- self.escalate = true;
- self
- }
- fn wrap_sudo_if_needed(self) -> Self {
- if !self.escalate {
- return self;
- }
- match self.escalation {
- EscalationStrategy::Su => {
- let mut out = self.new_here("su");
- out.arg("-c").arg(self.into_string());
- out
- }
- EscalationStrategy::Sudo => {
- let mut out = self.new_here("sudo");
- out.args(self.into_args());
- out
- }
- EscalationStrategy::Run0 => {
- // run0 wants interactive authentication by default.
- let mut run0 = self.new_here("run0");
- let mut out = self.new_here("script");
-
- // Red backgrounds messes with fleet formatting
- run0.arg("--background=");
- run0.args(self.into_args());
-
- out.arg("-q");
- out.arg("/dev/null");
- out.arg("-c");
- out.arg(run0.into_string());
- dbg!(&out);
- out
- }
- }
- }
-
- pub async fn run(self) -> Result<()> {
- let str = self.clone().into_string();
- let cmd = self.wrap_sudo_if_needed().into_command_new()?;
- match cmd {
- Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,
- Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,
- };
- Ok(())
- }
- pub async fn run_string(self) -> Result<String> {
- let bytes = self.run_bytes().await?;
- Ok(String::from_utf8(bytes)?)
- }
- pub async fn run_bytes(self) -> Result<Vec<u8>> {
- let str = self.clone().into_string();
- let cmd = self.wrap_sudo_if_needed().into_command_new()?;
- let v = match cmd {
- Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,
- Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,
- };
- Ok(v)
- }
-
- pub async fn run_nix_string(mut self) -> Result<String> {
- let str = self.clone().into_string();
- self.arg("--log-format").arg("internal-json");
- let mut cmd = self.wrap_sudo_if_needed().into_command();
- let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;
- Ok(String::from_utf8(bytes)?)
- }
- pub async fn run_nix(mut self) -> Result<()> {
- let str = self.clone().into_string();
- self.arg("--log-format").arg("internal-json");
- let mut cmd = self.wrap_sudo_if_needed().into_command();
- cmd.stdout(Stdio::inherit());
- run_nix_inner(str, cmd, &mut NixHandler::default()).await
- }
-}
-
-struct EmptyAsyncRead;
-impl AsyncRead for EmptyAsyncRead {
- fn poll_read(
- self: std::pin::Pin<&mut Self>,
- _cx: &mut std::task::Context<'_>,
- _buf: &mut tokio::io::ReadBuf<'_>,
- ) -> Poll<std::io::Result<()>> {
- Poll::Pending
- }
-}
-
-async fn run_nix_inner_stdout(
- str: String,
- cmd: Command,
- handler: &mut dyn Handler,
-) -> Result<Vec<u8>> {
- Ok(run_nix_inner_raw(str, cmd, true, handler, None)
- .await?
- .expect("has out"))
-}
-async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {
- let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;
- assert!(v.is_none());
- Ok(())
-}
-async fn run_nix_inner_stdout_ssh(
- str: String,
- cmd: OwningCommand<Arc<Session>>,
- handler: &mut dyn Handler,
-) -> Result<Vec<u8>> {
- Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)
- .await?
- .expect("has out"))
-}
-async fn run_nix_inner_ssh(
- str: String,
- cmd: OwningCommand<Arc<Session>>,
- handler: &mut dyn Handler,
-) -> Result<()> {
- let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
- assert!(v.is_none());
- Ok(())
-}
-
-async fn run_nix_inner_raw(
- str: String,
- mut cmd: Command,
- want_stdout: bool,
- err_handler: &mut dyn Handler,
- mut out_handler: Option<&mut dyn Handler>,
-) -> Result<Option<Vec<u8>>> {
- cmd.stderr(Stdio::piped());
- cmd.stdout(Stdio::piped());
- debug!("running command {str:?} on local");
- let mut child = cmd.spawn()?;
- let mut stderr = child.stderr.take().unwrap();
- let stdout = child.stdout.take().unwrap();
- let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
- let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));
- let mut ob = want_stdout
- .then(|| out.take().unwrap())
- .unwrap_or_else(|| Box::new(EmptyAsyncRead));
- let mut ol = (!want_stdout)
- .then(|| out.take().unwrap())
- .unwrap_or_else(|| Box::new(EmptyAsyncRead));
- let mut ob = FramedRead::new(&mut ob, BytesCodec::new());
- let mut ol = FramedRead::new(&mut ol, LinesCodec::new());
-
- // while let Some(line) = read.next().await? {}
-
- let mut out_buf = if want_stdout { Some(vec![]) } else { None };
- loop {
- select! {
- e = err.next() => {
- if let Some(e) = e {
- let e = e?;
- err_handler.handle_line(&e);
- }
- },
- o = ob.next() => {
- if let Some(o) = o {
- out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);
- }
- },
- o = ol.next() => {
- if let Some(o) = o {
- let o = o?;
- if let Some(out) = out_handler.as_mut() {
- out.handle_line(&o)
- } else {
- err_handler.handle_line(&o)
- }
- // out_handler.handle_info(&o);
- }
- },
- code = child.wait() => {
- let code = code?;
- if !code.success() {
- anyhow::bail!("command '{str}' failed with status {}", code);
- }
- break;
- }
- }
- }
-
- Ok(out_buf)
-}
-async fn run_nix_inner_raw_ssh(
- str: String,
- mut cmd: OwningCommand<Arc<Session>>,
- want_stdout: bool,
- err_handler: &mut dyn Handler,
- mut out_handler: Option<&mut dyn Handler>,
-) -> Result<Option<Vec<u8>>> {
- debug!("running command {str:?} over ssh");
- cmd.stderr(openssh::Stdio::piped());
- cmd.stdout(openssh::Stdio::piped());
- let mut child = cmd.spawn().await?;
- let mut stderr = child.stderr().take().unwrap();
- let stdout = child.stdout().take().unwrap();
- let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
- let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));
- let mut ob = want_stdout
- .then(|| out.take().unwrap())
- .unwrap_or_else(|| Box::new(EmptyAsyncRead));
- let mut ol = (!want_stdout)
- .then(|| out.take().unwrap())
- .unwrap_or_else(|| Box::new(EmptyAsyncRead));
- let mut ob = FramedRead::new(&mut ob, BytesCodec::new());
- let mut ol = FramedRead::new(&mut ol, LinesCodec::new());
-
- // while let Some(line) = read.next().await? {}
-
- let mut out_buf = if want_stdout { Some(vec![]) } else { None };
-
- let mut wait_future = pin::pin!(child.wait());
- loop {
- select! {
- e = err.next() => {
- if let Some(e) = e {
- let e = e?;
- err_handler.handle_line(&e);
- }
- },
- o = ob.next() => {
- if let Some(o) = o {
- out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);
- }
- },
- o = ol.next() => {
- if let Some(o) = o {
- let o = o?;
- if let Some(out) = out_handler.as_mut() {
- out.handle_line(&o)
- } else {
- err_handler.handle_line(&o)
- }
- // out_handler.handle_info(&o);
- }
- },
- code = &mut wait_future => {
- let code = code?;
- if !code.success() {
- anyhow::bail!("command '{str}' failed with status {}", code);
- }
- break;
- }
- }
- }
-
- Ok(out_buf)
-}
cmds/fleet/src/fleetdata.rsdiffbeforeafterboth--- a/cmds/fleet/src/fleetdata.rs
+++ /dev/null
@@ -1,107 +0,0 @@
-use std::{
- collections::BTreeMap,
- io::{self, Cursor},
-};
-
-use age::Recipient;
-use chrono::{DateTime, Utc};
-use fleet_shared::SecretData;
-use itertools::Itertools;
-use serde::{de::Error, Deserialize, Serialize};
-
-#[derive(Serialize, Deserialize, Default)]
-#[serde(rename_all = "camelCase")]
-pub struct HostData {
- #[serde(default)]
- #[serde(skip_serializing_if = "String::is_empty")]
- pub encryption_key: String,
-}
-
-const VERSION: &str = "0.1.0";
-pub struct FleetDataVersion;
-impl Serialize for FleetDataVersion {
- fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
- where
- S: serde::Serializer,
- {
- VERSION.serialize(serializer)
- }
-}
-impl<'de> Deserialize<'de> for FleetDataVersion {
- fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
- where
- D: serde::Deserializer<'de>,
- {
- let version = String::deserialize(deserializer)?;
- if version != VERSION {
- return Err(D::Error::custom(format!(
- "fleet.nix data version mismatch, expected {VERSION}, got {version}.\nFollow the docs for migration instruction"
- )));
- }
- Ok(Self)
- }
-}
-
-#[derive(Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct FleetData {
- pub version: FleetDataVersion,
-
- #[serde(default)]
- pub hosts: BTreeMap<String, HostData>,
- #[serde(default)]
- #[serde(skip_serializing_if = "BTreeMap::is_empty")]
- pub shared_secrets: BTreeMap<String, FleetSharedSecret>,
- #[serde(default)]
- #[serde(skip_serializing_if = "BTreeMap::is_empty")]
- pub host_secrets: BTreeMap<String, BTreeMap<String, FleetSecret>>,
-}
-
-#[derive(Serialize, Deserialize, Clone)]
-#[serde(rename_all = "camelCase")]
-#[must_use]
-pub struct FleetSharedSecret {
- pub owners: Vec<String>,
- #[serde(flatten)]
- pub secret: FleetSecret,
-}
-
-/// Returns None if recipients.is_empty()
-pub fn encrypt_secret_data(
- recipients: impl IntoIterator<Item = impl Recipient + Send + 'static>,
- data: Vec<u8>,
-) -> Option<SecretData> {
- let mut encrypted = vec![];
- let recipients = recipients
- .into_iter()
- .map(|v| Box::new(v) as Box<dyn Recipient + Send>)
- .collect_vec();
- let mut encryptor = age::Encryptor::with_recipients(recipients)?
- .wrap_output(&mut encrypted)
- .expect("in memory write");
- io::copy(&mut Cursor::new(data), &mut encryptor).expect("in memory copy");
- encryptor.finish().expect("in memory flush");
- Some(SecretData {
- data: encrypted,
- encrypted: true,
- })
-}
-
-#[derive(Serialize, Deserialize, Clone)]
-pub struct FleetSecretPart {
- pub raw: SecretData,
-}
-
-#[derive(Serialize, Deserialize, Clone)]
-#[serde(rename_all = "camelCase")]
-#[must_use]
-pub struct FleetSecret {
- #[serde(default = "Utc::now")]
- pub created_at: DateTime<Utc>,
- #[serde(default)]
- #[serde(skip_serializing_if = "Option::is_none", alias = "expire_at")]
- pub expires_at: Option<DateTime<Utc>>,
-
- #[serde(flatten)]
- pub parts: BTreeMap<String, FleetSecretPart>,
-}
cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ /dev/null
@@ -1,630 +0,0 @@
-use std::{
- cell::{LazyCell, OnceCell},
- collections::BTreeMap,
- env::current_dir,
- ffi::{OsStr, OsString},
- fmt::Display,
- io::Write,
- ops::Deref,
- path::PathBuf,
- str::FromStr,
- sync::{Arc, Mutex, MutexGuard, OnceLock},
-};
-
-use anyhow::{anyhow, bail, ensure, Context, Result};
-use clap::Parser;
-use fleet_shared::SecretData;
-use nix_eval::{nix_go, nix_go_json, util::assert_warn, NixSessionPool, Value};
-use nom::{
- bytes::complete::take_while1,
- character::complete::char,
- combinator::{map, opt},
- multi::separated_list1,
- sequence::{preceded, separated_pair},
-};
-use openssh::SessionBuilder;
-use serde::de::DeserializeOwned;
-use tempfile::NamedTempFile;
-use tracing::error;
-
-use crate::{
- command::MyCommand,
- fleetdata::{FleetData, FleetSecret, FleetSharedSecret},
-};
-
-pub struct FleetConfigInternals {
- pub local_system: String,
- pub directory: PathBuf,
- pub opts: FleetOpts,
- pub data: Mutex<FleetData>,
- pub nix_args: Vec<OsString>,
- /// fleet_config.config
- pub config_field: Value,
-
- /// import nixpkgs {system = local};
- pub default_pkgs: Value,
-}
-
-#[derive(Clone)]
-pub struct Config(Arc<FleetConfigInternals>);
-
-impl Deref for Config {
- type Target = FleetConfigInternals;
-
- fn deref(&self) -> &Self::Target {
- &self.0
- }
-}
-
-#[derive(Clone, Copy, Debug)]
-pub enum EscalationStrategy {
- Sudo,
- Run0,
- Su,
-}
-
-pub struct ConfigHost {
- config: Config,
- pub name: String,
- pub local: bool,
- pub session: OnceLock<Arc<openssh::Session>>,
- groups: OnceCell<Vec<String>>,
-
- pub host_config: Option<Value>,
- pub nixos_config: OnceCell<Value>,
-}
-impl ConfigHost {
- pub async fn escalation_strategy(&self) -> Result<EscalationStrategy> {
- // Prefer sudo, as run0 has some gotchas with polkit
- // and too many repeating prompts.
- if let Ok(_) = self.find_in_path("sudo").await {
- return Ok(EscalationStrategy::Sudo);
- }
- if let Ok(_) = self.find_in_path("run0").await {
- return Ok(EscalationStrategy::Run0);
- }
- Ok(EscalationStrategy::Su)
- }
- // TOCTOU is possible here in case if config is changed, but this case is not handled anywhere anyway,
- // assuming getting tags always returns the same value.
- pub async fn tags(&self) -> Result<Vec<String>> {
- if let Some(v) = self.groups.get() {
- return Ok(v.clone());
- }
- let Some(host_config) = &self.host_config else {
- return Ok(vec![]);
- };
- let tags: Vec<String> = nix_go_json!(host_config.tags);
-
- let _ = self.groups.set(tags.clone());
-
- Ok(tags)
- }
- pub async fn nixos_config(&self) -> Result<Value> {
- if let Some(v) = self.nixos_config.get() {
- return Ok(v.clone());
- }
- let Some(host_config) = &self.host_config else {
- bail!("local host has no nixos_config");
- };
- let nixos_config = nix_go!(host_config.nixos.config);
- assert_warn("nixos config evaluation", &nixos_config).await?;
-
- let _ = self.nixos_config.set(nixos_config.clone());
-
- Ok(nixos_config)
- }
- async fn open_session(&self) -> Result<Arc<openssh::Session>> {
- assert!(!self.local, "do not open ssh connection to local session");
- // FIXME: TOCTOU
- if let Some(session) = &self.session.get() {
- return Ok((*session).clone());
- };
- let mut session = SessionBuilder::default();
- let session = session
- .connect(&self.name)
- .await
- .map_err(|e| anyhow!("ssh error while connecting to {}: {e}", self.name))?;
- let session = Arc::new(session);
- self.session.set(session.clone()).expect("TOCTOU happened");
- Ok(session)
- }
- pub async fn mktemp_dir(&self) -> Result<String> {
- let mut cmd = self.cmd("mktemp").await?;
- cmd.arg("-d");
- let path = cmd.run_string().await?;
- Ok(path.trim_end().to_owned())
- }
- pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {
- let mut cmd = self.cmd("cat").await?;
- cmd.arg(path);
- cmd.run_bytes().await
- }
- pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {
- let mut cmd = self.cmd("cat").await?;
- cmd.arg(path);
- cmd.run_string().await
- }
- pub async fn read_dir(&self, path: impl AsRef<OsStr>) -> Result<Vec<String>> {
- let mut cmd = self.cmd("ls").await?;
- cmd.arg(path);
- let out = cmd.run_string().await?;
- let mut lines = out.split('\n');
- if let Some(last) = lines.next_back() {
- ensure!(last.is_empty(), "output of ls should end with newline");
- }
- Ok(lines.map(ToOwned::to_owned).collect())
- }
- #[allow(dead_code)]
- pub async fn read_file_json<D: DeserializeOwned>(&self, path: impl AsRef<OsStr>) -> Result<D> {
- let text = self.read_file_text(path).await?;
- Ok(serde_json::from_str(&text)?)
- }
- pub async fn read_env(&self, env: &str) -> Result<String> {
- let mut cmd = self.cmd("printenv").await?;
- cmd.arg(env);
- Ok(cmd.run_string().await?)
- }
- pub async fn find_in_path(&self, command: &str) -> Result<String> {
- // // `which` is not a part of coreutils, and it might not exist on machine.
- // let path = self.read_env("PATH").await?;
- // // Assuming delimiter is :, we don't work with windows host, this check will be much
- // // more sophisticated in remowt backend (and quicker, since actual PATH search will be done on remote machine)
- // for ele in path.split(':') {
- // let test_path = format!("{ele}/{cmd}");
- // test -x etc
- // }
- // let mut cmd = self.cmd("printenv").await?;
- // cmd.arg(env);
- // Ok(cmd.run_string().await?)
- // Assuming this is an environment issue if which doesn't exist, will be fixed with remowt.
- let mut cmd = self
- .cmd_escalation(
- // Not used
- EscalationStrategy::Su,
- "which",
- )
- .await?;
- cmd.arg(command);
- cmd.run_string().await
- }
- pub async fn read_file_value<D: FromStr>(&self, path: impl AsRef<OsStr>) -> Result<D>
- where
- <D as FromStr>::Err: Display,
- {
- let text = self.read_file_text(path).await?;
- D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
- }
- pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
- self.cmd_escalation(self.escalation_strategy().await?, cmd)
- .await
- }
- pub async fn cmd_escalation(
- &self,
- escalation: EscalationStrategy,
- cmd: impl AsRef<OsStr>,
- ) -> Result<MyCommand> {
- if self.local {
- Ok(MyCommand::new(escalation, cmd))
- } else {
- let session = self.open_session().await?;
- Ok(MyCommand::new_on(escalation, cmd, session))
- }
- }
-
- pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
- ensure!(data.encrypted, "secret is not encrypted");
- let mut cmd = self.cmd("fleet-install-secrets").await?;
- cmd.arg("decrypt").eqarg("--secret", data.to_string());
- let encoded = cmd
- .sudo()
- .run_string()
- .await
- .context("failed to call remote host for decrypt")?;
- let data: SecretData = encoded.parse().map_err(|e| anyhow!("{e}"))?;
- ensure!(!data.encrypted, "secret came out encrypted");
- Ok(data.data)
- }
- pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
- ensure!(data.encrypted, "secret is not encrypted");
- let mut cmd = self.cmd("fleet-install-secrets").await?;
- cmd.arg("reencrypt").eqarg("--secret", data.to_string());
- for target in targets {
- let key = self.config.key(&target).await?;
- cmd.eqarg("--targets", key);
- }
- let encoded = cmd
- .sudo()
- .run_string()
- .await
- .context("failed to call remote host for decrypt")?;
- let data: SecretData = encoded.parse().map_err(|e| anyhow!("{e}"))?;
- ensure!(data.encrypted, "secret came out not encrypted");
- Ok(data)
- }
- /// Returns path for futureproofing, as path might change i.e on conversion to CA
- pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
- if self.local {
- // Path is located locally, thus already trusted.
- return Ok(path.to_owned());
- }
- let mut nix = MyCommand::new(
- // Not used
- EscalationStrategy::Su,
- "nix",
- );
- nix.arg("copy")
- .arg("--substitute-on-destination")
- .comparg("--to", format!("ssh-ng://{}", self.name))
- .arg(path);
- nix.run_nix().await.context("nix copy")?;
- Ok(path.to_owned())
- }
- pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
- let mut cmd = self.cmd("systemctl").await?;
- cmd.arg("stop").arg(name);
- cmd.sudo().run().await
- }
- pub async fn systemctl_start(&self, name: &str) -> Result<()> {
- let mut cmd = self.cmd("systemctl").await?;
- cmd.arg("start").arg(name);
- cmd.sudo().run().await
- }
-
- pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
- let mut cmd = self.cmd("rm").await?;
- cmd.arg("-f").arg(path);
- if sudo {
- cmd = cmd.sudo()
- }
- cmd.run().await
- }
-
- pub async fn list_configured_secrets(&self) -> Result<Vec<String>> {
- let nixos = self.nixos_config().await?;
- let secrets = nix_go!(nixos.secrets);
- let mut out = Vec::new();
- for name in secrets.list_fields().await? {
- let secret = nix_go!(secrets[{ name }]);
- let is_shared: bool = nix_go_json!(secret.shared);
- if is_shared {
- continue;
- }
- out.push(name);
- }
- Ok(out)
- }
- pub async fn secret_field(&self, name: &str) -> Result<Value> {
- let nixos = self.nixos_config().await?;
- Ok(nix_go!(nixos.secrets[{ name }]))
- }
-
- /// Packages for this host, resolved with nixpkgs overlays
- pub async fn pkgs(&self) -> Result<Value> {
- let Some(host_config) = &self.host_config else {
- bail!("local host has no host_config");
- };
- // TODO: Should nixos.options be cached?
- Ok(nix_go!(host_config.nixos.options._module.args.value.pkgs))
- }
-}
-
-impl Config {
- pub async fn should_skip(&self, host: &ConfigHost) -> Result<bool> {
- if !self.opts.skip.is_empty() && self.opts.skip.iter().any(|h| h as &str == host.name) {
- return Ok(true);
- }
- if self.opts.only.is_empty() {
- return Ok(false);
- }
- let mut have_group_matches = false;
- for item in self.opts.only.iter() {
- match item {
- HostItem::Host { name, .. } if *name == host.name => {
- return Ok(false);
- }
- HostItem::Tag { .. } => {
- have_group_matches = true;
- }
- _ => {}
- }
- }
- if have_group_matches {
- let host_tags = host.tags().await?;
- for item in self.opts.only.iter() {
- match item {
- HostItem::Tag { name, .. } if host_tags.contains(name) => {
- return Ok(false);
- }
- _ => {}
- }
- }
- }
- Ok(true)
- }
- pub async fn action_attr(&self, host: &ConfigHost, attr: &str) -> Result<Option<String>> {
- if self.opts.only.is_empty() {
- return Ok(None);
- }
- let mut have_group_matches = false;
- for item in self.opts.only.iter() {
- match item {
- HostItem::Host { name, attrs }
- if *name == host.name && attrs.contains_key(attr) =>
- {
- return Ok(attrs.get(attr).cloned());
- }
- HostItem::Tag { attrs, .. } if attrs.contains_key(attr) => {
- have_group_matches = true;
- }
- _ => {}
- }
- }
- if have_group_matches {
- let host_tags = host.tags().await?;
- for item in self.opts.only.iter() {
- match item {
- HostItem::Tag { name, attrs }
- if host_tags.contains(name) && attrs.contains_key(attr) =>
- {
- return Ok(attrs.get(attr).cloned());
- }
- _ => {}
- }
- }
- }
- Ok(None)
- }
- pub fn is_local(&self, host: &str) -> bool {
- self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
- }
-
- pub fn local_host(&self) -> ConfigHost {
- ConfigHost {
- config: self.clone(),
- name: "<virtual localhost>".to_owned(),
- local: true,
- session: OnceLock::new(),
- host_config: None,
- nixos_config: OnceCell::new(),
- groups: {
- let cell = OnceCell::new();
- let _ = cell.set(vec![]);
- cell
- },
- }
- }
-
- pub async fn host(&self, name: &str) -> Result<ConfigHost> {
- let config = &self.config_field;
- let host_config = nix_go!(config.hosts[{ name }]);
-
- Ok(ConfigHost {
- config: self.clone(),
- name: name.to_owned(),
- local: self.is_local(name),
- session: OnceLock::new(),
- host_config: Some(host_config),
- nixos_config: OnceCell::new(),
- groups: OnceCell::new(),
- })
- }
- pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
- let config = &self.config_field;
- let names = nix_go!(config.hosts).list_fields().await?;
- let mut out = vec![];
- for name in names {
- out.push(self.host(&name).await?);
- }
- Ok(out)
- }
- pub async fn system_config(&self, host: &str) -> Result<Value> {
- let fleet_field = &self.config_field;
- Ok(nix_go!(fleet_field.hosts[{ host }].nixos.config))
- }
-
- pub(super) fn data(&self) -> MutexGuard<FleetData> {
- self.data.lock().unwrap()
- }
- pub(super) fn data_mut(&self) -> MutexGuard<FleetData> {
- self.data.lock().unwrap()
- }
- /// Shared secrets configured in fleet.nix or in flake
- pub async fn list_configured_shared(&self) -> Result<Vec<String>> {
- let config_field = &self.config_field;
- Ok(nix_go!(config_field.sharedSecrets).list_fields().await?)
- }
- /// Shared secrets configured in fleet.nix
- pub fn list_shared(&self) -> Vec<String> {
- let data = self.data();
- data.shared_secrets.keys().cloned().collect()
- }
- pub fn has_shared(&self, name: &str) -> bool {
- let data = self.data();
- data.shared_secrets.contains_key(name)
- }
- pub fn replace_shared(&self, name: String, shared: FleetSharedSecret) {
- let mut data = self.data_mut();
- data.shared_secrets.insert(name.to_owned(), shared);
- }
- pub fn remove_shared(&self, secret: &str) {
- let mut data = self.data_mut();
- data.shared_secrets.remove(secret);
- }
-
- pub fn list_secrets(&self, host: &str) -> Vec<String> {
- let data = self.data();
- let Some(secrets) = data.host_secrets.get(host) else {
- return Vec::new();
- };
- secrets.keys().cloned().collect()
- }
-
- pub fn has_secret(&self, host: &str, secret: &str) -> bool {
- let data = self.data();
- let Some(host_secrets) = data.host_secrets.get(host) else {
- return false;
- };
- host_secrets.contains_key(secret)
- }
- pub fn insert_secret(&self, host: &str, secret: String, value: FleetSecret) {
- let mut data = self.data_mut();
- let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();
- host_secrets.insert(secret, value);
- }
-
- pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
- let data = self.data();
- let Some(host_secrets) = data.host_secrets.get(host) else {
- bail!("no secrets for machine {host}");
- };
- let Some(secret) = host_secrets.get(secret) else {
- bail!("machine {host} has no secret {secret}");
- };
- Ok(secret.clone())
- }
- pub fn shared_secret(&self, secret: &str) -> Result<FleetSharedSecret> {
- let data = self.data();
- let Some(secret) = data.shared_secrets.get(secret) else {
- bail!("no shared secret {secret}");
- };
- Ok(secret.clone())
- }
- pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {
- let config_field = &self.config_field;
- Ok(nix_go_json!(
- config_field.sharedSecrets[{ secret }].expectedOwners
- ))
- }
-
- pub fn save(&self) -> Result<()> {
- let mut tempfile = NamedTempFile::new_in(self.directory.clone()).context("failed to create updated version of fleet.nix in the same directory as original.\nDo you have write access to it? Access only to the fleet.nix won't be enough, the directory is used for atomic overwrite operation.\nIt is not recommended to use fleet by root anyway, move fleet project to your home directory.")?;
- let data = nixlike::serialize(&self.data() as &FleetData)?;
- tempfile.write_all(
- format!(
- "# This file contains fleet state and shouldn't be edited by hand\n\n{}\n\n# vim: ts=2 et nowrap\n",
- data
- )
- .as_bytes(),
- )?;
- let mut fleet_data_path = self.directory.clone();
- fleet_data_path.push("fleet.nix");
- tempfile.persist(fleet_data_path)?;
- Ok(())
- }
-}
-
-#[derive(Clone)]
-enum HostItem {
- Host {
- name: String,
- attrs: BTreeMap<String, String>,
- },
- Tag {
- name: String,
- attrs: BTreeMap<String, String>,
- },
-}
-fn host_item_parser(input: &str) -> Result<HostItem, String> {
- fn err_to_string(err: nom::Err<nom::error::Error<&str>>) -> String {
- err.to_string()
- }
-
- let (input, is_tag) = map(opt(char('@')), |c| c.is_some())(input).map_err(err_to_string)?;
- let (input, name) = map(
- take_while1(|v| v != ',' && v != '?' && v != '@'),
- str::to_owned,
- )(input)
- .map_err(err_to_string)?;
-
- let kw_item = separated_pair(
- map(take_while1(|v| v != '&' && v != '='), str::to_owned),
- char('='),
- map(take_while1(|v| v != '&'), str::to_owned),
- );
- let kw = map(separated_list1(char('&'), kw_item), |vec| {
- vec.into_iter().collect::<BTreeMap<_, _>>()
- });
- let mut opt_kw = map(opt(preceded(char('?'), kw)), Option::unwrap_or_default);
-
- let (input, attrs) = opt_kw(input).map_err(err_to_string)?;
-
- if !input.is_empty() {
- return Err(format!("unexpected trailing input: {input:?}"));
- }
- Ok(if is_tag {
- HostItem::Tag { name, attrs }
- } else {
- HostItem::Host { name, attrs }
- })
-}
-
-#[derive(Parser, Clone)]
-pub struct FleetOpts {
- /// All hosts except those would be skipped
- #[clap(long, number_of_values = 1, value_parser = host_item_parser)]
- only: Vec<HostItem>,
-
- /// Hosts to skip
- #[clap(long, number_of_values = 1)]
- skip: Vec<String>,
-
- /// Host, which should be threaten as current machine
- #[clap(long)]
- pub localhost: Option<String>,
-
- /// Override detected system for host, to perform builds via
- /// binfmt-declared qemu instead of trying to crosscompile
- #[clap(long, default_value = "detect")]
- pub local_system: String,
-}
-
-impl FleetOpts {
- pub async fn build(mut self, nix_args: Vec<OsString>) -> Result<Config> {
- if self.localhost.is_none() {
- self.localhost
- .replace(hostname::get().unwrap().to_str().unwrap().to_owned());
- }
- let directory = current_dir()?;
-
- let pool = NixSessionPool::new(directory.as_os_str().to_owned(), nix_args.clone()).await?;
- let root_field = pool.get().await?;
-
- let builtins_field = Value::binding(root_field.clone(), "builtins").await?;
- if self.local_system == "detect" {
- self.local_system = nix_go_json!(builtins_field.currentSystem);
- }
- let local_system = self.local_system.clone();
-
- let mut fleet_data_path = directory.clone();
- fleet_data_path.push("fleet.nix");
- let bytes = std::fs::read_to_string(fleet_data_path)?;
- let data: Mutex<FleetData> = nixlike::parse_str(&bytes)?;
-
- let fleet_root = Value::binding(root_field, "fleetConfigurations").await?;
- let fleet_field = nix_go!(fleet_root.default({ data }));
-
- let config_field = nix_go!(fleet_field.config);
-
- assert_warn("fleet config evaluation", &config_field).await?;
-
- let import = nix_go!(builtins_field.import);
- let overlays = nix_go!(config_field.nixpkgs.overlays);
- let nixpkgs = nix_go!(fleet_field.nixpkgs.buildUsing | import);
-
- let default_pkgs = nix_go!(nixpkgs(Obj {
- overlays,
- system: { self.local_system.clone() },
- }));
-
- Ok(Config(Arc::new(FleetConfigInternals {
- opts: self,
- directory,
- data,
- local_system,
- nix_args,
- config_field,
- default_pkgs,
- })))
- }
-}
cmds/fleet/src/keys.rsdiffbeforeafterboth--- a/cmds/fleet/src/keys.rs
+++ /dev/null
@@ -1,77 +0,0 @@
-use std::str::FromStr;
-
-use age::Recipient;
-use anyhow::{anyhow, Result};
-use futures::{StreamExt, TryStreamExt};
-use itertools::Itertools;
-use tracing::warn;
-
-use crate::host::Config;
-
-impl Config {
- pub fn cached_key(&self, host: &str) -> Option<String> {
- let data = self.data();
- let key = data.hosts.get(host).map(|h| &h.encryption_key);
- if let Some(key) = key {
- if key.is_empty() {
- return None;
- }
- }
- key.cloned()
- }
- pub fn update_key(&self, host: &str, key: String) {
- let mut data = self.data_mut();
- let host = data.hosts.entry(host.to_string()).or_default();
- host.encryption_key = key.trim().to_string();
- }
-
- pub async fn key(&self, host: &str) -> anyhow::Result<String> {
- if let Some(key) = self.cached_key(host) {
- Ok(key)
- } else {
- warn!("Loading key for {}", host);
- let host = self.host(host).await?;
- let mut cmd = host.cmd("cat").await?;
- cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
- let key = cmd.run_string().await?;
- self.update_key(&host.name, key.clone());
- Ok(key)
- }
- }
- /// Insecure, requires root
- pub async fn recipient(&self, host: &str) -> anyhow::Result<impl Recipient> {
- let key = self.key(host).await?;
- age::ssh::Recipient::from_str(&key).map_err(|e| anyhow!("parse recipient error: {:?}", e))
- }
-
- pub async fn recipients(&self, hosts: Vec<String>) -> Result<Vec<impl Recipient>> {
- futures::stream::iter(hosts.iter())
- .then(|m| self.recipient(m.as_ref()))
- .try_collect::<Vec<_>>()
- .await
- }
-
- #[allow(dead_code)]
- pub async fn orphaned_data(&self) -> Result<Vec<String>> {
- let mut out = Vec::new();
- let host_names = self
- .list_hosts()
- .await?
- .into_iter()
- .map(|h| h.name)
- .collect_vec();
- for hostname in self
- .data()
- .hosts
- .iter()
- .filter(|(_, host)| !host.encryption_key.is_empty())
- .map(|(n, _)| n)
- {
- if !host_names.contains(hostname) {
- out.push(hostname.to_owned())
- }
- }
-
- Ok(out)
- }
-}
cmds/fleet/src/main.rsdiffbeforeafterboth--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -2,13 +2,8 @@
#![feature(try_blocks)]
pub(crate) mod cmds;
-pub(crate) mod command;
-pub(crate) mod host;
-pub(crate) mod keys;
-
+// pub(crate) mod command;
pub(crate) mod extra_args;
-
-mod fleetdata;
use std::{ffi::OsString, process::ExitCode};
@@ -21,8 +16,9 @@
secrets::Secret,
tf::Tf,
};
+use fleet_base::{host::Config, opts::FleetOpts};
use futures::{future::LocalBoxFuture, stream::FuturesUnordered, TryStreamExt};
-use host::{Config, FleetOpts};
+// use host::Config;
#[cfg(feature = "indicatif")]
use human_repr::HumanCount;
#[cfg(feature = "indicatif")]
@@ -31,8 +27,6 @@
#[cfg(feature = "indicatif")]
use tracing_indicatif::IndicatifLayer;
use tracing_subscriber::{prelude::*, EnvFilter};
-
-use crate::command::MyCommand;
#[derive(Parser)]
struct Prefetch {}
@@ -88,6 +82,7 @@
#[clap(hide(true))]
Complete(Complete),
/// Compile and evaluate terranix configuration
+ #[clap(subcommand)]
Tf(Tf),
}
@@ -100,11 +95,11 @@
command: Opts,
}
-async fn run_command(config: &Config, command: Opts) -> Result<()> {
+async fn run_command(config: &Config, opts: FleetOpts, command: Opts) -> Result<()> {
match command {
- Opts::BuildSystems(c) => c.run(config).await?,
- Opts::Deploy(d) => d.run(config).await?,
- Opts::Secret(s) => s.run(config).await?,
+ Opts::BuildSystems(c) => c.run(config, &opts).await?,
+ Opts::Deploy(d) => d.run(config, &opts).await?,
+ Opts::Secret(s) => s.run(config, &opts).await?,
Opts::Info(i) => i.run(config).await?,
Opts::Prefetch(p) => p.run(config).await?,
Opts::Tf(t) => t.run(config).await?,
@@ -211,7 +206,7 @@
.unwrap_or_default();
let config = opts.fleet_opts.build(nix_args).await?;
- match run_command(&config, opts.command).await {
+ match run_command(&config, opts.fleet_opts, opts.command).await {
Ok(()) => {
config.save()?;
Ok(())
crates/fleet-base/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/crates/fleet-base/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+name = "fleet-base"
+edition = "2021"
+version.workspace = true
+
+[dependencies]
+age.workspace = true
+anyhow.workspace = true
+better-command.workspace = true
+chrono = "0.4.38"
+clap = { workspace = true, features = ["derive"] }
+fleet-shared.workspace = true
+futures = "0.3.30"
+hostname = "0.4.0"
+itertools = "0.13.0"
+nix-eval.workspace = true
+nixlike.workspace = true
+nom = "7.1.3"
+openssh = "0.11.0"
+serde.workspace = true
+serde_json = "1.0.127"
+tempfile.workspace = true
+tokio.workspace = true
+tokio-util = "0.7.11"
+tracing.workspace = true
crates/fleet-base/src/command.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/fleet-base/src/command.rs
@@ -0,0 +1,430 @@
+use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};
+
+use anyhow::{anyhow, Result};
+use better_command::{Handler, NixHandler, PlainHandler};
+use futures::StreamExt;
+use itertools::Either;
+use openssh::{OverSsh, OwningCommand, Session};
+use tokio::{io::AsyncRead, process::Command, select};
+use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
+use tracing::debug;
+
+use crate::host::EscalationStrategy;
+
+fn escape_bash(input: &str, out: &mut String) {
+ const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
+ if input.chars().all(|c| !TO_ESCAPE.contains(c)) {
+ out.push_str(input);
+ return;
+ }
+ out.push('\'');
+ for (i, v) in input.split('\'').enumerate() {
+ if i != 0 {
+ out.push_str("'\"'\"'");
+ }
+ out.push_str(v);
+ }
+ out.push('\'');
+}
+fn ostoutf8(os: impl AsRef<OsStr>) -> String {
+ os.as_ref().to_str().expect("non-utf8 data").to_owned()
+}
+
+#[derive(Clone, Debug)]
+pub struct MyCommand {
+ command: String,
+ args: Vec<String>,
+ env: Vec<(String, String)>,
+ ssh_session: Option<Arc<Session>>,
+ escalation: EscalationStrategy,
+ escalate: bool,
+}
+impl MyCommand {
+ pub fn new_on(
+ escalation: EscalationStrategy,
+ cmd: impl AsRef<OsStr>,
+ session: Arc<Session>,
+ ) -> Self {
+ assert!(!cmd.as_ref().is_empty());
+ Self {
+ command: ostoutf8(cmd),
+ args: vec![],
+ env: vec![],
+ ssh_session: Some(session),
+ escalation,
+ escalate: false,
+ }
+ }
+ pub fn new(escalation: EscalationStrategy, cmd: impl AsRef<OsStr>) -> Self {
+ assert!(!cmd.as_ref().is_empty());
+ Self {
+ command: ostoutf8(cmd),
+ args: vec![],
+ env: vec![],
+ ssh_session: None,
+ escalation,
+ escalate: false,
+ }
+ }
+ fn new_here(&self, cmd: impl AsRef<OsStr>) -> Self {
+ if let Some(ssh_session) = self.ssh_session.clone() {
+ Self::new_on(self.escalation, cmd, ssh_session)
+ } else {
+ Self::new(self.escalation, cmd)
+ }
+ }
+
+ fn into_args(self) -> Vec<String> {
+ let mut out = Vec::new();
+ if !self.env.is_empty() {
+ out.push("env".to_owned());
+ for (k, v) in self.env {
+ assert!(!k.contains('='));
+ out.push(format!("{k}={v}"));
+ }
+ }
+ out.push(self.command);
+ out.extend(self.args);
+ out
+ }
+
+ /// Translates environment variables into env command execution.
+ /// Required for ssh, as ssh don't allow to send environment variables (at least by default).
+ ///
+ /// FIXME: Insecure, as arguments might be seen by other users on the same machine.
+ /// Figure out some way to transfer environment using stdio?
+ fn translate_env_into_env(self) -> Self {
+ if self.env.is_empty() {
+ return self;
+ }
+ let mut out = self.new_here("env");
+ for (k, v) in self.env {
+ assert!(!k.contains('='));
+ out.arg(format!("{k}={v}"));
+ }
+ out.arg(self.command);
+ out.args(self.args);
+
+ out
+ }
+ fn into_string(self) -> String {
+ let mut out = String::new();
+ if !self.env.is_empty() {
+ out.push_str("env");
+ for (k, v) in self.env {
+ out.push(' ');
+ assert!(!k.contains('='));
+ escape_bash(&k, &mut out);
+ out.push('=');
+ escape_bash(&v, &mut out);
+ }
+ }
+ if !out.is_empty() {
+ out.push(' ');
+ }
+ escape_bash(&self.command, &mut out);
+ for arg in self.args {
+ out.push(' ');
+ escape_bash(&arg, &mut out);
+ }
+ out
+ }
+ fn into_command(self) -> Command {
+ let mut out = Command::new(self.command);
+ out.args(self.args);
+ for (k, v) in self.env {
+ out.env(k, v);
+ }
+ out
+ }
+ fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {
+ Ok(if let Some(session) = self.ssh_session.clone() {
+ let cmd = self.translate_env_into_env().into_command();
+ Either::Right(
+ cmd.over_ssh(session)
+ .map_err(|e| anyhow!("ssh error: {e}"))?,
+ )
+ } else {
+ let cmd = self.into_command();
+ Either::Left(cmd)
+ })
+ }
+ pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {
+ let arg = arg.as_ref();
+ self.args.push(ostoutf8(arg));
+ self
+ }
+ pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {
+ let arg = arg.as_ref();
+ let value = value.as_ref();
+ let arg = ostoutf8(arg);
+ let value = ostoutf8(value);
+ self.arg(format!("{arg}={value}"));
+ self
+ }
+ pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {
+ self.arg(arg);
+ self.arg(value);
+ self
+ }
+ pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
+ self.env
+ .push((name.as_ref().to_owned(), value.as_ref().to_owned()));
+ self
+ }
+ pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
+ for arg in args.into_iter() {
+ let arg = arg.as_ref();
+ self.args.push(ostoutf8(arg));
+ }
+ self
+ }
+ pub fn sudo(mut self) -> Self {
+ self.escalate = true;
+ self
+ }
+ fn wrap_sudo_if_needed(self) -> Self {
+ if !self.escalate {
+ return self;
+ }
+ match self.escalation {
+ EscalationStrategy::Su => {
+ let mut out = self.new_here("su");
+ out.arg("-c").arg(self.into_string());
+ out
+ }
+ EscalationStrategy::Sudo => {
+ let mut out = self.new_here("sudo");
+ out.args(self.into_args());
+ out
+ }
+ EscalationStrategy::Run0 => {
+ // run0 wants interactive authentication by default.
+ let mut run0 = self.new_here("run0");
+ let mut out = self.new_here("script");
+
+ // Red backgrounds messes with fleet formatting
+ run0.arg("--background=");
+ run0.args(self.into_args());
+
+ out.arg("-q");
+ out.arg("/dev/null");
+ out.arg("-c");
+ out.arg(run0.into_string());
+ dbg!(&out);
+ out
+ }
+ }
+ }
+
+ pub async fn run(self) -> Result<()> {
+ let str = self.clone().into_string();
+ let cmd = self.wrap_sudo_if_needed().into_command_new()?;
+ match cmd {
+ Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,
+ Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,
+ };
+ Ok(())
+ }
+ pub async fn run_string(self) -> Result<String> {
+ let bytes = self.run_bytes().await?;
+ Ok(String::from_utf8(bytes)?)
+ }
+ pub async fn run_bytes(self) -> Result<Vec<u8>> {
+ let str = self.clone().into_string();
+ let cmd = self.wrap_sudo_if_needed().into_command_new()?;
+ let v = match cmd {
+ Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,
+ Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,
+ };
+ Ok(v)
+ }
+
+ pub async fn run_nix_string(mut self) -> Result<String> {
+ let str = self.clone().into_string();
+ self.arg("--log-format").arg("internal-json");
+ let cmd = self.wrap_sudo_if_needed().into_command();
+ let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;
+ Ok(String::from_utf8(bytes)?)
+ }
+ pub async fn run_nix(mut self) -> Result<()> {
+ let str = self.clone().into_string();
+ self.arg("--log-format").arg("internal-json");
+ let mut cmd = self.wrap_sudo_if_needed().into_command();
+ cmd.stdout(Stdio::inherit());
+ run_nix_inner(str, cmd, &mut NixHandler::default()).await
+ }
+}
+
+struct EmptyAsyncRead;
+impl AsyncRead for EmptyAsyncRead {
+ fn poll_read(
+ self: std::pin::Pin<&mut Self>,
+ _cx: &mut std::task::Context<'_>,
+ _buf: &mut tokio::io::ReadBuf<'_>,
+ ) -> Poll<std::io::Result<()>> {
+ Poll::Pending
+ }
+}
+
+async fn run_nix_inner_stdout(
+ str: String,
+ cmd: Command,
+ handler: &mut dyn Handler,
+) -> Result<Vec<u8>> {
+ Ok(run_nix_inner_raw(str, cmd, true, handler, None)
+ .await?
+ .expect("has out"))
+}
+async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {
+ let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;
+ assert!(v.is_none());
+ Ok(())
+}
+async fn run_nix_inner_stdout_ssh(
+ str: String,
+ cmd: OwningCommand<Arc<Session>>,
+ handler: &mut dyn Handler,
+) -> Result<Vec<u8>> {
+ Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)
+ .await?
+ .expect("has out"))
+}
+async fn run_nix_inner_ssh(
+ str: String,
+ cmd: OwningCommand<Arc<Session>>,
+ handler: &mut dyn Handler,
+) -> Result<()> {
+ let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
+ assert!(v.is_none());
+ Ok(())
+}
+
+async fn run_nix_inner_raw(
+ str: String,
+ mut cmd: Command,
+ want_stdout: bool,
+ err_handler: &mut dyn Handler,
+ mut out_handler: Option<&mut dyn Handler>,
+) -> Result<Option<Vec<u8>>> {
+ cmd.stderr(Stdio::piped());
+ cmd.stdout(Stdio::piped());
+ debug!("running command {str:?} on local");
+ let mut child = cmd.spawn()?;
+ let mut stderr = child.stderr.take().unwrap();
+ let stdout = child.stdout.take().unwrap();
+ let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
+ let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));
+ let mut ob = want_stdout
+ .then(|| out.take().unwrap())
+ .unwrap_or_else(|| Box::new(EmptyAsyncRead));
+ let mut ol = (!want_stdout)
+ .then(|| out.take().unwrap())
+ .unwrap_or_else(|| Box::new(EmptyAsyncRead));
+ let mut ob = FramedRead::new(&mut ob, BytesCodec::new());
+ let mut ol = FramedRead::new(&mut ol, LinesCodec::new());
+
+ // while let Some(line) = read.next().await? {}
+
+ let mut out_buf = if want_stdout { Some(vec![]) } else { None };
+ loop {
+ select! {
+ e = err.next() => {
+ if let Some(e) = e {
+ let e = e?;
+ err_handler.handle_line(&e);
+ }
+ },
+ o = ob.next() => {
+ if let Some(o) = o {
+ out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);
+ }
+ },
+ o = ol.next() => {
+ if let Some(o) = o {
+ let o = o?;
+ if let Some(out) = out_handler.as_mut() {
+ out.handle_line(&o)
+ } else {
+ err_handler.handle_line(&o)
+ }
+ // out_handler.handle_info(&o);
+ }
+ },
+ code = child.wait() => {
+ let code = code?;
+ if !code.success() {
+ anyhow::bail!("command '{str}' failed with status {}", code);
+ }
+ break;
+ }
+ }
+ }
+
+ Ok(out_buf)
+}
+async fn run_nix_inner_raw_ssh(
+ str: String,
+ mut cmd: OwningCommand<Arc<Session>>,
+ want_stdout: bool,
+ err_handler: &mut dyn Handler,
+ mut out_handler: Option<&mut dyn Handler>,
+) -> Result<Option<Vec<u8>>> {
+ debug!("running command {str:?} over ssh");
+ cmd.stderr(openssh::Stdio::piped());
+ cmd.stdout(openssh::Stdio::piped());
+ let mut child = cmd.spawn().await?;
+ let mut stderr = child.stderr().take().unwrap();
+ let stdout = child.stdout().take().unwrap();
+ let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
+ let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));
+ let mut ob = want_stdout
+ .then(|| out.take().unwrap())
+ .unwrap_or_else(|| Box::new(EmptyAsyncRead));
+ let mut ol = (!want_stdout)
+ .then(|| out.take().unwrap())
+ .unwrap_or_else(|| Box::new(EmptyAsyncRead));
+ let mut ob = FramedRead::new(&mut ob, BytesCodec::new());
+ let mut ol = FramedRead::new(&mut ol, LinesCodec::new());
+
+ // while let Some(line) = read.next().await? {}
+
+ let mut out_buf = if want_stdout { Some(vec![]) } else { None };
+
+ let mut wait_future = pin::pin!(child.wait());
+ loop {
+ select! {
+ e = err.next() => {
+ if let Some(e) = e {
+ let e = e?;
+ err_handler.handle_line(&e);
+ }
+ },
+ o = ob.next() => {
+ if let Some(o) = o {
+ out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);
+ }
+ },
+ o = ol.next() => {
+ if let Some(o) = o {
+ let o = o?;
+ if let Some(out) = out_handler.as_mut() {
+ out.handle_line(&o)
+ } else {
+ err_handler.handle_line(&o)
+ }
+ // out_handler.handle_info(&o);
+ }
+ },
+ code = &mut wait_future => {
+ let code = code?;
+ if !code.success() {
+ anyhow::bail!("command '{str}' failed with status {}", code);
+ }
+ break;
+ }
+ }
+ }
+
+ Ok(out_buf)
+}
crates/fleet-base/src/fleetdata.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/fleet-base/src/fleetdata.rs
@@ -0,0 +1,113 @@
+use std::{
+ collections::BTreeMap,
+ io::{self, Cursor},
+};
+
+use age::Recipient;
+use chrono::{DateTime, Utc};
+use fleet_shared::SecretData;
+use itertools::Itertools;
+use serde::{de::Error, Deserialize, Serialize};
+use serde_json::Value;
+
+#[derive(Serialize, Deserialize, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct HostData {
+ #[serde(default)]
+ #[serde(skip_serializing_if = "String::is_empty")]
+ pub encryption_key: String,
+}
+
+const VERSION: &str = "0.1.0";
+pub struct FleetDataVersion;
+impl Serialize for FleetDataVersion {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: serde::Serializer,
+ {
+ VERSION.serialize(serializer)
+ }
+}
+impl<'de> Deserialize<'de> for FleetDataVersion {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ let version = String::deserialize(deserializer)?;
+ if version != VERSION {
+ return Err(D::Error::custom(format!(
+ "fleet.nix data version mismatch, expected {VERSION}, got {version}.\nFollow the docs for migration instruction"
+ )));
+ }
+ Ok(Self)
+ }
+}
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct FleetData {
+ pub version: FleetDataVersion,
+
+ #[serde(default)]
+ pub hosts: BTreeMap<String, HostData>,
+ #[serde(default)]
+ #[serde(skip_serializing_if = "BTreeMap::is_empty")]
+ pub shared_secrets: BTreeMap<String, FleetSharedSecret>,
+ #[serde(default)]
+ #[serde(skip_serializing_if = "BTreeMap::is_empty")]
+ pub host_secrets: BTreeMap<String, BTreeMap<String, FleetSecret>>,
+
+ // extra_name => anything
+ #[serde(default)]
+ #[serde(skip_serializing_if = "BTreeMap::is_empty")]
+ pub extra: BTreeMap<String, Value>,
+}
+
+#[derive(Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+#[must_use]
+pub struct FleetSharedSecret {
+ pub owners: Vec<String>,
+ #[serde(flatten)]
+ pub secret: FleetSecret,
+}
+
+/// Returns None if recipients.is_empty()
+pub fn encrypt_secret_data(
+ recipients: impl IntoIterator<Item = impl Recipient + Send + 'static>,
+ data: Vec<u8>,
+) -> Option<SecretData> {
+ let mut encrypted = vec![];
+ let recipients = recipients
+ .into_iter()
+ .map(|v| Box::new(v) as Box<dyn Recipient + Send>)
+ .collect_vec();
+ let mut encryptor = age::Encryptor::with_recipients(recipients)?
+ .wrap_output(&mut encrypted)
+ .expect("in memory write");
+ io::copy(&mut Cursor::new(data), &mut encryptor).expect("in memory copy");
+ encryptor.finish().expect("in memory flush");
+ Some(SecretData {
+ data: encrypted,
+ encrypted: true,
+ })
+}
+
+#[derive(Serialize, Deserialize, Clone)]
+pub struct FleetSecretPart {
+ pub raw: SecretData,
+}
+
+#[derive(Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+#[must_use]
+pub struct FleetSecret {
+ #[serde(default = "Utc::now")]
+ pub created_at: DateTime<Utc>,
+ #[serde(default)]
+ #[serde(skip_serializing_if = "Option::is_none", alias = "expire_at")]
+ pub expires_at: Option<DateTime<Utc>>,
+
+ #[serde(flatten)]
+ pub parts: BTreeMap<String, FleetSecretPart>,
+}
crates/fleet-base/src/host.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/fleet-base/src/host.rs
@@ -0,0 +1,452 @@
+use std::{
+ cell::OnceCell,
+ ffi::{OsStr, OsString},
+ fmt::Display,
+ io::Write,
+ ops::Deref,
+ path::PathBuf,
+ str::FromStr,
+ sync::{Arc, Mutex, MutexGuard, OnceLock},
+};
+
+use anyhow::{anyhow, bail, ensure, Context, Result};
+use fleet_shared::SecretData;
+use nix_eval::{nix_go, nix_go_json, util::assert_warn, Value};
+use openssh::SessionBuilder;
+use serde::de::DeserializeOwned;
+use tempfile::NamedTempFile;
+
+use crate::{
+ command::MyCommand,
+ fleetdata::{FleetData, FleetSecret, FleetSharedSecret},
+};
+
+pub struct FleetConfigInternals {
+ pub local_system: String,
+ pub directory: PathBuf,
+ pub data: Mutex<FleetData>,
+ pub nix_args: Vec<OsString>,
+ /// fleet_config.config
+ pub config_field: Value,
+ // TODO: Remove with connectivity refactor
+ pub localhost: String,
+
+ /// import nixpkgs {system = local};
+ pub default_pkgs: Value,
+}
+
+// TODO: Make field not pub
+#[derive(Clone)]
+pub struct Config(pub Arc<FleetConfigInternals>);
+
+impl Deref for Config {
+ type Target = FleetConfigInternals;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+#[derive(Clone, Copy, Debug)]
+pub enum EscalationStrategy {
+ Sudo,
+ Run0,
+ Su,
+}
+
+pub struct ConfigHost {
+ config: Config,
+ pub name: String,
+ groups: OnceCell<Vec<String>>,
+
+ pub host_config: Option<Value>,
+ pub nixos_config: OnceCell<Value>,
+
+ // TODO: Move command helpers away with connectivity refactor
+ pub local: bool,
+ pub session: OnceLock<Arc<openssh::Session>>,
+}
+// TODO: Move command helpers away with connectivity refactor
+impl ConfigHost {
+ pub async fn escalation_strategy(&self) -> Result<EscalationStrategy> {
+ // Prefer sudo, as run0 has some gotchas with polkit
+ // and too many repeating prompts.
+ if (self.find_in_path("sudo").await).is_ok() {
+ return Ok(EscalationStrategy::Sudo);
+ }
+ if (self.find_in_path("run0").await).is_ok() {
+ return Ok(EscalationStrategy::Run0);
+ }
+ Ok(EscalationStrategy::Su)
+ }
+ async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ assert!(!self.local, "do not open ssh connection to local session");
+ // FIXME: TOCTOU
+ if let Some(session) = &self.session.get() {
+ return Ok((*session).clone());
+ };
+ let session = SessionBuilder::default();
+ let session = session
+ .connect(&self.name)
+ .await
+ .map_err(|e| anyhow!("ssh error while connecting to {}: {e}", self.name))?;
+ let session = Arc::new(session);
+ self.session.set(session.clone()).expect("TOCTOU happened");
+ Ok(session)
+ }
+ pub async fn mktemp_dir(&self) -> Result<String> {
+ let mut cmd = self.cmd("mktemp").await?;
+ cmd.arg("-d");
+ let path = cmd.run_string().await?;
+ Ok(path.trim_end().to_owned())
+ }
+ pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {
+ let mut cmd = self.cmd("cat").await?;
+ cmd.arg(path);
+ cmd.run_bytes().await
+ }
+ pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {
+ let mut cmd = self.cmd("cat").await?;
+ cmd.arg(path);
+ cmd.run_string().await
+ }
+ pub async fn read_dir(&self, path: impl AsRef<OsStr>) -> Result<Vec<String>> {
+ let mut cmd = self.cmd("ls").await?;
+ cmd.arg(path);
+ let out = cmd.run_string().await?;
+ let mut lines = out.split('\n');
+ if let Some(last) = lines.next_back() {
+ ensure!(last.is_empty(), "output of ls should end with newline");
+ }
+ Ok(lines.map(ToOwned::to_owned).collect())
+ }
+ #[allow(dead_code)]
+ pub async fn read_file_json<D: DeserializeOwned>(&self, path: impl AsRef<OsStr>) -> Result<D> {
+ let text = self.read_file_text(path).await?;
+ Ok(serde_json::from_str(&text)?)
+ }
+ pub async fn read_env(&self, env: &str) -> Result<String> {
+ let mut cmd = self.cmd("printenv").await?;
+ cmd.arg(env);
+ cmd.run_string().await
+ }
+ pub async fn find_in_path(&self, command: &str) -> Result<String> {
+ // // `which` is not a part of coreutils, and it might not exist on machine.
+ // let path = self.read_env("PATH").await?;
+ // // Assuming delimiter is :, we don't work with windows host, this check will be much
+ // // more sophisticated in remowt backend (and quicker, since actual PATH search will be done on remote machine)
+ // for ele in path.split(':') {
+ // let test_path = format!("{ele}/{cmd}");
+ // test -x etc
+ // }
+ // let mut cmd = self.cmd("printenv").await?;
+ // cmd.arg(env);
+ // Ok(cmd.run_string().await?)
+ // Assuming this is an environment issue if which doesn't exist, will be fixed with remowt.
+ let mut cmd = self
+ .cmd_escalation(
+ // Not used
+ EscalationStrategy::Su,
+ "which",
+ )
+ .await?;
+ cmd.arg(command);
+ cmd.run_string().await
+ }
+ pub async fn read_file_value<D: FromStr>(&self, path: impl AsRef<OsStr>) -> Result<D>
+ where
+ <D as FromStr>::Err: Display,
+ {
+ let text = self.read_file_text(path).await?;
+ D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
+ }
+ pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
+ self.cmd_escalation(self.escalation_strategy().await?, cmd)
+ .await
+ }
+ pub async fn cmd_escalation(
+ &self,
+ escalation: EscalationStrategy,
+ cmd: impl AsRef<OsStr>,
+ ) -> Result<MyCommand> {
+ if self.local {
+ Ok(MyCommand::new(escalation, cmd))
+ } else {
+ let session = self.open_session().await?;
+ Ok(MyCommand::new_on(escalation, cmd, session))
+ }
+ }
+
+ pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
+ ensure!(data.encrypted, "secret is not encrypted");
+ let mut cmd = self.cmd("fleet-install-secrets").await?;
+ cmd.arg("decrypt").eqarg("--secret", data.to_string());
+ let encoded = cmd
+ .sudo()
+ .run_string()
+ .await
+ .context("failed to call remote host for decrypt")?;
+ let data: SecretData = encoded.parse().map_err(|e| anyhow!("{e}"))?;
+ ensure!(!data.encrypted, "secret came out encrypted");
+ Ok(data.data)
+ }
+ pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
+ ensure!(data.encrypted, "secret is not encrypted");
+ let mut cmd = self.cmd("fleet-install-secrets").await?;
+ cmd.arg("reencrypt").eqarg("--secret", data.to_string());
+ for target in targets {
+ let key = self.config.key(&target).await?;
+ cmd.eqarg("--targets", key);
+ }
+ let encoded = cmd
+ .sudo()
+ .run_string()
+ .await
+ .context("failed to call remote host for decrypt")?;
+ let data: SecretData = encoded.parse().map_err(|e| anyhow!("{e}"))?;
+ ensure!(data.encrypted, "secret came out not encrypted");
+ Ok(data)
+ }
+ /// Returns path for futureproofing, as path might change i.e on conversion to CA
+ pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
+ if self.local {
+ // Path is located locally, thus already trusted.
+ return Ok(path.to_owned());
+ }
+ let mut nix = MyCommand::new(
+ // Not used
+ EscalationStrategy::Su,
+ "nix",
+ );
+ nix.arg("copy")
+ .arg("--substitute-on-destination")
+ .comparg("--to", format!("ssh-ng://{}", self.name))
+ .arg(path);
+ nix.run_nix().await.context("nix copy")?;
+ Ok(path.to_owned())
+ }
+ pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("stop").arg(name);
+ cmd.sudo().run().await
+ }
+ pub async fn systemctl_start(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("start").arg(name);
+ cmd.sudo().run().await
+ }
+
+ pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
+ let mut cmd = self.cmd("rm").await?;
+ cmd.arg("-f").arg(path);
+ if sudo {
+ cmd = cmd.sudo()
+ }
+ cmd.run().await
+ }
+}
+impl ConfigHost {
+ // TOCTOU is possible here in case if config is changed, but this case is not handled anywhere anyway,
+ // assuming getting tags always returns the same value.
+ pub async fn tags(&self) -> Result<Vec<String>> {
+ if let Some(v) = self.groups.get() {
+ return Ok(v.clone());
+ }
+ let Some(host_config) = &self.host_config else {
+ return Ok(vec![]);
+ };
+ let tags: Vec<String> = nix_go_json!(host_config.tags);
+
+ let _ = self.groups.set(tags.clone());
+
+ Ok(tags)
+ }
+ pub async fn nixos_config(&self) -> Result<Value> {
+ if let Some(v) = self.nixos_config.get() {
+ return Ok(v.clone());
+ }
+ let Some(host_config) = &self.host_config else {
+ bail!("local host has no nixos_config");
+ };
+ let nixos_config = nix_go!(host_config.nixos.config);
+ assert_warn("nixos config evaluation", &nixos_config).await?;
+
+ let _ = self.nixos_config.set(nixos_config.clone());
+
+ Ok(nixos_config)
+ }
+
+ pub async fn list_configured_secrets(&self) -> Result<Vec<String>> {
+ let nixos = self.nixos_config().await?;
+ let secrets = nix_go!(nixos.secrets);
+ let mut out = Vec::new();
+ for name in secrets.list_fields().await? {
+ let secret = nix_go!(secrets[{ name }]);
+ let is_shared: bool = nix_go_json!(secret.shared);
+ if is_shared {
+ continue;
+ }
+ out.push(name);
+ }
+ Ok(out)
+ }
+ pub async fn secret_field(&self, name: &str) -> Result<Value> {
+ let nixos = self.nixos_config().await?;
+ Ok(nix_go!(nixos.secrets[{ name }]))
+ }
+
+ /// Packages for this host, resolved with nixpkgs overlays
+ pub async fn pkgs(&self) -> Result<Value> {
+ let Some(host_config) = &self.host_config else {
+ bail!("local host has no host_config");
+ };
+ // TODO: Should nixos.options be cached?
+ Ok(nix_go!(host_config.nixos.options._module.args.value.pkgs))
+ }
+}
+
+impl Config {
+ pub fn local_host(&self) -> ConfigHost {
+ ConfigHost {
+ config: self.clone(),
+ name: "<virtual localhost>".to_owned(),
+ local: true,
+ session: OnceLock::new(),
+ host_config: None,
+ nixos_config: OnceCell::new(),
+ groups: {
+ let cell = OnceCell::new();
+ let _ = cell.set(vec![]);
+ cell
+ },
+ }
+ }
+
+ pub async fn host(&self, name: &str) -> Result<ConfigHost> {
+ let config = &self.config_field;
+ let host_config = nix_go!(config.hosts[{ name }]);
+
+ Ok(ConfigHost {
+ config: self.clone(),
+ name: name.to_owned(),
+ host_config: Some(host_config),
+ nixos_config: OnceCell::new(),
+ groups: OnceCell::new(),
+
+ // TODO: Remove with connectivit refactor
+ local: self.localhost == name,
+ session: OnceLock::new(),
+ })
+ }
+ pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
+ let config = &self.config_field;
+ let names = nix_go!(config.hosts).list_fields().await?;
+ let mut out = vec![];
+ for name in names {
+ out.push(self.host(&name).await?);
+ }
+ Ok(out)
+ }
+ // TODO: Replace usages with .host().nixos_config
+ pub async fn system_config(&self, host: &str) -> Result<Value> {
+ let fleet_field = &self.config_field;
+ Ok(nix_go!(fleet_field.hosts[{ host }].nixos.config))
+ }
+
+ /// Shared secrets configured in fleet.nix or in flake
+ pub async fn list_configured_shared(&self) -> Result<Vec<String>> {
+ let config_field = &self.config_field;
+ Ok(nix_go!(config_field.sharedSecrets).list_fields().await?)
+ }
+ /// Shared secrets configured in fleet.nix
+ pub fn list_shared(&self) -> Vec<String> {
+ let data = self.data();
+ data.shared_secrets.keys().cloned().collect()
+ }
+ pub fn has_shared(&self, name: &str) -> bool {
+ let data = self.data();
+ data.shared_secrets.contains_key(name)
+ }
+ pub fn replace_shared(&self, name: String, shared: FleetSharedSecret) {
+ let mut data = self.data_mut();
+ data.shared_secrets.insert(name.to_owned(), shared);
+ }
+ pub fn remove_shared(&self, secret: &str) {
+ let mut data = self.data_mut();
+ data.shared_secrets.remove(secret);
+ }
+
+ pub fn list_secrets(&self, host: &str) -> Vec<String> {
+ let data = self.data();
+ let Some(secrets) = data.host_secrets.get(host) else {
+ return Vec::new();
+ };
+ secrets.keys().cloned().collect()
+ }
+
+ pub fn has_secret(&self, host: &str, secret: &str) -> bool {
+ let data = self.data();
+ let Some(host_secrets) = data.host_secrets.get(host) else {
+ return false;
+ };
+ host_secrets.contains_key(secret)
+ }
+ pub fn insert_secret(&self, host: &str, secret: String, value: FleetSecret) {
+ let mut data = self.data_mut();
+ let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();
+ host_secrets.insert(secret, value);
+ }
+
+ pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
+ let data = self.data();
+ let Some(host_secrets) = data.host_secrets.get(host) else {
+ bail!("no secrets for machine {host}");
+ };
+ let Some(secret) = host_secrets.get(secret) else {
+ bail!("machine {host} has no secret {secret}");
+ };
+ Ok(secret.clone())
+ }
+ pub fn shared_secret(&self, secret: &str) -> Result<FleetSharedSecret> {
+ let data = self.data();
+ let Some(secret) = data.shared_secrets.get(secret) else {
+ bail!("no shared secret {secret}");
+ };
+ Ok(secret.clone())
+ }
+ pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {
+ let config_field = &self.config_field;
+ Ok(nix_go_json!(
+ config_field.sharedSecrets[{ secret }].expectedOwners
+ ))
+ }
+
+ // TODO: Should this be something modifiable from other processes?
+ // E.g terraform provider might want to update FleetData (e.g secrets),
+ // and current implementation assumes only one process holds current fleet.nix
+ // Given that it is no longer needs to be a file for nix evaluation,
+ // maybe it can be a .nix file for persistence, but accessible only
+ // thru some shared state controller? Might it be stored in terraform
+ // state provider?
+ pub fn data(&self) -> MutexGuard<FleetData> {
+ self.data.lock().unwrap()
+ }
+ pub fn data_mut(&self) -> MutexGuard<FleetData> {
+ self.data.lock().unwrap()
+ }
+ pub fn save(&self) -> Result<()> {
+ let mut tempfile = NamedTempFile::new_in(self.directory.clone()).context("failed to create updated version of fleet.nix in the same directory as original.\nDo you have write access to it? Access only to the fleet.nix won't be enough, the directory is used for atomic overwrite operation.\nIt is not recommended to use fleet by root anyway, move fleet project to your home directory.")?;
+ let data = nixlike::serialize(&self.data() as &FleetData)?;
+ tempfile.write_all(
+ format!(
+ "# This file contains fleet state and shouldn't be edited by hand\n\n{}\n\n# vim: ts=2 et nowrap\n",
+ data
+ )
+ .as_bytes(),
+ )?;
+ let mut fleet_data_path = self.directory.clone();
+ fleet_data_path.push("fleet.nix");
+ tempfile.persist(fleet_data_path)?;
+ Ok(())
+ }
+}
crates/fleet-base/src/keys.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/fleet-base/src/keys.rs
@@ -0,0 +1,77 @@
+use std::str::FromStr as _;
+
+use age::Recipient;
+use anyhow::{anyhow, Result};
+use futures::{StreamExt as _, TryStreamExt as _};
+use itertools::Itertools as _;
+use tracing::warn;
+
+use crate::host::Config;
+
+impl Config {
+ pub fn cached_key(&self, host: &str) -> Option<String> {
+ let data = self.data();
+ let key = data.hosts.get(host).map(|h| &h.encryption_key);
+ if let Some(key) = key {
+ if key.is_empty() {
+ return None;
+ }
+ }
+ key.cloned()
+ }
+ pub fn update_key(&self, host: &str, key: String) {
+ let mut data = self.data_mut();
+ let host = data.hosts.entry(host.to_string()).or_default();
+ host.encryption_key = key.trim().to_string();
+ }
+
+ pub async fn key(&self, host: &str) -> anyhow::Result<String> {
+ if let Some(key) = self.cached_key(host) {
+ Ok(key)
+ } else {
+ warn!("Loading key for {}", host);
+ let host = self.host(host).await?;
+ let mut cmd = host.cmd("cat").await?;
+ cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
+ let key = cmd.run_string().await?;
+ self.update_key(&host.name, key.clone());
+ Ok(key)
+ }
+ }
+ /// Insecure, requires root
+ pub async fn recipient(&self, host: &str) -> anyhow::Result<impl Recipient> {
+ let key = self.key(host).await?;
+ age::ssh::Recipient::from_str(&key).map_err(|e| anyhow!("parse recipient error: {:?}", e))
+ }
+
+ pub async fn recipients(&self, hosts: Vec<String>) -> Result<Vec<impl Recipient>> {
+ futures::stream::iter(hosts.iter())
+ .then(|m| self.recipient(m.as_ref()))
+ .try_collect::<Vec<_>>()
+ .await
+ }
+
+ #[allow(dead_code)]
+ pub async fn orphaned_data(&self) -> Result<Vec<String>> {
+ let mut out = Vec::new();
+ let host_names = self
+ .list_hosts()
+ .await?
+ .into_iter()
+ .map(|h| h.name)
+ .collect_vec();
+ for hostname in self
+ .data()
+ .hosts
+ .iter()
+ .filter(|(_, host)| !host.encryption_key.is_empty())
+ .map(|(n, _)| n)
+ {
+ if !host_names.contains(hostname) {
+ out.push(hostname.to_owned())
+ }
+ }
+
+ Ok(out)
+ }
+}
crates/fleet-base/src/lib.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/fleet-base/src/lib.rs
@@ -0,0 +1,5 @@
+pub mod fleetdata;
+pub mod host;
+pub mod command;
+pub mod opts;
+mod keys;
crates/fleet-base/src/opts.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/fleet-base/src/opts.rs
@@ -0,0 +1,216 @@
+use std::{
+ collections::BTreeMap,
+ env::current_dir,
+ ffi::OsString,
+ str::FromStr,
+ sync::{Arc, Mutex},
+};
+
+use anyhow::Result;
+use clap::Parser;
+use nix_eval::{nix_go, nix_go_json, util::assert_warn, NixSessionPool, Value};
+use nom::{
+ bytes::complete::take_while1,
+ character::complete::char,
+ combinator::{map, opt},
+ multi::separated_list1,
+ sequence::{preceded, separated_pair},
+};
+
+use crate::{
+ fleetdata::FleetData,
+ host::{Config, ConfigHost, FleetConfigInternals},
+};
+
+#[derive(Clone)]
+pub enum HostItem {
+ Host {
+ name: String,
+ attrs: BTreeMap<String, String>,
+ },
+ Tag {
+ name: String,
+ attrs: BTreeMap<String, String>,
+ },
+}
+fn host_item_parser(input: &str) -> Result<HostItem, String> {
+ fn err_to_string(err: nom::Err<nom::error::Error<&str>>) -> String {
+ err.to_string()
+ }
+
+ let (input, is_tag) = map(opt(char('@')), |c| c.is_some())(input).map_err(err_to_string)?;
+ let (input, name) = map(
+ take_while1(|v| v != ',' && v != '?' && v != '@'),
+ str::to_owned,
+ )(input)
+ .map_err(err_to_string)?;
+
+ let kw_item = separated_pair(
+ map(take_while1(|v| v != '&' && v != '='), str::to_owned),
+ char('='),
+ map(take_while1(|v| v != '&'), str::to_owned),
+ );
+ let kw = map(separated_list1(char('&'), kw_item), |vec| {
+ vec.into_iter().collect::<BTreeMap<_, _>>()
+ });
+ let mut opt_kw = map(opt(preceded(char('?'), kw)), Option::unwrap_or_default);
+
+ let (input, attrs) = opt_kw(input).map_err(err_to_string)?;
+
+ if !input.is_empty() {
+ return Err(format!("unexpected trailing input: {input:?}"));
+ }
+ Ok(if is_tag {
+ HostItem::Tag { name, attrs }
+ } else {
+ HostItem::Host { name, attrs }
+ })
+}
+
+// TODO: Rename to HostSelector
+#[derive(Parser, Clone)]
+pub struct FleetOpts {
+ /// All hosts except those would be skipped
+ #[clap(long, number_of_values = 1, value_parser = host_item_parser)]
+ pub only: Vec<HostItem>,
+
+ /// Hosts to skip
+ #[clap(long, number_of_values = 1)]
+ pub skip: Vec<String>,
+
+ /// Host, which should be threaten as current machine
+ // TODO: Replace with connectivity refactor
+ #[clap(long, default_value_t = hostname::get().expect("unknown hostname").to_str().expect("hostname is not utf-8").to_owned())]
+ pub localhost: String,
+
+ /// Override detected system for host, to perform builds via
+ /// binfmt-declared qemu instead of trying to crosscompile
+ // TODO: Remove, as it is not used anymore.
+ #[clap(long, default_value = "detect")]
+ pub local_system: String,
+}
+
+impl FleetOpts {
+ pub async fn should_skip(&self, host: &ConfigHost) -> Result<bool> {
+ if self.skip.iter().any(|h| h as &str == host.name) {
+ return Ok(true);
+ }
+ if self.only.is_empty() {
+ return Ok(false);
+ }
+ let mut have_group_matches = false;
+ for item in self.only.iter() {
+ match item {
+ HostItem::Host { name, .. } if *name == host.name => {
+ return Ok(false);
+ }
+ HostItem::Tag { .. } => {
+ have_group_matches = true;
+ }
+ _ => {}
+ }
+ }
+ if have_group_matches {
+ let host_tags = host.tags().await?;
+ for item in self.only.iter() {
+ match item {
+ HostItem::Tag { name, .. } if host_tags.contains(name) => {
+ return Ok(false);
+ }
+ _ => {}
+ }
+ }
+ }
+ Ok(true)
+ }
+ pub async fn action_attr<T: FromStr>(&self, host: &ConfigHost, attr: &str) -> Result<Option<T>>
+ where
+ T::Err: Sync,
+ anyhow::Error: From<T::Err>,
+ {
+ let str = self.action_attr_str(host, attr).await?;
+ Ok(str.map(|v| T::from_str(&v)).transpose()?)
+ }
+ pub async fn action_attr_str(&self, host: &ConfigHost, attr: &str) -> Result<Option<String>> {
+ if self.only.is_empty() {
+ return Ok(None);
+ }
+ let mut have_group_matches = false;
+ for item in self.only.iter() {
+ match item {
+ HostItem::Host { name, attrs }
+ if *name == host.name && attrs.contains_key(attr) =>
+ {
+ return Ok(attrs.get(attr).cloned());
+ }
+ HostItem::Tag { attrs, .. } if attrs.contains_key(attr) => {
+ have_group_matches = true;
+ }
+ _ => {}
+ }
+ }
+ if have_group_matches {
+ let host_tags = host.tags().await?;
+ for item in self.only.iter() {
+ match item {
+ HostItem::Tag { name, attrs }
+ if host_tags.contains(name) && attrs.contains_key(attr) =>
+ {
+ return Ok(attrs.get(attr).cloned());
+ }
+ _ => {}
+ }
+ }
+ }
+ Ok(None)
+ }
+ pub fn is_local(&self, host: &str) -> bool {
+ self.localhost == host
+ }
+
+ // TODO: Config should be detached from opts.
+ pub async fn build(&self, nix_args: Vec<OsString>) -> Result<Config> {
+ let directory = current_dir()?;
+
+ let pool = NixSessionPool::new(directory.as_os_str().to_owned(), nix_args.clone()).await?;
+ let root_field = pool.get().await?;
+
+ let builtins_field = Value::binding(root_field.clone(), "builtins").await?;
+ let local_system = if self.local_system == "detect" {
+ nix_go_json!(builtins_field.currentSystem)
+ } else {
+ self.local_system.clone()
+ };
+
+ let mut fleet_data_path = directory.clone();
+ fleet_data_path.push("fleet.nix");
+ let bytes = std::fs::read_to_string(fleet_data_path)?;
+ let data: Mutex<FleetData> = nixlike::parse_str(&bytes)?;
+
+ let fleet_root = Value::binding(root_field, "fleetConfigurations").await?;
+ let fleet_field = nix_go!(fleet_root.default({ data }));
+
+ let config_field = nix_go!(fleet_field.config);
+
+ assert_warn("fleet config evaluation", &config_field).await?;
+
+ let import = nix_go!(builtins_field.import);
+ let overlays = nix_go!(config_field.nixpkgs.overlays);
+ let nixpkgs = nix_go!(fleet_field.nixpkgs.buildUsing | import);
+
+ let default_pkgs = nix_go!(nixpkgs(Obj {
+ overlays,
+ system: { self.local_system.clone() },
+ }));
+
+ Ok(Config(Arc::new(FleetConfigInternals {
+ directory,
+ data,
+ local_system,
+ nix_args,
+ config_field,
+ default_pkgs,
+ localhost: self.localhost.to_owned(),
+ })))
+ }
+}
crates/fleet-shared/src/encoding.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/fleet-shared/src/encoding.rs
@@ -0,0 +1,156 @@
+use std::{
+ fmt::{self, Display},
+ str::FromStr,
+};
+
+use base64::engine::{general_purpose::STANDARD_NO_PAD, Engine};
+use serde::{de::Error, Deserialize, Deserializer, Serialize};
+use unicode_categories::UnicodeCategories;
+
+#[derive(Debug, PartialEq, Clone)]
+pub struct SecretData {
+ pub data: Vec<u8>,
+ pub encrypted: bool,
+}
+
+const BASE64_ENCODED_PREFIX: &str = "<BASE64-ENCODED>\n";
+const Z85_ENCODED_PREFIX: &str = "<Z85-ENCODED>\n";
+// Multiline text in Nix can only end with \n, which is not cool for actual single-line strings.
+const PLAINTEXT_NEWLINE_PREFIX: &str = "<PLAINTEXT-NL>\n";
+const PLAINTEXT_PREFIX: &str = "<PLAINTEXT>";
+
+const SECRET_PREFIX: &str = "<ENCRYPTED>";
+
+impl<'de> Deserialize<'de> for SecretData {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ let string = String::deserialize(deserializer)?;
+ string.parse().map_err(D::Error::custom)
+ }
+}
+
+impl Serialize for SecretData {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: serde::Serializer,
+ {
+ self.to_string().serialize(serializer)
+ }
+}
+
+impl FromStr for SecretData {
+ type Err = String;
+
+ fn from_str(string: &str) -> Result<Self, Self::Err> {
+ let (encrypted, string) = if let Some(unprefixed) = string.strip_prefix(SECRET_PREFIX) {
+ (true, unprefixed)
+ } else {
+ (false, string)
+ };
+ let data = if let Some(unprefixed) = string.strip_prefix(BASE64_ENCODED_PREFIX) {
+ STANDARD_NO_PAD
+ .decode(unprefixed.replace(|v| matches!(v, '\n' | '\t' | ' '), ""))
+ .map_err(|e| format!("base64-encoded failed: {e}"))?
+ } else if let Some(unprefixed) = string.strip_prefix(Z85_ENCODED_PREFIX) {
+ z85::decode(unprefixed.replace(|v| matches!(v, '\n' | '\t' | ' '), ""))
+ .map_err(|e| format!("z85-encoded failed: {e}"))?
+ } else if let Some(unprefixed) = string.strip_prefix(PLAINTEXT_NEWLINE_PREFIX) {
+ unprefixed.as_bytes().to_owned()
+ } else if let Some(unprefixed) = string.strip_prefix(PLAINTEXT_PREFIX) {
+ unprefixed.as_bytes().to_owned()
+ } else {
+ let secret_prefix = format!("{SECRET_PREFIX}{Z85_ENCODED_PREFIX}");
+ return Err(format!(
+ "unknown secret encoding. If you're migrating from old version of fleet, prefix public secret fields with {PLAINTEXT_PREFIX:?}, and encrypted data with {secret_prefix:?}: {string}"
+ ));
+ };
+ Ok(Self { data, encrypted })
+ }
+}
+
+impl Display for SecretData {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let mut readable = std::str::from_utf8(&self.data).ok();
+ if self.encrypted {
+ write!(f, "{SECRET_PREFIX}")?;
+ // Always base64-encode encrypted fields.
+ readable = None;
+ }
+ if Some(false) == readable.map(is_printable) {
+ readable = None
+ };
+ // TODO: Check if text is readable, and has no unprintable characters?..
+ if let Some(plaintext) = readable {
+ if plaintext.ends_with('\n') {
+ write!(f, "{PLAINTEXT_NEWLINE_PREFIX}")?;
+ } else {
+ write!(f, "{PLAINTEXT_PREFIX}")?;
+ }
+ write!(f, "{plaintext}")?;
+ } else {
+ write!(f, "{BASE64_ENCODED_PREFIX}")?;
+ let encoded = STANDARD_NO_PAD.encode(&self.data);
+ for ele in encoded.as_bytes().chunks(64) {
+ let chunk = std::str::from_utf8(ele).expect(
+ "any slice of base64-encoded text is utf-8 compatible, as it is ascii-based",
+ );
+ writeln!(f, "{chunk}")?;
+ }
+ };
+ Ok(())
+ }
+}
+
+fn is_printable(text: &str) -> bool {
+ text.chars().all(|c| {
+ c.is_letter()
+ || c.is_mark()
+ || c.is_number()
+ || c.is_punctuation()
+ || c.is_separator()
+ || c == '\n' || c == '\t'
+ // Complete base64 alphabet
+ || c == '/' || c == '+'
+ || c == '='
+ })
+}
+
+#[test]
+fn test() {
+ fn check_roundtrip(data: SecretData, expected: &str) {
+ let string = data.to_string();
+ assert_eq!(string, expected, "unexpected encoding");
+ let roundtrip: SecretData = string.parse().expect("roundtrip parse");
+ assert_eq!(data, roundtrip, "roundtrip didn't match");
+ }
+ check_roundtrip(
+ SecretData {
+ data: vec![1, 2, 3, 4, 5, 6],
+ encrypted: false,
+ },
+ "<BASE64-ENCODED>\nAQIDBAUG\n",
+ );
+ check_roundtrip(
+ SecretData {
+ data: vec![1, 2, 3, 4, 5, 6],
+ encrypted: true,
+ },
+ "<ENCRYPTED><BASE64-ENCODED>\nAQIDBAUG\n",
+ );
+ check_roundtrip(
+ SecretData {
+ data: "Привет, мир!\n".to_owned().into(),
+ encrypted: false,
+ },
+ "<PLAINTEXT-NL>\nПривет, мир!\n",
+ );
+ check_roundtrip(
+ SecretData {
+ data: "Привет, мир!".to_owned().into(),
+ encrypted: false,
+ },
+ "<PLAINTEXT>Привет, мир!",
+ );
+}
crates/fleet-shared/src/lib.rsdiffbeforeafterboth--- a/crates/fleet-shared/src/lib.rs
+++ b/crates/fleet-shared/src/lib.rs
@@ -1,156 +1,2 @@
-use std::{
- fmt::{self, Display},
- str::FromStr,
-};
-
-use base64::engine::{general_purpose::STANDARD_NO_PAD, Engine};
-use serde::{de::Error, Deserialize, Deserializer, Serialize};
-use unicode_categories::UnicodeCategories;
-
-#[derive(Debug, PartialEq, Clone)]
-pub struct SecretData {
- pub data: Vec<u8>,
- pub encrypted: bool,
-}
-
-const BASE64_ENCODED_PREFIX: &str = "<BASE64-ENCODED>\n";
-const Z85_ENCODED_PREFIX: &str = "<Z85-ENCODED>\n";
-// Multiline text in Nix can only end with \n, which is not cool for actual single-line strings.
-const PLAINTEXT_NEWLINE_PREFIX: &str = "<PLAINTEXT-NL>\n";
-const PLAINTEXT_PREFIX: &str = "<PLAINTEXT>";
-
-const SECRET_PREFIX: &str = "<ENCRYPTED>";
-
-impl<'de> Deserialize<'de> for SecretData {
- fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
- where
- D: Deserializer<'de>,
- {
- let string = String::deserialize(deserializer)?;
- string.parse().map_err(D::Error::custom)
- }
-}
-
-impl Serialize for SecretData {
- fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
- where
- S: serde::Serializer,
- {
- self.to_string().serialize(serializer)
- }
-}
-
-impl FromStr for SecretData {
- type Err = String;
-
- fn from_str(string: &str) -> Result<Self, Self::Err> {
- let (encrypted, string) = if let Some(unprefixed) = string.strip_prefix(SECRET_PREFIX) {
- (true, unprefixed)
- } else {
- (false, string)
- };
- let data = if let Some(unprefixed) = string.strip_prefix(BASE64_ENCODED_PREFIX) {
- STANDARD_NO_PAD
- .decode(unprefixed.replace(|v| matches!(v, '\n' | '\t' | ' '), ""))
- .map_err(|e| format!("base64-encoded failed: {e}"))?
- } else if let Some(unprefixed) = string.strip_prefix(Z85_ENCODED_PREFIX) {
- z85::decode(unprefixed.replace(|v| matches!(v, '\n' | '\t' | ' '), ""))
- .map_err(|e| format!("z85-encoded failed: {e}"))?
- } else if let Some(unprefixed) = string.strip_prefix(PLAINTEXT_NEWLINE_PREFIX) {
- unprefixed.as_bytes().to_owned()
- } else if let Some(unprefixed) = string.strip_prefix(PLAINTEXT_PREFIX) {
- unprefixed.as_bytes().to_owned()
- } else {
- let secret_prefix = format!("{SECRET_PREFIX}{Z85_ENCODED_PREFIX}");
- return Err(format!(
- "unknown secret encoding. If you're migrating from old version of fleet, prefix public secret fields with {PLAINTEXT_PREFIX:?}, and encrypted data with {secret_prefix:?}: {string}"
- ));
- };
- Ok(Self { data, encrypted })
- }
-}
-
-impl Display for SecretData {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- let mut readable = std::str::from_utf8(&self.data).ok();
- if self.encrypted {
- write!(f, "{SECRET_PREFIX}")?;
- // Always base64-encode encrypted fields.
- readable = None;
- }
- if Some(false) == readable.map(is_printable) {
- readable = None
- };
- // TODO: Check if text is readable, and has no unprintable characters?..
- if let Some(plaintext) = readable {
- if plaintext.ends_with('\n') {
- write!(f, "{PLAINTEXT_NEWLINE_PREFIX}")?;
- } else {
- write!(f, "{PLAINTEXT_PREFIX}")?;
- }
- write!(f, "{plaintext}")?;
- } else {
- write!(f, "{BASE64_ENCODED_PREFIX}")?;
- let encoded = STANDARD_NO_PAD.encode(&self.data);
- for ele in encoded.as_bytes().chunks(64) {
- let chunk = std::str::from_utf8(ele).expect(
- "any slice of base64-encoded text is utf-8 compatible, as it is ascii-based",
- );
- writeln!(f, "{chunk}")?;
- }
- };
- Ok(())
- }
-}
-
-fn is_printable(text: &str) -> bool {
- text.chars().all(|c| {
- c.is_letter()
- || c.is_mark()
- || c.is_number()
- || c.is_punctuation()
- || c.is_separator()
- || c == '\n' || c == '\t'
- // Complete base64 alphabet
- || c == '/' || c == '+'
- || c == '='
- })
-}
-
-#[test]
-fn test() {
- fn check_roundtrip(data: SecretData, expected: &str) {
- let string = data.to_string();
- assert_eq!(string, expected, "unexpected encoding");
- let roundtrip: SecretData = string.parse().expect("roundtrip parse");
- assert_eq!(data, roundtrip, "roundtrip didn't match");
- }
- check_roundtrip(
- SecretData {
- data: vec![1, 2, 3, 4, 5, 6],
- encrypted: false,
- },
- "<BASE64-ENCODED>\nAQIDBAUG\n",
- );
- check_roundtrip(
- SecretData {
- data: vec![1, 2, 3, 4, 5, 6],
- encrypted: true,
- },
- "<ENCRYPTED><BASE64-ENCODED>\nAQIDBAUG\n",
- );
- check_roundtrip(
- SecretData {
- data: "Привет, мир!\n".to_owned().into(),
- encrypted: false,
- },
- "<PLAINTEXT-NL>\nПривет, мир!\n",
- );
- check_roundtrip(
- SecretData {
- data: "Привет, мир!".to_owned().into(),
- encrypted: false,
- },
- "<PLAINTEXT>Привет, мир!",
- );
-}
+mod encoding;
+pub use encoding::SecretData;
crates/nix-eval/src/session.rsdiffbeforeafterboth1use std::{ffi::OsStr, num::ParseIntError, process::Stdio, sync::Arc};23use better_command::{ClonableHandler, Handler, NixHandler, NoopHandler};4use futures::StreamExt;5use itertools::Itertools as _;6use serde::{de::DeserializeOwned, Deserialize};7use thiserror::Error;8use tokio::{9 io::AsyncWriteExt,10 process::{ChildStderr, ChildStdin, ChildStdout, Command},11 select,12 sync::{mpsc, oneshot, Mutex},13};14use tokio_util::codec::{FramedRead, LinesCodec};15use tracing::{debug, error, info, warn, Level};1617#[derive(Error, Debug)]18pub enum Error {19 #[error("failed to create nix repl session: {0}")]20 SessionInit(&'static str),21 #[error("unexpected end of output, nix crashed?")]22 MissingDelimiter,2324 #[error("expression did'nt produce any output")]25 ExpectedOutput,26 #[error("expression produced output, which is unexpected")]27 UnexpectedOutput,2829 #[error("unexpected expression output type")]30 InvalidType,3132 #[error("failed to build attr {attribute}:\n{error}")]33 BuildFailed { attribute: String, error: String },3435 #[error("output: {0}")]36 Json(#[from] serde_json::Error),37 // int outputs are too specific, and should not be used,38 // thus error is ok to be not informative.39 #[error("int output: {0}")]40 Int(ParseIntError),41 #[error("pool: {0}")]42 Pool(#[from] r2d2::Error),43 #[error("io: {0}")]44 Io(#[from] std::io::Error),4546 // TODO: Should be done by wrapper/in different type.47 #[error("at {0}: {1}")]48 InContext(String, Box<Self>),4950 #[error("error: {0}")]51 NixError(String),52}53impl Error {54 pub(crate) fn context(self, context: String) -> Self {55 Self::InContext(context, Box::new(self))56 }57}58pub type Result<T, E = Error> = std::result::Result<T, E>;5960enum OutputLine {61 Out(String),62 Err(String),63}64struct OutputHandler {65 rx: mpsc::Receiver<OutputLine>,66 _cancel_handle: oneshot::Receiver<()>,67}68impl OutputHandler {69 fn new(out: ChildStdout, err: ChildStderr) -> Self {70 let mut out = FramedRead::new(out, LinesCodec::new());71 let mut err = FramedRead::new(err, LinesCodec::new());72 let (tx, rx) = mpsc::channel(20);73 let (mut cancelled, _cancel_handle) = oneshot::channel();74 tokio::spawn(async move {75 loop {76 select! {77 // We should receive errors earlier than synchronization78 biased;79 e = err.next() => {80 let Some(Ok(e)) = e else {81 if e.is_some() {82 error!("bad repl stderr: {e:?}");83 }84 continue;85 };86 let _ = tx.send(OutputLine::Err(e)).await;87 }88 o = out.next() => {89 let Some(Ok(o)) = o else {90 if o.is_some() {91 error!("bad repl stdout: {o:?}");92 }93 continue;94 };95 let _ = tx.send(OutputLine::Out(o)).await;96 }97 // Reader doesn't care about stdout, as this is cancelled.98 // Error still might be useful, to process leftover span closures?99 _ = cancelled.closed() => {100 break;101 }102 }103 }104 });105 Self { rx, _cancel_handle }106 }107 async fn next(&mut self) -> Option<OutputLine> {108 self.rx.recv().await109 }110}111112#[must_use]113struct ErrorCollector<'i, H> {114 collected: Vec<String>,115 inner: &'i mut H,116}117impl<'i, H> ErrorCollector<'i, H> {118 fn new(inner: &'i mut H) -> Self {119 Self {120 collected: vec![],121 inner,122 }123 }124}125impl<H> ErrorCollector<'_, H> {126 fn handle_line_inner(&mut self, msg: &str) -> bool {127 let Some(msg) = msg.strip_prefix("@nix ") else {128 return false;129 };130 #[derive(Deserialize)]131 struct ErrorAction {132 action: String,133 level: u32,134 msg: String,135 }136 let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {137 return false;138 };139 if act.action != "msg" || act.level != 0 {140 return false;141 }142 self.collected.push(act.msg);143 true144 }145 fn finish(self) -> Result<()> {146 // fn dedent(s: String) -> String {147 // s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)148 // }149 if !self.collected.is_empty() {150 return Err(Error::NixError(format!(151 "{}",152 self.collected153 .iter()154 .map(|v| {155 if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {156 let v = unindent::unindent(f.trim_start());157 v.trim().to_owned()158 } else {159 v.to_owned()160 }161 })162 .join("\n"),163 )));164 }165 Ok(())166 }167 fn flush(self) {168 for line in self.collected {169 warn!("{line}");170 }171 }172}173impl<H: Handler> Handler for ErrorCollector<'_, H> {174 fn handle_line(&mut self, e: &str) {175 if self.handle_line_inner(e) {176 return;177 }178 self.inner.handle_line(e)179 }180}181182pub struct NixSessionInner {183 full_delimiter: String,184 nix_handler: ClonableHandler<NixHandler>,185 out: OutputHandler,186 stdin: ChildStdin,187 string_wrapping: (String, String),188 number_wrapping: (String, String),189190 executing_command: Arc<Mutex<()>>,191192 next_id: u32,193 pub(crate) free_list: Vec<u32>,194}195196/// Discover inter-message repl delimiter197const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";198/// Discover formatting around strings199const TRAIN_STRING: &str = "\"TRAIN_STRING\"";200/// Discover formatting around numbers201const TRAIN_NUMBER: &str = "13141516";202// Other types of formatting are not discovered, because they are not used, JSON serialization is used instead203// Techically, number training is also not required, because numbers can be converted to string too...204// Eh, I'll remove it later.205206impl NixSessionInner {207 pub(crate) async fn new(208 flake: &OsStr,209 extra_args: impl IntoIterator<Item = &OsStr>,210 ) -> Result<Self> {211 let mut cmd = Command::new("nix");212 cmd.arg("repl")213 .arg(flake)214 .arg("--log-format")215 .arg("internal-json");216 for arg in extra_args {217 cmd.arg(arg);218 }219 cmd.stdin(Stdio::piped());220 cmd.stdout(Stdio::piped());221 cmd.stderr(Stdio::piped());222 let cmd = cmd.spawn()?;223 let stdout = cmd.stdout.unwrap();224 let stderr = cmd.stderr.unwrap();225 let mut out = OutputHandler::new(stdout, stderr);226 let mut stdin = cmd.stdin.unwrap();227 // Standard repl hello doesn't work with internal-json logger228 stdin.write_all(REPL_DELIMITER.as_bytes()).await?;229 stdin.write_all(b"\n").await?;230 stdin.flush().await?;231 let nix_handler = NixHandler::default();232 let mut full_delimiter = None;233 let mut errors = vec![];234 while let Some(line) = out.next().await {235 let line = match line {236 OutputLine::Out(o) => o,237 OutputLine::Err(_e) => {238 // Handle startup errors, but skip repl hello?239 errors.push(_e);240 continue;241 }242 };243 if line.contains(REPL_DELIMITER) {244 debug!("discovered repl delimiter with added colors: {line}");245 full_delimiter = Some(line.to_owned());246 break;247 }248 }249 let Some(full_delimiter) = full_delimiter else {250 for e in errors {251 error!("{e}");252 }253 return Err(Error::SessionInit("failed to discover delimiter"));254 };255 let mut res = Self {256 full_delimiter,257 nix_handler: ClonableHandler::new(nix_handler),258 out,259 stdin,260 string_wrapping: Default::default(),261 number_wrapping: Default::default(),262263 executing_command: Arc::new(Mutex::new(())),264265 next_id: 0,266 free_list: vec![],267 };268 res.train().await?;269 Ok(res)270 }271 async fn train(&mut self) -> Result<()> {272 {273 let full_string = self274 .execute_expression_raw(TRAIN_STRING, &mut NoopHandler)275 .await?;276 let string_offset = full_string.find(TRAIN_STRING).expect("contained");277 let string_prefix = &full_string[..string_offset];278 let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];279 self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());280 }281 {282 let full_number = self283 .execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)284 .await?;285 let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");286 let number_prefix = &full_number[..number_offset];287 let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];288 self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());289 }290 Ok(())291 }292 async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {293 if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {294 let cmd_str = String::from_utf8_lossy(cmd.as_ref());295 tracing::debug!("{cmd_str}");296 };297 self.stdin.write_all(cmd.as_ref()).await?;298 self.stdin.write_all(b"\n").await?;299 Ok(())300 }301 async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {302 let mut out = String::new();303 while let Some(line) = self.out.next().await {304 let line = match line {305 OutputLine::Out(out) => out,306 OutputLine::Err(err) => {307 err_handler.handle_line(&err);308 continue;309 }310 };311 if line == self.full_delimiter {312 return Ok(out);313 }314 if !out.is_empty() {315 out.push('\n');316 }317 out.push_str(&line);318 }319 return Err(Error::MissingDelimiter);320 }321 pub(crate) async fn execute_expression_number(322 &mut self,323 expr: impl AsRef<[u8]>,324 ) -> Result<u64> {325 let num = self.number_wrapping.clone();326 let n = self.execute_expression_wrapping(expr, &num).await?;327 n.parse::<u64>().map_err(Error::Int)328 }329 async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {330 // builtins.toJSON escapes some thing in incorrect way, e.g escaped "$" in "\${" is being outputed as "\$",331 // while this escape should be removed as it is intended for nix itself, not for json output.332 //333 // This regex only allows \$ in the beginning of the string, it is easier to implement correctly.334 // TODO: Add peg parser for nix-produced JSON?..335 let regex = regex::Regex::new(r#"(?<prefix>[: {,\[]\\")\\\$"#).expect("fixup json");336337 let num = self.string_wrapping.clone();338 let n = self.execute_expression_wrapping(expr, &num).await?;339 let n = regex.replace_all(&n, "$prefix$$");340 let str: String = serde_json::from_str(&n)?;341 Ok(str)342 }343 pub(crate) async fn execute_expression_to_json<V: DeserializeOwned>(344 &mut self,345 expr: impl AsRef<[u8]>,346 ) -> Result<V> {347 let mut fexpr = b"builtins.toJSON (".to_vec();348 fexpr.extend_from_slice(expr.as_ref());349 fexpr.push(b')');350 let s = String::from_utf8_lossy(expr.as_ref());351 let v = self.execute_expression_string(fexpr).await?;352 Ok(serde_json::from_str(&v)?)353 }354 async fn execute_expression_wrapping(355 &mut self,356 expr: impl AsRef<[u8]>,357 wrapping: &(String, String),358 ) -> Result<String> {359 let mut nix_handler = self.nix_handler.clone();360 let mut collected = ErrorCollector::new(&mut nix_handler);361 let res = self.execute_expression_raw(expr, &mut collected).await?;362 if res.is_empty() {363 collected.finish()?;364 return Err(Error::ExpectedOutput);365 } else {366 collected.flush()367 };368 let Some(res) = res.strip_prefix(&wrapping.0) else {369 return Err(Error::InvalidType);370 };371 let Some(res) = res.strip_suffix(&wrapping.1) else {372 return Err(Error::InvalidType);373 };374 Ok(res.to_owned())375 }376 async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {377 let mut nix_handler = self.nix_handler.clone();378 let mut collected = ErrorCollector::new(&mut nix_handler);379 let v = self.execute_expression_raw(expr, &mut collected).await?;380 collected.finish()?;381 if !v.is_empty() {382 return Err(Error::UnexpectedOutput);383 }384 Ok(())385 }386 pub(crate) async fn execute_expression_raw(387 &mut self,388 expr: impl AsRef<[u8]>,389 err_handler: &mut dyn Handler,390 ) -> Result<String> {391 // Prevent two commands from being executed in parallel, messing with each other.392 let _lock = self.executing_command.clone();393 let _guard = _lock.lock().await;394395 self.send_command(expr).await?;396 // It will be echoed397 self.send_command(REPL_DELIMITER).await?;398 self.read_until_delimiter(err_handler).await399 }400 pub(crate) async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {401 let id = self.allocate_id();402 self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))403 .await?;404 Ok(id)405 }406407 /// Id should be immediately used408 fn allocate_id(&mut self) -> u32 {409 if let Some(free) = self.free_list.pop() {410 free411 } else {412 let v = self.next_id;413 self.next_id += 1;414 v415 }416 }417 // Nix has no way to deallocate variable, yet GC will correct everything not reachable.418 // async fn free_id(&mut self, id: u32) -> Result<()> {419 // self.execute_expression_empty(format!("sess_field_{id} = null"))420 // .await?;421 // self.free_list.push(id);422 // Ok(())423 // }424}crates/remowt-fs/Cargo.tomldiffbeforeafterboth--- a/crates/remowt-fs/Cargo.toml
+++ /dev/null
@@ -1,8 +0,0 @@
-[package]
-name = "remowt-fs"
-version = "0.1.0"
-edition = "2021"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
crates/remowt-fs/src/lib.rsdiffbeforeafterboth--- a/crates/remowt-fs/src/lib.rs
+++ /dev/null
@@ -1 +0,0 @@
-trait RemowtFS {}
flake.nixdiffbeforeafterboth--- a/flake.nix
+++ b/flake.nix
@@ -116,6 +116,7 @@
bacon
nil
];
+ env.PROTOC = "${pkgs.protobuf}/bin/protoc";
};
};
# fleet-install-secrets will not be built normally, because they are not ran directly by user most of the time.
modules/extras/tf.nixdiffbeforeafterboth--- a/modules/extras/tf.nix
+++ b/modules/extras/tf.nix
@@ -1,26 +1,45 @@
{
config,
lib,
+ fleetLib,
inputs,
...
}: let
- inherit (lib) mkOption;
- inherit (lib.types) deferredModule;
+ inherit (lib.options) mkOption;
+ inherit (lib.types) deferredModule attrsOf unspecified;
+ inherit (fleetLib.options) mkDataOption;
in {
- options.tf = mkOption {
- type = deferredModule;
- apply = module: system:
- inputs.terranix.lib.terranixConfigurationAst {
- inherit system;
- pkgs = config.nixpkgs.buildUsing.legacyPackages.${system};
- modules = [module];
+ options = {
+ tf = mkOption {
+ type = deferredModule;
+ apply = module: system:
+ inputs.terranix.lib.terranixConfiguration {
+ inherit system;
+ pkgs = config.nixpkgs.buildUsing.legacyPackages.${system};
+ modules = [
+ module
+ ];
+ };
+ };
+ data = mkDataOption {
+ # host => hostData
+ options.extra.terraformHosts = mkOption {
+ default = {};
+ type = attrsOf (attrsOf unspecified);
+ description = "Hosts data provided by fleet tf";
};
+ };
};
- config.tf.output.fleet = {
- value = {
- managed = true;
+
+ config = {
+ tf.output.fleet = {
+ value = {
+ managed = true;
+ };
+ # Just to avoid printing this attribute on every apply, the whole fleet attribute
+ # will be somehow processed by fleet tf.
+ sensitive = true;
};
- # Just to avoid printing this attribute on every apply.
- sensitive = true;
+ hosts = config.data.extra.terraformHosts;
};
}
modules/secrets-data.nixdiffbeforeafterboth--- a/modules/secrets-data.nix
+++ b/modules/secrets-data.nix
@@ -6,8 +6,8 @@
}: let
inherit (fleetLib.options) mkDataOption;
inherit (lib.options) mkOption;
- inherit (lib.types) lazyAttrsOf nullOr listOf str attrsOf submodule bool;
- inherit (lib.attrsets) mapAttrsToList mapAttrs catAttrs filterAttrs genAttrs;
+ inherit (lib.types) nullOr listOf str attrsOf submodule bool;
+ inherit (lib.attrsets) mapAttrsToList mapAttrs filterAttrs genAttrs;
inherit (lib.lists) sort unique concatLists;
inherit (lib.strings) toJSON;