Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Profile, KYC and config modules added #391

Merged
merged 40 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
e7e53ca
start profile module
fatemeh-i Jul 24, 2023
a3323b5
try to manage permissions
fatemeh-i Jul 24, 2023
5c7e336
try to organize limitations
fatemeh-i Jul 25, 2023
efee99b
try to organize actions history
fatemeh-i Jul 26, 2023
8c2704e
start kyc module
fatemeh-i Jul 29, 2023
9b0952f
init kyc module
fatemeh-i Jul 29, 2023
b63441f
develope diffrent kyc level process
fatemeh-i Jul 30, 2023
2cc24f1
link profile manager to kyc manager
fatemeh-i Jul 31, 2023
d31328d
fix some bugs
fatemeh-i Aug 1, 2023
f3943ee
set storage proxy for kyc module
fatemeh-i Aug 2, 2023
05f3a0f
try to set pointcut for dao.save
fatemeh-i Aug 10, 2023
bd60004
sync kyc and profile module in kycLevelUpdated messages
fatemeh-i Aug 13, 2023
b4f0914
review and tes kyc services
fatemeh-i Aug 14, 2023
b38a8d9
review and test kyc services
fatemeh-i Aug 15, 2023
f0180d4
add related account services
fatemeh-i Aug 20, 2023
ac98a71
check securityContext
fatemeh-i Aug 21, 2023
658ee75
test services and fig some buges
fatemeh-i Aug 22, 2023
28d8038
check and test profile services
fatemeh-i Aug 23, 2023
c3c3696
review security layers
fatemeh-i Aug 26, 2023
1308276
mint kyc level in jwt token
fatemeh-i Sep 4, 2023
81aa7db
manage whitelist in user-management module
fatemeh-i Sep 5, 2023
3260df3
fix a typo
fatemeh-i Sep 10, 2023
815b828
link profile and accontant modules
fatemeh-i Sep 10, 2023
442a82c
avoid push unvaluable kycLevelUpdated event
fatemeh-i Sep 11, 2023
490710a
Merge branch 'dev' into profile
Marchosiax Sep 12, 2023
ed2aa07
Fix maven version mismatch
Marchosiax Sep 12, 2023
9acf498
Fix build failure
Marchosiax Sep 12, 2023
e9bed27
rm excessive dependence
fatemeh-i Sep 13, 2023
6b03cb8
rm excessive dependence
fatemeh-i Sep 13, 2023
4ca4bfb
Fix api @PathVariable error
Marchosiax Sep 18, 2023
856cd44
Web and user config (#390)
Marchosiax Oct 1, 2023
ff04a58
Remove consul from config
Marchosiax Oct 2, 2023
9f46d1f
Increase log level of config module
Marchosiax Oct 4, 2023
0d4c7b1
Remove kafka config
Marchosiax Oct 4, 2023
851dd51
Authorize actuator path for all requests
Marchosiax Oct 4, 2023
b3fffed
Remove requirement of uuid in user config services
Marchosiax Oct 4, 2023
95288c2
Merge branch 'dev' into profile
Marchosiax Oct 9, 2023
81639e9
Merge pull request #388 from opexdev/profile
fatemeh-i Oct 9, 2023
7db23c4
fix accountant conflict
fatemeh-i Oct 9, 2023
eabac88
Merge pull request #389 from opexdev/user-level-management
fatemeh-i Oct 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
java: [ 11 ]
java: [ 17 ]
name: Build OPEX and run tests with java ${{ matrix.java }}
steps:
- name: Checkout Source Code
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
java: [ 11 ]
java: [ 17 ]
name: Build OPEX and run tests with java ${{ matrix.java }}
steps:
- name: Checkout Source Code
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
java: [ 11 ]
java: [ 17 ]
name: Build OPEX and run tests with java ${{ matrix.java }}
steps:
- name: Checkout Source Code
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package co.nilin.opex.accountant.app.config

import co.nilin.opex.accountant.app.listener.*
import co.nilin.opex.accountant.app.listener.AccountantEventListener
import co.nilin.opex.accountant.app.listener.AccountantTempEventListener
import co.nilin.opex.accountant.app.listener.AccountantTradeListener
Expand All @@ -12,11 +13,11 @@ import co.nilin.opex.accountant.core.service.FinancialActionJobManagerImpl
import co.nilin.opex.accountant.core.service.OrderManagerImpl
import co.nilin.opex.accountant.core.service.TradeManagerImpl
import co.nilin.opex.accountant.core.spi.*
import co.nilin.opex.accountant.ports.kafka.listener.consumer.*
import co.nilin.opex.accountant.ports.kafka.listener.consumer.EventKafkaListener
import co.nilin.opex.accountant.ports.kafka.listener.consumer.OrderKafkaListener
import co.nilin.opex.accountant.ports.kafka.listener.consumer.TempEventKafkaListener
import co.nilin.opex.accountant.ports.kafka.listener.consumer.TradeKafkaListener
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
Expand Down Expand Up @@ -113,6 +114,10 @@ class AppConfig {
return AccountantTempEventListener(orderManager, tradeManager)
}





@Autowired
fun configureOrderListener(orderKafkaListener: OrderKafkaListener, orderListener: OrderListener) {
orderKafkaListener.addListener(orderListener)
Expand All @@ -128,16 +133,21 @@ class AppConfig {

@Autowired
fun configureEventListener(
eventKafkaListener: EventKafkaListener,
accountantEventListener: AccountantEventListener
eventKafkaListener: EventKafkaListener,
accountantEventListener: AccountantEventListener,
kycLevelUpdatedKafkaListener: KycLevelUpdatedKafkaListener,
kycLevelUpdatedEventListener: KycLevelUpdatedListener
) {
eventKafkaListener.addListener(accountantEventListener)
kycLevelUpdatedKafkaListener.addEventListener(kycLevelUpdatedEventListener)

}

@Autowired
fun configureTempEventListener(
tempEventKafkaListener: TempEventKafkaListener,
accountantTempEventListener: AccountantTempEventListener
accountantTempEventListener: AccountantTempEventListener,

) {
tempEventKafkaListener.addListener(accountantTempEventListener)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package co.nilin.opex.accountant.app.listener

import co.nilin.opex.accountant.core.inout.KycLevelUpdatedEvent
import co.nilin.opex.accountant.ports.kafka.listener.spi.KycLevelUpdatedEventListener
import co.nilin.opex.accountant.ports.postgres.impl.UserLevelLoaderImpl
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch

@Component
class KycLevelUpdatedListener(val userLevelLoaderImpl: UserLevelLoaderImpl) : KycLevelUpdatedEventListener {

private val logger = LoggerFactory.getLogger(KycLevelUpdatedListener::class.java)
val scope = CoroutineScope(Dispatchers.IO)
override fun id(): String {
return "KycLevelUpdatedListener"
}

override fun onEvent(event: KycLevelUpdatedEvent,
partition: Int, offset: Long, timestamp: Long, eventId: String) {
logger.info("==========================================================================")
logger.info("Incoming UserLevelUpdated event: $event")
logger.info("==========================================================================")
scope.launch {
userLevelLoaderImpl.update(event.userId, event.kycLevel)
}
}


}
10 changes: 7 additions & 3 deletions accountant/accountant-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,13 @@
<artifactId>mockk</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<scope>test</scope>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.15.2</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package co.nilin.opex.accountant.core.inout


import co.nilin.opex.accountant.core.model.KycLevel
import java.time.LocalDateTime

data class KycLevelUpdatedEvent(var userId: String, var kycLevel: KycLevel, var updateDate: LocalDateTime)
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package co.nilin.opex.accountant.core.model

enum class KycLevel {
Level1, Level2,
}

Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package co.nilin.opex.accountant.core.spi

import co.nilin.opex.accountant.core.model.KycLevel

interface UserLevelLoader {

suspend fun load(uuid: String): String

suspend fun update(uuid: String,userLevel:KycLevel)

}
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
package co.nilin.opex.accountant.core.service

import co.nilin.opex.accountant.core.spi.JsonMapper
import com.google.gson.Gson
import org.springframework.boot.json.GsonJsonParser
import com.fasterxml.jackson.databind.ObjectMapper

class JsonMapperTestImpl : JsonMapper {

private val objectMapper = ObjectMapper().apply {
findAndRegisterModules()
}

override fun serialize(input: Any): String? {
return Gson().toJson(input)
return objectMapper.writeValueAsString(input)
}

override fun <T> deserialize(input: String, t: Class<T>): T {
return Gson().fromJson(input, t)
return objectMapper.readValue(input, t)
}

override fun toMap(input: Any): Map<String, Any> {
return GsonJsonParser().parseMap(serialize(input))
return objectMapper.convertValue(input, Map::class.java) as Map<String, Any>
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package co.nilin.opex.accountant.ports.kafka.listener.config

import co.nilin.opex.accountant.ports.kafka.listener.consumer.EventKafkaListener
import co.nilin.opex.accountant.ports.kafka.listener.consumer.OrderKafkaListener
import co.nilin.opex.accountant.ports.kafka.listener.consumer.TempEventKafkaListener
import co.nilin.opex.accountant.ports.kafka.listener.consumer.TradeKafkaListener
import co.nilin.opex.accountant.core.inout.KycLevelUpdatedEvent
import co.nilin.opex.accountant.ports.kafka.listener.consumer.*
import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
Expand All @@ -14,9 +12,7 @@ import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.*
import org.springframework.kafka.listener.*
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.util.backoff.FixedBackOff
Expand All @@ -39,7 +35,7 @@ class AccountantKafkaConfig {
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
JsonDeserializer.TRUSTED_PACKAGES to "co.nilin.opex.*",
JsonDeserializer.TYPE_MAPPINGS to "order_request_event:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent,order_request_submit:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequestEvent,order_request_cancel:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderCancelRequestEvent"
JsonDeserializer.TYPE_MAPPINGS to "order_request_event:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent,order_request_submit:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequestEvent,order_request_cancel:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderCancelRequestEvent,kyc_level_updated_event:co.nilin.opex.accountant.core.inout.KycLevelUpdatedEvent"
)
}

Expand All @@ -48,6 +44,11 @@ class AccountantKafkaConfig {
return DefaultKafkaConsumerFactory(consumerConfigs)
}

@Bean("KycConsumerFactory")
fun kycConsumerFactory(@Qualifier("consumerConfig") consumerConfigs: Map<String, Any?>): ConsumerFactory<String, KycLevelUpdatedEvent> {
return DefaultKafkaConsumerFactory(consumerConfigs)
}

@Autowired
@ConditionalOnBean(TradeKafkaListener::class)
fun configureTradeListener(
Expand Down Expand Up @@ -108,6 +109,31 @@ class AccountantKafkaConfig {
container.start()
}

@Bean("kycLevelUpdatedProducerFactory")
fun producerFactory(@Qualifier("consumerConfig") producerConfigs: Map<String, Any>): ProducerFactory<String, KycLevelUpdatedEvent> {
return DefaultKafkaProducerFactory(producerConfigs)
}

@Bean("kycLevelUpdatedKafkaTemplate")
fun kafkaTemplate(@Qualifier("kycLevelUpdatedProducerFactory") producerFactory: ProducerFactory<String, KycLevelUpdatedEvent>): KafkaTemplate<String, KycLevelUpdatedEvent> {
return KafkaTemplate(producerFactory)
}
@Autowired
@ConditionalOnBean(KycLevelUpdatedKafkaListener::class)
fun configureKycLevelUpdatedListener(
listener: KycLevelUpdatedKafkaListener,
@Qualifier("kycLevelUpdatedKafkaTemplate") template: KafkaTemplate<String, KycLevelUpdatedEvent>,
@Qualifier("KycConsumerFactory") consumerFactory: ConsumerFactory<String, KycLevelUpdatedEvent>
) {
val containerProps = ContainerProperties(Pattern.compile("kyc_level_updated"))
containerProps.messageListener = listener
val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps)
container.setBeanName("KycLevelUpdatedKafkaListenerContainer")
container.commonErrorHandler = createConsumerErrorHandler(template, "kyc_level_updated.DLT")
container.start()
}


private fun createConsumerErrorHandler(kafkaTemplate: KafkaTemplate<*, *>, dltTopic: String): CommonErrorHandler {
val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { cr, _ ->
cr.headers().add("dlt-origin-module", "ACCOUNTANT".toByteArray())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package co.nilin.opex.accountant.ports.kafka.listener.consumer


import co.nilin.opex.accountant.core.inout.KycLevelUpdatedEvent
import co.nilin.opex.accountant.ports.kafka.listener.spi.KycLevelUpdatedEventListener
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.kafka.listener.MessageListener
import org.springframework.stereotype.Component

@Component
class KycLevelUpdatedKafkaListener : MessageListener<String, KycLevelUpdatedEvent> {
val eventListeners = arrayListOf<KycLevelUpdatedEventListener>()
private val logger = LoggerFactory.getLogger(KycLevelUpdatedKafkaListener::class.java)
override fun onMessage(data: ConsumerRecord<String, KycLevelUpdatedEvent>) {

eventListeners.forEach { tl ->
logger.info("incoming new event " + tl.id())
tl.onEvent(data.value(), data.partition(), data.offset(), data.timestamp(), tl.id())
}
}

fun addEventListener(tl: KycLevelUpdatedEventListener) {
eventListeners.add(tl)
}

fun removeEventListener(tl: KycLevelUpdatedEventListener) {
eventListeners.removeIf { item ->
item.id() == tl.id()
}

}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package co.nilin.opex.accountant.ports.kafka.listener.spi

import co.nilin.opex.accountant.core.inout.KycLevelUpdatedEvent


interface KycLevelUpdatedEventListener {
fun id(): String
fun onEvent(event: KycLevelUpdatedEvent, partition: Int, offset: Long, timestamp: Long, eventId: String)

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import reactor.core.publisher.Mono
@Repository
interface UserLevelRepository : ReactiveCrudRepository<UserLevelModel, String> {

fun findByLevel(level: String): Mono<UserLevelModel>
@Query("insert into user_level (level) values (:level) on conflict do nothing")
fun insert(level: String): Mono<Void>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import co.nilin.opex.accountant.core.spi.TempEventPersister
import co.nilin.opex.accountant.ports.postgres.dao.TempEventRepository
import co.nilin.opex.accountant.ports.postgres.model.TempEventModel
import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent
import com.google.gson.Gson
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.awaitFirstOrNull
Expand All @@ -16,15 +17,18 @@ import org.springframework.stereotype.Component
import java.time.LocalDateTime

@Component
class TempEventPersisterImpl(private val tempEventRepository: TempEventRepository) : TempEventPersister {
class TempEventPersisterImpl(
private val tempEventRepository: TempEventRepository,
private val objectMapper: ObjectMapper
) : TempEventPersister {

override suspend fun saveTempEvent(ouid: String, event: CoreEvent) {
tempEventRepository.save(
TempEventModel(
null,
ouid,
event.javaClass.name,
Gson().toJson(event),
objectMapper.writeValueAsString(event),
LocalDateTime.now()
)
).awaitSingleOrNull()
Expand All @@ -33,7 +37,7 @@ class TempEventPersisterImpl(private val tempEventRepository: TempEventRepositor
override suspend fun loadTempEvents(ouid: String): List<CoreEvent> {
return tempEventRepository
.findByOuid(ouid)
.map { Gson().fromJson(it.eventBody, Class.forName(it.eventType)) as CoreEvent }
.map { objectMapper.readValue<CoreEvent>(it.eventBody) }
.toList()
}

Expand All @@ -52,7 +56,7 @@ class TempEventPersisterImpl(private val tempEventRepository: TempEventRepositor
TempEvent(
it.id!!,
it.ouid,
Gson().fromJson(it.eventBody, Class.forName(it.eventType)) as CoreEvent,
objectMapper.readValue(it.eventBody),
it.eventDate
)
}
Expand Down
Loading
Loading