hexkit.providers.testing

This sub-package contains providers for testing purposes. These providers should be very lightweight and preferably only need dependencies from the standard library.

ATTENTION: Do not use these providers in production.

 1# Copyright 2021 - 2026 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
 2# for the German Human Genome-Phenome Archive (GHGA)
 3#
 4# Licensed under the Apache License, Version 2.0 (the "License");
 5# you may not use this file except in compliance with the License.
 6# You may obtain a copy of the License at
 7#
 8#     http://www.apache.org/licenses/LICENSE-2.0
 9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18This sub-package contains providers for testing purposes.
19These providers should be very lightweight and preferably only need dependencies from
20the standard library.
21
22ATTENTION: Do not use these providers in production.
23"""
24
25from .dao import MockDAOEmptyError, new_mock_dao_class
26from .eventpub import InMemEventPublisher, InMemEventStore, TopicExhaustedError
27from .kvstore import (
28    InMemBytesKeyValueStore,
29    InMemDtoKeyValueStore,
30    InMemJsonKeyValueStore,
31    InMemStrKeyValueStore,
32)
33
34__all__ = [
35    "InMemBytesKeyValueStore",
36    "InMemDtoKeyValueStore",
37    "InMemEventPublisher",
38    "InMemEventStore",
39    "InMemJsonKeyValueStore",
40    "InMemStrKeyValueStore",
41    "MockDAOEmptyError",
42    "TopicExhaustedError",
43    "new_mock_dao_class",
44]
class InMemBytesKeyValueStore(hexkit.providers.testing.kvstore.InMemBaseKeyValueStore):
class InMemBytesKeyValueStore(InMemBaseKeyValueStore):
    """In-memory KVStore provider whose values are raw bytes.

    Values pass through both conversion hooks untouched; the encoder only
    verifies that it was really handed a `bytes` object.
    """

    async def _encode_value(self, value: bytes) -> bytes:
        """Return `value` unchanged after checking that it is a bytes object.

        Raises:
            TypeError: If `value` is not an instance of `bytes`.
        """
        if isinstance(value, bytes):
            return value
        raise TypeError("Value must be of type bytes")

    async def _decode_value(self, raw: bytes) -> bytes:
        """Return the stored bytes unchanged."""
        return raw

In-memory KVStore provider for binary (bytes) data.

class InMemDtoKeyValueStore(hexkit.providers.testing.kvstore.InMemBaseKeyValueStore, typing.Generic[~Dto]):
class InMemDtoKeyValueStore(InMemBaseKeyValueStore, Generic[Dto]):
    """In-memory KVStore provider for Pydantic model (DTO) values.

    DTOs are written as UTF-8 encoded JSON (`model_dump_json`) and read back
    through the configured model's `model_validate_json`.
    """

    _dto_model: type[Dto]

    def __init__(
        self,
        *,
        dto_model: type[Dto],
        key_prefix: str = DEFAULT_KV_PREFIX,
        backing_store: dict[str, bytes] | None = None,
    ):
        """Set up the store for one DTO model.

        Args:
            dto_model: The model class that values must be instances of.
            key_prefix: Forwarded to the base class initializer.
            backing_store: Forwarded to the base class initializer.
        """
        super().__init__(key_prefix=key_prefix, backing_store=backing_store)
        self._dto_model = dto_model

        # The key prefix only shows up in the repr when it differs from the default.
        repr_parts = [f"{self.__class__.__name__}(dto_model={dto_model.__name__}"]
        if key_prefix != DEFAULT_KV_PREFIX:
            repr_parts.append(f" key_prefix={key_prefix!r}")
        repr_parts.append(")")
        self._repr = "".join(repr_parts)

    @classmethod
    @asynccontextmanager
    async def construct(
        cls,
        *,
        key_prefix: str = DEFAULT_KV_PREFIX,
        backing_store: dict[str, bytes] | None = None,
        dto_model: type[Dto] | None = None,
    ):
        """Yield an in-memory DTO store instance.

        `dto_model` is declared with a `None` default but is effectively
        mandatory.

        Raises:
            ValueError: If `dto_model` is not provided.
        """
        if dto_model is None:
            raise ValueError("The `dto_model` argument is required")
        store = cls(
            dto_model=dto_model,
            key_prefix=key_prefix,
            backing_store=backing_store,
        )
        yield store

    async def _encode_value(self, value: Dto) -> bytes:
        """Dump a DTO to UTF-8 encoded JSON.

        Raises:
            TypeError: If `value` is not an instance of the configured model.
        """
        model = self._dto_model
        if isinstance(value, model):
            return value.model_dump_json().encode("utf-8")
        raise TypeError(f"Value must be of type {model.__name__}")

    async def _decode_value(self, raw: bytes) -> Dto:
        """Build a DTO of the configured model from its JSON bytes."""
        model = self._dto_model
        return model.model_validate_json(raw)

In-memory KVStore provider for Pydantic model (DTO) data.

class InMemEventPublisher(hexkit.protocols.eventpub.EventPublisherProtocol):
 71class InMemEventPublisher(EventPublisherProtocol):
 72    """
 73    An in-memory EventPublisher for testing purposes.
 74    Please note, this only works when publisher and consumers are running in the same
 75    thread. Not suitable for inter-thread or inter-process communication.
 76    """
 77
 78    def __init__(self, event_store: InMemEventStore | None = None):
 79        """Initialize with existing event_store or let it create a new one."""
 80        self.event_store = event_store if event_store else InMemEventStore()
 81
 82    async def _publish_validated(  # noqa: PLR0913
 83        self,
 84        *,
 85        payload: JsonObject,
 86        type_: str,
 87        key: str,
 88        topic: str,
 89        event_id: UUID,
 90        headers: Mapping[str, str],
 91    ) -> None:
 92        """Publish an event with already validated topic and type.
 93
 94        Args:
 95            payload (JSON): The payload to ship with the event.
 96            type_ (str): The event type. ASCII characters only.
 97            key (str): The event type. ASCII characters only.
 98            topic (str): The event type. ASCII characters only.
 99        """
100        event = Event(type_=type_, key=key, payload=payload)
101        self.event_store.post(topic=topic, event=event)

An in-memory EventPublisher for testing purposes. Please note, this only works when publisher and consumers are running in the same thread. Not suitable for inter-thread or inter-process communication.

InMemEventPublisher( event_store: InMemEventStore | None = None)
78    def __init__(self, event_store: InMemEventStore | None = None):
79        """Initialize with existing event_store or let it create a new one."""
80        self.event_store = event_store if event_store else InMemEventStore()

Initialize with existing event_store or let it create a new one.

event_store
class InMemEventStore:
class InMemEventStore:
    """
    A manager for multiple topics, whereby each topic is modeled as a queue, not as
    a log.

    It should be seen as a utility of the `InMemEventPublisher` and should only be
    used together with that class.
    """

    def __init__(self):
        """Create an in-memory topic registry based on `collections.deque`."""
        self.topics: dict[str, deque[Event]] = defaultdict(deque)

    def post(self, topic: str, event: Event) -> None:
        """Queue a new event to a topic."""
        self.topics[topic].append(event)

    def get(self, topic: str) -> Event:
        """Remove and return the next (oldest) event queued for the specified topic.

        Raises:
            TopicExhaustedError: If no event is queued for the topic. This includes
                topics that nothing has been posted to yet.
        """
        # Use `.get()` instead of subscription: `topics` is a defaultdict, so
        # subscripting would register an empty queue for every topic that is
        # merely probed.
        queue = self.topics.get(topic)
        if not queue:
            raise TopicExhaustedError()
        return queue.popleft()

A manager for multiple topics, whereby each topic is modeled as a queue, not as a log.

It should be seen as a utility of the InMemEventPublisher and should only be used together with that class.

InMemEventStore()
55    def __init__(self):
56        """Create an in memory topic registry based on collections' deque."""
57        self.topics: dict[str, deque[Event]] = defaultdict(deque)

Create an in-memory topic registry based on collections.deque.

topics: dict[str, collections.deque[hexkit.providers.testing.eventpub.Event]]
def post(self, topic: str, event: hexkit.providers.testing.eventpub.Event) -> None:
59    def post(self, topic: str, event: Event) -> None:
60        """Queue a new event to a topic."""
61        self.topics[topic].append(event)

Queue a new event to a topic.

def get(self, topic) -> hexkit.providers.testing.eventpub.Event:
63    def get(self, topic) -> Event:
64        """Get the next element in the queue corresponding to the specified topic."""
65        try:
66            return self.topics[topic].popleft()
67        except IndexError as error:
68            raise TopicExhaustedError() from error

Get the next element in the queue corresponding to the specified topic.

class InMemJsonKeyValueStore(hexkit.providers.testing.kvstore.InMemBaseKeyValueStore):
class InMemJsonKeyValueStore(InMemBaseKeyValueStore):
    """In-memory KVStore provider whose values are JSON objects (dicts)."""

    async def _encode_value(self, value: JsonObject) -> bytes:
        """Dump a JSON object to a UTF-8 encoded JSON string.

        Raises:
            TypeError: If `value` is not a dict.
        """
        if isinstance(value, dict):
            return json.dumps(value).encode("utf-8")
        raise TypeError("Value must be a dict representing a JSON object")

    async def _decode_value(self, raw: bytes) -> JsonObject:
        """Parse a UTF-8 encoded JSON string back into a JSON object."""
        text = raw.decode("utf-8")
        return json.loads(text)

In-memory KVStore provider for JSON objects.

class InMemStrKeyValueStore(hexkit.providers.testing.kvstore.InMemBaseKeyValueStore):
class InMemStrKeyValueStore(InMemBaseKeyValueStore):
    """In-memory KVStore provider whose values are strings."""

    async def _encode_value(self, value: str) -> bytes:
        """Turn a string into its UTF-8 byte representation.

        Raises:
            TypeError: If `value` is not an instance of `str`.
        """
        if isinstance(value, str):
            return value.encode("utf-8")
        raise TypeError("Value must be of type str")

    async def _decode_value(self, raw: bytes) -> str:
        """Turn UTF-8 bytes back into a string."""
        text = raw.decode("utf-8")
        return text

In-memory KVStore provider for string data.

class MockDAOEmptyError(builtins.RuntimeError):
class MockDAOEmptyError(RuntimeError):
    """Raised when the `latest` property of an empty mock DAO is accessed."""

Raised when attempting to access the latest property of an empty mock DAO

class TopicExhaustedError(builtins.RuntimeError):
class TopicExhaustedError(RuntimeError):
    """Raised when no more events are queued in a topic."""

Raised when no more events are queued in a topic.

def new_mock_dao_class( *, dto_model: type[~DTO], id_field: str, handle_mql: bool = True) -> type[hexkit.providers.testing.dao.BaseInMemDao[~DTO]]:
def new_mock_dao_class(
    *, dto_model: type[DTO], id_field: str, handle_mql: bool = True
) -> type[BaseInMemDao[DTO]]:
    """Produce a mock DAO class for the given DTO model and ID field.

    If `handle_mql` is True, the DAO will attempt to resolve query mappings that
    use MongoDB query language predicates (e.g. $ne, $in, $gte, etc.). Not all MQL
    operators are supported. Please see `SUPPORTED_MQL_OPERATORS` for a complete list
    of the currently supported MQL operators.

    Args:
        dto_model: The DTO model class that stored documents are converted back to.
        id_field: The name of the ID field; passed to both document converters.
        handle_mql: Stored on the class as `_handle_mql` (see above).

    Returns:
        A new `BaseInMemDao` subclass (a class, not an instance) with the given
        settings baked in.
    """

    class MockDao(BaseInMemDao[DTO]):
        """Mock dao that stores data in memory"""

        _id_field: str = id_field
        _handle_mql: bool = handle_mql

        def _serialize(self, dto: DTO) -> Document:
            """Convert a DTO into its document representation."""
            return dto_to_document(dto, id_field=id_field)

        def _deserialize(self, document: Document) -> DTO:
            """Convert a document back into a DTO of the configured model."""
            # The converter receives a deep copy, never the passed-in document
            # object itself.
            return document_to_dto(
                deepcopy(document), id_field=id_field, dto_model=dto_model
            )

    return MockDao

Produce a mock DAO for the given DTO model and ID field.

If handle_mql is True, the DAO will attempt to resolve query mappings that use MongoDB query language predicates (e.g. $ne, $in, $gte, etc.). Not all MQL operators are supported. Please see SUPPORTED_MQL_OPERATORS for a complete list of the currently supported MQL operators.