Notes on Using Alibaba Cloud Kafka (Python Version)
Kafka side

Consumer code (VPC version)

import socket

from kafka import KafkaConsumer
from kafka.errors import KafkaError

# The VPC version connects to the broker directly, without SSL or SASL settings.
consumer = KafkaConsumer(
    bootstrap_servers=['192.168.xx.xx:9092'],
    group_id='xx',
    api_version=(0, 10),
)
print('consumer start to consuming...')
consumer.subscribe(('xx',))

# Iterating over the consumer blocks and yields records as they arrive.
for message in consumer:
    print(message.topic)
    print(message.offset)
    print(message.key)
    print(message.value)
    print(message.partition)
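
The loop above prints message.key and message.value as they are received. With kafka-python these are raw bytes (or None when a record has no key) unless a deserializer is configured. A minimal sketch of a decoding helper, assuming the payloads are UTF-8 text; the name decode_field is hypothetical and does not appear in the original post:

```python
from typing import Optional


def decode_field(raw: Optional[bytes]) -> Optional[str]:
    """Decode a Kafka key or value to text (assumes UTF-8 payloads)."""
    if raw is None:
        return None
    # errors="replace" avoids raising inside the consumer loop on malformed bytes.
    return raw.decode("utf-8", errors="replace")


print(decode_field(b"hello aliyun-kafka!"))
print(decode_field(None))
```

A function like this could also be passed to KafkaConsumer as value_deserializer=decode_field, so that message.value is already decoded inside the loop.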
Producer code (VPC version)

#!/usr/bin/env python
# encoding: utf-8
import socket

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=['192.168.xx.xx:9092'],
    api_version=(0, 10),
    retries=5,
)

partitions = producer.partitions_for('xx')
print('Partitions of the topic: %s' % partitions)

try:
    future = producer.send(topic='xx', value=b'hello aliyun-kafka!')
    # Block until the send completes or an error is raised.
    future.get()
    print('send message succeed.')
except KafkaError as e:
    print('send message failed.')
    print(e)

Consumer code (public network version)
import ssl
import socket

from kafka import KafkaConsumer
from kafka.errors import KafkaError

context = ssl.create_default_context()
# Note: this assignment replaces the default context created on the previous line.
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
# context.check_hostname = True
context.load_verify_locations("/tmp/ca-cert")

consumer = KafkaConsumer(
    bootstrap_servers=['kafka-ons-internet.aliyun.com:8080'],
    group_id='xxx',
    sasl_mechanism="PLAIN",
    ssl_context=context,
    security_protocol='SASL_SSL',
    api_version=(0, 10),
    sasl_plain_username='xxx',
    sasl_plain_password='1234567890',
)
print('consumer start to consuming...')
consumer.subscribe(('xxx',))

for message in consumer:
    print(message.topic)
    print(message.offset)
    print(message.value)
    # Stop after the first message.
    break
Producer code (public network version)

#!/usr/bin/env python
# encoding: utf-8
import ssl
import socket

from kafka import KafkaProducer
from kafka.errors import KafkaError

context = ssl.create_default_context()
# Note: this assignment replaces the default context created on the previous line.
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
# context.check_hostname = True
# For this CA file, see https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-python-demo
context.load_verify_locations("/tmp/ca-cert")

producer = KafkaProducer(
    bootstrap_servers=['kafka-ons-internet.aliyun.com:8080'],
    sasl_mechanism="PLAIN",
    ssl_context=context,
    security_protocol='SASL_SSL',
    api_version=(0, 10),
    retries=5,
    sasl_plain_username='xx',
    # Note: this is the last ten characters of the access-key.
    sasl_plain_password='1234567890',
)

partitions = producer.partitions_for('xxx')
print('Partitions of the topic: %s' % partitions)

try:
    future = producer.send('xxx', b'hello aliyun-kafka!')
    # Block until the send completes or an error is raised.
    future.get()
    print('send message succeed.')
except KafkaError as e:
    print('send message failed.')
    print(e)
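
The comment in the producer code says the SASL password is the last ten characters of the access-key. A minimal sketch of deriving it and gathering the public-network settings in one place, assuming the same keyword arguments as the code above; the helper names sasl_password and public_endpoint_kwargs are hypothetical, and the key shown is a placeholder:

```python
def sasl_password(access_key: str) -> str:
    """Return the last ten characters of the key, following the note in the post."""
    if len(access_key) < 10:
        raise ValueError("access key is shorter than ten characters")
    return access_key[-10:]


def public_endpoint_kwargs(endpoint, username, access_key, ssl_context=None):
    """Collect the SASL_SSL settings used by the public-network examples."""
    return {
        "bootstrap_servers": [endpoint],
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": username,
        "sasl_plain_password": sasl_password(access_key),
        "ssl_context": ssl_context,
        "api_version": (0, 10),
    }


# Placeholder values for illustration only.
kwargs = public_endpoint_kwargs(
    "kafka-ons-internet.aliyun.com:8080", "xx", "ABCDEFGH1234567890"
)
print(kwargs["sasl_plain_password"])  # 1234567890
```

The resulting dictionary could then be unpacked as KafkaProducer(**kwargs), or as KafkaConsumer(**kwargs) once a group_id entry is added.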
Obtain the connection information from the Alibaba Cloud console.
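
Once the connection details have been copied from the console, they can be kept out of the source code rather than hard-coded as in the examples. A minimal sketch, assuming environment variables named KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC and KAFKA_GROUP_ID; these names are hypothetical and are not defined by the console or by kafka-python:

```python
import os


def load_connection_info(env=None):
    """Read connection settings from a mapping (defaults to os.environ)."""
    env = os.environ if env is None else env
    servers = env.get("KAFKA_BOOTSTRAP_SERVERS", "")
    return {
        # Split a comma-separated string into the list form used for bootstrap_servers.
        "bootstrap_servers": [s.strip() for s in servers.split(",") if s.strip()],
        "topic": env.get("KAFKA_TOPIC"),
        "group_id": env.get("KAFKA_GROUP_ID"),
    }


# Placeholder values matching the masked ones in the examples.
info = load_connection_info({
    "KAFKA_BOOTSTRAP_SERVERS": "192.168.xx.xx:9092",
    "KAFKA_TOPIC": "xx",
    "KAFKA_GROUP_ID": "xx",
})
print(info)
```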
Reposted from: https://www.cnblogs.com/castlevania/p/10370803.html