Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions dagshub/data_engine/annotation/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,24 @@ def to_ls_task(self) -> Optional[bytes]:

def __repr__(self):
return "Label Studio annotations of unrecognized type"


class ErrorMetadataAnnotations(MetadataAnnotations, metaclass=NotImplementedMeta):
    """Placeholder annotation object stored when downloading an annotation blob failed.

    Stays a ``MetadataAnnotations`` subclass on purpose, so logic that checks whether
    an annotation field was already parsed keeps treating it as parsed; any attempt to
    read the payload re-raises the stored download error instead.
    """

    def __init__(self, datapoint: "Datapoint", field: str, error_message: str):
        super().__init__(datapoint, field, None, None, None)
        # Human-readable reason the download failed; surfaced on every value access.
        self._err_msg = error_message

    @property
    def value(self) -> Optional[bytes]:
        """Always raises ``ValueError`` carrying the original download error."""
        raise ValueError(self._err_msg)

    def to_ls_task(self) -> Optional[bytes]:
        """Always raises ``ValueError`` carrying the original download error."""
        raise ValueError(self._err_msg)

    def __repr__(self):
        return f"Label Studio annotation download error: {self._err_msg}"
54 changes: 42 additions & 12 deletions dagshub/data_engine/model/datapoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
from dataclasses import dataclass
from os import PathLike
from pathlib import Path
from typing import Optional, Union, List, Dict, Any, Callable, TYPE_CHECKING, Literal, Sequence
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Optional, Sequence, Union

from tenacity import Retrying, stop_after_attempt, wait_exponential, before_sleep_log, retry_if_exception_type
from tenacity import Retrying, before_sleep_log, retry_if_exception_type, stop_after_attempt, wait_exponential

from dagshub.common.download import download_files
from dagshub.common.helpers import http_request
from dagshub.data_engine.annotation import MetadataAnnotations
from dagshub.data_engine.client.models import MetadataSelectFieldSchema, DatapointHistoryResult
from dagshub.data_engine.client.models import DatapointHistoryResult, MetadataSelectFieldSchema
from dagshub.data_engine.dtypes import MetadataFieldType

if TYPE_CHECKING:
Expand All @@ -25,6 +25,23 @@
logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class BlobHashMetadata:
    """Immutable wrapper marking a metadata value as a not-yet-downloaded blob hash.

    Distinguishes backend blob hashes from plain string metadata so download logic
    can identify them reliably.
    """

    # The hash string used to locate the blob on the backend.
    hash: str

    def __str__(self) -> str:
        # Stringify to the bare hash, e.g. for building URLs or cache paths.
        return self.hash

    def __repr__(self) -> str:
        return f"{type(self).__name__}(hash={self.hash!r})"


class BlobDownloadError(Exception):
    """Raised when downloading a binary blob from the backend fails."""

    def __init__(self, message):
        # Keep the message accessible as an attribute in addition to str(exc).
        self.message = message
        super().__init__(message)


@dataclass
class Datapoint:
datapoint_id: int
Expand Down Expand Up @@ -128,6 +145,7 @@ def from_gql_edge(edge: Dict, datasource: "Datasource", fields: List[MetadataSel

float_fields = {f.name for f in fields if f.valueType == MetadataFieldType.FLOAT}
date_fields = {f.name for f in fields if f.valueType == MetadataFieldType.DATETIME}
blob_fields = {f.name for f in fields if f.valueType == MetadataFieldType.BLOB}

for meta_dict in edge["node"]["metadata"]:
key = meta_dict["key"]
Expand All @@ -138,6 +156,8 @@ def from_gql_edge(edge: Dict, datasource: "Datasource", fields: List[MetadataSel
if key in date_fields:
timezone = meta_dict.get("timeZone")
value = _datetime_from_timestamp(value / 1000, timezone or "+00:00")
elif key in blob_fields and isinstance(value, str):
value = BlobHashMetadata(value)
res.metadata[key] = value
return res

Expand All @@ -164,7 +184,7 @@ def get_blob(self, column: str, cache_on_disk=True, store_value=False) -> bytes:
if type(current_value) is bytes:
# Bytes - it's already there!
return current_value
if isinstance(current_value, Path):
elif isinstance(current_value, Path):
# Path - assume the path exists and is already downloaded,
# because it's unlikely that the user has set it themselves
with current_value.open("rb") as f:
Expand All @@ -173,25 +193,28 @@ def get_blob(self, column: str, cache_on_disk=True, store_value=False) -> bytes:
self.metadata[column] = content
return content

elif type(current_value) is str:
# String - This is probably the hash of the blob, get that from dagshub
blob_url = self.blob_url(current_value)
blob_location = self.blob_cache_location / current_value
elif isinstance(current_value, BlobHashMetadata):
# Blob hash metadata - download blob from DagsHub
blob_url = self.blob_url(current_value.hash)
blob_location = self.blob_cache_location / current_value.hash

# Make sure that the cache location exists
if cache_on_disk:
self.blob_cache_location.mkdir(parents=True, exist_ok=True)

content = _get_blob(blob_url, blob_location, self.datasource.source.repoApi.auth, cache_on_disk, True)
if type(content) is str:
raise RuntimeError(f"Error while downloading blob: {content}")

if store_value:
self.metadata[column] = content
elif cache_on_disk:
self.metadata[column] = blob_location

return content
elif isinstance(current_value, MetadataAnnotations):
ls_task = current_value.to_ls_task()
if ls_task is None:
return b""
return ls_task
else:
raise ValueError(f"Can't extract blob metadata from value {current_value} of type {type(current_value)}")

Expand Down Expand Up @@ -274,10 +297,17 @@ def _get_blob(
"""
Args:
url: url to download the blob from
cache_path: where the cache for the blob is (laods from it if exists, stores there if it doesn't)
cache_path: where the cache for the blob is (loads from it if exists, stores there if it doesn't)
auth: auth to use for getting the blob
cache_on_disk: whether to store the downloaded blob on disk. If False we also turn off the cache checking
return_blob: if True returns the blob of the downloaded data, if False returns the path to the file with it
path_format: if return_blob is False, controls path representation. "path" returns Path, "str" returns str

Returns:
bytes, Path, or str path on success.

Raises:
BlobDownloadError on download failure.
"""
if url is None:
return None
Expand Down Expand Up @@ -313,7 +343,7 @@ def get():
with attempt:
content = get()
except Exception as e:
return f"Error while downloading binary blob: {e}"
raise BlobDownloadError(str(e)) from e

if cache_on_disk:
with cache_path.open("wb") as f:
Expand Down
49 changes: 26 additions & 23 deletions dagshub/data_engine/model/query_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,21 @@
from dagshub.common.rich_util import get_rich_progress
from dagshub.common.util import lazy_load, multi_urljoin
from dagshub.data_engine.annotation import MetadataAnnotations
from dagshub.data_engine.annotation.metadata import UnsupportedMetadataAnnotations
from dagshub.data_engine.annotation.metadata import ErrorMetadataAnnotations, UnsupportedMetadataAnnotations
from dagshub.data_engine.annotation.voxel_conversion import (
add_ls_annotations,
add_voxel_annotations,
)
from dagshub.data_engine.client.loaders.base import DagsHubDataset
from dagshub.data_engine.client.models import DatasourceType, MetadataSelectFieldSchema
from dagshub.data_engine.dtypes import MetadataFieldType
from dagshub.data_engine.model.datapoint import Datapoint, _generated_fields, _get_blob
from dagshub.data_engine.model.datapoint import (
BlobDownloadError,
BlobHashMetadata,
Datapoint,
_generated_fields,
_get_blob,
)
from dagshub.data_engine.model.schema_util import dacite_config
from dagshub.data_engine.voxel_plugin_server.utils import set_voxel_envvars

Expand Down Expand Up @@ -390,10 +396,9 @@ def get_blob_fields(
for dp in self.entries:
for fld in fields:
field_value = dp.metadata.get(fld)
# If field_value is a blob or a path, then ignore, means it's already been downloaded
if not isinstance(field_value, str):
if not isinstance(field_value, BlobHashMetadata):
continue
download_task = (dp, fld, dp.blob_url(field_value), dp.blob_cache_location / field_value)
download_task = (dp, fld, dp.blob_url(field_value.hash), dp.blob_cache_location / field_value.hash)
to_download.append(download_task)

progress = get_rich_progress(rich.progress.MofNCompleteColumn())
Expand All @@ -403,8 +408,6 @@ def get_blob_fields(

def _get_blob_fn(dp: Datapoint, field: str, url: str, blob_path: Path):
blob_or_path = _get_blob(url, blob_path, auth, cache_on_disk, load_into_memory, path_format)
if isinstance(blob_or_path, str) and path_format != "str":
logger.warning(f"Error while downloading blob for field {field} in datapoint {dp.path}:{blob_or_path}")
dp.metadata[field] = blob_or_path

with progress:
Expand All @@ -416,7 +419,7 @@ def _get_blob_fn(dp: Datapoint, field: str, url: str, blob_path: Path):
logger.warning(f"Got exception {type(exc)} while downloading blob: {exc}")
progress.update(task, advance=1)

self._convert_annotation_fields(*fields, load_into_memory=load_into_memory)
self._convert_annotation_fields(*fields)

# Convert any downloaded document fields
document_fields = [f for f in fields if f in self.document_fields]
Expand All @@ -425,18 +428,17 @@ def _get_blob_fn(dp: Datapoint, field: str, url: str, blob_path: Path):
if document_fields:
for dp in self:
for fld in document_fields:
if fld in dp.metadata:
# Defensive check to not mangle annotation fields by accident
if isinstance(dp.metadata[fld], MetadataAnnotations):
continue
# Force load the content into memory, even if load_into_memory was set to False
if not load_into_memory or isinstance(dp.metadata[fld], Path):
dp.metadata[fld] = Path(dp.metadata[fld]).read_bytes()
dp.metadata[fld] = dp.metadata[fld].decode("utf-8")
if fld not in dp.metadata:
continue
try:
content = dp.get_blob(fld)
dp.metadata[fld] = content.decode("utf-8")
except BlobDownloadError as e:
logger.warning(f"Failed to download document field '{fld}' for datapoint '{dp.path}': {e}")

return self

def _convert_annotation_fields(self, *fields, load_into_memory):
def _convert_annotation_fields(self, *fields):
# Convert any downloaded annotation column
annotation_fields = [f for f in fields if f in self.annotation_fields]
if not annotation_fields:
Expand All @@ -448,7 +450,7 @@ def _convert_annotation_fields(self, *fields, load_into_memory):
for dp in self:
for fld in annotation_fields:
metadata_value = dp.metadata.get(fld)
# No value - create ampty annotation container
# No value - create empty annotation container
if metadata_value is None:
dp.metadata[fld] = MetadataAnnotations(datapoint=dp, field=fld)
continue
Expand All @@ -457,16 +459,17 @@ def _convert_annotation_fields(self, *fields, load_into_memory):
continue
# Parse annotation from the content of the field
else:
# Force load the content into memory, even if load_into_memory was set to False
if not load_into_memory or isinstance(dp.metadata[fld], Path):
metadata_value = Path(metadata_value).read_bytes()
try:
annotation_content = dp.get_blob(fld)
dp.metadata[fld] = MetadataAnnotations.from_ls_task(
datapoint=dp, field=fld, ls_task=metadata_value
datapoint=dp, field=fld, ls_task=annotation_content
Comment on lines +463 to +465
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it weird for the dp.get_blob to convert to ls_task (bytes), then reconvert it to MetadataAnnotations here? Or is this for the scenario where dp.get_blob returns bytes directly without going through MetadataAnnotations (loading from disk IIUC)? Feels clumsy, like maybe converting to MetadataAnnotations should happen only here or in dp.get_blob but why in both? In which scenario will elif isinstance(current_value, MetadataAnnotations): in line 213 be true?

Copy link
Member Author

@kbolashev kbolashev Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intentional data flow here is:

  • get blob hashes from backend and make them BlobHashMetadata
  • qr.get_blob_fields is called via autoloading/explicitly by user
  • _get_blob fills in the field's value with, most probably, the raw bytes, but could be other things (e.g. Path if load_into_memory is False)
  • qr._convert_annotation_fields goes over all annotation metadata and tries to convert them
  • During that, in order to get the bytes, dp.get_blob() is called for the field. This ensures that the value in the field is guaranteed to be bytes, and not Path, str, or anything else.

The reason dp.get_blob() converts the annotation to bytes is that it is a publicly exposed function, and it makes sense that calling it on metadata would return the raw LS task's bytes. It doesn't really matter for the main purpose of this PR; I mostly added it as a defensive/interface-consistency check.

This part will not be hit if you, for example, try to load and convert an annotation field a second time, because at that point, the conversion would have already happened, and then this type guard prevents doing a double conversion:

elif isinstance(metadata_value, MetadataAnnotations):
continue

)
except BlobDownloadError as e:
dp.metadata[fld] = ErrorMetadataAnnotations(datapoint=dp, field=fld, error_message=e.message)
bad_annotations[fld].append(dp.path)
except ValidationError:
dp.metadata[fld] = UnsupportedMetadataAnnotations(
datapoint=dp, field=fld, original_value=metadata_value
datapoint=dp, field=fld, original_value=annotation_content
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it dangerous to use annotation_content when it's undefined because an error was thrown from dp.get_blob? Or you think it can only be thrown from from_ls_task? Maybe that means the catches should be separate? Just asking questions

Copy link
Member Author

@kbolashev kbolashev Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ValidationError is only thrown by the Pydantic code, so on from_ls_task, and that means that get_blob() has already succeeded or thrown a different uncaught error (unlikely file not found, for example).

)
bad_annotations[fld].append(dp.path)

Expand Down
62 changes: 49 additions & 13 deletions tests/data_engine/annotation_import/test_annotation_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
from pytest import MonkeyPatch

from dagshub.data_engine.annotation import MetadataAnnotations
from dagshub.data_engine.annotation.metadata import UnsupportedMetadataAnnotations
from dagshub.data_engine.annotation.metadata import ErrorMetadataAnnotations, UnsupportedMetadataAnnotations
from dagshub.data_engine.dtypes import MetadataFieldType, ReservedTags
from dagshub.data_engine.model import query_result
from dagshub.data_engine.model import datapoint, query_result
from dagshub.data_engine.model.datapoint import BlobDownloadError, BlobHashMetadata
from dagshub.data_engine.model.datasource import Datasource
from dagshub.data_engine.model.query_result import QueryResult
from tests.data_engine.util import add_metadata_field
Expand Down Expand Up @@ -61,12 +62,15 @@ def mock_get_blob(*args, **kwargs) -> Union[bytes, PathLike]:
load_into_memory = args[4]
blob_path = _res_folder / f"{blob_hash}.json"

if not blob_path.exists():
raise FileNotFoundError(f"Mock blob file not found: {blob_path}")
if load_into_memory:
return blob_path.read_bytes()
else:
return blob_path
try:
if not blob_path.exists():
raise FileNotFoundError(f"Blob with hash {blob_hash} not found in res folder")
if load_into_memory:
return blob_path.read_bytes()
else:
return blob_path
except Exception as e:
raise BlobDownloadError(str(e)) from e


def _ds_with_annotation(ds: "Datasource", monkeypatch: MonkeyPatch, annotation_hash: str):
Expand All @@ -82,6 +86,7 @@ def _ds_with_annotation(ds: "Datasource", monkeypatch: MonkeyPatch, annotation_h
)

monkeypatch.setattr(query_result, "_get_blob", mock_get_blob)
monkeypatch.setattr(datapoint, "_get_blob", mock_get_blob)

return ds

Expand All @@ -91,11 +96,6 @@ def ds_with_document_annotation(ds, monkeypatch):
yield _ds_with_annotation(ds, monkeypatch, "annotation1")


@pytest.fixture
def ds_with_unsupported_annotation(ds, monkeypatch):
yield _ds_with_annotation(ds, monkeypatch, "audio_annotation")


def test_annotation_with_document_are_parsed_as_annotation(ds_with_document_annotation):
qr = ds_with_document_annotation.all()
_test_annotation(qr)
Expand All @@ -115,6 +115,11 @@ def _test_annotation(qr: QueryResult):
assert isinstance(annotation.annotations[0], IRSegmentationImageAnnotation)


@pytest.fixture
def ds_with_unsupported_annotation(ds, monkeypatch):
yield _ds_with_annotation(ds, monkeypatch, "audio_annotation")


def test_handling_unsupported_annotation(ds_with_unsupported_annotation):
qr = ds_with_unsupported_annotation.all()

Expand All @@ -132,3 +137,34 @@ def test_handling_unsupported_annotation(ds_with_unsupported_annotation):
expected_content = (_res_folder / "audio_annotation.json").read_bytes()
assert annotation.value == expected_content
assert annotation.to_ls_task() == expected_content


@pytest.fixture
def ds_with_nonexistent_annotation(ds, monkeypatch):
yield _ds_with_annotation(ds, monkeypatch, "nonexistent_annotation")


def test_nonexistent_annotation(ds_with_nonexistent_annotation):
    """A failed annotation blob download should yield an ErrorMetadataAnnotations placeholder.

    The fixture points the field at hash "nonexistent_annotation", which the mocked
    _get_blob rejects with a BlobDownloadError, so conversion stores an error object.
    """
    qr = ds_with_nonexistent_annotation.all(load_documents=False, load_annotations=False)
    qr.get_annotations()

    annotation: MetadataAnnotations = qr[0].metadata[_annotation_field_name]

    assert isinstance(annotation, ErrorMetadataAnnotations)
    # Error annotation is still a subclass of regular annotation
    # This is crucial for logic that checks if annotation metadata was parsed already,
    # so if this starts failing, that logic will need to be changed too
    assert isinstance(annotation, MetadataAnnotations)

    # Mutation methods are blocked by the NotImplementedMeta metaclass.
    with pytest.raises(NotImplementedError):
        annotation.add_image_bbox("cat", 0, 0, 10, 10, 1920, 1080)

    # Reading the payload re-raises the stored download error message.
    with pytest.raises(ValueError, match="Blob with hash nonexistent_annotation not found in res folder"):
        _ = annotation.value
    with pytest.raises(ValueError, match="Blob with hash nonexistent_annotation not found in res folder"):
        annotation.to_ls_task()


def test_blob_metadata_is_wrapped_from_backend(ds_with_document_annotation):
    """Blob-typed field values arriving from the backend are wrapped in BlobHashMetadata.

    With autoloading disabled, the raw hash string should already be wrapped by
    Datapoint.from_gql_edge rather than left as a plain str.
    """
    qr = ds_with_document_annotation.all(load_documents=False, load_annotations=False)
    assert isinstance(qr[0].metadata[_annotation_field_name], BlobHashMetadata)