# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Infrastructure to build unittests for Charms using the Operator Framework.
Global Variables:
SIMULATE_CAN_CONNECT: This enables can_connect simulation for the test
harness. It should be set *before* you create Harness instances and not
changed afterwards. You *should* set this to True - it will help your tests
be more accurate! It causes all containers' can_connect states to initially
be False rather than True, and causes the harness to model and track
can_connect state for containers accurately. This means that calls that
require communication with the container API (e.g. Container.push,
Container.get_plan, Container.add_layer, etc.) will only succeed if
Container.can_connect() returns True, and will raise exceptions otherwise.
can_connect state evolves automatically to track events associated with
container state (e.g. calling container_pebble_ready).
If SIMULATE_CAN_CONNECT is True, can_connect state for containers can also
be manually controlled using Harness.set_can_connect.
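Example of the recommended setup (a minimal sketch; MyCharm is a placeholder for
your charm class)::
import ops.testing
ops.testing.SIMULATE_CAN_CONNECT = True  # must be set before creating any Harness
harness = ops.testing.Harness(MyCharm)
harness.begin()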
"""
import datetime
import fnmatch
import inspect
import os
import pathlib
import random
import signal
import tempfile
import typing
import warnings
from contextlib import contextmanager
from io import BytesIO, StringIO
from textwrap import dedent
from ops import charm, framework, model, pebble, storage
from ops._private import yaml
# Toggles Container.can_connect simulation globally for all harness instances.
# For this to work, it must be set *before* Harness instances are created.
SIMULATE_CAN_CONNECT = False
# OptionalYAML is something like metadata.yaml or actions.yaml. You can
# pass in a file-like object or the string directly.
OptionalYAML = typing.Optional[typing.Union[str, typing.TextIO]]
# An instance of an Application or Unit, or the name of either.
# This is done here to avoid a scoping issue with the `model` property
# of the Harness class below.
AppUnitOrName = typing.Union[str, model.Application, model.Unit]
# CharmType represents user charms that are derived from CharmBase.
CharmType = typing.TypeVar('CharmType', bound=charm.CharmBase)
# noinspection PyProtectedMember
class Harness(typing.Generic[CharmType]):
"""This class represents a way to build up the model that will drive a test suite.
The model that is created is from the viewpoint of the charm that you are testing.
Example::
harness = Harness(MyCharm)
# Do initial setup here
relation_id = harness.add_relation('db', 'postgresql')
# Now instantiate the charm to see events as the model changes
harness.begin()
harness.add_relation_unit(relation_id, 'postgresql/0')
harness.update_relation_data(relation_id, 'postgresql/0', {'key': 'val'})
# Check that charm has properly handled the relation_joined event for postgresql/0
self.assertEqual(harness.charm. ...)
Args:
charm_cls: The Charm class that you'll be testing.
meta: A string or file-like object containing the contents of
metadata.yaml. If not supplied, we will look for a 'metadata.yaml' file in the
parent directory of the Charm, and if not found fall back to a trivial
'name: test-charm' metadata.
actions: A string or file-like object containing the contents of
actions.yaml. If not supplied, we will look for an 'actions.yaml' file in the
parent directory of the Charm.
config: A string or file-like object containing the contents of
config.yaml. If not supplied, we will look for a 'config.yaml' file in the
parent directory of the Charm.
"""
def __init__(
self,
charm_cls: typing.Type[CharmType],
*,
meta: OptionalYAML = None,
actions: OptionalYAML = None,
config: OptionalYAML = None):
self._charm_cls = charm_cls
self._charm = None
self._charm_dir = 'no-disk-path' # this may be updated by _create_meta
self._meta = self._create_meta(meta, actions)
self._unit_name = self._meta.name + '/0'
self._framework = None
self._hooks_enabled = True
self._relation_id_counter = 0
self._backend = _TestingModelBackend(self._unit_name, self._meta)
self._model = model.Model(self._meta, self._backend)
self._storage = storage.SQLiteStorage(':memory:')
self._oci_resources = {}
self._framework = framework.Framework(
self._storage, self._charm_dir, self._meta, self._model)
self._defaults = self._load_config_defaults(config)
self._update_config(key_values=self._defaults)
# TODO: If/when we decide to allow breaking changes for a release,
# change SIMULATE_CAN_CONNECT default value to True and remove the
# warning message below. This warning was added 2022-03-22
if not SIMULATE_CAN_CONNECT:
warnings.warn(
'Please set ops.testing.SIMULATE_CAN_CONNECT=True. See '
'https://juju.is/docs/sdk/testing#heading--simulate-can-connect for details.')
def set_can_connect(self, container: typing.Union[str, model.Container], val: bool):
"""Change the simulated can_connect status of a container's underlying pebble client.
Calling this method raises an exception if SIMULATE_CAN_CONNECT is False.
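Example (a sketch; 'workload' stands in for a container name declared in
your metadata.yaml)::
ops.testing.SIMULATE_CAN_CONNECT = True
harness = Harness(MyCharm)
harness.begin()
harness.set_can_connect('workload', True)
self.assertTrue(harness.model.unit.get_container('workload').can_connect())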
"""
if isinstance(container, str):
container = self.model.unit.get_container(container)
self._backend._set_can_connect(container._pebble, val)
@property
def charm(self) -> CharmType:
"""Return the instance of the charm class that was passed to __init__.
Note that the Charm is not instantiated until you have called
:meth:`.begin()`.
"""
return self._charm
@property
def model(self) -> model.Model:
"""Return the :class:`~ops.model.Model` that is being driven by this Harness."""
return self._model
@property
def framework(self) -> framework.Framework:
"""Return the Framework that is being driven by this Harness."""
return self._framework
def begin(self) -> None:
"""Instantiate the Charm and start handling events.
Before calling :meth:`begin`, there is no Charm instance, so changes to the Model won't
emit events. You must call :meth:`.begin` before :attr:`.charm` is valid.
"""
if self._charm is not None:
raise RuntimeError('cannot call the begin method on the harness more than once')
# The Framework adds attributes to class objects for events, etc. As such, we can't re-use
# the original class against multiple Frameworks. So create a locally defined class
# and register it.
# TODO: jam 2020-03-16 We are looking to change this to instance attributes instead of
# class attributes, which should clean up this ugliness. The API can stay the same.
class TestEvents(self._charm_cls.on.__class__):
pass
TestEvents.__name__ = self._charm_cls.on.__class__.__name__
class TestCharm(self._charm_cls):
on = TestEvents()
# Note: jam 2020-03-01 This is so that errors in testing say MyCharm has no attribute foo,
# rather than TestCharm has no attribute foo.
TestCharm.__name__ = self._charm_cls.__name__
self._charm = TestCharm(self._framework)
def begin_with_initial_hooks(self) -> None:
"""Called when you want the Harness to fire the same hooks that Juju would fire at startup.
This triggers install, relation-created, config-changed, start, and any relation-joined
hooks based on what relations have been defined+added before you called begin. This does
NOT trigger a pebble-ready hook. Note that all of these are fired before returning control
to the test suite, so if you want to introspect what happens at each step, you need to fire
them directly (e.g. Charm.on.install.emit()). In your hook callback functions, you should
not assume that workload containers are active; guard such code with checks to
Container.can_connect(). You are encouraged to test this by setting the global
SIMULATE_CAN_CONNECT variable to True.
To use this with all the normal hooks, you should instantiate the harness, setup any
relations that you want active when the charm starts, and then call this method. This
method will automatically create and add peer relations that are specified in
metadata.yaml.
Example::
harness = Harness(MyCharm)
# Do initial setup here
# Add storage if needed before begin_with_initial_hooks() is called
storage_ids = harness.add_storage('data', count=1)
storage_id = storage_ids[0]  # we only added one storage instance
relation_id = harness.add_relation('db', 'postgresql')
harness.add_relation_unit(relation_id, 'postgresql/0')
harness.update_relation_data(relation_id, 'postgresql/0', {'key': 'val'})
harness.set_leader(True)
harness.update_config({'initial': 'config'})
harness.begin_with_initial_hooks()
# This will cause
# install, db-relation-created('postgresql'), leader-elected, config-changed, start
# db-relation-joined('postgresql/0'), db-relation-changed('postgresql/0')
# To be fired.
"""
self.begin()
# Checking if disks have been added
# storage-attached events happen before install
for storage_name in self._meta.storages:
for storage_index in self._backend.storage_list(storage_name):
storage_name = storage_name.replace('-', '_')
# Storage device(s) detected, emit storage-attached event(s)
self._charm.on[storage_name].storage_attached.emit(
model.Storage(storage_name, storage_index, self._backend))
# Storage done, emit install event
self._charm.on.install.emit()
# Juju itself iterates what relation to fire based on a map[int]relation, so it doesn't
# guarantee a stable ordering between relation events. It *does* give a stable ordering
# of joined units for a given relation.
items = list(self._meta.relations.items())
random.shuffle(items)
this_app_name = self._meta.name
for relname, rel_meta in items:
if rel_meta.role == charm.RelationRole.peer:
# If the user has directly added a relation, leave it be, but otherwise ensure
# that peer relations are always established before leader-elected.
rel_ids = self._backend._relation_ids_map.get(relname)
if rel_ids is None:
self.add_relation(relname, self._meta.name)
else:
random.shuffle(rel_ids)
for rel_id in rel_ids:
self._emit_relation_created(relname, rel_id, this_app_name)
else:
rel_ids = self._backend._relation_ids_map.get(relname, [])
random.shuffle(rel_ids)
for rel_id in rel_ids:
app_name = self._backend._relation_app_and_units[rel_id]["app"]
self._emit_relation_created(relname, rel_id, app_name)
if self._backend._is_leader:
self._charm.on.leader_elected.emit()
else:
self._charm.on.leader_settings_changed.emit()
self._charm.on.config_changed.emit()
self._charm.on.start.emit()
# If the initial hooks do not set a unit status, the Juju controller will switch
# the unit status from "Maintenance" to "Unknown". See gh#726
post_setup_sts = self._backend.status_get()
if post_setup_sts.get("status") == "maintenance" and not post_setup_sts.get("message"):
self._backend.status_set("unknown", "", is_app=False)
all_ids = list(self._backend._relation_names.items())
random.shuffle(all_ids)
for rel_id, rel_name in all_ids:
rel_app_and_units = self._backend._relation_app_and_units[rel_id]
app_name = rel_app_and_units["app"]
# Note: Juju *does* fire relation events for a given relation in the sorted order of
# the unit names. It also always fires relation-changed immediately after
# relation-joined for the same unit.
# Juju only fires relation-changed (app) if there is data for the related application
relation = self._model.get_relation(rel_name, rel_id)
if self._backend._relation_data[rel_id].get(app_name):
app = self._model.get_app(app_name)
self._charm.on[rel_name].relation_changed.emit(
relation, app, None)
for unit_name in sorted(rel_app_and_units["units"]):
remote_unit = self._model.get_unit(unit_name)
self._charm.on[rel_name].relation_joined.emit(
relation, remote_unit.app, remote_unit)
self._charm.on[rel_name].relation_changed.emit(
relation, remote_unit.app, remote_unit)
def cleanup(self) -> None:
"""Called by your test infrastructure to cleanup any temporary directories/files/etc.
Currently this only needs to be called if you test with resources. But it is reasonable
to always include a `testcase.addCleanup(harness.cleanup)` just in case.
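Example, in a unittest.TestCase (MyCharm is a placeholder for your charm class)::
class TestMyCharm(unittest.TestCase):
    def setUp(self):
        self.harness = Harness(MyCharm)
        self.addCleanup(self.harness.cleanup)
        self.harness.begin()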
"""
self._backend._cleanup()
def _create_meta(self, charm_metadata, action_metadata):
"""Create a CharmMeta object.
Handle the cases where a user doesn't supply explicit metadata snippets.
"""
filename = inspect.getfile(self._charm_cls)
charm_dir = pathlib.Path(filename).parents[1]
if charm_metadata is None:
metadata_path = charm_dir / 'metadata.yaml'
if metadata_path.is_file():
charm_metadata = metadata_path.read_text()
self._charm_dir = charm_dir
else:
# The simplest of metadata that the framework can support
charm_metadata = 'name: test-charm'
elif isinstance(charm_metadata, str):
charm_metadata = dedent(charm_metadata)
if action_metadata is None:
actions_path = charm_dir / 'actions.yaml'
if actions_path.is_file():
action_metadata = actions_path.read_text()
self._charm_dir = charm_dir
elif isinstance(action_metadata, str):
action_metadata = dedent(action_metadata)
return charm.CharmMeta.from_yaml(charm_metadata, action_metadata)
def _load_config_defaults(self, charm_config):
"""Load default values from config.yaml.
Handle the case where a user doesn't supply explicit config snippets.
"""
filename = inspect.getfile(self._charm_cls)
charm_dir = pathlib.Path(filename).parents[1]
if charm_config is None:
config_path = charm_dir / 'config.yaml'
if config_path.is_file():
charm_config = config_path.read_text()
self._charm_dir = charm_dir
else:
# The simplest of config that the framework can support
charm_config = '{}'
elif isinstance(charm_config, str):
charm_config = dedent(charm_config)
charm_config = yaml.safe_load(charm_config)
charm_config = charm_config.get('options', {})
return {key: value.get('default', None) for key, value in charm_config.items()}
def add_oci_resource(self, resource_name: str,
contents: typing.Mapping[str, str] = None) -> None:
"""Add oci resources to the backend.
This will register an oci resource and create a temporary file for processing metadata
about the resource. A default set of values will be used for all the file contents
unless a specific contents dict is provided.
Args:
resource_name: Name of the resource to add custom contents to.
contents: Optional custom dict to write for the named resource.
"""
if not contents:
contents = {'registrypath': 'registrypath',
'username': 'username',
'password': 'password',
}
if resource_name not in self._meta.resources.keys():
raise RuntimeError('Resource {} is not a defined resource'.format(resource_name))
if self._meta.resources[resource_name].type != "oci-image":
raise RuntimeError('Resource {} is not an OCI Image'.format(resource_name))
as_yaml = yaml.safe_dump(contents)
self._backend._resources_map[resource_name] = ('contents.yaml', as_yaml)
def add_resource(self, resource_name: str, content: typing.AnyStr) -> None:
"""Add content for a resource to the backend.
This will register the content, so that a call to `Model.resources.fetch(resource_name)`
will return a path to a file containing that content.
Args:
resource_name: The name of the resource being added
content: Either string or bytes content, which will be the content of the filename
returned by resource-get. If content is a string, it will be encoded as utf-8.
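Example (a sketch; 'my-file' stands in for a file-type resource declared in
metadata.yaml)::
harness = Harness(MyCharm)
harness.add_resource('my-file', 'contents of the resource')
harness.begin()
path = harness.model.resources.fetch('my-file')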
"""
if resource_name not in self._meta.resources.keys():
raise RuntimeError('Resource {} is not a defined resource'.format(resource_name))
record = self._meta.resources[resource_name]
if record.type != "file":
raise RuntimeError(
'Resource {} is not a file, but actually {}'.format(resource_name, record.type))
filename = record.filename
if filename is None:
filename = resource_name
self._backend._resources_map[resource_name] = (filename, content)
def populate_oci_resources(self) -> None:
"""Populate all OCI resources."""
for name, data in self._meta.resources.items():
if data.type == "oci-image":
self.add_oci_resource(name)
def disable_hooks(self) -> None:
"""Stop emitting hook events when the model changes.
This can be used by developers to stop changes to the model from emitting events that
the charm will react to. Call :meth:`.enable_hooks`
to re-enable them.
"""
self._hooks_enabled = False
def enable_hooks(self) -> None:
"""Re-enable hook events from charm.on when the model is changed.
By default hook events are enabled once you call :meth:`.begin`,
but if you have used :meth:`.disable_hooks`, this can be used to
enable them again.
"""
self._hooks_enabled = True
@contextmanager
def hooks_disabled(self):
"""A context manager to run code with hooks disabled.
Example::
with harness.hooks_disabled():
# things in here don't fire events
harness.set_leader(True)
harness.update_config(unset=['foo', 'bar'])
# things here will again fire events
"""
if self._hooks_enabled:
self.disable_hooks()
try:
yield None
finally:
self.enable_hooks()
else:
yield None
def _next_relation_id(self):
rel_id = self._relation_id_counter
self._relation_id_counter += 1
return rel_id
def add_storage(self, storage_name: str, count: int = 1) -> typing.List[str]:
"""Declare a new storage device attached to this unit.
To have repeatable tests, each device will be initialized with its
location set to /<storage_name>N, where N is a counter in the range
[0, total_num_disks-1].
Args:
storage_name: The storage backend name on the Charm
count: Number of disks being added
Return:
A list of storage IDs, e.g. ["my-storage/1", "my-storage/2"].
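Example (a sketch; 'data' stands in for a storage name declared in metadata.yaml)::
harness = Harness(MyCharm)
harness.begin()
storage_ids = harness.add_storage('data', count=2)
# storage_ids is now something like ['data/0', 'data/1'], and storage-attached
# has been emitted for each new storage instance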
"""
if storage_name not in self._meta.storages:
raise RuntimeError(
"the key '{}' is not specified as a storage key in metadata".format(storage_name))
storage_indices = self._backend.storage_add(storage_name, count)
# Reset associated cached value in the storage mappings. If we don't do this,
# Model._storages won't return Storage objects for subsequently-added storage.
self._model._storages._invalidate(storage_name)
ids = []
for storage_index in storage_indices:
s = model.Storage(storage_name, storage_index, self._backend)
ids.append(s.full_id)
if self.charm is not None and self._hooks_enabled:
self.charm.on[storage_name].storage_attached.emit(s)
return ids
def detach_storage(self, storage_id: str) -> None:
"""Detach a storage device.
The intent of this function is to simulate a "juju detach-storage" call.
It will trigger a storage-detaching hook if the storage unit in question exists
and is presently marked as attached.
Args:
storage_id: The full storage ID of the storage unit being detached, including the
storage key, e.g. my-storage/0.
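Example (a sketch; 'data' stands in for a storage name declared in metadata.yaml)::
harness = Harness(MyCharm)
harness.begin()
storage_id = harness.add_storage('data')[0]
harness.detach_storage(storage_id)  # fires storage-detaching
harness.attach_storage(storage_id)  # fires storage-attached again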
"""
if self.charm is None:
raise RuntimeError('cannot detach storage before Harness is initialised')
storage_name, storage_index = storage_id.split('/', 1)
storage_index = int(storage_index)
if self._backend._storage_is_attached(storage_name, storage_index) and self._hooks_enabled:
self.charm.on[storage_name].storage_detaching.emit(
model.Storage(storage_name, storage_index, self._backend))
self._backend._storage_detach(storage_id)
def attach_storage(self, storage_id: str) -> None:
"""Attach a storage device.
The intent of this function is to simulate a "juju attach-storage" call.
It will trigger a storage-attached hook if the storage unit in question exists
and is presently marked as detached.
Args:
storage_id: The full storage ID of the storage unit being attached, including the
storage key, e.g. my-storage/0.
"""
if self._backend._storage_attach(storage_id) and self._hooks_enabled:
storage_name, storage_index = storage_id.split('/', 1)
storage_index = int(storage_index)
self.charm.on[storage_name].storage_attached.emit(
model.Storage(storage_name, storage_index, self._backend))
def remove_storage(self, storage_id: str) -> None:
"""Detach a storage device.
The intent of this function is to simulate a "juju remove-storage" call.
It will trigger a storage-detaching hook if the storage unit in question exists
and is presently marked as attached. Then it will remove the storage
unit from the testing backend.
Args:
storage_id: The full storage ID of the storage unit being removed, including the
storage key, e.g. my-storage/0.
"""
storage_name, storage_index = storage_id.split('/', 1)
storage_index = int(storage_index)
if storage_name not in self._meta.storages:
raise RuntimeError(
"the key '{}' is not specified as a storage key in metadata".format(storage_name))
is_attached = self._backend._storage_is_attached(storage_name, storage_index)
if self.charm is not None and self._hooks_enabled and is_attached:
self.charm.on[storage_name].storage_detaching.emit(
model.Storage(storage_name, storage_index, self._backend))
self._backend._storage_remove(storage_id)
def add_relation(self, relation_name: str, remote_app: str) -> int:
"""Declare that there is a new relation between this app and `remote_app`.
In the case of adding peer relations, `remote_app` is *this* app. This function creates a
relation with an application and will trigger a relation-created hook. To relate units (and
trigger relation-joined and relation-changed hooks), you should also call
:meth:`.add_relation_unit`.
Args:
relation_name: The relation on Charm that is being related to
remote_app: The name of the application that is being related to
Return:
The relation_id created by this add_relation.
"""
rel_id = self._next_relation_id()
self._backend._relation_ids_map.setdefault(relation_name, []).append(rel_id)
self._backend._relation_names[rel_id] = relation_name
self._backend._relation_list_map[rel_id] = []
self._backend._relation_data[rel_id] = {
remote_app: {},
self._backend.unit_name: {},
self._backend.app_name: {},
}
self._backend._relation_app_and_units[rel_id] = {
"app": remote_app,
"units": [],
}
# Reload the relation_ids list
if self._model is not None:
self._model.relations._invalidate(relation_name)
self._emit_relation_created(relation_name, rel_id, remote_app)
return rel_id
def remove_relation(self, relation_id: int) -> None:
"""Remove a relation.
Args:
relation_id: The relation ID for the relation to be removed.
Raises:
RelationNotFoundError: if relation id is not valid
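Example (a sketch, reusing the 'db'/'postgresql' relation from the class example)::
harness = Harness(MyCharm)
harness.begin()
relation_id = harness.add_relation('db', 'postgresql')
harness.add_relation_unit(relation_id, 'postgresql/0')
harness.remove_relation(relation_id)
# relation-departed for postgresql/0 and then relation-broken have been emitted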
"""
try:
relation_name = self._backend._relation_names[relation_id]
remote_app = self._backend.relation_remote_app_name(relation_id)
except KeyError as e:
raise model.RelationNotFoundError from e
for unit_name in self._backend._relation_list_map[relation_id].copy():
self.remove_relation_unit(relation_id, unit_name)
self._emit_relation_broken(relation_name, relation_id, remote_app)
if self._model is not None:
self._model.relations._invalidate(relation_name)
self._backend._relation_app_and_units.pop(relation_id)
self._backend._relation_data.pop(relation_id)
self._backend._relation_list_map.pop(relation_id)
self._backend._relation_ids_map[relation_name].remove(relation_id)
self._backend._relation_names.pop(relation_id)
def _emit_relation_created(self, relation_name: str, relation_id: int,
remote_app: str) -> None:
"""Trigger relation-created for a given relation with a given remote application."""
if self._charm is None or not self._hooks_enabled:
return
relation = self._model.get_relation(relation_name, relation_id)
app = self._model.get_app(remote_app)
self._charm.on[relation_name].relation_created.emit(
relation, app)
def _emit_relation_broken(self, relation_name: str, relation_id: int,
remote_app: str) -> None:
"""Trigger relation-broken for a given relation with a given remote application."""
if self._charm is None or not self._hooks_enabled:
return
relation = self._model.get_relation(relation_name, relation_id)
app = self._model.get_app(remote_app)
self._charm.on[relation_name].relation_broken.emit(
relation, app)
def add_relation_unit(self, relation_id: int, remote_unit_name: str) -> None:
"""Add a new unit to a relation.
Example::
rel_id = harness.add_relation('db', 'postgresql')
harness.add_relation_unit(rel_id, 'postgresql/0')
This will trigger a `relation_joined` event. This would naturally be
followed by a `relation_changed` event, which you can trigger with
:meth:`.update_relation_data`. This separation is artificial in the
sense that Juju will always fire the two, but is intended to make
testing relations and their data bags slightly more natural.
Args:
relation_id: The integer relation identifier (as returned by add_relation).
remote_unit_name: A string representing the remote unit that is being added.
Return:
None
"""
self._backend._relation_list_map[relation_id].append(remote_unit_name)
self._backend._relation_data[relation_id][remote_unit_name] = {}
# TODO: jam 2020-08-03 This is where we could assert that the unit name matches the
# application name (eg you don't have a relation to 'foo' but add units of 'bar/0'
self._backend._relation_app_and_units[relation_id]["units"].append(remote_unit_name)
relation_name = self._backend._relation_names[relation_id]
# Make sure that the Model reloads the relation_list for this relation_id, as well as
# reloading the relation data for this unit.
remote_unit = self._model.get_unit(remote_unit_name)
relation = self._model.get_relation(relation_name, relation_id)
unit_cache = relation.data.get(remote_unit, None)
if unit_cache is not None:
unit_cache._invalidate()
self._model.relations._invalidate(relation_name)
if self._charm is None or not self._hooks_enabled:
return
self._charm.on[relation_name].relation_joined.emit(
relation, remote_unit.app, remote_unit)
def remove_relation_unit(self, relation_id: int, remote_unit_name: str) -> None:
"""Remove a unit from a relation.
Example::
rel_id = harness.add_relation('db', 'postgresql')
harness.add_relation_unit(rel_id, 'postgresql/0')
...
harness.remove_relation_unit(rel_id, 'postgresql/0')
This will trigger a `relation_departed` event. This would
normally be followed by a `relation_changed` event triggered
by Juju. However when using the test harness a
`relation_changed` event must be triggered using
:meth:`.update_relation_data`. This deviation from normal Juju
behaviour facilitates testing by making each step in the
charm life cycle explicit.
Args:
relation_id: The integer relation identifier (as returned by add_relation).
remote_unit_name: A string representing the remote unit that is being removed.
Raises:
KeyError: if relation_id or remote_unit_name is not valid
ValueError: if remote_unit_name is not valid
"""
relation_name = self._backend._relation_names[relation_id]
# gather data to invalidate cache later
remote_unit = self._model.get_unit(remote_unit_name)
relation = self._model.get_relation(relation_name, relation_id)
unit_cache = relation.data.get(remote_unit, None)
# remove the unit from the list of units in the relation
relation.units.remove(remote_unit)
self._emit_relation_departed(relation_id, remote_unit_name)
# remove the relation data for the departed unit now that the event has happened
self._backend._relation_list_map[relation_id].remove(remote_unit_name)
self._backend._relation_app_and_units[relation_id]["units"].remove(remote_unit_name)
self._backend._relation_data[relation_id].pop(remote_unit_name)
self.model._relations._invalidate(relation_name=relation.name)
if unit_cache is not None:
unit_cache._invalidate()
def _emit_relation_departed(self, relation_id, unit_name):
"""Trigger relation-departed event for a given relation id and unit."""
if self._charm is None or not self._hooks_enabled:
return
rel_name = self._backend._relation_names[relation_id]
relation = self.model.get_relation(rel_name, relation_id)
if '/' in unit_name:
app_name = unit_name.split('/')[0]
app = self.model.get_app(app_name)
unit = self.model.get_unit(unit_name)
else:
raise ValueError('Invalid Unit Name')
self._charm.on[rel_name].relation_departed.emit(relation, app, unit)
def get_relation_data(self, relation_id: int, app_or_unit: AppUnitOrName) -> typing.Mapping:
"""Get the relation data bucket for a single app or unit in a given relation.
This ignores all of the safety checks of who can and can't see data in relations (eg,
non-leaders can't read their own application's relation data because there are no events
that keep that data up-to-date for the unit).
Args:
relation_id: The relation whose content we want to look at.
app_or_unit: An Application or Unit instance, or its name, whose data we want to read
Return:
A dict containing the relation data for `app_or_unit` or None.
Raises:
KeyError: if relation_id doesn't exist
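Example (a sketch, reusing the 'db'/'postgresql' relation from the class example)::
relation_id = harness.add_relation('db', 'postgresql')
harness.update_relation_data(relation_id, 'postgresql', {'key': 'val'})
self.assertEqual(harness.get_relation_data(relation_id, 'postgresql'), {'key': 'val'})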
"""
if hasattr(app_or_unit, 'name'):
app_or_unit = app_or_unit.name
return self._backend._relation_data[relation_id].get(app_or_unit, None)
def get_pod_spec(self) -> typing.Tuple[typing.Mapping, typing.Mapping]:
"""Return the content of the pod spec as last set by the charm.
This returns both the pod spec and any k8s_resources that were supplied.
See the signature of Model.pod.set_spec
"""
return self._backend._pod_spec
def get_container_pebble_plan(
self, container_name: str
) -> pebble.Plan:
"""Return the current Plan that pebble is executing for the given container.
Args:
container_name: The simple name of the associated container
Return:
The pebble.Plan for this container. You can use :meth:`ops.pebble.Plan.to_yaml` to get
a string form for the content. Will raise KeyError if no pebble client exists
for that container name. (should only happen if container is not present in
metadata.yaml)
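Example (a sketch; 'workload' stands in for a container name from metadata.yaml,
and it is assumed that the charm's pebble-ready handler adds a layer)::
harness = Harness(MyCharm)
harness.begin()
harness.container_pebble_ready('workload')
plan = harness.get_container_pebble_plan('workload')
self.assertIn('services:', plan.to_yaml())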
"""
client = self._backend._pebble_clients.get(container_name)
if client is None:
raise KeyError('no known pebble client for container "{}"'.format(container_name))
return client.get_plan()
def container_pebble_ready(self, container_name: str):
"""Fire the pebble_ready hook for the associated container.
This will do nothing if begin() has not been called. If
SIMULATE_CAN_CONNECT is True, this will switch the given
container's can_connect state to True before the hook
function is called.
"""
if self.charm is None:
return
container = self.model.unit.get_container(container_name)
if SIMULATE_CAN_CONNECT:
self.set_can_connect(container, True)
self.charm.on[container_name].pebble_ready.emit(container)
def get_workload_version(self) -> str:
"""Read the workload version that was set by the unit."""
return self._backend._workload_version
def set_model_info(self, name: str = None, uuid: str = None) -> None:
"""Set the name and uuid of the Model that this is representing.
This cannot be called once begin() has been called. But it lets you set the value that
will be returned by Model.name and Model.uuid.
This is a convenience method to invoke both Harness.set_model_name
and Harness.set_model_uuid at once.
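Example (the name and uuid values below are arbitrary)::
harness = Harness(MyCharm)
harness.set_model_info(name='test-model', uuid='00000000-0000-0000-0000-000000000000')
harness.begin()
self.assertEqual(harness.model.name, 'test-model')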
"""
self.set_model_name(name)
self.set_model_uuid(uuid)
def set_model_name(self, name: str) -> None:
"""Set the name of the Model that this is representing.
This cannot be called once begin() has been called. But it lets you set the value that
will be returned by Model.name.
"""
if self._charm is not None:
raise RuntimeError('cannot set the Model name after begin()')
self._backend.model_name = name
def set_model_uuid(self, uuid: str) -> None:
"""Set the uuid of the Model that this is representing.
This cannot be called once begin() has been called. But it lets you set the value that
will be returned by Model.uuid.
"""
if self._charm is not None:
raise RuntimeError('cannot set the Model uuid after begin()')
self._backend.model_uuid = uuid
def update_relation_data(
self,
relation_id: int,
app_or_unit: str,
key_values: typing.Mapping,
) -> None:
"""Update the relation data for a given unit or application in a given relation.
This also triggers the `relation_changed` event for this relation_id.
Args:
relation_id: The integer relation_id representing this relation.
app_or_unit: The unit or application name that is being updated.
This can be the local or remote application.
key_values: Each key/value will be updated in the relation data.
"""
relation_name = self._backend._relation_names[relation_id]
relation = self._model.get_relation(relation_name, relation_id)
if '/' in app_or_unit:
entity = self._model.get_unit(app_or_unit)
else:
entity = self._model.get_app(app_or_unit)
rel_data = relation.data.get(entity, None)
if rel_data is not None:
# rel_data may have cached now-stale data, so _invalidate() it.
# Note, this won't cause the data to be loaded if it wasn't already.
rel_data._invalidate()
new_values = self._backend._relation_data[relation_id][app_or_unit].copy()
values_have_changed = False
for k, v in key_values.items():
if v == '':
if new_values.pop(k, None) != v:
values_have_changed = True
else:
if k not in new_values or new_values[k] != v:
new_values[k] = v
values_have_changed = True
# Update the stored relation data in any case, so that a test holding a reference
# to the updated values is not invalidated by the assignment being skipped.
self._backend._relation_data[relation_id][app_or_unit] = new_values
if not values_have_changed:
# Do not issue a relation changed event if the data bags have not changed
return
if app_or_unit == self._model.unit.name:
# No events for our own unit
return
if app_or_unit == self._model.app.name:
# updating our own app only generates an event if it is a peer relation and we
# aren't the leader
is_peer = self._meta.relations[relation_name].role.is_peer()
if not is_peer:
return
if self._model.unit.is_leader():
return
self._emit_relation_changed(relation_id, app_or_unit)
def _emit_relation_changed(self, relation_id, app_or_unit):
if self._charm is None or not self._hooks_enabled:
return
rel_name = self._backend._relation_names[relation_id]
relation = self.model.get_relation(rel_name, relation_id)
if '/' in app_or_unit:
app_name = app_or_unit.split('/')[0]
unit_name = app_or_unit
app = self.model.get_app(app_name)
unit = self.model.get_unit(unit_name)
args = (relation, app, unit)
else:
app_name = app_or_unit
app = self.model.get_app(app_name)
args = (relation, app)
self._charm.on[rel_name].relation_changed.emit(*args)
def _update_config(
self,
key_values: typing.Mapping[str, str] = None,
unset: typing.Iterable[str] = (),
) -> None:
"""Update the config as seen by the charm.
This will *not* trigger a `config_changed` event, and is intended for internal use.
Note that the `key_values` mapping will only add or update configuration items.
To remove existing ones, see the `unset` parameter.
Args:
key_values: A Mapping of key:value pairs to update in config.
unset: An iterable of keys to remove from config.
"""
# NOTE: jam 2020-03-01 Note that this sort of works "by accident". Config
# is a LazyMapping, but its _load returns a dict and this method mutates
# the dict that Config is caching. Arguably we should be doing some sort
# of charm.framework.model.config._invalidate()
config = self._backend._config
if key_values is not None:
for key, value in key_values.items():
if key in self._defaults:
if value is not None:
config[key] = value
else:
raise ValueError("unknown config option: '{}'".format(key))
for key in unset:
# When the key is unset, revert to the default if one exists
default = self._defaults.get(key, None)
if default is not None:
config[key] = default
else:
config.pop(key, None)
def update_config(
self,
key_values: typing.Mapping[str, str] = None,
unset: typing.Iterable[str] = (),
) -> None:
"""Update the config as seen by the charm.
This will trigger a `config_changed` event.
Note that the `key_values` mapping will only add or update configuration items.
To remove existing ones, see the `unset` parameter.
Args:
key_values: A Mapping of key:value pairs to update in config.
unset: An iterable of keys to remove from config. (Note that each unset key
reverts to its default value from config.yaml, if one is defined.)
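Example (a sketch; the option names are illustrative and must be declared in
config.yaml)::
harness = Harness(MyCharm)
harness.begin()
harness.update_config(key_values={'log-level': 'debug'}, unset=['port'])
# config-changed has been emitted and the charm now sees the new values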
"""
self._update_config(key_values, unset)
if self._charm is None or not self._hooks_enabled:
return
self._charm.on.config_changed.emit()
def set_leader(self, is_leader: bool = True) -> None:
"""Set whether this unit is the leader or not.
If this charm becomes a leader then `leader_elected` will be triggered. If Harness.begin()
has already been called, then the charm's peer relation should usually be added *prior* to
calling this method (i.e. with Harness.add_relation) to properly initialize and make
available relation data that leader elected hooks may want to access.
Args:
is_leader: True/False as to whether this unit is the leader.
"""
self._backend._is_leader = is_leader
# Note: jam 2020-03-01 currently is_leader is cached at the ModelBackend level, not in
# the Model objects, so this automatically gets noticed.
if is_leader and self._charm is not None and self._hooks_enabled:
self._charm.on.leader_elected.emit()
def set_planned_units(self, num_units: int) -> None:
"""Set the number of "planned" units that "Application.planned_units" should return.
In real world circumstances, this number will be the number of units in the
application. E.g., this number will be the number of peers this unit has, plus one, as we
count our own unit in the total.
A change to the return from planned_units will not generate an event. Typically, a charm
author would check planned units during a config or install hook, or after receiving a peer
relation joined event.
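Example (a sketch; assumes begin() has already been called)::
harness.set_planned_units(3)
self.assertEqual(harness.charm.app.planned_units(), 3)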
"""
if num_units < 0:
raise TypeError("num_units must be 0 or a positive integer.")
self._backend._planned_units = num_units
def reset_planned_units(self):
"""Reset the planned units override.
This allows the harness to fall through to the built in methods that will try to
guess at a value for planned units, based on the number of peer relations that
have been setup in the testing harness.
"""
self._backend._planned_units = None
def _get_backend_calls(self, reset: bool = True) -> list:
"""Return the calls that we have made to the TestingModelBackend.
This is useful mostly for testing the framework itself, so that we can assert that we
do/don't trigger extra calls.
Args:
reset: If True, reset the calls list back to empty; if False, the call list is
preserved.
Return:
``[(call1, args...), (call2, args...)]``
"""
calls = self._backend._calls.copy()
if reset:
self._backend._calls.clear()
return calls
def _record_calls(cls):
"""Replace methods on cls with methods that record that they have been called.
Iterate all attributes of cls, and for public methods, replace them with a wrapped method
that records the method called along with the arguments and keyword arguments.
"""
for meth_name, orig_method in cls.__dict__.items():
if meth_name.startswith('_'):
continue
def decorator(orig_method):
def wrapped(self, *args, **kwargs):
full_args = (orig_method.__name__,) + args
if kwargs:
full_args = full_args + (kwargs,)
self._calls.append(full_args)
return orig_method(self, *args, **kwargs)
return wrapped
setattr(cls, meth_name, decorator(orig_method))
return cls
def _copy_docstrings(source_cls):
"""Copy the docstrings from source_cls to target_cls.
Use this as:
@_copy_docstrings(source_class)
class TargetClass:
And for any public method that exists on both classes, it will copy the
__doc__ for that method.
"""
def decorator(target_cls):
for meth_name, orig_method in target_cls.__dict__.items():
if meth_name.startswith('_'):
continue
source_method = source_cls.__dict__.get(meth_name)
if source_method is not None and source_method.__doc__:
target_cls.__dict__[meth_name].__doc__ = source_method.__doc__
return target_cls
return decorator
class _ResourceEntry:
"""Tracks the contents of a Resource."""
def __init__(self, resource_name):
self.name = resource_name
@_copy_docstrings(model._ModelBackend)
@_record_calls
class _TestingModelBackend:
"""This conforms to the interface for ModelBackend but provides canned data.
DO NOT use this class directly; it is used by `Harness`_ to drive the model.
`Harness`_ is responsible for maintaining the internal consistency of the values here,
as the only public methods of this type are for implementing ModelBackend.
"""
def __init__(self, unit_name, meta):
self.unit_name = unit_name
self.app_name = self.unit_name.split('/')[0]
self.model_name = None
self.model_uuid = 'f2c1b2a6-e006-11eb-ba80-0242ac130004'
self._harness_tmp_dir = tempfile.TemporaryDirectory(prefix='ops-harness-')
self._calls = []
self._meta = meta
self._relation_ids_map = {} # relation name to [relation_ids,...]
self._relation_names = {} # reverse map from relation_id to relation_name
self._relation_list_map = {} # relation_id: [unit_name,...]
self._relation_data = {} # {relation_id: {name: data}}
# {relation_id: {"app": app_name, "units": ["app/0",...]}
self._relation_app_and_units = {}
self._config = {}
self._is_leader = False
self._resources_map = {} # {resource_name: resource_content}
self._pod_spec = None
self._app_status = {'status': 'unknown', 'message': ''}
self._unit_status = {'status': 'maintenance', 'message': ''}
self._workload_version = None
self._resource_dir = None
# Format:
# { "storage_name": {"<ID1>": { <other-properties> }, ... }
# <ID1>: device id that is key for given storage_name
# Initialize the _storage_list with values present on metadata.yaml
self._storage_list = {k: {} for k in self._meta.storages}
self._storage_detached = {k: set() for k in self._meta.storages}
self._storage_index_counter = 0
# {container_name : _TestingPebbleClient}
self._pebble_clients = {} # type: {str: _TestingPebbleClient}
self._pebble_clients_can_connect = {} # type: {_TestingPebbleClient: bool}
self._planned_units = None
self._hook_is_running = ''
def _validate_relation_access(self, relation_name, relations):
"""Ensures that the named relation exists/has been added.
This is called whenever relation data is accessed via model.get_relation(...).
"""
if len(relations) > 0:
return
relations = list(self._meta.peers.keys())
relations.extend(self._meta.requires.keys())
relations.extend(self._meta.provides.keys())
if self._hook_is_running == 'leader_elected' and relation_name in relations:
raise RuntimeError(
'cannot access relation data without first adding the relation: '
'use Harness.add_relation({!r}, <app>) before calling set_leader'
.format(relation_name))
def _can_connect(self, pebble_client) -> bool:
"""Returns whether the mock client is active and can support API calls with no errors."""
return self._pebble_clients_can_connect[pebble_client]
def _set_can_connect(self, pebble_client, val):
"""Manually sets the can_connect state for the given mock client."""
if not SIMULATE_CAN_CONNECT:
raise RuntimeError('must set SIMULATE_CAN_CONNECT=True before using set_can_connect')
if pebble_client not in self._pebble_clients_can_connect:
msg = 'cannot set can_connect for the client - are you running a "real" pebble test?'
raise RuntimeError(msg)
self._pebble_clients_can_connect[pebble_client] = val
def _cleanup(self):
if self._resource_dir is not None:
self._resource_dir.cleanup()
self._resource_dir = None
def _get_resource_dir(self) -> pathlib.Path:
if self._resource_dir is None:
# In actual Juju, the resource path for a charm's resource is
# $AGENT_DIR/resources/$RESOURCE_NAME/$RESOURCE_FILENAME
# However, charms shouldn't depend on this.
self._resource_dir = tempfile.TemporaryDirectory(prefix='tmp-ops-test-resource-')
return pathlib.Path(self._resource_dir.name)
def relation_ids(self, relation_name):
try:
return self._relation_ids_map[relation_name]
except KeyError as e:
if relation_name not in self._meta.relations:
raise model.ModelError('{} is not a known relation'.format(relation_name)) from e
return []
def relation_list(self, relation_id):
try:
return self._relation_list_map[relation_id]
except KeyError as e:
raise model.RelationNotFoundError from e
def relation_remote_app_name(self, relation_id: int) -> typing.Optional[str]:
if relation_id not in self._relation_app_and_units:
# Non-existent or dead relation
return None
return self._relation_app_and_units[relation_id]['app']
def relation_get(self, relation_id, member_name, is_app):
if is_app and '/' in member_name:
member_name = member_name.split('/')[0]
if relation_id not in self._relation_data:
raise model.RelationNotFoundError()
return self._relation_data[relation_id][member_name].copy()
def relation_set(self, relation_id, key, value, is_app):
relation = self._relation_data[relation_id]
if is_app:
bucket_key = self.app_name
else:
bucket_key = self.unit_name
if bucket_key not in relation:
relation[bucket_key] = {}
bucket = relation[bucket_key]
if value == '':
bucket.pop(key, None)
else:
bucket[key] = value
def config_get(self):
return self._config
def is_leader(self):
return self._is_leader
def application_version_set(self, version):
self._workload_version = version
def resource_get(self, resource_name):
if resource_name not in self._resources_map:
raise model.ModelError(
"ERROR could not download resource: HTTP request failed: "
"Get https://.../units/unit-{}/resources/{}: resource#{}/{} not found".format(
self.unit_name.replace('/', '-'), resource_name, self.app_name, resource_name
))
filename, contents = self._resources_map[resource_name]
resource_dir = self._get_resource_dir()
resource_filename = resource_dir / resource_name / filename
if not resource_filename.exists():
if isinstance(contents, bytes):
mode = 'wb'
else:
mode = 'wt'
resource_filename.parent.mkdir(exist_ok=True)
with resource_filename.open(mode=mode) as resource_file:
resource_file.write(contents)
return resource_filename
def pod_spec_set(self, spec, k8s_resources):
self._pod_spec = (spec, k8s_resources)
def status_get(self, *, is_app=False):
if is_app:
return self._app_status
else:
return self._unit_status
def status_set(self, status, message='', *, is_app=False):
if is_app:
self._app_status = {'status': status, 'message': message}
else:
self._unit_status = {'status': status, 'message': message}
def storage_list(self, name):
return list(index for index in self._storage_list[name]
if self._storage_is_attached(name, index))
def storage_get(self, storage_name_id, attribute):
name, index = storage_name_id.split("/", 1)
index = int(index)
try:
if index in self._storage_detached[name]:
raise KeyError() # Pretend the key isn't there
else:
return self._storage_list[name][index][attribute]
except KeyError:
raise model.ModelError(
'ERROR invalid value "{}/{}" for option -s: storage not found'.format(name, index))
def storage_add(self, name: str, count: int = 1) -> typing.List[int]:
if '/' in name:
raise model.ModelError('storage name cannot contain "/"')
if name not in self._storage_list:
self._storage_list[name] = {}
result = []
for i in range(count):
index = self._storage_index_counter
self._storage_index_counter += 1
self._storage_list[name][index] = {
'location': os.path.join(self._harness_tmp_dir.name, name, str(index)),
}
result.append(index)
return result
def _storage_detach(self, storage_id: str):
# NOTE: This is an extra function for _TestingModelBackend to simulate
# detachment of a storage unit. This is not present in ops.model._ModelBackend.
name, index = storage_id.split('/', 1)
index = int(index)
for client in self._pebble_clients.values():
client._fs.remove_mount(name)
if self._storage_is_attached(name, index):
self._storage_detached[name].add(index)
def _storage_attach(self, storage_id: str):
# NOTE: This is an extra function for _TestingModelBackend to simulate
# re-attachment of a storage unit. This is not present in
# ops.model._ModelBackend.
name, index = storage_id.split('/', 1)
for container, client in self._pebble_clients.items():
for _, mount in self._meta.containers[container].mounts.items():
if mount.storage != name:
continue
for index, store in self._storage_list[mount.storage].items():
client._fs.add_mount(mount.storage, mount.location, store['location'])
index = int(index)
if not self._storage_is_attached(name, index):
self._storage_detached[name].remove(index)
return True
return False
def _storage_is_attached(self, storage_name, storage_index):
return storage_index not in self._storage_detached[storage_name]
def _storage_remove(self, storage_id: str):
# NOTE: This is an extra function for _TestingModelBackend to simulate
# full removal of a storage unit. This is not present in
# ops.model._ModelBackend.
self._storage_detach(storage_id)
name, index = storage_id.split('/', 1)
index = int(index)
self._storage_list[name].pop(index, None)
def action_get(self):
raise NotImplementedError(self.action_get)
def action_set(self, results):
raise NotImplementedError(self.action_set)
def action_log(self, message):
raise NotImplementedError(self.action_log)
def action_fail(self, message=''):
raise NotImplementedError(self.action_fail)
def network_get(self, endpoint_name, relation_id=None):
raise NotImplementedError(self.network_get)
def add_metrics(self, metrics, labels=None):
raise NotImplementedError(self.add_metrics)
@classmethod
def log_split(cls, message, max_len):
raise NotImplementedError(cls.log_split)
def juju_log(self, level, msg):
raise NotImplementedError(self.juju_log)
def get_pebble(self, socket_path: str) -> 'pebble.Client':
container = socket_path.split('/')[3] # /charm/containers/<container_name>/pebble.socket
client = self._pebble_clients.get(container, None)
if client is None:
client = _TestingPebbleClient(self)
self._pebble_clients[container] = client
# we need to know which container a new pebble client belongs to
# so we can figure out which storage mounts must be simulated on
# this pebble client's mock file systems when storage is
# attached/detached later.
self._pebble_clients[container] = client
self._pebble_clients_can_connect[client] = not SIMULATE_CAN_CONNECT
return client
def planned_units(self):
"""Simulate fetching the number of planned application units from the model.
If self._planned_units is None, then we simulate what the Juju controller will do, which is
to report the number of peers, plus one (we include this unit in the count). This can be
overridden for testing purposes: a charm author can set the number of planned units
explicitly by calling `Harness.set_planned_units`
"""
if self._planned_units is not None:
return self._planned_units
units = []
peer_names = set(self._meta.peers.keys())
for peer_id, peer_name in self._relation_names.items():
if peer_name not in peer_names:
continue
peer_units = self._relation_list_map[peer_id]
units += peer_units
count = len(set(units)) # de-dupe and get length.
return count + 1 # Account for this unit.
@_copy_docstrings(pebble.Client)
class _TestingPebbleClient:
"""This conforms to the interface for pebble.Client but provides canned data.
DO NOT use this class directly; it is used by `Harness`_ to run interactions with Pebble.
`Harness`_ is responsible for maintaining the internal consistency of the values here,
as the only public methods of this type are for implementing Client.
"""
def __init__(self, backend: _TestingModelBackend):
self._backend = backend
self._layers = {}
# Has a service been started/stopped?
self._service_status = {}
self._fs = _TestingFilesystem()
def _check_connection(self):
if not self._backend._can_connect(self):
raise pebble.ConnectionError('cannot connect to pebble')
def get_system_info(self) -> pebble.SystemInfo:
self._check_connection()
return pebble.SystemInfo(version='1.0.0')
def get_warnings(
self, select: pebble.WarningState = pebble.WarningState.PENDING,
) -> typing.List['pebble.Warning']:
raise NotImplementedError(self.get_warnings)
def ack_warnings(self, timestamp: datetime.datetime) -> int:
raise NotImplementedError(self.ack_warnings)
def get_changes(
self, select: pebble.ChangeState = pebble.ChangeState.IN_PROGRESS, service: str = None,
) -> typing.List[pebble.Change]:
raise NotImplementedError(self.get_changes)
def get_change(self, change_id: pebble.ChangeID) -> pebble.Change:
raise NotImplementedError(self.get_change)
def abort_change(self, change_id: pebble.ChangeID) -> pebble.Change:
raise NotImplementedError(self.abort_change)
def autostart_services(self, timeout: float = 30.0, delay: float = 0.1) -> pebble.ChangeID:
self._check_connection()
for name, service in self._render_services().items():
# TODO: jam 2021-04-20 This feels awkward that Service.startup might be a string or
# might be an enum. Probably should make Service.startup a property rather than an
# attribute.
if service.startup == '':
startup = pebble.ServiceStartup.DISABLED
else:
startup = pebble.ServiceStartup(service.startup)
if startup == pebble.ServiceStartup.ENABLED:
self._service_status[name] = pebble.ServiceStatus.ACTIVE
def replan_services(self, timeout: float = 30.0, delay: float = 0.1) -> pebble.ChangeID:
return self.autostart_services(timeout, delay)
def start_services(
self, services: typing.List[str], timeout: float = 30.0, delay: float = 0.1,
) -> pebble.ChangeID:
# A common mistake is to pass just the name of a service, rather than a list of services,
# so trap that so it is caught quickly.
if isinstance(services, str):
raise TypeError('start_services should take a list of names, not just "{}"'.format(
services))
self._check_connection()
# Note: jam 2021-04-20 We don't implement ChangeID, but the default caller of this is
# Container.start() which currently ignores the return value
known_services = self._render_services()
# Names appear to be validated before any are activated, so do two passes
for name in services:
if name not in known_services:
# TODO: jam 2021-04-20 This needs a better error type
raise RuntimeError('400 Bad Request: service "{}" does not exist'.format(name))
current = self._service_status.get(name, pebble.ServiceStatus.INACTIVE)
if current == pebble.ServiceStatus.ACTIVE:
# TODO: jam 2021-04-20 I believe pebble actually validates all the service names
# can be started before starting any, and gives a list of things that couldn't
# be done, but this is good enough for now
raise pebble.ChangeError('''\
cannot perform the following tasks:
- Start service "{}" (service "{}" was previously started)
'''.format(name, name), change=1234) # the change id is not respected
for name in services:
# If you try to start a service which is started, you get a ChangeError:
# $ PYTHONPATH=. python3 ./test/pebble_cli.py start serv
# ChangeError: cannot perform the following tasks:
# - Start service "serv" (service "serv" was previously started)
self._service_status[name] = pebble.ServiceStatus.ACTIVE
def stop_services(
self, services: typing.List[str], timeout: float = 30.0, delay: float = 0.1,
) -> pebble.ChangeID:
# handle a common mistake of passing just a name rather than a list of names
if isinstance(services, str):
raise TypeError('stop_services should take a list of names, not just "{}"'.format(
services))
self._check_connection()
# TODO: handle invalid names
# Note: jam 2021-04-20 We don't implement ChangeID, but the default caller of this is
# Container.stop() which currently ignores the return value
known_services = self._render_services()
for name in services:
if name not in known_services:
# TODO: jam 2021-04-20 This needs a better error type
# 400 Bad Request: service "bal" does not exist
raise RuntimeError('400 Bad Request: service "{}" does not exist'.format(name))
current = self._service_status.get(name, pebble.ServiceStatus.INACTIVE)
if current != pebble.ServiceStatus.ACTIVE:
# TODO: jam 2021-04-20 I believe pebble actually validates all the service names
# can be started before starting any, and gives a list of things that couldn't
# be done, but this is good enough for now
raise pebble.ChangeError('''\
ChangeError: cannot perform the following tasks:
- Stop service "{}" (service "{}" is not active)
'''.format(name, name), change=1234) # the change id is not respected
for name in services:
self._service_status[name] = pebble.ServiceStatus.INACTIVE
def restart_services(
self, services: typing.List[str], timeout: float = 30.0, delay: float = 0.1,
) -> pebble.ChangeID:
# handle a common mistake of passing just a name rather than a list of names
if isinstance(services, str):
raise TypeError('restart_services should take a list of names, not just "{}"'.format(
services))
self._check_connection()
# TODO: handle invalid names
# Note: jam 2021-04-20 We don't implement ChangeID, but the default caller of this is
# Container.restart() which currently ignores the return value
known_services = self._render_services()
for name in services:
if name not in known_services:
# TODO: jam 2021-04-20 This needs a better error type
# 400 Bad Request: service "bal" does not exist
raise RuntimeError('400 Bad Request: service "{}" does not exist'.format(name))
for name in services:
self._service_status[name] = pebble.ServiceStatus.ACTIVE
def wait_change(
self, change_id: pebble.ChangeID, timeout: float = 30.0, delay: float = 0.1,
) -> pebble.Change:
raise NotImplementedError(self.wait_change)
def add_layer(
self, label: str, layer: typing.Union[str, dict, pebble.Layer], *,
combine: bool = False):
# I wish we could combine some of this helpful object corralling with the actual backend,
# rather than having to re-implement it. Maybe we could subclass
if not isinstance(label, str):
raise TypeError('label must be a str, not {}'.format(type(label).__name__))
if isinstance(layer, (str, dict)):
layer_obj = pebble.Layer(layer)
elif isinstance(layer, pebble.Layer):
layer_obj = layer
else:
raise TypeError('layer must be str, dict, or pebble.Layer, not {}'.format(
type(layer).__name__))
self._check_connection()
if label in self._layers:
# TODO: jam 2021-04-19 These should not be RuntimeErrors but should be proper error
# types. https://github.com/canonical/operator/issues/514
if not combine:
raise RuntimeError('400 Bad Request: layer "{}" already exists'.format(label))
layer = self._layers[label]
for name, service in layer_obj.services.items():
# 'override' is actually single quoted in the real error, but
# it shouldn't be, hopefully that gets cleaned up.
if not service.override:
raise RuntimeError('500 Internal Server Error: layer "{}" must define'
'"override" for service "{}"'.format(label, name))
if service.override not in ('merge', 'replace'):
raise RuntimeError('500 Internal Server Error: layer "{}" has invalid '
'"override" value on service "{}"'.format(label, name))
elif service.override == 'replace':
layer.services[name] = service
elif service.override == 'merge':
if combine and name in layer.services:
s = layer.services[name]
s._merge(service)
else:
layer.services[name] = service
else:
self._layers[label] = layer_obj
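# Illustrative sketch of layer combination (labels, service names and commands are
# examples only):
#   client.add_layer('base', {'services': {'srv': {'override': 'replace',
#                                                  'command': 'run-v1'}}})
#   client.add_layer('base', {...})   # RuntimeError: layer "base" already exists
#   client.add_layer('base', {'services': {'srv': {'override': 'merge',
#                                                  'command': 'run-v2'}}}, combine=True)
#   # With combine=True and override 'merge', the new fields are merged into the existing
#   # "srv" definition; override 'replace' would swap out the whole definition instead.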
def _render_services(self) -> typing.Mapping[str, pebble.Service]:
services = {}
for key in sorted(self._layers.keys()):
layer = self._layers[key]
for name, service in layer.services.items():
# TODO: (jam) 2021-04-07 have a way to merge existing services
services[name] = service
return services
def get_plan(self) -> pebble.Plan:
self._check_connection()
plan = pebble.Plan('{}')
services = self._render_services()
if not services:
return plan
for name in sorted(services.keys()):
plan.services[name] = services[name]
return plan
def get_services(self, names: typing.List[str] = None) -> typing.List[pebble.ServiceInfo]:
if isinstance(names, str):
raise TypeError('get_services should take a list of names, not just "{}"'.format(
names))
self._check_connection()
services = self._render_services()
infos = []
if names is None:
names = sorted(services.keys())
for name in sorted(names):
try:
service = services[name]
except KeyError:
# in pebble, it just returns "nothing matched" if there are 0 matches,
# but it ignores services it doesn't recognize
continue
status = self._service_status.get(name, pebble.ServiceStatus.INACTIVE)
if service.startup == '':
startup = pebble.ServiceStartup.DISABLED
else:
startup = pebble.ServiceStartup(service.startup)
info = pebble.ServiceInfo(name,
startup=startup,
current=pebble.ServiceStatus(status))
infos.append(info)
return infos
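# Illustrative sketch (assumes a layer defining "srv" was added and the service was
# started; names are examples only):
#   client.get_services()            # a ServiceInfo for every known service, sorted by name
#   client.get_services(['srv'])     # only "srv"; names that don't exist are silently skipped
#   client.get_services(['srv'])[0].current   # pebble.ServiceStatus.ACTIVE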
def pull(self, path: str, *, encoding: str = 'utf-8') -> typing.Union[typing.BinaryIO,
typing.TextIO]:
self._check_connection()
return self._fs.open(path, encoding=encoding)
def push(
self, path: str, source: typing.Union[bytes, str, typing.BinaryIO, typing.TextIO], *,
encoding: str = 'utf-8', make_dirs: bool = False, permissions: int = None,
user_id: int = None, user: str = None, group_id: int = None, group: str = None):
self._check_connection()
if permissions is not None and not (0 <= permissions <= 0o777):
raise pebble.PathError(
'generic-file-error',
'permissions not within 0o000 to 0o777: {:#o}'.format(permissions))
try:
self._fs.create_file(
path, source, encoding=encoding, make_dirs=make_dirs, permissions=permissions,
user_id=user_id, user=user, group_id=group_id, group=group)
except FileNotFoundError as e:
raise pebble.PathError(
'not-found', 'parent directory not found: {}'.format(e.args[0]))
except NonAbsolutePathError as e:
raise pebble.PathError(
'generic-file-error',
'paths must be absolute, got {!r}'.format(e.args[0])
)
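# Illustrative sketch of a push/pull round trip against the simulated filesystem
# (paths and contents are examples only):
#   client.push('/etc/app/config.yaml', 'key: val', make_dirs=True)
#   client.pull('/etc/app/config.yaml').read()              # -> 'key: val'
#   client.push('/etc/app/blob', b'x', permissions=0o1777)  # PathError: permissions not
#                                                           # within 0o000 to 0o777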
def list_files(self, path: str, *, pattern: str = None,
itself: bool = False) -> typing.List[pebble.FileInfo]:
self._check_connection()
try:
files = [self._fs.get_path(path)]
except FileNotFoundError:
# conform with the real pebble api
raise pebble.APIError(
body={}, code=404, status='Not Found',
message="stat {}: no such file or directory".format(path))
if not itself:
try:
files = self._fs.list_dir(path)
except NotADirectoryError:
pass
if pattern is not None:
files = [file for file in files if fnmatch.fnmatch(file.name, pattern)]
type_mappings = {
_File: pebble.FileType.FILE,
_Directory: pebble.FileType.DIRECTORY,
}
return [
pebble.FileInfo(
path=str(file.path),
name=file.name,
type=type_mappings.get(type(file)),
size=file.size if isinstance(file, _File) else None,
permissions=file.kwargs.get('permissions'),
last_modified=file.last_modified,
user_id=file.kwargs.get('user_id'),
user=file.kwargs.get('user'),
group_id=file.kwargs.get('group_id'),
group=file.kwargs.get('group'),
)
for file in files
]
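# Illustrative sketch (paths and patterns are examples only):
#   client.list_files('/etc/app')                     # a FileInfo for each entry in the directory
#   client.list_files('/etc/app', pattern='*.yaml')   # fnmatch-filtered by entry name
#   client.list_files('/etc/app', itself=True)        # a single FileInfo for the path itself
#   client.list_files('/missing')                      # APIError 404: stat /missing: no such ...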
def make_dir(
self, path: str, *, make_parents: bool = False, permissions: int = None,
user_id: int = None, user: str = None, group_id: int = None, group: str = None):
self._check_connection()
if permissions is not None and not (0 <= permissions <= 0o777):
raise pebble.PathError(
'generic-file-error',
'permissions not within 0o000 to 0o777: {:#o}'.format(permissions))
try:
self._fs.create_dir(
path, make_parents=make_parents, permissions=permissions,
user_id=user_id, user=user, group_id=group_id, group=group)
except FileNotFoundError as e:
# Parent directory doesn't exist and make_parents is False
raise pebble.PathError(
'not-found', 'parent directory not found: {}'.format(e.args[0]))
except NotADirectoryError as e:
# Attempted to create a subdirectory of a file
raise pebble.PathError('generic-file-error', 'not a directory: {}'.format(e.args[0]))
except NonAbsolutePathError as e:
raise pebble.PathError(
'generic-file-error',
'paths must be absolute, got {!r}'.format(e.args[0])
)
def remove_path(self, path: str, *, recursive: bool = False):
self._check_connection()
try:
file_or_dir = self._fs.get_path(path)
except FileNotFoundError:
if recursive:
# Pebble doesn't give a not-found error when recursive is specified
return
raise pebble.PathError(
'not-found', 'remove {}: no such file or directory'.format(path))
if isinstance(file_or_dir, _Directory) and len(file_or_dir) > 0 and not recursive:
raise pebble.PathError(
'generic-file-error', 'cannot remove non-empty directory without recursive=True')
self._fs.delete_path(path)
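# Illustrative sketch (paths are examples only):
#   client.make_dir('/data/app', make_parents=True)
#   client.remove_path('/data/app')              # fine: the directory is empty
#   client.remove_path('/data', recursive=True)  # non-empty directories need recursive=True;
#                                                # a missing path is ignored when recursive=True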
def exec(self, command, **kwargs):
raise NotImplementedError(self.exec)
def send_signal(self, sig: typing.Union[int, str], *service_names: str):
if not service_names:
raise TypeError('send_signal expected at least 1 service name, got 0')
self._check_connection()
# Convert signal to str
if isinstance(sig, int):
sig = signal.Signals(sig).name
# pebble first validates the service name, and then the signal name
plan = self.get_plan()
for service in service_names:
if service not in plan.services or not self.get_services([service])[0].is_running():
# conform with the real pebble api
message = 'cannot send signal to "{}": service is not running'.format(service)
body = {'type': 'error', 'status-code': 500, 'status': 'Internal Server Error',
'result': {'message': message}}
raise pebble.APIError(
body=body, code=500, status='Internal Server Error', message=message
)
# Check if signal name is valid
try:
signal.Signals[sig]
except KeyError:
# conform with the real pebble api
message = 'cannot send signal to "{}": invalid signal name "{}"'.format(
service_names[0],
sig)
body = {'type': 'error', 'status-code': 500, 'status': 'Internal Server Error',
'result': {'message': message}}
raise pebble.APIError(
body=body,
code=500,
status='Internal Server Error',
message=message)
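# Illustrative sketch (assumes a running service named "srv"; names are examples only):
#   client.send_signal('SIGHUP', 'srv')    # OK: service is running and the signal name is valid
#   client.send_signal(1, 'srv')           # ints are converted to names via signal.Signals
#   client.send_signal('SIGHUP', 'other')  # APIError 500: service is not running
#   client.send_signal('SIGFAKE', 'srv')   # APIError 500: invalid signal name "SIGFAKE"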
def get_checks(self, level=None, names=None):
raise NotImplementedError(self.get_checks)
[docs]class NonAbsolutePathError(Exception):
"""Error raised by _TestingFilesystem.
This error is raised when an absolute path is required but a relative path was
encountered instead.
"""
class _TestingStorageMount:
"""Simulates a filesystem backend for storage mounts."""
def __init__(self, location: pathlib.PurePosixPath, src: pathlib.Path):
"""Creates a new simulated storage mount.
Args:
location: The path within the simulated filesystem at which this storage will be mounted.
src: The temporary on-disk location where the simulated storage will live.
"""
self._src = src
self._location = location
src.mkdir(exist_ok=True, parents=True)
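# Illustrative sketch (mount location and backing path are examples only):
#   mount = _TestingStorageMount(pathlib.PurePosixPath('/var/data'),
#                                pathlib.Path('/tmp/backing'))
#   mount.contains('/var/data/file.txt')   # True
#   mount.contains('/etc/passwd')          # False
#   mount.create_file(pathlib.PurePosixPath('/var/data/file.txt'), b'hello')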
def contains(self, path: typing.Union[str, pathlib.PurePosixPath]) -> bool:
"""Returns true whether path resides within this simulated storage mount's location."""
try:
pathlib.PurePosixPath(path).relative_to(self._location)
return True
except Exception:
return False
def check_contains(self, path: typing.Union[str,
pathlib.PurePosixPath]) -> pathlib.PurePosixPath:
"""Raises if path does not reside within this simulated storage mount's location."""
if not self.contains(path):
msg = 'the provided path "{!s}" does not reside within the mount location "{!s}"' \
.format(path, self._location)
raise RuntimeError(msg)
return pathlib.PurePosixPath(path)
def _srcpath(self, path: pathlib.PurePosixPath) -> pathlib.Path:
"""Returns the disk-backed path where the simulated path will actually be stored."""
suffix = path.relative_to(self._location)
return self._src / suffix
def create_dir(
self,
path: pathlib.PurePosixPath,
make_parents: bool = False,
**kwargs) -> '_Directory':
if not pathlib.PurePosixPath(path).is_absolute():
raise NonAbsolutePathError(str(path))
path = self.check_contains(path)
srcpath = self._srcpath(path)
if srcpath.exists() and srcpath.is_dir() and make_parents:
return _Directory(path, **kwargs) # nothing to do
if srcpath.exists():
raise FileExistsError(str(path))
dirname = srcpath.parent
if not dirname.exists():
if not make_parents:
raise FileNotFoundError(str(path.parent))
dirname.mkdir(parents=True, exist_ok=True)
srcpath.mkdir(exist_ok=True)
return _Directory(path, **kwargs)
def create_file(
self,
path: pathlib.PurePosixPath,
data: typing.Union[bytes, str, typing.BinaryIO, typing.TextIO],
encoding: str = 'utf-8',
make_dirs: bool = False,
**kwargs
) -> '_File':
path = self.check_contains(path)
srcpath = self._srcpath(path)
dirname = srcpath.parent
if not dirname.exists():
if not make_dirs:
raise FileNotFoundError(str(path.parent))
dirname.mkdir(parents=True, exist_ok=True)
if isinstance(data, str):
data = data.encode(encoding=encoding)
elif not isinstance(data, bytes):
# file-like object: read() may return str for text-mode streams, so encode if needed
data = data.read()
if isinstance(data, str):
data = data.encode(encoding=encoding)
with srcpath.open('wb') as f:
f.write(data)
return _File(path, data, encoding=encoding, **kwargs)
def list_dir(self, path: pathlib.PurePosixPath) -> typing.List['_File']:
path = self.check_contains(path)
srcpath = self._srcpath(path)
results = []
if not srcpath.exists():
raise FileNotFoundError(str(path))
if not srcpath.is_dir():
raise NotADirectoryError(str(path))
for fpath in srcpath.iterdir():
mountpath = path / fpath.name
if fpath.is_dir():
results.append(_Directory(mountpath))
elif fpath.is_file():
with fpath.open('rb') as f:
results.append(_File(mountpath, f.read()))
else:
raise RuntimeError('unsupported file type at path {}'.format(fpath))
return results
def open(
self,
path: typing.Union[str, pathlib.PurePosixPath],
encoding: typing.Optional[str] = 'utf-8',
) -> typing.Union[typing.BinaryIO, typing.TextIO]:
path = self.check_contains(path)
file = self.get_path(path)
if isinstance(file, _Directory):
raise IsADirectoryError(str(file.path))
return file.open(encoding=encoding)
def get_path(self, path: typing.Union[str, pathlib.PurePosixPath]
) -> typing.Union['_Directory', '_File']:
path = self.check_contains(path)
srcpath = self._srcpath(path)
if srcpath.is_dir():
return _Directory(path)
if not srcpath.exists():
raise FileNotFoundError(str(path))
with srcpath.open('rb') as f:
return _File(path, f.read())
def delete_path(self, path: typing.Union[str, pathlib.PurePosixPath]) -> None:
path = self.check_contains(path)
srcpath = self._srcpath(path)
if srcpath.exists():
srcpath.unlink()
else:
raise FileNotFoundError(str(path))
class _TestingFilesystem:
r"""An in-memory mock of a pebble-controlled container's filesystem.
For now, the filesystem is assumed to be a POSIX-style filesystem; Windows-style directories
(e.g. \, \foo\bar, C:\foo\bar) are not supported.
"""
def __init__(self):
self.root = _Directory(pathlib.PurePosixPath('/'))
self._mounts = {}
def add_mount(self, name, mount_path, backing_src_path):
self._mounts[name] = _TestingStorageMount(
pathlib.PurePosixPath(mount_path), pathlib.Path(backing_src_path))
def remove_mount(self, name):
if name in self._mounts:
del self._mounts[name]
def create_dir(self, path: str, make_parents: bool = False, **kwargs) -> '_Directory':
if not path.startswith('/'):
raise NonAbsolutePathError(path)
for mount in self._mounts.values():
if mount.contains(path):
return mount.create_dir(path, make_parents, **kwargs)
current_dir = self.root
tokens = pathlib.PurePosixPath(path).parts[1:]
for token in tokens[:-1]:
if token in current_dir:
current_dir = current_dir[token]
else:
if make_parents:
# NOTE: other parameters (e.g. ownership, permissions) only get applied to the
# final directory.
# (At the time of writing, Pebble defaults to 0o755 permissions and root:root
# ownership.)
current_dir = current_dir.create_dir(token)
else:
raise FileNotFoundError(str(current_dir.path / token))
if isinstance(current_dir, _File):
raise NotADirectoryError(str(current_dir.path))
# Current backend will always raise an error if the final directory component
# already exists.
token = tokens[-1]
if token not in current_dir:
current_dir = current_dir.create_dir(token, **kwargs)
else:
# If 'make_parents' is specified, behave like 'mkdir -p' and ignore if the dir already
# exists.
if make_parents:
current_dir = _Directory(current_dir.path / token)
else:
raise FileExistsError(str(current_dir.path / token))
return current_dir
def create_file(
self,
path: str,
data: typing.Union[bytes, str, typing.BinaryIO, typing.TextIO],
encoding: typing.Optional[str] = 'utf-8',
make_dirs: bool = False,
**kwargs
) -> '_File':
if not path.startswith('/'):
raise NonAbsolutePathError(path)
for mount in self._mounts.values():
if mount.contains(path):
return mount.create_file(path, data, encoding, make_dirs, **kwargs)
path_obj = pathlib.PurePosixPath(path)
try:
dir_ = self.get_path(path_obj.parent)
except FileNotFoundError:
if make_dirs:
dir_ = self.create_dir(str(path_obj.parent), make_parents=make_dirs)
# NOTE: other parameters (e.g. ownership, permissions) only get applied to the
# final directory.
# (At the time of writing, Pebble defaults to the specified permissions and
# root:root ownership, which is inconsistent with the push function's
# behavior for parent directories.)
else:
raise
if not isinstance(dir_, _Directory):
raise pebble.PathError(
'generic-file-error', 'parent is not a directory: {}'.format(str(dir_)))
return dir_.create_file(path_obj.name, data, encoding=encoding, **kwargs)
def list_dir(self, path) -> typing.List['_File']:
for mount in self._mounts.values():
if mount.contains(path):
return mount.list_dir(path)
current_dir = self.root
tokens = pathlib.PurePosixPath(path).parts[1:]
for token in tokens:
try:
current_dir = current_dir[token]
except KeyError:
raise FileNotFoundError(str(current_dir.path / token))
if isinstance(current_dir, _File):
raise NotADirectoryError(str(current_dir.path))
if not isinstance(current_dir, _Directory):
# For now, ignoring other possible cases besides File and Directory (e.g. Symlink).
raise NotImplementedError()
return [child for child in current_dir]
def open(
self,
path: typing.Union[str, pathlib.PurePosixPath],
encoding: typing.Optional[str] = 'utf-8',
) -> typing.Union[typing.BinaryIO, typing.TextIO]:
for mount in self._mounts.values():
if mount.contains(path):
return mount.open(path, encoding)
path = pathlib.PurePosixPath(path)
file = self.get_path(path) # warning: no check re: directories
if isinstance(file, _Directory):
raise IsADirectoryError(str(file.path))
return file.open(encoding=encoding)
def get_path(self, path: typing.Union[str, pathlib.PurePosixPath]) \
-> typing.Union['_Directory', '_File']:
for mount in self._mounts.values():
if mount.contains(path):
return mount.get_path(path)
path = pathlib.PurePosixPath(path)
tokens = path.parts[1:]
current_object = self.root
for token in tokens:
# ASSUMPTION / TESTME: object might be file
if token in current_object:
current_object = current_object[token]
else:
raise FileNotFoundError(str(current_object.path / token))
return current_object
def delete_path(self, path: typing.Union[str, pathlib.PurePosixPath]) -> None:
for mount in self._mounts.values():
if mount.contains(path):
return mount.delete_path(path)
path = pathlib.PurePosixPath(path)
parent_dir = self.get_path(path.parent)
del parent_dir[path.name]
class _Directory:
def __init__(self, path: pathlib.PurePosixPath, **kwargs):
self.path = path
self._children = {}
self.last_modified = datetime.datetime.now()
self.kwargs = kwargs
@property
def name(self) -> str:
# Need to handle special case for root.
# pathlib.PurePosixPath('/').name is '', but pebble returns '/'.
return self.path.name if self.path.name else '/'
def __contains__(self, child: str) -> bool:
return child in self._children
def __iter__(self) -> typing.Iterator[typing.Union['_File', '_Directory']]:
return (value for value in self._children.values())
def __getitem__(self, key: str) -> typing.Union['_File', '_Directory']:
return self._children[key]
def __delitem__(self, key: str) -> None:
try:
del self._children[key]
except KeyError:
raise FileNotFoundError(str(self.path / key))
def __len__(self):
return len(self._children)
def create_dir(self, name: str, **kwargs) -> '_Directory':
self._children[name] = _Directory(self.path / name, **kwargs)
return self._children[name]
def create_file(
self,
name: str,
data: typing.Union[bytes, str, typing.BinaryIO, typing.TextIO],
encoding: typing.Optional[str] = 'utf-8',
**kwargs
) -> '_File':
self._children[name] = _File(self.path / name, data, encoding=encoding, **kwargs)
return self._children[name]
class _File:
def __init__(
self,
path: pathlib.PurePosixPath,
data: typing.Union[str, bytes, typing.BinaryIO, typing.TextIO],
encoding: typing.Optional[str] = 'utf-8',
**kwargs):
if hasattr(data, 'read'):
data = data.read()
if isinstance(data, str):
data = data.encode(encoding)
data_size = len(data)
self.path = path
self.data = data
self.size = data_size
self.last_modified = datetime.datetime.now()
self.kwargs = kwargs
@property
def name(self) -> str:
return self.path.name
def open(
self,
encoding: typing.Optional[str] = 'utf-8',
) -> typing.Union[typing.TextIO, typing.BinaryIO]:
if encoding is None:
return BytesIO(self.data)
else:
return StringIO(self.data.decode(encoding))
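# Illustrative sketch of how the encoding argument selects the stream type (path and
# data are examples only):
#   f = _File(pathlib.PurePosixPath('/motd'), b'hello')
#   f.open()               # StringIO('hello'), decoded using utf-8
#   f.open(encoding=None)  # BytesIO(b'hello')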