Skip to content

Commit

Permalink
perf($EventBus): support async event bus; support dynamic component scan
Browse files Browse the repository at this point in the history
  • Loading branch information
johnnymillergh committed Jun 25, 2022
1 parent 6938193 commit fe1de45
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.jmsoftware.maf.authcenter.event

import com.google.common.eventbus.AllowConcurrentEvents
import com.google.common.eventbus.EventBus
import com.google.common.eventbus.Subscribe
import com.jmsoftware.maf.common.util.logger
import com.jmsoftware.maf.springcloudstarter.eventbus.DemoEvent
import com.jmsoftware.maf.springcloudstarter.eventbus.EventSubscriber

/**
* # DemoEventSubscriber
*
* Change description here.
*
* @author Johnny Miller (锺俊), email: johnnysviva@outlook.com, 6/23/22 7:46 AM
**/
@EventSubscriber
@Suppress("unused")
class DemoEventSubscriber(
private val eventBus: EventBus
) {
companion object {
private val log = logger()
}

@Subscribe
@AllowConcurrentEvents
fun subscribe(demoEvent: DemoEvent) {
log.info("Got subscription: $demoEvent")
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class AsyncConfiguration(
private val mafProjectProperties: MafProjectProperties
) {
companion object {
private const val QUEUE_CAPACITY = 10000
private const val QUEUE_CAPACITY = 10_000
private val log = logger()
}

Expand All @@ -54,7 +54,7 @@ class AsyncConfiguration(
return TaskExecutorCustomizer { taskExecutor: ThreadPoolTaskExecutor ->
taskExecutor.corePoolSize = corePoolSize
taskExecutor.maxPoolSize = corePoolSize * 3
taskExecutor.setQueueCapacity(QUEUE_CAPACITY)
taskExecutor.queueCapacity = QUEUE_CAPACITY
taskExecutor.setBeanName("asyncTaskExecutor")
taskExecutor.setThreadNamePrefix("${mafProjectProperties.projectArtifactId}-async-executor-")
// Specify the RejectedExecutionHandler to use for the ExecutorService.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.jmsoftware.maf.springcloudstarter.eventbus

import cn.hutool.core.util.IdUtil

/**
* # DemoEvent
*
* Change description here.
*
* @author Johnny Miller (锺俊), email: johnnysviva@outlook.com, 6/25/22 1:18 PM
**/
data class DemoEvent(
val id: String = IdUtil.nanoId(),
val name: String = DemoEvent::class.java.simpleName
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.jmsoftware.maf.springcloudstarter.eventbus

import org.springframework.context.annotation.Import
import org.springframework.context.annotation.*
import org.springframework.core.annotation.AliasFor
import kotlin.annotation.AnnotationRetention.*
import kotlin.annotation.AnnotationTarget.*


/**
* # EnableEventBus
Expand All @@ -9,8 +13,14 @@ import org.springframework.context.annotation.Import
*
* @author Johnny Miller (锺俊), email: johnnysviva@outlook.com, 6/23/22 7:52 AM
**/
@Target(AnnotationTarget.CLASS)
@Retention(AnnotationRetention.RUNTIME)
@Target(CLASS)
@Retention(RUNTIME)
@MustBeDocumented
@Import(EventBugConfiguration::class)
annotation class EnableEventBus
@Import(
EventSubscriberRegistrar::class,
EventBugConfiguration::class
)
annotation class EnableEventBus(
@get:AliasFor(attribute = "basePackages") val value: Array<String> = [],
@get:AliasFor(attribute = "value") val basePackages: Array<String> = []
)
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package com.jmsoftware.maf.springcloudstarter.eventbus

import com.google.common.eventbus.AsyncEventBus
import com.google.common.eventbus.EventBus
import com.jmsoftware.maf.common.util.logger
import com.jmsoftware.maf.springcloudstarter.property.MafProjectProperties
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Import
import org.springframework.context.event.EventListener
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

/**
* # EventBugConfiguration
Expand All @@ -18,25 +19,34 @@ import org.springframework.context.event.EventListener
* @author Johnny Miller (锺俊), email: johnnysviva@outlook.com, 6/23/22 7:52 AM
**/
@ConditionalOnClass(EventBus::class)
@Import(EventSubscriberRegistrar::class)
class EventBugConfiguration(
private val mafProjectProperties: MafProjectProperties,
private val applicationContext: ApplicationContext
private val applicationContext: ApplicationContext,
private val applicationTaskExecutor: ThreadPoolTaskExecutor
) {
companion object {
private val log = logger()
}

@Bean
fun eventBus(): EventBus = EventBus("event-bus-${mafProjectProperties.projectArtifactId}")
fun eventBus(): EventBus = EventBus("event-bus-${mafProjectProperties.projectArtifactId}").apply {
log.warn("Initial bean: `${this.javaClass.simpleName}`")
}

@Bean
fun asyncEventBus(): AsyncEventBus =
AsyncEventBus("async-event-bus-${mafProjectProperties.projectArtifactId}", applicationTaskExecutor).apply {
log.warn("Initial bean: `${this.javaClass.simpleName}`")
}

@EventListener
fun onApplicationEvent(event: ApplicationReadyEvent) {
log.info("All Beans have been initialized. Elapsed: ${event.timeTaken.seconds} s")
val eventBus = applicationContext.getBean(EventBus::class.java)
val eventBus = applicationContext.getBean(AsyncEventBus::class.java)
applicationContext.getBeansWithAnnotation(EventSubscriber::class.java).forEach { (key, value) ->
eventBus.register(value)
log.info("Event bus registered subscriber: $key")
}
eventBus.post(DemoEvent())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import com.jmsoftware.maf.common.util.logger
import org.springframework.beans.factory.config.BeanDefinitionHolder
import org.springframework.beans.factory.support.AbstractBeanDefinition
import org.springframework.beans.factory.support.BeanDefinitionRegistry
import org.springframework.context.EnvironmentAware
import org.springframework.context.ResourceLoaderAware
import org.springframework.context.annotation.ClassPathBeanDefinitionScanner
import org.springframework.context.annotation.Import
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar
import org.springframework.core.annotation.AnnotationAttributes
import org.springframework.core.env.Environment
import org.springframework.core.io.ResourceLoader
import org.springframework.core.type.AnnotationMetadata
import org.springframework.core.type.StandardAnnotationMetadata
import org.springframework.core.type.filter.AnnotationTypeFilter

/**
Expand All @@ -19,24 +24,47 @@ import org.springframework.core.type.filter.AnnotationTypeFilter
*
* @author Johnny Miller (锺俊), email: johnnysviva@outlook.com, 6/22/22 10:44 PM
* @see Subscribe
* @see <a href='https://stackoverflow.com/q/50808941/9728243'>How to get basePackages of @ComponentScan programatically at runtime?</a>
**/
class EventSubscriberRegistrar : ImportBeanDefinitionRegistrar, ResourceLoaderAware {
class EventSubscriberRegistrar : ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {
companion object {
private val log = logger()

private fun getBasePackages(
metadata: StandardAnnotationMetadata,
attributes: AnnotationAttributes
): Set<String> {
return LinkedHashSet(attributes.getStringArray("basePackages").toList()).ifEmpty {
// If value attribute is not set, fallback to the package of the annotated class
log.warn("Returning the package of the underlying class: ${metadata.introspectedClass.name}")
setOf(metadata.introspectedClass.getPackage().name)
}
}
}

private lateinit var resourceLoaderMember: ResourceLoader
private lateinit var environment: Environment
private lateinit var resourceLoader: ResourceLoader

override fun registerBeanDefinitions(importingClassMetadata: AnnotationMetadata, registry: BeanDefinitionRegistry) {
EventSubscriberBeanDefinitionScanner(registry).apply {
this.resourceLoader = resourceLoaderMember
}.scan("com.jmsoftware.maf").apply {
val annotationAttributes = AnnotationAttributes(
importingClassMetadata.getAnnotationAttributes(EnableEventBus::class.java.canonicalName)!!
)
EventSubscriberBeanDefinitionScanner(registry, environment, resourceLoader).scan(
*getBasePackages(
importingClassMetadata as StandardAnnotationMetadata,
annotationAttributes
).toTypedArray()
).apply {
log.warn("Number of beans registered for @${EventSubscriber::class.simpleName}: $this")
}
}

override fun setEnvironment(environment: Environment) {
this.environment = environment
}

override fun setResourceLoader(resourceLoader: ResourceLoader) {
resourceLoaderMember = resourceLoader
this.resourceLoader = resourceLoader
}
}

Expand All @@ -47,19 +75,23 @@ class EventSubscriberRegistrar : ImportBeanDefinitionRegistrar, ResourceLoaderAw
*
* @author Johnny Miller (锺俊), email: johnnysviva@outlook.com, 6/22/22 10:51 PM
**/
private class EventSubscriberBeanDefinitionScanner(registry: BeanDefinitionRegistry) :
ClassPathBeanDefinitionScanner(registry, false) {
private class EventSubscriberBeanDefinitionScanner(
registry: BeanDefinitionRegistry,
environment: Environment,
resourceLoader: ResourceLoader
) :
ClassPathBeanDefinitionScanner(registry, false, environment, resourceLoader) {
companion object {
private val log = logger()
}

override fun doScan(vararg basePackages: String?): MutableSet<BeanDefinitionHolder> {
override fun doScan(vararg basePackages: String): MutableSet<BeanDefinitionHolder> {
addIncludeFilter(AnnotationTypeFilter(EventSubscriber::class.java))
return super.doScan(*basePackages)
}

override fun postProcessBeanDefinition(beanDefinition: AbstractBeanDefinition, beanName: String) {
log.info("Post process bean definition: $beanName")
log.info("Post process bean definition: `$beanName`")
super.postProcessBeanDefinition(beanDefinition, beanName)
}
}

0 comments on commit fe1de45

Please sign in to comment.