Source code for pyvrs.base

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set, Union

from . import ImageConversion, RecordType

from .record import VRSRecord
from .slice import VRSReaderSlice


class BaseVRSReader(ABC):
    """A Pythonic reader for VRS files.

    Behaves as a filterable list: it has a length (number of records), can be
    indexed to retrieve VRSRecords, and can be iterated over and sliced just
    like a regular Python list. Significant file reads are only done when
    record state is queried, so it remains performant even for larger VRS
    files.

    This class has several child classes; the relationships between them are
    shown below. ::

                                        |-- SyncVRSReader
                        |-- VRSReader --|
                        |               |-- AsyncVRSReader
        BaseVRSReader --|
                        |                       |-- SyncFilteredVRSReader
                        |-- FilteredVRSReader --|
                                                |-- AsyncFilteredVRSReader

    Note:
        - BaseVRSReader: Base abstract class that defines the functions common
          to all child classes.
        - VRSReader: Abstract class that represents the entire file (i.e. the
          file without any filters). The methods in its child classes operate
          on all records. When the user calls the ``filtered_by_fields``
          method, that call creates a FilteredVRSReader that represents a
          slice of the file.
        - FilteredVRSReader: Abstract class that represents a slice of the
          original file (after applying a filter). This class has essentially
          the same methods as VRSReader, but they operate on a subset of the
          file. Note that you can't 're-filter' an already filtered VRSReader.
        - SyncVRSReader: Synchronous version of VRSReader.
        - AsyncVRSReader: Asynchronous version of VRSReader. The only
          differences from SyncVRSReader are that AsyncVRSReader supports
          ``__aiter__`` / ``__anext__`` for async iteration, and that its
          ``__getitem__`` operates asynchronously.
        - SyncFilteredVRSReader: Synchronous version of FilteredVRSReader.
        - AsyncFilteredVRSReader: Asynchronous version of FilteredVRSReader,
          with the same differences as SyncVRSReader vs AsyncVRSReader.
    """

    @abstractmethod
    def __getitem__(self, i: Union[int, slice]) -> Union[VRSRecord, VRSReaderSlice]:
        """Return a single record for an int index, or a slice view for a slice."""
        raise NotImplementedError()

    def __iter__(self) -> Iterator[VRSRecord]:
        """Yield every record in order by indexing ``self[0 .. n_records - 1]``."""
        for i in range(self.n_records):
            next_item = self[i]
            # Integer indexing must produce a single record, never a slice.
            assert isinstance(next_item, VRSRecord)
            yield next_item

    @abstractmethod
    def __len__(self) -> int:
        raise NotImplementedError()

    @abstractmethod
    def __repr__(self) -> str:
        raise NotImplementedError()

    @abstractmethod
    def __str__(self) -> str:
        raise NotImplementedError()

    @property
    @abstractmethod
    def file_tags(self) -> Mapping[str, str]:
        """
        Return a dict of all file tags present in this VRS file.

        Returns:
            Dictionary of all file tags: {<tag>: <value>}
        """
        raise NotImplementedError()

    @property
    @abstractmethod
    def stream_tags(self) -> Mapping[str, Mapping[str, Any]]:
        """
        Return a dict of all per-stream tags present in this VRS file.

        Returns:
            Dictionary of all per-stream tags: {<stream_id>: {<tag>: <value>}}
        """
        raise NotImplementedError()

    @property
    @abstractmethod
    def n_records(self) -> int:
        """Return the number of records in this VRS file."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def record_types(self) -> Set[str]:
        """Return the set of record types in this VRS file."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def stream_ids(self) -> Set[str]:
        """Return the set of stream ids in this VRS file."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def min_timestamp(self) -> float:
        """Return the minimum timestamp of this VRS file."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def max_timestamp(self) -> float:
        """Return the maximum timestamp of this VRS file."""
        raise NotImplementedError()

    @property
    def time_range(self) -> float:
        """Return the timestamp range of this VRS file (max - min)."""
        return self.max_timestamp - self.min_timestamp

    @abstractmethod
    def find_stream(
        self, recordable_type_id: int, tag_name: str, tag_value: str
    ) -> str:
        """
        Find the stream matching a recordable type and tag, and return its stream id.

        Args:
            recordable_type_id: stream_id is `<recordable_type_id>-<instance_id>`
            tag_name: tag name that you are interested in
            tag_value: tag value that you are interested in

        Returns:
            Stream ID that starts with recordable_type_id and has the given tag pair.
        """
        raise NotImplementedError()

    @abstractmethod
    def find_streams(self, recordable_type_id: int, flavor: str = "") -> List[str]:
        """
        Find streams matching a recordable type and flavor, and return their stream ids.

        Args:
            recordable_type_id: stream_id is `<recordable_type_id>-<instance_id>`
            flavor: flavor that you are interested in (defaults to "").

        Returns:
            A list of stream IDs that start with recordable_type_id and have the
            given flavor.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_stream_info(self, stream_id: str) -> Dict[str, str]:
        """
        Get details about a stream.

        Args:
            stream_id: stream_id you are interested in.

        Returns:
            Information about the stream, as a dictionary.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_records_count(self, stream_id: str, record_type: RecordType) -> int:
        """
        Get the number of records for the stream_id & record_type.

        Args:
            stream_id: stream_id you are interested in.
            record_type: record type you are interested in.

        Returns:
            The number of records for the stream_id & record type.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_timestamp_list(self, indices: Optional[List[int]] = None) -> List[float]:
        """
        Get the list of timestamps corresponding to the given indices.

        Args:
            indices: the list of indices whose timestamps we want.

        Returns:
            A list of timestamps corresponding to the indices. If indices is
            None, the full timestamp list is returned.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_timestamp_for_index(self, index: int) -> float:
        """
        Get the timestamp corresponding to the given index.

        Args:
            index: the index of the record.

        Returns:
            The timestamp corresponding to the index.
        """
        raise NotImplementedError()

    @abstractmethod
    def set_image_conversion(self, conversion: ImageConversion) -> None:
        """
        Set the default image conversion policy, and clear any stream-specific setting.

        Args:
            conversion: The image conversion you want to apply to all streams.
        """
        raise NotImplementedError()

    @abstractmethod
    def set_stream_image_conversion(
        self, stream_id: str, conversion: ImageConversion
    ) -> None:
        """
        Set the image conversion policy for a specific stream.

        Args:
            stream_id: The stream_id you want to apply the image conversion to.
            conversion: The image conversion you want to apply to that stream.
        """
        raise NotImplementedError()

    @abstractmethod
    def set_stream_type_image_conversion(
        self, recordable_type_id: str, conversion: ImageConversion
    ) -> int:
        """
        Set the image conversion policy for streams of a specific device type.

        Args:
            recordable_type_id: The recordable_type_id you want to apply the
                image conversion to. If you specify 1000, streams with id
                1000-* are the targets.
            conversion: The image conversion you want to apply to the
                matching streams.

        Returns:
            The number of streams affected.
        """
        raise NotImplementedError()

    @abstractmethod
    def might_contain_images(self, stream_id: str) -> bool:
        """
        Check whether the given stream_id might contain image data.

        Args:
            stream_id: stream_id that you are interested in.

        Returns:
            Based on the config record, whether the stream might contain image data.
        """
        raise NotImplementedError()

    @abstractmethod
    def might_contain_audio(self, stream_id: str) -> bool:
        """
        Check whether the given stream_id might contain audio data.

        Args:
            stream_id: stream_id that you are interested in.

        Returns:
            Based on the config record, whether the stream might contain audio data.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_estimated_frame_rate(self, stream_id: str) -> float:
        """
        Get the estimated frame rate for the given stream_id.

        Args:
            stream_id: stream_id that you are interested in.

        Returns:
            The estimated frame rate.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_record_index_by_time(
        self,
        stream_id: str,
        timestamp: float,
        epsilon: Optional[float] = None,
        record_type: Optional[RecordType] = None,
    ) -> int:
        """
        Get the index of a record by timestamp.

        Args:
            stream_id: stream_id that you are interested in.
            timestamp: timestamp that you are interested in.
            epsilon: Optional argument. If specified, we search for a record in
                the range (timestamp - epsilon) to (timestamp + epsilon) and
                return the nearest record.
            record_type: Optional argument. If specified, we search for a
                record with that record_type.

        Returns:
            The absolute index of the record corresponding to the stream_id &
            timestamp.

        Raises:
            TimestampNotFoundError: If epsilon is not None and no record exists
                within the time range.
            ValueError: If epsilon is None and the record isn't found using
                lower_bound.
        """
        raise NotImplementedError()

    @abstractmethod
    def read_record_by_time(
        self,
        stream_id: str,
        timestamp: float,
        epsilon: Optional[float] = None,
        record_type: Optional[RecordType] = None,
    ) -> VRSRecord:
        """
        Read a record by timestamp.

        Args:
            stream_id: stream_id that you are interested in.
            timestamp: timestamp that you are interested in.
            epsilon: Optional argument. If specified, we search for a record in
                the range (timestamp - epsilon) to (timestamp + epsilon) and
                return the nearest record.
            record_type: Optional argument. If specified, we search for a
                record with that record_type.

        Returns:
            The VRSRecord corresponding to the stream_id & timestamp.

        Raises:
            TimestampNotFoundError: If epsilon is not None and no record exists
                within the time range.
            ValueError: If epsilon is None and the record isn't found using
                lower_bound.
        """
        raise NotImplementedError()

    @abstractmethod
    def read_prev_record(
        self, stream_id: str, record_type: str, index: int
    ) -> Optional[VRSRecord]:
        """
        Read the last record that matches stream_id and record_type and whose
        index is less than or equal to the given index.

        Args:
            stream_id: stream_id that you are interested in.
            record_type: record_type that you are interested in.
            index: the absolute index in the file. Starting from this index,
                search backward for the previous record that matches
                stream_id & record_type.

        Returns:
            The VRSRecord if there is one, otherwise None.
        """
        raise NotImplementedError()

    @abstractmethod
    def read_next_record(
        self, stream_id: str, record_type: str, index: int
    ) -> Optional[VRSRecord]:
        """
        Read the first record that matches stream_id and record_type and whose
        index is greater than or equal to the given index.

        Args:
            stream_id: stream_id that you are interested in.
            record_type: record_type that you are interested in.
            index: the absolute index in the file. Starting from this index,
                search forward for the next record that matches
                stream_id & record_type.

        Returns:
            The VRSRecord if there is one, otherwise None.
        """
        raise NotImplementedError()

    @abstractmethod
    def _read_record(self, indices: List[int], i: Union[int, slice]):
        """Subclass hook: read record(s) at position ``i`` within ``indices``.

        NOTE(review): exact semantics (e.g. whether ``i`` indexes into
        ``indices``) are defined by subclasses; confirm there.
        """
        raise NotImplementedError()

    @abstractmethod
    def _record_count_by_type_from_stream_id(self, stream_id: str) -> Mapping[str, int]:
        """Retrieve the number of messages from a given stream id, grouped by type."""
        raise NotImplementedError()

    @abstractmethod
    def _generate_filtered_indices(self) -> List[int]:
        """Subclass hook: return the list of record indices selected by this reader."""
        raise NotImplementedError()