Skip to content

Commit

Permalink
Merge pull request #145 from gunthercox/twitter
Browse files Browse the repository at this point in the history
Added tests for twitter adapter
  • Loading branch information
gunthercox committed Apr 5, 2016
2 parents 57bb82b + c0a74c8 commit 51593fc
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 26 deletions.
4 changes: 2 additions & 2 deletions chatterbot/adapters/io/multi_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ def process_response(self, statement):
Takes an input value.
Returns an output value.
"""
for adapter in self.adapters:
adapter.process_response(statement)
for i in range(1, len(self.adapters)):
self.adapters[i].process_response(statement)

return self.adapters[0].process_response(statement)

Expand Down
59 changes: 36 additions & 23 deletions chatterbot/adapters/storage/twitter_storage.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from chatterbot.adapters.storage import StorageAdapter
from chatterbot.conversation import Statement
from chatterbot.conversation import Statement, Response
import random
import twitter

Expand All @@ -13,20 +13,24 @@ def __init__(self, **kwargs):
super(TwitterAdapter, self).__init__(**kwargs)

self.api = twitter.Api(
consumer_key=kwargs["twitter_consumer_key"],
consumer_secret=kwargs["twitter_consumer_secret"],
access_token_key=kwargs["twitter_access_token_key"],
access_token_secret=kwargs["twitter_access_token_secret"]
consumer_key=kwargs.get('twitter_consumer_key'),
consumer_secret=kwargs.get('twitter_consumer_secret'),
access_token_key=kwargs.get('twitter_access_token_key'),
access_token_secret=kwargs.get('twitter_access_token_secret')
)

def count(self):
return 1

def find(self, statement_text):
tweets = self.api.GetSearch(term=statement_text, count=20)
tweet = random.choice(tweets)
tweets = self.api.GetSearch(term=statement_text, count=1)

if tweets:
return Statement(tweets[0].text, in_response_to=[
Response(statement_text)
])

return Statement(tweet.text)
return None

def filter(self, **kwargs):
"""
Expand All @@ -41,15 +45,15 @@ def filter(self, **kwargs):

# If no text parameter was given get a selection of recent tweets
if not statement_text:
statements = []
for i in range(0, 20):
statements.append(self.get_random())
statements = self.get_random(number=20)
return statements

tweets = self.api.GetSearch(term=statement_text)
tweet = random.choice(tweets)

statement = Statement(tweet.text)
statement = Statement(tweet.text, in_response_to=[
Response(statement_text)
])

return [statement]

Expand All @@ -62,12 +66,12 @@ def choose_word(self, words):
"""
for word in words:
# If the word contains only letters with a length from 4 to 9
if word.isalpha() and (len(word) > 3 or len(word) <= 9):
if word.isalpha() and len(word) > 3 and len(word) <= 9:
return word

return None

def get_random(self):
def get_random(self, number=1):
"""
Returns a random statement from the api.
To generate a random tweet, search twitter for recent tweets
Expand All @@ -76,24 +80,33 @@ def get_random(self):
the selected random tweet, and make a second search request.
Return one random tweet selected from the search results.
"""
statements = []
tweets = self.api.GetSearch(term="random", count=5)

tweet = random.choice(tweets)
base_response = Response(text=tweet.text)

words = tweet.text.split()
word = self.choose_word(words)

# If a valid word is found, make a second search request
# TODO: What if a word is not found?
if word:
tweets = self.api.GetSearch(term=word, count=5)
tweets = self.api.GetSearch(term=word, count=number)
if tweets:
tweet = random.choice(tweets)

# TODO: Handle non-ascii characters properly
cleaned_text = ''.join(
[i if ord(i) < 128 else ' ' for i in tweet.text]
)

return Statement(cleaned_text)
for tweet in tweets:
# TODO: Handle non-ascii characters properly
cleaned_text = ''.join(
[i if ord(i) < 128 else ' ' for i in tweet.text]
)
statements.append(
Statement(cleaned_text, in_response_to=[base_response])
)

if number == 1:
return random.choice(statements)

return statements

def drop(self):
"""
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@
'Programming Language :: Python :: 3.5',
],
test_suite='tests',
tests_require=[]
tests_require=['mock']
)
1 change: 1 addition & 0 deletions tests/storage_adapter_tests/test_data/get_search.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"statuses":[{"metadata":{"result_type":"popular","iso_language_code":"en"},"created_at":"Tue Dec 08 21:40:00 +0000 2015","id":674342688083283970,"id_str":"674342688083283970","text":"\ud83c\udfb6 C++, Java, Python &amp; Ruby. These are a few of my favorite things \ud83c\udfb6 #HourOfCode \ud83d\udd51\ud83d\udcbb\ud83d\udc7e\ud83c\udfae https:\/\/t.co\/GSCmPh9V6j","source":"\u003ca href=\"https:\/\/vine.co\" rel=\"nofollow\"\u003eVine for Android\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":58309829,"id_str":"58309829","name":"Nickelodeon","screen_name":"NickelodeonTV","location":"USA","description":"The Official Twitter for Nickelodeon, USA!","url":"https:\/\/t.co\/Lz9i6LdC4f","entities":{"url":{"urls":[{"url":"https:\/\/t.co\/Lz9i6LdC4f","expanded_url":"http:\/\/www.nick.com","display_url":"nick.com","indices":[0,23]}]},"description":{"urls":[]}},"protected":false,"followers_count":3914587,"friends_count":2263,"listed_count":3321,"created_at":"Sun Jul 19 22:19:02 +0000 2009","favourites_count":2757,"utc_offset":-18000,"time_zone":"Eastern Time (US & Canada)","geo_enabled":true,"verified":true,"statuses_count":33910,"lang":"en","contributors_enabled":false,"is_translator":false,"is_translation_enabled":true,"profile_background_color":"FA743E","profile_background_image_url":"http:\/\/pbs.twimg.com\/profile_background_images\/450718163508789248\/E26KBqrx.jpeg","profile_background_image_url_https":"https:\/\/pbs.twimg.com\/profile_background_images\/450718163508789248\/E26KBqrx.jpeg","profile_background_tile":false,"profile_image_url":"http:\/\/pbs.twimg.com\/profile_images\/671387650792665088\/sJxvItMD_normal.jpg","profile_image_url_https":"https:\/\/pbs.twimg.com\/profile_images\/671387650792665088\/sJxvItMD_normal.jpg","profile_banner_url":"https:\/\/pbs.twimg.com\/profile_banners\/58309829\/1448906254","profile_link_color":"D1771E","profile_sidebar_border_color":"FFFFFF","profile_sidebar_fill_color":"F0F0F0","profile_text_color":"333333","profile_use_background_image":false,"has_extended_profile":false,"default_profile":false,"default_profile_image":false,"following":false,"follow_request_sent":false,"notifications":false},"geo":null,"coordinates":null,"place":null,"contributors":null,"is_quote_status":false,"retweet_count":28,"favorite_count":126,"entities":{"hashtags":[{"text":"HourOfCode","indices":[72,83]}],"symbols":[],"user_mentions":[],"urls":[{"url":"https:\/\/t.co\/GSCmPh9V6j","expanded_url":"https:\/\/vine.co\/v\/i7QJji9Ldmr","display_url":"vine.co\/v\/i7QJji9Ldmr","indices":[89,112]}]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"lang":"en"}]}
100 changes: 100 additions & 0 deletions tests/storage_adapter_tests/test_twitter_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from unittest import TestCase
from unittest import SkipTest
from mock import Mock, MagicMock
from chatterbot.adapters.storage import TwitterAdapter
import os
import json

def side_effect(*args, **kwargs):
from twitter import Status

# A special case for testing a response with no results
if 'term' in kwargs and kwargs.get('term') == 'Non-existant':
return []

current_directory = os.path.dirname(os.path.realpath(__file__))
data_file = os.path.join(
current_directory,
'test_data',
'get_search.json'
)
tweet_data = open(data_file)
data = json.loads(tweet_data.read())
tweet_data.close()

return [Status.NewFromJsonDict(x) for x in data.get('statuses', '')]


class TwitterAdapterTestCase(TestCase):

def setUp(self):
"""
Instantiate the adapter.
"""
self.adapter = TwitterAdapter(
twitter_consumer_key='twitter_consumer_key',
twitter_consumer_secret='twitter_consumer_secret',
twitter_access_token_key='twitter_access_token_key',
twitter_access_token_secret='twitter_access_token_secret'
)
self.adapter.api = Mock()

self.adapter.api.GetSearch = MagicMock(side_effect=side_effect)

def test_count(self):
"""
The count should always be 1.
"""
self.assertEqual(self.adapter.count(), 1)

def test_count(self):
"""
The update method should return the input statement.
"""
from chatterbot.conversation import Statement
statement = Statement('Hello')
result = self.adapter.update(statement)
self.assertEqual(statement, result)

def test_choose_word(self):
words = ['G', 'is', 'my', 'favorite', 'letter']
word = self.adapter.choose_word(words)
self.assertEqual(word, words[3])

def test_choose_no_word(self):
words = ['q']
word = self.adapter.choose_word(words)
self.assertEqual(word, None)

def test_drop(self):
"""
This drop method should do nothing.
"""
self.adapter.drop()

def test_filter(self):
statements = self.adapter.filter()
self.assertEqual(len(statements), 1)

def test_statement_not_found(self):
"""
Test the case that a match is not found.
"""
statement = self.adapter.find('Non-existant')
self.assertEqual(statement, None)

def test_statement_found(self):
found_statement = self.adapter.find('New statement')
self.assertNotEqual(found_statement, None)
self.assertTrue(len(found_statement.text))

def test_filter(self):
statements = self.adapter.filter(
text__contains='a few of my favorite things'
)
self.assertGreater(len(statements), 0)

def test_get_random(self):
statement = self.adapter.get_random()
self.assertNotEqual(statement, None)
self.assertTrue(len(statement.text))

0 comments on commit 51593fc

Please sign in to comment.