nixpkgs/nixos/tests/acme/python-utils.py
Christian Theune 2d0a489125 nixos/acme: improve scalability - reduce superfluous unit activations
The previous setup caused all renewal units to be triggered upon
ever so slight changes in config. In larger setups (100+ certificates)
adding a new certificate caused high system load and/or large memory
consumption issues. The memory issues are already a alleviated with
the locking mechanism. However, this then causes long delays upwards
of multiple minutes depending on individual runs and also caused
superfluous activations.

In this change we streamline the overall setup of units:

1. The unit that other services can depend upon is 'acme-{cert}.service'.
We call this the 'base unit'. As this one as `RemainAfterExit` set
the `acme-finished-{cert}` targets are not required any longer.

2. We now always generate initial self-signed certificates to simplify
the dependency structure. This deprecates the `preliminarySelfsigned`
option.

3. The `acme-order-renew-{cert}` service gets activated after the base
unit and services using certificates have started and performs all acme
interactions. When it finishes others services (like web servers) will
be notified through the `reloadServices` option or they can use
`wantedBy` and `after` dependencies if they implement their own reload
units.

The renewal timer also triggers this unit.

4. The timer unit is explicitly blocked from being started by s-t-c.

5. Permission management has been cleaned up a bit: there was an
   inconsistency between having the .lego files set to 600 vs 640
   on the exposed side. This is unified to 640 now.

6. Exempt the account target from being restarted by s-t-c. This will
   happen automatically if something relevant to the account changes.
2025-08-08 16:28:42 +02:00

168 lines
5.8 KiB
Python

#!/usr/bin/env python3
import time
TOTAL_RETRIES = 20
# BackoffTracker provides a robust system for handling test retries
class BackoffTracker:
delay = 1
increment = 1
def handle_fail(self, retries, message) -> int:
assert retries < TOTAL_RETRIES, message
print(f"Retrying in {self.delay}s, {retries + 1}/{TOTAL_RETRIES}")
time.sleep(self.delay)
# Only increment after the first try
if retries == 0:
self.delay += self.increment
self.increment *= 2
return retries + 1
def protect(self, func):
def wrapper(*args, retries: int = 0, **kwargs):
try:
return func(*args, **kwargs)
except Exception as err:
retries = self.handle_fail(retries, err.args)
return wrapper(*args, retries=retries, **kwargs)
return wrapper
backoff = BackoffTracker()
def run(node, cmd, fail=False):
if fail:
return node.fail(cmd)
else:
return node.succeed(cmd)
# Waits for the system to finish booting or switching configuration
def wait_for_running(node):
node.succeed("systemctl is-system-running --wait")
# On first switch, this will create a symlink to the current system so that we can
# quickly switch between derivations
def switch_to(node, name, fail=False) -> None:
root_specs = "/tmp/specialisation"
node.execute(
f"test -e {root_specs}"
f" || ln -s $(readlink /run/current-system)/specialisation {root_specs}"
)
switcher_path = (
f"/run/current-system/specialisation/{name}/bin/switch-to-configuration"
)
rc, _ = node.execute(f"test -e '{switcher_path}'")
if rc > 0:
switcher_path = f"/tmp/specialisation/{name}/bin/switch-to-configuration"
cmd = f"{switcher_path} test"
run(node, cmd, fail=fail)
if not fail:
wait_for_running(node)
# Ensures the issuer of our cert matches the chain
# and matches the issuer we expect it to be.
# It's a good validation to ensure the cert.pem and fullchain.pem
# are not still selfsigned after verification
@backoff.protect
def check_issuer(node, cert_name, issuer) -> None:
for fname in ("cert.pem", "fullchain.pem"):
actual_issuer = node.succeed(
f"openssl x509 -noout -issuer -in /var/lib/acme/{cert_name}/{fname}"
).partition("=")[2]
assert (
issuer.lower() in actual_issuer.lower()
), f"{fname} issuer mismatch. Expected {issuer} got {actual_issuer}"
# Ensures the provided domain matches with the given cert
def check_domain(node, cert_name, domain, fail=False) -> None:
cmd = f"openssl x509 -noout -checkhost '{domain}' -in /var/lib/acme/{cert_name}/cert.pem"
run(node, cmd, fail=fail)
# Ensures the required values for OCSP stapling are present
# Pebble doesn't provide a full OCSP responder, so just checks the URL
def check_stapling(node, cert_name, ca_domain, fail=False):
rc, _ = node.execute(
f"openssl x509 -noout -ocsp_uri -in /var/lib/acme/{cert_name}/cert.pem"
f" | grep -i 'http://{ca_domain}:4002' 2>&1",
)
assert rc == 0 or fail, "Failed to find OCSP URI in issued certificate"
run(
node,
f"openssl x509 -noout -ext tlsfeature -in /var/lib/acme/{cert_name}/cert.pem"
f" | grep -iv 'no extensions' 2>&1",
fail=fail,
)
# Checks the keyType by validating the number of bits
def check_key_bits(node, cert_name, bits, fail=False):
run(
node,
f"openssl x509 -noout -text -in /var/lib/acme/{cert_name}/cert.pem"
f" | grep -i Public-Key | grep {bits} | tee /dev/stderr",
fail=fail,
)
# Ensure cert comes before chain in fullchain.pem
def check_fullchain(node, cert_name):
cert_file = f"/var/lib/acme/{cert_name}/fullchain.pem"
num_certs = node.succeed(f"grep -o 'END CERTIFICATE' {cert_file}")
assert len(num_certs.strip().split("\n")) > 1, "Insufficient certs in fullchain.pem"
first_cert_data = node.succeed(
f"grep -m1 -B50 'END CERTIFICATE' {cert_file}"
" | openssl x509 -noout -text"
)
for line in first_cert_data.lower().split("\n"):
if "dns:" in line:
print(f"First DNSName in fullchain.pem: {line}")
assert cert_name.lower() in line, f"{cert_name} not found in {line}"
return
assert False
# Checks the permissions in the cert directories are as expected
def check_permissions(node, cert_name, group):
stat = "stat -L -c '%a %U %G' "
node.succeed(
f"test $({stat} /var/lib/acme/{cert_name}/*.pem"
f" | tee /dev/stderr | grep -v '640 acme {group}' | wc -l) -eq 0"
)
node.execute(f"ls -lahR /var/lib/acme/.lego/{cert_name}/* > /dev/stderr")
node.succeed(
f"test $({stat} /var/lib/acme/.lego/{cert_name}/*/{cert_name}*"
f" | tee /dev/stderr | grep -v '640 acme {group}' | wc -l) -eq 0"
)
node.succeed(
f"test $({stat} /var/lib/acme/{cert_name}"
f" | tee /dev/stderr | grep -v '750 acme {group}' | wc -l) -eq 0"
)
node.succeed(
f"test $(find /var/lib/acme/.lego/accounts -type f -exec {stat} {{}} \\;"
f" | tee /dev/stderr | grep -v '600 acme {group}' | wc -l) -eq 0"
)
@backoff.protect
def download_ca_certs(node, ca_domain):
node.succeed(f"curl https://{ca_domain}:15000/roots/0 > /tmp/ca.crt")
node.succeed(f"curl https://{ca_domain}:15000/intermediate-keys/0 >> /tmp/ca.crt")
@backoff.protect
def check_connection(node, domain, fail=False, minica=False):
cafile = "/tmp/ca.crt"
if minica:
cafile = "/var/lib/acme/.minica/cert.pem"
run(node,
f"openssl s_client -brief -CAfile {cafile}"
f" -verify 2 -verify_return_error -verify_hostname {domain}"
f" -servername {domain} -connect {domain}:443 < /dev/null",
fail=fail,
)