diff --git a/fido/prepare.py b/fido/prepare.py index b38ddaff..c9f85033 100644 --- a/fido/prepare.py +++ b/fido/prepare.py @@ -93,7 +93,7 @@ def save(self, dst=sys.stdout): # if f.find('signature'): root.append(f) self.indent(root) - with open(dst, 'wb') as file_: + with open(dst, 'w') as file_: # print >>out, ET.tostring(root,encoding='utf-8') print(ET.tostring(root), file=file_) @@ -319,7 +319,37 @@ def compare_formats(f1, f2): if f1ID == f2ID: return 0 return 1 - return sorted(formatlist, cmp=compare_formats) + return sorted(formatlist, key=_cmp_to_key(compare_formats)) + + +def _cmp_to_key(mycmp): + """Convert a cmp= function into a key= function.""" + # From https://docs.python.org/3/howto/sorting.html#sortinghowto + class K: + """Wrapper class for comparator function.""" + + def __init__(self, obj, *_): + self.obj = obj + + def __lt__(self, other): + return mycmp(self.obj, other.obj) < 0 + + def __gt__(self, other): + return mycmp(self.obj, other.obj) > 0 + + def __eq__(self, other): + return mycmp(self.obj, other.obj) == 0 + + def __le__(self, other): + return mycmp(self.obj, other.obj) <= 0 + + def __ge__(self, other): + return mycmp(self.obj, other.obj) >= 0 + + def __ne__(self, other): + return mycmp(self.obj, other.obj) != 0 + + return K def fido_position(pronom_position): diff --git a/fido/pronomutils.py b/fido/pronomutils.py index e720e139..5dfee92a 100644 --- a/fido/pronomutils.py +++ b/fido/pronomutils.py @@ -42,7 +42,7 @@ def check_well_formedness(filename, error=False): """ parser = ParserCreate() try: - parser.ParseFile(open(filename, "r")) + parser.ParseFile(open(filename, "rb")) except ExpatError as e: if error is not False: sys.stderr.write("check_well_formedness: %s: %s;\n" % (filename, e)) @@ -72,57 +72,57 @@ def get_pronom_signature(type_): else: sys.stderr.write("get_pronom_signature(): unknown type: " + type_) return False - webservice = http_client.HTTP("www.nationalarchives.gov.uk") - webservice.putrequest("POST", "/pronom/service.asmx") - webservice.putheader("Host", "www.nationalarchives.gov.uk") - webservice.putheader("User-Agent", "PRONOM UTILS v{0} (OPF)".format(__version__)) - webservice.putheader("Content-type", "text/xml; charset=\"UTF-8\"") - webservice.putheader("Content-length", "%d" % len(soapStr)) - webservice.putheader("SOAPAction", soapAction) + headers = { + "Host": "www.nationalarchives.gov.uk", + "User-Agent": "PRONOM UTILS v{0} (OPF)".format(__version__), + "Content-type": "text/xml; charset=\"UTF-8\"", + "Content-length": "%d" % len(soapStr), + "SOAPAction": soapAction + } + connection = http_client.HTTPConnection("www.nationalarchives.gov.uk") try: - webservice.endheaders() + connection.request("POST", "/pronom/service.asmx", soapStr, headers) except Exception as e: sys.stderr.write("get_pronom_signature(): failed to contact PRONOM;\n%s\n" % (e)) sys.exit() - webservice.send(soapStr) - statuscode, statusmessage, header = webservice.getreply() - if statuscode == 200: - xml = webservice.getfile() - if type_ == "version": - exp = re.compile(r"\([0-9]{1,4})\<\/Version\>") - sigxml = exp.search(xml.read()) - if len(sigxml.group(1)) > 0: - return int(sigxml.group(1)) - else: - sys.stderr.write("get_pronom_signature(): could not parse VERSION from SOAP response: " + type_) + response = connection.getresponse() + if response.status != 200: + sys.stderr.write("get_pronom_signature(): webservice error: '" + str(response.status) + " " + response.reason + "'\n") + return False + xml = response.read().decode("utf-8") + if type_ == "version": + exp = re.compile(r"\([0-9]{1,4})\<\/Version\>") + sigxml = exp.search(xml) + if len(sigxml.group(1)) > 0: + return int(sigxml.group(1)) + else: + sys.stderr.write("get_pronom_signature(): could not parse VERSION from SOAP response: " + type_) + return False + if type_ == "file": + exp = re.compile(r"\.*\<\/SignatureFile\>") + sigxml = exp.search(xml) + sigtxt = sigxml.group(0) if sigxml else '' + if len(sigtxt) > 0: + tmpfile = "./tmp_getPronomSignature.xml" + with open(tmpfile, 'w') as file_: + file_.write("""""" + "\n") + file_.write(sigtxt) + if not check_well_formedness(tmpfile): + os.unlink(tmpfile) + sys.stderr.write("get_pronom_signature(): signaturefile not well formed") return False - if type_ == "file": - exp = re.compile(r"\.*\<\/SignatureFile\>") - sigxml = exp.search(xml.read()) - sigtxt = sigxml.group(0) if sigxml else '' - if len(sigtxt) > 0: - tmpfile = "./tmp_getPronomSignature.xml" - with open(tmpfile, 'wb') as file_: - file_.write("""""" + "\n") - file_.write(sigtxt) - if not check_well_formedness(tmpfile): - os.unlink(tmpfile) - sys.stderr.write("get_pronom_signature(): signaturefile not well formed") - return False - else: - os.unlink(tmpfile) - return """""" + "\n" + sigtxt else: - sys.stderr.write("get_pronom_signature(): could not parse XML from SOAP response: " + type_) - return False - else: - sys.stderr.write("get_pronom_signature(): webservice error: '" + str(statuscode) + " " + statusmessage + "'\n") - return False + os.unlink(tmpfile) + return """""" + "\n" + sigtxt + else: + sys.stderr.write("get_pronom_signature(): could not parse XML from SOAP response: " + type_) + return False + sys.stderr.write("get_pronom_signature(): unexpected return") return False except Exception as e: sys.stderr.write("get_pronom_signature(): unknown error: " + str(e)) - return False + raise e class LocalPronomVersions(object): diff --git a/fido/update_signatures.py b/fido/update_signatures.py index b42a1ad3..f05665c6 100644 --- a/fido/update_signatures.py +++ b/fido/update_signatures.py @@ -24,34 +24,35 @@ import zipfile from six.moves.urllib.request import urlopen +from six.moves.urllib.error import URLError from . import __version__, CONFIG_DIR, query_yes_no from .prepare import run as prepare_pronom_to_fido from .pronomutils import check_well_formedness, get_local_pronom_versions, get_pronom_signature -defaults = { +DEFAULTS = { 'signatureFileName': 'DROID_SignatureFile-v{0}.xml', 'pronomZipFileName': 'pronom-xml-v{0}.zip', 'fidoSignatureVersion': 'format_extensions.xml', 'containerVersion': 'container-signature-20160121.xml', # container version is frozen and needs human attention before updating, } -options = { +OPTIONS = { 'http_throttle': 0.5, # in secs, to prevent DoS of PRONOM server 'tmp_dir': os.path.join(CONFIG_DIR, 'tmp'), 'deleteTempDirectory': True, } -def run(defaults=defaults): +def run(defaults=None): """ Update PRONOM signatures. Interactive script, requires keyboard input. """ print("FIDO signature updater v{}".format(__version__)) - + defaults = defaults or DEFAULTS try: print("Contacting PRONOM...") currentVersion = get_pronom_signature("version") @@ -70,7 +71,7 @@ def run(defaults=defaults): if not currentFile: sys.exit('Failed to obtain PRONOM signature file, please try again.') print("Writing {0}...".format(defaults['signatureFileName'].format(currentVersion))) - with open(signatureFile, 'wb') as file_: + with open(signatureFile, 'w') as file_: file_.write(currentFile) print("Extracting PRONOM PUID's from signature file...") @@ -78,13 +79,13 @@ def run(defaults=defaults): puids = [] for node in tree.iter("{http://www.nationalarchives.gov.uk/pronom/SignatureFile}FileFormat"): puids.append(node.get("PUID")) - numberPuids = len(puids) - print("Found {} PRONOM PUID's".format(numberPuids)) + print("Found {} PRONOM PUID's".format(len(puids))) print("Downloading signatures can take a while") if not query_yes_no("Continue and download signatures?"): sys.exit('Aborting update...') tmpdir = defaults['tmp_dir'] + resume_download = False if os.path.isdir(tmpdir): print("Found previously created temporary folder for download:", tmpdir) resume_download = query_yes_no('Do you want to resume download (yes) or start over (no)?') @@ -97,65 +98,15 @@ def run(defaults=defaults): except OSError: pass if not os.path.isdir(tmpdir): - print("Failed to create temporary folder for PUID's, using", tmpdir) - - print("Downloading signatures, one moment please...") - one_percent = (float(numberPuids) / 100) - numfiles = 0 - for puid in puids: - puidType, puidNum = puid.split("/") - puidFileName = "puid." + puidType + "." + puidNum + ".xml" - filename = os.path.join(tmpdir, puidFileName) - if os.path.isfile(filename) and check_well_formedness(filename) and resume_download: - numfiles += 1 - continue - puid_url = "http://www.nationalarchives.gov.uk/pronom/{}.xml".format(puid) - try: - filehandle = urlopen(puid_url) - except Exception as e: - print("Failed to download signature file:", puid_url) - print("Error:", str(e)) - sys.exit('Please restart and resume download.') - with open(filename, 'wb') as file_: - for lines in filehandle.readlines(): - file_.write(lines) - filehandle.close() - if not check_well_formedness(filename): - os.unlink(filename) - continue - numfiles += 1 - percent = int(float(numfiles) / one_percent) - print(r"{}/{} files [{}%]".format(numfiles, numberPuids, percent)) - time.sleep(defaults['http_throttle']) - print("100%") - - print("Creating PRONOM zip...") - compression = zipfile.ZIP_DEFLATED if 'zlib' in sys.modules else zipfile.ZIP_STORED - modes = {zipfile.ZIP_DEFLATED: 'deflated', zipfile.ZIP_STORED: 'stored'} - zf = zipfile.ZipFile(os.path.join(CONFIG_DIR, defaults['pronomZipFileName'].format(currentVersion)), mode='w') - print("Adding files with compression mode", modes[compression]) - for puid in puids: - puidType, puidNum = puid.split("/") - puidFileName = "puid.{}.{}.xml".format(puidType, puidNum) - filename = os.path.join(tmpdir, puidFileName) - if os.path.isfile(filename): - zf.write(filename, arcname=puidFileName, compress_type=compression) - if defaults['deleteTempDirectory']: - os.unlink(filename) - zf.close() + sys.stderr.write("Failed to create temporary folder for PUID's, using: " + tmpdir) + download_signatures(defaults, puids, resume_download, tmpdir) + create_zip_file(defaults, puids, currentVersion, tmpdir) if defaults['deleteTempDirectory']: print("Deleting temporary folder and files...") rmtree(tmpdir, ignore_errors=True) - print('Updating versions.xml...') - versions = get_local_pronom_versions() - versions.pronom_version = str(currentVersion) - versions.pronom_signature = "formats-v" + str(currentVersion) + ".xml" - versions.pronom_container_signature = defaults['containerVersion'] - versions.fido_extension_signature = defaults['fidoSignatureVersion'] - versions.update_script = __version__ - versions.write() + update_versions_xml(defaults, currentVersion) # TODO: there should be a check here to handle prepare.main exit() signal (-1/0/1/...) print("Preparing to convert PRONOM formats to FIDO signatures...") @@ -166,14 +117,78 @@ def run(defaults=defaults): sys.exit('Aborting update...') +def download_signatures(defaults, puids, resume_download, tmpdir): + """Download PRONOM signatures and write to individual files.""" + print("Downloading signatures, one moment please...") + numberPuids = len(puids) + one_percent = (float(numberPuids) / 100) + numfiles = 0 + for puid in puids: + puidType, puidNum = puid.split("/") + puidFileName = "puid." + puidType + "." + puidNum + ".xml" + filename = os.path.join(tmpdir, puidFileName) + if os.path.isfile(filename) and check_well_formedness(filename) and resume_download: + numfiles += 1 + continue + puid_url = "http://www.nationalarchives.gov.uk/pronom/{}.xml".format(puid) + try: + filehandle = urlopen(puid_url) + except URLError as e: + sys.stderr.write("Failed to download signature file:" + puid_url) + sys.stderr.write("Error:" + str(e)) + sys.exit('Please restart and resume download.') + with open(filename, 'wb') as file_: + for lines in filehandle.readlines(): + file_.write(lines) + filehandle.close() + if not check_well_formedness(filename): + os.unlink(filename) + continue + numfiles += 1 + percent = int(float(numfiles) / one_percent) + print(r"{}/{} files [{}%]".format(numfiles, numberPuids, percent)) + time.sleep(defaults['http_throttle']) + print("100%") + + +def create_zip_file(defaults, puids, currentVersion, tmpdir): + """Create zip file of signatures.""" + print("Creating PRONOM zip...") + compression = zipfile.ZIP_DEFLATED if 'zlib' in sys.modules else zipfile.ZIP_STORED + modes = {zipfile.ZIP_DEFLATED: 'deflated', zipfile.ZIP_STORED: 'stored'} + zf = zipfile.ZipFile(os.path.join(CONFIG_DIR, defaults['pronomZipFileName'].format(currentVersion)), mode='w') + print("Adding files with compression mode", modes[compression]) + for puid in puids: + puidType, puidNum = puid.split("/") + puidFileName = "puid.{}.{}.xml".format(puidType, puidNum) + filename = os.path.join(tmpdir, puidFileName) + if os.path.isfile(filename): + zf.write(filename, arcname=puidFileName, compress_type=compression) + if defaults['deleteTempDirectory']: + os.unlink(filename) + zf.close() + + +def update_versions_xml(defaults, currentVersion): + """Create new versions identified sig XML file.""" + print('Updating versions.xml...') + versions = get_local_pronom_versions() + versions.pronom_version = str(currentVersion) + versions.pronom_signature = "formats-v" + str(currentVersion) + ".xml" + versions.pronom_container_signature = defaults['containerVersion'] + versions.fido_extension_signature = defaults['fidoSignatureVersion'] + versions.update_script = __version__ + versions.write() + + def main(): """Main CLI entrypoint.""" parser = ArgumentParser(description='Download and convert the latest PRONOM signatures') - parser.add_argument('-tmpdir', default=options['tmp_dir'], help='Location to store temporary files', dest='tmp_dir') - parser.add_argument('-keep_tmp', default=options['deleteTempDirectory'], help='Do not delete temporary files after completion', dest='deleteTempDirectory', action='store_false') - parser.add_argument('-http_throttle', default=options['http_throttle'], help='Time (in seconds) to wait between downloads', type=float, dest='http_throttle') + parser.add_argument('-tmpdir', default=OPTIONS['tmp_dir'], help='Location to store temporary files', dest='tmp_dir') + parser.add_argument('-keep_tmp', default=OPTIONS['deleteTempDirectory'], help='Do not delete temporary files after completion', dest='deleteTempDirectory', action='store_false') + parser.add_argument('-http_throttle', default=OPTIONS['http_throttle'], help='Time (in seconds) to wait between downloads', type=float, dest='http_throttle') args = parser.parse_args() - opts = defaults.copy() + opts = DEFAULTS.copy() opts.update(vars(args)) run(opts)