Skip to content

Commit

Permalink
Basic working version of upgraded db_connection
Browse files Browse the repository at this point in the history
I added all of the missing callback functions so that we can start work on
transaction support hopefully soon.
  • Loading branch information
Justin Wood committed May 3, 2019
1 parent b99dca2 commit 63c20ff
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 50 deletions.
24 changes: 12 additions & 12 deletions lib/mongo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ defmodule Mongo do
wv_query = %Query{action: :wire_version}

with {:ok, conn, _, _} <- select_server(topology_pid, :read, opts),
{:ok, version} <- DBConnection.execute(conn, wv_query, [], defaults(opts)) do
{:ok, _query, version} <- DBConnection.execute(conn, wv_query, [], defaults(opts)) do
cursor? = version >= 1 and Keyword.get(opts, :use_cursor, true)
opts = Keyword.drop(opts, ~w(allow_disk_use max_time use_cursor)a)

Expand Down Expand Up @@ -485,7 +485,7 @@ defmodule Mongo do
def raw_find(conn, coll, query, select, opts) do
params = [query, select]
query = %Query{action: :find, extra: coll}
with {:ok, reply} <- DBConnection.execute(conn, query, params, defaults(opts)),
with {:ok, _query, reply} <- DBConnection.execute(conn, query, params, defaults(opts)),
:ok <- maybe_failure(reply),
op_reply(docs: docs, cursor_id: cursor_id, from: from, num: num) = reply,
do: {:ok, %{from: from, num: num, cursor_id: cursor_id, docs: docs}}
Expand All @@ -494,7 +494,7 @@ defmodule Mongo do
@doc false
def get_more(conn, coll, cursor, opts) do
query = %Query{action: :get_more, extra: {coll, cursor}}
with {:ok, reply} <- DBConnection.execute(conn, query, [], defaults(opts)),
with {:ok, _query, reply} <- DBConnection.execute(conn, query, [], defaults(opts)),
:ok <- maybe_failure(reply),
op_reply(docs: docs, cursor_id: cursor_id, from: from, num: num) = reply,
do: {:ok, %{from: from, num: num, cursor_id: cursor_id, docs: docs}}
Expand All @@ -503,7 +503,7 @@ defmodule Mongo do
@doc false
def kill_cursors(conn, cursor_ids, opts) do
query = %Query{action: :kill_cursors, extra: cursor_ids}
with {:ok, :ok} <- DBConnection.execute(conn, query, [], defaults(opts)),
with {:ok, _query, :ok} <- DBConnection.execute(conn, query, [], defaults(opts)),
do: :ok
end

Expand All @@ -527,7 +527,7 @@ defmodule Mongo do
params = [query]
query = %Query{action: :command}

with {:ok, reply} <- DBConnection.execute(conn, query, params,
with {:ok, _query, reply} <- DBConnection.execute(conn, query, params,
defaults(opts)) do
case reply do
op_reply(flags: flags, docs: [%{"$err" => reason, "code" => code}])
Expand Down Expand Up @@ -572,7 +572,7 @@ defmodule Mongo do
params = [doc]
query = %Query{action: :insert_one, extra: coll}
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
{:ok, reply} <- DBConnection.execute(conn, query, params, defaults(opts)),
{:ok, _query, reply} <- DBConnection.execute(conn, query, params, defaults(opts)),
:ok <- maybe_failure(reply),
{:ok, _doc} <- get_last_error(reply),
do: {:ok, %Mongo.InsertOneResult{inserted_id: id}}
Expand Down Expand Up @@ -616,7 +616,7 @@ defmodule Mongo do
params = docs
query = %Query{action: :insert_many, extra: coll}
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
{:ok, reply} <- DBConnection.execute(conn, query, params, defaults(opts)),
{:ok, _query, reply} <- DBConnection.execute(conn, query, params, defaults(opts)),
:ok <- maybe_failure(reply),
{:ok, _doc} <- get_last_error(reply),
ids = index_map(ids, 0, %{}),
Expand All @@ -639,7 +639,7 @@ defmodule Mongo do
params = [filter]
query = %Query{action: :delete_one, extra: coll}
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
{:ok, reply} <- DBConnection.execute(conn, query, params, defaults(opts)),
{:ok, _query, reply} <- DBConnection.execute(conn, query, params, defaults(opts)),
:ok <- maybe_failure(reply),
{:ok, %{"n" => n}} <- get_last_error(reply),
do: {:ok, %Mongo.DeleteResult{deleted_count: n}}
Expand All @@ -661,7 +661,7 @@ defmodule Mongo do
params = [filter]
query = %Query{action: :delete_many, extra: coll}
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
{:ok, reply} <- DBConnection.execute(conn, query, params, defaults(opts)),
{:ok, _query, reply} <- DBConnection.execute(conn, query, params, defaults(opts)),
:ok <- maybe_failure(reply),
{:ok, %{"n" => n}} <- get_last_error(reply),
do: {:ok, %Mongo.DeleteResult{deleted_count: n}}
Expand Down Expand Up @@ -690,7 +690,7 @@ defmodule Mongo do
params = [filter, replacement]
query = %Query{action: :replace_one, extra: coll}
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
{:ok, reply} <- DBConnection.execute(conn, query, params, defaults(opts)),
{:ok, _query, reply} <- DBConnection.execute(conn, query, params, defaults(opts)),
:ok <- maybe_failure(reply),
{:ok, doc} <- get_last_error(reply) do
case doc do
Expand Down Expand Up @@ -736,7 +736,7 @@ defmodule Mongo do
params = [filter, update]
query = %Query{action: :update_one, extra: coll}
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
{:ok, reply} <- DBConnection.execute(conn, query, params, defaults(opts)),
{:ok, _qurey, reply} <- DBConnection.execute(conn, query, params, defaults(opts)),
:ok <- maybe_failure(reply),
{:ok, doc} <- get_last_error(reply) do
case doc do
Expand Down Expand Up @@ -775,7 +775,7 @@ defmodule Mongo do
params = [filter, update]
query = %Query{action: :update_many, extra: coll}
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
{:ok, reply} <- DBConnection.execute(conn, query, params, defaults(opts)),
{:ok, _query, reply} <- DBConnection.execute(conn, query, params, defaults(opts)),
:ok <- maybe_failure(reply),
{:ok, doc} <- get_last_error(reply) do
case doc do
Expand Down
92 changes: 77 additions & 15 deletions lib/mongo/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ defmodule Mongo.Protocol do
@update_flags ~w(upsert)a
@write_concern ~w(w j wtimeout)a

@doc """
DBConnection callback
"""
def disconnect(_error, %{socket: {mod, sock}} = s) do
notify_disconnect(s)
mod.close(sock)
Expand All @@ -21,6 +24,9 @@ defmodule Mongo.Protocol do
GenServer.cast(pid, {:disconnect, type, host})
end

@doc """
DBConnection callback
"""
def connect(opts) do
{write_concern, opts} = Keyword.split(opts, @write_concern)
write_concern = Keyword.put_new(write_concern, :w, 1)
Expand All @@ -36,7 +42,8 @@ defmodule Mongo.Protocol do
auth_mechanism: opts[:auth_mechanism] || nil,
connection_type: Keyword.fetch!(opts, :connection_type),
topology_pid: Keyword.fetch!(opts, :topology_pid),
ssl: opts[:ssl] || false
ssl: opts[:ssl] || false,
status: :idle
}

connect(opts, s)
Expand Down Expand Up @@ -139,26 +146,44 @@ defmodule Mongo.Protocol do
end
end

def handle_info({:tcp, data}, s) do
err = Mongo.Error.exception(message: "unexpected async recv: #{inspect data}")
{:disconnect, err, s}
@doc """
DBConnection callback
"""
def handle_begin(_opts, state) do
{:idle, state}
end

def handle_info({:tcp_closed, _}, s) do
err = Mongo.Error.exception(tag: :tcp, action: "async recv", reason: :closed, host: s.host)
{:disconnect, err, s}
@doc """
DBConnection callback
"""
def handle_close(_query, _opts, state) do
{:ok, nil, state}
end

def handle_info({:tcp_error, _, reason}, s) do
err = Mongo.Error.exception(tag: :tcp, action: "async recv", reason: reason, host: s.host)
{:disconnect, err, s}
@doc """
DBConnection callback
"""
def handle_commit(_opts, state) do
{:idle, state}
end

def handle_info({:ssl_closed, _}, s) do
err = Mongo.Error.exception(tag: :ssl, action: "async recv", reason: :closed, host: s.host)
{:disconnect, err, s}
@doc """
DBConnection callback
"""
def handle_deallocate(query, cursor, opts, state) do
{:ok, :ok, state}
end

@doc """
DBConnection callback
"""
def handle_declare(query, params, opts, state) do
{:ok, query, :ok, state}
end

@doc """
DBConnection callback
"""
def checkout(%{socket: {mod, sock}} = s) do
case setopts(mod, sock, [active: :false]) do
:ok -> recv_buffer(s)
Expand All @@ -185,6 +210,9 @@ defmodule Mongo.Protocol do
end
end

@doc """
DBConnection callback
"""
def checkin(%{socket: {mod, sock}} = s) do
:ok = setopts(mod, sock, [active: :once])
{:ok, s}
Expand All @@ -194,13 +222,44 @@ defmodule Mongo.Protocol do
handle_execute(query, params, opts, s)
end

def handle_execute(%Mongo.Query{action: action, extra: extra}, params, opts, original_state) do
@doc """
DBConnection callback
"""
def handle_fetch(query, cursor, opts, state) do
{:cont, :ok, state}
end

@doc """
DBConnection callback
"""
def handle_prepare(query, opts, state) do
{:ok, query, state}
end

@doc """
DBConnection callback
"""
def handle_rollback(opts, state) do
{:idle, state}
end

@doc """
DBConnection callback
"""
def handle_status(opts, state) do
{:idle, state}
end

@doc """
DBConnection callback
"""
def handle_execute(%Mongo.Query{action: action, extra: extra} = query, params, opts, original_state) do
{mod, sock} = original_state.socket
:ok = setopts(mod, sock, active: false)
tmp_state = %{original_state | database: Keyword.get(opts, :database, original_state.database)}
with {:ok, reply, tmp_state} <- handle_execute(action, extra, params, opts, tmp_state) do
:ok = setopts(mod, sock, active: :once)
{:ok, reply, Map.put(tmp_state, :database, original_state.database)}
{:ok, query, reply, Map.put(tmp_state, :database, original_state.database)}
end
end

Expand Down Expand Up @@ -313,6 +372,9 @@ defmodule Mongo.Protocol do
end
end

@doc """
DBConnection callback
"""
def ping(%{wire_version: wire_version, socket: {mod, sock}} = s) do
{:ok, active} = getopts(mod, sock, [:active])
:ok = setopts(mod, sock, [active: false])
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ defmodule Mongo.Topology do
server_description,
self(),
@heartbeat_frequency_ms,
Keyword.put(connopts, :pool, DBConnection.Connection)
Keyword.put(connopts, :pool, DBConnection.ConnectionPool)
]

:ok = Mongo.Events.notify(%ServerOpeningEvent{address: address, topology_pid: self()})
Expand Down
33 changes: 18 additions & 15 deletions test/mongo/connection_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule Mongo.ConnectionTest do
use MongoTest.Case, async: true
import ExUnit.CaptureLog
alias Mongo

defp connect do
Expand Down Expand Up @@ -81,10 +82,10 @@ defmodule Mongo.ConnectionTest do
username: "mongodb_user", password: "wrong",
backoff_type: :stop]

capture_log fn ->
assert capture_log(fn ->
assert {:ok, pid} = Mongo.start_link(opts)
assert_receive {:EXIT, ^pid, {%Mongo.Error{code: 18}, _}}
end
assert_receive {:EXIT, ^pid, :killed}, 5000
end) =~ "(Mongo.Error) auth failed for user mongodb_user"
end

test "auth wrong on db" do
Expand All @@ -94,10 +95,10 @@ defmodule Mongo.ConnectionTest do
username: "mongodb_admin_user", password: "wrong",
backoff_type: :stop, auth_source: "admin_test"]

capture_log fn ->
assert capture_log(fn ->
assert {:ok, pid} = Mongo.start_link(opts)
assert_receive {:EXIT, ^pid, {%Mongo.Error{code: 18}, _}}
end
assert_receive {:EXIT, ^pid, :killed}, 5000
end) =~ "(Mongo.Error) auth failed for user mongodb_admin_user"
end

test "insert_one flags" do
Expand Down Expand Up @@ -176,15 +177,17 @@ defmodule Mongo.ConnectionTest do
end

test "auth connection leak" do
# sometimes the function tcp_count() returns 1, so the test fails.
# maybe it is a good idea to wait a second before counting
:timer.sleep(1000)
assert tcp_count() == 0
Enum.each(1..10, fn _ ->
connect_auth_invalid()
capture_log(fn ->
# sometimes the function tcp_count() returns 1, so the test fails.
# maybe it is a good idea to wait a second before counting
:timer.sleep(1000)
assert tcp_count() == 0
Enum.each(1..10, fn _ ->
connect_auth_invalid()
end)
:timer.sleep(1000)
# there should be 10 connections with connection_type: :monitor
assert tcp_count() == 10
end)
:timer.sleep(1000)
# there should be 10 connections with connection_type: :monitor
assert tcp_count() == 10
end
end
8 changes: 1 addition & 7 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,8 @@ defmodule MongoTest.Case do
end
end

def capture_log(fun) do
Logger.remove_backend(:console)
fun.()
Logger.add_backend(:console, flush: true)
end

defmacro unique_name do
{function, _arity} = __CALLER__.function
"#{__CALLER__.module}.#{function}"
end
end
end

0 comments on commit 63c20ff

Please sign in to comment.