Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix clearsource_history command to avoid disassociation #397

Closed
9 changes: 7 additions & 2 deletions ckanext/harvest/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,13 @@ def clear(ctx, id):

@source.command()
@click.argument(u"id", metavar=u"SOURCE_ID_OR_NAME", required=False)
@click.option(
"-k",
"--keep-actual",
default=False
)
@click.pass_context
def clear_history(ctx, id):
def clear_history(ctx, id, keep_actual):
"""If no source id is given the history for all harvest sources
(maximum is 1000) will be cleared.

Expand All @@ -122,7 +127,7 @@ def clear_history(ctx, id):
flask_app = ctx.meta["flask_app"]

with flask_app.test_request_context():
result = utils.clear_harvest_source_history(id)
result = utils.clear_harvest_source_history(id, bool(keep_actual))
click.secho(result, fg="green")


Expand Down
12 changes: 11 additions & 1 deletion ckanext/harvest/commands/harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ def __init__(self, name):
the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f""",
)

self.parser.add_option(
"-k",
"--keep-actual",
dest="keep_actual",
default=False,
help="Do not delete relevant harvest objects",
)

def command(self):
self._load_config()

Expand Down Expand Up @@ -286,11 +294,13 @@ def create_harvest_source(self):
print(result)

def clear_harvest_source_history(self):
keep_actual = bool(self.options.keep_actual)
source_id = None

if len(self.args) >= 2:
source_id = unicode(self.args[1])

print(utils.clear_harvest_source_history(source_id))
print(utils.clear_harvest_source_history(source_id, keep_actual))

def show_harvest_source(self):

Expand Down
7 changes: 5 additions & 2 deletions ckanext/harvest/helpers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

import ipdb
from ckan import logic
from ckan import model
import ckan.lib.helpers as h
Expand Down Expand Up @@ -55,8 +56,10 @@ def package_list_for_source(source_id):
context['ignore_capacity_check'] = True

query = logic.get_action('package_search')(context, search_dict)

base_url = h.url_for('{0}_read'.format(DATASET_TYPE_NAME), id=source_id)
base_url = h.url_for(
'{0}_read'.format(DATASET_TYPE_NAME),
id=harvest_source.get('name')
)

def pager_url(q=None, page=None):
url = base_url
Expand Down
53 changes: 39 additions & 14 deletions ckanext/harvest/logic/action/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,20 @@ def harvest_sources_job_history_clear(context, data_dict):
check_access('harvest_sources_clear', context, data_dict)

job_history_clear_results = []
keep_actual = data_dict.get('keep_actual')

# We assume that the maximum of 1000 (hard limit) rows should be enough
result = logic.get_action('package_search')(context, {'fq': '+dataset_type:harvest', 'rows': 1000})
result = logic.get_action('package_search')(
context, {'fq': '+dataset_type:harvest', 'rows': 1000})
harvest_packages = result['results']
if harvest_packages:
for data_dict in harvest_packages:
try:
clear_result = get_action('harvest_source_job_history_clear')(context, {'id': data_dict['id']})
clear_result = get_action('harvest_source_job_history_clear')(
context, {
'id': data_dict['id'],
'keep_actual': keep_actual
})
job_history_clear_results.append(clear_result)
except NotFound:
# Ignoring not existent harvest sources because of a possibly corrupt search index
Expand All @@ -274,6 +281,7 @@ def harvest_source_job_history_clear(context, data_dict):
check_access('harvest_source_clear', context, data_dict)

harvest_source_id = data_dict.get('id', None)
keep_actual = data_dict.get('keep_actual', False)

source = HarvestSource.get(harvest_source_id)
if not source:
Expand All @@ -284,19 +292,36 @@ def harvest_source_job_history_clear(context, data_dict):

model = context['model']

sql = '''begin;
delete from harvest_object_error where harvest_object_id
in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
delete from harvest_object_extra where harvest_object_id
in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
delete from harvest_object where harvest_source_id = '{harvest_source_id}';
delete from harvest_gather_error where harvest_job_id
in (select id from harvest_job where source_id = '{harvest_source_id}');
delete from harvest_job where source_id = '{harvest_source_id}';
commit;
'''.format(harvest_source_id=harvest_source_id)
sql = '''BEGIN;
DELETE FROM harvest_object_error WHERE harvest_object_id
IN (SELECT id FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}');
DELETE FROM harvest_object_extra WHERE harvest_object_id
IN (SELECT id FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}');
'''

model.Session.execute(sql)
if keep_actual:
sql += '''
DELETE FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}'
AND current != true;
DELETE FROM harvest_gather_error AS err WHERE NOT EXISTS
(SELECT id FROM harvest_object WHERE harvest_job_id = err.harvest_job_id
AND harvest_source_id = '{harvest_source_id}');
DELETE FROM harvest_job AS job WHERE source_id = '{harvest_source_id}'
AND job.status != 'Running'
AND NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = job.id);
'''
else:
sql += '''
DELETE FROM harvest_gather_error WHERE harvest_job_id
IN (SELECT id FROM harvest_job WHERE source_id = '{harvest_source_id}');
DELETE FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}';
DELETE FROM harvest_job WHERE source_id = '{harvest_source_id}';
'''
sql += '''
COMMIT;
'''

model.Session.execute(sql.format(harvest_source_id=harvest_source_id))

# Refresh the index for this source to update the status object
get_action('harvest_source_reindex')(context, {'id': harvest_source_id})
Expand Down
2 changes: 1 addition & 1 deletion ckanext/harvest/plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def after_show(self, context, data_dict):
# package_show
if data_dict.get('extras'):
data_dict['extras'][:] = [e for e in data_dict.get('extras', [])
if not e['key']
if not e.get('key')
in ('harvest_object_id', 'harvest_source_id', 'harvest_source_title',)]

# We only want to add these extras at index time so they are part
Expand Down
73 changes: 73 additions & 0 deletions ckanext/harvest/tests/nose/test_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,79 @@ def test_harvest_sources_job_history_clear(self):
dataset_from_db_2 = model.Package.get(dataset_2['id'])
assert dataset_from_db_2, 'is None'
assert_equal(dataset_from_db_2.id, dataset_2['id'])

def test_harvest_sources_job_history_clear_keep_actual(self):
# prepare
data_dict = SOURCE_DICT.copy()
source_1 = factories.HarvestSourceObj(**data_dict)
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
source_2 = factories.HarvestSourceObj(**data_dict)

job_1 = factories.HarvestJobObj(source=source_1)
dataset_1 = ckan_factories.Dataset()
object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1,
package_id=dataset_1['id'])

job_2 = factories.HarvestJobObj(source=source_2)
# creating harvest_object with empty package_id
object_2_ = factories.HarvestObjectObj(job=job_2, source=source_2,
package_id=None)

setattr(object_1_, 'report_status', 'added')
setattr(object_1_, 'current', True)
model.Session.commit()

# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_sources_job_history_clear')(
context, {'keep_actual': True})

# verify
assert_equal(
sorted(result),
sorted([{'id': source_1.id}, {'id': source_2.id}]))

# dataset, related source, object and job still persist!
assert harvest_model.HarvestSource.get(source_1.id)
assert harvest_model.HarvestJob.get(job_1.id)
assert harvest_model.HarvestObject.get(object_1_.id)
dataset_from_db_1 = model.Package.get(dataset_1['id'])
assert dataset_from_db_1, 'is None'
assert_equal(dataset_from_db_1.id, dataset_1['id'])

# second source persist, but job and object was deleted
assert harvest_model.HarvestSource.get(source_2.id)
assert not harvest_model.HarvestJob.get(job_2.id)
assert not harvest_model.HarvestObject.get(object_2_.id)

def test_harvest_source_job_history_clear_keep_actual(self):
# prepare
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])

setattr(object_, 'report_status', 'added')
setattr(object_, 'current', True)
model.Session.commit()

# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_source_job_history_clear')(
context, {'id': source.id, 'keep_actual': True})

# verify
assert_equal(result, {'id': source.id})
assert harvest_model.HarvestSource.get(source.id)
assert harvest_model.HarvestJob.get(job.id)
assert harvest_model.HarvestObject.get(object_.id)
dataset_from_db = model.Package.get(dataset['id'])
assert dataset_from_db, 'is None'
assert_equal(dataset_from_db.id, dataset['id'])

def test_harvest_source_create_twice_with_unique_url(self):
data_dict = SOURCE_DICT.copy()
Expand Down
78 changes: 77 additions & 1 deletion ckanext/harvest/tests/test_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ def test_harvest_sources_job_history_clear(self):
package_id=dataset_2['id'])

# execute
context = {'session': model.Session,
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = get_action('harvest_sources_job_history_clear')(
context, {})
Expand All @@ -315,6 +315,82 @@ def test_harvest_sources_job_history_clear(self):
assert dataset_from_db_2, 'is None'
assert dataset_from_db_2.id == dataset_2['id']

def test_harvest_sources_job_history_clear_keep_actual(self):
# prepare
data_dict = SOURCE_DICT.copy()
source_1 = factories.HarvestSourceObj(**data_dict)
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
source_2 = factories.HarvestSourceObj(**data_dict)

job_1 = factories.HarvestJobObj(source=source_1)
dataset_1 = ckan_factories.Dataset()
object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1,
package_id=dataset_1['id'])

job_2 = factories.HarvestJobObj(source=source_2)
# creating harvest_object with empty package_id
object_2_ = factories.HarvestObjectObj(job=job_2, source=source_2,
package_id=None)

setattr(object_1_, 'report_status', 'added')
setattr(object_1_, 'current', True)
model.Session.commit()

# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = get_action('harvest_sources_job_history_clear')(
context, {'keep_actual': True})

# verify
assert sorted(result, key=lambda item: item['id']) == sorted(
[{'id': source_1.id}, {'id': source_2.id}], key=lambda item: item['id'])

# dataset, related source, object and job still persist!
assert harvest_model.HarvestSource.get(source_1.id)
assert harvest_model.HarvestJob.get(job_1.id)
assert harvest_model.HarvestObject.get(object_1_.id)
dataset_from_db_1 = model.Package.get(dataset_1['id'])
assert dataset_from_db_1, 'is None'
assert dataset_from_db_1.id == dataset_1['id']

# second source persist, but job and object was deleted
assert harvest_model.HarvestSource.get(source_2.id)
assert not harvest_model.HarvestJob.get(job_2.id)
assert not harvest_model.HarvestObject.get(object_2_.id)

def test_harvest_source_job_history_clear_keep_actual(self):
# prepare
data_dict = SOURCE_DICT.copy()
data_dict['name'] = 'another-source12'
data_dict['url'] = 'http://another-url12'
source = factories.HarvestSourceObj(**data_dict)

job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])

setattr(object_, 'report_status', 'added')
setattr(object_, 'current', True)
model.Session.commit()

# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = get_action('harvest_source_job_history_clear')(
context, {'id': source.id, 'keep_actual': True})

# verify
assert result == {'id': source.id}
assert harvest_model.HarvestSource.get(source.id)
assert harvest_model.HarvestJob.get(job.id)
assert harvest_model.HarvestObject.get(object_.id)
dataset_from_db = model.Package.get(dataset['id'])
assert dataset_from_db, 'is None'
assert dataset_from_db.id == dataset['id']

def test_harvest_source_create_twice_with_unique_url(self):
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)
Expand Down
10 changes: 6 additions & 4 deletions ckanext/harvest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,24 +206,26 @@ def clear_harvest_source(source_id_or_name):
tk.get_action("harvest_source_clear")(context, {"id": source["id"]})


def clear_harvest_source_history(source_id):

def clear_harvest_source_history(source_id, keep_actual):
context = {
"model": model,
"user": _admin_user()["name"],
"session": model.Session,
}
if source_id is not None:
tk.get_action("harvest_source_job_history_clear")(context, {
"id": source_id
"id": source_id,
"keep_actual": keep_actual
})
return "Cleared job history of harvest source: {0}".format(source_id)
else:
# Purge queues, because we clean all harvest jobs and
# objects in the database.
purge_queues()
cleared_sources_dicts = tk.get_action(
"harvest_sources_job_history_clear")(context, {})
"harvest_sources_job_history_clear")(context, {
"keep_actual": keep_actual
})
return "Cleared job history for all harvest sources: {0} source(s)".format(
len(cleared_sources_dicts))

Expand Down