Done Today
Project value and key challenges in ~50 words
Project value
By integrating the metrics-monitoring component into the project, compare it and its companion tooling in terms of functional and performance differences and trade-offs, and give recommendations for slimming down the monitoring service.
Top 3 challenges
- Reduce the resource cost of the monitoring service, with performance optimization in mind
- How to substantiate our claims about the differences and relative strengths of the monitoring services
- Make the monitoring service transparent to business code while keeping the code extensible
 
Director's reply
A small and elegant monitoring service
What makes it small? What makes it elegant? The key to being small and elegant is your grasp of the service's core, its innermost essence, and how well you do subtraction. Read more articles on doing subtraction; the hard part is thinking it through thick first and then making it thin.
Watermill
Replace sarama.SyncProducer -> kafka-client.Producer
 
- Testing showed that the WriteKafka span took a long time: the Producer inside the watermill-kafka Publisher is sarama.SyncProducer, whose synchronous writes are slow, and sarama.AsyncProducer's API is not very intuitive; the requirements also call for switching the underlying library to kafka-go.
- Compared with the original Kafka write path, the kafka-client Producer (which depends on kafka-go) supports both synchronous and asynchronous message writes, so the Publisher implementation was changed accordingly (a minimal kafka-go sketch of the sync vs. async switch follows this list).
- profile/internal/watermill/watermillkafka/marshaler.go
package watermillkafka
import (
	"github.com/Shopify/sarama"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/pkg/errors"
	"github.com/segmentio/kafka-go"
)
const UUIDHeaderKey = "_watermill_message_uuid"
const HeaderKey = "_key"
// Marshaler marshals Watermill's message to Kafka message.
type Marshaler interface {
	Marshal(topic string, msg *message.Message) (*kafka.Message, error)
}
// Unmarshaler unmarshals Kafka's message to Watermill's message.
type Unmarshaler interface {
	Unmarshal(*sarama.ConsumerMessage) (*message.Message, error)
}
type MarshalerUnmarshaler interface {
	Marshaler
	Unmarshaler
}
type DefaultMarshaler struct{}
func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*kafka.Message, error) {
	if value := msg.Metadata.Get(UUIDHeaderKey); value != "" {
		return nil, errors.Errorf("metadata %s is reserved by watermill for message UUID", UUIDHeaderKey)
	}
	headers := []kafka.Header{{
		Key:   UUIDHeaderKey,
		Value: []byte(msg.UUID),
	}}
	var msgKey string
	for key, value := range msg.Metadata {
		if key == HeaderKey {
			msgKey = value
		} else {
			headers = append(headers, kafka.Header{
				Key:   key,
				Value: []byte(value),
			})
		}
	}
	return &kafka.Message{
		Topic:   topic,
		Key:     []byte(msgKey),
		Value:   msg.Payload,
		Headers: headers,
	}, nil
}
func (DefaultMarshaler) Unmarshal(kafkaMsg *sarama.ConsumerMessage) (*message.Message, error) {
	var messageID string
	metadata := make(message.Metadata, len(kafkaMsg.Headers))
	for _, header := range kafkaMsg.Headers {
		if string(header.Key) == UUIDHeaderKey {
			messageID = string(header.Value)
		} else {
			metadata.Set(string(header.Key), string(header.Value))
		}
	}
	msg := message.NewMessage(messageID, kafkaMsg.Value)
	msg.Metadata = metadata
	return msg, nil
}
- profile/internal/watermill/watermillkafka/publisher.go
package watermillkafka
import (
	"context"

	kc "github.com/Kevinello/kafka-client"
	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/pkg/errors"
	"go.uber.org/zap"

	"profile/internal/connector"
	"profile/internal/log"
	"profile/internal/watermill/model"
)
type Publisher struct {
	config   PublisherConfig
	producer *kc.Producer
	logger   watermill.LoggerAdapter
	closed   bool
}
// NewPublisher creates a new Kafka Publisher.
func NewPublisher(
	config PublisherConfig,
	logger watermill.LoggerAdapter,
) (*Publisher, error) {
	if err := config.Validate(); err != nil {
		return nil, err
	}
	if logger == nil {
		logger = watermill.NopLogger{}
	}
	producer, err := kc.NewProducer(context.Background(), config.KcProducerConfig)
	if err != nil {
		return nil, errors.Wrap(err, "cannot create Kafka producer")
	}
	return &Publisher{
		config:   config,
		producer: producer,
		logger:   logger,
	}, nil
}
type PublisherConfig struct {
	// Kafka brokers list.
	Brokers []string
	// Marshaler is used to marshal messages from Watermill format into Kafka format.
	Marshaler Marshaler
	// KcProducerConfig configuration object used to create new instances of Producer
	KcProducerConfig kc.ProducerConfig
}
func (c PublisherConfig) Validate() error {
	if len(c.Brokers) == 0 {
		return errors.New("missing brokers")
	}
	if c.Marshaler == nil {
		return errors.New("missing marshaler")
	}
	return c.KcProducerConfig.Validate()
}
// Publish publishes messages to Kafka.
//
// Publish is blocking and waits for an ack from Kafka.
// When delivery of one of the messages fails, the function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
	if p.closed {
		return errors.New("publisher closed")
	}
	logFields := make(watermill.LogFields, 2)
	logFields["topic"] = topic
	for _, msg := range msgs {
		logFields["message_uuid"] = msg.UUID
		p.logger.Trace("Sending message to Kafka", logFields)
		kafkaMsg, err := p.config.Marshaler.Marshal(topic, msg)
		if err != nil {
			return errors.Wrapf(err, "cannot marshal message %s", msg.UUID)
		}
		// The message context is expected to carry a *model.ConsumeCtxData under "data" (the event plus its tracing spans).
		data := msg.Context().Value("data").(*model.ConsumeCtxData)
		err = p.producer.WriteMessages(msg.Context(), *kafkaMsg)
		if err != nil {
			log.Logger.ErrorContext(msg.Context(), "send message to kafka error", zap.Error(err))
			data.WriteKafkaSpan.End()
			data.RootSpan.End()
			return errors.Wrapf(err, "cannot produce message %s", msg.UUID)
		}
		data.WriteKafkaSpan.End()
		log.Logger.Info("[WriteKafka] write kafka success",
			zap.String("topic", connector.GetTopic(data.Event.Category)),
			zap.String("id", data.Event.ID), zap.Any("msg", data.Event),
			zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))
		p.logger.Trace("Message sent to Kafka", logFields)
		data.RootSpan.End()
	}
	return nil
}
func (p *Publisher) Close() error {
	if p.closed {
		return nil
	}
	p.closed = true
	if err := p.producer.Close(); err != nil {
		return errors.Wrap(err, "cannot close Kafka producer")
	}
	return nil
}
 
- Testing showed a large performance improvement over the previous implementation (different environments, docker vs. local, and different configurations, synchronous vs. asynchronous, were also compared); the docker environment with asynchronous writes enabled was the most efficient.
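 
A minimal sketch of the sync vs. async switch mentioned above, written against segmentio/kafka-go's kafka.Writer (the library kafka-client builds on); the broker address, topic, and payload are placeholder assumptions, and kafka-client's own ProducerConfig may expose the same options differently:
package main
import (
	"context"
	"log"
	"github.com/segmentio/kafka-go"
)
func main() {
	// Synchronous writer: WriteMessages blocks until the broker acknowledges the batch,
	// which is the behaviour that made the WriteKafka span slow with sarama.SyncProducer.
	syncWriter := &kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"), // placeholder broker
		Topic: "profile-events",            // placeholder topic
	}
	defer syncWriter.Close()
	// Asynchronous writer: WriteMessages enqueues the message and returns immediately;
	// delivery errors are reported through the Completion callback instead.
	asyncWriter := &kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"),
		Topic: "profile-events",
		Async: true,
		Completion: func(messages []kafka.Message, err error) {
			if err != nil {
				log.Printf("async write failed: %v", err)
			}
		},
	}
	defer asyncWriter.Close()
	msg := kafka.Message{Key: []byte("_key"), Value: []byte(`{"id":"demo"}`)}
	if err := syncWriter.WriteMessages(context.Background(), msg); err != nil {
		log.Printf("sync write failed: %v", err)
	}
	_ = asyncWriter.WriteMessages(context.Background(), msg) // queued; flushed in the background, Close waits for it
}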
 



To Do Tomorrow
- benchmark: watermill vs. baserunner
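 
One possible shape for the benchmark, sketched at the kafka-go level so the synchronous and asynchronous configurations can be compared with go test -bench; the watermill and baserunner pipelines would each need an equivalent testing.B harness, and the broker, topic, and payload here are placeholders:
package watermillkafka
import (
	"context"
	"testing"
	"github.com/segmentio/kafka-go"
)
// benchmarkWriter pushes b.N messages through a kafka.Writer so that the
// sync and async configurations can be compared under the same load.
func benchmarkWriter(b *testing.B, async bool) {
	w := &kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"), // placeholder broker
		Topic: "profile-events-bench",      // placeholder topic
		Async: async,
	}
	defer w.Close()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		msg := kafka.Message{Value: []byte(`{"id":"bench"}`)}
		if err := w.WriteMessages(context.Background(), msg); err != nil {
			b.Fatal(err)
		}
	}
}
func BenchmarkWriteSync(b *testing.B)  { benchmarkWriter(b, false) }
func BenchmarkWriteAsync(b *testing.B) { benchmarkWriter(b, true) }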