Module dempy.acquisitions.sensor
Expand source code
from typing import List, Dict, Any
from dempy._base import Entity
from dempy._protofiles import SensorMessage
class Sensor(Entity):
    """Sensor class

    Adds six sensor-specific attributes on top of the base entity fields
    (type, id, tags, metadata): sync_offset, time_unit, serial_number,
    manufacturer, model_name and sensor_type. Each of them may be None.
    """

    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], sync_offset: int, time_unit: str, serial_number: str,
                 manufacturer: str, model_name: str, sensor_type: str):
        # type, id, tags and metadata are stored by the Entity base class
        super().__init__(type, id, tags, metadata)

        self.sync_offset = sync_offset
        self.time_unit = time_unit
        self.serial_number = serial_number
        self.manufacturer = manufacturer
        self.model_name = model_name
        self.sensor_type = sensor_type

    @staticmethod
    def to_protobuf(obj: "Sensor") -> SensorMessage:
        """Encode a sensor to a Protobuf message

        Arguments:
            obj {Sensor} -- sensor to be encoded

        Returns:
            SensorMessage -- encoded sensor
        """
        sensor_message = SensorMessage()
        sensor_message.entity.CopyFrom(Entity.to_protobuf(obj))

        # Attributes holding None are skipped, so the matching message field stays unset
        for name in ("sync_offset", "time_unit", "serial_number", "manufacturer", "model_name", "sensor_type"):
            value = getattr(obj, name)
            if value is not None:
                setattr(sensor_message, name, value)

        return sensor_message

    @staticmethod
    def from_protobuf(sensor_message: SensorMessage) -> "Sensor":
        """Decode a Protobuf message to {Sensor}

        Arguments:
            sensor_message {SensorMessage} -- message to be decoded

        Returns:
            Sensor -- decoded sensor
        """
        def optional(name):
            # Fields that are not set on the message are decoded as None
            return getattr(sensor_message, name) if sensor_message.HasField(name) else None

        return Sensor(
            type=sensor_message.entity.type,
            id=sensor_message.entity.id,
            tags=sensor_message.entity.tags,
            metadata=sensor_message.entity.metadata,
            sync_offset=optional("sync_offset"),
            time_unit=optional("time_unit"),
            serial_number=optional("serial_number"),
            manufacturer=optional("manufacturer"),
            model_name=optional("model_name"),
            sensor_type=optional("sensor_type"),
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {Sensor}

        A dictionary whose "type" entry is not "Sensor" is returned unchanged.
        The six sensor-specific keys may be missing; a missing key yields None,
        matching how from_protobuf decodes unset fields.

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Returns:
            Any -- parsed object and sub-objects
        """
        if obj.get("type") != "Sensor":
            return obj

        return Sensor(
            type=obj["type"],
            id=obj["id"],
            tags=obj["tags"],
            metadata=obj["metadata"],
            # .get() instead of indexing: an absent optional key must not raise KeyError
            sync_offset=obj.get("syncOffset"),
            time_unit=obj.get("timeUnit"),
            serial_number=obj.get("serialNumber"),
            manufacturer=obj.get("manufacturer"),
            model_name=obj.get("modelName"),
            sensor_type=obj.get("sensorType"),
        )
# Public API of this module
__all__ = ["Sensor"]
Classes
class Sensor (type: str, id: str, tags: List[str], metadata: Dict[str, str], sync_offset: int, time_unit: str, serial_number: str, manufacturer: str, model_name: str, sensor_type: str)
-
Sensor class
Expand source code
class Sensor(Entity):
    """Sensor class

    Adds six sensor-specific attributes on top of the base entity fields
    (type, id, tags, metadata): sync_offset, time_unit, serial_number,
    manufacturer, model_name and sensor_type. Each of them may be None.
    """

    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], sync_offset: int, time_unit: str, serial_number: str,
                 manufacturer: str, model_name: str, sensor_type: str):
        # type, id, tags and metadata are stored by the Entity base class
        super().__init__(type, id, tags, metadata)

        self.sync_offset = sync_offset
        self.time_unit = time_unit
        self.serial_number = serial_number
        self.manufacturer = manufacturer
        self.model_name = model_name
        self.sensor_type = sensor_type

    @staticmethod
    def to_protobuf(obj: "Sensor") -> SensorMessage:
        """Encode a sensor to a Protobuf message

        Arguments:
            obj {Sensor} -- sensor to be encoded

        Returns:
            SensorMessage -- encoded sensor
        """
        sensor_message = SensorMessage()
        sensor_message.entity.CopyFrom(Entity.to_protobuf(obj))

        # Attributes holding None are skipped, so the matching message field stays unset
        for name in ("sync_offset", "time_unit", "serial_number", "manufacturer", "model_name", "sensor_type"):
            value = getattr(obj, name)
            if value is not None:
                setattr(sensor_message, name, value)

        return sensor_message

    @staticmethod
    def from_protobuf(sensor_message: SensorMessage) -> "Sensor":
        """Decode a Protobuf message to {Sensor}

        Arguments:
            sensor_message {SensorMessage} -- message to be decoded

        Returns:
            Sensor -- decoded sensor
        """
        def optional(name):
            # Fields that are not set on the message are decoded as None
            return getattr(sensor_message, name) if sensor_message.HasField(name) else None

        return Sensor(
            type=sensor_message.entity.type,
            id=sensor_message.entity.id,
            tags=sensor_message.entity.tags,
            metadata=sensor_message.entity.metadata,
            sync_offset=optional("sync_offset"),
            time_unit=optional("time_unit"),
            serial_number=optional("serial_number"),
            manufacturer=optional("manufacturer"),
            model_name=optional("model_name"),
            sensor_type=optional("sensor_type"),
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {Sensor}

        A dictionary whose "type" entry is not "Sensor" is returned unchanged.
        The six sensor-specific keys may be missing; a missing key yields None,
        matching how from_protobuf decodes unset fields.

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Returns:
            Any -- parsed object and sub-objects
        """
        if obj.get("type") != "Sensor":
            return obj

        return Sensor(
            type=obj["type"],
            id=obj["id"],
            tags=obj["tags"],
            metadata=obj["metadata"],
            # .get() instead of indexing: an absent optional key must not raise KeyError
            sync_offset=obj.get("syncOffset"),
            time_unit=obj.get("timeUnit"),
            serial_number=obj.get("serialNumber"),
            manufacturer=obj.get("manufacturer"),
            model_name=obj.get("modelName"),
            sensor_type=obj.get("sensorType"),
        )
Ancestors
- dempy._base.Entity
Static methods
def from_json(obj: Dict[str, str]) -> Any
-
Parse a JSON dictionary to {Sensor}
Arguments
obj {Dict[str, str]} – JSON object
Returns
Any -- parsed object and sub-objects
Expand source code
@staticmethod
def from_json(obj: Dict[str, str]) -> Any:
    """Parse a JSON dictionary to {Sensor}

    A dictionary whose "type" entry is not "Sensor" is returned unchanged.
    The six sensor-specific keys may be missing; a missing key yields None,
    matching how from_protobuf decodes unset fields.

    Arguments:
        obj {Dict[str, str]} -- JSON object

    Returns:
        Any -- parsed object and sub-objects
    """
    if obj.get("type") != "Sensor":
        return obj

    return Sensor(
        type=obj["type"],
        id=obj["id"],
        tags=obj["tags"],
        metadata=obj["metadata"],
        # .get() instead of indexing: an absent optional key must not raise KeyError
        sync_offset=obj.get("syncOffset"),
        time_unit=obj.get("timeUnit"),
        serial_number=obj.get("serialNumber"),
        manufacturer=obj.get("manufacturer"),
        model_name=obj.get("modelName"),
        sensor_type=obj.get("sensorType"),
    )
def from_protobuf(sensor_message: dempy_pb2.Sensor) -> Sensor
-
Decode a Protobuf message to {Sensor}
Arguments
sensor_message {SensorMessage} – message to be decoded
Returns
Sensor -- decoded sensor
Expand source code
@staticmethod
def from_protobuf(sensor_message: SensorMessage) -> "Sensor":
    """Build a {Sensor} out of a Protobuf message

    Arguments:
        sensor_message {SensorMessage} -- message to be decoded

    Returns:
        Sensor -- decoded sensor
    """
    def optional(name):
        # Fields that are not set on the message are decoded as None
        return getattr(sensor_message, name) if sensor_message.HasField(name) else None

    return Sensor(
        type=sensor_message.entity.type,
        id=sensor_message.entity.id,
        tags=sensor_message.entity.tags,
        metadata=sensor_message.entity.metadata,
        sync_offset=optional("sync_offset"),
        time_unit=optional("time_unit"),
        serial_number=optional("serial_number"),
        manufacturer=optional("manufacturer"),
        model_name=optional("model_name"),
        sensor_type=optional("sensor_type"),
    )
def to_protobuf(obj: Sensor) -> dempy_pb2.Sensor
-
Encode a sensor to a Protobuf message
Arguments
obj {Sensor} – sensor to be encoded
Returns
SensorMessage -- encoded sensor
Expand source code
@staticmethod
def to_protobuf(obj: "Sensor") -> SensorMessage:
    """Serialize a sensor into a Protobuf message

    Arguments:
        obj {Sensor} -- sensor to be encoded

    Returns:
        SensorMessage -- encoded sensor
    """
    message = SensorMessage()
    message.entity.CopyFrom(Entity.to_protobuf(obj))

    # Attributes holding None are skipped, so the matching message field stays unset
    for name in ("sync_offset", "time_unit", "serial_number", "manufacturer", "model_name", "sensor_type"):
        value = getattr(obj, name)
        if value is not None:
            setattr(message, name, value)

    return message