Source code for ops.testing

# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Infrastructure to build unit tests for charms using the ops library."""


import dataclasses
import datetime
import fnmatch
import http
import inspect
import io
import ipaddress
import os
import pathlib
import random
import shutil
import signal
import tempfile
import typing
import uuid
import warnings
from contextlib import contextmanager
from io import BytesIO, IOBase, StringIO
from textwrap import dedent
from typing import (
    Any,
    AnyStr,
    BinaryIO,
    Callable,
    Dict,
    Generic,
    Iterable,
    List,
    Literal,
    Mapping,
    Optional,
    Sequence,
    Set,
    TextIO,
    Tuple,
    Type,
    TypedDict,
    TypeVar,
    Union,
    cast,
)

from ops import charm, framework, model, pebble, storage
from ops._private import yaml
from ops.charm import CharmBase, CharmMeta, RelationRole
from ops.model import Container, RelationNotFoundError, _NetworkDict
from ops.pebble import ExecProcess

ReadableBuffer = Union[bytes, str, StringIO, BytesIO, BinaryIO]
_StringOrPath = Union[str, pathlib.PurePosixPath, pathlib.Path]
_FileKwargs = TypedDict('_FileKwargs', {
    'permissions': Optional[int],
    'last_modified': datetime.datetime,
    'user_id': Optional[int],
    'user': Optional[str],
    'group_id': Optional[int],
    'group': Optional[str],
})

_RelationEntities = TypedDict('_RelationEntities', {
    'app': str,
    'units': List[str]
})

_StatusName = Literal['unknown', 'blocked', 'active', 'maintenance', 'waiting']
_RawStatus = TypedDict('_RawStatus', {
    'status': _StatusName,
    'message': str,
})
_ConfigOption = TypedDict('_ConfigOption', {
    'type': Literal['string', 'int', 'float', 'boolean', 'secret'],
    'description': str,
    'default': Union[str, int, float, bool],
})
_RawConfig = TypedDict("_RawConfig", {'options': Dict[str, _ConfigOption]})


# YAMLStringOrFile is something like metadata.yaml or actions.yaml. You can
# pass in a file-like object or the string directly.
YAMLStringOrFile = Union[str, TextIO]


# An instance of an Application or Unit, or the name of either.
# This is done here to avoid a scoping issue with the `model` property
# of the Harness class below.
AppUnitOrName = Union[str, model.Application, model.Unit]


# CharmType represents user charms that are derived from CharmBase.
CharmType = TypeVar('CharmType', bound=charm.CharmBase)


[docs]@dataclasses.dataclass class ExecArgs: """Represent arguments captured from the :meth:`ops.Container.exec` method call. These arguments will be passed to the :meth:`Harness.handle_exec` handler function. See :meth:`ops.pebble.Client.exec` for documentation of properties. """ command: List[str] environment: Dict[str, str] working_dir: Optional[str] timeout: Optional[float] user_id: Optional[int] user: Optional[str] group_id: Optional[int] group: Optional[str] stdin: Optional[Union[str, bytes]] encoding: Optional[str] combine_stderr: bool
[docs]@dataclasses.dataclass class ExecResult: """Represents the result of a simulated process execution. This class is typically used to return the output and exit code from the :meth:`Harness.handle_exec` result or handler function. """ exit_code: int = 0 stdout: Union[str, bytes] = b"" stderr: Union[str, bytes] = b""
ExecHandler = Callable[[ExecArgs], Union[None, ExecResult]]
[docs]@dataclasses.dataclass(frozen=True) class ActionOutput: """Contains the logs and results from a :meth:`Harness.run_action` call.""" logs: List[str] """Messages generated by the Charm using :meth:`ops.ActionEvent.log`.""" results: Dict[str, Any] """The action's results, as set or updated by :meth:`ops.ActionEvent.set_results`."""
[docs]class ActionFailed(Exception): # noqa """Raised when :code:`event.fail()` is called during a :meth:`Harness.run_action` call.""" message: str """Optional details of the failure, as provided by :meth:`ops.ActionEvent.fail`.""" output: ActionOutput """Any logs and results set by the Charm.""" def __init__(self, message: str, output: ActionOutput): self.message = message self.output = output
@dataclasses.dataclass() class _RunningAction: name: str output: ActionOutput parameters: Dict[str, Any] failure_message: Optional[str] = None # noinspection PyProtectedMember
[docs]class Harness(Generic[CharmType]): """This class represents a way to build up the model that will drive a test suite. The model created is from the viewpoint of the charm that is being tested. Always call ``harness.cleanup()`` after creating a :class:`Harness`:: @pytest.fixture() def harness(): harness = Harness(MyCharm) yield harness harness.cleanup() Below is an example test using :meth:`begin_with_initial_hooks` that ensures the charm responds correctly to config changes (the parameter ``harness`` in the test function is a pytest fixture that does setup/teardown, see :class:`Harness`):: def test_foo(harness): # Instantiate the charm and trigger events that Juju would on startup harness.begin_with_initial_hooks() # Update charm config and trigger config-changed harness.update_config({'log_level': 'warn'}) # Check that charm properly handled config-changed, for example, # the charm added the correct Pebble layer plan = harness.get_container_pebble_plan('prometheus') assert '--log.level=warn' in plan.services['prometheus'].command To set up the model without triggering events (or calling charm code), perform the harness actions before calling :meth:`begin`. Below is an example that adds a relation before calling ``begin``, and then updates config to trigger the ``config-changed`` event in the charm (the parameter ``harness`` in the test function is a pytest fixture that does setup/teardown, see :class:`Harness`):: def test_bar(harness): # Set up model before "begin" (no events triggered) harness.set_leader(True) harness.add_relation('db', 'postgresql', unit_data={'key': 'val'}) # Now instantiate the charm to start triggering events as the model changes harness.begin() harness.update_config({'some': 'config'}) # Check that charm has properly handled config-changed, for example, # has written the app's config file root = harness.get_filesystem_root('container') assert (root / 'etc' / 'app.conf').exists() Args: charm_cls: The Charm class to test. meta: A string or file-like object containing the contents of ``metadata.yaml``. If not supplied, we will look for a ``metadata.yaml`` file in the parent directory of the Charm, and if not found fall back to a trivial ``name: test-charm`` metadata. actions: A string or file-like object containing the contents of ``actions.yaml``. If not supplied, we will look for an ``actions.yaml`` file in the parent directory of the Charm. config: A string or file-like object containing the contents of ``config.yaml``. If not supplied, we will look for a ``config.yaml`` file in the parent directory of the Charm. """ def __init__( self, charm_cls: Type[CharmType], *, meta: Optional[YAMLStringOrFile] = None, actions: Optional[YAMLStringOrFile] = None, config: Optional[YAMLStringOrFile] = None): self._charm_cls = charm_cls self._charm: Optional[CharmType] = None self._charm_dir = 'no-disk-path' # this may be updated by _create_meta self._meta = self._create_meta(meta, actions) self._unit_name: str = f"{self._meta.name}/0" self._hooks_enabled: bool = True self._relation_id_counter: int = 0 self._action_id_counter: int = 0 config_ = self._get_config(config) self._backend = _TestingModelBackend(self._unit_name, self._meta, config_) self._model = model.Model(self._meta, self._backend) self._storage = storage.SQLiteStorage(':memory:') self._framework = framework.Framework( self._storage, self._charm_dir, self._meta, self._model) def _event_context(self, event_name: str): """Configures the Harness to behave as if an event hook were running. This means that the Harness will perform strict access control of relation data. Example usage: # this is how we test that attempting to write a remote app's # databag will raise RelationDataError. >>> with harness._event_context('foo'): >>> with pytest.raises(ops.model.RelationDataError): >>> my_relation.data[remote_app]['foo'] = 'bar' # this is how we test with 'realistic conditions' how an event handler behaves # when we call it directly -- i.e. without going through harness.add_relation >>> def test_foo(): >>> class MyCharm: >>> ... >>> def event_handler(self, event): >>> # this is expected to raise an exception >>> event.relation.data[event.relation.app]['foo'] = 'bar' >>> >>> harness = Harness(MyCharm) >>> event = MagicMock() >>> event.relation = harness.charm.model.relations[0] >>> >>> with harness._event_context('my_relation_joined'): >>> with pytest.raises(ops.model.RelationDataError): >>> harness.charm.event_handler(event) If event_name == '', conversely, the Harness will believe that no hook is running, allowing temporary unrestricted access to read/write a relation's databags even from inside an event handler. >>> def test_foo(): >>> class MyCharm: >>> ... >>> def event_handler(self, event): >>> # this is expected to raise an exception since we're not leader >>> event.relation.data[self.app]['foo'] = 'bar' >>> >>> harness = Harness(MyCharm) >>> event = MagicMock() >>> event.relation = harness.charm.model.relations[0] >>> >>> with harness._event_context('my_relation_joined'): >>> harness.charm.event_handler(event) """ return self._framework._event_context(event_name)
[docs] def set_can_connect(self, container: Union[str, model.Container], val: bool): """Change the simulated connection status of a container's underlying Pebble client. After calling this, :meth:`ops.Container.can_connect` will return val. """ if isinstance(container, str): container = self.model.unit.get_container(container) self._backend._set_can_connect(container._pebble, val)
@property def charm(self) -> CharmType: """Return the instance of the charm class that was passed to ``__init__``. Note that the Charm is not instantiated until :meth:`.begin()` is called. Until then, attempting to access this property will raise an exception. """ if self._charm is None: raise RuntimeError('The charm instance is not available yet. ' 'Call Harness.begin() first.') return self._charm @property def model(self) -> model.Model: """Return the :class:`~ops.model.Model` that is being driven by this Harness.""" return self._model @property def framework(self) -> framework.Framework: """Return the Framework that is being driven by this Harness.""" return self._framework
[docs] def begin(self) -> None: """Instantiate the Charm and start handling events. Before calling :meth:`begin`, there is no Charm instance, so changes to the Model won't emit events. Call :meth:`.begin` for :attr:`.charm` to be valid. Should only be called once. """ if self._charm is not None: raise RuntimeError('cannot call the begin method on the harness more than once') # The Framework adds attributes to class objects for events, etc. As such, we can't re-use # the original class against multiple Frameworks. So create a locally defined class # and register it. # TODO: jam 2020-03-16 We are looking to changes this to Instance attributes instead of # Class attributes which should clean up this ugliness. The API can stay the same class TestEvents(self._charm_cls.on.__class__): pass TestEvents.__name__ = self._charm_cls.on.__class__.__name__ class TestCharm(self._charm_cls): # type: ignore on = TestEvents() # Note: jam 2020-03-01 This is so that errors in testing say MyCharm has no attribute foo, # rather than TestCharm has no attribute foo. TestCharm.__name__ = self._charm_cls.__name__ self._charm = TestCharm(self._framework) # type: ignore
[docs] def begin_with_initial_hooks(self) -> None: """Fire the same hooks that Juju would fire at startup. This triggers install, relation-created, config-changed, start, pebble-ready (for any containers), and any relation-joined hooks based on what relations have been added before begin was called. Note that all of these are fired before returning control to the test suite, so to introspect what happens at each step, fire them directly (for example, ``Charm.on.install.emit()``). To use this with all the normal hooks, instantiate the harness, setup any relations that should be active when the charm starts, and then call this method. This method will automatically create and add peer relations that are specified in metadata.yaml. If the charm metadata specifies containers, this sets can_connect to True for all containers (in addition to triggering pebble-ready for each). Example:: harness = Harness(MyCharm) # Do initial setup here # Add storage if needed before begin_with_initial_hooks() is called storage_ids = harness.add_storage('data', count=1)[0] storage_id = storage_id[0] # we only added one storage instance harness.add_relation('db', 'postgresql', unit_data={'key': 'val'}) harness.set_leader(True) harness.update_config({'initial': 'config'}) harness.begin_with_initial_hooks() # This will cause # install, db-relation-created('postgresql'), leader-elected, config-changed, start # db-relation-joined('postgresql/0'), db-relation-changed('postgresql/0') # To be fired. """ self.begin() charm = cast(CharmBase, self._charm) # Checking if disks have been added # storage-attached events happen before install for storage_name in self._meta.storages: for storage_index in self._backend.storage_list(storage_name, include_detached=True): s = model.Storage(storage_name, storage_index, self._backend) if self._backend._storage_is_attached(storage_name, storage_index): # Attaching was done already, but we still need the event to be emitted. self.charm.on[storage_name].storage_attached.emit(s) else: self.attach_storage(s.full_id) # Storage done, emit install event charm.on.install.emit() # Juju itself iterates what relation to fire based on a map[int]relation, so it doesn't # guarantee a stable ordering between relation events. It *does* give a stable ordering # of joined units for a given relation. items = list(self._meta.relations.items()) random.shuffle(items) this_app_name = self._meta.name for relname, rel_meta in items: if rel_meta.role == RelationRole.peer: # If the user has directly added a relation, leave it be, but otherwise ensure # that peer relations are always established at before leader-elected. rel_ids = self._backend._relation_ids_map.get(relname) if rel_ids is None: self.add_relation(relname, self._meta.name) else: random.shuffle(rel_ids) for rel_id in rel_ids: self._emit_relation_created(relname, rel_id, this_app_name) else: rel_ids = self._backend._relation_ids_map.get(relname, []) random.shuffle(rel_ids) for rel_id in rel_ids: app_name = self._backend._relation_app_and_units[rel_id]["app"] self._emit_relation_created(relname, rel_id, app_name) if self._backend._is_leader: charm.on.leader_elected.emit() else: charm.on.leader_settings_changed.emit() charm.on.config_changed.emit() charm.on.start.emit() # Set can_connect and fire pebble-ready for any containers. for container_name in self._meta.containers: self.container_pebble_ready(container_name) # If the initial hooks do not set a unit status, the Juju controller will switch # the unit status from "Maintenance" to "Unknown". See gh#726 post_setup_sts = self._backend.status_get() if post_setup_sts.get("status") == "maintenance" and not post_setup_sts.get("message"): self._backend._unit_status = {'status': 'unknown', 'message': ''} all_ids = list(self._backend._relation_names.items()) random.shuffle(all_ids) for rel_id, rel_name in all_ids: rel_app_and_units = self._backend._relation_app_and_units[rel_id] app_name = rel_app_and_units["app"] # Note: Juju *does* fire relation events for a given relation in the sorted order of # the unit names. It also always fires relation-changed immediately after # relation-joined for the same unit. # Juju only fires relation-changed (app) if there is data for the related application relation = self._model.get_relation(rel_name, rel_id) if self._backend._relation_data_raw[rel_id].get(app_name): app = self._model.get_app(app_name) charm.on[rel_name].relation_changed.emit(relation, app, None) for unit_name in sorted(rel_app_and_units["units"]): remote_unit = self._model.get_unit(unit_name) charm.on[rel_name].relation_joined.emit( relation, remote_unit.app, remote_unit) charm.on[rel_name].relation_changed.emit( relation, remote_unit.app, remote_unit)
[docs] def cleanup(self) -> None: """Called by the test infrastructure to clean up any temporary directories/files/etc. Always call ``self.addCleanup(harness.cleanup)`` after creating a :class:`Harness`. """ self._backend._cleanup()
def _create_meta(self, charm_metadata_yaml: Optional[YAMLStringOrFile], action_metadata_yaml: Optional[YAMLStringOrFile]) -> CharmMeta: """Create a CharmMeta object. Handle the cases where a user doesn't supply explicit metadata snippets. This will try to load metadata from ``<charm_dir>/charmcraft.yaml`` first, then ``<charm_dir>/metadata.yaml`` if charmcraft.yaml does not include metadata, and ``<charm_dir>/actions.yaml`` if charmcraft.yaml does not include actions. """ try: filename = inspect.getfile(self._charm_cls) except OSError: charm_dir = None else: charm_dir = pathlib.Path(filename).parents[1] charm_metadata: Optional[Dict[str, Any]] = None charmcraft_metadata: Optional[Dict[str, Any]] = None if charm_dir: # Check charmcraft.yaml and load it if it exists charmcraft_meta = charm_dir / "charmcraft.yaml" if charmcraft_meta.is_file(): self._charm_dir = charm_dir charmcraft_metadata = yaml.safe_load(charmcraft_meta.read_text()) # Load metadata from parameters if provided if charm_metadata_yaml is not None: if isinstance(charm_metadata_yaml, str): charm_metadata_yaml = dedent(charm_metadata_yaml) charm_metadata = yaml.safe_load(charm_metadata_yaml) else: # Check charmcraft.yaml for metadata if no metadata is provided if charmcraft_metadata is not None: meta_keys = ["name", "summary", "description"] if any(key in charmcraft_metadata for key in meta_keys): # Unrelated keys in the charmcraft.yaml file will be ignored. charm_metadata = charmcraft_metadata # Still no metadata, check metadata.yaml if charm_dir and charm_metadata is None: metadata_path = charm_dir / 'metadata.yaml' if metadata_path.is_file(): charm_metadata = yaml.safe_load(metadata_path.read_text()) self._charm_dir = charm_dir # Use default metadata if metadata is not found if charm_metadata is None: charm_metadata = {"name": "test-charm"} action_metadata: Optional[Dict[str, Any]] = None # Load actions from parameters if provided if action_metadata_yaml is not None: if isinstance(action_metadata_yaml, str): action_metadata_yaml = dedent(action_metadata_yaml) action_metadata = yaml.safe_load(action_metadata_yaml) else: # Check charmcraft.yaml for actions if no actions are provided if charmcraft_metadata is not None and "actions" in charmcraft_metadata: action_metadata = charmcraft_metadata["actions"] # Still no actions, check actions.yaml if charm_dir and action_metadata is None: actions_path = charm_dir / 'actions.yaml' if actions_path.is_file(): action_metadata = yaml.safe_load(actions_path.read_text()) self._charm_dir = charm_dir return CharmMeta(charm_metadata, action_metadata) def _get_config(self, charm_config_yaml: Optional['YAMLStringOrFile']): """If the user passed a config to Harness, use it. Otherwise try to load config from ``<charm_dir>/charmcraft.yaml`` first, then ``<charm_dir>/config.yaml`` if charmcraft.yaml does not include config. """ try: filename = inspect.getfile(self._charm_cls) except OSError: charm_dir = None else: charm_dir = pathlib.Path(filename).parents[1] config: Optional[Dict[str, Any]] = None # Load config from parameters if provided if charm_config_yaml is not None: if isinstance(charm_config_yaml, str): charm_config_yaml = dedent(charm_config_yaml) config = yaml.safe_load(charm_config_yaml) else: if charm_dir: # Check charmcraft.yaml for config if no config is provided charmcraft_meta = charm_dir / "charmcraft.yaml" if charmcraft_meta.is_file(): charmcraft_metadata: Dict[str, Any] = yaml.safe_load( charmcraft_meta.read_text()) config = charmcraft_metadata.get("config") # Still no config, check config.yaml if config is None: config_path = charm_dir / 'config.yaml' if config_path.is_file(): config = yaml.safe_load(config_path.read_text()) self._charm_dir = charm_dir # Use default config if config is not found if config is None: config = {} if not isinstance(config, dict): raise TypeError(config) return cast('_RawConfig', config)
[docs] def add_oci_resource(self, resource_name: str, contents: Optional[Mapping[str, str]] = None) -> None: """Add OCI resources to the backend. This will register an OCI resource and create a temporary file for processing metadata about the resource. A default set of values will be used for all the file contents unless a specific contents dict is provided. Args: resource_name: Name of the resource to add custom contents to. contents: Optional custom dict to write for the named resource. """ if not contents: contents = {'registrypath': 'registrypath', 'username': 'username', 'password': 'password', } if resource_name not in self._meta.resources: raise RuntimeError(f'Resource {resource_name} is not a defined resources') if self._meta.resources[resource_name].type != "oci-image": raise RuntimeError(f'Resource {resource_name} is not an OCI Image') as_yaml = yaml.safe_dump(contents) self._backend._resources_map[resource_name] = ('contents.yaml', as_yaml)
[docs] def add_resource(self, resource_name: str, content: AnyStr) -> None: """Add content for a resource to the backend. This will register the content, so that a call to ``model.resources.fetch(resource_name)`` will return a path to a file containing that content. Args: resource_name: The name of the resource being added content: Either string or bytes content, which will be the content of the filename returned by resource-get. If contents is a string, it will be encoded in utf-8 """ if resource_name not in self._meta.resources: raise RuntimeError(f'Resource {resource_name} is not a defined resource') record = self._meta.resources[resource_name] if record.type != "file": raise RuntimeError( f'Resource {resource_name} is not a file, but actually {record.type}') filename = record.filename if filename is None: filename = resource_name self._backend._resources_map[resource_name] = (filename, content)
[docs] def populate_oci_resources(self) -> None: """Populate all OCI resources.""" for name, data in self._meta.resources.items(): if data.type == "oci-image": self.add_oci_resource(name)
[docs] def disable_hooks(self) -> None: """Stop emitting hook events when the model changes. This can be used by developers to stop changes to the model from emitting events that the charm will react to. Call :meth:`.enable_hooks` to re-enable them. """ self._hooks_enabled = False
[docs] def enable_hooks(self) -> None: """Re-enable hook events from charm.on when the model is changed. By default, hook events are enabled once :meth:`.begin` is called, but if :meth:`.disable_hooks` is used, this method will enable them again. """ self._hooks_enabled = True
[docs] @contextmanager def hooks_disabled(self): """A context manager to run code with hooks disabled. Example:: with harness.hooks_disabled(): # things in here don't fire events harness.set_leader(True) harness.update_config(unset=['foo', 'bar']) # things here will again fire events """ if self._hooks_enabled: self.disable_hooks() try: yield None finally: self.enable_hooks() else: yield None
def _next_relation_id(self): rel_id = self._relation_id_counter self._relation_id_counter += 1 return rel_id
[docs] def add_storage(self, storage_name: str, count: int = 1, *, attach: bool = False) -> List[str]: """Create a new storage device and attach it to this unit. To have repeatable tests, each device will be initialized with location set to /[tmpdir]/<storage_name>N, where N is the counter and will be a number from [0,total_num_disks-1]. The test harness uses symbolic links to imitate storage mounts, which may lead to some inconsistencies compared to the actual charm. Args: storage_name: The storage backend name on the Charm count: Number of disks being added attach: True to also attach the storage mount; if :meth:`begin` has been called a True value will also emit storage-attached Return: A list of storage IDs, e.g. ["my-storage/1", "my-storage/2"]. """ if storage_name not in self._meta.storages: raise RuntimeError( f"the key '{storage_name}' is not specified as a storage key in metadata") storage_indices = self._backend.storage_add(storage_name, count) ids: List[str] = [] for storage_index in storage_indices: s = model.Storage(storage_name, storage_index, self._backend) ids.append(s.full_id) if attach: self.attach_storage(s.full_id) return ids
[docs] def detach_storage(self, storage_id: str) -> None: """Detach a storage device. The intent of this function is to simulate a ``juju detach-storage`` call. It will trigger a storage-detaching hook if the storage unit in question exists and is presently marked as attached. Note that the Charm is not instantiated until :meth:`begin` is called. Until then, attempting to use this method will raise an exception. Args: storage_id: The full storage ID of the storage unit being detached, including the storage key, e.g. my-storage/0. """ if self._charm is None: raise RuntimeError('cannot detach storage before Harness is initialised') storage_name, storage_index = storage_id.split('/', 1) storage_index = int(storage_index) storage_attached = self._backend._storage_is_attached( storage_name, storage_index) if storage_attached and self._hooks_enabled: self.charm.on[storage_name].storage_detaching.emit( model.Storage(storage_name, storage_index, self._backend)) self._backend._storage_detach(storage_id)
[docs] def attach_storage(self, storage_id: str) -> None: """Attach a storage device. The intent of this function is to simulate a ``juju attach-storage`` call. If called after :meth:`begin` and hooks are not disabled, it will trigger a storage-attached hook if the storage unit in question exists and is presently marked as detached. The test harness uses symbolic links to imitate storage mounts, which may lead to some inconsistencies compared to the actual charm. Args: storage_id: The full storage ID of the storage unit being attached, including the storage key, e.g. my-storage/0. """ if not self._backend._storage_attach(storage_id): return # storage was already attached if not self._charm or not self._hooks_enabled: return # don't need to run hook callback storage_name, storage_index = storage_id.split('/', 1) # Reset associated cached value in the storage mappings. If we don't do this, # Model._storages won't return Storage objects for subsequently-added storage. self._model._storages._invalidate(storage_name) storage_index = int(storage_index) self.charm.on[storage_name].storage_attached.emit( model.Storage(storage_name, storage_index, self._backend))
[docs] def remove_storage(self, storage_id: str) -> None: """Detach a storage device. The intent of this function is to simulate a ``juju remove-storage`` call. It will trigger a storage-detaching hook if the storage unit in question exists and is presently marked as attached. Then it will remove the storage unit from the testing backend. Args: storage_id: The full storage ID of the storage unit being removed, including the storage key, e.g. my-storage/0. Raises: RuntimeError: if the storage is not in the metadata. """ storage_name, storage_index = storage_id.split('/', 1) storage_index = int(storage_index) if storage_name not in self._meta.storages: raise RuntimeError( f"the key '{storage_name}' is not specified as a storage key in metadata") is_attached = self._backend._storage_is_attached( storage_name, storage_index) if self._charm is not None and self._hooks_enabled and is_attached: self.charm.on[storage_name].storage_detaching.emit( model.Storage(storage_name, storage_index, self._backend)) self._backend._storage_remove(storage_id)
[docs] def add_relation(self, relation_name: str, remote_app: str, *, app_data: Optional[Mapping[str, str]] = None, unit_data: Optional[Mapping[str, str]] = None) -> int: """Declare that there is a new relation between this application and `remote_app`. This function creates a relation with an application and triggers a :class:`RelationCreatedEvent <ops.RelationCreatedEvent>`. To match Juju's behaviour, it also creates a default network binding on this endpoint. If you want to associate a custom network to this binding (or a global default network), provide one using :meth:`add_network` before calling this function. If `app_data` or `unit_data` are provided, also add a new unit (``<remote_app>/0``) to the relation and trigger :class:`RelationJoinedEvent <ops.RelationJoinedEvent>`. Then update the application data if `app_data` is provided and the unit data if `unit_data` is provided, triggering :class:`RelationChangedEvent <ops.RelationChangedEvent>` after each update. Alternatively, charm tests can call :meth:`add_relation_unit` and :meth:`update_relation_data` explicitly. For peer relations defined in the charm's metadata, :meth:`begin_with_initial_hooks` will create them automatically, so the caller doesn't need to call :meth:`add_relation`. If the caller chooses to add a peer relation by themselves, make sure to call :meth:`add_relation` before :meth:`begin_with_initial_hooks` so that Harness won't create it again. Example usage:: secret_id = harness.add_model_secret('mysql', {'password': 'SECRET'}) harness.add_relation('db', 'mysql', unit_data={ 'host': 'mysql.localhost, 'username': 'appuser', 'secret-id': secret_id, }) Args: relation_name: The relation on the charm that is being integrated with. remote_app: The name of the application that is being integrated with. To add a peer relation, set to the name of *this* application. app_data: If provided, also add a new unit to the relation (triggering relation-joined) and set the *application* relation data (triggering relation-changed). unit_data: If provided, also add a new unit to the relation (triggering relation-joined) and set the *unit* relation data (triggering relation-changed). Return: The ID of the relation created. """ if not (relation_name in self._meta.provides or relation_name in self._meta.requires or relation_name in self._meta.peers): raise RelationNotFoundError(f'relation {relation_name!r} not declared in metadata') relation_id = self._next_relation_id() self._backend._relation_ids_map.setdefault( relation_name, []).append(relation_id) self._backend._relation_names[relation_id] = relation_name self._backend._relation_list_map[relation_id] = [] self._backend._relation_data_raw[relation_id] = { remote_app: {}, self._backend.unit_name: {}, self._backend.app_name: {}} self._backend._relation_app_and_units[relation_id] = { "app": remote_app, "units": [], } # Reload the relation_ids list if self._model is not None: self._model.relations._invalidate(relation_name) self._emit_relation_created(relation_name, relation_id, remote_app) if app_data is not None or unit_data is not None: remote_unit = remote_app + '/0' self.add_relation_unit(relation_id, remote_unit) if app_data is not None: self.update_relation_data(relation_id, remote_app, app_data) if unit_data is not None: self.update_relation_data(relation_id, remote_unit, unit_data) # If we have a default network binding configured, respect it. if not self._backend._networks.get((None, None)): # If we don't already have a network binding for this relation id, create one. if not self._backend._networks.get((relation_name, relation_id)): self.add_network("10.0.0.10", endpoint=relation_name, relation_id=relation_id) # If we don't already have a default network binding for this endpoint, create one. if not self._backend._networks.get((relation_name, None)): self.add_network("192.0.2.0", endpoint=relation_name) return relation_id
[docs] def remove_relation(self, relation_id: int) -> None: """Remove a relation. Args: relation_id: The relation ID for the relation to be removed. Raises: RelationNotFoundError: if relation id is not valid """ rel_names = self._backend._relation_names try: relation_name = rel_names[relation_id] remote_app = self._backend.relation_remote_app_name(relation_id) except KeyError as e: raise model.RelationNotFoundError from e rel_list_map = self._backend._relation_list_map for unit_name in rel_list_map[relation_id].copy(): self.remove_relation_unit(relation_id, unit_name) prev_broken_id = None # Silence linter warning. if self._model is not None: # Let the model's RelationMapping know that this relation is broken. # Normally, this is handled in `main`, but while testing we create # the `Model` object and keep it around for multiple events. prev_broken_id = self._model._relations._broken_relation_id self._model.relations._broken_relation_id = relation_id # Ensure that we don't offer a cached relation. self._model.relations._invalidate(relation_name) self._emit_relation_broken(relation_name, relation_id, remote_app) if self._model is not None: self._model.relations._broken_relation_id = prev_broken_id self._model.relations._invalidate(relation_name) self._backend._relation_app_and_units.pop(relation_id) self._backend._relation_data_raw.pop(relation_id) rel_list_map.pop(relation_id) ids_map = self._backend._relation_ids_map ids_map[relation_name].remove(relation_id) rel_names.pop(relation_id) # Remove secret grants that give access via this relation for secret in self._backend._secrets: secret.grants = {rid: names for rid, names in secret.grants.items() if rid != relation_id}
def _emit_relation_created(self, relation_name: str, relation_id: int, remote_app: str) -> None: """Trigger relation-created for a given relation with a given remote application.""" if self._charm is None or not self._hooks_enabled: return relation = self._model.get_relation(relation_name, relation_id) app = self._model.get_app(remote_app) self._charm.on[relation_name].relation_created.emit( relation, app) def _emit_relation_broken(self, relation_name: str, relation_id: int, remote_app: str) -> None: """Trigger relation-broken for a given relation with a given remote application.""" if self._charm is None or not self._hooks_enabled: return relation = self._model.get_relation(relation_name, relation_id) app = self._model.get_app(remote_app) self._charm.on[relation_name].relation_broken.emit(relation, app)
[docs] def add_relation_unit(self, relation_id: int, remote_unit_name: str) -> None: """Add a new unit to a relation. This will trigger a `relation_joined` event. This would naturally be followed by a `relation_changed` event, which can be triggered with :meth:`.update_relation_data`. This separation is artificial in the sense that Juju will always fire the two, but is intended to make testing relations and their data bags slightly more natural. Unless finer-grained control is needed, most charm tests can call :meth:`add_relation` with the `app_data` or `unit_data` argument instead of using this function. Example:: rel_id = harness.add_relation('db', 'postgresql') harness.add_relation_unit(rel_id, 'postgresql/0') Args: relation_id: The integer relation identifier (as returned by :meth:`add_relation`). remote_unit_name: A string representing the remote unit that is being added. """ self._backend._relation_list_map[relation_id].append(remote_unit_name) # we can write remote unit data iff we are not in a hook env relation_name = self._backend._relation_names[relation_id] relation = self._model.get_relation(relation_name, relation_id) if not relation: raise RuntimeError('Relation id {} is mapped to relation name {},' 'but no relation matching that name was found.') self._backend._relation_data_raw[relation_id][remote_unit_name] = {} app = relation.app if not remote_unit_name.startswith(app.name): warnings.warn( f'Remote unit name invalid: ' f'the remote application of {relation_name} is called {app.name!r}; ' f'the remote unit name should be {app.name}/<some-number>, ' f'not {remote_unit_name!r}.' ) app_and_units = self._backend._relation_app_and_units app_and_units[relation_id]["units"].append(remote_unit_name) # Make sure that the Model reloads the relation_list for this relation_id, as well as # reloading the relation data for this unit. remote_unit = self._model.get_unit(remote_unit_name) unit_cache = relation.data.get(remote_unit, None) if unit_cache is not None: unit_cache._invalidate() self._model.relations._invalidate(relation_name) if self._charm is None or not self._hooks_enabled: return self._charm.on[relation_name].relation_joined.emit( relation, remote_unit.app, remote_unit)
[docs] def remove_relation_unit(self, relation_id: int, remote_unit_name: str) -> None: """Remove a unit from a relation. Example:: rel_id = harness.add_relation('db', 'postgresql') harness.add_relation_unit(rel_id, 'postgresql/0') ... harness.remove_relation_unit(rel_id, 'postgresql/0') This will trigger a `relation_departed` event. This would normally be followed by a `relation_changed` event triggered by Juju. However, when using the test harness, a `relation_changed` event must be triggered using :meth:`.update_relation_data`. This deviation from normal Juju behaviour facilitates testing by making each step in the charm life cycle explicit. Args: relation_id: The integer relation identifier (as returned by :meth:`add_relation`). remote_unit_name: A string representing the remote unit that is being removed. """ relation_name = self._backend._relation_names[relation_id] # gather data to invalidate cache later remote_unit = self._model.get_unit(remote_unit_name) relation = self._model.get_relation(relation_name, relation_id) if not relation: # This should not really happen, since there being a relation name mapped # to this ID in _relation_names should guarantee that you created the relation # following the proper path, but still... raise RuntimeError('Relation id {} is mapped to relation name {},' 'but no relation matching that name was found.') unit_cache = relation.data.get(remote_unit, None) # remove the unit from the list of units in the relation relation.units.remove(remote_unit) self._emit_relation_departed(relation_id, remote_unit_name) # remove the relation data for the departed unit now that the event has happened self._backend._relation_list_map[relation_id].remove(remote_unit_name) self._backend._relation_app_and_units[relation_id]["units"].remove(remote_unit_name) self._backend._relation_data_raw[relation_id].pop(remote_unit_name) self.model._relations._invalidate(relation_name=relation.name) if unit_cache is not None: unit_cache._invalidate()
def _emit_relation_departed(self, relation_id: int, unit_name: str): """Trigger relation-departed event for a given relation id and unit.""" if self._charm is None or not self._hooks_enabled: return rel_name = self._backend._relation_names[relation_id] relation = self.model.get_relation(rel_name, relation_id) if '/' in unit_name: app_name = unit_name.split('/')[0] app = self.model.get_app(app_name) unit = self.model.get_unit(unit_name) else: raise ValueError('Invalid Unit Name') self._charm.on[rel_name].relation_departed.emit( relation, app, unit, unit_name)
[docs] def get_relation_data(self, relation_id: int, app_or_unit: AppUnitOrName) -> Mapping[str, str]: """Get the relation data bucket for a single app or unit in a given relation. This ignores all of the safety checks of who can and can't see data in relations (eg, non-leaders can't read their own application's relation data because there are no events that keep that data up-to-date for the unit). Args: relation_id: The relation whose content we want to look at. app_or_unit: An :class:`Application <ops.Application>` or :class:`Unit <ops.Unit>` instance, or its name, whose data we want to read. Return: A dict containing the relation data for ``app_or_unit`` or None. Raises: KeyError: if ``relation_id`` doesn't exist """ name = _get_app_or_unit_name(app_or_unit) # bypass access control by going directly to raw return self._backend._relation_data_raw[relation_id].get(name, None)
[docs] def get_pod_spec(self) -> Tuple[Mapping[Any, Any], Mapping[Any, Any]]: """Return the content of the pod spec as last set by the charm. This returns both the pod spec and any k8s_resources that were supplied. See the signature of :meth:`Pod.set_spec <ops.Pod.set_spec>`. """ return self._backend._pod_spec
[docs] def get_container_pebble_plan( self, container_name: str ) -> pebble.Plan: """Return the current plan that Pebble is executing for the given container. Args: container_name: The simple name of the associated container Return: The Pebble plan for this container. Use :meth:`Plan.to_yaml <ops.pebble.Plan.to_yaml>` to get a string form for the content. Raises: KeyError: if no Pebble client exists for that container name (should only happen if container is not present in ``metadata.yaml``). """ client = self._backend._pebble_clients.get(container_name) if client is None: raise KeyError(f'no known pebble client for container "{container_name}"') return client.get_plan()
[docs] def container_pebble_ready(self, container_name: str): """Fire the pebble_ready hook for the associated container. This will switch the given container's ``can_connect`` state to True before the hook function is called. It will do nothing if :meth:`begin()` has not been called. """ if self._charm is None: return container = self.model.unit.get_container(container_name) self.set_can_connect(container, True) self.charm.on[container_name].pebble_ready.emit(container)
[docs] def pebble_notify(self, container_name: str, key: str, *, data: Optional[Dict[str, str]] = None, repeat_after: Optional[datetime.timedelta] = None, type: pebble.NoticeType = pebble.NoticeType.CUSTOM) -> str: """Record a Pebble notice with the specified key and data. If :meth:`begin` has been called and the notice is new or was repeated, this will trigger a notice event of the appropriate type, for example :class:`ops.PebbleCustomNoticeEvent`. Args: container_name: Name of workload container. key: Notice key; must be in "example.com/path" format. data: Data fields for this notice. repeat_after: Only allow this notice to repeat after this duration has elapsed (the default is to always repeat). type: Notice type (currently only "custom" notices are supported). Returns: The notice's ID. """ container = self.model.unit.get_container(container_name) client = self._backend._pebble_clients[container.name] id, new_or_repeated = client._notify(type, key, data=data, repeat_after=repeat_after) if self._charm is not None and type == pebble.NoticeType.CUSTOM and new_or_repeated: self.charm.on[container_name].pebble_custom_notice.emit(container, id, type.value, key) return id
[docs] def get_workload_version(self) -> str: """Read the workload version that was set by the unit.""" return self._backend._workload_version
[docs] def set_model_info(self, name: Optional[str] = None, uuid: Optional[str] = None) -> None: """Set the name and UUID of the model that this is representing. Cannot be called once :meth:`begin` has been called. Use it to set the value that will be returned by :attr:`Model.name <ops.Model.name>` and :attr:`Model.uuid <ops.Model.uuid>`. This is a convenience method to invoke both :meth:`set_model_name` and :meth:`set_model_uuid` at once. """ if name is not None: self.set_model_name(name) if uuid is not None: self.set_model_uuid(uuid)
[docs] def set_model_name(self, name: str) -> None: """Set the name of the Model that this is representing. Cannot be called once :meth:`begin` has been called. Use it to set the value that will be returned by :attr:`Model.name <ops.Model.name>`. """ if self._charm is not None: raise RuntimeError('cannot set the Model name after begin()') self._backend.model_name = name
[docs] def set_model_uuid(self, uuid: str) -> None: """Set the uuid of the Model that this is representing. Cannot be called once :meth:`begin` has been called. Use it to set the value that will be returned by :attr:`Model.uuid <ops.Model.uuid>`. """ if self._charm is not None: raise RuntimeError('cannot set the Model uuid after begin()') self._backend.model_uuid = uuid
[docs] def update_relation_data( self, relation_id: int, app_or_unit: str, key_values: Mapping[str, str], ) -> None: """Update the relation data for a given unit or application in a given relation. This also triggers the `relation_changed` event for the given ``relation_id``. Unless finer-grained control is needed, most charm tests can call :meth:`add_relation` with the `app_data` or `unit_data` argument instead of using this function. Args: relation_id: The integer relation ID representing this relation. app_or_unit: The unit or application name that is being updated. This can be the local or remote application. key_values: Each key/value will be updated in the relation data. """ relation_name = self._backend._relation_names[relation_id] relation = self._model.get_relation(relation_name, relation_id) if '/' in app_or_unit: entity = self._model.get_unit(app_or_unit) else: entity = self._model.get_app(app_or_unit) if not relation: raise RuntimeError('Relation id {} is mapped to relation name {},' 'but no relation matching that name was found.') rel_data = relation.data.get(entity, None) if rel_data is not None: # rel_data may have cached now-stale data, so _invalidate() it. # Note, this won't cause the data to be loaded if it wasn't already. rel_data._invalidate() old_values = self._backend._relation_data_raw[relation_id][app_or_unit].copy() assert isinstance(old_values, dict), old_values # get a new relation instance to ensure a clean state new_relation_instance = self.model.relations._get_unique(relation.name, relation_id) assert new_relation_instance is not None # type guard; this passed before... databag = new_relation_instance.data[entity] # ensure that WE as harness can temporarily write the databag with self._event_context(''): values_have_changed = False for k, v in key_values.items(): if v == '': if databag.pop(k, None) != v: values_have_changed = True else: if k not in databag or databag[k] != v: databag[k] = v # this triggers relation-set values_have_changed = True if not values_have_changed: # Do not issue a relation changed event if the data bags have not changed return if app_or_unit == self._model.unit.name: # No events for our own unit return if app_or_unit == self._model.app.name: # updating our own app only generates an event if it is a peer relation and we # aren't the leader is_peer = self._meta.relations[relation_name].role.is_peer() if not is_peer: return if self._model.unit.is_leader(): return self._emit_relation_changed(relation_id, app_or_unit)
def _emit_relation_changed(self, relation_id: int, app_or_unit: str): if self._charm is None or not self._hooks_enabled: return rel_name = self._backend._relation_names[relation_id] relation = self.model.get_relation(rel_name, relation_id) if '/' in app_or_unit: app_name = app_or_unit.split('/')[0] unit_name = app_or_unit app = self.model.get_app(app_name) unit = self.model.get_unit(unit_name) args = (relation, app, unit) else: app_name = app_or_unit app = self.model.get_app(app_name) args = (relation, app) self._charm.on[rel_name].relation_changed.emit(*args) def _update_config( self, key_values: Optional[Mapping[str, Union[str, int, float, bool]]] = None, unset: Iterable[str] = (), ) -> None: """Update the config as seen by the charm. This will *not* trigger a `config_changed` event, and is intended for internal use. Note that the `key_values` mapping will only add or update configuration items. To remove existing ones, see the `unset` parameter. Args: key_values: A Mapping of key:value pairs to update in config. unset: An iterable of keys to remove from config. """ # NOTE: jam 2020-03-01 Note that this sort of works "by accident". Config # is a LazyMapping, but its _load returns a dict and this method mutates # the dict that Config is caching. Arguably we should be doing some sort # of charm.framework.model.config._invalidate() config = self._backend._config if key_values is not None: for key, value in key_values.items(): if key in config._defaults: if value is not None: config._config_set(key, value) else: raise ValueError(f"unknown config option: '{key}'") for key in unset: # When the key is unset, revert to the default if one exists default = config._defaults.get(key, None) if default is not None: config._config_set(key, default) else: config.pop(key, None)
[docs] def update_config( self, key_values: Optional[Mapping[str, Union[str, int, float, bool]]] = None, unset: Iterable[str] = (), ) -> None: """Update the config as seen by the charm. This will trigger a `config_changed` event. Note that the ``key_values`` mapping will only add or update configuration items. To remove existing ones, see the ``unset`` parameter. Args: key_values: A Mapping of key:value pairs to update in config. unset: An iterable of keys to remove from config. This sets the value to the default if defined, otherwise removes the key altogether. Raises: ValueError: if the key is not present in the config. """ self._update_config(key_values, unset) if self._charm is None or not self._hooks_enabled: return self._charm.on.config_changed.emit()
[docs] def set_leader(self, is_leader: bool = True) -> None: """Set whether this unit is the leader or not. If this charm becomes a leader then `leader_elected` will be triggered. If :meth:`begin` has already been called, then the charm's peer relation should usually be added *prior* to calling this method (with :meth:`add_relation`) to properly initialise and make available relation data that leader elected hooks may want to access. Args: is_leader: Whether this unit is the leader. """ self._backend._is_leader = is_leader # Note: jam 2020-03-01 currently is_leader is cached at the ModelBackend level, not in # the Model objects, so this automatically gets noticed. if is_leader and self._charm is not None and self._hooks_enabled: self._charm.on.leader_elected.emit()
[docs] def set_planned_units(self, num_units: int) -> None: """Set the number of "planned" units. This is the value that :meth:`Application.planned_units <ops.Application.planned_units>` should return. In real world circumstances, this number will be the number of units in the application. That is, this number will be the number of peers this unit has, plus one, as we count our own unit in the total. A change to the return from ``planned_units`` will not generate an event. Typically, a charm author would check planned units during a config or install hook, or after receiving a peer relation joined event. """ if num_units < 0: raise TypeError("num_units must be 0 or a positive integer.") self._backend._planned_units = num_units
[docs] def reset_planned_units(self) -> None: """Reset the planned units override. This allows the harness to fall through to the built in methods that will try to guess at a value for planned units, based on the number of peer relations that have been setup in the testing harness. """ self._backend._planned_units = None
[docs] def add_network(self, address: str, *, endpoint: Optional[str] = None, relation_id: Optional[int] = None, cidr: Optional[str] = None, interface: str = 'eth0', ingress_addresses: Optional[Iterable[str]] = None, egress_subnets: Optional[Iterable[str]] = None): """Add simulated network data for the given relation endpoint (binding). Calling this multiple times with the same (binding, relation_id) combination will replace the associated network data. Example:: # Set network info for default binding harness.add_network('10.0.0.10') # Or set network info for specific endpoint harness.add_network('10.0.0.10', endpoint='db') After either of those calls, the following will be true (in the first case, the simulated network-get will fall back to the default binding):: binding = harness.model.get_binding('db') assert binding.network.bind_address == ipaddress.IPv4Address('10.0.0.10')) Args: address: Binding's IPv4 or IPv6 address. endpoint: Name of relation endpoint (binding) to add network data for. If not provided, add info for the default binding. relation_id: Relation ID for the binding. If provided, the endpoint argument must be provided and correspond. If not provided, add network data for the endpoint's default binding. cidr: Binding's CIDR. Defaults to "<address>/24" if address is an IPv4 address, or "<address>/64" if address is IPv6 (the host bits are cleared). interface: Name of network interface. ingress_addresses: List of ingress addresses. Defaults to [address]. egress_subnets: List of egress subnets. Defaults to [cidr]. Raises: ModelError: If the endpoint is not a known relation name, or the relation_id is incorrect or doesn't match the endpoint. ValueError: If address is not an IPv4 or IPv6 address. """ if endpoint is not None and endpoint not in self._meta.relations: raise model.ModelError(f'{endpoint!r} is not a known endpoint') if relation_id is not None: if endpoint is None: raise TypeError('endpoint must be set if relation_id is provided') relation_name = self._backend._relation_names.get(relation_id) if relation_name is None: raise model.ModelError( f'relation_id {relation_id} has not been added; use add_relation') if endpoint != relation_name: raise model.ModelError( f"endpoint {endpoint!r} does not correspond to relation_id " + f"{relation_id} ({relation_name!r})") parsed_address = ipaddress.ip_address(address) # raises ValueError if not an IP if cidr is None: if isinstance(parsed_address, ipaddress.IPv4Address): cidr = str(ipaddress.IPv4Network(address + '/24', strict=False)) else: cidr = str(ipaddress.IPv6Network(address + '/64', strict=False)) if ingress_addresses is None: ingress_addresses = [address] if egress_subnets is None: egress_subnets = [cidr] data = { 'bind-addresses': [{ 'interface-name': interface, 'addresses': [ {'cidr': cidr, 'value': address}, ], }], 'egress-subnets': list(egress_subnets), 'ingress-addresses': list(ingress_addresses), } self._backend._networks[endpoint, relation_id] = data
def _get_backend_calls(self, reset: bool = True) -> List[Tuple[Any, ...]]: """Return the calls that we have made to the TestingModelBackend. This is useful mostly for testing the framework itself, so that we can assert that we do/don't trigger extra calls. Args: reset: If True, reset the calls list back to empty, if false, the call list is preserved. Return: ``[(call1, args...), (call2, args...)]`` """ calls = self._backend._calls.copy() if reset: self._backend._calls.clear() return calls
[docs] def add_model_secret(self, owner: AppUnitOrName, content: Dict[str, str]) -> str: """Add a secret owned by the remote application or unit specified. This is named :code:`add_model_secret` instead of :code:`add_secret` to avoid confusion with the :meth:`ops.Application.add_secret` and :meth:`ops.Unit.add_secret` methods used by secret owner charms. Args: owner: The name of the remote application (or specific remote unit) that will own the secret. content: A key-value mapping containing the payload of the secret, for example :code:`{"password": "foo123"}`. Return: The ID of the newly-secret added. """ owner_name = _get_app_or_unit_name(owner) model.Secret._validate_content(content) return self._backend._secret_add(content, owner_name)
[docs] def add_user_secret(self, content: Dict[str, str]) -> str: """Add a secret owned by the user, simulating the ``juju add-secret`` command. Args: content: A key-value mapping containing the payload of the secret, for example :code:`{"password": "foo123"}`. Return: The ID of the newly-added secret. Example usage (the parameter ``harness`` in the test function is a pytest fixture that does setup/teardown, see :class:`Harness`):: # charmcraft.yaml config: options: mysec: type: secret description: "tell me your secrets" # charm.py class MyVMCharm(ops.CharmBase): def __init__(self, framework: ops.Framework): super().__init__(framework) framework.observe(self.on.config_changed, self._on_config_changed) def _on_config_changed(self, event: ops.ConfigChangedEvent): mysec = self.config.get('mysec') if mysec: sec = self.model.get_secret(id=mysec, label="mysec") self.config_from_secret = sec.get_content() # test_charm.py def test_config_changed(harness): secret_content = {'password': 'foo'} secret_id = harness.add_user_secret(secret_content) harness.grant_secret(secret_id, 'test-charm') harness.begin() harness.update_config({'mysec': secret_id}) secret = harness.model.get_secret(id=secret_id).get_content() assert harness.charm.config_from_secret == secret.get_content() """ model.Secret._validate_content(content) # Although it's named a user-owned secret in Juju, technically, the owner is the # Model, so the secret's owner is set to `Model.uuid`. return self._backend._secret_add(content, self.model.uuid)
def _ensure_secret(self, secret_id: str) -> '_Secret': secret = self._backend._get_secret(secret_id) if secret is None: raise RuntimeError(f'Secret {secret_id!r} not found') return secret
[docs] def set_secret_content(self, secret_id: str, content: Dict[str, str]): """Update a secret's content, add a new revision, and fire *secret-changed*. Args: secret_id: The ID of the secret to update. This should normally be the return value of :meth:`add_model_secret`. content: A key-value mapping containing the new payload. """ model.Secret._validate_content(content) secret = self._ensure_secret(secret_id) if secret.owner_name in [self.model.app.name, self.model.unit.name]: raise RuntimeError(f'Secret {secret_id!r} owned by the charm under test, ' f"can't call set_secret_content") new_revision = _SecretRevision( revision=secret.revisions[-1].revision + 1, content=content, ) secret.revisions.append(new_revision) self.charm.on.secret_changed.emit(secret_id, secret.label)
[docs] def grant_secret(self, secret_id: str, observer: AppUnitOrName): """Grant read access to this secret for the given observer application or unit. For user secrets, grant access to the application, simulating the ``juju grant-secret`` command. If the given application or unit has already been granted access to this secret, do nothing. Args: secret_id: The ID of the secret to grant access to. This should normally be the return value of :meth:`add_model_secret`. observer: The name of the application (or specific unit) to grant access to. A relation between this application and the charm under test must already have been created. """ secret = self._ensure_secret(secret_id) app_or_unit_name = _get_app_or_unit_name(observer) # User secrets: if secret.owner_name == self.model.uuid: secret.user_secrets_grants.add(app_or_unit_name) return # Model secrets: if secret.owner_name in [self.model.app.name, self.model.unit.name]: raise RuntimeError(f'Secret {secret_id!r} owned by the charm under test, "' f"can't call grant_secret") relation_id = self._secret_relation_id_to(secret) if relation_id not in secret.grants: secret.grants[relation_id] = set() secret.grants[relation_id].add(app_or_unit_name)
[docs] def revoke_secret(self, secret_id: str, observer: AppUnitOrName): """Revoke read access to this secret for the given observer application or unit. If the given application or unit does not have access to this secret, do nothing. Args: secret_id: The ID of the secret to revoke access for. This should normally be the return value of :meth:`add_model_secret`. observer: The name of the application (or specific unit) to revoke access to. A relation between this application and the charm under test must have already been created. """ secret = self._ensure_secret(secret_id) app_or_unit_name = _get_app_or_unit_name(observer) # User secrets: if secret.owner_name == self.model.uuid: secret.user_secrets_grants.discard(app_or_unit_name) return # Model secrets: if secret.owner_name in [self.model.app.name, self.model.unit.name]: raise RuntimeError(f'Secret {secret_id!r} owned by the charm under test, "' f"can't call revoke_secret") relation_id = self._secret_relation_id_to(secret) if relation_id not in secret.grants: return secret.grants[relation_id].discard(app_or_unit_name)
def _secret_relation_id_to(self, secret: '_Secret') -> int: """Get the relation ID of relation between this charm and the secret owner.""" owner_app = secret.owner_name.split('/')[0] relation_id = self._backend._relation_id_to(owner_app) if relation_id is None: raise RuntimeError(f'No relation between this charm ({self.model.app.name}) ' f'and secret owner ({owner_app})') return relation_id
[docs] def get_secret_grants(self, secret_id: str, relation_id: int) -> Set[str]: """Return the set of app and unit names granted to secret for this relation. Args: secret_id: The ID of the secret to get grants for. relation_id: The ID of the relation granted access. """ secret = self._ensure_secret(secret_id) return secret.grants.get(relation_id, set())
[docs] def get_secret_revisions(self, secret_id: str) -> List[int]: """Return the list of revision IDs for the given secret, oldest first. Args: secret_id: The ID of the secret to get revisions for. """ secret = self._ensure_secret(secret_id) return [r.revision for r in secret.revisions]
[docs] def trigger_secret_rotation(self, secret_id: str, *, label: Optional[str] = None): """Trigger a secret-rotate event for the given secret. This event is fired by Juju when a secret's rotation time elapses, however, time-based events cannot be simulated appropriately in the harness, so this fires it manually. Args: secret_id: The ID of the secret associated with the event. label: Label value to send to the event. If None, the secret's label is used. """ secret = self._ensure_secret(secret_id) if secret.owner_name == self.model.uuid: raise RuntimeError("Cannot trigger the secret-rotate event for a user secret.") if label is None: label = secret.label self.charm.on.secret_rotate.emit(secret_id, label)
[docs] def trigger_secret_removal(self, secret_id: str, revision: int, *, label: Optional[str] = None): """Trigger a secret-remove event for the given secret and revision. This event is fired by Juju for a specific revision when all the secret's observers have refreshed to a later revision, however, in the harness call this method to fire the event manually. Args: secret_id: The ID of the secret associated with the event. revision: Revision number to provide to the event. This should be an item from the list returned by :meth:`get_secret_revisions`. label: Label value to send to the event. If None, the secret's label is used. """ secret = self._ensure_secret(secret_id) if label is None: label = secret.label self.charm.on.secret_remove.emit(secret_id, label, revision)
[docs] def trigger_secret_expiration(self, secret_id: str, revision: int, *, label: Optional[str] = None): """Trigger a secret-expired event for the given secret. This event is fired by Juju when a secret's expiration time elapses, however, time-based events cannot be simulated appropriately in the harness, so this fires it manually. Args: secret_id: The ID of the secret associated with the event. revision: Revision number to provide to the event. This should be an item from the list returned by :meth:`get_secret_revisions`. label: Label value to send to the event. If None, the secret's label is used. """ secret = self._ensure_secret(secret_id) if secret.owner_name == self.model.uuid: raise RuntimeError("Cannot trigger the secret-expired event for a user secret.") if label is None: label = secret.label self.charm.on.secret_expired.emit(secret_id, label, revision)
[docs] def get_filesystem_root(self, container: Union[str, Container]) -> pathlib.Path: """Return the temp directory path harness will use to simulate the container filesystem. In a real container runtime, each container has an isolated root filesystem. To simulate this behaviour, the testing harness manages a temporary directory for each container. Any Pebble filesystem API calls will be translated and mapped to this directory, as if the directory was the container's filesystem root. This process is quite similar to the ``chroot`` command. Charm tests should treat the returned directory as the container's root directory (``/``). The testing harness will not create any files or directories inside the simulated container's root directory; it's up to the test to populate the container's root directory with any files or directories the charm needs. Regarding the file ownership: unprivileged users are unable to create files with distinct ownership. To circumvent this limitation, the testing harness maps all user and group options related to file operations to match the current user and group. Example usage (the parameter ``harness`` in the test function is a pytest fixture that does setup/teardown, see :class:`Harness`):: # charm.py class ExampleCharm(ops.CharmBase): def __init__(self, *args): super().__init__(*args) self.framework.observe(self.on["mycontainer"].pebble_ready, self._on_pebble_ready) def _on_pebble_ready(self, event: ops.PebbleReadyEvent): self.hostname = event.workload.pull("/etc/hostname").read() # test_charm.py def test_hostname(harness): root = harness.get_filesystem_root("mycontainer") (root / "etc").mkdir() (root / "etc" / "hostname").write_text("hostname.example.com") harness.begin_with_initial_hooks() assert harness.charm.hostname == "hostname.example.com" Args: container: The name of the container or the container instance. Return: The path of the temporary directory associated with the specified container. """ # It's okay to access the container directly in this context, as its creation has already # been ensured during the model's initialization. container_name = container if isinstance(container, str) else container.name return self._backend._pebble_clients[container_name]._root
[docs] def evaluate_status(self) -> None: """Trigger the collect-status events and set application and/or unit status. This will always trigger ``collect_unit_status``, and set the unit status if any statuses were added. If running on the leader unit (:meth:`set_leader` has been called with ``True``), this will trigger ``collect_app_status``, and set the application status if any statuses were added. Tests should normally call this and then assert that ``self.model.app.status`` or ``self.model.unit.status`` is the value expected. Evaluation is not "additive"; this method resets the added statuses before triggering each collect-status event. """ self.charm.app._collected_statuses = [] self.charm.unit._collected_statuses = [] charm._evaluate_status(self.charm)
[docs] def handle_exec(self, container: Union[str, Container], command_prefix: Sequence[str], *, handler: Optional[ExecHandler] = None, result: Optional[Union[int, str, bytes, ExecResult]] = None): r"""Register a handler to simulate the Pebble command execution. This allows a test harness to simulate the behavior of running commands in a container. When :meth:`ops.Container.exec` is triggered, the registered handler is used to generate stdout and stderr for the simulated execution. A ``handler`` or a ``result`` may be provided, but not both: - A ``handler`` is a function accepting :class:`ops.testing.ExecArgs` and returning :class:`ops.testing.ExecResult` as the simulated process outcome. For cases that have side effects but don't return output, the handler can return ``None``, which is equivalent to returning ``ExecResult()``. - A ``result`` is for simulations that don't need to inspect the ``exec`` arguments; the output or exit code is provided directly. Setting ``result`` to str or bytes means use that string as stdout (with exit code 0); setting ``result`` to int means return that exit code (and no stdout). If ``handle_exec`` is called more than once with overlapping command prefixes, the longest match takes precedence. The registration of an execution handler can be updated by re-registering with the same command prefix. The execution handler receives the timeout value in the ``ExecArgs``. If needed, it can raise a ``TimeoutError`` to inform the harness that a timeout occurred. If :meth:`ops.Container.exec` is called with ``combine_stderr=True``, the execution handler should, if required, weave the simulated standard error into the standard output. The harness checks the result and will raise an exception if stderr is non-empty. Args: container: The specified container or its name. command_prefix: The command prefix to register against. handler: A handler function that simulates the command's execution. result: A simplified form to specify the command's simulated result. Example usage:: # produce no output and return 0 for every command harness.handle_exec('container', [], result=0) # simple example that just produces output (exit code 0) harness.handle_exec('webserver', ['ls', '/etc'], result='passwd\nprofile\n') # slightly more complex (use stdin) harness.handle_exec( 'c1', ['sha1sum'], handler=lambda args: ExecResult(stdout=hashlib.sha1(args.stdin).hexdigest())) # more complex example using args.command def docker_handler(args: testing.ExecArgs) -> testing.ExecResult: match args.command: case ['docker', 'run', image]: return testing.ExecResult(stdout=f'running {image}') case ['docker', 'ps']: return testing.ExecResult(stdout='CONTAINER ID IMAGE ...') case _: return testing.ExecResult(exit_code=1, stderr='unknown command') harness.handle_exec('database', ['docker'], handler=docker_handler) # handle timeout def handle_timeout(args: testing.ExecArgs) -> int: if args.timeout is not None and args.timeout < 10: raise TimeoutError return 0 harness.handle_exec('database', ['foo'], handler=handle_timeout) """ if (handler is None and result is None) or (handler is not None and result is not None): raise TypeError("Either handler or result must be provided, but not both.") container_name = container if isinstance(container, str) else container.name if result is not None: if isinstance(result, int) and not isinstance(result, bool): result = ExecResult(exit_code=result) elif isinstance(result, (str, bytes)): result = ExecResult(stdout=result) elif not isinstance(result, ExecResult): raise TypeError( f"result must be int, str, bytes, or ExecResult, " f"not {result.__class__.__name__}") self._backend._pebble_clients[container_name]._handle_exec( command_prefix=command_prefix, handler=(lambda _: result) if handler is None else handler # type: ignore )
@property def reboot_count(self) -> int: """Number of times the charm has called :meth:`ops.Unit.reboot`.""" return self._backend._reboot_count
[docs] def run_action(self, action_name: str, params: Optional[Dict[str, Any]] = None) -> ActionOutput: """Simulates running a charm action, as with ``juju run``. Use this only after calling :meth:`begin`. Validates that no required parameters are missing, and that additional parameters are not provided if that is not permitted. Does not validate the types of the parameters - you can use the `jsonschema <https://github.com/python-jsonschema/jsonschema>`_ package to do this in your tests; for example:: schema = harness.charm.meta.actions["action-name"].parameters try: jsonschema.validate(instance=params, schema=schema) except jsonschema.ValidationError: # Do something about the invalid params. ... harness.run_action("action-name", params) Args: action_name: the name of the action to run, as found in ``actions.yaml``. params: override the default parameter values found in ``actions.yaml``. If a parameter is not in ``params``, or ``params`` is ``None``, then the default value from ``actions.yaml`` will be used. Raises: ActionFailed: if :meth:`ops.ActionEvent.fail` is called. Note that this will be raised at the end of the ``run_action`` call, not immediately when :code:`fail()` is called, to match the run-time behaviour. """ try: action_meta = self.charm.meta.actions[action_name] except KeyError: raise RuntimeError(f"Charm does not have a {action_name!r} action.") from None if params is None: params = {} for key in action_meta.required: # Juju requires that the key is in the passed parameters, even if there is a default # value in actions.yaml. if key not in params: raise RuntimeError(f"{key!r} parameter is required, but missing.") if not action_meta.additional_properties: for key in params: if key not in action_meta.parameters: # Match Juju's error message. raise model.ModelError( f'additional property "{key}" is not allowed, ' f'given {{"{key}":{params[key]!r}}}') action_under_test = _RunningAction(action_name, ActionOutput([], {}), params) handler = getattr(self.charm.on, f"{action_name.replace('-', '_')}_action") self._backend._running_action = action_under_test self._action_id_counter += 1 handler.emit(str(self._action_id_counter)) self._backend._running_action = None if action_under_test.failure_message is not None: raise ActionFailed( message=action_under_test.failure_message, output=action_under_test.output) return action_under_test.output
[docs] def set_cloud_spec(self, spec: 'model.CloudSpec'): """Set cloud specification (metadata) including credentials. Call this method before the charm calls :meth:`ops.Model.get_cloud_spec`. Example usage (the parameter ``harness`` in the test function is a pytest fixture that does setup/teardown, see :class:`Harness`):: # charm.py class MyVMCharm(ops.CharmBase): def __init__(self, framework: ops.Framework): super().__init__(framework) framework.observe(self.on.start, self._on_start) def _on_start(self, event: ops.StartEvent): self.cloud_spec = self.model.get_cloud_spec() # test_charm.py def test_start(harness): cloud_spec = ops.model.CloudSpec.from_dict({ 'name': 'localhost', 'type': 'lxd', 'endpoint': 'https://127.0.0.1:8443', 'credential': { 'auth-type': 'certificate', 'attrs': { 'client-cert': 'foo', 'client-key': 'bar', 'server-cert': 'baz' }, }, }) harness.set_cloud_spec(cloud_spec) harness.begin() harness.charm.on.start.emit() assert harness.charm.cloud_spec == cloud_spec """ self._backend._cloud_spec = spec
def _get_app_or_unit_name(app_or_unit: AppUnitOrName) -> str: """Return name of given application or unit (return strings directly).""" if isinstance(app_or_unit, (model.Application, model.Unit)): return app_or_unit.name elif isinstance(app_or_unit, str): return app_or_unit else: raise TypeError(f'Expected Application | Unit | str, got {type(app_or_unit)}') def _record_calls(cls: Any): """Replace methods on cls with methods that record that they have been called. Iterate all attributes of cls, and for public methods, replace them with a wrapped method that records the method called along with the arguments and keyword arguments. """ for meth_name, orig_method in cls.__dict__.items(): if meth_name.startswith('_'): continue def decorator(orig_method: Any): def wrapped(self: '_TestingModelBackend', *args: Any, **kwargs: Any): full_args = (orig_method.__name__, *args) if kwargs: full_args = (*full_args, kwargs) self._calls.append(full_args) return orig_method(self, *args, **kwargs) return wrapped setattr(cls, meth_name, decorator(orig_method)) return cls def _copy_docstrings(source_cls: Any): """Copy the docstrings from source_cls to target_cls. Use this as: @_copy_docstrings(source_class) class TargetClass: And for any public method that exists on both classes, it will copy the __doc__ for that method. """ def decorator(target_cls: Any): for meth_name in target_cls.__dict__: if meth_name.startswith('_'): continue source_method = source_cls.__dict__.get(meth_name) if source_method is not None and source_method.__doc__: target_cls.__dict__[meth_name].__doc__ = source_method.__doc__ return target_cls return decorator @_record_calls class _TestingConfig(Dict[str, Union[str, int, float, bool]]): """Represents the Juju Config.""" _supported_types = { 'string': str, 'boolean': bool, 'int': int, 'float': float, 'secret': str, # There is some special structure, but they are strings. } def __init__(self, config: '_RawConfig'): super().__init__() self._spec = config self._defaults = self._load_defaults(config) for key, value in self._defaults.items(): if value is None: continue self._config_set(key, value) @staticmethod def _load_defaults(charm_config: '_RawConfig') -> Dict[str, Union[str, int, float, bool]]: """Load default values from config.yaml. Handle the case where a user doesn't supply explicit config snippets. """ if not charm_config: return {} cfg: Dict[str, '_ConfigOption'] = charm_config.get('options', {}) return {key: value.get('default', None) for key, value in cfg.items()} def _config_set(self, key: str, value: Union[str, int, float, bool]): # this is only called by the harness itself # we don't do real serialization/deserialization, but we do check that the value # has the expected type. option = self._spec.get('options', {}).get(key) if not option: raise RuntimeError(f'Unknown config option {key}; ' 'not declared in `config.yaml`.' 'Check https://juju.is/docs/sdk/config for the ' 'spec.') declared_type = option.get('type') if not declared_type: raise RuntimeError(f'Incorrectly formatted `options.yaml`, option {key} ' 'is expected to declare a `type`.') if declared_type not in self._supported_types: raise RuntimeError( 'Incorrectly formatted `options.yaml`: `type` needs to be one ' 'of [{}], not {}.'.format(', '.join(self._supported_types), declared_type)) if type(value) is not self._supported_types[declared_type]: raise RuntimeError(f'Config option {key} is supposed to be of type ' f'{declared_type}, not `{type(value).__name__}`.') # call 'normal' setattr. dict.__setitem__(self, key, value) # type: ignore def __setitem__(self, key: Any, value: Any): # if a charm attempts to config[foo] = bar: raise TypeError("'ConfigData' object does not support item assignment") class _TestingRelationDataContents(Dict[str, str]): def __setitem__(self, key: str, value: str): if not isinstance(key, str): raise model.RelationDataError( f'relation data keys must be strings, not {type(key)}') if not isinstance(value, str): raise model.RelationDataError( f'relation data values must be strings, not {type(value)}') super().__setitem__(key, value) def copy(self): return _TestingRelationDataContents(super().copy()) @dataclasses.dataclass class _SecretRevision: revision: int content: Dict[str, str] @dataclasses.dataclass class _Secret: id: str owner_name: str revisions: List[_SecretRevision] rotate_policy: Optional[str] expire_time: Optional[datetime.datetime] label: Optional[str] = None description: Optional[str] = None tracked: int = 1 grants: Dict[int, Set[str]] = dataclasses.field(default_factory=dict) user_secrets_grants: Set[str] = dataclasses.field(default_factory=set) @_copy_docstrings(model._ModelBackend) @_record_calls class _TestingModelBackend: """This conforms to the interface for ModelBackend but provides canned data. DO NOT use this class directly, it is used by `Harness`_ to drive the model. `Harness`_ is responsible for maintaining the internal consistency of the values here, as the only public methods of this type are for implementing ModelBackend. """ def __init__(self, unit_name: str, meta: charm.CharmMeta, config: '_RawConfig'): self.unit_name = unit_name self.app_name = self.unit_name.split('/')[0] self.model_name = None self.model_uuid = str(uuid.uuid4()) self._harness_tmp_dir = tempfile.TemporaryDirectory(prefix='ops-harness-') self._harness_storage_path = pathlib.Path(self._harness_tmp_dir.name) / "storages" self._harness_container_path = pathlib.Path(self._harness_tmp_dir.name) / "containers" self._harness_storage_path.mkdir() self._harness_container_path.mkdir() # this is used by the _record_calls decorator self._calls: List[Tuple[Any, ...]] = [] self._meta = meta # relation name to [relation_ids,...] self._relation_ids_map: Dict[str, List[int]] = {} # reverse map from relation_id to relation_name self._relation_names: Dict[int, str] = {} # relation_id: [unit_name,...] self._relation_list_map: Dict[int, List[str]] = {} # {relation_id: {name: Dict[str: str]}} self._relation_data_raw: Dict[int, Dict[str, Dict[str, str]]] = {} # {relation_id: {"app": app_name, "units": ["app/0",...]} self._relation_app_and_units: Dict[int, _RelationEntities] = {} self._config = _TestingConfig(config) self._is_leader: bool = False # {resource_name: resource_content} # where resource_content is (path, content) self._resources_map: Dict[str, Tuple[str, Union[str, bytes]]] = {} # fixme: understand how this is used and adjust the type self._pod_spec: Optional[Tuple[model.K8sSpec, Any]] = None self._app_status: _RawStatus = {'status': 'unknown', 'message': ''} self._unit_status: _RawStatus = {'status': 'maintenance', 'message': ''} self._workload_version: Optional[str] = None self._resource_dir: Optional[tempfile.TemporaryDirectory[Any]] = None # Format: # { "storage_name": {"<ID1>": { <other-properties> }, ... } # <ID1>: device id that is key for given storage_name # Initialize the _storage_list with values present on metadata.yaml self._storage_list: Dict[str, Dict[int, Dict[str, Any]]] = { k: {} for k in self._meta.storages} self._storage_attached: Dict[str, Set[int]] = {k: set() for k in self._meta.storages} self._storage_index_counter = 0 # {container_name : _TestingPebbleClient} self._pebble_clients: Dict[str, _TestingPebbleClient] = {} self._pebble_clients_can_connect: Dict[_TestingPebbleClient, bool] = {} self._planned_units: Optional[int] = None self._hook_is_running = '' self._secrets: List[_Secret] = [] self._opened_ports: Set[model.Port] = set() self._networks: Dict[Tuple[Optional[str], Optional[int]], _NetworkDict] = {} self._reboot_count = 0 self._running_action: Optional[_RunningAction] = None self._cloud_spec: Optional[model.CloudSpec] = None def _can_connect(self, pebble_client: '_TestingPebbleClient') -> bool: """Returns whether the mock client is active and can support API calls with no errors.""" return self._pebble_clients_can_connect[pebble_client] def _set_can_connect(self, pebble_client: '_TestingPebbleClient', val: bool): """Manually sets the can_connect state for the given mock client.""" if pebble_client not in self._pebble_clients_can_connect: msg = 'cannot set can_connect for the client - are you running a "real" pebble test?' raise RuntimeError(msg) self._pebble_clients_can_connect[pebble_client] = val def _cleanup(self): if self._resource_dir is not None: self._resource_dir.cleanup() self._resource_dir = None self._harness_tmp_dir.cleanup() def _get_resource_dir(self) -> pathlib.Path: if self._resource_dir is None: # In actual Juju, the resource path for a charm's resource is # $AGENT_DIR/resources/$RESOURCE_NAME/$RESOURCE_FILENAME # However, charms shouldn't depend on this. self._resource_dir = tempfile.TemporaryDirectory(prefix='tmp-ops-test-resource-') res_dir_name = cast(str, self._resource_dir.name) return pathlib.Path(res_dir_name) def relation_ids(self, relation_name: str) -> List[int]: try: return self._relation_ids_map[relation_name] except KeyError: if relation_name not in self._meta.relations: raise model.ModelError(f'{relation_name} is not a known relation') from None no_ids: List[int] = [] return no_ids def relation_list(self, relation_id: int): try: return self._relation_list_map[relation_id] except KeyError: raise model.RelationNotFoundError from None def relation_remote_app_name(self, relation_id: int) -> Optional[str]: if relation_id not in self._relation_app_and_units: # Non-existent or dead relation return None return self._relation_app_and_units[relation_id]['app'] def relation_get(self, relation_id: int, member_name: str, is_app: bool): if is_app and '/' in member_name: member_name = member_name.split('/')[0] if relation_id not in self._relation_data_raw: raise model.RelationNotFoundError() return self._relation_data_raw[relation_id][member_name] def update_relation_data(self, relation_id: int, _entity: Union[model.Unit, model.Application], key: str, value: str): # this is where the 'real' backend would call relation-set. raw_data = self._relation_data_raw[relation_id][_entity.name] if value == '': raw_data.pop(key, None) else: raw_data[key] = value def relation_set(self, relation_id: int, key: str, value: str, is_app: bool): if not isinstance(is_app, bool): raise TypeError('is_app parameter to relation_set must be a boolean') if 'relation_broken' in self._hook_is_running and not self.relation_remote_app_name( relation_id): raise RuntimeError( 'remote-side relation data cannot be accessed during a relation-broken event') if relation_id not in self._relation_data_raw: raise RelationNotFoundError(relation_id) relation = self._relation_data_raw[relation_id] bucket_key = self.app_name if is_app else self.unit_name if bucket_key not in relation: relation[bucket_key] = {} bucket = relation[bucket_key] if value == '': bucket.pop(key, None) else: bucket[key] = value def config_get(self) -> _TestingConfig: return self._config def is_leader(self): return self._is_leader def application_version_set(self, version: str): self._workload_version = version def resource_get(self, resource_name: str): if resource_name not in self._resources_map: raise model.ModelError( "ERROR could not download resource: HTTP request failed: " "Get https://.../units/unit-{}/resources/{}: resource#{}/{} not found".format( self.unit_name.replace('/', '-'), resource_name, self.app_name, resource_name )) filename, contents = self._resources_map[resource_name] resource_dir = self._get_resource_dir() resource_filename = resource_dir / resource_name / filename if not resource_filename.exists(): mode = 'wb' if isinstance(contents, bytes) else 'wt' resource_filename.parent.mkdir(exist_ok=True) with resource_filename.open(mode=mode) as resource_file: resource_file.write(contents) return resource_filename def pod_spec_set(self, spec: 'model.K8sSpec', k8s_resources: Any): # fixme: any self._pod_spec = (spec, k8s_resources) def status_get(self, *, is_app: bool = False): if is_app: return self._app_status else: return self._unit_status def status_set(self, status: '_StatusName', message: str = '', *, is_app: bool = False): if status in [model.ErrorStatus.name, model.UnknownStatus.name]: raise model.ModelError(f'ERROR invalid status "{status}", expected one of' ' [maintenance blocked waiting active]') if is_app: self._app_status = {'status': status, 'message': message} else: self._unit_status = {'status': status, 'message': message} def storage_list(self, name: str, include_detached: bool = False): """Returns a list of all attached storage mounts for the given storage name. Args: name: name (i.e. from metadata.yaml). include_detached: True to include unattached storage mounts as well. """ return [index for index in self._storage_list[name] if include_detached or self._storage_is_attached(name, index)] def storage_get(self, storage_name_id: str, attribute: str) -> Any: name, index = storage_name_id.split("/", 1) index = int(index) try: if index not in self._storage_attached[name]: raise KeyError() # Pretend the key isn't there else: return self._storage_list[name][index][attribute] except KeyError: raise model.ModelError( f'ERROR invalid value "{name}/{index}" for option -s: storage not found') from None def storage_add(self, name: str, count: int = 1) -> List[int]: if '/' in name: raise model.ModelError('storage name cannot contain "/"') if name not in self._storage_list: self._storage_list[name] = {} result: List[int] = [] for _ in range(count): index = self._storage_index_counter self._storage_index_counter += 1 self._storage_list[name][index] = { 'location': os.path.join(self._harness_storage_path, name, str(index)), } result.append(index) return result def _storage_detach(self, storage_id: str): # NOTE: This is an extra function for _TestingModelBackend to simulate # detachment of a storage unit. This is not present in ops.model._ModelBackend. name, index = storage_id.split('/', 1) index = int(index) for container, client in self._pebble_clients.items(): for mount in self._meta.containers[container].mounts.values(): if mount.storage != name: continue root = client._root (root / mount.location[1:]).unlink() if self._storage_is_attached(name, index): self._storage_attached[name].remove(index) def _storage_attach(self, storage_id: str): """Mark the named storage_id as attached and return True if it was previously detached.""" # NOTE: This is an extra function for _TestingModelBackend to simulate # re-attachment of a storage unit. This is not present in # ops.model._ModelBackend. name, index = storage_id.split('/', 1) for container, client in self._pebble_clients.items(): for mount in self._meta.containers[container].mounts.values(): if mount.storage != name: continue for store in self._storage_list[mount.storage].values(): root = client._root mounting_dir = root / mount.location[1:] mounting_dir.parent.mkdir(parents=True, exist_ok=True) target_dir = pathlib.Path(store["location"]) target_dir.mkdir(parents=True, exist_ok=True) try: mounting_dir.symlink_to(target_dir, target_is_directory=True) except FileExistsError: # If the symlink is already the one we want, then we # don't need to do anything here. # NOTE: In Python 3.9, this can use `mounting_dir.readlink()` if ( not mounting_dir.is_symlink() or os.readlink(mounting_dir) != str(target_dir) ): raise index = int(index) if not self._storage_is_attached(name, index): self._storage_attached[name].add(index) return True return False def _storage_is_attached(self, storage_name: str, storage_index: int): return storage_index in self._storage_attached[storage_name] def _storage_remove(self, storage_id: str): # NOTE: This is an extra function for _TestingModelBackend to simulate # full removal of a storage unit. This is not present in # ops.model._ModelBackend. self._storage_detach(storage_id) name, index = storage_id.split('/', 1) index = int(index) self._storage_list[name].pop(index, None) def action_get(self) -> Dict[str, Any]: params: Dict[str, Any] = {} assert self._running_action is not None action_meta = self._meta.actions[self._running_action.name] for name, meta in action_meta.parameters.items(): if "default" in meta: params[name] = meta["default"] params.update(self._running_action.parameters) return params def action_set(self, results: Dict[str, Any]): assert self._running_action is not None for key in ("stdout", "stderr", "stdout-encoding", "stderr-encoding"): if key in results: # Match Juju's error message. raise model.ModelError(f'ERROR cannot set reserved action key "{key}"') # Although it's not necessary, we go through the same flattening process # as the real backend, in order to give Charmers advance notice if they # are setting results that will not work. # This also does some validation on keys to make sure that they fit the # Juju constraints. model._format_action_result_dict(results) # Validate, but ignore returned value. self._running_action.output.results.update(results) def action_log(self, message: str): assert self._running_action is not None self._running_action.output.logs.append(message) def action_fail(self, message: str = ''): assert self._running_action is not None # If fail is called multiple times, Juju only retains the most recent failure message. self._running_action.failure_message = message def network_get(self, endpoint_name: str, relation_id: Optional[int] = None) -> '_NetworkDict': data = self._networks.get((endpoint_name, relation_id)) if data is not None: return data if relation_id is not None: # Fall back to the default binding for this endpoint data = self._networks.get((endpoint_name, None)) if data is not None: return data # No custom data per relation ID or binding, return the default binding data = self._networks.get((None, None)) if data is not None: return data raise RelationNotFoundError def add_metrics(self, metrics, labels=None): # type:ignore raise NotImplementedError(self.add_metrics) # type:ignore @classmethod def log_split(cls, message, max_len=model.MAX_LOG_LINE_LEN): # type:ignore raise NotImplementedError(cls.log_split) # type:ignore def juju_log(self, level, msg): # type:ignore raise NotImplementedError(self.juju_log) # type:ignore def get_pebble(self, socket_path: str) -> '_TestingPebbleClient': container = socket_path.split('/')[3] # /charm/containers/<container_name>/pebble.socket client = self._pebble_clients.get(container, None) if client is None: container_root = self._harness_container_path / container container_root.mkdir() client = _TestingPebbleClient(self, container_root=container_root) # we need to know which container a new pebble client belongs to # so we can figure out which storage mounts must be simulated on # this pebble client's mock file systems when storage is # attached/detached later. self._pebble_clients[container] = client self._pebble_clients_can_connect[client] = False return client def planned_units(self) -> int: """Simulate fetching the number of planned application units from the model. If self._planned_units is None, then we simulate what the Juju controller will do, which is to report the number of peers, plus one (we include this unit in the count). This can be overridden for testing purposes: a charm author can set the number of planned units explicitly by calling `Harness.set_planned_units` """ if self._planned_units is not None: return self._planned_units units: Set[str] = set() peer_names: Set[str] = set(self._meta.peers.keys()) for peer_id, peer_name in self._relation_names.items(): if peer_name not in peer_names: continue peer_units = self._relation_list_map[peer_id] units.update(peer_units) return len(units) + 1 # Account for this unit. def _get_secret(self, id: str) -> Optional[_Secret]: return next((s for s in self._secrets if s.id == id), None) def _ensure_secret(self, id: str) -> _Secret: secret = self._get_secret(id) if secret is None: raise model.SecretNotFoundError(f'Secret {id!r} not found') return secret def _ensure_secret_id_or_label(self, id: Optional[str], label: Optional[str]): secret = None if id is not None: secret = self._get_secret(id) if secret is not None and label is not None: secret.label = label # both id and label given, update label if secret is None and label is not None: secret = next((s for s in self._secrets if s.label == label), None) if secret is None: raise model.SecretNotFoundError( f'Secret not found by ID ({id!r}) or label ({label!r})') return secret def secret_get(self, *, id: Optional[str] = None, label: Optional[str] = None, refresh: bool = False, peek: bool = False) -> Dict[str, str]: secret = self._ensure_secret_id_or_label(id, label) if secret.owner_name == self.model_uuid: # This is a user secret - charms only ever have view access. if self.app_name not in secret.user_secrets_grants: raise model.SecretNotFoundError( f'Secret {id!r} not granted access to {self.app_name!r}') elif secret.owner_name not in [self.app_name, self.unit_name]: # This is a model secret - the model might have admin or view access. # Check that caller has permission to get this secret # Observer is calling: does secret have a grant on relation between # this charm (the observer) and the secret owner's app? owner_app = secret.owner_name.split('/')[0] relation_id = self._relation_id_to(owner_app) if relation_id is None: raise model.SecretNotFoundError( f'Secret {id!r} does not have relation to {owner_app!r}') grants = secret.grants.get(relation_id, set()) if self.app_name not in grants and self.unit_name not in grants: raise model.SecretNotFoundError( f'Secret {id!r} not granted access to {self.app_name!r} or {self.unit_name!r}') if peek or refresh: revision = secret.revisions[-1] if refresh: secret.tracked = revision.revision else: revision = next((r for r in secret.revisions if r.revision == secret.tracked), None) if revision is None: raise model.SecretNotFoundError(f'Secret {id!r} tracked revision was removed') return revision.content def _relation_id_to(self, remote_app: str) -> Optional[int]: """Return relation ID of relation from charm's app to remote app.""" for relation_id, app_units in self._relation_app_and_units.items(): if app_units['app'] == remote_app: return relation_id return None def _ensure_secret_owner(self, secret: _Secret): # For unit secrets, the owner unit has manage permissions. For app # secrets, the leader has manage permissions and other units only have # view permissions. # https://discourse.charmhub.io/t/secret-access-permissions/12627 # For user secrets the secret owner is the model, that is, # `secret.owner_name == self.model.uuid`, only model admins have # manage permissions: https://juju.is/docs/juju/secret. unit_secret = secret.owner_name == self.unit_name app_secret = secret.owner_name == self.app_name if unit_secret or (app_secret and self.is_leader()): return raise model.SecretNotFoundError( f'You must own secret {secret.id!r} to perform this operation') def secret_info_get(self, *, id: Optional[str] = None, label: Optional[str] = None) -> model.SecretInfo: secret = self._ensure_secret_id_or_label(id, label) self._ensure_secret_owner(secret) rotates = None rotation = None if secret.rotate_policy is not None: rotation = model.SecretRotate(secret.rotate_policy) if secret.rotate_policy != model.SecretRotate.NEVER: # Just set a fake rotation time some time in the future rotates = datetime.datetime.now() + datetime.timedelta(days=1) return model.SecretInfo( id=secret.id, label=secret.label, revision=secret.tracked, expires=secret.expire_time, rotation=rotation, rotates=rotates, ) def secret_set(self, id: str, *, content: Optional[Dict[str, str]] = None, label: Optional[str] = None, description: Optional[str] = None, expire: Optional[datetime.datetime] = None, rotate: Optional[model.SecretRotate] = None) -> None: secret = self._ensure_secret(id) self._ensure_secret_owner(secret) if content is None: content = secret.revisions[-1].content revision = _SecretRevision( revision=secret.revisions[-1].revision + 1, content=content ) secret.revisions.append(revision) if label is not None: if label: secret.label = label else: secret.label = None # clear label if description is not None: if description: secret.description = description else: secret.description = None # clear description if expire is not None: secret.expire_time = expire if rotate is not None: if rotate != model.SecretRotate.NEVER: secret.rotate_policy = rotate.value else: secret.rotate_policy = None # clear rotation policy @classmethod def _generate_secret_id(cls) -> str: # Not a proper Juju secrets-style xid, but that's okay return f"secret:{uuid.uuid4()}" def secret_add(self, content: Dict[str, str], *, label: Optional[str] = None, description: Optional[str] = None, expire: Optional[datetime.datetime] = None, rotate: Optional[model.SecretRotate] = None, owner: Optional[str] = None) -> str: owner_name = self.unit_name if owner == 'unit' else self.app_name return self._secret_add(content, owner_name, label=label, description=description, expire=expire, rotate=rotate) def _secret_add(self, content: Dict[str, str], owner_name: str, *, label: Optional[str] = None, description: Optional[str] = None, expire: Optional[datetime.datetime] = None, rotate: Optional[model.SecretRotate] = None) -> str: id = self._generate_secret_id() revision = _SecretRevision( revision=1, content=content, ) secret = _Secret( id=id, owner_name=owner_name, revisions=[revision], rotate_policy=rotate.value if rotate is not None else None, expire_time=expire, label=label, description=description, ) self._secrets.append(secret) return id def secret_grant(self, id: str, relation_id: int, *, unit: Optional[str] = None) -> None: secret = self._ensure_secret(id) self._ensure_secret_owner(secret) if relation_id not in secret.grants: secret.grants[relation_id] = set() remote_app_name = self._relation_app_and_units[relation_id]['app'] secret.grants[relation_id].add(unit or remote_app_name) def secret_revoke(self, id: str, relation_id: int, *, unit: Optional[str] = None) -> None: secret = self._ensure_secret(id) self._ensure_secret_owner(secret) if relation_id not in secret.grants: return remote_app_name = self._relation_app_and_units[relation_id]['app'] secret.grants[relation_id].discard(unit or remote_app_name) def secret_remove(self, id: str, *, revision: Optional[int] = None) -> None: secret = self._ensure_secret(id) self._ensure_secret_owner(secret) if revision is not None: revisions = [r for r in secret.revisions if r.revision != revision] if len(revisions) == len(secret.revisions): raise model.SecretNotFoundError(f'Secret {id!r} revision {revision} not found') if revisions: secret.revisions = revisions else: # Last revision removed, remove entire secret self._secrets = [s for s in self._secrets if s.id != id] else: self._secrets = [s for s in self._secrets if s.id != id] def open_port(self, protocol: str, port: Optional[int] = None): self._check_protocol_and_port(protocol, port) protocol_lit = cast(Literal['tcp', 'udp', 'icmp'], protocol) self._opened_ports.add(model.Port(protocol_lit, port)) def close_port(self, protocol: str, port: Optional[int] = None): self._check_protocol_and_port(protocol, port) protocol_lit = cast(Literal['tcp', 'udp', 'icmp'], protocol) self._opened_ports.discard(model.Port(protocol_lit, port)) def opened_ports(self) -> Set[model.Port]: return set(self._opened_ports) def _check_protocol_and_port(self, protocol: str, port: Optional[int]): # Simulate the error messages we get from Juju (not that charm tests # should be testing details of error messages). if protocol == 'icmp': if port is not None: raise model.ModelError(f'ERROR protocol "{protocol}" doesn\'t support any ports; got "{port}"\n') # noqa: E501 elif protocol in ['tcp', 'udp']: if port is None: raise model.ModelError(f'ERROR invalid port "{protocol}": strconv.Atoi: parsing "{protocol}": invalid syntax\n') # noqa: E501 if not (1 <= port <= 65535): raise model.ModelError(f'ERROR port range bounds must be between 1 and 65535, got {port}-{port}\n') # noqa: E501 else: raise model.ModelError(f'ERROR invalid protocol "{protocol}", expected "tcp", "udp", or "icmp"\n') # noqa: E501 def reboot(self, now: bool = False): self._reboot_count += 1 if not now: return # This should exit, reboot, and re-emit the event, but we'll need the caller # to handle everything after the exit. raise SystemExit() def credential_get(self) -> model.CloudSpec: if not self._cloud_spec: raise model.ModelError( 'ERROR cloud spec is empty, set it with `Harness.set_cloud_spec()` first') return self._cloud_spec @_copy_docstrings(pebble.ExecProcess) class _TestingExecProcess: def __init__(self, command: List[str], timeout: Optional[float], exit_code: Optional[int], stdin: Union[TextIO, BinaryIO, None], stdout: Union[TextIO, BinaryIO, None], stderr: Union[TextIO, BinaryIO, None], is_timeout: bool): self._command = command self._timeout = timeout self._is_timeout = is_timeout if exit_code is None and not is_timeout: raise ValueError("when is_timeout is False, exit_code must not be None") self._exit_code = exit_code self.stdin = stdin self.stdout = stdout self.stderr = stderr def wait(self): if self._is_timeout: raise pebble.TimeoutError( f'timed out waiting for change ({self._timeout} seconds)' ) if self._exit_code != 0: raise pebble.ExecError(self._command, cast(int, self._exit_code), None, None) def wait_output(self) -> Tuple[AnyStr, Optional[AnyStr]]: if self._is_timeout: raise pebble.TimeoutError( f'timed out waiting for change ({self._timeout} seconds)' ) out_value = self.stdout.read() if self.stdout is not None else None err_value = self.stderr.read() if self.stderr is not None else None if self._exit_code != 0: raise pebble.ExecError[AnyStr](self._command, cast(int, self._exit_code), cast(Union[AnyStr, None], out_value), cast(Union[AnyStr, None], err_value)) return cast(AnyStr, out_value), cast(Union[AnyStr, None], err_value) def send_signal(self, sig: Union[int, str]): # the process is always terminated when ExecProcess is return in the simulation. raise BrokenPipeError("[Errno 32] Broken pipe") @_copy_docstrings(pebble.Client) class _TestingPebbleClient: """This conforms to the interface for pebble.Client but provides canned data. DO NOT use this class directly, it is used by `Harness`_ to run interactions with Pebble. `Harness`_ is responsible for maintaining the internal consistency of the values here, as the only public methods of this type are for implementing Client. """ def __init__(self, backend: _TestingModelBackend, container_root: pathlib.Path): self._backend = _TestingModelBackend self._layers: Dict[str, pebble.Layer] = {} # Has a service been started/stopped? self._service_status: Dict[str, pebble.ServiceStatus] = {} self._root = container_root self._backend = backend self._exec_handlers: Dict[Tuple[str, ...], ExecHandler] = {} self._notices: Dict[Tuple[str, str], pebble.Notice] = {} self._last_notice_id = 0 def _handle_exec(self, command_prefix: Sequence[str], handler: ExecHandler): prefix = tuple(command_prefix) self._exec_handlers[prefix] = handler def _check_connection(self): if not self._backend._can_connect(self): msg = ('Cannot connect to Pebble; did you forget to call ' 'begin_with_initial_hooks() or set_can_connect()?') raise pebble.ConnectionError(msg) def get_system_info(self) -> pebble.SystemInfo: self._check_connection() return pebble.SystemInfo(version='1.0.0') def get_warnings( self, select: pebble.WarningState = pebble.WarningState.PENDING, ) -> List['pebble.Warning']: raise NotImplementedError(self.get_warnings) def ack_warnings(self, timestamp: datetime.datetime) -> int: raise NotImplementedError(self.ack_warnings) def get_changes( self, select: pebble.ChangeState = pebble.ChangeState.IN_PROGRESS, service: Optional[str] = None, ) -> List[pebble.Change]: raise NotImplementedError(self.get_changes) def get_change(self, change_id: pebble.ChangeID) -> pebble.Change: raise NotImplementedError(self.get_change) def abort_change(self, change_id: pebble.ChangeID) -> pebble.Change: raise NotImplementedError(self.abort_change) def autostart_services(self, timeout: float = 30.0, delay: float = 0.1): self._check_connection() for name, service in self._render_services().items(): # TODO: jam 2021-04-20 This feels awkward that Service.startup might be a string or # might be an enum. Probably should make Service.startup a property rather than an # attribute. if service.startup == '': startup = pebble.ServiceStartup.DISABLED else: startup = pebble.ServiceStartup(service.startup) if startup == pebble.ServiceStartup.ENABLED: self._service_status[name] = pebble.ServiceStatus.ACTIVE def replan_services(self, timeout: float = 30.0, delay: float = 0.1): return self.autostart_services(timeout, delay) def start_services( self, services: List[str], timeout: float = 30.0, delay: float = 0.1, ): # A common mistake is to pass just the name of a service, rather than a list of services, # so trap that so it is caught quickly. if isinstance(services, str): raise TypeError(f'start_services should take a list of names, not just "{services}"') self._check_connection() # Note: jam 2021-04-20 We don't implement ChangeID, but the default caller of this is # Container.start() which currently ignores the return value known_services = self._render_services() # Names appear to be validated before any are activated, so do two passes for name in services: if name not in known_services: # TODO: jam 2021-04-20 This needs a better error type raise RuntimeError(f'400 Bad Request: service "{name}" does not exist') for name in services: self._service_status[name] = pebble.ServiceStatus.ACTIVE def stop_services( self, services: List[str], timeout: float = 30.0, delay: float = 0.1, ): # handle a common mistake of passing just a name rather than a list of names if isinstance(services, str): raise TypeError(f'stop_services should take a list of names, not just "{services}"') self._check_connection() # Note: jam 2021-04-20 We don't implement ChangeID, but the default caller of this is # Container.stop() which currently ignores the return value known_services = self._render_services() for name in services: if name not in known_services: # TODO: jam 2021-04-20 This needs a better error type # 400 Bad Request: service "bal" does not exist raise RuntimeError(f'400 Bad Request: service "{name}" does not exist') for name in services: self._service_status[name] = pebble.ServiceStatus.INACTIVE def restart_services( self, services: List[str], timeout: float = 30.0, delay: float = 0.1, ): # handle a common mistake of passing just a name rather than a list of names if isinstance(services, str): raise TypeError(f'restart_services should take a list of names, not just "{services}"') self._check_connection() # Note: jam 2021-04-20 We don't implement ChangeID, but the default caller of this is # Container.restart() which currently ignores the return value known_services = self._render_services() for name in services: if name not in known_services: # TODO: jam 2021-04-20 This needs a better error type # 400 Bad Request: service "bal" does not exist raise RuntimeError(f'400 Bad Request: service "{name}" does not exist') for name in services: self._service_status[name] = pebble.ServiceStatus.ACTIVE def wait_change( self, change_id: pebble.ChangeID, timeout: float = 30.0, delay: float = 0.1, ) -> pebble.Change: raise NotImplementedError(self.wait_change) def add_layer( self, label: str, layer: Union[str, 'pebble.LayerDict', pebble.Layer], *, combine: bool = False): # I wish we could combine some of this helpful object corralling with the actual backend, # rather than having to re-implement it. Maybe we could subclass if not isinstance(label, str): raise TypeError(f'label must be a str, not {type(label).__name__}') if isinstance(layer, (str, dict)): layer_obj = pebble.Layer(layer) elif isinstance(layer, pebble.Layer): layer_obj = layer else: raise TypeError( f'layer must be str, dict, or pebble.Layer, not {type(layer).__name__}') self._check_connection() if label in self._layers: if not combine: raise RuntimeError(f'400 Bad Request: layer "{label}" already exists') layer = self._layers[label] for name, service in layer_obj.services.items(): # 'override' is actually single quoted in the real error, but # it shouldn't be, hopefully that gets cleaned up. if not service.override: raise RuntimeError(f'500 Internal Server Error: layer "{label}" must define' f'"override" for service "{name}"') if service.override not in ('merge', 'replace'): raise RuntimeError(f'500 Internal Server Error: layer "{label}" has invalid ' f'"override" value on service "{name}"') elif service.override == 'replace': layer.services[name] = service elif service.override == 'merge': if combine and name in layer.services: s = layer.services[name] s._merge(service) else: layer.services[name] = service else: self._layers[label] = layer_obj def _render_services(self) -> Dict[str, pebble.Service]: services: Dict[str, pebble.Service] = {} for key in sorted(self._layers.keys()): layer = self._layers[key] for name, service in layer.services.items(): # TODO: merge existing services https://github.com/canonical/operator/issues/1112 services[name] = service return services def _render_checks(self) -> Dict[str, pebble.Check]: checks: Dict[str, pebble.Check] = {} for key in sorted(self._layers.keys()): layer = self._layers[key] for name, check in layer.checks.items(): checks[name] = check return checks def _render_log_targets(self) -> Dict[str, pebble.LogTarget]: log_targets: Dict[str, pebble.LogTarget] = {} for key in sorted(self._layers.keys()): layer = self._layers[key] for name, log_target in layer.log_targets.items(): log_targets[name] = log_target return log_targets def get_plan(self) -> pebble.Plan: self._check_connection() plan = pebble.Plan('{}') plan.services.update(self._render_services()) plan.checks.update(self._render_checks()) plan.log_targets.update(self._render_log_targets()) return plan def get_services(self, names: Optional[List[str]] = None) -> List[pebble.ServiceInfo]: if isinstance(names, str): raise TypeError(f'start_services should take a list of names, not just "{names}"') self._check_connection() services = self._render_services() infos: List[pebble.ServiceInfo] = [] if names is None: names = sorted(services.keys()) for name in sorted(names): try: service = services[name] except KeyError: # in pebble, it just returns "nothing matched" if there are 0 matches, # but it ignores services it doesn't recognize continue status = self._service_status.get(name, pebble.ServiceStatus.INACTIVE) if service.startup == '': startup = pebble.ServiceStartup.DISABLED else: startup = pebble.ServiceStartup(service.startup) info = pebble.ServiceInfo(name, startup=startup, current=pebble.ServiceStatus(status)) infos.append(info) return infos @staticmethod def _check_absolute_path(path: str): if not path.startswith("/"): raise pebble.PathError( 'generic-file-error', f'paths must be absolute, got {path!r}' ) def pull(self, path: str, *, encoding: Optional[str] = 'utf-8') -> Union[BinaryIO, TextIO]: self._check_connection() self._check_absolute_path(path) file_path = self._root / path[1:] try: return cast( Union[BinaryIO, TextIO], file_path.open("rb" if encoding is None else "r", encoding=encoding)) except FileNotFoundError: raise pebble.PathError('not-found', f'stat {path}: no such file or directory') from None except IsADirectoryError: raise pebble.PathError('generic-file-error', f'can only read a regular file: "{path}"') from None def push( self, path: str, source: 'ReadableBuffer', *, encoding: str = 'utf-8', make_dirs: bool = False, permissions: Optional[int] = None, user_id: Optional[int] = None, user: Optional[str] = None, group_id: Optional[int] = None, group: Optional[str] = None ) -> None: self._check_connection() if permissions is not None and not (0 <= permissions <= 0o777): raise pebble.PathError( 'generic-file-error', f'permissions not within 0o000 to 0o777: {permissions:#o}') self._check_absolute_path(path) file_path = self._root / path[1:] if make_dirs and not file_path.parent.exists(): self.make_dir( os.path.dirname(path), make_parents=True, permissions=None, user_id=user_id, user=user, group_id=group_id, group=group) permissions = permissions if permissions is not None else 0o644 try: if isinstance(source, str): file_path.write_text(source, encoding=encoding) elif isinstance(source, bytes): file_path.write_bytes(source) else: # If source is binary, open file in binary mode and ignore encoding param is_binary = isinstance(source.read(0), bytes) # type: ignore open_mode = 'wb' if is_binary else 'w' open_encoding = None if is_binary else encoding with file_path.open(open_mode, encoding=open_encoding) as f: shutil.copyfileobj(cast(IOBase, source), cast(IOBase, f)) os.chmod(file_path, permissions) except FileNotFoundError as e: raise pebble.PathError( 'not-found', f'parent directory not found: {e.args[0]}') from None except NotADirectoryError: raise pebble.PathError('generic-file-error', f'open {path}.~: not a directory') from None def list_files(self, path: str, *, pattern: Optional[str] = None, itself: bool = False) -> List[pebble.FileInfo]: self._check_connection() self._check_absolute_path(path) file_path = self._root / path[1:] if not file_path.exists(): raise self._api_error(404, f"stat {path}: no such file or directory") files = [file_path] if not itself: try: files = [file_path / file for file in os.listdir(file_path)] except NotADirectoryError: pass if pattern is not None: files = [file for file in files if fnmatch.fnmatch(file.name, pattern)] file_infos = [ Container._build_fileinfo(file) for file in files ] for file_info in file_infos: rel_path = os.path.relpath(file_info.path, start=self._root) rel_path = '/' if rel_path == '.' else '/' + rel_path file_info.path = rel_path if rel_path == "/": file_info.name = "/" return file_infos def make_dir( self, path: str, *, make_parents: bool = False, permissions: Optional[int] = None, user_id: Optional[int] = None, user: Optional[str] = None, group_id: Optional[int] = None, group: Optional[str] = None ) -> None: self._check_connection() if permissions is not None and not (0 <= permissions <= 0o777): raise pebble.PathError( 'generic-file-error', f'permissions not within 0o000 to 0o777: {permissions:#o}') self._check_absolute_path(path) dir_path = self._root / path[1:] if not dir_path.parent.exists() and not make_parents: raise pebble.PathError( 'not-found', f'parent directory not found: {path}') if not dir_path.parent.exists() and make_parents: self.make_dir( os.path.dirname(path), make_parents=True, permissions=permissions, user_id=user_id, user=user, group_id=group_id, group=group) try: permissions = permissions if permissions else 0o755 dir_path.mkdir() os.chmod(dir_path, permissions) except FileExistsError: if not make_parents: raise pebble.PathError( 'generic-file-error', f'mkdir {path}: file exists') from None except NotADirectoryError as e: # Attempted to create a subdirectory of a file raise pebble.PathError('generic-file-error', f'not a directory: {e.args[0]}') from None def remove_path(self, path: str, *, recursive: bool = False): self._check_connection() self._check_absolute_path(path) file_path = self._root / path[1:] if not file_path.exists(): if recursive: return raise pebble.PathError( 'not-found', f'remove {path}: no such file or directory') if file_path.is_dir(): if recursive: shutil.rmtree(file_path) else: try: file_path.rmdir() except OSError as e: raise pebble.PathError( 'generic-file-error', 'cannot remove non-empty directory without recursive=True') from e else: file_path.unlink() def _find_exec_handler(self, command: List[str]) -> Optional[ExecHandler]: for prefix_len in reversed(range(len(command) + 1)): command_prefix = tuple(command[:prefix_len]) if command_prefix in self._exec_handlers: return self._exec_handlers[command_prefix] return None def _transform_exec_handler_output(self, data: Union[str, bytes], encoding: Optional[str]) -> Union[io.BytesIO, io.StringIO]: if isinstance(data, bytes): if encoding is None: return io.BytesIO(data) else: return io.StringIO(data.decode(encoding=encoding)) else: if encoding is None: raise ValueError( f"exec handler must return bytes if encoding is None," f"not {data.__class__.__name__}") else: return io.StringIO(typing.cast(str, data)) def exec( self, command: List[str], *, service_context: Optional[str] = None, environment: Optional[Dict[str, str]] = None, working_dir: Optional[str] = None, timeout: Optional[float] = None, user_id: Optional[int] = None, user: Optional[str] = None, group_id: Optional[int] = None, group: Optional[str] = None, stdin: Optional[Union[str, bytes, TextIO, BinaryIO]] = None, stdout: Optional[Union[TextIO, BinaryIO]] = None, stderr: Optional[Union[TextIO, BinaryIO]] = None, encoding: Optional[str] = 'utf-8', combine_stderr: bool = False ) -> ExecProcess[Any]: self._check_connection() handler = self._find_exec_handler(command) if handler is None: message = "execution handler not found, please register one using Harness.handle_exec" raise self._api_error(500, message) environment = {} if environment is None else environment if service_context is not None: plan = self.get_plan() if service_context not in plan.services: message = f'context service "{service_context}" not found' raise self._api_error(500, message) service = plan.services[service_context] environment = {**service.environment, **environment} working_dir = service.working_dir if working_dir is None else working_dir user = service.user if user is None else user user_id = service.user_id if user_id is None else user_id group = service.group if group is None else group group_id = service.group_id if group_id is None else group_id if hasattr(stdin, "read"): stdin = stdin.read() # type: ignore exec_args = ExecArgs( command=command, environment=environment, working_dir=working_dir, timeout=timeout, user_id=user_id, user=user, group_id=group_id, group=group, stdin=cast(Union[str, bytes, None], stdin), encoding=encoding, combine_stderr=combine_stderr ) proc_stdin = self._transform_exec_handler_output(b"", encoding) if stdin is not None: proc_stdin = None proc_stdout = self._transform_exec_handler_output(b"", encoding) proc_stderr = self._transform_exec_handler_output(b"", encoding) try: result = handler(exec_args) except TimeoutError: if timeout is not None: exec_process = _TestingExecProcess(command=command, timeout=timeout, exit_code=None, stdin=proc_stdin, stdout=proc_stdout, stderr=proc_stderr, is_timeout=True) return cast(pebble.ExecProcess[Any], exec_process) else: raise RuntimeError( "a TimeoutError occurred in the execution handler, " "but no timeout value was provided in the execution arguments." ) from None if result is None: exit_code = 0 proc_stdout = self._transform_exec_handler_output(b'', encoding) proc_stderr = self._transform_exec_handler_output(b'', encoding) elif isinstance(result, ExecResult): exit_code = result.exit_code proc_stdout = self._transform_exec_handler_output(result.stdout, encoding) proc_stderr = self._transform_exec_handler_output(result.stderr, encoding) else: raise TypeError(f"execution handler returned an unexpected type: {type(result)!r}.") if combine_stderr and proc_stderr.getvalue(): raise ValueError("execution handler returned a non-empty stderr " "even though combine_stderr is enabled.") if stdout is not None: shutil.copyfileobj(cast(io.IOBase, proc_stdout), cast(io.IOBase, stdout)) proc_stdout = None if stderr is not None: shutil.copyfileobj(cast(io.IOBase, proc_stderr), cast(io.IOBase, stderr)) proc_stderr = None exec_process = _TestingExecProcess(command=command, timeout=timeout, exit_code=exit_code, stdin=proc_stdin, stdout=proc_stdout, stderr=proc_stderr, is_timeout=False) return cast(pebble.ExecProcess[Any], exec_process) def send_signal(self, sig: Union[int, str], service_names: Iterable[str]): if not service_names: raise TypeError('send_signal expected at least 1 service name, got 0') self._check_connection() # Convert signal to str if isinstance(sig, int): sig = signal.Signals(sig).name # pebble first validates the service name, and then the signal name plan = self.get_plan() for service in service_names: if service not in plan.services or not self.get_services([service])[0].is_running(): # conform with the real pebble api message = f'cannot send signal to "{service}": service is not running' raise self._api_error(500, message) # Check if signal name is valid try: signal.Signals[sig] except KeyError: # conform with the real pebble api first_service = next(iter(service_names)) message = f'cannot send signal to "{first_service}": invalid signal name "{sig}"' raise self._api_error(500, message) from None def get_checks(self, level=None, names=None): # type:ignore raise NotImplementedError(self.get_checks) # type:ignore def notify(self, type: pebble.NoticeType, key: str, *, data: Optional[Dict[str, str]] = None, repeat_after: Optional[datetime.timedelta] = None) -> str: notice_id, _ = self._notify(type, key, data=data, repeat_after=repeat_after) return notice_id def _notify(self, type: pebble.NoticeType, key: str, *, data: Optional[Dict[str, str]] = None, repeat_after: Optional[datetime.timedelta] = None) -> Tuple[str, bool]: """Record an occurrence of a notice with the specified details. Return a tuple of (notice_id, new_or_repeated). """ if type != pebble.NoticeType.CUSTOM: message = f'invalid type "{type.value}" (can only add "custom" notices)' raise self._api_error(400, message) # The shape of the code below is taken from State.AddNotice in Pebble. now = datetime.datetime.now(tz=datetime.timezone.utc) new_or_repeated = False unique_key = (type.value, key) notice = self._notices.get(unique_key) if notice is None: # First occurrence of this notice uid+type+key self._last_notice_id += 1 notice = pebble.Notice( id=str(self._last_notice_id), user_id=0, # Charm should always be able to read pebble_notify notices. type=type, key=key, first_occurred=now, last_occurred=now, last_repeated=now, expire_after=datetime.timedelta(days=7), occurrences=1, last_data=data or {}, repeat_after=repeat_after, ) self._notices[unique_key] = notice new_or_repeated = True else: # Additional occurrence, update existing notice last_repeated = notice.last_repeated if repeat_after is None or now > notice.last_repeated + repeat_after: # Update last repeated time if repeat-after time has elapsed (or is None) last_repeated = now new_or_repeated = True notice = dataclasses.replace( notice, last_occurred=now, last_repeated=last_repeated, occurrences=notice.occurrences + 1, last_data=data or {}, repeat_after=repeat_after, ) self._notices[unique_key] = notice return notice.id, new_or_repeated def _api_error(self, code: int, message: str) -> pebble.APIError: status = http.HTTPStatus(code).phrase body = { 'type': 'error', 'status-code': code, 'status': status, 'result': {'message': message}, } return pebble.APIError(body, code, status, message) def get_notice(self, id: str) -> pebble.Notice: for notice in self._notices.values(): if notice.id == id: return notice raise self._api_error(404, f'cannot find notice with ID "{id}"') def get_notices( self, *, users: Optional[pebble.NoticesUsers] = None, user_id: Optional[int] = None, types: Optional[Iterable[Union[pebble.NoticeType, str]]] = None, keys: Optional[Iterable[str]] = None, ) -> List[pebble.Notice]: # Similar logic as api_notices.go:v1GetNotices in Pebble. filter_user_id = 0 # default is to filter by request UID (root) if user_id is not None: filter_user_id = user_id if users is not None: if user_id is not None: raise self._api_error(400, 'cannot use both "users" and "user_id"') filter_user_id = None if types is not None: types = {(t.value if isinstance(t, pebble.NoticeType) else t) for t in types} if keys is not None: keys = set(keys) notices = [notice for notice in self._notices.values() if self._notice_matches(notice, filter_user_id, types, keys)] notices.sort(key=lambda notice: notice.last_repeated) return notices @staticmethod def _notice_matches(notice: pebble.Notice, user_id: Optional[int] = None, types: Optional[Set[str]] = None, keys: Optional[Set[str]] = None) -> bool: # Same logic as NoticeFilter.matches in Pebble. # For example: if user_id filter is set and it doesn't match, return False. if user_id is not None and not (notice.user_id is None or user_id == notice.user_id): return False if types is not None and notice.type not in types: return False if keys is not None and notice.key not in keys: return False return True