From 8b1fbac769cc1ff6a5e98930326463403e8ab992 Mon Sep 17 00:00:00 2001 From: Alistair Bush Date: Fri, 21 Jun 2013 17:16:48 +1200 Subject: [PATCH 1/2] Add ssl socket factory --- .../river/mongodb/MongoDBRiver.java | 34 ++++++++++++++++--- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java index 7448000c..3d00439f 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java +++ b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java @@ -34,6 +34,8 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import javax.net.ssl.SSLSocketFactory; + import org.bson.BasicBSONObject; import org.bson.types.BSONTimestamp; import org.bson.types.ObjectId; @@ -77,6 +79,7 @@ import com.mongodb.Mongo; import com.mongodb.MongoClient; import com.mongodb.MongoClientOptions; +import com.mongodb.MongoClientOptions.Builder; import com.mongodb.MongoException; import com.mongodb.MongoInterruptedException; import com.mongodb.QueryOperators; @@ -104,6 +107,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River { public final static String PORT_FIELD = "port"; public final static String OPTIONS_FIELD = "options"; public final static String SECONDARY_READ_PREFERENCE_FIELD = "secondary_read_preference"; + public final static String SSL_CONNECTION_FIELD = "ssl"; public final static String DROP_COLLECTION_FIELD = "drop_collection"; public final static String EXCLUDE_FIELDS_FIELD = "exclude_fields"; public final static String FILTER_FIELD = "filter"; @@ -164,6 +168,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River { protected final String mongoLocalPassword; protected final String mongoOplogNamespace; protected final boolean mongoSecondaryReadPreference; + protected final boolean mongoUseSSL; protected final String indexName; protected final String typeName; @@ -250,7 +255,9 @@ public MongoDBRiver(final RiverName riverName, .get(SECONDARY_READ_PREFERENCE_FIELD), false); dropCollection = XContentMapValues.nodeBooleanValue( mongoOptionsSettings.get(DROP_COLLECTION_FIELD), false); - + mongoUseSSL = XContentMapValues.nodeBooleanValue( + mongoOptionsSettings.get(SSL_CONNECTION_FIELD), false); + if (mongoOptionsSettings.containsKey(EXCLUDE_FIELDS_FIELD)) { excludeFields = new HashSet(); Object excludeFieldsSettings = mongoOptionsSettings @@ -274,6 +281,7 @@ public MongoDBRiver(final RiverName riverName, mongoSecondaryReadPreference = false; dropCollection = false; excludeFields = null; + mongoUseSSL = false; } // Credentials @@ -377,6 +385,7 @@ public MongoDBRiver(final RiverName riverName, script = null; dropCollection = false; excludeFields = null; + mongoUseSSL = false; } mongoOplogNamespace = mongoDb + "." + mongoCollection; @@ -555,10 +564,16 @@ && getAdminDb().isAuthenticated()) { private Mongo getMongoClient() { if (mongo == null) { - // TODO: MongoClientOptions should be configurable - MongoClientOptions mco = MongoClientOptions.builder() + + Builder builder = MongoClientOptions.builder() .autoConnectRetry(true).connectTimeout(15000) - .socketTimeout(60000).build(); + .socketKeepAlive(true).socketTimeout(60000); + if (mongoUseSSL){ + builder.socketFactory(SSLSocketFactory.getDefault()); + } + + // TODO: MongoClientOptions should be configurable + MongoClientOptions mco = builder.build(); mongo = new MongoClient(mongoServers, mco); } return mongo; @@ -963,7 +978,16 @@ private boolean assignCollections() { @Override public void run() { - mongo = new MongoClient(mongoServers); + Builder builder = MongoClientOptions.builder() + .autoConnectRetry(true).connectTimeout(15000) + .socketKeepAlive(true).socketTimeout(60000); + if (mongoUseSSL){ + builder.socketFactory(SSLSocketFactory.getDefault()); + } + + // TODO: MongoClientOptions should be configurable + MongoClientOptions mco = builder.build(); + mongo = new MongoClient(mongoServers, mco); if (mongoSecondaryReadPreference) { mongo.setReadPreference(ReadPreference.secondaryPreferred()); From c57bf9bbec9500bd0574f7f658df73d0281a3d63 Mon Sep 17 00:00:00 2001 From: Alistair Bush Date: Sun, 23 Jun 2013 21:05:56 +1200 Subject: [PATCH 2/2] add ability to ignore ssl certificate errors. --- .../river/mongodb/MongoDBRiver.java | 55 ++++++++++++++++++- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java index 3d00439f..94b02ef8 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java +++ b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java @@ -24,6 +24,10 @@ import java.io.IOException; import java.net.UnknownHostException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -34,7 +38,11 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import javax.net.SocketFactory; +import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; import org.bson.BasicBSONObject; import org.bson.types.BSONTimestamp; @@ -108,6 +116,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River { public final static String OPTIONS_FIELD = "options"; public final static String SECONDARY_READ_PREFERENCE_FIELD = "secondary_read_preference"; public final static String SSL_CONNECTION_FIELD = "ssl"; + public final static String SSL_VERIFY_CERT_FIELD = "sslverifycertificate"; public final static String DROP_COLLECTION_FIELD = "drop_collection"; public final static String EXCLUDE_FIELDS_FIELD = "exclude_fields"; public final static String FILTER_FIELD = "filter"; @@ -169,6 +178,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River { protected final String mongoOplogNamespace; protected final boolean mongoSecondaryReadPreference; protected final boolean mongoUseSSL; + protected final boolean mongoSSLVerifyCertificate; protected final String indexName; protected final String typeName; @@ -187,6 +197,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River { // private final TransferQueue> stream = new // LinkedTransferQueue>(); private final BlockingQueue> stream; + private SocketFactory sslSocketFactory; private Mongo mongo; private DB adminDb; @@ -257,6 +268,8 @@ public MongoDBRiver(final RiverName riverName, mongoOptionsSettings.get(DROP_COLLECTION_FIELD), false); mongoUseSSL = XContentMapValues.nodeBooleanValue( mongoOptionsSettings.get(SSL_CONNECTION_FIELD), false); + mongoSSLVerifyCertificate = XContentMapValues.nodeBooleanValue( + mongoOptionsSettings.get(SSL_VERIFY_CERT_FIELD), true); if (mongoOptionsSettings.containsKey(EXCLUDE_FIELDS_FIELD)) { excludeFields = new HashSet(); @@ -282,6 +295,7 @@ public MongoDBRiver(final RiverName riverName, dropCollection = false; excludeFields = null; mongoUseSSL = false; + mongoSSLVerifyCertificate = false; } // Credentials @@ -386,6 +400,7 @@ public MongoDBRiver(final RiverName riverName, dropCollection = false; excludeFields = null; mongoUseSSL = false; + mongoSSLVerifyCertificate = false; } mongoOplogNamespace = mongoDb + "." + mongoCollection; @@ -569,7 +584,7 @@ private Mongo getMongoClient() { .autoConnectRetry(true).connectTimeout(15000) .socketKeepAlive(true).socketTimeout(60000); if (mongoUseSSL){ - builder.socketFactory(SSLSocketFactory.getDefault()); + builder.socketFactory(getSSLSocketFactory()); } // TODO: MongoClientOptions should be configurable @@ -610,6 +625,42 @@ public void close() { indexerThread.interrupt(); } } + + private SocketFactory getSSLSocketFactory() { + if (sslSocketFactory != null) + return sslSocketFactory; + + if (!mongoSSLVerifyCertificate) { + try { + final TrustManager[] trustAllCerts = new TrustManager[] { new X509TrustManager() { + + @Override + public X509Certificate[] getAcceptedIssuers() { + return null; + } + + @Override + public void checkServerTrusted(X509Certificate[] chain, String authType) + throws CertificateException { + } + + @Override + public void checkClientTrusted(X509Certificate[] chain, String authType) + throws CertificateException { + } + }}; + final SSLContext sslContext = SSLContext.getInstance( "SSL" ); + sslContext.init( null, trustAllCerts, new java.security.SecureRandom() ); + // Create an ssl socket factory with our all-trusting manager + sslSocketFactory = sslContext.getSocketFactory(); + return sslSocketFactory; + } catch(Exception ex) { + logger.error("Unable to build ssl socket factory without certificate validation, using default instead.", ex); + } + } + sslSocketFactory = SSLSocketFactory.getDefault(); + return sslSocketFactory; + } private class Indexer implements Runnable { @@ -982,7 +1033,7 @@ public void run() { .autoConnectRetry(true).connectTimeout(15000) .socketKeepAlive(true).socketTimeout(60000); if (mongoUseSSL){ - builder.socketFactory(SSLSocketFactory.getDefault()); + builder.socketFactory(getSSLSocketFactory()); } // TODO: MongoClientOptions should be configurable