-
Notifications
You must be signed in to change notification settings - Fork 0
/
filter_emoticons.py
executable file
·185 lines (143 loc) · 4.88 KB
/
filter_emoticons.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python
# Author: Luis Rei <me@luisrei.com>
# License: MIT License
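'''
Reads tweets from a gzip-compressed file and splits their text into files:
input_tweet_file - gzip file with one JSON-encoded tweet per line (input)
output_directory - directory where the output files below are written
pos.txt - tweets that contain happy smileys (smileys may be removed,
          see --prob_smiley)
neg.txt - tweets that contain negative smileys (smileys may be removed,
          see --prob_smiley)
other.txt - the remaining tweets (no smileys, or both kinds of smileys)
TODO:
- Discard all tweets with emoticons not on the positive or negative lists
- Should probably make hearts and kisses (except broken heart) positive
'''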
from __future__ import print_function
import sys
import os
import re
import gzip
import json
import codecs
import itertools
import random
import argparse
import multiprocessing
from functools import partial
# ``unichr`` only exists on Python 2; on Python 3 ``chr`` is the equivalent,
# so fall back to it to keep the tables below buildable on both.
try:
    unichr
except NameError:
    unichr = chr

# ASCII emoticons
pos_smileys = [u':)', u':D', u':-)', u':-))', u':]', u'=)', u'(:', u':o)']
neg_smileys = [u':(', u';(', u':-(', u':-[', u":'(", u":[", u":{", u">:("]

#
# add positive unicode emoji
#
# U+1F600 .. U+1F60F
pos_smileys += [unichr(x) for x in range(0x1F600, 0x1F600 + 16)]
pos_smileys += [unichr(int(x, 16)) for x in
                ['1F61A', '263A', '263A', '1F642', '1F917', '1F60C', '270C',
                 '1F44D']]
# positive cat faces, U+1F638 .. U+1F63D (inclusive)
pos_smileys += [unichr(x) for x in range(0x1F638, 0x1F63D + 1)]

#
# add negative unicode emoji
#
# exclude a few that are not unambiguously negative
# is weary face ('1F629') negative? if not add here:
not_neg = [unichr(int(x, 16)) for x in ['1F62B', '1F62A', '1F624']]
# U+1F620 .. U+1F62D
possibly_neg = [unichr(x) for x in range(0x1F620, 0x1F620 + 14)]
unambigously_neg = [x for x in possibly_neg if x not in not_neg]
neg_smileys += unambigously_neg
# '1F641' (slightly frowning face) used to be misspelled as '16F41', which
# is a Miao letter and not an emoticon.
neg_smileys += [unichr(int(x, 16)) for x in
                ['1F610', '1F611', '2639', '1F641', '1F612', '1F61E', '1F64D',
                 '1F64E']]
# negative cat faces
neg_smileys += [unichr(int(x, 16)) for x in ['1F63E', '1F63F', '1F640']]

#
# Ambiguous / unknown
# actually not all emoji but most anyways: U+1F600 .. U+1F64F (inclusive,
# like the other ranges above)
all_emoji = [unichr(x) for x in range(0x1F600, 0x1F64F + 1)]
unambigous = neg_smileys + pos_smileys
ambigous = [x for x in all_emoji if x not in unambigous]

# Labels returned by process_line
POS = True
NEG = False

# Seed the generator used to decide whether smileys are stripped
random.seed()
def process_line(prob_smiley, json_line):
    '''
    Classifies one JSON-encoded tweet by the smileys found in its text.

    The 'text' field is stripped and split on whitespace; only whole tokens
    are compared against pos_smileys / neg_smileys.

    prob_smiley - probability (0..1) that the smileys are kept in the
                  returned text; a numeric string is accepted as well
    json_line   - one JSON object that has a 'text' field

    Returns a tuple (label, text):
    (None, text) - no smileys, or both positive and negative smileys
    (POS, text)  - only positive smileys were found
    (NEG, text)  - only negative smileys were found
    For POS / NEG the smiley tokens are dropped from the text when a random
    draw exceeds prob_smiley; the remaining tokens are then joined with
    single spaces.
    '''
    unicode_line = json.loads(json_line)['text'].strip()
    tokens = unicode_line.split()

    # handle well tokenized text; a set makes each smiley lookup O(1)
    token_set = set(tokens)
    has_pos = any(sm in token_set for sm in pos_smileys)
    has_neg = any(sm in token_set for sm in neg_smileys)

    # Neither kind (no smileys) or both kinds (ambiguous): no label
    if has_pos == has_neg:
        return (None, unicode_line)

    if has_pos:
        label, smileys = POS, pos_smileys
    else:
        label, smileys = NEG, neg_smileys

    # Flip a coin to see if the smileys will be removed. The explicit float()
    # matters: a command-line value arrives as a string, and comparing a
    # float with a string is always False on Python 2 and a TypeError on
    # Python 3.
    if random.random() > float(prob_smiley):
        unicode_line = u' '.join(tok for tok in tokens if tok not in smileys)
    return (label, unicode_line)
def main():
    '''
    Main: splits a gzip file of JSON tweets into pos.txt, neg.txt, other.txt.

    Command line:
    -p / --prob_smiley - float passed to process_line for every line
                         (default 0.4)
    input_tweet_file   - gzip file, one JSON tweet per line
    output_directory   - directory in which the three output files are
                         created (existing files are overwritten)

    A one-line summary of the counts is written to stderr at the end.
    '''
    # arguments
    parser = argparse.ArgumentParser()
    # type=float: without it a value given on the command line stays a string
    parser.add_argument('-p', '--prob_smiley', action="store",
                        dest="prob_smiley", type=float, default=0.4)
    parser.add_argument('input_tweet_file')
    parser.add_argument('output_directory')
    args = parser.parse_args()

    # Stats Vars
    n_pos = 0
    n_neg = 0
    n = 0

    pos_path = os.path.join(args.output_directory, 'pos.txt')
    neg_path = os.path.join(args.output_directory, 'neg.txt')
    other_path = os.path.join(args.output_directory, 'other.txt')

    # The with-statement closes every file even if a line fails to parse.
    # Outputs are opened before the input, as before.
    with codecs.open(pos_path, 'w', encoding='utf-8') as f_pos, \
            codecs.open(neg_path, 'w', encoding='utf-8') as f_neg, \
            codecs.open(other_path, 'w', encoding='utf-8') as f_other, \
            gzip.open(args.input_tweet_file, 'r') as f:
        # Read and Process
        for line in f:
            res = process_line(args.prob_smiley, line)
            n += 1
            if res is None:
                continue
            label, text = res
            if label is None:
                f_other.write(text + u'\n')
            elif label == POS:
                n_pos += 1
                f_pos.write(text + u'\n')
            elif label == NEG:
                n_neg += 1
                f_neg.write(text + u'\n')

    # Report the counters on stderr so stdout stays untouched
    sys.stderr.write('%d tweets read: %d positive, %d negative\n'
                     % (n, n_pos, n_neg))
if __name__ == '__main__':
main()