Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ETCM-266]-replaced-rate-limiter-built-on-twitter #873

Merged
merged 15 commits into from
Jan 1, 2021
1 change: 0 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ lazy val node = {
Dependencies.cats,
Dependencies.monix,
Dependencies.network,
Dependencies.twitterUtilCollection,
Dependencies.crypto,
Dependencies.scopt,
Dependencies.logging,
Expand Down
2 changes: 0 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ object Dependencies {
"org.codehaus.janino" % "janino" % "3.1.2"
)

val twitterUtilCollection = Seq("com.twitter" %% "util-collection" % "19.1.0")

val crypto = Seq("org.bouncycastle" % "bcprov-jdk15on" % "1.66")

val scopt = Seq("com.github.scopt" % "scopt_2.12" % "3.7.1")
Expand Down
64 changes: 0 additions & 64 deletions repo.nix
Original file line number Diff line number Diff line change
Expand Up @@ -1065,54 +1065,6 @@
url = "https://repo1.maven.org/maven2/com/trueaccord/scalapb/scalapb-runtime_2.12/0.6.6/scalapb-runtime_2.12-0.6.6.pom";
sha256 = "1D151CC97E7A94E1795A66538692E047468C5D7E5491EC709A59F8D7B7ABB039";
};
"nix-public/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0-javadoc.jar";
sha256 = "3400584E84FE765101EE84DDE208CBB4AECAE33A2093ED989AEBD802916F9BEB";
};
"nix-public/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0-sources.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0-sources.jar";
sha256 = "1FAAC80D74C5E99B30A32C81D5F8A675F5DA1889584893822A720E7C044BF2DF";
};
"nix-public/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0.jar";
sha256 = "5DF5983BB58E545CACD771504DC32A83CE27D4B2ABC45B3041ACD4F74ECF4418";
};
"nix-public/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0.pom" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0.pom";
sha256 = "D2155349209C1EC451F08FE3A4ECE7A916B118E46F3492A81FC24B7B90E9EA7E";
};
"nix-public/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0-javadoc.jar";
sha256 = "0B1ACC67A5C6C8281FF0322899E0E1F3C28516094D166412E2DBF0C960E94C8E";
};
"nix-public/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0-sources.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0-sources.jar";
sha256 = "43F4E01682C3517DCA2CDB15678419CEA6C904886231C5507552AA996B48439A";
};
"nix-public/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0.jar";
sha256 = "E91061D0EC00F3573AA1311079E32A1F4A56D09BDE0D52715E4C035B2156AF52";
};
"nix-public/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0.pom" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0.pom";
sha256 = "0D0ECBD7D803EB1CC5337805A07242E87D7CD3C371FC67C243B3D68615E0F4CB";
};
"nix-public/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0-javadoc.jar";
sha256 = "87695034BFFD441D11ED0B3672F412795CE1B9F5770CF2E882281623B781F708";
};
"nix-public/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0-sources.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0-sources.jar";
sha256 = "FD71104B195EDE70D78FD38D49B0F8B4419C33EF59D7E23592272CF68538CEF9";
};
"nix-public/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0.jar";
sha256 = "B1516320F07264AE2BE860F4C74FA2DF5EF8BD349C68D082D8EAECDFCF77F766";
};
"nix-public/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0.pom" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0.pom";
sha256 = "17DB0C4F7A9C79E6279A8A4D6CAAEAC2DF889FDD2D3BA002E648F436C34FC88B";
};
"nix-public/com/typesafe/akka/akka-actor_2.12/2.5.23/akka-actor_2.12-2.5.23-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/com/typesafe/akka/akka-actor_2.12/2.5.23/akka-actor_2.12-2.5.23-javadoc.jar";
sha256 = "5C129DD97237DAB58EC6FBA946978916057BB6BBC414113BE8083ECD68A889E6";
Expand Down Expand Up @@ -2609,22 +2561,6 @@
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-java8-compat_2.12/0.8.0/scala-java8-compat_2.12-0.8.0.pom";
sha256 = "49E2711154CE9BA76962E043F4F8A6D0B890CD9E2FC6D0E99C3A0E6AD6C1950E";
};
"nix-public/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4-javadoc.jar";
sha256 = "228DA077BAEB60BB35E612780FB805126B0F88F9319AFDEA60E3BABEBC798BC2";
};
"nix-public/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4-sources.jar" = {
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4-sources.jar";
sha256 = "CB4BA7B7E598530FAEC863E5069864A28268EE4C636B0C46443884DCC4E07AC6";
};
"nix-public/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4.jar" = {
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4.jar";
sha256 = "282C78D064D3E8F09B3663190D9494B85E0BB7D96B0DA05994FE994384D96111";
};
"nix-public/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4.pom" = {
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4.pom";
sha256 = "B512704E22EEF743D6288DED94943DFE78F9BB0636CCB5AC1919364BF0B4EF2A";
};
"nix-public/org/scala-lang/modules/scala-parser-combinators_2.12/1.1.2/scala-parser-combinators_2.12-1.1.2-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.12/1.1.2/scala-parser-combinators_2.12-1.1.2-javadoc.jar";
sha256 = "7B7C26DC48B443E1268F8235AE009409C26AC80D976E313B4F3F7740C227AE59";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.json4s.native.Serialization
import org.json4s.{DefaultFormats, JInt, native}
import scala.concurrent.duration.{FiniteDuration, _}

trait JsonRpcHttpServer extends Json4sSupport with RateLimit with Logger {
trait JsonRpcHttpServer extends Json4sSupport with Logger {
val jsonRpcController: JsonRpcBaseController
val jsonRpcHealthChecker: JsonRpcHealthChecker
val config: JsonRpcHttpServerConfig
Expand Down Expand Up @@ -54,37 +54,39 @@ trait JsonRpcHttpServer extends Json4sSupport with RateLimit with Logger {
}
.result()

protected val rateLimit = new RateLimit(config.rateLimit)

val route: Route = cors(corsSettings) {
(path("healthcheck") & pathEndOrSingleSlash & get) {
handleHealthcheck()
} ~ (path("buildinfo") & pathEndOrSingleSlash & get) {
handleBuildInfo()
} ~ (pathEndOrSingleSlash & post) {
(extractClientIP & entity(as[JsonRpcRequest])) { (clientAddress, request) =>
handleRequest(clientAddress, request)
} ~ entity(as[Seq[JsonRpcRequest]]) { request =>
handleBatchRequest(request)
// TODO: maybe rate-limit this one too?
entity(as[JsonRpcRequest]) {
case statusReq if statusReq.method == FaucetJsonRpcController.Status =>
dmitry-worker marked this conversation as resolved.
Show resolved Hide resolved
handleRequest(statusReq)
case jsonReq =>
rateLimit {
handleRequest(jsonReq)
}
// TODO: separate paths for single and multiple requests
// TODO: to prevent repeated body and json parsing
} ~ entity(as[Seq[JsonRpcRequest]]) {
case _ if config.rateLimit.enabled =>
complete(StatusCodes.MethodNotAllowed, JsonRpcError.MethodNotFound)
case reqSeq =>
complete {
Task
.traverse(reqSeq)(request => jsonRpcController.handleRequest(request))
.runToFuture
}
}
}
}

def handleRequest(clientAddress: RemoteAddress, request: JsonRpcRequest): StandardRoute = {
//FIXME: FaucetJsonRpcController.Status should be part of a Healthcheck request or alike.
// As a temporary solution, it is being excluded from the Rate Limit.
if (config.rateLimit.enabled && request.method != FaucetJsonRpcController.Status) {
handleRateLimitedRequest(clientAddress, request)
} else complete(handleResponse(jsonRpcController.handleRequest(request)).runToFuture)
}

def handleRateLimitedRequest(clientAddress: RemoteAddress, request: JsonRpcRequest): StandardRoute = {
if (isBelowRateLimit(clientAddress))
complete(handleResponse(jsonRpcController.handleRequest(request)).runToFuture)
else {
log.warn(s"Request limit exceeded for ip ${clientAddress.toIP.getOrElse("unknown")}")
complete(
(StatusCodes.TooManyRequests, JsonRpcError.RateLimitError(config.rateLimit.minRequestInterval.toSeconds))
)
}
def handleRequest(request: JsonRpcRequest): StandardRoute = {
complete(handleResponse(jsonRpcController.handleRequest(request)).runToFuture)
}

private def handleResponse(f: Task[JsonRpcResponse]): Task[(StatusCode, JsonRpcResponse)] = f map { jsonRpcResponse =>
Expand Down Expand Up @@ -128,15 +130,6 @@ trait JsonRpcHttpServer extends Json4sSupport with RateLimit with Logger {
)
}

private def handleBatchRequest(requests: Seq[JsonRpcRequest]) = {
if (!config.rateLimit.enabled) {
complete {
Task
.traverse(requests)(request => jsonRpcController.handleRequest(request))
.runToFuture
}
} else complete(StatusCodes.MethodNotAllowed, JsonRpcError.MethodNotFound)
}
}

object JsonRpcHttpServer extends Logger {
Expand All @@ -160,12 +153,15 @@ object JsonRpcHttpServer extends Logger {
}

trait RateLimitConfig {
// TODO: Move the rateLimit.enabled setting upwards:
dmitry-worker marked this conversation as resolved.
Show resolved Hide resolved
// TODO: If we don't need to limit the request rate at all - we don't have to define the other settings
val enabled: Boolean
val minRequestInterval: FiniteDuration
val latestTimestampCacheSize: Int
}

object RateLimitConfig {
// TODO: Use pureconfig
def apply(rateLimitConfig: TypesafeConfig): RateLimitConfig =
new RateLimitConfig {
override val enabled: Boolean = rateLimitConfig.getBoolean("enabled")
Expand Down
74 changes: 60 additions & 14 deletions src/main/scala/io/iohk/ethereum/jsonrpc/server/http/RateLimit.scala
Original file line number Diff line number Diff line change
@@ -1,25 +1,71 @@
package io.iohk.ethereum.jsonrpc.server.http

import java.time.Clock
import java.time.Duration

import akka.http.scaladsl.model.RemoteAddress
import com.twitter.util.LruMap
import io.iohk.ethereum.jsonrpc.server.http.JsonRpcHttpServer.JsonRpcHttpServerConfig
import akka.NotUsed
import akka.http.scaladsl.model.{RemoteAddress, StatusCodes}
import akka.http.scaladsl.server.{Directive0, Route}
import io.iohk.ethereum.jsonrpc.server.http.JsonRpcHttpServer.RateLimitConfig
import akka.http.scaladsl.server.Directives._
import com.google.common.base.Ticker
import com.google.common.cache.CacheBuilder
import io.iohk.ethereum.jsonrpc.JsonRpcError
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import io.iohk.ethereum.jsonrpc.serialization.JsonSerializers
import org.json4s.{DefaultFormats, Formats, Serialization, native}

trait RateLimit {
class RateLimit(config: RateLimitConfig) extends Directive0 with Json4sSupport {

val config: JsonRpcHttpServerConfig
private implicit val serialization: Serialization = native.Serialization
private implicit val formats: Formats = DefaultFormats + JsonSerializers.RpcErrorJsonSerializer

val latestRequestTimestamps = new LruMap[RemoteAddress, Long](config.rateLimit.latestTimestampCacheSize)
private[this] lazy val minInterval = config.minRequestInterval.toSeconds

val clock: Clock = Clock.systemUTC()
private[this] lazy val lru = {
val nanoDuration = config.minRequestInterval.toNanos
val javaDuration = Duration.ofNanos(nanoDuration)
val ticker: Ticker = new Ticker {
override def read(): Long = getCurrentTimeNanos
}
CacheBuilder
.newBuilder()
.weakKeys()
.expireAfterAccess(javaDuration)
.ticker(ticker)
.build[RemoteAddress, NotUsed]()
}

private[this] def isBelowRateLimit(ip: RemoteAddress): Boolean = {
var exists = true
lru.get(
ip,
() => {
exists = false
NotUsed
}
)
exists
}

def isBelowRateLimit(clientAddress: RemoteAddress): Boolean = {
val timeMillis = clock.instant().toEpochMilli
val latestRequestTimestamp = latestRequestTimestamps.getOrElse(clientAddress, 0L)
// Override this to test
protected def getCurrentTimeNanos: Long = System.nanoTime()

val response = latestRequestTimestamp + config.rateLimit.minRequestInterval.toMillis < timeMillis
if (response) latestRequestTimestamps.put(clientAddress, timeMillis)
response
// Such algebras prevent if-elseif-else boilerplate in the JsonRPCServer code
// It is also guaranteed that:
// 1) no IP address is extracted unless config.enabled is true
// 2) no LRU is created unless config.enabled is true
// 3) cache is accessed only once (using get)
override def tapply(f: Unit => Route): Route = {
if (config.enabled) {
extractClientIP { ip =>
if (isBelowRateLimit(ip)) {
val err = JsonRpcError.RateLimitError(minInterval)
complete((StatusCodes.TooManyRequests, err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a minor comment, we can reduce this line:
complete((StatusCodes.TooManyRequests, JsonRpcError.RateLimitError(minInterval)))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually it was inline in the beginning, but I've decided that the code is quite unreadable :)
IMHO this will not affect performance and the compiler will inline it anyway because it is used once, immediately after creation. Maybe it would be better to construct a kind of builder for responses instead of serializable Tuples. WDYT?

} else {
f.apply(())
}
}
} else f.apply(())
}

biandratti marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.iohk.ethereum.jsonrpc.server.http

import java.net.InetAddress
import java.time.{Clock, Instant, ZoneId}
import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
Expand Down Expand Up @@ -215,7 +214,7 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout
status shouldEqual StatusCodes.TooManyRequests
}

fakeClock.advanceTime(2 * serverConfigWithRateLimit.rateLimit.minRequestInterval.toMillis)
mockJsonRpcHttpServerWithRateLimit.mockedTime = 50000000L
dmitry-worker marked this conversation as resolved.
Show resolved Hide resolved

postRequest ~> Route.seal(mockJsonRpcHttpServerWithRateLimit.route) ~> check {
status shouldEqual StatusCodes.OK
Expand Down Expand Up @@ -382,7 +381,7 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout

val rateLimitConfig = new RateLimitConfig {
override val enabled: Boolean = false
override val minRequestInterval: FiniteDuration = FiniteDuration.apply(5, TimeUnit.SECONDS)
override val minRequestInterval: FiniteDuration = FiniteDuration.apply(20, TimeUnit.MILLISECONDS)
override val latestTimestampCacheSize: Int = 1024
}

Expand All @@ -397,7 +396,7 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout

val rateLimitEnabledConfig = new RateLimitConfig {
override val enabled: Boolean = true
override val minRequestInterval: FiniteDuration = FiniteDuration.apply(5, TimeUnit.SECONDS)
override val minRequestInterval: FiniteDuration = FiniteDuration.apply(20, TimeUnit.MILLISECONDS)
override val latestTimestampCacheSize: Int = 1024
}

Expand All @@ -412,31 +411,27 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout

val mockJsonRpcController = mock[JsonRpcController]
val mockJsonRpcHealthChecker = mock[JsonRpcHealthChecker]
val fakeClock = new FakeClock

val mockJsonRpcHttpServer = new FakeJsonRpcHttpServer(
jsonRpcController = mockJsonRpcController,
jsonRpcHealthChecker = mockJsonRpcHealthChecker,
config = serverConfig,
cors = serverConfig.corsAllowedOrigins,
testClock = fakeClock
cors = serverConfig.corsAllowedOrigins
)

val corsAllowedOrigin = HttpOrigin("http://localhost:3333")
val mockJsonRpcHttpServerWithCors = new FakeJsonRpcHttpServer(
jsonRpcController = mockJsonRpcController,
jsonRpcHealthChecker = mockJsonRpcHealthChecker,
config = serverConfig,
cors = HttpOriginMatcher(corsAllowedOrigin),
testClock = fakeClock
cors = HttpOriginMatcher(corsAllowedOrigin)
)

val mockJsonRpcHttpServerWithRateLimit = new FakeJsonRpcHttpServer(
jsonRpcController = mockJsonRpcController,
jsonRpcHealthChecker = mockJsonRpcHealthChecker,
config = serverConfigWithRateLimit,
cors = serverConfigWithRateLimit.corsAllowedOrigins,
testClock = fakeClock
cors = serverConfigWithRateLimit.corsAllowedOrigins
)
}
}
Expand Down Expand Up @@ -467,27 +462,17 @@ class FakeJsonRpcHttpServer(
val jsonRpcController: JsonRpcBaseController,
val jsonRpcHealthChecker: JsonRpcHealthChecker,
val config: JsonRpcHttpServerConfig,
val cors: HttpOriginMatcher,
val testClock: Clock
val cors: HttpOriginMatcher
)(implicit val actorSystem: ActorSystem)
extends JsonRpcHttpServer
with Logger {
def run(): Unit = ()
override def corsAllowedOrigins: HttpOriginMatcher = cors
override val clock = testClock
}

class FakeClock extends Clock {

var time: Instant = Instant.now()
var mockedTime:Long = 0L

def advanceTime(seconds: Long): Unit = {
time = time.plusSeconds(seconds)
override protected val rateLimit: RateLimit = new RateLimit(config.rateLimit) {
override protected def getCurrentTimeNanos: Long = FakeJsonRpcHttpServer.this.mockedTime
}

override def getZone: ZoneId = ZoneId.systemDefault()

override def withZone(zone: ZoneId): Clock = Clock.fixed(time, getZone)

override def instant(): Instant = time
}