diff --git a/README.de.md b/README.de.md index d26c852bd3a56..1a92e49067a15 100644 --- a/README.de.md +++ b/README.de.md @@ -144,7 +144,7 @@ from jina.flow import Flow f = Flow.load_config('index.yml') with f: - f.index(raw_bytes=input_fn) + f.index(input_fn) ``` diff --git a/README.fr.md b/README.fr.md index c8cd7e42183f8..8908b46ae89bf 100644 --- a/README.fr.md +++ b/README.fr.md @@ -144,7 +144,7 @@ from jina.flow import Flow f = Flow.load_config('index.yml') with f: - f.index(raw_bytes=input_fn) + f.index(input_fn) ``` diff --git a/README.ja.md b/README.ja.md index 986b41872645c..85cceee3ae178 100644 --- a/README.ja.md +++ b/README.ja.md @@ -144,7 +144,7 @@ from jina.flow import Flow f = Flow.load_config('index.yml') with f: - f.index(raw_bytes=input_fn) + f.index(input_fn) ``` diff --git a/README.md b/README.md index 05289050e1726..d69b4e8bbaa89 100644 --- a/README.md +++ b/README.md @@ -147,7 +147,7 @@ from jina.flow import Flow f = Flow.load_config('index.yml') with f: - f.index(raw_bytes=input_fn) + f.index(input_fn) ``` diff --git a/README.ru.md b/README.ru.md index 11f97eaa15b91..e0530aa44e59a 100644 --- a/README.ru.md +++ b/README.ru.md @@ -144,7 +144,7 @@ from jina.flow import Flow f = Flow.load_config('index.yml') with f: - f.index(raw_bytes=input_fn) + f.index(input_fn) ``` diff --git a/README.zh.md b/README.zh.md index 4cd080c6d9b7f..57b1e55647495 100644 --- a/README.zh.md +++ b/README.zh.md @@ -144,7 +144,7 @@ from jina.flow import Flow f = Flow.load_config('index.yml') with f: - f.index(raw_bytes=input_fn) + f.index(input_fn) ``` diff --git a/docs/chapters/flow/README.md b/docs/chapters/flow/README.md index cf2cdd23892ec..ecc03fe9ce2a8 100644 --- a/docs/chapters/flow/README.md +++ b/docs/chapters/flow/README.md @@ -177,18 +177,18 @@ You can use `.index()`, `.search()` to feed index data and search query to a flo ```python with f: - f.index(raw_bytes) + f.index(input_fn) ``` ```python with f: - f.search(raw_bytes, top_k=50, callback=print) + 
f.search(input_fn, top_k=50, output_fn=print) ``` -- `raw_bytes` is `Iterator[bytes]`, each of which corresponds to a bytes representation of a Document. -- `callback` is the callback function after each request, take `Request` protobuf as the only input. +- `input_fn` is an `Iterator[bytes]`, each element of which corresponds to a bytes representation of a Document. +- `output_fn` is the callback function invoked after each request; it takes a `Request` protobuf as its only input. -A simple `raw_bytes` can be `input_fn` defined as follows: +A simple `input_fn` is defined as follows: ```python def input_fn(): diff --git a/docs/chapters/helloworld/main.rst b/docs/chapters/helloworld/main.rst index 4246b21322f24..6720875fee62d 100644 --- a/docs/chapters/helloworld/main.rst +++ b/docs/chapters/helloworld/main.rst @@ -43,7 +43,7 @@ And the implementation behind? As simple as it should be: f = Flow.load_config('index.yml') with f: - f.index(raw_bytes=input_fn) + f.index(input_fn) .. confval:: YAML spec diff --git a/docs/chapters/io/main.md b/docs/chapters/io/main.md index e4f421b3036de..7042ef91ba1ea 100644 --- a/docs/chapters/io/main.md +++ b/docs/chapters/io/main.md @@ -18,7 +18,7 @@ with f: ```python with f: - f.search(input_fn, top_k=50, callback=print) + f.search(input_fn, top_k=50, output_fn=print) ``` `input_fn` is `Iterator[bytes]`, each of which corresponds to a bytes representation of a Document. @@ -147,7 +147,7 @@ For example, the following will print the request after a `IndexReqeust` is fini ```python with f: - f.index(input_fn, callback=print) + f.index(input_fn, output_fn=print) ``` This is quite useful when debugging. 
@@ -167,8 +167,8 @@ def print_html(resp): ``` ```python -f.search(raw_bytes=input_fn, - callback=print_html, top_k=args.top_k, batch_size=args.query_batch_size) +f.search(input_fn, + output_fn=print_html, top_k=args.top_k, batch_size=args.query_batch_size) ``` diff --git a/docs/chapters/remote/run-remote-pod-flow.md b/docs/chapters/remote/run-remote-pod-flow.md index fac1b4b8a76b2..677e7abdc5e57 100644 --- a/docs/chapters/remote/run-remote-pod-flow.md +++ b/docs/chapters/remote/run-remote-pod-flow.md @@ -80,7 +80,7 @@ requests: Locally, we write: ```python with f: - f.index(raw_bytes=random_docs(1000), in_proto=True) + f.index(input_fn=random_docs(1000), in_proto=True) def random_docs(num_docs, chunks_per_doc=5, embed_dim=10): import numpy as np diff --git a/docs/chapters/troubleshooting.md b/docs/chapters/troubleshooting.md index cab5011429f95..ac6273578de6d 100644 --- a/docs/chapters/troubleshooting.md +++ b/docs/chapters/troubleshooting.md @@ -23,7 +23,7 @@ from jina.flow import Flow f = Flow().add(...) with f: - f.index(raw_bytes, callback=print) + f.index(input_fn, output_fn=print) ``` which prints out the protobuf message directly in the console after each request. diff --git a/jina/clients/python/__init__.py b/jina/clients/python/__init__.py index 519905e8d4f2c..d50dc4538ccc6 100644 --- a/jina/clients/python/__init__.py +++ b/jina/clients/python/__init__.py @@ -33,10 +33,10 @@ def __init__(self, args: 'argparse.Namespace'): :param args: args provided by the CLI :param delay: if ``True`` then the client starts sending request after initializing, otherwise one needs to set - the :attr:`raw_bytes` before using :func:`start` or :func:`call` + the :attr:`input_fn` before using :func:`start` or :func:`call` """ super().__init__(args) - self._raw_bytes = None + self._input_fn = None def call(self, callback: Callable[['jina_pb2.Message'], None] = None) -> None: """ Calling the server, better use :func:`start` instead. 
@@ -44,7 +44,7 @@ def call(self, callback: Callable[['jina_pb2.Message'], None] = None) -> None: :param callback: a callback function, invoke after every response is received """ kwargs = vars(self.args) - kwargs['data'] = self.raw_bytes + kwargs['data'] = self.input_fn from . import request tname = self.args.mode @@ -62,23 +62,23 @@ def call(self, callback: Callable[['jina_pb2.Message'], None] = None) -> None: p_bar.update(self.args.batch_size) @property - def raw_bytes(self) -> Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable]: + def input_fn(self) -> Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable]: """ An iterator of bytes, each element represents a document's raw content, - i.e. ``raw_bytes`` defined int the protobuf + i.e. ``input_fn`` defined in the protobuf """ - if self._raw_bytes: - return self._raw_bytes + if self._input_fn: + return self._input_fn else: - raise BadClient('raw_bytes is empty or not set') + raise BadClient('input_fn is empty or not set') - @raw_bytes.setter - def raw_bytes(self, bytes_gen: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable]): - if self._raw_bytes: - self.logger.warning('raw_bytes is not empty, overrided') + @input_fn.setter + def input_fn(self, bytes_gen: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable]): + if self._input_fn: + self.logger.warning('input_fn is not empty, overrided') if hasattr(bytes_gen, '__call__'): - self._raw_bytes = bytes_gen() + self._input_fn = bytes_gen() else: - self._raw_bytes = bytes_gen + self._input_fn = bytes_gen def dry_run(self) -> bool: """Send a DRYRUN request to the server, passing through all pods on the server diff --git a/jina/flow/__init__.py b/jina/flow/__init__.py index 250e1429ce176..985b14fa92521 100644 --- a/jina/flow/__init__.py +++ b/jina/flow/__init__.py @@ -17,7 +17,7 @@ from .. 
import JINA_GLOBAL from ..enums import FlowBuildLevel, FlowOptimizeLevel from ..excepts import FlowTopologyError, FlowMissingPodError, FlowBuildLevelError, FlowConnectivityError -from ..helper import yaml, expand_env_var, get_non_defaults_args, get_parsed_args +from ..helper import yaml, expand_env_var, get_non_defaults_args, get_parsed_args, deprecated_alias from ..logging import get_logger from ..logging.sse import start_sse_logger from ..peapods.pod import SocketType, FlowPod, GatewayFlowPod @@ -516,7 +516,7 @@ def __eq__(self, other: 'Flow'): return a._pod_nodes == b._pod_nodes @build_required(FlowBuildLevel.GRAPH) - def _get_client(self, bytes_gen: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable] = None, **kwargs): + def _get_client(self, input_fn: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable] = None, **kwargs): from ..main.parser import set_client_cli_parser from ..clients.python import PyClient @@ -525,12 +525,13 @@ def _get_client(self, bytes_gen: Union[Iterator['jina_pb2.Document'], Iterator[b p_args.port_grpc = self._pod_nodes['gateway'].port_grpc p_args.host = self._pod_nodes['gateway'].host c = PyClient(p_args) - if bytes_gen: - c.raw_bytes = bytes_gen + if input_fn: + c.input_fn = input_fn return c - def train(self, raw_bytes: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable] = None, - callback: Callable[['jina_pb2.Message'], None] = None, + @deprecated_alias(raw_bytes='input_fn', callback='output_fn') + def train(self, input_fn: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable] = None, + output_fn: Callable[['jina_pb2.Message'], None] = None, **kwargs): """Do training on the current flow @@ -564,14 +565,15 @@ def my_reader(): with f.build(runtime='thread') as flow: flow.train(bytes_gen=my_reader()) - :param raw_bytes: An iterator of bytes. If not given, then you have to specify it in `kwargs`. 
- :param callback: the callback function to invoke after training + :param input_fn: An iterator of bytes. If not given, then you have to specify it in `kwargs`. + :param output_fn: the callback function to invoke after training :param kwargs: accepts all keyword arguments of `jina client` CLI """ - self._get_client(raw_bytes, mode='train', **kwargs).start(callback) + self._get_client(input_fn, mode='train', **kwargs).start(output_fn) - def index(self, raw_bytes: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable] = None, - callback: Callable[['jina_pb2.Message'], None] = None, + @deprecated_alias(raw_bytes='input_fn', callback='output_fn') + def index(self, input_fn: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable] = None, + output_fn: Callable[['jina_pb2.Message'], None] = None, **kwargs): """Do indexing on the current flow @@ -605,14 +607,15 @@ def my_reader(): It will start a :py:class:`CLIClient` and call :py:func:`index`. - :param raw_bytes: An iterator of bytes. If not given, then you have to specify it in `kwargs`. - :param callback: the callback function to invoke after indexing + :param input_fn: An iterator of bytes. If not given, then you have to specify it in `kwargs`. 
+ :param output_fn: the callback function to invoke after indexing :param kwargs: accepts all keyword arguments of `jina client` CLI """ - self._get_client(raw_bytes, mode='index', **kwargs).start(callback) + self._get_client(input_fn, mode='index', **kwargs).start(output_fn) - def search(self, raw_bytes: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable] = None, - callback: Callable[['jina_pb2.Message'], None] = None, + @deprecated_alias(raw_bytes='input_fn', callback='output_fn') + def search(self, input_fn: Union[Iterator['jina_pb2.Document'], Iterator[bytes], Callable] = None, + output_fn: Callable[['jina_pb2.Message'], None] = None, **kwargs): """Do indexing on the current flow @@ -647,11 +650,11 @@ def my_reader(): with f.build(runtime='thread') as flow: flow.search(bytes_gen=my_reader()) - :param raw_bytes: An iterator of bytes. If not given, then you have to specify it in `kwargs`. - :param callback: the callback function to invoke after searching + :param input_fn: An iterator of bytes. If not given, then you have to specify it in `kwargs`. + :param output_fn: the callback function to invoke after searching :param kwargs: accepts all keyword arguments of `jina client` CLI """ - self._get_client(raw_bytes, mode='search', **kwargs).start(callback) + self._get_client(input_fn, mode='search', **kwargs).start(output_fn) def dry_run(self, **kwargs): """Send a DRYRUN request to this flow, passing through all pods in this flow diff --git a/jina/helloworld/__init__.py b/jina/helloworld/__init__.py index 857475da39f40..d29a95062a22e 100644 --- a/jina/helloworld/__init__.py +++ b/jina/helloworld/__init__.py @@ -45,15 +45,15 @@ def hello_world(args): f = Flow().load_config(args.index_yaml_path) with f: - f.index(raw_bytes=input_fn(targets['index']['filename']), batch_size=args.index_batch_size) + f.index(input_fn(targets['index']['filename']), batch_size=args.index_batch_size) countdown(8, reason=colored('behold! 
im going to switch to query mode', 'cyan', attrs=['underline', 'bold', 'reverse'])) f = Flow().load_config(args.query_yaml_path) with f: - f.search(raw_bytes=input_fn(targets['query']['filename'], index=False, num_doc=args.num_query), - callback=print_result, top_k=args.top_k, batch_size=args.query_batch_size) + f.search(input_fn(targets['query']['filename'], index=False, num_doc=args.num_query), + output_fn=print_result, top_k=args.top_k, batch_size=args.query_batch_size) html_path = os.path.join(args.workdir, 'hello-world.html') write_html(html_path) diff --git a/jina/helper.py b/jina/helper.py index 2a94f3adc0b2a..95fc41286f03a 100644 --- a/jina/helper.py +++ b/jina/helper.py @@ -1,6 +1,7 @@ __copyright__ = "Copyright (c) 2020 Jina AI Limited. All rights reserved." __license__ = "Apache-2.0" +import functools import os import random import re @@ -22,6 +23,30 @@ 'colored', 'kwargs2list', 'valid_yaml_path'] +def deprecated_alias(**aliases): + def deco(f): + @functools.wraps(f) + def wrapper(*args, **kwargs): + rename_kwargs(f.__name__, kwargs, aliases) + return f(*args, **kwargs) + + return wrapper + + return deco + + +def rename_kwargs(func_name, kwargs, aliases): + from .logging import default_logger + for alias, new in aliases.items(): + if alias in kwargs: + if new in kwargs: + raise TypeError(f'{func_name} received both {alias} and {new}') + default_logger.warning( + f'"{alias}" is deprecated in "{func_name}()" ' + f'and will be removed in the next version; please use "{new}" instead') + kwargs[new] = kwargs.pop(alias) + + def get_readable_size(num_bytes): if num_bytes < 1024: return f'{num_bytes} Bytes' @@ -30,7 +55,7 @@ def get_readable_size(num_bytes): elif num_bytes < 1024 ** 3: return f'{num_bytes / (1024 ** 2):.1f} MB' else: - return f'{num_bytes // (1024 ** 3):.1f} GB' + return f'{num_bytes / (1024 ** 3):.1f} GB' def print_load_table(load_stat): diff --git a/tests/executors/crafters/test_segmenter.py b/tests/executors/crafters/test_segmenter.py index 
47f86e354bce1..d219382ca7568 100644 --- a/tests/executors/crafters/test_segmenter.py +++ b/tests/executors/crafters/test_segmenter.py @@ -29,9 +29,9 @@ def collect_chunk_id(self, req): def test_dummy_seg(self): f = Flow().add(yaml_path='DummySegment') with f: - f.index(raw_bytes=random_docs(10), in_proto=True, callback=self.get_chunk_id) + f.index(input_fn=random_docs(10), in_proto=True, output_fn=self.get_chunk_id) def test_dummy_seg_random(self): f = Flow().add(yaml_path='../../yaml/dummy-seg-random.yml') with f: - f.index(raw_bytes=random_docs(10), in_proto=True, callback=self.collect_chunk_id) + f.index(input_fn=random_docs(10), in_proto=True, output_fn=self.collect_chunk_id) diff --git a/tests/test_container.py b/tests/test_container.py index 71b01c548e62d..3e8ba83d60182 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -76,14 +76,14 @@ def test_flow_with_one_container_pod(self): .add(name='dummyEncoder', image=img_name)) with f: - f.index(raw_bytes=random_docs(10), in_proto=True) + f.index(input_fn=random_docs(10), in_proto=True) def test_flow_with_one_container_ext_yaml(self): f = (Flow() .add(name='dummyEncoder', image=img_name, yaml_path='./mwu-encoder/mwu_encoder_ext.yml')) with f: - f.index(raw_bytes=random_docs(10), in_proto=True) + f.index(input_fn=random_docs(10), in_proto=True) def test_flow_with_replica_container_ext_yaml(self): f = (Flow() @@ -93,9 +93,9 @@ def test_flow_with_replica_container_ext_yaml(self): replicas=3)) with f: - f.index(raw_bytes=random_docs(10), in_proto=True) - f.index(raw_bytes=random_docs(10), in_proto=True) - f.index(raw_bytes=random_docs(10), in_proto=True) + f.index(input_fn=random_docs(10), in_proto=True) + f.index(input_fn=random_docs(10), in_proto=True) + f.index(input_fn=random_docs(10), in_proto=True) def test_flow_topo1(self): f = (Flow() @@ -106,7 +106,7 @@ def test_flow_topo1(self): .join(['d3', 'd2'])) with f: - f.index(raw_bytes=random_docs(10), in_proto=True) + 
f.index(input_fn=random_docs(10), in_proto=True) def test_flow_topo_mixed(self): f = (Flow() @@ -118,7 +118,7 @@ def test_flow_topo_mixed(self): ) with f: - f.index(raw_bytes=random_docs(10), in_proto=True) + f.index(input_fn=random_docs(10), in_proto=True) def test_flow_topo_replicas(self): f = (Flow() @@ -131,14 +131,14 @@ def test_flow_topo_replicas(self): with f: f.dry_run() - f.index(raw_bytes=random_docs(1000), in_proto=True) + f.index(input_fn=random_docs(1000), in_proto=True) def test_container_volume(self): f = (Flow() .add(name='dummyEncoder', image=img_name, volumes='./abc', yaml_path='mwu-encoder/mwu_encoder_upd.yml')) with f: - f.index(raw_bytes=random_docs(10), in_proto=True) + f.index(input_fn=random_docs(10), in_proto=True) out_file = './abc/ext-mwu-encoder.bin' self.assertTrue(os.path.exists(out_file)) diff --git a/tests/test_flow.py b/tests/test_flow.py index d2f78120fb58b..872a6f417043f 100644 --- a/tests/test_flow.py +++ b/tests/test_flow.py @@ -91,14 +91,14 @@ def bytes_fn(): .add(yaml_path='_forward')) with f: - f.index(raw_bytes=bytes_gen) + f.index(input_fn=bytes_gen) with f: - f.index(raw_bytes=bytes_fn) + f.index(input_fn=bytes_fn) with f: - f.index(raw_bytes=bytes_fn) - f.index(raw_bytes=bytes_fn) + f.index(input_fn=bytes_fn) + f.index(input_fn=bytes_fn) def test_load_flow_from_yaml(self): with open('yaml/test-flow.yml') as fp: @@ -144,7 +144,7 @@ def test_flow_no_container(self): .add(name='dummyEncoder', yaml_path='mwu-encoder/mwu_encoder.yml')) with f: - f.index(raw_bytes=random_docs(10), in_proto=True) + f.index(input_fn=random_docs(10), in_proto=True) def test_flow_yaml_dump(self): f = Flow(logserver_config='yaml/test-server-config.yml', diff --git a/tests/test_index.py b/tests/test_index.py index 60bc3b4126255..855efa1e7f420 100644 --- a/tests/test_index.py +++ b/tests/test_index.py @@ -87,7 +87,7 @@ def test_doc_iters(self): def test_simple_route(self): f = Flow().add(yaml_path='_forward') with f: - 
f.index(raw_bytes=random_docs(10), in_proto=True) + f.index(input_fn=random_docs(10), in_proto=True) def test_update_method(self): a = DummyIndexer(index_filename='test.bin') @@ -119,7 +119,7 @@ def test_two_client_route_replicas(self): # f3 = Flow(optimize_level=FlowOptimizeLevel.FULL).add(yaml_path='_forward', replicas=3) def start_client(fl): - fl.index(raw_bytes=random_docs(10), in_proto=True) + fl.index(input_fn=random_docs(10), in_proto=True) with f1: self.assertEqual(f1.num_peas, 6) @@ -151,7 +151,7 @@ def test_two_client_route(self): f = Flow().add(yaml_path='_forward') def start_client(fl): - fl.index(raw_bytes=random_docs(10), in_proto=True) + fl.index(input_fn=random_docs(10), in_proto=True) with f: t1 = mp.Process(target=start_client, args=(f,)) @@ -166,7 +166,7 @@ def start_client(fl): def test_index(self): f = Flow().add(yaml_path='yaml/test-index.yml', replicas=3, separated_workspace=True) with f: - f.index(raw_bytes=random_docs(1000), in_proto=True) + f.index(input_fn=random_docs(1000), in_proto=True) for j in range(3): self.assertTrue(os.path.exists(f'test2-{j + 1}/test2.bin')) @@ -175,7 +175,7 @@ def test_index(self): time.sleep(3) with f: - f.search(raw_bytes=random_docs(1), in_proto=True, callback=get_result, top_k=100) + f.search(input_fn=random_docs(1), in_proto=True, output_fn=get_result, top_k=100) if __name__ == '__main__': diff --git a/tests/test_index_remote.py b/tests/test_index_remote.py index 7175c8238e90a..12b81e5180677 100644 --- a/tests/test_index_remote.py +++ b/tests/test_index_remote.py @@ -97,7 +97,7 @@ def start_gateway(): host='localhost', port_grpc=f_args.port_grpc) with f: - f.index(raw_bytes=random_docs(1000), in_proto=True) + f.index(input_fn=random_docs(1000), in_proto=True) time.sleep(3) for j in range(3): @@ -122,7 +122,7 @@ def start_gateway(): host='192.168.31.76', port_grpc=44444)) with f: - f.index(raw_bytes=random_docs(1000), in_proto=True) + f.index(input_fn=random_docs(1000), in_proto=True) if __name__ == 
'__main__': diff --git a/tests/test_loadbalance.py b/tests/test_loadbalance.py index 232153194a9a3..6c734b0611113 100644 --- a/tests/test_loadbalance.py +++ b/tests/test_loadbalance.py @@ -55,7 +55,7 @@ def test_lb(self): yaml_path='SlowWorker', replicas=10) with f: - f.index(raw_bytes=random_docs(100), in_proto=True, batch_size=10) + f.index(input_fn=random_docs(100), in_proto=True, batch_size=10) def test_roundrobin(self): f = Flow(runtime='process').add( @@ -63,4 +63,4 @@ def test_roundrobin(self): yaml_path='SlowWorker', replicas=10, scheduling=SchedulerType.ROUND_ROBIN) with f: - f.index(raw_bytes=random_docs(100), in_proto=True, batch_size=10) + f.index(input_fn=random_docs(100), in_proto=True, batch_size=10) diff --git a/tests/test_quant.py b/tests/test_quant.py index aa60bf38a2ec3..038ef0c1e2d82 100644 --- a/tests/test_quant.py +++ b/tests/test_quant.py @@ -50,7 +50,7 @@ def f1(self, quant): yaml_path='_forward').add( yaml_path='_forward').add(yaml_path='_forward').add(yaml_path='_forward').add(yaml_path='_forward') with f as fl: - fl.index(random_docs, callback=get_output, in_proto=True) + fl.index(random_docs, output_fn=get_output, in_proto=True) def test_quant(self): for j in ('fp32', 'fp16', 'uint8'):