hexkit.providers.testing
This sub-package contains providers for testing purposes. These providers should be very lightweight and preferably only need dependencies from the standard library.
ATTENTION: Do not use these providers in production.
1# Copyright 2021 - 2026 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln 2# for the German Human Genome-Phenome Archive (GHGA) 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 16 17""" 18This sub-package contains providers for testing purposes. 19These providers should be very lightweight and preferably only need dependencies from 20the standard library. 21 22ATTENTION: Do not use these providers in production. 23""" 24 25from .dao import MockDAOEmptyError, new_mock_dao_class 26from .eventpub import InMemEventPublisher, InMemEventStore, TopicExhaustedError 27from .kvstore import ( 28 InMemBytesKeyValueStore, 29 InMemDtoKeyValueStore, 30 InMemJsonKeyValueStore, 31 InMemStrKeyValueStore, 32) 33 34__all__ = [ 35 "InMemBytesKeyValueStore", 36 "InMemDtoKeyValueStore", 37 "InMemEventPublisher", 38 "InMemEventStore", 39 "InMemJsonKeyValueStore", 40 "InMemStrKeyValueStore", 41 "MockDAOEmptyError", 42 "TopicExhaustedError", 43 "new_mock_dao_class", 44]
class InMemBytesKeyValueStore(InMemBaseKeyValueStore):
    """In-memory KVStore provider whose values are raw `bytes`.

    Values are passed through unchanged in both directions; only the type of the
    value handed to the encoder is checked.
    """

    async def _encode_value(self, value: bytes) -> bytes:
        """Return `value` unchanged after verifying that it is a `bytes` instance.

        Raises:
            TypeError: If `value` is not of type `bytes`.
        """
        if isinstance(value, bytes):
            return value
        raise TypeError("Value must be of type bytes")

    async def _decode_value(self, raw: bytes) -> bytes:
        """Return the stored bytes exactly as they were received."""
        return raw
In-memory KVStore provider for binary (bytes) data.
class InMemDtoKeyValueStore(InMemBaseKeyValueStore, Generic[Dto]):
    """In-memory KVStore provider whose values are Pydantic model (DTO) instances.

    DTOs are stored as UTF-8 encoded JSON and validated against the configured
    model class when they are read back.
    """

    # The model class used for type checks on write and validation on read.
    _dto_model: type[Dto]

    @classmethod
    @asynccontextmanager
    async def construct(
        cls,
        *,
        key_prefix: str = DEFAULT_KV_PREFIX,
        backing_store: dict[str, bytes] | None = None,
        dto_model: type[Dto] | None = None,
    ):
        """Yield an in-memory DTO store instance.

        Args:
            key_prefix: Forwarded to the constructor.
            backing_store: Forwarded to the constructor.
            dto_model: The DTO model class. Although it defaults to None in the
                signature, it must be provided.

        Raises:
            ValueError: If `dto_model` is not provided.
        """
        if dto_model is None:
            raise ValueError("The `dto_model` argument is required")
        store = cls(
            dto_model=dto_model,
            key_prefix=key_prefix,
            backing_store=backing_store,
        )
        yield store

    def __init__(
        self,
        *,
        dto_model: type[Dto],
        key_prefix: str = DEFAULT_KV_PREFIX,
        backing_store: dict[str, bytes] | None = None,
    ):
        """Initialize with the DTO model class and optional storage settings.

        Args:
            dto_model: The model class that stored values must be instances of.
            key_prefix: Passed on to the base class initializer.
            backing_store: Passed on to the base class initializer.
        """
        super().__init__(key_prefix=key_prefix, backing_store=backing_store)
        self._dto_model = dto_model
        # The key prefix is only mentioned in the repr when it differs from the
        # default.
        if key_prefix == DEFAULT_KV_PREFIX:
            prefix_part = ""
        else:
            prefix_part = f" key_prefix={key_prefix!r}"
        class_name = self.__class__.__name__
        self._repr = f"{class_name}(dto_model={dto_model.__name__}" + prefix_part + ")"

    async def _encode_value(self, value: Dto) -> bytes:
        """Serialize a DTO to UTF-8 encoded JSON bytes.

        Raises:
            TypeError: If `value` is not an instance of the configured DTO model.
        """
        model = self._dto_model
        if isinstance(value, model):
            as_json = value.model_dump_json()
            return as_json.encode("utf-8")
        raise TypeError(f"Value must be of type {model.__name__}")

    async def _decode_value(self, raw: bytes) -> Dto:
        """Validate JSON bytes against the configured DTO model and return the DTO."""
        model = self._dto_model
        return model.model_validate_json(raw)
In-memory KVStore provider for Pydantic model (DTO) data.
class InMemEventPublisher(EventPublisherProtocol):
    """
    An in-memory EventPublisher for testing purposes.
    Please note, this only works when publisher and consumers are running in the same
    thread. Not suitable for inter-thread or inter-process communication.
    """

    def __init__(self, event_store: InMemEventStore | None = None):
        """Initialize with an existing event_store or let it create a new one."""
        # Compare against None explicitly so that a supplied store is never
        # replaced just because it happens to evaluate as falsy.
        self.event_store = (
            event_store if event_store is not None else InMemEventStore()
        )

    async def _publish_validated(  # noqa: PLR0913
        self,
        *,
        payload: JsonObject,
        type_: str,
        key: str,
        topic: str,
        event_id: UUID,
        headers: Mapping[str, str],
    ) -> None:
        """Publish an event with already validated topic and type.

        The event is appended to the queue of the given topic in the event store.

        Args:
            payload (JSON): The payload to ship with the event.
            type_ (str): The event type. ASCII characters only.
            key (str): The event key. ASCII characters only.
            topic (str): The name of the topic to post the event to.
                ASCII characters only.
            event_id (UUID): Not used by this in-memory implementation.
            headers (Mapping[str, str]): Not used by this in-memory implementation.
        """
        event = Event(type_=type_, key=key, payload=payload)
        self.event_store.post(topic=topic, event=event)
An in-memory EventPublisher for testing purposes. Please note, this only works when publisher and consumers are running in the same thread. Not suitable for inter-thread or inter-process communication.
def __init__(self, event_store: InMemEventStore | None = None):
    """Initialize with an existing event_store or let it create a new one.

    Args:
        event_store: The store to publish events to. If None, a fresh
            `InMemEventStore` is created.
    """
    # Compare against None explicitly so that a supplied store is never
    # replaced just because it happens to evaluate as falsy.
    self.event_store = event_store if event_store is not None else InMemEventStore()
Initialize with existing event_store or let it create a new one.
Inherited Members
class InMemEventStore:
    """
    A manager for multiple topics, whereby each topic is modeled as a queue, not as
    a log.

    It should be seen as a utility of the `InMemEventPublisher` and should only be
    used together with that class.
    """

    def __init__(self):
        """Create an in-memory topic registry based on `collections.deque`."""
        self.topics: dict[str, deque[Event]] = defaultdict(deque)

    def post(self, topic: str, event: Event) -> None:
        """Queue a new event to a topic.

        The event is appended to the end of the topic's queue. A topic that has not
        been used before is created on the fly by the underlying defaultdict.
        """
        self.topics[topic].append(event)

    def get(self, topic: str) -> Event:
        """Get the next element in the queue corresponding to the specified topic.

        The returned event is removed from the queue (first in, first out).

        Raises:
            TopicExhaustedError: If the topic is unknown or has no queued events.
        """
        # Use `.get` instead of indexing: indexing the defaultdict would register
        # an empty queue for every unknown topic that is merely read from.
        queue = self.topics.get(topic)
        if not queue:
            raise TopicExhaustedError()
        return queue.popleft()
A manager for multiple topics, whereby each topic is modeled as a queue, not as a log.
It should be seen as a utility of the InMemEventPublisher and should only be used together
with that class.
def __init__(self):
    """Set up an empty in-memory topic registry.

    Each topic name maps to a `collections.deque` of events; the registry is a
    defaultdict, so the queue of a topic is created on first access.
    """
    registry: dict[str, deque[Event]] = defaultdict(deque)
    self.topics = registry
Create an in memory topic registry based on collections' deque.
def post(self, topic: str, event: Event) -> None:
    """Append an event to the end of the queue of the given topic."""
    queue = self.topics[topic]
    queue.append(event)
Queue a new event to a topic.
def get(self, topic: str) -> Event:
    """Get the next element in the queue corresponding to the specified topic.

    The returned event is removed from the queue (first in, first out).

    Raises:
        TopicExhaustedError: If the topic is unknown or has no queued events.
    """
    # Use `.get` instead of indexing: if `topics` is a defaultdict, indexing would
    # register an empty queue for every unknown topic that is merely read from.
    queue = self.topics.get(topic)
    if not queue:
        raise TopicExhaustedError()
    return queue.popleft()
Get the next element in the queue corresponding to the specified topic.
class InMemJsonKeyValueStore(InMemBaseKeyValueStore):
    """In-memory KVStore provider whose values are JSON objects (dicts)."""

    async def _encode_value(self, value: JsonObject) -> bytes:
        """Dump a JSON object to a JSON string and encode it as UTF-8 bytes.

        Raises:
            TypeError: If `value` is not a dict.
        """
        if isinstance(value, dict):
            text = json.dumps(value)
            return text.encode("utf-8")
        raise TypeError("Value must be a dict representing a JSON object")

    async def _decode_value(self, raw: bytes) -> JsonObject:
        """Decode UTF-8 bytes and parse the resulting JSON string."""
        text = raw.decode("utf-8")
        return json.loads(text)
In-memory KVStore provider for JSON objects.
class InMemStrKeyValueStore(InMemBaseKeyValueStore):
    """In-memory KVStore provider whose values are strings."""

    async def _encode_value(self, value: str) -> bytes:
        """Convert a string to its UTF-8 byte representation.

        Raises:
            TypeError: If `value` is not of type `str`.
        """
        if isinstance(value, str):
            return value.encode("utf-8")
        raise TypeError("Value must be of type str")

    async def _decode_value(self, raw: bytes) -> str:
        """Convert UTF-8 bytes back to a string."""
        text = raw.decode("utf-8")
        return text
In-memory KVStore provider for string data.
class MockDAOEmptyError(RuntimeError):
    """Raised when the `latest` property of a mock DAO is accessed while it is empty."""
Raised when attempting to access the latest property of an empty mock DAO
class TopicExhaustedError(RuntimeError):
    """Raised when no more events are queued in a topic."""
Raised when no more events are queued in a topic.
def new_mock_dao_class(
    *, dto_model: type[DTO], id_field: str, handle_mql: bool = True
) -> type[BaseInMemDao[DTO]]:
    """Build and return a mock DAO class for the given DTO model and ID field.

    If `handle_mql` is True, the DAO will attempt to resolve query mappings that
    use MongoDB query language predicates (e.g. $ne, $in, $gte, etc.). Not all MQL
    operators are supported. Please see `SUPPORTED_MQL_OPERATORS` for a complete list
    of the currently supported MQL operators.

    Args:
        dto_model: The DTO model class that documents are converted to and from.
        id_field: The name of the DTO field that holds the resource ID.
        handle_mql: Stored on the returned class as `_handle_mql`.

    Returns:
        A new `BaseInMemDao` subclass bound to `dto_model` and `id_field`.
    """

    class MockDao(BaseInMemDao[DTO]):
        """Mock dao that stores data in memory"""

        _id_field: str = id_field
        _handle_mql: bool = handle_mql

        def _serialize(self, dto: DTO) -> Document:
            # Convert the DTO into its document form using the configured ID field.
            document = dto_to_document(dto, id_field=id_field)
            return document

        def _deserialize(self, document: Document) -> DTO:
            # The conversion is handed a deep copy, never the stored document itself.
            document_copy = deepcopy(document)
            return document_to_dto(
                document_copy, id_field=id_field, dto_model=dto_model
            )

    return MockDao
Produce a mock DAO for the given DTO model and ID field.
If handle_mql is True, the DAO will attempt to resolve query mappings that
use MongoDB query language predicates (e.g. $ne, $in, $gte, etc.). Not all MQL
operators are supported. Please see SUPPORTED_MQL_OPERATORS for a complete list
of the currently supported MQL operators.