Skip to content

Commit

Permalink
Ensure that API/JSON-RPC messages in the same session are processed a…
Browse files Browse the repository at this point in the history
…nd not stalled

This basically drops the "corked" implementation which just stalled the
TLS IO polling after some requests. If you need sort of rate limiting
for these events, use an external TLS proxy which terminates that in front
of Icinga.

fixes #6635
  • Loading branch information
Michael Friedrich committed Nov 12, 2018
1 parent ec93937 commit 46ed013
Show file tree
Hide file tree
Showing 6 changed files with 3 additions and 45 deletions.
10 changes: 0 additions & 10 deletions lib/base/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,6 @@ bool Stream::WaitForData(int timeout)
return IsDataAvailable() || IsEof();
}

void Stream::SetCorked(bool corked)
{
m_Corked = corked;
}

bool Stream::IsCorked() const
{
return m_Corked;
}

static void StreamDummyCallback()
{ }

Expand Down
5 changes: 0 additions & 5 deletions lib/base/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ class Stream : public Object
bool WaitForData();
bool WaitForData(int timeout);

virtual void SetCorked(bool corked);
bool IsCorked() const;

virtual bool SupportsWaiting() const;

virtual bool IsDataAvailable() const;
Expand All @@ -146,8 +143,6 @@ class Stream : public Object

boost::mutex m_Mutex;
boost::condition_variable m_CV;

bool m_Corked{false};
};

}
Expand Down
22 changes: 3 additions & 19 deletions lib/base/tlsstream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,12 @@ void TlsStream::OnEvent(int revents)
char buffer[64 * 1024];

if (m_CurrentAction == TlsActionNone) {
bool corked = IsCorked();
if (!corked && (revents & (POLLIN | POLLERR | POLLHUP)))
if (revents & (POLLIN | POLLERR | POLLHUP))
m_CurrentAction = TlsActionRead;
else if (m_SendQ->GetAvailableBytes() > 0 && (revents & POLLOUT))
m_CurrentAction = TlsActionWrite;
else {
if (corked)
ChangeEvents(0);
else
ChangeEvents(POLLIN);
ChangeEvents(POLLIN);

return;
}
Expand Down Expand Up @@ -289,7 +285,7 @@ void TlsStream::OnEvent(int revents)

lock.unlock();

while (!IsCorked() && m_RecvQ->IsDataAvailable() && IsHandlingEvents())
while (m_RecvQ->IsDataAvailable() && IsHandlingEvents())
SignalDataAvailable();
}

Expand Down Expand Up @@ -428,18 +424,6 @@ bool TlsStream::IsDataAvailable() const
return m_RecvQ->GetAvailableBytes() > 0;
}

void TlsStream::SetCorked(bool corked)
{
Stream::SetCorked(corked);

boost::mutex::scoped_lock lock(m_Mutex);

if (corked)
m_CurrentAction = TlsActionNone;
else
ChangeEvents(POLLIN | POLLOUT);
}

Socket::Ptr TlsStream::GetSocket() const
{
return m_Socket;
Expand Down
2 changes: 0 additions & 2 deletions lib/base/tlsstream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ class TlsStream final : public Stream, private SocketEvents
bool SupportsWaiting() const override;
bool IsDataAvailable() const override;

void SetCorked(bool corked) override;

bool IsVerifyOK() const;
String GetVerifyError() const;

Expand Down
5 changes: 0 additions & 5 deletions lib/remote/httpserverconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ void HttpServerConnection::ProcessMessageAsync(HttpRequest& request, HttpRespons

response.Finish();
m_PendingRequests--;
m_Stream->SetCorked(false);
}

void HttpServerConnection::DataAvailableHandler()
Expand All @@ -354,8 +353,6 @@ void HttpServerConnection::DataAvailableHandler()
if (!m_Stream->IsEof()) {
boost::recursive_mutex::scoped_lock lock(m_DataHandlerMutex);

m_Stream->SetCorked(true);

try {
while (ProcessMessage())
; /* empty loop body */
Expand All @@ -366,8 +363,6 @@ void HttpServerConnection::DataAvailableHandler()
close = true;
}

m_RequestQueue.Enqueue(std::bind(&Stream::SetCorked, m_Stream, false));

/* Request finished, decide whether to explicitly close the connection. */
if (m_CurrentRequest.ProtocolVersion == HttpVersion10 ||
m_CurrentRequest.Headers->Get("connection") == "close") {
Expand Down
4 changes: 0 additions & 4 deletions lib/remote/jsonrpcconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,6 @@ void JsonRpcConnection::DataAvailableHandler()
if (!m_Stream->IsEof()) {
boost::mutex::scoped_lock lock(m_DataHandlerMutex);

m_Stream->SetCorked(true);

try {
while (ProcessMessage())
; /* empty loop body */
Expand All @@ -290,8 +288,6 @@ void JsonRpcConnection::DataAvailableHandler()

return;
}

l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(std::bind(&Stream::SetCorked, m_Stream, false));
} else
close = true;

Expand Down

0 comments on commit 46ed013

Please sign in to comment.