Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
f8d9b28
switch from property to method
valeriupredoi Dec 16, 2025
59d54b3
start fixing tests
valeriupredoi Dec 16, 2025
554fe6f
more test fixing
valeriupredoi Dec 16, 2025
8c82b69
more test fixing
valeriupredoi Dec 16, 2025
934b013
final test fixing
valeriupredoi Dec 16, 2025
1f47bcb
real s3 axis test
valeriupredoi Dec 16, 2025
b88a552
turn on axis
valeriupredoi Dec 16, 2025
23acfde
add axis to mock expected
valeriupredoi Dec 16, 2025
29fcaee
fix bug
valeriupredoi Dec 16, 2025
7298051
turn on some screen printing
valeriupredoi Dec 16, 2025
5ef6146
fix test
valeriupredoi Dec 16, 2025
f7f6949
final version of not working Reductionist test
valeriupredoi Dec 16, 2025
72f12ac
too many blank lines
valeriupredoi Dec 16, 2025
622f492
Merge branch 'main' into axis_api
valeriupredoi Jan 6, 2026
be78e13
Merge branch 'main' into axis_api
valeriupredoi Jan 20, 2026
d87005a
Change the Reductionist API to return a JSON payload, this removes all…
maxstack Jan 21, 2026
0a66655
Merge branch 'main' into axis_api
valeriupredoi Jan 21, 2026
b2e6e49
add json import
valeriupredoi Jan 21, 2026
6857d22
add some prints for response and sizeof
valeriupredoi Jan 21, 2026
432fc4f
Use response.json() as Reductionist should return application/json
maxstack Jan 21, 2026
e371cee
Fix Reductionist unit tests
maxstack Jan 22, 2026
88fa26f
add small file test
valeriupredoi Jan 23, 2026
4b4f3ad
toss a print statement
valeriupredoi Jan 23, 2026
dc41f7b
add validation vs numpy
valeriupredoi Jan 23, 2026
ae71547
betterify test
valeriupredoi Jan 23, 2026
4cdb30f
skip tests that chuck in too much memory from stuffy Reductionst resp…
valeriupredoi Jan 26, 2026
cab6c0e
add tests
valeriupredoi Feb 2, 2026
59728e6
add some prints
valeriupredoi Feb 2, 2026
a3a7439
add a print
valeriupredoi Feb 3, 2026
8bdcdcf
Reductionist API updates:
maxstack Feb 3, 2026
3864de0
add cbor2 to deps lists
valeriupredoi Feb 4, 2026
b5b0df3
fix tests
valeriupredoi Feb 4, 2026
d33cefb
remove print
valeriupredoi Feb 4, 2026
bd6b02a
remove print
valeriupredoi Feb 4, 2026
e0c54e0
add todos in test module
valeriupredoi Feb 4, 2026
fc3e628
Reductionist API change moving from S3 centric fields to more generic…
maxstack Feb 6, 2026
f6045ce
Merge branch 'main' into axis_api
valeriupredoi Feb 11, 2026
6d09d59
Enable http/https interface_type with Reductionist.
maxstack Feb 13, 2026
a861fbc
correct handling of load from http
valeriupredoi Feb 13, 2026
34a889c
make test without pytest raiser
valeriupredoi Feb 13, 2026
0c38b26
Just use the URL when talking to http(s) storage backend.
maxstack Feb 13, 2026
2030d8d
Change test storage_type to https
maxstack Feb 13, 2026
a77b286
Set active_storage_url in failing Reductionist test and remove storag…
maxstack Feb 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 80 additions & 51 deletions activestorage/active.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pathlib import Path
from typing import Optional

import aiohttp
import fsspec
import numpy as np
import pyfive
Expand Down Expand Up @@ -83,14 +84,50 @@ def load_from_s3(uri, storage_options=None):
return ds


def load_from_https(uri):
def get_endpoint_url(storage_options):
    """
    Return the endpoint_url defined in storage_options, or `None` if not defined.

    Looks first at the top-level ``endpoint_url`` key, then falls back to
    ``client_kwargs["endpoint_url"]`` (the fsspec/s3fs convention).
    """
    if storage_options is None:
        return None
    top_level = storage_options.get('endpoint_url')
    if top_level is not None:
        return top_level
    # fsspec-style nesting: {'client_kwargs': {'endpoint_url': ...}}
    client_kwargs = storage_options.get('client_kwargs') or {}
    return client_kwargs.get('endpoint_url')


def load_from_https(uri, storage_options=None):
    """
    Load a pyfive.high_level.Dataset from a
    netCDF4 file on an https server (NGINX).

    :param uri: URL of the remote netCDF4 file.
    :param storage_options: optional dict; when it contains truthy
        ``"username"`` and ``"password"`` entries they are used for HTTP
        basic auth, otherwise the file is fetched unauthenticated.
    :returns: an open ``pyfive.File`` backed by the remote file object.
    """
    # TODO need to test if NGINX server behind https://
    # Build the auth object once; both branches previously duplicated the
    # filesystem construction and file open, differing only in `auth`.
    auth = None
    if storage_options is not None:
        username = storage_options.get("username", None)
        password = storage_options.get("password", None)
        if username and password:
            auth = aiohttp.BasicAuth(username, password)

    # This works for both http and https endpoints
    fs = fsspec.filesystem('http', auth=auth)
    http_file = fs.open(uri, 'rb')

    ds = pyfive.File(http_file)
    print(f"Dataset loaded from https with Pyfive: {uri}")
    return ds
Expand Down Expand Up @@ -272,9 +309,10 @@ def __load_nc_file(self):
elif self.storage_type == "s3":
nc = load_from_s3(self.uri, self.storage_options)
elif self.storage_type == "https":
nc = load_from_https(self.uri)
nc = load_from_https(self.uri, self.storage_options)
self.filename = self.uri
self.ds = nc[ncvar]
print("Loaded dataset", self.ds)

def __get_missing_attributes(self):
if self.ds is None:
Expand Down Expand Up @@ -365,19 +403,22 @@ def method(self, value):

self._method = value

def mean(self, axis=None):
    """Set the active reduction method to "mean".

    Replaces the old ``@property`` form: the stale decorator left in the
    diff would break the ``active.mean(...)`` call syntax, so it is removed.

    :param axis: optional axis (or axes) to reduce over; stored on the
        instance for the later reduction request when given.
    :returns: self, so the result can be sliced directly (fluent API).
    """
    self._method = "mean"
    if axis is not None:
        self._axis = axis
    return self

def min(self, axis=None):
    """Set the active reduction method to "min".

    Replaces the old ``@property`` form: the stale decorator left in the
    diff would break the ``active.min(...)`` call syntax, so it is removed.

    :param axis: optional axis (or axes) to reduce over; stored on the
        instance for the later reduction request when given.
    :returns: self, so the result can be sliced directly (fluent API).
    """
    self._method = "min"
    if axis is not None:
        self._axis = axis
    return self

def max(self, axis=None):
    """Set the active reduction method to "max".

    Replaces the old ``@property`` form: the stale decorator left in the
    diff would break the ``active.max(...)`` call syntax, so it is removed.

    :param axis: optional axis (or axes) to reduce over; stored on the
        instance for the later reduction request when given.
    :returns: self, so the result can be sliced directly (fluent API).
    """
    self._method = "max"
    if axis is not None:
        self._axis = axis
    return self

@property
Expand Down Expand Up @@ -498,6 +539,15 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype,
session = reductionist.get_session(S3_ACCESS_KEY,
S3_SECRET_KEY,
S3_ACTIVE_STORAGE_CACERT)
elif self.storage_type == "https" and self._version == 2:
username, password = None, None
if self.storage_options is not None:
username = self.storage_options.get("username", None)
password = self.storage_options.get("password", None)
if username and password:
session = reductionist.get_session(username, password, None)
else:
session = reductionist.get_session(None, None, None)
else:
session = None

Expand Down Expand Up @@ -585,16 +635,10 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype,

def _get_endpoint_url(self):
    """Return the endpoint_url of an S3 object store, or `None`.

    Delegates the storage_options lookup to the module-level
    ``get_endpoint_url`` helper (the stale inline lookup lines from the
    diff are dropped); falls back to deriving a URL from the netloc of
    ``self.filename`` when no endpoint is configured.
    """
    endpoint_url = get_endpoint_url(self.storage_options)
    if endpoint_url is not None:
        return endpoint_url

    # No configured endpoint: synthesize one from the file's own host part.
    return f"http://{urllib.parse.urlparse(self.filename).netloc}"

def _process_chunk(self,
Expand Down Expand Up @@ -659,9 +703,7 @@ def _process_chunk(self,
# Reductionist returns "count" as a list even for single elements
tmp, count = reductionist.reduce_chunk(session,
S3_ACTIVE_STORAGE_URL,
S3_URL,
bucket,
object,
f"{S3_URL}/{bucket}/{object}",
offset,
size,
compressor,
Expand All @@ -687,9 +729,7 @@ def _process_chunk(self,
tmp, count = reductionist.reduce_chunk(
session,
self.active_storage_url,
self._get_endpoint_url(),
bucket,
object,
f"{self._get_endpoint_url()}/{bucket}/{object}",
offset,
size,
compressor,
Expand All @@ -705,35 +745,24 @@ def _process_chunk(self,
# located files; after that, we can pipe any regular https file through
# to Reductionist, provided the https server is "closer" to Reductionist
elif self.storage_type == "https" and self._version == 2:
# build a simple session
session = requests.Session()
session.auth = (None, None)
session.verify = False
bucket = "https" # really doesn't matter

# note the extra "storage_type" kwarg
# this currently makes Reductionist throw a wobbly
# E activestorage.reductionist.ReductionistError: Reductionist error: HTTP 400: {"error": {"message": "request data is not valid", "caused_by": ["Failed to deserialize the JSON body into the target type", "storage_type: unknown field `storage_type`, expected one of `source`, `bucket`, `object`, `dtype`, `byte_order`, `offset`, `size`, `shape`, `order`, `selection`, `compression`, `filters`, `missing` at line 1 column 550"]}} # noqa

# Reductionist returns "count" as a list even for single elements
tmp, count = reductionist.reduce_chunk(
session,
"https://reductionist.jasmin.ac.uk/", # Wacasoft
self.filename,
bucket,
self.filename,
offset,
size,
compressor,
filters,
self.missing,
np.dtype(ds.dtype),
chunks,
ds._order,
chunk_selection,
axis,
operation=self._method,
storage_type="https")
tmp, count = reductionist.reduce_chunk(session,
self.active_storage_url,
f"{self.uri}",
offset,
size,
compressor,
filters,
self.missing,
np.dtype(ds.dtype),
chunks,
ds._order,
chunk_selection,
axis,
operation=self._method,
storage_type="https")

elif self.storage_type == 'ActivePosix' and self.version == 2:
# This is where the DDN Fuse and Infinia wrappers go
raise NotImplementedError
Expand Down
37 changes: 15 additions & 22 deletions activestorage/reductionist.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Reductionist S3 Active Storage server storage interface module."""

import cbor2 as cbor
import collections.abc
import http.client
import json
Expand All @@ -10,7 +11,7 @@
import numpy as np
import requests

REDUCTIONIST_AXIS_READY = False
REDUCTIONIST_AXIS_READY = True

DEBUG = 0

Expand All @@ -31,9 +32,7 @@ def get_session(username: str, password: str,

def reduce_chunk(session,
server,
source,
bucket,
object,
url,
offset,
size,
compression,
Expand All @@ -50,9 +49,7 @@ def reduce_chunk(session,

:param server: Reductionist server URL
:param cacert: Reductionist CA certificate path
:param source: S3 URL
:param bucket: S3 bucket
:param object: S3 object
:param url: object URL
:param offset: offset of data in object
:param size: size of data in object
:param compression: optional `numcodecs.abc.Codec` compression codec
Expand All @@ -74,9 +71,7 @@ def reduce_chunk(session,
:raises ReductionistError: if the request to Reductionist fails
"""

request_data = build_request_data(source,
bucket,
object,
request_data = build_request_data(url,
offset,
size,
compression,
Expand All @@ -88,10 +83,11 @@ def reduce_chunk(session,
chunk_selection,
axis,
storage_type=storage_type)
print(f"Reductionist request data dictionary: {request_data}")
if DEBUG:
print(f"Reductionist request data dictionary: {request_data}")
api_operation = "sum" if operation == "mean" else operation or "select"
url = f'{server}/v1/{api_operation}/'
url = f'{server}/v2/{api_operation}/'
response = request(session, url, request_data)

if response.ok:
Expand Down Expand Up @@ -174,9 +170,7 @@ def encode_missing(missing):
assert False, "Expected missing values not found"


def build_request_data(source: str,
bucket: str,
object: str,
def build_request_data(url: str,
offset: int,
size: int,
compression,
Expand All @@ -190,15 +184,13 @@ def build_request_data(source: str,
storage_type=None) -> dict:
"""Build request data for Reductionist API."""
request_data = {
'source': source,
'bucket': bucket,
'object': object,
'interface_type': storage_type if storage_type else "s3",
'url': url,
'dtype': dtype.name,
'byte_order': encode_byte_order(dtype),
'offset': int(offset),
'size': int(size),
'order': order,
'storage_type': storage_type,
}
if shape:
request_data["shape"] = shape
Expand Down Expand Up @@ -234,15 +226,16 @@ def request(session: requests.Session, url: str, request_data: dict):

def decode_result(response):
"""Decode a successful response, return as a 2-tuple of (numpy array or scalar, count)."""
dtype = response.headers['x-activestorage-dtype']
shape = json.loads(response.headers['x-activestorage-shape'])
reduction_result = cbor.loads(response.content)
dtype = reduction_result['dtype']
shape = reduction_result['shape'] if "shape" in reduction_result else None

# Result
result = np.frombuffer(response.content, dtype=dtype)
result = np.frombuffer(reduction_result['bytes'], dtype=dtype)
result = result.reshape(shape)

# Counts
count = json.loads(response.headers['x-activestorage-count'])
count = reduction_result['count']
# TODO: When reductionist is ready, we need to fix 'count'

# Mask the result
Expand Down
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ channels:
dependencies:
- python >=3.10
- pyfive >=0.5.0 # earliest support for advanced Pyfive
- cbor2
- fsspec
- h5netcdf
- netcdf4
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dynamic = [
"version",
]
dependencies = [
"cbor2",
"fsspec",
"h5netcdf",
"netcdf4",
Expand Down
12 changes: 6 additions & 6 deletions tests/test_bigger_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def test_cl_mean(tmp_path):
active = Active(ncfile, "cl", storage_type=utils.get_storage_type())
active._version = 2
active.components = True
result2 = active.mean[4:5, 1:2]
result2 = active.mean()[4:5, 1:2]
print(result2, ncfile)
# expect {'sum': array([[[[264.]]]], dtype=float32), 'n': array([[[[12]]]])}
# check for typing and structure
Expand All @@ -151,7 +151,7 @@ def test_cl_min(tmp_path):
ncfile = save_cl_file_with_a(tmp_path)
active = Active(ncfile, "cl", storage_type=utils.get_storage_type())
active._version = 2
result2 = active.min[4:5, 1:2]
result2 = active.min()[4:5, 1:2]
np.testing.assert_array_equal(result2,
np.array([[[[22.]]]], dtype="float32"))

Expand All @@ -160,7 +160,7 @@ def test_cl_max(tmp_path):
ncfile = save_cl_file_with_a(tmp_path)
active = Active(ncfile, "cl", storage_type=utils.get_storage_type())
active._version = 2
result2 = active.max[4:5, 1:2]
result2 = active.max()[4:5, 1:2]
np.testing.assert_array_equal(result2,
np.array([[[[22.]]]], dtype="float32"))

Expand All @@ -169,7 +169,7 @@ def test_cl_global_max(tmp_path):
ncfile = save_cl_file_with_a(tmp_path)
active = Active(ncfile, "cl", storage_type=utils.get_storage_type())
active._version = 2
result2 = active.max[:]
result2 = active.max()[:]
np.testing.assert_array_equal(result2,
np.array([[[[22.]]]], dtype="float32"))

Expand All @@ -192,7 +192,7 @@ def test_ps(tmp_path):
active = Active(ncfile, "ps", storage_type=utils.get_storage_type())
active._version = 2
active.components = True
result2 = active.mean[4:5, 1:2]
result2 = active.mean()[4:5, 1:2]
print(result2, ncfile)
# expect {'sum': array([[[22.]]]), 'n': array([[[4]]])}
# check for typing and structure
Expand Down Expand Up @@ -381,7 +381,7 @@ def test_daily_data_masked_two_stats(test_data_path):
# first a mean
active = Active(uri, "ta", storage_type=utils.get_storage_type())
active._version = 2
result2 = active.min[:]
result2 = active.min()[:]
assert result2 == 245.0020751953125

# then recycle Active object for something else
Expand Down
Loading
Loading