Skip to content

Commit

Permalink
Merge pull request #296 from NikolayBaranovv/check_master
Browse files Browse the repository at this point in the history
feat: close connection when master down
  • Loading branch information
psi29a authored Oct 24, 2024
2 parents 1894f36 + 6779faa commit 54d1a78
Show file tree
Hide file tree
Showing 13 changed files with 262 additions and 202 deletions.
25 changes: 24 additions & 1 deletion tests/advanced/test_replicaset.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from time import time

from bson import SON
from pymongo.errors import AutoReconnect, ConfigurationError, OperationFailure
from pymongo.errors import AutoReconnect, ConfigurationError, NotPrimaryError
from twisted.internet import defer, reactor
from twisted.trial import unittest

Expand Down Expand Up @@ -346,3 +346,26 @@ def test_StaleConnection(self):
finally:
self.__mongod[0].kill(signal.SIGCONT)
yield conn.disconnect()

@defer.inlineCallbacks
def test_CloseConnectionAfterPrimaryStepDown(self):
conn = ConnectionPool(self.master_with_guaranteed_write)
try:
yield conn.db.coll.insert_one({"x": 42})

got_not_primary_error = False

while True:
try:
yield conn.db.coll.find_one()
if got_not_primary_error:
# We got error and then restored — OK
break
yield self.__sleep(1)
yield conn.admin.command({"replSetStepDown": 86400, "force": 1})
except (NotPrimaryError, AutoReconnect):
got_not_primary_error = True

finally:
yield conn.disconnect()
self.flushLoggedErrors(NotPrimaryError)
18 changes: 8 additions & 10 deletions tests/basic/test_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,20 +269,18 @@ def test_OperationFailure(self):

def fake_send_query(*args):
return defer.succeed(
Msg(
body=bson.encode(
{
"ok": 0.0,
"errmsg": "operation was interrupted",
"code": 11602,
"codeName": "InterruptedDueToReplStateChange",
}
)
Msg.create(
{
"ok": 0.0,
"errmsg": "operation was interrupted",
"code": 11602,
"codeName": "InterruptedDueToReplStateChange",
}
)
)

with patch(
"txmongo.protocol.MongoProtocol.send_msg", side_effect=fake_send_query
"txmongo.protocol.MongoProtocol._send_raw_msg", side_effect=fake_send_query
):
yield self.assertFailure(
self.coll.bulk_write(
Expand Down
20 changes: 10 additions & 10 deletions tests/basic/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,24 +87,24 @@ def test_EncodeDecodeReply(self):
self.assertEqual(decoded.documents, request.documents)

def test_EncodeDecodeMsg(self):
request = Msg(
response_to=123,
flag_bits=OP_MSG_MORE_TO_COME,
body=bson.encode({"a": 1, "$db": "dbname"}),
request = Msg.create(
body={"a": 1, "$db": "dbname"},
payload={
"documents": [
bson.encode({"a": 1}),
bson.encode({"a": 2}),
{"a": 1},
{"a": 2},
],
"updates": [
bson.encode({"$set": {"z": 1}}),
bson.encode({"$set": {"z": 2}}),
{"$set": {"z": 1}},
{"$set": {"z": 2}},
],
"deletes": [
bson.encode({"_id": ObjectId()}),
bson.encode({"_id": ObjectId()}),
{"_id": ObjectId()},
{"_id": ObjectId()},
],
},
acknowledged=False,
response_to=123,
)

decoded = self._encode_decode(request)
Expand Down
5 changes: 1 addition & 4 deletions tests/basic/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,7 @@ def test_CursorClosingWithTimeout(self):
{"$where": "sleep(100); true"}, batch_size=5, timeout=0.8
)
with patch.object(
MongoProtocol,
"send_msg",
side_effect=MongoProtocol.send_msg,
autospec=True,
MongoProtocol, "send_msg", side_effect=MongoProtocol.send_msg, autospec=True
) as mock:
with self.assertRaises(TimeExceeded):
yield dfr
Expand Down
2 changes: 1 addition & 1 deletion tests/mongod.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def stop(self):
if self._proc and self._proc.pid:
d = defer.Deferred()
self._notify_stop.append(d)
self._proc.signalProcess("INT")
self.kill("INT")
return d
else:
return defer.fail("Not started yet")
Expand Down
9 changes: 8 additions & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from pymongo.errors import AutoReconnect
from twisted.internet import defer
from twisted.trial import unittest

Expand All @@ -15,5 +16,11 @@ def setUp(self):

@defer.inlineCallbacks
def tearDown(self):
yield self.coll.drop()
while True:
try:
yield self.coll.drop()
break
except AutoReconnect:
pass

yield self.conn.disconnect()
23 changes: 7 additions & 16 deletions txmongo/_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,18 @@
validate_ok_for_update,
)

from txmongo._bulk_constants import (
_DELETE,
_INSERT,
_UPDATE,
COMMAND_NAME,
PAYLOAD_ARG_NAME,
)
from txmongo.protocol import MongoProtocol, Msg
from txmongo.types import Document

_WriteOp = Union[InsertOne, UpdateOne, UpdateMany, ReplaceOne, DeleteOne, DeleteMany]

_INSERT = 0
_UPDATE = 1
_DELETE = 2

COMMAND_NAME = {
_INSERT: "insert",
_UPDATE: "update",
_DELETE: "delete",
}

PAYLOAD_ARG_NAME = {
_INSERT: "documents",
_UPDATE: "updates",
_DELETE: "deletes",
}


class _Run:
op_type: int
Expand Down
13 changes: 13 additions & 0 deletions txmongo/_bulk_constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
_INSERT = 0
_UPDATE = 1
_DELETE = 2
COMMAND_NAME = {
_INSERT: "insert",
_UPDATE: "update",
_DELETE: "delete",
}
PAYLOAD_ARG_NAME = {
_INSERT: "documents",
_UPDATE: "updates",
_DELETE: "deletes",
}
Loading

0 comments on commit 54d1a78

Please sign in to comment.