Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 93 additions & 14 deletions python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ def test_supported_types(self):
float=pdf.float * 2,
double=pdf.double * 2,
decim=pdf.decim * 2,
bool=False if pdf.bool else True,
bool=(
(False if pdf.bool else True)
if LooseVersion(pd.__version__) < "3.0.0"
else (~pdf.bool)
),
str=pdf.str + "there",
array=pdf.array,
bin=pdf.bin,
Expand All @@ -139,7 +143,11 @@ def test_supported_types(self):
float=pdf.float * 2,
double=pdf.double * 2,
decim=pdf.decim * 2,
bool=False if pdf.bool else True,
bool=(
(False if pdf.bool else True)
if LooseVersion(pd.__version__) < "3.0.0"
else (~pdf.bool)
),
str=pdf.str + "there",
array=pdf.array,
bin=pdf.bin,
Expand All @@ -159,7 +167,11 @@ def test_supported_types(self):
float=pdf.float * 2,
double=pdf.double * 2,
decim=pdf.decim * 2,
bool=False if pdf.bool else True,
bool=(
(False if pdf.bool else True)
if LooseVersion(pd.__version__) < "3.0.0"
else (~pdf.bool)
),
str=pdf.str + "there",
array=pdf.array,
bin=pdf.bin,
Expand All @@ -170,7 +182,15 @@ def test_supported_types(self):
)

result1 = df.groupby("id").apply(udf1).sort("id").toPandas()
expected1 = df.toPandas().groupby("id").apply(udf1.func).reset_index(drop=True)

pdf = df.toPandas()
if LooseVersion(pd.__version__) < "3.0.0":
grouped_pdf = pdf.groupby("id")
else:
# pandas 3+ GroupBy.apply drops grouping columns when grouped by
# the same DataFrame column, so use a copied Series instead.
grouped_pdf = pdf.groupby(pdf.id.copy())
expected1 = grouped_pdf.apply(udf1.func).reset_index(drop=True)

result2 = df.groupby("id").apply(udf2).sort("id").toPandas()
expected2 = expected1
Expand All @@ -196,7 +216,16 @@ def test_array_type_correct(self):
udf = pandas_udf(lambda pdf: pdf, output_schema, PandasUDFType.GROUPED_MAP)

result = df.groupby("id").apply(udf).sort("id").toPandas()
expected = df.toPandas().groupby("id").apply(udf.func).reset_index(drop=True)

pdf = df.toPandas()
if LooseVersion(pd.__version__) < "3.0.0":
grouped_pdf = pdf.groupby("id", as_index=False)
else:
# pandas 3+ GroupBy.apply drops grouping columns when grouped by
# the same DataFrame column, so use a copied Series instead.
grouped_pdf = pdf.groupby(pdf.id.copy(), as_index=False)
expected = grouped_pdf.apply(udf.func).reset_index(drop=True)

assert_frame_equal(expected, result)

def test_register_grouped_map_udf(self):
Expand Down Expand Up @@ -226,7 +255,16 @@ def foo(pdf):
return pdf.assign(v1=pdf.v * pdf.id * 1.0, v2=pdf.v + pdf.id)

result = df.groupby("id").apply(foo).sort("id").toPandas()
expected = df.toPandas().groupby("id").apply(foo.func).reset_index(drop=True)

pdf = df.toPandas()
if LooseVersion(pd.__version__) < "3.0.0":
grouped_pdf = pdf.groupby("id")
else:
# pandas 3+ GroupBy.apply drops grouping columns when grouped by
# the same DataFrame column, so use a copied Series instead.
grouped_pdf = pdf.groupby(pdf.id.copy())
expected = grouped_pdf.apply(foo.func).reset_index(drop=True)

assert_frame_equal(expected, result)

def test_coerce(self):
Expand All @@ -235,7 +273,15 @@ def test_coerce(self):
foo = pandas_udf(lambda pdf: pdf, "id long, v double", PandasUDFType.GROUPED_MAP)

result = df.groupby("id").apply(foo).sort("id").toPandas()
expected = df.toPandas().groupby("id").apply(foo.func).reset_index(drop=True)

pdf = df.toPandas()
if LooseVersion(pd.__version__) < "3.0.0":
grouped_pdf = pdf.groupby("id")
else:
# pandas 3+ GroupBy.apply drops grouping columns when grouped by
# the same DataFrame column, so use a copied Series instead.
grouped_pdf = pdf.groupby(pdf.id.copy())
expected = grouped_pdf.apply(foo.func).reset_index(drop=True)
expected = expected.assign(v=expected.v.astype("float64"))
assert_frame_equal(expected, result)

Expand Down Expand Up @@ -418,7 +464,16 @@ def test_datatype_string(self):
)

result = df.groupby("id").apply(foo_udf).sort("id").toPandas()
expected = df.toPandas().groupby("id").apply(foo_udf.func).reset_index(drop=True)

pdf = df.toPandas()
if LooseVersion(pd.__version__) < "3.0.0":
grouped_pdf = pdf.groupby("id")
else:
# pandas 3+ GroupBy.apply drops grouping columns when grouped by
# the same DataFrame column, so use a copied Series instead.
grouped_pdf = pdf.groupby(pdf.id.copy())
expected = grouped_pdf.apply(foo_udf.func).reset_index(drop=True)

assert_frame_equal(expected, result)

def test_wrong_return_type(self):
Expand Down Expand Up @@ -552,9 +607,14 @@ def foo3(key, pdf):

# Test groupby column
result1 = df.groupby("id").apply(udf1).sort("id", "v").toPandas()
if LooseVersion(pd.__version__) < "3.0.0":
grouped_pdf = pdf.groupby("id", as_index=False)
else:
# pandas 3+ GroupBy.apply drops grouping columns when grouped by
# the same DataFrame column, so use a copied Series instead.
grouped_pdf = pdf.groupby(pdf.id.copy(), as_index=False)
expected1 = (
pdf.groupby("id", as_index=False)
.apply(lambda x: udf1.func((x.id.iloc[0],), x))
grouped_pdf.apply(lambda x: udf1.func((x.id.iloc[0],), x))
.sort_values(["id", "v"])
.reset_index(drop=True)
)
Expand All @@ -572,9 +632,14 @@ def foo3(key, pdf):

# Test complex groupby
result3 = df.groupby(df.id, df.v % 2).apply(udf2).sort("id", "v").toPandas()
if LooseVersion(pd.__version__) < "3.0.0":
grouped_pdf = pdf.groupby([pdf.id, pdf.v % 2], as_index=False)
else:
# pandas 3+ GroupBy.apply drops grouping columns when grouped by
# the same DataFrame column, so copy the id grouper instead.
grouped_pdf = pdf.groupby([pdf.id.copy(), pdf.v % 2], as_index=False)
expected3 = (
pdf.groupby([pdf.id, pdf.v % 2], as_index=False)
.apply(
grouped_pdf.apply(
lambda x: udf2.func(
(
x.id.iloc[0],
Expand Down Expand Up @@ -606,7 +671,14 @@ def rename_pdf(pdf, names):

df = self.data
grouped_df = df.groupby("id")
grouped_pdf = df.toPandas().groupby("id", as_index=False)

pdf = df.toPandas()
if LooseVersion(pd.__version__) < "3.0.0":
grouped_pdf = pdf.groupby("id", as_index=False)
else:
# pandas 3+ GroupBy.apply drops grouping columns when grouped by
# the same DataFrame column, so use a copied Series instead.
grouped_pdf = pdf.groupby(pdf.id.copy(), as_index=False)

# Function returns a pdf with required column names, but order could be arbitrary using dict
def change_col_order(pdf):
Expand Down Expand Up @@ -1415,8 +1487,15 @@ def foo(pdf):
return pdf.assign(v1=pdf.v * pdf.id * 1.0)

df = self.data

pdf = df.toPandas()
expected = pdf.groupby("id", as_index=False).apply(foo.func).reset_index(drop=True)
if LooseVersion(pd.__version__) < "3.0.0":
grouped_pdf = pdf.groupby("id", as_index=False)
else:
# pandas 3+ GroupBy.apply drops grouping columns when grouped by
# the same DataFrame column, so use a copied Series instead.
grouped_pdf = pdf.groupby(pdf.id.copy(), as_index=False)
expected = grouped_pdf.apply(foo.func).reset_index(drop=True)

for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
Expand Down