Skip to content

Commit

Permalink
[BEAM-12071] Don't re-use WriteToPandasSink instances across windows (a…
Browse files Browse the repository at this point in the history
…pache#14374)

* Add (failing) windowed write test

* Dont re-use pandas sink instances across windows
  • Loading branch information
TheNeuralBit authored Mar 31, 2021
1 parent 52c3b0b commit 911771d
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/dataframe/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ def expand(self, pcoll):
return pcoll | fileio.WriteToFiles(
path=dir,
file_naming=fileio.default_file_naming(name),
sink=_WriteToPandasFileSink(
sink=lambda _: _WriteToPandasFileSink(
self.writer, self.args, self.kwargs, self.incremental, self.binary))


Expand Down
41 changes: 40 additions & 1 deletion sdks/python/apache_beam/dataframe/io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import platform
import shutil
import tempfile
import typing
import unittest
from datetime import datetime
from io import BytesIO
from io import StringIO

Expand All @@ -38,6 +40,11 @@
from apache_beam.testing.util import assert_that


class MyRow(typing.NamedTuple):
timestamp: int
value: int


@unittest.skipIf(platform.system() == 'Windows', 'BEAM-10929')
class IOTest(unittest.TestCase):
def setUp(self):
Expand All @@ -56,12 +63,14 @@ def temp_dir(self, files=None):
fout.write(contents)
return dir + os.path.sep

def read_all_lines(self, pattern):
def read_all_lines(self, pattern, delete=False):
for path in glob.glob(pattern):
with open(path) as fin:
# TODO(Py3): yield from
for line in fin:
yield line.rstrip('\n')
if delete:
os.remove(path)

def test_read_write_csv(self):
input = self.temp_dir({'1.csv': 'a,b\n1,2\n', '2.csv': 'a,b\n3,4\n'})
Expand Down Expand Up @@ -304,6 +313,36 @@ def test_file_not_found(self):
with beam.Pipeline() as p:
_ = p | io.read_csv('/tmp/fake_dir/**')

def test_windowed_write(self):
output = self.temp_dir()
with beam.Pipeline() as p:
pc = (
p | beam.Create([MyRow(timestamp=i, value=i % 3) for i in range(20)])
| beam.Map(lambda v: beam.window.TimestampedValue(v, v.timestamp)).
with_output_types(MyRow)
| beam.WindowInto(
beam.window.FixedWindows(10)).with_output_types(MyRow))

deferred_df = convert.to_dataframe(pc)
deferred_df.to_csv(output + 'out.csv', index=False)

first_window_files = (
f'{output}out.csv-'
f'{datetime.utcfromtimestamp(0).isoformat()}*')
self.assertCountEqual(
['timestamp,value'] + [f'{i},{i%3}' for i in range(10)],
set(self.read_all_lines(first_window_files, delete=True)))

second_window_files = (
f'{output}out.csv-'
f'{datetime.utcfromtimestamp(10).isoformat()}*')
self.assertCountEqual(
['timestamp,value'] + [f'{i},{i%3}' for i in range(10, 20)],
set(self.read_all_lines(second_window_files, delete=True)))

# Check that we've read (and removed) every output file
self.assertEqual(len(glob.glob(f'{output}out.csv*')), 0)


if __name__ == '__main__':
unittest.main()

0 comments on commit 911771d

Please sign in to comment.