Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX - PRONOM sig update for Python 3 #165

Merged
merged 4 commits into from
Oct 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions fido/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def save(self, dst=sys.stdout):
# if f.find('signature'):
root.append(f)
self.indent(root)
with open(dst, 'wb') as file_:
with open(dst, 'w') as file_:
# print >>out, ET.tostring(root,encoding='utf-8')
print(ET.tostring(root), file=file_)

Expand Down Expand Up @@ -319,7 +319,37 @@ def compare_formats(f1, f2):
if f1ID == f2ID:
return 0
return 1
return sorted(formatlist, cmp=compare_formats)
return sorted(formatlist, key=_cmp_to_key(compare_formats))


def _cmp_to_key(mycmp):
"""Convert a cmp= function into a key= function."""
# From https://docs.python.org/3/howto/sorting.html#sortinghowto
class K:
"""Wrapper class for comparator function."""

def __init__(self, obj, *_):
self.obj = obj

def __lt__(self, other):
return mycmp(self.obj, other.obj) < 0

def __gt__(self, other):
return mycmp(self.obj, other.obj) > 0

def __eq__(self, other):
return mycmp(self.obj, other.obj) == 0

def __le__(self, other):
return mycmp(self.obj, other.obj) <= 0

def __ge__(self, other):
return mycmp(self.obj, other.obj) >= 0

def __ne__(self, other):
return mycmp(self.obj, other.obj) != 0

return K


def fido_position(pronom_position):
Expand Down
84 changes: 42 additions & 42 deletions fido/pronomutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def check_well_formedness(filename, error=False):
"""
parser = ParserCreate()
try:
parser.ParseFile(open(filename, "r"))
parser.ParseFile(open(filename, "rb"))
except ExpatError as e:
if error is not False:
sys.stderr.write("check_well_formedness: %s: %s;\n" % (filename, e))
Expand Down Expand Up @@ -72,57 +72,57 @@ def get_pronom_signature(type_):
else:
sys.stderr.write("get_pronom_signature(): unknown type: " + type_)
return False
webservice = http_client.HTTP("www.nationalarchives.gov.uk")
webservice.putrequest("POST", "/pronom/service.asmx")
webservice.putheader("Host", "www.nationalarchives.gov.uk")
webservice.putheader("User-Agent", "PRONOM UTILS v{0} (OPF)".format(__version__))
webservice.putheader("Content-type", "text/xml; charset=\"UTF-8\"")
webservice.putheader("Content-length", "%d" % len(soapStr))
webservice.putheader("SOAPAction", soapAction)
headers = {
"Host": "www.nationalarchives.gov.uk",
"User-Agent": "PRONOM UTILS v{0} (OPF)".format(__version__),
"Content-type": "text/xml; charset=\"UTF-8\"",
"Content-length": "%d" % len(soapStr),
"SOAPAction": soapAction
}
connection = http_client.HTTPConnection("www.nationalarchives.gov.uk")
try:
webservice.endheaders()
connection.request("POST", "/pronom/service.asmx", soapStr, headers)
except Exception as e:
sys.stderr.write("get_pronom_signature(): failed to contact PRONOM;\n%s\n" % (e))
sys.exit()
webservice.send(soapStr)
statuscode, statusmessage, header = webservice.getreply()
if statuscode == 200:
xml = webservice.getfile()
if type_ == "version":
exp = re.compile(r"\<Version\>([0-9]{1,4})\<\/Version\>")
sigxml = exp.search(xml.read())
if len(sigxml.group(1)) > 0:
return int(sigxml.group(1))
else:
sys.stderr.write("get_pronom_signature(): could not parse VERSION from SOAP response: " + type_)
response = connection.getresponse()
if response.status != 200:
sys.stderr.write("get_pronom_signature(): webservice error: '" + str(response.status) + " " + response.reason + "'\n")
return False
xml = response.read().decode("utf-8")
if type_ == "version":
exp = re.compile(r"\<Version\>([0-9]{1,4})\<\/Version\>")
sigxml = exp.search(xml)
if len(sigxml.group(1)) > 0:
return int(sigxml.group(1))
else:
sys.stderr.write("get_pronom_signature(): could not parse VERSION from SOAP response: " + type_)
return False
if type_ == "file":
exp = re.compile(r"\<SignatureFile\>.*\<\/SignatureFile\>")
sigxml = exp.search(xml)
sigtxt = sigxml.group(0) if sigxml else ''
if len(sigtxt) > 0:
tmpfile = "./tmp_getPronomSignature.xml"
with open(tmpfile, 'w') as file_:
file_.write("""<?xml version="1.0" encoding="UTF-8"?>""" + "\n")
file_.write(sigtxt)
if not check_well_formedness(tmpfile):
os.unlink(tmpfile)
sys.stderr.write("get_pronom_signature(): signaturefile not well formed")
return False
if type_ == "file":
exp = re.compile(r"\<SignatureFile\>.*\<\/SignatureFile\>")
sigxml = exp.search(xml.read())
sigtxt = sigxml.group(0) if sigxml else ''
if len(sigtxt) > 0:
tmpfile = "./tmp_getPronomSignature.xml"
with open(tmpfile, 'wb') as file_:
file_.write("""<?xml version="1.0" encoding="UTF-8"?>""" + "\n")
file_.write(sigtxt)
if not check_well_formedness(tmpfile):
os.unlink(tmpfile)
sys.stderr.write("get_pronom_signature(): signaturefile not well formed")
return False
else:
os.unlink(tmpfile)
return """<?xml version="1.0" encoding="UTF-8"?>""" + "\n" + sigtxt
else:
sys.stderr.write("get_pronom_signature(): could not parse XML from SOAP response: " + type_)
return False
else:
sys.stderr.write("get_pronom_signature(): webservice error: '" + str(statuscode) + " " + statusmessage + "'\n")
return False
os.unlink(tmpfile)
return """<?xml version="1.0" encoding="UTF-8"?>""" + "\n" + sigtxt
else:
sys.stderr.write("get_pronom_signature(): could not parse XML from SOAP response: " + type_)
return False

sys.stderr.write("get_pronom_signature(): unexpected return")
return False
except Exception as e:
sys.stderr.write("get_pronom_signature(): unknown error: " + str(e))
return False
raise e


class LocalPronomVersions(object):
Expand Down
145 changes: 80 additions & 65 deletions fido/update_signatures.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,35 @@
import zipfile

from six.moves.urllib.request import urlopen
from six.moves.urllib.error import URLError

from . import __version__, CONFIG_DIR, query_yes_no
from .prepare import run as prepare_pronom_to_fido
from .pronomutils import check_well_formedness, get_local_pronom_versions, get_pronom_signature


defaults = {
DEFAULTS = {
'signatureFileName': 'DROID_SignatureFile-v{0}.xml',
'pronomZipFileName': 'pronom-xml-v{0}.zip',
'fidoSignatureVersion': 'format_extensions.xml',
'containerVersion': 'container-signature-20160121.xml', # container version is frozen and needs human attention before updating,
}

options = {
OPTIONS = {
'http_throttle': 0.5, # in secs, to prevent DoS of PRONOM server
'tmp_dir': os.path.join(CONFIG_DIR, 'tmp'),
'deleteTempDirectory': True,
}


def run(defaults=defaults):
def run(defaults=None):
"""
Update PRONOM signatures.

Interactive script, requires keyboard input.
"""
print("FIDO signature updater v{}".format(__version__))

defaults = defaults or DEFAULTS
try:
print("Contacting PRONOM...")
currentVersion = get_pronom_signature("version")
Expand All @@ -70,21 +71,21 @@ def run(defaults=defaults):
if not currentFile:
sys.exit('Failed to obtain PRONOM signature file, please try again.')
print("Writing {0}...".format(defaults['signatureFileName'].format(currentVersion)))
with open(signatureFile, 'wb') as file_:
with open(signatureFile, 'w') as file_:
file_.write(currentFile)

print("Extracting PRONOM PUID's from signature file...")
tree = CET.parse(signatureFile)
puids = []
for node in tree.iter("{http://www.nationalarchives.gov.uk/pronom/SignatureFile}FileFormat"):
puids.append(node.get("PUID"))
numberPuids = len(puids)
print("Found {} PRONOM PUID's".format(numberPuids))
print("Found {} PRONOM PUID's".format(len(puids)))

print("Downloading signatures can take a while")
if not query_yes_no("Continue and download signatures?"):
sys.exit('Aborting update...')
tmpdir = defaults['tmp_dir']
resume_download = False
if os.path.isdir(tmpdir):
print("Found previously created temporary folder for download:", tmpdir)
resume_download = query_yes_no('Do you want to resume download (yes) or start over (no)?')
Expand All @@ -97,65 +98,15 @@ def run(defaults=defaults):
except OSError:
pass
if not os.path.isdir(tmpdir):
print("Failed to create temporary folder for PUID's, using", tmpdir)

print("Downloading signatures, one moment please...")
one_percent = (float(numberPuids) / 100)
numfiles = 0
for puid in puids:
puidType, puidNum = puid.split("/")
puidFileName = "puid." + puidType + "." + puidNum + ".xml"
filename = os.path.join(tmpdir, puidFileName)
if os.path.isfile(filename) and check_well_formedness(filename) and resume_download:
numfiles += 1
continue
puid_url = "http://www.nationalarchives.gov.uk/pronom/{}.xml".format(puid)
try:
filehandle = urlopen(puid_url)
except Exception as e:
print("Failed to download signature file:", puid_url)
print("Error:", str(e))
sys.exit('Please restart and resume download.')
with open(filename, 'wb') as file_:
for lines in filehandle.readlines():
file_.write(lines)
filehandle.close()
if not check_well_formedness(filename):
os.unlink(filename)
continue
numfiles += 1
percent = int(float(numfiles) / one_percent)
print(r"{}/{} files [{}%]".format(numfiles, numberPuids, percent))
time.sleep(defaults['http_throttle'])
print("100%")

print("Creating PRONOM zip...")
compression = zipfile.ZIP_DEFLATED if 'zlib' in sys.modules else zipfile.ZIP_STORED
modes = {zipfile.ZIP_DEFLATED: 'deflated', zipfile.ZIP_STORED: 'stored'}
zf = zipfile.ZipFile(os.path.join(CONFIG_DIR, defaults['pronomZipFileName'].format(currentVersion)), mode='w')
print("Adding files with compression mode", modes[compression])
for puid in puids:
puidType, puidNum = puid.split("/")
puidFileName = "puid.{}.{}.xml".format(puidType, puidNum)
filename = os.path.join(tmpdir, puidFileName)
if os.path.isfile(filename):
zf.write(filename, arcname=puidFileName, compress_type=compression)
if defaults['deleteTempDirectory']:
os.unlink(filename)
zf.close()
sys.stderr.write("Failed to create temporary folder for PUID's, using: " + tmpdir)

download_signatures(defaults, puids, resume_download, tmpdir)
create_zip_file(defaults, puids, currentVersion, tmpdir)
if defaults['deleteTempDirectory']:
print("Deleting temporary folder and files...")
rmtree(tmpdir, ignore_errors=True)

print('Updating versions.xml...')
versions = get_local_pronom_versions()
versions.pronom_version = str(currentVersion)
versions.pronom_signature = "formats-v" + str(currentVersion) + ".xml"
versions.pronom_container_signature = defaults['containerVersion']
versions.fido_extension_signature = defaults['fidoSignatureVersion']
versions.update_script = __version__
versions.write()
update_versions_xml(defaults, currentVersion)

# TODO: there should be a check here to handle prepare.main exit() signal (-1/0/1/...)
print("Preparing to convert PRONOM formats to FIDO signatures...")
Expand All @@ -166,14 +117,78 @@ def run(defaults=defaults):
sys.exit('Aborting update...')


def download_signatures(defaults, puids, resume_download, tmpdir):
    """Download PRONOM signatures and write to individual files.

    Each PUID is fetched from the PRONOM web site and stored in ``tmpdir`` as
    ``puid.<type>.<number>.xml``. Files that are not well-formed XML are
    deleted again and not counted.

    :param defaults: settings mapping; only ``'http_throttle'`` (seconds to
        sleep after each stored download) is read here.
    :param puids: list of PUID strings of the form ``"<type>/<number>"``.
    :param resume_download: when true, well-formed files already present in
        ``tmpdir`` are kept instead of being downloaded again.
    :param tmpdir: directory that receives the downloaded files.
    :raises SystemExit: when a download fails with ``URLError``.
    """
    print("Downloading signatures, one moment please...")
    numberPuids = len(puids)
    numfiles = 0
    for puid in puids:
        puidType, puidNum = puid.split("/")
        puidFileName = "puid.{}.{}.xml".format(puidType, puidNum)
        filename = os.path.join(tmpdir, puidFileName)
        # Check the cheap flag first so existing files are only parsed when
        # the caller actually asked to resume.
        if resume_download and os.path.isfile(filename) and check_well_formedness(filename):
            numfiles += 1
            continue
        puid_url = "http://www.nationalarchives.gov.uk/pronom/{}.xml".format(puid)
        try:
            filehandle = urlopen(puid_url)
        except URLError as e:
            # stderr.write() adds no separator or newline, unlike print().
            sys.stderr.write("Failed to download signature file: " + puid_url + "\n")
            sys.stderr.write("Error: " + str(e) + "\n")
            sys.exit('Please restart and resume download.')
        try:
            with open(filename, 'wb') as file_:
                file_.write(filehandle.read())
        finally:
            # Release the connection even if writing to disk fails.
            filehandle.close()
        if not check_well_formedness(filename):
            os.unlink(filename)
            continue
        numfiles += 1
        # Integer arithmetic avoids float rounding that could show 99% for
        # the final file.
        percent = numfiles * 100 // numberPuids
        print(r"{}/{} files [{}%]".format(numfiles, numberPuids, percent))
        time.sleep(defaults['http_throttle'])
    print("100%")


def create_zip_file(defaults, puids, currentVersion, tmpdir):
    """Create zip file of signatures.

    Packs every ``puid.<type>.<number>.xml`` file found in ``tmpdir`` into a
    zip archive inside ``CONFIG_DIR``. PUIDs without a file on disk are
    skipped silently.

    :param defaults: settings mapping; reads ``'pronomZipFileName'`` (a
        format string taking the version) and ``'deleteTempDirectory'``
        (when true, each file is removed once it has been archived).
    :param puids: list of PUID strings of the form ``"<type>/<number>"``.
    :param currentVersion: version value substituted into the archive name.
    :param tmpdir: directory holding the downloaded signature files.
    """
    print("Creating PRONOM zip...")
    compression = zipfile.ZIP_DEFLATED if 'zlib' in sys.modules else zipfile.ZIP_STORED
    modes = {zipfile.ZIP_DEFLATED: 'deflated', zipfile.ZIP_STORED: 'stored'}
    zip_path = os.path.join(CONFIG_DIR, defaults['pronomZipFileName'].format(currentVersion))
    # The context manager closes the archive (writing its central directory)
    # even if adding or deleting a file raises part-way through.
    with zipfile.ZipFile(zip_path, mode='w') as zf:
        print("Adding files with compression mode", modes[compression])
        for puid in puids:
            puidType, puidNum = puid.split("/")
            puidFileName = "puid.{}.{}.xml".format(puidType, puidNum)
            filename = os.path.join(tmpdir, puidFileName)
            if os.path.isfile(filename):
                zf.write(filename, arcname=puidFileName, compress_type=compression)
                if defaults['deleteTempDirectory']:
                    os.unlink(filename)


def update_versions_xml(defaults, currentVersion):
    """Record the newly fetched PRONOM version in the local versions data.

    Loads the object returned by ``get_local_pronom_versions()``, fills in
    the PRONOM version, the matching ``formats-v<version>.xml`` signature
    name, the container and extension signature names taken from
    ``defaults``, and this updater's version, then calls its ``write()``.

    :param defaults: settings mapping; reads ``'containerVersion'`` and
        ``'fidoSignatureVersion'``.
    :param currentVersion: PRONOM signature version; stored as a string.
    """
    print('Updating versions.xml...')
    local_versions = get_local_pronom_versions()
    version_text = str(currentVersion)
    local_versions.pronom_version = version_text
    local_versions.pronom_signature = "formats-v" + version_text + ".xml"
    local_versions.pronom_container_signature = defaults['containerVersion']
    local_versions.fido_extension_signature = defaults['fidoSignatureVersion']
    local_versions.update_script = __version__
    # Presumably persists the values to versions.xml; the writer itself is
    # defined outside this view.
    local_versions.write()


def main():
"""Main CLI entrypoint."""
parser = ArgumentParser(description='Download and convert the latest PRONOM signatures')
parser.add_argument('-tmpdir', default=options['tmp_dir'], help='Location to store temporary files', dest='tmp_dir')
parser.add_argument('-keep_tmp', default=options['deleteTempDirectory'], help='Do not delete temporary files after completion', dest='deleteTempDirectory', action='store_false')
parser.add_argument('-http_throttle', default=options['http_throttle'], help='Time (in seconds) to wait between downloads', type=float, dest='http_throttle')
parser.add_argument('-tmpdir', default=OPTIONS['tmp_dir'], help='Location to store temporary files', dest='tmp_dir')
parser.add_argument('-keep_tmp', default=OPTIONS['deleteTempDirectory'], help='Do not delete temporary files after completion', dest='deleteTempDirectory', action='store_false')
parser.add_argument('-http_throttle', default=OPTIONS['http_throttle'], help='Time (in seconds) to wait between downloads', type=float, dest='http_throttle')
args = parser.parse_args()
opts = defaults.copy()
opts = DEFAULTS.copy()
opts.update(vars(args))

run(opts)
Expand Down