Skip to content

Commit

Permalink
Add CopyToTable task for MySQL (#2553)
Browse files Browse the repository at this point in the history
* Adding an rdbms.CopyToTable subclass for MySQL.

* Adding Bonnier News to list of Luigi users.
  • Loading branch information
Marabou authored and dlstadther committed Oct 21, 2018
1 parent 00eaa95 commit 3cae745
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ Some more companies are using Luigi but haven't had a chance yet to write about
* `ISVWorld <http://isvworld.com/>`_
* `Big Data <https://bigdata.com.br/>`_
* `Movio <https://movio.co.nz/>`_
* `Bonnier News <https://www.bonniernews.se/>`_

We're more than happy to have your company added here. Just send a PR on GitHub.

Expand Down
105 changes: 103 additions & 2 deletions luigi/contrib/mysqldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@

import luigi

from luigi.contrib import rdbms

logger = logging.getLogger('luigi-interface')

try:
import mysql.connector
from mysql.connector import errorcode
from mysql.connector import errorcode, Error
except ImportError as e:
logger.warning("Loading MySQL module without the python package mysql-connector-python. \
This will crash at runtime if MySQL functionality is used.")
This will crash at runtime if MySQL functionality is used.")


class MySqlTarget(luigi.Target):
Expand Down Expand Up @@ -147,3 +149,102 @@ def create_marker_table(self):
else:
raise
connection.close()


class CopyToTable(rdbms.CopyToTable):
    """
    Template task for inserting a data set into MySQL.

    Usage:
    Subclass and override the required `host`, `database`, `user`,
    `password`, `table` and `columns` attributes.

    To customize how to access data from an input task, override the `rows` method
    with a generator that yields each row as a tuple with fields ordered according to `columns`.
    """

    def rows(self):
        """
        Return/yield tuples or lists corresponding to each row to be inserted.

        The default implementation reads the task input line by line, strips the
        trailing newline and splits each line on tab characters.
        """
        with self.input().open('r') as fobj:
            for line in fobj:
                yield line.strip('\n').split('\t')

    # everything below will rarely have to be overridden

    def output(self):
        """
        Returns a MySqlTarget representing the inserted dataset.

        Normally you don't override this.
        """
        return MySqlTarget(
            host=self.host,
            database=self.database,
            user=self.user,
            password=self.password,
            table=self.table,
            update_id=self.update_id
        )

    def copy(self, cursor, file=None):
        """
        Insert everything yielded by `rows()` into `self.table` in batches.

        Rows are buffered and flushed with `cursor.executemany` every
        `bulk_size` rows; a trailing partial batch is flushed at the end.

        :param cursor: database cursor used to execute the INSERT statements.
        :param file: unused here; kept so the signature matches the call
                     convention of the parent class.
        """
        # One '%s' placeholder per column; the first element of each entry in
        # `columns` is used as the column name.
        values = '({})'.format(','.join(['%s'] * len(self.columns)))
        columns = '({})'.format(','.join([c[0] for c in self.columns]))
        query = 'INSERT INTO {} {} VALUES {}'.format(self.table, columns, values)

        bulk_size = self.bulk_size
        batch = []
        for row in self.rows():
            batch.append(row)
            if len(batch) >= bulk_size:
                cursor.executemany(query, batch)
                batch = []

        # Flush the remainder; skip the round trip when nothing is left.
        if batch:
            cursor.executemany(query, batch)

    def run(self):
        """
        Inserts data generated by rows() into target table.

        If the target table doesn't exist, self.create_table will be called to attempt to create the table.

        Normally you don't want to override this.

        :raises Exception: if `table` or `columns` is not set.
        """
        if not (self.table and self.columns):
            raise Exception("table and columns need to be specified")

        connection = self.output().connect()
        try:
            # attempt to copy the data into mysql
            # if it fails because the target table doesn't exist
            # try to create it by running self.create_table
            for attempt in range(2):
                try:
                    cursor = connection.cursor()
                    logger.debug("Calling init_copy for table %s", self.table)
                    self.init_copy(connection)
                    self.copy(cursor)
                    self.post_copy(connection)
                    if self.enable_metadata_columns:
                        self.post_copy_metacolumns(cursor)
                except Error as err:
                    if err.errno == errorcode.ER_NO_SUCH_TABLE and attempt == 0:
                        # first attempt failed because the table is missing: create it and retry
                        logger.info("Creating table %s", self.table)
                        connection.reconnect()
                        self.create_table(connection)
                    else:
                        raise
                else:
                    break

            # mark as complete in same transaction
            self.output().touch(connection)
            connection.commit()
        finally:
            # Release the connection on failure as well as on success.
            connection.close()

    @property
    def bulk_size(self):
        """Number of rows buffered before each `executemany` call in `copy`."""
        return 10000
124 changes: 124 additions & 0 deletions test/contrib/mysqldb_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from luigi.tools.range import RangeDaily

import mock

import luigi.contrib.mysqldb

import datetime
from helpers import unittest

from nose.plugins.attrib import attr


def datetime_to_epoch(dt):
    """Return the number of seconds between 1970-01-01 00:00:00 and *dt* as a float."""
    epoch_start = datetime.datetime(1970, 1, 1)
    elapsed = dt - epoch_start
    whole_seconds = elapsed.days * 86400 + elapsed.seconds
    return whole_seconds + elapsed.microseconds / 1E6


class MockMysqlCursor(mock.Mock):
    """
    Stateful cursor stand-in: answers `SELECT 1 FROM table_updates` lookups
    based on a fixed collection of update ids, and returns nothing otherwise.
    """
    def __init__(self, existing_update_ids):
        super(MockMysqlCursor, self).__init__()
        self.existing = existing_update_ids

    def execute(self, query, params):
        # Only the update-id lookup can produce a row, and only for known ids.
        is_update_lookup = query.startswith('SELECT 1 FROM table_updates')
        if is_update_lookup and params[0] in self.existing:
            self.fetchone_result = (1, )
        else:
            self.fetchone_result = None

    def fetchone(self):
        # Result recorded by the most recent execute() call.
        return self.fetchone_result


class DummyMysqlImporter(luigi.contrib.mysqldb.CopyToTable):
    """Minimal CopyToTable subclass with placeholder settings, parameterized by date."""

    date = luigi.DateParameter()

    # Placeholder connection settings used by the tests in this module.
    host = 'dummy_host'
    database = 'dummy_database'
    user = 'dummy_user'
    password = 'dummy_password'

    # Target table and its (name, type) column pairs.
    table = 'dummy_table'
    columns = (('some_text', 'text'), ('some_int', 'int'))


# Testing that an existing update will not be run in RangeDaily
@attr('mysql')
class DailyCopyToTableTest(unittest.TestCase):
    """RangeDaily should only require the dates whose update id is not already recorded."""

    @mock.patch('mysql.connector.connect')
    def test_bulk_complete(self, mock_connect):
        def importer_id(day):
            # task_id of the importer for the given day of January 2015
            return DummyMysqlImporter(date=datetime.datetime(2015, 1, day)).task_id

        # January 3rd is reported as already loaded by the mocked cursor.
        already_loaded = [importer_id(3)]
        mock_connect.return_value.cursor.return_value = MockMysqlCursor(already_loaded)

        range_task = RangeDaily(
            of=DummyMysqlImporter,
            start=datetime.date(2015, 1, 2),
            now=datetime_to_epoch(datetime.datetime(2015, 1, 7)),
        )
        required_ids = sorted([dep.task_id for dep in range_task.requires()])
        expected_ids = sorted([importer_id(day) for day in (2, 4, 5, 6)])

        self.assertEqual(required_ids, expected_ids)
        self.assertFalse(range_task.complete())


@attr('mysql')
class TestCopyToTableWithMetaColumns(unittest.TestCase):
    """
    Checks that CopyToTable.run() invokes the metadata-column hooks only when
    `enable_metadata_columns` is true.

    Note on argument order: stacked `mock.patch` decorators are applied from the
    bottom up, so the mock for the lowest decorator is the first argument after
    `self`.
    """

    @mock.patch("luigi.contrib.mysqldb.CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=True)
    @mock.patch("luigi.contrib.mysqldb.CopyToTable._add_metadata_columns")
    @mock.patch("luigi.contrib.mysqldb.CopyToTable.post_copy_metacolumns")
    @mock.patch("luigi.contrib.mysqldb.CopyToTable.rows", return_value=['row1', 'row2'])
    @mock.patch("luigi.contrib.mysqldb.MySqlTarget")
    @mock.patch('mysql.connector.connect')
    def test_copy_with_metadata_columns_enabled(self,
                                                mock_connect,
                                                mock_mysql_target,
                                                mock_rows,
                                                mock_update_columns,
                                                mock_add_columns,
                                                mock_metadata_columns_enabled):

        task = DummyMysqlImporter(date=datetime.datetime(1991, 3, 24))

        mock_cursor = MockMysqlCursor([task.task_id])
        mock_connect.return_value.cursor.return_value = mock_cursor

        task.run()

        # mock_add_columns patches _add_metadata_columns,
        # mock_update_columns patches post_copy_metacolumns.
        self.assertTrue(mock_add_columns.called)
        self.assertTrue(mock_update_columns.called)

    @mock.patch("luigi.contrib.mysqldb.CopyToTable.enable_metadata_columns", new_callable=mock.PropertyMock, return_value=False)
    @mock.patch("luigi.contrib.mysqldb.CopyToTable._add_metadata_columns")
    @mock.patch("luigi.contrib.mysqldb.CopyToTable.post_copy_metacolumns")
    @mock.patch("luigi.contrib.mysqldb.CopyToTable.rows", return_value=['row1', 'row2'])
    @mock.patch("luigi.contrib.mysqldb.MySqlTarget")
    @mock.patch('mysql.connector.connect')
    def test_copy_with_metadata_columns_disabled(self,
                                                 mock_connect,
                                                 mock_mysql_target,
                                                 mock_rows,
                                                 mock_update_columns,
                                                 mock_add_columns,
                                                 mock_metadata_columns_enabled):

        task = DummyMysqlImporter(date=datetime.datetime(1991, 3, 24))

        mock_cursor = MockMysqlCursor([task.task_id])
        mock_connect.return_value.cursor.return_value = mock_cursor

        task.run()

        # With metadata columns disabled neither hook may be invoked.
        self.assertFalse(mock_add_columns.called)
        self.assertFalse(mock_update_columns.called)
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ deps=
cdh,hdp: snakebite>=2.5.2,<2.6.0
cdh,hdp: hdfs>=2.0.4,<3.0.0
postgres: psycopg2<3.0
mysql-connector-python>=8.0.12
gcloud: google-api-python-client>=1.4.0,<2.0
py27-gcloud: avro
py33-gcloud,py34-gcloud,py35-gcloud,py36-gcloud: avro-python3
Expand Down

0 comments on commit 3cae745

Please sign in to comment.