Source code for grafanarmadillo.migrate

"""Migrate from Classic to Unified alerting."""
import contextlib
import datetime
import logging
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import Dict

import docker
import requests
from docker.models.containers import Container

from grafanarmadillo.bulk import BulkExporter
from grafanarmadillo.templator import Templator


l = logging.getLogger(__name__)


[docs]@dataclass class DockerContainer: """Handle on a docker container.""" container: Container image: str host_port: int @property def status(self): """Get the status of the docker container.""" return self.container.status
[docs]def start_container(image_name, volume_path: Path, environment_vars: Dict[str, str]): """Start a container.""" client = docker.from_env() volumes = {str(volume_path): {"bind": "/var/lib/grafana/grafana.db", "mode": "rw"}} container = client.containers.run( image_name, detach=True, ports={"3000/tcp": 0}, volumes=volumes, environment=environment_vars, ) container.reload() host_port = container.attrs["NetworkSettings"]["Ports"]["3000/tcp"][0]["HostPort"] return DockerContainer(container, image_name, int(host_port))
[docs]def read_container_logs(container: DockerContainer): """Read logs from the docker container.""" return container.container.logs().decode("utf-8")
[docs]def stop_container(container: DockerContainer): """Stop container.""" container.container.stop()
[docs]def exec_in_container(container: DockerContainer, command: str): """Execute a command in a running docker container.""" result = container.container.exec_run(command) return result.output.decode("utf-8").strip()
[docs]@contextlib.contextmanager def with_container(image_name, volume_path: Path, environment_vars: Dict[str, str]): """Context manager for Grafana docker container.""" container = start_container(image_name, volume_path, environment_vars) try: yield container finally: stop_container(container)
DEFAULT_TIMEOUT = datetime.timedelta(seconds=300) def _wait_until_ready( container: DockerContainer, timeout: datetime.timedelta = DEFAULT_TIMEOUT, ): """Wait until container's readiness check passes.""" start = datetime.datetime.now() end = start + timeout l.debug(f"waiting for container to be ready start={start}, end={end}, timeout={timeout}") while True: if datetime.datetime.now() > end: raise RuntimeError( f"Could not connect to container in {timeout} logs={read_container_logs(container)}" ) try: if requests.get(f"http://localhost:{container.host_port}/api/health").ok: break except ( ConnectionError, requests.exceptions.ConnectionError, requests.exceptions.HTTPError, ): pass
[docs]def migrate( cfg: dict, grafana_image: str, grafana_db: Path, output_directory: Path, templator: Templator, extra_env_vars: Dict[str, str] = None, grafana_uid: int = 472, timeout: datetime.timedelta = DEFAULT_TIMEOUT, clone_db: bool = True ) -> None: """Migrate from classic to Unified alerting.""" extra_env_vars = extra_env_vars or {} if clone_db: new_db = grafana_db.with_name("migrated").absolute() l.debug(f"cloning db from={grafana_image} to={new_db}") shutil.copyfile(grafana_db, new_db) if not new_db.stat().st_uid == grafana_uid: try: import subprocess subprocess.run(["sudo", "chown", str(grafana_uid), new_db.as_posix()]) except PermissionError: l.warning(f"Could not change owner of Grafana DB. expected={grafana_uid} actual={new_db.stat().st_uid} permissions={oct(new_db.stat().st_mode)}") else: new_db = grafana_db l.debug("begin migrating") with with_container(grafana_image, new_db, extra_env_vars) as container: if container.status != "running": raise RuntimeError(f"Could not start Grafana container {container=}") l.info("wait for migrations to apply") _wait_until_ready(container, timeout=timeout) l.info("migrations applied") l.info("export dashboards") exporter = BulkExporter( {**cfg, **{ "host": "localhost", "port": container.host_port, }}, output_directory, templator=templator, ) exporter.run() l.info("export dashboards complete")