Skip to content

Commit

Permalink
Merge pull request #7 from gavares/issue_6
Browse files Browse the repository at this point in the history
fix issue #6: asyncReadHighestSequenceNr queries snapshot and journal tbl
  • Loading branch information
okumin committed Sep 25, 2015
2 parents 0087882 + cdebbac commit 4d084b8
Showing 1 changed file with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ private[sqlasync] trait ScalikeJDBCWriteJournal extends AsyncWriteJournal with A
SQLSyntax.createUnsafely(tableName)
}

// Used in asyncReadHighestSequenceNr query
private[this] lazy val snapshotsTable = {
val tableName = extension.config.snapshotTableName
SQLSyntaxSupportFeature.verifyTableName(tableName)
SQLSyntax.createUnsafely(tableName)
}


override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
log.debug("Write messages, {}", messages)
val batch = ListBuffer.empty[SQLSyntax]
Expand Down Expand Up @@ -70,7 +78,15 @@ private[sqlasync] trait ScalikeJDBCWriteJournal extends AsyncWriteJournal with A
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
log.debug("Read the highest sequence number, persistenceId = {}, fromSequenceNr = {}", persistenceId, fromSequenceNr)
sessionProvider.withPool { implicit session =>
val sql = sql"SELECT sequence_nr FROM $table WHERE persistence_id = $persistenceId ORDER BY sequence_nr DESC LIMIT 1"
val sql = sql"""
SELECT
coalesce(max(max_ids.seq_nr), 0)
FROM (
SELECT max(sequence_nr) seq_nr FROM $table WHERE persistence_id = $persistenceId
UNION
SELECT max(sequence_nr) seq_nr FROM $snapshotsTable WHERE persistence_id = $persistenceId
) AS max_ids
"""
log.debug("Execute {} binding persistence_id = {} and sequence_nr = {}", sql.statement, persistenceId, fromSequenceNr)
sql.map(_.longOpt(1)).single().future().map(_.flatten.getOrElse(0L))
}
Expand Down

0 comments on commit 4d084b8

Please sign in to comment.