diff --git a/pymilvus/client/asynch.py b/pymilvus/client/asynch.py index 544d917ebf..a28df53776 100644 --- a/pymilvus/client/asynch.py +++ b/pymilvus/client/asynch.py @@ -161,11 +161,11 @@ def exception(self): class SearchFuture(Future): def on_response(self, response: Any): - if response.status.error_code == 0: + if response.status.code == 0: return QueryResult(response) status = response.status - raise MilvusException(status.error_code, status.reason) + raise MilvusException(status.code, status.reason) # TODO: if ChunkedFuture is more common later, consider using ChunkedFuture as Base Class, @@ -246,8 +246,8 @@ def exception(self): def on_response(self, response: Any): for raw in response: - if raw.status.error_code != 0: - raise MilvusException(raw.status.error_code, raw.status.reason) + if raw.status.code != 0: + raise MilvusException(raw.status.code, raw.status.reason) return ChunkedQueryResult(response) @@ -255,19 +255,19 @@ def on_response(self, response: Any): class MutationFuture(Future): def on_response(self, response: Any): status = response.status - if status.error_code == 0: + if status.code == 0: return MutationResult(response) status = response.status - raise MilvusException(status.error_code, status.reason) + raise MilvusException(status.code, status.reason) class CreateIndexFuture(Future): def on_response(self, response: Any): - if response.error_code != 0: - raise MilvusException(response.error_code, response.reason) + if response.code != 0: + raise MilvusException(response.code, response.reason) - return Status(response.error_code, response.reason) + return Status(response.code, response.reason) class CreateFlatIndexFuture(AbstractFuture): @@ -327,17 +327,17 @@ def exception(self): class FlushFuture(Future): def on_response(self, response: Any): - if response.status.error_code != 0: - raise MilvusException(response.status.error_code, response.status.reason) + if response.status.code != 0: + raise MilvusException(response.status.code, response.status.reason) class LoadCollectionFuture(Future): def on_response(self, response: Any): - if response.error_code != 0: - raise MilvusException(response.error_code, response.reason) + if response.code != 0: + raise MilvusException(response.code, response.reason) class LoadPartitionsFuture(Future): def on_response(self, response: Any): - if response.error_code != 0: - raise MilvusException(response.error_code, response.reason) + if response.code != 0: + raise MilvusException(response.code, response.reason) diff --git a/pymilvus/client/grpc_handler.py b/pymilvus/client/grpc_handler.py index a1a69befd7..e233fc6834 100644 --- a/pymilvus/client/grpc_handler.py +++ b/pymilvus/client/grpc_handler.py @@ -288,8 +288,8 @@ def create_collection( if kwargs.get("_async", False): return rf status = rf.result() - if status.error_code != 0: - raise MilvusException(status.error_code, status.reason) + if status.code != 0: + raise MilvusException(status.code, status.reason) return None @retry_on_rpc_failure() @@ -299,8 +299,8 @@ def drop_collection(self, collection_name: str, timeout: Optional[float] = None) rf = self._stub.DropCollection.future(request, timeout=timeout) status = rf.result() - if status.error_code != 0: - raise MilvusException(status.error_code, status.reason) + if status.code != 0: + raise MilvusException(status.code, status.reason) @retry_on_rpc_failure() def alter_collection( @@ -310,8 +310,8 @@ def alter_collection( request = Prepare.alter_collection_request(collection_name, properties) rf = self._stub.AlterCollection.future(request, timeout=timeout) status = rf.result() - if status.error_code != 0: - raise MilvusException(status.error_code, status.reason) + if status.code != 0: + raise MilvusException(status.code, status.reason) @retry_on_rpc_failure() def has_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs): @@ -320,17 +320,17 @@ def has_collection(self, collection_name: str, timeout: Optional[float] = None, rf = self._stub.DescribeCollection.future(request, timeout=timeout) reply = rf.result() - if reply.status.error_code == common_pb2.Success: + if reply.status.code == common_pb2.Success: return True - # TODO: Workaround for unreasonable describe collection results and error_code + # TODO: Workaround for unreasonable describe collection results and code if ( - reply.status.error_code == common_pb2.UnexpectedError + reply.status.code == common_pb2.UnexpectedError and "can't find collection" in reply.status.reason ): return False - raise MilvusException(reply.status.error_code, reply.status.reason) + raise MilvusException(reply.status.code, reply.status.reason) @retry_on_rpc_failure() def describe_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs): @@ -340,10 +340,10 @@ def describe_collection(self, collection_name: str, timeout: Optional[float] = N response = rf.result() status = response.status - if status.error_code == 0: + if status.code == 0: return CollectionSchema(raw=response).dict() - raise DescribeCollectionException(status.error_code, status.reason) + raise DescribeCollectionException(status.code, status.reason) @retry_on_rpc_failure() def list_collections(self, timeout: Optional[float] = None): @@ -351,10 +351,10 @@ def list_collections(self, timeout: Optional[float] = None): rf = self._stub.ShowCollections.future(request, timeout=timeout) response = rf.result() status = response.status - if response.status.error_code == 0: + if response.status.code == 0: return list(response.collection_names) - raise MilvusException(status.error_code, status.reason) + raise MilvusException(status.code, status.reason) @retry_on_rpc_failure() def rename_collections( @@ -372,8 +372,8 @@ def rename_collections( rf = self._stub.RenameCollection.future(request, timeout=timeout) response = rf.result() - if response.error_code != 0: - raise MilvusException(response.error_code, response.reason) + if response.code != 0: + raise MilvusException(response.code, response.reason) @retry_on_rpc_failure() def create_partition( @@ -383,8 +383,8 @@ def create_partition( request = Prepare.create_partition_request(collection_name, partition_name) rf = self._stub.CreatePartition.future(request, timeout=timeout) response = rf.result() - if response.error_code != 0: - raise MilvusException(response.error_code, response.reason) + if response.code != 0: + raise MilvusException(response.code, response.reason) @retry_on_rpc_failure() def drop_partition( @@ -396,8 +396,8 @@ def drop_partition( rf = self._stub.DropPartition.future(request, timeout=timeout) response = rf.result() - if response.error_code != 0: - raise MilvusException(response.error_code, response.reason) + if response.code != 0: + raise MilvusException(response.code, response.reason) @retry_on_rpc_failure() def has_partition( @@ -408,10 +408,10 @@ def has_partition( rf = self._stub.HasPartition.future(request, timeout=timeout) response = rf.result() status = response.status - if status.error_code == 0: + if status.code == 0: return response.value - raise MilvusException(status.error_code, status.reason) + raise MilvusException(status.code, status.reason) # TODO: this is not inuse @retry_on_rpc_failure() @@ -422,13 +422,13 @@ def get_partition_info( rf = self._stub.DescribePartition.future(request, timeout=timeout) response = rf.result() status = response.status - if status.error_code == 0: + if status.code == 0: statistics = response.statistics info_dict = {} for kv in statistics: info_dict[kv.key] = kv.value return info_dict - raise MilvusException(status.error_code, status.reason) + raise MilvusException(status.code, status.reason) @retry_on_rpc_failure() def list_partitions(self, collection_name: str, timeout: Optional[float] = None): @@ -438,10 +438,10 @@ def list_partitions(self, collection_name: str, timeout: Optional[float] = None) rf = self._stub.ShowPartitions.future(request, timeout=timeout) response = rf.result() status = response.status - if status.error_code == 0: + if status.code == 0: return list(response.partition_names) - raise MilvusException(status.error_code, status.reason) + raise MilvusException(status.code, status.reason) @retry_on_rpc_failure() def get_partition_stats( @@ -452,10 +452,10 @@ def get_partition_stats( future = self._stub.GetPartitionStatistics.future(req, timeout=timeout) response = future.result() status = response.status - if status.error_code == 0: + if status.code == 0: return response.stats - raise MilvusException(status.error_code, status.reason) + raise MilvusException(status.code, status.reason) def _get_info(self, collection_name: str, timeout: Optional[float] = None, **kwargs): schema = kwargs.get("schema", None) @@ -503,12 +503,12 @@ def insert_rows( ) rf = self._stub.Insert.future(request, timeout=timeout) response = rf.result() - if response.status.error_code == 0: + if response.status.code == 0: m = MutationResult(response) ts_utils.update_collection_ts(collection_name, m.timestamp) return m - raise MilvusException(response.status.error_code, response.status.reason) + raise MilvusException(response.status.code, response.status.reason) def _prepare_batch_insert_request( self, @@ -560,12 +560,12 @@ def batch_insert( return f response = rf.result() - if response.status.error_code == 0: + if response.status.code == 0: m = MutationResult(response) ts_utils.update_collection_ts(collection_name, m.timestamp) return m - raise MilvusException(response.status.error_code, response.status.reason) + raise MilvusException(response.status.code, response.status.reason) except Exception as err: if kwargs.get("_async", False): return MutationFuture(None, None, err) @@ -592,12 +592,12 @@ def delete( return f response = future.result() - if response.status.error_code == 0: + if response.status.code == 0: m = MutationResult(response) ts_utils.update_collection_ts(collection_name, m.timestamp) return m - raise MilvusException(response.status.error_code, response.status.reason) + raise MilvusException(response.status.code, response.status.reason) except Exception as err: if kwargs.get("_async", False): return MutationFuture(None, None, err) @@ -654,12 +654,12 @@ def upsert( return f response = rf.result() - if response.status.error_code == 0: + if response.status.code == 0: m = MutationResult(response) ts_utils.update_collection_ts(collection_name, m.timestamp) return m - raise MilvusException(response.status.error_code, response.status.reason) + raise MilvusException(response.status.code, response.status.reason) except Exception as err: if kwargs.get("_async", False): return MutationFuture(None, None, err) @@ -701,12 +701,12 @@ def upsert_rows( ) rf = self._stub.Upsert.future(request, timeout=timeout) response = rf.result() - if response.status.error_code == 0: + if response.status.code == 0: m = MutationResult(response) ts_utils.update_collection_ts(collection_name, m.timestamp) return m - raise MilvusException(response.status.error_code, response.status.reason) + raise MilvusException(response.status.code, response.status.reason) def _execute_search_requests(self, requests: Any, timeout: Optional[float] = None, **kwargs): try: @@ -722,8 +722,8 @@ def _execute_search_requests(self, requests: Any, timeout: Optional[float] = Non for request in requests: response = self._stub.Search(request, timeout=timeout) - if response.status.error_code != 0: - raise MilvusException(response.status.error_code, response.status.reason) + if response.status.code != 0: + raise MilvusException(response.status.code, response.status.reason) raws.append(response) round_decimal = kwargs.get("round_decimal", -1) @@ -781,9 +781,9 @@ def get_query_segment_info(self, collection_name: str, timeout: float = 30, **kw future = self._stub.GetQuerySegmentInfo.future(req, timeout=timeout) response = future.result() status = response.status - if status.error_code == 0: + if status.code == 0: return response.infos # todo: A wrapper class of QuerySegmentInfo - raise MilvusException(status.error_code, status.reason) + raise MilvusException(status.code, status.reason) @retry_on_rpc_failure() def create_alias( @@ -793,16 +793,16 @@ def create_alias( request = Prepare.create_alias_request(collection_name, alias) rf = self._stub.CreateAlias.future(request, timeout=timeout) response = rf.result() - if response.error_code != 0: - raise MilvusException(response.error_code, response.reason) + if response.code != 0: + raise MilvusException(response.code, response.reason) @retry_on_rpc_failure() def drop_alias(self, alias: str, timeout: Optional[float] = None, **kwargs): request = Prepare.drop_alias_request(alias) rf = self._stub.DropAlias.future(request, timeout=timeout) response = rf.result() - if response.error_code != 0: - raise MilvusException(response.error_code, response.reason) + if response.code != 0: + raise MilvusException(response.code, response.reason) @retry_on_rpc_failure() def alter_alias( @@ -812,8 +812,8 @@ def alter_alias( request = Prepare.alter_alias_request(collection_name, alias) rf = self._stub.AlterAlias.future(request, timeout=timeout) response = rf.result() - if response.error_code != 0: - raise MilvusException(response.error_code, response.reason) + if response.code != 0: + raise MilvusException(response.code, response.reason) @retry_on_rpc_failure() def create_index( @@ -872,8 +872,8 @@ def _check(): status = future.result() - if status.error_code != 0: - raise MilvusException(status.error_code, status.reason) + if status.code != 0: + raise MilvusException(status.code, status.reason) if kwargs.get("sync", True): index_success, fail_reason = self.wait_for_creating_index( @@ -885,7 +885,7 @@ def _check(): if not index_success: raise MilvusException(message=fail_reason) - return Status(status.error_code, status.reason) + return Status(status.code, status.reason) @retry_on_rpc_failure() def list_indexes(self, collection_name: str, timeout: Optional[float] = None, **kwargs): @@ -895,11 +895,11 @@ def list_indexes(self, collection_name: str, timeout: Optional[float] = None, ** rf = self._stub.DescribeIndex.future(request, timeout=timeout) response = rf.result() status = response.status - if status.error_code == 0: + if status.code == 0: return response.index_descriptions - if status.error_code == Status.INDEX_NOT_EXIST: + if status.code == Status.INDEX_NOT_EXIST: return [] - raise MilvusException(status.error_code, status.reason) + raise MilvusException(status.code, status.reason) @retry_on_rpc_failure() def describe_index( @@ -916,10 +916,10 @@ def describe_index( rf = self._stub.DescribeIndex.future(request, timeout=timeout) response = rf.result() status = response.status - if status.error_code == Status.INDEX_NOT_EXIST: + if status.code == Status.INDEX_NOT_EXIST: return None - if status.error_code != 0: - raise MilvusException(status.error_code, status.reason) + if status.code != 0: + raise MilvusException(status.code, status.reason) if len(response.index_descriptions) == 1: info_dict = {kv.key: kv.value for kv in response.index_descriptions[0].params} info_dict["field_name"] = response.index_descriptions[0].field_name @@ -938,7 +938,7 @@ def get_index_build_progress( rf = self._stub.DescribeIndex.future(request, timeout=timeout) response = rf.result() status = response.status - if status.error_code == 0: + if status.code == 0: if len(response.index_descriptions) == 1: index_desc = response.index_descriptions[0] return { @@ -947,7 +947,7 @@ def get_index_build_progress( "pending_index_rows": index_desc.pending_index_rows, } raise AmbiguousIndexName(message=ExceptionsMessage.AmbiguousIndexName) - raise MilvusException(status.error_code, status.reason) + raise MilvusException(status.code, status.reason) @retry_on_rpc_failure() def get_index_state( @@ -962,8 +962,8 @@ def get_index_state( rf = self._stub.DescribeIndex.future(request, timeout=timeout) response = rf.result() status = response.status - if status.error_code != 0: - raise MilvusException(status.error_code, status.reason) + if status.code != 0: + raise MilvusException(status.code, status.reason) if len(response.index_descriptions) == 1: index_desc = response.index_descriptions[0] @@ -1016,8 +1016,8 @@ def load_collection( ) rf = self._stub.LoadCollection.future(request, timeout=timeout) response = rf.result() - if response.error_code != 0: - raise MilvusException(response.error_code, response.reason) + if response.code != 0: + raise MilvusException(response.code, response.reason) _async = kwargs.get("_async", False) if not _async: self.wait_for_loading_collection(collection_name, timeout, is_refresh=_refresh) @@ -1056,8 +1056,8 @@ def release_collection(self, collection_name: str, timeout: Optional[float] = No request = Prepare.release_collection("", collection_name) rf = self._stub.ReleaseCollection.future(request, timeout=timeout) response = rf.result() - if response.error_code != 0: - raise MilvusException(response.error_code, response.reason) + if response.code != 0: + raise MilvusException(response.code, response.reason) @retry_on_rpc_failure() def load_partitions( @@ -1098,8 +1098,8 @@ def _check(): return load_partitions_future response = future.result() - if response.error_code != 0: - raise MilvusException(response.error_code, response.reason) + if response.code != 0: + raise MilvusException(response.code, response.reason) sync = kwargs.get("sync", True) if sync: self.wait_for_loading_partitions(collection_name, partition_names, is_refresh=_refresh) @@ -1140,8 +1140,8 @@ def get_loading_progress( ): request = Prepare.get_loading_progress(collection_name, partition_names) response = self._stub.GetLoadingProgress.future(request, timeout=timeout).result() - if response.status.error_code != 0: - raise MilvusException(response.status.error_code, response.status.reason) + if response.status.code != 0: + raise MilvusException(response.status.code, response.status.reason) if is_refresh: return response.refresh_progress @@ -1151,22 +1151,22 @@ def get_loading_progress( def create_database(self, db_name: str, timeout: Optional[float] = None): request = Prepare.create_database_req(db_name) status = self._stub.CreateDatabase(request, timeout=timeout) - if status.error_code != 0: - raise MilvusException(status.error_code, status.reason) + if status.code != 0: + raise MilvusException(status.code, status.reason) @retry_on_rpc_failure() def drop_database(self, db_name: str, timeout: Optional[float] = None): request = Prepare.drop_database_req(db_name) status = self._stub.DropDatabase(request, timeout=timeout) - if status.error_code != 0: - raise MilvusException(status.error_code, status.reason) + if status.code != 0: + raise MilvusException(status.code, status.reason) @retry_on_rpc_failure() def list_database(self, timeout: Optional[float] = None): request = Prepare.list_database_req() response = self._stub.ListDatabases(request, timeout=timeout) - if response.status.error_code != 0: - raise MilvusException(response.status.error_code, response.status.reason) + if response.status.code != 0: + raise MilvusException(response.status.code, response.status.reason) return list(response.db_names) @retry_on_rpc_failure() @@ -1178,8 +1178,8 @@ def get_load_state( ): request = Prepare.get_load_state(collection_name, partition_names) response = self._stub.GetLoadState.future(request, timeout=timeout).result() - if response.status.error_code != 0: - raise MilvusException(response.status.error_code, response.status.reason) + if response.status.code != 0: + raise MilvusException(response.status.code, response.status.reason) return LoadState(response.state) @retry_on_rpc_failure() @@ -1200,8 +1200,8 @@ def release_partitions( request = Prepare.release_partitions("", collection_name, partition_names) rf = self._stub.ReleasePartitions.future(request, timeout=timeout) response = rf.result() - if response.error_code != 0: - raise MilvusException(response.error_code, response.reason) + if response.code != 0: + raise MilvusException(response.code, response.reason) @retry_on_rpc_failure() def get_collection_stats(self, collection_name: str, timeout: Optional[float] = None, **kwargs): @@ -1210,10 +1210,10 @@ def get_collection_stats(self, collection_name: str, timeout: Optional[float] = future = self._stub.GetCollectionStatistics.future(index_param, timeout=timeout) response = future.result() status = response.status - if status.error_code == 0: + if status.code == 0: return response.stats - raise MilvusException(status.error_code, status.reason) + raise MilvusException(status.code, status.reason) @retry_on_rpc_failure() def get_flush_state( @@ -1228,9 +1228,9 @@ def get_flush_state( future = self._stub.GetFlushState.future(req, timeout=timeout) response = future.result() status = response.status - if status.error_code == 0: + if status.code == 0: return response.flushed # todo: A wrapper class of PersistentSegmentInfo - raise MilvusException(status.error_code, status.reason) + raise MilvusException(status.code, status.reason) # TODO seem not in use @retry_on_rpc_failure() @@ -1241,9 +1241,9 @@ def get_persistent_segment_infos( future = self._stub.GetPersistentSegmentInfo.future(req, timeout=timeout) response = future.result() status = response.status - if status.error_code == 0: + if status.code == 0: return response.infos # todo: A wrapper class of PersistentSegmentInfo - raise MilvusException(status.error_code, status.reason) + raise MilvusException(status.code, status.reason) def _wait_for_flushed( self, @@ -1279,8 +1279,8 @@ def flush(self, collection_names: list, timeout: Optional[float] = None, **kwarg request = Prepare.flush_param(collection_names) future = self._stub.Flush.future(request, timeout=timeout) response = future.result() - if response.status.error_code != 0: - raise MilvusException(response.status.error_code, response.status.reason) + if response.status.code != 0: + raise MilvusException(response.status.code, response.status.reason) def _check(): for collection_name in collection_names: @@ -1314,8 +1314,8 @@ def drop_index( request = Prepare.drop_index_request(collection_name, field_name, index_name) future = self._stub.DropIndex.future(request, timeout=timeout) response = future.result() - if response.error_code != 0: - raise MilvusException(response.error_code, response.reason) + if response.code != 0: + raise MilvusException(response.code, response.reason) @retry_on_rpc_failure() def dummy(self, request_type: Any, timeout: Optional[float] = None, **kwargs): @@ -1363,10 +1363,10 @@ def query( future = self._stub.Query.future(request, timeout=timeout) response = future.result() - if response.status.error_code == Status.EMPTY_COLLECTION: + if response.status.code == Status.EMPTY_COLLECTION: return [] - if response.status.error_code != Status.SUCCESS: - raise MilvusException(response.status.error_code, response.status.reason) + if response.status.code != Status.SUCCESS: + raise MilvusException(response.status.code, response.status.reason) num_fields = len(response.fields_data) # check has fields @@ -1404,22 +1404,22 @@ def load_balance( ) future = self._stub.LoadBalance.future(req, timeout=timeout) status = future.result() - if status.error_code != 0: - raise MilvusException(status.error_code, status.reason) + if status.code != 0: + raise MilvusException(status.code, status.reason) @retry_on_rpc_failure() def compact(self, collection_name: str, timeout: Optional[float] = None, **kwargs) -> int: request = Prepare.describe_collection_request(collection_name) rf = self._stub.DescribeCollection.future(request, timeout=timeout) response = rf.result() - if response.status.error_code != 0: - raise MilvusException(response.status.error_code, response.status.reason) + if response.status.code != 0: + raise MilvusException(response.status.code, response.status.reason) req = Prepare.manual_compaction(response.collectionID) future = self._stub.ManualCompaction.future(req, timeout=timeout) response = future.result() - if response.status.error_code != 0: - raise MilvusException(response.status.error_code, response.status.reason) + if response.status.code != 0: + raise MilvusException(response.status.code, response.status.reason) return response.compactionID @@ -1431,8 +1431,8 @@ def get_compaction_state( future = self._stub.GetCompactionState.future(req, timeout=timeout) response = future.result() - if response.status.error_code != 0: - raise MilvusException(response.status.error_code, response.status.reason) + if response.status.code != 0: + raise MilvusException(response.status.code, response.status.reason) return CompactionState( compaction_id, @@ -1468,8 +1468,8 @@ def get_compaction_plans( future = self._stub.GetCompactionStateWithPlans.future(req, timeout=timeout) response = future.result() - if response.status.error_code != 0: - raise MilvusException(response.status.error_code, response.status.reason) + if response.status.code != 0: + raise MilvusException(response.status.code, response.status.reason) cp = CompactionPlans(compaction_id, response.state) @@ -1488,8 +1488,8 @@ def get_replicas( req = Prepare.get_replicas(collection_id) future = self._stub.GetReplicas.future(req, timeout=timeout) response = future.result() - if response.status.error_code != 0: - raise MilvusException(response.status.error_code, response.status.reason) + if response.status.code != 0: + raise MilvusException(response.status.code, response.status.reason) groups = [] for replica in response.replicas: @@ -1520,8 +1520,8 @@ def do_bulk_insert( req = Prepare.do_bulk_insert(collection_name, partition_name, files, **kwargs) future = self._stub.Import.future(req, timeout=timeout) response = future.result() - if response.status.error_code != 0: - raise MilvusException(response.status.error_code, response.status.reason) + if response.status.code != 0: + raise MilvusException(response.status.code, response.status.reason) if len(response.tasks) == 0: raise MilvusException(common_pb2.UNEXPECTED_ERROR, "no task id returned from server") return response.tasks[0] @@ -1533,8 +1533,8 @@ def get_bulk_insert_state( req = Prepare.get_bulk_insert_state(task_id) future = self._stub.GetImportState.future(req, timeout=timeout) resp = future.result() - if resp.status.error_code != 0: - raise MilvusException(resp.status.error_code, resp.status.reason) + if resp.status.code != 0: + raise MilvusException(resp.status.code, resp.status.reason) return BulkInsertState( task_id, resp.state, resp.row_count, resp.id_list, resp.infos, resp.create_ts ) @@ -1546,8 +1546,8 @@ def list_bulk_insert_tasks( req = Prepare.list_bulk_insert_tasks(limit, collection_name) future = self._stub.ListImportTasks.future(req, timeout=timeout) resp = future.result() - if resp.status.error_code != 0: - raise MilvusException(resp.status.error_code, resp.status.reason) + if resp.status.code != 0: + raise MilvusException(resp.status.code, resp.status.reason) return [ BulkInsertState(t.id, t.state, t.row_count, t.id_list, t.infos, t.create_ts) @@ -1559,8 +1559,8 @@ def create_user(self, user: str, password: str, timeout: Optional[float] = None, check_pass_param(user=user, password=password) req = Prepare.create_user_request(user, password) resp = self._stub.CreateCredential(req, timeout=timeout) - if resp.error_code != 0: - raise MilvusException(resp.error_code, resp.reason) + if resp.code != 0: + raise MilvusException(resp.code, resp.reason) @retry_on_rpc_failure() def update_password( @@ -1573,37 +1573,37 @@ def update_password( ): req = Prepare.update_password_request(user, old_password, new_password) resp = self._stub.UpdateCredential(req, timeout=timeout) - if resp.error_code != 0: - raise MilvusException(resp.error_code, resp.reason) + if resp.code != 0: + raise MilvusException(resp.code, resp.reason) @retry_on_rpc_failure() def delete_user(self, user: str, timeout: Optional[float] = None, **kwargs): req = Prepare.delete_user_request(user) resp = self._stub.DeleteCredential(req, timeout=timeout) - if resp.error_code != 0: - raise MilvusException(resp.error_code, resp.reason) + if resp.code != 0: + raise MilvusException(resp.code, resp.reason) @retry_on_rpc_failure() def list_usernames(self, timeout: Optional[float] = None, **kwargs): req = Prepare.list_usernames_request() resp = self._stub.ListCredUsers(req, timeout=timeout) - if resp.status.error_code != 0: - raise MilvusException(resp.status.error_code, resp.status.reason) + if resp.status.code != 0: + raise MilvusException(resp.status.code, resp.status.reason) return resp.usernames @retry_on_rpc_failure() def create_role(self, role_name: str, timeout: Optional[float] = None, **kwargs): req = Prepare.create_role_request(role_name) resp = self._stub.CreateRole(req, wait_for_ready=True, timeout=timeout) - if resp.error_code != 0: - raise MilvusException(resp.error_code, resp.reason) + if resp.code != 0: + raise MilvusException(resp.code, resp.reason) @retry_on_rpc_failure() def drop_role(self, role_name: str, timeout: Optional[float] = None, **kwargs): req = Prepare.drop_role_request(role_name) resp = self._stub.DropRole(req, wait_for_ready=True, timeout=timeout) - if resp.error_code != 0: - raise MilvusException(resp.error_code, resp.reason) + if resp.code != 0: + raise MilvusException(resp.code, resp.reason) @retry_on_rpc_failure() def add_user_to_role( @@ -1613,8 +1613,8 @@ def add_user_to_role( username, role_name, milvus_types.OperateUserRoleType.AddUserToRole ) resp = self._stub.OperateUserRole(req, wait_for_ready=True, timeout=timeout) - if resp.error_code != 0: - raise MilvusException(resp.error_code, resp.reason) + if resp.code != 0: + raise MilvusException(resp.code, resp.reason) @retry_on_rpc_failure() def remove_user_from_role( @@ -1624,8 +1624,8 @@ def remove_user_from_role( username, role_name, milvus_types.OperateUserRoleType.RemoveUserFromRole ) resp = self._stub.OperateUserRole(req, wait_for_ready=True, timeout=timeout) - if resp.error_code != 0: - raise MilvusException(resp.error_code, resp.reason) + if resp.code != 0: + raise MilvusException(resp.code, resp.reason) @retry_on_rpc_failure() def select_one_role( @@ -1633,16 +1633,16 @@ def select_one_role( ): req = Prepare.select_role_request(role_name, include_user_info) resp = self._stub.SelectRole(req, wait_for_ready=True, timeout=timeout) - if resp.status.error_code != 0: - raise MilvusException(resp.status.error_code, resp.status.reason) + if resp.status.code != 0: + raise MilvusException(resp.status.code, resp.status.reason) return RoleInfo(resp.results) @retry_on_rpc_failure() def select_all_role(self, include_user_info: bool, timeout: Optional[float] = None, **kwargs): req = Prepare.select_role_request(None, include_user_info) resp = self._stub.SelectRole(req, wait_for_ready=True, timeout=timeout) - if resp.status.error_code != 0: - raise MilvusException(resp.status.error_code, resp.status.reason) + if resp.status.code != 0: + raise MilvusException(resp.status.code, resp.status.reason) return RoleInfo(resp.results) @retry_on_rpc_failure() @@ -1651,16 +1651,16 @@ def select_one_user( ): req = Prepare.select_user_request(username, include_role_info) resp = self._stub.SelectUser(req, wait_for_ready=True, timeout=timeout) - if resp.status.error_code != 0: - raise MilvusException(resp.status.error_code, resp.status.reason) + if resp.status.code != 0: + raise MilvusException(resp.status.code, resp.status.reason) return UserInfo(resp.results) @retry_on_rpc_failure() def select_all_user(self, include_role_info: bool, timeout: Optional[float] = None, **kwargs): req = Prepare.select_user_request(None, include_role_info) resp = self._stub.SelectUser(req, wait_for_ready=True, timeout=timeout) - if resp.status.error_code != 0: - raise MilvusException(resp.status.error_code, resp.status.reason) + if resp.status.code != 0: + raise MilvusException(resp.status.code, resp.status.reason) return UserInfo(resp.results) @retry_on_rpc_failure() @@ -1683,8 +1683,8 @@ def grant_privilege( milvus_types.OperatePrivilegeType.Grant, ) resp = self._stub.OperatePrivilege(req, wait_for_ready=True, timeout=timeout) - if resp.error_code != 0: - raise MilvusException(resp.error_code, resp.reason) + if resp.code != 0: + raise MilvusException(resp.code, resp.reason) @retry_on_rpc_failure() def revoke_privilege( @@ -1706,8 +1706,8 @@ def revoke_privilege( milvus_types.OperatePrivilegeType.Revoke, ) resp = self._stub.OperatePrivilege(req, wait_for_ready=True, timeout=timeout) - if resp.error_code != 0: - raise MilvusException(resp.error_code, resp.reason) + if resp.code != 0: + raise MilvusException(resp.code, resp.reason) @retry_on_rpc_failure() def select_grant_for_one_role( @@ -1715,8 +1715,8 @@ def select_grant_for_one_role( ): req = Prepare.select_grant_request(role_name, None, None, db_name) resp = self._stub.SelectGrant(req, wait_for_ready=True, timeout=timeout) - if resp.status.error_code != 0: - raise MilvusException(resp.status.error_code, resp.status.reason) + if resp.status.code != 0: + raise MilvusException(resp.status.code, resp.status.reason) return GrantInfo(resp.entities) @@ -1732,8 +1732,8 @@ def select_grant_for_role_and_object( ): req = Prepare.select_grant_request(role_name, object, object_name, db_name) resp = self._stub.SelectGrant(req, wait_for_ready=True, timeout=timeout) - if resp.status.error_code != 0: - raise MilvusException(resp.status.error_code, resp.status.reason) + if resp.status.code != 0: + raise MilvusException(resp.status.code, resp.status.reason) return GrantInfo(resp.entities) @@ -1741,8 +1741,8 @@ def select_grant_for_role_and_object( def get_server_version(self, timeout: Optional[float] = None, **kwargs) -> str: req = Prepare.get_server_version() resp = self._stub.GetVersion(req, timeout=timeout) - if resp.status.error_code != 0: - raise MilvusException(resp.status.error_code, resp.status.reason) + if resp.status.code != 0: + raise MilvusException(resp.status.code, resp.status.reason) return resp.version @@ -1750,22 +1750,22 @@ def get_server_version(self, timeout: Optional[float] = None, **kwargs) -> str: def create_resource_group(self, name: str, timeout: Optional[float] = None, **kwargs): req = Prepare.create_resource_group(name) resp = self._stub.CreateResourceGroup(req, wait_for_ready=True, timeout=timeout) - if resp.error_code != 0: - raise MilvusException(resp.error_code, resp.reason) + if resp.code != 0: + raise MilvusException(resp.code, resp.reason) @retry_on_rpc_failure() def drop_resource_group(self, name: str, timeout: Optional[float] = None, **kwargs): req = Prepare.drop_resource_group(name) resp = self._stub.DropResourceGroup(req, wait_for_ready=True, timeout=timeout) - if resp.error_code != 0: - raise MilvusException(resp.error_code, resp.reason) + if resp.code != 0: + raise MilvusException(resp.code, resp.reason) @retry_on_rpc_failure() def list_resource_groups(self, timeout: Optional[float] = None, **kwargs): req = Prepare.list_resource_groups() resp = self._stub.ListResourceGroups(req, wait_for_ready=True, timeout=timeout) - if resp.status.error_code != 0: - raise MilvusException(resp.status.error_code, resp.status.reason) + if resp.status.code != 0: + raise MilvusException(resp.status.code, resp.status.reason) return list(resp.resource_groups) @retry_on_rpc_failure() @@ -1774,8 +1774,8 @@ def describe_resource_group( ) -> ResourceGroupInfo: req = Prepare.describe_resource_group(name) resp = self._stub.DescribeResourceGroup(req, wait_for_ready=True, timeout=timeout) - if resp.status.error_code != 0: - raise MilvusException(resp.status.error_code, resp.status.reason) + if resp.status.code != 0: + raise MilvusException(resp.status.code, resp.status.reason) return ResourceGroupInfo(resp.resource_group) @retry_on_rpc_failure() @@ -1784,8 +1784,8 @@ def transfer_node( ): req = Prepare.transfer_node(source, target, num_node) resp = self._stub.TransferNode(req, wait_for_ready=True, timeout=timeout) - if resp.error_code != 0: - raise MilvusException(resp.error_code, resp.reason) + if resp.code != 0: + raise MilvusException(resp.code, resp.reason) @retry_on_rpc_failure() def transfer_replica( @@ -1799,17 +1799,17 @@ def transfer_replica( ): req = Prepare.transfer_replica(source, target, collection_name, num_replica) resp = self._stub.TransferReplica(req, wait_for_ready=True, timeout=timeout) - if resp.error_code != 0: - raise MilvusException(resp.error_code, resp.reason) + if resp.code != 0: + raise MilvusException(resp.code, resp.reason) @retry_on_rpc_failure() def get_flush_all_state(self, flush_all_ts: int, timeout: Optional[float] = None, **kwargs): req = Prepare.get_flush_all_state_request(flush_all_ts) response = self._stub.GetFlushAllState(req, timeout=timeout) status = response.status - if status.error_code == 0: + if status.code == 0: return response.flushed - raise MilvusException(status.error_code, status.reason) + raise MilvusException(status.code, status.reason) def _wait_for_flush_all(self, flush_all_ts: int, timeout: Optional[float] = None, **kwargs): flush_ret = False @@ -1830,8 +1830,8 @@ def flush_all(self, timeout: Optional[float] = None, **kwargs): request = Prepare.flush_all_request() future = self._stub.FlushAll.future(request, timeout=timeout) response = future.result() - if response.status.error_code != 0: - raise MilvusException(response.status.error_code, response.status.reason) + if response.status.code != 0: + raise MilvusException(response.status.code, response.status.reason) def _check(): self._wait_for_flush_all(response.flush_all_ts, timeout, **kwargs) @@ -1854,8 +1854,8 @@ def _check(): def __internal_register(self, user: str, host: str) -> int: req = Prepare.register_request(user, host) response = self._stub.Connect(request=req) - if response.status.error_code != common_pb2.Success: - raise MilvusException(response.status.error_code, response.status.reason) + if response.status.code != common_pb2.Success: + raise MilvusException(response.status.code, response.status.reason) return response.identifier @retry_on_rpc_failure() @@ -1863,6 +1863,6 @@ def __internal_register(self, user: str, host: str) -> int: def alloc_timestamp(self, timeout: Optional[float] = None) -> int: request = milvus_types.AllocTimestampRequest() response = self._stub.AllocTimestamp(request, timeout=timeout) - if response.status.error_code != common_pb2.Success: - raise MilvusException(response.status.error_code, response.status.reason) + if response.status.code != common_pb2.Success: + raise MilvusException(response.status.code, response.status.reason) return response.timestamp diff --git a/pymilvus/decorators.py b/pymilvus/decorators.py index ae566406f7..3b7c6b7538 100644 --- a/pymilvus/decorators.py +++ b/pymilvus/decorators.py @@ -6,8 +6,7 @@ import grpc -from .exceptions import MilvusException -from .grpc_gen import common_pb2 +from .exceptions import MilvusException, ErrorCode LOGGER = logging.getLogger(__name__) WARNING_COLOR = "\033[93m{}\033[0m" @@ -95,7 +94,7 @@ def timeout(start_time: Optional[float] = None) -> bool: raise MilvusException( code=e.code, message=f"{to_msg}, message={e.message}" ) from e - if _retry_on_rate_limit and e.code == common_pb2.RateLimit: + if _retry_on_rate_limit and e.code == ErrorCode.RATE_LIMIT: time.sleep(back_off) back_off = min(back_off * back_off_multiplier, max_back_off) else: diff --git a/pymilvus/exceptions.py b/pymilvus/exceptions.py index eafbb10995..15a5e75aa2 100644 --- a/pymilvus/exceptions.py +++ b/pymilvus/exceptions.py @@ -16,6 +16,8 @@ class ErrorCode(IntEnum): SUCCESS = 0 UNEXPECTED_ERROR = 1 + RATE_LIMIT = 8 + FORCE_DENY = 9 class MilvusException(Exception): diff --git a/pymilvus/orm/mutation.py b/pymilvus/orm/mutation.py index b11ca92769..2fe64e594b 100644 --- a/pymilvus/orm/mutation.py +++ b/pymilvus/orm/mutation.py @@ -63,10 +63,3 @@ def __str__(self) -> str: return self._mr.__str__() if self._mr else "" __repr__ = __str__ - - # TODO - # def error_code(self): - # pass - # - # def error_reason(self): - # pass diff --git a/tests/test_create_collection.py b/tests/test_create_collection.py index 8d2e0b404f..637e187841 100644 --- a/tests/test_create_collection.py +++ b/tests/test_create_collection.py @@ -18,11 +18,13 @@ def __init__(self, **kwargs): def __eq__(self, other): if isinstance(other, Fields.NormalizedField): - return self.name == other.name and \ - self.is_primary_key == other.is_primary_key and \ - self.data_type == other.data_type and \ - self.type_params == other.type_params and \ - self.autoID == other.autoID + return ( + self.name == other.name + and self.is_primary_key == other.is_primary_key + and self.data_type == other.data_type + and self.type_params == other.type_params + and self.autoID == other.autoID + ) return False def __repr__(self): @@ -36,20 +38,23 @@ def __repr__(self): @classmethod def equal(cls, grpc_fields, dict_fields): n_grpc_fields = { - field.name: Fields.NormalizedField(name=field.name, - is_primary_key=field.is_primary_key, - data_type=field.data_type, - type_params={pair.key: pair.value for pair in field.type_params}, - autoID=field.autoID - ) - for field in grpc_fields} + field.name: Fields.NormalizedField( + name=field.name, + is_primary_key=field.is_primary_key, + data_type=field.data_type, + type_params={pair.key: pair.value for pair in field.type_params}, + autoID=field.autoID, + ) + for field in grpc_fields + } n_dict_fields = { - field["name"]: Fields.NormalizedField(name=field["name"], - is_primary_key=field.get("is_primary", False), - data_type=field["type"], - type_params=field.get("params", dict()), - autoID=field.get("auto_id", False) - ) + field["name"]: Fields.NormalizedField( + name=field["name"], + is_primary_key=field.get("is_primary", False), + data_type=field["type"], + type_params=field.get("params", dict()), + autoID=field.get("auto_id", False), + ) for field in dict_fields } return n_grpc_fields == n_dict_fields @@ -63,8 +68,9 @@ def collection_name(self): def setup(self) -> None: self._real_time = grpc_testing.strict_real_time() self._real_time_channel = grpc_testing.channel( - milvus_pb2.DESCRIPTOR.services_by_name.values(), self._real_time) - self._servicer = milvus_pb2.DESCRIPTOR.services_by_name['MilvusService'] + milvus_pb2.DESCRIPTOR.services_by_name.values(), self._real_time + ) + self._servicer = milvus_pb2.DESCRIPTOR.services_by_name["MilvusService"] self._milvus = Milvus(channel=self._real_time_channel) def teardown(self) -> None: @@ -84,13 +90,17 @@ def test_create_collection(self, collection_name): "params": {"dim": "4"}, } fields = {"fields": [id_field, vector_field], "enable_dynamic_field": True} - future = self._milvus.create_collection(collection_name=collection_name, fields=fields, _async=True) + future = self._milvus.create_collection( + collection_name=collection_name, fields=fields, _async=True + ) invocation_metadata, request, rpc = self._real_time_channel.take_unary_unary( - self._servicer.methods_by_name['CreateCollection'] + self._servicer.methods_by_name["CreateCollection"] ) rpc.send_initial_metadata(()) - rpc.terminate(common_pb2.Status(error_code=common_pb2.Success, reason="success"), (), grpc.StatusCode.OK, '') + rpc.terminate( + common_pb2.Status(code=common_pb2.Success, reason="success"), (), grpc.StatusCode.OK, "" + ) request_schema = schema_pb2.CollectionSchema() request_schema.ParseFromString(request.schema) @@ -100,5 +110,5 @@ def test_create_collection(self, collection_name): assert request_schema.enable_dynamic_field == fields["enable_dynamic_field"] return_value = future.result() - assert return_value.error_code == common_pb2.Success + assert return_value.code == 0 assert return_value.reason == "success" diff --git a/tests/test_decorators.py b/tests/test_decorators.py index a361e3db42..b64cfd66ed 100644 --- a/tests/test_decorators.py +++ b/tests/test_decorators.py @@ -4,7 +4,7 @@ import pytest from pymilvus.decorators import retry_on_rpc_failure, error_handler -from pymilvus.exceptions import MilvusUnavailableException, MilvusException +from pymilvus.exceptions import ErrorCode, MilvusUnavailableException, MilvusException from pymilvus.grpc_gen import common_pb2 @@ -15,12 +15,12 @@ def mock_failure(self, code: grpc.StatusCode): if code == MockDeadlineExceededError().code(): raise MockDeadlineExceededError() - def mock_milvus_exception(self, code: common_pb2.ErrorCode): - if code == common_pb2.ForceDeny: + def mock_milvus_exception(self, code: ErrorCode): + if code == ErrorCode.FORCE_DENY: raise MockForceDenyError() - if code == common_pb2.RateLimit: + if code == ErrorCode.RATE_LIMIT: raise MockRateLimitError() - raise MilvusException(common_pb2.UNEXPECTED_ERROR, str("unexpected error")) + raise MilvusException(ErrorCode.UNEXPECTED_ERROR, str("unexpected error")) @pytest.mark.parametrize("times", [0, 1, 2, 3]) def test_retry_decorators_unavailable(self, times): @@ -95,7 +95,7 @@ def test_api(self, code): self.mock_milvus_exception(code) with pytest.raises(MilvusException) as e: - test_api(self, common_pb2.ForceDeny) + test_api(self, ErrorCode.FORCE_DENY) print(e) assert "force deny" in e.value.message @@ -112,7 +112,7 @@ def test_api(self, code, retry_on_rate_limit): self.mock_milvus_exception(code) with pytest.raises(MilvusException) as e: - test_api(self, common_pb2.RateLimit, retry_on_rate_limit=False) + test_api(self, ErrorCode.RATE_LIMIT, retry_on_rate_limit=False) print(e) assert "rate limit" in e.value.message @@ -129,7 +129,7 @@ def test_api(self, code, retry_on_rate_limit): self.mock_milvus_exception(code) with pytest.raises(MilvusException) as e: - test_api(self, common_pb2.RateLimit, retry_on_rate_limit=True) + test_api(self, ErrorCode.RATE_LIMIT, retry_on_rate_limit=True) print(e) assert "rate limit" in e.value.message @@ -152,8 +152,9 @@ def code(self): def details(self): return "details of deadline exceeded" + class MockForceDenyError(MilvusException): - def __init__(self, code=common_pb2.ForceDeny, message="force deny"): + def __init__(self, code=ErrorCode.FORCE_DENY, message="force deny"): super(MilvusException, self).__init__(message) self._code = code self._message = message @@ -166,8 +167,9 @@ def code(self): def message(self): return self._message + class MockRateLimitError(MilvusException): - def __init__(self, code=common_pb2.RateLimit, message="rate limit"): + def __init__(self, code=ErrorCode.RATE_LIMIT, message="rate limit"): super(MilvusException, self).__init__(message) self._code = code self._message = message diff --git a/tests/test_grpc_handler.py b/tests/test_grpc_handler.py index de2d16f511..a21a79df34 100644 --- a/tests/test_grpc_handler.py +++ b/tests/test_grpc_handler.py @@ -4,7 +4,7 @@ from pymilvus.client.grpc_handler import GrpcHandler from pymilvus.grpc_gen import milvus_pb2, common_pb2 -descriptor = milvus_pb2.DESCRIPTOR.services_by_name['MilvusService'] +descriptor = milvus_pb2.DESCRIPTOR.services_by_name["MilvusService"] class TestGrpcHandler: @@ -12,22 +12,20 @@ class TestGrpcHandler: def test_has_collection_no_error(self, channel, client_thread, ifHas): handler = GrpcHandler(channel=channel) - has_collection_future = client_thread.submit( - handler.has_collection, "fake") + has_collection_future = client_thread.submit(handler.has_collection, "fake") - (invocation_metadata, request, rpc) = ( - channel.take_unary_unary(descriptor.methods_by_name['DescribeCollection'])) + (invocation_metadata, request, rpc) = channel.take_unary_unary( + descriptor.methods_by_name["DescribeCollection"] + ) rpc.send_initial_metadata(()) reason = "" if ifHas else "can't find collection" - error_code = common_pb2.Success if ifHas else common_pb2.UnexpectedError + code = 0 if ifHas else 1 expected_result = milvus_pb2.DescribeCollectionResponse( - status=common_pb2.Status( - error_code=error_code, - reason=reason), + status=common_pb2.Status(code=code, reason=reason), ) - rpc.terminate(expected_result, (), grpc.StatusCode.OK, '') + rpc.terminate(expected_result, (), grpc.StatusCode.OK, "") got_result = has_collection_future.result() assert got_result is ifHas @@ -35,19 +33,17 @@ def test_has_collection_no_error(self, channel, client_thread, ifHas): def test_has_collection_error(self, channel, client_thread): handler = GrpcHandler(channel=channel) - has_collection_future = client_thread.submit( - handler.has_collection, "fake") + has_collection_future = client_thread.submit(handler.has_collection, "fake") - (invocation_metadata, request, rpc) = ( - channel.take_unary_unary(descriptor.methods_by_name['DescribeCollection'])) + (invocation_metadata, request, rpc) = channel.take_unary_unary( + descriptor.methods_by_name["DescribeCollection"] + ) rpc.send_initial_metadata(()) expected_result = milvus_pb2.DescribeCollectionResponse( - status=common_pb2.Status( - error_code=common_pb2.UnexpectedError, - reason="other reason"), + status=common_pb2.Status(code=1, reason="other reason"), ) - rpc.terminate(expected_result, (), grpc.StatusCode.OK, '') + rpc.terminate(expected_result, (), grpc.StatusCode.OK, "") with pytest.raises(MilvusException): has_collection_future.result() @@ -57,16 +53,16 @@ def test_has_collection_Unavailable_exception(self, channel, client_thread): channel.close() # Retry is unable to test - has_collection_future = client_thread.submit( - handler.has_collection, "fake", timeout=0) + has_collection_future = client_thread.submit(handler.has_collection, "fake", timeout=0) - (invocation_metadata, request, rpc) = ( - channel.take_unary_unary(descriptor.methods_by_name['DescribeCollection'])) + (invocation_metadata, request, rpc) = channel.take_unary_unary( + descriptor.methods_by_name["DescribeCollection"] + ) rpc.send_initial_metadata(()) expected_result = milvus_pb2.DescribeCollectionResponse() - rpc.terminate(expected_result, (), grpc.StatusCode.UNAVAILABLE, 'server Unavailable') + rpc.terminate(expected_result, (), grpc.StatusCode.UNAVAILABLE, "server Unavailable") with pytest.raises(MilvusException): has_collection_future.result() @@ -74,19 +70,17 @@ def test_has_collection_Unavailable_exception(self, channel, client_thread): def test_get_server_version_error(self, channel, client_thread): handler = GrpcHandler(channel=channel) - get_version_future = client_thread.submit( - handler.get_server_version) + get_version_future = client_thread.submit(handler.get_server_version) - (invocation_metadata, request, rpc) = ( - channel.take_unary_unary(descriptor.methods_by_name['GetVersion'])) + (invocation_metadata, request, rpc) = channel.take_unary_unary( + descriptor.methods_by_name["GetVersion"] + ) rpc.send_initial_metadata(()) expected_result = milvus_pb2.GetVersionResponse( - status=common_pb2.Status( - error_code=common_pb2.UnexpectedError, - reason="unexpected error"), + status=common_pb2.Status(code=1, reason="unexpected error"), ) - rpc.terminate(expected_result, (), grpc.StatusCode.OK, '') + rpc.terminate(expected_result, (), grpc.StatusCode.OK, "") with pytest.raises(MilvusException): get_version_future.result() @@ -95,18 +89,18 @@ def test_get_server_version(self, channel, client_thread): version = "2.2.0" handler = GrpcHandler(channel=channel) - get_version_future = client_thread.submit( - handler.get_server_version) + get_version_future = client_thread.submit(handler.get_server_version) - (invocation_metadata, request, rpc) = ( - channel.take_unary_unary(descriptor.methods_by_name['GetVersion'])) + (invocation_metadata, request, rpc) = channel.take_unary_unary( + descriptor.methods_by_name["GetVersion"] + ) rpc.send_initial_metadata(()) expected_result = milvus_pb2.GetVersionResponse( - status=common_pb2.Status(error_code=common_pb2.Success), + status=common_pb2.Status(code=0), version=version, ) - rpc.terminate(expected_result, (), grpc.StatusCode.OK, '') + rpc.terminate(expected_result, (), grpc.StatusCode.OK, "") got_result = get_version_future.result() assert got_result == version @@ -115,35 +109,35 @@ def test_get_server_version(self, channel, client_thread): def test_flush_all(self, channel, client_thread, _async): handler = GrpcHandler(channel=channel) - flush_all_future = client_thread.submit( - handler.flush_all, _async=_async, timeout=10) + flush_all_future = client_thread.submit(handler.flush_all, _async=_async, timeout=10) - (invocation_metadata, request, rpc) = ( - channel.take_unary_unary(descriptor.methods_by_name['FlushAll'])) + (invocation_metadata, request, rpc) = channel.take_unary_unary( + descriptor.methods_by_name["FlushAll"] + ) rpc.send_initial_metadata(()) expected_result = milvus_pb2.FlushAllResponse( - status=common_pb2.Status(error_code=common_pb2.Success), + status=common_pb2.Status(code=0), flush_all_ts=100, ) - rpc.terminate(expected_result, (), grpc.StatusCode.OK, '') + rpc.terminate(expected_result, (), grpc.StatusCode.OK, "") assert flush_all_future is not None def test_get_flush_all_state(self, channel, client_thread): handler = GrpcHandler(channel=channel) - flushed = client_thread.submit( - handler.get_flush_all_state, flush_all_ts=100) + flushed = client_thread.submit(handler.get_flush_all_state, flush_all_ts=100) - (invocation_metadata, request, rpc) = ( - channel.take_unary_unary(descriptor.methods_by_name['GetFlushAllState'])) + (invocation_metadata, request, rpc) = channel.take_unary_unary( + descriptor.methods_by_name["GetFlushAllState"] + ) rpc.send_initial_metadata(()) expected_result = milvus_pb2.GetFlushStateResponse( - status=common_pb2.Status(error_code=common_pb2.Success), + status=common_pb2.Status(code=0), flushed=True, ) - rpc.terminate(expected_result, (), grpc.StatusCode.OK, '') + rpc.terminate(expected_result, (), grpc.StatusCode.OK, "") assert flushed.result() is True