-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathntfy_api.py
207 lines (172 loc) · 5.81 KB
/
ntfy_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
from flask import Flask, request, jsonify
from flask_cors import CORS
import sqlite3
app = Flask(__name__)
CORS(app)
app.logger.setLevel("WARNING")
def get_db_connection():
conn = sqlite3.connect("/github-ntfy/watched_repos.db")
conn.row_factory = sqlite3.Row
return conn
def close_db_connection(conn):
conn.close()
@app.route("/app_repo", methods=["POST"])
def app_repo():
data = request.json
repo = data.get("repo")
# Vérifier si le champ 'repo' est présent dans les données JSON
if not repo:
return (
jsonify({"error": "The repo field is required."}),
400,
)
# Établir une connexion à la base de données
conn = get_db_connection()
cursor = conn.cursor()
try:
# Vérifier si le dépôt existe déjà dans la base de données
cursor.execute(
"SELECT * FROM watched_repos WHERE repo=?",
(repo,),
)
existing_repo = cursor.fetchone()
if existing_repo:
return (
jsonify({"error": f"The GitHub repo {repo} is already in the database."}),
409,
)
# Ajouter le dépôt à la base de données
cursor.execute(
"INSERT INTO watched_repos (repo) VALUES (?)",
(repo,),
)
conn.commit()
return jsonify({"message": f"The GitHub repo {repo} as been added to the watched repos."})
finally:
# Fermer la connexion à la base de données
close_db_connection(conn)
@app.route("/app_docker_repo", methods=["POST"])
def app_docker_repo():
data = request.json
repo = data.get("repo")
# Vérifier si le champ 'repo' est présent dans les données JSON
if not repo:
return (
jsonify({"error": "The repo field is required."}),
400,
)
# Établir une connexion à la base de données
conn = get_db_connection()
cursor = conn.cursor()
try:
# Vérifier si le dépôt existe déjà dans la base de données
cursor.execute(
"SELECT * FROM docker_watched_repos WHERE repo=?",
(repo,),
)
existing_repo = cursor.fetchone()
if existing_repo:
return (
jsonify({"error": f"The Docker repo {repo} is already in the database."}),
409,
)
# Ajouter le dépôt à la base de données
cursor.execute(
"INSERT INTO docker_watched_repos (repo) VALUES (?)",
(repo,),
)
conn.commit()
return jsonify({"message": f"The Docker repo {repo} as been added to the watched repos."})
finally:
# Fermer la connexion à la base de données
close_db_connection(conn)
@app.route("/watched_repos", methods=["GET"])
def get_watched_repos():
db = get_db_connection()
cursor = db.cursor()
cursor.execute("SELECT repo FROM watched_repos")
watched_repos = [repo[0] for repo in cursor.fetchall()]
cursor.close()
db.close()
return jsonify(watched_repos)
@app.route("/watched_docker_repos", methods=["GET"])
def get_watched_docker_repos():
db = get_db_connection()
cursor = db.cursor()
cursor.execute("SELECT repo FROM docker_watched_repos")
watched_repos = [repo[0] for repo in cursor.fetchall()]
cursor.close()
db.close()
return jsonify(watched_repos)
@app.route("/delete_repo", methods=["POST"])
def delete_repo():
data = request.json
repo = data.get("repo")
# Vérifier si le champ 'repo' est présent dans les données JSON
if not repo:
return (
jsonify({"error": "The repo field is required."}),
400,
)
# Établir une connexion à la base de données
conn = get_db_connection()
cursor = conn.cursor()
try:
# Vérifier si le dépôt existe dans la base de données
cursor.execute(
"SELECT * FROM watched_repos WHERE repo=?",
(repo,),
)
existing_repo = cursor.fetchone()
if not existing_repo:
return (
jsonify({"error": f"The GitHub repo {repo} is not in the database."}),
404,
)
# Supprimer le dépôt de la base de données
cursor.execute(
"DELETE FROM watched_repos WHERE repo=?",
(repo,),
)
conn.commit()
return jsonify({"message": f"The GitHub repo {repo} as been deleted from the watched repos."})
finally:
# Fermer la connexion à la base de données
close_db_connection(conn)
@app.route("/delete_docker_repo", methods=["POST"])
def delete_docker_repo():
data = request.json
repo = data.get("repo")
# Vérifier si le champ 'repo' est présent dans les données JSON
if not repo:
return (
jsonify({"error": "The repo field is required."}),
400,
)
# Établir une connexion à la base de données
conn = get_db_connection()
cursor = conn.cursor()
try:
# Vérifier si le dépôt existe dans la base de données
cursor.execute(
"SELECT * FROM docker_watched_repos WHERE repo=?",
(repo,),
)
existing_repo = cursor.fetchone()
if not existing_repo:
return (
jsonify({"error": f"The Docker repo {repo} is not in the database."}),
404,
)
# Supprimer le dépôt de la base de données
cursor.execute(
"DELETE FROM docker_watched_repos WHERE repo=?",
(repo,),
)
conn.commit()
return jsonify({"message": f"The Docker repo {repo} as been deleted from the watched repos."})
finally:
# Fermer la connexion à la base de données
close_db_connection(conn)
if __name__ == "__main__":
app.run(debug=False)