Skip to content

Commit

Permalink
Merge pull request #218 from corda/ivan-catching-up-with-1.2
Browse files Browse the repository at this point in the history
catching up with 1.2
  • Loading branch information
ischasny authored Oct 29, 2020
2 parents ad257bd + 4dd3bd4 commit e4a4b85
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package com.r3.corda.lib.tokens.selection
import co.paralleluniverse.fibers.Suspendable
import com.r3.corda.lib.tokens.contracts.internal.schemas.PersistentFungibleToken
import com.r3.corda.lib.tokens.contracts.types.TokenType
import net.corda.core.CordaRuntimeException
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.node.services.vault.QueryCriteria
Expand Down Expand Up @@ -53,8 +54,19 @@ fun tokenAmountWithHolderCriteria(token: TokenType, holder: AbstractParty): Quer

/**
* An exception that is thrown where the specified criteria returns an amount of tokens
* that is not sufficient for the specified spend.
* that is not sufficient for the specified spend. If the amount of tokens *is* sufficient
* but there is not enough of non-locked tokens available to satisfy the amount then
* [InsufficientNotLockedBalanceException] will be thrown.
*
* @param message The exception message that should be thrown in this context
*/
class InsufficientBalanceException(message: String) : RuntimeException(message)
open class InsufficientBalanceException(message: String) : CordaRuntimeException(message)

/**
* An exception that is thrown where the specified criteria returns an amount of tokens
* that is sufficient for the specified spend, however there is not enough of non-locked tokens
* available to satisfy the amount.
*
* @param message The exception message that should be thrown in this context
*/
class InsufficientNotLockedBalanceException(message: String) : InsufficientBalanceException(message)
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,41 @@ class DatabaseTokenSelection @JvmOverloads constructor(
val logger = contextLogger()
}

/** Queries for held token amounts with the specified token to the specified requiredAmount. */
/**
* Queries for held token amounts with the specified token to the specified requiredAmount.
*
* @return the amount of claimed tokens (effectively the sum of values of the states in [stateAndRefs]
* */
private fun executeQuery(
requiredAmount: Amount<TokenType>,
lockId: UUID,
additionalCriteria: QueryCriteria,
sorter: Sort,
stateAndRefs: MutableList<StateAndRef<FungibleToken>>,
includeSoftLocked: Boolean,
softLockingType: QueryCriteria.SoftLockingType = QueryCriteria.SoftLockingType.UNLOCKED_ONLY
): Boolean {
): Amount<TokenType> {
// Didn't need to select any tokens.
if (requiredAmount.quantity == 0L) {
return false
return Amount(0, requiredAmount.token)
}

// Enrich QueryCriteria with additional default attributes (such as soft locks).
// We only want to return RELEVANT states here.
val baseCriteria = QueryCriteria.VaultQueryCriteria(
val baseCriteria = if (!includeSoftLocked) {
QueryCriteria.VaultQueryCriteria(
contractStateTypes = setOf(FungibleToken::class.java),
softLockingCondition = QueryCriteria.SoftLockingCondition(softLockingType, listOf(lockId)),
relevancyStatus = Vault.RelevancyStatus.RELEVANT,
status = Vault.StateStatus.UNCONSUMED
)
)
} else {
QueryCriteria.VaultQueryCriteria(
contractStateTypes = setOf(FungibleToken::class.java),
relevancyStatus = Vault.RelevancyStatus.RELEVANT,
status = Vault.StateStatus.UNCONSUMED
)
}

var pageNumber = DEFAULT_PAGE_NUM
var claimedAmount = 0L
Expand All @@ -96,19 +109,37 @@ class DatabaseTokenSelection @JvmOverloads constructor(

val claimedAmountWithToken = Amount(claimedAmount, requiredAmount.token)
// No tokens available.
if (stateAndRefs.isEmpty()) return false
// There were not enough tokens available.
if (claimedAmountWithToken < requiredAmount) {
logger.trace("TokenType selection requested $requiredAmount but retrieved $claimedAmountWithToken with state refs: ${stateAndRefs.map { it.ref }}")
return false
}
if (stateAndRefs.isEmpty()) return Amount(0, requiredAmount.token)

// We picked enough tokensToIssue, so softlock and go.
logger.trace("TokenType selection for $requiredAmount retrieved ${stateAndRefs.count()} states totalling $claimedAmountWithToken: $stateAndRefs")
services.vaultService.softLockReserve(lockId, stateAndRefs.map { it.ref }.toNonEmptySet())
return true
return claimedAmountWithToken
}

/**
* Queries for held token amounts with the specified token to the specified requiredAmount
* AND tries to soft lock the selected tokens.
*/
private fun executeQueryAndReserve(
requiredAmount: Amount<TokenType>,
lockId: UUID,
additionalCriteria: QueryCriteria,
sorter: Sort,
stateAndRefs: MutableList<StateAndRef<FungibleToken>>,
softLockingType: QueryCriteria.SoftLockingType = QueryCriteria.SoftLockingType.UNLOCKED_ONLY
): Boolean {
// not including soft locked tokens
val claimedAmount = executeQuery(requiredAmount, lockId, additionalCriteria, sorter, stateAndRefs, false, softLockingType)
return if (claimedAmount >= requiredAmount) {
// We picked enough tokensToIssue, so softlock and go.
logger.trace("TokenType selection for $requiredAmount retrieved ${stateAndRefs.count()} states totalling $claimedAmount: $stateAndRefs")
services.vaultService.softLockReserve(lockId, stateAndRefs.map { it.ref }.toNonEmptySet())
true
} else {
logger.trace("TokenType selection requested $requiredAmount but retrieved $claimedAmount with state refs: ${stateAndRefs.map { it.ref }}")
false
}
}


@Suspendable
override fun selectTokens(
holder: Holder,
Expand All @@ -119,7 +150,7 @@ class DatabaseTokenSelection @JvmOverloads constructor(
val criteria = constructQueryCriteria(requiredAmount, holder, queryBy)
val stateAndRefs = mutableListOf<StateAndRef<FungibleToken>>()
for (retryCount in 1..maxRetries) {
if (!executeQuery(requiredAmount, lockId, criteria, sortByStateRefAscending(), stateAndRefs)) {
if (!executeQueryAndReserve(requiredAmount, lockId, criteria, sortByStateRefAscending(), stateAndRefs)) {
// TODO: Need to specify exactly why it fails. Locked states or literally _no_ states!
// No point in retrying if there will never be enough...
logger.warn("TokenType selection failed on attempt $retryCount.")
Expand All @@ -129,8 +160,16 @@ class DatabaseTokenSelection @JvmOverloads constructor(
val durationMillis = (minOf(retrySleep.shl(retryCount), retryCap / 2) * (1.0 + Math.random())).toInt()
FlowLogic.sleep(durationMillis.millis)
} else {
logger.warn("Insufficient spendable states identified for $requiredAmount.")
throw InsufficientBalanceException("Insufficient spendable states identified for $requiredAmount.")
// if there is enough soft locked tokens available to satisfy the amount then we need to throw
// [InsufficientNotLockedBalanceException] instead
val amountWithSoftLocked = executeQuery(requiredAmount, lockId, criteria, sortByStateRefAscending(), mutableListOf(), true)
if (amountWithSoftLocked < requiredAmount) {
logger.warn("Insufficient spendable states identified for $requiredAmount.")
throw InsufficientBalanceException("Insufficient spendable states identified for $requiredAmount.")
} else {
logger.warn("Insufficient not locked spendable states identified for $requiredAmount.")
throw InsufficientNotLockedBalanceException("Insufficient not locked spendable states identified for $requiredAmount.")
}
}
} else {
break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.r3.corda.lib.tokens.contracts.types.TokenType
import com.r3.corda.lib.tokens.contracts.utilities.withoutIssuer
import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig
import com.r3.corda.lib.tokens.selection.InsufficientBalanceException
import com.r3.corda.lib.tokens.selection.InsufficientNotLockedBalanceException
import com.r3.corda.lib.tokens.selection.memory.internal.Holder
import com.r3.corda.lib.tokens.selection.memory.internal.lookupExternalIdFromKey
import com.r3.corda.lib.tokens.selection.sortByStateRefAscending
Expand Down Expand Up @@ -266,19 +267,23 @@ class VaultWatcherService(private val tokenObserver: TokenObserver,

val requiredAmountWithoutIssuer = requiredAmount.withoutIssuer()
var amountLocked: Amount<TokenType> = requiredAmountWithoutIssuer.copy(quantity = 0)
// this is the running total of soft locked tokens that we encounter until the target token amount is reached
var amountAlreadySoftLocked: Amount<TokenType> = requiredAmountWithoutIssuer.copy(quantity = 0)
val finalPredicate = enrichedPredicate.get()
for (tokenStateAndRef in bucket) {
// Does the token satisfy the (optional) predicate eg. issuer?
if (finalPredicate.invoke(tokenStateAndRef)) {
val tokenAmount = uncheckedCast(tokenStateAndRef.state.data.amount.withoutIssuer())
// if so, race to lock the token, expected oldValue = PLACE_HOLDER
if (__backingMap.replace(tokenStateAndRef, PLACE_HOLDER, selectionId)) {
// we won the race to lock this token
lockedTokens.add(tokenStateAndRef)
val token = tokenStateAndRef.state.data
amountLocked += uncheckedCast(token.amount.withoutIssuer())
amountLocked += tokenAmount
if (amountLocked >= requiredAmountWithoutIssuer) {
break
}
} else {
amountAlreadySoftLocked += tokenAmount
}
}
}
Expand All @@ -287,7 +292,11 @@ class VaultWatcherService(private val tokenObserver: TokenObserver,
lockedTokens.forEach {
unlockToken(it, selectionId)
}
throw InsufficientBalanceException("Insufficient spendable states identified for $requiredAmount.")
if (amountLocked + amountAlreadySoftLocked < requiredAmountWithoutIssuer) {
throw InsufficientBalanceException("Insufficient spendable states identified for $requiredAmount.")
} else {
throw InsufficientNotLockedBalanceException("Insufficient not-locked spendable states identified for $requiredAmount.")
}
}

UPDATER.schedule({
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.r3.corda.lib.tokens.integrationTest

import com.r3.corda.lib.ci.workflows.RequestKey
import com.r3.corda.lib.ci.workflows.RequestKeyFlow
import com.r3.corda.lib.tokens.contracts.states.FungibleToken
import com.r3.corda.lib.tokens.contracts.states.NonFungibleToken
import com.r3.corda.lib.tokens.contracts.types.IssuedTokenType
import com.r3.corda.lib.tokens.contracts.types.TokenType
import com.r3.corda.lib.tokens.contracts.utilities.*
import com.r3.corda.lib.tokens.money.GBP
import com.r3.corda.lib.tokens.money.USD
import com.r3.corda.lib.tokens.selection.InsufficientNotLockedBalanceException
import com.r3.corda.lib.tokens.testing.states.House
import com.r3.corda.lib.tokens.testing.states.Ruble
import com.r3.corda.lib.tokens.workflows.flows.rpc.ConfidentialIssueTokens
Expand Down Expand Up @@ -50,6 +50,7 @@ import org.hamcrest.CoreMatchers.`is`
import org.hamcrest.CoreMatchers.equalTo
import org.junit.Assert
import org.junit.Test
import kotlin.test.assertFailsWith

class TokenDriverTest {

Expand Down Expand Up @@ -297,14 +298,14 @@ class TokenDriverTest {

// Restart the node
val restartedNode = startNode(providedName = DUMMY_BANK_A_NAME, customOverrides = mapOf("p2pAddress" to "localhost:30000")).getOrThrow()
// Try to spend same states, they should be locked after restart, so we expect insufficient balance exception to be thrown.
assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy {
// Try to spend same states, they should be locked after restart, so we expect insufficient not locked balance exception to be thrown.
assertFailsWith<InsufficientNotLockedBalanceException> {
restartedNode.rpc.startFlowDynamic(
SelectAndLockFlow::class.java,
50.GBP,
10.millis
SelectAndLockFlow::class.java,
50.GBP,
10.millis
).returnValue.getOrThrow()
}.withMessageContaining("InsufficientBalanceException: Insufficient spendable states identified for 50.00 TokenType(tokenIdentifier='GBP', fractionDigits=2)")
}
// This should just work.
restartedNode.rpc.startFlowDynamic(
SelectAndLockFlow::class.java,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ class OwnerMigration : CustomSqlChange {
private val logger = contextLogger()
}

private object AMQPInspectorSerializationScheme : AbstractAMQPSerializationScheme(emptyList()) {
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
return true
}

override fun rpcClientSerializerFactory(context: SerializationContext) = throw UnsupportedOperationException()
override fun rpcServerSerializerFactory(context: SerializationContext) = throw UnsupportedOperationException()
}
private object AMQPInspectorSerializationScheme : AbstractAMQPSerializationScheme(emptyList()) {
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
return true
}

override fun rpcClientSerializerFactory(context: SerializationContext) = throw UnsupportedOperationException()
override fun rpcServerSerializerFactory(context: SerializationContext) = throw UnsupportedOperationException()
}

val serializationFactory = SerializationFactoryImpl().apply {
registerScheme(AMQPInspectorSerializationScheme)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import com.r3.corda.lib.tokens.workflows.utilities.getPreferredNotary
import com.r3.corda.lib.tokens.workflows.utilities.ourSigningKeys
import net.corda.core.contracts.Amount
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.seconds
import net.corda.core.utilities.toNonEmptySet
import net.corda.core.utilities.unwrap
import java.time.Duration
import java.time.temporal.ChronoUnit
Expand Down Expand Up @@ -157,4 +159,4 @@ class JustLocalSelect(val amount: Amount<TokenType>, val timeBetweenSelects: Dur
}
throw InsufficientBalanceException("Could not select: ${amount}")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import com.r3.corda.lib.tokens.money.CHF
import com.r3.corda.lib.tokens.money.GBP
import com.r3.corda.lib.tokens.money.USD
import com.r3.corda.lib.tokens.selection.InsufficientBalanceException
import com.r3.corda.lib.tokens.selection.InsufficientNotLockedBalanceException
import com.r3.corda.lib.tokens.selection.TokenQueryBy
import com.r3.corda.lib.tokens.selection.database.selector.DatabaseTokenSelection
import com.r3.corda.lib.tokens.workflows.flows.move.addMoveFungibleTokens
import com.r3.corda.lib.tokens.workflows.types.PartyAndAmount
import com.r3.corda.lib.tokens.workflows.utilities.tokenAmountWithIssuerCriteria
import net.corda.core.contracts.StateRef
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.toNonEmptySet
import net.corda.testing.node.StartedMockNode
import org.junit.Assert
import org.junit.Before
Expand Down Expand Up @@ -84,6 +87,28 @@ class DatabaseTokenSelectionTests : MockNetworkTest(numberOfNodes = 4) {
}
}

@Test
fun `not enough not locked tokens available`() {
val tokenSelection = DatabaseTokenSelection(A.services)
val uuid = UUID.randomUUID()

// issuing tokens in two tranches so we can lock one of those
val issueTransaction = I.issueFungibleTokens(A, 900.BTC).toCompletableFuture().get()
I.issueFungibleTokens(A, 100.BTC).toCompletableFuture().get()

// locking the bigger state
A.transaction {
A.services.vaultService.softLockReserve(uuid, setOf(StateRef(issueTransaction.tx.id, 0)).toNonEmptySet())
}

assertFailsWith<InsufficientNotLockedBalanceException> {
A.transaction {
tokenSelection.selectTokens(200.BTC, lockId = uuid)
}
}
}


@Test
fun `generate move test`() {
val transactionBuilder = TransactionBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.r3.corda.lib.tokens.contracts.states.FungibleToken
import com.r3.corda.lib.tokens.money.GBP
import com.r3.corda.lib.tokens.money.USD
import com.r3.corda.lib.tokens.selection.InsufficientBalanceException
import com.r3.corda.lib.tokens.selection.InsufficientNotLockedBalanceException
import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig
import com.r3.corda.lib.tokens.selection.memory.internal.Holder
import com.r3.corda.lib.tokens.selection.memory.internal.lookupExternalIdFromKey
Expand Down Expand Up @@ -56,6 +57,24 @@ class InMemorySelectionTest {
}
}

@Test(expected = InsufficientNotLockedBalanceException::class)
fun `insufficient balance selection - should throw InsufficientNotLockedBalanceException when there is not enough not locked tokens available`() {
val (vaultObserver, observable) = getExternalIdVaultObserver()
val vaultWatcherService = VaultWatcherService(vaultObserver, InMemorySelectionConfig.defaultConfig())
val uuid = UUID.randomUUID()
val key = services.keyManagementService.freshKey(uuid)

// placing two states of 100 and 50 USD into the observer, then soft locking the 100-one.
// The test should fail with InsufficientNotLockedBalanceException when trying to select 60 USD.
val biggerStateAndRef = VaultWatcherServiceTest.createNewFiatCurrencyTokenRef(100, key, VaultWatcherServiceTest.notary1, VaultWatcherServiceTest.issuer1, USD, observable, database)
VaultWatcherServiceTest.createNewFiatCurrencyTokenRef(50, key, VaultWatcherServiceTest.notary1, VaultWatcherServiceTest.issuer1, USD, observable, database)
vaultWatcherService.lockTokensExternal(listOf(biggerStateAndRef), UUID.randomUUID().toString())

database.transaction {
vaultWatcherService.selectTokens(Holder.MappedIdentity(uuid), Amount(60, USD), selectionId = "abc")
}
}

@Test
fun `indexing and selection by public key`() {
val (vaultObserver, observable) = getPublicKeyVaultObserver()
Expand Down

0 comments on commit e4a4b85

Please sign in to comment.