Merge pull request #40 from networknt/refactor
Refactor the polling CDC connector for Oracle, as requested by a user.
GavinChenYan authored Dec 15, 2017
2 parents 1ae8df0 + f19f732 commit f4a497a
Showing 22 changed files with 770 additions and 27 deletions.
CdcConfig.java
@@ -18,6 +18,11 @@ public class CdcConfig {
String leadershipLockPath;
String oldDbHistoryTopicName;

int maxEventsPerPolling;
int maxAttemptsForPolling;
int pollingRetryIntervalInMilliseconds;
int pollingIntervalInMilliseconds;

public CdcConfig() {
binlogClientId = System.currentTimeMillis();
}
@@ -147,4 +152,36 @@ public String getOldDbHistoryTopicName() {
public void setOldDbHistoryTopicName(String oldDbHistoryTopicName) {
this.oldDbHistoryTopicName = oldDbHistoryTopicName;
}

public int getMaxEventsPerPolling() {
return maxEventsPerPolling;
}

public void setMaxEventsPerPolling(int maxEventsPerPolling) {
this.maxEventsPerPolling = maxEventsPerPolling;
}

public int getMaxAttemptsForPolling() {
return maxAttemptsForPolling;
}

public void setMaxAttemptsForPolling(int maxAttemptsForPolling) {
this.maxAttemptsForPolling = maxAttemptsForPolling;
}

public int getPollingRetryIntervalInMilliseconds() {
return pollingRetryIntervalInMilliseconds;
}

public void setPollingRetryIntervalInMilliseconds(int pollingRetryIntervalInMilliseconds) {
this.pollingRetryIntervalInMilliseconds = pollingRetryIntervalInMilliseconds;
}

public int getPollingIntervalInMilliseconds() {
return pollingIntervalInMilliseconds;
}

public void setPollingIntervalInMilliseconds(int pollingIntervalInMilliseconds) {
this.pollingIntervalInMilliseconds = pollingIntervalInMilliseconds;
}
}
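
The four new settings above drive the polling path added by this commit (PollingDao, PollingCdcProcessor, PollingCdcKafkaPublisher). As an illustration only, not part of the diff, here is a minimal sketch of reading them through the light-4j Config module, using the same "cdc" lookup that PollingCdcServiceInitializer performs; the class name PollingConfigExample is hypothetical, and CdcConfig is assumed to live under the com.networknt.eventuate.server.common wildcard imported by the initializer.

import com.networknt.config.Config;
import com.networknt.eventuate.server.common.*;  // assumed package for CdcConfig, matching the initializer's import

public class PollingConfigExample {
    public static void main(String[] args) {
        // Same lookup used by PollingCdcServiceInitializer: map the "cdc" config section onto CdcConfig.
        CdcConfig cdcConfig = (CdcConfig) Config.getInstance().getJsonObjectConfig("cdc", CdcConfig.class);

        // The four polling knobs added in the hunk above, as supplied by the cdc config file.
        System.out.println("maxEventsPerPolling = " + cdcConfig.getMaxEventsPerPolling());
        System.out.println("maxAttemptsForPolling = " + cdcConfig.getMaxAttemptsForPolling());
        System.out.println("pollingRetryIntervalInMilliseconds = " + cdcConfig.getPollingRetryIntervalInMilliseconds());
        System.out.println("pollingIntervalInMilliseconds = " + cdcConfig.getPollingIntervalInMilliseconds());
    }
}
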
6 changes: 2 additions & 4 deletions eventuate-cdc-connector-polling/pom.xml
@@ -83,7 +83,6 @@
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
@@ -110,11 +109,10 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

</project>
AbstractPollingCdcProcessorT.java
Expand Up @@ -7,7 +7,7 @@

public abstract class AbstractPollingCdcProcessorT extends CdcProcessorTest {

private PollingDao<PublishedEventBean, PublishedEvent, String> pollingDao = SingletonServiceFactory.getBean(PollingDao.class);
private PollingDao pollingDao = SingletonServiceFactory.getBean(PollingDao.class);

@Override
protected CdcProcessor<PublishedEvent> createCdcProcessor() {
PollingCdcServiceInitializer.java
@@ -0,0 +1,81 @@
package com.networknt.eventuate.cdc.polling;

import com.networknt.config.Config;
import com.networknt.eventuate.jdbc.EventuateSchema;
import com.networknt.eventuate.kafka.KafkaConfig;
import com.networknt.eventuate.kafka.producer.EventuateKafkaProducer;
import com.networknt.eventuate.server.common.*;
import com.networknt.service.SingletonServiceFactory;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import javax.sql.DataSource;


public class PollingCdcServiceInitializer {

public static String CDC_CONFIG_NAME = "cdc";
public static CdcConfig cdcConfig = (CdcConfig) Config.getInstance().getJsonObjectConfig(CDC_CONFIG_NAME, CdcConfig.class);
public static String KAFKA_CONFIG_NAME = "kafka";
public static KafkaConfig kafkaConfig = (KafkaConfig) Config.getInstance().getJsonObjectConfig(KAFKA_CONFIG_NAME, KafkaConfig.class);

public EventuateSchema eventuateSchema() {
return new EventuateSchema();
}



public PollingDao pollingDao() {
DataSource ds = (DataSource) SingletonServiceFactory.getBean(DataSource.class);
EventPollingDataProvider pollingDataProvider= (EventPollingDataProvider) SingletonServiceFactory.getBean(EventPollingDataProvider.class);

return new PollingDao (pollingDataProvider, ds,
cdcConfig.getMaxEventsPerPolling(),
cdcConfig.getMaxAttemptsForPolling(),
cdcConfig.getPollingRetryIntervalInMilliseconds());
}

public EventuateKafkaProducer eventuateKafkaProducer() {
return new EventuateKafkaProducer();
}



public CdcProcessor<PublishedEvent> pollingCdcProcessor() {
PollingDao pollingDao = SingletonServiceFactory.getBean(PollingDao.class);
return new PollingCdcProcessor<>(pollingDao, cdcConfig.getPollingIntervalInMilliseconds());
}

public CdcKafkaPublisher<PublishedEvent> pollingCdcKafkaPublisher() {
PublishingStrategy<PublishedEvent> publishingStrategy = SingletonServiceFactory.getBean(PublishingStrategy.class);
return new PollingCdcKafkaPublisher<>( kafkaConfig.getBootstrapServers(), publishingStrategy);
}

public CuratorFramework curatorFramework() {
String connectionString = cdcConfig.getZookeeper();
return makeStartedCuratorClient(connectionString);
}

public EventTableChangesToAggregateTopicTranslator<PublishedEvent> pollingEventTableChangesToAggregateTopicTranslator() {
CdcKafkaPublisher<PublishedEvent> mySQLCdcKafkaPublisher = SingletonServiceFactory.getBean(CdcKafkaPublisher.class);
CdcProcessor<PublishedEvent> mySQLCdcProcessor = SingletonServiceFactory.getBean(CdcProcessor.class);
CuratorFramework curatorFramework = SingletonServiceFactory.getBean(CuratorFramework.class);

return new EventTableChangesToAggregateTopicTranslator<>(mySQLCdcKafkaPublisher,
mySQLCdcProcessor,
curatorFramework,
cdcConfig);
}

static CuratorFramework makeStartedCuratorClient(String connectionString) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.
builder().retryPolicy(retryPolicy)
.connectString(connectionString)
.build();
client.start();
return client;
}
}
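
For orientation only, not part of the diff: once the factory methods above are registered with SingletonServiceFactory (via service.yml, as with the other beans), startup code would resolve the translator and run it. A rough sketch under two assumptions: the class name PollingCdcStartupSketch is hypothetical, and the translator is assumed to keep the start() lifecycle method of the upstream Eventuate implementation.

import com.networknt.eventuate.server.common.EventTableChangesToAggregateTopicTranslator;  // assumed package
import com.networknt.eventuate.server.common.PublishedEvent;
import com.networknt.service.SingletonServiceFactory;

public class PollingCdcStartupSketch {
    public static void main(String[] args) {
        // Resolve the translator produced by pollingEventTableChangesToAggregateTopicTranslator().
        EventTableChangesToAggregateTopicTranslator<PublishedEvent> translator =
                SingletonServiceFactory.getBean(EventTableChangesToAggregateTopicTranslator.class);

        // start() is assumed here: it would take ZooKeeper leadership via Curator, then begin
        // polling events through PollingDao and publishing them to Kafka.
        translator.start();
    }
}
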
PollingDao.java
@@ -1,13 +1,15 @@
package com.networknt.eventuate.cdc.polling;

import com.google.common.collect.ImmutableMap;
import com.networknt.eventuate.server.common.PublishedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
@@ -16,13 +18,13 @@
public class PollingDao<EVENT_BEAN, EVENT, ID> {
private Logger logger = LoggerFactory.getLogger(getClass());

private PollingDataProvider<EVENT_BEAN, EVENT, ID> pollingDataParser;
private EventPollingDataProvider pollingDataParser;
private DataSource dataSource;
private int maxEventsPerPolling;
private int maxAttemptsForPolling;
private int pollingRetryIntervalInMilliseconds;

public PollingDao(PollingDataProvider<EVENT_BEAN, EVENT, ID> pollingDataParser,
public PollingDao(EventPollingDataProvider pollingDataParser,
DataSource dataSource,
int maxEventsPerPolling,
int maxAttemptsForPolling,
@@ -47,28 +49,62 @@ public void setMaxEventsPerPolling(int maxEventsPerPolling) {
this.maxEventsPerPolling = maxEventsPerPolling;
}

public List<EVENT> findEventsToPublish() {
/*
String query = String.format("SELECT * FROM %s WHERE %s = 0 ORDER BY %s ASC LIMIT :limit",
public List<PublishedEvent> findEventsToPublish() {

String query = String.format("SELECT * FROM %s WHERE %s = 0 and ROWNUM <= ? ORDER BY %s ASC",
pollingDataParser.table(), pollingDataParser.publishedField(), pollingDataParser.idField());

List<EVENT_BEAN> messageBeans = handleConnectionLost(() -> namedParameterJdbcTemplate.query(query,
ImmutableMap.of("limit", maxEventsPerPolling), new BeanPropertyRowMapper(pollingDataParser.eventBeanClass())));
List<PublishedEventBean> messageBeans = handleConnectionLost(() -> handleFindQuery(query, maxEventsPerPolling));

return messageBeans.stream().map(pollingDataParser::transformEventBeanToEvent).collect(Collectors.toList());
*/
return null;

}

public void markEventsAsPublished(List<EVENT> events) {
private List<PublishedEventBean> handleFindQuery(String query, int maxEventsPerPolling) {
logger.info("cdc polling query:" + query);
System.out.println("cdc polling query:" + query);


List<ID> ids = events.stream().map(message -> pollingDataParser.getId(message)).collect(Collectors.toList());
List<PublishedEventBean> events = new ArrayList<>();
try (final Connection connection = dataSource.getConnection()) {
PreparedStatement stmt = connection.prepareStatement(query);
stmt.setInt(1, maxEventsPerPolling);
ResultSet rs = stmt.executeQuery();

String query = String.format("UPDATE %s SET %s = 1 WHERE %s in (:ids)",
pollingDataParser.table(), pollingDataParser.publishedField(), pollingDataParser.idField());
query.replaceAll(":ids", "%s");
query = String.format(query, preparePlaceHolders(ids.size()));
while (rs.next()) {
PublishedEventBean publishedEventBean = new PublishedEventBean(rs.getString("event_id"), rs.getString("event_type"),
rs.getString("event_data"), rs.getString("entity_type"), rs.getString("entity_id"),
rs.getString("triggering_event"), rs.getString("metadata"));
events.add(publishedEventBean);
}
} catch (SQLException e) {
logger.error("SqlException:", e);
}
return events;
}

//handleConnectionLost(() -> namedParameterJdbcTemplate.update(query, ImmutableMap.of("ids", ids)));
public void markEventsAsPublished(List<PublishedEvent> events) {

List<String> ids = events.stream().map(message -> pollingDataParser.getId(message)).collect(Collectors.toList());

String query = String.format("UPDATE %s SET %s = 1 WHERE %s in (%s)",
pollingDataParser.table(), pollingDataParser.publishedField(), pollingDataParser.idField(), preparePlaceHolders(ids.size()));

handleConnectionLost(() -> handleUpdatePublished(query, ids));
}

private int handleUpdatePublished (String query, List<String> ids) {
logger.info("mark Events As Published query:" + query);
int count = 0;
try (final Connection connection = dataSource.getConnection()) {
PreparedStatement stmt = connection.prepareStatement(query);
setValues(stmt, ids.toArray());
count = stmt.executeUpdate();
System.out.println("result:" + count);
} catch (SQLException e) {
logger.error("SqlException:", e);
}
return count;
}

private <T> T handleConnectionLost(Callable<T> query) {
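
markEventsAsPublished and handleUpdatePublished rely on two helpers, preparePlaceHolders and setValues, whose definitions fall outside the visible part of this hunk. The sketch below is an assumption about their shape (class name PollingDaoHelpersSketch and the method bodies are illustrative, not the committed code): one placeholder per event id for the IN clause, and positional binding of each id.

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;

// Hypothetical sketch of the helpers referenced above; the committed implementations are not shown in this diff.
class PollingDaoHelpersSketch {

    // Builds "?, ?, ?" so the UPDATE ... WHERE id IN (...) statement gets one placeholder per event id.
    static String preparePlaceHolders(int length) {
        return String.join(", ", Collections.nCopies(length, "?"));
    }

    // Binds each id to its positional placeholder; JDBC parameters are 1-indexed.
    static void setValues(PreparedStatement stmt, Object... values) throws SQLException {
        for (int i = 0; i < values.length; i++) {
            stmt.setObject(i + 1, values[i]);
        }
    }
}
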
PollingDaoTest.java
@@ -0,0 +1,82 @@
package com.networknt.eventute.cdc.polling;


import com.networknt.eventuate.cdc.polling.EventPollingDataProvider;
import com.networknt.eventuate.cdc.polling.PollingDao;

import com.networknt.eventuate.server.common.PublishedEvent;
import com.networknt.service.SingletonServiceFactory;
import org.h2.tools.RunScript;
import org.junit.BeforeClass;
import org.junit.Test;

import javax.sql.DataSource;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertTrue;


/**
* Junit test class for MessageProducerJdbcImpl.
* use H2 test database for data source
*/
public class PollingDaoTest {

public static DataSource ds;

static {
ds = (DataSource) SingletonServiceFactory.getBean(DataSource.class);
try (Connection connection = ds.getConnection()) {
// Runscript doesn't work need to execute batch here.
String schemaResourceName = "/eventuate_sourcing_ddl.sql";
InputStream in = PollingDaoTest.class.getResourceAsStream(schemaResourceName);

if (in == null) {
throw new RuntimeException("Failed to load resource: " + schemaResourceName);
}
InputStreamReader reader = new InputStreamReader(in);
RunScript.execute(connection, reader);

} catch (SQLException e) {
e.printStackTrace();
}
}

private EventPollingDataProvider pollingDataProvider= (EventPollingDataProvider) SingletonServiceFactory.getBean(EventPollingDataProvider.class);
PollingDao pollingDao = new PollingDao(pollingDataProvider, ds, 5,5, 100);


@BeforeClass
public static void setUp() {

}

@Test
public void testDao() {
List<PublishedEvent> result= pollingDao.findEventsToPublish();
assertTrue(result.size()>0);

}

@Test
public void testDao2() {

List<PublishedEvent> events = new ArrayList<>();
PublishedEvent event1 = new PublishedEvent();
event1.setId("111");
PublishedEvent event2 = new PublishedEvent();
event2.setId("222");
events.add(event1);
events.add(event2);
pollingDao.markEventsAsPublished(events);


}
}
@@ -0,0 +1,10 @@
---
singletons:
- javax.sql.DataSource:
- com.zaxxer.hikari.HikariDataSource:
DriverClassName: org.h2.jdbcx.JdbcDataSource
jdbcUrl: jdbc:h2:~/test
username: sa
password: sa
- com.networknt.eventuate.cdc.polling.EventPollingDataProvider:
- com.networknt.eventuate.cdc.polling.EventPollingDataProvider