Requirements:
Deliver two programs: a command-line program (arguments parsed with flag) and a server program.
The command-line program configures the target IP or IP range, ports, and scan bandwidth via command-line arguments, then pushes the task message to Kafka.
Server program:
- Receives scan task messages as a Kafka consumer
- Invokes masscan to run the probe, collects progress and result information, writes progress to Redis and results to Kafka
- Task launching, Kafka access, and the overall workflow must each be wrapped in their own components
- With two server instances running, two different subnets dispatched by the command-line program must be distributed evenly across the two servers and completed
Test requirements:
- Start two server programs.
- Dispatch two tasks with different IPs via the command-line program.
- Check the server logs to confirm the two tasks were scanned evenly.
Prerequisites:
Install Docker
Approach:
1. System architecture
Producer-consumer pattern:
- The command-line client acts as the producer and publishes scan tasks to Kafka
- Two server instances act as consumers, fetching tasks from Kafka and executing them
2. Key component design
- Task representation:
  - Tasks are represented as JSON (see the sample message below) and contain:
    - IP range (a single IP or CIDR)
    - Port range
    - Scan bandwidth limit
    - Task status
    - Progress information
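For reference, a task message published to scan-tasks would look roughly like this (field names match the Task struct in the implementation below; the values are illustrative):
{
  "task_id": "550e8400-e29b-41d4-a716-446655440000",
  "ip_range": "192.168.1.0/24",
  "ports": "80,443",
  "band_width": "1000"
}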
- Kafka design:
  - Create a topic (e.g. scan-tasks)
  - A single partition preserves task ordering (or design a partitioning strategy to fit the requirements)
  - Use a consumer group so the two servers share the load
- Redis design:
  - Store task progress information
  - Use a Hash structure to store each task's progress percentage
  - Set an appropriate TTL so the data does not grow without bound
- Server-side load balancing:
  - Both servers join the same Kafka consumer group
  - Kafka automatically distributes the tasks evenly between the two consumers (see the topic command below)
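A consumer group can only spread load across as many partitions as the topic has, so for two servers the scan-tasks topic needs at least two partitions. It can be created up front like this (the verification section below alters an existing topic instead):
kafka-topics --create --bootstrap-server localhost:9092 --topic scan-tasks --partitions 2 --replication-factor 1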
3. Execution flow
- Client flow:
  - Parse command-line arguments (IP range, ports, bandwidth)
  - Validate the input format
  - Create a Kafka producer
  - Publish the task to the Kafka topic
- Server flow:
  - Initialize the Kafka consumer (join the consumer group)
  - Initialize the Redis connection
  - Consume tasks in a loop:
    a. Fetch a task from Kafka
    b. Set the task status in Redis to "running"
    c. Run the scan with masscan:
       - Build the masscan command-line arguments
       - Start the masscan process
       - Monitor the process output and exit status
    d. Parse masscan output in real time and update the progress in Redis
    e. When the scan finishes:
       - Set the task status in Redis to "completed"
       - Publish the full results to another Kafka topic (e.g. scan-result)
4. Key technical points
- Masscan integration:
  - Start the masscan process with exec.Command
  - Parse masscan's standard output and standard error in real time
  - Derive the scan progress from the output (see the example invocation below)
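With the arguments built by the executor in the implementation below, the server effectively runs something like the following (values illustrative; note that masscan's --rate is a packet rate in packets per second, which is what the "bandwidth" parameter maps to here):
masscan -p 80,443 --rate 1000 -oJ - 192.168.1.0/24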
- Error handling:
  - Handle invalid IP formats
  - Handle masscan execution failures
  - Handle Kafka/Redis connection problems
- Logging:
  - Log server-side operations
  - Log task state transitions
  - Log errors
5. Test and verification plan
- Start two server instances
- Submit two tasks for different subnets with the client
- Observe:
  - The log output of both servers
  - Whether the tasks are distributed evenly (one task per server)
  - Whether the scan progress is updated correctly
  - Whether the final results are emitted correctly
6. Possible extensions
- Task priority:
  - Add a priority field to the task
  - Servers process tasks according to priority
- Task timeout:
  - Add a task timeout mechanism
  - Reassign tasks after a timeout
- Result storage:
  - Persist results to a database rather than only to Kafka
- Horizontal scaling:
  - Design for adding more server instances
This design implements a basic distributed scan-task scheduling system: Kafka's message queue handles task distribution, the consumer-group mechanism provides load balancing, and Redis serves as the shared state store.
Implementation:
Project structure (inferred from the import paths and the package sections that follow):
week/v3/
  kafka/       consumer.go, producer.go
  main/
    server/    server entry point
    client/    command-line client
  progress/    Redis progress tracker
  scanner/     masscan executor
  task/        task model and TaskManager
  docker-compose.yml
kafka:
consumer
package kafka
import (
"context"
"errors"
"fmt"
"github.com/IBM/sarama"
"log"
"sync"
)
type MessageHandler func([]byte) error
type SaramaConsumer struct {
client sarama.ConsumerGroup
handlers map[string]MessageHandler
ready chan bool
ctx context.Context
cancel context.CancelFunc
consuming sync.WaitGroup
memberId string
groupId string
}
func NewKafkaConsumer(brokers []string, groupId string, topic []string) (*SaramaConsumer, error) {
config := sarama.NewConfig()
config.Version = sarama.V2_5_0_0 // pick a Kafka version compatible with the broker
config.Consumer.Offsets.Initial = sarama.OffsetOldest // start from the oldest offset when the group has no committed offset
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
client, err := sarama.NewConsumerGroup(brokers, groupId, config)
if err != nil {
return nil, fmt.Errorf("failed to create consumer: %v", err)
}
ctx, cancelFunc := context.WithCancel(context.Background())
return &SaramaConsumer{
client: client,
handlers: make(map[string]MessageHandler),
ready: make(chan bool),
ctx: ctx,
cancel: cancelFunc,
groupId: groupId,
}, nil
}
func (sc *SaramaConsumer) RegisterHandler(topic string, handler MessageHandler) {
sc.handlers[topic] = handler
}
func (sc *SaramaConsumer) StartConsuming(topics []string) {
sc.consuming.Add(1)
go func() {
defer sc.consuming.Done()
for {
if err := sc.client.Consume(sc.ctx, topics, sc); err != nil {
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
return // normal shutdown
}
log.Printf("Error from consumer: %v", err)
}
if sc.ctx.Err() != nil {
return
}
sc.ready = make(chan bool)
}
}()
<-sc.ready
log.Println("Sarama consumer up and running!...")
}
func (sc *SaramaConsumer) Setup(session sarama.ConsumerGroupSession) error {
sc.memberId = session.MemberID()
claims := session.Claims()
log.Printf("Rebalance: Consumer [%s] SETUP - Assigned partitions: %v",
sc.memberId, claims)
close(sc.ready)
return nil
}
func (sc *SaramaConsumer) Cleanup(session sarama.ConsumerGroupSession) error {
claims := session.Claims()
log.Printf("Rebalance: Consumer [%s] CLEANUP - Releasing partitions: %v",
sc.memberId, claims)
return nil
}
func (sc *SaramaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
log.Printf("Rebalance: Consumer [%s] STARTED consuming partition %d (offset %d)",
sc.memberId, claim.Partition(), claim.InitialOffset())
defer log.Printf("Rebalance: Consumer [%s] STOPPED consuming partition %d",
sc.memberId, claim.Partition())
for message := range claim.Messages() {
if handler, ok := sc.handlers[message.Topic]; ok {
if err := handler(message.Value); err != nil {
log.Printf("Error from consumer: %v", err)
}
session.MarkMessage(message, "")
} else {
log.Printf("Error from consumer: %v", message.Topic)
}
}
return nil
}
func (sc *SaramaConsumer) Close() error {
sc.cancel()
sc.consuming.Wait()
return sc.client.Close()
}
producer
package kafka
import (
"encoding/json"
"fmt"
"github.com/IBM/sarama"
"log"
)
type SaramaProducer struct {
producer sarama.SyncProducer
}
func NewSaramaProducer(brokers []string) (*SaramaProducer, error) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas to acknowledge
config.Producer.Retry.Max = 5 // retry count
config.Producer.Return.Successes = true // required by SyncProducer to report successful deliveries
config.Producer.Partitioner = sarama.NewRoundRobinPartitioner // round-robin partitioning
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
return nil, fmt.Errorf("failed to create Sarama producer: %v", err)
}
return &SaramaProducer{producer: producer}, nil
}
func (sp *SaramaProducer) SendMessage(topic string, value interface{}) error {
jsonValue, err := json.Marshal(value)
if err != nil {
return fmt.Errorf("fail to marshal value: %v", err)
}
msg := sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(jsonValue),
}
partition, offset, err := sp.producer.SendMessage(&msg)
if err != nil {
return fmt.Errorf("fail to send message: %v", err)
}
log.Printf("Message sent to partition %d at offset %d\n", partition, offset)
return nil
}
func (sp *SaramaProducer) Close() error {
return sp.producer.Close()
}
main
server
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"week/v3/kafka"
"week/v3/progress"
"week/v3/scanner"
"week/v3/task"
)
type Server struct {
consumer *kafka.SaramaConsumer
producer *kafka.SaramaProducer
progressTracker *progress.ProgressTracker
scanner *scanner.MasscanExecutor
}
func NewServer(brokers []string, groupId string, topic []string, redisAddr string) (*Server, error) {
consumer, err := kafka.NewKafkaConsumer(brokers, groupId, topic)
if err != nil {
return nil, fmt.Errorf("failed to create Kafka consumer: %w", err)
}
producer, err := kafka.NewSaramaProducer(brokers)
if err != nil {
return nil, fmt.Errorf("failed to create Kafka producer: %w", err)
}
tracker := progress.NewProgressTracker(redisAddr)
return &Server{
consumer: consumer,
producer: producer,
progressTracker: tracker,
scanner: scanner.NewMasscanExecutor(),
}, nil
}
func (s *Server) Start(ctx context.Context) {
s.consumer.RegisterHandler("scan-tasks", func(msg []byte) error {
var t task.Task
if err := json.Unmarshal(msg, &t); err != nil {
return fmt.Errorf("failed to unmarshal task: %v", err)
}
log.Printf(" Received task: %+v\n", t)
// 更新任务开始状态
if err := s.progressTracker.UpdateProgress(ctx, t.TaskId, 0); err != nil {
return fmt.Errorf("failed to update progress: %v", err)
}
// 执行 masscan 扫描
resultChan, errChan := s.scanner.Execute(ctx, t.IPRange, t.Ports, t.BandWidth)
log.Println("Masscan Execute called")
select {
case results := <-resultChan:
log.Printf(" Scan complete. %d results.\n", len(results))
for _, result := range results {
scanResult := struct {
TaskID string `json:"task_id"`
scanner.ScanResult
Timestamp int64 `json:"timestamp"`
}{
TaskID: t.TaskId,
ScanResult: result,
Timestamp: time.Now().Unix(),
}
// publish to the scan-result topic (SendMessage marshals the value itself)
if err := s.producer.SendMessage("scan-result", scanResult); err != nil {
log.Printf("Failed to send scan result to Kafka: %v", err)
} else {
log.Printf("Sent scan result: %+v", scanResult)
}
}
// mark the task as completed in Redis
if err := s.progressTracker.UpdateProgress(ctx, t.TaskId, 100, "completed"); err != nil {
return fmt.Errorf("failed to update progress: %v", err)
}
case err := <-errChan:
log.Printf(" Scan error: %v\n", err)
// 更新任务失败状态
if err := s.progressTracker.UpdateProgress(ctx, t.TaskId, 0); err != nil {
return fmt.Errorf("failed to update progress: %v", err)
}
return fmt.Errorf("processing task failed: %v", err)
}
return nil
})
// start consuming from Kafka
go s.consumer.StartConsuming([]string{"scan-tasks"})
log.Println("Server is running and waiting for tasks...")
// wait for the shutdown signal
<-ctx.Done()
log.Println("Shutting down...")
if err := s.consumer.Close(); err != nil {
log.Printf("Error closing consumer: %v", err)
}
if err := s.producer.Close(); err != nil {
log.Printf("Error closing producer: %v", err)
}
if err := s.progressTracker.Close(); err != nil {
log.Printf("Error closing progress tracker: %v", err)
}
log.Println(" Server shutdown complete")
}
func main() {
if len(os.Args) < 2 {
log.Fatal("Usage: server <consumer-group-id>")
}
groupID := os.Args[1]
brokers := []string{"localhost:9092"}
redisAddr := "localhost:6379"
server, err := NewServer(brokers, groupID, []string{"scan-tasks"}, redisAddr)
if err != nil {
log.Fatalf("Failed to create server: %v", err)
}
// listen for interrupt signals and cancel the context
ctx, cancel := context.WithCancel(context.Background())
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
cancel()
}()
server.Start(ctx)
}
client
package main
import (
"flag"
"fmt"
"log"
"strings"
"week/v3/task"
)
func main() {
ipRange := flag.String("ip", "", "target IP or CIDR; multiple entries separated by commas")
ports := flag.String("ports", "", "port range, e.g. 80,443")
bandwidth := flag.String("bandwidth", "1000", "scan rate passed to masscan --rate (packets per second)")
kafkaBroker := flag.String("kafka", "localhost:9092", "Kafka broker address(es), comma separated")
topic := flag.String("topic", "scan-tasks", "kafka topic")
flag.Parse()
if *ipRange == "" || *ports == "" {
fmt.Println("必须指定 ip 和 ports 参数")
flag.PrintDefaults()
return
}
brokers := strings.Split(*kafkaBroker, ",")
taskManager, err := task.NewTaskManager(*topic, brokers)
if err != nil {
log.Fatalf("Failed to create task manager: %v", err)
}
defer taskManager.Close()
taskIDs, err := taskManager.SubmitTask(*ipRange, *ports, *bandwidth)
if err != nil {
log.Fatalf("Failed to submit task: %v", err)
}
fmt.Printf("Tasks submitted successfully. Task IDs: %v\n", taskIDs)
}
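Assuming the client entry point lives under main/client (adjust the path to the actual layout), a task can be submitted like this:
go run ./main/client -ip 192.168.1.0/24 -ports 80,443 -bandwidth 1000 -kafka localhost:9092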
progress
package progress
import (
"context"
"encoding/json"
"fmt"
"github.com/go-redis/redis/v8"
"time"
)
type Progress struct {
TaskID string `json:"task_id"`
Progress float64 `json:"progress"`
Status string `json:"status"`
Timestamp int64 `json:"timestamp"`
}
type ProgressTracker struct {
redisClient *redis.Client
}
func NewProgressTracker(redisAddr string) *ProgressTracker {
return &ProgressTracker{
redisClient: redis.NewClient(&redis.Options{
Addr: redisAddr,
}),
}
}
func (pt *ProgressTracker) UpdateProgress(ctx context.Context, taskID string, progress float64, status string) error {
p := Progress{
TaskID: taskID,
Progress: progress,
Status: status,
Timestamp: time.Now().Unix(),
}
jsonData, err := json.Marshal(p)
if err != nil {
return fmt.Errorf("failed to marshal progress: %v", err)
}
err = pt.redisClient.Set(ctx, taskID, string(jsonData), time.Hour).Err()
if err != nil {
return fmt.Errorf("failed to update progress tracker: %v", err)
}
return nil
}
func (pt *ProgressTracker) Close() error {
return pt.redisClient.Close()
}
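The progress is stored as a JSON string under the task ID with a one-hour TTL, so it can be inspected directly with redis-cli (container name taken from the docker-compose file below; the task ID is illustrative):
docker exec -it redis redis-cli GET 550e8400-e29b-41d4-a716-446655440000
# => {"task_id":"550e8400-e29b-41d4-a716-446655440000","progress":100,"status":"completed","timestamp":1700000000}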
scanner
package scanner
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"os/exec"
)
type ScanResult struct {
IP string `json:"ip"`
Port string `json:"port"`
Status string `json:"status"`
}
type MasscanExecutor struct{}
func NewMasscanExecutor() *MasscanExecutor {
return &MasscanExecutor{}
}
func (me *MasscanExecutor) Execute(ctx context.Context, ipRange, ports, bandWidth string) (<-chan []ScanResult, <-chan error) {
resultChan := make(chan []ScanResult)
errorChan := make(chan error)
go func() {
defer close(resultChan)
defer close(errorChan)
args := []string{
"-p", ports,
"--rate", bandWidth,
"-oJ", "-",
ipRange,
}
cmd := exec.CommandContext(ctx, "masscan", args...)
output, err := cmd.CombinedOutput()
if err != nil {
errorChan <- fmt.Errorf("masscan command error: %v, output: %s", err, string(output))
return
}
// print the raw output for debugging
fmt.Printf("masscan output:\n%s\n", string(output))
scanner := bufio.NewScanner(bytes.NewReader(output))
var results []ScanResult
for scanner.Scan() {
line := scanner.Bytes()
// skip lines that are not JSON result objects (masscan -oJ wraps results in a JSON array)
if len(line) == 0 || line[0] != '{' {
continue
}
// masscan appends a trailing comma after each result object; strip it before unmarshalling
line = bytes.TrimSuffix(bytes.TrimSpace(line), []byte(","))
var r struct {
IP string `json:"ip"`
Ports []struct {
Port int `json:"port"`
Status string `json:"status"`
} `json:"ports"`
}
if err := json.Unmarshal(line, &r); err != nil {
// tolerate non-result lines (CombinedOutput mixes masscan's status output with the JSON)
fmt.Printf("skipping unparsable line: %s\n", string(line))
continue
}
for _, p := range r.Ports {
results = append(results, ScanResult{
IP: r.IP,
Port: fmt.Sprintf("%d", p.Port),
Status: p.Status,
})
}
}
if err := scanner.Err(); err != nil {
errorChan <- fmt.Errorf("scanner error: %v", err)
return
}
resultChan <- results
}()
return resultChan, errorChan
}
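For reference, the parser above works on masscan's -oJ output, which is a JSON array with roughly one result object per line; the trailing comma after each object is stripped before unmarshalling (the exact format may vary slightly between masscan versions):
{   "ip": "192.168.1.10",   "timestamp": "1700000000", "ports": [ {"port": 80, "proto": "tcp", "status": "open", "reason": "syn-ack", "ttl": 64} ] },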
task
package task
import (
"encoding/json"
"fmt"
"github.com/IBM/sarama"
"github.com/google/uuid"
"strings"
)
type Task struct {
TaskId string `json:"task_id"`
IPRange string `json:"ip_range"`
Ports string `json:"ports"`
BandWidth string `json:"band_width"`
}
type TaskManager struct {
producer sarama.SyncProducer
topic string
}
func NewTaskManager(topic string, brokers []string) (*TaskManager, error) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true
config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
return nil, err
}
return &TaskManager{producer: producer, topic: topic}, nil
}
func (tm *TaskManager) SubmitTask(ipRange, ports, bandWidth string) ([]string, error) {
ipList := strings.Split(ipRange, ",")
var taskIDs []string
for _, ip := range ipList {
task := &Task{
TaskId: uuid.New().String(),
IPRange: ip, // each message carries a single IP or CIDR entry
Ports: ports,
BandWidth: bandWidth,
}
jsonTask, err := json.Marshal(task)
if err != nil {
return nil, fmt.Errorf("failed to marshal task: %v", err)
}
msg := sarama.ProducerMessage{
Topic: tm.topic,
Value: sarama.ByteEncoder(jsonTask),
}
_, _, err = tm.producer.SendMessage(&msg)
if err != nil {
return nil, fmt.Errorf("failed to send message: %v", err)
}
taskIDs = append(taskIDs, task.TaskId)
}
return taskIDs, nil
}
func (tm *TaskManager) Close() error {
return tm.producer.Close()
}
docker-compose
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      TZ: Asia/Shanghai
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      TZ: Asia/Shanghai
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  redis:
    image: redis:7.2
    container_name: redis
    ports:
      - "6379:6379"
    environment:
      TZ: Asia/Shanghai
    volumes:
      - redis-data:/data
volumes:
  redis-data:
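Assuming the file above is saved as docker-compose.yml, bring the dependencies up before testing:
docker compose up -d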
Verification
Start two servers and one client. First exec into the Kafka container started by Docker:
docker exec -it kafka bash
List the topics in Kafka: before the server and client are started there are none; after startup three topics exist.
scan-tasks holds the scan tasks, scan-result holds the scan results, and __consumer_offsets holds the committed offsets.
Because the load must be balanced across two servers, the topic needs two partitions:
kafka-topics --alter --bootstrap-server localhost:9092 --topic scan-tasks --partitions 2
kafka-topics --describe --bootstrap-server localhost:9092 --topic scan-tasks
Start two servers from the command line, both with the same consumer group ID.
Start one client (or run it twice) to dispatch two tasks for different subnets, for example:
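A minimal run, assuming the server and client entry points live under main/server and main/client:
# two servers, same consumer group (run in separate terminals)
go run ./main/server scan-group
go run ./main/server scan-group
# submit two tasks for different subnets
go run ./main/client -ip 192.168.1.0/24 -ports 80,443
go run ./main/client -ip 10.0.0.0/24 -ports 80,443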
Check the server logs: the two servers consume the tasks evenly, one subnet each.
If you are interested, you can also inspect the scan results on the scan-result topic and the progress written to Redis.