From a9061dc4d8cf7f34ece93fa3ed1f33188c19ae26 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Thu, 13 Sep 2018 18:06:03 +0200
Subject: [PATCH] [AIRFLOW-3059] Log how many rows are read from Postgres

To know how much data is being read from Postgres, it is nice to log
this to the Airflow log.

---
 .../operators/postgres_to_gcs_operator.py     | 54 +++++++++++--------
 1 file changed, 32 insertions(+), 22 deletions(-)

diff --git a/airflow/contrib/operators/postgres_to_gcs_operator.py b/airflow/contrib/operators/postgres_to_gcs_operator.py
index 850d858f94c58..78da78ee2f20d 100644
--- a/airflow/contrib/operators/postgres_to_gcs_operator.py
+++ b/airflow/contrib/operators/postgres_to_gcs_operator.py
@@ -133,28 +133,38 @@ def _write_local_data_files(self, cursor):
             contain the data for the GCS objects.
         """
         schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
-        file_no = 0
-        tmp_file_handle = NamedTemporaryFile(delete=True)
-        tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
-
-        for row in cursor:
-            # Convert datetime objects to utc seconds, and decimals to floats
-            row = map(self.convert_types, row)
-            row_dict = dict(zip(schema, row))
-
-            s = json.dumps(row_dict, sort_keys=True)
-            if PY3:
-                s = s.encode('utf-8')
-            tmp_file_handle.write(s)
-
-            # Append newline to make dumps BigQuery compatible.
-            tmp_file_handle.write(b'\n')
-
-            # Stop if the file exceeds the file size limit.
-            if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
-                file_no += 1
-                tmp_file_handle = NamedTemporaryFile(delete=True)
-                tmp_file_handles[self.filename.format(file_no)] = tmp_file_handle
+        tmp_file_handles = {}
+        row_no = 0
+
+        def _create_new_file():
+            handle = NamedTemporaryFile(delete=True)
+            filename = self.filename.format(len(tmp_file_handles))
+            tmp_file_handles[filename] = handle
+            return handle
+
+        # Don't create a file if there is nothing to write
+        if cursor.rowcount > 0:
+            tmp_file_handle = _create_new_file()
+
+            for row in cursor:
+                # Convert datetime objects to utc seconds, and decimals to floats
+                row = map(self.convert_types, row)
+                row_dict = dict(zip(schema, row))
+
+                s = json.dumps(row_dict, sort_keys=True)
+                if PY3:
+                    s = s.encode('utf-8')
+                tmp_file_handle.write(s)
+
+                # Append newline to make dumps BigQuery compatible.
+                tmp_file_handle.write(b'\n')
+
+                # Stop if the file exceeds the file size limit.
+                if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
+                    tmp_file_handle = _create_new_file()
+                row_no += 1
+
+        self.log.info('Received %s rows over %s files', row_no, len(tmp_file_handles))
 
         return tmp_file_handles
 
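
Below is a minimal, standalone sketch of the control flow this patch introduces: lazy creation of the first temp file, rolling over to a new file at the size limit, counting rows, and logging the totals at the end. The FakeCursor class, FILENAME_TEMPLATE, APPROX_MAX_FILE_SIZE_BYTES, and the module-level logger are illustrative stand-ins and not part of the patch; the real operator uses a psycopg2 cursor, self.filename, self.approx_max_file_size_bytes, and Airflow's self.log. Type conversion (convert_types) and the PY3 check are omitted for brevity, and the sketch assumes Python 3.

# Standalone sketch of the batching-and-counting logic from this patch.
# All names defined here are illustrative assumptions, not operator code.
import json
import logging
from tempfile import NamedTemporaryFile

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

APPROX_MAX_FILE_SIZE_BYTES = 64       # tiny limit so the example rolls over files
FILENAME_TEMPLATE = 'export_{}.json'  # stand-in for self.filename


class FakeCursor:
    """Illustrative stand-in for a DB-API cursor with rowcount and iteration."""
    def __init__(self, rows):
        self._rows = rows
        self.rowcount = len(rows)

    def __iter__(self):
        return iter(self._rows)


def write_local_data_files(cursor, schema):
    tmp_file_handles = {}
    row_no = 0

    def _create_new_file():
        # Filename index is derived from how many files exist already.
        handle = NamedTemporaryFile(delete=True)
        filename = FILENAME_TEMPLATE.format(len(tmp_file_handles))
        tmp_file_handles[filename] = handle
        return handle

    # Don't create a file if there is nothing to write.
    if cursor.rowcount > 0:
        tmp_file_handle = _create_new_file()

        for row in cursor:
            row_dict = dict(zip(schema, row))
            tmp_file_handle.write(json.dumps(row_dict, sort_keys=True).encode('utf-8'))
            # Newline-delimited JSON keeps the dump BigQuery compatible.
            tmp_file_handle.write(b'\n')

            # Roll over to a new file once the size limit is reached.
            if tmp_file_handle.tell() >= APPROX_MAX_FILE_SIZE_BYTES:
                tmp_file_handle = _create_new_file()
            row_no += 1

    log.info('Received %s rows over %s files', row_no, len(tmp_file_handles))
    return tmp_file_handles


if __name__ == '__main__':
    cursor = FakeCursor([(i, 'name_{}'.format(i)) for i in range(10)])
    handles = write_local_data_files(cursor, schema=['id', 'name'])
    print(sorted(handles))

The rowcount guard keeps the function from creating (and later uploading) an empty file when the query returns no rows, and routing all file creation through the single helper keeps the filename numbering in sync with the size of the handle dictionary.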