# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Client for the Pebble API (HTTP over Unix socket).
For a command-line interface for local testing, see test/pebble_cli.py.
"""
import binascii
import builtins
import copy
import dataclasses
import datetime
import email.message
import email.parser
import enum
import http.client
import io
import json
import logging
import os
import select
import shutil
import signal
import socket
import sys
import tempfile
import threading
import time
import typing
import urllib.error
import urllib.parse
import urllib.request
import warnings
from typing import (
IO,
TYPE_CHECKING,
Any,
AnyStr,
BinaryIO,
Callable,
Dict,
Generator,
Generic,
Iterable,
List,
Literal,
Optional,
Protocol,
Sequence,
TextIO,
Tuple,
TypedDict,
Union,
)
import websocket # type: ignore
from ops._private import timeconv, yaml
# Public as these are used in the Container.add_layer signature
ServiceDict = typing.TypedDict('ServiceDict',
{'summary': str,
'description': str,
'startup': str,
'override': str,
'command': str,
'after': Sequence[str],
'before': Sequence[str],
'requires': Sequence[str],
'environment': Dict[str, str],
'user': str,
'user-id': Optional[int],
'group': str,
'group-id': Optional[int],
'working-dir': str,
'on-success': str,
'on-failure': str,
'on-check-failure': Dict[str, Any],
'backoff-delay': str,
'backoff-factor': Optional[int],
'backoff-limit': str,
'kill-delay': Optional[str],
},
total=False)
HttpDict = typing.TypedDict('HttpDict',
{'url': str,
'headers': Dict[str, str]},
total=False)
TcpDict = typing.TypedDict('TcpDict',
{'port': int,
'host': str},
total=False)
ExecDict = typing.TypedDict('ExecDict',
{'command': str,
# see JujuVersion.supports_exec_service_context
'service-context': str,
'environment': Dict[str, str],
'user-id': Optional[int],
'user': str,
'group-id': Optional[int],
'group': str,
'working-dir': str},
total=False)
CheckDict = typing.TypedDict('CheckDict',
{'override': str,
'level': Union['CheckLevel', str],
'period': Optional[str],
'timeout': Optional[str],
'http': Optional[HttpDict],
'tcp': Optional[TcpDict],
'exec': Optional[ExecDict],
'threshold': Optional[int]},
total=False)
# In Python 3.11+ 'services' and 'labels' should be NotRequired, and total=True.
LogTargetDict = typing.TypedDict('LogTargetDict',
{'override': Union[Literal['merge'], Literal['replace']],
'type': Literal['loki'],
'location': str,
'services': List[str],
'labels': Dict[str, str]},
total=False)
LayerDict = typing.TypedDict('LayerDict',
{'summary': str,
'description': str,
'services': Dict[str, ServiceDict],
'checks': Dict[str, CheckDict],
'log-targets': Dict[str, LogTargetDict]},
total=False)
PlanDict = typing.TypedDict('PlanDict',
{'services': Dict[str, ServiceDict],
'checks': Dict[str, CheckDict],
'log-targets': Dict[str, LogTargetDict]},
total=False)
_AuthDict = TypedDict('_AuthDict',
{'permissions': Optional[str],
'user-id': Optional[int],
'user': Optional[str],
'group-id': Optional[int],
'group': Optional[str],
'path': Optional[str],
'make-dirs': Optional[bool],
'make-parents': Optional[bool],
}, total=False)
_ServiceInfoDict = TypedDict('_ServiceInfoDict',
{'startup': Union['ServiceStartup', str],
'current': Union['ServiceStatus', str],
'name': str})
# Callback types for _MultiParser header and body handlers
class _BodyHandler(Protocol):
def __call__(self, data: bytes, done: bool = False) -> None: ...
_HeaderHandler = Callable[[bytes], None]
# tempfile.NamedTemporaryFile has an odd interface because of that
# 'name' attribute, so we need to make a Protocol for it.
class _Tempfile(Protocol):
name = ''
def write(self, data: bytes): ...
def close(self): ...
class _FileLikeIO(Protocol[typing.AnyStr]): # That also covers TextIO and BytesIO
def read(self, __n: int = ...) -> typing.AnyStr: ... # for BinaryIO
def write(self, __s: typing.AnyStr) -> int: ...
def __enter__(self) -> typing.IO[typing.AnyStr]: ...
_AnyStrFileLikeIO = Union[_FileLikeIO[bytes], _FileLikeIO[str]]
_TextOrBinaryIO = Union[TextIO, BinaryIO]
_IOSource = Union[str, bytes, _AnyStrFileLikeIO]
_SystemInfoDict = TypedDict('_SystemInfoDict', {'version': str})
if TYPE_CHECKING:
from typing_extensions import NotRequired
_CheckInfoDict = TypedDict('_CheckInfoDict',
{"name": str,
"level": NotRequired[Optional[Union['CheckLevel', str]]],
"status": Union['CheckStatus', str],
"failures": NotRequired[int],
"threshold": int})
_FileInfoDict = TypedDict('_FileInfoDict',
{"path": str,
"name": str,
"size": NotRequired[Optional[int]],
"permissions": str,
"last-modified": str,
"user-id": NotRequired[Optional[int]],
"user": NotRequired[Optional[str]],
"group-id": NotRequired[Optional[int]],
"group": NotRequired[Optional[str]],
"type": Union['FileType', str]})
_ProgressDict = TypedDict('_ProgressDict',
{'label': str,
'done': int,
'total': int})
_TaskDict = TypedDict('_TaskDict',
{'id': str,
'kind': str,
'summary': str,
'status': str,
'log': NotRequired[Optional[List[str]]],
'progress': _ProgressDict,
'spawn-time': str,
'ready-time': NotRequired[Optional[str]],
'data': NotRequired[Optional[Dict[str, Any]]]})
_ChangeDict = TypedDict('_ChangeDict',
{'id': str,
'kind': str,
'summary': str,
'status': str,
'ready': bool,
'spawn-time': str,
'tasks': NotRequired[Optional[List[_TaskDict]]],
'err': NotRequired[Optional[str]],
'ready-time': NotRequired[Optional[str]],
'data': NotRequired[Optional[Dict[str, Any]]]})
_Error = TypedDict('_Error',
{'kind': str,
'message': str})
_Item = TypedDict('_Item',
{'path': str,
'error': NotRequired[_Error]})
_FilesResponse = TypedDict('_FilesResponse',
{'result': List[_Item]})
_WarningDict = TypedDict('_WarningDict',
{'message': str,
'first-added': str,
'last-added': str,
'last-shown': NotRequired[Optional[str]],
'expire-after': str,
'repeat-after': str})
_NoticeDict = TypedDict('_NoticeDict', {
'id': str,
'user-id': NotRequired[Optional[int]],
'type': str,
'key': str,
'first-occurred': str,
'last-occurred': str,
'last-repeated': str,
'occurrences': int,
'last-data': NotRequired[Optional[Dict[str, str]]],
'repeat-after': NotRequired[str],
'expire-after': NotRequired[str],
})
class _WebSocket(Protocol):
def connect(self, url: str, socket: socket.socket): ...
def shutdown(self): ...
def send(self, payload: str): ...
def send_binary(self, payload: bytes): ...
def recv(self) -> Union[str, bytes]: ...
logger = logging.getLogger(__name__)
class _NotProvidedFlag:
pass
_not_provided = _NotProvidedFlag()
class _UnixSocketConnection(http.client.HTTPConnection):
"""Implementation of HTTPConnection that connects to a named Unix socket."""
def __init__(self, host: str, socket_path: str,
timeout: Union[_NotProvidedFlag, float] = _not_provided):
if timeout is _not_provided:
super().__init__(host)
else:
assert isinstance(timeout, (int, float)), timeout # type guard for pyright
super().__init__(host, timeout=timeout)
self.socket_path = socket_path
def connect(self):
"""Override connect to use Unix socket (instead of TCP socket)."""
if not hasattr(socket, 'AF_UNIX'):
raise NotImplementedError(f'Unix sockets not supported on {sys.platform}')
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.connect(self.socket_path)
if self.timeout is not _not_provided:
self.sock.settimeout(self.timeout)
class _UnixSocketHandler(urllib.request.AbstractHTTPHandler):
"""Implementation of HTTPHandler that uses a named Unix socket."""
def __init__(self, socket_path: str):
super().__init__()
self.socket_path = socket_path
def http_open(self, req: urllib.request.Request):
"""Override http_open to use a Unix socket connection (instead of TCP)."""
return self.do_open(_UnixSocketConnection, req, # type:ignore
socket_path=self.socket_path)
def _format_timeout(timeout: float) -> str:
"""Format timeout for use in the Pebble API.
The format is in seconds with a millisecond resolution and an 's' suffix,
as accepted by the Pebble API (which uses Go's time.ParseDuration).
"""
return f'{timeout:.3f}s'
def _start_thread(target: Callable[..., Any], *args: Any, **kwargs: Any) -> threading.Thread:
"""Helper to simplify starting a thread."""
thread = threading.Thread(target=target, args=args, kwargs=kwargs)
thread.start()
return thread
[docs]class Error(Exception):
"""Base class of most errors raised by the Pebble client."""
def __repr__(self):
return f'<{type(self).__module__}.{type(self).__name__} {self.args}>'
[docs]class TimeoutError(TimeoutError, Error):
"""Raised when a polling timeout occurs."""
[docs]class ConnectionError(Error):
"""Raised when the Pebble client can't connect to the socket."""
[docs]class ProtocolError(Error):
"""Raised when there's a higher-level protocol error talking to Pebble."""
[docs]class PathError(Error):
"""Raised when there's an error with a specific path."""
kind: typing.Literal["not-found", "permission-denied", "generic-file-error"]
"""Short string representing the kind of error."""
message: str
"""Human-readable error message from the API."""
def __init__(self, kind: str, message: str):
"""This shouldn't be instantiated directly."""
self.kind = kind # type: ignore
self.message = message
def __str__(self):
return f'{self.kind} - {self.message}'
def __repr__(self):
return f'PathError({self.kind!r}, {self.message!r})'
[docs]class APIError(Error):
"""Raised when an HTTP API error occurs talking to the Pebble server."""
body: Dict[str, Any]
"""Body of the HTTP response, parsed as JSON."""
code: int
"""HTTP status code."""
status: str
"""HTTP status string (reason)."""
message: str
"""Human-readable error message from the API."""
def __init__(self, body: Dict[str, Any], code: int, status: str, message: str):
"""This shouldn't be instantiated directly."""
super().__init__(message) # Makes str(e) return message
self.body = body
self.code = code
self.status = status
self.message = message
def __repr__(self):
return f'APIError({self.body!r}, {self.code!r}, {self.status!r}, {self.message!r})'
[docs]class ChangeError(Error):
"""Raised by actions when a change is ready but has an error."""
err: str
"""Human-readable error message."""
change: 'Change'
"""Change object associated with this error."""
def __init__(self, err: str, change: 'Change'):
"""This shouldn't be instantiated directly."""
self.err = err
self.change = change
def __str__(self):
parts = [self.err]
# Append any task logs to the error message
for i, task in enumerate(self.change.tasks):
if not task.log:
continue
parts.append(f'\n----- Logs from task {i} -----\n')
parts.append('\n'.join(task.log))
if len(parts) > 1:
parts.append('\n-----')
return ''.join(parts)
def __repr__(self):
return f'ChangeError({self.err!r}, {self.change!r})'
[docs]class ExecError(Error, Generic[AnyStr]):
"""Raised when a :meth:`Client.exec` command returns a non-zero exit code."""
STR_MAX_OUTPUT = 1024
"""Maximum number of characters that stdout/stderr are truncated to in ``__str__``."""
command: List[str]
"""Command line of command being executed."""
exit_code: int
"""The process's exit code. Because this is an error, this will always be non-zero."""
stdout: Optional[AnyStr]
"""Standard output from the process.
If :meth:`ExecProcess.wait_output` was being called, this is the captured
stdout as a str (or bytes if encoding was None). If :meth:`ExecProcess.wait`
was being called, this is None.
"""
stderr: Optional[AnyStr]
"""Standard error from the process.
If :meth:`ExecProcess.wait_output` was being called and ``combine_stderr``
was False, this is the captured stderr as a str (or bytes if encoding was
None). If :meth:`ExecProcess.wait` was being called or ``combine_stderr``
was True, this is None.
"""
def __init__(
self,
command: List[str],
exit_code: int,
stdout: Optional[AnyStr],
stderr: Optional[AnyStr],
):
self.command = command
self.exit_code = exit_code
self.stdout = stdout
self.stderr = stderr
def __str__(self):
message = f'non-zero exit code {self.exit_code} executing {self.command!r}'
for name, out in [('stdout', self.stdout), ('stderr', self.stderr)]:
if out is None:
continue
truncated = ' [truncated]' if len(out) > self.STR_MAX_OUTPUT else ''
out = out[:self.STR_MAX_OUTPUT]
message = f'{message}, {name}={out!r}{truncated}'
return message
[docs]class WarningState(enum.Enum):
"""Enum of states for get_warnings() select parameter."""
ALL = 'all'
PENDING = 'pending'
[docs]class ChangeState(enum.Enum):
"""Enum of states for get_changes() select parameter."""
ALL = 'all'
IN_PROGRESS = 'in-progress'
READY = 'ready'
[docs]class SystemInfo:
"""System information object."""
def __init__(self, version: str):
self.version = version
[docs] @classmethod
def from_dict(cls, d: '_SystemInfoDict') -> 'SystemInfo':
"""Create new SystemInfo object from dict parsed from JSON."""
return cls(version=d['version'])
def __repr__(self):
return f'SystemInfo(version={self.version!r})'
[docs]class Warning:
"""Warning object."""
def __init__(
self,
message: str,
first_added: datetime.datetime,
last_added: datetime.datetime,
last_shown: Optional[datetime.datetime],
expire_after: str,
repeat_after: str,
):
self.message = message
self.first_added = first_added
self.last_added = last_added
self.last_shown = last_shown
self.expire_after = expire_after
self.repeat_after = repeat_after
[docs] @classmethod
def from_dict(cls, d: '_WarningDict') -> 'Warning':
"""Create new Warning object from dict parsed from JSON."""
return cls(
message=d['message'],
first_added=timeconv.parse_rfc3339(d['first-added']),
last_added=timeconv.parse_rfc3339(d['last-added']),
last_shown=(timeconv.parse_rfc3339(d['last-shown']) # type: ignore
if d.get('last-shown') else None),
expire_after=d['expire-after'],
repeat_after=d['repeat-after'],
)
def __repr__(self):
return ('Warning('
f'message={self.message!r}, '
f'first_added={self.first_added!r}, '
f'last_added={self.last_added!r}, '
f'last_shown={self.last_shown!r}, '
f'expire_after={self.expire_after!r}, '
f'repeat_after={self.repeat_after!r})'
)
[docs]class TaskProgress:
"""Task progress object."""
def __init__(
self,
label: str,
done: int,
total: int,
):
self.label = label
self.done = done
self.total = total
[docs] @classmethod
def from_dict(cls, d: '_ProgressDict') -> 'TaskProgress':
"""Create new TaskProgress object from dict parsed from JSON."""
return cls(
label=d['label'],
done=d['done'],
total=d['total'],
)
def __repr__(self):
return ('TaskProgress('
f'label={self.label!r}, '
f'done={self.done!r}, '
f'total={self.total!r})'
)
[docs]class TaskID(str):
"""Task ID (a more strongly-typed string)."""
def __repr__(self):
return f'TaskID({str(self)!r})'
[docs]class Task:
"""Task object."""
def __init__(
self,
id: TaskID,
kind: str,
summary: str,
status: str,
log: List[str],
progress: TaskProgress,
spawn_time: datetime.datetime,
ready_time: Optional[datetime.datetime],
data: Optional[Dict[str, Any]] = None,
):
self.id = id
self.kind = kind
self.summary = summary
self.status = status
self.log = log
self.progress = progress
self.spawn_time = spawn_time
self.ready_time = ready_time
self.data = data or {}
[docs] @classmethod
def from_dict(cls, d: '_TaskDict') -> 'Task':
"""Create new Task object from dict parsed from JSON."""
return cls(
id=TaskID(d['id']),
kind=d['kind'],
summary=d['summary'],
status=d['status'],
log=d.get('log') or [],
progress=TaskProgress.from_dict(d['progress']),
spawn_time=timeconv.parse_rfc3339(d['spawn-time']),
ready_time=(timeconv.parse_rfc3339(d['ready-time']) # type: ignore
if d.get('ready-time') else None),
data=d.get('data') or {},
)
def __repr__(self):
return ('Task('
f'id={self.id!r}, '
f'kind={self.kind!r}, '
f'summary={self.summary!r}, '
f'status={self.status!r}, '
f'log={self.log!r}, '
f'progress={self.progress!r}, '
f'spawn_time={self.spawn_time!r}, '
f'ready_time={self.ready_time!r}, '
f'data={self.data!r})'
)
[docs]class ChangeID(str):
"""Change ID (a more strongly-typed string)."""
def __repr__(self):
return f'ChangeID({str(self)!r})'
[docs]class Change:
"""Change object."""
def __init__(
self,
id: ChangeID,
kind: str,
summary: str,
status: str,
tasks: List[Task],
ready: bool,
err: Optional[str],
spawn_time: datetime.datetime,
ready_time: Optional[datetime.datetime],
data: Optional[Dict[str, Any]] = None,
):
self.id = id
self.kind = kind
self.summary = summary
self.status = status
self.tasks = tasks
self.ready = ready
self.err = err
self.spawn_time = spawn_time
self.ready_time = ready_time
self.data = data or {}
[docs] @classmethod
def from_dict(cls, d: '_ChangeDict') -> 'Change':
"""Create new Change object from dict parsed from JSON."""
return cls(
id=ChangeID(d['id']),
kind=d['kind'],
summary=d['summary'],
status=d['status'],
tasks=[Task.from_dict(t) for t in d.get('tasks') or []],
ready=d['ready'],
err=d.get('err'),
spawn_time=timeconv.parse_rfc3339(d['spawn-time']),
ready_time=(timeconv.parse_rfc3339(d['ready-time']) # type: ignore
if d.get('ready-time') else None),
data=d.get('data') or {},
)
def __repr__(self):
return ('Change('
f'id={self.id!r}, '
f'kind={self.kind!r}, '
f'summary={self.summary!r}, '
f'status={self.status!r}, '
f'tasks={self.tasks!r}, '
f'ready={self.ready!r}, '
f'err={self.err!r}, '
f'spawn_time={self.spawn_time!r}, '
f'ready_time={self.ready_time!r}, '
f'data={self.data!r})'
)
[docs]class Plan:
"""Represents the effective Pebble configuration.
A plan is the combined layer configuration. The layer configuration is
documented at https://github.com/canonical/pebble/#layer-specification.
"""
def __init__(self, raw: Optional[Union[str, 'PlanDict']] = None):
if isinstance(raw, str): # noqa: SIM108
d = yaml.safe_load(raw) or {} # type: ignore
else:
d = raw or {}
d = typing.cast('PlanDict', d)
self._raw = raw
self._services: Dict[str, Service] = {name: Service(name, service)
for name, service in d.get('services', {}).items()}
self._checks: Dict[str, Check] = {name: Check(name, check)
for name, check in d.get('checks', {}).items()}
self._log_targets: Dict[str, LogTarget] = {
name: LogTarget(name, target)
for name, target in d.get('log-targets', {}).items()}
@property
def services(self) -> Dict[str, 'Service']:
"""This plan's services mapping (maps service name to Service).
This property is currently read-only.
"""
return self._services
@property
def checks(self) -> Dict[str, 'Check']:
"""This plan's checks mapping (maps check name to :class:`Check`).
This property is currently read-only.
"""
return self._checks
@property
def log_targets(self) -> Dict[str, 'LogTarget']:
"""This plan's log targets mapping (maps log target name to :class:`LogTarget`).
This property is currently read-only.
"""
return self._log_targets
[docs] def to_dict(self) -> 'PlanDict':
"""Convert this plan to its dict representation."""
fields = [
('services', {name: service.to_dict() for name, service in self._services.items()}),
('checks', {name: check.to_dict() for name, check in self._checks.items()}),
('log-targets', {name: target.to_dict() for name, target in self._log_targets.items()})
]
dct = {name: value for name, value in fields if value}
return typing.cast('PlanDict', dct)
[docs] def to_yaml(self) -> str:
"""Return this plan's YAML representation."""
return yaml.safe_dump(self.to_dict())
__str__ = to_yaml
def __eq__(self, other: Union['PlanDict', 'Plan']) -> bool:
if isinstance(other, dict):
return self.to_dict() == other
elif isinstance(other, Plan):
return self.to_dict() == other.to_dict()
return NotImplemented
[docs]class Layer:
"""Represents a Pebble configuration layer.
The format of this is documented at
https://github.com/canonical/pebble/#layer-specification.
"""
#: Summary of the purpose of this layer.
summary: str
#: Long-form description of this layer.
description: str
#: Mapping of name to :class:`Service` defined by this layer.
services: Dict[str, 'Service']
#: Mapping of check to :class:`Check` defined by this layer.
checks: Dict[str, 'Check']
#: Mapping of target to :class:`LogTarget` defined by this layer.
log_targets: Dict[str, 'LogTarget']
def __init__(self, raw: Optional[Union[str, 'LayerDict']] = None):
if isinstance(raw, str): # noqa: SIM108
d = yaml.safe_load(raw) or {} # type: ignore # (Any 'raw' type)
else:
d = raw or {}
d = typing.cast('LayerDict', d)
self.summary = d.get('summary', '')
self.description = d.get('description', '')
self.services = {name: Service(name, service)
for name, service in d.get('services', {}).items()}
self.checks = {name: Check(name, check)
for name, check in d.get('checks', {}).items()}
self.log_targets = {name: LogTarget(name, target)
for name, target in d.get('log-targets', {}).items()}
[docs] def to_yaml(self) -> str:
"""Convert this layer to its YAML representation."""
return yaml.safe_dump(self.to_dict())
[docs] def to_dict(self) -> 'LayerDict':
"""Convert this layer to its dict representation."""
fields = [
('summary', self.summary),
('description', self.description),
('services', {name: service.to_dict() for name, service in self.services.items()}),
('checks', {name: check.to_dict() for name, check in self.checks.items()}),
('log-targets', {name: target.to_dict() for name, target in self.log_targets.items()})
]
dct = {name: value for name, value in fields if value}
return typing.cast('LayerDict', dct)
def __repr__(self) -> str:
return f'Layer({self.to_dict()!r})'
def __eq__(self, other: Union['LayerDict', 'Layer']) -> bool:
"""Reports whether this layer configuration is equal to another."""
if isinstance(other, dict):
return self.to_dict() == other
elif isinstance(other, Layer):
return self.to_dict() == other.to_dict()
else:
return NotImplemented
__str__ = to_yaml
[docs]class Service:
"""Represents a service description in a Pebble configuration layer."""
def __init__(self, name: str, raw: Optional['ServiceDict'] = None):
self.name = name
dct: ServiceDict = raw or {}
self.summary = dct.get('summary', '')
self.description = dct.get('description', '')
self.startup = dct.get('startup', '')
self.override = dct.get('override', '')
self.command = dct.get('command', '')
self.after = list(dct.get('after', []))
self.before = list(dct.get('before', []))
self.requires = list(dct.get('requires', []))
self.environment = dict(dct.get('environment', {}))
self.user = dct.get('user', '')
self.user_id = dct.get('user-id')
self.group = dct.get('group', '')
self.group_id = dct.get('group-id')
self.working_dir = dct.get('working-dir', '')
self.on_success = dct.get('on-success', '')
self.on_failure = dct.get('on-failure', '')
self.on_check_failure = dict(dct.get('on-check-failure', {}))
self.backoff_delay = dct.get('backoff-delay', '')
self.backoff_factor = dct.get('backoff-factor')
self.backoff_limit = dct.get('backoff-limit', '')
self.kill_delay = dct.get('kill-delay', '')
[docs] def to_dict(self) -> 'ServiceDict':
"""Convert this service object to its dict representation."""
fields = [
('summary', self.summary),
('description', self.description),
('startup', self.startup),
('override', self.override),
('command', self.command),
('after', self.after),
('before', self.before),
('requires', self.requires),
('environment', self.environment),
('user', self.user),
('user-id', self.user_id),
('group', self.group),
('group-id', self.group_id),
('working-dir', self.working_dir),
('on-success', self.on_success),
('on-failure', self.on_failure),
('on-check-failure', self.on_check_failure),
('backoff-delay', self.backoff_delay),
('backoff-factor', self.backoff_factor),
('backoff-limit', self.backoff_limit),
('kill-delay', self.kill_delay),
]
dct = {name: value for name, value in fields if value}
return typing.cast('ServiceDict', dct)
def _merge(self, other: 'Service'):
"""Merges this service object with another service definition.
For attributes present in both objects, the passed in service
attributes take precedence.
"""
for name, value in other.__dict__.items():
if not value or name == 'name':
continue
if name in ['after', 'before', 'requires']:
getattr(self, name).extend(value)
elif name in ['environment', 'on_check_failure']:
getattr(self, name).update(value)
else:
setattr(self, name, value)
def __repr__(self) -> str:
return f'Service({self.to_dict()!r})'
def __eq__(self, other: Union['ServiceDict', 'Service']) -> bool:
"""Reports whether this service configuration is equal to another."""
if isinstance(other, dict):
return self.to_dict() == other
elif isinstance(other, Service):
return self.to_dict() == other.to_dict()
else:
return NotImplemented
[docs]class ServiceStartup(enum.Enum):
"""Enum of service startup options."""
ENABLED = 'enabled'
DISABLED = 'disabled'
[docs]class ServiceStatus(enum.Enum):
"""Enum of service statuses."""
ACTIVE = 'active'
INACTIVE = 'inactive'
ERROR = 'error'
[docs]class ServiceInfo:
"""Service status information."""
def __init__(
self,
name: str,
startup: Union[ServiceStartup, str],
current: Union[ServiceStatus, str],
):
self.name = name
self.startup = startup
self.current = current
[docs] def is_running(self) -> bool:
"""Return True if this service is running (in the active state)."""
return self.current == ServiceStatus.ACTIVE
[docs] @classmethod
def from_dict(cls, d: '_ServiceInfoDict') -> 'ServiceInfo':
"""Create new ServiceInfo object from dict parsed from JSON."""
try:
startup = ServiceStartup(d['startup'])
except ValueError:
startup = d['startup']
try:
current = ServiceStatus(d['current'])
except ValueError:
current = d['current']
return cls(
name=d['name'],
startup=startup,
current=current,
)
def __repr__(self):
return ('ServiceInfo('
f'name={self.name!r}, '
f'startup={self.startup}, '
f'current={self.current})'
)
[docs]class Check:
"""Represents a check in a Pebble configuration layer."""
def __init__(self, name: str, raw: Optional['CheckDict'] = None):
self.name = name
dct: CheckDict = raw or {}
self.override: str = dct.get('override', '')
try:
level: Union[CheckLevel, str] = CheckLevel(dct.get('level', ''))
except ValueError:
level = dct.get('level', '')
self.level = level
self.period: Optional[str] = dct.get('period', '')
self.timeout: Optional[str] = dct.get('timeout', '')
self.threshold: Optional[int] = dct.get('threshold')
http = dct.get('http')
if http is not None:
http = copy.deepcopy(http)
self.http: Optional[HttpDict] = http
tcp = dct.get('tcp')
if tcp is not None:
tcp = copy.deepcopy(tcp)
self.tcp: Optional[TcpDict] = tcp
exec_ = dct.get('exec')
if exec_ is not None:
exec_ = copy.deepcopy(exec_)
self.exec: Optional[ExecDict] = exec_
[docs] def to_dict(self) -> 'CheckDict':
"""Convert this check object to its dict representation."""
level: str = self.level.value if isinstance(self.level, CheckLevel) else self.level
fields = [
('override', self.override),
('level', level),
('period', self.period),
('timeout', self.timeout),
('threshold', self.threshold),
('http', self.http),
('tcp', self.tcp),
('exec', self.exec),
]
dct = {name: value for name, value in fields if value}
return typing.cast('CheckDict', dct)
def __repr__(self) -> str:
return f'Check({self.to_dict()!r})'
def __eq__(self, other: Union['CheckDict', 'Check']) -> bool:
"""Reports whether this check configuration is equal to another."""
if isinstance(other, dict):
return self.to_dict() == other
elif isinstance(other, Check):
return self.to_dict() == other.to_dict()
else:
return NotImplemented
[docs]class CheckLevel(enum.Enum):
"""Enum of check levels."""
UNSET = ''
ALIVE = 'alive'
READY = 'ready'
[docs]class CheckStatus(enum.Enum):
"""Enum of check statuses."""
UP = 'up'
DOWN = 'down'
[docs]class LogTarget:
"""Represents a log target in a Pebble configuration layer."""
def __init__(self, name: str, raw: Optional['LogTargetDict'] = None):
self.name = name
dct: LogTargetDict = raw or {}
self.override: str = dct.get('override', '')
self.type = dct.get('type', '')
self.location = dct.get('location', '')
self.services: List[str] = list(dct.get('services', []))
labels = dct.get('labels')
if labels is not None:
labels = copy.deepcopy(labels)
self.labels: Optional[Dict[str, str]] = labels
[docs] def to_dict(self) -> 'LogTargetDict':
"""Convert this log target object to its dict representation."""
fields = [
('override', self.override),
('type', self.type),
('location', self.location),
('services', self.services),
('labels', self.labels),
]
dct = {name: value for name, value in fields if value}
return typing.cast('LogTargetDict', dct)
def __repr__(self):
return f'LogTarget({self.to_dict()!r})'
def __eq__(self, other: Union['LogTargetDict', 'LogTarget']):
if isinstance(other, dict):
return self.to_dict() == other
elif isinstance(other, LogTarget):
return self.to_dict() == other.to_dict()
else:
return NotImplemented
[docs]class FileType(enum.Enum):
"""Enum of file types."""
FILE = 'file'
DIRECTORY = 'directory'
SYMLINK = 'symlink'
SOCKET = 'socket'
NAMED_PIPE = 'named-pipe'
DEVICE = 'device'
UNKNOWN = 'unknown'
[docs]class FileInfo:
"""Stat-like information about a single file or directory."""
path: str
"""Full path of the file."""
name: str
"""Base name of the file."""
type: Union['FileType', str]
"""Type of the file ("file", "directory", "symlink", etc)."""
size: Optional[int]
"""Size of the file (will be 0 if ``type`` is not "file")."""
permissions: int
"""Unix permissions of the file."""
last_modified: datetime.datetime
"""Time file was last modified."""
user_id: Optional[int]
"""User ID of the file."""
user: Optional[str]
"""Username of the file."""
group_id: Optional[int]
"""Group ID of the file."""
group: Optional[str]
"""Group name of the file."""
def __init__(
self,
path: str,
name: str,
type: Union['FileType', str],
size: Optional[int],
permissions: int,
last_modified: datetime.datetime,
user_id: Optional[int],
user: Optional[str],
group_id: Optional[int],
group: Optional[str],
):
self.path = path
self.name = name
self.type = type
self.size = size
self.permissions = permissions
self.last_modified = last_modified
self.user_id = user_id
self.user = user
self.group_id = group_id
self.group = group
[docs] @classmethod
def from_dict(cls, d: '_FileInfoDict') -> 'FileInfo':
"""Create new FileInfo object from dict parsed from JSON."""
try:
file_type = FileType(d['type'])
except ValueError:
file_type = d['type']
return cls(
path=d['path'],
name=d['name'],
type=file_type,
size=d.get('size'),
permissions=int(d['permissions'], 8),
last_modified=timeconv.parse_rfc3339(d['last-modified']),
user_id=d.get('user-id'),
user=d.get('user'),
group_id=d.get('group-id'),
group=d.get('group'),
)
def __repr__(self):
return ('FileInfo('
f'path={self.path!r}, '
f'name={self.name!r}, '
f'type={self.type}, '
f'size={self.size}, '
f'permissions=0o{self.permissions:o}, '
f'last_modified={self.last_modified!r}, '
f'user_id={self.user_id}, '
f'user={self.user!r}, '
f'group_id={self.group_id}, '
f'group={self.group!r})'
)
[docs]class CheckInfo:
"""Check status information.
A list of these objects is returned from :meth:`Client.get_checks`.
"""
name: str
"""Name of the check."""
level: Optional[Union[CheckLevel, str]]
"""Check level.
This can be :attr:`CheckLevel.ALIVE`, :attr:`CheckLevel.READY`, or None (level not set).
"""
status: Union[CheckStatus, str]
"""Status of the check.
:attr:`CheckStatus.UP` means the check is healthy (the number of failures
is less than the threshold), :attr:`CheckStatus.DOWN` means the check is
unhealthy (the number of failures has reached the threshold).
"""
failures: int
"""Number of failures since the check last succeeded.
This is reset to zero if the check succeeds.
"""
threshold: int
"""Failure threshold.
This is how many consecutive failures for the check to be considered "down".
"""
def __init__(
self,
name: str,
level: Optional[Union[CheckLevel, str]],
status: Union[CheckStatus, str],
failures: int = 0,
threshold: int = 0,
):
self.name = name
self.level = level
self.status = status
self.failures = failures
self.threshold = threshold
[docs] @classmethod
def from_dict(cls, d: '_CheckInfoDict') -> 'CheckInfo':
"""Create new :class:`CheckInfo` object from dict parsed from JSON."""
try:
level = CheckLevel(d.get('level', ''))
except ValueError:
level = d.get('level')
try:
status = CheckStatus(d['status'])
except ValueError:
status = d['status']
return cls(
name=d['name'],
level=level,
status=status,
failures=d.get('failures', 0),
threshold=d['threshold'],
)
def __repr__(self):
return ('CheckInfo('
f'name={self.name!r}, '
f'level={self.level!r}, '
f'status={self.status}, '
f'failures={self.failures}, '
f'threshold={self.threshold!r})'
)
[docs]class NoticeType(enum.Enum):
"""Enum of notice types."""
CUSTOM = 'custom'
[docs]class NoticesUsers(enum.Enum):
"""Enum of :meth:`Client.get_notices` ``users`` values."""
ALL = 'all'
"""Return notices from all users (any user ID, including public notices).
This only works for Pebble admins (for example, root).
"""
[docs]@dataclasses.dataclass(frozen=True)
class Notice:
"""Information about a single notice."""
id: str
"""Server-generated unique ID for this notice."""
user_id: Optional[int]
"""UID of the user who may view this notice (None means notice is public)."""
type: Union[NoticeType, str]
"""Type of the notice."""
key: str
"""The notice key, a string that differentiates notices of this type.
This is in the format ``example.com/path``.
"""
first_occurred: datetime.datetime
"""The first time one of these notices (type and key combination) occurs."""
last_occurred: datetime.datetime
"""The last time one of these notices occurred."""
last_repeated: datetime.datetime
"""The time this notice was last repeated.
See Pebble's `Notices documentation <https://github.com/canonical/pebble/#notices>`_
for an explanation of what "repeated" means.
"""
occurrences: int
"""The number of times one of these notices has occurred."""
last_data: Dict[str, str] = dataclasses.field(default_factory=dict)
"""Additional data captured from the last occurrence of one of these notices."""
repeat_after: Optional[datetime.timedelta] = None
"""Minimum time after one of these was last repeated before Pebble will repeat it again."""
expire_after: Optional[datetime.timedelta] = None
"""How long since one of these last occurred until Pebble will drop the notice."""
[docs] @classmethod
def from_dict(cls, d: '_NoticeDict') -> 'Notice':
"""Create new Notice object from dict parsed from JSON."""
try:
notice_type = NoticeType(d['type'])
except ValueError:
notice_type = d['type']
return cls(
id=d['id'],
user_id=d.get('user-id'),
type=notice_type,
key=d['key'],
first_occurred=timeconv.parse_rfc3339(d['first-occurred']),
last_occurred=timeconv.parse_rfc3339(d['last-occurred']),
last_repeated=timeconv.parse_rfc3339(d['last-repeated']),
occurrences=d['occurrences'],
last_data=d.get('last-data') or {},
repeat_after=timeconv.parse_duration(d['repeat-after'])
if 'repeat-after' in d else None,
expire_after=timeconv.parse_duration(d['expire-after'])
if 'expire-after' in d else None,
)
[docs]class ExecProcess(Generic[AnyStr]):
"""Represents a process started by :meth:`Client.exec`.
To avoid deadlocks, most users should use :meth:`wait_output` instead of
reading and writing the :attr:`stdin`, :attr:`stdout`, and :attr:`stderr`
attributes directly. Alternatively, users can pass stdin/stdout/stderr to
:meth:`Client.exec`.
This class should not be instantiated directly, only via
:meth:`Client.exec`.
"""
stdin: Optional[IO[AnyStr]]
"""Standard input for the process.
If the stdin argument was not passed to :meth:`Client.exec`, this is a
writable file-like object the caller can use to stream input to the
process. It is None if stdin was passed to :meth:`Client.exec`.
"""
stdout: Optional[IO[AnyStr]]
"""Standard output from the process.
If the stdout argument was not passed to :meth:`Client.exec`, this is a
readable file-like object the caller can use to stream output from the
process. It is None if stdout was passed to :meth:`Client.exec`.
"""
stderr: Optional[IO[AnyStr]]
"""Standard error from the process.
If the stderr argument was not passed to :meth:`Client.exec` and
``combine_stderr`` was False, this is a readable file-like object the
caller can use to stream error output from the process. It is None if
stderr was passed to :meth:`Client.exec` or ``combine_stderr`` was True.
"""
def __init__(
self,
stdin: Optional[IO[AnyStr]],
stdout: Optional[IO[AnyStr]],
stderr: Optional[IO[AnyStr]],
client: 'Client',
timeout: Optional[float],
control_ws: '_WebSocket',
stdio_ws: '_WebSocket',
stderr_ws: Optional['_WebSocket'],
command: List[str],
encoding: Optional[str],
change_id: ChangeID,
cancel_stdin: Optional[Callable[[], None]],
cancel_reader: Optional[int],
threads: List[threading.Thread],
):
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self._client = client
self._timeout = timeout
self._control_ws = control_ws
self._stdio_ws = stdio_ws
self._stderr_ws = stderr_ws
self._command = command
self._encoding = encoding
self._change_id = change_id
self._cancel_stdin = cancel_stdin
self._cancel_reader = cancel_reader
self._threads = threads
self._waited = False
def __del__(self):
if not self._waited:
msg = 'ExecProcess instance garbage collected without call to wait() or wait_output()'
warnings.warn(msg, ResourceWarning)
[docs] def wait(self):
"""Wait for the process to finish.
If a timeout was specified to the :meth:`Client.exec` call, this waits
at most that duration.
Raises:
ChangeError: if there was an error starting or running the process.
ExecError: if the process exits with a non-zero exit code.
"""
exit_code = self._wait()
if exit_code != 0:
raise ExecError(self._command, exit_code, None, None)
def _wait(self) -> int:
self._waited = True
timeout = self._timeout
if timeout is not None:
# A bit more than the command timeout to ensure that happens first
timeout += 1
change = self._client.wait_change(self._change_id, timeout=timeout)
# If stdin reader thread is running, stop it
if self._cancel_stdin is not None:
self._cancel_stdin()
# Wait for all threads to finish (e.g., message barrier sent)
for thread in self._threads:
thread.join()
# If we opened a cancel_reader pipe, close the read side now (write
# side was already closed by _cancel_stdin().
if self._cancel_reader is not None:
os.close(self._cancel_reader)
# Close websockets (shutdown doesn't send CLOSE message or wait for response).
self._control_ws.shutdown()
self._stdio_ws.shutdown()
if self._stderr_ws is not None:
self._stderr_ws.shutdown()
if change.err:
raise ChangeError(change.err, change)
exit_code = -1
if change.tasks:
exit_code = change.tasks[0].data.get('exit-code', -1)
return exit_code
[docs] def wait_output(self) -> Tuple[AnyStr, Optional[AnyStr]]:
"""Wait for the process to finish and return tuple of (stdout, stderr).
If a timeout was specified to the :meth:`Client.exec` call, this waits
at most that duration. If combine_stderr was True, stdout will include
the process's standard error, and stderr will be None.
Raises:
ChangeError: if there was an error starting or running the process.
ExecError: if the process exits with a non-zero exit code.
TypeError: if :meth:`Client.exec` was called with the ``stdout`` argument.
"""
if self.stdout is None:
raise TypeError(
"can't use wait_output() when exec was called with the stdout argument; "
"use wait() instead"
)
if self._encoding is not None:
out = io.StringIO()
err = io.StringIO() if self.stderr is not None else None
else:
out = io.BytesIO()
err = io.BytesIO() if self.stderr is not None else None
t = _start_thread(shutil.copyfileobj, self.stdout, out)
self._threads.append(t)
if self.stderr is not None:
t = _start_thread(shutil.copyfileobj, self.stderr, err)
self._threads.append(t)
exit_code: int = self._wait()
out_value = typing.cast(AnyStr, out.getvalue())
err_value = typing.cast(AnyStr, err.getvalue()) if err is not None else None
if exit_code != 0:
raise ExecError[AnyStr](self._command, exit_code, out_value, err_value)
return (out_value, err_value)
[docs] def send_signal(self, sig: Union[int, str]):
"""Send the given signal to the running process.
Args:
sig: Name or number of signal to send, e.g., "SIGHUP", 1, or
signal.SIGHUP.
"""
if isinstance(sig, int):
sig = signal.Signals(sig).name
payload = {
'command': 'signal',
'signal': {'name': sig},
}
msg = json.dumps(payload, sort_keys=True)
self._control_ws.send(msg)
def _has_fileno(f: Any) -> bool:
"""Return True if the file-like object has a valid fileno() method."""
try:
f.fileno()
return True
except Exception:
# Some types define a fileno method that raises io.UnsupportedOperation,
# but just catching all exceptions here won't hurt.
return False
def _reader_to_websocket(reader: '_WebsocketReader',
ws: '_WebSocket',
encoding: str,
cancel_reader: Optional[int] = None,
bufsize: int = 16 * 1024):
"""Read reader through to EOF and send each chunk read to the websocket."""
while True:
if cancel_reader is not None:
# Wait for either a read to be ready or the caller to cancel stdin
result = select.select([cancel_reader, reader], [], [])
if cancel_reader in result[0]:
break
chunk = reader.read(bufsize)
if not chunk:
break
if isinstance(chunk, str):
chunk = chunk.encode(encoding)
ws.send_binary(chunk)
ws.send('{"command":"end"}') # type: ignore # Send "end" command as TEXT frame to signal EOF
def _websocket_to_writer(ws: '_WebSocket', writer: '_WebsocketWriter',
encoding: Optional[str]):
"""Receive messages from websocket (until end signal) and write to writer."""
while True:
chunk = ws.recv()
if isinstance(chunk, str):
try:
payload = json.loads(chunk)
except ValueError:
# Garbage sent, try to keep going
logger.warning('Cannot decode I/O command (invalid JSON)')
continue
command = payload.get('command')
if command != 'end':
# A command we don't recognize, keep going
logger.warning(f'Invalid I/O command {command!r}')
continue
# Received "end" command (EOF signal), stop thread
break
if encoding is not None:
chunk = typing.cast(bytes, chunk).decode(encoding)
writer.write(chunk)
class _WebsocketWriter(io.BufferedIOBase):
"""A writable file-like object that sends what's written to it to a websocket."""
def __init__(self, ws: '_WebSocket'):
self.ws = ws
def writable(self):
"""Denote this file-like object as writable."""
return True
def write(self, chunk: Union[str, bytes]) -> int:
"""Write chunk to the websocket."""
if not isinstance(chunk, bytes):
raise TypeError(f'value to write must be bytes, not {type(chunk).__name__}')
self.ws.send_binary(chunk)
return len(chunk)
def close(self):
"""Send end-of-file message to websocket."""
self.ws.send('{"command":"end"}')
class _WebsocketReader(io.BufferedIOBase):
"""A readable file-like object whose reads come from a websocket."""
def __init__(self, ws: '_WebSocket'):
self.ws = ws
self.remaining = b''
self.eof = False
def readable(self) -> bool:
"""Denote this file-like object as readable."""
return True
def read(self, n: int = -1) -> Union[str, bytes]:
"""Read up to n bytes from the websocket (or one message if n<0)."""
if self.eof:
# Calling read() multiple times after EOF should still return EOF
return b''
while not self.remaining:
chunk = self.ws.recv()
if isinstance(chunk, str):
try:
payload = json.loads(chunk)
except ValueError:
# Garbage sent, try to keep going
logger.warning('Cannot decode I/O command (invalid JSON)')
continue
command = payload.get('command')
if command != 'end':
# A command we don't recognize, keep going
logger.warning(f'Invalid I/O command {command!r}')
continue
# Received "end" command, return EOF designator
self.eof = True
return b''
self.remaining = chunk
if n < 0:
n = len(self.remaining)
result: Union[str, bytes] = self.remaining[:n]
self.remaining = self.remaining[n:]
return result
def read1(self, n: int = -1) -> Union[str, bytes]:
"""An alias for read."""
return self.read(n)
[docs]class Client:
"""Pebble API client.
Defaults to using a Unix socket at socket_path (which must be specified
unless a custom opener is provided).
For methods that wait for changes, such as :meth:`start_services` and :meth:`replan_services`,
if the change fails or times out, then a :class:`ChangeError` or :class:`TimeoutError` will be
raised.
All methods may raise exceptions when there are problems communicating with Pebble. Problems
connecting to or transferring data with Pebble will raise a :class:`ConnectionError`. When an
error occurs executing the request, such as trying to add an invalid layer or execute a command
that does not exist, an :class:`APIError` is raised.
The ``timeout`` parameter specifies a timeout in seconds for blocking operations like the
connection attempt to Pebble; used by ``urllib.request.OpenerDirector.open``. It's not for
methods like :meth:`start_services` and :meth:`replan_services` mentioned above, and it's not
for the command execution timeout defined in method :meth:`Client.exec`.
"""
_chunk_size = 8192
def __init__(self, socket_path: str,
opener: Optional[urllib.request.OpenerDirector] = None,
base_url: str = 'http://localhost',
timeout: float = 5.0):
if not isinstance(socket_path, str):
raise TypeError(f'`socket_path` should be a string, not: {type(socket_path)}')
if opener is None:
opener = self._get_default_opener(socket_path)
self.socket_path = socket_path
self.opener = opener
self.base_url = base_url
self.timeout = timeout
@classmethod
def _get_default_opener(cls, socket_path: str) -> urllib.request.OpenerDirector:
"""Build the default opener to use for requests (HTTP over Unix socket)."""
opener = urllib.request.OpenerDirector()
opener.add_handler(_UnixSocketHandler(socket_path))
opener.add_handler(urllib.request.HTTPDefaultErrorHandler())
opener.add_handler(urllib.request.HTTPRedirectHandler())
opener.add_handler(urllib.request.HTTPErrorProcessor())
return opener
# we need to cast the return type depending on the request params
def _request(self,
method: str,
path: str,
query: Optional[Dict[str, Any]] = None,
body: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Make a JSON request to the Pebble server with the given HTTP method and path.
If query dict is provided, it is encoded and appended as a query string
to the URL. If body dict is provided, it is serialied as JSON and used
as the HTTP body (with Content-Type: "application/json"). The resulting
body is decoded from JSON.
"""
headers = {'Accept': 'application/json'}
data = None
if body is not None:
data = json.dumps(body).encode('utf-8')
headers['Content-Type'] = 'application/json'
response = self._request_raw(method, path, query, headers, data)
self._ensure_content_type(response.headers, 'application/json')
raw_resp: Dict[str, Any] = json.loads(response.read())
return raw_resp
@staticmethod
def _ensure_content_type(headers: email.message.Message,
expected: 'Literal["multipart/form-data", "application/json"]'):
"""Parse Content-Type header from headers and ensure it's equal to expected.
Return a dict of any options in the header, e.g., {'boundary': ...}.
"""
ctype = headers.get_content_type()
params = headers.get_params() or {}
options = {key: value for key, value in params if value}
if ctype != expected:
raise ProtocolError(f'expected Content-Type {expected!r}, got {ctype!r}')
return options
def _request_raw(
self, method: str, path: str,
query: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, Any]] = None,
data: Optional[Union[bytes, Generator[bytes, Any, Any]]] = None,
) -> http.client.HTTPResponse:
"""Make a request to the Pebble server; return the raw HTTPResponse object."""
url = self.base_url + path
if query:
url = f"{url}?{urllib.parse.urlencode(query, doseq=True)}"
if headers is None:
headers = {}
request = urllib.request.Request(url, method=method, data=data, headers=headers) # noqa: S310
try:
response = self.opener.open(request, timeout=self.timeout)
except urllib.error.HTTPError as e:
code = e.code
status = e.reason
try:
body: Dict[str, Any] = json.loads(e.read())
message: str = body['result']['message']
except (OSError, ValueError, KeyError) as e2:
# Will only happen on read error or if Pebble sends invalid JSON.
body: Dict[str, Any] = {}
message = f'{type(e2).__name__} - {e2}'
raise APIError(body, code, status, message) from None
except urllib.error.URLError as e:
if e.args and isinstance(e.args[0], FileNotFoundError):
raise ConnectionError(
f"Could not connect to Pebble: socket not found at {self.socket_path!r} "
"(container restarted?)") from None
raise ConnectionError(e.reason) from e
return response
[docs] def get_system_info(self) -> SystemInfo:
"""Get system info."""
resp = self._request('GET', '/v1/system-info')
return SystemInfo.from_dict(resp['result'])
[docs] def get_warnings(self, select: WarningState = WarningState.PENDING) -> List[Warning]:
"""Get list of warnings in given state (pending or all)."""
query = {'select': select.value}
resp = self._request('GET', '/v1/warnings', query)
return [Warning.from_dict(w) for w in resp['result']]
[docs] def ack_warnings(self, timestamp: datetime.datetime) -> int:
"""Acknowledge warnings up to given timestamp, return number acknowledged."""
body = {'action': 'okay', 'timestamp': timestamp.isoformat()}
resp = self._request('POST', '/v1/warnings', body=body)
return resp['result']
[docs] def get_changes(
self, select: ChangeState = ChangeState.IN_PROGRESS, service: Optional[str] = None,
) -> List[Change]:
"""Get list of changes in given state, filter by service name if given."""
query: Dict[str, Union[str, int]] = {'select': select.value}
if service is not None:
query['for'] = service
resp = self._request('GET', '/v1/changes', query)
return [Change.from_dict(c) for c in resp['result']]
[docs] def get_change(self, change_id: ChangeID) -> Change:
"""Get single change by ID."""
resp = self._request('GET', f'/v1/changes/{change_id}')
return Change.from_dict(resp['result'])
[docs] def abort_change(self, change_id: ChangeID) -> Change:
"""Abort change with given ID."""
body = {'action': 'abort'}
resp = self._request('POST', f'/v1/changes/{change_id}', body=body)
return Change.from_dict(resp['result'])
[docs] def autostart_services(self, timeout: float = 30.0, delay: float = 0.1) -> ChangeID:
"""Start the startup-enabled services and wait (poll) for them to be started.
Args:
timeout: Seconds before autostart change is considered timed out (float). If
timeout is 0, submit the action but don't wait; just return the change ID
immediately.
delay: Seconds before executing the autostart change (float).
Returns:
ChangeID of the autostart change.
Raises:
ChangeError: if one or more of the services didn't start, and ``timeout`` is non-zero.
"""
return self._services_action('autostart', [], timeout, delay)
[docs] def replan_services(self, timeout: float = 30.0, delay: float = 0.1) -> ChangeID:
"""Replan by (re)starting changed and startup-enabled services and wait for them to start.
Args:
timeout: Seconds before replan change is considered timed out (float). If
timeout is 0, submit the action but don't wait; just return the change
ID immediately.
delay: Seconds before executing the replan change (float).
Returns:
ChangeID of the replan change.
Raises:
ChangeError: if one or more of the services didn't stop/start, and ``timeout`` is
non-zero.
"""
return self._services_action('replan', [], timeout, delay)
[docs] def start_services(
self, services: Iterable[str], timeout: float = 30.0, delay: float = 0.1,
) -> ChangeID:
"""Start services by name and wait (poll) for them to be started.
Args:
services: Non-empty list of services to start.
timeout: Seconds before start change is considered timed out (float). If
timeout is 0, submit the action but don't wait; just return the change
ID immediately.
delay: Seconds before executing the start change (float).
Returns:
ChangeID of the start change.
Raises:
ChangeError: if one or more of the services didn't stop/start, and ``timeout`` is
non-zero.
"""
return self._services_action('start', services, timeout, delay)
[docs] def stop_services(
self, services: Iterable[str], timeout: float = 30.0, delay: float = 0.1,
) -> ChangeID:
"""Stop services by name and wait (poll) for them to be started.
Args:
services: Non-empty list of services to stop.
timeout: Seconds before stop change is considered timed out (float). If
timeout is 0, submit the action but don't wait; just return the change
ID immediately.
delay: Seconds before executing the stop change (float).
Returns:
ChangeID of the stop change.
Raises:
ChangeError: if one or more of the services didn't stop/start and ``timeout`` is
non-zero.
"""
return self._services_action('stop', services, timeout, delay)
[docs] def restart_services(
self, services: Iterable[str], timeout: float = 30.0, delay: float = 0.1,
) -> ChangeID:
"""Restart services by name and wait (poll) for them to be started.
Args:
services: Non-empty list of services to restart.
timeout: Seconds before restart change is considered timed out (float). If
timeout is 0, submit the action but don't wait; just return the change
ID immediately.
delay: Seconds before executing the restart change (float).
Returns:
ChangeID of the restart change.
Raises:
ChangeError: if one or more of the services didn't stop/start and ``timeout`` is
non-zero.
"""
return self._services_action('restart', services, timeout, delay)
def _services_action(
self, action: str, services: Iterable[str], timeout: Optional[float],
delay: float,
) -> ChangeID:
if isinstance(services, (str, bytes)) or not hasattr(services, '__iter__'):
raise TypeError(
f'services must be of type Iterable[str], not {type(services).__name__}')
services = list(services)
for s in services:
if not isinstance(s, str):
raise TypeError(f'service names must be str, not {type(s).__name__}')
body = {'action': action, 'services': services}
resp = self._request('POST', '/v1/services', body=body)
change_id = ChangeID(resp['change'])
if timeout:
change = self.wait_change(change_id, timeout=timeout, delay=delay)
if change.err:
raise ChangeError(change.err, change)
return change_id
[docs] def wait_change(
self, change_id: ChangeID,
timeout: Optional[float] = 30.0,
delay: float = 0.1,
) -> Change:
"""Wait for the given change to be ready.
If the Pebble server supports the /v1/changes/{id}/wait API endpoint,
use that to avoid polling, otherwise poll /v1/changes/{id} every delay
seconds.
Args:
change_id: Change ID of change to wait for.
timeout: Maximum time in seconds to wait for the change to be
ready. It may be None, in which case wait_change never times out.
delay: If polling, this is the delay in seconds between attempts.
Returns:
The Change object being waited on.
Raises:
TimeoutError: If the maximum timeout is reached.
"""
try:
return self._wait_change_using_wait(change_id, timeout)
except NotImplementedError:
# Pebble server doesn't support wait endpoint, fall back to polling
return self._wait_change_using_polling(change_id, timeout, delay)
def _wait_change_using_wait(self, change_id: ChangeID, timeout: Optional[float]):
"""Wait for a change to be ready using the wait-change API."""
deadline = time.time() + timeout if timeout is not None else 0
# Hit the wait endpoint every Client.timeout-1 seconds to avoid long
# requests (the -1 is to ensure it wakes up before the socket timeout)
while True:
this_timeout = max(self.timeout - 1, 1) # minimum of 1 second
if timeout is not None:
time_remaining = deadline - time.time()
if time_remaining <= 0:
break
# Wait the lesser of the time remaining and Client.timeout-1
this_timeout = min(time_remaining, this_timeout)
try:
return self._wait_change(change_id, this_timeout)
except (socket.timeout, builtins.TimeoutError):
# NOTE: in Python 3.10 socket.timeout is an alias of TimeoutError,
# but we still need to support Python 3.8, so catch both.
# Catch timeout from wait endpoint and loop to check deadline.
pass
raise TimeoutError(f'timed out waiting for change {change_id} ({timeout} seconds)')
def _wait_change(self, change_id: ChangeID, timeout: Optional[float] = None) -> Change:
"""Call the wait-change API endpoint directly."""
query: Dict[str, Any] = {}
if timeout is not None:
query['timeout'] = _format_timeout(timeout)
try:
resp = self._request('GET', f'/v1/changes/{change_id}/wait', query)
except APIError as e:
if e.code == 404:
raise NotImplementedError(
'server does not implement wait-change endpoint') from None
if e.code == 504:
raise TimeoutError(
f'timed out waiting for change {change_id} ({timeout} seconds)') from None
raise
return Change.from_dict(resp['result'])
def _wait_change_using_polling(self, change_id: ChangeID, timeout: Optional[float],
delay: float):
"""Wait for a change to be ready by polling the get-change API."""
deadline = time.time() + timeout if timeout is not None else 0
while timeout is None or time.time() < deadline:
change = self.get_change(change_id)
if change.ready:
return change
time.sleep(delay)
raise TimeoutError(f'timed out waiting for change {change_id} ({timeout} seconds)')
[docs] def add_layer(
self, label: str, layer: Union[str, 'LayerDict', Layer], *,
combine: bool = False):
"""Dynamically add a new layer onto the Pebble configuration layers.
If combine is False (the default), append the new layer as the top
layer with the given label. If combine is True and the label already
exists, the two layers are combined into a single one considering the
layer override rules; if the layer doesn't exist, it is added as usual.
"""
if not isinstance(label, str):
raise TypeError(f'label must be a str, not {type(label).__name__}')
if isinstance(layer, str):
layer_yaml = layer
elif isinstance(layer, dict):
layer_yaml = Layer(layer).to_yaml()
elif isinstance(layer, Layer):
layer_yaml = layer.to_yaml()
else:
raise TypeError(
f'layer must be str, dict, or pebble.Layer, not {type(layer).__name__}')
body = {
'action': 'add',
'combine': combine,
'label': label,
'format': 'yaml',
'layer': layer_yaml,
}
self._request('POST', '/v1/layers', body=body)
[docs] def get_plan(self) -> Plan:
"""Get the Pebble plan (contains combined layer configuration)."""
resp = self._request('GET', '/v1/plan', {'format': 'yaml'})
return Plan(resp['result'])
[docs] def get_services(self, names: Optional[Iterable[str]] = None) -> List[ServiceInfo]:
"""Get the service status for the configured services.
If names is specified, only fetch the service status for the services
named.
"""
query = None
if names is not None:
query = {'names': ','.join(names)}
resp = self._request('GET', '/v1/services', query)
return [ServiceInfo.from_dict(info) for info in resp['result']]
@typing.overload
def pull(self, path: str, *, encoding: None) -> BinaryIO:
...
@typing.overload
def pull(self, path: str, *, encoding: str = 'utf-8') -> TextIO:
...
[docs] def pull(self,
path: str,
*,
encoding: Optional[str] = 'utf-8') -> Union[BinaryIO, TextIO]:
"""Read a file's content from the remote system.
Args:
path: Path of the file to read from the remote system.
encoding: Encoding to use for decoding the file's bytes to str,
or None to specify no decoding.
Returns:
A readable file-like object, whose read() method will return str
objects decoded according to the specified encoding, or bytes if
encoding is None.
Raises:
PathError: If there was an error reading the file at path, for
example, if the file doesn't exist or is a directory.
"""
query = {
'action': 'read',
'path': path,
}
headers = {'Accept': 'multipart/form-data'}
response = self._request_raw('GET', '/v1/files', query, headers)
options = self._ensure_content_type(response.headers, 'multipart/form-data')
boundary = options.get('boundary', '')
if not boundary:
raise ProtocolError(f'invalid boundary {boundary!r}')
parser = _FilesParser(boundary)
while True:
chunk = response.read(self._chunk_size)
if not chunk:
break
parser.feed(chunk)
resp = parser.get_response()
if resp is None:
raise ProtocolError('no "response" field in multipart body')
self._raise_on_path_error(resp, path)
filenames = parser.filenames()
if not filenames:
raise ProtocolError('no file content in multipart response')
elif len(filenames) > 1:
raise ProtocolError('single file request resulted in a multi-file response')
filename = filenames[0]
if filename != path:
raise ProtocolError(f'path not expected: {filename!r}')
f = parser.get_file(path, encoding)
parser.remove_files()
return f
@staticmethod
def _raise_on_path_error(resp: '_FilesResponse', path: str):
result = resp['result'] or [] # in case it's null instead of []
paths = {item['path']: item for item in result}
if path not in paths:
raise ProtocolError(f'path not found in response metadata: {resp}')
error = paths[path].get('error')
if error:
raise PathError(error['kind'], error['message'])
[docs] def push(
self, path: str, source: '_IOSource', *,
encoding: str = 'utf-8', make_dirs: bool = False,
permissions: Optional[int] = None,
user_id: Optional[int] = None,
user: Optional[str] = None,
group_id: Optional[int] = None,
group: Optional[str] = None):
"""Write content to a given file path on the remote system.
Args:
path: Path of the file to write to on the remote system.
source: Source of data to write. This is either a concrete str or
bytes instance, or a readable file-like object.
encoding: Encoding to use for encoding source str to bytes, or
strings read from source if it is a TextIO type. Ignored if
source is bytes or BinaryIO.
make_dirs: If True, create parent directories if they don't exist.
permissions: Permissions (mode) to create file with (Pebble default
is 0o644).
user_id: User ID (UID) for file.
user: Username for file. User's UID must match user_id if both are
specified.
group_id: Group ID (GID) for file.
group: Group name for file. Group's GID must match group_id if
both are specified.
Raises:
PathError: If there was an error writing the file to the path; for example, if the
destination path doesn't exist and ``make_dirs`` is not used.
"""
info = self._make_auth_dict(permissions, user_id, user, group_id, group)
info['path'] = path
if make_dirs:
info['make-dirs'] = True
metadata = {
'action': 'write',
'files': [info],
}
data, content_type = self._encode_multipart(metadata, path, source, encoding)
headers = {
'Accept': 'application/json',
'Content-Type': content_type,
}
response = self._request_raw('POST', '/v1/files', None, headers, data)
self._ensure_content_type(response.headers, 'application/json')
resp = json.loads(response.read())
# we need to cast the Dict[Any, Any] to _FilesResponse
self._raise_on_path_error(typing.cast('_FilesResponse', resp), path)
@staticmethod
def _make_auth_dict(permissions: Optional[int],
user_id: Optional[int],
user: Optional[str],
group_id: Optional[int],
group: Optional[str]) -> '_AuthDict':
d: _AuthDict = {}
if permissions is not None:
d['permissions'] = format(permissions, '03o')
if user_id is not None:
d['user-id'] = user_id
if user is not None:
d['user'] = user
if group_id is not None:
d['group-id'] = group_id
if group is not None:
d['group'] = group
return d
def _encode_multipart(self, metadata: Dict[str, Any], path: str,
source: '_IOSource', encoding: str):
# Python's stdlib mime/multipart handling is screwy and doesn't handle
# binary properly, so roll our own.
if isinstance(source, str):
source_io: _AnyStrFileLikeIO = io.StringIO(source)
elif isinstance(source, bytes):
source_io: _AnyStrFileLikeIO = io.BytesIO(source)
else:
source_io: _AnyStrFileLikeIO = source # type: ignore
boundary = binascii.hexlify(os.urandom(16))
path_escaped = path.replace('"', '\\"').encode('utf-8')
content_type = f"multipart/form-data; boundary=\"{boundary.decode('utf-8')}\""
def generator() -> Generator[bytes, None, None]:
yield b''.join([
b'--', boundary, b'\r\n',
b'Content-Type: application/json\r\n',
b'Content-Disposition: form-data; name="request"\r\n',
b'\r\n',
json.dumps(metadata).encode('utf-8'), b'\r\n',
b'--', boundary, b'\r\n',
b'Content-Type: application/octet-stream\r\n',
b'Content-Disposition: form-data; name="files"; filename="',
path_escaped, b'"\r\n',
b'\r\n',
])
content: Union[str, bytes] = source_io.read(self._chunk_size)
while content:
if isinstance(content, str):
content = content.encode(encoding)
yield content
content = source_io.read(self._chunk_size)
yield b''.join([
b'\r\n',
b'--', boundary, b'--\r\n',
])
return generator(), content_type
[docs] def list_files(self, path: str, *, pattern: Optional[str] = None,
itself: bool = False) -> List[FileInfo]:
"""Return list of directory entries from given path on remote system.
Despite the name, this method returns a list of files *and*
directories, similar to :func:`os.listdir` or :func:`os.scandir`.
Args:
path: Path of the directory to list, or path of the file to return
information about.
pattern: If specified, filter the list to just the files that match,
for example ``*.txt``.
itself: If path refers to a directory, return information about the
directory itself, rather than its contents.
Raises:
PathError: if there was an error listing the directory; for example, if the directory
does not exist.
"""
query = {
'action': 'list',
'path': path,
}
if pattern:
query['pattern'] = pattern
if itself:
query['itself'] = 'true'
resp = self._request('GET', '/v1/files', query)
result = resp['result'] or [] # in case it's null instead of []
return [FileInfo.from_dict(d) for d in result]
[docs] def make_dir(
self, path: str, *, make_parents: bool = False,
permissions: Optional[int] = None,
user_id: Optional[int] = None,
user: Optional[str] = None,
group_id: Optional[int] = None,
group: Optional[str] = None):
"""Create a directory on the remote system with the given attributes.
Args:
path: Path of the directory to create on the remote system.
make_parents: If True, create parent directories if they don't exist.
permissions: Permissions (mode) to create directory with (Pebble
default is 0o755).
user_id: User ID (UID) for directory.
user: Username for directory. User's UID must match user_id if
both are specified.
group_id: Group ID (GID) for directory.
group: Group name for directory. Group's GID must match group_id
if both are specified.
Raises:
PathError: if there was an error making the directory; for example, if the parent path
does not exist, and ``make_parents`` is not used.
"""
info = self._make_auth_dict(permissions, user_id, user, group_id, group)
info['path'] = path
if make_parents:
info['make-parents'] = True
body = {
'action': 'make-dirs',
'dirs': [info],
}
resp = self._request('POST', '/v1/files', None, body)
self._raise_on_path_error(typing.cast('_FilesResponse', resp), path)
[docs] def remove_path(self, path: str, *, recursive: bool = False):
"""Remove a file or directory on the remote system.
Args:
path: Path of the file or directory to delete from the remote system.
recursive: If True, and path is a directory, recursively delete it and
everything under it. If path is a file, delete the file. In
either case, do nothing if the file or directory does not
exist. Behaviourally similar to ``rm -rf <file|dir>``.
Raises:
pebble.PathError: If a relative path is provided, or if `recursive` is False
and the file or directory cannot be removed (it does not exist or is not empty).
"""
info: Dict[str, Any] = {'path': path}
if recursive:
info['recursive'] = True
body = {
'action': 'remove',
'paths': [info],
}
resp = self._request('POST', '/v1/files', None, body)
self._raise_on_path_error(typing.cast('_FilesResponse', resp), path)
# Exec I/O is str if encoding is provided (the default)
@typing.overload
def exec(
self,
command: List[str],
*,
service_context: Optional[str] = None,
environment: Optional[Dict[str, str]] = None,
working_dir: Optional[str] = None,
timeout: Optional[float] = None,
user_id: Optional[int] = None,
user: Optional[str] = None,
group_id: Optional[int] = None,
group: Optional[str] = None,
stdin: Optional[Union[str, TextIO]] = None,
stdout: Optional[TextIO] = None,
stderr: Optional[TextIO] = None,
encoding: str = 'utf-8',
combine_stderr: bool = False
) -> ExecProcess[str]:
...
# Exec I/O is bytes if encoding is explicitly set to None
@typing.overload
def exec(
self,
command: List[str],
*,
service_context: Optional[str] = None,
environment: Optional[Dict[str, str]] = None,
working_dir: Optional[str] = None,
timeout: Optional[float] = None,
user_id: Optional[int] = None,
user: Optional[str] = None,
group_id: Optional[int] = None,
group: Optional[str] = None,
stdin: Optional[Union[bytes, BinaryIO]] = None,
stdout: Optional[BinaryIO] = None,
stderr: Optional[BinaryIO] = None,
encoding: None = None,
combine_stderr: bool = False
) -> ExecProcess[bytes]:
...
[docs] def exec(
self,
command: List[str],
*,
service_context: Optional[str] = None,
environment: Optional[Dict[str, str]] = None,
working_dir: Optional[str] = None,
timeout: Optional[float] = None,
user_id: Optional[int] = None,
user: Optional[str] = None,
group_id: Optional[int] = None,
group: Optional[str] = None,
stdin: Optional[Union[str, bytes, TextIO, BinaryIO]] = None,
stdout: Optional[Union[TextIO, BinaryIO]] = None,
stderr: Optional[Union[TextIO, BinaryIO]] = None,
encoding: Optional[str] = 'utf-8',
combine_stderr: bool = False
) -> ExecProcess[Any]:
r"""Execute the given command on the remote system.
Two method signatures are shown because this method returns an
:class:`ExecProcess` that deals with strings if ``encoding`` is
specified (the default ), or one that deals with bytes if ``encoding``
is set to ``None``.
Most of the parameters are explained in the "Parameters" section
below, however, input/output handling is a bit more complex. Some
examples are shown below::
# Simple command with no output; just check exit code
>>> process = client.exec(['send-emails'])
>>> process.wait()
# Fetch output as string
>>> process = client.exec(['python3', '--version'])
>>> version, _ = process.wait_output()
>>> print(version)
Python 3.8.10
# Fetch both stdout and stderr as strings
>>> process = client.exec(['pg_dump', '-s', ...])
>>> schema, logs = process.wait_output()
# Stream input from a string and write output to files
>>> stdin = 'foo\nbar\n'
>>> with open('out.txt', 'w') as out, open('err.txt', 'w') as err:
... process = client.exec(['awk', '{ print toupper($0) }'],
... stdin=stdin, stdout=out, stderr=err)
... process.wait()
>>> open('out.txt').read()
'FOO\nBAR\n'
>>> open('err.txt').read()
''
# Real-time streaming using ExecProcess.stdin and ExecProcess.stdout
>>> process = client.exec(['cat'])
>>> def stdin_thread():
... for line in ['one\n', '2\n', 'THREE\n']:
... process.stdin.write(line)
... process.stdin.flush()
... time.sleep(1)
... process.stdin.close()
...
>>> threading.Thread(target=stdin_thread).start()
>>> for line in process.stdout:
... print(datetime.datetime.now().strftime('%H:%M:%S'), repr(line))
...
16:20:26 'one\n'
16:20:27 '2\n'
16:20:28 'THREE\n'
>>> process.wait() # will return immediately as stdin was closed
# Show exception raised for non-zero return code
>>> process = client.exec(['ls', 'notexist'])
>>> out, err = process.wait_output()
Traceback (most recent call last):
...
ExecError: "ls" returned exit code 2
>>> exc = sys.last_value
>>> exc.exit_code
2
>>> exc.stdout
''
>>> exc.stderr
"ls: cannot access 'notfound': No such file or directory\n"
Args:
command: Command to execute: the first item is the name (or path)
of the executable, the rest of the items are the arguments.
service_context: If specified, run the command in the context of
this service. Specifically, inherit its environment variables,
user/group settings, and working directory. The other exec
options will override the service context; ``environment``
will be merged on top of the service's.
environment: Environment variables to pass to the process.
working_dir: Working directory to run the command in. If not set,
Pebble uses the target user's $HOME directory (and if the user
argument is not set, $HOME of the user Pebble is running as).
timeout: Timeout in seconds for the command execution, after which
the process will be terminated. If not specified, the
execution never times out.
user_id: User ID (UID) to run the process as.
user: Username to run the process as. User's UID must match
user_id if both are specified.
group_id: Group ID (GID) to run the process as.
group: Group name to run the process as. Group's GID must match
group_id if both are specified.
stdin: A string or readable file-like object that is sent to the
process's standard input. If not set, the caller can write
input to :attr:`ExecProcess.stdin` to stream input to the
process.
stdout: A writable file-like object that the process's standard
output is written to. If not set, the caller can use
:meth:`ExecProcess.wait_output` to capture output as a string,
or read from :meth:`ExecProcess.stdout` to stream output from
the process.
stderr: A writable file-like object that the process's standard
error is written to. If not set, the caller can use
:meth:`ExecProcess.wait_output` to capture error output as a
string, or read from :meth:`ExecProcess.stderr` to stream
error output from the process. Must be None if combine_stderr
is True.
encoding: If encoding is set (the default is UTF-8), the types
read or written to stdin/stdout/stderr are str, and encoding
is used to encode them to bytes. If encoding is None, the
types read or written are raw bytes.
combine_stderr: If True, process's stderr output is combined into
its stdout (the stderr argument must be None). If False,
separate streams are used for stdout and stderr.
Returns:
A Process object representing the state of the running process.
To wait for the command to finish, the caller will typically call
:meth:`ExecProcess.wait` if stdout/stderr were provided as
arguments to :meth:`exec`, or :meth:`ExecProcess.wait_output` if
not.
Raises:
APIError: if an error occurred communicating with pebble, or if the command is not
found.
ExecError: if the command exits with a non-zero exit code.
"""
if not isinstance(command, list) or not all(isinstance(s, str) for s in command):
raise TypeError(f'command must be a list of str, not {type(command).__name__}')
if len(command) < 1:
raise ValueError('command must contain at least one item')
if stdin is not None:
if isinstance(stdin, str):
if encoding is None:
raise ValueError('encoding must be set if stdin is str')
stdin = io.BytesIO(stdin.encode(encoding))
elif isinstance(stdin, bytes):
if encoding is not None:
raise ValueError('encoding must be None if stdin is bytes')
stdin = io.BytesIO(stdin)
elif not hasattr(stdin, 'read'):
raise TypeError('stdin must be str, bytes, or a readable file-like object')
if combine_stderr and stderr is not None:
raise ValueError('stderr must be None if combine_stderr is True')
body = {
'command': command,
'service-context': service_context,
'environment': environment or {},
'working-dir': working_dir,
'timeout': _format_timeout(timeout) if timeout is not None else None,
'user-id': user_id,
'user': user,
'group-id': group_id,
'group': group,
'split-stderr': not combine_stderr,
}
resp = self._request('POST', '/v1/exec', body=body)
change_id = resp['change']
task_id = resp['result']['task-id']
stderr_ws: Optional['_WebSocket'] = None
try:
control_ws = self._connect_websocket(task_id, 'control')
stdio_ws = self._connect_websocket(task_id, 'stdio')
if not combine_stderr:
stderr_ws = self._connect_websocket(task_id, 'stderr')
except websocket.WebSocketException as e: # type: ignore
# Error connecting to websockets, probably due to the exec/change
# finishing early with an error. Call wait_change to pick that up.
change = self.wait_change(ChangeID(change_id))
if change.err:
raise ChangeError(change.err, change) from e
raise ConnectionError(f'unexpected error connecting to websockets: {e}') from e
cancel_stdin: Optional[Callable[[], None]] = None
cancel_reader: Optional[int] = None
threads: List[threading.Thread] = []
if stdin is not None:
if _has_fileno(stdin):
# Create a pipe so _reader_to_websocket can select() on the
# reader as well as this cancel_reader; when we write anything
# to cancel_writer it'll trigger the select and end the thread.
cancel_reader, cancel_writer = os.pipe()
def _cancel_stdin():
os.write(cancel_writer, b'x') # doesn't matter what we write
os.close(cancel_writer)
cancel_stdin = _cancel_stdin
t = _start_thread(_reader_to_websocket, stdin, stdio_ws, encoding, cancel_reader)
threads.append(t)
process_stdin = None
else:
process_stdin = _WebsocketWriter(stdio_ws)
if encoding is not None:
process_stdin = io.TextIOWrapper(
process_stdin, encoding=encoding, newline='') # type: ignore
if stdout is not None:
t = _start_thread(_websocket_to_writer, stdio_ws, stdout, encoding)
threads.append(t)
process_stdout = None
else:
process_stdout = _WebsocketReader(stdio_ws)
if encoding is not None:
process_stdout = io.TextIOWrapper(
process_stdout, encoding=encoding, newline='') # type: ignore
process_stderr = None
if not combine_stderr:
if stderr is not None:
t = _start_thread(_websocket_to_writer, stderr_ws, stderr, encoding)
threads.append(t)
else:
ws = typing.cast('_WebSocket', stderr_ws)
process_stderr = _WebsocketReader(ws)
if encoding is not None:
process_stderr = io.TextIOWrapper(
process_stderr, encoding=encoding, newline='') # type: ignore
process: ExecProcess[Any] = ExecProcess(
stdin=process_stdin, # type: ignore
stdout=process_stdout, # type: ignore
stderr=process_stderr, # type: ignore
client=self,
timeout=timeout,
stdio_ws=stdio_ws,
stderr_ws=stderr_ws,
control_ws=control_ws,
command=command,
encoding=encoding,
change_id=ChangeID(change_id),
cancel_stdin=cancel_stdin,
cancel_reader=cancel_reader,
threads=threads,
)
return process
def _connect_websocket(self, task_id: str, websocket_id: str) -> '_WebSocket':
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(self.socket_path)
url = self._websocket_url(task_id, websocket_id)
ws: '_WebSocket' = websocket.WebSocket(skip_utf8_validation=True) # type: ignore
ws.connect(url, socket=sock)
return ws
def _websocket_url(self, task_id: str, websocket_id: str) -> str:
base_url = self.base_url.replace('http://', 'ws://')
url = f'{base_url}/v1/tasks/{task_id}/websocket/{websocket_id}'
return url
[docs] def send_signal(self, sig: Union[int, str], services: Iterable[str]):
"""Send the given signal to the list of services named.
Args:
sig: Name or number of signal to send, for example ``"SIGHUP"``, ``1``, or
``signal.SIGHUP``.
services: Non-empty list of service names to send the signal to.
Raises:
APIError: If any of the services are not in the plan or are not
currently running.
"""
if isinstance(services, (str, bytes)) or not hasattr(services, '__iter__'):
raise TypeError('services must be of type Iterable[str], '
f'not {type(services).__name__}')
for s in services:
if not isinstance(s, str):
raise TypeError(f'service names must be str, not {type(s).__name__}')
if isinstance(sig, int):
sig = signal.Signals(sig).name
body = {
'signal': sig,
'services': services,
}
self._request('POST', '/v1/signals', body=body)
[docs] def get_checks(
self,
level: Optional[CheckLevel] = None,
names: Optional[Iterable[str]] = None
) -> List[CheckInfo]:
"""Get the check status for the configured checks.
Args:
level: Optional check level to query for (default is to fetch
checks with any level).
names: Optional list of check names to query for (default is to
fetch all checks).
Returns:
List of :class:`CheckInfo` objects.
"""
query: Dict[str, Any] = {}
if level is not None:
query['level'] = level.value
if names:
query['names'] = list(names)
resp = self._request('GET', '/v1/checks', query)
return [CheckInfo.from_dict(info) for info in resp['result']]
[docs] def notify(self, type: NoticeType, key: str, *,
data: Optional[Dict[str, str]] = None,
repeat_after: Optional[datetime.timedelta] = None) -> str:
"""Record an occurrence of a notice with the specified options.
Args:
type: Notice type (currently only "custom" notices are supported).
key: Notice key; must be in "example.com/path" format.
data: Data fields for this notice.
repeat_after: Only allow this notice to repeat after this duration
has elapsed (the default is to always repeat).
Returns:
The notice's ID.
"""
body: Dict[str, Any] = {
'action': 'add',
'type': type.value,
'key': key,
}
if data is not None:
body['data'] = data
if repeat_after is not None:
body['repeat-after'] = _format_timeout(repeat_after.total_seconds())
resp = self._request('POST', '/v1/notices', body=body)
return resp['result']['id']
[docs] def get_notice(self, id: str) -> Notice:
"""Get details about a single notice by ID.
Raises:
APIError: if a notice with the given ID is not found (``code`` 404)
"""
resp = self._request('GET', f'/v1/notices/{id}')
return Notice.from_dict(resp['result'])
[docs] def get_notices(
self,
*,
users: Optional[NoticesUsers] = None,
user_id: Optional[int] = None,
types: Optional[Iterable[Union[NoticeType, str]]] = None,
keys: Optional[Iterable[str]] = None,
) -> List[Notice]:
"""Query for notices that match all of the provided filters.
Pebble returns notices that match all of the filters, for example, if
called with ``types=[NoticeType.CUSTOM], keys=["example.com/a"]``,
Pebble will only return custom notices that also have key "example.com/a".
If no filters are specified, return notices viewable by the requesting
user (notices whose ``user_id`` matches the requester UID as well as
public notices).
Note that the "after" filter is not yet implemented, as it's not
needed right now and it's hard to implement correctly with Python's
datetime type, which only has microsecond precision (and Go's Time
type has nanosecond precision).
Args:
users: Change which users' notices to return (instead of returning
notices for the current user).
user_id: Filter for notices for the specified user, including
public notices (only works for Pebble admins).
types: Filter for notices with any of the specified types.
keys: Filter for notices with any of the specified keys.
"""
query: Dict[str, Union[str, List[str]]] = {}
if users is not None:
query['users'] = users.value
if user_id is not None:
query['user-id'] = str(user_id)
if types is not None:
query['types'] = [(t.value if isinstance(t, NoticeType) else t) for t in types]
if keys is not None:
query['keys'] = list(keys)
resp = self._request('GET', '/v1/notices', query)
return [Notice.from_dict(info) for info in resp['result']]
class _FilesParser:
"""A limited purpose multi-part parser backed by files for memory efficiency."""
def __init__(self, boundary: Union[bytes, str]):
self._response: Optional[_FilesResponse] = None # externally managed
self._part_type: Optional[Literal["response", "files"]] = None # externally managed
self._headers: Optional[email.message.Message] = None # externally managed
self._files: Dict[str, _Tempfile] = {}
# Prepare the MIME multipart boundary line patterns.
if isinstance(boundary, str):
boundary = boundary.encode()
# State vars, as we may enter the feed() function multiple times.
self._response_data = bytearray()
self._max_lookahead = 8 * 1024 * 1024
self._parser = _MultipartParser(
boundary,
self._process_header,
self._process_body,
max_lookahead=self._max_lookahead)
# RFC 2046 says that the boundary string needs to be preceded by a CRLF.
# Unfortunately, the request library's header parsing logic strips off one of
# these, so we'll prime the parser buffer with that missing sequence.
self._parser.feed(b'\r\n')
def _process_header(self, data: bytes):
parser = email.parser.BytesFeedParser()
parser.feed(data)
self._headers = parser.close()
content_disposition = self._headers.get_content_disposition()
if content_disposition != 'form-data':
raise ProtocolError(
f'unexpected content disposition: {content_disposition!r}')
name = self._headers.get_param('name', header='content-disposition')
if name == 'files':
filename = self._headers.get_filename()
if filename is None:
raise ProtocolError('multipart "files" part missing filename')
self._prepare_tempfile(filename)
elif name != 'response':
raise ProtocolError(
f'unexpected name in content-disposition header: {name!r}')
self._part_type = typing.cast('Literal["response", "files"]', name)
def _process_body(self, data: bytes, done: bool = False):
if self._part_type == 'response':
self._response_data.extend(data)
if done:
if len(self._response_data) > self._max_lookahead:
raise ProtocolError('response end marker not found')
resp = json.loads(self._response_data.decode())
self._response = typing.cast('_FilesResponse', resp)
self._response_data = bytearray()
elif self._part_type == 'files':
if done:
# This is the final write.
outfile = self._get_open_tempfile()
outfile.write(data)
outfile.close()
self._headers = None
else:
# Not the end of file data yet. Don't open/close file for intermediate writes
outfile = self._get_open_tempfile()
outfile.write(data)
def remove_files(self):
"""Remove all temporary files on disk."""
for file in self._files.values():
os.unlink(file.name)
self._files.clear()
def feed(self, data: bytes):
"""Provide more data to the running parser."""
self._parser.feed(data)
def _prepare_tempfile(self, filename: str):
tf = tempfile.NamedTemporaryFile(delete=False)
self._files[filename] = tf # type: ignore # we have a custom protocol for it
self.current_filename = filename
def _get_open_tempfile(self):
return self._files[self.current_filename]
def get_response(self) -> Optional['_FilesResponse']:
"""Return the deserialized JSON object from the multipart "response" field."""
return self._response
def filenames(self):
"""Return a list of filenames from the "files" parts of the response."""
return list(self._files.keys())
def get_file(self, path: str, encoding: Optional[str]) -> '_TextOrBinaryIO':
"""Return an open file object containing the data."""
mode = 'r' if encoding else 'rb'
# We're using text-based file I/O purely for file encoding purposes, not for
# newline normalization. newline='' serves the line endings as-is.
newline = '' if encoding else None
file_io = open(self._files[path].name, mode, # noqa: SIM115
encoding=encoding, newline=newline)
# open() returns IO[Any]
return typing.cast('_TextOrBinaryIO', file_io)
class _MultipartParser:
def __init__(
self,
marker: bytes,
handle_header: '_HeaderHandler',
handle_body: '_BodyHandler',
max_lookahead: int = 0,
max_boundary_length: int = 0):
r"""Configures a parser for mime multipart messages.
Args:
marker: the multipart boundary marker (i.e. in "\r\n--<marker>--\r\n")
handle_header(data): called once with the entire contents of a part
header as encountered in data fed to the parser
handle_body(data, done=False): called incrementally as part body
data is fed into the parser - its "done" parameter is set to true when
the body is complete.
max_lookahead: maximum amount of bytes to buffer when searching for a complete header.
max_boundary_length: maximum number of bytes that can make up a part
boundary (e.g. \r\n--<marker>--\r\n")
"""
self._marker = marker
self._handle_header = handle_header
self._handle_body = handle_body
self._max_lookahead = max_lookahead
self._max_boundary_length = max_boundary_length
self._buf = bytearray()
self._pos = 0 # current position in buf
self._done = False # whether we have found the terminal boundary and are done parsing
self._header_terminator = b'\r\n\r\n'
# RFC 2046 notes optional "linear whitespace" (e.g. [ \t]+) after the boundary pattern
# and the optional "--" suffix. The boundaries strings can be constructed as follows:
#
# boundary = \r\n--<marker>[ \t]+\r\n
# terminal_boundary = \r\n--<marker>--[ \t]+\r\n
#
# 99 is arbitrarily chosen to represent a max number of linear
# whitespace characters to help avoid wrongly writing boundary
# characters into a (temporary) file.
if not max_boundary_length:
self._max_boundary_length = len(b'\r\n--' + marker + b'--\r\n') + 99
def feed(self, data: bytes):
"""Feeds data incrementally into the parser."""
if self._done:
return
self._buf.extend(data)
while True:
# seek to a boundary if we aren't already on one
i, n, self._done = _next_part_boundary(self._buf, self._marker)
if i == -1 or self._done:
return # waiting for more data or terminal boundary reached
if self._pos == 0:
# parse the part header
if self._max_lookahead and len(self._buf) - self._pos > self._max_lookahead:
raise ProtocolError('header terminator not found')
term_index = self._buf.find(self._header_terminator)
if term_index == -1:
return # waiting for more data
start = i + n
# data includes the double CRLF at the end of the header.
end = term_index + len(self._header_terminator)
self._handle_header(self._buf[start:end])
self._pos = end
else:
# parse the part body
ii, _, self._done = _next_part_boundary(self._buf, self._marker, start=self._pos)
safe_bound = max(0, len(self._buf) - self._max_boundary_length)
if ii != -1:
# part body is finished
self._handle_body(self._buf[self._pos:ii], done=True)
self._buf = self._buf[ii:]
self._pos = 0
if self._done:
return # terminal boundary reached
elif safe_bound > self._pos:
# write partial body data
data = self._buf[self._pos:safe_bound]
self._pos = safe_bound
self._handle_body(data)
return # waiting for more data
else:
return # waiting for more data
def _next_part_boundary(buf: bytes, marker: bytes, start: int = 0
) -> Tuple[int, int, bool]:
"""Returns the index of the next boundary marker in buf beginning at start.
Returns:
(index, length, is_terminal) or (-1, -1, False) if no boundary is found.
"""
prefix = b'\r\n--' + marker
suffix = b'\r\n'
terminal_midfix = b'--'
i = buf.find(prefix, start)
if i == -1:
return -1, -1, False
pos = i + len(prefix)
is_terminal = False
if buf[pos:].startswith(terminal_midfix):
is_terminal = True
pos += len(terminal_midfix)
# Note: RFC 2046 notes optional "linear whitespace" (e.g. [ \t]) after the boundary pattern
# and the optional "--" suffix.
tail = buf[pos:]
for c in tail:
if c not in b' \t':
break
pos += 1
if buf[pos:].startswith(suffix):
pos += len(suffix)
return i, pos - i, is_terminal
return -1, -1, False