etcd 是一个开源的 分布式键值存储系统(Key-Value Store),主要用于配置共享和服务发现。
ETCD是一个键值(KV)数据库,类似于Redis,支持分布式集群。 ETCD也可以看作是一个分布式文件系统,类似于ZooKeeper,可以对文件和目录进行监听。 在服务注册场景下,ETCD中的key是类似于文件路径的字符串,value为空。 每台服务器启动后,会主动将自己的IP地址和提供的服务名称写入ETCD。 为了防止key过期,服务器会每隔一段时间(如9秒)重新写入自己的信息。 通过设置租期(如2秒),ETCD可以在服务器宕机后及时清理过期的key。 客户端通过查询ETCD来获取能够提供服务的服务器IP地址。 客户端可以查询具有特定前缀的key,以获取所有提供相同服务的服务器IP。 ETCD支持监听功能,客户端可以监听特定前缀的变化,实时获取新的服务器信息。
package service_hub
import (
"context"
"errors"
"github.com/jmh000527/criker-search/index_service/load_balancer"
"github.com/jmh000527/criker-search/utils"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
etcdv3 "go.etcd.io/etcd/client/v3"
"strings"
"sync"
"time"
)
type EtcdServiceHub struct {
client * etcdv3. Client
heartbeatFrequency int64
watched sync. Map
loadBalancer load_balancer. LoadBalancer
}
const (
ServiceRootPath = "/criker-search"
)
var (
etcdServiceHub * EtcdServiceHub
hubOnce sync. Once
)
func GetServiceHub ( etcdServers [ ] string , heartbeatFrequency int64 ) * EtcdServiceHub {
if etcdServiceHub == nil {
hubOnce. Do ( func ( ) {
client, err := etcdv3. New ( etcdv3. Config{
Endpoints: etcdServers,
DialTimeout: 3 * time. Second,
} )
if err != nil {
utils. Log. Fatal ( "连接etcd失败:" , err)
}
etcdServiceHub = & EtcdServiceHub{
client: client,
heartbeatFrequency: heartbeatFrequency,
loadBalancer: & load_balancer. RoundRobin{ } ,
}
} )
}
return etcdServiceHub
}
func ( hub * EtcdServiceHub) RegisterService ( service, endpoint string , leaseId etcdv3. LeaseID) ( etcdv3. LeaseID, error ) {
if leaseId <= 0 {
leaseGrantResponse, err := hub. client. Grant ( context. Background ( ) , hub. heartbeatFrequency)
if err != nil {
utils. Log. Printf ( "创建租约失败: %v" , err)
return 0 , err
}
key := strings. TrimRight ( ServiceRootPath, "/" ) + "/" + service + "/" + endpoint
_ , err = hub. client. Put ( context. Background ( ) , key, "" , etcdv3. WithLease ( leaseGrantResponse. ID) )
if err != nil {
utils. Log. Printf ( "服务注册失败: %v" , err)
return leaseGrantResponse. ID, err
}
utils. Log. Printf ( "成功注册服务: %v" , key)
return leaseGrantResponse. ID, nil
} else {
_ , err := hub. client. KeepAliveOnce ( context. Background ( ) , leaseId)
if errors. Is ( err, rpctypes. ErrLeaseNotFound) {
utils. Log. Printf ( "未找到租约,重新注册服务" )
return hub. RegisterService ( service, endpoint, 0 )
} else if err != nil {
utils. Log. Printf ( "续租失败: %v" , err)
return 0 , err
}
return leaseId, nil
}
}
func ( hub * EtcdServiceHub) UnregisterService ( service string , endpoint string ) error {
key := strings. TrimRight ( ServiceRootPath, "/" ) + "/" + service + "/" + endpoint
_ , err := hub. client. Delete ( context. Background ( ) , key)
if err != nil {
utils. Log. Printf ( "注销服务失败: %v" , err)
return err
}
utils. Log. Printf ( "成功注销服务: %v" , key)
return nil
}
func ( hub * EtcdServiceHub) GetServiceEndpoints ( service string ) [ ] string {
prefix := strings. TrimRight ( ServiceRootPath, "/" ) + "/" + service + "/"
getResponse, err := hub. client. Get ( context. Background ( ) , prefix, etcdv3. WithPrefix ( ) )
if err != nil {
utils. Log. Printf ( "从etcd获取服务端点失败: %v" , err)
return nil
}
endpoints := make ( [ ] string , 0 , len ( getResponse. Kvs) )
for _ , kv := range getResponse. Kvs {
path := strings. Split ( string ( kv. Key) , "/" )
endpoints = append ( endpoints, path[ len ( path) - 1 ] )
}
utils. Log. Printf ( "最新的服务端点: %v" , endpoints)
return endpoints
}
func ( hub * EtcdServiceHub) GetServiceEndpoint ( service string ) string {
endpoints := hub. GetServiceEndpoints ( service)
return hub. loadBalancer. Take ( endpoints)
}
func ( hub * EtcdServiceHub) Close ( ) {
err := hub. client. Close ( )
if err != nil {
utils. Log. Printf ( "关闭etcd客户端连接失败: %v" , err)
}
}
ETCD提供API用于服务的注册与发现。 服务中心的核心是client,用于连接到ETCD。 服务注册后,需要定期上报心跳以保持存活状态。 service worker单独部署在服务器上,连接service hub使用单例模式。 通过once实现单例模式,判断是否已创建实例。 使用ETCD new方法连接,传入endpoints和配置信息。 配置中核心是endpoints,需要提供ETCD集群的多个IP。 连接超时设置为3秒,确保连接可靠性。 服务启动时,首先申请租约并获取租约ID。 将服务信息(service name + ip:port)注册到ETCD中,并设置租约。 定期续租以保持服务存活状态。 如果租约ID不存在,则重新注册服务。 提供注销函数,传入service name和endpoint IP。 从ETCD中删除对应的key,完成注销。 服务调用方通过get service函数获取服务列表。 传入service name作为前缀,查询满足前缀的key。 返回所有匹配key的IP列表,供调用方选择。
func ( hub * EtcdServiceHub) GetServiceEndpoint ( service string ) string {
endpoints := hub. GetServiceEndpoints ( service)
return hub. loadBalancer. Take ( endpoints)
}
由于调用方希望直接获取一台服务器进行接口通信,服务中心通过策略模式,将负载均衡算法的实现交给外部,采用接口方式定义负载均衡策略,并展示了轮询和随机两种简单的负载均衡策略实现,强调了在并发环境下确保累加操作的线程安全性。
package load_balancer
type LoadBalancer interface {
Take ( endpoints [ ] string ) string
}
import "math/rand"
type RandomSelect struct { }
func ( b * RandomSelect) Take ( endpoints [ ] string ) string {
if len ( endpoints) == 0 {
return ""
}
index := rand. Intn ( len ( endpoints) )
return endpoints[ index]
}
import "sync/atomic"
type RoundRobin struct {
acc int64
}
func ( b * RoundRobin) Take ( endpoints [ ] string ) string {
if len ( endpoints) == 0 {
return ""
}
n := atomic. AddInt64 ( & b. acc, 1 )
index := int ( n % int64 ( len ( endpoints) ) )
return endpoints[ index]
}