diff --git a/geomesa-datastore-bundle/geomesa-datastore-processors/src/main/scala/org/geomesa/nifi/datastore/processor/UpdateGeoMesaRecord.scala b/geomesa-datastore-bundle/geomesa-datastore-processors/src/main/scala/org/geomesa/nifi/datastore/processor/UpdateGeoMesaRecord.scala
index abe83392..16c6a671 100644
--- a/geomesa-datastore-bundle/geomesa-datastore-processors/src/main/scala/org/geomesa/nifi/datastore/processor/UpdateGeoMesaRecord.scala
+++ b/geomesa-datastore-bundle/geomesa-datastore-processors/src/main/scala/org/geomesa/nifi/datastore/processor/UpdateGeoMesaRecord.scala
@@ -8,6 +8,7 @@
 
 package org.geomesa.nifi.datastore.processor
 
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement
 import org.apache.nifi.annotation.behavior.{InputRequirement, SupportsBatching, WritesAttribute, WritesAttributes}
 import org.apache.nifi.annotation.documentation.{CapabilityDescription, Tags}
@@ -19,22 +20,23 @@ import org.apache.nifi.processor.util.StandardValidators
 import org.apache.nifi.processor.{ProcessContext, ProcessSession}
 import org.apache.nifi.serialization.RecordReaderFactory
 import org.apache.nifi.serialization.record.Record
-import org.geomesa.nifi.datastore.processor.UpdateGeoMesaRecord.{AttributeFilter, FidFilter}
+import org.geomesa.nifi.datastore.processor.UpdateGeoMesaRecord.{AttributeFilter, FidFilter, SchemaCache}
 import org.geomesa.nifi.datastore.processor.mixins.DataStoreProcessor
 import org.geomesa.nifi.datastore.processor.records.{GeometryEncoding, OptionExtractor, SimpleFeatureRecordConverter}
 import org.geomesa.nifi.datastore.services.DataStoreService
 import org.geotools.data.{DataStore, DataUtilities, Transaction}
 import org.geotools.util.factory.Hints
 import org.locationtech.geomesa.filter.filterToString
+import org.locationtech.geomesa.utils.concurrent.ExitingExecutor
 import org.locationtech.geomesa.utils.io.{CloseWithLogging, WithClose}
-import org.opengis.feature.simple.SimpleFeature
+import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
 import org.opengis.filter.Filter
 
 import java.io.InputStream
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{LinkedBlockingQueue, ScheduledThreadPoolExecutor, TimeUnit}
 
 import scala.annotation.tailrec
-import scala.util.Try
 import scala.util.control.NonFatal
+import scala.util.{Failure, Success, Try}
 
 @Tags(Array("geomesa", "geo", "update", "records", "geotools"))
@@ -59,6 +61,7 @@ class UpdateGeoMesaRecord extends DataStoreProcessor {
   private var service: DataStoreService = _
   private var factory: RecordReaderFactory = _
   private var options: OptionExtractor = _
+  private var schemas: SchemaCache = _
 
   private val stores = new LinkedBlockingQueue[DataStore]()
 
@@ -70,6 +73,7 @@ class UpdateGeoMesaRecord extends DataStoreProcessor {
     logger.info("Initializing")
 
     service = getDataStoreService(context)
+    schemas = new SchemaCache(service)
     factory = context.getProperty(RecordReader).asControllerService(classOf[RecordReaderFactory])
     options = OptionExtractor(context, GeometryEncoding.Wkt)
 
@@ -83,10 +87,6 @@ class UpdateGeoMesaRecord extends DataStoreProcessor {
     }
 
     val fullFlowFileName = fullName(file)
-    val ds = stores.poll() match {
-      case null => service.loadDataStore()
-      case ds => ds
-    }
     try {
       logger.debug(s"Running ${getClass.getName} on file $fullFlowFileName")
 
@@ -102,7 +102,7 @@ class UpdateGeoMesaRecord extends DataStoreProcessor {
 
           val converter = SimpleFeatureRecordConverter(reader.getSchema, opts)
           val typeName = converter.sft.getTypeName
-          val existing = Try(ds.getSchema(typeName)).getOrElse(null)
+          val existing = schemas.getSchema(typeName)
           if (existing == null) {
             throw new IllegalStateException(s"Schema '$typeName' does not exist in the data store")
           }
@@ -129,42 +129,72 @@ class UpdateGeoMesaRecord extends DataStoreProcessor {
             nextRecord
           }
 
-          var record = nextRecord
-          while (record != null) {
-            try {
-              val sf = converter.convert(record)
-              val filter = filterFactory(sf)
+          var ds = stores.poll() match {
+            case null => service.loadDataStore()
+            case ds => ds
+          }
+
+          try {
+            val records = Iterator.continually(nextRecord).takeWhile(_ != null)
+            val features = records.flatMap { record =>
               try {
-                WithClose(ds.getFeatureWriter(typeName, filter, Transaction.AUTO_COMMIT)) { writer =>
-                  if (!writer.hasNext) {
-                    logger.warn(s"Filter does not match any features, skipping update: ${filterToString(filter)}")
-                    failure += 1L
-                  } else {
-                    do {
-                      val toWrite = writer.next()
-                      names.foreach(n => toWrite.setAttribute(n, sf.getAttribute(n)))
-                      if (opts.fidField.isDefined) {
-                        toWrite.getUserData.put(Hints.PROVIDED_FID, sf.getID)
-                      }
-                      if (opts.visField.isDefined) {
-                        sf.visibility.foreach(toWrite.visibility = _)
-                      }
-                      writer.write()
-                      success += 1L
-                    } while (writer.hasNext)
-                  }
-                }
+                val sf = converter.convert(record)
+                val filter = filterFactory(sf)
+                if (ds == null) {
+                  failure += 1L
+                  Iterator.empty
+                } else {
+                  Iterator.single(sf -> filter)
+                }
               } catch {
                 case NonFatal(e) =>
                   failure += 1L
-                  logger.error(s"Error writing feature to store: '${DataUtilities.encodeFeature(sf)}'", e)
+                  logger.error(s"Error converting record to feature: '${record.toMap.asScala.mkString(",")}'", e)
+                  Iterator.empty
               }
-            } catch {
-              case NonFatal(e) =>
-                failure += 1L
-                logger.error(s"Error converting record to feature: '${record.toMap.asScala.mkString(",")}'", e)
             }
-            record = nextRecord
+
+            features.foreach { case (sf, filter) =>
+              Try(ds.getFeatureWriter(typeName, filter, Transaction.AUTO_COMMIT)) match {
+                case Failure(e) =>
+                  // if we can't get the writer, that usually means the datastore has become invalid
+                  failure += 1L
+                  logger.error(s"Error getting feature writer:", e)
+                  service.dispose(ds)
+                  ds = null // this will filter out and fail any remaining records in our iterator, above
+
+                case Success(writer) =>
+                  try {
+                    if (!writer.hasNext) {
+                      logger.warn(s"Filter does not match any features, skipping update: ${filterToString(filter)}")
+                      failure += 1L
+                    } else {
+                      do {
+                        val toWrite = writer.next()
+                        names.foreach(n => toWrite.setAttribute(n, sf.getAttribute(n)))
+                        if (opts.fidField.isDefined) {
+                          toWrite.getUserData.put(Hints.PROVIDED_FID, sf.getID)
+                        }
+                        if (opts.visField.isDefined) {
+                          sf.visibility.foreach(toWrite.visibility = _)
+                        }
+                        writer.write()
+                        success += 1L
+                      } while (writer.hasNext)
+                    }
+                  } catch {
+                    case NonFatal(e) =>
+                      failure += 1L
+                      logger.error(s"Error writing feature to store: '${DataUtilities.encodeFeature(sf)}'", e)
+                  } finally {
+                    CloseWithLogging(writer)
+                  }
+              }
+            }
+          } finally {
+            if (ds != null) {
+              stores.put(ds)
+            }
           }
         }
       }
@@ -180,8 +210,6 @@ class UpdateGeoMesaRecord extends DataStoreProcessor {
       case NonFatal(e) =>
         logger.error(s"Error processing file $fullFlowFileName:", e)
         session.transfer(file, Relationships.FailureRelationship)
-    } finally {
-      stores.put(ds)
     }
   }
 
@@ -226,6 +254,30 @@ object UpdateGeoMesaRecord {
     override def apply(f: SimpleFeature): Filter = ff.equals(prop, ff.literal(f.getAttribute(name)))
   }
 
+  private class SchemaCache(service: DataStoreService) {
+
+    private val refresher = ExitingExecutor(new ScheduledThreadPoolExecutor(1))
+
+    private val schemaCheckCache =
+      Caffeine.newBuilder().build[String, SimpleFeatureType](
+        new CacheLoader[String, SimpleFeatureType]() {
+          override def load(typeName: String): SimpleFeatureType = {
+            // we schedule a refresh, vs using the built-in Caffeine refresh which will only refresh after a
+            // request. If there are not very many requests, then the deferred value will always be out of date
+            refresher.schedule(new Runnable() { override def run(): Unit = refresh(typeName) }, 1, TimeUnit.HOURS)
+            val store = service.loadDataStore()
+            try { Try(store.getSchema(typeName)).getOrElse(null) } finally {
+              service.dispose(store)
+            }
+          }
+        }
+      )
+
+    private def refresh(typeName: String): Unit = schemaCheckCache.refresh(typeName)
+
+    def getSchema(typeName: String): SimpleFeatureType = schemaCheckCache.get(typeName)
+  }
+
   object Properties {
 
     val LookupCol: PropertyDescriptor = new PropertyDescriptor.Builder()
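
Note on the caching approach: the `SchemaCache` added above schedules an unconditional reload of each schema entry, instead of relying on Caffeine's built-in refresh-after-write, which only reloads an entry the next time it is requested. Below is a minimal, standalone sketch of that pattern; the `loadSchema` function, the one-minute interval, and the plain `Executors` pool are placeholders for illustration only (the processor itself loads the schema through `DataStoreService.loadDataStore()`/`dispose()` and wraps its executor in `ExitingExecutor` so it does not block JVM shutdown).

```scala
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object ScheduledRefreshCache {

  // single thread used only to trigger cache refreshes
  private val refresher: ScheduledExecutorService = Executors.newScheduledThreadPool(1)

  // placeholder for the expensive lookup (in the processor this is DataStore.getSchema)
  private def loadSchema(typeName: String): String = s"schema-of-$typeName"

  private val cache: LoadingCache[String, String] =
    Caffeine.newBuilder().build[String, String](
      new CacheLoader[String, String]() {
        override def load(typeName: String): String = {
          // schedule a reload regardless of future access, so the cached value stays
          // bounded-stale even when the entry is rarely requested
          refresher.schedule(
            new Runnable() { override def run(): Unit = cache.refresh(typeName) },
            1, TimeUnit.MINUTES)
          loadSchema(typeName)
        }
      }
    )

  def get(typeName: String): String = cache.get(typeName)
}
```

Each reload re-schedules the next one, so an entry keeps refreshing on a fixed cadence after its first lookup, which is the behavior the comment in the diff describes.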