kafka笔记3(生产者)
?
創建Kafka生產者:
Kafka生產者有3個必選屬性:
bootstrap.servers ? broker地址清單,格式為host:port ? ,清單中不必包含所有broker,但至少2個
key.serializer? = org.apache.kafka.common.serialization.Serializer 接口類,生產者使用這個類把鍵對象序列化為字節數組
Kafka還提供了ByteArraySerializer,StringSerializer,IntegerSerializer 實現類
value.serializer? 與key.serializer可以將值序列化
發送消息有3種方式:
發送并忘記(fire-and-forget)? 發送消息給服務器,但是不關心是否正常到達
同步發送? 使用send()發送信息,返回一個future對象,調用get()方法進行等待,了解信息是否正常發送
異步發送 ?? 調用send()函數,并指定一個回調函數,服務器在返回響應時調用該函數
?
生產者的可選屬性:
acks 指定有多少個分區副本收到消息,生產者才會認為消息寫入是成功的
ack=0? 生產者在成功寫入消息之前不會等待任何來自服務器的響應
ack=1 只要集群的首領節點收到信息,生產者就會收到一個來自服務器的成功響應
ack=all 所有參數與復制的節點全部收到消息,生產者才會收到一個來自服務器的響應,這種模式最安全
buffer.memory? 設置生產者內存緩沖區大小,生產者使用它緩沖要發送到服務器的消息
compression.type? 默認消息發送不使用壓縮,該參數可以設置為snappy,gzip,lz4
retries 生產者可以重發消息的次數,默認 每次重試之間等待100ms,可以使用參數 retry.backoff.ms參數改變這個時間間隔
batch.size 指定一個批次可以使用的內存大小,按照字節數計算
linger.ms 指定生產者在發送批次之前等待更多消息加入批次的時間。Kafkaproducer 會在批次填滿或linger.ms達到上限時把批次發送出去
client.id 該參數可以指定任意的字符串,服務器會用它識別信息的來源,還可以用在日志和配額指標里
max.in.flight.requests.per.connection? 指定生產者在收到服務器響應之前可以發送多少個消息,值越高越占用內存;設為1可以保證消息是按照發送順序寫入服務器的
timeout.ms ? reuqest.timeout.ms ? metadata.fetch.timeout.ms?
request.timeout.ms 指定生產者在發送數據時等待服務器返回響應的時間
metadata.timeout.ms 指定生產者在獲取元數據時等待服務器返回響應時間
tiemout.ms指定broker等待同步副本返回消息確認的時間,與acks的配置相匹配
max.block.ms? 指定調用send()或partitionsFor()方法獲取元數據時生產者的阻塞時間,當阻塞時間到達max.block.ms時,生產者會拋出異常
max.request.size 控制生產者發送的請求大小,指能發送的單個消息的最大值或單個請求里所有消息的總和
broker對可接收的消息最大值也有自己的限制(message.max.bytes),兩邊配置最好匹配,避免生產者發送消息被拒絕
receive.buffer.bytes 和 send.buffer.bytes
這2個參數分別指定了TCP socket接收和發送數據包緩沖區大小,默認-1
max.in.flight.requests.per.connection=1 保證了消息的順序,如果大于1,第一批次寫入失敗后,重試成功可能會改變消息的順序
?
序列化器:
自定義序列化器
Avro序列化
Kafka使用Avro序列器是通過schema注冊表來實現的,schema注冊表不屬于Kafka
?
轉載于:https://www.cnblogs.com/zy1234567/p/10339868.html
總結
以上是生活随笔為你收集整理的kafka笔记3(生产者)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 无服务器计算的黑暗面:程序移植没那么容易
- 下一篇: 8、SpringBoot-CRUD默认访