import os
import tempfile
from collections import defaultdict
from pathlib import Path

import click
import sysrsync
import yaml

from . import certs, sysaction, templating
from .error import NccException

# Default settings; load_config() overlays the values found in ncc.yml.
CONFIG = {
    "main_config": "nginx.conf",
    "conf_dir": ".",
    "targets": [],
    "dehydrated_dir": "dehydrated",
    "sites_dir": "sites",
}


class Step:
    """Fancy printing of steps with option to hide the details after it finishes

    Used as a context manager: entering prints a ``[ ]`` header line, ``echo``
    prints indented detail lines while counting them, and leaving moves the
    cursor back up to the header to stamp it with a green ``K`` (no exception)
    or a red ``E`` (exception).
    """

    def __init__(self, text, hide=False):
        self.text = text
        # Number of terminal lines emitted so far, header included.
        self.lines = 1
        # True when the next echo starts on a fresh line and needs a prefix.
        self.on_start = True
        # When set, the detail lines are erased after a successful step.
        self.hide = hide

    def echo(self, message, *args, **kwargs) -> None:
        """Print *message* under the step header, prefixing each new line.

        Extra positional and keyword arguments are forwarded to ``click.echo``;
        ``nl`` is additionally inspected to keep the line count accurate.
        """
        nl = int(kwargs.get("nl", 1))
        self.lines += message.count("\n") + nl
        if self.on_start:
            click.echo(" => ", nl=False)
        # The next call starts a new line if this one ended with a newline.
        self.on_start = nl or (bool(message) and message[-1] == "\n")
        click.echo("\n => ".join(message.splitlines()), *args, **kwargs)

    def __enter__(self) -> "Step":
        click.secho("[ ] " + self.text)
        return self

    def __exit__(self, exc, _exc2, _exc3) -> None:
        # CSI <n>F moves the cursor up n lines to column 0, i.e. back onto the
        # header, where the status letter overwrites the blank in "[ ]".
        click.echo(
            f'\r\x1b[{self.lines}F[{click.style("K" if not exc else "E", fg="green" if not exc else "red")}'
        )
        if self.hide and not exc:
            # CSI 0J erases everything below the header.
            click.echo("\x1b[0J", nl=False)
        elif self.lines > 1:
            # CSI <n>E moves back down below the detail lines.
            click.echo(f"\x1b[{self.lines-1}E", nl=False)


def load_config():
    """Locate ``ncc.yml``, merge it into ``CONFIG`` and enter the config dir.

    The current directory and every ancestor up to and including the
    filesystem root are searched. The process changes directory to the folder
    holding ``ncc.yml`` and then to ``CONFIG["conf_dir"]`` (relative to it).

    Raises:
        SystemExit: with status 1 when no ``ncc.yml`` is found.
    """
    cwd = Path(os.getcwd())
    conf_file = None
    # ``parents`` ends at the filesystem anchor, so the root itself is checked
    # and the walk terminates on every platform.
    for directory in (cwd, *cwd.parents):
        candidate = directory / "ncc.yml"
        if candidate.exists():
            os.chdir(directory)
            conf_file = candidate
            break

    if conf_file is None:
        click.echo(
            "Failed to find ncc configuration (searched all parent folders)", err=True
        )
        # SystemExit instead of the site-provided exit() helper, which is not
        # available in every interpreter setup.
        raise SystemExit(1)

    with conf_file.open() as f:
        # An empty file loads as None; keep the defaults in that case.
        if yml := yaml.safe_load(f):
            CONFIG.update(yml)
    os.chdir(CONFIG["conf_dir"])


@click.group()
def cli():
    """Update the nginx cluster configuration

    MUST BE RAN ON MASTER (will detect automatically)
    """
    load_config()


@cli.command()
@click.option(
    "--dehydrated-only",
    type=bool,
    is_flag=True,
    help="Only fetches and deploys new certificates",
)
@click.option("--skip-master-check", type=bool, is_flag=True)
def up(dehydrated_only: bool, skip_master_check: bool):
    """Deploy the configuration to the cluster

    Does the following:
    1. generates ssl config in genssl folder
    2. runs dehydrated
    3. checks the configuration
    4. deploys to the cluster
    """
    if not skip_master_check and not sysaction.is_keepalived_master():
        click.echo("Refusing to start. Not running on master.", err=True)
        raise SystemExit(1)

    with Step("Preparing configuration..."):
        dehydrated_dir = Path(CONFIG["dehydrated_dir"])
        if not dehydrated_only:
            certs.generate_ssl(
                Path(CONFIG["main_config"]),
                Path(CONFIG["dehydrated_dir"]) / "domains.txt",
            )

        ec, stdout = sysaction.run_shell(
            (str(dehydrated_dir / "dehydrated.sh"), "-c"), window_height=5
        )
        if ec != 0:
            # mkstemp creates the file atomically with private permissions;
            # the deprecated mktemp only returns a name that another process
            # could claim before we write to it.
            fd, log_name = tempfile.mkstemp(prefix="ncc-dehydrated-", suffix=".log")
            with os.fdopen(fd, "w") as log_file:
                log_file.write(stdout)
            log = Path(log_name)
            raise NccException(
                f"dehydrated returned {ec} (log: {click.format_filename(log)})"
            )

        ec, stdout = sysaction.run_shell(
            ("nginx", "-t", "-p", ".", "-c", CONFIG["main_config"]), window_height=5
        )
        if ec != 0:
            raise NccException("configuration did not pass nginx test", log=stdout)

    with Step("Deploying to cluster...") as step:
        for target in CONFIG["targets"]:
            step.echo('deploying to "' + target + '"', nl=False)
            try:
                sysrsync.run(
                    source=os.getcwd(),
                    destination=target,
                    exclusions=[str(dehydrated_dir / "archive/**")],
                    options=["-a", "--delete"],
                )
            except Exception as e:
                step.echo(click.style(" E", fg="red"))
                raise NccException("failed to rsync configuration") from e

            # technically we could be misinterpreting a path here...
            # Everything before the first "/" and then before the first ":" is
            # taken as the ssh host; an empty result means a local target.
            if (first_part := target.split("/")[0]) and (
                remote := first_part.split(":")[0]
            ):
                ec, stdout = sysaction.run_shell(("ssh", "-T", remote, "echo 1"))
            else:
                ec, stdout = sysaction.run_shell(("echo", "1"))
            # NOTE(review): the command run is only `echo 1`, yet the error
            # below speaks of reloading nginx -- confirm this is intentional.
            if ec != 0:
                step.echo(click.style(" E", fg="red"))
                raise NccException("failed to reload nginx", log=stdout)
            step.echo(click.style(" K", fg="green"))


@cli.command()
def test():
    """Run nginx -t on the configuration"""
    ec, stdout = sysaction.run_shell(
        ("nginx", "-t", "-p", ".", "-c", CONFIG["main_config"])
    )
    click.echo(stdout, nl=False)
    # Propagate nginx's exit status as our own.
    raise SystemExit(ec)


def shell_complete_service(_ctx, _param, incomplete):
    """Return the site names containing *incomplete* for shell completion.

    The configuration is loaded explicitly here rather than relying on the
    ``cli`` group callback having run.
    """
    load_config()
    return [
        site
        for _, site in certs.get_sites(Path(CONFIG["main_config"]))
        if incomplete in site
    ]


@cli.command()
@click.argument("site", shell_complete=shell_complete_service)
def edit(site: str):
    """Edit a site"""
    # First file whose site name matches exactly, or None.
    file = next(
        (f for f, s in certs.get_sites(Path(CONFIG["main_config"])) if site == s), None
    )
    if not file or not file.exists():
        raise NccException("could not find site")
    click.edit(filename=str(file))


@cli.command("list")
def list_():
    """List all sites and the files they are located in"""
    # Group site names by the file that declares them.
    file_sites = defaultdict(list)
    for f, s in certs.get_sites(Path(CONFIG["main_config"])):
        file_sites[f].append(s)
    for file, sites in file_sites.items():
        click.echo(str(file) + ": " + " ".join(sites))


@cli.command()
def new():
    """Create a new site"""
    sites_dir = Path(CONFIG["sites_dir"])
    if not sites_dir.exists():
        raise NccException("sites_dir does not exist")

    templates = [f.name for f in Path("templates").glob("*.conf")]
    templates.sort()
    if not templates:
        # Without this guard the selection loop below could never accept an
        # answer and would prompt forever.
        raise NccException("no templates found in templates directory")

    # Count every line printed so the menu can be erased afterwards.
    printed = len(templates)
    for i, t in enumerate(templates):
        click.echo(f" {i+1}) {t}")
    while True:
        try:
            printed += 1
            choice = int(input("Template number: ")) - 1
        except ValueError:
            continue
        if 0 <= choice < len(templates):
            break
    # Move up over the menu and prompts, then clear to the end of the screen.
    click.echo(f"\r\x1b[{printed}F\x1b[0J", nl=False)
    click.secho(f"=== {templates[choice]}", bold=True)

    name, filled = templating.fill_template(
        Path(f"templates/{templates[choice]}").read_text()
    )
    if not name:
        name = input("File name (without .conf): ").removesuffix(".conf")
    # Keep asking until the name is non-empty and either unused or the user
    # agrees to overwrite; ``file`` is always bound once ``name`` is non-empty.
    while not name or (file := sites_dir / (name + ".conf")).exists():
        if name and click.confirm(
            f"File {click.format_filename(file)} already exists. Overwrite?"
        ):
            break
        name = input("File name (without .conf): ").removesuffix(".conf")

    file.write_text(filled)
    click.echo(f"Site written to {click.format_filename(file)}")
    if click.confirm("Continue editing?"):
        click.edit(filename=str(file))