TEST-modin-project#2290: add class scope fixture
Signed-off-by: Alexander Myskov <alexander.myskov@intel.com>
amyskov committed Dec 1, 2020
1 parent 3f33825 commit eefd016
Showing 2 changed files with 118 additions and 31 deletions.
114 changes: 83 additions & 31 deletions modin/pandas/test/test_io.py
@@ -40,6 +40,7 @@
     IO_OPS_DATA_DIR,
     io_ops_bad_exc,
     eval_io_from_str,
+    SharedVars,
 )

 from modin.config import Engine, Backend, IsExperimental
@@ -66,6 +67,7 @@
 TEST_FWF_FILENAME = "test_fwf.txt"
 TEST_GBQ_FILENAME = "test_gbq."
 SMALL_ROW_SIZE = 2000
+READ_CSV_SHARED_DATA_FILE = "read_csv_shared_data.txt"


 if not os.path.exists(IO_OPS_DATA_DIR):
@@ -298,6 +300,78 @@ def make_csv_file():
                 pass


+@pytest.fixture(scope="class")
+def TestReadCSVFixture(worker_id):
+    # Process-shared variables are needed because `xdist` spawns
+    # workers in separate processes with separate namespaces
+    shared_vars = SharedVars(
+        READ_CSV_SHARED_DATA_FILE,
+        {
+            "setup_started": False,
+            "setup_completed": False,
+            "teardowned_workers": [],
+            "filenames": [],
+            "csvs_names": {},
+        },
+    )
+    if not shared_vars.get_var_value("setup_started"):
+        shared_vars.set_var_value("setup_started", True)
+        filenames = []
+        files_ids = [
+            "test_read_csv_regular",
+            "test_read_csv_blank_lines",
+            "test_read_csv_yes_no",
+        ]
+        pytest.csvs_names = {
+            file_id: get_unique_filename(file_id, debug_mode=True)
+            for file_id in files_ids
+        }
+        # used by test_read_csv_col_handling, test_read_csv_parsing
+        _make_csv_file(filenames)(
+            filename=pytest.csvs_names["test_read_csv_regular"],
+        )
+        # used by test_read_csv_parsing
+        _make_csv_file(filenames)(
+            filename=pytest.csvs_names["test_read_csv_yes_no"],
+            additional_col_values=["Yes", "true", "No", "false"],
+        )
+        # used by test_read_csv_col_handling
+        _make_csv_file(filenames)(
+            filename=pytest.csvs_names["test_read_csv_blank_lines"],
+            add_blank_lines=True,
+        )
+        filenames.extend(
+            [READ_CSV_SHARED_DATA_FILE, f"{READ_CSV_SHARED_DATA_FILE}.lock"]
+        )
+        shared_vars.set_var_value("setup_completed", True)
+        shared_vars.set_var_value("csvs_names", pytest.csvs_names)
+        shared_vars.set_var_value("filenames", filenames)
+
+    else:
+        # wait until the first spawned worker finishes the fixture setup
+        import time
+
+        while not shared_vars.get_var_value("setup_completed"):
+            time.sleep(1)
+
+        pytest.csvs_names = shared_vars.get_var_value("csvs_names")
+
+    yield
+    shared_vars.set_var_value("teardowned_workers", worker_id, append=True)
+
+    # execute the fixture teardown only after all workers finished their tasks
+    if len(shared_vars.get_var_value("teardowned_workers")) == int(
+        os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1")
+    ):
+        # delete the csv files that were created
+        for filename in shared_vars.get_var_value("filenames"):
+            if os.path.exists(filename):
+                try:
+                    os.remove(filename)
+                except PermissionError:
+                    pass
+
+
 def setup_json_file(row_size, force=False):
     if os.path.exists(TEST_JSON_FILENAME) and not force:
         pass
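In isolation, the teardown gate at the end of the fixture above works as follows: every worker appends its id to `teardowned_workers`, and the cleanup runs only once the list is full. A minimal sketch of that check, where the helper name `all_workers_done` is illustrative and not part of the commit:

import os

def all_workers_done(shared_vars):
    # pytest-xdist reports the total worker count through this
    # environment variable; a non-distributed run defaults to "1"
    total = int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))
    # each worker appends its `worker_id` here during teardown
    done = shared_vars.get_var_value("teardowned_workers")
    return len(done) == total

The cleanup therefore happens only after every worker has checked in, so no test can still need the generated CSV files when they are removed.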
@@ -496,6 +570,7 @@ def teardown_fwf_file():
             pass


+@pytest.mark.usefixtures("TestReadCSVFixture")
 @pytest.mark.skipif(
     IsExperimental.get() and Backend.get() == "Pyarrow",
     reason="Segmentation fault; see PR #2347 for details",
@@ -546,7 +621,6 @@ def test_read_csv_delimiters(
     @pytest.mark.parametrize("skip_blank_lines", [True, False])
     def test_read_csv_col_handling(
         self,
-        make_csv_file,
         request,
         header,
         index_col,
@@ -569,13 +643,8 @@ def test_read_csv_col_handling(
             "skip_blank_lines": skip_blank_lines,
         }

-        unique_name = get_unique_filename()
-        make_csv_file(
-            filename=unique_name,
-            add_blank_lines=True,
-        )
         eval_io(
-            filepath_or_buffer=unique_name,
+            filepath_or_buffer=pytest.csvs_names["test_read_csv_blank_lines"],
             fn_name="read_csv",
             **kwargs,
         )
@@ -597,7 +666,6 @@ def test_read_csv_col_handling(
     @pytest.mark.parametrize("skipfooter", [0, 10])
     def test_read_csv_parsing_1(
         self,
-        make_csv_file,
         request,
         dtype,
         engine,
@@ -615,17 +683,16 @@ def test_read_csv_parsing_1(
             "skipfooter": skipfooter,
         }

-        unique_name = get_unique_filename()
-        make_csv_file(
-            filename=unique_name,
-        )
         if kwargs["dtype"]:
             kwargs["dtype"] = {
-                col: "object" for col in pandas.read_csv(unique_name, nrows=1).columns
+                col: "object"
+                for col in pandas.read_csv(
+                    pytest.csvs_names["test_read_csv_regular"], nrows=1
+                ).columns
             }

         eval_io(
-            filepath_or_buffer=unique_name,
+            filepath_or_buffer=pytest.csvs_names["test_read_csv_regular"],
             fn_name="read_csv",
             check_exception_type=None,  # issue #2320
             raising_exceptions=None,
@@ -641,7 +708,6 @@ def test_read_csv_parsing_1(
     @pytest.mark.parametrize("names", [["c1", "c2", "c3", "c4"], None])
     def test_read_csv_parsing_2(
         self,
-        make_csv_file,
         request,
         true_values,
         false_values,
@@ -665,16 +731,8 @@ def test_read_csv_parsing_2(
             "names": names,
         }

-        unique_name = get_unique_filename()
-        make_csv_file(
-            filename=unique_name,
-            additional_col_values=["Yes", "true", "No", "false"]
-            if true_values or false_values
-            else None,
-        )
-
         eval_io(
-            filepath_or_buffer=unique_name,
+            filepath_or_buffer=pytest.csvs_names["test_read_csv_yes_no"],
             fn_name="read_csv",
             check_exception_type=None,  # issue #2320
             raising_exceptions=None,
@@ -746,7 +804,6 @@ def test_read_csv_mangle_dupe_cols(self):
     @pytest.mark.parametrize("cache_dates", [True, False])
     def test_read_csv_datetime(
         self,
-        make_csv_file,
         request,
         parse_dates,
         infer_datetime_format,
@@ -775,13 +832,8 @@ def test_read_csv_datetime(
             "cache_dates": cache_dates,
         }

-        unique_name = get_unique_filename()
-        make_csv_file(
-            filename=unique_name,
-        )
-
         eval_io(
-            filepath_or_buffer=unique_name,
+            filepath_or_buffer=pytest.csvs_names["test_read_csv_regular"],
             fn_name="read_csv",
             check_kwargs_callable=not callable(date_parser),
             raising_exceptions=raising_exceptions,
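To make the new flow concrete, here is a hedged sketch of a test written against this fixture; the class and test names below are hypothetical, but the pattern (the `usefixtures` marker plus the shared `pytest.csvs_names` mapping) is the same one used by `test_read_csv_col_handling` and the other tests in this diff:

import pandas
import pytest

@pytest.mark.usefixtures("TestReadCSVFixture")
class TestReadCSVExample:  # hypothetical class, for illustration only
    def test_blank_lines(self):
        # the class-scoped fixture has already generated this file,
        # no matter which xdist worker executes the test
        df = pandas.read_csv(pytest.csvs_names["test_read_csv_blank_lines"])
        assert not df.empty

Because the CSV files are created once per class scope rather than once per test, the per-test `make_csv_file` calls removed above are no longer needed.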
35 changes: 35 additions & 0 deletions modin/pandas/test/utils.py
@@ -28,6 +28,8 @@
 import os
 from string import ascii_letters
 import csv
+import json
+from filelock import FileLock

 random_state = np.random.RandomState(seed=42)
@@ -999,3 +1001,36 @@ def insert_lines_to_csv(
         **csv_reader_writer_params,
     )
     writer.writerows(lines)
+
+
+class SharedVars:
+    """Implements variables that can be shared among processes."""
+
+    def __init__(self, shared_file_path: str, data: dict):
+        self._shared_file_path = shared_file_path
+        self._lock = FileLock(f"{self._shared_file_path}.lock")
+        if not os.path.exists(self._shared_file_path):
+            with self._lock:
+                self._write_data(data)
+
+    def _write_data(self, data):
+        open(self._shared_file_path, "a").close()
+        with open(self._shared_file_path, "w") as f:
+            json.dump(data, f)
+
+    def get_var_value(self, var):
+        with self._lock:
+            with open(self._shared_file_path) as f:
+                data = json.load(f)
+            return data[var]
+
+    def set_var_value(self, var, value, append=False):
+        with self._lock:
+            with open(self._shared_file_path) as f:
+                data = json.load(f)
+            if append:
+                assert isinstance(data[var], list)
+                data[var].append(value)
+            else:
+                data[var] = value
+            self._write_data(data)
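For completeness, a short usage sketch of `SharedVars` outside pytest, assuming `filelock` is installed; the file name and variable names are illustrative:

shared = SharedVars("example_shared_data.txt", {"ready": False, "workers": []})

if not shared.get_var_value("ready"):
    shared.set_var_value("ready", True)  # first process to arrive does setup
shared.set_var_value("workers", "gw0", append=True)  # append under the lock
print(shared.get_var_value("workers"))

Each call acquires the lock independently, so a read-modify-write sequence is atomic only within a single `set_var_value` call; that is what the `append` flag is for.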
