Rework grabbing mechanism, we need to take into account dates for each individual ISIN
Pozo committed Jul 13, 2024
1 parent e9d492c commit f485ac9
Showing 7 changed files with 74 additions and 120 deletions.
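
The core of the rework: instead of keying the whole run off one global meta#last-successful-grabbing-ending-date value, each ISIN now gets its own download window derived from that ISIN's most recent entry in Redis. Below is a minimal, self-contained sketch of that idea — not the committed code verbatim; it assumes the rate:keys#<ISIN> sorted-set layout and the yyyy.MM.dd / yyyy/MM/dd formats from DataFlowConstants shown further down.

import redis.clients.jedis.JedisPooled
import java.text.SimpleDateFormat
import java.util.Calendar
import java.util.Date

private val domainFormat = SimpleDateFormat("yyyy.MM.dd")    // DOMAIN_DATE_FORMAT
private val ratesKeyFormat = SimpleDateFormat("yyyy/MM/dd")  // RATES_KEY_DATE_FORMAT
private const val START_YEAR_DATE = "1992.01.01"             // there is no data before this

// Returns the start/end dates to download for one ISIN.
fun windowFor(jedis: JedisPooled, isin: String): Pair<String, String> {
    val end = domainFormat.format(Date())
    // Newest member of the ISIN's sorted set of rate keys, by score.
    val newest = jedis.zrevrangeByScoreWithScores("rate:keys#$isin", "+inf", "-inf", 0, 1)
        .firstOrNull()
        ?: return START_YEAR_DATE to end                     // nothing stored yet: grab the full history
    // Members carry their date after the '#', e.g. "...#2024/07/12"; start one day later.
    val lastDate = ratesKeyFormat.parse(newest.element.split("#")[1])
    val nextDay = Calendar.getInstance().apply {
        time = lastDate
        add(Calendar.DAY_OF_YEAR, 1)
    }.time
    return domainFormat.format(nextDay) to end
}

In the commit itself this logic lives in ISINProcessor.setISINIntervalHeaderValues(), which writes the window into the isin, start-date and end-date exchange headers.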
@@ -1,15 +1,10 @@
package com.github.pozo.investmentfunds.api.grabber

import com.github.pozo.investmentfunds.api.redis.RedisService
import com.github.pozo.investmentfunds.domain.DataFlowConstants
import org.apache.camel.ProducerTemplate
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
import redis.clients.jedis.resps.Tuple
import java.text.SimpleDateFormat
import java.util.*


@Service
@@ -19,65 +14,11 @@ class GrabberService constructor(

private val logger = LoggerFactory.getLogger(GrabberService::class.java)

private val format = SimpleDateFormat(DataFlowConstants.GRAB_DATA_COMMAND_DATE_FORMAT.field)

private val redisEntryFormat = SimpleDateFormat(DataFlowConstants.RATES_KEY_DATE_FORMAT.field)

@Scheduled(cron = "0 0 * * * *")
@Scheduled(cron = "0 0 */6 * * *")
override fun trigger() {
val lastSuccessfulGrabbing = RedisService.jedis.get(DataFlowConstants.GRAB_DATA_LATEST_DATE_KEY.field)

logger.info("Initiating CSV retrieval mechanism. The last trigger date was $lastSuccessfulGrabbing.")

if (lastSuccessfulGrabbing == null || lastSuccessfulGrabbing.isEmpty()) {
logger.info("Initiating full retrieval. Retrieving data from '${DataFlowConstants.START_YEAR_DATE.field}'")
initialDataRetriaval(DataFlowConstants.START_YEAR_DATE.field)
} else {
val latestEntry = getMostRecentEntry()
if (latestEntry == null) {
// The redis is empty
logger.info("Initiating full retrieval. The redis store contains invalid data.")
initialDataRetriaval(DataFlowConstants.START_YEAR_DATE.field)
} else {
val latestEntryDate = redisEntryFormat.parse(extractDate(latestEntry.element))
val lastSuccessfulGrabbingDate = format.parse(lastSuccessfulGrabbing)

if (lastSuccessfulGrabbingDate.before(latestEntryDate)) {
// The data is invalid
// The last grabbing mechanism failed at some point
logger.info("Initiating partially retrieval. The redis store contains invalid data, the last grabbing failed at some point.")
initialDataRetriaval(lastSuccessfulGrabbing)
} else {
// Normal behaviour
// It's still possible that the CSV files are empty, because we don't have new data every day
logger.info("Initiating data retrieval.")
initialDataRetriaval(format.format(latestEntryDate))
}
}
}
logger.info("Initiating CSV retrieval mechanism.")
producerTemplate.sendBody("direct:grab-data", null)
}

private fun initialDataRetriaval(startDate: String?) {
logger.info("Retrieving data from '$startDate'")
producerTemplate.sendBody(
"direct:grab-data",
"$startDate${DataFlowConstants.GRAB_DATA_COMMAND_SEPARATOR.field}${format.format(Date())}"
)
}

private fun getMostRecentEntry(): Tuple? {
return RedisService.jedis.pipelined().use { pipeline ->
val results = RedisService.jedis.keys("rate:keys#*")
.map { RedisService.jedis.zrevrangeByScoreWithScores(it, "+inf", "-inf", 0, 1) }
.mapNotNull { it.firstOrNull() }
.maxByOrNull { it.score }
pipeline.sync()
return@use results
}
}

private fun extractDate(input: String): String? {
return input.split("#").takeIf { it.size > 1 }?.get(1)
}
}
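
With the date bookkeeping moved into ISINProcessor, the scheduled service shrinks to a plain trigger. Reassembled from the interleaved lines above as a sketch — the Grabber-style interface it implements sits outside the hunk, so it is omitted here and the constructor wiring is assumed.

import org.apache.camel.ProducerTemplate
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service

@Service
class GrabberService constructor(
    @Autowired private val producerTemplate: ProducerTemplate  // assumed wiring; the constructor is outside the hunk
) {

    private val logger = LoggerFactory.getLogger(GrabberService::class.java)

    // Was hourly ("0 0 * * * *"); now runs every six hours.
    @Scheduled(cron = "0 0 */6 * * *")
    fun trigger() {
        logger.info("Initiating CSV retrieval mechanism.")
        // No global last-successful-grabbing date any more; the route works out
        // each ISIN's window on its own.
        producerTemplate.sendBody("direct:grab-data", null)
    }
}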

@@ -2,7 +2,6 @@ package com.github.pozo.investmentfunds.api.grabber


import com.github.pozo.investmentfunds.api.grabber.processors.CsvProcessor
import com.github.pozo.investmentfunds.api.grabber.processors.CsvProcessor.ISIN_HEADER_NAME
import com.github.pozo.investmentfunds.api.grabber.processors.ISINProcessor
import com.github.pozo.investmentfunds.api.grabber.processors.ISINProcessor.END_DATE_HEADER_NAME
import com.github.pozo.investmentfunds.api.grabber.processors.ISINProcessor.ISIN_LIST_HEADER_NAME
@@ -11,31 +10,27 @@ import com.github.pozo.investmentfunds.api.grabber.processors.RedisProcessor
import org.apache.camel.LoggingLevel
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.model.dataformat.CsvDataFormat
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component

@Component
class InvestmentFundsRoutes constructor(
@Value("\${processors.isin-group-size}") private val isinGroupSize: Int
) : RouteBuilder() {
class InvestmentFundsRoutes : RouteBuilder() {

companion object {
const val FUND_DATA_ROUTE_NAME = "direct:fund-data"
const val RATE_DATA_ROUTE_NAME = "direct:rate-data"
}

override fun configure() {
from("direct:grab-data")
.onCompletion().onCompleteOnly().to("direct:end-csv-processing").end()
.filter(ISINProcessor.isValidIntervalValues())
.process(ISINProcessor.setISINIntervalHeaderValues())
.log("Received message with body: \${body} and headers: \${headers}")
from("direct:grab-data").routeId("grab-data")
.onCompletion().to("direct:end-csv-processing").end()
.to("https://www.bamosz.hu/egyes-alapok-kivalasztasa")
.convertBodyTo(String::class.java, "UTF-8")
.process(ISINProcessor.extractISINList())
.split().tokenize("\n", isinGroupSize)
.split().tokenize("\n")
.convertBodyTo(String::class.java, "UTF-8")
.process(ISINProcessor.setISINListHeaderValue())
.filter { it.message.body as String != "HU0000346663" }
.process(ISINProcessor.setISINIntervalHeaderValues())
.filter(ISINProcessor.isValidIntervalValues())
.setHeader("Content-Type", constant("application/x-www-form-urlencoded"))
.setBody()
.simple(
@@ -48,7 +43,7 @@
"&sortDirection=dec" +
"&separator=vesszo"
)
.log(LoggingLevel.INFO, "Downloading CSV files for the interval ('\${header.$START_DATE_HEADER_NAME}'-'\${header.$END_DATE_HEADER_NAME}') and ISIN numbers: '\${header.$ISIN_LIST_HEADER_NAME}'")
.log(LoggingLevel.INFO, "'\${header.$ISIN_LIST_HEADER_NAME}' Downloading CSV file for the interval '\${header.$START_DATE_HEADER_NAME}'-'\${header.$END_DATE_HEADER_NAME}'")
.to("https://www.bamosz.hu/bamosz-public-letoltes-portlet/data.download")
.convertBodyTo(String::class.java, "ISO-8859-2")
.convertBodyTo(String::class.java, "UTF-8")
@@ -57,35 +52,33 @@
quote = "\""
escape = "\\"
})
.log(LoggingLevel.INFO, "Vertically split CSV files according to ISIN numbers.")

//.log(LoggingLevel.INFO, "Vertically split CSV files according to ISIN numbers.")
.process(CsvProcessor.splitVertically())
.split().body()
.log(LoggingLevel.INFO, "Horizontally segment CSV chunks based on fund and rates data.")
//.log(LoggingLevel.INFO, "Horizontally segment CSV chunks based on fund and rates data.")
.process(CsvProcessor.splitHorizontallyAndSend())
.end()

from(FUND_DATA_ROUTE_NAME).doTry()
.log(LoggingLevel.INFO, "Processing fund data for '\${header.$ISIN_HEADER_NAME}'")
from(FUND_DATA_ROUTE_NAME).routeId("fund-data").doTry()
//.log(LoggingLevel.INFO, "Processing fund data for '\${header.$ISIN_HEADER_NAME}'")
.process(RedisProcessor.saveFundData())
.log(LoggingLevel.INFO, "Fund data processed for '\${header.$ISIN_HEADER_NAME}'")
//.log(LoggingLevel.INFO, "Fund data processed for '\${header.$ISIN_HEADER_NAME}'")
.doCatch(Exception::class.java)
.log(LoggingLevel.ERROR, "An error occurred during the processing : \${exception.message}")
.transform().simple("\${exception.message}")
.end()

from(RATE_DATA_ROUTE_NAME).doTry()
.log(LoggingLevel.INFO, "Processing rate data for '\${header.$ISIN_HEADER_NAME}'")
from(RATE_DATA_ROUTE_NAME).routeId("rate-data").doTry()
//.log(LoggingLevel.INFO, "Processing rate data for '\${header.$ISIN_HEADER_NAME}'")
.process(RedisProcessor.saveRateData())
.log(LoggingLevel.INFO, "Rate data processed for '\${header.$ISIN_HEADER_NAME}'")
//.log(LoggingLevel.INFO, "Rate data processed for '\${header.$ISIN_HEADER_NAME}'")
.doCatch(Exception::class.java)
.log(LoggingLevel.ERROR, "An error occurred during the processing : \${exception.message}")
.transform().simple("\${exception.message}")
.end()

from("direct:end-csv-processing")
from("direct:end-csv-processing").routeId("end-csv-processing")
.filter { exchange -> exchange.message.headers[START_DATE_HEADER_NAME] != null && exchange.message.headers[END_DATE_HEADER_NAME] != null }
.log("All CSV processed for (\${header.$START_DATE_HEADER_NAME}-\${header.$END_DATE_HEADER_NAME})")
.process(RedisProcessor.saveMetaData())
}
}
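
Because the removed and added lines are interleaved above, here is the reworked grab-data route reassembled as one skeleton. It is a sketch of how the chain now reads inside configure(): the form-body parameters, log statements, download and CSV unmarshalling steps from the hunk are elided.

// One exchange per ISIN, with the start/end headers computed per ISIN before each download.
from("direct:grab-data").routeId("grab-data")
    .onCompletion().to("direct:end-csv-processing").end()
    .to("https://www.bamosz.hu/egyes-alapok-kivalasztasa")
    .convertBodyTo(String::class.java, "UTF-8")
    .process(ISINProcessor.extractISINList())
    .split().tokenize("\n")                                  // no more isinGroupSize batching
    .convertBodyTo(String::class.java, "UTF-8")
    .filter { it.message.body as String != "HU0000346663" }  // this ISIN is skipped explicitly (as in the hunk above)
    .process(ISINProcessor.setISINIntervalHeaderValues())    // per-ISIN window from Redis
    .filter(ISINProcessor.isValidIntervalValues())           // drops ISINs that are already up to date
    .setHeader("Content-Type", constant("application/x-www-form-urlencoded"))
    // ... form body, CSV download, charset conversion and unmarshalling as in the hunk above ...
    .process(CsvProcessor.splitVertically())
    .split().body()
    .process(CsvProcessor.splitHorizontallyAndSend())
    .end()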
@@ -2,6 +2,7 @@ package com.github.pozo.investmentfunds.api.grabber.processors

import com.github.pozo.investmentfunds.api.grabber.InvestmentFundsRoutes.Companion.FUND_DATA_ROUTE_NAME
import com.github.pozo.investmentfunds.api.grabber.InvestmentFundsRoutes.Companion.RATE_DATA_ROUTE_NAME
import com.github.pozo.investmentfunds.api.grabber.processors.ISINProcessor.ISIN_LIST_HEADER_NAME
import com.github.pozo.investmentfunds.domain.FundHeaders
import org.apache.camel.Exchange
import org.slf4j.LoggerFactory
@@ -30,7 +31,7 @@ object CsvProcessor {
val firstHeaderLine = csvLines.first()
val entries = mutableMapOf<Pair<Int, Int>, VerticalPiece>()

logger.info("The CSV contains '${csvLines.size}' number of entries, and '${firstHeaderLine.size}' header columns")
logger.info("'${exchange.`in`.headers[ISIN_LIST_HEADER_NAME]}' The CSV contains '${csvLinesWithMetaRow.size}' number of lines, and '${firstHeaderLine.size}' number of columns")

val verticalIndexes: MutableList<Int> =
IntStream.range(1, firstHeaderLine.size) // skipping first "label" column
@@ -81,6 +82,9 @@ object CsvProcessor {
val isin = fundData[fundHeaders.indexOf(FundHeaders.ISIN.field)]
val sanitizedIsin = Regex("\\HU[0-9]{9,10}\\b").find(isin)?.value ?: isin

logger.info("'$isin' The CSV chunk contains '${fundHeaders.size}' number of fund headers, and '${fundData.size}' number of fund data")
logger.info("'$isin' The CSV chunk contains '${rateHeaders.size}' number of rate headers, and '${rateData.size}' number of rate data")

exchange.context.createProducerTemplate()
.sendBodyAndHeader(
FUND_DATA_ROUTE_NAME,
@@ -1,8 +1,10 @@
package com.github.pozo.investmentfunds.api.grabber.processors

import com.github.pozo.investmentfunds.api.redis.RedisService
import com.github.pozo.investmentfunds.domain.DataFlowConstants
import org.apache.camel.Exchange
import org.slf4j.LoggerFactory
import redis.clients.jedis.resps.Tuple
import java.text.SimpleDateFormat
import java.util.*
import java.util.regex.Matcher
@@ -15,11 +17,13 @@ object ISINProcessor {

private const val ISIN_PATTERN = "HU[0-9]{10}"

const val ISIN_LIST_HEADER_NAME = "isin-list"
const val ISIN_LIST_HEADER_NAME = "isin"
const val START_DATE_HEADER_NAME = "start-date"
const val END_DATE_HEADER_NAME = "end-date"

private val format = SimpleDateFormat(DataFlowConstants.GRAB_DATA_COMMAND_DATE_FORMAT.field)
private val format = SimpleDateFormat(DataFlowConstants.DOMAIN_DATE_FORMAT.field)

private val redisEntryFormat = SimpleDateFormat(DataFlowConstants.RATES_KEY_DATE_FORMAT.field)

fun extractISINList(): (exchange: Exchange) -> Unit = { exchange ->
val htmlContent: String = exchange.message.body as String
@@ -36,27 +40,19 @@
exchange.message.body = isinSet.joinToString(separator = "\n")
}

fun setISINListHeaderValue(): (exchange: Exchange) -> Unit = { exchange ->
val message = exchange.getIn().body as String
exchange.message.setHeader(ISIN_LIST_HEADER_NAME, message.replace('\n', ','))
}

fun isValidIntervalValues(): (exchange: Exchange) -> Boolean {
return { exchange ->
val message: String = exchange.message.body as String

try {
val (start, end) = message.trim().split(DataFlowConstants.GRAB_DATA_COMMAND_SEPARATOR.field)
val startDate = format.parse(start)
val endDate = format.parse(end)
logger.info("The extracted start date is '$startDate' and end date is '$endDate'")
val startDate = format.parse(exchange.message.headers[START_DATE_HEADER_NAME] as String)
val endDate = format.parse(exchange.message.headers[END_DATE_HEADER_NAME] as String)
logger.info("'${exchange.`in`.headers[ISIN_LIST_HEADER_NAME]}' The extracted start date is '$startDate' and end date is '$endDate'")

!startDate.before(format.parse(DataFlowConstants.START_YEAR_DATE.field))
&& !startDate.after(endDate)
&& !endDate.after(Date())
&& !isSameDay(startDate, endDate)
} catch (e: Exception) {
logger.error("The given interval values are not correct '$message'", e)
logger.error("'${exchange.`in`.headers[ISIN_LIST_HEADER_NAME]}' The given interval values are not correct '${exchange.message}'", e)
false
}
}
@@ -74,12 +70,37 @@
}

fun setISINIntervalHeaderValues(): (exchange: Exchange) -> Unit = { exchange ->
val message: String = exchange.message.body as String
val (startDate, endDate) = message.trim().split(DataFlowConstants.GRAB_DATA_COMMAND_SEPARATOR.field)
val isin: String = exchange.message.body as String
val mostRecentEntry = getMostRecentEntryFor(isin)

if(mostRecentEntry == null) {
logger.info("'$isin' There is no entry for this ISIN in redis")
exchange.message.setHeader(ISIN_LIST_HEADER_NAME, isin)
exchange.message.setHeader(START_DATE_HEADER_NAME, DataFlowConstants.START_YEAR_DATE.field)
exchange.message.setHeader(END_DATE_HEADER_NAME, format.format(Date()))
} else {
val mostRecentEntryDate = redisEntryFormat.parse(extractDate(mostRecentEntry.element))
logger.info("'$isin' The last entry date for this ISIN is '$mostRecentEntryDate'")
exchange.message.setHeader(ISIN_LIST_HEADER_NAME, isin)
exchange.message.setHeader(START_DATE_HEADER_NAME, format.format(addOneDay(mostRecentEntryDate)))
exchange.message.setHeader(END_DATE_HEADER_NAME, format.format(Date()))
}
}

private fun addOneDay(date: Date): Date {
val calendar = Calendar.getInstance()
calendar.time = date
calendar.add(Calendar.DAY_OF_YEAR, 1)
return calendar.time
}

private fun getMostRecentEntryFor(isin: String): Tuple? {
return RedisService.jedis.zrevrangeByScoreWithScores("rate:keys#$isin", "+inf", "-inf", 0, 1)
.firstOrNull()
}

logger.info("The extracted start date is '$startDate' and end date is '$endDate'")
exchange.message.setHeader(START_DATE_HEADER_NAME, startDate)
exchange.message.setHeader(END_DATE_HEADER_NAME, endDate)
private fun extractDate(input: String): String? {
return input.split("#").takeIf { it.size > 1 }?.get(1)
}

}
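
The validity filter now runs against the per-ISIN headers instead of a body string. The four checks, pulled out here as a plain function for illustration — the isSameDay helper is outside the diff, so its body below is an assumption.

import java.text.SimpleDateFormat
import java.util.Calendar
import java.util.Date

private val format = SimpleDateFormat("yyyy.MM.dd")
private val earliest: Date = format.parse("1992.01.01")  // START_YEAR_DATE, no data before this

private fun isSameDay(a: Date, b: Date): Boolean {
    val c1 = Calendar.getInstance().apply { time = a }
    val c2 = Calendar.getInstance().apply { time = b }
    return c1.get(Calendar.YEAR) == c2.get(Calendar.YEAR) &&
        c1.get(Calendar.DAY_OF_YEAR) == c2.get(Calendar.DAY_OF_YEAR)
}

fun isValidWindow(start: String, end: String): Boolean {
    val startDate = format.parse(start)
    val endDate = format.parse(end)
    return !startDate.before(earliest) &&     // not before the first year with data
        !startDate.after(endDate) &&          // start <= end
        !endDate.after(Date()) &&             // end is not in the future
        !isSameDay(startDate, endDate)        // an up-to-date ISIN yields start == end, so it is skipped
}

// e.g. isValidWindow("2024.07.10", "2024.07.13") == true
//      isValidWindow("2024.07.13", "2024.07.13") == false  -> nothing new to download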
@@ -1,22 +1,18 @@
package com.github.pozo.investmentfunds.api.grabber.processors

import com.github.pozo.investmentfunds.api.grabber.processors.CsvProcessor.ISIN_HEADER_NAME
import com.github.pozo.investmentfunds.api.grabber.processors.ISINProcessor.END_DATE_HEADER_NAME
import com.github.pozo.investmentfunds.domain.DataFlowConstants
import com.github.pozo.investmentfunds.domain.FundHeaders
import com.github.pozo.investmentfunds.domain.RateHeaders
import com.github.pozo.investmentfunds.domain.RedisHashKey
import org.apache.camel.Exchange
import org.slf4j.LoggerFactory
import redis.clients.jedis.JedisPooled

object RedisProcessor {

private val jedis = JedisPooled("investmentfunds-redis", 6379)
private val logger = LoggerFactory.getLogger(RedisProcessor::class.java)

fun saveMetaData(): (exchange: Exchange) -> Unit = { exchange ->
val periodEnd = exchange.message.getHeader(END_DATE_HEADER_NAME, String::class.java)
jedis.set(DataFlowConstants.GRAB_DATA_LATEST_DATE_KEY.field, periodEnd)
}
private val jedis = JedisPooled("investmentfunds-redis", 6379)

fun saveFundData(): (exchange: Exchange) -> Unit = { exchange ->
val body = exchange.getIn().getBody(Pair::class.java) as Pair<List<String>, List<String>>
@@ -30,6 +26,7 @@
.associate { it.name.lowercase() to data[header.indexOf(it.field)] }

jedis.hset("fund#$isin", keyValuePairs)
logger.info("'$isin' Saved '${keyValuePairs.size}' number of fund entries")
}

fun saveRateData(): (exchange: Exchange) -> Unit = { exchange ->
@@ -38,6 +35,7 @@
val isin = exchange.message.getHeader(ISIN_HEADER_NAME, String::class.java)
val header = body.first
val data = body.second
var numberOfSavedEntries = 0

// filter non empty fields
jedis.pipelined().use { pipeline ->
@@ -55,8 +53,10 @@
RedisHashKey.calculateScore(entry[header.indexOf(RateHeaders.DATE.field)]),
rateKey
)
numberOfSavedEntries++
}
pipeline.sync()
}
logger.info("'$isin' Saved '${numberOfSavedEntries}' number of rate entries")
}
}
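
For reference, the shape of the pipelined write that the new counter and log line wrap around — a generic sketch with a placeholder key layout and score calculation; the real ones come from RedisHashKey and the RateHeaders/FundHeaders enums, which are outside the diff.

import redis.clients.jedis.JedisPooled

// rows: date ("yyyy/MM/dd") -> field map for that day (placeholder structure)
fun saveRates(jedis: JedisPooled, isin: String, rows: Map<String, Map<String, String>>) {
    var saved = 0
    jedis.pipelined().use { pipeline ->
        rows.forEach { (date, fields) ->
            val rateKey = "rate:$isin#$date"                 // placeholder key layout
            pipeline.hset(rateKey, fields)                    // store the day's rate fields
            pipeline.zadd("rate:keys#$isin", date.replace("/", "").toDouble(), rateKey)  // placeholder score
            saved++
        }
        pipeline.sync()                                       // flush the whole batch in one round trip
    }
    println("'$isin' Saved '$saved' number of rate entries")
}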
@@ -4,9 +4,6 @@ enum class DataFlowConstants(val field: String) {
START_YEAR("1992"),// there is no data before this date
START_YEAR_DATE("1992.01.01"),

GRAB_DATA_COMMAND_DATE_FORMAT("yyyy.MM.dd"),
GRAB_DATA_COMMAND_SEPARATOR(","),
GRAB_DATA_LATEST_DATE_KEY("meta#last-successful-grabbing-ending-date"),

DOMAIN_DATE_FORMAT("yyyy.MM.dd"),
RATES_KEY_DATE_FORMAT("yyyy/MM/dd")
}
4 changes: 1 addition & 3 deletions src/main/resources/application.properties
@@ -1,4 +1,2 @@
server.error.whitelabel.enabled=false
springdoc.api-docs.path=/

processors.isin-group-size=10
springdoc.api-docs.path=/
