Module dempy.acquisitions.acquisition

Expand source code
import os
import platform
import subprocess
from itertools import chain
from typing import Union, List, Dict, Any, Callable, ByteString

import matplotlib as mpt
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

from dempy import cache, _api_calls
from dempy._base import Entity
from dempy._protofiles import AcquisitionMessage
from dempy.acquisitions._utils import SampleList, AnnotationList
from dempy.acquisitions.annotation import Annotation
from dempy.acquisitions.device import Device
from dempy.acquisitions.image_sample import ImageSample
from dempy.acquisitions.sensor import Sensor
from dempy.acquisitions.subject import Subject
from dempy.acquisitions.timeseries_sample import TimeseriesSample
from dempy.acquisitions.video_sample import VideoSample

mpt.use("TkAgg")


class Acquisition(Entity):
    """Acquisition class"""
    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], creation_timestamp: int, sync_offset: int,
                 time_unit: str, owner_id: str, creator_id: str, dataset_id: str, subject: Subject, devices: List[Device],
                 has_timeseries_samples: bool, has_image_samples: bool, has_video_samples: bool):
        super().__init__(type, id, tags, metadata)
        self.creation_timestamp = creation_timestamp
        self.sync_offset = sync_offset
        self.time_unit = time_unit
        self.owner_id = owner_id
        self.creator_id = creator_id
        self.dataset_id = dataset_id
        self.has_timeseries_samples = has_timeseries_samples
        self.has_image_samples = has_image_samples
        self.has_video_samples = has_video_samples
        self._subject = subject
        self._devices = devices

    @property
    def subject(self):    
        """Subject's API"""
        class Inner:
            _SUBJECT_ENDPOINT = _ENDPOINT + "{}/subjects/".format(self.id)

            @staticmethod
            def get() -> Subject:
                """Get subject from acquisition

                Returns:
                    Subject -- subject of the acquisition
                """
                return self._subject

        return Inner()

    @property
    def devices(self):
        """Devices' API"""
        class Inner:
            _DEVICES_ENDPOINT = _ENDPOINT + "{}/devices/".format(self.id)

            @staticmethod
            def get(device_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[Device, List[Device]]:
                """Get a device identified by `device_id` or list of devices on this acquisition

                Keyword Arguments:
                    device_id {str} -- id of the device (default: {None})
                    tags {List[str]} -- tags of the devices (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the devices (default: {{}})

                Raises:
                    IndexError: device identified by `device_id` does not exist in this acquisition

                Returns:
                    Union[Device, List[Device]] -- device or list of devices
                """
                if device_id is None:
                    if len(tags) > 0 or len(metadata) > 0:
                        return [d for d in self._devices if
                                len([k for k in d.metadata if k in metadata and d.metadata[k] == metadata[k]]) > 0]

                    return self._devices
                else:
                    try:
                        device = next((device for device in self._devices if device.id == device_id))
                    except StopIteration:
                        raise IndexError(f"device id {device_id} does not exist in acquisition id {self.id}")
                    return device

            @staticmethod
            def usage() -> Dict[str, List[str]]:
                """Get a map identifying which device(s) and sensor(s) were used to acquire time series samples

                Returns:
                    Mapping[str, List[str]] -- map of (key, value) pairs, with key being id of device and the value
                    a list of sensor ids which were used to capture samples
                """
                return _api_calls.get(Inner._DEVICES_ENDPOINT + "usage").json()

            @staticmethod
            def count() -> int:
                """Get the number of devices on this acquisition

                Returns:
                    int -- number of devices
                """
                return len(self._devices)

        return Inner()

    @property
    def timeseries_samples(self):
        """Timeseries samples' API"""
        class Inner:
            _TIMESERIES_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/timeseries/".format(self.id)

            @staticmethod
            def get(tags: List[str] = [], metadata: Dict[str, str] = {}) -> SampleList:
                """Get all the timeseries samples that belong to this acquisition

                Keyword Arguments:
                    tags {List[str]} -- tags of the timeseries samples (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the timeseries samples (default: {{}})

                Returns:
                    SampleList -- list of timeseries samples
                """
                if len(tags) > 0 or len(metadata) > 0:
                    processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                    samples = _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                        .json(object_hook=TimeseriesSample.from_json)
                    samples.sort(key=lambda sample: sample.timestamp)
                    return SampleList(samples)

                try:
                    samples = cache._get_cached_data("samples/{}/".format(self.id), "timeseries", SampleList.from_protobuf)
                except FileNotFoundError:
                    samples = _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT) \
                        .json(object_hook=TimeseriesSample.from_json)
                    samples.sort(key=lambda sample: sample.timestamp)
                    samples = SampleList(samples)
                    cache._cache_data("samples/{}/".format(self.id), "timeseries", samples, SampleList.to_protobuf)
                return samples

            @staticmethod
            def count() -> int:
                """Get the number of timeseries samples on this acquisition

                Returns:
                    int -- number of timeseries samples
                """
                return _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT + "count").json()

            @staticmethod
            def visualize(device_id: str, sensor_id: str = None) -> None:
                """Graphically visualize the timeseries samples of a device identified by `device_id` 
                or of a given sensor identified by `sensor_id` of said device

                Arguments:
                    device_id {str} -- id of the device

                Keyword Arguments:
                    sensor_id {str} -- id of the sensor (default: {None})
                """
                def visualize_sensor_samples(axis, sensor, sensor_samples):
                    timestamps = [s.timestamp for s in sensor_samples]

                    # Sample x, y, z, u, w
                    samples_x = [s.x for s in sensor_samples if hasattr(s, "x")]
                    samples_y = [s.y for s in sensor_samples if hasattr(s, "y")]
                    samples_z = [s.z for s in sensor_samples if hasattr(s, "z")]
                    samples_u = [s.u for s in sensor_samples if hasattr(s, "u")]
                    samples_w = [s.w for s in sensor_samples if hasattr(s, "w")]

                    # Title and x label
                    axis.set_title(f"{sensor.sensor_type}\n{sensor.id}", loc="left")
                    axis.set_xlabel(sensor.time_unit if sensor.time_unit is not None else device_time_unit)

                    # Axis limit
                    axis.set_xlim([0, timestamps[-1]])
                    axis.set_ylim([min(chain(samples_x, samples_y, samples_z, samples_u, samples_w)),
                                   max(chain(samples_x, samples_y, samples_z, samples_u, samples_w))])

                    # Axis formatter
                    axis.xaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))
                    axis.yaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))

                    # Axis plot
                    labels = []

                    if len(samples_x) > 0:
                        axis.plot(timestamps, samples_x, color="cornflowerblue")
                        labels.append("x")
                    if len(samples_y) > 0:
                        axis.plot(timestamps, samples_y, color="mediumseagreen")
                        labels.append("y")
                    if len(samples_z) > 0:
                        axis.plot(timestamps, samples_z, color="indianred")
                        labels.append("z")
                    if len(samples_u) > 0:
                        axis.plot(timestamps, samples_u, color="mediumorchid")
                        labels.append("u")
                    if len(samples_w) > 0:
                        axis.plot(timestamps, samples_w, color="slategray")
                        labels.append("w")

                    axis.legend(labels=labels, loc="upper right")

                device = self.devices.get(device_id=device_id)
                device_time_unit = device.time_unit if device.time_unit is not None else self.time_unit

                if sensor_id is None:
                    fig, axs = plt.subplots(nrows=device.sensors.count(), figsize=(15, 10), dpi=80, constrained_layout=True)
                    fig.suptitle(f"{device.model_name} ({device.manufacturer})\n{device.id}", wrap=True)

                    device_samples = self.timeseries_samples.get().by_device(device_id=device.id)

                    for i, sensor in enumerate(device.sensors.get()):
                        visualize_sensor_samples(axs[i], sensor, device_samples.by_sensor(sensor.id))
                else:
                    fig, ax = plt.subplots(nrows=1, figsize=(10, 4), dpi=80, constrained_layout=True)
                    fig.suptitle(f"{device.model_name} ({device.manufacturer})\n{device.id}", wrap=True)

                    sensor_samples = self.timeseries_samples.get().by_sensor(sensor_id)

                    visualize_sensor_samples(ax, device.sensors.get(sensor_id=sensor_id), sensor_samples)

                plt.show()

        return Inner()

    @property
    def image_samples(self):
        """Image samples' API"""
        class Inner:
            _IMAGE_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/images/".format(self.id)

            @staticmethod
            def get(sample_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[ImageSample, SampleList]:
                """Get all the image samples that belong to this acquisition

                Keyword Arguments:
                    sample_id {str} -- id of the sample (default: {None})
                    tags {List[str]} -- tags of image samples (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the image samples (default: {{}})

                Returns:
                    Union[ImageSample, SampleList] -- image sample or list of image samples
                """
                if sample_id is None:
                    if len(tags) > 0 or len(metadata) > 0:
                        processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                        samples = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                            .json(object_hook=ImageSample.from_json)
                        return SampleList(samples)

                    samples = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT).json(object_hook=ImageSample.from_json)
                    for sample in samples:
                        cache._cache_data("samples/{}/images/".format(self.id), sample.id, sample, ImageSample.to_protobuf)
                    return SampleList(samples)
                else:
                    try:
                        sample = cache._get_cached_data("samples/{}/images/".format(self.id), sample_id, ImageSample.from_protobuf)
                    except FileNotFoundError:
                        sample = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + sample_id).json(object_hook=ImageSample.from_json)
                        cache._cache_data("samples/{}/images/".format(self.id), sample_id, sample, ImageSample.to_protobuf)
                    return sample

            @staticmethod
            def raw(sample_id: str) -> ByteString:
                """Get actual image from image sample identified by `sample_id` on this acquisition

                Arguments:
                    sample_id {str} -- id of the sample

                Returns:
                    ByteString -- bytes of the image
                """
                try:
                    image = cache._get_cached_data("samples/{}/images/raw/".format(self.id), sample_id)
                except FileNotFoundError:
                    image = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + sample_id + "/raw")
                    file_ext = "." + image.headers["Content-Type"].split("/")[-1]
                    cache._cache_data("samples/{}/images/raw/".format(self.id), sample_id + file_ext, image.content)

                return image

            @staticmethod
            def count() -> int:
                """Get the number of image samples on this acquisition

                Returns:
                    int -- number of image samples
                """
                return _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + "count").json()

            @staticmethod
            def visualize(sample_id: str, backend: Callable[[str], None] = None) -> None:
                """Visualize the image of a sample identified by `sample_id`.
                By default opens the predefined system image application.
                A different callback can be given to open the image.

                Arguments:
                    sample_id {str} -- id of the sample

                Keyword Arguments:
                    backend {Callable[[str], None]} -- backend to open the image with (default: {None})
                """
                self.image_samples.raw(sample_id)
                image_path = cache._build_cache_path("samples/{}/images/raw/".format(self.id), sample_id)
                image_path = cache._add_file_extension(image_path)

                if backend is None:
                    system = platform.system()

                    if system == "Darwin":
                        subprocess.call(("open", image_path))
                    elif system == "Windows":
                        os.startfile(image_path)
                    else:
                        subprocess.call(("xdg-open", image_path))
                else:
                    backend(image_path)

        return Inner()

    @property
    def video_samples(self):
        """Video samples' API"""
        class Inner:
            _VIDEO_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/videos/".format(self.id)

            @staticmethod
            def get(sample_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[VideoSample, SampleList]:
                """Get all the video samples that belong to this acquisition

                Keyword Arguments:
                    sample_id {str} -- id of the sample (default: {None})
                    tags {List[str]} -- tags of image samples (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the image samples (default: {{}})

                Returns:
                    Union[VideoSample, SampleList] -- video sample or list of video samples
                """
                if sample_id is None:
                    if len(tags) > 0 or len(metadata) > 0:
                        processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                        samples = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                            .json(object_hook=VideoSample.from_json)
                        return SampleList(samples)

                    samples = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT).json(object_hook=VideoSample.from_json)
                    for sample in samples:
                        cache._cache_data("samples/{}/videos/".format(self.id), sample.id, sample, VideoSample.to_protobuf)
                    return SampleList(samples)
                else:
                    try:
                        sample = cache._get_cached_data("samples/{}/videos/".format(self.id), sample_id, VideoSample.from_protobuf)
                    except FileNotFoundError:
                        sample = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + sample_id).json(object_hook=VideoSample.from_json)
                        cache._cache_data("samples/{}/videos/".format(self.id), sample_id, sample, VideoSample.to_protobuf)
                    return sample

            @staticmethod
            def raw(sample_id: str) -> ByteString:
                """Get actual video from video sample identified by `sample_id` on this acquisition 

                Arguments:
                    sample_id {str} -- id of the sample

                Returns:
                    ByteString -- bytes of the video
                """
                try:
                    video = cache._get_cached_data("samples/{}/videos/raw/".format(self.id), sample_id)
                except FileNotFoundError:
                    video = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + sample_id + "/raw")
                    file_ext = "." + video.headers["Content-Type"].split("/")[-1]
                    cache._cache_data("samples/{}/videos/raw/".format(self.id), sample_id + file_ext, video.content)

                return video

            @staticmethod
            def count() -> int:
                """Get the number of video samples on this acquisition

                Returns:
                    int -- number of video samples
                """
                return _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + "count").json()

            @staticmethod
            def visualize(sample_id: str, backend: Callable[[str], None] = None) -> None:
                """Visualize the video of a sample identified by `sample_id`.
                By default opens the predefined system video application.
                A different callback can be given to open the video.

                Arguments:
                    sample_id {str} -- id of the sample

                Keyword Arguments:
                    backend {Callable[[str], None]} -- backend to open the video with (default: {None})
                """
                self.video_samples.raw(sample_id)
                video_path = cache._build_cache_path("samples/{}/videos/raw/".format(self.id), sample_id)
                video_path = cache._add_file_extension(video_path)

                if backend is None:
                    system = platform.system()

                    if system == "Darwin":
                        subprocess.call(("open", video_path))
                    elif system == "Windows":
                        os.startfile(video_path)
                    else:
                        subprocess.call(("xdg-open", video_path))
                else:
                    backend(video_path)

        return Inner()

    @property
    def annotations(self):
        """Annotations' API"""
        class Inner:
            _ANNOTATIONS_ENDPOINT = _ENDPOINT + "{}/annotations/".format(self.id)

            @staticmethod
            def get(annotation_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> AnnotationList:
                """Get all the annotations that belong to this acquisition

                Keyword Arguments:
                    annotation_id {str} -- id of the annotation (default: {None})
                    tags {List[str]} -- tags of the annotation (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the annotation (default: {{}})

                Returns:
                    AnnotationList -- annotation or annotation of video samples
                """
                if annotation_id is None:
                    if len(tags) > 0 or len(metadata) > 0:
                        processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                        annotations = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                            .json(object_hook=Annotation.from_json)
                        return AnnotationList(annotations)

                    annotations = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT).json(object_hook=Annotation.from_json)
                    for annotation in annotations:
                        cache._cache_data("annotations", annotation.id, annotation, Annotation.to_protobuf)
                    return AnnotationList(annotations)
                else:
                    try:
                        annotation = cache._get_cached_data("annotations", annotation_id, Annotation.from_protobuf)
                    except FileNotFoundError:
                        annotation = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT + annotation_id).json(object_hook=Annotation.from_json)
                        cache._cache_data("annotations", annotation_id, annotation, Annotation.to_protobuf)
                    return annotation

            @staticmethod
            def count() -> int:
                """Get the number of annotations on this acquisition

                Returns:
                    int -- number of annotations
                """
                return _api_calls.get(Inner._ANNOTATIONS_ENDPOINT + "count").json()

        return Inner()

    @staticmethod
    def to_protobuf(obj: "Acquisition") -> AcquisitionMessage:
        """Encode an acquisition to a Protobuf message

        Arguments:
            obj {Acquisition} -- acquisition to be encoded

        Returns:
            AcquisitionMessage -- encoded acquisition
        """
        acquisition_message = AcquisitionMessage()
        acquisition_message.entity.CopyFrom(Entity.to_protobuf(obj))
        acquisition_message.creation_timestamp = obj.creation_timestamp

        if obj.sync_offset is not None:
            acquisition_message.sync_offset = obj.sync_offset
        if obj.time_unit is not None:
            acquisition_message.time_unit = obj.time_unit
        if obj.owner_id is not None:
            acquisition_message.owner_id = obj.owner_id
        if obj.creator_id is not None:
            acquisition_message.creator_id = obj.creator_id
        if obj.dataset_id is not None:
            acquisition_message.dataset_id = obj.dataset_id

        acquisition_message.subject.CopyFrom(Subject.to_protobuf(obj._subject))
        acquisition_message.devices.extend([Device.to_protobuf(d) for d in obj._devices])
        acquisition_message.has_timeseries_samples = obj.has_timeseries_samples
        acquisition_message.has_image_samples = obj.has_image_samples
        acquisition_message.has_video_samples = obj.has_video_samples

        return acquisition_message

    @staticmethod
    def from_protobuf(obj: ByteString) -> "Acquisition":
        """Decode a Protobuf message to {Acquisition}

        Arguments:
            obj {ByteString} -- message to be decoded

        Returns:
            Acquisition -- decoded acquisition
        """
        acquisition_message = AcquisitionMessage()
        acquisition_message.ParseFromString(obj)

        return Acquisition(
            type=acquisition_message.entity.type,
            id=acquisition_message.entity.id,
            tags=acquisition_message.entity.tags,
            metadata=acquisition_message.entity.metadata,
            creation_timestamp=acquisition_message.creation_timestamp,
            sync_offset=acquisition_message.sync_offset if acquisition_message.HasField("sync_offset") else None,
            time_unit=acquisition_message.time_unit,
            owner_id=acquisition_message.owner_id if acquisition_message.HasField("owner_id") else None,
            creator_id=acquisition_message.creator_id if acquisition_message.HasField("creator_id") else None,
            dataset_id=acquisition_message.dataset_id if acquisition_message.HasField("dataset_id") else None,
            subject=Subject.from_protobuf(acquisition_message.subject),
            devices=[Device.from_protobuf(d) for d in acquisition_message.devices],
            has_timeseries_samples=acquisition_message.has_timeseries_samples,
            has_image_samples=acquisition_message.has_image_samples,
            has_video_samples=acquisition_message.has_video_samples
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {Acquisition}

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Raises:
            ValueError: unexpected object or sub-object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" in obj:
            if obj["type"] == "Acquisition":
                return Acquisition(
                    type=obj["type"],
                    id=obj["id"],
                    tags=obj["tags"],
                    metadata=obj["metadata"],
                    creation_timestamp=obj["creationTimestamp"],
                    sync_offset=obj["syncOffset"],
                    time_unit=obj["timeUnit"],
                    owner_id=obj["ownerId"],
                    creator_id=obj["creatorId"],
                    dataset_id=obj["datasetId"],
                    subject=obj["subject"],
                    devices=obj["devices"],
                    has_timeseries_samples=obj["hasTimeSeriesSamples"],
                    has_image_samples=obj["hasImageSamples"],
                    has_video_samples=obj["hasVideoSamples"]
                )
            elif obj["type"].endswith("Subject"):
                return Subject.from_json(obj)
            elif obj["type"] == "Device":
                return Device.from_json(obj)
            elif obj["type"] == "Sensor":
                return Sensor.from_json(obj)
            else:
                raise ValueError

        return obj


_ENDPOINT = "api/acquisitions/"


def get(acquisition_id: str = None, dataset_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) \
    -> Union[Acquisition, List[Acquisition]]:
    """Get an acquisition identified by `acquisition_id` or a list of all the acquisitions

    Keyword Arguments:
        acquisition_id {str} -- id of the acquisition (default: {None})
        dataset_id {str} -- id of the dataset to which the acquisitions belong to (default: {None})
        tags {List[str]} -- tags of the acquisitions (default: {[]})
        metadata {Dict[str, str]} -- metadata of the acquisitions (default: {{}})

    Returns:
        Union[Acquisition, List[Acquisition]] -- acquisition or list of acquisitions
    """
    if acquisition_id is None:
        processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
        acquisitions = _api_calls.get(_ENDPOINT, params={"datasetId": dataset_id, "tags": tags, **processed_metadata}) \
            .json(object_hook=Acquisition.from_json)
        for acquisition in acquisitions:
            cache._cache_data("acquisitions", acquisition.id, acquisition, Acquisition.to_protobuf)
        return acquisitions
    else:
        try:
            acquisition = cache._get_cached_data("acquisitions", acquisition_id, Acquisition.from_protobuf)
        except FileNotFoundError:
            acquisition = _api_calls.get(_ENDPOINT + acquisition_id).json(object_hook=Acquisition.from_json)
            cache._cache_data("acquisitions", acquisition_id, acquisition, Acquisition.to_protobuf)
        return acquisition


def count() -> int:
    """Get number of acquisitions

    Returns:
        int -- number of acquisitions
    """
    return _api_calls.get(_ENDPOINT + "count").json()


__all__ = [
    "Acquisition",
    "get", "count"
]

Functions

def count() -> int

Get number of acquisitions

Returns

int -- number of acquisitions
 
Expand source code
def count() -> int:
    """Get number of acquisitions

    Returns:
        int -- number of acquisitions
    """
    return _api_calls.get(_ENDPOINT + "count").json()
def get(acquisition_id: str = None, dataset_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[Acquisition, List[Acquisition]]

Get an acquisition identified by acquisition_id or a list of all the acquisitions

Keyword Arguments: acquisition_id {str} – id of the acquisition (default: {None}) dataset_id {str} – id of the dataset to which the acquisitions belong to (default: {None}) tags {List[str]} – tags of the acquisitions (default: {[]}) metadata {Dict[str, str]} – metadata of the acquisitions (default: {{}})

Returns

Union[Acquisition, List[Acquisition]] -- acquisition or list of acquisitions
 
Expand source code
def get(acquisition_id: str = None, dataset_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) \
    -> Union[Acquisition, List[Acquisition]]:
    """Get an acquisition identified by `acquisition_id` or a list of all the acquisitions

    Keyword Arguments:
        acquisition_id {str} -- id of the acquisition (default: {None})
        dataset_id {str} -- id of the dataset to which the acquisitions belong to (default: {None})
        tags {List[str]} -- tags of the acquisitions (default: {[]})
        metadata {Dict[str, str]} -- metadata of the acquisitions (default: {{}})

    Returns:
        Union[Acquisition, List[Acquisition]] -- acquisition or list of acquisitions
    """
    if acquisition_id is None:
        processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
        acquisitions = _api_calls.get(_ENDPOINT, params={"datasetId": dataset_id, "tags": tags, **processed_metadata}) \
            .json(object_hook=Acquisition.from_json)
        for acquisition in acquisitions:
            cache._cache_data("acquisitions", acquisition.id, acquisition, Acquisition.to_protobuf)
        return acquisitions
    else:
        try:
            acquisition = cache._get_cached_data("acquisitions", acquisition_id, Acquisition.from_protobuf)
        except FileNotFoundError:
            acquisition = _api_calls.get(_ENDPOINT + acquisition_id).json(object_hook=Acquisition.from_json)
            cache._cache_data("acquisitions", acquisition_id, acquisition, Acquisition.to_protobuf)
        return acquisition

Classes

class Acquisition (type: str, id: str, tags: List[str], metadata: Dict[str, str], creation_timestamp: int, sync_offset: int, time_unit: str, owner_id: str, creator_id: str, dataset_id: str, subject: Subject, devices: List[Device], has_timeseries_samples: bool, has_image_samples: bool, has_video_samples: bool)

Acquisition class

Expand source code
class Acquisition(Entity):
    """Acquisition class"""
    def __init__(self, type: str, id: str, tags: List[str], metadata: Dict[str, str], creation_timestamp: int, sync_offset: int,
                 time_unit: str, owner_id: str, creator_id: str, dataset_id: str, subject: Subject, devices: List[Device],
                 has_timeseries_samples: bool, has_image_samples: bool, has_video_samples: bool):
        super().__init__(type, id, tags, metadata)
        self.creation_timestamp = creation_timestamp
        self.sync_offset = sync_offset
        self.time_unit = time_unit
        self.owner_id = owner_id
        self.creator_id = creator_id
        self.dataset_id = dataset_id
        self.has_timeseries_samples = has_timeseries_samples
        self.has_image_samples = has_image_samples
        self.has_video_samples = has_video_samples
        self._subject = subject
        self._devices = devices

    @property
    def subject(self):    
        """Subject's API"""
        class Inner:
            _SUBJECT_ENDPOINT = _ENDPOINT + "{}/subjects/".format(self.id)

            @staticmethod
            def get() -> Subject:
                """Get subject from acquisition

                Returns:
                    Subject -- subject of the acquisition
                """
                return self._subject

        return Inner()

    @property
    def devices(self):
        """Devices' API"""
        class Inner:
            _DEVICES_ENDPOINT = _ENDPOINT + "{}/devices/".format(self.id)

            @staticmethod
            def get(device_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[Device, List[Device]]:
                """Get a device identified by `device_id` or list of devices on this acquisition

                Keyword Arguments:
                    device_id {str} -- id of the device (default: {None})
                    tags {List[str]} -- tags of the devices (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the devices (default: {{}})

                Raises:
                    IndexError: device identified by `device_id` does not exist in this acquisition

                Returns:
                    Union[Device, List[Device]] -- device or list of devices
                """
                if device_id is None:
                    if len(tags) > 0 or len(metadata) > 0:
                        return [d for d in self._devices if
                                len([k for k in d.metadata if k in metadata and d.metadata[k] == metadata[k]]) > 0]

                    return self._devices
                else:
                    try:
                        device = next((device for device in self._devices if device.id == device_id))
                    except StopIteration:
                        raise IndexError(f"device id {device_id} does not exist in acquisition id {self.id}")
                    return device

            @staticmethod
            def usage() -> Dict[str, List[str]]:
                """Get a map identifying which device(s) and sensor(s) were used to acquire time series samples

                Returns:
                    Mapping[str, List[str]] -- map of (key, value) pairs, with key being id of device and the value
                    a list of sensor ids which were used to capture samples
                """
                return _api_calls.get(Inner._DEVICES_ENDPOINT + "usage").json()

            @staticmethod
            def count() -> int:
                """Get the number of devices on this acquisition

                Returns:
                    int -- number of devices
                """
                return len(self._devices)

        return Inner()

    @property
    def timeseries_samples(self):
        """Timeseries samples' API"""
        class Inner:
            _TIMESERIES_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/timeseries/".format(self.id)

            @staticmethod
            def get(tags: List[str] = [], metadata: Dict[str, str] = {}) -> SampleList:
                """Get all the timeseries samples that belong to this acquisition

                Keyword Arguments:
                    tags {List[str]} -- tags of the timeseries samples (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the timeseries samples (default: {{}})

                Returns:
                    SampleList -- list of timeseries samples
                """
                if len(tags) > 0 or len(metadata) > 0:
                    processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                    samples = _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                        .json(object_hook=TimeseriesSample.from_json)
                    samples.sort(key=lambda sample: sample.timestamp)
                    return SampleList(samples)

                try:
                    samples = cache._get_cached_data("samples/{}/".format(self.id), "timeseries", SampleList.from_protobuf)
                except FileNotFoundError:
                    samples = _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT) \
                        .json(object_hook=TimeseriesSample.from_json)
                    samples.sort(key=lambda sample: sample.timestamp)
                    samples = SampleList(samples)
                    cache._cache_data("samples/{}/".format(self.id), "timeseries", samples, SampleList.to_protobuf)
                return samples

            @staticmethod
            def count() -> int:
                """Get the number of timeseries samples on this acquisition

                Returns:
                    int -- number of timeseries samples
                """
                return _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT + "count").json()

            @staticmethod
            def visualize(device_id: str, sensor_id: str = None) -> None:
                """Graphically visualize the timeseries samples of a device identified by `device_id` 
                or of a given sensor identified by `sensor_id` of said device

                Arguments:
                    device_id {str} -- id of the device

                Keyword Arguments:
                    sensor_id {str} -- id of the sensor (default: {None})
                """
                def visualize_sensor_samples(axis, sensor, sensor_samples):
                    timestamps = [s.timestamp for s in sensor_samples]

                    # Sample x, y, z, u, w
                    samples_x = [s.x for s in sensor_samples if hasattr(s, "x")]
                    samples_y = [s.y for s in sensor_samples if hasattr(s, "y")]
                    samples_z = [s.z for s in sensor_samples if hasattr(s, "z")]
                    samples_u = [s.u for s in sensor_samples if hasattr(s, "u")]
                    samples_w = [s.w for s in sensor_samples if hasattr(s, "w")]

                    # Title and x label
                    axis.set_title(f"{sensor.sensor_type}\n{sensor.id}", loc="left")
                    axis.set_xlabel(sensor.time_unit if sensor.time_unit is not None else device_time_unit)

                    # Axis limit
                    axis.set_xlim([0, timestamps[-1]])
                    axis.set_ylim([min(chain(samples_x, samples_y, samples_z, samples_u, samples_w)),
                                   max(chain(samples_x, samples_y, samples_z, samples_u, samples_w))])

                    # Axis formatter
                    axis.xaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))
                    axis.yaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))

                    # Axis plot
                    labels = []

                    if len(samples_x) > 0:
                        axis.plot(timestamps, samples_x, color="cornflowerblue")
                        labels.append("x")
                    if len(samples_y) > 0:
                        axis.plot(timestamps, samples_y, color="mediumseagreen")
                        labels.append("y")
                    if len(samples_z) > 0:
                        axis.plot(timestamps, samples_z, color="indianred")
                        labels.append("z")
                    if len(samples_u) > 0:
                        axis.plot(timestamps, samples_u, color="mediumorchid")
                        labels.append("u")
                    if len(samples_w) > 0:
                        axis.plot(timestamps, samples_w, color="slategray")
                        labels.append("w")

                    axis.legend(labels=labels, loc="upper right")

                device = self.devices.get(device_id=device_id)
                device_time_unit = device.time_unit if device.time_unit is not None else self.time_unit

                if sensor_id is None:
                    fig, axs = plt.subplots(nrows=device.sensors.count(), figsize=(15, 10), dpi=80, constrained_layout=True)
                    fig.suptitle(f"{device.model_name} ({device.manufacturer})\n{device.id}", wrap=True)

                    device_samples = self.timeseries_samples.get().by_device(device_id=device.id)

                    for i, sensor in enumerate(device.sensors.get()):
                        visualize_sensor_samples(axs[i], sensor, device_samples.by_sensor(sensor.id))
                else:
                    fig, ax = plt.subplots(nrows=1, figsize=(10, 4), dpi=80, constrained_layout=True)
                    fig.suptitle(f"{device.model_name} ({device.manufacturer})\n{device.id}", wrap=True)

                    sensor_samples = self.timeseries_samples.get().by_sensor(sensor_id)

                    visualize_sensor_samples(ax, device.sensors.get(sensor_id=sensor_id), sensor_samples)

                plt.show()

        return Inner()

    @property
    def image_samples(self):
        """Image samples' API"""
        class Inner:
            _IMAGE_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/images/".format(self.id)

            @staticmethod
            def get(sample_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[ImageSample, SampleList]:
                """Get all the image samples that belong to this acquisition

                Keyword Arguments:
                    sample_id {str} -- id of the sample (default: {None})
                    tags {List[str]} -- tags of image samples (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the image samples (default: {{}})

                Returns:
                    Union[ImageSample, SampleList] -- image sample or list of image samples
                """
                if sample_id is None:
                    if len(tags) > 0 or len(metadata) > 0:
                        processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                        samples = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                            .json(object_hook=ImageSample.from_json)
                        return SampleList(samples)

                    samples = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT).json(object_hook=ImageSample.from_json)
                    for sample in samples:
                        cache._cache_data("samples/{}/images/".format(self.id), sample.id, sample, ImageSample.to_protobuf)
                    return SampleList(samples)
                else:
                    try:
                        sample = cache._get_cached_data("samples/{}/images/".format(self.id), sample_id, ImageSample.from_protobuf)
                    except FileNotFoundError:
                        sample = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + sample_id).json(object_hook=ImageSample.from_json)
                        cache._cache_data("samples/{}/images/".format(self.id), sample_id, sample, ImageSample.to_protobuf)
                    return sample

            @staticmethod
            def raw(sample_id: str) -> ByteString:
                """Get actual image from image sample identified by `sample_id` on this acquisition

                Arguments:
                    sample_id {str} -- id of the sample

                Returns:
                    ByteString -- bytes of the image
                """
                try:
                    image = cache._get_cached_data("samples/{}/images/raw/".format(self.id), sample_id)
                except FileNotFoundError:
                    image = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + sample_id + "/raw")
                    file_ext = "." + image.headers["Content-Type"].split("/")[-1]
                    cache._cache_data("samples/{}/images/raw/".format(self.id), sample_id + file_ext, image.content)

                return image

            @staticmethod
            def count() -> int:
                """Get the number of image samples on this acquisition

                Returns:
                    int -- number of image samples
                """
                return _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + "count").json()

            @staticmethod
            def visualize(sample_id: str, backend: Callable[[str], None] = None) -> None:
                """Visualize the image of a sample identified by `sample_id`.
                By default opens the predefined system image application.
                A different callback can be given to open the image.

                Arguments:
                    sample_id {str} -- id of the sample

                Keyword Arguments:
                    backend {Callable[[str], None]} -- backend to open the image with (default: {None})
                """
                self.image_samples.raw(sample_id)
                image_path = cache._build_cache_path("samples/{}/images/raw/".format(self.id), sample_id)
                image_path = cache._add_file_extension(image_path)

                if backend is None:
                    system = platform.system()

                    if system == "Darwin":
                        subprocess.call(("open", image_path))
                    elif system == "Windows":
                        os.startfile(image_path)
                    else:
                        subprocess.call(("xdg-open", image_path))
                else:
                    backend(image_path)

        return Inner()

    @property
    def video_samples(self):
        """Video samples' API"""
        class Inner:
            _VIDEO_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/videos/".format(self.id)

            @staticmethod
            def get(sample_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[VideoSample, SampleList]:
                """Get all the video samples that belong to this acquisition

                Keyword Arguments:
                    sample_id {str} -- id of the sample (default: {None})
                    tags {List[str]} -- tags of image samples (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the image samples (default: {{}})

                Returns:
                    Union[VideoSample, SampleList] -- video sample or list of video samples
                """
                if sample_id is None:
                    if len(tags) > 0 or len(metadata) > 0:
                        processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                        samples = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                            .json(object_hook=VideoSample.from_json)
                        return SampleList(samples)

                    samples = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT).json(object_hook=VideoSample.from_json)
                    for sample in samples:
                        cache._cache_data("samples/{}/videos/".format(self.id), sample.id, sample, VideoSample.to_protobuf)
                    return SampleList(samples)
                else:
                    try:
                        sample = cache._get_cached_data("samples/{}/videos/".format(self.id), sample_id, VideoSample.from_protobuf)
                    except FileNotFoundError:
                        sample = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + sample_id).json(object_hook=VideoSample.from_json)
                        cache._cache_data("samples/{}/videos/".format(self.id), sample_id, sample, VideoSample.to_protobuf)
                    return sample

            @staticmethod
            def raw(sample_id: str) -> ByteString:
                """Get actual video from video sample identified by `sample_id` on this acquisition 

                Arguments:
                    sample_id {str} -- id of the sample

                Returns:
                    ByteString -- bytes of the video
                """
                try:
                    video = cache._get_cached_data("samples/{}/videos/raw/".format(self.id), sample_id)
                except FileNotFoundError:
                    video = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + sample_id + "/raw")
                    file_ext = "." + video.headers["Content-Type"].split("/")[-1]
                    cache._cache_data("samples/{}/videos/raw/".format(self.id), sample_id + file_ext, video.content)

                return video

            @staticmethod
            def count() -> int:
                """Get the number of video samples on this acquisition

                Returns:
                    int -- number of video samples
                """
                return _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + "count").json()

            @staticmethod
            def visualize(sample_id: str, backend: Callable[[str], None] = None) -> None:
                """Visualize the video of a sample identified by `sample_id`.
                By default opens the predefined system video application.
                A different callback can be given to open the video.

                Arguments:
                    sample_id {str} -- id of the sample

                Keyword Arguments:
                    backend {Callable[[str], None]} -- backend to open the video with (default: {None})
                """
                self.video_samples.raw(sample_id)
                video_path = cache._build_cache_path("samples/{}/videos/raw/".format(self.id), sample_id)
                video_path = cache._add_file_extension(video_path)

                if backend is None:
                    system = platform.system()

                    if system == "Darwin":
                        subprocess.call(("open", video_path))
                    elif system == "Windows":
                        os.startfile(video_path)
                    else:
                        subprocess.call(("xdg-open", video_path))
                else:
                    backend(video_path)

        return Inner()

    @property
    def annotations(self):
        """Annotations' API"""
        class Inner:
            _ANNOTATIONS_ENDPOINT = _ENDPOINT + "{}/annotations/".format(self.id)

            @staticmethod
            def get(annotation_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> AnnotationList:
                """Get all the annotations that belong to this acquisition

                Keyword Arguments:
                    annotation_id {str} -- id of the annotation (default: {None})
                    tags {List[str]} -- tags of the annotation (default: {[]})
                    metadata {Dict[str, str]} -- metadata of the annotation (default: {{}})

                Returns:
                    AnnotationList -- annotation or annotation of video samples
                """
                if annotation_id is None:
                    if len(tags) > 0 or len(metadata) > 0:
                        processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                        annotations = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                            .json(object_hook=Annotation.from_json)
                        return AnnotationList(annotations)

                    annotations = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT).json(object_hook=Annotation.from_json)
                    for annotation in annotations:
                        cache._cache_data("annotations", annotation.id, annotation, Annotation.to_protobuf)
                    return AnnotationList(annotations)
                else:
                    try:
                        annotation = cache._get_cached_data("annotations", annotation_id, Annotation.from_protobuf)
                    except FileNotFoundError:
                        annotation = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT + annotation_id).json(object_hook=Annotation.from_json)
                        cache._cache_data("annotations", annotation_id, annotation, Annotation.to_protobuf)
                    return annotation

            @staticmethod
            def count() -> int:
                """Get the number of annotations on this acquisition

                Returns:
                    int -- number of annotations
                """
                return _api_calls.get(Inner._ANNOTATIONS_ENDPOINT + "count").json()

        return Inner()

    @staticmethod
    def to_protobuf(obj: "Acquisition") -> AcquisitionMessage:
        """Encode an acquisition to a Protobuf message

        Arguments:
            obj {Acquisition} -- acquisition to be encoded

        Returns:
            AcquisitionMessage -- encoded acquisition
        """
        acquisition_message = AcquisitionMessage()
        acquisition_message.entity.CopyFrom(Entity.to_protobuf(obj))
        acquisition_message.creation_timestamp = obj.creation_timestamp

        if obj.sync_offset is not None:
            acquisition_message.sync_offset = obj.sync_offset
        if obj.time_unit is not None:
            acquisition_message.time_unit = obj.time_unit
        if obj.owner_id is not None:
            acquisition_message.owner_id = obj.owner_id
        if obj.creator_id is not None:
            acquisition_message.creator_id = obj.creator_id
        if obj.dataset_id is not None:
            acquisition_message.dataset_id = obj.dataset_id

        acquisition_message.subject.CopyFrom(Subject.to_protobuf(obj._subject))
        acquisition_message.devices.extend([Device.to_protobuf(d) for d in obj._devices])
        acquisition_message.has_timeseries_samples = obj.has_timeseries_samples
        acquisition_message.has_image_samples = obj.has_image_samples
        acquisition_message.has_video_samples = obj.has_video_samples

        return acquisition_message

    @staticmethod
    def from_protobuf(obj: ByteString) -> "Acquisition":
        """Decode a Protobuf message to {Acquisition}

        Arguments:
            obj {ByteString} -- message to be decoded

        Returns:
            Acquisition -- decoded acquisition
        """
        acquisition_message = AcquisitionMessage()
        acquisition_message.ParseFromString(obj)

        return Acquisition(
            type=acquisition_message.entity.type,
            id=acquisition_message.entity.id,
            tags=acquisition_message.entity.tags,
            metadata=acquisition_message.entity.metadata,
            creation_timestamp=acquisition_message.creation_timestamp,
            sync_offset=acquisition_message.sync_offset if acquisition_message.HasField("sync_offset") else None,
            time_unit=acquisition_message.time_unit,
            owner_id=acquisition_message.owner_id if acquisition_message.HasField("owner_id") else None,
            creator_id=acquisition_message.creator_id if acquisition_message.HasField("creator_id") else None,
            dataset_id=acquisition_message.dataset_id if acquisition_message.HasField("dataset_id") else None,
            subject=Subject.from_protobuf(acquisition_message.subject),
            devices=[Device.from_protobuf(d) for d in acquisition_message.devices],
            has_timeseries_samples=acquisition_message.has_timeseries_samples,
            has_image_samples=acquisition_message.has_image_samples,
            has_video_samples=acquisition_message.has_video_samples
        )

    @staticmethod
    def from_json(obj: Dict[str, str]) -> Any:
        """Parse a JSON dictionary to {Acquisition}

        Arguments:
            obj {Dict[str, str]} -- JSON object

        Raises:
            ValueError: unexpected object or sub-object

        Returns:
            Any -- parsed object and sub-objects
        """
        if "type" in obj:
            if obj["type"] == "Acquisition":
                return Acquisition(
                    type=obj["type"],
                    id=obj["id"],
                    tags=obj["tags"],
                    metadata=obj["metadata"],
                    creation_timestamp=obj["creationTimestamp"],
                    sync_offset=obj["syncOffset"],
                    time_unit=obj["timeUnit"],
                    owner_id=obj["ownerId"],
                    creator_id=obj["creatorId"],
                    dataset_id=obj["datasetId"],
                    subject=obj["subject"],
                    devices=obj["devices"],
                    has_timeseries_samples=obj["hasTimeSeriesSamples"],
                    has_image_samples=obj["hasImageSamples"],
                    has_video_samples=obj["hasVideoSamples"]
                )
            elif obj["type"].endswith("Subject"):
                return Subject.from_json(obj)
            elif obj["type"] == "Device":
                return Device.from_json(obj)
            elif obj["type"] == "Sensor":
                return Sensor.from_json(obj)
            else:
                raise ValueError

        return obj

Ancestors

  • dempy._base.Entity

Static methods

def from_json(obj: Dict[str, str]) -> Any

Parse a JSON dictionary to {Acquisition}

Arguments

obj {Dict[str, str]} – JSON object

Raises

ValueError
unexpected object or sub-object

Returns

Any -- parsed object and sub-objects
 
Expand source code
@staticmethod
def from_json(obj: Dict[str, str]) -> Any:
    """Parse a JSON dictionary to {Acquisition}

    Arguments:
        obj {Dict[str, str]} -- JSON object

    Raises:
        ValueError: unexpected object or sub-object

    Returns:
        Any -- parsed object and sub-objects
    """
    if "type" in obj:
        if obj["type"] == "Acquisition":
            return Acquisition(
                type=obj["type"],
                id=obj["id"],
                tags=obj["tags"],
                metadata=obj["metadata"],
                creation_timestamp=obj["creationTimestamp"],
                sync_offset=obj["syncOffset"],
                time_unit=obj["timeUnit"],
                owner_id=obj["ownerId"],
                creator_id=obj["creatorId"],
                dataset_id=obj["datasetId"],
                subject=obj["subject"],
                devices=obj["devices"],
                has_timeseries_samples=obj["hasTimeSeriesSamples"],
                has_image_samples=obj["hasImageSamples"],
                has_video_samples=obj["hasVideoSamples"]
            )
        elif obj["type"].endswith("Subject"):
            return Subject.from_json(obj)
        elif obj["type"] == "Device":
            return Device.from_json(obj)
        elif obj["type"] == "Sensor":
            return Sensor.from_json(obj)
        else:
            raise ValueError

    return obj
def from_protobuf(obj: ByteString) -> Acquisition

Decode a Protobuf message to {Acquisition}

Arguments

obj {ByteString} – message to be decoded

Returns

Acquisition -- decoded acquisition
 
Expand source code
@staticmethod
def from_protobuf(obj: ByteString) -> "Acquisition":
    """Decode a Protobuf message to {Acquisition}

    Arguments:
        obj {ByteString} -- message to be decoded

    Returns:
        Acquisition -- decoded acquisition
    """
    acquisition_message = AcquisitionMessage()
    acquisition_message.ParseFromString(obj)

    return Acquisition(
        type=acquisition_message.entity.type,
        id=acquisition_message.entity.id,
        tags=acquisition_message.entity.tags,
        metadata=acquisition_message.entity.metadata,
        creation_timestamp=acquisition_message.creation_timestamp,
        sync_offset=acquisition_message.sync_offset if acquisition_message.HasField("sync_offset") else None,
        time_unit=acquisition_message.time_unit,
        owner_id=acquisition_message.owner_id if acquisition_message.HasField("owner_id") else None,
        creator_id=acquisition_message.creator_id if acquisition_message.HasField("creator_id") else None,
        dataset_id=acquisition_message.dataset_id if acquisition_message.HasField("dataset_id") else None,
        subject=Subject.from_protobuf(acquisition_message.subject),
        devices=[Device.from_protobuf(d) for d in acquisition_message.devices],
        has_timeseries_samples=acquisition_message.has_timeseries_samples,
        has_image_samples=acquisition_message.has_image_samples,
        has_video_samples=acquisition_message.has_video_samples
    )
def to_protobuf(obj: Acquisition) -> dempy_pb2.Acquisition

Encode an acquisition to a Protobuf message

Arguments

obj {Acquisition} – acquisition to be encoded

Returns

AcquisitionMessage -- encoded acquisition
 
Expand source code
@staticmethod
def to_protobuf(obj: "Acquisition") -> AcquisitionMessage:
    """Encode an acquisition to a Protobuf message

    Arguments:
        obj {Acquisition} -- acquisition to be encoded

    Returns:
        AcquisitionMessage -- encoded acquisition
    """
    acquisition_message = AcquisitionMessage()
    acquisition_message.entity.CopyFrom(Entity.to_protobuf(obj))
    acquisition_message.creation_timestamp = obj.creation_timestamp

    if obj.sync_offset is not None:
        acquisition_message.sync_offset = obj.sync_offset
    if obj.time_unit is not None:
        acquisition_message.time_unit = obj.time_unit
    if obj.owner_id is not None:
        acquisition_message.owner_id = obj.owner_id
    if obj.creator_id is not None:
        acquisition_message.creator_id = obj.creator_id
    if obj.dataset_id is not None:
        acquisition_message.dataset_id = obj.dataset_id

    acquisition_message.subject.CopyFrom(Subject.to_protobuf(obj._subject))
    acquisition_message.devices.extend([Device.to_protobuf(d) for d in obj._devices])
    acquisition_message.has_timeseries_samples = obj.has_timeseries_samples
    acquisition_message.has_image_samples = obj.has_image_samples
    acquisition_message.has_video_samples = obj.has_video_samples

    return acquisition_message

Instance variables

var annotations

Annotations' API

Expand source code
@property
def annotations(self):
    """Annotations' API"""
    class Inner:
        _ANNOTATIONS_ENDPOINT = _ENDPOINT + "{}/annotations/".format(self.id)

        @staticmethod
        def get(annotation_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> AnnotationList:
            """Get all the annotations that belong to this acquisition

            Keyword Arguments:
                annotation_id {str} -- id of the annotation (default: {None})
                tags {List[str]} -- tags of the annotation (default: {[]})
                metadata {Dict[str, str]} -- metadata of the annotation (default: {{}})

            Returns:
                AnnotationList -- annotation or annotation of video samples
            """
            if annotation_id is None:
                if len(tags) > 0 or len(metadata) > 0:
                    processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                    annotations = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                        .json(object_hook=Annotation.from_json)
                    return AnnotationList(annotations)

                annotations = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT).json(object_hook=Annotation.from_json)
                for annotation in annotations:
                    cache._cache_data("annotations", annotation.id, annotation, Annotation.to_protobuf)
                return AnnotationList(annotations)
            else:
                try:
                    annotation = cache._get_cached_data("annotations", annotation_id, Annotation.from_protobuf)
                except FileNotFoundError:
                    annotation = _api_calls.get(Inner._ANNOTATIONS_ENDPOINT + annotation_id).json(object_hook=Annotation.from_json)
                    cache._cache_data("annotations", annotation_id, annotation, Annotation.to_protobuf)
                return annotation

        @staticmethod
        def count() -> int:
            """Get the number of annotations on this acquisition

            Returns:
                int -- number of annotations
            """
            return _api_calls.get(Inner._ANNOTATIONS_ENDPOINT + "count").json()

    return Inner()
var devices

Devices' API

Expand source code
@property
def devices(self):
    """Devices' API"""
    class Inner:
        _DEVICES_ENDPOINT = _ENDPOINT + "{}/devices/".format(self.id)

        @staticmethod
        def get(device_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[Device, List[Device]]:
            """Get a device identified by `device_id` or list of devices on this acquisition

            Keyword Arguments:
                device_id {str} -- id of the device (default: {None})
                tags {List[str]} -- tags of the devices (default: {[]})
                metadata {Dict[str, str]} -- metadata of the devices (default: {{}})

            Raises:
                IndexError: device identified by `device_id` does not exist in this acquisition

            Returns:
                Union[Device, List[Device]] -- device or list of devices
            """
            if device_id is None:
                if len(tags) > 0 or len(metadata) > 0:
                    return [d for d in self._devices if
                            len([k for k in d.metadata if k in metadata and d.metadata[k] == metadata[k]]) > 0]

                return self._devices
            else:
                try:
                    device = next((device for device in self._devices if device.id == device_id))
                except StopIteration:
                    raise IndexError(f"device id {device_id} does not exist in acquisition id {self.id}")
                return device

        @staticmethod
        def usage() -> Dict[str, List[str]]:
            """Get a map identifying which device(s) and sensor(s) were used to acquire time series samples

            Returns:
                Mapping[str, List[str]] -- map of (key, value) pairs, with key being id of device and the value
                a list of sensor ids which were used to capture samples
            """
            return _api_calls.get(Inner._DEVICES_ENDPOINT + "usage").json()

        @staticmethod
        def count() -> int:
            """Get the number of devices on this acquisition

            Returns:
                int -- number of devices
            """
            return len(self._devices)

    return Inner()
var image_samples

Image samples' API

Expand source code
@property
def image_samples(self):
    """Image samples' API"""
    class Inner:
        _IMAGE_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/images/".format(self.id)

        @staticmethod
        def get(sample_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[ImageSample, SampleList]:
            """Get all the image samples that belong to this acquisition

            Keyword Arguments:
                sample_id {str} -- id of the sample (default: {None})
                tags {List[str]} -- tags of image samples (default: {[]})
                metadata {Dict[str, str]} -- metadata of the image samples (default: {{}})

            Returns:
                Union[ImageSample, SampleList] -- image sample or list of image samples
            """
            if sample_id is None:
                if len(tags) > 0 or len(metadata) > 0:
                    processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                    samples = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                        .json(object_hook=ImageSample.from_json)
                    return SampleList(samples)

                samples = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT).json(object_hook=ImageSample.from_json)
                for sample in samples:
                    cache._cache_data("samples/{}/images/".format(self.id), sample.id, sample, ImageSample.to_protobuf)
                return SampleList(samples)
            else:
                try:
                    sample = cache._get_cached_data("samples/{}/images/".format(self.id), sample_id, ImageSample.from_protobuf)
                except FileNotFoundError:
                    sample = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + sample_id).json(object_hook=ImageSample.from_json)
                    cache._cache_data("samples/{}/images/".format(self.id), sample_id, sample, ImageSample.to_protobuf)
                return sample

        @staticmethod
        def raw(sample_id: str) -> ByteString:
            """Get actual image from image sample identified by `sample_id` on this acquisition

            Arguments:
                sample_id {str} -- id of the sample

            Returns:
                ByteString -- bytes of the image
            """
            try:
                image = cache._get_cached_data("samples/{}/images/raw/".format(self.id), sample_id)
            except FileNotFoundError:
                image = _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + sample_id + "/raw")
                file_ext = "." + image.headers["Content-Type"].split("/")[-1]
                cache._cache_data("samples/{}/images/raw/".format(self.id), sample_id + file_ext, image.content)

            return image

        @staticmethod
        def count() -> int:
            """Get the number of image samples on this acquisition

            Returns:
                int -- number of image samples
            """
            return _api_calls.get(Inner._IMAGE_SAMPLE_ENDPOINT + "count").json()

        @staticmethod
        def visualize(sample_id: str, backend: Callable[[str], None] = None) -> None:
            """Visualize the image of a sample identified by `sample_id`.
            By default opens the predefined system image application.
            A different callback can be given to open the image.

            Arguments:
                sample_id {str} -- id of the sample

            Keyword Arguments:
                backend {Callable[[str], None]} -- backend to open the image with (default: {None})
            """
            self.image_samples.raw(sample_id)
            image_path = cache._build_cache_path("samples/{}/images/raw/".format(self.id), sample_id)
            image_path = cache._add_file_extension(image_path)

            if backend is None:
                system = platform.system()

                if system == "Darwin":
                    subprocess.call(("open", image_path))
                elif system == "Windows":
                    os.startfile(image_path)
                else:
                    subprocess.call(("xdg-open", image_path))
            else:
                backend(image_path)

    return Inner()
var subject

Subject's API

Expand source code
@property
def subject(self):    
    """Subject's API"""
    class Inner:
        _SUBJECT_ENDPOINT = _ENDPOINT + "{}/subjects/".format(self.id)

        @staticmethod
        def get() -> Subject:
            """Get subject from acquisition

            Returns:
                Subject -- subject of the acquisition
            """
            return self._subject

    return Inner()
var timeseries_samples

Timeseries samples' API

Expand source code
@property
def timeseries_samples(self):
    """Timeseries samples' API"""
    class Inner:
        _TIMESERIES_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/timeseries/".format(self.id)

        @staticmethod
        def get(tags: List[str] = [], metadata: Dict[str, str] = {}) -> SampleList:
            """Get all the timeseries samples that belong to this acquisition

            Keyword Arguments:
                tags {List[str]} -- tags of the timeseries samples (default: {[]})
                metadata {Dict[str, str]} -- metadata of the timeseries samples (default: {{}})

            Returns:
                SampleList -- list of timeseries samples
            """
            if len(tags) > 0 or len(metadata) > 0:
                processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                samples = _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                    .json(object_hook=TimeseriesSample.from_json)
                samples.sort(key=lambda sample: sample.timestamp)
                return SampleList(samples)

            try:
                samples = cache._get_cached_data("samples/{}/".format(self.id), "timeseries", SampleList.from_protobuf)
            except FileNotFoundError:
                samples = _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT) \
                    .json(object_hook=TimeseriesSample.from_json)
                samples.sort(key=lambda sample: sample.timestamp)
                samples = SampleList(samples)
                cache._cache_data("samples/{}/".format(self.id), "timeseries", samples, SampleList.to_protobuf)
            return samples

        @staticmethod
        def count() -> int:
            """Get the number of timeseries samples on this acquisition

            Returns:
                int -- number of timeseries samples
            """
            return _api_calls.get(Inner._TIMESERIES_SAMPLE_ENDPOINT + "count").json()

        @staticmethod
        def visualize(device_id: str, sensor_id: str = None) -> None:
            """Graphically visualize the timeseries samples of a device identified by `device_id` 
            or of a given sensor identified by `sensor_id` of said device

            Arguments:
                device_id {str} -- id of the device

            Keyword Arguments:
                sensor_id {str} -- id of the sensor (default: {None})
            """
            def visualize_sensor_samples(axis, sensor, sensor_samples):
                timestamps = [s.timestamp for s in sensor_samples]

                # Sample x, y, z, u, w
                samples_x = [s.x for s in sensor_samples if hasattr(s, "x")]
                samples_y = [s.y for s in sensor_samples if hasattr(s, "y")]
                samples_z = [s.z for s in sensor_samples if hasattr(s, "z")]
                samples_u = [s.u for s in sensor_samples if hasattr(s, "u")]
                samples_w = [s.w for s in sensor_samples if hasattr(s, "w")]

                # Title and x label
                axis.set_title(f"{sensor.sensor_type}\n{sensor.id}", loc="left")
                axis.set_xlabel(sensor.time_unit if sensor.time_unit is not None else device_time_unit)

                # Axis limit
                axis.set_xlim([0, timestamps[-1]])
                axis.set_ylim([min(chain(samples_x, samples_y, samples_z, samples_u, samples_w)),
                               max(chain(samples_x, samples_y, samples_z, samples_u, samples_w))])

                # Axis formatter
                axis.xaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))
                axis.yaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))

                # Axis plot
                labels = []

                if len(samples_x) > 0:
                    axis.plot(timestamps, samples_x, color="cornflowerblue")
                    labels.append("x")
                if len(samples_y) > 0:
                    axis.plot(timestamps, samples_y, color="mediumseagreen")
                    labels.append("y")
                if len(samples_z) > 0:
                    axis.plot(timestamps, samples_z, color="indianred")
                    labels.append("z")
                if len(samples_u) > 0:
                    axis.plot(timestamps, samples_u, color="mediumorchid")
                    labels.append("u")
                if len(samples_w) > 0:
                    axis.plot(timestamps, samples_w, color="slategray")
                    labels.append("w")

                axis.legend(labels=labels, loc="upper right")

            device = self.devices.get(device_id=device_id)
            device_time_unit = device.time_unit if device.time_unit is not None else self.time_unit

            if sensor_id is None:
                fig, axs = plt.subplots(nrows=device.sensors.count(), figsize=(15, 10), dpi=80, constrained_layout=True)
                fig.suptitle(f"{device.model_name} ({device.manufacturer})\n{device.id}", wrap=True)

                device_samples = self.timeseries_samples.get().by_device(device_id=device.id)

                for i, sensor in enumerate(device.sensors.get()):
                    visualize_sensor_samples(axs[i], sensor, device_samples.by_sensor(sensor.id))
            else:
                fig, ax = plt.subplots(nrows=1, figsize=(10, 4), dpi=80, constrained_layout=True)
                fig.suptitle(f"{device.model_name} ({device.manufacturer})\n{device.id}", wrap=True)

                sensor_samples = self.timeseries_samples.get().by_sensor(sensor_id)

                visualize_sensor_samples(ax, device.sensors.get(sensor_id=sensor_id), sensor_samples)

            plt.show()

    return Inner()
var video_samples

Video samples' API

Expand source code
@property
def video_samples(self):
    """Video samples' API"""
    class Inner:
        _VIDEO_SAMPLE_ENDPOINT = _ENDPOINT + "{}/samples/videos/".format(self.id)

        @staticmethod
        def get(sample_id: str = None, tags: List[str] = [], metadata: Dict[str, str] = {}) -> Union[VideoSample, SampleList]:
            """Get all the video samples that belong to this acquisition

            Keyword Arguments:
                sample_id {str} -- id of the sample (default: {None})
                tags {List[str]} -- tags of image samples (default: {[]})
                metadata {Dict[str, str]} -- metadata of the image samples (default: {{}})

            Returns:
                Union[VideoSample, SampleList] -- video sample or list of video samples
            """
            if sample_id is None:
                if len(tags) > 0 or len(metadata) > 0:
                    processed_metadata = {f"metadata.{k}": metadata[k] for k in metadata}
                    samples = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT, params={"tags": tags, **processed_metadata}) \
                        .json(object_hook=VideoSample.from_json)
                    return SampleList(samples)

                samples = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT).json(object_hook=VideoSample.from_json)
                for sample in samples:
                    cache._cache_data("samples/{}/videos/".format(self.id), sample.id, sample, VideoSample.to_protobuf)
                return SampleList(samples)
            else:
                try:
                    sample = cache._get_cached_data("samples/{}/videos/".format(self.id), sample_id, VideoSample.from_protobuf)
                except FileNotFoundError:
                    sample = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + sample_id).json(object_hook=VideoSample.from_json)
                    cache._cache_data("samples/{}/videos/".format(self.id), sample_id, sample, VideoSample.to_protobuf)
                return sample

        @staticmethod
        def raw(sample_id: str) -> ByteString:
            """Get actual video from video sample identified by `sample_id` on this acquisition 

            Arguments:
                sample_id {str} -- id of the sample

            Returns:
                ByteString -- bytes of the video
            """
            try:
                video = cache._get_cached_data("samples/{}/videos/raw/".format(self.id), sample_id)
            except FileNotFoundError:
                video = _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + sample_id + "/raw")
                file_ext = "." + video.headers["Content-Type"].split("/")[-1]
                cache._cache_data("samples/{}/videos/raw/".format(self.id), sample_id + file_ext, video.content)

            return video

        @staticmethod
        def count() -> int:
            """Get the number of video samples on this acquisition

            Returns:
                int -- number of video samples
            """
            return _api_calls.get(Inner._VIDEO_SAMPLE_ENDPOINT + "count").json()

        @staticmethod
        def visualize(sample_id: str, backend: Callable[[str], None] = None) -> None:
            """Visualize the video of a sample identified by `sample_id`.
            By default opens the predefined system video application.
            A different callback can be given to open the video.

            Arguments:
                sample_id {str} -- id of the sample

            Keyword Arguments:
                backend {Callable[[str], None]} -- backend to open the video with (default: {None})
            """
            self.video_samples.raw(sample_id)
            video_path = cache._build_cache_path("samples/{}/videos/raw/".format(self.id), sample_id)
            video_path = cache._add_file_extension(video_path)

            if backend is None:
                system = platform.system()

                if system == "Darwin":
                    subprocess.call(("open", video_path))
                elif system == "Windows":
                    os.startfile(video_path)
                else:
                    subprocess.call(("xdg-open", video_path))
            else:
                backend(video_path)

    return Inner()