-
Notifications
You must be signed in to change notification settings - Fork 3
/
app.py
101 lines (81 loc) · 2.65 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import base64
import hashlib
import os
import re
import requests
import pandas as pd
from requests_oauthlib import OAuth2Session
from flask import (
Flask,
request,
redirect,
session,
render_template,
make_response,
)
# Flask application object. The session-signing key is drawn from os.urandom
# at import time, so every process start produces a new key and session
# cookies signed by a previous process no longer validate.
app = Flask(__name__)
app.secret_key = os.urandom(50)

# OAuth 2.0 client settings, read from the environment. os.environ.get()
# yields None for any variable that is not set.
client_id = os.environ.get("CLIENT_ID")
client_secret = os.environ.get("CLIENT_SECRET")
redirect_uri = os.environ.get("REDIRECT_URI")

# Twitter OAuth 2.0 authorization and token endpoints.
auth_url = "https://twitter.com/i/oauth2/authorize"
token_url = "https://api.twitter.com/2/oauth2/token"

# Scopes requested during authorization.
scopes = ["tweet.read", "users.read", "bookmark.read"]

# PKCE code verifier. RFC 7636 requires 43-128 characters; the previous
# base64-of-30-bytes value was at most 40 characters once non-alphanumerics
# were stripped. 32 random bytes rendered as hex give exactly 64 characters,
# all of them alphanumeric.
code_verifier = os.urandom(32).hex()

# PKCE code challenge for the S256 method: SHA-256 of the verifier,
# URL-safe base64 encoded, with the "=" padding removed.
code_challenge = hashlib.sha256(code_verifier.encode("utf-8")).digest()
code_challenge = base64.urlsafe_b64encode(code_challenge).decode("utf-8")
code_challenge = code_challenge.replace("=", "")
def get_bookmarks(user_id, token, timeout=10):
    """Request a user's bookmarks from the Twitter API v2 bookmarks endpoint.

    Args:
        user_id: ID interpolated into the ``/2/users/{id}/bookmarks`` URL.
        token: Mapping holding the bearer token under ``"access_token"``.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        The ``requests`` response object; the caller decodes the body.
    """
    print("Making a request to the bookmarks endpoint")
    params = {"tweet.fields": "created_at"}
    return requests.request(
        "GET",
        "https://api.twitter.com/2/users/{}/bookmarks".format(user_id),
        headers={"Authorization": "Bearer {}".format(token["access_token"])},
        params=params,
        timeout=timeout,
    )
@app.route("/")
def hello():
    """Serve the landing page rendered from the ``index.html`` template."""
    landing_page = render_template("index.html")
    return landing_page
@app.route("/start")
def demo():
    """Start the OAuth 2.0 authorization flow and redirect to Twitter.

    Creates an ``OAuth2Session`` and stores it in the module-level ``twitter``
    global, builds the authorization URL carrying the S256 code challenge,
    saves the returned state in the Flask session under ``"oauth_state"``,
    and redirects the browser to that URL.
    """
    global twitter
    twitter = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes)

    pkce_params = {
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
    }
    login_url, oauth_state = twitter.authorization_url(auth_url, **pkce_params)

    session["oauth_state"] = oauth_state
    return redirect(login_url)
@app.route("/oauth/callback", methods=["GET"])
def callback():
    """Handle the OAuth redirect, fetch the user's bookmarks and store them.

    Steps:
      1. Validate the ``code`` and ``state`` query parameters.
      2. Exchange the code (plus the PKCE verifier) for an access token.
      3. Look up the authenticated user's ID via ``/2/users/me``.
      4. Fetch that user's bookmarks and load them into the module-level
         ``df`` DataFrame, which the export route reads.

    Returns:
        The rendered ``next.html`` page on success, or a plain-text message
        with status 400/502 when the redirect or the user lookup is unusable.
    """
    # NOTE(review): ``df`` is a process-wide global, so the most recent
    # callback overwrites the data of every earlier one.
    global df

    code = request.args.get("code")
    if not code:
        # Without a code there is nothing to exchange for a token.
        return "Authorization failed: no authorization code was returned.", 400

    # The state saved when the flow started must come back unchanged;
    # otherwise this redirect was not initiated from this browser session.
    expected_state = session.get("oauth_state")
    if expected_state is None or request.args.get("state") != expected_state:
        return "Authorization failed: state mismatch.", 400

    # Build the OAuth session here instead of relying on a module-level
    # object that only exists if /start already ran in this process.
    oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes)
    token = oauth.fetch_token(
        token_url=token_url,
        client_secret=client_secret,
        code_verifier=code_verifier,
        code=code,
    )
    # The token is deliberately not printed: it is a bearer credential.

    user_me = requests.request(
        "GET",
        "https://api.twitter.com/2/users/me",
        headers={"Authorization": "Bearer {}".format(token["access_token"])},
        timeout=10,
    ).json()
    print(user_me)
    if "data" not in user_me:
        # The lookup did not return a user object, so there is no ID to use.
        return "Could not look up the authenticated user.", 502
    user_id = user_me["data"]["id"]

    bookmarks = get_bookmarks(user_id, token).json()
    # A payload without "data" (presumably an empty bookmark list or an API
    # error - verify against the API docs) yields an empty DataFrame rather
    # than a KeyError.
    df = pd.DataFrame(bookmarks.get("data", []))
    return render_template("next.html")
@app.route("/oauth/next")
def export():
    """Return the stored bookmarks as a downloadable CSV attachment.

    Reads the module-level ``df`` DataFrame. If no callback has populated it
    yet in this process, responds with a 404 message instead of failing with
    a ``NameError``.

    Returns:
        A response whose body is ``df.to_csv()``, served as ``text/csv`` with
        the attachment filename ``export.csv``, or a 404 text message.
    """
    # Look the global up defensively: it does not exist until it is assigned.
    frame = globals().get("df")
    if frame is None:
        return "No bookmarks have been fetched yet; authorize via /start first.", 404

    resp = make_response(frame.to_csv())
    resp.headers["Content-Disposition"] = "attachment; filename=export.csv"
    resp.headers["Content-Type"] = "text/csv"
    return resp
if __name__ == "__main__":
    # Run Flask's built-in server, with debug mode enabled, only when this
    # file is executed directly rather than imported.
    app.run(debug=True)