Consumers

class domprob.consumers.basic.BasicConsumer(*instruments)[source]

Bases: ConsumerProtocol, Generic[_Instrument]

A consumer that processes observations by applying instrument implementations.

This class takes instrument implementations and processes observations by executing each of their sensor methods with the matching instrument.

Parameters:

*instruments (_Instrum) – One or more instrument instances.

Example

>>> from domprob import sensor, BaseObservation
>>>
>>> class LoggerInstrument:
...     @staticmethod
...     def log(message: str):
...         print(f"LOG: {message}")
...
>>> class SomeObservation(BaseObservation):
...     @sensor(LoggerInstrument)
...     def sense_event(self, instrument: LoggerInstrument):
...         instrument.log("Event sensed!")
...
>>> logger = LoggerInstrument()
>>> consumer = BasicConsumer(logger)
>>>
>>> consumer.consume(SomeObservation())
LOG: Event sensed!
__eq__(other)[source]

Return self==value.

Return type:

bool

__repr__()[source]

Return repr(self).

Return type:

str

consume(observation)[source]

Processes an observation by invoking the relevant instrument methods.

The method iterates through the observation’s sensors and applies the required instrument implementations.

Parameters:

observation (ObservationProtocol) – The observation to process.

Return type:

None

instrum_imps(observation, sensor)[source]

Retrieves instrument implementations required for sensors.

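Parameters:
  • observation (ObservationProtocol) – The observation whose sensor is being processed.

  • sensor (SensorMethod) – The sensor method whose instrument implementations are retrieved.

Yields:

_Instrum | None – The appropriate instrument implementation, or None if a non-required instrument implementation is missing.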

Raises:

ReqInstrumException – If a required instrument is missing.

Return type:

Generator[Optional[TypeVar(_Instrument, bound= Any)], None, None]

Example

>>> from domprob import sensor, BaseObservation
>>> from domprob.sensors.meth import SensorMethod
>>>
>>> class LoggerInstrument:
...     @staticmethod
...     def log(message: str):
...         print(f"LOG: {message}")
...
>>> class SomeObservation(BaseObservation):
...     @staticmethod
...     @sensor(LoggerInstrument)
...     def sense_observation(instrument: LoggerInstrument):
...         instrument.log("Event sensed!")
...
>>> logger = LoggerInstrument()
>>> consumer = BasicConsumer(logger)
>>> sensor_meth = SensorMethod(SomeObservation.sense_observation)
>>>
>>> list(consumer.instrum_imps(SomeObservation(), sensor_meth))
[<domprob.consumers.basic.LoggerInstrument object at 0x...>]
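
The yield-or-raise behaviour described for instrum_imps can be illustrated without domprob. The following is only a minimal sketch under stated assumptions: resolve_instruments, MissingInstrumentError, and the (class, required) pairs are hypothetical stand-ins, and exact type matching is assumed rather than taken from the library's source.

```python
from typing import Any, Generator, Optional


class MissingInstrumentError(Exception):
    """Hypothetical stand-in for ReqInstrumException."""


def resolve_instruments(
    wanted: list[tuple[type, bool]],
    available: list[Any],
) -> Generator[Optional[Any], None, None]:
    """Yield one implementation (or None) per requested instrument type.

    Each entry in `wanted` pairs an instrument class with a `required` flag.
    """
    for instrument_cls, required in wanted:
        # Assumption: match on the exact type of the available instances.
        match = next(
            (imp for imp in available if type(imp) is instrument_cls),
            None,
        )
        if match is None and required:
            # A required instrument has no implementation: fail loudly.
            raise MissingInstrumentError(instrument_cls.__name__)
        # A missing non-required instrument is reported as None.
        yield match


class Logger:
    pass


class Metrics:
    pass


logger = Logger()
resolved = list(
    resolve_instruments([(Logger, True), (Metrics, False)], [logger])
)
```

Here resolved holds the logger instance followed by None, because Metrics is not required and has no implementation. Marking Metrics as required instead would raise MissingInstrumentError once the generator reaches it.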
class domprob.consumers.basic.InstrumentImpRegistry(*instruments)[source]

Bases: Collection[_Instrument]

Registry for instrument implementations, allowing lookup and caching.

This class acts as a collection that stores instruments and supports:

  • Efficient retrieval of instruments by type.

  • Caching of previously looked-up instruments for performance optimization.

Parameters:

*instruments (_Instrum) – Variable number of instrument instances to store.

Example

>>> class LoggerInstrument:
...     @staticmethod
...     def add():
...         return "Log message added!"
...
>>> class AnalyticsInstrument:
...     @staticmethod
...     def add():
...         return "Analytics entry added!"
...
>>> logger = LoggerInstrument()
>>> analytics = AnalyticsInstrument()
>>>
>>> registry = InstrumentImpRegistry(logger, analytics)
>>> logger_ = registry.get(LoggerInstrument)
>>>
>>> logger == logger_
True
>>> print(registry.get(object))
None
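
Lookup by type with caching can be sketched as follows. This is not domprob's implementation: TypeRegistry is a hypothetical class, and exact type matching is an assumption chosen so that looking up object returns None, as in the example above.

```python
from typing import Any, Optional


class TypeRegistry:
    """Hypothetical registry sketch keyed by instrument class."""

    def __init__(self, *instruments: Any) -> None:
        self._instruments = instruments
        # Maps an instrument class to the instance found for it.
        self._cache: dict[type, Any] = {}

    def get(self, instrument_cls: type, required: bool = False) -> Optional[Any]:
        # Serve repeated lookups from the cache.
        if instrument_cls in self._cache:
            return self._cache[instrument_cls]
        for instrument in self._instruments:
            # Assumption: exact type match, so a base class such as
            # `object` does not match every stored instrument.
            if type(instrument) is instrument_cls:
                self._cache[instrument_cls] = instrument
                return instrument
        if required:
            raise KeyError(f"Instrument `{instrument_cls.__name__}` not found")
        return None


class Logger:
    pass


class Analytics:
    pass


logger = Logger()
registry = TypeRegistry(logger)
found = registry.get(Logger)        # the stored Logger instance
missing = registry.get(Analytics)   # None, because required defaults to False
```

Only successful lookups are cached in this sketch, so a miss is re-scanned on every call.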
__contains__(item)[source]

Check if an instrument exists in the registry.

Parameters:

item (object) – The instrument instance or class to check.

Returns:

True if the instrument is present, otherwise False.

Return type:

bool

Example

>>> class LoggerInstrument:
...     @staticmethod
...     def add():
...         return "Log message added!"
...
>>> class AnalyticsInstrument:
...     @staticmethod
...     def add():
...         return "Analytics entry added!"
...
>>> logger = LoggerInstrument()
>>> analytics = AnalyticsInstrument()
>>>
>>> registry = InstrumentImpRegistry(logger, analytics)
>>> logger in registry
True
>>> object in registry
False
__iter__()[source]

Iterate over stored instruments.

Returns:

An iterator over the instruments.

Return type:

Iterator[_Instrum]

Example

>>> class LoggerInstrument:
...     @staticmethod
...     def add():
...         return "Log message added!"
...
>>> class AnalyticsInstrument:
...     @staticmethod
...     def add():
...         return "Analytics entry added!"
...
>>> logger = LoggerInstrument()
>>> analytics = AnalyticsInstrument()
>>>
>>> registry = InstrumentImpRegistry(logger, analytics)
>>>
>>> for instrument in registry:
...     print(instrument.add())
...
Log message added!
Analytics entry added!
__len__()[source]

Return the number of stored instruments.

Returns:

The number of instruments in the registry.

Return type:

int

Example

>>> class LoggerInstrument:
...     @staticmethod
...     def add():
...         return "Log message added!"
...
>>> class AnalyticsInstrument:
...     @staticmethod
...     def add():
...         return "Analytics entry added!"
...
>>> logger = LoggerInstrument()
>>> analytics = AnalyticsInstrument()
>>>
>>> registry = InstrumentImpRegistry(logger, analytics)
>>>
>>> len(registry)
2
__repr__()[source]

Return a string representation of the registry.

Returns:

The string representation of the registry.

Return type:

str

static _is_hashable(obj)[source]

Check if an object is hashable.

Parameters:

obj (Any) – The object to check.

Returns:

True if the object is hashable, False otherwise.

Return type:

bool
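
One common way to perform such a check is to call hash() and catch TypeError. The sketch below assumes that approach; it is not taken from domprob's source, and is_hashable is a hypothetical free function.

```python
from typing import Any


def is_hashable(obj: Any) -> bool:
    """Return True if hash(obj) succeeds, False if it raises TypeError."""
    try:
        hash(obj)
    except TypeError:
        return False
    return True


checks = {
    "class": is_hashable(int),
    "string": is_hashable("abc"),
    "list": is_hashable([1, 2]),
    "tuple_with_list": is_hashable((1, [2])),
}
```

Calling hash() directly also catches containers such as a tuple holding a list, which an isinstance check against collections.abc.Hashable would report as hashable.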

get(instrument_cls, required=False)[source]

Retrieve an instrument instance by its class type.

If the instrument class is hashable, results are cached for efficiency.

Parameters:
  • instrument_cls (type[TypeVar(_Instrument, bound= Any)]) – The class type of the instrument to retrieve.

  • required (bool) – If True, raises a KeyError if the instrument is not found. If False, returns None.

Returns:

The retrieved instrument instance, or None if not found.

Return type:

_Instrum | None

Raises:

KeyError – If required is True and the instrument is not found.

Example

>>> class LoggerInstrument:
...     @staticmethod
...     def add():
...         return "Log message added!"
...
>>> class AnalyticsInstrument:
...     @staticmethod
...     def add():
...         return "Analytics entry added!"
...
>>> logger = LoggerInstrument()
>>> analytics = AnalyticsInstrument()
>>>
>>> registry = InstrumentImpRegistry(logger)
>>>
>>> registry.get(LoggerInstrument).add()
'Log message added!'
>>> registry.get(AnalyticsInstrument, required=True)
Traceback (most recent call last):
    ...
KeyError: 'Instrument `AnalyticsInstrument` not found in available implementations: `<domprob.consumers.basic.LoggerInstrument object at 0x...>`'
exception domprob.consumers.basic.ReqInstrumException(observation, sensor, req_supp_instrum, *instrum_imps)[source]

Bases: ConsumerException

Exception raised when no implementation of a required instrument type is available for an observation's sensor.

An instrument is marked as required with the required flag in the @sensor decorator:

>>> from domprob import sensor, BaseObservation
>>>
>>> class SomeObservation(BaseObservation):
...
...     @sensor(..., required=True)
...     def some_method(self, instrument: ...) -> None:
...         ...
...
Parameters:
  • observation (_Obs) – The observation instance where the missing instrument was required.

  • sensor (SensorMethod) – The sensor method that failed due to the missing instrument.

  • req_supp_instrum (type[_Instrum]) – The instrument type that was expected but not found.

  • *instrum_imps (_Instrum) – The available instrument instances at the time of the failure.

__repr__()

Return repr(self).

add_note(object, /)

Exception.add_note(note) – add a note to the exception

args
property msg: str

Constructs a descriptive error message for the exception.

Returns:

A formatted string detailing the missing instrument, the observation method where it was required, and the available instrument implementations.

Return type:

str

with_traceback(object, /)

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception domprob.consumers.consumer.ConsumerException[source]

Bases: DomprobException

Base exception for errors occurring within consumers.

This exception is raised when an error occurs while processing observations within a consumer.

It inherits from DomprobException, allowing it to be caught alongside other domain-specific exceptions.
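
Catching through the hierarchy can be sketched with stand-in classes. The names below are hypothetical and only mirror the inheritance chain described here (ReqInstrumException → ConsumerException → DomprobException); they are not imported from domprob.

```python
class DomprobError(Exception):
    """Stand-in for DomprobException."""


class ConsumerError(DomprobError):
    """Stand-in for ConsumerException."""


class RequiredInstrumentError(ConsumerError):
    """Stand-in for ReqInstrumException."""


def classify(exc: Exception) -> str:
    """Report which handler in the hierarchy catches the exception."""
    try:
        raise exc
    except ConsumerError:
        # Catches ConsumerError and every subclass of it.
        return "consumer"
    except DomprobError:
        # Catches the remaining library-level errors.
        return "domprob"


results = [
    classify(RequiredInstrumentError("missing")),
    classify(ConsumerError("failed")),
    classify(DomprobError("other")),
]
```

The more specific handler is listed first, so consumer errors are handled separately while a broader handler still covers everything else derived from the base exception.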

__repr__()

Return repr(self).

add_note(object, /)

Exception.add_note(note) – add a note to the exception

args
with_traceback(object, /)

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class domprob.consumers.consumer.ConsumerProtocol(*args, **kwargs)[source]

Bases: Protocol

Protocol defining the structure for consumers handling domain observations.

Classes implementing this protocol must define:

  • consume(): Processes an ObservationProtocol and returns None.

  • __repr__(): Provides a string representation of the consumer.

This protocol is @runtime_checkable, meaning isinstance(consumer, ConsumerProtocol) can be used to verify conformance at runtime.

Example

>>> from domprob.consumers.consumer import ConsumerProtocol
>>> from domprob.observations.observation import ObservationProtocol
>>>
>>> class ConcreteConsumer:
...     def consume(self, observation: ObservationProtocol) -> None:
...         print("Processed observation")
...
...     def __repr__(self) -> str:
...         return "ConcreteConsumer()"
...
>>> consumer = ConcreteConsumer()
>>> assert isinstance(consumer, ConsumerProtocol)
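
The limits of a runtime protocol check are worth seeing in isolation. The sketch below uses only the standard typing module; SupportsConsume is a hypothetical protocol with a single consume() method, not the library's ConsumerProtocol.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SupportsConsume(Protocol):
    """Hypothetical protocol requiring a consume() method."""

    def consume(self, observation: object) -> None: ...


class PrintingConsumer:
    def consume(self, observation: object) -> None:
        print(f"consumed {observation!r}")


class WrongSignature:
    # The method exists but takes no observation argument.
    def consume(self) -> None:
        return None


class NotAConsumer:
    pass


conforms = isinstance(PrintingConsumer(), SupportsConsume)
wrong_signature = isinstance(WrongSignature(), SupportsConsume)
no_method = isinstance(NotAConsumer(), SupportsConsume)
```

Here conforms and wrong_signature are both True and no_method is False: isinstance() against a runtime-checkable protocol only verifies that the named members exist, not that their signatures match. A static type checker is needed to catch WrongSignature.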
__eq__(other)[source]

Return self==value.

Return type:

bool

consume(observation)[source]

Consume an observation and handle execution.

Parameters:

observation (ObservationProtocol[_P, _R]) – The observation to process.

Return type:

None