Module dempy.acquisitions
Expand source code
from .acquisition import Acquisition, get, count
from .annotation import Annotation
from .device import Device
from .image_sample import ImageSample
from .sensor import Sensor
from .subject import Subject
from .timeseries_sample import TimeseriesSample
from .video_sample import VideoSample
# Names exported by the package: the entity classes imported above plus the
# module-level `get` and `count` helpers from `.acquisition`.
__all__ = [
"Acquisition", "Subject", "Device", "Sensor", "TimeseriesSample", "ImageSample", "VideoSample", "Annotation",
"get", "count"
]
Sub-modules
dempy.acquisitions.acquisition
dempy.acquisitions.annotation
dempy.acquisitions.device
dempy.acquisitions.image_sample
dempy.acquisitions.sensor
dempy.acquisitions.subject
dempy.acquisitions.timeseries_sample
dempy.acquisitions.video_sample
Functions
def count() -> int
-
Get number of acquisitions
Returns
int -- number of acquisitions
Expand source code
def count() -> int:
    """Get number of acquisitions

    Queries the `count` resource under the acquisitions endpoint and returns
    the decoded JSON body.

    Returns:
        int -- number of acquisitions
    """
    count_url = _ENDPOINT + "count"
    response = _api_calls.get(count_url)
    return response.json()
def get(acquisition_id: str = None, dataset_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[Acquisition, List[Acquisition]]
-
Get an acquisition identified by acquisition_id or a list of all the acquisitions
Keyword Arguments: acquisition_id {str} – id of the acquisition (default: {None}) dataset_id {str} – id of the dataset to which the acquisitions belong (default: {None}) tags {List[str]} – tags of the acquisitions (default: {[]}) metadata {Dict[str, str]} – metadata of the acquisitions (default: {{}})
Returns
Union[Acquisition, List[Acquisition]] -- acquisition or list of acquisitions
Expand source code
def get(acquisition_id: str = None, dataset_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) \
        -> Union[Acquisition, List[Acquisition]]:
    """Get an acquisition identified by `acquisition_id` or a list of all the acquisitions

    A single acquisition is looked up in the local cache first and only
    requested from the API when no cached copy exists. List queries always hit
    the API, and every acquisition returned is written to the cache.

    Keyword Arguments:
        acquisition_id {str} -- id of the acquisition (default: {None})
        dataset_id {str} -- id of the dataset to which the acquisitions belong to (default: {None})
        tags {List[str]} -- tags of the acquisitions (default: {[]})
        metadata {Dict[str, str]} -- metadata of the acquisitions (default: {{}})

    Returns:
        Union[Acquisition, List[Acquisition]] -- acquisition or list of acquisitions
    """
    if acquisition_id is not None:
        try:
            found = cache._get_cached_data("acquisitions", acquisition_id, Acquisition.from_protobuf)
        except FileNotFoundError:
            # Not cached yet: fetch it once and store it for later calls
            response = _api_calls.get(_ENDPOINT + acquisition_id)
            found = response.json(object_hook=Acquisition.from_json)
            cache._cache_data("acquisitions", acquisition_id, found, Acquisition.to_protobuf)
        return found

    # Metadata filters are sent as "metadata.<key>" query parameters
    query = {"datasetId": dataset_id, "tags": tags}
    query.update((f"metadata.{key}", value) for key, value in metadata.items())

    response = _api_calls.get(_ENDPOINT, params=query)
    results = response.json(object_hook=Acquisition.from_json)

    for item in results:
        cache._cache_data("acquisitions", item.id, item, Acquisition.to_protobuf)

    return results
Classes
class Acquisition (type: str, id: str, tags: List[str], metadata: Dict[str, str], creation_timestamp: int, sync_offset: int, time_unit: str, owner_id: str, creator_id: str, dataset_id: str, subject: Subject, devices: List[Device], has_timeseries_samples: bool, has_image_samples: bool, has_video_samples: bool)
-
Acquisition class
Expand source code
class Acquisition(Entity):
    """Acquisition class

    Holds the attributes of one acquisition and exposes, through properties,
    small per-acquisition APIs (`subject`, `devices`, `timeseries_samples`,
    `image_samples`, `video_samples`, `annotations`). Each property builds and
    returns a fresh helper object whose static methods close over this
    acquisition.
    """

    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], creation_timestamp: int,
                 sync_offset: int, time_unit: str, owner_id: str, creator_id: str, dataset_id: str,
                 subject: Subject, devices: List[Device], has_timeseries_samples: bool, has_image_samples: bool,
                 has_video_samples: bool):
        super().__init__(type, id, tags, metadata)

        self.creation_timestamp = creation_timestamp
        self.sync_offset = sync_offset
        self.time_unit = time_unit
        self.owner_id = owner_id
        self.creator_id = creator_id
        self.dataset_id = dataset_id
        self.has_timeseries_samples = has_timeseries_samples
        self.has_image_samples = has_image_samples
        self.has_video_samples = has_video_samples

        # Kept private: the public `subject` / `devices` names are the API properties below
        self._subject = subject
        self._devices = devices

    @property
    def subject(self):
        """Subject's API"""
        class Inner:
            _SUBJECT_ENDPOINT = _ENDPOINT + "{}/subjects/".format(self.id)

            @staticmethod
            def get() -> Subject:
                """Get subject from acquisition

                Returns:
                    Subject -- subject of the acquisition
                """
                return self._subject

        return Inner()

    @property
    def devices(self):
        """Devices' API"""
        class Inner:
            _DEVICES_ENDPOINT = _ENDPOINT + "{}/devices/".format(self.id)

            @staticmethod
            def get(device_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[Device, List[Device]]:
                """Get a device identified by `device_id` or list of devices on this acquisition

                Filtering is done locally on the devices held by this acquisition. A device
                is kept when it carries at least one of the requested `tags` (if any were
                given) and at least one of the requested `metadata` pairs (if any were given).

                Keyword Arguments:
                    device_id {str} -- id of the device (default: {None})
                    tags {List[str]} -- tags of the devices (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the devices (default: {{}})

                Raises:
                    IndexError: device identified by `device_id` does not exist in this acquisition

                Returns:
                    Union[Device, List[Device]] -- device or list of devices
                """
                if device_id is None:
                    if tags or metadata:
                        def matches(device):
                            # NOTE(review): assumes devices expose `tags` the same way they expose
                            # `metadata` (both are passed to the Entity constructor) -- confirm
                            tags_ok = not tags or any(tag in tags for tag in device.tags)
                            metadata_ok = not metadata or any(
                                key in metadata and device.metadata[key] == metadata[key]
                                for key in device.metadata
                            )
                            return tags_ok and metadata_ok

                        return [d for d in self._devices if matches(d)]
                    return self._devices
                else:
                    try:
                        device = next((device for device in self._devices if device.id == device_id))
                    except StopIteration:
                        raise IndexError(f"device id {device_id} does not exist in acquisition id {self.id}")
                    return device

            @staticmethod
            def usage() -> Dict[str, List[str]]:
                """Get a map identifying which device(s) and sensor(s) were used to acquire time series samples

                Returns:
                    Mapping[str, List[str]] -- map of (key, value) pairs, with key being id of device and the value
                    a list of sensor ids which were used to capture samples
                """
                return _api_calls.get(Inner._DEVICES_ENDPOINT + "usage").json()

            @staticmethod
            def count() -> int:
                """Get the number of devices on this acquisition

                Returns:
                    int -- number of devices
                """
                return len(self._devices)

        return Inner()

    @property
    def timeseries_samples(self):
        """Timeseries samples' API"""
        class Inner:
            _TIMESERIES_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/timeseries/".format(self.id)

            @staticmethod
            def get(tags: List[str] = [], metadata: Dict[str, str] = {}) -> SampleList:
                """Get all the timeseries samples that belong to this acquisition

                Filtered queries always go to the API and are not cached. The unfiltered
                list is read from the cache when available and cached after the first fetch.
                Samples are sorted by timestamp in both cases.

                Keyword Arguments:
                    tags {List[str]} -- tags of the timeseries samples (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the timeseries samples (default: {{}})

                Returns:
                    SampleList -- list of timeseries samples
                """
                if len(tags) > 0 or len(metadata) > 0:
                    processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                    samples = _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                        .json(object_hook=TimeseriesSample.from_json)
                    samples.sort(key=lambda sample: sample.timestamp)
                    return SampleList(samples)

                try:
                    samples = cache._get_cached_data("samples/{}/".format(self.id), "timeseries", SampleList.from_protobuf)
                except FileNotFoundError:
                    samples = _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT) \
                        .json(object_hook=TimeseriesSample.from_json)
                    samples.sort(key=lambda sample: sample.timestamp)
                    samples = SampleList(samples)
                    cache._cache_data("samples/{}/".format(self.id), "timeseries", samples, SampleList.to_protobuf)
                return samples

            @staticmethod
            def count() -> int:
                """Get the number of timeseries samples on this acquisition

                Returns:
                    int -- number of timeseries samples
                """
                return _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT + "count").json()

            @staticmethod
            def visualize(device_id: str, sensor_id: str = None) -> None:
                """Graphically visualize the timeseries samples of a device identified by `device_id`
                or of a given sensor identified by `sensor_id` of said device

                Arguments:
                    device_id {str} -- id of the device

                Keyword Arguments:
                    sensor_id {str} -- id of the sensor (default: {None})
                """
                def visualize_sensor_samples(axis, sensor, sensor_samples):
                    timestamps = [s.timestamp for s in sensor_samples]

                    # Sample x, y, z, u, w
                    samples_x = [s.x for s in sensor_samples if hasattr(s, "x")]
                    samples_y = [s.y for s in sensor_samples if hasattr(s, "y")]
                    samples_z = [s.z for s in sensor_samples if hasattr(s, "z")]
                    samples_u = [s.u for s in sensor_samples if hasattr(s, "u")]
                    samples_w = [s.w for s in sensor_samples if hasattr(s, "w")]

                    # Title and x label
                    axis.set_title(f"{sensor.sensor_type}\n{sensor.id}", loc="left")
                    axis.set_xlabel(sensor.time_unit if sensor.time_unit is not None else device_time_unit)

                    # Axis limit -- skipped when there is nothing to plot, since indexing
                    # an empty list or taking min/max of no values would raise
                    values = list(chain(samples_x, samples_y, samples_z, samples_u, samples_w))
                    if timestamps:
                        axis.set_xlim([0, timestamps[-1]])
                    if values:
                        axis.set_ylim([min(values), max(values)])

                    # Axis formatter
                    axis.xaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))
                    axis.yaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))

                    # Axis plot
                    labels = []

                    if len(samples_x) > 0:
                        axis.plot(timestamps, samples_x, color="cornflowerblue")
                        labels.append("x")
                    if len(samples_y) > 0:
                        axis.plot(timestamps, samples_y, color="mediumseagreen")
                        labels.append("y")
                    if len(samples_z) > 0:
                        axis.plot(timestamps, samples_z, color="indianred")
                        labels.append("z")
                    if len(samples_u) > 0:
                        axis.plot(timestamps, samples_u, color="mediumorchid")
                        labels.append("u")
                    if len(samples_w) > 0:
                        axis.plot(timestamps, samples_w, color="slategray")
                        labels.append("w")

                    axis.legend(labels=labels, loc="upper right")

                device = self.devices.get(device_id=device_id)
                device_time_unit = device.time_unit if device.time_unit is not None else self.time_unit

                if sensor_id is None:
                    # squeeze=False keeps `axs` a 2-D array even for a single sensor;
                    # otherwise subplots() returns a bare Axes that cannot be indexed
                    fig, axs = plt.subplots(nrows=device.sensors.count(), figsize=(15, 10), dpi=80,
                                            constrained_layout=True, squeeze=False)
                    fig.suptitle(f"{device.model_name} ({device.manufacturer})\n{device.id}", wrap=True)

                    device_samples = self.timeseries_samples.get().by_device(device_id=device.id)

                    for i, sensor in enumerate(device.sensors.get()):
                        visualize_sensor_samples(axs[i][0], sensor, device_samples.by_sensor(sensor.id))
                else:
                    fig, ax = plt.subplots(nrows=1, figsize=(10, 4), dpi=80, constrained_layout=True)
                    fig.suptitle(f"{device.model_name} ({device.manufacturer})\n{device.id}", wrap=True)

                    sensor_samples = self.timeseries_samples.get().by_sensor(sensor_id)
                    visualize_sensor_samples(ax, device.sensors.get(sensor_id=sensor_id), sensor_samples)

                plt.show()

        return Inner()

    @property
    def image_samples(self):
        """Image samples' API"""
        class Inner:
            _IMAGE_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/images/".format(self.id)

            @staticmethod
            def get(sample_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[ImageSample, SampleList]:
                """Get an image sample identified by `sample_id` or all the image samples
                that belong to this acquisition

                Keyword Arguments:
                    sample_id {str} -- id of the sample (default: {None})
                    tags {List[str]} -- tags of image samples (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the image samples (default: {{}})

                Returns:
                    Union[ImageSample, SampleList] -- image sample or list of image samples
                """
                if sample_id is None:
                    if len(tags) > 0 or len(metadata) > 0:
                        processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                        samples = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                            .json(object_hook=ImageSample.from_json)
                        return SampleList(samples)

                    samples = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT).json(object_hook=ImageSample.from_json)
                    for sample in samples:
                        cache._cache_data("samples/{}/images/".format(self.id), sample.id, sample, ImageSample.to_protobuf)
                    return SampleList(samples)
                else:
                    try:
                        sample = cache._get_cached_data("samples/{}/images/".format(self.id), sample_id, ImageSample.from_protobuf)
                    except FileNotFoundError:
                        sample = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + sample_id).json(object_hook=ImageSample.from_json)
                        cache._cache_data("samples/{}/images/".format(self.id), sample_id, sample, ImageSample.to_protobuf)
                    return sample

            @staticmethod
            def raw(sample_id: str) -> ByteString:
                """Get actual image from image sample identified by `sample_id` on this acquisition

                Arguments:
                    sample_id {str} -- id of the sample

                Returns:
                    ByteString -- bytes of the image
                """
                try:
                    image = cache._get_cached_data("samples/{}/images/raw/".format(self.id), sample_id)
                except FileNotFoundError:
                    response = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + sample_id + "/raw")
                    # The cached file name carries an extension taken from the Content-Type subtype
                    file_ext = "." + response.headers["Content-Type"].split("/")[-1]
                    # Hand back the payload, not the response object, so a cache miss
                    # returns the same thing that was written to the cache
                    image = response.content
                    cache._cache_data("samples/{}/images/raw/".format(self.id), sample_id + file_ext, image)
                return image

            @staticmethod
            def count() -> int:
                """Get the number of image samples on this acquisition

                Returns:
                    int -- number of image samples
                """
                return _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + "count").json()

            @staticmethod
            def visualize(sample_id: str, backend: Callable[[str], None] = None) -> None:
                """Visualize the image of a sample identified by `sample_id`.

                By default opens the predefined system image application.
                A different callback can be given to open the image.

                Arguments:
                    sample_id {str} -- id of the sample

                Keyword Arguments:
                    backend {Callable[[str], None]} -- backend to open the image with (default: {None})
                """
                # Called for its side effect: makes sure the raw image is in the cache
                self.image_samples.raw(sample_id)

                image_path = cache._build_cache_path("samples/{}/images/raw/".format(self.id), sample_id)
                image_path = cache._add_file_extension(image_path)

                if backend is None:
                    system = platform.system()

                    if system == "Darwin":
                        subprocess.call(("open", image_path))
                    elif system == "Windows":
                        os.startfile(image_path)
                    else:
                        subprocess.call(("xdg-open", image_path))
                else:
                    backend(image_path)

        return Inner()

    @property
    def video_samples(self):
        """Video samples' API"""
        class Inner:
            _VIDEO_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/videos/".format(self.id)

            @staticmethod
            def get(sample_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[VideoSample, SampleList]:
                """Get a video sample identified by `sample_id` or all the video samples
                that belong to this acquisition

                Keyword Arguments:
                    sample_id {str} -- id of the sample (default: {None})
                    tags {List[str]} -- tags of video samples (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the video samples (default: {{}})

                Returns:
                    Union[VideoSample, SampleList] -- video sample or list of video samples
                """
                if sample_id is None:
                    if len(tags) > 0 or len(metadata) > 0:
                        processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                        samples = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                            .json(object_hook=VideoSample.from_json)
                        return SampleList(samples)

                    samples = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT).json(object_hook=VideoSample.from_json)
                    for sample in samples:
                        cache._cache_data("samples/{}/videos/".format(self.id), sample.id, sample, VideoSample.to_protobuf)
                    return SampleList(samples)
                else:
                    try:
                        sample = cache._get_cached_data("samples/{}/videos/".format(self.id), sample_id, VideoSample.from_protobuf)
                    except FileNotFoundError:
                        sample = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + sample_id).json(object_hook=VideoSample.from_json)
                        cache._cache_data("samples/{}/videos/".format(self.id), sample_id, sample, VideoSample.to_protobuf)
                    return sample

            @staticmethod
            def raw(sample_id: str) -> ByteString:
                """Get actual video from video sample identified by `sample_id` on this acquisition

                Arguments:
                    sample_id {str} -- id of the sample

                Returns:
                    ByteString -- bytes of the video
                """
                try:
                    video = cache._get_cached_data("samples/{}/videos/raw/".format(self.id), sample_id)
                except FileNotFoundError:
                    response = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + sample_id + "/raw")
                    # The cached file name carries an extension taken from the Content-Type subtype
                    file_ext = "." + response.headers["Content-Type"].split("/")[-1]
                    # Hand back the payload, not the response object, so a cache miss
                    # returns the same thing that was written to the cache
                    video = response.content
                    cache._cache_data("samples/{}/videos/raw/".format(self.id), sample_id + file_ext, video)
                return video

            @staticmethod
            def count() -> int:
                """Get the number of video samples on this acquisition

                Returns:
                    int -- number of video samples
                """
                return _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + "count").json()

            @staticmethod
            def visualize(sample_id: str, backend: Callable[[str], None] = None) -> None:
                """Visualize the video of a sample identified by `sample_id`.

                By default opens the predefined system video application.
                A different callback can be given to open the video.

                Arguments:
                    sample_id {str} -- id of the sample

                Keyword Arguments:
                    backend {Callable[[str], None]} -- backend to open the video with (default: {None})
                """
                # Called for its side effect: makes sure the raw video is in the cache
                self.video_samples.raw(sample_id)

                video_path = cache._build_cache_path("samples/{}/videos/raw/".format(self.id), sample_id)
                video_path = cache._add_file_extension(video_path)

                if backend is None:
                    system = platform.system()

                    if system == "Darwin":
                        subprocess.call(("open", video_path))
                    elif system == "Windows":
                        os.startfile(video_path)
                    else:
                        subprocess.call(("xdg-open", video_path))
                else:
                    backend(video_path)

        return Inner()

    @property
    def annotations(self):
        """Annotations' API"""
        class Inner:
            _ANNOTATIONS_ENDPOINT = _ENDPOINT + "{}/annotations/".format(self.id)

            @staticmethod
            def get(annotation_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[Annotation, AnnotationList]:
                """Get an annotation identified by `annotation_id` or all the annotations
                that belong to this acquisition

                Keyword Arguments:
                    annotation_id {str} -- id of the annotation (default: {None})
                    tags {List[str]} -- tags of the annotations (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the annotations (default: {{}})

                Returns:
                    Union[Annotation, AnnotationList] -- annotation or list of annotations
                """
                if annotation_id is None:
                    if len(tags) > 0 or len(metadata) > 0:
                        processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                        annotations = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                            .json(object_hook=Annotation.from_json)
                        return AnnotationList(annotations)

                    annotations = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT).json(object_hook=Annotation.from_json)
                    for annotation in annotations:
                        cache._cache_data("annotations", annotation.id, annotation, Annotation.to_protobuf)
                    return AnnotationList(annotations)
                else:
                    try:
                        annotation = cache._get_cached_data("annotations", annotation_id, Annotation.from_protobuf)
                    except FileNotFoundError:
                        annotation = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT + annotation_id).json(object_hook=Annotation.from_json)
                        cache._cache_data("annotations", annotation_id, annotation, Annotation.to_protobuf)
                    return annotation

            @staticmethod
            def count() -> int:
                """Get the number of annotations on this acquisition

                Returns:
                    int -- number of annotations
                """
                return _api_calls.get(Inner._ANNOTATIONS_ENDPOINT + "count").json()

        return Inner()

    @staticmethod
    def to_protobuf(obj: "Acquisition") -> AcquisitionMessage:
        """Encode an acquisition to a Protobuf message

        Arguments:
            obj {Acquisition} -- acquisition to be encoded

        Returns:
            AcquisitionMessage -- encoded acquisition
        """
        acquisition_message = AcquisitionMessage()
        acquisition_message.entity.CopyFrom(Entity.to_protobuf(obj))

        acquisition_message.creation_timestamp = obj.creation_timestamp

        # Fields that may be None are only written when they hold a value
        if obj.sync_offset is not None:
            acquisition_message.sync_offset = obj.sync_offset
        if obj.time_unit is not None:
            acquisition_message.time_unit = obj.time_unit
        if obj.owner_id is not None:
            acquisition_message.owner_id = obj.owner_id
        if obj.creator_id is not None:
            acquisition_message.creator_id = obj.creator_id
        if obj.dataset_id is not None:
            acquisition_message.dataset_id = obj.dataset_id

        acquisition_message.subject.CopyFrom(Subject.to_protobuf(obj._subject))
        acquisition_message.devices.extend([Device.to_protobuf(d) for d in obj._devices])
        acquisition_message.has_timeseries_samples = obj.has_timeseries_samples
        acquisition_message.has_image_samples = obj.has_image_samples
        acquisition_message.has_video_samples = obj.has_video_samples

        return acquisition_message

    @staticmethod
    def from_protobuf(obj: ByteString) -> "Acquisition":
        """Decode a Protobuf message to {Acquisition}

        Arguments:
            obj {ByteString} -- message to be decoded

        Returns:
            Acquisition -- decoded acquisition
        """
        acquisition_message = AcquisitionMessage()
        acquisition_message.ParseFromString(obj)

        # NOTE(review): `time_unit` is written conditionally by `to_protobuf` but is read
        # here without a HasField check, unlike the other optional fields -- confirm
        # against the message schema whether this is intended
        return Acquisition(
            type=acquisition_message.entity.type,
            id=acquisition_message.entity.id,
            tags=acquisition_message.entity.tags,
            metadata=acquisition_message.entity.metadata,
            creation_timestamp=acquisition_message.creation_timestamp,
            sync_offset=acquisition_message.sync_offset if acquisition_message.HasField("sync_offset") else None,
            time_unit=acquisition_message.time_unit,
            owner_id=acquisition_message.owner_id if acquisition_message.HasField("owner_id") else None,
            creator_id=acquisition_message.creator_id if acquisition_message.HasField("creator_id") else None,
            dataset_id=acquisition_message.dataset_id if acquisition_message.HasField("dataset_id") else None,
            subject=Subject.from_protobuf(acquisition_message.subject),
            devices=[Device.from_protobuf(d) for d in acquisition_message.devices],
            has_timeseries_samples=acquisition_message.has_timeseries_samples,
            has_image_samples=acquisition_message.has_image_samples,
            has_video_samples=acquisition_message.has_video_samples
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {Acquisition}

        Intended to be used as a JSON `object_hook`: dictionaries carrying a "type"
        key are turned into the matching entity, any other dictionary is returned
        unchanged.

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Raises:
            ValueError: unexpected object or sub-object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" in obj:
            if obj["type"] == "Acquisition":
                return Acquisition(
                    type=obj["type"],
                    id=obj["id"],
                    tags=obj["tags"],
                    metadata=obj["metadata"],
                    creation_timestamp=obj["creationTimestamp"],
                    sync_offset=obj["syncOffset"],
                    time_unit=obj["timeUnit"],
                    owner_id=obj["ownerId"],
                    creator_id=obj["creatorId"],
                    dataset_id=obj["datasetId"],
                    subject=obj["subject"],
                    devices=obj["devices"],
                    has_timeseries_samples=obj["hasTimeSeriesSamples"],
                    has_image_samples=obj["hasImageSamples"],
                    has_video_samples=obj["hasVideoSamples"]
                )
            elif obj["type"].endswith("Subject"):
                return Subject.from_json(obj)
            elif obj["type"] == "Device":
                return Device.from_json(obj)
            elif obj["type"] == "Sensor":
                return Sensor.from_json(obj)
            else:
                raise ValueError(f"unexpected object type: {obj['type']!r}")
        return obj
Ancestors
- dempy._base.Entity
Static methods
def from_json(obj: Dict[str, str]) -> Any
-
Parse a JSON dictionary to {Acquisition}
Arguments
obj {Dict[str, str]} – JSON object
Raises
ValueError
- unexpected object or sub-object
Returns
Any -- parsed object and sub-objects
Expand source code
@staticmethod def from_json(obj: Dict[str, str]) -> Any: """Parse a JSON dictionary to {Acquisition} Arguments: obj {Dict[str, str]} -- JSON object Raises: ValueError: unexpected object or sub-object Returns: Any -- parsed object and sub-objects """ if "type" in obj: if obj["type"] == "Acquisition": return Acquisition( type=obj["type"], id=obj["id"], tags=obj["tags"], metadata=obj["metadata"], creation_timestamp=obj["creationTimestamp"], sync_offset=obj["syncOffset"], time_unit=obj["timeUnit"], owner_id=obj["ownerId"], creator_id=obj["creatorId"], dataset_id=obj["datasetId"], subject=obj["subject"], devices=obj["devices"], has_timeseries_samples=obj["hasTimeSeriesSamples"], has_image_samples=obj["hasImageSamples"], has_video_samples=obj["hasVideoSamples"] ) elif obj["type"].endswith("Subject"): return Subject.from_json(obj) elif obj["type"] == "Device": return Device.from_json(obj) elif obj["type"] == "Sensor": return Sensor.from_json(obj) else: raise ValueError return obj
def from_protobuf(obj: ByteString) -> Acquisition
-
Decode a Protobuf message to {Acquisition}
Arguments
obj {ByteString} – message to be decoded
Returns
Acquisition -- decoded acquisition
Expand source code
@staticmethod
def from_protobuf(obj: ByteString) -> "Acquisition":
    """Decode a Protobuf message to {Acquisition}

    Fields checked with `HasField` decode to `None` when they are absent from
    the message; every other field is copied as is.

    Arguments:
        obj {ByteString} -- message to be decoded

    Returns:
        Acquisition -- decoded acquisition
    """
    message = AcquisitionMessage()
    message.ParseFromString(obj)

    def optional(field_name):
        # Value of the field when it is set on the message, otherwise None
        return getattr(message, field_name) if message.HasField(field_name) else None

    entity = message.entity
    decoded_subject = Subject.from_protobuf(message.subject)
    decoded_devices = [Device.from_protobuf(device_message) for device_message in message.devices]

    return Acquisition(
        type=entity.type,
        id=entity.id,
        tags=entity.tags,
        metadata=entity.metadata,
        creation_timestamp=message.creation_timestamp,
        sync_offset=optional("sync_offset"),
        time_unit=message.time_unit,
        owner_id=optional("owner_id"),
        creator_id=optional("creator_id"),
        dataset_id=optional("dataset_id"),
        subject=decoded_subject,
        devices=decoded_devices,
        has_timeseries_samples=message.has_timeseries_samples,
        has_image_samples=message.has_image_samples,
        has_video_samples=message.has_video_samples
    )
def to_protobuf(obj: Acquisition) -> dempy_pb2.Acquisition
-
Encode an acquisition to a Protobuf message
Arguments
obj {Acquisition} – acquisition to be encoded
Returns
AcquisitionMessage -- encoded acquisition
Expand source code
@staticmethod
def to_protobuf(obj: "Acquisition") -> AcquisitionMessage:
    """Encode an acquisition to a Protobuf message

    Attributes that may be `None` are only written to the message when they
    hold a value.

    Arguments:
        obj {Acquisition} -- acquisition to be encoded

    Returns:
        AcquisitionMessage -- encoded acquisition
    """
    message = AcquisitionMessage()
    message.entity.CopyFrom(Entity.to_protobuf(obj))
    message.creation_timestamp = obj.creation_timestamp

    # Same field name on the acquisition and on the message
    for field_name in ("sync_offset", "time_unit", "owner_id", "creator_id", "dataset_id"):
        value = getattr(obj, field_name)
        if value is not None:
            setattr(message, field_name, value)

    message.subject.CopyFrom(Subject.to_protobuf(obj._subject))
    message.devices.extend([Device.to_protobuf(device) for device in obj._devices])

    message.has_timeseries_samples = obj.has_timeseries_samples
    message.has_image_samples = obj.has_image_samples
    message.has_video_samples = obj.has_video_samples

    return message
Instance variables
var annotations
-
Annotations' API
Expand source code
@property
def annotations(self):
    """Annotations' API"""
    class Inner:
        _ANNOTATIONS_ENDPOINT = _ENDPOINT + "{}/annotations/".format(self.id)

        @staticmethod
        def get(annotation_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[Annotation, AnnotationList]:
            """Get an annotation identified by `annotation_id` or all the annotations
            that belong to this acquisition

            A single annotation is read from the cache when available. Filtered list
            queries are not cached; the unfiltered list caches every annotation returned.

            Keyword Arguments:
                annotation_id {str} -- id of the annotation (default: {None})
                tags {List[str]} -- tags of the annotations (default: {[]})
                metadata {Dict[str, str]} -- metadata of the annotations (default: {{}})

            Returns:
                Union[Annotation, AnnotationList] -- annotation or list of annotations
            """
            if annotation_id is None:
                if len(tags) > 0 or len(metadata) > 0:
                    processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                    annotations = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                        .json(object_hook=Annotation.from_json)
                    return AnnotationList(annotations)

                annotations = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT).json(object_hook=Annotation.from_json)
                for annotation in annotations:
                    cache._cache_data("annotations", annotation.id, annotation, Annotation.to_protobuf)
                return AnnotationList(annotations)
            else:
                try:
                    annotation = cache._get_cached_data("annotations", annotation_id, Annotation.from_protobuf)
                except FileNotFoundError:
                    annotation = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT + annotation_id).json(object_hook=Annotation.from_json)
                    cache._cache_data("annotations", annotation_id, annotation, Annotation.to_protobuf)
                return annotation

        @staticmethod
        def count() -> int:
            """Get the number of annotations on this acquisition

            Returns:
                int -- number of annotations
            """
            return _api_calls.get(Inner._ANNOTATIONS_ENDPOINT + "count").json()

    return Inner()
var devices
-
Devices' API
Expand source code
@property
def devices(self):
    """Devices' API"""
    class Inner:
        _DEVICES_ENDPOINT = _ENDPOINT + "{}/devices/".format(self.id)

        @staticmethod
        def get(device_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[Device, List[Device]]:
            """Get a device identified by `device_id` or list of devices on this acquisition

            Filtering is done locally on the devices held by this acquisition. A device
            is kept when it carries at least one of the requested `tags` (if any were
            given) and at least one of the requested `metadata` pairs (if any were given).

            Keyword Arguments:
                device_id {str} -- id of the device (default: {None})
                tags {List[str]} -- tags of the devices (default: {[]})
                metadata {Dict[str, str]} -- metadata of the devices (default: {{}})

            Raises:
                IndexError: device identified by `device_id` does not exist in this acquisition

            Returns:
                Union[Device, List[Device]] -- device or list of devices
            """
            if device_id is None:
                if tags or metadata:
                    def matches(device):
                        # NOTE(review): assumes devices expose `tags` the same way they expose
                        # `metadata` (both are passed to the Entity constructor) -- confirm
                        tags_ok = not tags or any(tag in tags for tag in device.tags)
                        metadata_ok = not metadata or any(
                            key in metadata and device.metadata[key] == metadata[key]
                            for key in device.metadata
                        )
                        return tags_ok and metadata_ok

                    return [d for d in self._devices if matches(d)]
                return self._devices
            else:
                try:
                    device = next((device for device in self._devices if device.id == device_id))
                except StopIteration:
                    raise IndexError(f"device id {device_id} does not exist in acquisition id {self.id}")
                return device

        @staticmethod
        def usage() -> Dict[str, List[str]]:
            """Get a map identifying which device(s) and sensor(s) were used to acquire time series samples

            Returns:
                Mapping[str, List[str]] -- map of (key, value) pairs, with key being id of device and the value
                a list of sensor ids which were used to capture samples
            """
            return _api_calls.get(Inner._DEVICES_ENDPOINT + "usage").json()

        @staticmethod
        def count() -> int:
            """Get the number of devices on this acquisition

            Returns:
                int -- number of devices
            """
            return len(self._devices)

    return Inner()
var image_samples
-
Image samples' API
Expand source code
@property
def image_samples(self):
    """Image samples' API"""
    class Inner:
        _IMAGE_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/images/".format(self.id)

        @staticmethod
        def get(sample_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[ImageSample, SampleList]:
            """Get an image sample identified by `sample_id` or all the image samples
            that belong to this acquisition

            Keyword Arguments:
                sample_id {str} -- id of the sample (default: {None})
                tags {List[str]} -- tags of image samples (default: {[]})
                metadata {Dict[str, str]} -- metadata of the image samples (default: {{}})

            Returns:
                Union[ImageSample, SampleList] -- image sample or list of image samples
            """
            if sample_id is None:
                if len(tags) > 0 or len(metadata) > 0:
                    processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                    samples = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                        .json(object_hook=ImageSample.from_json)
                    return SampleList(samples)

                samples = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT).json(object_hook=ImageSample.from_json)
                for sample in samples:
                    cache._cache_data("samples/{}/images/".format(self.id), sample.id, sample, ImageSample.to_protobuf)
                return SampleList(samples)
            else:
                try:
                    sample = cache._get_cached_data("samples/{}/images/".format(self.id), sample_id, ImageSample.from_protobuf)
                except FileNotFoundError:
                    sample = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + sample_id).json(object_hook=ImageSample.from_json)
                    cache._cache_data("samples/{}/images/".format(self.id), sample_id, sample, ImageSample.to_protobuf)
                return sample

        @staticmethod
        def raw(sample_id: str) -> ByteString:
            """Get actual image from image sample identified by `sample_id` on this acquisition

            Arguments:
                sample_id {str} -- id of the sample

            Returns:
                ByteString -- bytes of the image
            """
            try:
                image = cache._get_cached_data("samples/{}/images/raw/".format(self.id), sample_id)
            except FileNotFoundError:
                response = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + sample_id + "/raw")
                # The cached file name carries an extension taken from the Content-Type subtype
                file_ext = "." + response.headers["Content-Type"].split("/")[-1]
                # Hand back the payload, not the response object, so a cache miss
                # returns the same thing that was written to the cache
                image = response.content
                cache._cache_data("samples/{}/images/raw/".format(self.id), sample_id + file_ext, image)
            return image

        @staticmethod
        def count() -> int:
            """Get the number of image samples on this acquisition

            Returns:
                int -- number of image samples
            """
            return _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + "count").json()

        @staticmethod
        def visualize(sample_id: str, backend: Callable[[str], None] = None) -> None:
            """Visualize the image of a sample identified by `sample_id`.

            By default opens the predefined system image application.
            A different callback can be given to open the image.

            Arguments:
                sample_id {str} -- id of the sample

            Keyword Arguments:
                backend {Callable[[str], None]} -- backend to open the image with (default: {None})
            """
            # Called for its side effect: makes sure the raw image is in the cache
            self.image_samples.raw(sample_id)

            image_path = cache._build_cache_path("samples/{}/images/raw/".format(self.id), sample_id)
            image_path = cache._add_file_extension(image_path)

            if backend is None:
                system = platform.system()

                if system == "Darwin":
                    subprocess.call(("open", image_path))
                elif system == "Windows":
                    os.startfile(image_path)
                else:
                    subprocess.call(("xdg-open", image_path))
            else:
                backend(image_path)

    return Inner()
var subject
-
Subject's API
Expand source code
@property
def subject(self):
    """Subject's API

    Returns:
        Inner -- helper exposing `get` for the subject of this acquisition
    """
    acquisition = self

    class Inner:
        _SUBJECT_ENDPOINT = _ENDPOINT + "{}/subjects/".format(acquisition.id)

        @staticmethod
        def get() -> Subject:
            """Get the subject stored on this acquisition

            Returns:
                Subject -- subject of the acquisition
            """
            return acquisition._subject

    return Inner()
var timeseries_samples
-
Timeseries samples' API
Expand source code
@property
def timeseries_samples(self):
    """Timeseries samples' API

    Returns:
        Inner -- helper exposing `get`, `count` and `visualize` for the timeseries samples of this acquisition
    """
    class Inner:
        _TIMESERIES_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/timeseries/".format(self.id)

        # NOTE: the `tags`/`metadata` defaults are kept as in the public signature; they are never mutated here.
        @staticmethod
        def get(tags: List[str] = [], metadata: Dict[str, str] = {}) -> SampleList:
            """Get all the timeseries samples that belong to this acquisition, sorted by timestamp

            Keyword Arguments:
                tags {List[str]} -- tags of the timeseries samples (default: {[]})
                metadata {Dict[str, str]} -- metadata of the timeseries samples (default: {{}})

            Returns:
                SampleList -- list of timeseries samples
            """
            if tags or metadata:
                # Filtered queries are answered by the API and are not written to the cache
                processed_metadata = {f"metadata.{k}": v for k, v in metadata.items()}
                samples = _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                    .json(object_hook=TimeseriesSample.from_json)
                samples.sort(key=lambda sample: sample.timestamp)
                return SampleList(samples)

            cache_dir = "samples/{}/".format(self.id)
            try:
                return cache._get_cached_data(cache_dir, "timeseries", SampleList.from_protobuf)
            except FileNotFoundError:
                samples = _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT) \
                    .json(object_hook=TimeseriesSample.from_json)
                samples.sort(key=lambda sample: sample.timestamp)
                samples = SampleList(samples)
                cache._cache_data(cache_dir, "timeseries", samples, SampleList.to_protobuf)
                return samples

        @staticmethod
        def count() -> int:
            """Get the number of timeseries samples on this acquisition

            Returns:
                int -- number of timeseries samples
            """
            return _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT + "count").json()

        @staticmethod
        def visualize(device_id: str, sensor_id: str = None) -> None:
            """Graphically visualize the timeseries samples of a device identified by `device_id`
            or of a given sensor identified by `sensor_id` of said device

            Arguments:
                device_id {str} -- id of the device

            Keyword Arguments:
                sensor_id {str} -- id of the sensor (default: {None})
            """
            # (component attribute, line colour) for every component a sample may carry
            components = (
                ("x", "cornflowerblue"),
                ("y", "mediumseagreen"),
                ("z", "indianred"),
                ("u", "mediumorchid"),
                ("w", "slategray"),
            )

            def visualize_sensor_samples(axis, sensor, sensor_samples):
                # Title and x label
                axis.set_title(f"{sensor.sensor_type}\n{sensor.id}", loc="left")
                axis.set_xlabel(sensor.time_unit if sensor.time_unit is not None else device_time_unit)

                timestamps = [s.timestamp for s in sensor_samples]
                if not timestamps:
                    # Nothing to plot: leave an empty, titled axis instead of failing on timestamps[-1]
                    return

                # Pair every value with its own timestamp so that samples lacking a
                # component cannot leave the x and y sequences with different lengths
                series = []
                for name, color in components:
                    points = [(s.timestamp, getattr(s, name)) for s in sensor_samples if hasattr(s, name)]
                    if points:
                        series.append((name, color, points))

                # Axis limit
                axis.set_xlim([0, timestamps[-1]])
                values = [value for _, _, points in series for _, value in points]
                if values:
                    axis.set_ylim([min(values), max(values)])

                # Axis formatter
                axis.xaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))
                axis.yaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))

                # Axis plot
                for _, color, points in series:
                    xs, ys = zip(*points)
                    axis.plot(xs, ys, color=color)

                axis.legend(labels=[name for name, _, _ in series], loc="upper right")

            device = self.devices.get(device_id=device_id)
            device_time_unit = device.time_unit if device.time_unit is not None else self.time_unit

            if sensor_id is None:
                # squeeze=False keeps `axs` two-dimensional even for a single sensor,
                # where plt.subplots would otherwise return a bare Axes that cannot be indexed
                fig, axs = plt.subplots(nrows=device.sensors.count(), figsize=(15, 10), dpi=80,
                                        constrained_layout=True, squeeze=False)
                fig.suptitle(f"{device.model_name} ({device.manufacturer})\n{device.id}", wrap=True)

                device_samples = Inner.get().by_device(device_id=device.id)

                for i, sensor in enumerate(device.sensors.get()):
                    visualize_sensor_samples(axs[i][0], sensor, device_samples.by_sensor(sensor.id))
            else:
                fig, ax = plt.subplots(nrows=1, figsize=(10, 4), dpi=80, constrained_layout=True)
                fig.suptitle(f"{device.model_name} ({device.manufacturer})\n{device.id}", wrap=True)

                sensor_samples = Inner.get().by_sensor(sensor_id)
                visualize_sensor_samples(ax, device.sensors.get(sensor_id=sensor_id), sensor_samples)

            plt.show()

    return Inner()
var video_samples
-
Video samples' API
Expand source code
@property
def video_samples(self):
    """Video samples' API

    Returns:
        Inner -- helper exposing `get`, `raw`, `count` and `visualize` for the video samples of this acquisition
    """
    class Inner:
        _VIDEO_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/videos/".format(self.id)

        # NOTE: the `tags`/`metadata` defaults are kept as in the public signature; they are never mutated here.
        @staticmethod
        def get(sample_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[VideoSample, SampleList]:
            """Get one video sample identified by `sample_id` or all the video samples of this acquisition

            Keyword Arguments:
                sample_id {str} -- id of the sample (default: {None})
                tags {List[str]} -- tags of the video samples (default: {[]})
                metadata {Dict[str, str]} -- metadata of the video samples (default: {{}})

            Returns:
                Union[VideoSample, SampleList] -- video sample or list of video samples
            """
            cache_dir = "samples/{}/videos/".format(self.id)

            if sample_id is not None:
                try:
                    return cache._get_cached_data(cache_dir, sample_id, VideoSample.from_protobuf)
                except FileNotFoundError:
                    # Not cached yet: fetch the sample and store it for the next call
                    sample = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + sample_id) \
                        .json(object_hook=VideoSample.from_json)
                    cache._cache_data(cache_dir, sample_id, sample, VideoSample.to_protobuf)
                    return sample

            if tags or metadata:
                # Filtered queries are answered by the API and are not written to the cache
                processed_metadata = {f"metadata.{k}": v for k, v in metadata.items()}
                samples = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                    .json(object_hook=VideoSample.from_json)
                return SampleList(samples)

            samples = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT).json(object_hook=VideoSample.from_json)
            for sample in samples:
                cache._cache_data(cache_dir, sample.id, sample, VideoSample.to_protobuf)
            return SampleList(samples)

        @staticmethod
        def raw(sample_id: str) -> ByteString:
            """Get actual video from video sample identified by `sample_id` on this acquisition

            Arguments:
                sample_id {str} -- id of the sample

            Returns:
                ByteString -- bytes of the video
            """
            cache_dir = "samples/{}/videos/raw/".format(self.id)

            try:
                # NOTE(review): presumably returns the raw cached bytes when no decoder is given -- confirm in `cache`
                return cache._get_cached_data(cache_dir, sample_id)
            except FileNotFoundError:
                response = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + sample_id + "/raw")
                # Drop any Content-Type parameters (e.g. "; codecs=...") before deriving the extension
                media_type = response.headers["Content-Type"].split(";")[0].strip()
                file_ext = "." + media_type.split("/")[-1]
                cache._cache_data(cache_dir, sample_id + file_ext, response.content)
                # Return the payload itself (not the response object) so both paths honour the declared return type
                return response.content

        @staticmethod
        def count() -> int:
            """Get the number of video samples on this acquisition

            Returns:
                int -- number of video samples
            """
            return _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + "count").json()

        @staticmethod
        def visualize(sample_id: str, backend: Callable[[str], None] = None) -> None:
            """Visualize the video of a sample identified by `sample_id`.

            By default opens the predefined system video application.
            A different callback can be given to open the video.

            Arguments:
                sample_id {str} -- id of the sample

            Keyword Arguments:
                backend {Callable[[str], None]} -- backend to open the video with (default: {None})
            """
            # Called only for its side effect: makes sure the raw video is present in the cache
            Inner.raw(sample_id)

            video_path = cache._build_cache_path("samples/{}/videos/raw/".format(self.id), sample_id)
            video_path = cache._add_file_extension(video_path)

            if backend is not None:
                backend(video_path)
                return

            system = platform.system()
            if system == "Darwin":
                subprocess.call(("open", video_path))
            elif system == "Windows":
                os.startfile(video_path)
            else:
                subprocess.call(("xdg-open", video_path))

    return Inner()
class Annotation (type: str, id: str, tags: List[str], metadata: Dict[str, str], acquisition_id: str, creator_id: str, annotation_object: AnnotationObject, color: str, notes: str, **kwargs)
-
Annotation class
Expand source code
class Annotation(Entity):
    """Annotation class

    Besides the common attributes, every annotation carries a set of extra
    attributes that depends on its `type` (see `_FIELDS_BY_TYPE`).
    """

    # Extra attributes read from `**kwargs` for each supported annotation type
    _FIELDS_BY_TYPE = {
        "WholeImageAnnotation": ("annotated_sample_id",),
        "PointAnnotation": ("annotated_sample_id", "point"),
        "CircleAnnotation": ("annotated_sample_id", "center", "radius"),
        "DrawAnnotation": ("annotated_sample_id", "points"),
        "RectangleAnnotation": ("annotated_sample_id", "point", "width", "height"),
        "PolygonAnnotation": ("annotated_sample_id", "points"),
        "TimeSeriesInstantAnnotation": ("timestamp", "device_id", "sensor_id"),
        "TimeSeriesIntervalAnnotation": ("timestamp_start", "timestamp_end", "device_id", "sensor_id"),
    }

    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], acquisition_id: str,
                 creator_id: str, annotation_object: AnnotationObject, color: str, notes: str, **kwargs):
        """Create an annotation

        Arguments:
            type {str} -- annotation type, one of the keys of `_FIELDS_BY_TYPE`
            id {str} -- id of the annotation
            tags {List[str]} -- tags of the annotation
            metadata {Dict[str, str]} -- metadata of the annotation
            acquisition_id {str} -- id of the acquisition the annotation belongs to
            creator_id {str} -- id of the creator of the annotation
            annotation_object {AnnotationObject} -- object attached to the annotation
            color {str} -- color of the annotation
            notes {str} -- notes of the annotation

        Keyword Arguments:
            **kwargs -- type specific attributes; missing ones are stored as None

        Raises:
            ValueError: `type` is not a supported annotation type
        """
        super().__init__(type, id, tags, metadata)

        self.acquisition_id = acquisition_id
        self.creator_id = creator_id
        self.annotation_object = annotation_object
        self.color = color
        self.notes = notes

        try:
            extra_fields = self._FIELDS_BY_TYPE[self.type]
        except KeyError:
            raise ValueError(f"unexpected annotation type: {self.type!r}") from None

        for field in extra_fields:
            setattr(self, field, kwargs.get(field))

    @staticmethod
    def to_protobuf(obj: "Annotation") -> AnnotationMessage:
        """Encode an annotation to a Protobuf message

        Arguments:
            obj {Annotation} -- annotation to be encoded

        Raises:
            ValueError: `obj.type` is not a supported annotation type

        Returns:
            AnnotationMessage -- encoded annotation
        """
        message = AnnotationMessage()
        message.entity.CopyFrom(Entity.to_protobuf(obj))
        message.acquisition_id = obj.acquisition_id

        if obj.creator_id is not None:
            message.creator_id = obj.creator_id

        message.annotation_object.CopyFrom(AnnotationObject.to_protobuf(obj.annotation_object))

        # Optional scalars are only written when they hold a value
        for optional in ("color", "notes"):
            value = getattr(obj, optional)
            if value is not None:
                setattr(message, optional, value)

        kind = obj.type
        sample_kinds = ("WholeImageAnnotation", "PointAnnotation", "CircleAnnotation",
                        "DrawAnnotation", "RectangleAnnotation", "PolygonAnnotation")
        series_kinds = ("TimeSeriesInstantAnnotation", "TimeSeriesIntervalAnnotation")

        if kind in sample_kinds:
            message.annotated_sample_id = obj.annotated_sample_id

            if kind in ("PointAnnotation", "RectangleAnnotation"):
                message.point.CopyFrom(AnnotationPoint.to_protobuf(obj.point))

            if kind == "RectangleAnnotation":
                message.width = obj.width
                message.height = obj.height
            elif kind == "CircleAnnotation":
                message.center.CopyFrom(AnnotationPoint.to_protobuf(obj.center))
                message.radius = obj.radius
            elif kind in ("DrawAnnotation", "PolygonAnnotation"):
                message.points.extend([AnnotationPoint.to_protobuf(p) for p in obj.points])
        elif kind in series_kinds:
            if kind == "TimeSeriesInstantAnnotation":
                message.timestamp = obj.timestamp
            else:
                message.timestamp_start = obj.timestamp_start
                message.timestamp_end = obj.timestamp_end

            for optional in ("device_id", "sensor_id"):
                value = getattr(obj, optional)
                if value is not None:
                    setattr(message, optional, value)
        else:
            raise ValueError(f"unexpected annotation type: {kind!r}")

        return message

    @staticmethod
    def from_protobuf(obj: ByteString) -> "Annotation":
        """Decode a Protobuf message to {Annotation}

        Arguments:
            obj {ByteString} -- message to be decoded

        Returns:
            Annotation -- decoded annotation
        """
        message = AnnotationMessage()
        message.ParseFromString(obj)

        def optional(field):
            # Value of `field` when it is set on the message, None otherwise
            return getattr(message, field) if message.HasField(field) else None

        def optional_point(field):
            # Decoded point stored in `field` when it is set on the message, None otherwise
            return AnnotationPoint.from_protobuf(getattr(message, field)) if message.HasField(field) else None

        entity = message.entity

        # NOTE(review): `creator_id` is read unconditionally although `to_protobuf` only writes it
        # when it is not None -- confirm against the message definition whether it needs a presence check
        return Annotation(
            type=entity.type,
            id=entity.id,
            tags=entity.tags,
            metadata=entity.metadata,
            acquisition_id=message.acquisition_id,
            creator_id=message.creator_id,
            annotation_object=AnnotationObject.from_protobuf(message.annotation_object),
            color=optional("color"),
            notes=optional("notes"),
            annotated_sample_id=optional("annotated_sample_id"),
            point=optional_point("point"),
            center=optional_point("center"),
            radius=optional("radius"),
            points=[AnnotationPoint.from_protobuf(p) for p in message.points],
            width=optional("width"),
            height=optional("height"),
            timestamp=optional("timestamp"),
            device_id=optional("device_id"),
            sensor_id=optional("sensor_id"),
            timestamp_start=optional("timestamp_start"),
            timestamp_end=optional("timestamp_end"),
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {Annotation}

        Dictionaries without a "type" key are returned untouched.

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Raises:
            ValueError: unexpected object or sub-object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" not in obj:
            return obj

        kind = obj["type"]

        if kind in ("AnnotationText", "AnnotationImage"):
            return AnnotationObject.from_json(obj)

        if not kind.endswith("Annotation"):
            raise ValueError(f"unexpected object type: {kind!r}")

        def parse_points():
            return [AnnotationPoint.from_json(p) for p in obj["points"]]

        # The type is validated before any other key is read, so that an unknown
        # type is always reported as the documented ValueError
        if kind == "WholeImageAnnotation":
            specific = {"annotated_sample_id": obj["annotatedSampleId"]}
        elif kind == "PointAnnotation":
            specific = {"annotated_sample_id": obj["annotatedSampleId"],
                        "point": AnnotationPoint.from_json(obj["point"])}
        elif kind == "CircleAnnotation":
            specific = {"annotated_sample_id": obj["annotatedSampleId"],
                        "center": AnnotationPoint.from_json(obj["center"]),
                        "radius": obj["radius"]}
        elif kind in ("DrawAnnotation", "PolygonAnnotation"):
            specific = {"annotated_sample_id": obj["annotatedSampleId"],
                        "points": parse_points()}
        elif kind == "RectangleAnnotation":
            specific = {"annotated_sample_id": obj["annotatedSampleId"],
                        "point": AnnotationPoint.from_json(obj["point"]),
                        "width": obj["width"],
                        "height": obj["height"]}
        elif kind == "TimeSeriesInstantAnnotation":
            specific = {"timestamp": obj["timestamp"],
                        "device_id": obj["deviceId"],
                        "sensor_id": obj["sensorId"]}
        elif kind == "TimeSeriesIntervalAnnotation":
            specific = {"timestamp_start": obj["timestampStart"],
                        "timestamp_end": obj["timestampEnd"],
                        "device_id": obj["deviceId"],
                        "sensor_id": obj["sensorId"]}
        else:
            raise ValueError(f"unexpected annotation type: {kind!r}")

        return Annotation(
            type=kind,
            id=obj["id"],
            tags=obj["tags"],
            metadata=obj["metadata"],
            acquisition_id=obj["acquisitionId"],
            creator_id=obj["creatorId"],
            annotation_object=obj["annotationObject"],
            color=obj["color"],
            notes=obj["notes"],
            **specific
        )
Ancestors
- dempy._base.Entity
Static methods
def from_json(obj: Dict[str, str]) -> Any
-
Parse a JSON dictionary to {Annotation}
Arguments
obj {Dict[str, str]} – JSON object
Raises
ValueError
- unexpected object or sub-object
Returns
Any -- parsed object and sub-objects
Expand source code
@staticmethod
def from_json(obj: Dict[str, str]) -> Any:
    """Parse a JSON dictionary to {Annotation}

    Dictionaries without a "type" key are returned untouched.

    Arguments:
        obj {Dict[str, str]} -- JSON object

    Raises:
        ValueError: unexpected object or sub-object

    Returns:
        Any -- parsed object and sub-objects
    """
    if "type" not in obj:
        return obj

    kind = obj["type"]

    if kind in ("AnnotationText", "AnnotationImage"):
        return AnnotationObject.from_json(obj)

    if not kind.endswith("Annotation"):
        raise ValueError(f"unexpected object type: {kind!r}")

    def parse_points():
        return [AnnotationPoint.from_json(p) for p in obj["points"]]

    # The type is validated before any other key is read, so that an unknown
    # type is always reported as the documented ValueError
    if kind == "WholeImageAnnotation":
        specific = {"annotated_sample_id": obj["annotatedSampleId"]}
    elif kind == "PointAnnotation":
        specific = {"annotated_sample_id": obj["annotatedSampleId"],
                    "point": AnnotationPoint.from_json(obj["point"])}
    elif kind == "CircleAnnotation":
        specific = {"annotated_sample_id": obj["annotatedSampleId"],
                    "center": AnnotationPoint.from_json(obj["center"]),
                    "radius": obj["radius"]}
    elif kind in ("DrawAnnotation", "PolygonAnnotation"):
        specific = {"annotated_sample_id": obj["annotatedSampleId"],
                    "points": parse_points()}
    elif kind == "RectangleAnnotation":
        specific = {"annotated_sample_id": obj["annotatedSampleId"],
                    "point": AnnotationPoint.from_json(obj["point"]),
                    "width": obj["width"],
                    "height": obj["height"]}
    elif kind == "TimeSeriesInstantAnnotation":
        specific = {"timestamp": obj["timestamp"],
                    "device_id": obj["deviceId"],
                    "sensor_id": obj["sensorId"]}
    elif kind == "TimeSeriesIntervalAnnotation":
        specific = {"timestamp_start": obj["timestampStart"],
                    "timestamp_end": obj["timestampEnd"],
                    "device_id": obj["deviceId"],
                    "sensor_id": obj["sensorId"]}
    else:
        raise ValueError(f"unexpected annotation type: {kind!r}")

    return Annotation(
        type=kind,
        id=obj["id"],
        tags=obj["tags"],
        metadata=obj["metadata"],
        acquisition_id=obj["acquisitionId"],
        creator_id=obj["creatorId"],
        annotation_object=obj["annotationObject"],
        color=obj["color"],
        notes=obj["notes"],
        **specific
    )
def from_protobuf(obj: ByteString) -> Annotation
-
Decode a Protobuf message to {Annotation}
Arguments
obj {ByteString} – message to be decoded
Returns
Annotation -- decoded annotation
Expand source code
@staticmethod
def from_protobuf(obj: ByteString) -> "Annotation":
    """Decode a Protobuf message to {Annotation}

    Fields that are not set on the message are passed on as None; `points`
    is always passed as a (possibly empty) list.

    Arguments:
        obj {ByteString} -- message to be decoded

    Returns:
        Annotation -- decoded annotation
    """
    message = AnnotationMessage()
    message.ParseFromString(obj)

    def optional(field):
        # Value of `field` when it is set on the message, None otherwise
        return getattr(message, field) if message.HasField(field) else None

    def optional_point(field):
        # Decoded point stored in `field` when it is set on the message, None otherwise
        return AnnotationPoint.from_protobuf(getattr(message, field)) if message.HasField(field) else None

    entity = message.entity

    return Annotation(
        type=entity.type,
        id=entity.id,
        tags=entity.tags,
        metadata=entity.metadata,
        acquisition_id=message.acquisition_id,
        creator_id=message.creator_id,
        annotation_object=AnnotationObject.from_protobuf(message.annotation_object),
        color=optional("color"),
        notes=optional("notes"),
        annotated_sample_id=optional("annotated_sample_id"),
        point=optional_point("point"),
        center=optional_point("center"),
        radius=optional("radius"),
        points=[AnnotationPoint.from_protobuf(p) for p in message.points],
        width=optional("width"),
        height=optional("height"),
        timestamp=optional("timestamp"),
        device_id=optional("device_id"),
        sensor_id=optional("sensor_id"),
        timestamp_start=optional("timestamp_start"),
        timestamp_end=optional("timestamp_end"),
    )
def to_protobuf(obj: Annotation) -> dempy_pb2.Annotation
-
Encode an annotation to a Protobuf message
Arguments
obj {Annotation} – annotation to be encoded
Returns
AnnotationMessage -- encoded annotation
Expand source code
@staticmethod
def to_protobuf(obj: "Annotation") -> AnnotationMessage:
    """Encode an annotation to a Protobuf message

    Attributes holding None (creator, color, notes, device and sensor ids)
    are left unset on the message.

    Arguments:
        obj {Annotation} -- annotation to be encoded

    Raises:
        ValueError: `obj.type` is not a supported annotation type

    Returns:
        AnnotationMessage -- encoded annotation
    """
    message = AnnotationMessage()
    message.entity.CopyFrom(Entity.to_protobuf(obj))
    message.acquisition_id = obj.acquisition_id

    if obj.creator_id is not None:
        message.creator_id = obj.creator_id

    message.annotation_object.CopyFrom(AnnotationObject.to_protobuf(obj.annotation_object))

    # Optional scalars are only written when they hold a value
    for optional in ("color", "notes"):
        value = getattr(obj, optional)
        if value is not None:
            setattr(message, optional, value)

    kind = obj.type
    sample_kinds = ("WholeImageAnnotation", "PointAnnotation", "CircleAnnotation",
                    "DrawAnnotation", "RectangleAnnotation", "PolygonAnnotation")
    series_kinds = ("TimeSeriesInstantAnnotation", "TimeSeriesIntervalAnnotation")

    if kind in sample_kinds:
        # Every image-based annotation references the sample it annotates
        message.annotated_sample_id = obj.annotated_sample_id

        if kind in ("PointAnnotation", "RectangleAnnotation"):
            message.point.CopyFrom(AnnotationPoint.to_protobuf(obj.point))

        if kind == "RectangleAnnotation":
            message.width = obj.width
            message.height = obj.height
        elif kind == "CircleAnnotation":
            message.center.CopyFrom(AnnotationPoint.to_protobuf(obj.center))
            message.radius = obj.radius
        elif kind in ("DrawAnnotation", "PolygonAnnotation"):
            message.points.extend([AnnotationPoint.to_protobuf(p) for p in obj.points])
    elif kind in series_kinds:
        if kind == "TimeSeriesInstantAnnotation":
            message.timestamp = obj.timestamp
        else:
            message.timestamp_start = obj.timestamp_start
            message.timestamp_end = obj.timestamp_end

        for optional in ("device_id", "sensor_id"):
            value = getattr(obj, optional)
            if value is not None:
                setattr(message, optional, value)
    else:
        raise ValueError

    return message
class Device (type: str, id: str, tags: List[str], metadata: Dict[str, str], sync_offset: int, time_unit: str, serial_number: str, manufacturer: str, model_name: str, sensors: List[Sensor])
-
Device class
Expand source code
class Device(Entity):
    """Device class"""

    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], sync_offset: int,
                 time_unit: str, serial_number: str, manufacturer: str, model_name: str, sensors: List[Sensor]):
        """Create a device

        Arguments:
            type {str} -- type of the entity
            id {str} -- id of the device
            tags {List[str]} -- tags of the device
            metadata {Dict[str, str]} -- metadata of the device
            sync_offset {int} -- synchronization offset of the device
            time_unit {str} -- time unit of the device
            serial_number {str} -- serial number of the device
            manufacturer {str} -- manufacturer of the device
            model_name {str} -- model name of the device
            sensors {List[Sensor]} -- sensors of the device
        """
        super().__init__(type, id, tags, metadata)

        self.sync_offset = sync_offset
        self.time_unit = time_unit
        self.serial_number = serial_number
        self.manufacturer = manufacturer
        self.model_name = model_name
        self._sensors = sensors

    @property
    def sensors(self):
        """Sensors' API

        Returns:
            Inner -- helper exposing `get` and `count` for the sensors of this device
        """
        class Inner:
            # NOTE: the `tags`/`metadata` defaults are kept as in the public signature; they are never mutated here.
            @staticmethod
            def get(sensor_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> "Union[Sensor, List[Sensor]]":
                """Get one sensor identified by `sensor_id` or all the sensors that belong to this device

                When `tags` is given a sensor must carry at least one of them; when `metadata`
                is given a sensor must share at least one key/value pair with it. When both
                are given a sensor must satisfy both conditions.

                Keyword Arguments:
                    sensor_id {str} -- id of the sensor (default: {None})
                    tags {List[str]} -- tags of the sensor (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the sensor (default: {{}})

                Raises:
                    IndexError: sensor identified by `sensor_id` does not exist in this device

                Returns:
                    Union[Sensor, List[Sensor]] -- sensor or list of sensors
                """
                if sensor_id is not None:
                    for sensor in self._sensors:
                        if sensor.id == sensor_id:
                            return sensor
                    raise IndexError(f"sensor id {sensor_id} does not exist in device id {self.id}")

                if not tags and not metadata:
                    return self._sensors

                def matches(sensor):
                    # NOTE(review): assumes sensors expose their tags as `sensor.tags` -- confirm on Entity
                    sensor_tags = getattr(sensor, "tags", None) or ()
                    if tags and not any(tag in tags for tag in sensor_tags):
                        return False
                    if metadata and not any(key in metadata and value == metadata[key]
                                            for key, value in sensor.metadata.items()):
                        return False
                    return True

                return [sensor for sensor in self._sensors if matches(sensor)]

            @staticmethod
            def count() -> int:
                """Get the number of sensors on this device

                Returns:
                    int -- number of sensors
                """
                return len(self._sensors)

        return Inner()

    @staticmethod
    def to_protobuf(obj: "Device") -> DeviceMessage:
        """Encode a device to a Protobuf message

        Arguments:
            obj {Device} -- device to be encoded

        Returns:
            DeviceMessage -- encoded device
        """
        message = DeviceMessage()
        message.entity.CopyFrom(Entity.to_protobuf(obj))

        # Optional scalars are only written when they hold a value
        for field in ("sync_offset", "time_unit", "serial_number", "manufacturer", "model_name"):
            value = getattr(obj, field)
            if value is not None:
                setattr(message, field, value)

        message.sensors.extend([Sensor.to_protobuf(s) for s in obj._sensors])

        return message

    @staticmethod
    def from_protobuf(device_message: DeviceMessage) -> "Device":
        """Decode a Protobuf message to {Device}

        Arguments:
            device_message {DeviceMessage} -- message to be decoded

        Returns:
            Device -- decoded device
        """
        def optional(field):
            # Value of `field` when it is set on the message, None otherwise
            return getattr(device_message, field) if device_message.HasField(field) else None

        entity = device_message.entity

        return Device(
            type=entity.type,
            id=entity.id,
            tags=entity.tags,
            metadata=entity.metadata,
            sync_offset=optional("sync_offset"),
            time_unit=optional("time_unit"),
            serial_number=optional("serial_number"),
            manufacturer=optional("manufacturer"),
            model_name=optional("model_name"),
            sensors=[Sensor.from_protobuf(s) for s in device_message.sensors]
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {Device}

        Dictionaries without a "type" key are returned untouched.

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Raises:
            ValueError: unexpected object or sub-object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" not in obj:
            return obj

        kind = obj["type"]

        if kind == "Sensor":
            return Sensor.from_json(obj)

        if kind != "Device":
            raise ValueError(f"unexpected object type: {kind!r}")

        return Device(
            type=kind,
            id=obj["id"],
            tags=obj["tags"],
            metadata=obj["metadata"],
            sync_offset=obj["syncOffset"],
            time_unit=obj["timeUnit"],
            serial_number=obj["serialNumber"],
            manufacturer=obj["manufacturer"],
            model_name=obj["modelName"],
            sensors=obj["sensors"],
        )
Ancestors
- dempy._base.Entity
Static methods
def from_json(obj: Dict[str, str]) -> Any
-
Parse a JSON dictionary to {Device}
Arguments
obj {Dict[str, str]} – JSON object
Raises
ValueError
- unexpected object or sub-object
Returns
Any -- parsed object and sub-objects
Expand source code
@staticmethod
def from_json(obj: Dict[str, str]) -> Any:
    """Parse a JSON dictionary to {Device}

    Dictionaries without a "type" key are returned untouched.

    Arguments:
        obj {Dict[str, str]} -- JSON object

    Raises:
        ValueError: unexpected object or sub-object

    Returns:
        Any -- parsed object and sub-objects
    """
    if "type" not in obj:
        return obj

    kind = obj["type"]

    if kind == "Sensor":
        return Sensor.from_json(obj)

    if kind != "Device":
        # Name the offending type so that a malformed payload can be diagnosed
        raise ValueError(f"unexpected object type: {kind!r}")

    return Device(
        type=kind,
        id=obj["id"],
        tags=obj["tags"],
        metadata=obj["metadata"],
        sync_offset=obj["syncOffset"],
        time_unit=obj["timeUnit"],
        serial_number=obj["serialNumber"],
        manufacturer=obj["manufacturer"],
        model_name=obj["modelName"],
        sensors=obj["sensors"],
    )
def from_protobuf(device_message: dempy_pb2.Device) -> Device
-
Decode a Protobuf message to {Device}
Arguments
device_message {DeviceMessage} – message to be decoded
Returns
Device -- decoded device
Expand source code
@staticmethod
def from_protobuf(device_message: DeviceMessage) -> "Device":
    """Decode a Protobuf message to {Device}

    Optional fields that are not set on the message are passed on as None.

    Arguments:
        device_message {DeviceMessage} -- message to be decoded

    Returns:
        Device -- decoded device
    """
    def optional(field):
        # Value of `field` when it is set on the message, None otherwise
        return getattr(device_message, field) if device_message.HasField(field) else None

    entity = device_message.entity
    decoded_sensors = [Sensor.from_protobuf(s) for s in device_message.sensors]

    return Device(
        type=entity.type,
        id=entity.id,
        tags=entity.tags,
        metadata=entity.metadata,
        sync_offset=optional("sync_offset"),
        time_unit=optional("time_unit"),
        serial_number=optional("serial_number"),
        manufacturer=optional("manufacturer"),
        model_name=optional("model_name"),
        sensors=decoded_sensors
    )
def to_protobuf(obj: Device) -> dempy_pb2.Device
-
Encode a device to a Protobuf message
Arguments
obj {Device} – device to be encoded
Returns
DeviceMessage -- encoded device
Expand source code
@staticmethod
def to_protobuf(obj: "Device") -> DeviceMessage:
    """Encode a device to a Protobuf message

    Attributes holding None are left unset on the message.

    Arguments:
        obj {Device} -- device to be encoded

    Returns:
        DeviceMessage -- encoded device
    """
    message = DeviceMessage()
    message.entity.CopyFrom(Entity.to_protobuf(obj))

    # Optional scalars are only written when they hold a value
    for field in ("sync_offset", "time_unit", "serial_number", "manufacturer", "model_name"):
        value = getattr(obj, field)
        if value is not None:
            setattr(message, field, value)

    encoded_sensors = [Sensor.to_protobuf(s) for s in obj._sensors]
    message.sensors.extend(encoded_sensors)

    return message
Instance variables
var sensors
-
Sensors' API
Expand source code
@property
def sensors(self):
    """Sensors' API

    Returns:
        Inner -- helper exposing `get` and `count` for the sensors of this device
    """
    class Inner:
        # NOTE: the `tags`/`metadata` defaults are kept as in the public signature; they are never mutated here.
        @staticmethod
        def get(sensor_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> "Union[Sensor, List[Sensor]]":
            """Get one sensor identified by `sensor_id` or all the sensors that belong to this device

            When `tags` is given a sensor must carry at least one of them; when `metadata`
            is given a sensor must share at least one key/value pair with it. When both
            are given a sensor must satisfy both conditions.

            Keyword Arguments:
                sensor_id {str} -- id of the sensor (default: {None})
                tags {List[str]} -- tags of the sensor (default: {[]})
                metadata {Dict[str, str]} -- metadata of the sensor (default: {{}})

            Raises:
                IndexError: sensor identified by `sensor_id` does not exist in this device

            Returns:
                Union[Sensor, List[Sensor]] -- sensor or list of sensors
            """
            if sensor_id is not None:
                for sensor in self._sensors:
                    if sensor.id == sensor_id:
                        return sensor
                # `self` is the device, so the message reports a device id
                raise IndexError(f"sensor id {sensor_id} does not exist in device id {self.id}")

            if not tags and not metadata:
                return self._sensors

            def matches(sensor):
                # NOTE(review): assumes sensors expose their tags as `sensor.tags` -- confirm on Entity
                sensor_tags = getattr(sensor, "tags", None) or ()
                if tags and not any(tag in tags for tag in sensor_tags):
                    return False
                if metadata and not any(key in metadata and value == metadata[key]
                                        for key, value in sensor.metadata.items()):
                    return False
                return True

            return [sensor for sensor in self._sensors if matches(sensor)]

        @staticmethod
        def count() -> int:
            """Get the number of sensors on this device

            Returns:
                int -- number of sensors
            """
            return len(self._sensors)

    return Inner()
class ImageSample (type: str, id: str, tags: List[str], metadata: Dict[str, str], timestamp: int, acquisition_id: str, device_id: str, sensor_id: str, media_type: str, image_source: str, has_rotation_metadata: bool)
-
ImageSample class
Expand source code
class ImageSample(Entity):
    """ImageSample class"""

    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], timestamp: int,
                 acquisition_id: str, device_id: str, sensor_id: str, media_type: str, image_source: str,
                 has_rotation_metadata: bool):
        super().__init__(type, id, tags, metadata)
        self.timestamp = timestamp
        self.acquisition_id = acquisition_id
        self.device_id = device_id
        self.sensor_id = sensor_id
        self.media_type = media_type
        self.image_source = image_source
        self.has_rotation_metadata = has_rotation_metadata

    @staticmethod
    def to_protobuf(obj: "ImageSample") -> ImageMessage:
        """Encode an image sample to a Protobuf message

        Arguments:
            obj {ImageSample} -- image sample to be encoded

        Returns:
            ImageMessage -- encoded image sample
        """
        image_message = ImageMessage()
        image_message.entity.CopyFrom(Entity.to_protobuf(obj))

        image_message.timestamp = obj.timestamp
        image_message.acquisition_id = obj.acquisition_id
        # device_id and sensor_id are left unset on the message when they are None
        if obj.device_id is not None:
            image_message.device_id = obj.device_id
        if obj.sensor_id is not None:
            image_message.sensor_id = obj.sensor_id
        image_message.media_type = obj.media_type
        image_message.image_source = obj.image_source
        image_message.has_rotation_metadata = obj.has_rotation_metadata

        return image_message

    @staticmethod
    def from_protobuf(obj: Union[ByteString, ImageMessage]) -> "ImageSample":
        """Decode a Protobuf message to {ImageSample}

        Arguments:
            obj {Union[ByteString, ImageMessage]} -- message to be decoded

        Returns:
            ImageSample -- decoded image sample
        """
        if isinstance(obj, ImageMessage):
            image_message = obj
        else:
            # ParseFromString fills the message in place and does not return it,
            # so the message has to be created first and parsed into.
            image_message = ImageMessage()
            image_message.ParseFromString(obj)

        return ImageSample(
            type=image_message.entity.type,
            id=image_message.entity.id,
            tags=image_message.entity.tags,
            metadata=image_message.entity.metadata,
            timestamp=image_message.timestamp,
            acquisition_id=image_message.acquisition_id,
            device_id=image_message.device_id if image_message.HasField("device_id") else None,
            sensor_id=image_message.sensor_id if image_message.HasField("sensor_id") else None,
            media_type=image_message.media_type,
            image_source=image_message.image_source,
            has_rotation_metadata=image_message.has_rotation_metadata
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {ImageSample}

        Dictionaries whose "type" is not "ImageSample" are returned unchanged.

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" in obj and obj["type"] == "ImageSample":
            return ImageSample(
                type=obj["type"],
                id=obj["id"],
                tags=obj["tags"],
                metadata=obj["metadata"],
                timestamp=obj["timestamp"],
                acquisition_id=obj["acquisitionId"],
                device_id=obj["deviceId"],
                sensor_id=obj["sensorId"],
                media_type=obj["mediaType"],
                image_source=obj["imageSource"],
                has_rotation_metadata=obj["hasRotationMetadata"]
            )

        return obj
Ancestors
- dempy._base.Entity
Static methods
def from_json(obj: Dict[str, str]) -> Any
-
Parse a JSON dictionary to {ImageSample}
Arguments
obj {Dict[str, str]} – JSON object
Returns
Any -- parsed object and sub-objects
Expand source code
@staticmethod def from_json(obj: Dict[str, str]) -> Any: """Parse a JSON dictionary to {ImageSample} Arguments: obj {Dict[str, str]} -- JSON object Returns: Any -- parsed object and sub-objects """ if "type" in obj and obj["type"] == "ImageSample": return ImageSample( type=obj["type"], id=obj["id"], tags=obj["tags"], metadata=obj["metadata"], timestamp=obj["timestamp"], acquisition_id=obj["acquisitionId"], device_id=obj["deviceId"], sensor_id=obj["sensorId"], media_type=obj["mediaType"], image_source=obj["imageSource"], has_rotation_metadata=obj["hasRotationMetadata"] ) return obj
def from_protobuf(obj: Union[ByteString, dempy_pb2.ImageSample]) -> ImageSample
-
Decode a Protobuf message to {ImageSample}
Arguments
obj {Union[ByteString, ImageMessage]} – message to be decoded
Returns
ImageSample -- decoded image sample
Expand source code
@staticmethod
def from_protobuf(obj: Union[ByteString, ImageMessage]) -> "ImageSample":
    """Decode a Protobuf message to {ImageSample}

    Arguments:
        obj {Union[ByteString, ImageMessage]} -- message to be decoded

    Returns:
        ImageSample -- decoded image sample
    """
    if isinstance(obj, ImageMessage):
        image_message = obj
    else:
        # ParseFromString fills the message in place and does not return it,
        # so the message has to be created first and parsed into.
        image_message = ImageMessage()
        image_message.ParseFromString(obj)

    return ImageSample(
        type=image_message.entity.type,
        id=image_message.entity.id,
        tags=image_message.entity.tags,
        metadata=image_message.entity.metadata,
        timestamp=image_message.timestamp,
        acquisition_id=image_message.acquisition_id,
        device_id=image_message.device_id if image_message.HasField("device_id") else None,
        sensor_id=image_message.sensor_id if image_message.HasField("sensor_id") else None,
        media_type=image_message.media_type,
        image_source=image_message.image_source,
        has_rotation_metadata=image_message.has_rotation_metadata
    )
def to_protobuf(obj: ImageSample) -> dempy_pb2.ImageSample
-
Encode an image sample to a Protobuf message
Arguments
obj {ImageSample} – image sample to be encoded
Returns
ImageMessage -- encoded image sample
Expand source code
@staticmethod
def to_protobuf(obj: "ImageSample") -> ImageMessage:
    """Encode an image sample to a Protobuf message

    Arguments:
        obj {ImageSample} -- image sample to be encoded

    Returns:
        ImageMessage -- encoded image sample
    """
    message = ImageMessage()
    message.entity.CopyFrom(Entity.to_protobuf(obj))

    message.timestamp = obj.timestamp
    message.acquisition_id = obj.acquisition_id

    # The two ids are left unset on the message when they are None.
    for field in ("device_id", "sensor_id"):
        value = getattr(obj, field)
        if value is not None:
            setattr(message, field, value)

    message.media_type = obj.media_type
    message.image_source = obj.image_source
    message.has_rotation_metadata = obj.has_rotation_metadata

    return message
class Sensor (type: str, id: str, tags: List[str], metadata: Dict[str, str], sync_offset: int, time_unit: str, serial_number: str, manufacturer: str, model_name: str, sensor_type: str)
-
Sensor class
Expand source code
class Sensor(Entity):
    """Sensor class"""

    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], sync_offset: int,
                 time_unit: str, serial_number: str, manufacturer: str, model_name: str, sensor_type: str):
        super().__init__(type, id, tags, metadata)
        self.sync_offset = sync_offset
        self.time_unit = time_unit
        self.serial_number = serial_number
        self.manufacturer = manufacturer
        self.model_name = model_name
        self.sensor_type = sensor_type

    @staticmethod
    def to_protobuf(obj: "Sensor") -> SensorMessage:
        """Encode a sensor to a Protobuf message

        Arguments:
            obj {Sensor} -- sensor to be encoded

        Returns:
            SensorMessage -- encoded sensor
        """
        message = SensorMessage()
        message.entity.CopyFrom(Entity.to_protobuf(obj))

        # Attributes holding None are left unset on the message.
        for field in ("sync_offset", "time_unit", "serial_number", "manufacturer", "model_name", "sensor_type"):
            value = getattr(obj, field)
            if value is not None:
                setattr(message, field, value)

        return message

    @staticmethod
    def from_protobuf(sensor_message: SensorMessage) -> "Sensor":
        """Decode a Protobuf message to {Sensor}

        Arguments:
            sensor_message {SensorMessage} -- message to be decoded

        Returns:
            Sensor -- decoded sensor
        """
        def optional(field: str):
            # Fields that are not set on the message decode to None.
            return getattr(sensor_message, field) if sensor_message.HasField(field) else None

        entity = sensor_message.entity
        return Sensor(
            type=entity.type,
            id=entity.id,
            tags=entity.tags,
            metadata=entity.metadata,
            sync_offset=optional("sync_offset"),
            time_unit=optional("time_unit"),
            serial_number=optional("serial_number"),
            manufacturer=optional("manufacturer"),
            model_name=optional("model_name"),
            sensor_type=optional("sensor_type"),
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {Sensor}

        Dictionaries whose "type" is not "Sensor" are handed back untouched.

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" not in obj or obj["type"] != "Sensor":
            return obj

        # constructor argument -> key of the JSON object
        json_keys = {
            "type": "type",
            "id": "id",
            "tags": "tags",
            "metadata": "metadata",
            "sync_offset": "syncOffset",
            "time_unit": "timeUnit",
            "serial_number": "serialNumber",
            "manufacturer": "manufacturer",
            "model_name": "modelName",
            "sensor_type": "sensorType",
        }
        return Sensor(**{argument: obj[key] for argument, key in json_keys.items()})
Ancestors
- dempy._base.Entity
Static methods
def from_json(obj: Dict[str, str]) -> Any
-
Parse a JSON dictionary to {Sensor}
Arguments
obj {Dict[str, str]} – JSON object
Returns
Any -- parsed object and sub-objects
Expand source code
@staticmethod def from_json(obj: Dict[str, str]) -> Any: """Parse a JSON dictionary to {Sensor} Arguments: obj {Dict[str, str]} -- JSON object Returns: Any -- parsed object and sub-objects """ if "type" in obj and obj["type"] == "Sensor": return Sensor( type=obj["type"], id=obj["id"], tags=obj["tags"], metadata=obj["metadata"], sync_offset=obj["syncOffset"], time_unit=obj["timeUnit"], serial_number=obj["serialNumber"], manufacturer=obj["manufacturer"], model_name=obj["modelName"], sensor_type=obj["sensorType"], ) return obj
def from_protobuf(sensor_message: dempy_pb2.Sensor) -> Sensor
-
Decode a Protobuf message to {Sensor}
Arguments
sensor_message {SensorMessage} – message to be decoded
Returns
Sensor -- decoded sensor
Expand source code
@staticmethod
def from_protobuf(sensor_message: SensorMessage) -> "Sensor":
    """Decode a Protobuf message to {Sensor}

    Arguments:
        sensor_message {SensorMessage} -- message to be decoded

    Returns:
        Sensor -- decoded sensor
    """
    def optional(field: str):
        # Fields that are not set on the message decode to None.
        return getattr(sensor_message, field) if sensor_message.HasField(field) else None

    entity = sensor_message.entity
    return Sensor(
        type=entity.type,
        id=entity.id,
        tags=entity.tags,
        metadata=entity.metadata,
        sync_offset=optional("sync_offset"),
        time_unit=optional("time_unit"),
        serial_number=optional("serial_number"),
        manufacturer=optional("manufacturer"),
        model_name=optional("model_name"),
        sensor_type=optional("sensor_type"),
    )
def to_protobuf(obj: Sensor) -> dempy_pb2.Sensor
-
Encode a sensor to a Protobuf message
Arguments
obj {Sensor} – sensor to be encoded
Returns
SensorMessage -- encoded sensor
Expand source code
@staticmethod
def to_protobuf(obj: "Sensor") -> SensorMessage:
    """Encode a sensor to a Protobuf message

    Arguments:
        obj {Sensor} -- sensor to be encoded

    Returns:
        SensorMessage -- encoded sensor
    """
    message = SensorMessage()
    message.entity.CopyFrom(Entity.to_protobuf(obj))

    # Attributes holding None are left unset on the message.
    for field in ("sync_offset", "time_unit", "serial_number", "manufacturer", "model_name", "sensor_type"):
        value = getattr(obj, field)
        if value is not None:
            setattr(message, field, value)

    return message
class Subject (type: str, id: str, tags: List[str], metadata: Dict[str, str], birthdate_timestamp: int, description: str, first_name: str, last_name: str)
-
Subject class
Expand source code
class Subject(Entity):
    """Subject class"""

    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], birthdate_timestamp: int,
                 description: str, first_name: str, last_name: str):
        super().__init__(type, id, tags, metadata)
        self.birthdate_timestamp = birthdate_timestamp
        self.description = description
        self.first_name = first_name
        self.last_name = last_name

    @staticmethod
    def to_protobuf(obj: "Subject") -> SubjectMessage:
        """Encode a subject to a Protobuf message

        Attributes holding None are left unset on the message.

        Arguments:
            obj {Subject} -- subject to be encoded

        Returns:
            SubjectMessage -- encoded subject
        """
        subject_message = SubjectMessage()
        subject_message.entity.CopyFrom(Entity.to_protobuf(obj))

        # from_protobuf decodes a missing birthdate_timestamp to None, so it must be
        # guarded like the other optional fields for a decoded subject to be re-encodable.
        if obj.birthdate_timestamp is not None:
            subject_message.birthdate_timestamp = obj.birthdate_timestamp
        if obj.description is not None:
            subject_message.description = obj.description
        if obj.first_name is not None:
            subject_message.first_name = obj.first_name
        if obj.last_name is not None:
            subject_message.last_name = obj.last_name

        return subject_message

    @staticmethod
    def from_protobuf(subject_message: SubjectMessage) -> "Subject":
        """Decode a Protobuf message to {Subject}

        Arguments:
            subject_message {SubjectMessage} -- message to be decoded

        Returns:
            Subject -- decoded subject
        """
        return Subject(
            type=subject_message.entity.type,
            id=subject_message.entity.id,
            tags=subject_message.entity.tags,
            metadata=subject_message.entity.metadata,
            birthdate_timestamp=subject_message.birthdate_timestamp
            if subject_message.HasField("birthdate_timestamp") else None,
            description=subject_message.description if subject_message.HasField("description") else None,
            first_name=subject_message.first_name if subject_message.HasField("first_name") else None,
            last_name=subject_message.last_name if subject_message.HasField("last_name") else None,
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {Subject}

        Dictionaries whose "type" does not end with "Subject" are returned unchanged.

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" in obj and obj["type"].endswith("Subject"):
            return Subject(
                type=obj["type"],
                id=obj["id"],
                metadata=obj["metadata"],
                tags=obj["tags"],
                birthdate_timestamp=obj["birthdateTimestamp"],
                description=obj["description"],
                first_name=obj["firstName"],
                last_name=obj["lastName"]
            )

        return obj
Ancestors
- dempy._base.Entity
Static methods
def from_json(obj: Dict[str, str]) -> Any
-
Parse a JSON dictionary to {Subject}
Arguments
obj {Dict[str, str]} – JSON object
Returns
Any -- parsed object and sub-objects
Expand source code
@staticmethod def from_json(obj: Dict[str, str]) -> Any: """Parse a JSON dictionary to {Subject} Arguments: obj {Dict[str, str]} -- JSON object Returns: Any -- parsed object and sub-objects """ if "type" in obj and obj["type"].endswith("Subject"): return Subject( type=obj["type"], id=obj["id"], metadata=obj["metadata"], tags=obj["tags"], birthdate_timestamp=obj["birthdateTimestamp"], description=obj["description"], first_name=obj["firstName"], last_name=obj["lastName"] ) return obj
def from_protobuf(subject_message: dempy_pb2.Subject) -> Subject
-
Decode a Protobuf message to {Subject}
Arguments
subject_message {SubjectMessage} – message to be decoded
Returns
Subject -- decoded subject
Expand source code
@staticmethod
def from_protobuf(subject_message: SubjectMessage) -> "Subject":
    """Decode a Protobuf message to {Subject}

    Arguments:
        subject_message {SubjectMessage} -- message to be decoded

    Returns:
        Subject -- decoded subject
    """
    def optional(field: str):
        # Fields that are not set on the message decode to None.
        return getattr(subject_message, field) if subject_message.HasField(field) else None

    entity = subject_message.entity
    return Subject(
        type=entity.type,
        id=entity.id,
        tags=entity.tags,
        metadata=entity.metadata,
        birthdate_timestamp=optional("birthdate_timestamp"),
        description=optional("description"),
        first_name=optional("first_name"),
        last_name=optional("last_name"),
    )
def to_protobuf(obj: Subject) -> dempy_pb2.Subject
-
Encode a subject to a Protobuf message
Arguments
obj {Subject} – subject to be encoded
Returns
SubjectMessage -- encoded subject
Expand source code
@staticmethod
def to_protobuf(obj: "Subject") -> SubjectMessage:
    """Encode a subject to a Protobuf message

    Attributes holding None are left unset on the message.

    Arguments:
        obj {Subject} -- subject to be encoded

    Returns:
        SubjectMessage -- encoded subject
    """
    subject_message = SubjectMessage()
    subject_message.entity.CopyFrom(Entity.to_protobuf(obj))

    # from_protobuf decodes a missing birthdate_timestamp to None, so it must be
    # guarded like the other optional fields for a decoded subject to be re-encodable.
    if obj.birthdate_timestamp is not None:
        subject_message.birthdate_timestamp = obj.birthdate_timestamp
    if obj.description is not None:
        subject_message.description = obj.description
    if obj.first_name is not None:
        subject_message.first_name = obj.first_name
    if obj.last_name is not None:
        subject_message.last_name = obj.last_name

    return subject_message
class TimeseriesSample (type: str, id: str, tags: List[str], metadata: Dict[str, str], timestamp: int, acquisition_id: str, device_id: str, sensor_id: str, **kwargs)
-
TimeseriesSample class
Expand source code
class TimeseriesSample(Entity):
    """TimeseriesSample class"""

    # Axis attributes carried by each supported sample type.
    _AXES = {
        "UniaxialSample": ("x",),
        "BiaxialSample": ("x", "y"),
        "TriaxialSample": ("x", "y", "z"),
        "QuadriaxialSample": ("x", "y", "z", "u"),
        "QuinqueaxialSample": ("x", "y", "z", "u", "w"),
    }

    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], timestamp: int,
                 acquisition_id: str, device_id: str, sensor_id: str, **kwargs):
        super().__init__(type, id, tags, metadata)
        self.timestamp = timestamp
        self.acquisition_id = acquisition_id
        self.device_id = device_id
        self.sensor_id = sensor_id

        axes = TimeseriesSample._AXES.get(self.type)
        if axes is None:
            raise ValueError
        # Only the axes of this sample type become attributes; a missing keyword yields None.
        for axis in axes:
            setattr(self, axis, kwargs.get(axis))

    @staticmethod
    def to_protobuf(obj: "TimeseriesSample") -> TimeseriesMessage:
        """Encode a timeseries sample to a Protobuf message

        Arguments:
            obj {TimeseriesSample} -- timeseries sample to be encoded

        Raises:
            ValueError: the type of `obj` is not one of the supported sample types

        Returns:
            TimeseriesMessage -- encoded timeseries sample
        """
        message = TimeseriesMessage()
        message.entity.CopyFrom(Entity.to_protobuf(obj))

        message.timestamp = obj.timestamp
        message.acquisition_id = obj.acquisition_id
        for field in ("device_id", "sensor_id"):
            value = getattr(obj, field)
            if value is not None:
                setattr(message, field, value)

        axes = TimeseriesSample._AXES.get(obj.type)
        if axes is None:
            raise ValueError
        for axis in axes:
            setattr(message, axis, getattr(obj, axis))

        return message

    @staticmethod
    def from_protobuf(timeseries_message: TimeseriesMessage) -> "TimeseriesSample":
        """Decode a Protobuf message to {TimeseriesSample}

        Arguments:
            timeseries_message {TimeseriesMessage} -- message to be decoded

        Returns:
            TimeseriesSample -- decoded timeseries sample
        """
        def optional(field: str):
            # Fields that are not set on the message decode to None.
            return getattr(timeseries_message, field) if timeseries_message.HasField(field) else None

        entity = timeseries_message.entity
        return TimeseriesSample(
            type=entity.type,
            id=entity.id,
            tags=entity.tags,
            metadata=entity.metadata,
            timestamp=timeseries_message.timestamp,
            acquisition_id=timeseries_message.acquisition_id,
            device_id=optional("device_id"),
            sensor_id=optional("sensor_id"),
            x=optional("x"),
            y=optional("y"),
            z=optional("z"),
            u=optional("u"),
            w=optional("w"),
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {TimeseriesSample}

        Dictionaries whose "type" does not end with "axialSample" are handed back untouched.

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Raises:
            ValueError: unexpected object or sub-object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" not in obj or not obj["type"].endswith("axialSample"):
            return obj

        common = {
            "type": obj["type"],
            "id": obj["id"],
            "tags": obj["tags"],
            "metadata": obj["metadata"],
            "timestamp": obj["timestamp"],
            "acquisition_id": obj["acquisitionId"],
            "device_id": obj["deviceId"],
            "sensor_id": obj["sensorId"],
        }

        axes = TimeseriesSample._AXES.get(obj["type"])
        if axes is None:
            raise ValueError
        return TimeseriesSample(**common, **{axis: obj[axis] for axis in axes})
Ancestors
- dempy._base.Entity
Static methods
def from_json(obj: Dict[str, str]) -> Any
-
Parse a JSON dictionary to {TimeseriesSample}
Arguments
obj {Dict[str, str]} – JSON object
Raises
ValueError
- unexpected object or sub-object
Returns
Any -- parsed object and sub-objects
Expand source code
@staticmethod def from_json(obj: Dict[str, str]) -> Any: """Parse a JSON dictionary to {TimeseriesSample} Arguments: obj {Dict[str, str]} -- JSON object Raises: ValueError: unexpected object or sub-object Returns: Any -- parsed object and sub-objects """ if "type" in obj and obj["type"].endswith("axialSample"): timeseries = partial( TimeseriesSample, type=obj["type"], id=obj["id"], tags=obj["tags"], metadata=obj["metadata"], timestamp=obj["timestamp"], acquisition_id=obj["acquisitionId"], device_id=obj["deviceId"], sensor_id=obj["sensorId"], ) if obj["type"] == "UniaxialSample": return timeseries(x=obj["x"]) elif obj["type"] == "BiaxialSample": return timeseries(x=obj["x"], y=obj["y"]) elif obj["type"] == "TriaxialSample": return timeseries(x=obj["x"], y=obj["y"], z=obj["z"]) elif obj["type"] == "QuadriaxialSample": return timeseries(x=obj["x"], y=obj["y"], z=obj["z"], u=obj["u"]) elif obj["type"] == "QuinqueaxialSample": return timeseries(x=obj["x"], y=obj["y"], z=obj["z"], u=obj["u"], w=obj["w"]) else: raise ValueError return obj
def from_protobuf(timeseries_message: dempy_pb2.TimeseriesSample) -> TimeseriesSample
-
Decode a Protobuf message to {TimeseriesSample}
Arguments
timeseries_message {TimeseriesMessage} – message to be decoded
Returns
TimeseriesSample -- decoded timeseries sample
Expand source code
@staticmethod
def from_protobuf(timeseries_message: TimeseriesMessage) -> "TimeseriesSample":
    """Decode a Protobuf message to {TimeseriesSample}

    Arguments:
        timeseries_message {TimeseriesMessage} -- message to be decoded

    Returns:
        TimeseriesSample -- decoded timeseries sample
    """
    def optional(field: str):
        # Fields that are not set on the message decode to None.
        return getattr(timeseries_message, field) if timeseries_message.HasField(field) else None

    entity = timeseries_message.entity
    return TimeseriesSample(
        type=entity.type,
        id=entity.id,
        tags=entity.tags,
        metadata=entity.metadata,
        timestamp=timeseries_message.timestamp,
        acquisition_id=timeseries_message.acquisition_id,
        device_id=optional("device_id"),
        sensor_id=optional("sensor_id"),
        x=optional("x"),
        y=optional("y"),
        z=optional("z"),
        u=optional("u"),
        w=optional("w"),
    )
def to_protobuf(obj: TimeseriesSample) -> dempy_pb2.TimeseriesSample
-
Encode a timeseries sample to a Protobuf message
Arguments
obj {TimeseriesSample} – timeseries sample to be encoded
Returns
TimeseriesMessage -- encoded timeseries sample
Expand source code
@staticmethod
def to_protobuf(obj: "TimeseriesSample") -> TimeseriesMessage:
    """Encode a timeseries sample to a Protobuf message

    Arguments:
        obj {TimeseriesSample} -- timeseries sample to be encoded

    Raises:
        ValueError: the type of `obj` is not one of the supported sample types

    Returns:
        TimeseriesMessage -- encoded timeseries sample
    """
    message = TimeseriesMessage()
    message.entity.CopyFrom(Entity.to_protobuf(obj))

    message.timestamp = obj.timestamp
    message.acquisition_id = obj.acquisition_id
    # The two ids are left unset on the message when they are None.
    for field in ("device_id", "sensor_id"):
        value = getattr(obj, field)
        if value is not None:
            setattr(message, field, value)

    # Axis attributes copied to the message for each supported sample type.
    axes_by_type = {
        "UniaxialSample": ("x",),
        "BiaxialSample": ("x", "y"),
        "TriaxialSample": ("x", "y", "z"),
        "QuadriaxialSample": ("x", "y", "z", "u"),
        "QuinqueaxialSample": ("x", "y", "z", "u", "w"),
    }
    axes = axes_by_type.get(obj.type)
    if axes is None:
        raise ValueError
    for axis in axes:
        setattr(message, axis, getattr(obj, axis))

    return message
class VideoSample (type: str, id: str, tags: List[str], metadata: Dict[str, str], timestamp: int, acquisition_id: str, device_id: str, sensor_id: str, media_type: str, video_source: str)
-
VideoSample class
Expand source code
class VideoSample(Entity):
    """VideoSample class"""

    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], timestamp: int,
                 acquisition_id: str, device_id: str, sensor_id: str, media_type: str, video_source: str):
        super().__init__(type, id, tags, metadata)
        self.timestamp = timestamp
        self.acquisition_id = acquisition_id
        self.device_id = device_id
        self.sensor_id = sensor_id
        self.media_type = media_type
        self.video_source = video_source

    @staticmethod
    def to_protobuf(obj: "VideoSample") -> VideoMessage:
        """Encode a video sample to a Protobuf message

        Arguments:
            obj {VideoSample} -- video sample to be encoded

        Returns:
            VideoMessage -- encoded video sample
        """
        video_message = VideoMessage()
        video_message.entity.CopyFrom(Entity.to_protobuf(obj))

        video_message.timestamp = obj.timestamp
        video_message.acquisition_id = obj.acquisition_id
        # device_id and sensor_id are left unset on the message when they are None
        if obj.device_id is not None:
            video_message.device_id = obj.device_id
        if obj.sensor_id is not None:
            video_message.sensor_id = obj.sensor_id
        video_message.media_type = obj.media_type
        video_message.video_source = obj.video_source

        return video_message

    @staticmethod
    def from_protobuf(obj: Union[ByteString, VideoMessage]) -> "VideoSample":
        """Decode a Protobuf message to {VideoSample}

        Arguments:
            obj {Union[ByteString, VideoMessage]} -- message to be decoded

        Returns:
            VideoSample -- decoded video sample
        """
        if isinstance(obj, VideoMessage):
            video_message = obj
        else:
            # ParseFromString fills the message in place and does not return it,
            # so the message has to be created first and parsed into.
            video_message = VideoMessage()
            video_message.ParseFromString(obj)

        return VideoSample(
            type=video_message.entity.type,
            id=video_message.entity.id,
            tags=video_message.entity.tags,
            metadata=video_message.entity.metadata,
            timestamp=video_message.timestamp,
            acquisition_id=video_message.acquisition_id,
            device_id=video_message.device_id if video_message.HasField("device_id") else None,
            sensor_id=video_message.sensor_id if video_message.HasField("sensor_id") else None,
            media_type=video_message.media_type,
            video_source=video_message.video_source
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {VideoSample}

        Dictionaries whose "type" is not "VideoSample" are returned unchanged.

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" in obj and obj["type"] == "VideoSample":
            return VideoSample(
                type=obj["type"],
                id=obj["id"],
                tags=obj["tags"],
                metadata=obj["metadata"],
                timestamp=obj["timestamp"],
                acquisition_id=obj["acquisitionId"],
                device_id=obj["deviceId"],
                sensor_id=obj["sensorId"],
                media_type=obj["mediaType"],
                video_source=obj["videoSource"],
            )

        return obj
Ancestors
- dempy._base.Entity
Static methods
def from_json(obj: Dict[str, str]) -> Any
-
Parse a JSON dictionary to {VideoSample}
Arguments
obj {Dict[str, str]} – JSON object
Returns
Any -- parsed object and sub-objects
Expand source code
@staticmethod def from_json(obj: Dict[str, str]) -> Any: """Parse a JSON dictionary to {VideoSample} Arguments: obj {Dict[str, str]} -- JSON object Returns: Any -- parsed object and sub-objects """ if "type" in obj and obj["type"] == "VideoSample": return VideoSample( type=obj["type"], id=obj["id"], tags=obj["tags"], metadata=obj["metadata"], timestamp=obj["timestamp"], acquisition_id=obj["acquisitionId"], device_id=obj["deviceId"], sensor_id=obj["sensorId"], media_type=obj["mediaType"], video_source=obj["videoSource"], ) return obj
def from_protobuf(obj: Union[ByteString, dempy_pb2.VideoSample]) -> VideoSample
-
Decode a Protobuf message to {VideoSample}
Arguments
obj {Union[ByteString, VideoMessage]} – message to be decoded
Returns
VideoSample -- decoded video sample
Expand source code
@staticmethod
def from_protobuf(obj: Union[ByteString, VideoMessage]) -> "VideoSample":
    """Decode a Protobuf message to {VideoSample}

    Arguments:
        obj {Union[ByteString, VideoMessage]} -- message to be decoded

    Returns:
        VideoSample -- decoded video sample
    """
    if isinstance(obj, VideoMessage):
        video_message = obj
    else:
        # ParseFromString fills the message in place and does not return it,
        # so the message has to be created first and parsed into.
        video_message = VideoMessage()
        video_message.ParseFromString(obj)

    return VideoSample(
        type=video_message.entity.type,
        id=video_message.entity.id,
        tags=video_message.entity.tags,
        metadata=video_message.entity.metadata,
        timestamp=video_message.timestamp,
        acquisition_id=video_message.acquisition_id,
        device_id=video_message.device_id if video_message.HasField("device_id") else None,
        sensor_id=video_message.sensor_id if video_message.HasField("sensor_id") else None,
        media_type=video_message.media_type,
        video_source=video_message.video_source
    )
def to_protobuf(obj: VideoSample) -> dempy_pb2.VideoSample
-
Encode a video sample to a Protobuf message
Arguments
obj {VideoSample} – video sample to be encoded
Returns
VideoMessage -- encoded video sample
Expand source code
@staticmethod
def to_protobuf(obj: "VideoSample") -> VideoMessage:
    """Encode a video sample to a Protobuf message

    Arguments:
        obj {VideoSample} -- video sample to be encoded

    Returns:
        VideoMessage -- encoded video sample
    """
    message = VideoMessage()
    message.entity.CopyFrom(Entity.to_protobuf(obj))

    message.timestamp = obj.timestamp
    message.acquisition_id = obj.acquisition_id

    # The two ids are left unset on the message when they are None.
    for field in ("device_id", "sensor_id"):
        value = getattr(obj, field)
        if value is not None:
            setattr(message, field, value)

    message.media_type = obj.media_type
    message.video_source = obj.video_source

    return message