Skip to content

Commit

Permalink
[ETCM-266]-replaced-rate-limiter-built-on-twitter (#873)
Browse files Browse the repository at this point in the history
[ETCM-266]-replaced-rate-limiter-built-on-twitter
  • Loading branch information
Dmitry Voronov authored Jan 1, 2021
1 parent c069d08 commit 84a2f9b
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 137 deletions.
1 change: 0 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ lazy val node = {
Dependencies.cats,
Dependencies.monix,
Dependencies.network,
Dependencies.twitterUtilCollection,
Dependencies.crypto,
Dependencies.scopt,
Dependencies.logging,
Expand Down
2 changes: 0 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ object Dependencies {
"org.codehaus.janino" % "janino" % "3.1.2"
)

val twitterUtilCollection = Seq("com.twitter" %% "util-collection" % "19.1.0")

val crypto = Seq("org.bouncycastle" % "bcprov-jdk15on" % "1.66")

val scopt = Seq("com.github.scopt" % "scopt_2.12" % "3.7.1")
Expand Down
64 changes: 0 additions & 64 deletions repo.nix
Original file line number Diff line number Diff line change
Expand Up @@ -1065,54 +1065,6 @@
url = "https://repo1.maven.org/maven2/com/trueaccord/scalapb/scalapb-runtime_2.12/0.6.6/scalapb-runtime_2.12-0.6.6.pom";
sha256 = "1D151CC97E7A94E1795A66538692E047468C5D7E5491EC709A59F8D7B7ABB039";
};
"nix-public/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0-javadoc.jar";
sha256 = "3400584E84FE765101EE84DDE208CBB4AECAE33A2093ED989AEBD802916F9BEB";
};
"nix-public/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0-sources.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0-sources.jar";
sha256 = "1FAAC80D74C5E99B30A32C81D5F8A675F5DA1889584893822A720E7C044BF2DF";
};
"nix-public/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0.jar";
sha256 = "5DF5983BB58E545CACD771504DC32A83CE27D4B2ABC45B3041ACD4F74ECF4418";
};
"nix-public/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0.pom" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0.pom";
sha256 = "D2155349209C1EC451F08FE3A4ECE7A916B118E46F3492A81FC24B7B90E9EA7E";
};
"nix-public/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0-javadoc.jar";
sha256 = "0B1ACC67A5C6C8281FF0322899E0E1F3C28516094D166412E2DBF0C960E94C8E";
};
"nix-public/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0-sources.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0-sources.jar";
sha256 = "43F4E01682C3517DCA2CDB15678419CEA6C904886231C5507552AA996B48439A";
};
"nix-public/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0.jar";
sha256 = "E91061D0EC00F3573AA1311079E32A1F4A56D09BDE0D52715E4C035B2156AF52";
};
"nix-public/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0.pom" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0.pom";
sha256 = "0D0ECBD7D803EB1CC5337805A07242E87D7CD3C371FC67C243B3D68615E0F4CB";
};
"nix-public/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0-javadoc.jar";
sha256 = "87695034BFFD441D11ED0B3672F412795CE1B9F5770CF2E882281623B781F708";
};
"nix-public/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0-sources.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0-sources.jar";
sha256 = "FD71104B195EDE70D78FD38D49B0F8B4419C33EF59D7E23592272CF68538CEF9";
};
"nix-public/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0.jar" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0.jar";
sha256 = "B1516320F07264AE2BE860F4C74FA2DF5EF8BD349C68D082D8EAECDFCF77F766";
};
"nix-public/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0.pom" = {
url = "https://repo1.maven.org/maven2/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0.pom";
sha256 = "17DB0C4F7A9C79E6279A8A4D6CAAEAC2DF889FDD2D3BA002E648F436C34FC88B";
};
"nix-public/com/typesafe/akka/akka-actor_2.12/2.5.23/akka-actor_2.12-2.5.23-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/com/typesafe/akka/akka-actor_2.12/2.5.23/akka-actor_2.12-2.5.23-javadoc.jar";
sha256 = "5C129DD97237DAB58EC6FBA946978916057BB6BBC414113BE8083ECD68A889E6";
Expand Down Expand Up @@ -2609,22 +2561,6 @@
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-java8-compat_2.12/0.8.0/scala-java8-compat_2.12-0.8.0.pom";
sha256 = "49E2711154CE9BA76962E043F4F8A6D0B890CD9E2FC6D0E99C3A0E6AD6C1950E";
};
"nix-public/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4-javadoc.jar";
sha256 = "228DA077BAEB60BB35E612780FB805126B0F88F9319AFDEA60E3BABEBC798BC2";
};
"nix-public/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4-sources.jar" = {
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4-sources.jar";
sha256 = "CB4BA7B7E598530FAEC863E5069864A28268EE4C636B0C46443884DCC4E07AC6";
};
"nix-public/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4.jar" = {
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4.jar";
sha256 = "282C78D064D3E8F09B3663190D9494B85E0BB7D96B0DA05994FE994384D96111";
};
"nix-public/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4.pom" = {
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4.pom";
sha256 = "B512704E22EEF743D6288DED94943DFE78F9BB0636CCB5AC1919364BF0B4EF2A";
};
"nix-public/org/scala-lang/modules/scala-parser-combinators_2.12/1.1.2/scala-parser-combinators_2.12-1.1.2-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.12/1.1.2/scala-parser-combinators_2.12-1.1.2-javadoc.jar";
sha256 = "7B7C26DC48B443E1268F8235AE009409C26AC80D976E313B4F3F7740C227AE59";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.json4s.native.Serialization
import org.json4s.{DefaultFormats, JInt, native}
import scala.concurrent.duration.{FiniteDuration, _}

trait JsonRpcHttpServer extends Json4sSupport with RateLimit with Logger {
trait JsonRpcHttpServer extends Json4sSupport with Logger {
val jsonRpcController: JsonRpcBaseController
val jsonRpcHealthChecker: JsonRpcHealthChecker
val config: JsonRpcHttpServerConfig
Expand Down Expand Up @@ -54,37 +54,39 @@ trait JsonRpcHttpServer extends Json4sSupport with RateLimit with Logger {
}
.result()

protected val rateLimit = new RateLimit(config.rateLimit)

val route: Route = cors(corsSettings) {
(path("healthcheck") & pathEndOrSingleSlash & get) {
handleHealthcheck()
} ~ (path("buildinfo") & pathEndOrSingleSlash & get) {
handleBuildInfo()
} ~ (pathEndOrSingleSlash & post) {
(extractClientIP & entity(as[JsonRpcRequest])) { (clientAddress, request) =>
handleRequest(clientAddress, request)
} ~ entity(as[Seq[JsonRpcRequest]]) { request =>
handleBatchRequest(request)
// TODO: maybe rate-limit this one too?
entity(as[JsonRpcRequest]) {
case statusReq if statusReq.method == FaucetJsonRpcController.Status =>
handleRequest(statusReq)
case jsonReq =>
rateLimit {
handleRequest(jsonReq)
}
// TODO: separate paths for single and multiple requests
// TODO: to prevent repeated body and json parsing
} ~ entity(as[Seq[JsonRpcRequest]]) {
case _ if config.rateLimit.enabled =>
complete(StatusCodes.MethodNotAllowed, JsonRpcError.MethodNotFound)
case reqSeq =>
complete {
Task
.traverse(reqSeq)(request => jsonRpcController.handleRequest(request))
.runToFuture
}
}
}
}

def handleRequest(clientAddress: RemoteAddress, request: JsonRpcRequest): StandardRoute = {
//FIXME: FaucetJsonRpcController.Status should be part of a Healthcheck request or alike.
// As a temporary solution, it is being excluded from the Rate Limit.
if (config.rateLimit.enabled && request.method != FaucetJsonRpcController.Status) {
handleRateLimitedRequest(clientAddress, request)
} else complete(handleResponse(jsonRpcController.handleRequest(request)).runToFuture)
}

def handleRateLimitedRequest(clientAddress: RemoteAddress, request: JsonRpcRequest): StandardRoute = {
if (isBelowRateLimit(clientAddress))
complete(handleResponse(jsonRpcController.handleRequest(request)).runToFuture)
else {
log.warn(s"Request limit exceeded for ip ${clientAddress.toIP.getOrElse("unknown")}")
complete(
(StatusCodes.TooManyRequests, JsonRpcError.RateLimitError(config.rateLimit.minRequestInterval.toSeconds))
)
}
def handleRequest(request: JsonRpcRequest): StandardRoute = {
complete(handleResponse(jsonRpcController.handleRequest(request)).runToFuture)
}

private def handleResponse(f: Task[JsonRpcResponse]): Task[(StatusCode, JsonRpcResponse)] = f map { jsonRpcResponse =>
Expand Down Expand Up @@ -128,15 +130,6 @@ trait JsonRpcHttpServer extends Json4sSupport with RateLimit with Logger {
)
}

private def handleBatchRequest(requests: Seq[JsonRpcRequest]) = {
if (!config.rateLimit.enabled) {
complete {
Task
.traverse(requests)(request => jsonRpcController.handleRequest(request))
.runToFuture
}
} else complete(StatusCodes.MethodNotAllowed, JsonRpcError.MethodNotFound)
}
}

object JsonRpcHttpServer extends Logger {
Expand All @@ -160,12 +153,15 @@ object JsonRpcHttpServer extends Logger {
}

trait RateLimitConfig {
// TODO: Move the rateLimit.enabled setting upwards:
// TODO: If we don't need to limit the request rate at all - we don't have to define the other settings
val enabled: Boolean
val minRequestInterval: FiniteDuration
val latestTimestampCacheSize: Int
}

object RateLimitConfig {
// TODO: Use pureconfig
def apply(rateLimitConfig: TypesafeConfig): RateLimitConfig =
new RateLimitConfig {
override val enabled: Boolean = rateLimitConfig.getBoolean("enabled")
Expand Down
74 changes: 60 additions & 14 deletions src/main/scala/io/iohk/ethereum/jsonrpc/server/http/RateLimit.scala
Original file line number Diff line number Diff line change
@@ -1,25 +1,71 @@
package io.iohk.ethereum.jsonrpc.server.http

import java.time.Clock
import java.time.Duration

import akka.http.scaladsl.model.RemoteAddress
import com.twitter.util.LruMap
import io.iohk.ethereum.jsonrpc.server.http.JsonRpcHttpServer.JsonRpcHttpServerConfig
import akka.NotUsed
import akka.http.scaladsl.model.{RemoteAddress, StatusCodes}
import akka.http.scaladsl.server.{Directive0, Route}
import io.iohk.ethereum.jsonrpc.server.http.JsonRpcHttpServer.RateLimitConfig
import akka.http.scaladsl.server.Directives._
import com.google.common.base.Ticker
import com.google.common.cache.CacheBuilder
import io.iohk.ethereum.jsonrpc.JsonRpcError
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import io.iohk.ethereum.jsonrpc.serialization.JsonSerializers
import org.json4s.{DefaultFormats, Formats, Serialization, native}

trait RateLimit {
class RateLimit(config: RateLimitConfig) extends Directive0 with Json4sSupport {

val config: JsonRpcHttpServerConfig
private implicit val serialization: Serialization = native.Serialization
private implicit val formats: Formats = DefaultFormats + JsonSerializers.RpcErrorJsonSerializer

val latestRequestTimestamps = new LruMap[RemoteAddress, Long](config.rateLimit.latestTimestampCacheSize)
private[this] lazy val minInterval = config.minRequestInterval.toSeconds

val clock: Clock = Clock.systemUTC()
private[this] lazy val lru = {
val nanoDuration = config.minRequestInterval.toNanos
val javaDuration = Duration.ofNanos(nanoDuration)
val ticker: Ticker = new Ticker {
override def read(): Long = getCurrentTimeNanos
}
CacheBuilder
.newBuilder()
.weakKeys()
.expireAfterAccess(javaDuration)
.ticker(ticker)
.build[RemoteAddress, NotUsed]()
}

private[this] def isBelowRateLimit(ip: RemoteAddress): Boolean = {
var exists = true
lru.get(
ip,
() => {
exists = false
NotUsed
}
)
exists
}

def isBelowRateLimit(clientAddress: RemoteAddress): Boolean = {
val timeMillis = clock.instant().toEpochMilli
val latestRequestTimestamp = latestRequestTimestamps.getOrElse(clientAddress, 0L)
// Override this to test
protected def getCurrentTimeNanos: Long = System.nanoTime()

val response = latestRequestTimestamp + config.rateLimit.minRequestInterval.toMillis < timeMillis
if (response) latestRequestTimestamps.put(clientAddress, timeMillis)
response
// Such algebras prevent if-elseif-else boilerplate in the JsonRPCServer code
// It is also guaranteed that:
// 1) no IP address is extracted unless config.enabled is true
// 2) no LRU is created unless config.enabled is true
// 3) cache is accessed only once (using get)
override def tapply(f: Unit => Route): Route = {
if (config.enabled) {
extractClientIP { ip =>
if (isBelowRateLimit(ip)) {
val err = JsonRpcError.RateLimitError(minInterval)
complete((StatusCodes.TooManyRequests, err))
} else {
f.apply(())
}
}
} else f.apply(())
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.iohk.ethereum.jsonrpc.server.http

import java.net.InetAddress
import java.time.{Clock, Instant, ZoneId}
import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
Expand Down Expand Up @@ -215,7 +214,7 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout
status shouldEqual StatusCodes.TooManyRequests
}

fakeClock.advanceTime(2 * serverConfigWithRateLimit.rateLimit.minRequestInterval.toMillis)
mockJsonRpcHttpServerWithRateLimit.mockedTime = 50000000L

postRequest ~> Route.seal(mockJsonRpcHttpServerWithRateLimit.route) ~> check {
status shouldEqual StatusCodes.OK
Expand Down Expand Up @@ -382,7 +381,7 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout

val rateLimitConfig = new RateLimitConfig {
override val enabled: Boolean = false
override val minRequestInterval: FiniteDuration = FiniteDuration.apply(5, TimeUnit.SECONDS)
override val minRequestInterval: FiniteDuration = FiniteDuration.apply(20, TimeUnit.MILLISECONDS)
override val latestTimestampCacheSize: Int = 1024
}

Expand All @@ -397,7 +396,7 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout

val rateLimitEnabledConfig = new RateLimitConfig {
override val enabled: Boolean = true
override val minRequestInterval: FiniteDuration = FiniteDuration.apply(5, TimeUnit.SECONDS)
override val minRequestInterval: FiniteDuration = FiniteDuration.apply(20, TimeUnit.MILLISECONDS)
override val latestTimestampCacheSize: Int = 1024
}

Expand All @@ -412,31 +411,27 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout

val mockJsonRpcController = mock[JsonRpcController]
val mockJsonRpcHealthChecker = mock[JsonRpcHealthChecker]
val fakeClock = new FakeClock

val mockJsonRpcHttpServer = new FakeJsonRpcHttpServer(
jsonRpcController = mockJsonRpcController,
jsonRpcHealthChecker = mockJsonRpcHealthChecker,
config = serverConfig,
cors = serverConfig.corsAllowedOrigins,
testClock = fakeClock
cors = serverConfig.corsAllowedOrigins
)

val corsAllowedOrigin = HttpOrigin("http://localhost:3333")
val mockJsonRpcHttpServerWithCors = new FakeJsonRpcHttpServer(
jsonRpcController = mockJsonRpcController,
jsonRpcHealthChecker = mockJsonRpcHealthChecker,
config = serverConfig,
cors = HttpOriginMatcher(corsAllowedOrigin),
testClock = fakeClock
cors = HttpOriginMatcher(corsAllowedOrigin)
)

val mockJsonRpcHttpServerWithRateLimit = new FakeJsonRpcHttpServer(
jsonRpcController = mockJsonRpcController,
jsonRpcHealthChecker = mockJsonRpcHealthChecker,
config = serverConfigWithRateLimit,
cors = serverConfigWithRateLimit.corsAllowedOrigins,
testClock = fakeClock
cors = serverConfigWithRateLimit.corsAllowedOrigins
)
}
}
Expand Down Expand Up @@ -467,27 +462,17 @@ class FakeJsonRpcHttpServer(
val jsonRpcController: JsonRpcBaseController,
val jsonRpcHealthChecker: JsonRpcHealthChecker,
val config: JsonRpcHttpServerConfig,
val cors: HttpOriginMatcher,
val testClock: Clock
val cors: HttpOriginMatcher
)(implicit val actorSystem: ActorSystem)
extends JsonRpcHttpServer
with Logger {
def run(): Unit = ()
override def corsAllowedOrigins: HttpOriginMatcher = cors
override val clock = testClock
}

class FakeClock extends Clock {

var time: Instant = Instant.now()
var mockedTime:Long = 0L

def advanceTime(seconds: Long): Unit = {
time = time.plusSeconds(seconds)
override protected val rateLimit: RateLimit = new RateLimit(config.rateLimit) {
override protected def getCurrentTimeNanos: Long = FakeJsonRpcHttpServer.this.mockedTime
}

override def getZone: ZoneId = ZoneId.systemDefault()

override def withZone(zone: ZoneId): Clock = Clock.fixed(time, getZone)

override def instant(): Instant = time
}

0 comments on commit 84a2f9b

Please sign in to comment.