Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added tests for twitter adapter #145

Merged
merged 4 commits into from
Apr 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions chatterbot/adapters/io/multi_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ def process_response(self, statement):
Takes an input value.
Returns an output value.
"""
for adapter in self.adapters:
adapter.process_response(statement)
for i in range(1, len(self.adapters)):
self.adapters[i].process_response(statement)

return self.adapters[0].process_response(statement)

Expand Down
59 changes: 36 additions & 23 deletions chatterbot/adapters/storage/twitter_storage.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from chatterbot.adapters.storage import StorageAdapter
from chatterbot.conversation import Statement
from chatterbot.conversation import Statement, Response
import random
import twitter

Expand All @@ -13,20 +13,24 @@ def __init__(self, **kwargs):
super(TwitterAdapter, self).__init__(**kwargs)

self.api = twitter.Api(
consumer_key=kwargs["twitter_consumer_key"],
consumer_secret=kwargs["twitter_consumer_secret"],
access_token_key=kwargs["twitter_access_token_key"],
access_token_secret=kwargs["twitter_access_token_secret"]
consumer_key=kwargs.get('twitter_consumer_key'),
consumer_secret=kwargs.get('twitter_consumer_secret'),
access_token_key=kwargs.get('twitter_access_token_key'),
access_token_secret=kwargs.get('twitter_access_token_secret')
)

def count(self):
return 1

def find(self, statement_text):
tweets = self.api.GetSearch(term=statement_text, count=20)
tweet = random.choice(tweets)
tweets = self.api.GetSearch(term=statement_text, count=1)

if tweets:
return Statement(tweets[0].text, in_response_to=[
Response(statement_text)
])

return Statement(tweet.text)
return None

def filter(self, **kwargs):
"""
Expand All @@ -41,15 +45,15 @@ def filter(self, **kwargs):

# If no text parameter was given get a selection of recent tweets
if not statement_text:
statements = []
for i in range(0, 20):
statements.append(self.get_random())
statements = self.get_random(number=20)
return statements

tweets = self.api.GetSearch(term=statement_text)
tweet = random.choice(tweets)

statement = Statement(tweet.text)
statement = Statement(tweet.text, in_response_to=[
Response(statement_text)
])

return [statement]

Expand All @@ -62,12 +66,12 @@ def choose_word(self, words):
"""
for word in words:
# If the word contains only letters with a length from 4 to 9
if word.isalpha() and (len(word) > 3 or len(word) <= 9):
if word.isalpha() and len(word) > 3 and len(word) <= 9:
return word

return None

def get_random(self):
def get_random(self, number=1):
"""
Returns a random statement from the api.
To generate a random tweet, search twitter for recent tweets
Expand All @@ -76,24 +80,33 @@ def get_random(self):
the selected random tweet, and make a second search request.
Return one random tweet selected from the search results.
"""
statements = []
tweets = self.api.GetSearch(term="random", count=5)

tweet = random.choice(tweets)
base_response = Response(text=tweet.text)

words = tweet.text.split()
word = self.choose_word(words)

# If a valid word is found, make a second search request
# TODO: What if a word is not found?
if word:
tweets = self.api.GetSearch(term=word, count=5)
tweets = self.api.GetSearch(term=word, count=number)
if tweets:
tweet = random.choice(tweets)

# TODO: Handle non-ascii characters properly
cleaned_text = ''.join(
[i if ord(i) < 128 else ' ' for i in tweet.text]
)

return Statement(cleaned_text)
for tweet in tweets:
# TODO: Handle non-ascii characters properly
cleaned_text = ''.join(
[i if ord(i) < 128 else ' ' for i in tweet.text]
)
statements.append(
Statement(cleaned_text, in_response_to=[base_response])
)

if number == 1:
return random.choice(statements)

return statements

def drop(self):
"""
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@
'Programming Language :: Python :: 3.5',
],
test_suite='tests',
tests_require=[]
tests_require=['mock']
)
1 change: 1 addition & 0 deletions tests/storage_adapter_tests/test_data/get_search.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"statuses":[{"metadata":{"result_type":"popular","iso_language_code":"en"},"created_at":"Tue Dec 08 21:40:00 +0000 2015","id":674342688083283970,"id_str":"674342688083283970","text":"\ud83c\udfb6 C++, Java, Python &amp; Ruby. These are a few of my favorite things \ud83c\udfb6 #HourOfCode \ud83d\udd51\ud83d\udcbb\ud83d\udc7e\ud83c\udfae https:\/\/t.co\/GSCmPh9V6j","source":"\u003ca href=\"https:\/\/vine.co\" rel=\"nofollow\"\u003eVine for Android\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":58309829,"id_str":"58309829","name":"Nickelodeon","screen_name":"NickelodeonTV","location":"USA","description":"The Official Twitter for Nickelodeon, USA!","url":"https:\/\/t.co\/Lz9i6LdC4f","entities":{"url":{"urls":[{"url":"https:\/\/t.co\/Lz9i6LdC4f","expanded_url":"http:\/\/www.nick.com","display_url":"nick.com","indices":[0,23]}]},"description":{"urls":[]}},"protected":false,"followers_count":3914587,"friends_count":2263,"listed_count":3321,"created_at":"Sun Jul 19 22:19:02 +0000 2009","favourites_count":2757,"utc_offset":-18000,"time_zone":"Eastern Time (US & Canada)","geo_enabled":true,"verified":true,"statuses_count":33910,"lang":"en","contributors_enabled":false,"is_translator":false,"is_translation_enabled":true,"profile_background_color":"FA743E","profile_background_image_url":"http:\/\/pbs.twimg.com\/profile_background_images\/450718163508789248\/E26KBqrx.jpeg","profile_background_image_url_https":"https:\/\/pbs.twimg.com\/profile_background_images\/450718163508789248\/E26KBqrx.jpeg","profile_background_tile":false,"profile_image_url":"http:\/\/pbs.twimg.com\/profile_images\/671387650792665088\/sJxvItMD_normal.jpg","profile_image_url_https":"https:\/\/pbs.twimg.com\/profile_images\/671387650792665088\/sJxvItMD_normal.jpg","profile_banner_url":"https:\/\/pbs.twimg.com\/profile_banners\/58309829\/1448906254","profile_link_color":"D1771E","profile_sidebar_border_color":"FFFFFF","profile_sidebar_fill_color":"F0F0F0","profile_text_color":"333333","profile_use_background_image":false,"has_extended_profile":false,"default_profile":false,"default_profile_image":false,"following":false,"follow_request_sent":false,"notifications":false},"geo":null,"coordinates":null,"place":null,"contributors":null,"is_quote_status":false,"retweet_count":28,"favorite_count":126,"entities":{"hashtags":[{"text":"HourOfCode","indices":[72,83]}],"symbols":[],"user_mentions":[],"urls":[{"url":"https:\/\/t.co\/GSCmPh9V6j","expanded_url":"https:\/\/vine.co\/v\/i7QJji9Ldmr","display_url":"vine.co\/v\/i7QJji9Ldmr","indices":[89,112]}]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"lang":"en"}]}
100 changes: 100 additions & 0 deletions tests/storage_adapter_tests/test_twitter_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from unittest import TestCase
from unittest import SkipTest
from mock import Mock, MagicMock
from chatterbot.adapters.storage import TwitterAdapter
import os
import json

def side_effect(*args, **kwargs):
from twitter import Status

# A special case for testing a response with no results
if 'term' in kwargs and kwargs.get('term') == 'Non-existant':
return []

current_directory = os.path.dirname(os.path.realpath(__file__))
data_file = os.path.join(
current_directory,
'test_data',
'get_search.json'
)
tweet_data = open(data_file)
data = json.loads(tweet_data.read())
tweet_data.close()

return [Status.NewFromJsonDict(x) for x in data.get('statuses', '')]


class TwitterAdapterTestCase(TestCase):

def setUp(self):
"""
Instantiate the adapter.
"""
self.adapter = TwitterAdapter(
twitter_consumer_key='twitter_consumer_key',
twitter_consumer_secret='twitter_consumer_secret',
twitter_access_token_key='twitter_access_token_key',
twitter_access_token_secret='twitter_access_token_secret'
)
self.adapter.api = Mock()

self.adapter.api.GetSearch = MagicMock(side_effect=side_effect)

def test_count(self):
"""
The count should always be 1.
"""
self.assertEqual(self.adapter.count(), 1)

def test_count(self):
"""
The update method should return the input statement.
"""
from chatterbot.conversation import Statement
statement = Statement('Hello')
result = self.adapter.update(statement)
self.assertEqual(statement, result)

def test_choose_word(self):
words = ['G', 'is', 'my', 'favorite', 'letter']
word = self.adapter.choose_word(words)
self.assertEqual(word, words[3])

def test_choose_no_word(self):
words = ['q']
word = self.adapter.choose_word(words)
self.assertEqual(word, None)

def test_drop(self):
"""
This drop method should do nothing.
"""
self.adapter.drop()

def test_filter(self):
statements = self.adapter.filter()
self.assertEqual(len(statements), 1)

def test_statement_not_found(self):
"""
Test the case that a match is not found.
"""
statement = self.adapter.find('Non-existant')
self.assertEqual(statement, None)

def test_statement_found(self):
found_statement = self.adapter.find('New statement')
self.assertNotEqual(found_statement, None)
self.assertTrue(len(found_statement.text))

def test_filter(self):
statements = self.adapter.filter(
text__contains='a few of my favorite things'
)
self.assertGreater(len(statements), 0)

def test_get_random(self):
statement = self.adapter.get_random()
self.assertNotEqual(statement, None)
self.assertTrue(len(statement.text))