# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Infrastructure to build unit tests for charms using the ops library."""
import dataclasses
import datetime
import fnmatch
import inspect
import io
import ipaddress
import os
import pathlib
import random
import shutil
import signal
import tempfile
import uuid
import warnings
from contextlib import contextmanager
from io import BytesIO, IOBase, StringIO
from textwrap import dedent
from typing import (
TYPE_CHECKING,
Any,
AnyStr,
BinaryIO,
Callable,
Dict,
Generic,
Iterable,
List,
Literal,
Mapping,
Optional,
Sequence,
Set,
TextIO,
Tuple,
Type,
TypeVar,
Union,
cast,
)
from ops import charm, framework, model, pebble, storage
from ops._private import yaml
from ops.charm import CharmBase, CharmMeta, RelationRole
from ops.model import Container, RelationNotFoundError
from ops.pebble import ExecProcess
if TYPE_CHECKING:
from typing_extensions import TypedDict
from ops.model import _NetworkDict
ReadableBuffer = Union[bytes, str, StringIO, BytesIO, BinaryIO]
_StringOrPath = Union[str, pathlib.PurePosixPath, pathlib.Path]
_FileKwargs = TypedDict('_FileKwargs', {
'permissions': Optional[int],
'last_modified': datetime.datetime,
'user_id': Optional[int],
'user': Optional[str],
'group_id': Optional[int],
'group': Optional[str],
})
_RelationEntities = TypedDict('_RelationEntities', {
'app': str,
'units': List[str]
})
_ConfigOption = TypedDict('_ConfigOption', {
'type': Literal['string', 'int', 'float', 'boolean'],
'description': str,
'default': Union[str, int, float, bool],
})
_StatusName = Literal['unknown', 'blocked', 'active', 'maintenance', 'waiting']
_RawStatus = TypedDict('_RawStatus', {
'status': _StatusName,
'message': str,
})
RawConfig = TypedDict("RawConfig", {'options': Dict[str, _ConfigOption]})
# YAMLStringOrFile is something like metadata.yaml or actions.yaml. You can
# pass in a file-like object or the string directly.
YAMLStringOrFile = Union[str, TextIO]
# An instance of an Application or Unit, or the name of either.
# This is done here to avoid a scoping issue with the `model` property
# of the Harness class below.
AppUnitOrName = Union[str, model.Application, model.Unit]
# CharmType represents user charms that are derived from CharmBase.
CharmType = TypeVar('CharmType', bound=charm.CharmBase)
@dataclasses.dataclass
class ExecArgs:
"""Represent arguments captured from the :meth:`ops.Container.exec` method call.
These arguments will be passed to the :meth:`Harness.handle_exec` handler function.
See :meth:`ops.pebble.Client.exec` for documentation of properties.
"""
command: List[str]
environment: Dict[str, str]
working_dir: Optional[str]
timeout: Optional[float]
user_id: Optional[int]
user: Optional[str]
group_id: Optional[int]
group: Optional[str]
stdin: Optional[Union[str, bytes]]
encoding: Optional[str]
combine_stderr: bool
@dataclasses.dataclass
class ExecResult:
"""Represents the result of a simulated process execution.
This class is typically used to return the output and exit code from the
:meth:`Harness.handle_exec` result or handler function.
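For illustration, a minimal sketch of registering a canned result with
:meth:`Harness.handle_exec` (the container name ``workload`` and the command
prefix are assumptions for this example)::
    harness.handle_exec(
        'workload', ['pg_isready'],
        result=ExecResult(exit_code=0, stdout='accepting connections'))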
"""
exit_code: int = 0
stdout: Union[str, bytes] = b""
stderr: Union[str, bytes] = b""
ExecHandler = Callable[[ExecArgs], Union[None, ExecResult]]
# noinspection PyProtectedMember
class Harness(Generic[CharmType]):
"""This class represents a way to build up the model that will drive a test suite.
The model created is from the viewpoint of the charm that is being tested.
Below is an example test using :meth:`begin_with_initial_hooks` that ensures
the charm responds correctly to config changes::
class TestCharm(unittest.TestCase):
def test_foo(self):
harness = Harness(MyCharm)
self.addCleanup(harness.cleanup) # always clean up after ourselves
# Instantiate the charm and trigger events that Juju would on startup
harness.begin_with_initial_hooks()
# Update charm config and trigger config-changed
harness.update_config({'log_level': 'warn'})
# Check that charm properly handled config-changed, for example,
# the charm added the correct Pebble layer
plan = harness.get_container_pebble_plan('prometheus')
self.assertIn('--log.level=warn', plan.services['prometheus'].command)
To set up the model without triggering events (or calling charm code), perform the
harness actions before calling :meth:`begin`. Below is an example that adds a
relation before calling ``begin``, and then updates config to trigger the
``config-changed`` event in the charm::
class TestCharm(unittest.TestCase):
def test_bar(self):
harness = Harness(MyCharm)
self.addCleanup(harness.cleanup) # always clean up after ourselves
# Set up model before "begin" (no events triggered)
harness.set_leader(True)
harness.add_relation('db', 'postgresql', unit_data={'key': 'val'})
# Now instantiate the charm to start triggering events as the model changes
harness.begin()
harness.update_config({'some': 'config'})
# Check that charm has properly handled config-changed, for example,
# has written the app's config file
root = harness.get_filesystem_root('container')
assert (root / 'etc' / 'app.conf').exists()
Args:
charm_cls: The Charm class to test.
meta: A string or file-like object containing the contents of
``metadata.yaml``. If not supplied, we will look for a ``metadata.yaml`` file in the
parent directory of the Charm, and if not found fall back to a trivial
``name: test-charm`` metadata.
actions: A string or file-like object containing the contents of
``actions.yaml``. If not supplied, we will look for an ``actions.yaml`` file in the
parent directory of the Charm.
config: A string or file-like object containing the contents of
``config.yaml``. If not supplied, we will look for a ``config.yaml`` file in the
parent directory of the Charm.
"""
def __init__(
self,
charm_cls: Type[CharmType],
*,
meta: Optional[YAMLStringOrFile] = None,
actions: Optional[YAMLStringOrFile] = None,
config: Optional[YAMLStringOrFile] = None):
self._charm_cls = charm_cls
self._charm: Optional[CharmType] = None
self._charm_dir = 'no-disk-path' # this may be updated by _create_meta
self._meta = self._create_meta(meta, actions)
self._unit_name: str = f"{self._meta.name}/0"
self._hooks_enabled: bool = True
self._relation_id_counter: int = 0
config_ = self._get_config(config)
self._backend = _TestingModelBackend(self._unit_name, self._meta, config_)
self._model = model.Model(self._meta, self._backend)
self._storage = storage.SQLiteStorage(':memory:')
self._framework = framework.Framework(
self._storage, self._charm_dir, self._meta, self._model)
def _event_context(self, event_name: str):
"""Configures the Harness to behave as if an event hook were running.
This means that the Harness will perform strict access control of relation data.
Example usage:
# this is how we test that attempting to write a remote app's
# databag will raise RelationDataError.
>>> with harness._event_context('foo'):
>>> with pytest.raises(ops.model.RelationDataError):
>>> my_relation.data[remote_app]['foo'] = 'bar'
# this is how we test with 'realistic conditions' how an event handler behaves
# when we call it directly -- i.e. without going through harness.add_relation
>>> def test_foo():
>>> class MyCharm:
>>> ...
>>> def event_handler(self, event):
>>> # this is expected to raise an exception
>>> event.relation.data[event.relation.app]['foo'] = 'bar'
>>>
>>> harness = Harness(MyCharm)
>>> event = MagicMock()
>>> event.relation = harness.charm.model.relations[0]
>>>
>>> with harness._event_context('my_relation_joined'):
>>> with pytest.raises(ops.model.RelationDataError):
>>> harness.charm.event_handler(event)
If event_name == '', conversely, the Harness will believe that no hook
is running, allowing temporary unrestricted access to read/write a relation's
databags even from inside an event handler.
>>> def test_foo():
>>> class MyCharm:
>>> ...
>>> def event_handler(self, event):
>>> # this is expected to raise an exception since we're not leader
>>> event.relation.data[self.app]['foo'] = 'bar'
>>>
>>> harness = Harness(MyCharm)
>>> event = MagicMock()
>>> event.relation = harness.charm.model.relations[0]
>>>
>>> with harness._event_context('my_relation_joined'):
>>> harness.charm.event_handler(event)
"""
return self._framework._event_context(event_name)
def set_can_connect(self, container: Union[str, model.Container], val: bool):
"""Change the simulated connection status of a container's underlying Pebble client.
After calling this, :meth:`ops.Container.can_connect` will return ``val``.
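For example, a minimal sketch (the charm class ``MyCharm`` and the container
name ``workload`` are assumptions for this example)::
    harness = Harness(MyCharm)
    harness.begin()
    harness.set_can_connect('workload', True)
    assert harness.model.unit.get_container('workload').can_connect()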
"""
if isinstance(container, str):
container = self.model.unit.get_container(container)
self._backend._set_can_connect(container._pebble, val)
@property
def charm(self) -> CharmType:
"""Return the instance of the charm class that was passed to ``__init__``.
Note that the Charm is not instantiated until :meth:`.begin()` is called.
Until then, attempting to access this property will raise an exception.
"""
if self._charm is None:
raise RuntimeError('The charm instance is not available yet. '
'Call Harness.begin() first.')
return self._charm
@property
def model(self) -> model.Model:
"""Return the :class:`~ops.model.Model` that is being driven by this Harness."""
return self._model
@property
def framework(self) -> framework.Framework:
"""Return the Framework that is being driven by this Harness."""
return self._framework
def begin(self) -> None:
"""Instantiate the Charm and start handling events.
Before calling :meth:`begin`, there is no Charm instance, so changes to the Model won't
emit events. Call :meth:`.begin` for :attr:`.charm` to be valid.
"""
if self._charm is not None:
raise RuntimeError('cannot call the begin method on the harness more than once')
# The Framework adds attributes to class objects for events, etc. As such, we can't re-use
# the original class against multiple Frameworks. So create a locally defined class
# and register it.
# TODO: jam 2020-03-16 We are looking to change this to instance attributes instead of
# class attributes, which should clean up this ugliness. The API can stay the same.
class TestEvents(self._charm_cls.on.__class__):
pass
TestEvents.__name__ = self._charm_cls.on.__class__.__name__
class TestCharm(self._charm_cls): # type: ignore
on = TestEvents()
# Note: jam 2020-03-01 This is so that errors in testing say MyCharm has no attribute foo,
# rather than TestCharm has no attribute foo.
TestCharm.__name__ = self._charm_cls.__name__
self._charm = TestCharm(self._framework) # type: ignore
def begin_with_initial_hooks(self) -> None:
"""Fire the same hooks that Juju would fire at startup.
This triggers install, relation-created, config-changed, start, pebble-ready (for any
containers), and any relation-joined hooks based on what relations have been added before
begin was called. Note that all of these are fired before returning control
to the test suite, so to introspect what happens at each step, fire them directly
(for example, ``Charm.on.install.emit()``).
To use this with all the normal hooks, instantiate the harness, set up any relations that
should be active when the charm starts, and then call this method. This method will
automatically create and add peer relations that are specified in metadata.yaml.
If the charm metadata specifies containers, this sets can_connect to True for all
containers (in addition to triggering pebble-ready for each).
Example::
harness = Harness(MyCharm)
# Do initial setup here
# Add storage if needed before begin_with_initial_hooks() is called
storage_ids = harness.add_storage('data', count=1)
storage_id = storage_ids[0]  # we only added one storage instance
harness.add_relation('db', 'postgresql', unit_data={'key': 'val'})
harness.set_leader(True)
harness.update_config({'initial': 'config'})
harness.begin_with_initial_hooks()
# This will cause
# install, db-relation-created('postgresql'), leader-elected, config-changed, start
# db-relation-joined('postgresql/0'), db-relation-changed('postgresql/0')
# To be fired.
"""
self.begin()
charm = cast(CharmBase, self._charm)
# Checking if disks have been added
# storage-attached events happen before install
for storage_name in self._meta.storages:
for storage_index in self._backend.storage_list(storage_name, include_detached=True):
s = model.Storage(storage_name, storage_index, self._backend)
self.attach_storage(s.full_id)
# Storage done, emit install event
charm.on.install.emit()
# Juju itself iterates what relation to fire based on a map[int]relation, so it doesn't
# guarantee a stable ordering between relation events. It *does* give a stable ordering
# of joined units for a given relation.
items = list(self._meta.relations.items())
random.shuffle(items)
this_app_name = self._meta.name
for relname, rel_meta in items:
if rel_meta.role == RelationRole.peer:
# If the user has directly added a relation, leave it be, but otherwise ensure
# that peer relations are always established before leader-elected.
rel_ids = self._backend._relation_ids_map.get(relname)
if rel_ids is None:
self.add_relation(relname, self._meta.name)
else:
random.shuffle(rel_ids)
for rel_id in rel_ids:
self._emit_relation_created(relname, rel_id, this_app_name)
else:
rel_ids = self._backend._relation_ids_map.get(relname, [])
random.shuffle(rel_ids)
for rel_id in rel_ids:
app_name = self._backend._relation_app_and_units[rel_id]["app"]
self._emit_relation_created(relname, rel_id, app_name)
if self._backend._is_leader:
charm.on.leader_elected.emit()
else:
charm.on.leader_settings_changed.emit()
charm.on.config_changed.emit()
charm.on.start.emit()
# Set can_connect and fire pebble-ready for any containers.
for container_name in self._meta.containers:
self.container_pebble_ready(container_name)
# If the initial hooks do not set a unit status, the Juju controller will switch
# the unit status from "Maintenance" to "Unknown". See gh#726
post_setup_sts = self._backend.status_get()
if post_setup_sts.get("status") == "maintenance" and not post_setup_sts.get("message"):
self._backend.status_set("unknown", "", is_app=False)
all_ids = list(self._backend._relation_names.items())
random.shuffle(all_ids)
for rel_id, rel_name in all_ids:
rel_app_and_units = self._backend._relation_app_and_units[rel_id]
app_name = rel_app_and_units["app"]
# Note: Juju *does* fire relation events for a given relation in the sorted order of
# the unit names. It also always fires relation-changed immediately after
# relation-joined for the same unit.
# Juju only fires relation-changed (app) if there is data for the related application
relation = self._model.get_relation(rel_name, rel_id)
if self._backend._relation_data_raw[rel_id].get(app_name):
app = self._model.get_app(app_name)
charm.on[rel_name].relation_changed.emit(relation, app, None)
for unit_name in sorted(rel_app_and_units["units"]):
remote_unit = self._model.get_unit(unit_name)
charm.on[rel_name].relation_joined.emit(
relation, remote_unit.app, remote_unit)
charm.on[rel_name].relation_changed.emit(
relation, remote_unit.app, remote_unit)
def cleanup(self) -> None:
"""Called by the test infrastructure to clean up any temporary directories/files/etc.
Always call ``self.addCleanup(harness.cleanup)`` after creating a :class:`Harness`.
"""
self._backend._cleanup()
def _create_meta(self, charm_metadata_yaml: Optional[YAMLStringOrFile],
action_metadata_yaml: Optional[YAMLStringOrFile]) -> CharmMeta:
"""Create a CharmMeta object.
Handle the cases where a user doesn't supply explicit metadata snippets.
This will try to load metadata from ``<charm_dir>/charmcraft.yaml`` first, then
``<charm_dir>/metadata.yaml`` if charmcraft.yaml does not include metadata,
and ``<charm_dir>/actions.yaml`` if charmcraft.yaml does not include actions.
"""
filename = inspect.getfile(self._charm_cls)
charm_dir = pathlib.Path(filename).parents[1]
charm_metadata: Optional[Dict[str, Any]] = None
charmcraft_metadata: Optional[Dict[str, Any]] = None
# Check charmcraft.yaml and load it if it exists
charmcraft_meta = charm_dir / "charmcraft.yaml"
if charmcraft_meta.is_file():
self._charm_dir = charm_dir
charmcraft_metadata = yaml.safe_load(charmcraft_meta.read_text())
# Load metadata from parameters if provided
if charm_metadata_yaml is not None:
if isinstance(charm_metadata_yaml, str):
charm_metadata_yaml = dedent(charm_metadata_yaml)
charm_metadata = yaml.safe_load(charm_metadata_yaml)
else:
# Check charmcraft.yaml for metadata if no metadata is provided
if charmcraft_metadata is not None:
meta_keys = ["name", "summary", "description"]
if any(key in charmcraft_metadata for key in meta_keys):
# Unrelated keys in the charmcraft.yaml file will be ignored.
charm_metadata = charmcraft_metadata
# Still no metadata, check metadata.yaml
if charm_metadata is None:
metadata_path = charm_dir / 'metadata.yaml'
if metadata_path.is_file():
charm_metadata = yaml.safe_load(metadata_path.read_text())
self._charm_dir = charm_dir
# Use default metadata if metadata is not found
if charm_metadata is None:
charm_metadata = {"name": "test-charm"}
action_metadata: Optional[Dict[str, Any]] = None
# Load actions from parameters if provided
if action_metadata_yaml is not None:
if isinstance(action_metadata_yaml, str):
action_metadata_yaml = dedent(action_metadata_yaml)
action_metadata = yaml.safe_load(action_metadata_yaml)
else:
# Check charmcraft.yaml for actions if no actions are provided
if charmcraft_metadata is not None and "actions" in charmcraft_metadata:
action_metadata = charmcraft_metadata["actions"]
# Still no actions, check actions.yaml
if action_metadata is None:
actions_path = charm_dir / 'actions.yaml'
if actions_path.is_file():
action_metadata = yaml.safe_load(actions_path.read_text())
self._charm_dir = charm_dir
return CharmMeta(charm_metadata, action_metadata)
def _get_config(self, charm_config_yaml: Optional['YAMLStringOrFile']):
"""If the user passed a config to Harness, use it.
Otherwise try to load config from ``<charm_dir>/charmcraft.yaml`` first, then
``<charm_dir>/config.yaml`` if charmcraft.yaml does not include config.
"""
filename = inspect.getfile(self._charm_cls)
charm_dir = pathlib.Path(filename).parents[1]
config: Optional[Dict[str, Any]] = None
# Load config from parameters if provided
if charm_config_yaml is not None:
if isinstance(charm_config_yaml, str):
charm_config_yaml = dedent(charm_config_yaml)
config = yaml.safe_load(charm_config_yaml)
else:
# Check charmcraft.yaml for config if no config is provided
charmcraft_meta = charm_dir / "charmcraft.yaml"
if charmcraft_meta.is_file():
charmcraft_metadata: Dict[str, Any] = yaml.safe_load(charmcraft_meta.read_text())
config = charmcraft_metadata.get("config")
# Still no config, check config.yaml
if config is None:
config_path = charm_dir / 'config.yaml'
if config_path.is_file():
config = yaml.safe_load(config_path.read_text())
self._charm_dir = charm_dir
# Use default config if config is not found
if config is None:
config = {}
if not isinstance(config, dict):
raise TypeError(config)
return cast('RawConfig', config)
def add_oci_resource(self, resource_name: str,
contents: Optional[Mapping[str, str]] = None) -> None:
"""Add oci resources to the backend.
This will register an oci resource and create a temporary file for processing metadata
about the resource. A default set of values will be used for all the file contents
unless a specific contents dict is provided.
Args:
resource_name: Name of the resource to add custom contents to.
contents: Optional custom dict to write for the named resource.
"""
if not contents:
contents = {'registrypath': 'registrypath',
'username': 'username',
'password': 'password',
}
if resource_name not in self._meta.resources.keys():
raise RuntimeError(f'Resource {resource_name} is not a defined resource')
if self._meta.resources[resource_name].type != "oci-image":
raise RuntimeError(f'Resource {resource_name} is not an OCI Image')
as_yaml = yaml.safe_dump(contents)
self._backend._resources_map[resource_name] = ('contents.yaml', as_yaml)
def add_resource(self, resource_name: str, content: AnyStr) -> None:
"""Add content for a resource to the backend.
This will register the content, so that a call to ``model.resources.fetch(resource_name)``
will return a path to a file containing that content.
Args:
resource_name: The name of the resource being added
content: Either string or bytes content, which will be the content of the file
returned by resource-get. If content is a string, it will be encoded as UTF-8.
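For illustration, a minimal sketch (assuming a ``file``-type resource named
``config-file`` is declared in ``metadata.yaml``)::
    harness.add_resource('config-file', 'port: 8080')
    path = harness.model.resources.fetch('config-file')
    assert path.read_text() == 'port: 8080'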
"""
if resource_name not in self._meta.resources.keys():
raise RuntimeError(f'Resource {resource_name} is not a defined resource')
record = self._meta.resources[resource_name]
if record.type != "file":
raise RuntimeError(
f'Resource {resource_name} is not a file, but actually {record.type}')
filename = record.filename
if filename is None:
filename = resource_name
self._backend._resources_map[resource_name] = (filename, content)
def populate_oci_resources(self) -> None:
"""Populate all OCI resources."""
for name, data in self._meta.resources.items():
if data.type == "oci-image":
self.add_oci_resource(name)
def disable_hooks(self) -> None:
"""Stop emitting hook events when the model changes.
This can be used by developers to stop changes to the model from emitting events that
the charm will react to. Call :meth:`.enable_hooks`
to re-enable them.
"""
self._hooks_enabled = False
def enable_hooks(self) -> None:
"""Re-enable hook events from charm.on when the model is changed.
By default, hook events are enabled once :meth:`.begin` is called,
but if :meth:`.disable_hooks` is used, this method will enable
them again.
"""
self._hooks_enabled = True
@contextmanager
def hooks_disabled(self):
"""A context manager to run code with hooks disabled.
Example::
with harness.hooks_disabled():
# things in here don't fire events
harness.set_leader(True)
harness.update_config(unset=['foo', 'bar'])
# things here will again fire events
"""
if self._hooks_enabled:
self.disable_hooks()
try:
yield None
finally:
self.enable_hooks()
else:
yield None
def _next_relation_id(self):
rel_id = self._relation_id_counter
self._relation_id_counter += 1
return rel_id
def add_storage(self, storage_name: str, count: int = 1,
*, attach: bool = False) -> List[str]:
"""Create a new storage device and attach it to this unit.
To make tests repeatable, each device will be initialized with its
location set to ``<tmpdir>/<storage_name>N``, where N is a number in the
range [0, total_num_disks-1].
The test harness uses symbolic links to imitate storage mounts, which may lead to some
inconsistencies compared to the actual charm.
Args:
storage_name: The storage backend name on the Charm
count: Number of disks being added
attach: True to also attach the storage mount and emit storage-attached if
harness.begin() has been called.
Return:
A list of storage IDs, e.g. ["my-storage/1", "my-storage/2"].
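For illustration, a minimal sketch (assuming a storage named ``data`` is
declared in ``metadata.yaml``)::
    storage_ids = harness.add_storage('data', count=2, attach=True)
    # e.g. ['data/0', 'data/1']; with attach=True, storage-attached is also
    # emitted for each device if harness.begin() has already been called.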
"""
if storage_name not in self._meta.storages:
raise RuntimeError(
f"the key '{storage_name}' is not specified as a storage key in metadata")
storage_indices = self._backend.storage_add(storage_name, count)
ids: List[str] = []
for storage_index in storage_indices:
s = model.Storage(storage_name, storage_index, self._backend)
ids.append(s.full_id)
if attach:
self.attach_storage(s.full_id)
return ids
def detach_storage(self, storage_id: str) -> None:
"""Detach a storage device.
The intent of this function is to simulate a ``juju detach-storage`` call.
It will trigger a storage-detaching hook if the storage unit in question exists
and is presently marked as attached.
Args:
storage_id: The full storage ID of the storage unit being detached, including the
storage key, e.g. my-storage/0.
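For illustration, a minimal sketch (assuming a storage named ``data`` is
declared in ``metadata.yaml``)::
    (storage_id,) = harness.add_storage('data', attach=True)
    harness.begin()
    harness.detach_storage(storage_id)  # emits storage-detaching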
"""
if self._charm is None:
raise RuntimeError('cannot detach storage before Harness is initialised')
storage_name, storage_index = storage_id.split('/', 1)
storage_index = int(storage_index)
storage_attached = self._backend._storage_is_attached(
storage_name, storage_index)
if storage_attached and self._hooks_enabled:
self.charm.on[storage_name].storage_detaching.emit(
model.Storage(storage_name, storage_index, self._backend))
self._backend._storage_detach(storage_id)
def attach_storage(self, storage_id: str) -> None:
"""Attach a storage device.
The intent of this function is to simulate a ``juju attach-storage`` call.
It will trigger a storage-attached hook if the storage unit in question exists
and is presently marked as detached.
The test harness uses symbolic links to imitate storage mounts, which may lead to some
inconsistencies compared to the actual charm. Users should be cognizant of
this potential discrepancy.
Args:
storage_id: The full storage ID of the storage unit being attached, including the
storage key, e.g. my-storage/0.
"""
if not self._backend._storage_attach(storage_id):
return # storage was already attached
if not self._charm or not self._hooks_enabled:
return # don't need to run hook callback
storage_name, storage_index = storage_id.split('/', 1)
# Reset associated cached value in the storage mappings. If we don't do this,
# Model._storages won't return Storage objects for subsequently-added storage.
self._model._storages._invalidate(storage_name)
storage_index = int(storage_index)
self.charm.on[storage_name].storage_attached.emit(
model.Storage(storage_name, storage_index, self._backend))
def remove_storage(self, storage_id: str) -> None:
"""Detach a storage device.
The intent of this function is to simulate a ``juju remove-storage`` call.
It will trigger a storage-detaching hook if the storage unit in question exists
and is presently marked as attached. Then it will remove the storage
unit from the testing backend.
Args:
storage_id: The full storage ID of the storage unit being removed, including the
storage key, e.g. my-storage/0.
"""
storage_name, storage_index = storage_id.split('/', 1)
storage_index = int(storage_index)
if storage_name not in self._meta.storages:
raise RuntimeError(
f"the key '{storage_name}' is not specified as a storage key in metadata")
is_attached = self._backend._storage_is_attached(
storage_name, storage_index)
if self._charm is not None and self._hooks_enabled and is_attached:
self.charm.on[storage_name].storage_detaching.emit(
model.Storage(storage_name, storage_index, self._backend))
self._backend._storage_remove(storage_id)
def add_relation(self, relation_name: str, remote_app: str, *,
app_data: Optional[Mapping[str, str]] = None,
unit_data: Optional[Mapping[str, str]] = None) -> int:
"""Declare that there is a new relation between this application and `remote_app`.
This function creates a relation with an application and triggers a
:class:`RelationCreatedEvent <ops.RelationCreatedEvent>`.
If `app_data` or `unit_data` are provided, also add a new unit
(``<remote_app>/0``) to the relation and trigger
:class:`RelationJoinedEvent <ops.RelationJoinedEvent>`. Then update
the application data if `app_data` is provided and the unit data if
`unit_data` is provided, triggering
:class:`RelationChangedEvent <ops.RelationChangedEvent>` after each update.
Alternatively, charm tests can call :meth:`add_relation_unit` and
:meth:`update_relation_data` explicitly.
Example usage::
secret_id = harness.add_model_secret('mysql', {'password': 'SECRET'})
harness.add_relation('db', 'mysql', unit_data={
'host': 'mysql.localhost',
'username': 'appuser',
'secret-id': secret_id,
})
Args:
relation_name: The relation on the charm that is being related to.
remote_app: The name of the application that is being related to.
To add a peer relation, set to the name of *this* application.
app_data: If provided, also add a new unit to the relation
(triggering relation-joined) and set the *application* relation data
(triggering relation-changed).
unit_data: If provided, also add a new unit to the relation
(triggering relation-joined) and set the *unit* relation data
(triggering relation-changed).
Return:
The ID of the relation created.
"""
relation_id = self._next_relation_id()
self._backend._relation_ids_map.setdefault(
relation_name, []).append(relation_id)
self._backend._relation_names[relation_id] = relation_name
self._backend._relation_list_map[relation_id] = []
self._backend._relation_data_raw[relation_id] = {
remote_app: {},
self._backend.unit_name: {},
self._backend.app_name: {}}
self._backend._relation_app_and_units[relation_id] = {
"app": remote_app,
"units": [],
}
# Reload the relation_ids list
if self._model is not None:
self._model.relations._invalidate(relation_name)
self._emit_relation_created(relation_name, relation_id, remote_app)
if app_data is not None or unit_data is not None:
remote_unit = remote_app + '/0'
self.add_relation_unit(relation_id, remote_unit)
if app_data is not None:
self.update_relation_data(relation_id, remote_app, app_data)
if unit_data is not None:
self.update_relation_data(relation_id, remote_unit, unit_data)
return relation_id
def remove_relation(self, relation_id: int) -> None:
"""Remove a relation.
Args:
relation_id: The relation ID for the relation to be removed.
Raises:
RelationNotFoundError: if relation id is not valid
"""
rel_names = self._backend._relation_names
try:
relation_name = rel_names[relation_id]
remote_app = self._backend.relation_remote_app_name(relation_id)
except KeyError as e:
raise model.RelationNotFoundError from e
rel_list_map = self._backend._relation_list_map
for unit_name in rel_list_map[relation_id].copy():
self.remove_relation_unit(relation_id, unit_name)
self._emit_relation_broken(relation_name, relation_id, remote_app)
if self._model is not None:
self._model.relations._invalidate(relation_name)
self._backend._relation_app_and_units.pop(relation_id)
self._backend._relation_data_raw.pop(relation_id)
rel_list_map.pop(relation_id)
ids_map = self._backend._relation_ids_map
ids_map[relation_name].remove(relation_id)
rel_names.pop(relation_id)
# Remove secret grants that give access via this relation
for secret in self._backend._secrets:
secret.grants = {rid: names for rid, names in secret.grants.items()
if rid != relation_id}
def _emit_relation_created(self, relation_name: str, relation_id: int,
remote_app: str) -> None:
"""Trigger relation-created for a given relation with a given remote application."""
if self._charm is None or not self._hooks_enabled:
return
relation = self._model.get_relation(relation_name, relation_id)
app = self._model.get_app(remote_app)
self._charm.on[relation_name].relation_created.emit(
relation, app)
def _emit_relation_broken(self, relation_name: str, relation_id: int,
remote_app: str) -> None:
"""Trigger relation-broken for a given relation with a given remote application."""
if self._charm is None or not self._hooks_enabled:
return
relation = self._model.get_relation(relation_name, relation_id)
app = self._model.get_app(remote_app)
self._charm.on[relation_name].relation_broken.emit(relation, app)
def add_relation_unit(self, relation_id: int, remote_unit_name: str) -> None:
"""Add a new unit to a relation.
This will trigger a `relation_joined` event. This would naturally be
followed by a `relation_changed` event, which can be triggered with
:meth:`.update_relation_data`. This separation is artificial in the
sense that Juju will always fire the two, but is intended to make
testing relations and their data bags slightly more natural.
Unless finer-grained control is needed, most charm tests can call
:meth:`add_relation` with the `app_data` or `unit_data` argument
instead of using this function.
Example::
rel_id = harness.add_relation('db', 'postgresql')
harness.add_relation_unit(rel_id, 'postgresql/0')
Args:
relation_id: The integer relation identifier (as returned by :meth:`add_relation`).
remote_unit_name: A string representing the remote unit that is being added.
Return:
None
"""
self._backend._relation_list_map[relation_id].append(remote_unit_name)
# we can write remote unit data iff we are not in a hook env
relation_name = self._backend._relation_names[relation_id]
relation = self._model.get_relation(relation_name, relation_id)
if not relation:
raise RuntimeError(f'Relation id {relation_id} is mapped to relation name '
                   f'{relation_name!r}, but no relation matching that name was found.')
self._backend._relation_data_raw[relation_id][remote_unit_name] = {}
app = cast(model.Application, relation.app) # should not be None since we're testing
if not remote_unit_name.startswith(app.name):
warnings.warn(
'Remote unit name invalid: the remote application of {} is called {!r}; '
'the remote unit name should be {}/<some-number>, not {!r}.'
''.format(relation_name, app.name, app.name, remote_unit_name))
app_and_units = self._backend._relation_app_and_units
app_and_units[relation_id]["units"].append(remote_unit_name)
# Make sure that the Model reloads the relation_list for this relation_id, as well as
# reloading the relation data for this unit.
remote_unit = self._model.get_unit(remote_unit_name)
unit_cache = relation.data.get(remote_unit, None)
if unit_cache is not None:
unit_cache._invalidate()
self._model.relations._invalidate(relation_name)
if self._charm is None or not self._hooks_enabled:
return
self._charm.on[relation_name].relation_joined.emit(
relation, remote_unit.app, remote_unit)
def remove_relation_unit(self, relation_id: int, remote_unit_name: str) -> None:
"""Remove a unit from a relation.
Example::
rel_id = harness.add_relation('db', 'postgresql')
harness.add_relation_unit(rel_id, 'postgresql/0')
...
harness.remove_relation_unit(rel_id, 'postgresql/0')
This will trigger a `relation_departed` event. This would
normally be followed by a `relation_changed` event triggered
by Juju. However, when using the test harness, a
`relation_changed` event must be triggered using
:meth:`.update_relation_data`. This deviation from normal Juju
behaviour facilitates testing by making each step in the
charm life cycle explicit.
Args:
relation_id: The integer relation identifier (as returned by :meth:`add_relation`).
remote_unit_name: A string representing the remote unit that is being removed.
"""
relation_name = self._backend._relation_names[relation_id]
# gather data to invalidate cache later
remote_unit = self._model.get_unit(remote_unit_name)
relation = self._model.get_relation(relation_name, relation_id)
if not relation:
# This should not really happen, since there being a relation name mapped
# to this ID in _relation_names should guarantee that you created the relation
# following the proper path, but still...
raise RuntimeError(f'Relation id {relation_id} is mapped to relation name '
                   f'{relation_name!r}, but no relation matching that name was found.')
unit_cache = relation.data.get(remote_unit, None)
# remove the unit from the list of units in the relation
relation.units.remove(remote_unit)
self._emit_relation_departed(relation_id, remote_unit_name)
# remove the relation data for the departed unit now that the event has happened
self._backend._relation_list_map[relation_id].remove(remote_unit_name)
self._backend._relation_app_and_units[relation_id]["units"].remove(remote_unit_name)
self._backend._relation_data_raw[relation_id].pop(remote_unit_name)
self.model._relations._invalidate(relation_name=relation.name)
if unit_cache is not None:
unit_cache._invalidate()
def _emit_relation_departed(self, relation_id: int, unit_name: str):
"""Trigger relation-departed event for a given relation id and unit."""
if self._charm is None or not self._hooks_enabled:
return
rel_name = self._backend._relation_names[relation_id]
relation = self.model.get_relation(rel_name, relation_id)
if '/' in unit_name:
app_name = unit_name.split('/')[0]
app = self.model.get_app(app_name)
unit = self.model.get_unit(unit_name)
else:
raise ValueError('Invalid Unit Name')
self._charm.on[rel_name].relation_departed.emit(
relation, app, unit, unit_name)
def get_relation_data(self, relation_id: int, app_or_unit: AppUnitOrName) -> Mapping[str, str]:
"""Get the relation data bucket for a single app or unit in a given relation.
This ignores all of the safety checks of who can and can't see data in relations (e.g.,
non-leaders can't read their own application's relation data because there are no events
that keep that data up-to-date for the unit).
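For illustration, a minimal sketch (the ``db`` endpoint and remote application
``postgresql`` are assumptions for this example)::
    rel_id = harness.add_relation('db', 'postgresql',
                                  unit_data={'host': '10.0.0.1'})
    assert harness.get_relation_data(rel_id, 'postgresql/0')['host'] == '10.0.0.1'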
Args:
relation_id: The relation whose content we want to look at.
app_or_unit: An :class:`Application <ops.Application>` or
:class:`Unit <ops.Unit>` instance, or its name, whose data we
want to read.
Return:
A dict containing the relation data for ``app_or_unit`` or None.
Raises:
KeyError: if ``relation_id`` doesn't exist
"""
name = _get_app_or_unit_name(app_or_unit)
# bypass access control by going directly to raw
return self._backend._relation_data_raw[relation_id].get(name, None)
def get_pod_spec(self) -> Tuple[Mapping[Any, Any], Mapping[Any, Any]]:
"""Return the content of the pod spec as last set by the charm.
This returns both the pod spec and any k8s_resources that were supplied.
See the signature of :meth:`Pod.set_spec <ops.Pod.set_spec>`.
"""
return self._backend._pod_spec
def get_container_pebble_plan(
self, container_name: str
) -> pebble.Plan:
"""Return the current plan that Pebble is executing for the given container.
Args:
container_name: The simple name of the associated container
Return:
The Pebble plan for this container. Use
:meth:`Plan.to_yaml <ops.pebble.Plan.to_yaml>` to get a string
form for the content. Will raise ``KeyError`` if no Pebble client
exists for that container name (should only happen if container is
not present in ``metadata.yaml``).
"""
client = self._backend._pebble_clients.get(container_name)
if client is None:
raise KeyError(f'no known pebble client for container "{container_name}"')
return client.get_plan()
def container_pebble_ready(self, container_name: str):
"""Fire the pebble_ready hook for the associated container.
This will switch the given container's ``can_connect`` state to True
before the hook function is called.
It will do nothing if :meth:`begin()` has not been called.
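For example, a minimal sketch (the charm class ``MyCharm`` and the container
name ``workload`` are assumptions for this example)::
    harness = Harness(MyCharm)
    harness.begin()
    harness.container_pebble_ready('workload')  # emits workload-pebble-ready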
"""
if self._charm is None:
return
container = self.model.unit.get_container(container_name)
self.set_can_connect(container, True)
self.charm.on[container_name].pebble_ready.emit(container)
def get_workload_version(self) -> str:
"""Read the workload version that was set by the unit."""
return self._backend._workload_version
def set_model_info(self, name: Optional[str] = None, uuid: Optional[str] = None) -> None:
"""Set the name and UUID of the model that this is representing.
Cannot be called once :meth:`begin` has been called. Use it to set the
value that will be returned by :attr:`Model.name <ops.Model.name>` and
:attr:`Model.uuid <ops.Model.uuid>`.
This is a convenience method to invoke both :meth:`set_model_name`
and :meth:`set_model_uuid` at once.
"""
if name is not None:
self.set_model_name(name)
if uuid is not None:
self.set_model_uuid(uuid)
def set_model_name(self, name: str) -> None:
"""Set the name of the Model that this is representing.
Cannot be called once :meth:`begin` has been called. Use it to set the
value that will be returned by :attr:`Model.name <ops.Model.name>`.
"""
if self._charm is not None:
raise RuntimeError('cannot set the Model name after begin()')
self._backend.model_name = name
def set_model_uuid(self, uuid: str) -> None:
"""Set the uuid of the Model that this is representing.
Cannot be called once :meth:`begin` has been called. Use it to set the
value that will be returned by :attr:`Model.uuid <ops.Model.uuid>`.
"""
if self._charm is not None:
raise RuntimeError('cannot set the Model uuid after begin()')
self._backend.model_uuid = uuid
def update_relation_data(
self,
relation_id: int,
app_or_unit: str,
key_values: Mapping[str, str],
) -> None:
"""Update the relation data for a given unit or application in a given relation.
This also triggers the `relation_changed` event for the given ``relation_id``.
Unless finer-grained control is needed, most charm tests can call
:meth:`add_relation` with the `app_data` or `unit_data` argument
instead of using this function.
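For illustration, a minimal sketch (the ``db`` endpoint and remote application
``postgresql`` are assumptions for this example)::
    rel_id = harness.add_relation('db', 'postgresql')
    harness.add_relation_unit(rel_id, 'postgresql/0')
    # Triggers db-relation-changed for postgresql/0 (if begin() has been called):
    harness.update_relation_data(rel_id, 'postgresql/0', {'host': '10.0.0.1'})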
Args:
relation_id: The integer relation ID representing this relation.
app_or_unit: The unit or application name that is being updated.
This can be the local or remote application.
key_values: Each key/value will be updated in the relation data.
"""
relation_name = self._backend._relation_names[relation_id]
relation = self._model.get_relation(relation_name, relation_id)
if '/' in app_or_unit:
entity = self._model.get_unit(app_or_unit)
else:
entity = self._model.get_app(app_or_unit)
if not relation:
raise RuntimeError(f'Relation id {relation_id} is mapped to relation name '
                   f'{relation_name!r}, but no relation matching that name was found.')
rel_data = relation.data.get(entity, None)
if rel_data is not None:
# rel_data may have cached now-stale data, so _invalidate() it.
# Note, this won't cause the data to be loaded if it wasn't already.
rel_data._invalidate()
old_values = self._backend._relation_data_raw[relation_id][app_or_unit].copy()
assert isinstance(old_values, dict), old_values
# get a new relation instance to ensure a clean state
new_relation_instance = self.model.relations._get_unique(relation.name, relation_id)
assert new_relation_instance is not None # type guard; this passed before...
databag = new_relation_instance.data[entity]
# ensure that WE as harness can temporarily write the databag
with self._event_context(''):
values_have_changed = False
for k, v in key_values.items():
if v == '':
if databag.pop(k, None) != v:
values_have_changed = True
else:
if k not in databag or databag[k] != v:
databag[k] = v # this triggers relation-set
values_have_changed = True
if not values_have_changed:
# Do not issue a relation changed event if the data bags have not changed
return
if app_or_unit == self._model.unit.name:
# No events for our own unit
return
if app_or_unit == self._model.app.name:
# updating our own app only generates an event if it is a peer relation and we
# aren't the leader
is_peer = self._meta.relations[relation_name].role.is_peer()
if not is_peer:
return
if self._model.unit.is_leader():
return
self._emit_relation_changed(relation_id, app_or_unit)
def _emit_relation_changed(self, relation_id: int, app_or_unit: str):
if self._charm is None or not self._hooks_enabled:
return
rel_name = self._backend._relation_names[relation_id]
relation = self.model.get_relation(rel_name, relation_id)
if '/' in app_or_unit:
app_name = app_or_unit.split('/')[0]
unit_name = app_or_unit
app = self.model.get_app(app_name)
unit = self.model.get_unit(unit_name)
args = (relation, app, unit)
else:
app_name = app_or_unit
app = self.model.get_app(app_name)
args = (relation, app)
self._charm.on[rel_name].relation_changed.emit(*args)
def _update_config(
self,
key_values: Optional[Mapping[str, Union[str, int, float, bool]]] = None,
unset: Iterable[str] = (),
) -> None:
"""Update the config as seen by the charm.
This will *not* trigger a `config_changed` event, and is intended for internal use.
Note that the `key_values` mapping will only add or update configuration items.
To remove existing ones, see the `unset` parameter.
Args:
key_values: A Mapping of key:value pairs to update in config.
unset: An iterable of keys to remove from config.
"""
# NOTE: jam 2020-03-01 Note that this sort of works "by accident". Config
# is a LazyMapping, but its _load returns a dict and this method mutates
# the dict that Config is caching. Arguably we should be doing some sort
# of charm.framework.model.config._invalidate()
config = self._backend._config
if key_values is not None:
for key, value in key_values.items():
if key in config._defaults:
if value is not None:
config._config_set(key, value)
else:
raise ValueError(f"unknown config option: '{key}'")
for key in unset:
# When the key is unset, revert to the default if one exists
default = config._defaults.get(key, None)
if default is not None:
config._config_set(key, default)
else:
config.pop(key, None)
def update_config(
self,
key_values: Optional[Mapping[str, Union[str, int, float, bool]]] = None,
unset: Iterable[str] = (),
) -> None:
"""Update the config as seen by the charm.
This will trigger a `config_changed` event.
Note that the ``key_values`` mapping will only add or update configuration items.
To remove existing ones, see the ``unset`` parameter.
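For illustration, a minimal sketch (assuming ``log-level`` and ``admin-port``
are options declared in the charm's config)::
    harness.update_config(key_values={'log-level': 'debug'},
                          unset=['admin-port'])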
Args:
key_values: A Mapping of key:value pairs to update in config.
unset: An iterable of keys to remove from config.
This sets the value to the default if defined,
otherwise removes the key altogether.
"""
self._update_config(key_values, unset)
if self._charm is None or not self._hooks_enabled:
return
self._charm.on.config_changed.emit()
def set_leader(self, is_leader: bool = True) -> None:
"""Set whether this unit is the leader or not.
If this charm becomes a leader then `leader_elected` will be triggered. If :meth:`begin`
has already been called, then the charm's peer relation should usually be added *prior* to
calling this method (with :meth:`add_relation`) to properly initialize and make
available relation data that leader elected hooks may want to access.
Args:
is_leader: Whether this unit is the leader.
"""
self._backend._is_leader = is_leader
# Note: jam 2020-03-01 currently is_leader is cached at the ModelBackend level, not in
# the Model objects, so this automatically gets noticed.
if is_leader and self._charm is not None and self._hooks_enabled:
self._charm.on.leader_elected.emit()
def set_planned_units(self, num_units: int) -> None:
"""Set the number of "planned" units.
This is the value that :meth:`Application.planned_units <ops.Application.planned_units>`
should return.
In real world circumstances, this number will be the number of units
in the application. That is, this number will be the number of peers
this unit has, plus one, as we count our own unit in the total.
A change to the return from ``planned_units`` will not generate an
event. Typically, a charm author would check planned units during a
config or install hook, or after receiving a peer relation joined
event.
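For example, a minimal sketch::
    harness.set_planned_units(3)
    assert harness.model.app.planned_units() == 3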
"""
if num_units < 0:
raise TypeError("num_units must be 0 or a positive integer.")
self._backend._planned_units = num_units
def reset_planned_units(self) -> None:
"""Reset the planned units override.
This allows the harness to fall through to the built-in methods that will try to
guess a value for planned units, based on the number of peer relations that
have been set up in the testing harness.
"""
self._backend._planned_units = None
def add_network(self, address: str, *,
endpoint: Optional[str] = None,
relation_id: Optional[int] = None,
cidr: Optional[str] = None,
interface: str = 'eth0',
ingress_addresses: Optional[Iterable[str]] = None,
egress_subnets: Optional[Iterable[str]] = None):
"""Add simulated network data for the given relation endpoint (binding).
Calling this multiple times with the same (binding, relation_id)
combination will replace the associated network data.
Example::
# Set network info for default binding
harness.add_network('10.0.0.10')
# Or set network info for specific endpoint
harness.add_network('10.0.0.10', endpoint='db')
After either of those calls, the following will be true (in the first
case, the simulated network-get will fall back to the default binding)::
binding = harness.model.get_binding('db')
assert binding.network.bind_address == ipaddress.IPv4Address('10.0.0.10')
Args:
address: Binding's IPv4 or IPv6 address.
endpoint: Name of relation endpoint (binding) to add network
data for. If not provided, add info for the default binding.
relation_id: Relation ID for the binding. If provided, the
endpoint argument must be provided and correspond. If not
provided, add network data for the endpoint's default binding.
cidr: Binding's CIDR. Defaults to "<address>/24" if address is an
IPv4 address, or "<address>/64" if address is IPv6 (the host
bits are cleared).
interface: Name of network interface.
ingress_addresses: List of ingress addresses. Defaults to [address].
egress_subnets: List of egress subnets. Defaults to [cidr].
Raises:
ModelError: If the endpoint is not a known relation name, or the
relation_id is incorrect or doesn't match the endpoint.
ValueError: If address is not an IPv4 or IPv6 address.
"""
if endpoint is not None and endpoint not in self._meta.relations:
raise model.ModelError(f'{endpoint!r} is not a known endpoint')
if relation_id is not None:
if endpoint is None:
raise TypeError('endpoint must be set if relation_id is provided')
relation_name = self._backend._relation_names.get(relation_id)
if relation_name is None:
raise model.ModelError(
f'relation_id {relation_id} has not been added; use add_relation')
if endpoint != relation_name:
raise model.ModelError(
f"endpoint {endpoint!r} does not correspond to relation_id "
+ f"{relation_id} ({relation_name!r})")
parsed_address = ipaddress.ip_address(address) # raises ValueError if not an IP
if cidr is None:
if isinstance(parsed_address, ipaddress.IPv4Address):
cidr = str(ipaddress.IPv4Network(address + '/24', strict=False))
else:
cidr = str(ipaddress.IPv6Network(address + '/64', strict=False))
if ingress_addresses is None:
ingress_addresses = [address]
if egress_subnets is None:
egress_subnets = [cidr]
data = {
'bind-addresses': [{
'interface-name': interface,
'addresses': [
{'cidr': cidr, 'value': address},
],
}],
'egress-subnets': list(egress_subnets),
'ingress-addresses': list(ingress_addresses),
}
self._backend._networks[endpoint, relation_id] = data
def _get_backend_calls(self, reset: bool = True) -> List[Tuple[Any, ...]]:
"""Return the calls that we have made to the TestingModelBackend.
This is useful mostly for testing the framework itself, so that we can assert that we
do/don't trigger extra calls.
Args:
reset: If True, reset the calls list back to empty; if False, the call list is
preserved.
Return:
``[(call1, args...), (call2, args...)]``
"""
calls = self._backend._calls.copy()
if reset:
self._backend._calls.clear()
return calls
def add_model_secret(self, owner: AppUnitOrName, content: Dict[str, str]) -> str:
"""Add a secret owned by the remote application or unit specified.
This is named :code:`add_model_secret` instead of :code:`add_secret`
to avoid confusion with the :meth:`ops.Application.add_secret`
and :meth:`ops.Unit.add_secret` methods used by secret owner
charms.
Args:
owner: The name of the remote application (or specific remote
unit) that will own the secret.
content: A key-value mapping containing the payload of the secret,
for example :code:`{"password": "foo123"}`.
Return:
The ID of the newly-added secret.
"""
owner_name = _get_app_or_unit_name(owner)
model.Secret._validate_content(content)
return self._backend._secret_add(content, owner_name)
def _ensure_secret(self, secret_id: str) -> '_Secret':
secret = self._backend._get_secret(secret_id)
if secret is None:
raise RuntimeError(f'Secret {secret_id!r} not found')
return secret
def set_secret_content(self, secret_id: str, content: Dict[str, str]):
"""Update a secret's content, add a new revision, and fire *secret-changed*.
Args:
secret_id: The ID of the secret to update. This should normally be
the return value of :meth:`add_model_secret`.
content: A key-value mapping containing the new payload.
"""
model.Secret._validate_content(content)
secret = self._ensure_secret(secret_id)
if secret.owner_name in [self.model.app.name, self.model.unit.name]:
raise RuntimeError(f'Secret {secret_id!r} owned by the charm under test, '
f"can't call set_secret_content")
new_revision = _SecretRevision(
revision=secret.revisions[-1].revision + 1,
content=content,
)
secret.revisions.append(new_revision)
self.charm.on.secret_changed.emit(secret_id, secret.label)
def grant_secret(self, secret_id: str, observer: AppUnitOrName):
"""Grant read access to this secret for the given observer application or unit.
If the given application or unit has already been granted access to
this secret, do nothing.
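For illustration, a minimal sketch (the owner application ``vault``, the ``kv``
endpoint, and the name of the charm under test, ``my-charm``, are assumptions
for this example)::
    secret_id = harness.add_model_secret('vault', {'token': 's3cr3t'})
    harness.add_relation('kv', 'vault')
    harness.grant_secret(secret_id, 'my-charm')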
Args:
secret_id: The ID of the secret to grant access to. This should
normally be the return value of :meth:`add_model_secret`.
observer: The name of the application (or specific unit) to grant
access to. A relation between this application and the charm
under test must already have been created.
"""
secret = self._ensure_secret(secret_id)
if secret.owner_name in [self.model.app.name, self.model.unit.name]:
raise RuntimeError(f'Secret {secret_id!r} owned by the charm under test, '
                   f"can't call grant_secret")
app_or_unit_name = _get_app_or_unit_name(observer)
relation_id = self._secret_relation_id_to(secret)
if relation_id not in secret.grants:
secret.grants[relation_id] = set()
secret.grants[relation_id].add(app_or_unit_name)
def revoke_secret(self, secret_id: str, observer: AppUnitOrName):
"""Revoke read access to this secret for the given observer application or unit.
If the given application or unit does not have access to this secret,
do nothing.
Args:
secret_id: The ID of the secret to revoke access for. This should
normally be the return value of :meth:`add_model_secret`.
observer: The name of the application (or specific unit) to revoke
access to. A relation between this application and the charm under
test must have already been created.
"""
secret = self._ensure_secret(secret_id)
if secret.owner_name in [self.model.app.name, self.model.unit.name]:
raise RuntimeError(f'Secret {secret_id!r} owned by the charm under test, '
                   f"can't call revoke_secret")
app_or_unit_name = _get_app_or_unit_name(observer)
relation_id = self._secret_relation_id_to(secret)
if relation_id not in secret.grants:
return
secret.grants[relation_id].discard(app_or_unit_name)
def _secret_relation_id_to(self, secret: '_Secret') -> int:
"""Get the relation ID of relation between this charm and the secret owner."""
owner_app = secret.owner_name.split('/')[0]
relation_id = self._backend._relation_id_to(owner_app)
if relation_id is None:
raise RuntimeError(f'No relation between this charm ({self.model.app.name}) '
f'and secret owner ({owner_app})')
return relation_id
def get_secret_grants(self, secret_id: str, relation_id: int) -> Set[str]:
"""Return the set of app and unit names granted to secret for this relation.
Args:
secret_id: The ID of the secret to get grants for.
relation_id: The ID of the relation granted access.
"""
secret = self._ensure_secret(secret_id)
return secret.grants.get(relation_id, set())
def get_secret_revisions(self, secret_id: str) -> List[int]:
"""Return the list of revision IDs for the given secret, oldest first.
Args:
secret_id: The ID of the secret to get revisions for.
"""
secret = self._ensure_secret(secret_id)
return [r.revision for r in secret.revisions]
def trigger_secret_rotation(self, secret_id: str, *, label: Optional[str] = None):
"""Trigger a secret-rotate event for the given secret.
This event is fired by Juju when a secret's rotation time elapses;
however, time-based events cannot be simulated appropriately in the
harness, so call this method to fire the event manually.
Args:
secret_id: The ID of the secret associated with the event.
label: Label value to send to the event. If None, the secret's
label is used.
"""
secret = self._ensure_secret(secret_id)
if label is None:
label = secret.label
self.charm.on.secret_rotate.emit(secret_id, label)
def trigger_secret_removal(self, secret_id: str, revision: int, *,
label: Optional[str] = None):
"""Trigger a secret-remove event for the given secret and revision.
This event is fired by Juju for a specific revision when all the
secret's observers have refreshed to a later revision; however, in the
harness, call this method to fire the event manually.
Args:
secret_id: The ID of the secret associated with the event.
revision: Revision number to provide to the event. This should be
an item from the list returned by :meth:`get_secret_revisions`.
label: Label value to send to the event. If None, the secret's
label is used.
"""
secret = self._ensure_secret(secret_id)
if label is None:
label = secret.label
self.charm.on.secret_remove.emit(secret_id, label, revision)
def trigger_secret_expiration(self, secret_id: str, revision: int, *,
label: Optional[str] = None):
"""Trigger a secret-expired event for the given secret.
This event is fired by Juju when a secret's expiration time elapses;
however, time-based events cannot be simulated appropriately in the
harness, so call this method to fire the event manually.
Args:
secret_id: The ID of the secret associated with the event.
revision: Revision number to provide to the event. This should be
an item from the list returned by :meth:`get_secret_revisions`.
label: Label value to send to the event. If None, the secret's
label is used.
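Usage mirrors :meth:`trigger_secret_removal`; a sketch, assuming the
charm under test owns the secret::

    revisions = harness.get_secret_revisions(secret.id)
    harness.trigger_secret_expiration(secret.id, revisions[-1])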
"""
secret = self._ensure_secret(secret_id)
if label is None:
label = secret.label
self.charm.on.secret_expired.emit(secret_id, label, revision)
def get_filesystem_root(self, container: Union[str, Container]) -> pathlib.Path:
"""Return the temp directory path harness will use to simulate the container filesystem.
In a real container runtime, each container has an isolated root filesystem.
To simulate this behaviour, the testing harness manages a temporary directory for
each container. Any Pebble filesystem API calls will be translated
and mapped to this directory, as if the directory were the container's
filesystem root.
This process is quite similar to the ``chroot`` command. Charm tests should
treat the returned directory as the container's root directory (``/``).
The testing harness will not create any files or directories inside the
simulated container's root directory; it's up to the test to populate the container's
root directory with any files or directories the charm needs.
Regarding file ownership: unprivileged users cannot create files owned by other
users or groups. To work around this limitation, the testing harness maps all user
and group options for file operations to the current user and group.
Example usage::
# charm.py
import ops
class ExampleCharm(ops.CharmBase):
def __init__(self, *args):
super().__init__(*args)
self.hostname = open("/etc/hostname").read()
# test_charm.py
from ops.testing import Harness
harness = Harness(ExampleCharm)
root = harness.get_filesystem_root("mycontainer")
(root / "etc" / "hostname").write_text("hostname.example.com")
harness.begin()
Args:
container: The name of the container or the container instance.
Returns:
The path of the temporary directory associated with the specified container.
"""
# it's okay to access the container directly in this context, as its creation has already
# been ensured during the model's initialization.
if isinstance(container, str):
container_name = container
else:
container_name = container.name
return self._backend._pebble_clients[container_name]._root
def evaluate_status(self) -> None:
"""Trigger the collect-status events and set application and/or unit status.
This will always trigger ``collect_unit_status``, and set the unit status if any
statuses were added.
If running on the leader unit (:meth:`set_leader` has been called with ``True``),
this will trigger ``collect_app_status``, and set the application status if any
statuses were added.
Tests should normally call this and then assert that ``self.model.app.status``
or ``self.model.unit.status`` is the value expected.
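A sketch of typical usage follows; ``MyCharm`` is a placeholder and is
assumed to set ``ActiveStatus`` in its ``collect_unit_status`` observer::

    harness = Harness(MyCharm)
    harness.begin()
    harness.evaluate_status()
    assert harness.model.unit.status == ops.ActiveStatus()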
"""
charm._evaluate_status(self.charm)
def handle_exec(self,
container: Union[str, Container],
command_prefix: Sequence[str],
*,
handler: Optional[ExecHandler] = None,
result: Optional[Union[int, str, bytes, ExecResult]] = None):
r"""Register a handler to simulate the Pebble command execution.
This allows a test harness to simulate the behavior of running commands in a container.
When :meth:`ops.Container.exec` is triggered, the registered handler is used to
generate stdout and stderr for the simulated execution.
A ``handler`` or a ``result`` may be provided, but not both:
- A ``handler`` is a function accepting :class:`ops.testing.ExecArgs` and returning
:class:`ops.testing.ExecResult` as the simulated process outcome. For cases that
have side effects but don't return output, the handler can return ``None``, which
is equivalent to returning ``ExecResult()``.
- A ``result`` is for simulations that don't need to inspect the ``exec`` arguments; the
output or exit code is provided directly. Setting ``result`` to a str or bytes value
uses that value as stdout (with exit code 0); setting ``result`` to an int returns
that exit code (with no stdout).
If ``handle_exec`` is called more than once with overlapping command prefixes, the
longest match takes precedence. The registration of an execution handler can be updated by
re-registering with the same command prefix.
The execution handler receives the timeout value in the ``ExecArgs``. If needed,
it can raise a ``TimeoutError`` to inform the harness that a timeout occurred.
If :meth:`ops.Container.exec` is called with ``combine_stderr=True``, the execution
handler should, if required, weave the simulated standard error into the standard output.
The harness checks the result and will raise an exception if stderr is non-empty.
Args:
container: The specified container or its name.
command_prefix: The command prefix to register against.
handler: A handler function that simulates the command's execution.
result: A simplified form to specify the command's simulated result.
Example usage::
# produce no output and return 0 for every command
harness.handle_exec('container', [], result=0)
# simple example that just produces output (exit code 0)
harness.handle_exec('webserver', ['ls', '/etc'], result='passwd\nprofile\n')
# slightly more complex (use stdin)
harness.handle_exec(
'c1', ['sha1sum'],
handler=lambda args: ExecResult(stdout=hashlib.sha1(args.stdin).hexdigest()))
# more complex example using args.command
def docker_handler(args: testing.ExecArgs) -> testing.ExecResult:
match args.command:
case ['docker', 'run', image]:
return testing.ExecResult(stdout=f'running {image}')
case ['docker', 'ps']:
return testing.ExecResult(stdout='CONTAINER ID IMAGE ...')
case _:
return testing.ExecResult(exit_code=1, stderr='unknown command')
harness.handle_exec('database', ['docker'], handler=docker_handler)
# handle timeout
def handle_timeout(args: testing.ExecArgs) -> int:
if args.timeout is not None and args.timeout < 10:
raise TimeoutError
return 0
harness.handle_exec('database', ['foo'], handler=handle_timeout)
"""
if (handler is None and result is None) or (handler is not None and result is not None):
raise TypeError("Either handler or result must be provided, but not both.")
container_name = container if isinstance(container, str) else container.name
if result is not None:
if isinstance(result, int) and not isinstance(result, bool):
result = ExecResult(exit_code=result)
elif isinstance(result, (str, bytes)):
result = ExecResult(stdout=result)
elif not isinstance(result, ExecResult):
raise TypeError(
f"result must be int, str, bytes, or ExecResult, "
f"not {result.__class__.__name__}")
self._backend._pebble_clients[container_name]._handle_exec(
command_prefix=command_prefix,
handler=(lambda _: result) if handler is None else handler # type: ignore
)
def _get_app_or_unit_name(app_or_unit: AppUnitOrName) -> str:
"""Return name of given application or unit (return strings directly)."""
if isinstance(app_or_unit, model.Application):
return app_or_unit.name
elif isinstance(app_or_unit, model.Unit):
return app_or_unit.name
elif isinstance(app_or_unit, str):
return app_or_unit
else:
raise TypeError(f'Expected Application | Unit | str, got {type(app_or_unit)}')
def _record_calls(cls: Any):
"""Replace methods on cls with methods that record that they have been called.
Iterate all attributes of cls, and for public methods, replace them with a wrapped method
that records the method called along with the arguments and keyword arguments.
"""
for meth_name, orig_method in cls.__dict__.items():
if meth_name.startswith('_'):
continue
def decorator(orig_method: Any):
def wrapped(self: '_TestingModelBackend', *args: Any, **kwargs: Any):
full_args = (orig_method.__name__,) + args
if kwargs:
full_args = full_args + (kwargs,)
self._calls.append(full_args)
return orig_method(self, *args, **kwargs)
return wrapped
setattr(cls, meth_name, decorator(orig_method))
return cls
def _copy_docstrings(source_cls: Any):
"""Copy the docstrings from source_cls to target_cls.
Use this as:
@_copy_docstrings(source_class)
class TargetClass:
For any public method that exists on both classes, the source method's
__doc__ is copied to the target method.
"""
def decorator(target_cls: Any):
for meth_name, _ in target_cls.__dict__.items():
if meth_name.startswith('_'):
continue
source_method = source_cls.__dict__.get(meth_name)
if source_method is not None and source_method.__doc__:
target_cls.__dict__[meth_name].__doc__ = source_method.__doc__
return target_cls
return decorator
@_record_calls
class _TestingConfig(Dict[str, Union[str, int, float, bool]]):
"""Represents the Juju Config."""
_supported_types = {
'string': str,
'boolean': bool,
'int': int,
'float': float
}
def __init__(self, config: 'RawConfig'):
super().__init__()
self._spec = config
self._defaults = self._load_defaults(config)
for key, value in self._defaults.items():
if value is None:
continue
self._config_set(key, value)
@staticmethod
def _load_defaults(charm_config: 'RawConfig') -> Dict[str, Union[str, int, float, bool]]:
"""Load default values from config.yaml.
Handle the case where a user doesn't supply explicit config snippets.
"""
if not charm_config:
return {}
cfg: Dict[str, '_ConfigOption'] = charm_config.get('options', {})
return {key: value.get('default', None) for key, value in cfg.items()}
def _config_set(self, key: str, value: Union[str, int, float, bool]):
# this is only called by the harness itself
# we don't do real serialization/deserialization, but we do check that the value
# has the expected type.
option = self._spec.get('options', {}).get(key)
if not option:
raise RuntimeError('Unknown config option {}; '
'not declared in `config.yaml`. '
'Check https://juju.is/docs/sdk/config for the '
'spec.'.format(key))
declared_type = option.get('type')
if not declared_type:
raise RuntimeError('Incorrectly formatted `config.yaml`, option {} '
'is expected to declare a `type`.'.format(key))
if declared_type not in self._supported_types:
raise RuntimeError(
'Incorrectly formatted `config.yaml`: `type` needs to be one '
'of [{}], not {}.'.format(', '.join(self._supported_types), declared_type))
if type(value) != self._supported_types[declared_type]:
raise RuntimeError('Config option {} is supposed to be of type '
'{}, not `{}`.'.format(key, declared_type,
type(value).__name__))
# Bypass our __setitem__ override and store the value directly.
dict.__setitem__(self, key, value) # type: ignore
def __setitem__(self, key: Any, value: Any):
# if a charm attempts to config[foo] = bar:
raise TypeError("'ConfigData' object does not support item assignment")
class _TestingRelationDataContents(Dict[str, str]):
def __setitem__(self, key: str, value: str):
if not isinstance(key, str):
raise model.RelationDataError(
f'relation data keys must be strings, not {type(key)}')
if not isinstance(value, str):
raise model.RelationDataError(
f'relation data values must be strings, not {type(value)}')
super().__setitem__(key, value)
def copy(self):
return _TestingRelationDataContents(super().copy())
@dataclasses.dataclass
class _SecretRevision:
revision: int
content: Dict[str, str]
@dataclasses.dataclass
class _Secret:
id: str
owner_name: str
revisions: List[_SecretRevision]
rotate_policy: Optional[str]
expire_time: Optional[datetime.datetime]
label: Optional[str] = None
description: Optional[str] = None
tracked: int = 1
grants: Dict[int, Set[str]] = dataclasses.field(default_factory=dict)
@_copy_docstrings(model._ModelBackend)
@_record_calls
class _TestingModelBackend:
"""This conforms to the interface for ModelBackend but provides canned data.
DO NOT use this class directly; it is used by `Harness`_ to drive the model.
`Harness`_ is responsible for maintaining the internal consistency of the values here,
as the only public methods of this type are for implementing ModelBackend.
"""
def __init__(self, unit_name: str, meta: charm.CharmMeta, config: 'RawConfig'):
self.unit_name = unit_name
self.app_name = self.unit_name.split('/')[0]
self.model_name = None
self.model_uuid = str(uuid.uuid4())
self._harness_tmp_dir = tempfile.TemporaryDirectory(prefix='ops-harness-')
self._harness_storage_path = pathlib.Path(self._harness_tmp_dir.name) / "storages"
self._harness_container_path = pathlib.Path(self._harness_tmp_dir.name) / "containers"
self._harness_storage_path.mkdir()
self._harness_container_path.mkdir()
# this is used by the _record_calls decorator
self._calls: List[Tuple[Any, ...]] = []
self._meta = meta
# relation name to [relation_ids,...]
self._relation_ids_map: Dict[str, List[int]] = {}
# reverse map from relation_id to relation_name
self._relation_names: Dict[int, str] = {}
# relation_id: [unit_name,...]
self._relation_list_map: Dict[int, List[str]] = {}
# {relation_id: {name: Dict[str: str]}}
self._relation_data_raw: Dict[int, Dict[str, Dict[str, str]]] = {}
# {relation_id: {"app": app_name, "units": ["app/0",...]}
self._relation_app_and_units: Dict[int, _RelationEntities] = {}
self._config = _TestingConfig(config)
self._is_leader: bool = False
# {resource_name: resource_content}
# where resource_content is (path, content)
self._resources_map: Dict[str, Tuple[str, Union[str, bytes]]] = {}
# fixme: understand how this is used and adjust the type
self._pod_spec: Optional[Tuple[model.K8sSpec, Any]] = None
self._app_status: _RawStatus = {'status': 'unknown', 'message': ''}
self._unit_status: _RawStatus = {'status': 'maintenance', 'message': ''}
self._workload_version: Optional[str] = None
self._resource_dir: Optional[tempfile.TemporaryDirectory[Any]] = None
# Format:
# { "storage_name": {"<ID1>": { <other-properties> }, ... }
# <ID1>: device ID used as the key for the given storage_name
# Initialize _storage_list with the storage names declared in metadata.yaml
self._storage_list: Dict[str, Dict[int, Dict[str, Any]]] = {
k: {} for k in self._meta.storages}
self._storage_attached: Dict[str, Set[int]] = {k: set() for k in self._meta.storages}
self._storage_index_counter = 0
# {container_name : _TestingPebbleClient}
self._pebble_clients: Dict[str, _TestingPebbleClient] = {}
self._pebble_clients_can_connect: Dict[_TestingPebbleClient, bool] = {}
self._planned_units: Optional[int] = None
self._hook_is_running = ''
self._secrets: List[_Secret] = []
self._opened_ports: Set[model.Port] = set()
self._networks: Dict[Tuple[Optional[str], Optional[int]], _NetworkDict] = {}
def _validate_relation_access(self, relation_name: str, relations: List[model.Relation]):
"""Ensures that the named relation exists/has been added.
This is called whenever relation data is accessed via model.get_relation(...).
"""
if len(relations) > 0:
return
valid_relation_endpoints: List[str] = list(self._meta.peers.keys())
valid_relation_endpoints.extend(self._meta.requires.keys())
valid_relation_endpoints.extend(self._meta.provides.keys())
if self._hook_is_running == 'leader_elected' and relation_name in valid_relation_endpoints:
raise RuntimeError(
'cannot access relation data without first adding the relation: '
'use Harness.add_relation({!r}, <app>) before calling set_leader'
.format(relation_name))
def _can_connect(self, pebble_client: '_TestingPebbleClient') -> bool:
"""Returns whether the mock client is active and can support API calls with no errors."""
return self._pebble_clients_can_connect[pebble_client]
def _set_can_connect(self, pebble_client: '_TestingPebbleClient', val: bool):
"""Manually sets the can_connect state for the given mock client."""
if pebble_client not in self._pebble_clients_can_connect:
msg = 'cannot set can_connect for the client - are you running a "real" pebble test?'
raise RuntimeError(msg)
self._pebble_clients_can_connect[pebble_client] = val
def _cleanup(self):
if self._resource_dir is not None:
self._resource_dir.cleanup()
self._resource_dir = None
self._harness_tmp_dir.cleanup()
def _get_resource_dir(self) -> pathlib.Path:
if self._resource_dir is None:
# In actual Juju, the resource path for a charm's resource is
# $AGENT_DIR/resources/$RESOURCE_NAME/$RESOURCE_FILENAME
# However, charms shouldn't depend on this.
self._resource_dir = tempfile.TemporaryDirectory(prefix='tmp-ops-test-resource-')
res_dir_name = cast(str, self._resource_dir.name)
return pathlib.Path(res_dir_name)
def relation_ids(self, relation_name: str) -> List[int]:
try:
return self._relation_ids_map[relation_name]
except KeyError as e:
if relation_name not in self._meta.relations:
raise model.ModelError(f'{relation_name} is not a known relation') from e
no_ids: List[int] = []
return no_ids
def relation_list(self, relation_id: int):
try:
return self._relation_list_map[relation_id]
except KeyError as e:
raise model.RelationNotFoundError from e
def relation_remote_app_name(self, relation_id: int) -> Optional[str]:
if relation_id not in self._relation_app_and_units:
# Non-existent or dead relation
return None
if 'relation_broken' in self._hook_is_running:
# TODO: if juju ever starts setting JUJU_REMOTE_APP in relation-broken hooks runs,
# then we should kill this if clause.
# See https://bugs.launchpad.net/juju/+bug/1960934
return None
return self._relation_app_and_units[relation_id]['app']
def relation_get(self, relation_id: int, member_name: str, is_app: bool):
if 'relation_broken' in self._hook_is_running and not self.relation_remote_app_name(
relation_id) and member_name != self.app_name and member_name != self.unit_name:
# TODO: if juju gets fixed to set JUJU_REMOTE_APP for this case, then we may opt to
# allow charms to read/get that (stale) relation data.
# See https://bugs.launchpad.net/juju/+bug/1960934
raise RuntimeError(
'remote-side relation data cannot be accessed during a relation-broken event')
if is_app and '/' in member_name:
member_name = member_name.split('/')[0]
if relation_id not in self._relation_data_raw:
raise model.RelationNotFoundError()
return self._relation_data_raw[relation_id][member_name]
def update_relation_data(self, relation_id: int, _entity: Union[model.Unit, model.Application],
key: str, value: str):
# this is where the 'real' backend would call relation-set.
raw_data = self._relation_data_raw[relation_id][_entity.name]
if value == '':
raw_data.pop(key, None)
else:
raw_data[key] = value
def relation_set(self, relation_id: int, key: str, value: str, is_app: bool):
if not isinstance(is_app, bool):
raise TypeError('is_app parameter to relation_set must be a boolean')
if 'relation_broken' in self._hook_is_running and not self.relation_remote_app_name(
relation_id):
raise RuntimeError(
'remote-side relation data cannot be accessed during a relation-broken event')
if relation_id not in self._relation_data_raw:
raise RelationNotFoundError(relation_id)
relation = self._relation_data_raw[relation_id]
if is_app:
bucket_key = self.app_name
else:
bucket_key = self.unit_name
if bucket_key not in relation:
relation[bucket_key] = {}
bucket = relation[bucket_key]
if value == '':
bucket.pop(key, None)
else:
bucket[key] = value
def config_get(self) -> _TestingConfig:
return self._config
def is_leader(self):
return self._is_leader
def application_version_set(self, version: str):
self._workload_version = version
def resource_get(self, resource_name: str):
if resource_name not in self._resources_map:
raise model.ModelError(
"ERROR could not download resource: HTTP request failed: "
"Get https://.../units/unit-{}/resources/{}: resource#{}/{} not found".format(
self.unit_name.replace('/', '-'), resource_name, self.app_name, resource_name
))
filename, contents = self._resources_map[resource_name]
resource_dir = self._get_resource_dir()
resource_filename = resource_dir / resource_name / filename
if not resource_filename.exists():
if isinstance(contents, bytes):
mode = 'wb'
else:
mode = 'wt'
resource_filename.parent.mkdir(exist_ok=True)
with resource_filename.open(mode=mode) as resource_file:
resource_file.write(contents)
return resource_filename
def pod_spec_set(self, spec: 'model.K8sSpec', k8s_resources: Any): # fixme: any
self._pod_spec = (spec, k8s_resources)
def status_get(self, *, is_app: bool = False):
if is_app:
return self._app_status
else:
return self._unit_status
def status_set(self, status: '_StatusName', message: str = '', *, is_app: bool = False):
if is_app:
self._app_status = {'status': status, 'message': message}
else:
self._unit_status = {'status': status, 'message': message}
def storage_list(self, name: str, include_detached: bool = False):
"""Returns a list of all attached storage mounts for the given storage name.
Args:
name: name (i.e. from metadata.yaml).
include_detached: True to include unattached storage mounts as well.
"""
return list(index for index in self._storage_list[name]
if include_detached or self._storage_is_attached(name, index))
def storage_get(self, storage_name_id: str, attribute: str) -> Any:
name, index = storage_name_id.split("/", 1)
index = int(index)
try:
if index not in self._storage_attached[name]:
raise KeyError() # Pretend the key isn't there
else:
return self._storage_list[name][index][attribute]
except KeyError:
raise model.ModelError(
f'ERROR invalid value "{name}/{index}" for option -s: storage not found')
def storage_add(self, name: str, count: int = 1) -> List[int]:
if '/' in name:
raise model.ModelError('storage name cannot contain "/"')
if name not in self._storage_list:
self._storage_list[name] = {}
result: List[int] = []
for _ in range(count):
index = self._storage_index_counter
self._storage_index_counter += 1
self._storage_list[name][index] = {
'location': os.path.join(self._harness_storage_path, name, str(index)),
}
result.append(index)
return result
def _storage_detach(self, storage_id: str):
# NOTE: This is an extra function for _TestingModelBackend to simulate
# detachment of a storage unit. This is not present in ops.model._ModelBackend.
name, index = storage_id.split('/', 1)
index = int(index)
for container, client in self._pebble_clients.items():
for _, mount in self._meta.containers[container].mounts.items():
if mount.storage != name:
continue
root = client._root
(root / mount.location[1:]).unlink()
if self._storage_is_attached(name, index):
self._storage_attached[name].remove(index)
def _storage_attach(self, storage_id: str):
"""Mark the named storage_id as attached and return True if it was previously detached."""
# NOTE: This is an extra function for _TestingModelBackend to simulate
# re-attachment of a storage unit. This is not present in
# ops.model._ModelBackend.
name, index = storage_id.split('/', 1)
for container, client in self._pebble_clients.items():
for _, mount in self._meta.containers[container].mounts.items():
if mount.storage != name:
continue
for index, store in self._storage_list[mount.storage].items():
root = client._root
mounting_dir = root / mount.location[1:]
mounting_dir.parent.mkdir(parents=True, exist_ok=True)
target_dir = pathlib.Path(store["location"])
target_dir.mkdir(parents=True, exist_ok=True)
mounting_dir.symlink_to(target_dir)
index = int(index)
if not self._storage_is_attached(name, index):
self._storage_attached[name].add(index)
return True
return False
def _storage_is_attached(self, storage_name: str, storage_index: int):
return storage_index in self._storage_attached[storage_name]
def _storage_remove(self, storage_id: str):
# NOTE: This is an extra function for _TestingModelBackend to simulate
# full removal of a storage unit. This is not present in
# ops.model._ModelBackend.
self._storage_detach(storage_id)
name, index = storage_id.split('/', 1)
index = int(index)
self._storage_list[name].pop(index, None)
def action_get(self): # type:ignore
raise NotImplementedError(self.action_get) # type:ignore
def action_set(self, results): # type:ignore
raise NotImplementedError(self.action_set) # type:ignore
def action_log(self, message): # type:ignore
raise NotImplementedError(self.action_log) # type:ignore
def action_fail(self, message=''): # type:ignore
raise NotImplementedError(self.action_fail) # type:ignore
def network_get(self, endpoint_name: str, relation_id: Optional[int] = None) -> '_NetworkDict':
data = self._networks.get((endpoint_name, relation_id))
if data is not None:
return data
if relation_id is not None:
# Fall back to the default binding for this endpoint
data = self._networks.get((endpoint_name, None))
if data is not None:
return data
# No custom data per relation ID or binding, return the default binding
data = self._networks.get((None, None))
if data is not None:
return data
raise RelationNotFoundError
def add_metrics(self, metrics, labels=None): # type:ignore
raise NotImplementedError(self.add_metrics) # type:ignore
@classmethod
def log_split(cls, message, max_len=model.MAX_LOG_LINE_LEN): # type:ignore
raise NotImplementedError(cls.log_split) # type:ignore
def juju_log(self, level, msg): # type:ignore
raise NotImplementedError(self.juju_log) # type:ignore
def get_pebble(self, socket_path: str) -> '_TestingPebbleClient':
container = socket_path.split('/')[3] # /charm/containers/<container_name>/pebble.socket
client = self._pebble_clients.get(container, None)
if client is None:
container_root = self._harness_container_path / container
container_root.mkdir()
client = _TestingPebbleClient(self, container_root=container_root)
# we need to know which container a new pebble client belongs to
# so we can figure out which storage mounts must be simulated on
# this pebble client's mock file systems when storage is
# attached/detached later.
self._pebble_clients[container] = client
self._pebble_clients_can_connect[client] = False
return client
def planned_units(self) -> int:
"""Simulate fetching the number of planned application units from the model.
If self._planned_units is None, then we simulate what the Juju controller will do, which is
to report the number of peers, plus one (we include this unit in the count). This can be
overridden for testing purposes: a charm author can set the number of planned units
explicitly by calling `Harness.set_planned_units`.
"""
if self._planned_units is not None:
return self._planned_units
units: Set[str] = set()
peer_names: Set[str] = set(self._meta.peers.keys())
for peer_id, peer_name in self._relation_names.items():
if peer_name not in peer_names:
continue
peer_units = self._relation_list_map[peer_id]
units.update(peer_units)
return len(units) + 1 # Account for this unit.
def _get_secret(self, id: str) -> Optional[_Secret]:
return next((s for s in self._secrets if s.id == id), None)
def _ensure_secret(self, id: str) -> _Secret:
secret = self._get_secret(id)
if secret is None:
raise model.SecretNotFoundError(f'Secret {id!r} not found')
return secret
def _ensure_secret_id_or_label(self, id: Optional[str], label: Optional[str]):
secret = None
if id is not None:
secret = self._get_secret(id)
if secret is not None and label is not None:
secret.label = label # both id and label given, update label
if secret is None and label is not None:
secret = next((s for s in self._secrets if s.label == label), None)
if secret is None:
raise model.SecretNotFoundError(
f'Secret not found by ID ({id!r}) or label ({label!r})')
return secret
def secret_get(self, *,
id: Optional[str] = None,
label: Optional[str] = None,
refresh: bool = False,
peek: bool = False) -> Dict[str, str]:
secret = self._ensure_secret_id_or_label(id, label)
# Check that caller has permission to get this secret
if secret.owner_name in [self.app_name, self.unit_name]:
# Owner or peer is calling, get latest revision
peek = True
if refresh:
raise ValueError('Secret owner cannot use refresh=True')
else:
# Observer is calling: does secret have a grant on relation between
# this charm (the observer) and the secret owner's app?
owner_app = secret.owner_name.split('/')[0]
relation_id = self._relation_id_to(owner_app)
if relation_id is None:
raise model.SecretNotFoundError(
f'Secret {id!r} does not have relation to {owner_app!r}')
grants = secret.grants.get(relation_id, set())
if self.app_name not in grants and self.unit_name not in grants:
raise model.SecretNotFoundError(
f'Secret {id!r} not granted access to {self.app_name!r} or {self.unit_name!r}')
if peek or refresh:
revision = secret.revisions[-1]
if refresh:
secret.tracked = revision.revision
else:
revision = next((r for r in secret.revisions if r.revision == secret.tracked), None)
if revision is None:
raise model.SecretNotFoundError(f'Secret {id!r} tracked revision was removed')
return revision.content
def _relation_id_to(self, remote_app: str) -> Optional[int]:
"""Return relation ID of relation from charm's app to remote app."""
for relation_id, app_units in self._relation_app_and_units.items():
if app_units['app'] == remote_app:
return relation_id
return None
def _ensure_secret_owner(self, secret: _Secret):
if secret.owner_name not in [self.app_name, self.unit_name]:
raise model.SecretNotFoundError(
f'You must own secret {secret.id!r} to perform this operation')
def secret_info_get(self, *,
id: Optional[str] = None,
label: Optional[str] = None) -> model.SecretInfo:
secret = self._ensure_secret_id_or_label(id, label)
self._ensure_secret_owner(secret)
rotates = None
rotation = None
if secret.rotate_policy is not None:
rotation = model.SecretRotate(secret.rotate_policy)
if secret.rotate_policy != model.SecretRotate.NEVER:
# Just set a fake rotation time some time in the future
rotates = datetime.datetime.now() + datetime.timedelta(days=1)
return model.SecretInfo(
id=secret.id,
label=secret.label,
revision=secret.tracked,
expires=secret.expire_time,
rotation=rotation,
rotates=rotates,
)
def secret_set(self, id: str, *,
content: Optional[Dict[str, str]] = None,
label: Optional[str] = None,
description: Optional[str] = None,
expire: Optional[datetime.datetime] = None,
rotate: Optional[model.SecretRotate] = None) -> None:
secret = self._ensure_secret(id)
self._ensure_secret_owner(secret)
if content is None:
content = secret.revisions[-1].content
revision = _SecretRevision(
revision=secret.revisions[-1].revision + 1,
content=content
)
secret.revisions.append(revision)
if label is not None:
if label:
secret.label = label
else:
secret.label = None # clear label
if description is not None:
if description:
secret.description = description
else:
secret.description = None # clear description
if expire is not None:
secret.expire_time = expire
if rotate is not None:
if rotate != model.SecretRotate.NEVER:
secret.rotate_policy = rotate.value
else:
secret.rotate_policy = None # clear rotation policy
@classmethod
def _generate_secret_id(cls) -> str:
# Not a proper Juju secrets-style xid, but that's okay
return f"secret:{str(uuid.uuid4())}"
def secret_add(self, content: Dict[str, str], *,
label: Optional[str] = None,
description: Optional[str] = None,
expire: Optional[datetime.datetime] = None,
rotate: Optional[model.SecretRotate] = None,
owner: Optional[str] = None) -> str:
if owner == 'unit':
owner_name = self.unit_name
else:
owner_name = self.app_name
return self._secret_add(content, owner_name,
label=label,
description=description,
expire=expire,
rotate=rotate)
def _secret_add(self, content: Dict[str, str], owner_name: str, *,
label: Optional[str] = None,
description: Optional[str] = None,
expire: Optional[datetime.datetime] = None,
rotate: Optional[model.SecretRotate] = None) -> str:
id = self._generate_secret_id()
revision = _SecretRevision(
revision=1,
content=content,
)
secret = _Secret(
id=id,
owner_name=owner_name,
revisions=[revision],
rotate_policy=rotate.value if rotate is not None else None,
expire_time=expire,
label=label,
description=description,
)
self._secrets.append(secret)
return id
def secret_grant(self, id: str, relation_id: int, *, unit: Optional[str] = None) -> None:
secret = self._ensure_secret(id)
self._ensure_secret_owner(secret)
if relation_id not in secret.grants:
secret.grants[relation_id] = set()
remote_app_name = self._relation_app_and_units[relation_id]['app']
secret.grants[relation_id].add(unit or remote_app_name)
def secret_revoke(self, id: str, relation_id: int, *, unit: Optional[str] = None) -> None:
secret = self._ensure_secret(id)
self._ensure_secret_owner(secret)
if relation_id not in secret.grants:
return
remote_app_name = self._relation_app_and_units[relation_id]['app']
secret.grants[relation_id].discard(unit or remote_app_name)
def secret_remove(self, id: str, *, revision: Optional[int] = None) -> None:
secret = self._ensure_secret(id)
self._ensure_secret_owner(secret)
if revision is not None:
revisions = [r for r in secret.revisions if r.revision != revision]
if len(revisions) == len(secret.revisions):
raise model.SecretNotFoundError(f'Secret {id!r} revision {revision} not found')
if revisions:
secret.revisions = revisions
else:
# Last revision removed, remove entire secret
self._secrets = [s for s in self._secrets if s.id != id]
else:
self._secrets = [s for s in self._secrets if s.id != id]
def open_port(self, protocol: str, port: Optional[int] = None):
self._check_protocol_and_port(protocol, port)
protocol_lit = cast(Literal['tcp', 'udp', 'icmp'], protocol)
self._opened_ports.add(model.Port(protocol_lit, port))
def close_port(self, protocol: str, port: Optional[int] = None):
self._check_protocol_and_port(protocol, port)
protocol_lit = cast(Literal['tcp', 'udp', 'icmp'], protocol)
self._opened_ports.discard(model.Port(protocol_lit, port))
def opened_ports(self) -> Set[model.Port]:
return set(self._opened_ports)
def _check_protocol_and_port(self, protocol: str, port: Optional[int]):
# Simulate the error messages we get from Juju (not that charm tests
# should be testing details of error messages).
if protocol == 'icmp':
if port is not None:
raise model.ModelError(f'ERROR protocol "{protocol}" doesn\'t support any ports; got "{port}"\n') # NOQA: test_quote_backslashes
elif protocol in ['tcp', 'udp']:
if port is None:
raise model.ModelError(f'ERROR invalid port "{protocol}": strconv.Atoi: parsing "{protocol}": invalid syntax\n') # NOQA: test_quote_backslashes
if not (1 <= port <= 65535):
raise model.ModelError(f'ERROR port range bounds must be between 1 and 65535, got {port}-{port}\n') # NOQA: test_quote_backslashes
else:
raise model.ModelError(f'ERROR invalid protocol "{protocol}", expected "tcp", "udp", or "icmp"\n') # NOQA: test_quote_backslashes
@_copy_docstrings(pebble.ExecProcess)
class _TestingExecProcess:
def __init__(self,
command: List[str],
timeout: Optional[float],
exit_code: Optional[int],
stdin: Union[TextIO, BinaryIO, None],
stdout: Union[TextIO, BinaryIO, None],
stderr: Union[TextIO, BinaryIO, None],
is_timeout: bool):
self._command = command
self._timeout = timeout
self._is_timeout = is_timeout
if exit_code is None and not is_timeout:
raise ValueError("when is_timeout is False, exit_code must not be None")
self._exit_code = exit_code
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
def wait(self):
if self._is_timeout:
raise pebble.TimeoutError(
f'timed out waiting for change ({self._timeout} seconds)'
)
if self._exit_code != 0:
raise pebble.ExecError(self._command, cast(int, self._exit_code), None, None)
def wait_output(self) -> Tuple[AnyStr, Optional[AnyStr]]:
if self._is_timeout:
raise pebble.TimeoutError(
f'timed out waiting for change ({self._timeout} seconds)'
)
out_value = self.stdout.read() if self.stdout is not None else None
err_value = self.stderr.read() if self.stderr is not None else None
if self._exit_code != 0:
raise pebble.ExecError[AnyStr](self._command,
cast(int, self._exit_code),
cast(Union[AnyStr, None], out_value),
cast(Union[AnyStr, None], err_value))
return cast(AnyStr, out_value), cast(Union[AnyStr, None], err_value)
def send_signal(self, sig: Union[int, str]):
# the process has always terminated by the time the ExecProcess is returned in the simulation.
raise BrokenPipeError("[Errno 32] Broken pipe")
@_copy_docstrings(pebble.Client)
class _TestingPebbleClient:
"""This conforms to the interface for pebble.Client but provides canned data.
DO NOT use this class directly; it is used by `Harness`_ to run interactions with Pebble.
`Harness`_ is responsible for maintaining the internal consistency of the values here,
as the only public methods of this type are for implementing Client.
"""
def __init__(self, backend: _TestingModelBackend, container_root: pathlib.Path):
self._layers: Dict[str, pebble.Layer] = {}
# Has a service been started/stopped?
self._service_status: Dict[str, pebble.ServiceStatus] = {}
self._root = container_root
self._backend = backend
self._exec_handlers: Dict[Tuple[str, ...], ExecHandler] = {}
def _handle_exec(self, command_prefix: Sequence[str], handler: ExecHandler):
prefix = tuple(command_prefix)
self._exec_handlers[prefix] = handler
def _check_connection(self):
if not self._backend._can_connect(self):
msg = ('Cannot connect to Pebble; did you forget to call '
'begin_with_initial_hooks() or set_can_connect()?')
raise pebble.ConnectionError(msg)
def get_system_info(self) -> pebble.SystemInfo:
self._check_connection()
return pebble.SystemInfo(version='1.0.0')
def get_warnings(
self, select: pebble.WarningState = pebble.WarningState.PENDING,
) -> List['pebble.Warning']:
raise NotImplementedError(self.get_warnings)
def ack_warnings(self, timestamp: datetime.datetime) -> int:
raise NotImplementedError(self.ack_warnings)
def get_changes(
self, select: pebble.ChangeState = pebble.ChangeState.IN_PROGRESS,
service: Optional[str] = None,
) -> List[pebble.Change]:
raise NotImplementedError(self.get_changes)
def get_change(self, change_id: pebble.ChangeID) -> pebble.Change:
raise NotImplementedError(self.get_change)
def abort_change(self, change_id: pebble.ChangeID) -> pebble.Change:
raise NotImplementedError(self.abort_change)
def autostart_services(self, timeout: float = 30.0, delay: float = 0.1):
self._check_connection()
for name, service in self._render_services().items():
# TODO: jam 2021-04-20 This feels awkward that Service.startup might be a string or
# might be an enum. Probably should make Service.startup a property rather than an
# attribute.
if service.startup == '':
startup = pebble.ServiceStartup.DISABLED
else:
startup = pebble.ServiceStartup(service.startup)
if startup == pebble.ServiceStartup.ENABLED:
self._service_status[name] = pebble.ServiceStatus.ACTIVE
def replan_services(self, timeout: float = 30.0, delay: float = 0.1):
return self.autostart_services(timeout, delay)
def start_services(
self, services: List[str], timeout: float = 30.0, delay: float = 0.1,
):
# A common mistake is to pass just the name of a service, rather than a list of services,
# so trap that case here so it is caught quickly.
if isinstance(services, str):
raise TypeError(f'start_services should take a list of names, not just "{services}"')
self._check_connection()
# Note: jam 2021-04-20 We don't implement ChangeID, but the default caller of this is
# Container.start() which currently ignores the return value
known_services = self._render_services()
# Names appear to be validated before any are activated, so do two passes
for name in services:
if name not in known_services:
# TODO: jam 2021-04-20 This needs a better error type
raise RuntimeError(f'400 Bad Request: service "{name}" does not exist')
for name in services:
self._service_status[name] = pebble.ServiceStatus.ACTIVE
def stop_services(
self, services: List[str], timeout: float = 30.0, delay: float = 0.1,
):
# handle a common mistake of passing just a name rather than a list of names
if isinstance(services, str):
raise TypeError(f'stop_services should take a list of names, not just "{services}"')
self._check_connection()
# Note: jam 2021-04-20 We don't implement ChangeID, but the default caller of this is
# Container.stop() which currently ignores the return value
known_services = self._render_services()
for name in services:
if name not in known_services:
# TODO: jam 2021-04-20 This needs a better error type
# 400 Bad Request: service "bal" does not exist
raise RuntimeError(f'400 Bad Request: service "{name}" does not exist')
for name in services:
self._service_status[name] = pebble.ServiceStatus.INACTIVE
def restart_services(
self, services: List[str], timeout: float = 30.0, delay: float = 0.1,
):
# handle a common mistake of passing just a name rather than a list of names
if isinstance(services, str):
raise TypeError(f'restart_services should take a list of names, not just "{services}"')
self._check_connection()
# Note: jam 2021-04-20 We don't implement ChangeID, but the default caller of this is
# Container.restart() which currently ignores the return value
known_services = self._render_services()
for name in services:
if name not in known_services:
# TODO: jam 2021-04-20 This needs a better error type
# 400 Bad Request: service "bal" does not exist
raise RuntimeError(f'400 Bad Request: service "{name}" does not exist')
for name in services:
self._service_status[name] = pebble.ServiceStatus.ACTIVE
def wait_change(
self, change_id: pebble.ChangeID, timeout: float = 30.0, delay: float = 0.1,
) -> pebble.Change:
raise NotImplementedError(self.wait_change)
def add_layer(
self, label: str, layer: Union[str, 'pebble.LayerDict', pebble.Layer], *,
combine: bool = False):
# I wish we could combine some of this helpful object corralling with the actual backend,
# rather than having to re-implement it. Maybe we could subclass
if not isinstance(label, str):
raise TypeError(f'label must be a str, not {type(label).__name__}')
if isinstance(layer, (str, dict)):
layer_obj = pebble.Layer(layer)
elif isinstance(layer, pebble.Layer):
layer_obj = layer
else:
raise TypeError(
f'layer must be str, dict, or pebble.Layer, not {type(layer).__name__}')
self._check_connection()
if label in self._layers:
if not combine:
raise RuntimeError(f'400 Bad Request: layer "{label}" already exists')
layer = self._layers[label]
for name, service in layer_obj.services.items():
# 'override' is actually single quoted in the real error, but
# it shouldn't be, hopefully that gets cleaned up.
if not service.override:
raise RuntimeError('500 Internal Server Error: layer "{}" must define'
'"override" for service "{}"'.format(label, name))
if service.override not in ('merge', 'replace'):
raise RuntimeError('500 Internal Server Error: layer "{}" has invalid '
'"override" value on service "{}"'.format(label, name))
elif service.override == 'replace':
layer.services[name] = service
elif service.override == 'merge':
if combine and name in layer.services:
s = layer.services[name]
s._merge(service)
else:
layer.services[name] = service
else:
self._layers[label] = layer_obj
def _render_services(self) -> Dict[str, pebble.Service]:
services: Dict[str, pebble.Service] = {}
for key in sorted(self._layers.keys()):
layer = self._layers[key]
for name, service in layer.services.items():
# TODO: (jam) 2021-04-07 have a way to merge existing services
services[name] = service
return services
def get_plan(self) -> pebble.Plan:
self._check_connection()
plan = pebble.Plan('{}')
services = self._render_services()
if not services:
return plan
for name in sorted(services.keys()):
plan.services[name] = services[name]
return plan
def get_services(self, names: Optional[List[str]] = None) -> List[pebble.ServiceInfo]:
if isinstance(names, str):
raise TypeError(f'get_services should take a list of names, not just "{names}"')
self._check_connection()
services = self._render_services()
infos: List[pebble.ServiceInfo] = []
if names is None:
names = sorted(services.keys())
for name in sorted(names):
try:
service = services[name]
except KeyError:
# in pebble, it just returns "nothing matched" if there are 0 matches,
# but it ignores services it doesn't recognize
continue
status = self._service_status.get(name, pebble.ServiceStatus.INACTIVE)
if service.startup == '':
startup = pebble.ServiceStartup.DISABLED
else:
startup = pebble.ServiceStartup(service.startup)
info = pebble.ServiceInfo(name,
startup=startup,
current=pebble.ServiceStatus(status))
infos.append(info)
return infos
@staticmethod
def _check_absolute_path(path: str):
if not path.startswith("/"):
raise pebble.PathError(
'generic-file-error',
f'paths must be absolute, got {path!r}'
)
def pull(self, path: str, *,
encoding: Optional[str] = 'utf-8') -> Union[BinaryIO, TextIO]:
self._check_connection()
self._check_absolute_path(path)
file_path = self._root / path[1:]
try:
return cast(
Union[BinaryIO, TextIO],
file_path.open("rb" if encoding is None else "r", encoding=encoding))
except FileNotFoundError:
raise pebble.PathError('not-found', f'stat {path}: no such file or directory')
except IsADirectoryError:
raise pebble.PathError('generic-file-error', f'can only read a regular file: "{path}"')
def push(
self, path: str, source: 'ReadableBuffer', *,
encoding: str = 'utf-8', make_dirs: bool = False, permissions: Optional[int] = None,
user_id: Optional[int] = None,
user: Optional[str] = None,
group_id: Optional[int] = None,
group: Optional[str] = None
) -> None:
self._check_connection()
if permissions is not None and not (0 <= permissions <= 0o777):
raise pebble.PathError(
'generic-file-error',
f'permissions not within 0o000 to 0o777: {permissions:#o}')
self._check_absolute_path(path)
file_path = self._root / path[1:]
if make_dirs and not file_path.parent.exists():
self.make_dir(
os.path.dirname(path),
make_parents=True,
permissions=None,
user_id=user_id,
user=user,
group_id=group_id,
group=group)
permissions = permissions if permissions is not None else 0o644
try:
if isinstance(source, str):
file_path.write_text(source, encoding=encoding)
elif isinstance(source, bytes):
file_path.write_bytes(source)
else:
# If source is binary, open file in binary mode and ignore encoding param
is_binary = isinstance(source.read(0), bytes)
open_mode = 'wb' if is_binary else 'w'
open_encoding = None if is_binary else encoding
with file_path.open(open_mode, encoding=open_encoding) as f:
shutil.copyfileobj(cast(IOBase, source), cast(IOBase, f))
os.chmod(file_path, permissions)
except FileNotFoundError as e:
raise pebble.PathError(
'not-found', f'parent directory not found: {e.args[0]}')
except NotADirectoryError:
raise pebble.PathError('generic-file-error',
f'open {path}.~: not a directory')
def list_files(self, path: str, *, pattern: Optional[str] = None,
itself: bool = False) -> List[pebble.FileInfo]:
self._check_connection()
self._check_absolute_path(path)
file_path = self._root / path[1:]
if not file_path.exists():
raise pebble.APIError(
body={}, code=404, status='Not Found',
message=f"stat {path}: no such file or directory")
files = [file_path]
if not itself:
try:
files = [file_path / file for file in os.listdir(file_path)]
except NotADirectoryError:
pass
if pattern is not None:
files = [file for file in files if fnmatch.fnmatch(file.name, pattern)]
file_infos = [
Container._build_fileinfo(file)
for file in files
]
for file_info in file_infos:
rel_path = os.path.relpath(file_info.path, start=self._root)
rel_path = '/' if rel_path == '.' else '/' + rel_path
file_info.path = rel_path
if rel_path == "/":
file_info.name = "/"
return file_infos
def make_dir(
self, path: str, *,
make_parents: bool = False,
permissions: Optional[int] = None,
user_id: Optional[int] = None,
user: Optional[str] = None,
group_id: Optional[int] = None,
group: Optional[str] = None
) -> None:
self._check_connection()
if permissions is not None and not (0 <= permissions <= 0o777):
raise pebble.PathError(
'generic-file-error',
f'permissions not within 0o000 to 0o777: {permissions:#o}')
self._check_absolute_path(path)
dir_path = self._root / path[1:]
if not dir_path.parent.exists() and not make_parents:
raise pebble.PathError(
'not-found', f'parent directory not found: {path}')
if not dir_path.parent.exists() and make_parents:
self.make_dir(
os.path.dirname(path),
make_parents=True,
permissions=permissions,
user_id=user_id,
user=user,
group_id=group_id,
group=group)
try:
permissions = permissions if permissions is not None else 0o755
dir_path.mkdir()
os.chmod(dir_path, permissions)
except FileExistsError:
if not make_parents:
raise pebble.PathError('generic-file-error', f'mkdir {path}: file exists')
except NotADirectoryError as e:
# Attempted to create a subdirectory of a file
raise pebble.PathError('generic-file-error', f'not a directory: {e.args[0]}')
def remove_path(self, path: str, *, recursive: bool = False):
self._check_connection()
self._check_absolute_path(path)
file_path = self._root / path[1:]
if not file_path.exists():
if recursive:
return
raise pebble.PathError(
'not-found', f'remove {path}: no such file or directory')
if file_path.is_dir():
if recursive:
shutil.rmtree(file_path)
else:
try:
file_path.rmdir()
except OSError:
raise pebble.PathError(
'generic-file-error',
'cannot remove non-empty directory without recursive=True')
else:
file_path.unlink()
def _find_exec_handler(self, command: List[str]) -> Optional[ExecHandler]:
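# Check prefixes from longest to shortest (including the empty prefix),
# so the most specific registration made via Harness.handle_exec wins.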
for prefix_len in reversed(range(len(command) + 1)):
command_prefix = tuple(command[:prefix_len])
if command_prefix in self._exec_handlers:
return self._exec_handlers[command_prefix]
return None
def _transform_exec_handler_output(self,
data: Union[str, bytes],
encoding: Optional[str]) -> Union[io.BytesIO, io.StringIO]:
if isinstance(data, bytes):
if encoding is None:
return io.BytesIO(data)
else:
return io.StringIO(data.decode(encoding=encoding))
else:
if encoding is None:
raise ValueError(
f"exec handler must return bytes if encoding is None,"
f"not {data.__class__.__name__}")
else:
return io.StringIO(data)
def exec(
self,
command: List[str],
*,
service_context: Optional[str] = None,
environment: Optional[Dict[str, str]] = None,
working_dir: Optional[str] = None,
timeout: Optional[float] = None,
user_id: Optional[int] = None,
user: Optional[str] = None,
group_id: Optional[int] = None,
group: Optional[str] = None,
stdin: Optional[Union[str, bytes, TextIO, BinaryIO]] = None,
stdout: Optional[Union[TextIO, BinaryIO]] = None,
stderr: Optional[Union[TextIO, BinaryIO]] = None,
encoding: Optional[str] = 'utf-8',
combine_stderr: bool = False
) -> ExecProcess[Any]:
self._check_connection()
handler = self._find_exec_handler(command)
if handler is None:
message = "execution handler not found, please register one using Harness.handle_exec"
raise pebble.APIError(
body={}, code=500, status='Internal Server Error', message=message
)
environment = {} if environment is None else environment
if service_context is not None:
plan = self.get_plan()
if service_context not in plan.services:
message = f'context service "{service_context}" not found'
body = {'type': 'error', 'status-code': 500, 'status': 'Internal Server Error',
'result': {'message': message}}
raise pebble.APIError(
body=body, code=500, status='Internal Server Error', message=message
)
service = plan.services[service_context]
environment = {**service.environment, **environment}
working_dir = service.working_dir if working_dir is None else working_dir
user = service.user if user is None else user
user_id = service.user_id if user_id is None else user_id
group = service.group if group is None else group
group_id = service.group_id if group_id is None else group_id
if hasattr(stdin, "read"):
stdin = stdin.read() # type: ignore
exec_args = ExecArgs(
command=command,
environment=environment,
working_dir=working_dir,
timeout=timeout,
user_id=user_id,
user=user,
group_id=group_id,
group=group,
stdin=cast(Union[str, bytes, None], stdin),
encoding=encoding,
combine_stderr=combine_stderr
)
proc_stdin = self._transform_exec_handler_output(b"", encoding)
if stdin is not None:
proc_stdin = None
proc_stdout = self._transform_exec_handler_output(b"", encoding)
proc_stderr = self._transform_exec_handler_output(b"", encoding)
try:
result = handler(exec_args)
except TimeoutError:
if timeout is not None:
exec_process = _TestingExecProcess(command=command,
timeout=timeout,
exit_code=None,
stdin=proc_stdin,
stdout=proc_stdout,
stderr=proc_stderr,
is_timeout=True)
return cast(pebble.ExecProcess[Any], exec_process)
else:
raise RuntimeError(
"a TimeoutError occurred in the execution handler, "
"but no timeout value was provided in the execution arguments."
)
if result is None:
exit_code = 0
proc_stdout = self._transform_exec_handler_output(b'', encoding)
proc_stderr = self._transform_exec_handler_output(b'', encoding)
elif isinstance(result, ExecResult):
exit_code = result.exit_code
proc_stdout = self._transform_exec_handler_output(result.stdout, encoding)
proc_stderr = self._transform_exec_handler_output(result.stderr, encoding)
else:
raise TypeError(f"execution handler returned an unexpected type: {type(result)!r}.")
if combine_stderr and proc_stderr.getvalue():
raise ValueError("execution handler returned a non-empty stderr "
"even though combine_stderr is enabled.")
if stdout is not None:
shutil.copyfileobj(cast(io.IOBase, proc_stdout), cast(io.IOBase, stdout))
proc_stdout = None
if stderr is not None:
shutil.copyfileobj(cast(io.IOBase, proc_stderr), cast(io.IOBase, stderr))
proc_stderr = None
exec_process = _TestingExecProcess(command=command,
timeout=timeout,
exit_code=exit_code,
stdin=proc_stdin,
stdout=proc_stdout,
stderr=proc_stderr,
is_timeout=False)
return cast(pebble.ExecProcess[Any], exec_process)
def send_signal(self, sig: Union[int, str], service_names: Iterable[str]):
if not service_names:
raise TypeError('send_signal expected at least 1 service name, got 0')
self._check_connection()
# Convert signal to str
if isinstance(sig, int):
sig = signal.Signals(sig).name
# pebble first validates the service name, and then the signal name
plan = self.get_plan()
for service in service_names:
if service not in plan.services or not self.get_services([service])[0].is_running():
# conform with the real pebble api
message = f'cannot send signal to "{service}": service is not running'
body = {'type': 'error', 'status-code': 500, 'status': 'Internal Server Error',
'result': {'message': message}}
raise pebble.APIError(
body=body, code=500, status='Internal Server Error', message=message
)
# Check if signal name is valid
try:
signal.Signals[sig]
except KeyError:
# conform with the real pebble api
first_service = next(iter(service_names))
message = f'cannot send signal to "{first_service}": invalid signal name "{sig}"'
body = {'type': 'error', 'status-code': 500, 'status': 'Internal Server Error',
'result': {'message': message}}
raise pebble.APIError(
body=body,
code=500,
status='Internal Server Error',
message=message)
def get_checks(self, level=None, names=None): # type:ignore
raise NotImplementedError(self.get_checks) # type:ignore