-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdag.py
95 lines (63 loc) · 2.1 KB
/
dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from datetime import datetime
from datetime import timedelta
from airflow.decorators import task, dag
import tweepy
import pandas as pd
import json
import s3fs
import re
@task
def get_data_from_twitter_api(screen_name='@GreatestQuotes', count=200):
    """Fetch recent tweets from a user's timeline via the Twitter API.

    Args:
        screen_name: Twitter handle whose timeline is read
            (default '@GreatestQuotes', the original hard-coded source).
        count: maximum number of tweets to request (default 200).

    Returns:
        list[dict]: raw per-tweet JSON payloads (tweepy's ``_json``),
        plain dicts so the result is serializable for Airflow XCom.
    """
    # NOTE(review): credentials are hard-coded placeholders — move them to an
    # Airflow Connection/Variable or environment variables before deploying.
    api_key = "{/YOUR API KEY}"
    api_secret_key = "{/YOUR API SECRET KEY}"
    access_token = "{/YOUR ACCESS TOKEN}"
    access_secret_token = "{/YOUR ACCESS SECRET TOKEN}"
    auth = tweepy.OAuthHandler(api_key, api_secret_key)
    auth.set_access_token(access_token, access_secret_token)
    api = tweepy.API(auth)
    # Retweets are excluded; 'extended' mode returns the full, untruncated
    # text in the 'full_text' field (consumed downstream by the transform task).
    tweets = api.user_timeline(
        screen_name=screen_name,
        count=count,
        include_rts=False,
        tweet_mode='extended'
    )
    return [t._json for t in tweets]
@task
def transform_the_tweets(tweets):
    """Extract quote text and attribution from raw tweet payloads.

    Args:
        tweets: list of raw tweet dicts as returned by
            ``get_data_from_twitter_api`` (must contain 'full_text', 'user',
            'created_at', 'favorite_count' and 'retweet_count').

    Returns:
        list[dict]: one record per tweet with the quote split from its author.
    """
    # Separator between the quote and its attribution: an optional period,
    # then a hyphen or en dash, with optional surrounding whitespace (or a
    # period at end-of-line). The original pattern used unescaped '.', which
    # matched ANY character and silently ate the last letter of the quote.
    separator = re.compile(r'\.?\s*[–-]\s*|\.\n')
    tweet_list = []
    for tweet in tweets:
        parts = separator.split(tweet['full_text'])
        quote = parts[0] + '.'
        # Guard against tweets with no separator at all — the original code
        # raised IndexError here; fall back to an empty attribution instead.
        quote_by = parts[1] if len(parts) > 1 else ''
        data = {
            "username": tweet['user']['screen_name'],
            "quote" : quote,
            "quote by" : quote_by,
            "created time" : tweet['created_at'],
            "total like": tweet['favorite_count'],
            "total retweet" : tweet['retweet_count']
        }
        tweet_list.append(data)
    return tweet_list
@task
def load_into_s3(tweet_list):
    """Write the transformed quote records to an S3 bucket as CSV.

    Relies on the ``s3fs`` filesystem backend, which pandas picks up
    automatically for ``s3://`` paths.
    """
    pd.DataFrame(tweet_list).to_csv("s3://arkan-twitter-to-s3-storage/quotes.csv")
# Defaults applied to every task in the DAG: owned by 'arkan', no
# cross-run dependency, no failure/retry e-mails, and a single retry
# after one minute.
default_args = dict(
    owner='arkan',
    depends_on_past=False,
    start_date=datetime(2023, 12, 26),
    email=['airflow@example.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=1),
)
# DAG definition: extract -> transform -> load, wired implicitly by the
# TaskFlow API (each task's return value feeds the next via XCom).
# NOTE(review): no schedule argument is given, so Airflow falls back to its
# default scheduling interval — confirm this matches the intended cadence.
@dag('twitter_to_s3',default_args = default_args,
     description = 'extract tweet from twitter then transform it and load it into s3 bucket')
def may_dag():
    extract = get_data_from_twitter_api()
    transform = transform_the_tweets(extract)
    load_into_s3(transform)
# Calling the decorated function at module level registers the DAG with
# Airflow's scheduler (required for a @dag-decorated DAG to be discovered).
may_dag()