Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

当MQ选择为Kafka时,支持配置Kerberos认证 #1895

Merged
merged 2 commits into from
Jun 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,55 +10,56 @@
*/
public class CanalConstants {

public static final String MDC_DESTINATION = "destination";
public static final String ROOT = "canal";
public static final String CANAL_ID = ROOT + "." + "id";
public static final String CANAL_IP = ROOT + "." + "ip";
public static final String CANAL_PORT = ROOT + "." + "port";
public static final String CANAL_METRICS_PULL_PORT = ROOT + "." + "metrics.pull.port";
public static final String CANAL_ZKSERVERS = ROOT + "." + "zkServers";
public static final String CANAL_WITHOUT_NETTY = ROOT + "." + "withoutNetty";
public static final String MDC_DESTINATION = "destination";
public static final String ROOT = "canal";
public static final String CANAL_ID = ROOT + "." + "id";
public static final String CANAL_IP = ROOT + "." + "ip";
public static final String CANAL_PORT = ROOT + "." + "port";
public static final String CANAL_METRICS_PULL_PORT = ROOT + "." + "metrics.pull.port";
public static final String CANAL_ZKSERVERS = ROOT + "." + "zkServers";
public static final String CANAL_WITHOUT_NETTY = ROOT + "." + "withoutNetty";

public static final String CANAL_DESTINATIONS = ROOT + "." + "destinations";
public static final String CANAL_AUTO_SCAN = ROOT + "." + "auto.scan";
public static final String CANAL_AUTO_SCAN_INTERVAL = ROOT + "." + "auto.scan.interval";
public static final String CANAL_CONF_DIR = ROOT + "." + "conf.dir";
public static final String CANAL_SERVER_MODE = ROOT + "." + "serverMode";
public static final String CANAL_DESTINATIONS = ROOT + "." + "destinations";
public static final String CANAL_AUTO_SCAN = ROOT + "." + "auto.scan";
public static final String CANAL_AUTO_SCAN_INTERVAL = ROOT + "." + "auto.scan.interval";
public static final String CANAL_CONF_DIR = ROOT + "." + "conf.dir";
public static final String CANAL_SERVER_MODE = ROOT + "." + "serverMode";

public static final String CANAL_DESTINATION_SPLIT = ",";
public static final String GLOBAL_NAME = "global";
public static final String CANAL_DESTINATION_SPLIT = ",";
public static final String GLOBAL_NAME = "global";

public static final String INSTANCE_MODE_TEMPLATE = ROOT + "." + "instance.{0}.mode";
public static final String INSTANCE_LAZY_TEMPLATE = ROOT + "." + "instance.{0}.lazy";
public static final String INSTANCE_MANAGER_ADDRESS_TEMPLATE = ROOT + "." + "instance.{0}.manager.address";
public static final String INSTANCE_SPRING_XML_TEMPLATE = ROOT + "." + "instance.{0}.spring.xml";
public static final String INSTANCE_MODE_TEMPLATE = ROOT + "." + "instance.{0}.mode";
public static final String INSTANCE_LAZY_TEMPLATE = ROOT + "." + "instance.{0}.lazy";
public static final String INSTANCE_MANAGER_ADDRESS_TEMPLATE = ROOT + "." + "instance.{0}.manager.address";
public static final String INSTANCE_SPRING_XML_TEMPLATE = ROOT + "." + "instance.{0}.spring.xml";

public static final String CANAL_DESTINATION_PROPERTY = ROOT + ".instance.destination";

public static final String CANAL_SOCKETCHANNEL = ROOT + "." + "socketChannel";

public static final String CANAL_MQ_SERVERS = ROOT + "." + "mq.servers";
public static final String CANAL_MQ_RETRIES = ROOT + "." + "mq.retries";
public static final String CANAL_MQ_BATCHSIZE = ROOT + "." + "mq.batchSize";
public static final String CANAL_MQ_LINGERMS = ROOT + "." + "mq.lingerMs";
public static final String CANAL_MQ_MAXREQUESTSIZE = ROOT + "." + "mq.maxRequestSize";
public static final String CANAL_MQ_BUFFERMEMORY = ROOT + "." + "mq.bufferMemory";
public static final String CANAL_MQ_CANALBATCHSIZE = ROOT + "." + "mq.canalBatchSize";
public static final String CANAL_MQ_CANALGETTIMEOUT = ROOT + "." + "mq.canalGetTimeout";
public static final String CANAL_MQ_FLATMESSAGE = ROOT + "." + "mq.flatMessage";
public static final String CANAL_MQ_COMPRESSION_TYPE = ROOT + "." + "mq.compressionType";
public static final String CANAL_MQ_ACKS = ROOT + "." + "mq.acks";
public static final String CANAL_MQ_TRANSACTION = ROOT + "." + "mq.transaction";
public static final String CANAL_MQ_PRODUCERGROUP = ROOT + "." + "mq.producerGroup";
public static final String CANAL_ALIYUN_ACCESSKEY = ROOT + "." + "aliyun.accessKey";
public static final String CANAL_ALIYUN_SECRETKEY = ROOT + "." + "aliyun.secretKey";
public static final String CANAL_MQ_PROPERTIES = ROOT + "." + "mq.properties";
public static final String CANAL_MQ_ENABLE_MESSAGE_TRACE = ROOT + "." + "mq.enableMessageTrace";
public static final String CANAL_MQ_ACCESS_CHANNEL = ROOT + "." + "mq.accessChannel";
public static final String CANAL_MQ_CUSTOMIZED_TRACE_TOPIC = ROOT + "." + "mq.customizedTraceTopic";
public static final String CANAL_MQ_NAMESPACE = ROOT + "." + "mq.namespace";
public static final String CANAL_DESTINATION_PROPERTY = ROOT + ".instance.destination";

public static final String CANAL_SOCKETCHANNEL = ROOT + "." + "socketChannel";

public static final String CANAL_MQ_SERVERS = ROOT + "." + "mq.servers";
public static final String CANAL_MQ_RETRIES = ROOT + "." + "mq.retries";
public static final String CANAL_MQ_BATCHSIZE = ROOT + "." + "mq.batchSize";
public static final String CANAL_MQ_LINGERMS = ROOT + "." + "mq.lingerMs";
public static final String CANAL_MQ_MAXREQUESTSIZE = ROOT + "." + "mq.maxRequestSize";
public static final String CANAL_MQ_BUFFERMEMORY = ROOT + "." + "mq.bufferMemory";
public static final String CANAL_MQ_CANALBATCHSIZE = ROOT + "." + "mq.canalBatchSize";
public static final String CANAL_MQ_CANALGETTIMEOUT = ROOT + "." + "mq.canalGetTimeout";
public static final String CANAL_MQ_FLATMESSAGE = ROOT + "." + "mq.flatMessage";
public static final String CANAL_MQ_COMPRESSION_TYPE = ROOT + "." + "mq.compressionType";
public static final String CANAL_MQ_ACKS = ROOT + "." + "mq.acks";
public static final String CANAL_MQ_TRANSACTION = ROOT + "." + "mq.transaction";
public static final String CANAL_MQ_PRODUCERGROUP = ROOT + "." + "mq.producerGroup";
public static final String CANAL_ALIYUN_ACCESSKEY = ROOT + "." + "aliyun.accessKey";
public static final String CANAL_ALIYUN_SECRETKEY = ROOT + "." + "aliyun.secretKey";
public static final String CANAL_MQ_PROPERTIES = ROOT + "." + "mq.properties";
public static final String CANAL_MQ_ENABLE_MESSAGE_TRACE = ROOT + "." + "mq.enableMessageTrace";
public static final String CANAL_MQ_ACCESS_CHANNEL = ROOT + "." + "mq.accessChannel";
public static final String CANAL_MQ_CUSTOMIZED_TRACE_TOPIC = ROOT + "." + "mq.customizedTraceTopic";
public static final String CANAL_MQ_NAMESPACE = ROOT + "." + "mq.namespace";
public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE = ROOT + "." + "mq.kafka.kerberos.enable";
public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";
public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";

public static String getInstanceModeKey(String destination) {
return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
package com.alibaba.otter.canal.deployer;

import java.io.File;
import java.io.FileFilter;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.common.MQProperties;
import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
Expand All @@ -18,6 +8,15 @@
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileFilter;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
* Canal server 启动类
Expand Down Expand Up @@ -227,6 +226,21 @@ private static MQProperties buildMQProperties(Properties properties) {
mqProperties.setNamespace(namespace);
}

String kafkaKerberosEnable = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
if (!StringUtils.isEmpty(kafkaKerberosEnable)) {
mqProperties.setKerberosEnable(Boolean.valueOf(kafkaKerberosEnable));
}

String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH);
if (!StringUtils.isEmpty(kafkaKerberosKrb5Filepath)) {
mqProperties.setKerberosKrb5FilePath(kafkaKerberosKrb5Filepath);
}

String kafkaKerberosJaasFilepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH);
if (!StringUtils.isEmpty(kafkaKerberosJaasFilepath)) {
mqProperties.setKerberosJaasFilePath(kafkaKerberosJaasFilepath);
}

for (Object key : properties.keySet()) {
key = StringUtils.trim(key.toString());
if (((String) key).startsWith(CanalConstants.CANAL_MQ_PROPERTIES)) {
Expand Down
7 changes: 7 additions & 0 deletions deployer/src/main/resources/canal.properties
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,10 @@ canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =

##################################################
######### Kafka Kerberos Info #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public class MQProperties {
private String accessChannel = null;
private String customizedTraceTopic = null;
private String namespace = "";
private boolean kerberosEnable = false; //kafka集群是否启动Kerberos认证
private String kerberosKrb5FilePath = ""; //启动Kerberos认证时配置为krb5.conf文件的路径
private String kerberosJaasFilePath = ""; //启动Kerberos认证时配置为jaas.conf文件的路径

public static class CanalDestination {

private String canalDestination;
Expand Down Expand Up @@ -257,6 +261,30 @@ public void setNamespace(String namespace) {
this.namespace = namespace;
}

public boolean isKerberosEnable() {
return kerberosEnable;
}

public void setKerberosEnable(boolean kerberosEnable) {
this.kerberosEnable = kerberosEnable;
}

public String getKerberosKrb5FilePath() {
return kerberosKrb5FilePath;
}

public void setKerberosKrb5FilePath(String kerberosKrb5FilePath) {
this.kerberosKrb5FilePath = kerberosKrb5FilePath;
}

public String getKerberosJaasFilePath() {
return kerberosJaasFilePath;
}

public void setKerberosJaasFilePath(String kerberosJaasFilePath) {
this.kerberosJaasFilePath = kerberosJaasFilePath;
}

@Override public String toString() {
return "MQProperties{" +
"servers='" + servers + '\'' +
Expand All @@ -280,6 +308,9 @@ public void setNamespace(String namespace) {
", accessChannel='" + accessChannel + '\'' +
", customizedTraceTopic='" + customizedTraceTopic + '\'' +
", namespace='" + namespace + '\'' +
", kerberosEnable='" + kerberosEnable + '\'' +
", kerberosKrb5FilePath='" + kerberosKrb5FilePath + '\'' +
", kerberosJaasFilePath='" + kerberosJaasFilePath + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.alibaba.otter.canal.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.common.MQMessageUtils;
import com.alibaba.otter.canal.common.MQProperties;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.spi.CanalMQProducer;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
Expand All @@ -14,13 +15,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.common.MQMessageUtils;
import com.alibaba.otter.canal.common.MQProperties;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.spi.CanalMQProducer;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
* kafka producer 主操作类
Expand Down Expand Up @@ -59,6 +59,26 @@ public void init(MQProperties kafkaProperties) {
} else {
properties.put("retries", kafkaProperties.getRetries());
}

if (kafkaProperties.isKerberosEnable()){
File krb5File = new File(kafkaProperties.getKerberosKrb5FilePath());
File jaasFile = new File(kafkaProperties.getKerberosJaasFilePath());
if(krb5File.exists() && jaasFile.exists()){
//配置kerberos认证,需要使用绝对路径
System.setProperty("java.security.krb5.conf",
krb5File.getAbsolutePath());
System.setProperty("java.security.auth.login.config",
jaasFile.getAbsolutePath());
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.kerberos.service.name", "kafka");
}else{
String errorMsg = "ERROR # The kafka kerberos configuration file does not exist! please check it";
logger.error(errorMsg);
throw new RuntimeException(errorMsg);
}
}

if (!kafkaProperties.getFlatMessage()) {
properties.put("value.serializer", MessageSerializer.class.getName());
producer = new KafkaProducer<String, Message>(properties);
Expand Down