Consumers
- class domprob.consumers.basic.BasicConsumer(*instruments)[source]
Bases: ConsumerProtocol, Generic[_Instrument]
A consumer that processes observations by applying instrument implementations.
This class is constructed with instrument implementations and processes each observation by executing its sensor methods with the relevant instrument.
- Parameters:
*instruments (_Instrum) – One or more instrument instances.
Example
>>> from domprob import sensor, BaseObservation
>>>
>>> class LoggerInstrument:
...     @staticmethod
...     def log(message: str):
...         print(f"LOG: {message}")
...
>>> class SomeObservation(BaseObservation):
...     @sensor(LoggerInstrument)
...     def sense_event(self, instrument: LoggerInstrument):
...         instrument.log("Event sensed!")
...
>>> logger = LoggerInstrument()
>>> consumer = BasicConsumer(logger)
>>>
>>> consumer.consume(SomeObservation())
LOG: Event sensed!
- _abc_impl = <_abc._abc_data object>
- _is_protocol = False
- _is_runtime_protocol = True
- consume(observation)[source]
Processes an observation by invoking the relevant instrument methods.
The method iterates through the observation’s sensors and applies the required instrument implementations.
- Parameters:
observation (ObservationProtocol) – The observation to process.
- Return type:
None
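The flow described for consume() can be illustrated without domprob itself. What follows is a minimal, self-contained sketch under stated assumptions, not the library's implementation: `toy_sensor`, `ToyConsumer`, `ListLogger`, `SomeEvent`, and the `_sensor_instrument` attribute are all hypothetical names invented for this illustration.

```python
from typing import Any, Callable


def toy_sensor(instrument_cls: type) -> Callable:
    """Hypothetical stand-in for a sensor decorator.

    Tags a method with the instrument type it needs.
    """
    def decorator(func: Callable) -> Callable:
        # Assumption: the metadata is stored directly on the function.
        func._sensor_instrument = instrument_cls
        return func
    return decorator


class ToyConsumer:
    """Hypothetical consumer: holds instruments and applies them to tagged methods."""

    def __init__(self, *instruments: Any) -> None:
        self.instruments = instruments

    def consume(self, observation: Any) -> None:
        # Walk the observation's class attributes, looking for tagged methods.
        for name in dir(type(observation)):
            method = getattr(type(observation), name)
            instrument_cls = getattr(method, "_sensor_instrument", None)
            if instrument_cls is None:
                continue
            # Pass every held instrument of the matching type to the method.
            for instrument in self.instruments:
                if isinstance(instrument, instrument_cls):
                    method(observation, instrument)


class ListLogger:
    """Collects messages in a list so the effect is easy to inspect."""

    def __init__(self) -> None:
        self.messages = []

    def log(self, message: str) -> None:
        self.messages.append(message)


class SomeEvent:
    @toy_sensor(ListLogger)
    def sense_event(self, instrument: ListLogger) -> None:
        instrument.log("Event sensed!")


logger = ListLogger()
ToyConsumer(logger).consume(SomeEvent())
print(logger.messages)  # ['Event sensed!']
```

The sketch matches instruments with isinstance for brevity; whether the real consumer matches by exact type or also accepts subclasses is not stated here.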
- instrum_imps(observation, sensor)[source]
Retrieves instrument implementations required for sensors.
- Parameters:
observation (ObservationProtocol) – The observation being processed.
sensor (SensorMethod) – The sensor method to handle.
- Yields:
_Instrum | None – The appropriate instrument implementation, or None if a non-required instrument implementation is missing.
- Raises:
ReqInstrumException – If a required instrument is missing.
- Return type:
Generator[Optional[TypeVar(_Instrument, bound=Any)],None,None]
Example
>>> from domprob import sensor, BaseObservation
>>> from domprob.sensors.meth import SensorMethod
>>>
>>> class LoggerInstrument:
...     @staticmethod
...     def log(message: str):
...         print(f"LOG: {message}")
...
>>> class SomeObservation(BaseObservation):
...     @staticmethod
...     @sensor(LoggerInstrument)
...     def sense_observation(instrument: LoggerInstrument):
...         instrument.log("Event sensed!")
...
>>> logger = LoggerInstrument()
>>> consumer = BasicConsumer(logger)
>>> sensor_meth = SensorMethod(SomeObservation.sense_observation)
>>>
>>> list(consumer.instrum_imps(SomeObservation(), sensor_meth))
[<domprob.consumers.basic.LoggerInstrument object at 0x...>]
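The yield-or-raise contract documented for instrum_imps() can be sketched as a plain generator. This is an illustration only, assuming matching by exact type; `resolve_instruments`, `MissingRequiredInstrument`, `Logger`, and `Analytics` are hypothetical names and do not exist in domprob.

```python
from typing import Any, Generator, Iterable, Optional, Tuple


class MissingRequiredInstrument(Exception):
    """Hypothetical stand-in for ReqInstrumException."""


def resolve_instruments(
    requested: Iterable[Tuple[type, bool]],
    available: Iterable[Any],
) -> Generator[Optional[Any], None, None]:
    """Yield one entry per requested (instrument type, required) pair."""
    pool = list(available)
    for instrument_cls, required in requested:
        # Find the first available instance whose exact type matches.
        match = next((i for i in pool if type(i) is instrument_cls), None)
        if match is None and required:
            raise MissingRequiredInstrument(instrument_cls.__name__)
        # None signals a missing, non-required instrument.
        yield match


class Logger: ...
class Analytics: ...


logger = Logger()
found = list(resolve_instruments([(Logger, True), (Analytics, False)], [logger]))
print(found == [logger, None])  # True
```

Because the function is a generator, a missing required instrument only raises once iteration reaches that entry, not when the generator is created.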
- class domprob.consumers.basic.InstrumentImpRegistry(*instruments)[source]
Bases: Collection[_Instrument]
Registry for instrument implementations, allowing lookup and caching.
This class acts as a collection that stores instruments and supports:
Efficient retrieval of instruments by type.
Caching of previously looked-up instruments for performance optimization.
- Parameters:
*instruments (_Instrum) – Variable number of instrument instances to store.
Example
>>> class LoggerInstrument:
...     @staticmethod
...     def add():
...         return "Log message added!"
...
>>> class AnalyticsInstrument:
...     @staticmethod
...     def add():
...         return "Analytics entry added!"
...
>>> logger = LoggerInstrument()
>>> analytics = AnalyticsInstrument()
>>>
>>> registry = InstrumentImpRegistry(logger, analytics)
>>> logger_ = registry.get(LoggerInstrument)
>>>
>>> logger == logger_
True
>>> print(registry.get(object))
None
- __contains__(item)[source]
Check if an instrument exists in the registry.
- Parameters:
item (object) – The instrument instance or class to check.
- Returns:
True if the instrument is present, otherwise False.
- Return type:
bool
Example
>>> class LoggerInstrument:
...     @staticmethod
...     def add():
...         return "Log message added!"
...
>>> class AnalyticsInstrument:
...     @staticmethod
...     def add():
...         return "Analytics entry added!"
...
>>> logger = LoggerInstrument()
>>> analytics = AnalyticsInstrument()
>>>
>>> registry = InstrumentImpRegistry(logger, analytics)
>>> logger in registry
True
>>> object in registry
False
- __iter__()[source]
Iterate over stored instruments.
- Returns:
An iterator over the instruments.
- Return type:
Iterator[_Instrum]
Example
>>> class LoggerInstrument:
...     @staticmethod
...     def add():
...         return "Log message added!"
...
>>> class AnalyticsInstrument:
...     @staticmethod
...     def add():
...         return "Analytics entry added!"
...
>>> logger = LoggerInstrument()
>>> analytics = AnalyticsInstrument()
>>>
>>> registry = InstrumentImpRegistry(logger, analytics)
>>>
>>> for instrument in registry:
...     print(instrument.add())
...
Log message added!
Analytics entry added!
- __len__()[source]
Return the number of stored instruments.
- Returns:
The number of instruments in the registry.
- Return type:
int
Example
>>> class LoggerInstrument:
...     @staticmethod
...     def add():
...         return "Log message added!"
...
>>> class AnalyticsInstrument:
...     @staticmethod
...     def add():
...         return "Analytics entry added!"
...
>>> logger = LoggerInstrument()
>>> analytics = AnalyticsInstrument()
>>>
>>> registry = InstrumentImpRegistry(logger, analytics)
>>>
>>> len(registry)
2
- __repr__()[source]
Return a string representation of the registry.
- Returns:
The string representation of the registry.
- Return type:
str
- _abc_impl = <_abc._abc_data object>
- static _is_hashable(obj)[source]
Check if an object is hashable.
- Parameters:
obj (Any) – The object to check.
- Returns:
True if the object is hashable, False otherwise.
- Return type:
bool
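A common way to perform such a check is to attempt the hash and catch the failure. The sketch below shows that general technique; it is an assumption that _is_hashable() works this way, and `is_hashable` is a hypothetical free function written for this illustration.

```python
from typing import Any


def is_hashable(obj: Any) -> bool:
    """Return True if hash(obj) succeeds, False if it raises TypeError."""
    try:
        hash(obj)
    except TypeError:
        return False
    return True


print(is_hashable(int))        # True: classes are hashable by default
print(is_hashable("text"))     # True
print(is_hashable([1, 2]))     # False: lists are unhashable
print(is_hashable((1, [2])))   # False: the tuple contains a list
```

Calling hash() directly catches cases such as the last one, which an isinstance check against collections.abc.Hashable would report as hashable because tuple defines __hash__.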
- get(instrument_cls, required=False)[source]
Retrieve an instrument instance by its class type.
If the instrument class is hashable, results are cached for efficiency.
- Parameters:
instrument_cls (type[TypeVar(_Instrument, bound=Any)]) – The class type of the instrument to retrieve.
required (bool) – If True, raises a KeyError if the instrument is not found. If False, returns None.
- Returns:
The retrieved instrument instance, or None if not found.
- Return type:
_Instrum | None
- Raises:
KeyError – If required is True and the instrument is not found.
Example
>>> class LoggerInstrument:
...     @staticmethod
...     def add():
...         return "Log message added!"
...
>>> class AnalyticsInstrument:
...     @staticmethod
...     def add():
...         return "Analytics entry added!"
...
>>> logger = LoggerInstrument()
>>> analytics = AnalyticsInstrument()
>>>
>>> registry = InstrumentImpRegistry(logger)
>>>
>>> registry.get(LoggerInstrument).add()
'Log message added!'
>>> registry.get(AnalyticsInstrument, required=True)
Traceback (most recent call last):
    ...
KeyError: 'Instrument `AnalyticsInstrument` not found in available implementations: `<domprob.consumers.basic.LoggerInstrument object at 0x...>`'
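The lookup-with-cache behaviour described for get() can be sketched in a few lines. This is a simplified illustration under stated assumptions, not the registry's actual code: `ToyRegistry`, its `scans` counter, and the exact-type matching rule are hypothetical, and only successful lookups are cached here.

```python
from typing import Any, Dict, Optional


class ToyRegistry:
    """Hypothetical registry: look up instances by exact type, with a cache."""

    def __init__(self, *instruments: Any) -> None:
        self.instruments = instruments
        self._cache: Dict[Any, Any] = {}
        self.scans = 0  # counts linear scans, to make cache hits visible

    @staticmethod
    def _is_hashable(obj: Any) -> bool:
        try:
            hash(obj)
        except TypeError:
            return False
        return True

    def get(self, instrument_cls: type, required: bool = False) -> Optional[Any]:
        # Only hashable classes can be used as dictionary keys.
        cacheable = self._is_hashable(instrument_cls)
        if cacheable and instrument_cls in self._cache:
            return self._cache[instrument_cls]
        self.scans += 1
        for instrument in self.instruments:
            if type(instrument) is instrument_cls:
                if cacheable:
                    self._cache[instrument_cls] = instrument
                return instrument
        if required:
            raise KeyError(f"Instrument `{instrument_cls.__name__}` not found")
        return None


class Logger: ...
class Analytics: ...


logger = Logger()
registry = ToyRegistry(logger)
first = registry.get(Logger)       # linear scan, result cached
second = registry.get(Logger)      # served from the cache
missing = registry.get(Analytics)  # second scan, returns None
print(first is second, missing, registry.scans)  # True None 2
```

The `scans` counter exists only to show that the second lookup does not walk the instrument tuple again.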
- exception domprob.consumers.basic.ReqInstrumException(observation, sensor, req_supp_instrum, *instrum_imps)[source]
Bases: ConsumerException
Exception raised when a required instrument has no implementation of the same type available for an observation's sensor.
An instrument is marked as required with the required flag in the @sensor decorator:
>>> from domprob import sensor, BaseObservation
>>>
>>> class SomeObservation(BaseObservation):
...
...     @sensor(..., required=True)
...     def some_method(self, instrument: ...) -> None:
...         ...
...
- Parameters:
observation (_Obs) – The observation instance where the missing instrument was required.
sensor (SensorMethod) – The sensor method that failed due to the missing instrument.
req_supp_instrum (type[_Instrum]) – The instrument type that was expected but not found.
*instrum_imps (_Instrum) – The available instrument instances at the time of the failure.
- __repr__()
Return repr(self).
- add_note(object, /)
Exception.add_note(note) – add a note to the exception
- args
- property msg: str
Constructs a descriptive error message for the exception.
- Returns:
A formatted string detailing the missing instrument, the observation method where it was required, and the available instrument implementations.
- Return type:
str
- with_traceback(object, /)
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- exception domprob.consumers.consumer.ConsumerException[source]
Bases: DomprobException
Base exception for errors occurring within consumers.
This exception is raised when an error occurs while processing observations within a consumer.
It inherits from DomprobException, allowing it to be caught alongside other domain-specific exceptions.
- __repr__()
Return repr(self).
- add_note(object, /)
Exception.add_note(note) – add a note to the exception
- args
- with_traceback(object, /)
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
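The benefit of this inheritance chain is that callers can choose how broadly to catch. The sketch below mirrors the hierarchy with stand-in classes so that it runs without domprob installed; `DomprobError`, `ConsumerError`, `RequiredInstrumentError`, and `handle` are hypothetical names, not the library's.

```python
class DomprobError(Exception):
    """Hypothetical stand-in for DomprobException."""


class ConsumerError(DomprobError):
    """Hypothetical stand-in for ConsumerException."""


class RequiredInstrumentError(ConsumerError):
    """Hypothetical stand-in for ReqInstrumException."""


def handle(exc: Exception) -> str:
    """Raise the given exception and report which handler caught it."""
    try:
        raise exc
    except ConsumerError as err:
        # Catches consumer errors and every subclass of them.
        return f"consumer: {type(err).__name__}"
    except DomprobError as err:
        # Catches the remaining domain-specific errors.
        return f"domprob: {type(err).__name__}"


print(handle(RequiredInstrumentError("missing")))  # consumer: RequiredInstrumentError
print(handle(DomprobError("other")))               # domprob: DomprobError
```

Ordering the handlers from most specific to most general keeps the narrower branch reachable.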
- class domprob.consumers.consumer.ConsumerProtocol(*args, **kwargs)[source]
Bases: Protocol
Protocol defining the structure for consumers handling domain observations.
Classes implementing this protocol must define:
consume(): Processes an ObservationProtocol.
__repr__(): Provides a string representation of the consumer.
This protocol is @runtime_checkable, meaning isinstance(consumer, ConsumerProtocol) can be used to verify conformance at runtime.
Example
>>> from domprob.consumers.consumer import ConsumerProtocol
>>> from domprob.observations.observation import ObservationProtocol
>>>
>>> class ConcreteConsumer:
...     def consume(self, observation: ObservationProtocol) -> None:
...         print("Processed observation")
...
...     def __repr__(self) -> str:
...         return "ConcreteConsumer()"
...
>>> consumer = ConcreteConsumer()
>>> assert isinstance(consumer, ConsumerProtocol)
- _abc_impl = <_abc._abc_data object>
- _is_protocol = True
- _is_runtime_protocol = True
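Runtime-checkable protocols verify only that the required members exist, not that their signatures match. The following sketch demonstrates this with the standard typing module; `ToyProtocol`, its `process` method, and the three example classes are hypothetical and stand in for the protocol documented above.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class ToyProtocol(Protocol):
    """Hypothetical protocol with a single required method."""

    def process(self, observation: object) -> None: ...


class Conforming:
    def process(self, observation: object) -> None:
        pass


class WrongSignature:
    def process(self) -> None:  # missing the observation parameter
        pass


class Unrelated:
    pass


print(isinstance(Conforming(), ToyProtocol))      # True
print(isinstance(WrongSignature(), ToyProtocol))  # True: only the name is checked
print(isinstance(Unrelated(), ToyProtocol))       # False
```

An isinstance check against a runtime-checkable protocol is therefore a cheap structural guard; a static type checker is still needed to catch signature mismatches.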