Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add a read-only IPFS client #557

Merged
merged 8 commits into from
Jan 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions sourmash/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from .logging import error, set_quiet

from .commands import (categorize, compare, compute, dump, import_csv,
gather, index, sbt_combine, search,
plot, watch, info, storage, migrate, multigather)
gather, index, sbt_combine, search, plot, watch,
info, storage, migrate, multigather, prepare)
from .lca import main as lca_main
from .sig import main as sig_main

Expand Down Expand Up @@ -64,7 +64,9 @@ def main():
'migrate': migrate,
'multigather': multigather,
'sig': sig_main,
'signature': sig_main}
'signature': sig_main,
'prepare': prepare
}
parser = argparse.ArgumentParser(
description='work with compressed biological sequence representations')
parser.add_argument('command', nargs='?')
Expand Down
59 changes: 59 additions & 0 deletions sourmash/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import os.path
import sys
import random
import tempfile
import tarfile

import screed
from .sourmash_args import SourmashArgumentParser
Expand Down Expand Up @@ -1402,3 +1404,60 @@ def migrate(args):

notify('saving SBT under "{}".', args.sbt_name)
tree.save(args.sbt_name, structure_only=True)


def prepare(args):
from .sbt import parse_backend_args
from .sbt_storage import FSStorage

parser = argparse.ArgumentParser()
parser.add_argument('sbt_name', help='name of SBT to prepare')
parser.add_argument('-x', help='new nodegraph size', default=1e5)
parser.add_argument('-b', "--backend", type=str,
help='Backend to convert to',
default='FSStorage')
args = parser.parse_args(args)

notify('saving SBT under "{}".', args.sbt_name)

backend, options = parse_backend_args(args.sbt_name, args.backend)

with backend(*options) as storage:
is_tarfile = False
try:
tarfile.is_tarfile(args.sbt_name)
is_tarfile = True
except IOError:
pass

if is_tarfile:
with tarfile.open(args.sbt_name, 'r') as t:
t.extractall('.')
args.sbt_name = next(f for f in t.getnames()
if f.endswith('.sbt.json'))
else:
with open(args.sbt_name, 'r:*') as f:
import json
temptree = json.load(f)

if ((temptree['storage']['backend'] == 'IPFSStorage') and
(backend == FSStorage) and
('preload' in temptree['storage']['args'])):
# Let's take a shortcut... The preload multihash contains the
# directory in the same structure FSStorage expects.
ipfs_args = temptree['storage']['args']
multihash = ipfs_args.pop('preload')

# TODO: in case the IPFS node is not available, need to
# fallback to read-only client
import ipfsapi
client = ipfsapi.connect(**ipfs_args)

dirpath = os.path.join(storage.location, storage.subdir)
with tempfile.TemporaryDirectory() as temp:
client.get(multihash, filepath=temp)
shutil.rmtree(dirpath)
shutil.move(os.path.join(temp, multihash), dirpath)

sbt = load_sbt_index(args.sbt_name, print_version_warning=False)
sbt.save(args.sbt_name, storage=storage)
95 changes: 75 additions & 20 deletions sourmash/sbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,8 @@ def save(self, path, storage=None, sparseness=0.0, structure_only=False):
'args': self.factory.init_args()
}

if not self.is_ready:
self._fill_internal()
if not self.is_ready and structure_only is False:
self._fill_internal_and_save(storage, sparseness)

nodes = {}
leaves = {}
Expand Down Expand Up @@ -445,8 +445,6 @@ def save(self, path, storage=None, sparseness=0.0, structure_only=False):

data['filename'] = node.save(data['filename'])

node.storage = storage
data['filename'] = node.save(data['filename'])
if isinstance(node, Node):
nodes[i] = data
else:
Expand Down Expand Up @@ -528,12 +526,15 @@ def _load_v1(jnodes, leaf_loader, dirname, storage, print_version_warning=True):
raise ValueError("Empty tree!")

sbt_nodes = {}
sbt_leaves = {}

max_node = 0

sample_bf = os.path.join(dirname, jnodes[0]['filename'])
ksize, tablesize, ntables = khmer.extract_nodegraph_info(sample_bf)[:3]
factory = GraphFactory(ksize, tablesize, ntables)

for i, jnode in enumerate(jnodes):
for k, jnode in enumerate(jnodes):
if jnode is None:
continue

Expand All @@ -542,13 +543,24 @@ def _load_v1(jnodes, leaf_loader, dirname, storage, print_version_warning=True):
if 'internal' in jnode['name']:
jnode['factory'] = factory
sbt_node = Node.load(jnode, storage)
sbt_nodes[k] = sbt_node
else:
sbt_node = leaf_loader(jnode, storage)
sbt_leaves[k] = sbt_node

sbt_nodes[i] = sbt_node
max_node = max(max_node, k)

tree = SBT(factory)
tree._nodes = sbt_nodes
tree._leaves = sbt_leaves
tree._missing_nodes = {i for i in range(max_node)
if i not in sbt_nodes and i not in sbt_leaves}

if print_version_warning:
error("WARNING: this is an old index version, please run `sourmash migrate` to update it.")
error("WARNING: proceeding with execution, but it will take longer to finish!")

tree._fill_min_n_below()

return tree

Expand All @@ -562,6 +574,8 @@ def _load_v2(cls, info, leaf_loader, dirname, storage, print_version_warning=Tru
sbt_nodes = {}
sbt_leaves = {}

max_node = 0

sample_bf = os.path.join(dirname, nodes[0]['filename'])
k, size, ntables = khmer.extract_nodegraph_info(sample_bf)[:3]
factory = GraphFactory(k, size, ntables)
Expand All @@ -580,9 +594,19 @@ def _load_v2(cls, info, leaf_loader, dirname, storage, print_version_warning=Tru
sbt_node = leaf_loader(node, storage)
sbt_leaves[k] = sbt_node

max_node = max(max_node, k)

tree = cls(factory, d=info['d'])
tree._nodes = sbt_nodes
tree._leaves = sbt_leaves
tree._missing_nodes = {i for i in range(max_node)
if i not in sbt_nodes and i not in sbt_leaves}

if print_version_warning:
error("WARNING: this is an old index version, please run `sourmash migrate` to update it.")
error("WARNING: proceeding with execution, but it will take longer to finish!")

tree._fill_min_n_below()

return tree

Expand Down Expand Up @@ -623,7 +647,7 @@ def _load_v3(cls, info, leaf_loader, dirname, storage, print_version_warning=Tru
tree._nodes = sbt_nodes
tree._leaves = sbt_leaves
tree._missing_nodes = {i for i in range(max_node)
if i not in sbt_nodes and i not in sbt_leaves}
if i not in sbt_nodes and i not in sbt_leaves}

if print_version_warning:
error("WARNING: this is an old index version, please run `sourmash migrate` to update it.")
Expand Down Expand Up @@ -667,9 +691,7 @@ def _load_v4(cls, info, leaf_loader, dirname, storage, print_version_warning=Tru
tree._nodes = sbt_nodes
tree._leaves = sbt_leaves
tree._missing_nodes = {i for i in range(max_node)
if i not in sbt_nodes and i not in sbt_leaves}

tree.next_node = max_node
if i not in sbt_nodes and i not in sbt_leaves}

if print_version_warning:
error("WARNING: this is an old index version, please run `sourmash migrate` to update it.")
Expand All @@ -679,7 +701,10 @@ def _load_v4(cls, info, leaf_loader, dirname, storage, print_version_warning=Tru

@classmethod
def _load_v5(cls, info, leaf_loader, dirname, storage, print_version_warning=True):
nodes = {int(k): v for (k, v) in info['nodes'].items()}
nodes = {}
if 'nodes' in info:
nodes = {int(k): v for (k, v) in info['nodes'].items()}

leaves = {int(k): v for (k, v) in info['leaves'].items()}

if not leaves:
Expand Down Expand Up @@ -713,7 +738,7 @@ def _load_v5(cls, info, leaf_loader, dirname, storage, print_version_warning=Tru
tree._nodes = sbt_nodes
tree._leaves = sbt_leaves
tree._missing_nodes = {i for i in range(max_node)
if i not in sbt_nodes and i not in sbt_leaves}
if i not in sbt_nodes and i not in sbt_leaves}

return tree

Expand Down Expand Up @@ -743,6 +768,24 @@ def fill_min_n_below(node, *args, **kwargs):

self._fill_up(fill_min_n_below)

def _fill_internal_and_save(self, storage, sparseness=0.0):

def fill_nodegraphs_and_save(node, *args, **kwargs):
children = kwargs['children']
for child in children:
if child.node is not None:
child.node.update(node)

if isinstance(node, Node) and random() - sparseness > 0:
child.node.storage = storage
child.node.save(os.path.basename(node.name))

child.node.unload()
return True

self._fill_up(fill_nodegraphs_and_save)
self.is_ready = True

def _fill_internal(self):

def fill_nodegraphs(node, *args, **kwargs):
Expand Down Expand Up @@ -944,6 +987,9 @@ def load(info, storage=None):
new_node.metadata = info.get('metadata', {})
return new_node

def unload(self):
pass

def update(self, parent):
parent.data.update(self.data)
if 'min_n_below' in self.metadata:
Expand Down Expand Up @@ -1008,6 +1054,9 @@ def load(cls, info, storage=None):
path=info['filename'],
storage=storage)

def unload(self):
pass


def filter_distance(filter_a, filter_b, n=1000):
"""
Expand Down Expand Up @@ -1040,20 +1089,18 @@ def filter_distance(filter_a, filter_b, n=1000):
return distance / (8.0 * len(A) * n)


def convert_cmd(name, backend):
from .sbtmh import SigLeaf

def parse_backend_args(name, backend):
options = backend.split('(')
backend = options.pop(0)
backend = backend.lower().strip("'")

if options:
print(options)
options = options[0].split(')')
options = [options.pop(0)]
#options = {}
print(options)
options = options[0].split(')')
options = [options.pop(0)]
#options = {}
else:
options = []
options = []

if backend.lower() in ('ipfs', 'ipfsstorage'):
backend = IPFSStorage
Expand All @@ -1076,6 +1123,14 @@ def convert_cmd(name, backend):
else:
error('backend not recognized: {}'.format(backend))

return backend, options


def convert_cmd(name, backend_args):
from .sbtmh import SigLeaf

backend, options = parse_backend_args(name, backend_args)

with backend(*options) as storage:
sbt = SBT.load(name, leaf_loader=SigLeaf.load)
sbt.save(name, storage=storage)
16 changes: 15 additions & 1 deletion sourmash/sbt_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import abc
from io import BytesIO
import os
import urllib
import tarfile


Expand Down Expand Up @@ -103,10 +104,23 @@ def __init__(self, pin_on_add=True, **kwargs):
import ipfsapi
self.ipfs_args = kwargs
self.pin_on_add = pin_on_add
self.api = ipfsapi.connect(**self.ipfs_args)
self.read_only = False

if 'preload' in self.ipfs_args:
del self.ipfs_args['preload']

try:
self.api = ipfsapi.connect(**self.ipfs_args)
except ipfsapi.exceptions.ConnectionError:
self.api = ipfsapi.connect('ipfs.io', 80)
self.read_only = True

def save(self, path, content):
# api.add_bytes(b"Mary had a little lamb")
if self.read_only:
raise NotImplementedError('This is a read-only client. '
'Start an IPFS node to be able to save '
'data.')
new_obj = self.api.add_bytes(content)
if self.pin_on_add:
self.api.pin_add(new_obj)
Expand Down
Loading