-
Notifications
You must be signed in to change notification settings - Fork 8
/
test_zarr_spark.py
97 lines (86 loc) · 3.41 KB
/
test_zarr_spark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import logging
import numpy as np
import unittest
from pyspark.sql import SparkSession
from zarr_spark import repartition_chunks
class TestZarrSpark(unittest.TestCase):
    """Tests for ``repartition_chunks`` over an RDD of 2-D numpy chunks.

    Covers four regimes: chunk sizes already match the target (no-op),
    ragged input chunks (uneven), target smaller than input chunks
    (subdivide), and target larger than input chunks (coalesce).
    """

    # based on https://blog.cambridgespark.com/unit-testing-with-pyspark-fb31671b1ad8
    @classmethod
    def suppress_py4j_logging(cls):
        # py4j is very chatty below WARN; quieten it so test output stays readable.
        logging.getLogger("py4j").setLevel(logging.WARN)

    @classmethod
    def create_testing_pyspark_session(cls):
        """Return a small local Spark session (2 cores) for the test class."""
        return (
            SparkSession.builder.master("local[2]")
            .appName("my-local-testing-pyspark-context")
            .getOrCreate()
        )

    @classmethod
    def setUpClass(cls):
        # One shared session for the whole class; per-test sessions are slow.
        cls.suppress_py4j_logging()
        cls.spark = cls.create_testing_pyspark_session()
        cls.sc = cls.spark.sparkContext

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    def _repartition(self, old_rows, chunks):
        """Parallelize ``old_rows`` (one partition per chunk), repartition to
        ``chunks``, and return the collected list of new chunks."""
        old_rows_rdd = self.sc.parallelize(old_rows, len(old_rows))
        return repartition_chunks(self.sc, old_rows_rdd, chunks).collect()

    def _assert_chunks_equal(self, actual, expected):
        """Assert ``actual`` has exactly the chunks in ``expected``.

        Fix over the original tests: also asserts the chunk COUNT matches —
        the old ``for i in range(len(expected))`` loop silently ignored any
        extra chunks the repartition might have produced.
        """
        self.assertEqual(
            len(actual), len(expected), "wrong number of output chunks"
        )
        for i, (a, e) in enumerate(zip(actual, expected)):
            self.assertTrue(
                np.array_equal(a, e),
                "chunk %d differs: %r != %r" % (i, a, e),
            )

    def test_repartition_chunks_no_op(self):
        # Input already matches the (3, 1) target; chunks must pass through.
        old_rows = [
            np.array([[1.0], [2.0], [3.0]]),
            np.array([[4.0], [5.0], [6.0]]),
            np.array([[7.0], [8.0]]),
        ]
        new_rows = self._repartition(old_rows, (3, 1))
        self._assert_chunks_equal(new_rows, old_rows)

    def test_repartition_chunks_uneven(self):
        # Ragged 4/3/4 input chunks are rebalanced to rows of 3 (last partial).
        old_rows = [
            np.array([[1.0], [2.0], [3.0], [4.0]]),
            np.array([[5.0], [6.0], [7.0]]),
            np.array([[8.0], [9.0], [10.0], [11.0]]),
        ]
        new_rows = self._repartition(old_rows, (3, 1))
        new_rows_expected = [
            np.array([[1.0], [2.0], [3.0]]),
            np.array([[4.0], [5.0], [6.0]]),
            np.array([[7.0], [8.0], [9.0]]),
            np.array([[10.0], [11.0]]),
        ]
        self._assert_chunks_equal(new_rows, new_rows_expected)

    def test_repartition_chunks_subdivide(self):
        # Target (2, 1) is smaller than the 4-row inputs: each splits in two.
        old_rows = [
            np.array([[1.0], [2.0], [3.0], [4.0]]),
            np.array([[5.0], [6.0], [7.0], [8.0]]),
        ]
        new_rows = self._repartition(old_rows, (2, 1))
        new_rows_expected = [
            np.array([[1.0], [2.0]]),
            np.array([[3.0], [4.0]]),
            np.array([[5.0], [6.0]]),
            np.array([[7.0], [8.0]]),
        ]
        self._assert_chunks_equal(new_rows, new_rows_expected)

    def test_repartition_chunks_coalesce(self):
        # Target (4, 1) is larger than the 2-row inputs: pairs are merged.
        old_rows = [
            np.array([[1.0], [2.0]]),
            np.array([[3.0], [4.0]]),
            np.array([[5.0], [6.0]]),
            np.array([[7.0], [8.0]]),
        ]
        new_rows = self._repartition(old_rows, (4, 1))
        new_rows_expected = [
            np.array([[1.0], [2.0], [3.0], [4.0]]),
            np.array([[5.0], [6.0], [7.0], [8.0]]),
        ]
        self._assert_chunks_equal(new_rows, new_rows_expected)