Skip to content

Commit

Permalink
Merge pull request #117 from SelfImpr001/master
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyou2 authored Jun 22, 2020
2 parents d657d19 + af7676b commit 8d85b81
Showing 1 changed file with 40 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,39 +80,62 @@ boolean updateMsgOffset(int jobId, Properties props, Logger log, String[] consum
boolean result = false;
String vNewMsgID = "-1";
PreparedStatement updatePstmt = null;
PreparedStatement pstmtForGetID = null;
Connection msgConn = null;
vNewMsgID = setConsumedMsg(props,log,consumedMsgInfo);
try {
if(StringUtils.isNotEmpty(vNewMsgID) && StringUtils.isNotBlank(vNewMsgID) && !"-1".equals(vNewMsgID)){
msgConn = getEventCheckerConnection(props,log);
if(msgConn == null) return false;
int vProcessID = jobId;
String vReceiveTime = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");;
String sqlForUpdateMsg = "INSERT INTO event_status(receiver,topic,msg_name,receive_time,msg_id) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE receive_time=VALUES(receive_time),msg_id= CASE WHEN msg_id= " + lastMsgId + " THEN VALUES(msg_id) ELSE msg_id END";
log.info("last message offset {} is:" + lastMsgId);
updatePstmt = msgConn.prepareCall(sqlForUpdateMsg);
updatePstmt.setString(1, receiver);
updatePstmt.setString(2, topic);
updatePstmt.setString(3, msgName);
updatePstmt.setString(4, vReceiveTime);
updatePstmt.setString(5, vNewMsgID);
int updaters = updatePstmt.executeUpdate();
log.info("updateMsgOffset successful {} update result is:" + updaters);
if(updaters != 0){
log.info("Received message successfully , update message status succeeded, consumed flow execution ID: " + vProcessID);
//return true after update success
result = true;
msgConn.setAutoCommit(false);
String sqlForReadMsgID = "SELECT msg_id FROM event_status WHERE receiver=? AND topic=? AND msg_name=? for update";
pstmtForGetID = msgConn.prepareCall(sqlForReadMsgID);
pstmtForGetID.setString(1, receiver);
pstmtForGetID.setString(2, topic);
pstmtForGetID.setString(3, msgName);
ResultSet rs = pstmtForGetID.executeQuery();
String nowLastMsgId = rs.last()==true ? rs.getString("msg_id"):"0";
log.info("receive message successfully , Now check to see if the latest offset has changed ,nowLastMsgId is {} " + nowLastMsgId);
if("0".equals(nowLastMsgId) || nowLastMsgId.equals(lastMsgId)){

int vProcessID = jobId;
String vReceiveTime = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");;
String sqlForUpdateMsg = "INSERT INTO event_status(receiver,topic,msg_name,receive_time,msg_id) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE receive_time=VALUES(receive_time),msg_id= CASE WHEN msg_id= " + lastMsgId + " THEN VALUES(msg_id) ELSE msg_id END";
log.info("last message offset {} is:" + lastMsgId);
updatePstmt = msgConn.prepareCall(sqlForUpdateMsg);
updatePstmt.setString(1, receiver);
updatePstmt.setString(2, topic);
updatePstmt.setString(3, msgName);
updatePstmt.setString(4, vReceiveTime);
updatePstmt.setString(5, vNewMsgID);
int updaters = updatePstmt.executeUpdate();
log.info("updateMsgOffset successful {} update result is:" + updaters);
if(updaters != 0){
log.info("Received message successfully , update message status succeeded, consumed flow execution ID: " + vProcessID);
//return true after update success
result = true;
}else{
log.info("Received message successfully , update message status failed, consumed flow execution ID: " + vProcessID);
result = false;
}
}else{
log.info("Received message successfully , update message status failed, consumed flow execution ID: " + vProcessID);
log.info("the latest offset has changed , Keep waiting for the signal");
result = false;
}
msgConn.commit();
}else{
result = false;
}
}catch (SQLException e){
log.error("Error update Msg Offset" + e);
try {
msgConn.rollback();
} catch (SQLException ex) {
log.error("transaction rollback failed " + e);
}
return false;
}finally {
closeQueryStmt(pstmtForGetID, log);
closeQueryStmt(updatePstmt, log);
closeConnection(msgConn, log);
}
Expand Down

0 comments on commit 8d85b81

Please sign in to comment.