Dispatchers

class domprob.dispatchers.basic.BasicDispatcher(*consumers)[source]

Bases: DispatcherProtocol

Dispatches observations to registered consumers.

Parameters:

*consumers (tuple[ConsumerProtocol, …]) – Variable number of consumer instances.

Example

>>> from abc import ABC, abstractmethod
>>> from domprob.consumers.basic import BasicConsumer
>>>
>>> class BaseInstrument(ABC):
...     @abstractmethod
...     def add(self):
...         pass
...
>>> class LoggerInstrument:
...     @staticmethod
...     def add():
...         return "Log message added!"
...
>>> class AnalyticsInstrument:
...     @staticmethod
...     def add():
...         return "Analytics entry added!"
...
>>> consumer = BasicConsumer(LoggerInstrument(), AnalyticsInstrument())
>>> dispatcher = BasicDispatcher(consumer)
>>>
>>> dispatcher
BasicDispatcher(consumers=(BasicConsumer(instruments=('<domprob.dispatchers.basic.LoggerInstrument object at 0x...>', '<domprob.dispatchers.basic.AnalyticsInstrument object at 0x...>')),))
>>>
>>> from domprob import sensor, BaseObservation
>>>
>>> class SomeObservation(BaseObservation):
...     @sensor(LoggerInstrument)
...     @sensor(AnalyticsInstrument)
...     def foo(self, instrument: BaseInstrument) -> None:
...         print(instrument.add())
...
>>> obs = SomeObservation()
>>> dispatcher.dispatch(obs)
Analytics entry added!
Log message added!

__eq__(other)[source]

Return self==value.

Return type:

bool

__repr__()[source]

Return a string representation of the dispatcher.

Returns:

A string representation of the dispatcher and its consumers.

Return type:

str

Example

>>> from abc import ABC, abstractmethod
>>> from domprob.consumers.basic import BasicConsumer
>>>
>>> class BaseInstrument(ABC):
...     @abstractmethod
...     def add(self):
...         pass
...
>>> class LoggerInstrument:
...     @staticmethod
...     def add():
...         return "Log message added!"
...
>>> class AnalyticsInstrument:
...     @staticmethod
...     def add():
...         return "Analytics entry added!"
...
>>> consumer = BasicConsumer(LoggerInstrument(), AnalyticsInstrument())
>>> dispatcher = BasicDispatcher(consumer)
>>> repr(dispatcher)
"BasicDispatcher(consumers=(BasicConsumer(instruments=('<domprob.dispatchers.basic.LoggerInstrument object at 0x...>', '<domprob.dispatchers.basic.AnalyticsInstrument object at 0x...>')),))"
dispatch(observation)[source]

Dispatch an observation to all available consumers.

This method calls consume() on each registered consumer, passing it the observation to handle.
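The fan-out described above can be sketched roughly as follows. This is an illustration only, not the BasicDispatcher source: ConsumerLike, RecordingConsumer, and SketchDispatcher are hypothetical stand-ins, and the only detail taken from the documentation is that dispatching calls consume on the consumers.

```python
from typing import Any, Protocol


class ConsumerLike(Protocol):
    """Hypothetical stand-in for domprob's ConsumerProtocol."""

    def consume(self, observation: Any) -> None: ...


class RecordingConsumer:
    """Toy consumer that records every observation it receives."""

    def __init__(self) -> None:
        self.seen = []

    def consume(self, observation: Any) -> None:
        self.seen.append(observation)


class SketchDispatcher:
    """Illustrative dispatcher; assumed shape, not the library's code."""

    def __init__(self, *consumers: ConsumerLike) -> None:
        self.consumers = consumers

    def dispatch(self, observation: Any) -> None:
        # Forward the same observation to every registered consumer, in order.
        for consumer in self.consumers:
            consumer.consume(observation)


first, second = RecordingConsumer(), RecordingConsumer()
SketchDispatcher(first, second).dispatch("obs")
```

After the call, both recording consumers hold the single dispatched observation, and dispatch itself returns None, matching the documented return type.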

Parameters:

observation (_Obs) – The observation to dispatch.

Return type:

None

Example

>>> from abc import ABC, abstractmethod
>>> from domprob.consumers.basic import BasicConsumer
>>>
>>> class BaseInstrument(ABC):
...     @abstractmethod
...     def add(self):
...         pass
...
>>> class LoggerInstrument:
...     @staticmethod
...     def add():
...         return "Log message added!"
...
>>> class AnalyticsInstrument:
...     @staticmethod
...     def add():
...         return "Analytics entry added!"
...
>>>
>>> consumer = BasicConsumer(LoggerInstrument(), AnalyticsInstrument())
>>> dispatcher = BasicDispatcher(consumer)
>>>
>>> dispatcher
BasicDispatcher(consumers=(BasicConsumer(instruments=('<domprob.dispatchers.basic.LoggerInstrument object at 0x...>', '<domprob.dispatchers.basic.AnalyticsInstrument object at 0x...>')),))
>>>
>>> from domprob import sensor, BaseObservation
>>>
>>> class SomeObservation(BaseObservation):
...     @sensor(LoggerInstrument)
...     @sensor(AnalyticsInstrument)
...     def foo(self, instrument: BaseInstrument) -> None:
...         print(instrument.add())
...
>>> obs = SomeObservation()
>>> dispatcher.dispatch(obs)
Analytics entry added!
Log message added!

exception domprob.dispatchers.dispatcher.DispatcherException[source]

Bases: DomprobException

Base exception for errors occurring within dispatchers.

This exception is raised when an error occurs while processing observations within a dispatcher.

It inherits from DomprobException, allowing it to be caught alongside other domain-specific exceptions.
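A minimal sketch of what that inheritance allows in practice. The two classes below are local stand-ins that mirror the documented hierarchy (DomprobException is assumed to derive from Exception), and failing_dispatch is a hypothetical function used only to trigger the error.

```python
class DomprobException(Exception):
    """Stand-in for the library's base exception (assumed to subclass Exception)."""


class DispatcherException(DomprobException):
    """Stand-in mirroring the documented inheritance."""


def failing_dispatch() -> None:
    # Hypothetical failure raised while processing an observation.
    raise DispatcherException("dispatch failed")


def run() -> str:
    try:
        failing_dispatch()
    except DomprobException as exc:
        # The broader handler also catches the dispatcher-specific subclass.
        return f"{type(exc).__name__}: {exc}"
    return "no error"


message = run()
```

Catching the base class keeps one handler for all domain errors, while an except DispatcherException clause placed before it could still treat dispatcher failures separately.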

__repr__()

Return repr(self).

add_note(object, /)

Exception.add_note(note) – add a note to the exception

args
with_traceback(object, /)

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class domprob.dispatchers.dispatcher.DispatcherProtocol(*args, **kwargs)[source]

Bases: Protocol

Protocol defining the structure for dispatchers handling domain observations.

Classes implementing this protocol must define:

  • dispatch(): Processes an ObservationProtocol and returns a result.

  • __repr__(): Provides a string representation of the dispatcher.

This protocol is @runtime_checkable, meaning isinstance(dispatcher, DispatcherProtocol) can be used to verify conformance at runtime.
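One caveat worth illustrating: for a runtime-checkable protocol, isinstance only verifies that the required members exist, not that their signatures match. The sketch below uses a hypothetical DispatcherLike protocol from the standard typing module rather than DispatcherProtocol itself, and it omits __repr__ because every Python object already provides one.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class DispatcherLike(Protocol):
    """Hypothetical protocol with the same shape as DispatcherProtocol."""

    def dispatch(self, observation: object) -> object: ...


class Conforming:
    def dispatch(self, observation: object) -> str:
        return "ok"


class WrongSignature:
    # Accepts no observation argument, yet the method name is present.
    def dispatch(self) -> str:
        return "ok"


class Missing:
    pass


results = (
    isinstance(Conforming(), DispatcherLike),
    isinstance(WrongSignature(), DispatcherLike),
    isinstance(Missing(), DispatcherLike),
)
```

Here results is (True, True, False): the class with the wrong signature still passes, so a static type checker remains the place to catch signature mismatches.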

Example

>>> from domprob.dispatchers.dispatcher import DispatcherProtocol
>>> from domprob.observations.observation import ObservationProtocol
>>>
>>> class ConcreteDispatcher:
...     def dispatch(self, observation: ObservationProtocol) -> str:
...         return "Processed observation"
...
...     def __repr__(self) -> str:
...         return "ConcreteDispatcher()"
...
>>> dispatcher = ConcreteDispatcher()
>>> assert isinstance(dispatcher, DispatcherProtocol)
dispatch(observation)[source]

Dispatch an observation and return a result.

Parameters:

observation (ObservationProtocol[_P, _R]) – The observation to process.

Returns:

The result of processing the observation.

Return type:

_R

Example

>>> from domprob import sensor, BaseObservation
>>> from domprob.dispatchers.dispatcher import DispatcherProtocol
>>> from domprob.observations.observation import ObservationProtocol
>>>
>>> class MyDispatcher:
...     @staticmethod
...     def dispatch(observation: ObservationProtocol) -> str:
...         return "Handled"
...
>>> dispatcher = MyDispatcher()
>>>
>>> class SomeInstrument:
...     pass
...
>>> class Obs(BaseObservation):
...     @sensor(SomeInstrument)
...     def foo(self, instrument: SomeInstrument) -> None:
...         pass
...
>>> result = dispatcher.dispatch(Obs())
>>> print(result)
Handled