Skip to content

Commit

Permalink
refactor: rename flow io args
Browse files Browse the repository at this point in the history
  • Loading branch information
hanxiao committed May 1, 2020
1 parent c1f84a7 commit 77e49eb
Show file tree
Hide file tree
Showing 22 changed files with 109 additions and 81 deletions.
2 changes: 1 addition & 1 deletion README.de.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ from jina.flow import Flow
f = Flow.load_config('index.yml')

with f:
f.index(raw_bytes=input_fn)
f.index(input_fn)
```

</td>
Expand Down
2 changes: 1 addition & 1 deletion README.fr.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ from jina.flow import Flow
f = Flow.load_config('index.yml')

with f:
f.index(raw_bytes=input_fn)
f.index(input_fn)
```

</td>
Expand Down
2 changes: 1 addition & 1 deletion README.ja.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ from jina.flow import Flow
f = Flow.load_config('index.yml')

with f:
f.index(raw_bytes=input_fn)
f.index(input_fn)
```

</td>
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ from jina.flow import Flow
f = Flow.load_config('index.yml')

with f:
f.index(raw_bytes=input_fn)
f.index(input_fn)
```

</td>
Expand Down
2 changes: 1 addition & 1 deletion README.ru.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ from jina.flow import Flow
f = Flow.load_config('index.yml')

with f:
f.index(raw_bytes=input_fn)
f.index(input_fn)
```

</td>
Expand Down
2 changes: 1 addition & 1 deletion README.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ from jina.flow import Flow
f = Flow.load_config('index.yml')

with f:
f.index(raw_bytes=input_fn)
f.index(input_fn)
```

</td>
Expand Down
10 changes: 5 additions & 5 deletions docs/chapters/flow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,18 @@ You can use `.index()`, `.search()` to feed index data and search query to a flo

```python
with f:
f.index(raw_bytes)
f.index(input_fn)
```

```python
with f:
f.search(raw_bytes, top_k=50, callback=print)
f.search(input_fn, top_k=50, output_fn=print)
```

- `raw_bytes` is `Iterator[bytes]`, each of which corresponds to a bytes representation of a Document.
- `callback` is the callback function after each request, take `Request` protobuf as the only input.
- `input_fn` is `Iterator[bytes]`, each of which corresponds to a bytes representation of a Document.
- `output_fn` is the callback function after each request, take `Request` protobuf as the only input.

A simple `raw_bytes` can be `input_fn` defined as follows:
A simple `input_fn` is defined as follows:

```python
def input_fn():
Expand Down
2 changes: 1 addition & 1 deletion docs/chapters/helloworld/main.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ And the implementation behind? As simple as it should be:
f = Flow.load_config('index.yml')
with f:
f.index(raw_bytes=input_fn)
f.index(input_fn)
.. confval:: YAML spec

Expand Down
8 changes: 4 additions & 4 deletions docs/chapters/io/main.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ with f:

```python
with f:
f.search(input_fn, top_k=50, callback=print)
f.search(input_fn, top_k=50, output_fn=print)
```

`input_fn` is `Iterator[bytes]`, each of which corresponds to a bytes representation of a Document.
Expand Down Expand Up @@ -147,7 +147,7 @@ For example, the following will print the request after a `IndexReqeust` is fini

```python
with f:
f.index(input_fn, callback=print)
f.index(input_fn, output_fn=print)
```

This is quite useful when debugging.
Expand All @@ -167,8 +167,8 @@ def print_html(resp):
```

```python
f.search(raw_bytes=input_fn,
callback=print_html, top_k=args.top_k, batch_size=args.query_batch_size)
f.search(input_fn,
output_fn=print_html, top_k=args.top_k, batch_size=args.query_batch_size)
```


Expand Down
2 changes: 1 addition & 1 deletion docs/chapters/remote/run-remote-pod-flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ requests:
Locally, we write:
```python
with f:
f.index(raw_bytes=random_docs(1000), in_proto=True)
f.index(input_fn=random_docs(1000), in_proto=True)

def random_docs(num_docs, chunks_per_doc=5, embed_dim=10):
import numpy as np
Expand Down
2 changes: 1 addition & 1 deletion docs/chapters/troubleshooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ from jina.flow import Flow
f = Flow().add(...)

with f:
f.index(raw_bytes, callback=print)
f.index(input_fn, output_fn=print)
```

which prints out the protobuf message directly in the console after each request.
Expand Down
28 changes: 14 additions & 14 deletions jina/clients/python/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,18 @@ def __init__(self, args: 'argparse.Namespace'):
:param args: args provided by the CLI
:param delay: if ``True`` then the client starts sending request after initializing, otherwise one needs to set
the :attr:`raw_bytes` before using :func:`start` or :func:`call`
the :attr:`input_fn` before using :func:`start` or :func:`call`
"""
super().__init__(args)
self._raw_bytes = None
self._input_fn = None

def call(self, callback: Callable[['jina_pb2.Message'], None] = None) -> None:
""" Calling the server, better use :func:`start` instead.
:param callback: a callback function, invoke after every response is received
"""
kwargs = vars(self.args)
kwargs['data'] = self.raw_bytes
kwargs['data'] = self.input_fn

from . import request
tname = self.args.mode
Expand All @@ -62,23 +62,23 @@ def call(self, callback: Callable[['jina_pb2.Message'], None] = None) -> None:
p_bar.update(self.args.batch_size)

@property
def raw_bytes(self) -> Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable]:
def input_fn(self) -> Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable]:
""" An iterator of bytes, each element represents a document's raw content,
i.e. ``raw_bytes`` defined int the protobuf
i.e. ``input_fn`` defined int the protobuf
"""
if self._raw_bytes:
return self._raw_bytes
if self._input_fn:
return self._input_fn
else:
raise BadClient('raw_bytes is empty or not set')
raise BadClient('input_fn is empty or not set')

@raw_bytes.setter
def raw_bytes(self, bytes_gen: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable]):
if self._raw_bytes:
self.logger.warning('raw_bytes is not empty, overrided')
@input_fn.setter
def input_fn(self, bytes_gen: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable]):
if self._input_fn:
self.logger.warning('input_fn is not empty, overrided')
if hasattr(bytes_gen, '__call__'):
self._raw_bytes = bytes_gen()
self._input_fn = bytes_gen()
else:
self._raw_bytes = bytes_gen
self._input_fn = bytes_gen

def dry_run(self) -> bool:
"""Send a DRYRUN request to the server, passing through all pods on the server
Expand Down
41 changes: 22 additions & 19 deletions jina/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from .. import JINA_GLOBAL
from ..enums import FlowBuildLevel, FlowOptimizeLevel
from ..excepts import FlowTopologyError, FlowMissingPodError, FlowBuildLevelError, FlowConnectivityError
from ..helper import yaml, expand_env_var, get_non_defaults_args, get_parsed_args
from ..helper import yaml, expand_env_var, get_non_defaults_args, get_parsed_args, deprecated_alias
from ..logging import get_logger
from ..logging.sse import start_sse_logger
from ..peapods.pod import SocketType, FlowPod, GatewayFlowPod
Expand Down Expand Up @@ -516,7 +516,7 @@ def __eq__(self, other: 'Flow'):
return a._pod_nodes == b._pod_nodes

@build_required(FlowBuildLevel.GRAPH)
def _get_client(self, bytes_gen: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable] = None, **kwargs):
def _get_client(self, input_fn: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable] = None, **kwargs):
from ..main.parser import set_client_cli_parser
from ..clients.python import PyClient

Expand All @@ -525,12 +525,13 @@ def _get_client(self, bytes_gen: Union[Iterator['jina_pb2.Document'], Iterator[b
p_args.port_grpc = self._pod_nodes['gateway'].port_grpc
p_args.host = self._pod_nodes['gateway'].host
c = PyClient(p_args)
if bytes_gen:
c.raw_bytes = bytes_gen
if input_fn:
c.input_fn = input_fn
return c

def train(self, raw_bytes: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable] = None,
callback: Callable[['jina_pb2.Message'], None] = None,
@deprecated_alias(raw_bytes='input_fn', callback='output_fn')
def train(self, input_fn: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable] = None,
output_fn: Callable[['jina_pb2.Message'], None] = None,
**kwargs):
"""Do training on the current flow
Expand Down Expand Up @@ -564,14 +565,15 @@ def my_reader():
with f.build(runtime='thread') as flow:
flow.train(bytes_gen=my_reader())
:param raw_bytes: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
:param callback: the callback function to invoke after training
:param input_fn: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
:param output_fn: the callback function to invoke after training
:param kwargs: accepts all keyword arguments of `jina client` CLI
"""
self._get_client(raw_bytes, mode='train', **kwargs).start(callback)
self._get_client(input_fn, mode='train', **kwargs).start(output_fn)

def index(self, raw_bytes: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable] = None,
callback: Callable[['jina_pb2.Message'], None] = None,
@deprecated_alias(raw_bytes='input_fn', callback='output_fn')
def index(self, input_fn: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable] = None,
output_fn: Callable[['jina_pb2.Message'], None] = None,
**kwargs):
"""Do indexing on the current flow
Expand Down Expand Up @@ -605,14 +607,15 @@ def my_reader():
It will start a :py:class:`CLIClient` and call :py:func:`index`.
:param raw_bytes: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
:param callback: the callback function to invoke after indexing
:param input_fn: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
:param output_fn: the callback function to invoke after indexing
:param kwargs: accepts all keyword arguments of `jina client` CLI
"""
self._get_client(raw_bytes, mode='index', **kwargs).start(callback)
self._get_client(input_fn, mode='index', **kwargs).start(output_fn)

def search(self, raw_bytes: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable] = None,
callback: Callable[['jina_pb2.Message'], None] = None,
@deprecated_alias(raw_bytes='input_fn', callback='output_fn')
def search(self, input_fn: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable] = None,
output_fn: Callable[['jina_pb2.Message'], None] = None,
**kwargs):
"""Do indexing on the current flow
Expand Down Expand Up @@ -647,11 +650,11 @@ def my_reader():
with f.build(runtime='thread') as flow:
flow.search(bytes_gen=my_reader())
:param raw_bytes: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
:param callback: the callback function to invoke after searching
:param input_fn: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
:param output_fn: the callback function to invoke after searching
:param kwargs: accepts all keyword arguments of `jina client` CLI
"""
self._get_client(raw_bytes, mode='search', **kwargs).start(callback)
self._get_client(input_fn, mode='search', **kwargs).start(output_fn)

def dry_run(self, **kwargs):
"""Send a DRYRUN request to this flow, passing through all pods in this flow
Expand Down
6 changes: 3 additions & 3 deletions jina/helloworld/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ def hello_world(args):

f = Flow().load_config(args.index_yaml_path)
with f:
f.index(raw_bytes=input_fn(targets['index']['filename']), batch_size=args.index_batch_size)
f.index(input_fn(targets['index']['filename']), batch_size=args.index_batch_size)

countdown(8, reason=colored('behold! im going to switch to query mode', 'cyan',
attrs=['underline', 'bold', 'reverse']))

f = Flow().load_config(args.query_yaml_path)
with f:
f.search(raw_bytes=input_fn(targets['query']['filename'], index=False, num_doc=args.num_query),
callback=print_result, top_k=args.top_k, batch_size=args.query_batch_size)
f.search(input_fn(targets['query']['filename'], index=False, num_doc=args.num_query),
output_fn=print_result, top_k=args.top_k, batch_size=args.query_batch_size)

html_path = os.path.join(args.workdir, 'hello-world.html')
write_html(html_path)
27 changes: 26 additions & 1 deletion jina/helper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
__copyright__ = "Copyright (c) 2020 Jina AI Limited. All rights reserved."
__license__ = "Apache-2.0"

import functools
import os
import random
import re
Expand All @@ -22,6 +23,30 @@
'colored', 'kwargs2list', 'valid_yaml_path']


def deprecated_alias(**aliases):
def deco(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
rename_kwargs(f.__name__, kwargs, aliases)
return f(*args, **kwargs)

return wrapper

return deco


def rename_kwargs(func_name, kwargs, aliases):
from .logging import default_logger
for alias, new in aliases.items():
if alias in kwargs:
if new in kwargs:
raise TypeError(f'{func_name} received both {alias} and {new}')
default_logger.warning(
f'"{alias}" is deprecated in "{func_name}()" '
f'and will be removed in the next version; please use "{new}" instead')
kwargs[new] = kwargs.pop(alias)


def get_readable_size(num_bytes):
if num_bytes < 1024:
return f'{num_bytes} Bytes'
Expand All @@ -30,7 +55,7 @@ def get_readable_size(num_bytes):
elif num_bytes < 1024 ** 3:
return f'{num_bytes / (1024 ** 2):.1f} MB'
else:
return f'{num_bytes // (1024 ** 3):.1f} GB'
return f'{num_bytes / (1024 ** 3):.1f} GB'


def print_load_table(load_stat):
Expand Down
4 changes: 2 additions & 2 deletions tests/executors/crafters/test_segmenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ def collect_chunk_id(self, req):
def test_dummy_seg(self):
f = Flow().add(yaml_path='DummySegment')
with f:
f.index(raw_bytes=random_docs(10), in_proto=True, callback=self.get_chunk_id)
f.index(input_fn=random_docs(10), in_proto=True, output_fn=self.get_chunk_id)

def test_dummy_seg_random(self):
f = Flow().add(yaml_path='../../yaml/dummy-seg-random.yml')
with f:
f.index(raw_bytes=random_docs(10), in_proto=True, callback=self.collect_chunk_id)
f.index(input_fn=random_docs(10), in_proto=True, output_fn=self.collect_chunk_id)
Loading

0 comments on commit 77e49eb

Please sign in to comment.