-
Notifications
You must be signed in to change notification settings - Fork 0
/
analise.py
73 lines (63 loc) · 2.61 KB
/
analise.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from pyspark.ml.classification import NaiveBayesModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from textblob import TextBlob
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
def preprocessing(lines):
    """Clean raw tweet text streamed into the ``value`` column.

    Splits each incoming line on the ``"t_end"`` tweet delimiter, drops
    empty rows, and strips URLs, @mentions, ``#`` signs, the ``RT``
    marker, and colons from each tweet.

    Args:
        lines: streaming DataFrame with a string ``value`` column.

    Returns:
        DataFrame with a single cleaned ``word`` column.
    """
    words = lines.select(explode(split(lines.value, "t_end")).alias("word"))
    # Turn empty strings into NULLs so na.drop() removes them in one pass.
    words = words.na.replace('', None)
    words = words.na.drop()
    # Raw strings for regex patterns: '@\w+' as a plain string is an
    # invalid escape sequence (SyntaxWarning on Python >= 3.12).
    words = words.withColumn('word', F.regexp_replace('word', r'http\S+', ''))
    words = words.withColumn('word', F.regexp_replace('word', r'@\w+', ''))
    words = words.withColumn('word', F.regexp_replace('word', '#', ''))
    words = words.withColumn('word', F.regexp_replace('word', 'RT', ''))
    words = words.withColumn('word', F.regexp_replace('word', ':', ''))
    return words
def tokenize_idf(inputDF):
    """Build TF-IDF features from the ``word`` column.

    Tokenizes each cleaned tweet, hashes tokens into term-frequency
    vectors, then rescales them by inverse document frequency.

    Args:
        inputDF: DataFrame with a string ``word`` column.

    Returns:
        DataFrame with added ``words``, ``rawFeatures`` and ``features``
        columns.
    """
    tokenized = Tokenizer(inputCol="word", outputCol="words").transform(inputDF)
    tf_data = HashingTF(inputCol="words", outputCol="rawFeatures").transform(tokenized)
    idf_model = IDF(inputCol="rawFeatures", outputCol="features").fit(tf_data)
    return idf_model.transform(tf_data)
def getAnalysisTextBlob(text):
    """Label ``text`` by TextBlob polarity.

    Returns "Negative" for polarity < 0, "Neutral" for exactly 0,
    and "Positive" otherwise.
    """
    polarity = TextBlob(text).sentiment.polarity
    if polarity > 0:
        return "Positive"
    return "Neutral" if polarity == 0 else "Negative"
def getAnalysisModel(score):
    """Map a classifier output to a sentiment label.

    0 -> "Negative", 1 -> "Neutral", anything else -> "Positive".
    """
    labels = {0: "Negative", 1: "Neutral"}
    return labels.get(score, "Positive")
def text_classification(words, model):
    """Attach a TextBlob polarity label to each row.

    Args:
        words: DataFrame with a ``word`` column of cleaned tweets.
        model: loaded NaiveBayesModel — currently an unused placeholder
            (see the commented-out column below).

    Returns:
        DataFrame with an added ``textblob`` string column.
    """
    textblob_udf = udf(getAnalysisTextBlob, StringType())
    labeled = words.withColumn("textblob", textblob_udf("word"))
    # words = words.withColumn("naivebayes",)
    return labeled
if __name__ == "__main__":
# create Spark session
spark = SparkSession.builder.appName("TwitterSentimentAnalysis") \
.config("spark.driver.memory", "8g") \
.config("spark.driver.cores",2) \
.getOrCreate()
nb_model=NaiveBayesModel.load("naive_bayes")
# read the tweet data from socket
lines = spark.readStream.format("socket").option("host", "0.0.0.0").option("port", 5555).load()
# Preprocess the data
words = preprocessing(lines)
# text classification to define polarity and subjectivity
words = text_classification(words,nb_model)
words = words.repartition(1)
query = words.writeStream.queryName("all_tweets")\
.outputMode("append").format("console").trigger(processingTime='60 seconds').start()
query.awaitTermination()