javascript
Springboot集成使用阿里云kafka详细步骤
轉載請注明出處:Springboot集成使用阿里云kafka詳細步驟
明確連接認證類型
首先要明確使用哪種連接認證類型
Ons模式參考
https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-java-demo/beta
Ons模式的conf內容
KafkaClient {com.aliyun.openservices.ons.sasl.client.OnsLoginModule requiredAccessKey="XXX"SecretKey="XXX"; };Plain模式參考
https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-java-demo/vpc-ssl
Plain模式的conf內容
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="xxxxxxxxxxxxxxxxxxxxx"password="xxxxxxxxxxxxxxxxxxxxx"; };分別在這兩個帖子中下載對應的jks證書和conf文件。
或者參考代碼的相應目錄下
注意,這兩個配置都不能打包到jar包中,否則容易無法識別和出問題,所以我們需要放在服務的明確路徑里。
例如/jar/kafka_client_jaas.conf和/jar/kafka.client.truststore.jks
集成
springboot版本為1.5.2。
引入kafka-client的jar包
在項目的pom文件中添加kafka-clients并且排除spring-kafka中的kafka-clients。
因為spring-kafka目前最新版本為2.1.2,其依賴的kafka-clients是1.0.x,但Kafka 服務端版本是 0.10,Client 版本建議 0.10,所以此處需排除依賴重新引入,否則一直報錯:disconnected
如下:
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.0.0</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency>新建KafkaAliyunConfiguration類
KafkaAliyunConfiguration.java
package com.biologic.util;import java.net.URL; import java.util.HashMap; import java.util.Map;import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.util.StringUtils;@Configuration @EnableKafka public class KafkaAliyunConfiguration {@Value("${kafka.broker.address}")private String brokerAddress;@Value("${kafka.sample.topic}")private String defaultTopic;@Value("${kafka.jks.location}")private String jksLocation;@Value("${kafka.sample.retrycount}")private String retrycount;public KafkaAliyunConfiguration() {//如果用-D 或者其它方式設置過,這里不再設置if (null == System.getProperty("java.security.auth.login.config")) {//請注意將 XXX 修改為自己的路徑//這個路徑必須是一個文件系統可讀的路徑,不能被打包到 jar 中System.setProperty("java.security.auth.login.config", "/jar/kafka_client_jaas.conf");}System.out.println("環境變量中已有config文件,kafka配置為:"+System.getProperty("java.security.auth.login.config"));}public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<String, Object>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);if (StringUtils.isEmpty(jksLocation)) {props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, KafkaAliyunConfiguration.class.getClassLoader().getResource("kafka.client.truststore.jks").getPath());} else {props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, jksLocation);}props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");props.put(ProducerConfig.RETRIES_CONFIG, retrycount);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<String, String>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {KafkaTemplate kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());kafkaTemplate.setDefaultTopic(defaultTopic);return kafkaTemplate;} }此處定義了四個變量,通過配置文件注入:
brokerAddress kafka服務器地址
defaultTopic kafka默認topic
jksLocation JKS文件地址(開發環境無需定義,直接讀取resources下的jks,但生產環境需讀取jar包外部的jks文件,所以此處需配置路徑)
retrycount 重試次數
配置文件properties中增加相應變量
在application-beta.properties中增加對應配置如下:
kafka.broker.address=39.76.22.123:9093,39.175.15.234:9093,39.126.188.165:9093kafka.sample.retrycount=100kafka.sample.topic=save_samplekafka.jks.location=/jar/kafka.client.truststore.jks新建KafkaService發送消息
KafkaService.java
package com.biologic.api.service;import org.springframework.stereotype.Service;@Service public interface KafkaService {void sendMessage(String topic, String data);void releaseKafkaMsg(String barcode, String chip);}KafkaServiceImpl.java
package com.biologic.api.service.impl;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback;import com.biologic.api.service.KafkaService;import net.sf.json.JSONArray; import net.sf.json.JSONObject;@Service public class KafkaServiceImpl implements KafkaService {@Value("${kafka.sample.topic}")private String sampleTopic;private Logger LOG = LoggerFactory.getLogger(KafkaServiceImpl.class);// private final KafkaTemplate<Integer, String> kafkaTemplate;//// /**// * 注入KafkaTemplate// * @param kafkaTemplate kafka模版類// */// @Autowired// public KafkaServiceImpl(KafkaTemplate kafkaTemplate) {// this.kafkaTemplate = kafkaTemplate;// }@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String data) {LOG.info("kafka sendMessage start");ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable ex) {LOG.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data);}@Overridepublic void onSuccess(SendResult<String, String> result) {LOG.info("kafka sendMessage success topic = {}, data = {}", topic, data);}});LOG.info("kafka sendMessage end");}public void releaseKafkaMsg(String barcode, String chip) {try {JSONArray data = new JSONArray();JSONObject kafka_sample_state = new JSONObject();kafka_sample_state.put("plate_id", chip);kafka_sample_state.put("barcode", barcode);kafka_sample_state.put("status", "release_report");data.add(kafka_sample_state);JSONObject sample_list = new JSONObject();sample_list.put("sample_list", data.toString());sendMessage(sampleTopic, sample_list.toString());} catch (Exception e) {e.printStackTrace();}}}外部注入路徑變量的方式
我們上面的代碼中是把conf文件的路徑寫死的,如果需要變動地址,可以使用以下方式
環境注入conf文件路徑
因為代碼中會默認獲取環境變量中的java.security.auth.login.config配置,所以只需要啟動時 賦值路徑即可。
-Djava.security.auth.login.config=你的配置絕對路徑完整啟動springboot的項目命令如下:
java -jar /jar/report-api-1.0.0-SNAPSHOT.jar --spring.profiles.active=beta -Djava.security.auth.login.config=/jar/kafka_client_jaas.conf變量注入conf文件路徑
注意 因為類的初始化在注入變量之前,所以conf的路徑不能用變量的方式注入,否則會報空指針錯誤。
如下用法會報錯
@Value("${kafka.conf.location}")private String confLocation;public KafkaAliyunConfiguration() {// 如果用-D 或者其它方式設置過,這里不再設置if (null == System.getProperty("java.security.auth.login.config")) {// 請注意將 XXX 修改為自己的路徑// 這個路徑必須是一個文件系統可讀的路徑,不能被打包到 jar 中System.setProperty("java.security.auth.login.config", confLocation);System.out.println("使用配置中的路徑,kafka配置為:" + System.getProperty("java.security.auth.login.config"));} else {System.out.println("環境變量中已有config文件,kafka配置為:" + System.getProperty("java.security.auth.login.config"));}}安全層面加固
因為直接conf文件中包含帳號密碼容易被其他人查看到,有一種方式是外部引入模版文件,使用環境變量中的帳號密碼修改conf文件。
模版文件如下:
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="_KAFKA_ALIYUN_USERNAME_"password="_KAFKA_ALIYUN_PASSWORD_"; };使用shell命令從s3中下載conf文件并修改conf文件如下:
initContainers:- name: pull-libimage: anigeo/awscli:latestcommand: ["/bin/sh","-c"] args: ['aws s3 cp s3://test-env/kafka.client.truststore.jks /jar/ ;aws s3 cp s3://test-env/kafka_client_jaas.conf /jar/ ;sed -i "s/_KAFKA_ALIYUN_USERNAME_/${KAFKA_SSL_USERNAME}/" /jar/kafka_client_jaas.conf;sed -i "s/_KAFKA_ALIYUN_PASSWORD_/${KAFKA_SSL_PASSWORLD}/" /jar/kafka_client_jaas.conf']env:- name: AWS_DEFAULT_REGIONvalue: cn-southwest-2- name: KAFKA_SSL_USERNAMEvalueFrom:secretKeyRef:name: aliyun-kafkakey: username- name: KAFKA_SSL_PASSWORLDvalueFrom:secretKeyRef:name: aliyun-kafkakey: passwordvolumeMounts:- name: workdirmountPath: /jar可能遇到的問題–org.apache.kafka.common.errors.UnsupportedSaslMechanismException: Client SASL mechanism ‘ONS’ not enabled in the server, enabled mechanisms are [PLAIN]
原因
代碼中使用的配置與conf中設置的安全機制不一致。
解決方式
PLAIN模式
代碼中
對應conf內容
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="xxxxxxxxxxxxxxxxxxxxx"password="xxxxxxxxxxxxxxxxxxxxx"; };ONS模式
代碼中
對應conf內容
KafkaClient {com.aliyun.openservices.ons.sasl.client.OnsLoginModule requiredAccessKey="XXX"SecretKey="XXX"; };可能遇到的問題–nested exception is java.lang.NullPointerException
使用代碼為
public KafkaAliyunConfiguration() {if (StringUtils.isEmpty(confLocation)) {URL authLocation = KafkaAliyunConfiguration.class.getClassLoader().getResource("kafka_client_jaas.conf");if (System.getProperty("java.security.auth.login.config") == null) {System.setProperty("java.security.auth.login.config", authLocation.toExternalForm());}System.out.println("kafka配置為:"+authLocation.toExternalForm());} else {System.out.println("kafka配置為:"+confLocation);System.setProperty("java.security.auth.login.config", confLocation);} }在進行KafkaAliyunConfiguration初始化時報錯空指針。
Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'qualityServiceImpl': Unsatisfied dependency expressed through field 'kafkaService'; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaServiceImpl': Unsatisfied dependency expressed through field 'kafkaTemplate'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaAliyunConfiguration' defined in URL [jar:file:/jar/report-api-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/com/biologic/util/KafkaAliyunConfiguration.class]: Instantiation of bean failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.biologic.util.KafkaAliyunConfiguration$$EnhancerBySpringCGLIB$$88e40778]: Constructor threw exception; nested exception is java.lang.NullPointerException原因 初始化KafkaAliyunConfiguration時,變量加載的順序問題導致無法識別到變量。
解決方式
方式一 初始化時不使用注入的變量
如下:
方式二 將bean方法設置成static靜態方法
參考 spring boot整合shiro引用配置文件配置是出現的問題
可能遇到的問題–Caused by: java.io.FileNotFoundException: file:/jar/report-api-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/kafka.client.truststore.jks (No such file or directory)
原因
打入jar包的證書和conf文件無法讀取,或者沒有設置外部路徑導致默認讀取項目內的配置。
解決方式
通過外部明確的linux路徑進行配置。
可能遇到問題–Configuration Error:Line 3: expected [option key]
ssl.truststore.location = file:/jar/report-api-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/kafka.client.truststore.jksServiceExceptionHandler.java[line:30] exception ERROR org.apache.kafka.common.KafkaException: Failed to construct kafka producerCaused by: java.io.IOException: Configuration Error:Line 3: expected [option key]原因–配置文件無法讀取或者參數格式錯誤。
解決方法
通過外部明確的linux路徑進行配置jks和conf文件, 并且注意conf中的參數格式–分號,冒號要與原文件一致。
轉載請注明出處:Springboot集成使用阿里云kafka詳細步驟
參考鏈接
https://help.aliyun.com/document_detail/99958.html?spm=a2c4g.11186623.6.563.7b3b1e3bEl5oex
https://yq.aliyun.com/articles/433740
總結
以上是生活随笔為你收集整理的Springboot集成使用阿里云kafka详细步骤的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何留住你的员工——员工流失分析
- 下一篇: Manjaro - Pacman命令详解