GEOMESA-3287 Fix JDBC connection re-load
* This allows NiFi to recover automatically if a PostGIS database goes down and subsequently recovers
* DataStores are re-loaded whenever a "core" operation (like getting a feature writer) throws an exception
elahrvivaz committed Jul 26, 2023
1 parent bc40ca5 commit 03d995b
Showing 14 changed files with 315 additions and 225 deletions.
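
Before the per-file diffs, a note on the shape of the change: the hunks below swap the raw GeoTools DataStore for the project's DataStoreService, which owns store lifecycles through loadDataStore() and dispose(). As a rough illustration of the recovery behavior the commit message describes (dispose a store when a core operation fails, then load a fresh one), here is a minimal sketch; only loadDataStore/dispose come from the diff, while ReloadingStore and withStore are invented names, not code from this commit:

```scala
import org.geomesa.nifi.datastore.services.DataStoreService
import org.geotools.data.DataStore

// Hypothetical sketch, not code from this commit: wraps a store so that a
// failed operation disposes the current instance and loads a fresh one,
// letting a later retry succeed once the database is back up.
class ReloadingStore(service: DataStoreService) {

  @volatile private var store: DataStore = service.loadDataStore()

  def withStore[T](op: DataStore => T): T = {
    try { op(store) } catch {
      case e: Exception =>
        service.dispose(store)          // drop the (possibly broken) connection
        store = service.loadDataStore() // re-load for the next attempt
        throw e                         // still surface the original failure
    }
  }
}
```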
AvroToPutGeoMesa.scala:

```diff
@@ -19,7 +19,7 @@ import org.apache.nifi.processor.io.InputStreamCallback
 import org.apache.nifi.processor.util.StandardValidators
 import org.geomesa.nifi.datastore.processor.CompatibilityMode.CompatibilityMode
 import org.geomesa.nifi.datastore.processor.mixins.{DataStoreIngestProcessor, FeatureTypeProcessor, FeatureWriters}
-import org.geotools.data.DataStore
+import org.geomesa.nifi.datastore.services.DataStoreService
 import org.geotools.util.Converters
 import org.geotools.util.factory.Hints
 import org.locationtech.geomesa.features.ScalaSimpleFeature
@@ -57,27 +57,29 @@ class AvroToPutGeoMesa extends DataStoreIngestProcessor with FeatureTypeProcesso
   // noinspection ScalaDeprecation
   override protected def createIngest(
       context: ProcessContext,
-      dataStore: DataStore,
+      service: DataStoreService,
       writers: FeatureWriters,
       mode: CompatibilityMode): IngestProcessor = {
     val useProvidedFid = context.getProperty(UseProvidedFid).getValue.toBoolean
-    new AvroIngest(dataStore, writers, mode, useProvidedFid)
+    new AvroIngest(service, writers, mode, useProvidedFid)
   }
 
   /**
    * GeoAvro ingest
    *
-   * @param store data store
+   * @param service data store service
    * @param writers feature writers
    * @param mode field match mode
    * @param useProvidedFid use provided fid
    */
   class AvroIngest(
-      store: DataStore,
+      service: DataStoreService,
       writers: FeatureWriters,
       mode: CompatibilityMode,
       useProvidedFid: Boolean
-    ) extends IngestProcessor(store, writers, mode) {
+    ) extends IngestProcessor(service, writers, mode) {
+
+    private val store = service.loadDataStore()
 
     override def ingest(
         context: ProcessContext,
@@ -136,6 +138,12 @@ class AvroToPutGeoMesa extends DataStoreIngestProcessor with FeatureTypeProcesso
       IngestResult(success, failure)
     }
 
+    override def close(): Unit = {
+      try { service.dispose(store) } finally {
+        super.close()
+      }
+    }
+
     private def checkSchemaAndMapping(
         sft: SimpleFeatureType,
         shouldUpdate: Boolean): Option[SimpleFeature => SimpleFeature] = {
```
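
Note the lifecycle in the AvroIngest hunks above: the store is loaded from the service once at construction, and close() returns it through service.dispose rather than closing it directly, so the service remains the sole owner of every store it hands out. A minimal sketch of that contract (StoreLease is an invented name):

```scala
import org.geomesa.nifi.datastore.services.DataStoreService

// Invented helper illustrating the acquire/dispose contract from the diff:
// callers never close a store themselves; the service decides its fate.
class StoreLease(service: DataStoreService) extends AutoCloseable {
  val store = service.loadDataStore()                 // acquired up front
  override def close(): Unit = service.dispose(store) // returned, not closed
}
```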
PutGeoMesa.scala:

```diff
@@ -19,7 +19,7 @@ import org.apache.nifi.processor._
 import org.geomesa.nifi.datastore.processor.CompatibilityMode.CompatibilityMode
 import org.geomesa.nifi.datastore.processor.mixins.ConvertInputProcessor.ConverterCallback
 import org.geomesa.nifi.datastore.processor.mixins.{ConvertInputProcessor, DataStoreIngestProcessor, FeatureWriters}
-import org.geotools.data._
+import org.geomesa.nifi.datastore.services.DataStoreService
 import org.locationtech.geomesa.utils.geotools.{SftArgResolver, SftArgs}
 import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
 
@@ -50,10 +50,10 @@ class PutGeoMesa extends DataStoreIngestProcessor with ConvertInputProcessor {
 
   override protected def createIngest(
       context: ProcessContext,
-      dataStore: DataStore,
+      service: DataStoreService,
       writers: FeatureWriters,
       mode: CompatibilityMode): IngestProcessor = {
-    val ingest = new ConverterIngest(dataStore, writers, mode)
+    val ingest = new ConverterIngest(service, writers, mode)
     // due to validation, should be all Rights
     ingest.init(PutGeoMesa.initSchemas(context).map { case Right(sft) => sft })
     ingest
@@ -65,12 +65,12 @@
 /**
  * Converter ingest
  *
- * @param store data store
+ * @param service data store service
  * @param writers feature writers
  * @param mode schema compatibility mode
  */
-class ConverterIngest(store: DataStore, writers: FeatureWriters, mode: CompatibilityMode)
-    extends IngestProcessor(store, writers, mode) {
+class ConverterIngest(service: DataStoreService, writers: FeatureWriters, mode: CompatibilityMode)
+    extends IngestProcessor(service, writers, mode) {
 
   def init(sfts: Seq[SimpleFeatureType]): Unit = sfts.foreach(checkSchema)
 
```
PutGeoMesaRecord.scala:

```diff
@@ -22,6 +22,7 @@ import org.geomesa.nifi.datastore.processor.PutGeoMesaRecord.CountHolder
 import org.geomesa.nifi.datastore.processor.mixins.{DataStoreIngestProcessor, FeatureWriters}
 import org.geomesa.nifi.datastore.processor.records.Properties._
 import org.geomesa.nifi.datastore.processor.records.{GeometryEncoding, OptionExtractor, SimpleFeatureRecordConverter}
+import org.geomesa.nifi.datastore.services.DataStoreService
 import org.geotools.data._
 import org.locationtech.geomesa.utils.io.WithClose
 
@@ -47,29 +48,29 @@ class PutGeoMesaRecord extends DataStoreIngestProcessor {
 
   override protected def createIngest(
       context: ProcessContext,
-      dataStore: DataStore,
+      service: DataStoreService,
       writers: FeatureWriters,
       mode: CompatibilityMode): IngestProcessor = {
     val factory = context.getProperty(RecordReader).asControllerService(classOf[RecordReaderFactory])
     val options = OptionExtractor(context, GeometryEncoding.Wkt)
-    new RecordIngest(dataStore, writers, factory, options, mode)
+    new RecordIngest(service, writers, factory, options, mode)
   }
 
   /**
    * Record based ingest
    *
-   * @param store data store
+   * @param service data store service
    * @param writers feature writers
    * @param recordReaderFactory record reader factory
    * @param options converter options
    */
   class RecordIngest(
-      store: DataStore,
+      service: DataStoreService,
       writers: FeatureWriters,
       recordReaderFactory: RecordReaderFactory,
       options: OptionExtractor,
       mode: CompatibilityMode
-    ) extends IngestProcessor(store, writers, mode) {
+    ) extends IngestProcessor(service, writers, mode) {
 
     import scala.collection.JavaConverters._
 
```
UpdateGeoMesaRecord.scala:

```diff
@@ -22,6 +22,7 @@ import org.apache.nifi.serialization.record.Record
 import org.geomesa.nifi.datastore.processor.UpdateGeoMesaRecord.{AttributeFilter, FidFilter}
 import org.geomesa.nifi.datastore.processor.mixins.DataStoreProcessor
 import org.geomesa.nifi.datastore.processor.records.{GeometryEncoding, OptionExtractor, SimpleFeatureRecordConverter}
+import org.geomesa.nifi.datastore.services.DataStoreService
 import org.geotools.data.{DataStore, DataUtilities, Transaction}
 import org.geotools.util.factory.Hints
 import org.locationtech.geomesa.filter.filterToString
@@ -30,6 +31,7 @@ import org.opengis.feature.simple.SimpleFeature
 import org.opengis.filter.Filter
 
 import java.io.InputStream
+import java.util.concurrent.LinkedBlockingQueue
 import scala.annotation.tailrec
 import scala.util.Try
 import scala.util.control.NonFatal
@@ -54,27 +56,24 @@ class UpdateGeoMesaRecord extends DataStoreProcessor {
   import scala.collection.JavaConverters._
 
   @volatile
-  private var ds: DataStore = _
+  private var service: DataStoreService = _
   private var factory: RecordReaderFactory = _
   private var options: OptionExtractor = _
 
+  private val stores = new LinkedBlockingQueue[DataStore]()
+
   override protected def getPrimaryProperties: Seq[PropertyDescriptor] =
     super.getPrimaryProperties ++ UpdateGeoMesaRecord.Props
 
   @OnScheduled
   def initialize(context: ProcessContext): Unit = {
     logger.info("Initializing")
 
-    ds = loadDataStore(context)
-
-    try {
-      factory = context.getProperty(RecordReader).asControllerService(classOf[RecordReaderFactory])
-      options = OptionExtractor(context, GeometryEncoding.Wkt)
-    } catch {
-      case NonFatal(e) => CloseWithLogging(ds); ds = null; throw e
-    }
+    service = getDataStoreService(context)
+    factory = context.getProperty(RecordReader).asControllerService(classOf[RecordReaderFactory])
+    options = OptionExtractor(context, GeometryEncoding.Wkt)
 
-    logger.info(s"Initialized datastore ${ds.getClass.getSimpleName}")
+    logger.info(s"Initialized DataStoreService ${service.getClass.getSimpleName}")
   }
 
   override def onTrigger(context: ProcessContext, session: ProcessSession): Unit = {
@@ -84,8 +83,13 @@
     }
 
     val fullFlowFileName = fullName(file)
+    val ds = stores.poll() match {
+      case null => service.loadDataStore()
+      case ds => ds
+    }
     try {
       logger.debug(s"Running ${getClass.getName} on file $fullFlowFileName")
+
       val opts = options(context, file.getAttributes)
       val id = context.getProperty(LookupCol).evaluateAttributeExpressions(file).getValue
       val filterFactory = if (opts.fidField.contains(id)) { FidFilter } else { new AttributeFilter(id) }
@@ -176,6 +180,8 @@
       case NonFatal(e) =>
         logger.error(s"Error processing file $fullFlowFileName:", e)
         session.transfer(file, Relationships.FailureRelationship)
+    } finally {
+      stores.put(ds)
     }
   }
 
@@ -185,10 +191,8 @@
   def cleanup(): Unit = {
     logger.info("Processor shutting down")
     val start = System.currentTimeMillis()
-    if (ds != null) {
-      CloseWithLogging(ds)
-      ds = null
-    }
+    stores.iterator().asScala.foreach(service.dispose)
+    stores.clear()
     logger.info(s"Shut down in ${System.currentTimeMillis() - start}ms")
   }
 }
```
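
UpdateGeoMesaRecord takes a different approach: rather than one shared DataStore, it keeps a LinkedBlockingQueue of stores, checks one out per flow file (loading a new one when the queue is empty), always checks it back in, and disposes everything at shutdown. A self-contained sketch of the same check-out/check-in pattern, assuming only the loadDataStore/dispose methods seen in the diff; StorePool and withStore are invented names:

```scala
import java.util.concurrent.LinkedBlockingQueue

import org.geomesa.nifi.datastore.services.DataStoreService
import org.geotools.data.DataStore

// Invented wrapper around the pooling pattern used in onTrigger/cleanup above
class StorePool(service: DataStoreService) extends AutoCloseable {

  private val stores = new LinkedBlockingQueue[DataStore]()

  def withStore[T](op: DataStore => T): T = {
    // take an idle store, or load a fresh one if none are available
    val ds = stores.poll() match {
      case null => service.loadDataStore()
      case ds => ds
    }
    try { op(ds) } finally { stores.put(ds) } // always check back in
  }

  override def close(): Unit = {
    import scala.collection.JavaConverters._
    stores.iterator().asScala.foreach(service.dispose) // dispose idle stores
    stores.clear()
  }
}
```

As in the commit, a store whose operation threw is still checked back in; recovery relies on stores being re-loaded when core operations fail, per the commit message.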
(Diffs for the remaining changed files are not shown here.)
