Opensearch template index #485

Merged · 6 commits · Aug 12, 2022
Changes from all commits
@@ -23,10 +23,10 @@
         "ctx._source.anomaly_predicted_count += 1; ctx._source.opnilog_anomaly = true;"
     )
 
-async def doc_generator(df):
+async def doc_generator(df, index, op_type="update"):
     main_doc_keywords = {"_op_type", "_index", "_id", "doc"}
-    df["_op_type"] = "update"
-    df["_index"] = "logs"
+    df["_op_type"] = op_type
+    df["_index"] = index
     df.rename(columns={"log_id": "_id"}, inplace=True)
     for index, document in df.iterrows():
         doc_dict = document.to_dict()
@@ -38,8 +38,6 @@ async def doc_generator(df):
                 del doc_dict[k]
         yield doc_dict
 
-
-
 async def setup_es_connection():
     ES_ENDPOINT = os.environ["ES_ENDPOINT"]
     ES_USERNAME = os.environ["ES_USERNAME"]
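Note: a minimal sketch (not part of the PR) of the bulk actions the parameterized doc_generator is now expected to yield. The IDs and field values below are hypothetical, and the "doc" body for updates assumes the doc-wrapping logic elided from this hunk.

# Default op_type="update" still targets the "logs" alias:
update_action = {
    "_op_type": "update",
    "_index": "logs",
    "_id": "a1",                          # renamed from the log_id column
    "doc": {"anomaly_level": "Anomaly"},  # hypothetical update body
}

# op_type="index" writes whole documents, e.g. template rows to "templates":
index_action = {
    "_op_type": "index",
    "_index": "templates",
    "_id": "a1",
    "log": "raw log line",            # remaining columns become the source
    "template_matched": "<template>",
    "template_cluster_id": 7,
}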
@@ -66,26 +64,67 @@ async def setup_es_connection():
     )
 
 
-async def consume_logs(nw, inferenced_logs_queue):
+async def consume_logs(nw, inferenced_logs_queue, template_logs_queue):
     async def subscribe_handler(msg):
         data = msg.data
         logs_df = pd.DataFrame(PayloadList().parse(data).items)
         await inferenced_logs_queue.put(logs_df)
 
+    async def template_subscribe_handler(msg):
+        data = msg.data
+        logs_df = pd.DataFrame(PayloadList().parse(data).items)
+        await template_logs_queue.put(logs_df)
+
     await nw.subscribe(
         nats_subject="inferenced_logs",
         nats_queue="workers",
         payload_queue=inferenced_logs_queue,
         subscribe_handler=subscribe_handler,
     )
 
+    await nw.subscribe(
+        nats_subject="templates_index",
+        nats_queue="workers",
+        payload_queue=template_logs_queue,
+        subscribe_handler=template_subscribe_handler,
+    )
+
+
+async def receive_template_data(queue):
+    es = await setup_es_connection()
+    while True:
+        df = await queue.get()
+        await update_template_data(es, df)
+
 
 async def receive_logs(queue):
     es = await setup_es_connection()
     while True:
         df = await queue.get()
         await update_logs(es, df)
 
+
+async def update_template_data(es, df):
+    try:
+        async for ok, result in async_streaming_bulk(
+            es,
+            doc_generator(df[["_id", "log", "template_matched", "template_cluster_id"]], "templates", "index"),
+            max_retries=1,
+            initial_backoff=1,
+            request_timeout=5,
+        ):
+            action, result = result.popitem()
+            if not ok:
+                logging.error("failed to {} document {}".format(action, result))
+    except (BulkIndexError, ConnectionTimeout, TimeoutError) as exception:
+        logging.error(
+            "Failed to index data. Re-adding to logs_to_update_in_elasticsearch queue"
+        )
+        logging.error(exception)
+    except TransportError as exception:
+        logging.info(f"Error in async_streaming_bulk {exception}")
+        if exception.status_code == "N/A":
+            logging.info("Elasticsearch connection error")
+            es = await setup_es_connection()
+
 async def update_logs(es, df):
     # This function updates the Opensearch log documents that the DRAIN model ran inference on.
     model_keywords_dict = {"drain": ["_id", "masked_log", "template_matched", "template_cluster_id", "inference_model", "anomaly_level"],
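For reference, each pair yielded by async_streaming_bulk unpacks as sketched below (the shape follows the elasticsearch-py/opensearch-py bulk helpers; the values are illustrative), which is what lets the error path name the failed action and document:

import logging

ok = True
result = {"index": {"_index": "templates", "_id": "a1", "result": "created", "status": 201}}
action, info = result.popitem()  # -> ("index", {"_index": "templates", ...})
if not ok:
    logging.error("failed to {} document {}".format(action, info))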
@@ -102,7 +141,8 @@ async def update_logs(es, df):
         async for ok, result in async_streaming_bulk(
             es,
             doc_generator(
-                anomaly_level_df[model_keywords_dict[model_name]]
+                anomaly_level_df[model_keywords_dict[model_name]],
+                "logs",
             ),
             max_retries=1,
             initial_backoff=1,
@@ -133,14 +173,16 @@ async def init_nats():
 if __name__ == "__main__":
     loop = asyncio.get_event_loop()
     processed_logs_queue = asyncio.Queue(loop=loop)
-    task = loop.create_task(init_nats())
-    nw = loop.run_until_complete(task)
-    nats_consumer_coroutine = consume_logs(nw, processed_logs_queue)
+    template_logs_queue = asyncio.Queue(loop=loop)
+    init_nats_task = loop.create_task(init_nats())
+    nw = loop.run_until_complete(init_nats_task)
+    nats_consumer_coroutine = consume_logs(nw, processed_logs_queue, template_logs_queue)
     update_logs_coroutine = receive_logs(processed_logs_queue)
+    update_templates_coroutine = receive_template_data(template_logs_queue)
 
 
     loop.run_until_complete(
-        asyncio.gather(nats_consumer_coroutine, update_logs_coroutine)
+        asyncio.gather(nats_consumer_coroutine, update_logs_coroutine, update_templates_coroutine)
     )
     try:
         loop.run_forever()
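Side note: asyncio.Queue(loop=...) was removed in Python 3.10, so on newer interpreters an equivalent wiring would look roughly like this sketch (not part of the PR):

import asyncio

async def main():
    processed_logs_queue = asyncio.Queue()
    template_logs_queue = asyncio.Queue()
    nw = await init_nats()
    await asyncio.gather(
        consume_logs(nw, processed_logs_queue, template_logs_queue),
        receive_logs(processed_logs_queue),
        receive_template_data(template_logs_queue),
    )

asyncio.run(main())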
pkg/resources/opnicluster/elastic/indices/indices.go (6 additions, 0 deletions)
@@ -259,6 +259,12 @@ func (r *Reconciler) Reconcile() (retResult *reconcile.Result, retErr error) {
 		}
 	}
 
+	err = r.osReconciler.MaybeCreateIndex(logTemplateIndexName, logTemplateIndexSettings)
+	if err != nil {
+		conditions = append(conditions, err.Error())
+		retErr = errors.Combine(retErr, err)
+	}
+
 	err = r.osReconciler.MaybeCreateIndex(normalIntervalIndexName, normalIntervalIndexSettings)
 	if err != nil {
 		conditions = append(conditions, err.Error())
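MaybeCreateIndex is the reconciler's create-if-missing helper; a rough Python analogue of the same idempotent pattern, using the async Opensearch client (illustrative only, not the PR's Go implementation):

async def maybe_create_index(es, name, body):
    # Create the index only if it does not already exist.
    if not await es.indices.exists(index=name):
        await es.indices.create(index=name, body=body)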
pkg/resources/opnicluster/elastic/indices/opni_settings.go (18 additions, 0 deletions)
@@ -13,6 +13,7 @@ const (
 	LogIndexPrefix            = "logs-v0.5.4"
 	LogIndexAlias             = "logs"
 	LogIndexTemplateName      = "logs_rollover_mapping"
+	logTemplateIndexName      = "templates"
 	PreProcessingPipelineName = "opni-ingest-pipeline"
 	drainStatusPolicyName     = "opni-drain-model-status-policy"
 	drainStatusIndexPrefix    = "opni-drain-model-status-v0.1.3"
@@ -750,6 +751,23 @@
 		},
 	}
 
+	logTemplateIndexSettings = map[string]osapiext.TemplateMappingsSpec{
+		"mappings": {
+			Properties: map[string]osapiext.PropertySettings{
+				"log": {
+					Type: "text",
+				},
+				"template_matched": {
+					Type: "keyword",
+				},
+				"template_cluster_id": {
+					Type: "integer",
+				},
+			},
+		},
+	}
+
 	normalIntervalIndexSettings = map[string]osapiext.TemplateMappingsSpec{
 		"mappings": {
 			Properties: map[string]osapiext.PropertySettings{
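For reference, logTemplateIndexSettings should serialize to roughly the following create-index request body for the "templates" index, shown here as a Python dict (a sketch derived from the mapping above, assuming the osapiext structs marshal to standard Opensearch mapping JSON):

template_index_body = {
    "mappings": {
        "properties": {
            "log": {"type": "text"},
            "template_matched": {"type": "keyword"},
            "template_cluster_id": {"type": "integer"},
        }
    }
}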