import asyncio
import datetime
import logging
import os

import jmespath
import toml
from feedgen.feed import FeedGenerator
from playwright.async_api import async_playwright
from tweety import TwitterAsync


async def generate_twitter_rss():
    data = toml.load('./twitter.toml')
    logging.info("`./twitter.toml` loaded successfully")

    # Initialize Twitter API
    twitter = TwitterAsync("SESSION")

    # Signing in using credentials:
    # account, password, extra = os.environ.get("TWITTER_ACCOUNT_PASSWORD", "").split()
    # twitter.start(account, password, extra=extra)
    # logging.info(f"logged in as `{twitter.user}`")

    # Signing in using cookies exported with Cookie-Editor:
    # https://chromewebstore.google.com/detail/cookie-editor/hlkenndednhfkekhgcdicdfddnkalmdm?utm_campaign=cgagnier.ca
    # Export -> Header String
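    # The exported header string is assumed to use the standard `Cookie`
    # header format, e.g. "auth_token=<value>; ct0=<value>; ..."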
    cookie_value = os.environ.get("TWITTER_COOKIE_VALUE", "")
    await twitter.load_cookies(cookie_value)

    # To test locally, run the following command in a terminal first:
    # export TWITTER_AUTH_TOKEN=<your token>
    # twitter.load_auth_token(os.environ.get("TWITTER_AUTH_TOKEN"))
    # logging.info("twitter.load_auth_token success")

    xmls = []
    for rss_file_name in data:
        username = data[rss_file_name]["username"]

        # Advanced Search - X/Twitter
        today_date = datetime.datetime.now().strftime(r'%Y-%m-%d')
        query = f'(from:{username}) since:{today_date}'
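        # e.g. '(from:wildrift) since:2024-01-01' (illustrative values)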
        tweets = await twitter.search(query)
        # Sort from old to new
        tweets = sorted(list(tweets), key=lambda tweet: tweet.created_on, reverse=False)

        twitter_url = f'https://x.com/{username}'

        # Create RSS feed
        fg = FeedGenerator()
        fg.load_extension('media')
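        # the 'media' extension enables Media RSS (`media:content`) elements used below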
        fg.id(twitter_url)
        fg.title(data[rss_file_name]["rssname"])
        fg.author({'name': username, 'uri': twitter_url})
        fg.link(href=twitter_url)
        fg.language('en')
        fg.description(data[rss_file_name]["rssname"])

        # Add tweets to the RSS feed
        for tweet in tweets:
            fe = fg.add_entry()
            tweet_url = f'https://x.com/{username}/status/{tweet.id}'
            result = parse_tweet(await scrape_tweet(tweet_url))

            reply_to = f"reply to @{result['in_reply_to_screen_name']} " if result['in_reply_to_screen_name'] else ""
            title = f"{result['username']} (@{result['userid']}) {reply_to}on X"
            description = result["full_text"]
            media_urls = result["media_urls"]
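            # Strip the t.co shortlinks that point at attached media from the
            # text body; the media themselves are embedded via `media:content` below.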
            if media_urls:
                for attached_url in media_urls:
                    description = description.replace(attached_url, '')

            media_includes = set()
            for media_expanded_url, media_type \
                    in zip(result["media_expanded_urls"] or [],
                           result["media_types"] or []):
                match media_type:
                    case "photo":
                        media_includes.add("🌄")
                        medium = "image"
                        media_found_log = "Found [image] media: "
                    case "video" | "animated_gif":
                        media_includes.add("🎬")
                        medium = "image"
                        media_found_log = f"Found [{media_type}] media but only embed preview image: "
                    case _:
                        continue
                fe.media.content(url=media_expanded_url, medium=medium)  # type: ignore
                logging.info(f"{media_found_log}{media_expanded_url}")
            if media_includes:
                title += " " + "".join(media_includes)

            fe.title(title)
            fe.description(description)
            fe.id(tweet_url)
            fe.link(href=tweet_url)
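            # feedgen requires a timezone-aware datetime for pubDate; tweety's
            # `created_on` is assumed to carry a UTC timezone.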
            fe.pubDate(tweet.created_on)

        # Ensure the 'public' directory exists
        os.makedirs('public', exist_ok=True)

        # Generate the RSS XML
        xml_file_name = f'public/{rss_file_name}.xml'
        fg.rss_file(xml_file_name, pretty=True)
        xmls.append(xml_file_name)
        logging.info(f"{xml_file_name} has been generated")
logging.info("Feeds generated in `public/` folder")
logging.info("They will be published at:")
for xml in xmls:
logging.info(f"- https://changchiyou.github.io/wildrift-news-feeds/{xml}")


def parse_tweet(data: dict) -> dict:
    """
    Parse a Twitter tweet JSON dataset for the most important fields.

    Reference: https://scrapfly.io/blog/how-to-scrape-twitter/
    """
    result = jmespath.search(
        """{
        userid: core.user_results.result.legacy.screen_name,
        username: core.user_results.result.legacy.name,
        created_at: legacy.created_at,
        attached_display_urls: legacy.entities.urls[].display_url,
        attached_expanded_urls: legacy.entities.urls[].expanded_url,
        attached_urls: legacy.entities.urls[].url,
        media_expanded_urls: legacy.entities.media[].media_url_https,
        media_types: legacy.entities.media[].type,
        media_urls: legacy.entities.media[].url,
        media_video_info: legacy.entities.media[].video_info,
        tagged_userids: legacy.entities.user_mentions[].screen_name,
        tagged_hashtags: legacy.entities.hashtags[].text,
        full_text: legacy.full_text,
        lang: legacy.lang,
        in_reply_to_screen_name: legacy.in_reply_to_screen_name
        }""",
        data,
    )
    return result


async def scrape_tweet(url: str) -> dict:
    """
    Scrape a single tweet page. The page loads the whole thread in the
    background (parent tweet, replies and recommended tweets); the main
    tweet's result is returned.

    Reference: https://scrapfly.io/blog/how-to-scrape-twitter/
    """
    _xhr_calls = []

    def intercept_response(response):
        """capture all background requests and save them"""
        # we can extract details from background requests
        if response.request.resource_type == "xhr":
            _xhr_calls.append(response)
        return response

    async with async_playwright() as pw:
        browser = await pw.firefox.launch()
        context = await browser.new_context(viewport={"width": 1920, "height": 1080})
        page = await context.new_page()

        # enable background request intercepting:
        page.on("response", intercept_response)
        # go to url and wait for the page to load
        await page.goto(url, wait_until="domcontentloaded")
        await page.wait_for_selector("[data-testid='tweet']")

        # find all tweet background requests:
        tweet_calls = [f for f in _xhr_calls if "TweetResultByRestId" in f.url]
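        # "TweetResultByRestId" is the GraphQL operation the tweet page calls
        # in the background for the tweet's data (the operation name may
        # change as X updates its frontend).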
        for xhr in tweet_calls:
            data = await xhr.json()
            logging.info(f"{url} has been scraped by `scrape_tweet`")
            return data['data']['tweetResult']['result']

    logging.warning(f"no tweet data captured for {url}")
    return dict()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s')
    asyncio.run(generate_twitter_rss())