Skip to content
This repository has been archived by the owner on Nov 5, 2019. It is now read-only.

Commit

Permalink
Merge pull request #62 from manover/rs_secondary_connection_preference
Browse files Browse the repository at this point in the history
Support secondary only connections
  • Loading branch information
jehiah committed Apr 22, 2013
2 parents a3fcd99 + 49206d9 commit 4f92d38
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 30 deletions.
36 changes: 27 additions & 9 deletions asyncmongo/asyncjobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
"""

import logging
import random
from bson import SON

import message
import helpers
from errors import AuthenticationError, RSConnectionError, InterfaceError


class AsyncMessage(object):
def __init__(self, connection, message, callback):
super(AsyncMessage, self).__init__()
Expand Down Expand Up @@ -106,13 +108,14 @@ def process(self, response=None, error=None):
raise ValueError("Unexpected state: %s" % self._state)

class ConnectRSJob(object):
def __init__(self, connection, seed, rs):
def __init__(self, connection, seed, rs, secondary_only):
self.connection = connection
self.known_hosts = set(seed)
self.rs = rs
self._tried_hosts = set()
self._blacklisted = set()
self._state = "seed"
self._primary = None
self._sec_only = secondary_only

def __repr__(self):
return "ConnectRSJob at 0x%X, state = %s" % (id(self), self._state)
Expand All @@ -125,17 +128,25 @@ def process(self, response=None, error=None):
self._state = "seed"

if self._state == "seed":
fresh = self.known_hosts ^ self._tried_hosts
if self._sec_only and self._primary:
# Add primary host to blacklisted to avoid connecting to it
self._blacklisted.add(self._primary)

fresh = self.known_hosts ^ self._blacklisted
logging.debug("Working through the rest of the host list: %r", fresh)

while fresh:
if self._primary and self._primary not in self._tried_hosts:
if self._primary and self._primary not in self._blacklisted:
# Try primary first
h = self._primary
else:
h = fresh.pop()
h = random.choice(list(fresh))

if h in fresh:
fresh.remove(h)

self._tried_hosts.add(h)
# Add tried host to blacklisted
self._blacklisted.add(h)

logging.debug("Connecting to %s:%s", *h)
self.connection._host, self.connection._port = h
Expand Down Expand Up @@ -178,11 +189,18 @@ def process(self, response=None, error=None):
self.known_hosts.update(helpers._parse_host(h) for h in hosts)

ismaster = res.get("ismaster")
if ismaster:
logging.info("Connected to master")
hidden = res.get("hidden")
if ismaster and not self._sec_only: # master and required to connect to primary
assert not hidden, "Primary cannot be hidden"
logging.debug("Connected to master (%s)", res.get("me", "unknown"))
self._state = "done"
self.connection._next_job()
else:
elif not ismaster and self._sec_only and not hidden: # not master and required to connect to secondary
assert res.get("secondary"), "Secondary must self-report as secondary"
logging.debug("Connected to secondary (%s)", res.get("me", "unknown"))
self._state = "done"
self.connection._next_job()
else: # either not master and primary connection required or master and secondary required
primary = res.get("primary")
if primary:
self._primary = helpers._parse_host(primary)
Expand Down
7 changes: 6 additions & 1 deletion asyncmongo/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class Connection(object):
- `autoreconnect` (optional): auto reconnect on interface errors
- `rs`: replica set name (required when replica sets are used)
- `seed`: seed list to connect to a replica set (required when replica sets are used)
- `secondary_only`: (optional, only useful for replica set connections)
if true, connect to a secondary member only
- `**kwargs`: passed to `backends.AsyncBackend.register_stream`
"""
Expand All @@ -48,12 +50,14 @@ def __init__(self,
backend="tornado",
rs=None,
seed=None,
secondary_only=False,
**kwargs):
assert isinstance(autoreconnect, bool)
assert isinstance(dbuser, (str, unicode, NoneType))
assert isinstance(dbpass, (str, unicode, NoneType))
assert isinstance(rs, (str, NoneType))
assert pool
assert isinstance(secondary_only, bool)

if rs:
assert host is None
Expand All @@ -68,6 +72,7 @@ def __init__(self,
self._port = port
self.__rs = rs
self.__seed = seed
self.__secondary_only = secondary_only
self.__dbuser = dbuser
self.__dbpass = dbpass
self.__stream = None
Expand All @@ -91,7 +96,7 @@ def __connect(self):
self._put_job(asyncjobs.AuthorizeJob(self, self.__dbuser, self.__dbpass, self.__pool))

if self.__rs:
self._put_job(asyncjobs.ConnectRSJob(self, self.__seed, self.__rs))
self._put_job(asyncjobs.ConnectRSJob(self, self.__seed, self.__rs, self.__secondary_only))
# Mark the connection as alive, even though it's not alive yet to prevent double-connecting
self.__alive = True
else:
Expand Down
59 changes: 40 additions & 19 deletions test/test_replica_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@
import time
import logging
import subprocess
import socket

import test_shunt
import asyncmongo
import asyncmongo.connection
import asyncmongo.errors

TEST_TIMESTAMP = int(time.time())

Expand All @@ -19,6 +17,7 @@ class ReplicaSetTest(test_shunt.MongoTest):
]

def mongo_cmd(self, cmd, port=27018, res='"ok" : 1'):
logging.info("mongo_cmd: %s", cmd)
pipe = subprocess.Popen("mongo --port %d" % port, shell=True,
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
reply = pipe.communicate(cmd)[0]
Expand All @@ -34,34 +33,44 @@ def wait_master(self, port):
logging.info("Waiting for %d to become master", port)
time.sleep(5)

def wait_secondary(self, port):
while True:
if self.mongo_cmd("db.isMaster();", port).find('"secondary" : true') > 0:
logging.info("%d is a secondary", port)
break
else:
logging.info("Waiting for %d to become secondary", port)
time.sleep(5)

def setUp(self):
super(ReplicaSetTest, self).setUp()
hostname = socket.gethostname()
logging.info("configuring a replica set at %s" % hostname)
logging.info("configuring a replica set at 127.0.0.1")
cfg = """
{
"_id" : "rs0",
"members" : [
{
"_id" : 0,
"host" : "%(hostname)s:27018"
"host" : "127.0.0.1:27018"
},
{
"_id" : 1,
"host" : "%(hostname)s:27019",
"host" : "127.0.0.1:27019",
"priority" : 2
},
{
"_id" : 2,
"host" : "%(hostname)s:27020",
"priority" : 0
"host" : "127.0.0.1:27020",
"priority" : 0,
"hidden": true
}
]
}
""" % dict(hostname=hostname)
self.mongo_cmd("rs.initiate(%s);" % cfg)
"""
self.mongo_cmd("rs.initiate(%s);" % cfg, 27019)
logging.info("waiting for replica set to finish configuring")
self.wait_master(27019)
self.wait_secondary(27018)

def test_connection(self):
class Pool(object):
Expand All @@ -76,18 +85,30 @@ class AsyncClose(object):
def process(self, *args, **kwargs):
tornado.ioloop.IOLoop.instance().stop()

hostname = socket.gethostname()
try:
conn = asyncmongo.connection.Connection(pool=Pool(),
seed=[(hostname, 27018), (hostname, 27020)],
rs="rs0")
for i in xrange(10):
conn = asyncmongo.connection.Connection(pool=Pool(),
seed=[('127.0.0.1', 27018), ('127.0.0.1', 27020)],
rs="rs0")

conn._put_job(AsyncClose(), 0)
conn._next_job()
tornado.ioloop.IOLoop.instance().start()
conn._put_job(AsyncClose(), 0)
conn._next_job()
tornado.ioloop.IOLoop.instance().start()

assert conn._host == '127.0.0.1'
assert conn._port == 27019

for i in xrange(10):
conn = asyncmongo.connection.Connection(pool=Pool(),
seed=[('127.0.0.1', 27018), ('127.0.0.1', 27020)],
rs="rs0", secondary_only=True)

conn._put_job(AsyncClose(), 0)
conn._next_job()
tornado.ioloop.IOLoop.instance().start()

assert conn._host == hostname
assert conn._port == 27019
assert conn._host == '127.0.0.1'
assert conn._port == 27018

except:
tornado.ioloop.IOLoop.instance().stop()
Expand Down
2 changes: 1 addition & 1 deletion test/test_shunt.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def setUp(self):
self.temp_dirs.append(dirname)

options = ['mongod', '--oplogSize', '2', '--dbpath', dirname,
'--smallfiles', '-v', '--nojournal'] + list(options)
'--smallfiles', '-v', '--nojournal', '--bind_ip', '0.0.0.0'] + list(options)
logging.debug(options)
pipe = subprocess.Popen(options)
self.mongods.append(pipe)
Expand Down

0 comments on commit 4f92d38

Please sign in to comment.