Module dempy.acquisitions

Expand source code
from .acquisition import Acquisition, get, count
from .annotation import Annotation
from .device import Device
from .image_sample import ImageSample
from .sensor import Sensor
from .subject import Subject
from .timeseries_sample import TimeseriesSample
from .video_sample import VideoSample

__all__ = [
    "Acquisition", "Subject", "Device", "Sensor", "TimeseriesSample", "ImageSample", "VideoSample", "Annotation",
    "get", "count"
]

Sub-modules

dempy.acquisitions.acquisition
dempy.acquisitions.annotation
dempy.acquisitions.device
dempy.acquisitions.image_sample
dempy.acquisitions.sensor
dempy.acquisitions.subject
dempy.acquisitions.timeseries_sample
dempy.acquisitions.video_sample

Functions

def count() -> int

Get number of acquisitions

Returns

int -- number of acquisitions
 
Expand source code
def count() -> int:
    """Get number of acquisitions

    Returns:
        int -- number of acquisitions
    """
    return _api_calls.get(_ENDPOINT + "count").json()
def get(acquisition_id: str = None, dataset_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[Acquisition, List[Acquisition]]

Get an acquisition identified by acquisition_id or a list of all the acquisitions

Keyword Arguments: acquisition_id {str} – id of the acquisition (default: {None}) dataset_id {str} – id of the dataset to which the acquisitions belong to (default: {None}) tags {List[str]} – tags of the acquisitions (default: {[]}) metadata {Dict[str, str]} – metadata of the acquisitions (default: {{}})

Returns

Union[Acquisition, List[Acquisition]] -- acquisition or list of acquisitions
 
Expand source code
def get(acquisition_id: str = None, dataset_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) \
    -> Union[Acquisition, List[Acquisition]]:
    """Get an acquisition identified by `acquisition_id` or a list of all the acquisitions

    Keyword Arguments:
        acquisition_id {str} -- id of the acquisition (default: {None})
        dataset_id {str} -- id of the dataset to which the acquisitions belong to (default: {None})
        tags {List[str]} -- tags of the acquisitions (default: {[]})
        metadata {Dict[str, str]} -- metadata of the acquisitions (default: {{}})

    Returns:
        Union[Acquisition, List[Acquisition]] -- acquisition or list of acquisitions
    """
    if acquisition_id is None:
        processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
        acquisitions = _api_calls.get(_ENDPOINT, params={"datasetId": dataset_id, "tags": tags, **processed_metadata}) \
            .json(object_hook=Acquisition.from_json)
        for acquisition in acquisitions:
            cache._cache_data("acquisitions", acquisition.id, acquisition, Acquisition.to_protobuf)
        return acquisitions
    else:
        try:
            acquisition = cache._get_cached_data("acquisitions", acquisition_id, Acquisition.from_protobuf)
        except FileNotFoundError:
            acquisition = _api_calls.get(_ENDPOINT + acquisition_id).json(object_hook=Acquisition.from_json)
            cache._cache_data("acquisitions", acquisition_id, acquisition, Acquisition.to_protobuf)
        return acquisition

Classes

class Acquisition (type: str, id: str, tags: List[str], metadata: Dict[str, str], creation_timestamp: int, sync_offset: int, time_unit: str, owner_id: str, creator_id: str, dataset_id: str, subject: Subject, devices: List[Device], has_timeseries_samples: bool, has_image_samples: bool, has_video_samples: bool)

Acquisition class

Expand source code
class Acquisition(Entity):
    """Acquisition class"""
    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], creation_timestamp: int, sync_offset: int,
                 time_unit: str, owner_id: str, creator_id: str, dataset_id: str, subject: Subject, devices: List[Device],
                 has_timeseries_samples: bool, has_image_samples: bool, has_video_samples: bool):
        super().__init__(type, id, tags, metadata)
        self.creation_timestamp = creation_timestamp
        self.sync_offset = sync_offset
        self.time_unit = time_unit
        self.owner_id = owner_id
        self.creator_id = creator_id
        self.dataset_id = dataset_id
        self.has_timeseries_samples = has_timeseries_samples
        self.has_image_samples = has_image_samples
        self.has_video_samples = has_video_samples
        self._subject = subject
        self._devices = devices

    @property
    def subject(self):    
        """Subject's API"""
        class Inner:
            _SUBJECT_ENDPOINT = _ENDPOINT + "{}/subjects/".format(self.id)

            @staticmethod
            def get() -> Subject:
                """Get subject from acquisition

                Returns:
                    Subject -- subject of the acquisition
                """
                return self._subject

        return Inner()

    @property
    def devices(self):
        """Devices' API"""
        class Inner:
            _DEVICES_ENDPOINT = _ENDPOINT + "{}/devices/".format(self.id)

            @staticmethod
            def get(device_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[Device, List[Device]]:
                """Get a device identified by `device_id` or list of devices on this acquisition

                Keyword Arguments:
                    device_id {str} -- id of the device (default: {None})
                    tags {List[str]} -- tags of the devices (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the devices (default: {{}})

                Raises:
                    IndexError: device identified by `device_id` does not exist in this acquisition

                Returns:
                    Union[Device, List[Device]] -- device or list of devices
                """
                if device_id is None:
                    if len(tags) > 0 or len(metadata) > 0:
                        return [d for d in self._devices if
                                len([k for k in d.metadata if k in metadata and d.metadata[k] == metadata[k]]) > 0]

                    return self._devices
                else:
                    try:
                        device = next((device for device in self._devices if device.id == device_id))
                    except StopIteration:
                        raise IndexError(f"device id {device_id} does not exist in acquisition id {self.id}")
                    return device

            @staticmethod
            def usage() -> Dict[str, List[str]]:
                """Get a map identifying which device(s) and sensor(s) were used to acquire time series samples

                Returns:
                    Mapping[str, List[str]] -- map of (key, value) pairs, with key being id of device and the value
                    a list of sensor ids which were used to capture samples
                """
                return _api_calls.get(Inner._DEVICES_ENDPOINT + "usage").json()

            @staticmethod
            def count() -> int:
                """Get the number of devices on this acquisition

                Returns:
                    int -- number of devices
                """
                return len(self._devices)

        return Inner()

    @property
    def timeseries_samples(self):
        """Timeseries samples' API"""
        class Inner:
            _TIMESERIES_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/timeseries/".format(self.id)

            @staticmethod
            def get(tags: List[str] = [], metadata: Dict[str, str] = {}) -> SampleList:
                """Get all the timeseries samples that belong to this acquisition

                Keyword Arguments:
                    tags {List[str]} -- tags of the timeseries samples (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the timeseries samples (default: {{}})

                Returns:
                    SampleList -- list of timeseries samples
                """
                if len(tags) > 0 or len(metadata) > 0:
                    processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                    samples = _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                        .json(object_hook=TimeseriesSample.from_json)
                    samples.sort(key=lambda sample: sample.timestamp)
                    return SampleList(samples)

                try:
                    samples = cache._get_cached_data("samples/{}/".format(self.id), "timeseries", SampleList.from_protobuf)
                except FileNotFoundError:
                    samples = _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT) \
                        .json(object_hook=TimeseriesSample.from_json)
                    samples.sort(key=lambda sample: sample.timestamp)
                    samples = SampleList(samples)
                    cache._cache_data("samples/{}/".format(self.id), "timeseries", samples, SampleList.to_protobuf)
                return samples

            @staticmethod
            def count() -> int:
                """Get the number of timeseries samples on this acquisition

                Returns:
                    int -- number of timeseries samples
                """
                return _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT + "count").json()

            @staticmethod
            def visualize(device_id: str, sensor_id: str = None) -> None:
                """Graphically visualize the timeseries samples of a device identified by `device_id` 
                or of a given sensor identified by `sensor_id` of said device

                Arguments:
                    device_id {str} -- id of the device

                Keyword Arguments:
                    sensor_id {str} -- id of the sensor (default: {None})
                """
                def visualize_sensor_samples(axis, sensor, sensor_samples):
                    timestamps = [s.timestamp for s in sensor_samples]

                    # Sample x, y, z, u, w
                    samples_x = [s.x for s in sensor_samples if hasattr(s, "x")]
                    samples_y = [s.y for s in sensor_samples if hasattr(s, "y")]
                    samples_z = [s.z for s in sensor_samples if hasattr(s, "z")]
                    samples_u = [s.u for s in sensor_samples if hasattr(s, "u")]
                    samples_w = [s.w for s in sensor_samples if hasattr(s, "w")]

                    # Title and x label
                    axis.set_title(f"{sensor.sensor_type}\n{sensor.id}", loc="left")
                    axis.set_xlabel(sensor.time_unit if sensor.time_unit is not None else device_time_unit)

                    # Axis limit
                    axis.set_xlim([0, timestamps[-1]])
                    axis.set_ylim([min(chain(samples_x, samples_y, samples_z, samples_u, samples_w)),
                                   max(chain(samples_x, samples_y, samples_z, samples_u, samples_w))])

                    # Axis formatter
                    axis.xaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))
                    axis.yaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))

                    # Axis plot
                    labels = []

                    if len(samples_x) > 0:
                        axis.plot(timestamps, samples_x, color="cornflowerblue")
                        labels.append("x")
                    if len(samples_y) > 0:
                        axis.plot(timestamps, samples_y, color="mediumseagreen")
                        labels.append("y")
                    if len(samples_z) > 0:
                        axis.plot(timestamps, samples_z, color="indianred")
                        labels.append("z")
                    if len(samples_u) > 0:
                        axis.plot(timestamps, samples_u, color="mediumorchid")
                        labels.append("u")
                    if len(samples_w) > 0:
                        axis.plot(timestamps, samples_w, color="slategray")
                        labels.append("w")

                    axis.legend(labels=labels, loc="upper right")

                device = self.devices.get(device_id=device_id)
                device_time_unit = device.time_unit if device.time_unit is not None else self.time_unit

                if sensor_id is None:
                    fig, axs = plt.subplots(nrows=device.sensors.count(), figsize=(15, 10), dpi=80, constrained_layout=True)
                    fig.suptitle(f"{device.model_name} ({device.manufacturer})\n{device.id}", wrap=True)

                    device_samples = self.timeseries_samples.get().by_device(device_id=device.id)

                    for i, sensor in enumerate(device.sensors.get()):
                        visualize_sensor_samples(axs[i], sensor, device_samples.by_sensor(sensor.id))
                else:
                    fig, ax = plt.subplots(nrows=1, figsize=(10, 4), dpi=80, constrained_layout=True)
                    fig.suptitle(f"{device.model_name} ({device.manufacturer})\n{device.id}", wrap=True)

                    sensor_samples = self.timeseries_samples.get().by_sensor(sensor_id)

                    visualize_sensor_samples(ax, device.sensors.get(sensor_id=sensor_id), sensor_samples)

                plt.show()

        return Inner()

    @property
    def image_samples(self):
        """Image samples' API"""
        class Inner:
            _IMAGE_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/images/".format(self.id)

            @staticmethod
            def get(sample_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[ImageSample, SampleList]:
                """Get all the image samples that belong to this acquisition

                Keyword Arguments:
                    sample_id {str} -- id of the sample (default: {None})
                    tags {List[str]} -- tags of image samples (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the image samples (default: {{}})

                Returns:
                    Union[ImageSample, SampleList] -- image sample or list of image samples
                """
                if sample_id is None:
                    if len(tags) > 0 or len(metadata) > 0:
                        processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                        samples = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                            .json(object_hook=ImageSample.from_json)
                        return SampleList(samples)

                    samples = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT).json(object_hook=ImageSample.from_json)
                    for sample in samples:
                        cache._cache_data("samples/{}/images/".format(self.id), sample.id, sample, ImageSample.to_protobuf)
                    return SampleList(samples)
                else:
                    try:
                        sample = cache._get_cached_data("samples/{}/images/".format(self.id), sample_id, ImageSample.from_protobuf)
                    except FileNotFoundError:
                        sample = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + sample_id).json(object_hook=ImageSample.from_json)
                        cache._cache_data("samples/{}/images/".format(self.id), sample_id, sample, ImageSample.to_protobuf)
                    return sample

            @staticmethod
            def raw(sample_id: str) -> ByteString:
                """Get actual image from image sample identified by `sample_id` on this acquisition

                Arguments:
                    sample_id {str} -- id of the sample

                Returns:
                    ByteString -- bytes of the image
                """
                try:
                    image = cache._get_cached_data("samples/{}/images/raw/".format(self.id), sample_id)
                except FileNotFoundError:
                    image = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + sample_id + "/raw")
                    file_ext = "." + image.headers["Content-Type"].split("/")[-1]
                    cache._cache_data("samples/{}/images/raw/".format(self.id), sample_id + file_ext, image.content)

                return image

            @staticmethod
            def count() -> int:
                """Get the number of image samples on this acquisition

                Returns:
                    int -- number of image samples
                """
                return _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + "count").json()

            @staticmethod
            def visualize(sample_id: str, backend: Callable[[str], None] = None) -> None:
                """Visualize the image of a sample identified by `sample_id`.
                By default opens the predefined system image application.
                A different callback can be given to open the image.

                Arguments:
                    sample_id {str} -- id of the sample

                Keyword Arguments:
                    backend {Callable[[str], None]} -- backend to open the image with (default: {None})
                """
                self.image_samples.raw(sample_id)
                image_path = cache._build_cache_path("samples/{}/images/raw/".format(self.id), sample_id)
                image_path = cache._add_file_extension(image_path)

                if backend is None:
                    system = platform.system()

                    if system == "Darwin":
                        subprocess.call(("open", image_path))
                    elif system == "Windows":
                        os.startfile(image_path)
                    else:
                        subprocess.call(("xdg-open", image_path))
                else:
                    backend(image_path)

        return Inner()

    @property
    def video_samples(self):
        """Video samples' API"""
        class Inner:
            _VIDEO_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/videos/".format(self.id)

            @staticmethod
            def get(sample_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[VideoSample, SampleList]:
                """Get all the video samples that belong to this acquisition

                Keyword Arguments:
                    sample_id {str} -- id of the sample (default: {None})
                    tags {List[str]} -- tags of image samples (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the image samples (default: {{}})

                Returns:
                    Union[VideoSample, SampleList] -- video sample or list of video samples
                """
                if sample_id is None:
                    if len(tags) > 0 or len(metadata) > 0:
                        processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                        samples = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                            .json(object_hook=VideoSample.from_json)
                        return SampleList(samples)

                    samples = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT).json(object_hook=VideoSample.from_json)
                    for sample in samples:
                        cache._cache_data("samples/{}/videos/".format(self.id), sample.id, sample, VideoSample.to_protobuf)
                    return SampleList(samples)
                else:
                    try:
                        sample = cache._get_cached_data("samples/{}/videos/".format(self.id), sample_id, VideoSample.from_protobuf)
                    except FileNotFoundError:
                        sample = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + sample_id).json(object_hook=VideoSample.from_json)
                        cache._cache_data("samples/{}/videos/".format(self.id), sample_id, sample, VideoSample.to_protobuf)
                    return sample

            @staticmethod
            def raw(sample_id: str) -> ByteString:
                """Get actual video from video sample identified by `sample_id` on this acquisition 

                Arguments:
                    sample_id {str} -- id of the sample

                Returns:
                    ByteString -- bytes of the video
                """
                try:
                    video = cache._get_cached_data("samples/{}/videos/raw/".format(self.id), sample_id)
                except FileNotFoundError:
                    video = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + sample_id + "/raw")
                    file_ext = "." + video.headers["Content-Type"].split("/")[-1]
                    cache._cache_data("samples/{}/videos/raw/".format(self.id), sample_id + file_ext, video.content)

                return video

            @staticmethod
            def count() -> int:
                """Get the number of video samples on this acquisition

                Returns:
                    int -- number of video samples
                """
                return _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + "count").json()

            @staticmethod
            def visualize(sample_id: str, backend: Callable[[str], None] = None) -> None:
                """Visualize the video of a sample identified by `sample_id`.
                By default opens the predefined system video application.
                A different callback can be given to open the video.

                Arguments:
                    sample_id {str} -- id of the sample

                Keyword Arguments:
                    backend {Callable[[str], None]} -- backend to open the video with (default: {None})
                """
                self.video_samples.raw(sample_id)
                video_path = cache._build_cache_path("samples/{}/videos/raw/".format(self.id), sample_id)
                video_path = cache._add_file_extension(video_path)

                if backend is None:
                    system = platform.system()

                    if system == "Darwin":
                        subprocess.call(("open", video_path))
                    elif system == "Windows":
                        os.startfile(video_path)
                    else:
                        subprocess.call(("xdg-open", video_path))
                else:
                    backend(video_path)

        return Inner()

    @property
    def annotations(self):
        """Annotations' API"""
        class Inner:
            _ANNOTATIONS_ENDPOINT = _ENDPOINT + "{}/annotations/".format(self.id)

            @staticmethod
            def get(annotation_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> AnnotationList:
                """Get all the annotations that belong to this acquisition

                Keyword Arguments:
                    annotation_id {str} -- id of the annotation (default: {None})
                    tags {List[str]} -- tags of the annotation (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the annotation (default: {{}})

                Returns:
                    AnnotationList -- annotation or annotation of video samples
                """
                if annotation_id is None:
                    if len(tags) > 0 or len(metadata) > 0:
                        processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                        annotations = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                            .json(object_hook=Annotation.from_json)
                        return AnnotationList(annotations)

                    annotations = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT).json(object_hook=Annotation.from_json)
                    for annotation in annotations:
                        cache._cache_data("annotations", annotation.id, annotation, Annotation.to_protobuf)
                    return AnnotationList(annotations)
                else:
                    try:
                        annotation = cache._get_cached_data("annotations", annotation_id, Annotation.from_protobuf)
                    except FileNotFoundError:
                        annotation = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT + annotation_id).json(object_hook=Annotation.from_json)
                        cache._cache_data("annotations", annotation_id, annotation, Annotation.to_protobuf)
                    return annotation

            @staticmethod
            def count() -> int:
                """Get the number of annotations on this acquisition

                Returns:
                    int -- number of annotations
                """
                return _api_calls.get(Inner._ANNOTATIONS_ENDPOINT + "count").json()

        return Inner()

    @staticmethod
    def to_protobuf(obj: "Acquisition") -> AcquisitionMessage:
        """Encode an acquisition to a Protobuf message

        Arguments:
            obj {Acquisition} -- acquisition to be encoded

        Returns:
            AcquisitionMessage -- encoded acquisition
        """
        acquisition_message = AcquisitionMessage()
        acquisition_message.entity.CopyFrom(Entity.to_protobuf(obj))
        acquisition_message.creation_timestamp = obj.creation_timestamp

        if obj.sync_offset is not None:
            acquisition_message.sync_offset = obj.sync_offset
        if obj.time_unit is not None:
            acquisition_message.time_unit = obj.time_unit
        if obj.owner_id is not None:
            acquisition_message.owner_id = obj.owner_id
        if obj.creator_id is not None:
            acquisition_message.creator_id = obj.creator_id
        if obj.dataset_id is not None:
            acquisition_message.dataset_id = obj.dataset_id

        acquisition_message.subject.CopyFrom(Subject.to_protobuf(obj._subject))
        acquisition_message.devices.extend([Device.to_protobuf(d) for d in obj._devices])
        acquisition_message.has_timeseries_samples = obj.has_timeseries_samples
        acquisition_message.has_image_samples = obj.has_image_samples
        acquisition_message.has_video_samples = obj.has_video_samples

        return acquisition_message

    @staticmethod
    def from_protobuf(obj: ByteString) -> "Acquisition":
        """Decode a Protobuf message to {Acquisition}

        Arguments:
            obj {ByteString} -- message to be decoded

        Returns:
            Acquisition -- decoded acquisition
        """
        acquisition_message = AcquisitionMessage()
        acquisition_message.ParseFromString(obj)

        return Acquisition(
            type=acquisition_message.entity.type,
            id=acquisition_message.entity.id,
            tags=acquisition_message.entity.tags,
            metadata=acquisition_message.entity.metadata,
            creation_timestamp=acquisition_message.creation_timestamp,
            sync_offset=acquisition_message.sync_offset if acquisition_message.HasField("sync_offset") else None,
            time_unit=acquisition_message.time_unit,
            owner_id=acquisition_message.owner_id if acquisition_message.HasField("owner_id") else None,
            creator_id=acquisition_message.creator_id if acquisition_message.HasField("creator_id") else None,
            dataset_id=acquisition_message.dataset_id if acquisition_message.HasField("dataset_id") else None,
            subject=Subject.from_protobuf(acquisition_message.subject),
            devices=[Device.from_protobuf(d) for d in acquisition_message.devices],
            has_timeseries_samples=acquisition_message.has_timeseries_samples,
            has_image_samples=acquisition_message.has_image_samples,
            has_video_samples=acquisition_message.has_video_samples
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {Acquisition}

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Raises:
            ValueError: unexpected object or sub-object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" in obj:
            if obj["type"] == "Acquisition":
                return Acquisition(
                    type=obj["type"],
                    id=obj["id"],
                    tags=obj["tags"],
                    metadata=obj["metadata"],
                    creation_timestamp=obj["creationTimestamp"],
                    sync_offset=obj["syncOffset"],
                    time_unit=obj["timeUnit"],
                    owner_id=obj["ownerId"],
                    creator_id=obj["creatorId"],
                    dataset_id=obj["datasetId"],
                    subject=obj["subject"],
                    devices=obj["devices"],
                    has_timeseries_samples=obj["hasTimeSeriesSamples"],
                    has_image_samples=obj["hasImageSamples"],
                    has_video_samples=obj["hasVideoSamples"]
                )
            elif obj["type"].endswith("Subject"):
                return Subject.from_json(obj)
            elif obj["type"] == "Device":
                return Device.from_json(obj)
            elif obj["type"] == "Sensor":
                return Sensor.from_json(obj)
            else:
                raise ValueError

        return obj

Ancestors

  • dempy._base.Entity

Static methods

def from_json(obj: Dict[str, str]) -> Any

Parse a JSON dictionary to {Acquisition}

Arguments

obj {Dict[str, str]} – JSON object

Raises

ValueError
unexpected object or sub-object

Returns

Any -- parsed object and sub-objects
 
Expand source code
@staticmethod
def from_json(obj: Dict[str, str]) -> Any:
    """Parse a JSON dictionary to {Acquisition}

    Arguments:
        obj {Dict[str, str]} -- JSON object

    Raises:
        ValueError: unexpected object or sub-object

    Returns:
        Any -- parsed object and sub-objects
    """
    if "type" in obj:
        if obj["type"] == "Acquisition":
            return Acquisition(
                type=obj["type"],
                id=obj["id"],
                tags=obj["tags"],
                metadata=obj["metadata"],
                creation_timestamp=obj["creationTimestamp"],
                sync_offset=obj["syncOffset"],
                time_unit=obj["timeUnit"],
                owner_id=obj["ownerId"],
                creator_id=obj["creatorId"],
                dataset_id=obj["datasetId"],
                subject=obj["subject"],
                devices=obj["devices"],
                has_timeseries_samples=obj["hasTimeSeriesSamples"],
                has_image_samples=obj["hasImageSamples"],
                has_video_samples=obj["hasVideoSamples"]
            )
        elif obj["type"].endswith("Subject"):
            return Subject.from_json(obj)
        elif obj["type"] == "Device":
            return Device.from_json(obj)
        elif obj["type"] == "Sensor":
            return Sensor.from_json(obj)
        else:
            raise ValueError

    return obj
def from_protobuf(obj: ByteString) -> Acquisition

Decode a Protobuf message to {Acquisition}

Arguments

obj {ByteString} – message to be decoded

Returns

Acquisition -- decoded acquisition
 
Expand source code
@staticmethod
def from_protobuf(obj: ByteString) -> "Acquisition":
    """Decode a Protobuf message to {Acquisition}

    Arguments:
        obj {ByteString} -- message to be decoded

    Returns:
        Acquisition -- decoded acquisition
    """
    acquisition_message = AcquisitionMessage()
    acquisition_message.ParseFromString(obj)

    return Acquisition(
        type=acquisition_message.entity.type,
        id=acquisition_message.entity.id,
        tags=acquisition_message.entity.tags,
        metadata=acquisition_message.entity.metadata,
        creation_timestamp=acquisition_message.creation_timestamp,
        sync_offset=acquisition_message.sync_offset if acquisition_message.HasField("sync_offset") else None,
        time_unit=acquisition_message.time_unit,
        owner_id=acquisition_message.owner_id if acquisition_message.HasField("owner_id") else None,
        creator_id=acquisition_message.creator_id if acquisition_message.HasField("creator_id") else None,
        dataset_id=acquisition_message.dataset_id if acquisition_message.HasField("dataset_id") else None,
        subject=Subject.from_protobuf(acquisition_message.subject),
        devices=[Device.from_protobuf(d) for d in acquisition_message.devices],
        has_timeseries_samples=acquisition_message.has_timeseries_samples,
        has_image_samples=acquisition_message.has_image_samples,
        has_video_samples=acquisition_message.has_video_samples
    )
def to_protobuf(obj: Acquisition) -> dempy_pb2.Acquisition

Encode an acquisition to a Protobuf message

Arguments

obj {Acquisition} – acquisition to be encoded

Returns

AcquisitionMessage -- encoded acquisition
 
Expand source code
@staticmethod
def to_protobuf(obj: "Acquisition") -> AcquisitionMessage:
    """Encode an acquisition to a Protobuf message

    Arguments:
        obj {Acquisition} -- acquisition to be encoded

    Returns:
        AcquisitionMessage -- encoded acquisition
    """
    acquisition_message = AcquisitionMessage()
    acquisition_message.entity.CopyFrom(Entity.to_protobuf(obj))
    acquisition_message.creation_timestamp = obj.creation_timestamp

    if obj.sync_offset is not None:
        acquisition_message.sync_offset = obj.sync_offset
    if obj.time_unit is not None:
        acquisition_message.time_unit = obj.time_unit
    if obj.owner_id is not None:
        acquisition_message.owner_id = obj.owner_id
    if obj.creator_id is not None:
        acquisition_message.creator_id = obj.creator_id
    if obj.dataset_id is not None:
        acquisition_message.dataset_id = obj.dataset_id

    acquisition_message.subject.CopyFrom(Subject.to_protobuf(obj._subject))
    acquisition_message.devices.extend([Device.to_protobuf(d) for d in obj._devices])
    acquisition_message.has_timeseries_samples = obj.has_timeseries_samples
    acquisition_message.has_image_samples = obj.has_image_samples
    acquisition_message.has_video_samples = obj.has_video_samples

    return acquisition_message

Instance variables

var annotations

Annotations' API

Expand source code
@property
def annotations(self):
    """Annotations' API"""
    class Inner:
        _ANNOTATIONS_ENDPOINT = _ENDPOINT + "{}/annotations/".format(self.id)

        @staticmethod
        def get(annotation_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> AnnotationList:
            """Get all the annotations that belong to this acquisition

            Keyword Arguments:
                annotation_id {str} -- id of the annotation (default: {None})
                tags {List[str]} -- tags of the annotation (default: {[]})
                metadata {Dict[str, str]} -- metadata of the annotation (default: {{}})

            Returns:
                AnnotationList -- annotation or annotation of video samples
            """
            if annotation_id is None:
                if len(tags) > 0 or len(metadata) > 0:
                    processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                    annotations = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                        .json(object_hook=Annotation.from_json)
                    return AnnotationList(annotations)

                annotations = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT).json(object_hook=Annotation.from_json)
                for annotation in annotations:
                    cache._cache_data("annotations", annotation.id, annotation, Annotation.to_protobuf)
                return AnnotationList(annotations)
            else:
                try:
                    annotation = cache._get_cached_data("annotations", annotation_id, Annotation.from_protobuf)
                except FileNotFoundError:
                    annotation = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT + annotation_id).json(object_hook=Annotation.from_json)
                    cache._cache_data("annotations", annotation_id, annotation, Annotation.to_protobuf)
                return annotation

        @staticmethod
        def count() -> int:
            """Get the number of annotations on this acquisition

            Returns:
                int -- number of annotations
            """
            return _api_calls.get(Inner._ANNOTATIONS_ENDPOINT + "count").json()

    return Inner()
var devices

Devices' API

Expand source code
@property
def devices(self):
    """Devices' API"""
    class Inner:
        _DEVICES_ENDPOINT = _ENDPOINT + "{}/devices/".format(self.id)

        @staticmethod
        def get(device_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[Device, List[Device]]:
            """Get a device identified by `device_id` or list of devices on this acquisition

            Keyword Arguments:
                device_id {str} -- id of the device (default: {None})
                tags {List[str]} -- tags of the devices (default: {[]})
                metadata {Dict[str, str]} -- metadata of the devices (default: {{}})

            Raises:
                IndexError: device identified by `device_id` does not exist in this acquisition

            Returns:
                Union[Device, List[Device]] -- device or list of devices
            """
            if device_id is None:
                if len(tags) > 0 or len(metadata) > 0:
                    return [d for d in self._devices if
                            len([k for k in d.metadata if k in metadata and d.metadata[k] == metadata[k]]) > 0]

                return self._devices
            else:
                try:
                    device = next((device for device in self._devices if device.id == device_id))
                except StopIteration:
                    raise IndexError(f"device id {device_id} does not exist in acquisition id {self.id}")
                return device

        @staticmethod
        def usage() -> Dict[str, List[str]]:
            """Get a map identifying which device(s) and sensor(s) were used to acquire time series samples

            Returns:
                Mapping[str, List[str]] -- map of (key, value) pairs, with key being id of device and the value
                a list of sensor ids which were used to capture samples
            """
            return _api_calls.get(Inner._DEVICES_ENDPOINT + "usage").json()

        @staticmethod
        def count() -> int:
            """Get the number of devices on this acquisition

            Returns:
                int -- number of devices
            """
            return len(self._devices)

    return Inner()
var image_samples

Image samples' API

Expand source code
@property
def image_samples(self):
    """Image samples' API"""
    class Inner:
        _IMAGE_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/images/".format(self.id)

        @staticmethod
        def get(sample_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[ImageSample, SampleList]:
            """Get all the image samples that belong to this acquisition

            Keyword Arguments:
                sample_id {str} -- id of the sample (default: {None})
                tags {List[str]} -- tags of image samples (default: {[]})
                metadata {Dict[str, str]} -- metadata of the image samples (default: {{}})

            Returns:
                Union[ImageSample, SampleList] -- image sample or list of image samples
            """
            if sample_id is None:
                if len(tags) > 0 or len(metadata) > 0:
                    processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                    samples = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                        .json(object_hook=ImageSample.from_json)
                    return SampleList(samples)

                samples = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT).json(object_hook=ImageSample.from_json)
                for sample in samples:
                    cache._cache_data("samples/{}/images/".format(self.id), sample.id, sample, ImageSample.to_protobuf)
                return SampleList(samples)
            else:
                try:
                    sample = cache._get_cached_data("samples/{}/images/".format(self.id), sample_id, ImageSample.from_protobuf)
                except FileNotFoundError:
                    sample = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + sample_id).json(object_hook=ImageSample.from_json)
                    cache._cache_data("samples/{}/images/".format(self.id), sample_id, sample, ImageSample.to_protobuf)
                return sample

        @staticmethod
        def raw(sample_id: str) -> ByteString:
            """Get actual image from image sample identified by `sample_id` on this acquisition

            Arguments:
                sample_id {str} -- id of the sample

            Returns:
                ByteString -- bytes of the image
            """
            try:
                image = cache._get_cached_data("samples/{}/images/raw/".format(self.id), sample_id)
            except FileNotFoundError:
                image = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + sample_id + "/raw")
                file_ext = "." + image.headers["Content-Type"].split("/")[-1]
                cache._cache_data("samples/{}/images/raw/".format(self.id), sample_id + file_ext, image.content)

            return image

        @staticmethod
        def count() -> int:
            """Get the number of image samples on this acquisition

            Returns:
                int -- number of image samples
            """
            return _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + "count").json()

        @staticmethod
        def visualize(sample_id: str, backend: Callable[[str], None] = None) -> None:
            """Visualize the image of a sample identified by `sample_id`.
            By default opens the predefined system image application.
            A different callback can be given to open the image.

            Arguments:
                sample_id {str} -- id of the sample

            Keyword Arguments:
                backend {Callable[[str], None]} -- backend to open the image with (default: {None})
            """
            self.image_samples.raw(sample_id)
            image_path = cache._build_cache_path("samples/{}/images/raw/".format(self.id), sample_id)
            image_path = cache._add_file_extension(image_path)

            if backend is None:
                system = platform.system()

                if system == "Darwin":
                    subprocess.call(("open", image_path))
                elif system == "Windows":
                    os.startfile(image_path)
                else:
                    subprocess.call(("xdg-open", image_path))
            else:
                backend(image_path)

    return Inner()
var subject

Subject's API

Expand source code
@property
def subject(self):    
    """Subject's API"""
    class Inner:
        _SUBJECT_ENDPOINT = _ENDPOINT + "{}/subjects/".format(self.id)

        @staticmethod
        def get() -> Subject:
            """Get subject from acquisition

            Returns:
                Subject -- subject of the acquisition
            """
            return self._subject

    return Inner()
var timeseries_samples

Timeseries samples' API

Expand source code
@property
def timeseries_samples(self):
    """Timeseries samples' API"""
    class Inner:
        _TIMESERIES_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/timeseries/".format(self.id)

        @staticmethod
        def get(tags: List[str] = [], metadata: Dict[str, str] = {}) -> SampleList:
            """Get all the timeseries samples that belong to this acquisition

            Keyword Arguments:
                tags {List[str]} -- tags of the timeseries samples (default: {[]})
                metadata {Dict[str, str]} -- metadata of the timeseries samples (default: {{}})

            Returns:
                SampleList -- list of timeseries samples
            """
            if len(tags) > 0 or len(metadata) > 0:
                processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                samples = _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                    .json(object_hook=TimeseriesSample.from_json)
                samples.sort(key=lambda sample: sample.timestamp)
                return SampleList(samples)

            try:
                samples = cache._get_cached_data("samples/{}/".format(self.id), "timeseries", SampleList.from_protobuf)
            except FileNotFoundError:
                samples = _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT) \
                    .json(object_hook=TimeseriesSample.from_json)
                samples.sort(key=lambda sample: sample.timestamp)
                samples = SampleList(samples)
                cache._cache_data("samples/{}/".format(self.id), "timeseries", samples, SampleList.to_protobuf)
            return samples

        @staticmethod
        def count() -> int:
            """Get the number of timeseries samples on this acquisition

            Returns:
                int -- number of timeseries samples
            """
            return _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT + "count").json()

        @staticmethod
        def visualize(device_id: str, sensor_id: str = None) -> None:
            """Graphically visualize the timeseries samples of a device identified by `device_id` 
            or of a given sensor identified by `sensor_id` of said device

            Arguments:
                device_id {str} -- id of the device

            Keyword Arguments:
                sensor_id {str} -- id of the sensor (default: {None})
            """
            def visualize_sensor_samples(axis, sensor, sensor_samples):
                timestamps = [s.timestamp for s in sensor_samples]

                # Sample x, y, z, u, w
                samples_x = [s.x for s in sensor_samples if hasattr(s, "x")]
                samples_y = [s.y for s in sensor_samples if hasattr(s, "y")]
                samples_z = [s.z for s in sensor_samples if hasattr(s, "z")]
                samples_u = [s.u for s in sensor_samples if hasattr(s, "u")]
                samples_w = [s.w for s in sensor_samples if hasattr(s, "w")]

                # Title and x label
                axis.set_title(f"{sensor.sensor_type}\n{sensor.id}", loc="left")
                axis.set_xlabel(sensor.time_unit if sensor.time_unit is not None else device_time_unit)

                # Axis limit
                axis.set_xlim([0, timestamps[-1]])
                axis.set_ylim([min(chain(samples_x, samples_y, samples_z, samples_u, samples_w)),
                               max(chain(samples_x, samples_y, samples_z, samples_u, samples_w))])

                # Axis formatter
                axis.xaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))
                axis.yaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))

                # Axis plot
                labels = []

                if len(samples_x) > 0:
                    axis.plot(timestamps, samples_x, color="cornflowerblue")
                    labels.append("x")
                if len(samples_y) > 0:
                    axis.plot(timestamps, samples_y, color="mediumseagreen")
                    labels.append("y")
                if len(samples_z) > 0:
                    axis.plot(timestamps, samples_z, color="indianred")
                    labels.append("z")
                if len(samples_u) > 0:
                    axis.plot(timestamps, samples_u, color="mediumorchid")
                    labels.append("u")
                if len(samples_w) > 0:
                    axis.plot(timestamps, samples_w, color="slategray")
                    labels.append("w")

                axis.legend(labels=labels, loc="upper right")

            device = self.devices.get(device_id=device_id)
            device_time_unit = device.time_unit if device.time_unit is not None else self.time_unit

            if sensor_id is None:
                fig, axs = plt.subplots(nrows=device.sensors.count(), figsize=(15, 10), dpi=80, constrained_layout=True)
                fig.suptitle(f"{device.model_name} ({device.manufacturer})\n{device.id}", wrap=True)

                device_samples = self.timeseries_samples.get().by_device(device_id=device.id)

                for i, sensor in enumerate(device.sensors.get()):
                    visualize_sensor_samples(axs[i], sensor, device_samples.by_sensor(sensor.id))
            else:
                fig, ax = plt.subplots(nrows=1, figsize=(10, 4), dpi=80, constrained_layout=True)
                fig.suptitle(f"{device.model_name} ({device.manufacturer})\n{device.id}", wrap=True)

                sensor_samples = self.timeseries_samples.get().by_sensor(sensor_id)

                visualize_sensor_samples(ax, device.sensors.get(sensor_id=sensor_id), sensor_samples)

            plt.show()

    return Inner()
var video_samples

Video samples' API

Expand source code
@property
def video_samples(self):
    """Video samples' API"""
    class Inner:
        _VIDEO_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/videos/".format(self.id)

        @staticmethod
        def get(sample_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[VideoSample, SampleList]:
            """Get all the video samples that belong to this acquisition

            Keyword Arguments:
                sample_id {str} -- id of the sample (default: {None})
                tags {List[str]} -- tags of image samples (default: {[]})
                metadata {Dict[str, str]} -- metadata of the image samples (default: {{}})

            Returns:
                Union[VideoSample, SampleList] -- video sample or list of video samples
            """
            if sample_id is None:
                if len(tags) > 0 or len(metadata) > 0:
                    processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                    samples = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                        .json(object_hook=VideoSample.from_json)
                    return SampleList(samples)

                samples = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT).json(object_hook=VideoSample.from_json)
                for sample in samples:
                    cache._cache_data("samples/{}/videos/".format(self.id), sample.id, sample, VideoSample.to_protobuf)
                return SampleList(samples)
            else:
                try:
                    sample = cache._get_cached_data("samples/{}/videos/".format(self.id), sample_id, VideoSample.from_protobuf)
                except FileNotFoundError:
                    sample = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + sample_id).json(object_hook=VideoSample.from_json)
                    cache._cache_data("samples/{}/videos/".format(self.id), sample_id, sample, VideoSample.to_protobuf)
                return sample

        @staticmethod
        def raw(sample_id: str) -> ByteString:
            """Get actual video from video sample identified by `sample_id` on this acquisition 

            Arguments:
                sample_id {str} -- id of the sample

            Returns:
                ByteString -- bytes of the video
            """
            try:
                video = cache._get_cached_data("samples/{}/videos/raw/".format(self.id), sample_id)
            except FileNotFoundError:
                video = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + sample_id + "/raw")
                file_ext = "." + video.headers["Content-Type"].split("/")[-1]
                cache._cache_data("samples/{}/videos/raw/".format(self.id), sample_id + file_ext, video.content)

            return video

        @staticmethod
        def count() -> int:
            """Get the number of video samples on this acquisition

            Returns:
                int -- number of video samples
            """
            return _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + "count").json()

        @staticmethod
        def visualize(sample_id: str, backend: Callable[[str], None] = None) -> None:
            """Visualize the video of a sample identified by `sample_id`.
            By default opens the predefined system video application.
            A different callback can be given to open the video.

            Arguments:
                sample_id {str} -- id of the sample

            Keyword Arguments:
                backend {Callable[[str], None]} -- backend to open the video with (default: {None})
            """
            self.video_samples.raw(sample_id)
            video_path = cache._build_cache_path("samples/{}/videos/raw/".format(self.id), sample_id)
            video_path = cache._add_file_extension(video_path)

            if backend is None:
                system = platform.system()

                if system == "Darwin":
                    subprocess.call(("open", video_path))
                elif system == "Windows":
                    os.startfile(video_path)
                else:
                    subprocess.call(("xdg-open", video_path))
            else:
                backend(video_path)

    return Inner()
class Annotation (type: str, id: str, tags: List[str], metadata: Dict[str, str], acquisition_id: str, creator_id: str, annotation_object: AnnotationObject, color: str, notes: str, **kwargs)

Annotation class

Expand source code
class Annotation(Entity):
    """Annotation class"""
    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], acquisition_id: str, creator_id: str,
                 annotation_object: AnnotationObject, color: str, notes: str, **kwargs):
        super().__init__(type, id, tags, metadata)
        self.acquisition_id = acquisition_id
        self.creator_id = creator_id
        self.annotation_object = annotation_object
        self.color = color
        self.notes = notes

        if self.type == "WholeImageAnnotation":
            self.annotated_sample_id: str = kwargs.get("annotated_sample_id")
        elif self.type == "PointAnnotation":
            self.annotated_sample_id: str = kwargs.get("annotated_sample_id")
            self.point: AnnotationPoint = kwargs.get("point")
        elif self.type == "CircleAnnotation":
            self.annotated_sample_id: str = kwargs.get("annotated_sample_id")
            self.center: AnnotationPoint = kwargs.get("center")
            self.radius: float = kwargs.get("radius")
        elif self.type == "DrawAnnotation":
            self.annotated_sample_id: str = kwargs.get("annotated_sample_id")
            self.points: List[AnnotationPoint] = kwargs.get("points")
        elif self.type == "RectangleAnnotation":
            self.annotated_sample_id: str = kwargs.get("annotated_sample_id")
            self.point: AnnotationPoint = kwargs.get("point")
            self.width: float = kwargs.get("width")
            self.height: float = kwargs.get("height")
        elif self.type == "PolygonAnnotation":
            self.annotated_sample_id: str = kwargs.get("annotated_sample_id")
            self.points: List[AnnotationPoint] = kwargs.get("points")
        elif self.type == "TimeSeriesInstantAnnotation":
            self.timestamp: int = kwargs.get("timestamp")
            self.device_id: str = kwargs.get("device_id")
            self.sensor_id: str = kwargs.get("sensor_id")
        elif self.type == "TimeSeriesIntervalAnnotation":
            self.timestamp_start: int = kwargs.get("timestamp_start")
            self.timestamp_end: int = kwargs.get("timestamp_end")
            self.device_id: str = kwargs.get("device_id")
            self.sensor_id: str = kwargs.get("sensor_id")
        else:
            raise ValueError

    @staticmethod
    def to_protobuf(obj: "Annotation") -> AnnotationMessage:
        """Encode an annotation to a Protobuf message

        Arguments:
            obj {Annotation} -- annotation to be encoded

        Returns:
            AnnotationMessage -- encoded annotation
        """
        annotation_message = AnnotationMessage()
        annotation_message.entity.CopyFrom(Entity.to_protobuf(obj))
        annotation_message.acquisition_id = obj.acquisition_id

        if obj.creator_id is not None:
            annotation_message.creator_id = obj.creator_id

        annotation_message.annotation_object.CopyFrom(AnnotationObject.to_protobuf(obj.annotation_object))

        if obj.color is not None:
            annotation_message.color = obj.color
        if obj.notes is not None:
            annotation_message.notes = obj.notes

        if obj.type == "WholeImageAnnotation":
            annotation_message.annotated_sample_id = obj.annotated_sample_id
        elif obj.type == "PointAnnotation":
            annotation_message.annotated_sample_id = obj.annotated_sample_id
            annotation_message.point.CopyFrom(AnnotationPoint.to_protobuf(obj.point))
        elif obj.type == "CircleAnnotation":
            annotation_message.annotated_sample_id = obj.annotated_sample_id
            annotation_message.center.CopyFrom(AnnotationPoint.to_protobuf(obj.center))
            annotation_message.radius = obj.radius
        elif obj.type == "DrawAnnotation":
            annotation_message.annotated_sample_id = obj.annotated_sample_id
            annotation_message.points.extend([AnnotationPoint.to_protobuf(p) for p in obj.points])
        elif obj.type == "RectangleAnnotation":
            annotation_message.annotated_sample_id = obj.annotated_sample_id
            annotation_message.point.CopyFrom(AnnotationPoint.to_protobuf(obj.point))
            annotation_message.width = obj.width
            annotation_message.height = obj.height
        elif obj.type == "PolygonAnnotation":
            annotation_message.annotated_sample_id = obj.annotated_sample_id
            annotation_message.points.extend([AnnotationPoint.to_protobuf(p) for p in obj.points])
        elif obj.type == "TimeSeriesInstantAnnotation":
            annotation_message.timestamp = obj.timestamp

            if obj.device_id is not None:
                annotation_message.device_id = obj.device_id
            if obj.sensor_id is not None:
                annotation_message.sensor_id = obj.sensor_id
        elif obj.type == "TimeSeriesIntervalAnnotation":
            annotation_message.timestamp_start = obj.timestamp_start
            annotation_message.timestamp_end = obj.timestamp_end

            if obj.device_id is not None:
                annotation_message.device_id = obj.device_id
            if obj.sensor_id is not None:
                annotation_message.sensor_id = obj.sensor_id
        else:
            raise ValueError

        return annotation_message

    @staticmethod
    def from_protobuf(obj: ByteString) -> "Annotation":
        """Decode a Protobuf message to {Annotation}

        Arguments:
            obj {ByteString} -- message to be decoded

        Returns:
            Annotation -- decoded annotation
        """
        annotation_message = AnnotationMessage()
        annotation_message.ParseFromString(obj)

        return Annotation(
            type=annotation_message.entity.type,
            id=annotation_message.entity.id,
            tags=annotation_message.entity.tags,
            metadata=annotation_message.entity.metadata,
            acquisition_id=annotation_message.acquisition_id,
            creator_id=annotation_message.creator_id,
            annotation_object=AnnotationObject.from_protobuf(annotation_message.annotation_object),
            color=annotation_message.color if annotation_message.HasField("color") else None,
            notes=annotation_message.notes if annotation_message.HasField("notes") else None,
            annotated_sample_id=annotation_message.annotated_sample_id if annotation_message.HasField("annotated_sample_id") else None,
            point=AnnotationPoint.from_protobuf(annotation_message.point) if annotation_message.HasField("point") else None,
            center=AnnotationPoint.from_protobuf(annotation_message.center) if annotation_message.HasField("center") else None,
            radius=annotation_message.radius if annotation_message.HasField("radius") else None,
            points=[AnnotationPoint.from_protobuf(p) for p in annotation_message.points],
            width=annotation_message.width if annotation_message.HasField("width") else None,
            height=annotation_message.height if annotation_message.HasField("height") else None,
            timestamp=annotation_message.timestamp if annotation_message.HasField("timestamp") else None,
            device_id=annotation_message.device_id if annotation_message.HasField("device_id") else None,
            sensor_id=annotation_message.sensor_id if annotation_message.HasField("sensor_id") else None,
            timestamp_start=annotation_message.timestamp_start if annotation_message.HasField("timestamp_start") else None,
            timestamp_end=annotation_message.timestamp_end if annotation_message.HasField("timestamp_end") else None,
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {Annotation}

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Raises:
            ValueError: unexpected object or sub-object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" in obj:
            if obj["type"].endswith("Annotation"):
                annotation = partial(
                    Annotation,
                    type=obj["type"],
                    id=obj["id"],
                    tags=obj["tags"],
                    metadata=obj["metadata"],
                    acquisition_id=obj["acquisitionId"],
                    creator_id=obj["creatorId"],
                    annotation_object=obj["annotationObject"],
                    color=obj["color"],
                    notes=obj["notes"]
                )

                if obj["type"] == "WholeImageAnnotation":
                    return annotation(
                        annotated_sample_id=obj["annotatedSampleId"]
                    )
                elif obj["type"] == "PointAnnotation":
                    return annotation(
                        annotated_sample_id=obj["annotatedSampleId"],
                        point=AnnotationPoint.from_json(obj["point"])
                    )
                elif obj["type"] == "CircleAnnotation":
                    return annotation(
                        annotated_sample_id=obj["annotatedSampleId"],
                        center=AnnotationPoint.from_json(obj["center"]),
                        radius=obj["radius"]
                    )
                elif obj["type"] == "DrawAnnotation":
                    return annotation(
                        annotated_sample_id=obj["annotatedSampleId"],
                        points=[AnnotationPoint.from_json(p) for p in obj["points"]]
                    )
                elif obj["type"] == "RectangleAnnotation":
                    return annotation(
                        annotated_sample_id=obj["annotatedSampleId"],
                        point=AnnotationPoint.from_json(obj["point"]),
                        width=obj["width"],
                        height=obj["height"]
                    )
                elif obj["type"] == "PolygonAnnotation":
                    return annotation(
                        annotated_sample_id=obj["annotatedSampleId"],
                        points=[AnnotationPoint.from_json(p) for p in obj["points"]]
                    )
                elif obj["type"] == "TimeSeriesInstantAnnotation":
                    return annotation(
                        timestamp=obj["timestamp"],
                        device_id=obj["deviceId"],
                        sensor_id=obj["sensorId"]
                    )
                elif obj["type"] == "TimeSeriesIntervalAnnotation":
                    return annotation(
                        timestamp_start=obj["timestampStart"],
                        timestamp_end=obj["timestampEnd"],
                        device_id=obj["deviceId"],
                        sensor_id=obj["sensorId"]
                    )
                else:
                    raise ValueError
            elif obj["type"] == "AnnotationText" or obj["type"] == "AnnotationImage":
                return AnnotationObject.from_json(obj)
            else:
                raise ValueError

        return obj

Ancestors

  • dempy._base.Entity

Static methods

def from_json(obj: Dict[str, str]) -> Any

Parse a JSON dictionary to {Annotation}

Arguments

obj {Dict[str, str]} – JSON object

Raises

ValueError
unexpected object or sub-object

Returns

Any -- parsed object and sub-objects
 
Expand source code
@staticmethod
def from_json(obj: Dict[str, str]) -> Any:
    """Parse a JSON dictionary to {Annotation}

    Arguments:
        obj {Dict[str, str]} -- JSON object

    Raises:
        ValueError: unexpected object or sub-object

    Returns:
        Any -- parsed object and sub-objects
    """
    if "type" in obj:
        if obj["type"].endswith("Annotation"):
            annotation = partial(
                Annotation,
                type=obj["type"],
                id=obj["id"],
                tags=obj["tags"],
                metadata=obj["metadata"],
                acquisition_id=obj["acquisitionId"],
                creator_id=obj["creatorId"],
                annotation_object=obj["annotationObject"],
                color=obj["color"],
                notes=obj["notes"]
            )

            if obj["type"] == "WholeImageAnnotation":
                return annotation(
                    annotated_sample_id=obj["annotatedSampleId"]
                )
            elif obj["type"] == "PointAnnotation":
                return annotation(
                    annotated_sample_id=obj["annotatedSampleId"],
                    point=AnnotationPoint.from_json(obj["point"])
                )
            elif obj["type"] == "CircleAnnotation":
                return annotation(
                    annotated_sample_id=obj["annotatedSampleId"],
                    center=AnnotationPoint.from_json(obj["center"]),
                    radius=obj["radius"]
                )
            elif obj["type"] == "DrawAnnotation":
                return annotation(
                    annotated_sample_id=obj["annotatedSampleId"],
                    points=[AnnotationPoint.from_json(p) for p in obj["points"]]
                )
            elif obj["type"] == "RectangleAnnotation":
                return annotation(
                    annotated_sample_id=obj["annotatedSampleId"],
                    point=AnnotationPoint.from_json(obj["point"]),
                    width=obj["width"],
                    height=obj["height"]
                )
            elif obj["type"] == "PolygonAnnotation":
                return annotation(
                    annotated_sample_id=obj["annotatedSampleId"],
                    points=[AnnotationPoint.from_json(p) for p in obj["points"]]
                )
            elif obj["type"] == "TimeSeriesInstantAnnotation":
                return annotation(
                    timestamp=obj["timestamp"],
                    device_id=obj["deviceId"],
                    sensor_id=obj["sensorId"]
                )
            elif obj["type"] == "TimeSeriesIntervalAnnotation":
                return annotation(
                    timestamp_start=obj["timestampStart"],
                    timestamp_end=obj["timestampEnd"],
                    device_id=obj["deviceId"],
                    sensor_id=obj["sensorId"]
                )
            else:
                raise ValueError
        elif obj["type"] == "AnnotationText" or obj["type"] == "AnnotationImage":
            return AnnotationObject.from_json(obj)
        else:
            raise ValueError

    return obj
def from_protobuf(obj: ByteString) -> Annotation

Decode a Protobuf message to {Annotation}

Arguments

obj {ByteString} – message to be decoded

Returns

Annotation -- decoded annotation
 
Expand source code
@staticmethod
def from_protobuf(obj: ByteString) -> "Annotation":
    """Decode a Protobuf message to {Annotation}

    Arguments:
        obj {ByteString} -- message to be decoded

    Returns:
        Annotation -- decoded annotation
    """
    annotation_message = AnnotationMessage()
    annotation_message.ParseFromString(obj)

    return Annotation(
        type=annotation_message.entity.type,
        id=annotation_message.entity.id,
        tags=annotation_message.entity.tags,
        metadata=annotation_message.entity.metadata,
        acquisition_id=annotation_message.acquisition_id,
        creator_id=annotation_message.creator_id,
        annotation_object=AnnotationObject.from_protobuf(annotation_message.annotation_object),
        color=annotation_message.color if annotation_message.HasField("color") else None,
        notes=annotation_message.notes if annotation_message.HasField("notes") else None,
        annotated_sample_id=annotation_message.annotated_sample_id if annotation_message.HasField("annotated_sample_id") else None,
        point=AnnotationPoint.from_protobuf(annotation_message.point) if annotation_message.HasField("point") else None,
        center=AnnotationPoint.from_protobuf(annotation_message.center) if annotation_message.HasField("center") else None,
        radius=annotation_message.radius if annotation_message.HasField("radius") else None,
        points=[AnnotationPoint.from_protobuf(p) for p in annotation_message.points],
        width=annotation_message.width if annotation_message.HasField("width") else None,
        height=annotation_message.height if annotation_message.HasField("height") else None,
        timestamp=annotation_message.timestamp if annotation_message.HasField("timestamp") else None,
        device_id=annotation_message.device_id if annotation_message.HasField("device_id") else None,
        sensor_id=annotation_message.sensor_id if annotation_message.HasField("sensor_id") else None,
        timestamp_start=annotation_message.timestamp_start if annotation_message.HasField("timestamp_start") else None,
        timestamp_end=annotation_message.timestamp_end if annotation_message.HasField("timestamp_end") else None,
    )
def to_protobuf(obj: Annotation) -> dempy_pb2.Annotation

Encode an annotation to a Protobuf message

Arguments

obj {Annotation} – annotation to be encoded

Returns

AnnotationMessage -- encoded annotation
 
Expand source code
@staticmethod
def to_protobuf(obj: "Annotation") -> AnnotationMessage:
    """Encode an annotation to a Protobuf message

    Arguments:
        obj {Annotation} -- annotation to be encoded

    Returns:
        AnnotationMessage -- encoded annotation
    """
    annotation_message = AnnotationMessage()
    annotation_message.entity.CopyFrom(Entity.to_protobuf(obj))
    annotation_message.acquisition_id = obj.acquisition_id

    if obj.creator_id is not None:
        annotation_message.creator_id = obj.creator_id

    annotation_message.annotation_object.CopyFrom(AnnotationObject.to_protobuf(obj.annotation_object))

    if obj.color is not None:
        annotation_message.color = obj.color
    if obj.notes is not None:
        annotation_message.notes = obj.notes

    if obj.type == "WholeImageAnnotation":
        annotation_message.annotated_sample_id = obj.annotated_sample_id
    elif obj.type == "PointAnnotation":
        annotation_message.annotated_sample_id = obj.annotated_sample_id
        annotation_message.point.CopyFrom(AnnotationPoint.to_protobuf(obj.point))
    elif obj.type == "CircleAnnotation":
        annotation_message.annotated_sample_id = obj.annotated_sample_id
        annotation_message.center.CopyFrom(AnnotationPoint.to_protobuf(obj.center))
        annotation_message.radius = obj.radius
    elif obj.type == "DrawAnnotation":
        annotation_message.annotated_sample_id = obj.annotated_sample_id
        annotation_message.points.extend([AnnotationPoint.to_protobuf(p) for p in obj.points])
    elif obj.type == "RectangleAnnotation":
        annotation_message.annotated_sample_id = obj.annotated_sample_id
        annotation_message.point.CopyFrom(AnnotationPoint.to_protobuf(obj.point))
        annotation_message.width = obj.width
        annotation_message.height = obj.height
    elif obj.type == "PolygonAnnotation":
        annotation_message.annotated_sample_id = obj.annotated_sample_id
        annotation_message.points.extend([AnnotationPoint.to_protobuf(p) for p in obj.points])
    elif obj.type == "TimeSeriesInstantAnnotation":
        annotation_message.timestamp = obj.timestamp

        if obj.device_id is not None:
            annotation_message.device_id = obj.device_id
        if obj.sensor_id is not None:
            annotation_message.sensor_id = obj.sensor_id
    elif obj.type == "TimeSeriesIntervalAnnotation":
        annotation_message.timestamp_start = obj.timestamp_start
        annotation_message.timestamp_end = obj.timestamp_end

        if obj.device_id is not None:
            annotation_message.device_id = obj.device_id
        if obj.sensor_id is not None:
            annotation_message.sensor_id = obj.sensor_id
    else:
        raise ValueError

    return annotation_message
class Device (type: str, id: str, tags: List[str], metadata: Dict[str, str], sync_offset: int, time_unit: str, serial_number: str, manufacturer: str, model_name: str, sensors: List[Sensor])

Device class

Expand source code
class Device(Entity):
    """Device class"""
    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], sync_offset: int, time_unit: str, serial_number: str,
                 manufacturer: str, model_name: str, sensors: List[Sensor]):
        super().__init__(type, id, tags, metadata)
        self.sync_offset = sync_offset
        self.time_unit = time_unit
        self.serial_number = serial_number
        self.manufacturer = manufacturer
        self.model_name = model_name
        self._sensors = sensors

    @property
    def sensors(self):
        """Sensors' API"""
        class Inner:
            @staticmethod
            def get(sensor_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[Sensor, List[Sensor]]:
                """Get all the sensors that belong to this device

                Keyword Arguments:
                    sensor_id {str} -- id of the sensor (default: {None})
                    tags {List[str]} -- tags of the sensor (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the sensor (default: {{}})

                Raises:
                    IndexError: sensor identified by `sensor_id` does not exist in this device

                Returns:
                    Union[Sensor, List[Sensor]] -- sensor or list of sensors
                """                
                if sensor_id is None:
                    if len(tags) > 0 or len(metadata) > 0:
                        return [s for s in self._sensors if
                                len([k for k in s.metadata if k in metadata and s.metadata[k] == metadata[k]]) > 0]

                    return self._sensors
                else:
                    try:
                        sensor = next((sensor for sensor in self._sensors if sensor.id == sensor_id))
                    except StopIteration:
                        raise IndexError(f"sensor id {sensor_id} does not exist in acquisition id {self.id}")
                    return sensor

            @staticmethod
            def count() -> int:
                """Get the number of sensors on this device

                Returns:
                    int -- number of sensors
                """
                return len(self._sensors)

        return Inner()

    @staticmethod
    def to_protobuf(obj: "Device") -> DeviceMessage:
        """Encode an device to a Protobuf message

        Arguments:
            obj {Device} -- device to be encoded

        Returns:
            DeviceMessage -- encoded device
        """
        device_message = DeviceMessage()
        device_message.entity.CopyFrom(Entity.to_protobuf(obj))

        if obj.sync_offset is not None:
            device_message.sync_offset = obj.sync_offset
        if obj.time_unit is not None:
            device_message.time_unit = obj.time_unit
        if obj.serial_number is not None:
            device_message.serial_number = obj.serial_number
        if obj.manufacturer is not None:
            device_message.manufacturer = obj.manufacturer
        if obj.model_name is not None:
            device_message.model_name = obj.model_name

        device_message.sensors.extend([Sensor.to_protobuf(s) for s in obj._sensors])

        return device_message

    @staticmethod
    def from_protobuf(device_message: DeviceMessage) -> "Device":
        """Decode a Protobuf message to {Device}

        Arguments:
            obj {DeviceMessage} -- message to be decoded

        Returns:
            Device -- decoded device
        """
        return Device(
            type=device_message.entity.type,
            id=device_message.entity.id,
            tags=device_message.entity.tags,
            metadata=device_message.entity.metadata,
            sync_offset=device_message.sync_offset if device_message.HasField("sync_offset") else None,
            time_unit=device_message.time_unit if device_message.HasField("time_unit") else None,
            serial_number=device_message.serial_number if device_message.HasField("serial_number") else None,
            manufacturer=device_message.manufacturer if device_message.HasField("manufacturer") else None,
            model_name=device_message.model_name if device_message.HasField("model_name") else None,
            sensors=[Sensor.from_protobuf(s) for s in device_message.sensors]
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {Device}

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Raises:
            ValueError: unexpected object or sub-object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" in obj:
            if obj["type"] == "Device":
                return Device(
                    type=obj["type"],
                    id=obj["id"],
                    tags=obj["tags"],
                    metadata=obj["metadata"],
                    sync_offset=obj["syncOffset"],
                    time_unit=obj["timeUnit"],
                    serial_number=obj["serialNumber"],
                    manufacturer=obj["manufacturer"],
                    model_name=obj["modelName"],
                    sensors=obj["sensors"],
                )
            elif obj["type"] == "Sensor":
                return Sensor.from_json(obj)
            else:
                raise ValueError

        return obj

Ancestors

  • dempy._base.Entity

Static methods

def from_json(obj: Dict[str, str]) -> Any

Parse a JSON dictionary to {Device}

Arguments

obj {Dict[str, str]} – JSON object

Raises

ValueError
unexpected object or sub-object

Returns

Any -- parsed object and sub-objects
 
Expand source code
@staticmethod
def from_json(obj: Dict[str, str]) -> Any:
    """Parse a JSON dictionary to {Device}

    Arguments:
        obj {Dict[str, str]} -- JSON object

    Raises:
        ValueError: unexpected object or sub-object

    Returns:
        Any -- parsed object and sub-objects
    """
    if "type" in obj:
        if obj["type"] == "Device":
            return Device(
                type=obj["type"],
                id=obj["id"],
                tags=obj["tags"],
                metadata=obj["metadata"],
                sync_offset=obj["syncOffset"],
                time_unit=obj["timeUnit"],
                serial_number=obj["serialNumber"],
                manufacturer=obj["manufacturer"],
                model_name=obj["modelName"],
                sensors=obj["sensors"],
            )
        elif obj["type"] == "Sensor":
            return Sensor.from_json(obj)
        else:
            raise ValueError

    return obj
def from_protobuf(device_message: dempy_pb2.Device) -> Device

Decode a Protobuf message to {Device}

Arguments

obj {DeviceMessage} – message to be decoded

Returns

Device -- decoded device
 
Expand source code
@staticmethod
def from_protobuf(device_message: DeviceMessage) -> "Device":
    """Decode a Protobuf message to {Device}

    Arguments:
        obj {DeviceMessage} -- message to be decoded

    Returns:
        Device -- decoded device
    """
    return Device(
        type=device_message.entity.type,
        id=device_message.entity.id,
        tags=device_message.entity.tags,
        metadata=device_message.entity.metadata,
        sync_offset=device_message.sync_offset if device_message.HasField("sync_offset") else None,
        time_unit=device_message.time_unit if device_message.HasField("time_unit") else None,
        serial_number=device_message.serial_number if device_message.HasField("serial_number") else None,
        manufacturer=device_message.manufacturer if device_message.HasField("manufacturer") else None,
        model_name=device_message.model_name if device_message.HasField("model_name") else None,
        sensors=[Sensor.from_protobuf(s) for s in device_message.sensors]
    )
def to_protobuf(obj: Device) -> dempy_pb2.Device

Encode an device to a Protobuf message

Arguments

obj {Device} – device to be encoded

Returns

DeviceMessage -- encoded device
 
Expand source code
@staticmethod
def to_protobuf(obj: "Device") -> DeviceMessage:
    """Encode an device to a Protobuf message

    Arguments:
        obj {Device} -- device to be encoded

    Returns:
        DeviceMessage -- encoded device
    """
    device_message = DeviceMessage()
    device_message.entity.CopyFrom(Entity.to_protobuf(obj))

    if obj.sync_offset is not None:
        device_message.sync_offset = obj.sync_offset
    if obj.time_unit is not None:
        device_message.time_unit = obj.time_unit
    if obj.serial_number is not None:
        device_message.serial_number = obj.serial_number
    if obj.manufacturer is not None:
        device_message.manufacturer = obj.manufacturer
    if obj.model_name is not None:
        device_message.model_name = obj.model_name

    device_message.sensors.extend([Sensor.to_protobuf(s) for s in obj._sensors])

    return device_message

Instance variables

var sensors

Sensors' API

Expand source code
@property
def sensors(self):
    """Sensors' API"""
    class Inner:
        @staticmethod
        def get(sensor_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[Sensor, List[Sensor]]:
            """Get all the sensors that belong to this device

            Keyword Arguments:
                sensor_id {str} -- id of the sensor (default: {None})
                tags {List[str]} -- tags of the sensor (default: {[]})
                metadata {Dict[str, str]} -- metadata of the sensor (default: {{}})

            Raises:
                IndexError: sensor identified by `sensor_id` does not exist in this device

            Returns:
                Union[Sensor, List[Sensor]] -- sensor or list of sensors
            """                
            if sensor_id is None:
                if len(tags) > 0 or len(metadata) > 0:
                    return [s for s in self._sensors if
                            len([k for k in s.metadata if k in metadata and s.metadata[k] == metadata[k]]) > 0]

                return self._sensors
            else:
                try:
                    sensor = next((sensor for sensor in self._sensors if sensor.id == sensor_id))
                except StopIteration:
                    raise IndexError(f"sensor id {sensor_id} does not exist in acquisition id {self.id}")
                return sensor

        @staticmethod
        def count() -> int:
            """Get the number of sensors on this device

            Returns:
                int -- number of sensors
            """
            return len(self._sensors)

    return Inner()
class ImageSample (type: str, id: str, tags: List[str], metadata: Dict[str, str], timestamp: int, acquisition_id: str, device_id: str, sensor_id: str, media_type: str, image_source: str, has_rotation_metadata: bool)

ImageSample class

Expand source code
class ImageSample(Entity):
    """ImageSample class"""
    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], timestamp: int, acquisition_id: str, device_id: str,
                 sensor_id: str, media_type: str, image_source: str, has_rotation_metadata: bool):
        super().__init__(type, id, tags, metadata)
        self.timestamp = timestamp
        self.acquisition_id = acquisition_id
        self.device_id = device_id
        self.sensor_id = sensor_id
        self.media_type = media_type
        self.image_source = image_source
        self.has_rotation_metadata = has_rotation_metadata

    @staticmethod
    def to_protobuf(obj: "ImageSample") -> ImageMessage:
        """Encode a image sample to a Protobuf message

        Arguments:
            obj {ImageSample} -- image sample to be encoded

        Returns:
            ImageMessage -- encoded image sample
        """
        image_message = ImageMessage()
        image_message.entity.CopyFrom(Entity.to_protobuf(obj))

        image_message.timestamp = obj.timestamp
        image_message.acquisition_id = obj.acquisition_id

        if obj.device_id is not None:
            image_message.device_id = obj.device_id
        if obj.sensor_id is not None:
            image_message.sensor_id = obj.sensor_id

        image_message.media_type = obj.media_type
        image_message.image_source = obj.image_source
        image_message.has_rotation_metadata = obj.has_rotation_metadata

        return image_message

    @staticmethod
    def from_protobuf(obj: Union[ByteString, ImageMessage]) -> "ImageSample":
        """Decode a Protobuf message to {ImageSample}

        Arguments:
            obj {Union[ByteString, ImageMessage]} -- message to be decoded

        Returns:
            ImageSample -- decoded image sample
        """
        image_message = obj if isinstance(obj, ImageMessage) else ImageMessage().ParseFromString(obj)

        return ImageSample(
            type=image_message.entity.type,
            id=image_message.entity.id,
            tags=image_message.entity.tags,
            metadata=image_message.entity.metadata,
            timestamp=image_message.timestamp,
            acquisition_id=image_message.acquisition_id,
            device_id=image_message.device_id if image_message.HasField("device_id") else None,
            sensor_id=image_message.sensor_id if image_message.HasField("sensor_id") else None,
            media_type=image_message.media_type,
            image_source=image_message.image_source,
            has_rotation_metadata=image_message.has_rotation_metadata
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {ImageSample}

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" in obj and obj["type"] == "ImageSample":
            return ImageSample(
                type=obj["type"],
                id=obj["id"],
                tags=obj["tags"],
                metadata=obj["metadata"],
                timestamp=obj["timestamp"],
                acquisition_id=obj["acquisitionId"],
                device_id=obj["deviceId"],
                sensor_id=obj["sensorId"],
                media_type=obj["mediaType"],
                image_source=obj["imageSource"],
                has_rotation_metadata=obj["hasRotationMetadata"]
            )

        return obj

Ancestors

  • dempy._base.Entity

Static methods

def from_json(obj: Dict[str, str]) -> Any

Parse a JSON dictionary to {ImageSample}

Arguments

obj {Dict[str, str]} – JSON object

Returns

Any -- parsed object and sub-objects
 
Expand source code
@staticmethod
def from_json(obj: Dict[str, str]) -> Any:
    """Parse a JSON dictionary to {ImageSample}

    Arguments:
        obj {Dict[str, str]} -- JSON object

    Returns:
        Any -- parsed object and sub-objects
    """
    if "type" in obj and obj["type"] == "ImageSample":
        return ImageSample(
            type=obj["type"],
            id=obj["id"],
            tags=obj["tags"],
            metadata=obj["metadata"],
            timestamp=obj["timestamp"],
            acquisition_id=obj["acquisitionId"],
            device_id=obj["deviceId"],
            sensor_id=obj["sensorId"],
            media_type=obj["mediaType"],
            image_source=obj["imageSource"],
            has_rotation_metadata=obj["hasRotationMetadata"]
        )

    return obj
def from_protobuf(obj: Union[ByteString, dempy_pb2.ImageSample]) -> ImageSample

Decode a Protobuf message to {ImageSample}

Arguments

obj {Union[ByteString, ImageMessage]} – message to be decoded

Returns

ImageSample -- decoded image sample
 
Expand source code
@staticmethod
def from_protobuf(obj: Union[ByteString, ImageMessage]) -> "ImageSample":
    """Decode a Protobuf message to {ImageSample}

    Arguments:
        obj {Union[ByteString, ImageMessage]} -- message to be decoded

    Returns:
        ImageSample -- decoded image sample
    """
    image_message = obj if isinstance(obj, ImageMessage) else ImageMessage().ParseFromString(obj)

    return ImageSample(
        type=image_message.entity.type,
        id=image_message.entity.id,
        tags=image_message.entity.tags,
        metadata=image_message.entity.metadata,
        timestamp=image_message.timestamp,
        acquisition_id=image_message.acquisition_id,
        device_id=image_message.device_id if image_message.HasField("device_id") else None,
        sensor_id=image_message.sensor_id if image_message.HasField("sensor_id") else None,
        media_type=image_message.media_type,
        image_source=image_message.image_source,
        has_rotation_metadata=image_message.has_rotation_metadata
    )
def to_protobuf(obj: ImageSample) -> dempy_pb2.ImageSample

Encode a image sample to a Protobuf message

Arguments

obj {ImageSample} – image sample to be encoded

Returns

ImageMessage -- encoded image sample
 
Expand source code
@staticmethod
def to_protobuf(obj: "ImageSample") -> ImageMessage:
    """Encode a image sample to a Protobuf message

    Arguments:
        obj {ImageSample} -- image sample to be encoded

    Returns:
        ImageMessage -- encoded image sample
    """
    image_message = ImageMessage()
    image_message.entity.CopyFrom(Entity.to_protobuf(obj))

    image_message.timestamp = obj.timestamp
    image_message.acquisition_id = obj.acquisition_id

    if obj.device_id is not None:
        image_message.device_id = obj.device_id
    if obj.sensor_id is not None:
        image_message.sensor_id = obj.sensor_id

    image_message.media_type = obj.media_type
    image_message.image_source = obj.image_source
    image_message.has_rotation_metadata = obj.has_rotation_metadata

    return image_message
class Sensor (type: str, id: str, tags: List[str], metadata: Dict[str, str], sync_offset: int, time_unit: str, serial_number: str, manufacturer: str, model_name: str, sensor_type: str)

Sensor class

Expand source code
class Sensor(Entity):
    """Sensor class"""
    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], sync_offset: int, time_unit: str, serial_number: str,
                 manufacturer: str, model_name: str, sensor_type: str):
        super().__init__(type, id, tags, metadata)
        self.sync_offset = sync_offset
        self.time_unit = time_unit
        self.serial_number = serial_number
        self.manufacturer = manufacturer
        self.model_name = model_name
        self.sensor_type = sensor_type

    @staticmethod
    def to_protobuf(obj: "Sensor") -> SensorMessage:
        """Encode an sensor to a Protobuf message

        Arguments:
            obj {Sensor} -- sensor to be encoded

        Returns:
            SensorMessage -- encoded sensor
        """
        sensor_message = SensorMessage()
        sensor_message.entity.CopyFrom(Entity.to_protobuf(obj))

        if obj.sync_offset is not None:
            sensor_message.sync_offset = obj.sync_offset
        if obj.time_unit is not None:
            sensor_message.time_unit = obj.time_unit
        if obj.serial_number is not None:
            sensor_message.serial_number = obj.serial_number
        if obj.manufacturer is not None:
            sensor_message.manufacturer = obj.manufacturer
        if obj.model_name is not None:
            sensor_message.model_name = obj.model_name
        if obj.sensor_type is not None:
            sensor_message.sensor_type = obj.sensor_type

        return sensor_message

    @staticmethod
    def from_protobuf(sensor_message: SensorMessage) -> "Sensor":
        """Decode a Protobuf message to {Sensor}

        Arguments:
            obj {SensorMessage} -- message to be decoded

        Returns:
            Sensor -- decoded sensor
        """
        return Sensor(
            type=sensor_message.entity.type,
            id=sensor_message.entity.id,
            tags=sensor_message.entity.tags,
            metadata=sensor_message.entity.metadata,
            sync_offset=sensor_message.sync_offset if sensor_message.HasField("sync_offset") else None,
            time_unit=sensor_message.time_unit if sensor_message.HasField("time_unit") else None,
            serial_number=sensor_message.serial_number if sensor_message.HasField("serial_number") else None,
            manufacturer=sensor_message.manufacturer if sensor_message.HasField("manufacturer") else None,
            model_name=sensor_message.model_name if sensor_message.HasField("model_name") else None,
            sensor_type=sensor_message.sensor_type if sensor_message.HasField("sensor_type") else None,
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {Sensor}

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" in obj and obj["type"] == "Sensor":
            return Sensor(
                type=obj["type"],
                id=obj["id"],
                tags=obj["tags"],
                metadata=obj["metadata"],
                sync_offset=obj["syncOffset"],
                time_unit=obj["timeUnit"],
                serial_number=obj["serialNumber"],
                manufacturer=obj["manufacturer"],
                model_name=obj["modelName"],
                sensor_type=obj["sensorType"],
            )

        return obj

Ancestors

  • dempy._base.Entity

Static methods

def from_json(obj: Dict[str, str]) -> Any

Parse a JSON dictionary to {Sensor}

Arguments

obj {Dict[str, str]} – JSON object

Returns

Any -- parsed object and sub-objects
 
Expand source code
@staticmethod
def from_json(obj: Dict[str, str]) -> Any:
    """Parse a JSON dictionary to {Sensor}

    Arguments:
        obj {Dict[str, str]} -- JSON object

    Returns:
        Any -- parsed object and sub-objects
    """
    if "type" in obj and obj["type"] == "Sensor":
        return Sensor(
            type=obj["type"],
            id=obj["id"],
            tags=obj["tags"],
            metadata=obj["metadata"],
            sync_offset=obj["syncOffset"],
            time_unit=obj["timeUnit"],
            serial_number=obj["serialNumber"],
            manufacturer=obj["manufacturer"],
            model_name=obj["modelName"],
            sensor_type=obj["sensorType"],
        )

    return obj
def from_protobuf(sensor_message: dempy_pb2.Sensor) -> Sensor

Decode a Protobuf message to {Sensor}

Arguments

obj {SensorMessage} – message to be decoded

Returns

Sensor -- decoded sensor
 
Expand source code
@staticmethod
def from_protobuf(sensor_message: SensorMessage) -> "Sensor":
    """Decode a Protobuf message to {Sensor}

    Arguments:
        obj {SensorMessage} -- message to be decoded

    Returns:
        Sensor -- decoded sensor
    """
    return Sensor(
        type=sensor_message.entity.type,
        id=sensor_message.entity.id,
        tags=sensor_message.entity.tags,
        metadata=sensor_message.entity.metadata,
        sync_offset=sensor_message.sync_offset if sensor_message.HasField("sync_offset") else None,
        time_unit=sensor_message.time_unit if sensor_message.HasField("time_unit") else None,
        serial_number=sensor_message.serial_number if sensor_message.HasField("serial_number") else None,
        manufacturer=sensor_message.manufacturer if sensor_message.HasField("manufacturer") else None,
        model_name=sensor_message.model_name if sensor_message.HasField("model_name") else None,
        sensor_type=sensor_message.sensor_type if sensor_message.HasField("sensor_type") else None,
    )
def to_protobuf(obj: Sensor) -> dempy_pb2.Sensor

Encode an sensor to a Protobuf message

Arguments

obj {Sensor} – sensor to be encoded

Returns

SensorMessage -- encoded sensor
 
Expand source code
@staticmethod
def to_protobuf(obj: "Sensor") -> SensorMessage:
    """Encode an sensor to a Protobuf message

    Arguments:
        obj {Sensor} -- sensor to be encoded

    Returns:
        SensorMessage -- encoded sensor
    """
    sensor_message = SensorMessage()
    sensor_message.entity.CopyFrom(Entity.to_protobuf(obj))

    if obj.sync_offset is not None:
        sensor_message.sync_offset = obj.sync_offset
    if obj.time_unit is not None:
        sensor_message.time_unit = obj.time_unit
    if obj.serial_number is not None:
        sensor_message.serial_number = obj.serial_number
    if obj.manufacturer is not None:
        sensor_message.manufacturer = obj.manufacturer
    if obj.model_name is not None:
        sensor_message.model_name = obj.model_name
    if obj.sensor_type is not None:
        sensor_message.sensor_type = obj.sensor_type

    return sensor_message
class Subject (type: str, id: str, tags: List[str], metadata: Dict[str, str], birthdate_timestamp: int, description: str, first_name: str, last_name: str)

Subject class

Expand source code
class Subject(Entity):
    """Subject class"""
    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], birthdate_timestamp: int, description: str,
                 first_name: str, last_name: str):
        super().__init__(type, id, tags, metadata)
        self.birthdate_timestamp = birthdate_timestamp
        self.description = description
        self.first_name = first_name
        self.last_name = last_name

    @staticmethod
    def to_protobuf(obj: "Subject") -> SubjectMessage:
        """Encode an subject to a Protobuf message

        Arguments:
            obj {Subject} -- subject to be encoded

        Returns:
            SubjectMessage -- encoded subject
        """
        subject_message = SubjectMessage()
        subject_message.entity.CopyFrom(Entity.to_protobuf(obj))

        subject_message.birthdate_timestamp = obj.birthdate_timestamp

        if obj.description is not None:
            subject_message.description = obj.description
        if obj.first_name is not None:
            subject_message.first_name = obj.first_name
        if obj.last_name is not None:
            subject_message.last_name = obj.last_name

        return subject_message

    @staticmethod
    def from_protobuf(subject_message: SubjectMessage) -> "Subject":
        """Decode a Protobuf message to {Subject}

        Arguments:
            obj {SubjectMessage} -- message to be decoded

        Returns:
            Subject -- decoded subject
        """
        return Subject(
            type=subject_message.entity.type,
            id=subject_message.entity.id,
            tags=subject_message.entity.tags,
            metadata=subject_message.entity.metadata,
            birthdate_timestamp=subject_message.birthdate_timestamp if subject_message.HasField("birthdate_timestamp") else None,
            description=subject_message.description if subject_message.HasField("description") else None,
            first_name=subject_message.first_name if subject_message.HasField("first_name") else None,
            last_name=subject_message.last_name if subject_message.HasField("last_name") else None,
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {Subject}

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" in obj and obj["type"].endswith("Subject"):
            return Subject(
                type=obj["type"],
                id=obj["id"],
                metadata=obj["metadata"],
                tags=obj["tags"],
                birthdate_timestamp=obj["birthdateTimestamp"],
                description=obj["description"],
                first_name=obj["firstName"],
                last_name=obj["lastName"]
            )

        return obj

Ancestors

  • dempy._base.Entity

Static methods

def from_json(obj: Dict[str, str]) -> Any

Parse a JSON dictionary to {Subject}

Arguments

obj {Dict[str, str]} – JSON object

Returns

Any -- parsed object and sub-objects
 
Expand source code
@staticmethod
def from_json(obj: Dict[str, str]) -> Any:
    """Parse a JSON dictionary to {Subject}

    Arguments:
        obj {Dict[str, str]} -- JSON object

    Returns:
        Any -- parsed object and sub-objects
    """
    if "type" in obj and obj["type"].endswith("Subject"):
        return Subject(
            type=obj["type"],
            id=obj["id"],
            metadata=obj["metadata"],
            tags=obj["tags"],
            birthdate_timestamp=obj["birthdateTimestamp"],
            description=obj["description"],
            first_name=obj["firstName"],
            last_name=obj["lastName"]
        )

    return obj
def from_protobuf(subject_message: dempy_pb2.Subject) -> Subject

Decode a Protobuf message to {Subject}

Arguments

obj {SubjectMessage} – message to be decoded

Returns

Subject -- decoded subject
 
Expand source code
@staticmethod
def from_protobuf(subject_message: SubjectMessage) -> "Subject":
    """Decode a Protobuf message to {Subject}

    Arguments:
        obj {SubjectMessage} -- message to be decoded

    Returns:
        Subject -- decoded subject
    """
    return Subject(
        type=subject_message.entity.type,
        id=subject_message.entity.id,
        tags=subject_message.entity.tags,
        metadata=subject_message.entity.metadata,
        birthdate_timestamp=subject_message.birthdate_timestamp if subject_message.HasField("birthdate_timestamp") else None,
        description=subject_message.description if subject_message.HasField("description") else None,
        first_name=subject_message.first_name if subject_message.HasField("first_name") else None,
        last_name=subject_message.last_name if subject_message.HasField("last_name") else None,
    )
def to_protobuf(obj: Subject) -> dempy_pb2.Subject

Encode an subject to a Protobuf message

Arguments

obj {Subject} – subject to be encoded

Returns

SubjectMessage -- encoded subject
 
Expand source code
@staticmethod
def to_protobuf(obj: "Subject") -> SubjectMessage:
    """Encode an subject to a Protobuf message

    Arguments:
        obj {Subject} -- subject to be encoded

    Returns:
        SubjectMessage -- encoded subject
    """
    subject_message = SubjectMessage()
    subject_message.entity.CopyFrom(Entity.to_protobuf(obj))

    subject_message.birthdate_timestamp = obj.birthdate_timestamp

    if obj.description is not None:
        subject_message.description = obj.description
    if obj.first_name is not None:
        subject_message.first_name = obj.first_name
    if obj.last_name is not None:
        subject_message.last_name = obj.last_name

    return subject_message
class TimeseriesSample (type: str, id: str, tags: List[str], metadata: Dict[str, str], timestamp: int, acquisition_id: str, device_id: str, sensor_id: str, **kwargs)

TimeseriesSample class

Expand source code
class TimeseriesSample(Entity):
    """TimeseriesSample class"""
    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], timestamp: int, acquisition_id: str, device_id: str,
                 sensor_id: str, **kwargs):
        super().__init__(type, id, tags, metadata)
        self.timestamp = timestamp
        self.acquisition_id = acquisition_id
        self.device_id = device_id
        self.sensor_id = sensor_id

        if self.type == "UniaxialSample":
            self.x: float = kwargs.get("x")
        elif self.type == "BiaxialSample":
            self.x: float = kwargs.get("x")
            self.y: float = kwargs.get("y")
        elif self.type == "TriaxialSample":
            self.x: float = kwargs.get("x")
            self.y: float = kwargs.get("y")
            self.z: float = kwargs.get("z")
        elif self.type == "QuadriaxialSample":
            self.x: float = kwargs.get("x")
            self.y: float = kwargs.get("y")
            self.z: float = kwargs.get("z")
            self.u: float = kwargs.get("u")
        elif self.type == "QuinqueaxialSample":
            self.x: float = kwargs.get("x")
            self.y: float = kwargs.get("y")
            self.z: float = kwargs.get("z")
            self.u: float = kwargs.get("u")
            self.w: float = kwargs.get("w")
        else:
            raise ValueError

    @staticmethod
    def to_protobuf(obj: "TimeseriesSample") -> TimeseriesMessage:
        """Encode a timeseries sample to a Protobuf message

        Arguments:
            obj {TimeseriesSample} -- timeseries sample to be encoded

        Returns:
            TimeseriesMessage -- encoded timeseries sample
        """
        timeseries_message = TimeseriesMessage()
        timeseries_message.entity.CopyFrom(Entity.to_protobuf(obj))

        timeseries_message.timestamp = obj.timestamp
        timeseries_message.acquisition_id = obj.acquisition_id

        if obj.device_id is not None:
            timeseries_message.device_id = obj.device_id
        if obj.sensor_id is not None:
            timeseries_message.sensor_id = obj.sensor_id

        if obj.type == "UniaxialSample":
            timeseries_message.x = obj.x
        elif obj.type == "BiaxialSample":
            timeseries_message.x = obj.x
            timeseries_message.y = obj.y
        elif obj.type == "TriaxialSample":
            timeseries_message.x = obj.x
            timeseries_message.y = obj.y
            timeseries_message.z = obj.z
        elif obj.type == "QuadriaxialSample":
            timeseries_message.x = obj.x
            timeseries_message.y = obj.y
            timeseries_message.z = obj.z
            timeseries_message.u = obj.u
        elif obj.type == "QuinqueaxialSample":
            timeseries_message.x = obj.x
            timeseries_message.y = obj.y
            timeseries_message.z = obj.z
            timeseries_message.u = obj.u
            timeseries_message.w = obj.w
        else:
            raise ValueError

        return timeseries_message

    @staticmethod
    def from_protobuf(timeseries_message: TimeseriesMessage) -> "TimeseriesSample":
        """Decode a Protobuf message to {TimeseriesSample}

        Arguments:
            obj {TimeseriesMessage} -- message to be decoded

        Returns:
            TimeseriesSample -- decoded timeseries sample
        """
        return TimeseriesSample(
            type=timeseries_message.entity.type,
            id=timeseries_message.entity.id,
            tags=timeseries_message.entity.tags,
            metadata=timeseries_message.entity.metadata,
            timestamp=timeseries_message.timestamp,
            acquisition_id=timeseries_message.acquisition_id,
            device_id=timeseries_message.device_id if timeseries_message.HasField("device_id") else None,
            sensor_id=timeseries_message.sensor_id if timeseries_message.HasField("sensor_id") else None,
            x=timeseries_message.x if timeseries_message.HasField("x") else None,
            y=timeseries_message.y if timeseries_message.HasField("y") else None,
            z=timeseries_message.z if timeseries_message.HasField("z") else None,
            u=timeseries_message.u if timeseries_message.HasField("u") else None,
            w=timeseries_message.w if timeseries_message.HasField("w") else None
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {TimeseriesSample}

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Raises:
            ValueError: unexpected object or sub-object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" in obj and obj["type"].endswith("axialSample"):
            timeseries = partial(
                TimeseriesSample,
                type=obj["type"],
                id=obj["id"],
                tags=obj["tags"],
                metadata=obj["metadata"],
                timestamp=obj["timestamp"],
                acquisition_id=obj["acquisitionId"],
                device_id=obj["deviceId"],
                sensor_id=obj["sensorId"],
            )

            if obj["type"] == "UniaxialSample":
                return timeseries(x=obj["x"])
            elif obj["type"] == "BiaxialSample":
                return timeseries(x=obj["x"], y=obj["y"])
            elif obj["type"] == "TriaxialSample":
                return timeseries(x=obj["x"], y=obj["y"], z=obj["z"])
            elif obj["type"] == "QuadriaxialSample":
                return timeseries(x=obj["x"], y=obj["y"], z=obj["z"], u=obj["u"])
            elif obj["type"] == "QuinqueaxialSample":
                return timeseries(x=obj["x"], y=obj["y"], z=obj["z"], u=obj["u"], w=obj["w"])
            else:
                raise ValueError

        return obj

Ancestors

  • dempy._base.Entity

Static methods

def from_json(obj: Dict[str, str]) -> Any

Parse a JSON dictionary to {TimeseriesSample}

Arguments

obj {Dict[str, str]} – JSON object

Raises

ValueError
unexpected object or sub-object

Returns

Any -- parsed object and sub-objects
 
Expand source code
@staticmethod
def from_json(obj: Dict[str, str]) -> Any:
    """Parse a JSON dictionary to {TimeseriesSample}

    Arguments:
        obj {Dict[str, str]} -- JSON object

    Raises:
        ValueError: unexpected object or sub-object

    Returns:
        Any -- parsed object and sub-objects
    """
    if "type" in obj and obj["type"].endswith("axialSample"):
        timeseries = partial(
            TimeseriesSample,
            type=obj["type"],
            id=obj["id"],
            tags=obj["tags"],
            metadata=obj["metadata"],
            timestamp=obj["timestamp"],
            acquisition_id=obj["acquisitionId"],
            device_id=obj["deviceId"],
            sensor_id=obj["sensorId"],
        )

        if obj["type"] == "UniaxialSample":
            return timeseries(x=obj["x"])
        elif obj["type"] == "BiaxialSample":
            return timeseries(x=obj["x"], y=obj["y"])
        elif obj["type"] == "TriaxialSample":
            return timeseries(x=obj["x"], y=obj["y"], z=obj["z"])
        elif obj["type"] == "QuadriaxialSample":
            return timeseries(x=obj["x"], y=obj["y"], z=obj["z"], u=obj["u"])
        elif obj["type"] == "QuinqueaxialSample":
            return timeseries(x=obj["x"], y=obj["y"], z=obj["z"], u=obj["u"], w=obj["w"])
        else:
            raise ValueError

    return obj
def from_protobuf(timeseries_message: dempy_pb2.TimeseriesSample) -> TimeseriesSample

Decode a Protobuf message to {TimeseriesSample}

Arguments

obj {TimeseriesMessage} – message to be decoded

Returns

TimeseriesSample -- decoded timeseries sample
 
Expand source code
@staticmethod
def from_protobuf(timeseries_message: TimeseriesMessage) -> "TimeseriesSample":
    """Decode a Protobuf message to {TimeseriesSample}

    Arguments:
        obj {TimeseriesMessage} -- message to be decoded

    Returns:
        TimeseriesSample -- decoded timeseries sample
    """
    return TimeseriesSample(
        type=timeseries_message.entity.type,
        id=timeseries_message.entity.id,
        tags=timeseries_message.entity.tags,
        metadata=timeseries_message.entity.metadata,
        timestamp=timeseries_message.timestamp,
        acquisition_id=timeseries_message.acquisition_id,
        device_id=timeseries_message.device_id if timeseries_message.HasField("device_id") else None,
        sensor_id=timeseries_message.sensor_id if timeseries_message.HasField("sensor_id") else None,
        x=timeseries_message.x if timeseries_message.HasField("x") else None,
        y=timeseries_message.y if timeseries_message.HasField("y") else None,
        z=timeseries_message.z if timeseries_message.HasField("z") else None,
        u=timeseries_message.u if timeseries_message.HasField("u") else None,
        w=timeseries_message.w if timeseries_message.HasField("w") else None
    )
def to_protobuf(obj: TimeseriesSample) -> dempy_pb2.TimeseriesSample

Encode a timeseries sample to a Protobuf message

Arguments

obj {TimeseriesSample} – timeseries sample to be encoded

Returns

TimeseriesMessage -- encoded timeseries sample
 
Expand source code
@staticmethod
def to_protobuf(obj: "TimeseriesSample") -> TimeseriesMessage:
    """Encode a timeseries sample to a Protobuf message

    Arguments:
        obj {TimeseriesSample} -- timeseries sample to be encoded

    Returns:
        TimeseriesMessage -- encoded timeseries sample
    """
    timeseries_message = TimeseriesMessage()
    timeseries_message.entity.CopyFrom(Entity.to_protobuf(obj))

    timeseries_message.timestamp = obj.timestamp
    timeseries_message.acquisition_id = obj.acquisition_id

    if obj.device_id is not None:
        timeseries_message.device_id = obj.device_id
    if obj.sensor_id is not None:
        timeseries_message.sensor_id = obj.sensor_id

    if obj.type == "UniaxialSample":
        timeseries_message.x = obj.x
    elif obj.type == "BiaxialSample":
        timeseries_message.x = obj.x
        timeseries_message.y = obj.y
    elif obj.type == "TriaxialSample":
        timeseries_message.x = obj.x
        timeseries_message.y = obj.y
        timeseries_message.z = obj.z
    elif obj.type == "QuadriaxialSample":
        timeseries_message.x = obj.x
        timeseries_message.y = obj.y
        timeseries_message.z = obj.z
        timeseries_message.u = obj.u
    elif obj.type == "QuinqueaxialSample":
        timeseries_message.x = obj.x
        timeseries_message.y = obj.y
        timeseries_message.z = obj.z
        timeseries_message.u = obj.u
        timeseries_message.w = obj.w
    else:
        raise ValueError

    return timeseries_message
class VideoSample (type: str, id: str, tags: List[str], metadata: Dict[str, str], timestamp: int, acquisition_id: str, device_id: str, sensor_id: str, media_type: str, video_source: str)

VideoSample class

Expand source code
class VideoSample(Entity):
    """VideoSample class"""
    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], timestamp: int, acquisition_id: str, device_id: str,
                 sensor_id: str, media_type: str, video_source: str):
        super().__init__(type, id, tags, metadata)
        self.timestamp = timestamp
        self.acquisition_id = acquisition_id
        self.device_id = device_id
        self.sensor_id = sensor_id
        self.media_type = media_type
        self.video_source = video_source

    @staticmethod
    def to_protobuf(obj: "VideoSample") -> VideoMessage:
        """Encode a video sample to a Protobuf message

        Arguments:
            obj {VideoSample} -- video sample to be encoded

        Returns:
            VideoMessage -- encoded video sample
        """
        video_message = VideoMessage()
        video_message.entity.CopyFrom(Entity.to_protobuf(obj))

        video_message.timestamp = obj.timestamp
        video_message.acquisition_id = obj.acquisition_id

        if obj.device_id is not None:
            video_message.device_id = obj.device_id
        if obj.sensor_id is not None:
            video_message.sensor_id = obj.sensor_id

        video_message.media_type = obj.media_type
        video_message.video_source = obj.video_source

        return video_message

    @staticmethod
    def from_protobuf(obj: Union[ByteString, VideoMessage]) -> "VideoSample":
        """Decode a Protobuf message to {VideoSample}

        Arguments:
            obj {Union[ByteString, VideoMessage]} -- message to be decoded

        Returns:
            VideoSample -- decoded video sample
        """
        video_message = obj if isinstance(obj, VideoMessage) else VideoMessage().ParseFromString(obj)

        return VideoSample(
            type=video_message.entity.type,
            id=video_message.entity.id,
            tags=video_message.entity.tags,
            metadata=video_message.entity.metadata,
            timestamp=video_message.timestamp,
            acquisition_id=video_message.acquisition_id,
            device_id=video_message.device_id if video_message.HasField("device_id") else None,
            sensor_id=video_message.sensor_id if video_message.HasField("sensor_id") else None,
            media_type=video_message.media_type,
            video_source=video_message.video_source
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {VideoSample}

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" in obj and obj["type"] == "VideoSample":
            return VideoSample(
                type=obj["type"],
                id=obj["id"],
                tags=obj["tags"],
                metadata=obj["metadata"],
                timestamp=obj["timestamp"],
                acquisition_id=obj["acquisitionId"],
                device_id=obj["deviceId"],
                sensor_id=obj["sensorId"],
                media_type=obj["mediaType"],
                video_source=obj["videoSource"],
            )

        return obj

Ancestors

  • dempy._base.Entity

Static methods

def from_json(obj: Dict[str, str]) -> Any

Parse a JSON dictionary to {VideoSample}

Arguments

obj {Dict[str, str]} – JSON object

Returns

Any -- parsed object and sub-objects
 
Expand source code
@staticmethod
def from_json(obj: Dict[str, str]) -> Any:
    """Parse a JSON dictionary to {VideoSample}

    Arguments:
        obj {Dict[str, str]} -- JSON object

    Returns:
        Any -- parsed object and sub-objects
    """
    if "type" in obj and obj["type"] == "VideoSample":
        return VideoSample(
            type=obj["type"],
            id=obj["id"],
            tags=obj["tags"],
            metadata=obj["metadata"],
            timestamp=obj["timestamp"],
            acquisition_id=obj["acquisitionId"],
            device_id=obj["deviceId"],
            sensor_id=obj["sensorId"],
            media_type=obj["mediaType"],
            video_source=obj["videoSource"],
        )

    return obj
def from_protobuf(obj: Union[ByteString, dempy_pb2.VideoSample]) -> VideoSample

Decode a Protobuf message to {VideoSample}

Arguments

obj {Union[ByteString, VideoMessage]} – message to be decoded

Returns

VideoSample -- decoded video sample
 
Expand source code
@staticmethod
def from_protobuf(obj: Union[ByteString, VideoMessage]) -> "VideoSample":
    """Decode a Protobuf message to {VideoSample}

    Arguments:
        obj {Union[ByteString, VideoMessage]} -- message to be decoded

    Returns:
        VideoSample -- decoded video sample
    """
    video_message = obj if isinstance(obj, VideoMessage) else VideoMessage().ParseFromString(obj)

    return VideoSample(
        type=video_message.entity.type,
        id=video_message.entity.id,
        tags=video_message.entity.tags,
        metadata=video_message.entity.metadata,
        timestamp=video_message.timestamp,
        acquisition_id=video_message.acquisition_id,
        device_id=video_message.device_id if video_message.HasField("device_id") else None,
        sensor_id=video_message.sensor_id if video_message.HasField("sensor_id") else None,
        media_type=video_message.media_type,
        video_source=video_message.video_source
    )
def to_protobuf(obj: VideoSample) -> dempy_pb2.VideoSample

Encode a video sample to a Protobuf message

Arguments

obj {VideoSample} – video sample to be encoded

Returns

VideoMessage -- encoded video sample
 
Expand source code
@staticmethod
def to_protobuf(obj: "VideoSample") -> VideoMessage:
    """Encode a video sample to a Protobuf message

    Arguments:
        obj {VideoSample} -- video sample to be encoded

    Returns:
        VideoMessage -- encoded video sample
    """
    video_message = VideoMessage()
    video_message.entity.CopyFrom(Entity.to_protobuf(obj))

    video_message.timestamp = obj.timestamp
    video_message.acquisition_id = obj.acquisition_id

    if obj.device_id is not None:
        video_message.device_id = obj.device_id
    if obj.sensor_id is not None:
        video_message.sensor_id = obj.sensor_id

    video_message.media_type = obj.media_type
    video_message.video_source = obj.video_source

    return video_message