Integrating a Kafka Cluster with Kerberos

I. Introduction to Kerberos

Kerberos distributes authentication keys to trusted nodes ahead of time, when the cluster is deployed. While the cluster is running, nodes use these keys to authenticate, and only nodes that pass authentication may provide services. A node attempting to impersonate a cluster member cannot communicate with the cluster's internal nodes, because it never received the key material. This prevents malicious use of, or tampering with, a Hadoop cluster, and protects the cluster's reliability and security.

Terminology:
- AS (Authentication Server): the authentication server
- KDC (Key Distribution Center): the key distribution center
- TGT (Ticket Granting Ticket): the ticket used to obtain other tickets ("the ticket for tickets")
- TGS (Ticket Granting Server): the ticket granting server
- SS (Service Server): the server providing a particular service
- Principal: an identity to be authenticated
- Ticket: a ticket the client uses to prove its identity; it contains the user name, IP address, timestamp, validity period, and session key.
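To make these pieces concrete: a client first obtains a TGT from the AS (part of the KDC), then exchanges it at the TGS for a service ticket for the SS. With the standard Kerberos client tools, that flow looks roughly like this (the principal name user@HADOOP.COM is just an example):

#Request a TGT from the KDC (prompts for the principal's password)
kinit user@HADOOP.COM
#Show the tickets in the local credential cache
klist
#Discard the credential cache
kdestroy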
II. Install ZooKeeper (the examples below use CentOS 8)
#Download the binary tarball (since 3.5.5 the binary package is named apache-zookeeper-X.Y.Z-bin.tar.gz)
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
#Extract it
tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz

#In the ZooKeeper root directory, copy conf/zoo_sample.cfg to zoo.cfg, as shown below
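cp conf/zoo_sample.cfg conf/zoo.cfg

For reference, the relevant defaults shipped in zoo_sample.cfg are the following (for anything beyond a quick test, dataDir is worth pointing somewhere more permanent than /tmp):

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181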

#From the directory above bin, start the server
./bin/zkServer.sh start
#Check whether it started successfully
ps aux | grep 'zookeeper-3.6.3' or ps -ef | grep 'zookeeper-3.6.3'
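zkServer.sh itself can also report whether the server is running:

./bin/zkServer.sh status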


III. Install and start Kafka
#Download
wget http://mirrors.hust.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
#Extract
tar -zxvf kafka_2.12-2.8.0.tgz
#Rename
mv kafka_2.12-2.8.0 kafka-2.8.0

#Edit the config/server.properties file under the Kafka root directory
#Add the following settings
port=9092
#Internal address (this machine's IP)
host.name=10.206.0.17   
#Public address (the cloud server's public IP; configure this only if external clients need it, otherwise remove the line)
advertised.host.name=119.45.174.249
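Note that port, host.name, and advertised.host.name are legacy settings; on Kafka 2.x the preferred equivalents are the listener settings, roughly as follows (same example addresses as above):

listeners=PLAINTEXT://10.206.0.17:9092
advertised.listeners=PLAINTEXT://119.45.174.249:9092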

#Start Kafka (-daemon runs it in the background; run this from the directory above bin)
./bin/kafka-server-start.sh -daemon config/server.properties
#Stop Kafka
./bin/kafka-server-stop.sh
#Create a topic (the ZooKeeper path must match zookeeper.connect in server.properties;
#with the default zookeeper.connect=localhost:2181 there is no /kafka chroot, so drop that suffix)
./bin/kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic test --zookeeper localhost:2181/kafka
#Start a console producer
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
#Start a console consumer
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
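To confirm the topic was created, kafka-topics.sh can also list topics (Kafka 2.8 accepts --bootstrap-server here):

./bin/kafka-topics.sh --list --bootstrap-server localhost:9092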

#For external clients to connect, the firewall must allow access (or be stopped)
#Check firewall status
systemctl status firewalld.service
#Stop the firewall
systemctl stop firewalld.service
#Start the firewall
systemctl start firewalld.service
#Start the firewall automatically at boot
systemctl enable firewalld.service
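Rather than stopping the firewall entirely, a safer option is to open only the Kafka port (firewalld commands; 9092 is the port configured above):

firewall-cmd --permanent --add-port=9092/tcp
firewall-cmd --reload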
IV. Install and configure Kerberos
1. Install the Kerberos packages
yum install krb5-server krb5-libs krb5-workstation -y
2. Configure kdc.conf
vim /var/kerberos/krb5kdc/kdc.conf
 
 
Contents:
 
[kdcdefaults]
kdc_ports = 88
kdc_tcp_ports = 88
 
[realms]
HADOOP.COM = {
#master_key_type = aes256-cts
acl_file = /var/kerberos/krb5kdc/kadm5.acl
dict_file = /usr/share/dict/words
admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
max_renewable_life = 7d
supported_enctypes = aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal
 }
Notes on these settings:
- HADOOP.COM: the realm being defined. The name is arbitrary, is conventionally upper case, and Kerberos supports multiple realms.
- master_key_type and supported_enctypes default to aes256-cts. Because Java needs extra JCE policy jars to use aes256-cts, that encryption type is not used here; note that aes256-cts has been removed from supported_enctypes above.
- acl_file: defines the admin users' permissions.
- admin_keytab: the keytab the KDC uses for verification.
- supported_enctypes: the supported encryption types.
3. Configure krb5.conf
vim /etc/krb5.conf
 
Contents:
 
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/
 
[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log
 
[libdefaults]
 default_realm = HADOOP.COM
 dns_lookup_realm = false
 dns_lookup_kdc = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true
 clockskew = 120
 udp_preference_limit = 1
[realms]
 HADOOP.COM = {
  # replace es1 below with this server's hostname or IP address
  kdc = es1
  admin_server = es1
 }
 
[domain_realm]
 .hadoop.com = HADOOP.COM
 hadoop.com = HADOOP.COM
4. Initialize the Kerberos database (-s creates a stash file for the master key; -r names the realm)
kdb5_util create -s -r HADOOP.COM
5. Set the ACL permissions of the database administrator
vim /var/kerberos/krb5kdc/kadm5.acl
 
#Set the contents as follows (this grants full privileges to any principal with an /admin instance)
*/admin@HADOOP.COM  
6. Start the Kerberos daemons
#Start
systemctl start kadmin krb5kdc
#Enable start at boot
systemctl enable kadmin krb5kdc
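An optional check that both daemons came up:

systemctl status krb5kdc kadmin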
7. Basic Kerberos operations
#1. Enter kadmin as the super administrator
kadmin.local
#2. List principals
listprincs
#3. Add the principal kafka/es1 (you will be prompted to set its password)
addprinc kafka/es1
#4. Exit kadmin.local
exit
8. Integrate Kafka with Kerberos
1. Generate the user keytab (this file is needed later by the Kafka configuration and for client authentication)
kadmin.local -q "xst  -k /var/kerberos/krb5kdc/kadm5.keytab  kafka/es1@HADOOP.COM"
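To verify the exported entries, and optionally to confirm that authentication with the keytab works:

klist -kt /var/kerberos/krb5kdc/kadm5.keytab
kinit -kt /var/kerberos/krb5kdc/kadm5.keytab kafka/es1@HADOOP.COM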
2. Create kafka-jaas.conf under config in the Kafka installation directory
#Contents:
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/var/kerberos/krb5kdc/kadm5.keytab"
principal="kafka/es1@HADOOP.COM";
};
 
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
useTicketCache=true
keyTab="/var/kerberos/krb5kdc/kadm5.keytab"
principal="kafka/es1@HADOOP.COM";
};
3. Add the following to config/server.properties

#es1 is this broker's hostname; a trailing comment on the same line would become part of the value, so keep comments on their own line
advertised.listeners=SASL_PLAINTEXT://es1:9092
listeners=SASL_PLAINTEXT://es1:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
4. In bin/kafka-run-class.sh, add the Kerberos flags (the two -Djava.security.* options below) to the JVM performance options; if any flag is already present, keep only one copy. The kafka-jaas.conf path shown is from the original author's environment, so point it at your own config directory:

# JVM performance options
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
  KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/xingtongping/kafka_2.12-2.2.0/config/kafka-jaas.conf"
fi
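Alternatively, instead of editing the script, the same flags can be supplied through the KAFKA_OPTS environment variable, which kafka-run-class.sh picks up (adjust the JAAS path to your installation):

export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/path/to/config/kafka-jaas.conf"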
5. Configure config/producer.properties (Kerberos settings for the Kafka producer)
security.protocol = SASL_PLAINTEXT
sasl.mechanism = GSSAPI
sasl.kerberos.service.name =kafka
6. Configure config/consumer.properties (Kerberos settings for the Kafka consumer)
security.protocol = SASL_PLAINTEXT
sasl.mechanism = GSSAPI
sasl.kerberos.service.name=kafka
V. Testing from the Linux command line
1. Start the Kafka service
./bin/kafka-server-start.sh config/server.properties
2. Start a console producer
./bin/kafka-console-producer.sh --broker-list es1:9092 --topic test  --producer.config config/producer.properties
3. Start a console consumer
./bin/kafka-console-consumer.sh --bootstrap-server es1:9092 --topic test  --consumer.config config/consumer.properties
VI. Java client consumer test code for a local environment

1. Download krb5.conf and kafka-jaas.conf to the local machine, and change the keyTab path in kafka-jaas.conf to the local keytab path.
2. If connecting by hostname, remember to edit the hosts file and add: 172.16.110.173 es1
3. Test with the following code

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
    public static void main(String[] args) {
        // Kerberos configuration (backslashes must be escaped in Java string literals)
        System.setProperty("java.security.krb5.conf", "D:\\zookeeper\\kerberos\\bd\\krb5.conf");
        System.setProperty("java.security.auth.login.config", "D:\\zookeeper\\kerberos\\bd\\kafka-jaas.conf");

        Properties props = new Properties();
        // Cluster addresses; separate multiple addresses with ","
        props.put("bootstrap.servers", "es1:9092"); // hostname or IP
        // Kerberos authentication settings
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("group.id", "1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to topics (several can be listed); here we subscribe to "test"
        consumer.subscribe(Arrays.asList("test"));
        // Poll continuously
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("Read from test: " + consumerRecord.value());
            }
        }
    }
}
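To compile and run this, the kafka-clients jar (version 2.8.0 to match the broker) and its transitive dependencies must be on the classpath; a minimal sketch, assuming the jars have been collected into a local libs/ directory (on Windows use ; instead of : as the classpath separator):

javac -cp "libs/*" Consumer.java
java -cp "libs/*:." Consumer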