hexkit.protocols.eventpub

Protocol related to event publishing.

 1# Copyright 2021 - 2026 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
 2# for the German Human Genome-Phenome Archive (GHGA)
 3#
 4# Licensed under the Apache License, Version 2.0 (the "License");
 5# you may not use this file except in compliance with the License.
 6# You may obtain a copy of the License at
 7#
 8#     http://www.apache.org/licenses/LICENSE-2.0
 9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""Protocol related to event publishing."""
18
19from abc import ABC, abstractmethod
20from collections.abc import Mapping
21from uuid import UUID, uuid4
22
23from pydantic import UUID4
24
25from hexkit.custom_types import Ascii, JsonObject
26from hexkit.utils import check_ascii
27
28
29class EventPublisherProtocol(ABC):
30    """A protocol for publishing events to an event broker."""
31
32    async def publish(  # noqa: PLR0913
33        self,
34        *,
35        payload: JsonObject,
36        type_: Ascii,
37        key: Ascii,
38        topic: Ascii,
39        event_id: UUID4 | None = None,
40        headers: Mapping[str, str] | None = None,
41    ) -> None:
42        """Publish an event.
43
44        Args:
45        - `payload` (JSON): The payload to ship with the event.
46        - `type_` (str): The event type. ASCII characters only.
47        - `key` (str): The event key. ASCII characters only.
48        - `topic` (str): The event topic. ASCII characters only.
49        - `event_id` (UUID4, optional): An optional event ID. If not provided, a new
50          one will be generated.
51        - `headers`: Additional headers to attach to the event.
52        """
53        if event_id:
54            if not isinstance(event_id, UUID):
55                raise TypeError(f"event_id must be a UUID, got {type(event_id)}")
56        else:
57            event_id = uuid4()
58
59        check_ascii(type_, key, topic)
60        if headers is None:
61            headers = {}
62
63        await self._publish_validated(
64            payload=payload,
65            type_=type_,
66            key=key,
67            topic=topic,
68            event_id=event_id,
69            headers=headers,
70        )
71
72    @abstractmethod
73    async def _publish_validated(  # noqa: PLR0913
74        self,
75        *,
76        payload: JsonObject,
77        type_: Ascii,
78        key: Ascii,
79        topic: Ascii,
80        event_id: UUID4,
81        headers: Mapping[str, str],
82    ) -> None:
83        """Publish an event with already validated topic and type.
84
85        Args:
86        - `payload` (JSON): The payload to ship with the event.
87        - `type_` (str): The event type. ASCII characters only.
88        - `key` (str): The event key. ASCII characters only.
89        - `topic` (str): The event topic. ASCII characters only.
90        - `event_id` (UUID): The event ID.
91        - `headers`: Additional headers to attach to the event.
92        """
93        ...
class EventPublisherProtocol(abc.ABC):
30class EventPublisherProtocol(ABC):
31    """A protocol for publishing events to an event broker."""
32
33    async def publish(  # noqa: PLR0913
34        self,
35        *,
36        payload: JsonObject,
37        type_: Ascii,
38        key: Ascii,
39        topic: Ascii,
40        event_id: UUID4 | None = None,
41        headers: Mapping[str, str] | None = None,
42    ) -> None:
43        """Publish an event.
44
45        Args:
46        - `payload` (JSON): The payload to ship with the event.
47        - `type_` (str): The event type. ASCII characters only.
48        - `key` (str): The event key. ASCII characters only.
49        - `topic` (str): The event topic. ASCII characters only.
50        - `event_id` (UUID4, optional): An optional event ID. If not provided, a new
51          one will be generated.
52        - `headers`: Additional headers to attach to the event.
53        """
54        if event_id:
55            if not isinstance(event_id, UUID):
56                raise TypeError(f"event_id must be a UUID, got {type(event_id)}")
57        else:
58            event_id = uuid4()
59
60        check_ascii(type_, key, topic)
61        if headers is None:
62            headers = {}
63
64        await self._publish_validated(
65            payload=payload,
66            type_=type_,
67            key=key,
68            topic=topic,
69            event_id=event_id,
70            headers=headers,
71        )
72
73    @abstractmethod
74    async def _publish_validated(  # noqa: PLR0913
75        self,
76        *,
77        payload: JsonObject,
78        type_: Ascii,
79        key: Ascii,
80        topic: Ascii,
81        event_id: UUID4,
82        headers: Mapping[str, str],
83    ) -> None:
84        """Publish an event with already validated topic and type.
85
86        Args:
87        - `payload` (JSON): The payload to ship with the event.
88        - `type_` (str): The event type. ASCII characters only.
89        - `key` (str): The event key. ASCII characters only.
90        - `topic` (str): The event topic. ASCII characters only.
91        - `event_id` (UUID): The event ID.
92        - `headers`: Additional headers to attach to the event.
93        """
94        ...

A protocol for publishing events to an event broker.

async def publish( self, *, payload: Mapping[str, int | float | str | bool | datetime.date | datetime.datetime | uuid.UUID | Sequence[typing.Any] | Mapping[str, typing.Any] | None], type_: str, key: str, topic: str, event_id: Optional[Annotated[uuid.UUID, UuidVersion(uuid_version=4)]] = None, headers: Mapping[str, str] | None = None) -> None:
33    async def publish(  # noqa: PLR0913
34        self,
35        *,
36        payload: JsonObject,
37        type_: Ascii,
38        key: Ascii,
39        topic: Ascii,
40        event_id: UUID4 | None = None,
41        headers: Mapping[str, str] | None = None,
42    ) -> None:
43        """Publish an event.
44
45        Args:
46        - `payload` (JSON): The payload to ship with the event.
47        - `type_` (str): The event type. ASCII characters only.
48        - `key` (str): The event key. ASCII characters only.
49        - `topic` (str): The event topic. ASCII characters only.
50        - `event_id` (UUID4, optional): An optional event ID. If not provided, a new
51          one will be generated.
52        - `headers`: Additional headers to attach to the event.
53        """
54        if event_id:
55            if not isinstance(event_id, UUID):
56                raise TypeError(f"event_id must be a UUID, got {type(event_id)}")
57        else:
58            event_id = uuid4()
59
60        check_ascii(type_, key, topic)
61        if headers is None:
62            headers = {}
63
64        await self._publish_validated(
65            payload=payload,
66            type_=type_,
67            key=key,
68            topic=topic,
69            event_id=event_id,
70            headers=headers,
71        )

Publish an event.

Args:

  • payload (JSON): The payload to ship with the event.
  • type_ (str): The event type. ASCII characters only.
  • key (str): The event key. ASCII characters only.
  • topic (str): The event topic. ASCII characters only.
  • event_id (UUID4, optional): An optional event ID. If not provided, a new one will be generated.
  • headers: Additional headers to attach to the event.