Skip to content

Commit

Permalink
Use existing channel events
Browse files Browse the repository at this point in the history
Instead of defining new events. We also keep a set of active channels to
ensure that duplicate events don't mess up our state (even though this
shouldn't happen, it feels safer).
  • Loading branch information
t-bast authored and thomash-acinq committed Dec 10, 2024
1 parent f708e10 commit ca2c7f2
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2588,13 +2588,6 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
case _: TransientChannelData => None
}
context.system.eventStream.publish(ChannelStateChanged(self, nextStateData.channelId, peer, remoteNodeId, state, nextState, commitments_opt))

if (state == NORMAL) {
peer ! Peer.ChannelDeactivated
}
if (nextState == NORMAL) {
peer ! Peer.ChannelActivated
}
}

if (nextState == CLOSED) {
Expand Down
59 changes: 29 additions & 30 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class Peer(val nodeParams: NodeParams,

context.system.eventStream.subscribe(self, classOf[CurrentFeerates])
context.system.eventStream.subscribe(self, classOf[CurrentBlockHeight])
context.system.eventStream.subscribe(self, classOf[LocalChannelDown])

startWith(INSTANTIATING, Nothing)

Expand All @@ -91,7 +92,7 @@ class Peer(val nodeParams: NodeParams,
} else {
None
}
goto(DISCONNECTED) using DisconnectedData(channels, activeChannels = 0, PeerStorage(peerStorageData, written = true)) // when we restart, we will attempt to reconnect right away, but then we'll wait
goto(DISCONNECTED) using DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(peerStorageData, written = true)) // when we restart, we will attempt to reconnect right away, but then we'll wait
}

when(DISCONNECTED) {
Expand Down Expand Up @@ -145,11 +146,11 @@ class Peer(val nodeParams: NodeParams,
d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _))
stay() using d.copy(peerStorage = d.peerStorage.copy(written = true))

case Event(ChannelActivated, d: DisconnectedData) =>
stay() using d.copy(activeChannels = d.activeChannels + 1)
case Event(e: ChannelReadyForPayments, d: DisconnectedData) =>
stay() using d.copy(activeChannels = d.activeChannels + e.channelId)

case Event(ChannelDeactivated, d: DisconnectedData) =>
stay() using d.copy(activeChannels = d.activeChannels - 1)
case Event(e: LocalChannelDown, d: DisconnectedData) =>
stay() using d.copy(activeChannels = d.activeChannels - e.channelId)
}

when(CONNECTED) {
Expand Down Expand Up @@ -424,7 +425,7 @@ class Peer(val nodeParams: NodeParams,
}
stay()

case Event(e: ChannelReadyForPayments, _: ConnectedData) =>
case Event(e: ChannelReadyForPayments, d: ConnectedData) =>
pendingOnTheFlyFunding.foreach {
case (paymentHash, pending) =>
pending.status match {
Expand All @@ -440,7 +441,10 @@ class Peer(val nodeParams: NodeParams,
}
}
}
stay()
stay() using d.copy(activeChannels = d.activeChannels + e.channelId)

case Event(e: LocalChannelDown, d: ConnectedData) =>
stay() using d.copy(activeChannels = d.activeChannels - e.channelId)

case Event(msg: HasChannelId, d: ConnectedData) =>
d.channels.get(FinalChannelId(msg.channelId)) match {
Expand Down Expand Up @@ -534,26 +538,25 @@ class Peer(val nodeParams: NodeParams,
d.peerConnection forward unknownMsg
stay()

case Event(store: PeerStorageStore, d: ConnectedData) if nodeParams.features.hasFeature(Features.ProvideStorage) && d.activeChannels > 0 =>
// If we don't have any pending write operations, we write the updated peer storage to disk after a delay.
// This ensures that when we receive a burst of peer storage updates, we will rate-limit our IO disk operations.
// If we already have a pending write operation, we must not reset the timer, otherwise we may indefinitely delay
// writing to the DB and may never store our peer's backup.
if (d.peerStorage.written) {
startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay)
case Event(store: PeerStorageStore, d: ConnectedData) =>
if (nodeParams.features.hasFeature(Features.ProvideStorage) && d.activeChannels.nonEmpty) {
// If we don't have any pending write operations, we write the updated peer storage to disk after a delay.
// This ensures that when we receive a burst of peer storage updates, we will rate-limit our IO disk operations.
// If we already have a pending write operation, we must not reset the timer, otherwise we may indefinitely delay
// writing to the DB and may never store our peer's backup.
if (d.peerStorage.written) {
startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay)
}
stay() using d.copy(peerStorage = PeerStorage(Some(store.blob), written = false))
} else {
log.debug("ignoring peer storage (feature={}, channels={})", nodeParams.features.hasFeature(Features.ProvideStorage), d.activeChannels.mkString(","))
stay()
}
stay() using d.copy(peerStorage = PeerStorage(Some(store.blob), written = false))

case Event(WritePeerStorage, d: ConnectedData) =>
d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _))
stay() using d.copy(peerStorage = d.peerStorage.copy(written = true))

case Event(ChannelActivated, d: ConnectedData) =>
stay() using d.copy(activeChannels = d.activeChannels + 1)

case Event(ChannelDeactivated, d: ConnectedData) =>
stay() using d.copy(activeChannels = d.activeChannels - 1)

case Event(unhandledMsg: LightningMessage, _) =>
log.warning("ignoring message {}", unhandledMsg)
stay()
Expand Down Expand Up @@ -785,7 +788,7 @@ class Peer(val nodeParams: NodeParams,
context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId))
}

private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], activeChannels: Int, peerStorage: PeerStorage): State = {
private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], activeChannels: Set[ByteVector32], peerStorage: PeerStorage): State = {
require(remoteNodeId == connectionReady.remoteNodeId, s"invalid nodeId: $remoteNodeId != ${connectionReady.remoteNodeId}")
log.debug("got authenticated connection to address {}", connectionReady.address)

Expand Down Expand Up @@ -957,16 +960,16 @@ object Peer {

sealed trait Data {
def channels: Map[_ <: ChannelId, ActorRef] // will be overridden by Map[FinalChannelId, ActorRef] or Map[ChannelId, ActorRef]
def activeChannels: Int
def activeChannels: Set[ByteVector32] // channels that are available to process payments
def peerStorage: PeerStorage
}
case object Nothing extends Data {
override def channels = Map.empty
override def activeChannels: Int = 0
override def activeChannels: Set[ByteVector32] = Set.empty
override def peerStorage: PeerStorage = PeerStorage(None, written = true)
}
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], activeChannels: Int, peerStorage: PeerStorage) extends Data
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], activeChannels: Int, currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data {
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], activeChannels: Set[ByteVector32], peerStorage: PeerStorage) extends Data
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], activeChannels: Set[ByteVector32], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data {
val connectionInfo: ConnectionInfo = ConnectionInfo(address, peerConnection, localInit, remoteInit)
def localFeatures: Features[InitFeature] = localInit.features
def remoteFeatures: Features[InitFeature] = remoteInit.features
Expand Down Expand Up @@ -1081,9 +1084,5 @@ object Peer {
case class RelayUnknownMessage(unknownMessage: UnknownMessage)

case object WritePeerStorage

case object ChannelActivated

case object ChannelDeactivated
// @formatter:on
}
21 changes: 16 additions & 5 deletions eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ class PeerSpec extends FixtureSpec {
// let's simulate a connection
if (initializePeer) {
switchboard.send(peer, Peer.Init(channels, Map.empty))
channels.foreach(c => if (c.isInstanceOf[DATA_NORMAL]) peer ! ChannelActivated)
}
val localInit = protocol.Init(peer.underlyingActor.nodeParams.features.initFeatures())
switchboard.send(peer, PeerConnection.ConnectionReady(peerConnection.ref, remoteNodeId, fakeIPAddress, outgoing = true, localInit, remoteInit))
Expand Down Expand Up @@ -759,22 +758,34 @@ class PeerSpec extends FixtureSpec {
test("peer storage") { f =>
import f._

// We connect with a previous backup.
val channel = ChannelCodecsSpec.normal
val peerConnection1 = peerConnection
val peerConnection2 = TestProbe()
val peerConnection3 = TestProbe()

nodeParams.db.peers.updateStorage(remoteNodeId, hex"abcdef")
connect(remoteNodeId, peer, peerConnection1, switchboard, channels = Set(ChannelCodecsSpec.normal), peerStorage = Some(hex"abcdef"))
connect(remoteNodeId, peer, peerConnection1, switchboard, channels = Set(channel), peerStorage = Some(hex"abcdef"))
peer ! ChannelReadyForPayments(ActorRef.noSender, channel.remoteNodeId, channel.channelId, channel.commitments.latest.fundingTxIndex)
peerConnection1.send(peer, PeerStorageStore(hex"deadbeef"))
peerConnection1.send(peer, PeerStorageStore(hex"0123456789"))

// We disconnect and reconnect, sending the last backup we received.
peer ! Peer.Disconnect(f.remoteNodeId)
val peerConnection2 = TestProbe()
connect(remoteNodeId, peer, peerConnection2, switchboard, channels = Set(ChannelCodecsSpec.normal), initializePeer = false, peerStorage = Some(hex"0123456789"))
peerConnection2.send(peer, PeerStorageStore(hex"1111"))

// We reconnect again.
val peerConnection3 = TestProbe()
connect(remoteNodeId, peer, peerConnection3, switchboard, channels = Set(ChannelCodecsSpec.normal), initializePeer = false, peerStorage = Some(hex"1111"))
// Because of the delayed writes, we may not have stored the latest value immediately, but we will eventually store it.
eventually {
assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111"))
}

// Our channel closes, so we stop storing backups for that peer.
peer ! LocalChannelDown(ActorRef.noSender, channel.channelId, channel.shortIds, channel.remoteNodeId)
peerConnection3.send(peer, PeerStorageStore(hex"2222"))
peer ! WritePeerStorage
assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111"))
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
private val recommendedFeerates = RecommendedFeerates(Block.RegtestGenesisBlock.hash, TestConstants.feeratePerKw, TestConstants.anchorOutputsFeeratePerKw)

private val PeerNothingData = Peer.Nothing
private val PeerDisconnectedData = Peer.DisconnectedData(channels, activeChannels = 0, PeerStorage(None, written = true))
private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, activeChannels = 0, recommendedFeerates, None, PeerStorage(None, written = true))
private val PeerDisconnectedData = Peer.DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(None, written = true))
private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, activeChannels = Set.empty, recommendedFeerates, None, PeerStorage(None, written = true))

case class FixtureParam(nodeParams: NodeParams, remoteNodeId: PublicKey, reconnectionTask: TestFSMRef[ReconnectionTask.State, ReconnectionTask.Data, ReconnectionTask], monitor: TestProbe)

Expand Down Expand Up @@ -82,7 +82,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
import f._

val peer = TestProbe()
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, activeChannels = 0, PeerStorage(None, written = true))))
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, activeChannels = Set.empty, PeerStorage(None, written = true))))
monitor.expectNoMessage()
}

Expand Down

0 comments on commit ca2c7f2

Please sign in to comment.