Skip to content

Commit

Permalink
Merge pull request #165 from openpreserve/fix/pronom-sig-update
Browse files Browse the repository at this point in the history
FIX - PRONOM sig update for Python 3
  • Loading branch information
carlwilson authored Oct 11, 2019
2 parents b424d3f + 8748a22 commit f3ea4a7
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 109 deletions.
34 changes: 32 additions & 2 deletions fido/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def save(self, dst=sys.stdout):
# if f.find('signature'):
root.append(f)
self.indent(root)
with open(dst, 'wb') as file_:
with open(dst, 'w') as file_:
# print >>out, ET.tostring(root,encoding='utf-8')
print(ET.tostring(root), file=file_)

Expand Down Expand Up @@ -319,7 +319,37 @@ def compare_formats(f1, f2):
if f1ID == f2ID:
return 0
return 1
return sorted(formatlist, cmp=compare_formats)
return sorted(formatlist, key=_cmp_to_key(compare_formats))


def _cmp_to_key(mycmp):
"""Convert a cmp= function into a key= function."""
# From https://docs.python.org/3/howto/sorting.html#sortinghowto
class K:
"""Wrapper class for comparator function."""

def __init__(self, obj, *_):
self.obj = obj

def __lt__(self, other):
return mycmp(self.obj, other.obj) < 0

def __gt__(self, other):
return mycmp(self.obj, other.obj) > 0

def __eq__(self, other):
return mycmp(self.obj, other.obj) == 0

def __le__(self, other):
return mycmp(self.obj, other.obj) <= 0

def __ge__(self, other):
return mycmp(self.obj, other.obj) >= 0

def __ne__(self, other):
return mycmp(self.obj, other.obj) != 0

return K


def fido_position(pronom_position):
Expand Down
84 changes: 42 additions & 42 deletions fido/pronomutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def check_well_formedness(filename, error=False):
"""
parser = ParserCreate()
try:
parser.ParseFile(open(filename, "r"))
parser.ParseFile(open(filename, "rb"))
except ExpatError as e:
if error is not False:
sys.stderr.write("check_well_formedness: %s: %s;\n" % (filename, e))
Expand Down Expand Up @@ -72,57 +72,57 @@ def get_pronom_signature(type_):
else:
sys.stderr.write("get_pronom_signature(): unknown type: " + type_)
return False
webservice = http_client.HTTP("www.nationalarchives.gov.uk")
webservice.putrequest("POST", "/pronom/service.asmx")
webservice.putheader("Host", "www.nationalarchives.gov.uk")
webservice.putheader("User-Agent", "PRONOM UTILS v{0} (OPF)".format(__version__))
webservice.putheader("Content-type", "text/xml; charset=\"UTF-8\"")
webservice.putheader("Content-length", "%d" % len(soapStr))
webservice.putheader("SOAPAction", soapAction)
headers = {
"Host": "www.nationalarchives.gov.uk",
"User-Agent": "PRONOM UTILS v{0} (OPF)".format(__version__),
"Content-type": "text/xml; charset=\"UTF-8\"",
"Content-length": "%d" % len(soapStr),
"SOAPAction": soapAction
}
connection = http_client.HTTPConnection("www.nationalarchives.gov.uk")
try:
webservice.endheaders()
connection.request("POST", "/pronom/service.asmx", soapStr, headers)
except Exception as e:
sys.stderr.write("get_pronom_signature(): failed to contact PRONOM;\n%s\n" % (e))
sys.exit()
webservice.send(soapStr)
statuscode, statusmessage, header = webservice.getreply()
if statuscode == 200:
xml = webservice.getfile()
if type_ == "version":
exp = re.compile(r"\<Version\>([0-9]{1,4})\<\/Version\>")
sigxml = exp.search(xml.read())
if len(sigxml.group(1)) > 0:
return int(sigxml.group(1))
else:
sys.stderr.write("get_pronom_signature(): could not parse VERSION from SOAP response: " + type_)
response = connection.getresponse()
if response.status != 200:
sys.stderr.write("get_pronom_signature(): webservice error: '" + str(response.status) + " " + response.reason + "'\n")
return False
xml = response.read().decode("utf-8")
if type_ == "version":
exp = re.compile(r"\<Version\>([0-9]{1,4})\<\/Version\>")
sigxml = exp.search(xml)
if len(sigxml.group(1)) > 0:
return int(sigxml.group(1))
else:
sys.stderr.write("get_pronom_signature(): could not parse VERSION from SOAP response: " + type_)
return False
if type_ == "file":
exp = re.compile(r"\<SignatureFile\>.*\<\/SignatureFile\>")
sigxml = exp.search(xml)
sigtxt = sigxml.group(0) if sigxml else ''
if len(sigtxt) > 0:
tmpfile = "./tmp_getPronomSignature.xml"
with open(tmpfile, 'w') as file_:
file_.write("""<?xml version="1.0" encoding="UTF-8"?>""" + "\n")
file_.write(sigtxt)
if not check_well_formedness(tmpfile):
os.unlink(tmpfile)
sys.stderr.write("get_pronom_signature(): signaturefile not well formed")
return False
if type_ == "file":
exp = re.compile(r"\<SignatureFile\>.*\<\/SignatureFile\>")
sigxml = exp.search(xml.read())
sigtxt = sigxml.group(0) if sigxml else ''
if len(sigtxt) > 0:
tmpfile = "./tmp_getPronomSignature.xml"
with open(tmpfile, 'wb') as file_:
file_.write("""<?xml version="1.0" encoding="UTF-8"?>""" + "\n")
file_.write(sigtxt)
if not check_well_formedness(tmpfile):
os.unlink(tmpfile)
sys.stderr.write("get_pronom_signature(): signaturefile not well formed")
return False
else:
os.unlink(tmpfile)
return """<?xml version="1.0" encoding="UTF-8"?>""" + "\n" + sigtxt
else:
sys.stderr.write("get_pronom_signature(): could not parse XML from SOAP response: " + type_)
return False
else:
sys.stderr.write("get_pronom_signature(): webservice error: '" + str(statuscode) + " " + statusmessage + "'\n")
return False
os.unlink(tmpfile)
return """<?xml version="1.0" encoding="UTF-8"?>""" + "\n" + sigtxt
else:
sys.stderr.write("get_pronom_signature(): could not parse XML from SOAP response: " + type_)
return False

sys.stderr.write("get_pronom_signature(): unexpected return")
return False
except Exception as e:
sys.stderr.write("get_pronom_signature(): unknown error: " + str(e))
return False
raise e


class LocalPronomVersions(object):
Expand Down
145 changes: 80 additions & 65 deletions fido/update_signatures.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,35 @@
import zipfile

from six.moves.urllib.request import urlopen
from six.moves.urllib.error import URLError

from . import __version__, CONFIG_DIR, query_yes_no
from .prepare import run as prepare_pronom_to_fido
from .pronomutils import check_well_formedness, get_local_pronom_versions, get_pronom_signature


defaults = {
DEFAULTS = {
'signatureFileName': 'DROID_SignatureFile-v{0}.xml',
'pronomZipFileName': 'pronom-xml-v{0}.zip',
'fidoSignatureVersion': 'format_extensions.xml',
'containerVersion': 'container-signature-20160121.xml', # container version is frozen and needs human attention before updating,
}

options = {
OPTIONS = {
'http_throttle': 0.5, # in secs, to prevent DoS of PRONOM server
'tmp_dir': os.path.join(CONFIG_DIR, 'tmp'),
'deleteTempDirectory': True,
}


def run(defaults=defaults):
def run(defaults=None):
"""
Update PRONOM signatures.
Interactive script, requires keyboard input.
"""
print("FIDO signature updater v{}".format(__version__))

defaults = defaults or DEFAULTS
try:
print("Contacting PRONOM...")
currentVersion = get_pronom_signature("version")
Expand All @@ -70,21 +71,21 @@ def run(defaults=defaults):
if not currentFile:
sys.exit('Failed to obtain PRONOM signature file, please try again.')
print("Writing {0}...".format(defaults['signatureFileName'].format(currentVersion)))
with open(signatureFile, 'wb') as file_:
with open(signatureFile, 'w') as file_:
file_.write(currentFile)

print("Extracting PRONOM PUID's from signature file...")
tree = CET.parse(signatureFile)
puids = []
for node in tree.iter("{http://www.nationalarchives.gov.uk/pronom/SignatureFile}FileFormat"):
puids.append(node.get("PUID"))
numberPuids = len(puids)
print("Found {} PRONOM PUID's".format(numberPuids))
print("Found {} PRONOM PUID's".format(len(puids)))

print("Downloading signatures can take a while")
if not query_yes_no("Continue and download signatures?"):
sys.exit('Aborting update...')
tmpdir = defaults['tmp_dir']
resume_download = False
if os.path.isdir(tmpdir):
print("Found previously created temporary folder for download:", tmpdir)
resume_download = query_yes_no('Do you want to resume download (yes) or start over (no)?')
Expand All @@ -97,65 +98,15 @@ def run(defaults=defaults):
except OSError:
pass
if not os.path.isdir(tmpdir):
print("Failed to create temporary folder for PUID's, using", tmpdir)

print("Downloading signatures, one moment please...")
one_percent = (float(numberPuids) / 100)
numfiles = 0
for puid in puids:
puidType, puidNum = puid.split("/")
puidFileName = "puid." + puidType + "." + puidNum + ".xml"
filename = os.path.join(tmpdir, puidFileName)
if os.path.isfile(filename) and check_well_formedness(filename) and resume_download:
numfiles += 1
continue
puid_url = "http://www.nationalarchives.gov.uk/pronom/{}.xml".format(puid)
try:
filehandle = urlopen(puid_url)
except Exception as e:
print("Failed to download signature file:", puid_url)
print("Error:", str(e))
sys.exit('Please restart and resume download.')
with open(filename, 'wb') as file_:
for lines in filehandle.readlines():
file_.write(lines)
filehandle.close()
if not check_well_formedness(filename):
os.unlink(filename)
continue
numfiles += 1
percent = int(float(numfiles) / one_percent)
print(r"{}/{} files [{}%]".format(numfiles, numberPuids, percent))
time.sleep(defaults['http_throttle'])
print("100%")

print("Creating PRONOM zip...")
compression = zipfile.ZIP_DEFLATED if 'zlib' in sys.modules else zipfile.ZIP_STORED
modes = {zipfile.ZIP_DEFLATED: 'deflated', zipfile.ZIP_STORED: 'stored'}
zf = zipfile.ZipFile(os.path.join(CONFIG_DIR, defaults['pronomZipFileName'].format(currentVersion)), mode='w')
print("Adding files with compression mode", modes[compression])
for puid in puids:
puidType, puidNum = puid.split("/")
puidFileName = "puid.{}.{}.xml".format(puidType, puidNum)
filename = os.path.join(tmpdir, puidFileName)
if os.path.isfile(filename):
zf.write(filename, arcname=puidFileName, compress_type=compression)
if defaults['deleteTempDirectory']:
os.unlink(filename)
zf.close()
sys.stderr.write("Failed to create temporary folder for PUID's, using: " + tmpdir)

download_signatures(defaults, puids, resume_download, tmpdir)
create_zip_file(defaults, puids, currentVersion, tmpdir)
if defaults['deleteTempDirectory']:
print("Deleting temporary folder and files...")
rmtree(tmpdir, ignore_errors=True)

print('Updating versions.xml...')
versions = get_local_pronom_versions()
versions.pronom_version = str(currentVersion)
versions.pronom_signature = "formats-v" + str(currentVersion) + ".xml"
versions.pronom_container_signature = defaults['containerVersion']
versions.fido_extension_signature = defaults['fidoSignatureVersion']
versions.update_script = __version__
versions.write()
update_versions_xml(defaults, currentVersion)

# TODO: there should be a check here to handle prepare.main exit() signal (-1/0/1/...)
print("Preparing to convert PRONOM formats to FIDO signatures...")
Expand All @@ -166,14 +117,78 @@ def run(defaults=defaults):
sys.exit('Aborting update...')


def download_signatures(defaults, puids, resume_download, tmpdir):
    """Download PRONOM signatures and write to individual files.

    Each entry of ``puids`` is a string of the form ``<type>/<number>``.
    Its record is fetched from
    ``http://www.nationalarchives.gov.uk/pronom/<puid>.xml`` and stored in
    ``tmpdir`` as ``puid.<type>.<number>.xml``. A download that is not
    well-formed XML is deleted and skipped without counting towards the
    progress total.

    Arguments:
    defaults -- mapping of settings; only ``http_throttle`` (seconds to
                sleep after each successful download) is read here
    puids -- sequence of PUID strings to download
    resume_download -- when true, well-formed files already present in
                       ``tmpdir`` are kept instead of being fetched again
    tmpdir -- directory that receives the downloaded files

    Exits the process via ``sys.exit`` when a URL cannot be opened.
    """
    print("Downloading signatures, one moment please...")
    numberPuids = len(puids)
    numfiles = 0
    for puid in puids:
        puidType, puidNum = puid.split("/")
        puidFileName = "puid." + puidType + "." + puidNum + ".xml"
        filename = os.path.join(tmpdir, puidFileName)
        # Test the cheap flag first so existing files are only parsed when
        # the user actually asked to resume.
        if resume_download and os.path.isfile(filename) and check_well_formedness(filename):
            numfiles += 1
            continue
        puid_url = "http://www.nationalarchives.gov.uk/pronom/{}.xml".format(puid)
        try:
            filehandle = urlopen(puid_url)
        except URLError as e:
            # sys.stderr.write() adds neither separators nor newlines, so
            # supply them explicitly to keep the two messages readable.
            sys.stderr.write("Failed to download signature file: " + puid_url + "\n")
            sys.stderr.write("Error: " + str(e) + "\n")
            sys.exit('Please restart and resume download.')
        # Always release the connection, even if writing the file fails.
        try:
            with open(filename, 'wb') as file_:
                file_.write(filehandle.read())
        finally:
            filehandle.close()
        if not check_well_formedness(filename):
            os.unlink(filename)
            continue
        numfiles += 1
        # Integer arithmetic avoids float rounding truncating the
        # percentage one short (numberPuids is non-zero inside the loop).
        percent = (numfiles * 100) // numberPuids
        print(r"{}/{} files [{}%]".format(numfiles, numberPuids, percent))
        time.sleep(defaults['http_throttle'])
    print("100%")


def create_zip_file(defaults, puids, currentVersion, tmpdir):
    """Create zip file of signatures.

    Collects the per-PUID files ``puid.<type>.<number>.xml`` found in
    ``tmpdir`` into a single archive inside ``CONFIG_DIR``. PUIDs without
    a matching file in ``tmpdir`` are skipped.

    Arguments:
    defaults -- mapping of settings; ``pronomZipFileName`` is a format
                string that takes the version, ``deleteTempDirectory``
                decides whether each file is removed once archived
    puids -- iterable of PUID strings of the form ``<type>/<number>``
    currentVersion -- version value substituted into the archive name
    tmpdir -- directory holding the downloaded files
    """
    print("Creating PRONOM zip...")
    compression = zipfile.ZIP_DEFLATED if 'zlib' in sys.modules else zipfile.ZIP_STORED
    modes = {zipfile.ZIP_DEFLATED: 'deflated', zipfile.ZIP_STORED: 'stored'}
    zip_path = os.path.join(CONFIG_DIR, defaults['pronomZipFileName'].format(currentVersion))
    # The context manager closes the archive (writing its central
    # directory) even if adding or deleting a file raises part-way through.
    with zipfile.ZipFile(zip_path, mode='w') as zf:
        print("Adding files with compression mode", modes[compression])
        for puid in puids:
            puidType, puidNum = puid.split("/")
            puidFileName = "puid.{}.{}.xml".format(puidType, puidNum)
            filename = os.path.join(tmpdir, puidFileName)
            if os.path.isfile(filename):
                zf.write(filename, arcname=puidFileName, compress_type=compression)
                if defaults['deleteTempDirectory']:
                    os.unlink(filename)


def update_versions_xml(defaults, currentVersion):
    """Store the new signature version details in the local versions record.

    Loads the record through ``get_local_pronom_versions()``, sets the
    PRONOM version, the ``formats-v<version>.xml`` signature name, the
    container and extension signature names taken from ``defaults``
    (keys ``containerVersion`` and ``fidoSignatureVersion``) and the
    updater version, then calls ``write()`` on the record.
    """
    print('Updating versions.xml...')
    local_versions = get_local_pronom_versions()
    version_text = str(currentVersion)
    local_versions.pronom_version = version_text
    local_versions.pronom_signature = "".join(["formats-v", version_text, ".xml"])
    container_signature = defaults['containerVersion']
    local_versions.pronom_container_signature = container_signature
    extension_signature = defaults['fidoSignatureVersion']
    local_versions.fido_extension_signature = extension_signature
    local_versions.update_script = __version__
    local_versions.write()


def main():
"""Main CLI entrypoint."""
parser = ArgumentParser(description='Download and convert the latest PRONOM signatures')
parser.add_argument('-tmpdir', default=options['tmp_dir'], help='Location to store temporary files', dest='tmp_dir')
parser.add_argument('-keep_tmp', default=options['deleteTempDirectory'], help='Do not delete temporary files after completion', dest='deleteTempDirectory', action='store_false')
parser.add_argument('-http_throttle', default=options['http_throttle'], help='Time (in seconds) to wait between downloads', type=float, dest='http_throttle')
parser.add_argument('-tmpdir', default=OPTIONS['tmp_dir'], help='Location to store temporary files', dest='tmp_dir')
parser.add_argument('-keep_tmp', default=OPTIONS['deleteTempDirectory'], help='Do not delete temporary files after completion', dest='deleteTempDirectory', action='store_false')
parser.add_argument('-http_throttle', default=OPTIONS['http_throttle'], help='Time (in seconds) to wait between downloads', type=float, dest='http_throttle')
args = parser.parse_args()
opts = defaults.copy()
opts = DEFAULTS.copy()
opts.update(vars(args))

run(opts)
Expand Down

0 comments on commit f3ea4a7

Please sign in to comment.