GEOMESA-3287 Fix JDBC connection re-load for UpdateGeoMesaRecord
elahrvivaz committed Jul 27, 2023
1 parent 03d995b commit 61c2b44
Showing 1 changed file with 92 additions and 40 deletions.
@@ -8,6 +8,7 @@

package org.geomesa.nifi.datastore.processor

import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement
import org.apache.nifi.annotation.behavior.{InputRequirement, SupportsBatching, WritesAttribute, WritesAttributes}
import org.apache.nifi.annotation.documentation.{CapabilityDescription, Tags}
@@ -19,22 +20,23 @@ import org.apache.nifi.processor.util.StandardValidators
import org.apache.nifi.processor.{ProcessContext, ProcessSession}
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.record.Record
import org.geomesa.nifi.datastore.processor.UpdateGeoMesaRecord.{AttributeFilter, FidFilter}
import org.geomesa.nifi.datastore.processor.UpdateGeoMesaRecord.{AttributeFilter, FidFilter, SchemaCache}
import org.geomesa.nifi.datastore.processor.mixins.DataStoreProcessor
import org.geomesa.nifi.datastore.processor.records.{GeometryEncoding, OptionExtractor, SimpleFeatureRecordConverter}
import org.geomesa.nifi.datastore.services.DataStoreService
import org.geotools.data.{DataStore, DataUtilities, Transaction}
import org.geotools.util.factory.Hints
import org.locationtech.geomesa.filter.filterToString
import org.locationtech.geomesa.utils.concurrent.ExitingExecutor
import org.locationtech.geomesa.utils.io.{CloseWithLogging, WithClose}
import org.opengis.feature.simple.SimpleFeature
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.opengis.filter.Filter

import java.io.InputStream
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.{LinkedBlockingQueue, ScheduledThreadPoolExecutor, TimeUnit}
import scala.annotation.tailrec
import scala.util.Try
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}


@Tags(Array("geomesa", "geo", "update", "records", "geotools"))
@@ -59,6 +61,7 @@ class UpdateGeoMesaRecord extends DataStoreProcessor {
private var service: DataStoreService = _
private var factory: RecordReaderFactory = _
private var options: OptionExtractor = _
private var schemas: SchemaCache = _

private val stores = new LinkedBlockingQueue[DataStore]()

@@ -70,6 +73,7 @@ class UpdateGeoMesaRecord extends DataStoreProcessor {
logger.info("Initializing")

service = getDataStoreService(context)
schemas = new SchemaCache(service)
factory = context.getProperty(RecordReader).asControllerService(classOf[RecordReaderFactory])
options = OptionExtractor(context, GeometryEncoding.Wkt)

@@ -83,10 +87,6 @@
}

val fullFlowFileName = fullName(file)
val ds = stores.poll() match {
case null => service.loadDataStore()
case ds => ds
}
try {
logger.debug(s"Running ${getClass.getName} on file $fullFlowFileName")

@@ -102,7 +102,7 @@
val converter = SimpleFeatureRecordConverter(reader.getSchema, opts)
val typeName = converter.sft.getTypeName

val existing = Try(ds.getSchema(typeName)).getOrElse(null)
val existing = schemas.getSchema(typeName)
if (existing == null) {
throw new IllegalStateException(s"Schema '$typeName' does not exist in the data store")
}
@@ -129,42 +129,72 @@
nextRecord
}

var record = nextRecord
while (record != null) {
try {
val sf = converter.convert(record)
val filter = filterFactory(sf)
var ds = stores.poll() match {
case null => service.loadDataStore()
case ds => ds
}

try {
val records = Iterator.continually(nextRecord).takeWhile(_ != null)
val features = records.flatMap { record =>
try {
WithClose(ds.getFeatureWriter(typeName, filter, Transaction.AUTO_COMMIT)) { writer =>
if (!writer.hasNext) {
logger.warn(s"Filter does not match any features, skipping update: ${filterToString(filter)}")
failure += 1L
} else {
do {
val toWrite = writer.next()
names.foreach(n => toWrite.setAttribute(n, sf.getAttribute(n)))
if (opts.fidField.isDefined) {
toWrite.getUserData.put(Hints.PROVIDED_FID, sf.getID)
}
if (opts.visField.isDefined) {
sf.visibility.foreach(toWrite.visibility = _)
}
writer.write()
success += 1L
} while (writer.hasNext)
}
val sf = converter.convert(record)
val filter = filterFactory(sf)
if (ds == null) {
failure += 1L
Iterator.empty
} else {
Iterator.single(sf -> filter)
}
} catch {
case NonFatal(e) =>
failure += 1L
logger.error(s"Error writing feature to store: '${DataUtilities.encodeFeature(sf)}'", e)
logger.error(s"Error converting record to feature: '${record.toMap.asScala.mkString(",")}'", e)
Iterator.empty
}
} catch {
case NonFatal(e) =>
failure += 1L
logger.error(s"Error converting record to feature: '${record.toMap.asScala.mkString(",")}'", e)
}
record = nextRecord

features.foreach { case (sf, filter) =>
Try(ds.getFeatureWriter(typeName, filter, Transaction.AUTO_COMMIT)) match {
case Failure(e) =>
// if we can't get the writer, that usually means the datastore has become invalid
failure += 1L
logger.error(s"Error getting feature writer:", e)
service.dispose(ds)
ds = null // this will filter out and fail any remaining records in our iterator, above

case Success(writer) =>
try {
if (!writer.hasNext) {
logger.warn(s"Filter does not match any features, skipping update: ${filterToString(filter)}")
failure += 1L
} else {
do {
val toWrite = writer.next()
names.foreach(n => toWrite.setAttribute(n, sf.getAttribute(n)))
if (opts.fidField.isDefined) {
toWrite.getUserData.put(Hints.PROVIDED_FID, sf.getID)
}
if (opts.visField.isDefined) {
sf.visibility.foreach(toWrite.visibility = _)
}
writer.write()
success += 1L
} while (writer.hasNext)
}
} catch {
case NonFatal(e) =>
failure += 1L
logger.error(s"Error writing feature to store: '${DataUtilities.encodeFeature(sf)}'", e)
} finally {
CloseWithLogging(writer)
}
}
}
} finally {
if (ds != null) {
stores.put(ds)
}
}
}
}
@@ -180,8 +210,6 @@ class UpdateGeoMesaRecord extends DataStoreProcessor {
case NonFatal(e) =>
logger.error(s"Error processing file $fullFlowFileName:", e)
session.transfer(file, Relationships.FailureRelationship)
} finally {
stores.put(ds)
}
}
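
The change above moves the DataStore borrow inside the try block that surrounds the per-record loop, and disposes of the store as soon as a feature writer cannot be obtained instead of always returning it to the pool. Below is a minimal, self-contained sketch of that borrow/return/dispose pattern; the PooledService and Store traits and the string records are hypothetical stand-ins for DataStoreService and the GeoTools types, so this illustrates the idea rather than the actual processor code.

import java.util.concurrent.LinkedBlockingQueue

import scala.util.control.NonFatal

// hypothetical stand-ins for DataStoreService and the GeoTools data store
trait Store { def write(record: String): Unit }
trait PooledService {
  def loadDataStore(): Store          // may return null if a connection cannot be established
  def dispose(store: Store): Unit
}

class BorrowReturnExample(service: PooledService) {

  // idle stores waiting to be reused, as in the processor
  private val stores = new LinkedBlockingQueue[Store]()

  def process(records: Iterator[String]): Unit = {
    // borrow from the pool, or load a fresh store if the pool is empty
    var ds = stores.poll() match {
      case null => service.loadDataStore()
      case s    => s
    }
    try {
      records.foreach { record =>
        if (ds == null) {
          // a previous failure invalidated the store - fail the remaining records
        } else {
          try { ds.write(record) } catch {
            case NonFatal(e) =>
              // a failure here usually means the underlying connection has gone stale:
              // dispose of the store instead of returning it, so the next use reconnects
              service.dispose(ds)
              ds = null
          }
        }
      }
    } finally {
      // only a store that is still healthy goes back into the pool
      if (ds != null) {
        stores.put(ds)
      }
    }
  }
}

The important difference from the previous version is the null check in the finally block: a store that failed is disposed and never re-enters the pool, so a stale JDBC connection cannot be handed to the next flow file.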

@@ -226,6 +254,30 @@ object UpdateGeoMesaRecord {
override def apply(f: SimpleFeature): Filter = ff.equals(prop, ff.literal(f.getAttribute(name)))
}

private class SchemaCache(service: DataStoreService) {

private val refresher = ExitingExecutor(new ScheduledThreadPoolExecutor(1))

private val schemaCheckCache =
Caffeine.newBuilder().build[String, SimpleFeatureType](
new CacheLoader[String, SimpleFeatureType]() {
override def load(typeName: String): SimpleFeatureType = {
// we schedule a refresh, vs using the built-in Caffeine refresh which will only refresh after a
// request. If there are not very many requests, then the deferred value will always be out of date
refresher.schedule(new Runnable() { override def run(): Unit = refresh(typeName) }, 1, TimeUnit.HOURS)
val store = service.loadDataStore()
try { Try(store.getSchema(typeName)).getOrElse(null) } finally {
service.dispose(store)
}
}
}
)

private def refresh(typeName: String): Unit = schemaCheckCache.refresh(typeName)

def getSchema(typeName: String): SimpleFeatureType = schemaCheckCache.get(typeName)
}
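
SchemaCache deliberately schedules its own reload instead of using Caffeine's refreshAfterWrite, which only reloads an entry on the next access after it has gone stale; with infrequent requests the cached schema would otherwise stay out of date. The following standalone sketch shows the same pattern with a hypothetical lookup function in place of DataStore.getSchema, and a plain ScheduledThreadPoolExecutor where the real code uses GeoMesa's ExitingExecutor wrapper.

import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}

import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

// lookup is a hypothetical stand-in for a slow, fallible call such as DataStore.getSchema
class ScheduledRefreshCache[V <: AnyRef](lookup: String => V, refreshHours: Long = 1) {

  // the real processor wraps this in ExitingExecutor so it does not keep the JVM alive
  private val refresher = new ScheduledThreadPoolExecutor(1)

  private val cache: LoadingCache[String, V] =
    Caffeine.newBuilder().build[String, V](
      new CacheLoader[String, V]() {
        override def load(key: String): V = {
          // schedule an unconditional reload so the value is refreshed even if nobody asks for it
          refresher.schedule(
            new Runnable { override def run(): Unit = refresh(key) },
            refreshHours, TimeUnit.HOURS)
          lookup(key)
        }
      }
    )

  private def refresh(key: String): Unit = cache.refresh(key)

  def get(key: String): V = cache.get(key)
}

In the processor, each load or reload runs against a freshly loaded data store that is disposed afterwards, so a schema check never pins a JDBC connection for the lifetime of the processor.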

object Properties {
val LookupCol: PropertyDescriptor =
new PropertyDescriptor.Builder()
