文章目录
- 1、整合
- 2、消息的生产
- 3、消费
- 4、补充:安装
Kafka主体不是用来做消息中间件的,但也有这个功能,接下来整合Kafka
1、整合
导入依赖坐标:
<dependency>    
	<groupId>org.springframework.kafka</groupId>    
	<artifactId>spring-kafka</artifactId>
</dependency>
添加相关配置:
spring:  
  kafka:    
    bootstrap-servers: localhost:9092    
    consumer:      
      group-id: order # 不可省略,否则监听时报错No group.id found in consumer config
在需要的地方注入操作对象:
@Autowired    
private KafkaTemplate<String ,String> kafkaTemplate;
//key和value的泛型自适应
2、消息的生产
继续在Service层演示消息的发送,send方法,传入topic和message
@Service
@Slf4j
public class MessageServiceKafkaImpl implements MessageService {    
	@Autowired    
	private KafkaTemplate<String ,String> kafkaTemplate;   
	@Override    
	public void sendMessage(String id) {  
	      	
		log.info("使用Kafka将待发送短信的订单纳入处理队列,id:"+id);       
		
		kafkaTemplate.send("kafka_topic",id);  
		  
	}
}
3、消费
直接使用监听器监听队列,不演示手动自己拿。使用@KafkaListener注解,其有一属性topic,就是send时的那个topic。需要注意的是,监听时,message依旧在形参,但类型是ConsumerRecord
@Component
@Slf4j
public class KafkaMessageListener{    
	@KafkaListener(topics = {"kafka_topic"})
	public void onMessage(ConsumerRecord<?, ?> record) {        
		
		log.info("已完成短信发送业务,id:"+record.value());    
	}
}
打印下ConsumerRecord对象看看:

注意监听时记得配置spring.kafka.consumer.group-id:

4、补充:安装
生产环境一般都用docker安装在容器里或者Linux上,这里备份下Windows安装,因为本地开发调式还得用。
- 下载
下载地址:https://kafka.apache.org/downloads
windows 系统下3.0.0版本存在BUG,建议使用2.X版本

-  安装:解压缩即安装 
-  启动zookeeper(注册中心),注意这里双重目录,用第二个kafka的bin /windows下的bat文件
# 在windows/zookeeper-server-start.bat所在目录cmd,再执行以下指令
# 默认端口:2181,zookeeper.properties中自行修改
zookeeper-server-start.bat ..\..\config\zookeeper.properties
# 报错输入行太长时,移动文件夹位置
# 或者将文件夹的版本号重命名去掉

- 启动Kafka
# 在windows/kafka-server-start.bat所在目录cmd,再执行以下指令
# 用第二个kafka的bin/windows下的bat文件
# 默认端口:9092,server.properties中自行修改
kafka-server-start.bat ..\..\config\server.properties
到此,安装启动成功。可测试下功能,进入对应的bat文件目录执行指令:
- 创建topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test123  # 分区等参数在kafka学
- 查看topic
kafka-topics.bat --zookeeper 127.0.0.1:2181 --list
- 删除topic
kafka-topics.bat --delete --zookeeper localhost:2181 --topic test
- 生产者功能测试
kafka-console-producer.bat --broker-list localhost:9092 --topic test123
- 消费者功能测试
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test123 --from-beginning



















