Dispatchers
- class domprob.dispatchers.basic.BasicDispatcher(*consumers)[source]
Bases:
DispatcherProtocolDispatches observations to registered consumers.
- Parameters:
*consumers (tuple[ConsumerProtocol, …]) – Variable number of consumer instances.
Example
>>> from abc import ABC, abstractmethod >>> from domprob.consumers.basic import BasicConsumer >>> >>> class BaseInstrument(ABC): ... @abstractmethod ... def add(self): ... pass ... >>> class LoggerInstrument: ... @staticmethod ... def add(): ... return "Log message added!" ... >>> class AnalyticsInstrument: ... @staticmethod ... def add(): ... return "Analytics entry added!" ... >>> consumer = BasicConsumer(LoggerInstrument(), AnalyticsInstrument()) >>> dispatcher = BasicDispatcher(consumer) >>> >>> dispatcher BasicDispatcher(consumers=(BasicConsumer(instruments=('<domprob.dispatchers.basic.LoggerInstrument object at 0x...>', '<domprob.dispatchers.basic.AnalyticsInstrument object at 0x...>')),)) >>> >>> from domprob import sensor, BaseObservation >>> >>> class SomeObservation(BaseObservation): ... @sensor(LoggerInstrument) ... @sensor(AnalyticsInstrument) ... def foo(self, instrument: BaseInstrument) -> None: ... print(instrument.add()) ... >>> obs = SomeObservation() >>> dispatcher.dispatch(obs) Analytics entry added! Log message added!
- __repr__()[source]
Return a string representation of the dispatcher.
- Returns:
- A string representation of the dispatcher and its
consumers.
- Return type:
str
Example
>>> from abc import ABC, abstractmethod >>> from domprob.consumers.basic import BasicConsumer >>> >>> class BaseInstrument(ABC): ... @abstractmethod ... def add(self): ... pass ... >>> class LoggerInstrument: ... @staticmethod ... def add(): ... return "Log message added!" ... >>> class AnalyticsInstrument: ... @staticmethod ... def add(): ... return "Analytics entry added!" ... >>> consumer = BasicConsumer(LoggerInstrument(), AnalyticsInstrument()) >>> dispatcher = BasicDispatcher(consumer) >>> repr(dispatcher) "BasicDispatcher(consumers=(BasicConsumer(instruments=('<domprob.dispatchers.basic.LoggerInstrument object at 0x...>', '<domprob.dispatchers.basic.AnalyticsInstrument object at 0x...>')),))"
- _abc_impl = <_abc._abc_data object>
- _is_protocol = False
- _is_runtime_protocol = True
- dispatch(observation)[source]
Dispatch an observation to all available consumers.
This method calls consume on consumers to handle the observation passed.
- Parameters:
observation (_Obs) – The observation to dispatch.
- Return type:
None
Example
>>> from abc import ABC, abstractmethod >>> from domprob.consumers.basic import BasicConsumer >>> >>> class BaseInstrument(ABC): ... @abstractmethod ... def add(self): ... pass ... >>> class LoggerInstrument: ... @staticmethod ... def add(): ... return "Log message added!" ... >>> class AnalyticsInstrument: ... @staticmethod ... def add(): ... return "Analytics entry added!" ... >>> >>> consumer = BasicConsumer(LoggerInstrument(), AnalyticsInstrument()) >>> dispatcher = BasicDispatcher(consumer) >>> >>> dispatcher BasicDispatcher(consumers=(BasicConsumer(instruments=('<domprob.dispatchers.basic.LoggerInstrument object at 0x...>', '<domprob.dispatchers.basic.AnalyticsInstrument object at 0x...>')),)) >>> >>> from domprob import sensor, BaseObservation >>> >>> class SomeObservation(BaseObservation): ... @sensor(LoggerInstrument) ... @sensor(AnalyticsInstrument) ... def foo(self, instrument: BaseInstrument) -> None: ... print(instrument.add()) ... >>> obs = SomeObservation() >>> dispatcher.dispatch(obs) Analytics entry added! Log message added!
- exception domprob.dispatchers.dispatcher.DispatcherException[source]
Bases:
DomprobExceptionBase exception for errors occurring within dispatchers.
This exception is raised when an error occurs while processing observations within a dispatcher.
It inherits from DomprobException, allowing it to be caught alongside other domain-specific exceptions.
- __repr__()
Return repr(self).
- add_note(object, /)
Exception.add_note(note) – add a note to the exception
- args
- with_traceback(object, /)
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- class domprob.dispatchers.dispatcher.DispatcherProtocol(*args, **kwargs)[source]
Bases:
ProtocolProtocol defining the structure for dispatchers handling domain observations.
Classes implementing this protocol must define:
dispatch(): Processes an ObservationProtocol and returns a result.
__repr__(): Provides a string representation of the dispatcher.
This protocol is @runtime_checkable, meaning isinstance(dispatcher, DispatcherProtocol) can be used to verify conformance at runtime.
Example
>>> from domprob.dispatchers.dispatcher import DispatcherProtocol >>> from domprob.observations.observation import ObservationProtocol >>> >>> class ConcreteDispatcher: ... @staticmethod ... def dispatch(self, observation: ObservationProtocol) -> str: ... return "Processed observation" ... ... def __repr__(self) -> str: ... return "ConcreteDispatcher()" ... >>> dispatcher = ConcreteDispatcher() >>> assert isinstance(dispatcher, DispatcherProtocol)
- _abc_impl = <_abc._abc_data object>
- _is_protocol = True
- _is_runtime_protocol = True
- dispatch(observation)[source]
Dispatch an observation and return a result.
- Parameters:
observation (ObservationProtocol[_P, _R]) – The observation to process.
- Returns:
The result of processing the observation.
- Return type:
_R
Example
>>> from domprob import sensor, BaseObservation >>> from domprob.dispatchers.dispatcher import DispatcherProtocol >>> from domprob.observations.observation import ObservationProtocol >>> >>> class MyDispatcher: ... @staticmethod ... def dispatch(observation: ObservationProtocol) -> str: ... return "Handled" ... >>> dispatcher = MyDispatcher() >>> >>> class SomeInstrument: ... pass ... >>> class Obs(BaseObservation): ... @sensor(SomeInstrument) ... def foo(self, instrument: SomeInstrument) -> str: ... pass ... >>> result = dispatcher.dispatch(Obs()) >>> print(result) Handled