Consuming Kafka Under CDH Kerberos Authentication
For setting up Kerberos authentication on the cluster, see: https://datamining.blog.csdn.net/article/details/98480008
Contents

Environment
Configuration
Java Producer Code
File contents:
kafka_client_jaas.conf
krb5.conf (just copy over the Kerberos configuration file)
kafka.keytab
Java Consumer Code
Consuming from the Linux console
Producing from the Linux console
Creating and deleting topics from the Linux console
Environment

CDH 6.x
Kafka 1.0.1
Kafka with Kerberos authentication enabled cannot be consumed directly through the API; the client must first pass security authentication.
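In practice that authentication comes down to three pieces, all of which appear throughout the rest of this post: a JAAS file naming the keytab and principal, a krb5.conf describing the realm, and security.protocol=SASL_PLAINTEXT in the client properties. A minimal, runnable sketch of just those settings — the file paths and the cs-1:9092 broker address are placeholder assumptions, not this cluster's real values:

```java
import java.util.Properties;

public class KerberosClientProps {

    // Builds only the Kerberos-related part of a Kafka client configuration.
    // Paths and broker address are placeholders to be replaced with real values.
    public static Properties build(String bootstrapServers) {
        // Tell the JVM where the JAAS login config and the Kerberos config live.
        System.setProperty("java.security.auth.login.config", "/path/to/kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf", "/path/to/krb5.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // SASL_PLAINTEXT = SASL (here Kerberos/GSSAPI) authentication without TLS.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("cs-1:9092").getProperty("security.protocol")); // prints SASL_PLAINTEXT
    }
}
```

The same three settings recur in the producer, the consumer, and the console clients below; only the file locations change.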
Configuration

Check whether the configuration in CDH matches the following; if it does not, change it accordingly.
Java Producer Code

Only the configuration-related code is shown here; everything else is the same as for an ordinary producer.
import java.util.Properties;

import org.apache.kafka.clients.producer.Producer;

public class KafkaProducer {

    Producer<String, String> producer = null;

    public KafkaProducer() {
        // Point the JVM at the JAAS login configuration and the Kerberos configuration.
        System.setProperty("java.security.auth.login.config",
                "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf",
                "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\krb5.conf");
        // The files can also be supplied on the command line instead:
        // -Djava.security.krb5.conf=/krb5.conf -Djava.security.auth.login.config=/kafka_server_jaas.conf

        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaParameter.KAFKA_HOST_IP);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("max.request.size", 8000000);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("security.protocol", "SASL_PLAINTEXT");

        this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
    }
}

Files under the resources directory:
File contents:

kafka_client_jaas.conf
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka.keytab"
  storeKey=true
  useTicketCache=false
  principal="kafka@JAST.COM"
  serviceName=kafka;
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka.keytab"
  storeKey=true
  useTicketCache=false
  principal="kafka@JAST.COM"
  serviceName=kafka;
};

krb5.conf (just copy over the Kerberos configuration file)
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/

[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 dns_lookup_realm = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true
 rdns = false
 pkinit_anchors = /etc/pki/tls/certs/ca-bundle.crt
 default_realm = JAST.COM
 # default_ccache_name = KEYRING:persistent:%{uid}

[realms]
 JAST.COM = {
  kdc = cs-1
  admin_server = cs-1
 }

[domain_realm]
 .jast.com = JAST.COM
 jast.com = JAST.COM
kafka.keytab

The keytab file generated by Kerberos.
How to generate it:

kadmin.local -q "xst -norandkey -k kafka.keytab kafka@JAST.COM"

For details, see: https://datamining.blog.csdn.net/article/details/98625330
Java Consumer Code

Essentially the same as the producer; see the producer section for what each file is.
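Note that the configuration code stops at subscribe(); actual consumption is a poll loop. The sketch below shows the shape of that loop, with the Kafka consumer abstracted behind a hypothetical RecordSource stand-in interface (an assumption of this example, not a Kafka API) so it can run without a live broker:

```java
import java.util.ArrayList;
import java.util.List;

public class ConsumeLoop {

    // Hypothetical stand-in for org.apache.kafka.clients.consumer.KafkaConsumer,
    // so the loop's shape can be shown and run without a broker.
    public interface RecordSource {
        List<String> poll(long timeoutMs); // stands in for consumer.poll(...)
    }

    // The usual consume loop: poll repeatedly and handle each record.
    // Real code typically loops forever; this sketch stops on an empty batch.
    public static List<String> drain(RecordSource consumer) {
        List<String> seen = new ArrayList<>();
        while (true) {
            List<String> records = consumer.poll(100);
            if (records.isEmpty()) {
                break;
            }
            seen.addAll(records); // a real consumer would process each record here
        }
        return seen;
    }

    public static void main(String[] args) {
        // Fake source yielding two batches, then an empty one.
        List<List<String>> batches = new ArrayList<>(List.of(
                List.of("a", "b"), List.of("c"), List.<String>of()));
        RecordSource fake = timeoutMs -> batches.remove(0);
        System.out.println(drain(fake)); // prints [a, b, c]
    }
}
```

With the real client, poll() returns ConsumerRecords and each record carries a topic, offset, key, and value; the loop structure is the same.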
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class KafkaConsumer {

    org.apache.kafka.clients.consumer.KafkaConsumer consumer;

    public KafkaConsumer(String topic, int count) {
        // Point the JVM at the JAAS login configuration and the Kerberos configuration.
        System.setProperty("java.security.auth.login.config",
                "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf",
                "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\krb5.conf");
        // The files can also be supplied on the command line instead:
        // -Djava.security.krb5.conf=/krb5.conf -Djava.security.auth.login.config=/kafka_server_jaas.conf

        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaParameter.KAFKA_HOST_IP);
        props.put("group.id", "y");
        props.put("zookeeper.session.timeout.ms", "600000");
        props.put("zookeeper.sync.time.ms", "200000");
        props.put("auto.commit.interval.ms", "100000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, count + ""); // maximum records per poll
        props.put("security.protocol", "SASL_PLAINTEXT");

        consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(props);
        consumer.subscribe(Arrays.asList(topic));
    }
}

Consuming from the Linux console
Generate a keytab file for the kafka user:

kadmin.local -q "xst -norandkey -k kafka.keytab kafka@JAST.COM"

Generate a kafka_client_jaas.conf file (the location does not matter) with the following contents:
# cat config/kafka_jaas.conf
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/root/jast/kafka.keytab"
  storeKey=true
  useTicketCache=false
  principal="kafka@JAST.COM"
  serviceName=kafka;
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/root/jast/kafka.keytab"
  storeKey=true
  useTicketCache=false
  principal="kafka@JAST.COM"
  serviceName=kafka;
};

Add an environment variable referencing the JAAS file:
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/kafka_client_jaas.conf"

Create a consumer.properties file with the following contents:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
group.id=test11

Now consumption works:
/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.200:9092 --topic test --from-beginning --consumer.config /opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/consumer.properties

The data is consumed successfully.
Producing from the Linux console

Create a producer.properties file with the following contents:

# cat producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka

Send data:
/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/bin/kafka-console-producer.sh --broker-list 192.168.0.200:9092 --topic test --producer.config /opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/producer.properties

Check the consumer: the messages arrive, so production succeeded.
Creating and deleting topics from the Linux console

After setting the JAAS environment variable shown above on the Linux system, the standard topic-management commands work:

export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/kafka_jaas.conf"
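For example, with Kafka 1.0 the kafka-topics.sh tool talks to ZooKeeper rather than the brokers (the Client section of the JAAS file covers that connection). The ZooKeeper address cs-1:2181 and the partition and replication counts below are assumptions to adapt to your cluster; these commands require a live cluster to run:

```shell
KAFKA_BIN=/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/bin
ZK=cs-1:2181   # assumed ZooKeeper address; adjust for your cluster

# Create a topic
"$KAFKA_BIN/kafka-topics.sh" --zookeeper "$ZK" --create --topic test \
    --partitions 3 --replication-factor 1

# Delete it again
"$KAFKA_BIN/kafka-topics.sh" --zookeeper "$ZK" --delete --topic test
```

Note that topic deletion takes effect only if the brokers have delete.topic.enable set to true.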