##################################################
# sorting_hat_step Settings File
##################################################
import os

from credentials import get_mongodb_credentials
from schemas.output_schema import SCHEMA

# Set the global logging level to debug.
# Environment variables are strings, so parse the flag explicitly; otherwise
# LOGGING_DEBUG="False" would evaluate as truthy.
LOGGING_DEBUG = os.getenv("LOGGING_DEBUG", "False").lower() in ("true", "1", "yes")

# Export prometheus metrics
PROMETHEUS = True

DB_CONFIG = get_mongodb_credentials()
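# get_mongodb_credentials() (defined in credentials.py, not shown here) is
# expected to return a dict of MongoDB connection parameters; a hypothetical
# shape, for reference only:
#   {"HOST": ..., "PORT": 27017, "USERNAME": ..., "PASSWORD": ..., "DATABASE": ...}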

# Consumer configuration
# Each consumer class takes different parameters; see the apf documentation.
CONSUMER_CONFIG = {
    "CLASS": os.getenv("CONSUMER_CLASS", "apf.consumers.KafkaConsumer"),
    "PARAMS": {
        "bootstrap.servers": os.environ["CONSUMER_SERVER"],
        "group.id": os.environ["CONSUMER_GROUP_ID"],
        "auto.offset.reset": "beginning",
        "max.poll.interval.ms": 3600000,
    },
    "consume.timeout": int(os.getenv("CONSUME_TIMEOUT", 10)),
    "consume.messages": int(os.getenv("CONSUME_MESSAGES", 100)),
}
if os.getenv("TOPIC_STRATEGY_TOPIC_FORMAT"):
CONSUMER_CONFIG["TOPIC_STRATEGY"] = {
"CLASS": "apf.core.topic_management.DailyTopicStrategy",
"PARAMS": {
"topic_format": os.environ["TOPIC_STRATEGY_TOPIC_FORMAT"]
.strip()
.split(","),
"date_format": os.getenv("TOPIC_STRATEGY_DATE_FORMAT", "%Y%m%d"),
"change_hour": int(os.getenv("TOPIC_STRATEGY_CHANGE_HOUR", 23)),
},
}
elif os.getenv("CONSUMER_TOPICS"):
CONSUMER_CONFIG["TOPICS"] = os.environ["CONSUMER_TOPICS"].strip().split(",")
else:
raise Exception("Add TOPIC_STRATEGY or CONSUMER_TOPICS")
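
# Illustrative sketch (not part of apf): how a daily strategy like the one
# configured above could expand one topic format for the current date. Note
# that "topic_format" above is a list (one format per topic); the "%s"
# placeholder and the change_hour rollover are assumptions about
# DailyTopicStrategy's behavior.
def _expand_daily_topic(
    topic_format: str, date_format: str = "%Y%m%d", change_hour: int = 23
) -> str:
    from datetime import datetime, timedelta, timezone

    # After change_hour (UTC), roll over to the next day's topic.
    now = datetime.now(timezone.utc)
    if now.hour >= change_hour:
        now += timedelta(days=1)
    return topic_format % now.strftime(date_format)


# e.g. _expand_daily_topic("ztf_%s_programid1") -> "ztf_20230713_programid1"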
if os.getenv("CONSUMER_CLASS") == "apf.consumers.KafkaSchemalessConsumer":
CONSUMER_CONFIG["SCHEMA_PATH"] = os.path.join(
os.path.dirname(__file__), "schemas/elasticc/elasticc.v0_9_1.alert.avsc"
)

# Producer configuration
PRODUCER_CONFIG = {
    "CLASS": os.getenv("PRODUCER_CLASS", "apf.producers.KafkaProducer"),
    "TOPIC": os.environ["PRODUCER_TOPIC"],
    "PARAMS": {
        "bootstrap.servers": os.environ["PRODUCER_SERVER"],
    },
    "SCHEMA": SCHEMA,
}
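
# Usage sketch: this module reads the environment at import time, so the
# required variables must be set before importing it. Hypothetical values:
#
#   os.environ["CONSUMER_SERVER"] = "localhost:9092"
#   os.environ["CONSUMER_GROUP_ID"] = "sorting_hat"
#   os.environ["CONSUMER_TOPICS"] = "ztf_alerts"
#   os.environ["PRODUCER_SERVER"] = "localhost:9092"
#   os.environ["PRODUCER_TOPIC"] = "sorting_hat_output"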

# Metrics configuration
METRICS_CONFIG = {
    "CLASS": os.getenv("METRICS_CLASS", "apf.metrics.KafkaMetricsProducer"),
    "EXTRA_METRICS": [
        {"key": "candid", "format": lambda x: str(x)},
    ],
    "PARAMS": {
        "PARAMS": {
            "bootstrap.servers": os.getenv("METRICS_HOST"),
            "auto.offset.reset": "smallest",
        },
        "TOPIC": os.getenv("METRICS_TOPIC", "metrics"),
        "SCHEMA": {
            "$schema": "http://json-schema.org/draft-07/schema",
            "$id": "http://example.com/example.json",
            "type": "object",
            "title": "The root schema",
            "description": "The root schema comprises the entire JSON document.",
            "default": {},
            "examples": [
                {"timestamp_sent": "2020-09-01", "timestamp_received": "2020-09-01"}
            ],
            "required": ["timestamp_sent", "timestamp_received"],
            "properties": {
                "timestamp_sent": {
                    "$id": "#/properties/timestamp_sent",
                    "type": "string",
                    "title": "The timestamp_sent schema",
                    "description": "Timestamp sent refers to the time at which a message is sent.",
                    "default": "",
                    "examples": ["2020-09-01"],
                },
                "timestamp_received": {
                    "$id": "#/properties/timestamp_received",
                    "type": "string",
                    "title": "The timestamp_received schema",
                    "description": "Timestamp received refers to the time at which a message is received.",
                    "default": "",
                    "examples": ["2020-09-01"],
                },
            },
            "additionalProperties": True,
        },
    },
}
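
# Sketch: check a sample payload against the draft-07 metrics schema above
# using the third-party `jsonschema` package. That dependency is an
# assumption; apf may validate metrics differently.
def _validate_metrics_example() -> None:
    from jsonschema import validate  # assumed available

    validate(
        instance={"timestamp_sent": "2020-09-01", "timestamp_received": "2020-09-01"},
        schema=METRICS_CONFIG["PARAMS"]["SCHEMA"],
    )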
if os.getenv("CONSUMER_KAFKA_USERNAME") and os.getenv("CONSUMER_KAFKA_PASSWORD"):
CONSUMER_CONFIG["PARAMS"]["security.protocol"] = "SASL_SSL"
CONSUMER_CONFIG["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512"
CONSUMER_CONFIG["PARAMS"]["sasl.username"] = os.getenv("CONSUMER_KAFKA_USERNAME")
CONSUMER_CONFIG["PARAMS"]["sasl.password"] = os.getenv("CONSUMER_KAFKA_PASSWORD")
if os.getenv("PRODUCER_KAFKA_USERNAME") and os.getenv("PRODUCER_KAFKA_PASSWORD"):
PRODUCER_CONFIG["PARAMS"]["security.protocol"] = "SASL_SSL"
PRODUCER_CONFIG["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512"
PRODUCER_CONFIG["PARAMS"]["sasl.username"] = os.getenv("PRODUCER_KAFKA_USERNAME")
PRODUCER_CONFIG["PARAMS"]["sasl.password"] = os.getenv("PRODUCER_KAFKA_PASSWORD")
if os.getenv("METRICS_KAFKA_USERNAME") and os.getenv("METRICS_KAFKA_PASSWORD"):
METRICS_CONFIG["PARAMS"]["PARAMS"]["security.protocol"] = "SASL_SSL"
METRICS_CONFIG["PARAMS"]["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512"
METRICS_CONFIG["PARAMS"]["PARAMS"]["sasl.username"] = os.getenv(
"METRICS_KAFKA_USERNAME"
)
METRICS_CONFIG["PARAMS"]["PARAMS"]["sasl.password"] = os.getenv(
"METRICS_KAFKA_PASSWORD"
)
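
# The three blocks above differ only in the dict they mutate; a small helper
# (not part of the original module) could remove the repetition:
def _apply_sasl(params: dict, username_var: str, password_var: str) -> None:
    user, password = os.getenv(username_var), os.getenv(password_var)
    if user and password:
        params["security.protocol"] = "SASL_SSL"
        params["sasl.mechanism"] = "SCRAM-SHA-512"
        params["sasl.username"] = user
        params["sasl.password"] = password


# e.g. _apply_sasl(CONSUMER_CONFIG["PARAMS"], "CONSUMER_KAFKA_USERNAME",
#                  "CONSUMER_KAFKA_PASSWORD")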

# Whether to run the conesearch crossmatch; parsed explicitly so that
# RUN_CONESEARCH="False" disables it (a bare env string is always truthy).
RUN_CONESEARCH = os.getenv("RUN_CONESEARCH", "True").lower() == "true"

# Step Configuration
STEP_CONFIG = {
    "PROMETHEUS": PROMETHEUS,
    "DB_CONFIG": DB_CONFIG,
    "CONSUMER_CONFIG": CONSUMER_CONFIG,
    "PRODUCER_CONFIG": PRODUCER_CONFIG,
    "METRICS_CONFIG": METRICS_CONFIG,
    "RUN_CONESEARCH": RUN_CONESEARCH,
}
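
# Hypothetical entry-point sketch: apf steps are conventionally constructed
# with the assembled config dict and started with .start(). The import path
# and class name below are assumptions about this repository's layout.
if __name__ == "__main__":
    from sorting_hat_step import SortingHatStep  # assumed module/class

    SortingHatStep(config=STEP_CONFIG).start()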