Skip to content

Commit

Permalink
Wrap entire NLP initialization in a mutex lock to prevent race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
qqndrew committed Mar 7, 2022
1 parent 84898cd commit dff68df
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 64 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>org.ohnlp.medtagger</groupId>
<artifactId>medtagger</artifactId>
<version>1.0.35</version>
<version>1.0.36</version>
<description>The MedTagger biomedical information extraction pipeline</description>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static org.apache.uima.fit.factory.AnalysisEngineFactory.createEngineDescription;
Expand Down Expand Up @@ -73,6 +75,8 @@ public PCollection<Row> expand(PCollection<Row> input) {
}

private static class MedTaggerPipelineFunction extends DoFn<Row, Row> {
private transient static final ReentrantLock INIT_MUTEX_LOCK = new ReentrantLock();

private final String resourceFolder;
private final String textField;
private final RunMode mode;
Expand All @@ -92,77 +96,83 @@ public MedTaggerPipelineFunction(String textField, String resourceFolder, RunMod

@Setup
public void init() throws IOException, InvalidXMLException, URISyntaxException, ResourceInitializationException {
AggregateBuilder ae = new AggregateBuilder();
// Tokenization, Sentence Splitting, Section Detection, etc.
ae.add(createEngineDescription("desc.backbone.aes.PreConceptExtractionAE"));
// Add the appropriate NER/normalization component depending on run mode
URI uri = null;
switch (mode) {
case OHNLPTK_DEFINED: // Ruleset from a web service
throw new UnsupportedOperationException("Remote Served IE Rulesets not yet implemented");
case STANDALONE:
case STANDALONE_IE_ONLY: {
uri = MedTaggerPipelineFunction.class.getResource("/resources/" + this.resourceFolder).toURI();
Map<String, String> env = new HashMap<>();
env.put("create", "true");
try {
// Ensure it is created, ignore if not
FileSystem fs = FileSystems.newFileSystem(uri, env);
} catch (FileSystemAlreadyExistsException ignored) {
}
ae.add(createEngineDescription("org.ohnlp.medtagger.ie.aes.MedTaggerIEAnnotatorAE", "Resource_dir", uri.toString()));
break;
}
case STANDALONE_DICT_ONLY: {
uri = MedTaggerPipelineFunction.class.getResource("/resources/" + this.resourceFolder).toURI();
Map<String, String> env = new HashMap<>();
env.put("create", "true");
try {
// Ensure it is created, ignore if not
FileSystem fs = FileSystems.newFileSystem(uri, env);
} catch (FileSystemAlreadyExistsException ignored) {
try {
INIT_MUTEX_LOCK.lock();
AggregateBuilder ae = new AggregateBuilder();
// Tokenization, Sentence Splitting, Section Detection, etc.
ae.add(createEngineDescription("desc.backbone.aes.PreConceptExtractionAE"));
// Add the appropriate NER/normalization component depending on run mode
URI uri = null;
switch (mode) {
case OHNLPTK_DEFINED: // Ruleset from a web service
throw new UnsupportedOperationException("Remote Served IE Rulesets not yet implemented");
case STANDALONE:
case STANDALONE_IE_ONLY: {
uri = MedTaggerPipelineFunction.class.getResource("/resources/" + this.resourceFolder).toURI();
Map<String, String> env = new HashMap<>();
env.put("create", "true");
try {
// Ensure it is created, ignore if not
FileSystem fs = FileSystems.newFileSystem(uri, env);
} catch (FileSystemAlreadyExistsException ignored) {
}
ae.add(createEngineDescription("org.ohnlp.medtagger.ie.aes.MedTaggerIEAnnotatorAE", "Resource_dir", uri.toString()));
break;
}
ae.add(createEngineDescription(AhoCorasickLookupAnnotator.class, "dict_file", uri.toString()));
break;
}
case STANDALONE_DICT_AND_IE: {
String[] parsed = this.resourceFolder.split("\\|");
uri = MedTaggerPipelineFunction.class.getResource("/resources/" + parsed[0]).toURI();
URI dictURI = null;
if (parsed.length > 1) {
dictURI = MedTaggerPipelineFunction.class.getResource("/resources/" + parsed[1]).toURI();
case STANDALONE_DICT_ONLY: {
uri = MedTaggerPipelineFunction.class.getResource("/resources/" + this.resourceFolder).toURI();
Map<String, String> env = new HashMap<>();
env.put("create", "true");
try {
// Ensure it is created, ignore if not
FileSystem fs = FileSystems.newFileSystem(uri, env);
} catch (FileSystemAlreadyExistsException ignored) {
}
ae.add(createEngineDescription(AhoCorasickLookupAnnotator.class, "dict_file", uri.toString()));
break;
}
Map<String, String> env = new HashMap<>();
env.put("create", "true");
try {
// Ensure it is created, ignore if not
FileSystem fs = FileSystems.newFileSystem(uri, env);
} catch (FileSystemAlreadyExistsException ignored) {
case STANDALONE_DICT_AND_IE: {
String[] parsed = this.resourceFolder.split("\\|");
uri = MedTaggerPipelineFunction.class.getResource("/resources/" + parsed[0]).toURI();
URI dictURI = null;
if (parsed.length > 1) {
dictURI = MedTaggerPipelineFunction.class.getResource("/resources/" + parsed[1]).toURI();
}
Map<String, String> env = new HashMap<>();
env.put("create", "true");
try {
// Ensure it is created, ignore if not
FileSystem fs = FileSystems.newFileSystem(uri, env);
} catch (FileSystemAlreadyExistsException ignored) {
}
ae.add(createEngineDescription("org.ohnlp.medtagger.ie.aes.MedTaggerIEAnnotatorAE", "Resource_dir", uri.toString()));
if (dictURI != null) {
ae.add(createEngineDescription("desc.backbone.aes.MedTaggerDictionaryLookupAE", "dict_file", dictURI.toString()));
} else {
ae.add(createEngineDescription("desc.backbone.aes.MedTaggerDictionaryLookupAE"));
}
break;
}
ae.add(createEngineDescription("org.ohnlp.medtagger.ie.aes.MedTaggerIEAnnotatorAE", "Resource_dir", uri.toString()));
if (dictURI != null) {
ae.add(createEngineDescription("desc.backbone.aes.MedTaggerDictionaryLookupAE", "dict_file", dictURI.toString()));
} else {
case GENERAL_CLINICAL:
ae.add(createEngineDescription("desc.backbone.aes.MedTaggerDictionaryLookupAE"));
}
break;
break;
}

// Add Context handling
if (uri != null) {
ae.add(AnalysisEngineFactory.createEngineDescription(RuleContextAnnotator.class, "context_ruleset", uri.toString()));
} else {
ae.add(AnalysisEngineFactory.createEngineDescription(RuleContextAnnotator.class));
}
case GENERAL_CLINICAL:
ae.add(createEngineDescription("desc.backbone.aes.MedTaggerDictionaryLookupAE"));
break;
}

// Add Context handling
if (uri != null) {
ae.add(AnalysisEngineFactory.createEngineDescription(RuleContextAnnotator.class, "context_ruleset", uri.toString()));
} else {
ae.add(AnalysisEngineFactory.createEngineDescription(RuleContextAnnotator.class));
this.resMgr = ResourceManagerFactory.newResourceManager();
this.aae = UIMAFramework.produceAnalysisEngine(ae.createAggregateDescription(), resMgr, null);
this.cas = CasCreationUtils.createCas(Collections.singletonList(aae.getMetaData()),
null, resMgr);
} finally {
INIT_MUTEX_LOCK.unlock();
}

this.resMgr = ResourceManagerFactory.newResourceManager();
this.aae = UIMAFramework.produceAnalysisEngine(ae.createAggregateDescription(), resMgr, null);
this.cas = CasCreationUtils.createCas(Collections.singletonList(aae.getMetaData()),
null, resMgr);
}

@ProcessElement
Expand Down

0 comments on commit dff68df

Please sign in to comment.