欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

Kafka】使用 kafka-python发送消息时,出现 type(value_bytes) in (bytes, bytearray, memoryview, type(None))

最编程 2024-03-11 07:20:19
...
【开源中国 APP 全新上线】“动弹” 回归、集成大模型对话、畅读技术报告

前提:使用Python3.8版本
1、首先安装kafka库【pip install kafka】
2、kafka库安装成功后,连接kafka库时,报以下错误:
C:\Users\USER\PycharmProjects\test\venv\Scripts\python.exe C:/Users/USER/PycharmProjects/test/test/tests.pyTraceback (most recent call last):File "C:/Users/USER/PycharmProjects/test/test/tests.py", line 8, in from kafka import KafkaProducerFile "C:\Users\USER\PycharmProjects\test\venv\lib\site-packages\kafka__init__.py", line 23, in from kafka.producer import KafkaProducerFile "C:\Users\USER\PycharmProjects\test\venv\lib\site-packages\kafka\producer__init__.py", line 4, in from .simple import SimpleProducerFile "C:\Users\USER\PycharmProjects\test\venv\lib\site-packages\kafka\producer\simple.py", line 54return '' % self.async^SyntaxError: invalid syntax
代码如下:

import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='****')

msg_dict = {
    "operatorId":"test",#公交公司ID
    "terminalId":"123",#设备Id
    "terminalCode":"123",#设备编码(使用车辆ID)
    "terminalNo":"1",#同一车辆内terminal序号从1开始
}
msg = json.dumps(msg_dict)
producer.send('tqs-admin-event-1', msg)
producer.close()
print("结束")

3、报错原因:3.8版本中,async已经变成了关键字,所以导致不兼容
4、解决方案:执行 pip install kafka-python,就可以解决
在这里插入图片描述
5、以上就可以正常连接kafka,但是使用KafkaProducer的send函数时,会报assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))【以下这个错】
在这里插入图片描述
错误原因:send函数的value_bytes是str类型
解决方案:

import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='192.168.2.230:9092')

msg_dict = {
    "operatorId":"test",#公交公司ID
    "terminalId":"123",#设备Id
    "terminalCode":"123",#设备编码(使用车辆ID)
    "terminalNo":"1",#同一车辆内terminal序号从1开始
}
msg = json.dumps(msg_dict).encode() #这里加了encode 进行了编码
producer.send('tqs-admin-event-1', msg)
producer.close()
print("结束")