Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions bench/ndarray/cumsum_bench.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#######################################################################
# Copyright (c) 2019-present, Blosc Development Team <blosc@blosc.org>
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
#######################################################################

# Benchmark to compare NumPy and Blosc2 cumulative_sum for large arrays

import blosc2
import numpy as np
from time import perf_counter
import matplotlib.pyplot as plt

NDIM = 3  # benchmark 3D arrays
GB = 1024**3  # bytes per gibibyte


def _mean_axis_time(cumsum_func, arr):
    """Return the wall time of ``cumsum_func(arr, axis=axis)`` averaged
    over every axis of a 3D array.

    Uses ``perf_counter`` (monotonic, high resolution) rather than
    ``time.time`` so the measurement is not affected by system clock
    adjustments.
    """
    total = 0.0
    for axis in range(NDIM):
        tic = perf_counter()
        cumsum_func(arr, axis=axis)
        total += perf_counter() - tic
    return total / NDIM


def main():
    """Benchmark Blosc2 vs NumPy ``cumulative_sum`` on large 3D float64
    arrays and save a grouped log-scale bar chart to ``cumsumbench.png``."""
    blosc2_dt = []
    np_dt = []
    arr_size = []
    # Cube side lengths chosen so the float64 (8-byte) arrays occupy
    # 1, 2, 4, 8 and 16 GB respectively.
    sizes = (np.array([1, 2, 4, 8, 16]) * GB / 8) ** (1 / 3)
    for side in sizes:
        shape = (int(side),) * NDIM
        arr = blosc2.arange(0, np.prod(shape), shape=shape, dtype=np.float64)
        blosc2_dt.append(_mean_axis_time(blosc2.cumulative_sum, arr))

        # Materialize the Blosc2 array as a plain NumPy ndarray for the
        # reference timings.
        arr = arr[()]
        np_dt.append(_mean_axis_time(np.cumulative_sum, arr))
        arr_size.append(round(arr.dtype.itemsize * np.prod(shape) / GB, 1))

    # Grouped bars on a log scale, since timings span orders of magnitude.
    w = 0.2
    x = np.arange(len(arr_size))
    plt.bar(x, blosc2_dt, width=w, label='Blosc2')
    plt.bar(x + w, np_dt, width=w, label='Numpy')
    plt.gca().set_yscale('log')
    plt.xticks(x, arr_size)
    plt.xlabel('Array size (GB)')
    plt.ylabel('Average Time (s)')
    plt.title('Cumulative_sum for 3D array')
    plt.legend()
    plt.savefig('cumsumbench.png', format='png')


if __name__ == "__main__":
    main()
40 changes: 32 additions & 8 deletions src/blosc2/lazyexpr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2272,12 +2272,24 @@ def reduce_slices( # noqa: C901
result = reduce_op.value.reduce(result, **reduce_args)

if not out_init:
out_ = convert_none_out(result.dtype, reduce_op, reduced_shape)
# if cumsum/cumprod and arrays large, return blosc2 array with same chunks
chunks_out = (
chunks
if np.prod(reduced_shape) * np.dtype(dtype).itemsize > 4 * blosc2.MAX_FAST_PATH_SIZE
else None
)
chunks_out = chunks_out if _slice == () else None
out_ = convert_none_out(result.dtype, reduce_op, reduced_shape, chunks=chunks_out)
if out is not None:
out[:] = out_
del out_
else:
out = out_
behaved = (
False
if not hasattr(out, "chunks")
else blosc2.are_partitions_behaved(out.shape, out.chunks, out.blocks)
)
out_init = True

    # res_out only used by argmin/max and cumulative_sum/prod which only accept axis=int argument
Expand Down Expand Up @@ -2309,7 +2321,12 @@ def reduce_slices( # noqa: C901
else: # CUMULATIVE_PROD
result *= res_out_
res_out_ = result[idx_lastval]
out[reduced_slice] = result
if behaved and result.shape == out.chunks and result.dtype == out.dtype and _slice == ():
# Fast path
# TODO: Check this only works when slice is () as nchunk is incorrect for out otherwise
out.schunk.update_data(nchunk, result, copy=False)
else:
out[reduced_slice] = result
else:
out[reduced_slice] = reduce_op.value(out[reduced_slice], result)

Expand Down Expand Up @@ -2358,15 +2375,22 @@ def _get_res_out(reduced_shape, axis, dtype, reduce_op):
return res_out_


def convert_none_out(dtype, reduce_op, reduced_shape):
def convert_none_out(dtype, reduce_op, reduced_shape, chunks=None):
reduced_shape = (1,) if reduced_shape == () else reduced_shape
# out will be a proper numpy.ndarray
if reduce_op in {ReduceOp.SUM, ReduceOp.CUMULATIVE_SUM, ReduceOp.PROD, ReduceOp.CUMULATIVE_PROD}:
out = (
np.zeros(reduced_shape, dtype=dtype)
if reduce_op in {ReduceOp.SUM, ReduceOp.CUMULATIVE_SUM}
else np.ones(reduced_shape, dtype=dtype)
)
if reduce_op in (ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD) and chunks is not None:
out = (
blosc2.zeros(reduced_shape, dtype=dtype, chunks=chunks)
if reduce_op == ReduceOp.CUMULATIVE_SUM
else blosc2.ones(reduced_shape, dtype=dtype, chunks=chunks)
)
else:
out = (
np.zeros(reduced_shape, dtype=dtype)
if reduce_op in {ReduceOp.SUM, ReduceOp.CUMULATIVE_SUM}
else np.ones(reduced_shape, dtype=dtype)
)
elif reduce_op == ReduceOp.MIN:
if np.issubdtype(dtype, np.integer):
out = np.iinfo(dtype).max * np.ones(reduced_shape, dtype=dtype)
Expand Down
8 changes: 3 additions & 5 deletions tests/ndarray/test_reductions.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,16 +439,14 @@ def test_fast_path(chunks, blocks, disk, fill_value, reduce_op, axis):
assert np.allclose(res, nres)

# Try with a slice
b = blosc2.linspace(0, 1, blocks=blocks, chunks=chunks, shape=shape, dtype=a.dtype)
nb = b[:]
slice_ = (slice(5, 7),)
if reduce_op in {"cumulative_sum", "cumulative_prod"}:
axis = 0 if axis is None else axis
oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod"
nres = eval(f"{oploc}((na + nb)[{slice_}], axis={axis})")
nres = eval(f"{oploc}((na - .1)[{slice_}], axis={axis})")
else:
nres = getattr((na + nb)[slice_], reduce_op)(axis=axis)
res = getattr(a + b, reduce_op)(axis=axis, item=slice_)
nres = getattr((na - 0.1)[slice_], reduce_op)(axis=axis)
res = getattr(a - 0.1, reduce_op)(axis=axis, item=slice_)
assert np.allclose(res, nres)


Expand Down
1 change: 0 additions & 1 deletion tests/ndarray/test_resize.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def test_resize(shape, new_shape, chunks, blocks, fill_value):

a.resize(new_shape)
assert a.shape == new_shape

slices = tuple(slice(s) for s in shape)
for i in np.nditer(a[slices]):
assert i == fill_value
Expand Down
Loading