From 1c208ec085402e30ba868f71b84463ab5f523f02 Mon Sep 17 00:00:00 2001 From: Kirill Bolashev Date: Mon, 16 Feb 2026 16:37:45 +0200 Subject: [PATCH 1/5] Overhaul blob loading, changing how the hash is handled and simplifying the get_blob_fields functions --- dagshub/data_engine/annotation/metadata.py | 21 +++++++ dagshub/data_engine/model/datapoint.py | 51 +++++++++++---- dagshub/data_engine/model/query_result.py | 42 ++++++------- .../test_annotation_parsing.py | 62 +++++++++++++++---- 4 files changed, 130 insertions(+), 46 deletions(-) diff --git a/dagshub/data_engine/annotation/metadata.py b/dagshub/data_engine/annotation/metadata.py index 60e9e645..06f7bc28 100644 --- a/dagshub/data_engine/annotation/metadata.py +++ b/dagshub/data_engine/annotation/metadata.py @@ -337,3 +337,24 @@ def to_ls_task(self) -> Optional[bytes]: def __repr__(self): return "Label Studio annotations of unrecognized type" + + +class ErrorMetadataAnnotations(MetadataAnnotations, metaclass=NotImplementedMeta): + def __init__( + self, + datapoint: "Datapoint", + field: str, + error_message: str, + ): + super().__init__(datapoint, field, None, None, None) + self._error_message = error_message + + @property + def value(self) -> Optional[bytes]: + raise ValueError(self._error_message) + + def to_ls_task(self) -> Optional[bytes]: + raise ValueError(self._error_message) + + def __repr__(self): + return f"Label Studio annotation download error: {self._error_message}" diff --git a/dagshub/data_engine/model/datapoint.py b/dagshub/data_engine/model/datapoint.py index b7aa89b5..66e30eba 100644 --- a/dagshub/data_engine/model/datapoint.py +++ b/dagshub/data_engine/model/datapoint.py @@ -3,14 +3,14 @@ from dataclasses import dataclass from os import PathLike from pathlib import Path -from typing import Optional, Union, List, Dict, Any, Callable, TYPE_CHECKING, Literal, Sequence +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Optional, Sequence, Union -from tenacity import Retrying, stop_after_attempt, wait_exponential, before_sleep_log, retry_if_exception_type +from tenacity import Retrying, before_sleep_log, retry_if_exception_type, stop_after_attempt, wait_exponential from dagshub.common.download import download_files from dagshub.common.helpers import http_request from dagshub.data_engine.annotation import MetadataAnnotations -from dagshub.data_engine.client.models import MetadataSelectFieldSchema, DatapointHistoryResult +from dagshub.data_engine.client.models import DatapointHistoryResult, MetadataSelectFieldSchema from dagshub.data_engine.dtypes import MetadataFieldType if TYPE_CHECKING: @@ -25,6 +25,23 @@ logger = logging.getLogger(__name__) +@dataclass(frozen=True) +class BlobHashMetadata: + hash: str + + def __str__(self) -> str: + return self.hash + + def __repr__(self) -> str: + return f"BlobHashMetadata(hash={self.hash!r})" + + +class BlobDownloadError(Exception): + def __init__(self, message): + super().__init__(message) + self.message = message + + @dataclass class Datapoint: datapoint_id: int @@ -128,6 +145,7 @@ def from_gql_edge(edge: Dict, datasource: "Datasource", fields: List[MetadataSel float_fields = {f.name for f in fields if f.valueType == MetadataFieldType.FLOAT} date_fields = {f.name for f in fields if f.valueType == MetadataFieldType.DATETIME} + blob_fields = {f.name for f in fields if f.valueType == MetadataFieldType.BLOB} for meta_dict in edge["node"]["metadata"]: key = meta_dict["key"] @@ -138,6 +156,8 @@ def from_gql_edge(edge: Dict, datasource: "Datasource", fields: List[MetadataSel if key in date_fields: timezone = meta_dict.get("timeZone") value = _datetime_from_timestamp(value / 1000, timezone or "+00:00") + elif key in blob_fields and isinstance(value, str): + value = BlobHashMetadata(value) res.metadata[key] = value return res @@ -164,7 +184,7 @@ def get_blob(self, column: str, cache_on_disk=True, store_value=False) -> bytes: if type(current_value) is bytes: # Bytes - it's already there! return current_value - if isinstance(current_value, Path): + elif isinstance(current_value, Path): # Path - assume the path exists and is already downloaded, # because it's unlikely that the user has set it themselves with current_value.open("rb") as f: @@ -173,18 +193,16 @@ def get_blob(self, column: str, cache_on_disk=True, store_value=False) -> bytes: self.metadata[column] = content return content - elif type(current_value) is str: - # String - This is probably the hash of the blob, get that from dagshub - blob_url = self.blob_url(current_value) - blob_location = self.blob_cache_location / current_value + elif isinstance(current_value, BlobHashMetadata): + # Blob hash metadata - download blob from DagsHub + blob_url = self.blob_url(current_value.hash) + blob_location = self.blob_cache_location / current_value.hash # Make sure that the cache location exists if cache_on_disk: self.blob_cache_location.mkdir(parents=True, exist_ok=True) content = _get_blob(blob_url, blob_location, self.datasource.source.repoApi.auth, cache_on_disk, True) - if type(content) is str: - raise RuntimeError(f"Error while downloading blob: {content}") if store_value: self.metadata[column] = content @@ -192,6 +210,8 @@ def get_blob(self, column: str, cache_on_disk=True, store_value=False) -> bytes: self.metadata[column] = blob_location return content + elif isinstance(current_value, MetadataAnnotations): + return current_value.to_ls_task() else: raise ValueError(f"Can't extract blob metadata from value {current_value} of type {type(current_value)}") @@ -274,10 +294,17 @@ def _get_blob( """ Args: url: url to download the blob from - cache_path: where the cache for the blob is (laods from it if exists, stores there if it doesn't) + cache_path: where the cache for the blob is (loads from it if exists, stores there if it doesn't) auth: auth to use for getting the blob cache_on_disk: whether to store the downloaded blob on disk. If False we also turn off the cache checking return_blob: if True returns the blob of the downloaded data, if False returns the path to the file with it + path_format: if return_blob is False, controls path representation. "path" returns Path, "str" returns str + + Returns: + bytes, Path, or str path on success. + + Raises: + BlobDownloadError on download failure. """ if url is None: return None @@ -313,7 +340,7 @@ def get(): with attempt: content = get() except Exception as e: - return f"Error while downloading binary blob: {e}" + raise BlobDownloadError(str(e)) from e if cache_on_disk: with cache_path.open("wb") as f: diff --git a/dagshub/data_engine/model/query_result.py b/dagshub/data_engine/model/query_result.py index 2a0e706e..1eaf40c7 100644 --- a/dagshub/data_engine/model/query_result.py +++ b/dagshub/data_engine/model/query_result.py @@ -30,7 +30,7 @@ from dagshub.common.rich_util import get_rich_progress from dagshub.common.util import lazy_load, multi_urljoin from dagshub.data_engine.annotation import MetadataAnnotations -from dagshub.data_engine.annotation.metadata import UnsupportedMetadataAnnotations +from dagshub.data_engine.annotation.metadata import ErrorMetadataAnnotations, UnsupportedMetadataAnnotations from dagshub.data_engine.annotation.voxel_conversion import ( add_ls_annotations, add_voxel_annotations, @@ -38,7 +38,13 @@ from dagshub.data_engine.client.loaders.base import DagsHubDataset from dagshub.data_engine.client.models import DatasourceType, MetadataSelectFieldSchema from dagshub.data_engine.dtypes import MetadataFieldType -from dagshub.data_engine.model.datapoint import Datapoint, _generated_fields, _get_blob +from dagshub.data_engine.model.datapoint import ( + BlobDownloadError, + BlobHashMetadata, + Datapoint, + _generated_fields, + _get_blob, +) from dagshub.data_engine.model.schema_util import dacite_config from dagshub.data_engine.voxel_plugin_server.utils import set_voxel_envvars @@ -390,10 +396,9 @@ def get_blob_fields( for dp in self.entries: for fld in fields: field_value = dp.metadata.get(fld) - # If field_value is a blob or a path, then ignore, means it's already been downloaded - if not isinstance(field_value, str): + if not isinstance(field_value, BlobHashMetadata): continue - download_task = (dp, fld, dp.blob_url(field_value), dp.blob_cache_location / field_value) + download_task = (dp, fld, dp.blob_url(field_value.hash), dp.blob_cache_location / field_value.hash) to_download.append(download_task) progress = get_rich_progress(rich.progress.MofNCompleteColumn()) @@ -403,8 +408,6 @@ def get_blob_fields( def _get_blob_fn(dp: Datapoint, field: str, url: str, blob_path: Path): blob_or_path = _get_blob(url, blob_path, auth, cache_on_disk, load_into_memory, path_format) - if isinstance(blob_or_path, str) and path_format != "str": - logger.warning(f"Error while downloading blob for field {field} in datapoint {dp.path}:{blob_or_path}") dp.metadata[field] = blob_or_path with progress: @@ -416,7 +419,7 @@ def _get_blob_fn(dp: Datapoint, field: str, url: str, blob_path: Path): logger.warning(f"Got exception {type(exc)} while downloading blob: {exc}") progress.update(task, advance=1) - self._convert_annotation_fields(*fields, load_into_memory=load_into_memory) + self._convert_annotation_fields(*fields) # Convert any downloaded document fields document_fields = [f for f in fields if f in self.document_fields] @@ -425,18 +428,14 @@ def _get_blob_fn(dp: Datapoint, field: str, url: str, blob_path: Path): if document_fields: for dp in self: for fld in document_fields: - if fld in dp.metadata: - # Defensive check to not mangle annotation fields by accident - if isinstance(dp.metadata[fld], MetadataAnnotations): - continue - # Force load the content into memory, even if load_into_memory was set to False - if not load_into_memory or isinstance(dp.metadata[fld], Path): - dp.metadata[fld] = Path(dp.metadata[fld]).read_bytes() - dp.metadata[fld] = dp.metadata[fld].decode("utf-8") + if fld not in dp.metadata: + continue + content = dp.get_blob(fld) + dp.metadata[fld] = content.decode("utf-8") return self - def _convert_annotation_fields(self, *fields, load_into_memory): + def _convert_annotation_fields(self, *fields): # Convert any downloaded annotation column annotation_fields = [f for f in fields if f in self.annotation_fields] if not annotation_fields: @@ -457,13 +456,14 @@ def _convert_annotation_fields(self, *fields, load_into_memory): continue # Parse annotation from the content of the field else: - # Force load the content into memory, even if load_into_memory was set to False - if not load_into_memory or isinstance(dp.metadata[fld], Path): - metadata_value = Path(metadata_value).read_bytes() try: + annotation_content = dp.get_blob(fld) dp.metadata[fld] = MetadataAnnotations.from_ls_task( - datapoint=dp, field=fld, ls_task=metadata_value + datapoint=dp, field=fld, ls_task=annotation_content ) + except BlobDownloadError as e: + dp.metadata[fld] = ErrorMetadataAnnotations(datapoint=dp, field=fld, error_message=e.message) + bad_annotations[fld].append(dp.path) except ValidationError: dp.metadata[fld] = UnsupportedMetadataAnnotations( datapoint=dp, field=fld, original_value=metadata_value diff --git a/tests/data_engine/annotation_import/test_annotation_parsing.py b/tests/data_engine/annotation_import/test_annotation_parsing.py index 6fce8d78..1932f556 100644 --- a/tests/data_engine/annotation_import/test_annotation_parsing.py +++ b/tests/data_engine/annotation_import/test_annotation_parsing.py @@ -9,9 +9,10 @@ from pytest import MonkeyPatch from dagshub.data_engine.annotation import MetadataAnnotations -from dagshub.data_engine.annotation.metadata import UnsupportedMetadataAnnotations +from dagshub.data_engine.annotation.metadata import ErrorMetadataAnnotations, UnsupportedMetadataAnnotations from dagshub.data_engine.dtypes import MetadataFieldType, ReservedTags -from dagshub.data_engine.model import query_result +from dagshub.data_engine.model import datapoint, query_result +from dagshub.data_engine.model.datapoint import BlobDownloadError, BlobHashMetadata from dagshub.data_engine.model.datasource import Datasource from dagshub.data_engine.model.query_result import QueryResult from tests.data_engine.util import add_metadata_field @@ -61,12 +62,15 @@ def mock_get_blob(*args, **kwargs) -> Union[bytes, PathLike]: load_into_memory = args[4] blob_path = _res_folder / f"{blob_hash}.json" - if not blob_path.exists(): - raise FileNotFoundError(f"Mock blob file not found: {blob_path}") - if load_into_memory: - return blob_path.read_bytes() - else: - return blob_path + try: + if not blob_path.exists(): + raise FileNotFoundError(f"Blob with hash {blob_hash} not found in res folder") + if load_into_memory: + return blob_path.read_bytes() + else: + return blob_path + except Exception as e: + raise BlobDownloadError(str(e)) from e def _ds_with_annotation(ds: "Datasource", monkeypatch: MonkeyPatch, annotation_hash: str): @@ -82,6 +86,7 @@ def _ds_with_annotation(ds: "Datasource", monkeypatch: MonkeyPatch, annotation_h ) monkeypatch.setattr(query_result, "_get_blob", mock_get_blob) + monkeypatch.setattr(datapoint, "_get_blob", mock_get_blob) return ds @@ -91,11 +96,6 @@ def ds_with_document_annotation(ds, monkeypatch): yield _ds_with_annotation(ds, monkeypatch, "annotation1") -@pytest.fixture -def ds_with_unsupported_annotation(ds, monkeypatch): - yield _ds_with_annotation(ds, monkeypatch, "audio_annotation") - - def test_annotation_with_document_are_parsed_as_annotation(ds_with_document_annotation): qr = ds_with_document_annotation.all() _test_annotation(qr) @@ -115,6 +115,11 @@ def _test_annotation(qr: QueryResult): assert isinstance(annotation.annotations[0], IRSegmentationImageAnnotation) +@pytest.fixture +def ds_with_unsupported_annotation(ds, monkeypatch): + yield _ds_with_annotation(ds, monkeypatch, "audio_annotation") + + def test_handling_unsupported_annotation(ds_with_unsupported_annotation): qr = ds_with_unsupported_annotation.all() @@ -132,3 +137,34 @@ def test_handling_unsupported_annotation(ds_with_unsupported_annotation): expected_content = (_res_folder / "audio_annotation.json").read_bytes() assert annotation.value == expected_content assert annotation.to_ls_task() == expected_content + + +@pytest.fixture +def ds_with_nonexistent_annotation(ds, monkeypatch): + yield _ds_with_annotation(ds, monkeypatch, "nonexistent_annotation") + + +def test_nonexistent_annotation(ds_with_nonexistent_annotation): + qr = ds_with_nonexistent_annotation.all(load_documents=False, load_annotations=False) + qr.get_annotations() + + annotation: MetadataAnnotations = qr[0].metadata[_annotation_field_name] + + assert isinstance(annotation, ErrorMetadataAnnotations) + # Unsupported annotation is still a subclass of regular annotation + # This is crucial for logic that checks if annotation metadata was parsed already, + # so if this starts failing, that logic will need to be changed too + assert isinstance(annotation, MetadataAnnotations) + + with pytest.raises(NotImplementedError): + annotation.add_image_bbox("cat", 0, 0, 10, 10, 1920, 1080) + + with pytest.raises(ValueError, match="Blob with hash nonexistent_annotation not found in res folder"): + _ = annotation.value + with pytest.raises(ValueError, match="Blob with hash nonexistent_annotation not found in res folder"): + annotation.to_ls_task() + + +def test_blob_metadata_is_wrapped_from_backend(ds_with_document_annotation): + qr = ds_with_document_annotation.all(load_documents=False, load_annotations=False) + assert isinstance(qr[0].metadata[_annotation_field_name], BlobHashMetadata) From 81260deb35d66951ba566360be5be8d5ee7e5d75 Mon Sep 17 00:00:00 2001 From: Kirill Bolashev Date: Mon, 16 Feb 2026 16:44:27 +0200 Subject: [PATCH 2/5] Typo --- dagshub/data_engine/model/query_result.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dagshub/data_engine/model/query_result.py b/dagshub/data_engine/model/query_result.py index 1eaf40c7..57b646f2 100644 --- a/dagshub/data_engine/model/query_result.py +++ b/dagshub/data_engine/model/query_result.py @@ -447,7 +447,7 @@ def _convert_annotation_fields(self, *fields): for dp in self: for fld in annotation_fields: metadata_value = dp.metadata.get(fld) - # No value - create ampty annotation container + # No value - create empty annotation container if metadata_value is None: dp.metadata[fld] = MetadataAnnotations(datapoint=dp, field=fld) continue From 342612ba25fb5e9980ef629cc3b9a738252ab18b Mon Sep 17 00:00:00 2001 From: Kirill Bolashev Date: Mon, 16 Feb 2026 16:47:58 +0200 Subject: [PATCH 3/5] Comment logic fix --- tests/data_engine/annotation_import/test_annotation_parsing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/data_engine/annotation_import/test_annotation_parsing.py b/tests/data_engine/annotation_import/test_annotation_parsing.py index 1932f556..c04b0d51 100644 --- a/tests/data_engine/annotation_import/test_annotation_parsing.py +++ b/tests/data_engine/annotation_import/test_annotation_parsing.py @@ -151,7 +151,7 @@ def test_nonexistent_annotation(ds_with_nonexistent_annotation): annotation: MetadataAnnotations = qr[0].metadata[_annotation_field_name] assert isinstance(annotation, ErrorMetadataAnnotations) - # Unsupported annotation is still a subclass of regular annotation + # Error annotation is still a subclass of regular annotation # This is crucial for logic that checks if annotation metadata was parsed already, # so if this starts failing, that logic will need to be changed too assert isinstance(annotation, MetadataAnnotations) From 6aa0ffa9b3f44ea3e556776a9aa66b50476b81f3 Mon Sep 17 00:00:00 2001 From: Kirill Bolashev Date: Mon, 16 Feb 2026 17:12:33 +0200 Subject: [PATCH 4/5] CR fixes --- dagshub/data_engine/model/datapoint.py | 5 ++++- dagshub/data_engine/model/query_result.py | 7 +++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/dagshub/data_engine/model/datapoint.py b/dagshub/data_engine/model/datapoint.py index 66e30eba..f0c31925 100644 --- a/dagshub/data_engine/model/datapoint.py +++ b/dagshub/data_engine/model/datapoint.py @@ -211,7 +211,10 @@ def get_blob(self, column: str, cache_on_disk=True, store_value=False) -> bytes: return content elif isinstance(current_value, MetadataAnnotations): - return current_value.to_ls_task() + ls_task = current_value.to_ls_task() + if ls_task is None: + return b"" + return ls_task else: raise ValueError(f"Can't extract blob metadata from value {current_value} of type {type(current_value)}") diff --git a/dagshub/data_engine/model/query_result.py b/dagshub/data_engine/model/query_result.py index 57b646f2..deaf364c 100644 --- a/dagshub/data_engine/model/query_result.py +++ b/dagshub/data_engine/model/query_result.py @@ -430,8 +430,11 @@ def _get_blob_fn(dp: Datapoint, field: str, url: str, blob_path: Path): for fld in document_fields: if fld not in dp.metadata: continue - content = dp.get_blob(fld) - dp.metadata[fld] = content.decode("utf-8") + try: + content = dp.get_blob(fld) + dp.metadata[fld] = content.decode("utf-8") + except BlobDownloadError as e: + logger.warning(f"Failed to download document field '{fld}' for datapoint '{dp.path}': {e}") return self From 7c8fa55d35b4f378e1a48cd64cd3ba885bd51754 Mon Sep 17 00:00:00 2001 From: Kirill Bolashev Date: Mon, 16 Feb 2026 17:23:22 +0200 Subject: [PATCH 5/5] CR fixes --- dagshub/data_engine/model/query_result.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dagshub/data_engine/model/query_result.py b/dagshub/data_engine/model/query_result.py index deaf364c..b986b5c3 100644 --- a/dagshub/data_engine/model/query_result.py +++ b/dagshub/data_engine/model/query_result.py @@ -469,7 +469,7 @@ def _convert_annotation_fields(self, *fields): bad_annotations[fld].append(dp.path) except ValidationError: dp.metadata[fld] = UnsupportedMetadataAnnotations( - datapoint=dp, field=fld, original_value=metadata_value + datapoint=dp, field=fld, original_value=annotation_content ) bad_annotations[fld].append(dp.path)