diff --git a/aiops/opni-opensearch-update-service/opensearch-update-service/app/main.py b/aiops/opni-opensearch-update-service/opensearch-update-service/app/main.py
index 64873a7ad7..b387406f2e 100644
--- a/aiops/opni-opensearch-update-service/opensearch-update-service/app/main.py
+++ b/aiops/opni-opensearch-update-service/opensearch-update-service/app/main.py
@@ -23,10 +23,10 @@
     "ctx._source.anomaly_predicted_count += 1; ctx._source.opnilog_anomaly = true;"
 )
 
-async def doc_generator(df):
+async def doc_generator(df, index, op_type="update"):
     main_doc_keywords = {"_op_type", "_index", "_id", "doc"}
-    df["_op_type"] = "update"
-    df["_index"] = "logs"
+    df["_op_type"] = op_type
+    df["_index"] = index
     df.rename(columns={"log_id": "_id"}, inplace=True)
     for index, document in df.iterrows():
         doc_dict = document.to_dict()
@@ -38,8 +38,6 @@ async def doc_generator(df):
             del doc_dict[k]
         yield doc_dict
 
-
-
 async def setup_es_connection():
     ES_ENDPOINT = os.environ["ES_ENDPOINT"]
     ES_USERNAME = os.environ["ES_USERNAME"]
@@ -66,12 +64,17 @@ async def setup_es_connection():
     )
 
 
-async def consume_logs(nw, inferenced_logs_queue):
+async def consume_logs(nw, inferenced_logs_queue, template_logs_queue):
     async def subscribe_handler(msg):
         data = msg.data
         logs_df = pd.DataFrame(PayloadList().parse(data).items)
         await inferenced_logs_queue.put(logs_df)
 
+    async def template_subscribe_handler(msg):
+        data = msg.data
+        logs_df = pd.DataFrame(PayloadList().parse(data).items)
+        await template_logs_queue.put(logs_df)
+
     await nw.subscribe(
         nats_subject="inferenced_logs",
         nats_queue="workers",
@@ -79,6 +82,19 @@ async def subscribe_handler(msg):
         subscribe_handler=subscribe_handler,
     )
 
+    await nw.subscribe(
+        nats_subject="templates_index",
+        nats_queue="workers",
+        payload_queue=template_logs_queue,
+        subscribe_handler=template_subscribe_handler,
+    )
+
+async def receive_template_data(queue):
+    es = await setup_es_connection()
+    while True:
+        df = await queue.get()
+        await update_template_data(es, df)
+
 
 async def receive_logs(queue):
     es = await setup_es_connection()
@@ -86,6 +102,29 @@ async def receive_logs(queue):
         df = await queue.get()
         await update_logs(es, df)
 
+async def update_template_data(es, df):
+    try:
+        async for ok, result in async_streaming_bulk(
+            es,
+            doc_generator(df[["_id", "log", "template_matched", "template_cluster_id"]], "templates", "index"),
+            max_retries=1,
+            initial_backoff=1,
+            request_timeout=5,
+        ):
+            action, result = result.popitem()
+            if not ok:
+                logging.error("failed to {} document {}".format(action, result))
+    except (BulkIndexError, ConnectionTimeout, TimeoutError) as exception:
+        logging.error(
+            "Failed to index data. Re-adding to logs_to_update_in_elasticsearch queue"
+        )
+        logging.error(exception)
+    except TransportError as exception:
+        logging.info(f"Error in async_streaming_bulk {exception}")
+        if exception.status_code == "N/A":
+            logging.info("Elasticsearch connection error")
+            es = await setup_es_connection()
+
 async def update_logs(es, df):
     # This function will be updating Opensearch logs which were inferred on by the DRAIN model.
     model_keywords_dict = {"drain": ["_id", "masked_log", "template_matched","template_cluster_id","inference_model", "anomaly_level"],
@@ -102,7 +141,8 @@ async def update_logs(es, df):
             async for ok, result in async_streaming_bulk(
                 es,
                 doc_generator(
-                    anomaly_level_df[model_keywords_dict[model_name]]
+                    anomaly_level_df[model_keywords_dict[model_name]],
+                    "logs",
                 ),
                 max_retries=1,
                 initial_backoff=1,
@@ -133,14 +173,16 @@ async def init_nats():
 if __name__ == "__main__":
     loop = asyncio.get_event_loop()
     processed_logs_queue = asyncio.Queue(loop=loop)
-    task = loop.create_task(init_nats())
-    nw = loop.run_until_complete(task)
-    nats_consumer_coroutine = consume_logs(nw, processed_logs_queue)
+    template_logs_queue = asyncio.Queue(loop=loop)
+    init_nats_task = loop.create_task(init_nats())
+    nw = loop.run_until_complete(init_nats_task)
+    nats_consumer_coroutine = consume_logs(nw, processed_logs_queue, template_logs_queue)
 
     update_logs_coroutine = receive_logs(processed_logs_queue)
+    update_templates_coroutine = receive_template_data(template_logs_queue)
 
     loop.run_until_complete(
-        asyncio.gather(nats_consumer_coroutine, update_logs_coroutine)
+        asyncio.gather(nats_consumer_coroutine, update_logs_coroutine, update_templates_coroutine)
     )
     try:
         loop.run_forever()
diff --git a/pkg/resources/opnicluster/elastic/indices/indices.go b/pkg/resources/opnicluster/elastic/indices/indices.go
index 8825ebd97a..db3c86127a 100644
--- a/pkg/resources/opnicluster/elastic/indices/indices.go
+++ b/pkg/resources/opnicluster/elastic/indices/indices.go
@@ -259,6 +259,12 @@ func (r *Reconciler) Reconcile() (retResult *reconcile.Result, retErr error) {
 		}
 	}
 
+	err = r.osReconciler.MaybeCreateIndex(logTemplateIndexName, logTemplateIndexSettings)
+	if err != nil {
+		conditions = append(conditions, err.Error())
+		retErr = errors.Combine(retErr, err)
+	}
+
 	err = r.osReconciler.MaybeCreateIndex(normalIntervalIndexName, normalIntervalIndexSettings)
 	if err != nil {
 		conditions = append(conditions, err.Error())
diff --git a/pkg/resources/opnicluster/elastic/indices/opni_settings.go b/pkg/resources/opnicluster/elastic/indices/opni_settings.go
index 5adb17371c..a263ba4c33 100644
--- a/pkg/resources/opnicluster/elastic/indices/opni_settings.go
+++ b/pkg/resources/opnicluster/elastic/indices/opni_settings.go
@@ -13,6 +13,7 @@ const (
 	LogIndexPrefix = "logs-v0.5.4"
 	LogIndexAlias = "logs"
 	LogIndexTemplateName = "logs_rollover_mapping"
+	logTemplateIndexName = "templates"
 	PreProcessingPipelineName = "opni-ingest-pipeline"
 	drainStatusPolicyName = "opni-drain-model-status-policy"
 	drainStatusIndexPrefix = "opni-drain-model-status-v0.1.3"
@@ -750,6 +751,23 @@ var (
 		},
 	}
 
+	logTemplateIndexSettings = map[string]osapiext.TemplateMappingsSpec{
+		"mappings": {
+			Properties: map[string]osapiext.PropertySettings{
+				"log": {
+					Type: "text",
+				},
+				"template_matched": {
+					Type: "keyword",
+				},
+				"template_cluster_id": {
+					Type: "integer",
+				},
+			},
+		},
+	}
+
+
 	normalIntervalIndexSettings = map[string]osapiext.TemplateMappingsSpec{
 		"mappings": {
 			Properties: map[string]osapiext.PropertySettings{