背景:读取TXT文件,加载到kafka中,然后通过logstash消费kafka中的数据加载到es中  
 第一步:导入相应的依赖包  
pip install kafka- python   
pip install loguru
pip install msgpack
  
 第二步:编写连接kafka的代码  
# - * -  coding:  utf- 8  - * - 
import  json
import  json
import  msgpack
from  loguru import  logger
from  kafka import  KafkaProducer
from  kafka. errors import  KafkaError
def kfk_produce_1 ( ) : 
    "" "
        发送 json 格式数据
    : return : 
    "" "
    producer =  KafkaProducer ( 
    
        bootstrap_servers= '192.168.85.109:9092' , 
        value_serializer= lambda v:  json. dumps ( v) . encode ( 'utf-8' ) 
    ) 
    
    producer. send ( 'python_test_topic' ,  { 'key' :  'value' } ) 
kfk_produce_1 ( ) 
  
 第三步:验证是否在kafka中创建topic  
 kafka的消费者界面上已经出现了创建的topic,并且数据也接收到了  
 注意:下面的消费者界面的按钮,要先运行起来,选择好kafka环境和topic,group以后,点击那个绿色的运行按钮,就能实时看到发送过来的消息了,😄  
 
 问题记录:  
 
 然后在使用时,报错提示:ImportError: cannot import name ‘KafkaConsumer’  
 找了一会儿最后发现自己创建的文件名叫做:kafka.py,突然意识到问题出在哪里了。  
 原因:  
 简单说就是因为,创建的文件名是kafka.py,这会导致代码运行时,python解释器查找kafka的模块时,就找到自身kafka.py了,所以就报错。  
 以后写代码的时候,还是要注意,切记不要用关键字去命名文件,避免不必要的麻烦。  
 有问题,欢迎大家留言交流沟通