Skip to content

Commit

Permalink
[ETCM-531] Cache-based and thread-safe blacklist implementation (#921)
Browse files Browse the repository at this point in the history
* First draft of cache-based blacklist

* Fix logging output

* Fix FastSyncSpec

* Update nix-sbt sha

* Update nix-sbt sha

* Polish and add tests

* Rename BlackListId to BlacklistId

* Rework PeerListSupport a little bit

* Small cleanup

* [ETCM-531] Turn blacklist reasons into proper types, small improvements based on PR comments

* Make BlacklistReasonType a sealed trait

* Only log description for blacklist reason

* ETCM-531 renamed minute -> minutes

* ETCM-531 Reformat triggered by running sbt pp

* ETCM-531 Fix expiration after read
  • Loading branch information
1015bit authored Feb 16, 2021
1 parent e0e46ce commit 4d04689
Show file tree
Hide file tree
Showing 15 changed files with 449 additions and 93 deletions.
26 changes: 14 additions & 12 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -111,24 +111,26 @@ lazy val node = {
Seq(
Dependencies.akka,
Dependencies.akkaHttp,
Dependencies.json4s,
Dependencies.circe,
Dependencies.apacheCommons,
Dependencies.boopickle,
Dependencies.rocksDb,
Dependencies.enumeratum,
Dependencies.testing,
Dependencies.cats,
Dependencies.monix,
Dependencies.network,
Dependencies.circe,
Dependencies.cli,
Dependencies.crypto,
Dependencies.scopt,
Dependencies.dependencies,
Dependencies.enumeratum,
Dependencies.guava,
Dependencies.json4s,
Dependencies.kamon,
Dependencies.logging,
Dependencies.apacheCommons,
Dependencies.micrometer,
Dependencies.kamon,
Dependencies.monix,
Dependencies.network,
Dependencies.prometheus,
Dependencies.cli,
Dependencies.dependencies
Dependencies.rocksDb,
Dependencies.scaffeine,
Dependencies.scopt,
Dependencies.testing
).flatten ++ malletDeps
}

Expand Down
2 changes: 1 addition & 1 deletion nix/pkgs/mantis.nix
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ in sbt.mkDerivation {

# This sha represents the change dependencies of mantis.
# Update this sha whenever you change the dependencies
depsSha256 = "0gppwz6dvligrrgjmramyrm9723pwhg89cqfpxj22z2d86brwas2";
depsSha256 = "07iixw8va4zwpiln2zy2gr245z1ir4jd6pqgmkzhwnhw3mf5j28k";

# this is the command used to to create the fixed-output-derivation
depsWarmupCommand = "sbt compile --debug -Dnix=true";
Expand Down
16 changes: 14 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,19 @@ object Dependencies {
jline,
"org.scala-lang.modules" %% "scala-parser-combinators" % "1.1.2",
"org.scala-sbt.ipcsocket" % "ipcsocket" % "1.1.0",
"com.google.guava" % "guava" % "29.0-jre",
"org.xerial.snappy" % "snappy-java" % "1.1.7.7",
"org.web3j" % "core" % "5.0.0" % Test,
"io.vavr" % "vavr" % "1.0.0-alpha-3"
)

val guava: Seq[ModuleID] = {
val version = "30.1-jre"
Seq(
"com.google.guava" % "guava" % version,
"com.google.guava" % "guava-testlib" % version % "test"
)
}

val prometheus: Seq[ModuleID] = {
val provider = "io.prometheus"
val version = "0.9.0"
Expand All @@ -137,7 +144,7 @@ object Dependencies {
"com.google.code.findbugs" % "jsr305" % "3.0.2" % Optional,
provider % "micrometer-core" % version,
provider % "micrometer-registry-jmx" % version,
provider % "micrometer-registry-prometheus" % version,
provider % "micrometer-registry-prometheus" % version
)
}

Expand All @@ -153,4 +160,9 @@ object Dependencies {
val shapeless: Seq[ModuleID] = Seq(
"com.chuusai" %% "shapeless" % "2.3.3"
)

val scaffeine: Seq[ModuleID] = Seq(
"com.github.blemale" %% "scaffeine" % "4.0.2" % "compile"
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ import monix.eval.Task
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.Try
import io.iohk.ethereum.blockchain.sync.CacheBasedBlacklist
object FastSyncItSpecUtils {

class FakePeer(peerName: String, fakePeerCustomConfig: FakePeerCustomConfig)
extends CommonFakePeer(peerName, fakePeerCustomConfig) {

lazy val validators = new MockValidatorsAlwaysSucceed

val maxSize = 1000
val blacklist = CacheBasedBlacklist.empty(maxSize)

lazy val fastSync = system.actorOf(
FastSync.props(
storagesInstance.storages.fastSyncStateStorage,
Expand All @@ -32,6 +36,7 @@ object FastSyncItSpecUtils {
validators,
peerEventBus,
etcPeerManager,
blacklist,
testSyncConfig,
system.scheduler
)
Expand Down
161 changes: 161 additions & 0 deletions src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package io.iohk.ethereum.blockchain.sync

import com.github.benmanes.caffeine.cache.Caffeine
import com.github.blemale.scaffeine.{Cache, Scaffeine}
import io.iohk.ethereum.utils.Logger

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._
import scala.jdk.DurationConverters._

import Blacklist._
import io.iohk.ethereum.network.PeerId
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason.BlacklistReasonType.WrongBlockHeadersType
import io.iohk.ethereum.consensus.validators.std.StdBlockValidator.BlockError
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason.BlacklistReasonType

trait Blacklist {
def isBlacklisted(id: BlacklistId): Boolean
def add(id: BlacklistId, duration: FiniteDuration, reason: BlacklistReason): Unit
def remove(id: BlacklistId): Unit
def keys: Set[BlacklistId]
}

object Blacklist {
import BlacklistReason._
import BlacklistReasonType._

trait BlacklistId {
def value: String
}

sealed trait BlacklistReason {
def reasonType: BlacklistReasonType
def description: String
}
object BlacklistReason {
sealed trait BlacklistReasonType {
def code: Int
def name: String
}
object BlacklistReasonType {
case object WrongBlockHeadersType extends BlacklistReasonType {
val code: Int = 1
val name: String = "WrongBlockHeadersType"
}
case object BlockHeaderValidationFailedType extends BlacklistReasonType {
val code: Int = 2
val name: String = "BlockHeaderValidationFailed"
}
case object ErrorInBlockHeadersType extends BlacklistReasonType {
val code: Int = 3
val name: String = "ErrorInBlockHeaders"
}
case object EmptyBlockBodiesType extends BlacklistReasonType {
val code: Int = 4
val name: String = "EmptyBlockBodies"
}
case object BlockBodiesNotMatchingHeadersType extends BlacklistReasonType {
val code: Int = 5
val name: String = "BlockBodiesNotMatchingHeaders"
}
case object EmptyReceiptsType extends BlacklistReasonType {
val code: Int = 6
val name: String = "EmptyReceipts"
}
case object InvalidReceiptsType extends BlacklistReasonType {
val code: Int = 7
val name: String = "InvalidReceipts"
}
case object RequestFailedType extends BlacklistReasonType {
val code: Int = 8
val name: String = "RequestFailed"
}
case object PeerActorTerminatedType extends BlacklistReasonType {
val code: Int = 9
val name: String = "PeerActorTerminated"
}
}

case object WrongBlockHeaders extends BlacklistReason {
val reasonType: BlacklistReasonType = WrongBlockHeadersType
val description: String = "Wrong blockheaders response (empty or not chain forming)"
}
case object BlockHeaderValidationFailed extends BlacklistReason {
val reasonType: BlacklistReasonType = BlockHeaderValidationFailedType
val description: String = "Block header validation failed"
}
case object ErrorInBlockHeaders extends BlacklistReason {
val reasonType: BlacklistReasonType = ErrorInBlockHeadersType
val description: String = "Error in block headers response"
}
final case class EmptyBlockBodies(knownHashes: Seq[String]) extends BlacklistReason {
val reasonType: BlacklistReasonType = EmptyBlockBodiesType
val description: String = s"Got empty block bodies response for known hashes: $knownHashes"
}
case object BlockBodiesNotMatchingHeaders extends BlacklistReason {
val reasonType: BlacklistReasonType = BlockBodiesNotMatchingHeadersType
val description = "Block bodies not matching block headers"
}
final case class EmptyReceipts(knownHashes: Seq[String]) extends BlacklistReason {
val reasonType: BlacklistReasonType = EmptyReceiptsType
val description: String = s"Got empty receipts for known hashes: $knownHashes"
}
final case class InvalidReceipts(knownHashes: Seq[String], error: BlockError) extends BlacklistReason {
val reasonType: BlacklistReasonType = InvalidReceiptsType
val description: String = s"Got invalid receipts for known hashes: $knownHashes due to: $error"
}
final case class RequestFailed(error: String) extends BlacklistReason {
val reasonType: BlacklistReasonType = RequestFailedType
val description: String = s"Request failed with error: $error"
}
case object PeerActorTerminated extends BlacklistReason {
val reasonType: BlacklistReasonType = PeerActorTerminatedType
val description: String = "Peer actor terminated"
}
}
}

final case class CacheBasedBlacklist(cache: Cache[BlacklistId, BlacklistReasonType]) extends Blacklist with Logger {

import CacheBasedBlacklist._

override def isBlacklisted(id: BlacklistId): Boolean = cache.getIfPresent(id).isDefined

override def add(id: BlacklistId, duration: FiniteDuration, reason: BlacklistReason): Unit = {
log.info("Blacklisting peer [{}] for {}. Reason: {}", id, duration, reason.description)
cache.policy().expireVariably().toScala match {
case Some(varExpiration) => varExpiration.put(id, reason.reasonType, duration.toJava)
case None =>
log.warn(customExpirationError(id))
cache.put(id, reason.reasonType)
}
}
override def remove(id: BlacklistId): Unit = cache.invalidate(id)

override def keys: Set[BlacklistId] = cache.underlying.asMap().keySet().asScala.toSet
}

object CacheBasedBlacklist {

def customExpirationError(id: BlacklistId): String =
s"Unexpected error while adding peer [${id.value}] to blacklist using custom expiration time. Falling back to default expiration."

def empty(maxSize: Int): CacheBasedBlacklist = {
val cache =
Scaffeine()
.expireAfter[BlacklistId, BlacklistReasonType](
create = (_, _) => 60.minutes,
update = (_, _, _) => 60.minutes,
read = (_, _, duration) => duration // read access should not change the expiration time
) // required to enable VarExpiration policy (i.e. set custom expiration time per element)
.maximumSize(
maxSize
) // uses Window TinyLfu eviction policy, see https://github.com/ben-manes/caffeine/wiki/Efficiency
.build[BlacklistId, BlacklistReasonType]()
CacheBasedBlacklist(cache)
}

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package io.iohk.ethereum.blockchain.sync

import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{Actor, ActorLogging, Cancellable, Scheduler}
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistId

import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{Duration, FiniteDuration}

// will be removed once regular sync is switched to new blacklist implementation
trait BlacklistSupport {
selfActor: Actor with ActorLogging =>

Expand All @@ -14,9 +17,9 @@ trait BlacklistSupport {

protected val maxBlacklistedNodes = 1000

val blacklistedPeers = mutable.LinkedHashMap.empty[BlackListId, Cancellable]
val blacklistedPeers = mutable.LinkedHashMap.empty[BlacklistId, Cancellable]

def blacklist(blacklistId: BlackListId, duration: FiniteDuration, reason: String): Unit = {
def blacklist(blacklistId: BlacklistId, duration: FiniteDuration, reason: String): Unit = {
if (duration > Duration.Zero) {
if (blacklistedPeers.size >= maxBlacklistedNodes) {
removeOldestPeer()
Expand All @@ -30,13 +33,13 @@ trait BlacklistSupport {
}
}

def undoBlacklist(blacklistId: BlackListId): Unit = {
def undoBlacklist(blacklistId: BlacklistId): Unit = {
val peer = blacklistedPeers.get(blacklistId)
peer.foreach(_.cancel())
blacklistedPeers.remove(blacklistId)
}

def isBlacklisted(blacklistId: BlackListId): Boolean =
def isBlacklisted(blacklistId: BlacklistId): Boolean =
blacklistedPeers.exists(_._1 == blacklistId)

def handleBlacklistMessages: Receive = { case UnblacklistPeer(ref) =>
Expand All @@ -52,9 +55,6 @@ trait BlacklistSupport {

object BlacklistSupport {

abstract class BlackListId {
def value: String
}
private case class UnblacklistPeer(blacklistId: BlacklistId)

private case class UnblacklistPeer(blacklistId: BlackListId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.iohk.ethereum.utils.Config.SyncConfig
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// will be removed once regular sync is switched to new blacklist/peerlist implementation
trait PeerListSupport {
self: Actor with ActorLogging with BlacklistSupport =>
import PeerListSupport._
Expand Down
Loading

0 comments on commit 4d04689

Please sign in to comment.