哨兵(sentinel)负责接待客人,直接与调用方对接。 哨兵的核心组件包括service HUB和connection pool。 service HUB用于与服务中心通信,获取可提供服务的节点信息。 connection pool用于缓存与index worker的连接,避免每次搜索时重新建立连接。 连接池初始化为空map。 提供函数获取指定endpoint的gRPC连接。 函数首先检查本地缓存中是否有可用连接,若无则创建新连接。 创建连接时默认立即返回,可选阻塞模式直到连接可用。 连接建立后放入缓存并返回。 哨兵提供添加、删除和搜索三个核心功能。 添加功能:随机选择一台index worker添加新文档。 删除功能:遍历所有endpoint,并行删除指定文档。 搜索功能:将搜索请求发送到所有endpoint,合并搜索结果。 使用channel进行并发搜索结果的收集。 上游并发写入channel,下游读取channel数据到切片。 使用wait group等待所有搜索任务完成。 关闭channel后仍可读取,确保读取到所有数据。
package index_service
import (
"context"
"fmt"
"github.com/jmh000527/criker-search/index_service/service_hub"
"github.com/jmh000527/criker-search/types"
"github.com/jmh000527/criker-search/utils"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"sync"
"sync/atomic"
"time"
)
// Sentinel is the caller-facing entry point of the index service: it resolves
// IndexService endpoints through a service hub and talks to them over gRPC.
type Sentinel struct {
	// hub is queried for the endpoint(s) currently registered for IndexService.
	hub service_hub.ServiceHub

	// connPool caches gRPC connections. Keys are endpoint strings and values
	// are *grpc.ClientConn, so a connection is not re-dialed on every request.
	connPool sync.Map
}
// NewSentinel builds a Sentinel whose service hub is the proxy returned by
// service_hub.GetServiceHubProxy for the given etcd servers. The connection
// pool starts out empty.
func NewSentinel(etcdServers []string) *Sentinel {
	s := &Sentinel{connPool: sync.Map{}}
	// NOTE(review): 3 and 100 are forwarded verbatim to GetServiceHubProxy;
	// presumably a heartbeat interval and a QPS limit — confirm in service_hub.
	s.hub = service_hub.GetServiceHubProxy(etcdServers, 3, 100)
	return s
}
// GetGrpcConn returns a gRPC connection to endpoint, reusing the pooled one
// when it is still usable and dialing a new one otherwise.
//
// A pooled connection found in the TransientFailure or Shutdown state is
// closed, evicted from the pool and replaced. Dialing uses insecure transport
// credentials and blocks (grpc.WithBlock) for at most 200ms; if it fails the
// error is logged and nil is returned, so callers must check for nil.
func (sentinel *Sentinel) GetGrpcConn(endpoint string) *grpc.ClientConn {
	if v, exists := sentinel.connPool.Load(endpoint); exists {
		conn := v.(*grpc.ClientConn)
		// Read the state once so the health check and the log line agree.
		state := conn.GetState()
		if state != connectivity.TransientFailure && state != connectivity.Shutdown {
			return conn
		}
		utils.Log.Printf("连接到 endpoint %s 的状态为 %s", endpoint, state.String())
		// Best effort: the connection is already unusable, so a close error
		// carries no actionable information here.
		_ = conn.Close()
		sentinel.connPool.Delete(endpoint)
	}

	// Bound the blocking dial so an unreachable worker cannot stall the caller.
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	grpcConn, err := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
	if err != nil {
		utils.Log.Printf("连接到 %s 的 gRPC 失败,错误: %s", endpoint, err.Error())
		return nil
	}
	utils.Log.Printf("连接到 %s 的 gRPC 成功", endpoint)

	// Several goroutines may have dialed the same endpoint concurrently. Keep
	// whichever connection reached the pool first and close ours if we lost,
	// instead of overwriting the entry and leaking the previous connection.
	if pooled, loaded := sentinel.connPool.LoadOrStore(endpoint, grpcConn); loaded {
		_ = grpcConn.Close()
		return pooled.(*grpc.ClientConn)
	}
	return grpcConn
}
// AddDoc sends doc to the single endpoint the service hub returns for
// IndexService and reports how many documents that worker says it added.
//
// An error is returned when the hub yields no endpoint, when no gRPC
// connection to the endpoint can be obtained, or when the AddDoc RPC fails;
// the count is 0 in all of those cases.
func (sentinel *Sentinel) AddDoc(doc types.Document) (int, error) {
	target := sentinel.hub.GetServiceEndpoint(IndexService)
	if target == "" {
		return 0, fmt.Errorf("未找到服务 %s 的有效节点", IndexService)
	}

	conn := sentinel.GetGrpcConn(target)
	if conn == nil {
		return 0, fmt.Errorf("连接到 %s 的 gRPC 失败", target)
	}

	result, err := NewIndexServiceClient(conn).AddDoc(context.Background(), &doc)
	if err != nil {
		return 0, err
	}

	added := result.Count
	utils.Log.Printf("成功向 worker %s 添加 %d 个文档", target, added)
	return int(added), nil
}
// DeleteDoc asks every IndexService endpoint, in parallel, to delete the
// document identified by docId and returns the sum of the counts the workers
// report. Endpoints that cannot be connected to, or whose RPC fails, are
// logged and contribute nothing to the total. Returns 0 when the hub has no
// endpoints.
func (sentinel *Sentinel) DeleteDoc(docId string) int {
	workers := sentinel.hub.GetServiceEndpoints(IndexService)
	if len(workers) == 0 {
		return 0
	}

	var (
		deleted int32 // updated atomically by the worker goroutines
		pending sync.WaitGroup
	)

	// removeFrom issues the delete against one worker and adds the reported
	// count to the shared total.
	removeFrom := func(worker string) {
		defer pending.Done()

		conn := sentinel.GetGrpcConn(worker)
		if conn == nil {
			utils.Log.Printf("连接到 %s 的 gRPC 失败", worker)
			return
		}

		resp, err := NewIndexServiceClient(conn).DeleteDoc(context.Background(), &DocId{docId})
		if err != nil {
			utils.Log.Printf("从 worker %s 删除文档 %s 失败,错误: %s", worker, docId, err)
			return
		}
		if resp.Count <= 0 {
			return
		}
		atomic.AddInt32(&deleted, resp.Count)
		utils.Log.Printf("从 worker %s 删除文档 %s 成功", worker, docId)
	}

	pending.Add(len(workers))
	for _, worker := range workers {
		go removeFrom(worker)
	}
	pending.Wait()

	return int(atomic.LoadInt32(&deleted))
}
// Search fans the query out to every IndexService endpoint concurrently and
// returns all documents the workers send back, in the order they arrive.
//
// query, onFlag, offFlag and orFlags are forwarded unchanged in a
// SearchRequest. Endpoints that cannot be connected to, or whose RPC fails,
// are logged and skipped. Returns nil when the hub has no endpoints;
// otherwise a non-nil (possibly empty) slice.
func (sentinel *Sentinel) Search(query *types.TermQuery, onFlag, offFlag uint64, orFlags []uint64) []*types.Document {
	workers := sentinel.hub.GetServiceEndpoints(IndexService)
	if len(workers) == 0 {
		return nil
	}

	const bufSize = 1000
	hits := make(chan *types.Document, bufSize)

	var inFlight sync.WaitGroup
	inFlight.Add(len(workers))
	for _, worker := range workers {
		go func(worker string) {
			defer inFlight.Done()

			conn := sentinel.GetGrpcConn(worker)
			if conn == nil {
				utils.Log.Printf("连接到 %s 的 gRPC 连接失败", worker)
				return
			}

			req := &SearchRequest{
				Query:   query,
				OnFlag:  onFlag,
				OffFlag: offFlag,
				OrFlags: orFlags,
			}
			resp, err := NewIndexServiceClient(conn).Search(context.Background(), req)
			if err != nil {
				utils.Log.Printf("向 worker %s 执行查询 %s 失败,错误: %s", worker, query, err)
				return
			}
			if len(resp.Results) == 0 {
				return
			}
			utils.Log.Printf("向 worker %s 执行查询 %s 成功,获取到 %v 个文档", worker, query, len(resp.Results))
			for _, doc := range resp.Results {
				hits <- doc
			}
		}(worker)
	}

	// Close hits once every worker goroutine has finished, so the collecting
	// loop below terminates after draining whatever is still buffered.
	go func() {
		inFlight.Wait()
		close(hits)
	}()

	merged := make([]*types.Document, 0, bufSize)
	for doc := range hits {
		merged = append(merged, doc)
	}
	return merged
}
// Count queries every IndexService endpoint in parallel and returns the sum
// of the document counts the workers report. Endpoints that cannot be
// connected to, or whose RPC fails, contribute nothing to the total. Returns
// 0 when the hub has no endpoints.
func (sentinel *Sentinel) Count() int {
	endpoints := sentinel.hub.GetServiceEndpoints(IndexService)
	if len(endpoints) == 0 {
		return 0
	}

	var n int32 // updated atomically by the worker goroutines
	var wg sync.WaitGroup
	wg.Add(len(endpoints))
	for _, endpoint := range endpoints {
		go func(endpoint string) {
			defer wg.Done()

			grpcConn := sentinel.GetGrpcConn(endpoint)
			if grpcConn == nil {
				// GetGrpcConn has already logged the dial failure.
				return
			}

			client := NewIndexServiceClient(grpcConn)
			affected, err := client.Count(context.Background(), new(CountRequest))
			if err != nil {
				utils.Log.Printf("从 worker %s 获取文档数量失败: %s", endpoint, err)
				// The response must not be read after a failed RPC: falling
				// through to affected.Count would dereference a nil message
				// and panic inside this goroutine.
				return
			}
			if affected.Count > 0 {
				atomic.AddInt32(&n, affected.Count)
				utils.Log.Printf("worker %s 共有 %d 个文档", endpoint, affected.Count)
			}
		}(endpoint)
	}
	wg.Wait()

	return int(atomic.LoadInt32(&n))
}
// Close closes every pooled gRPC connection and then closes the service hub.
//
// Every connection is attempted even if an earlier one fails to close. Each
// failure is logged, and the first one encountered is returned; nil means all
// connections closed cleanly.
func (sentinel *Sentinel) Close() (err error) {
	sentinel.connPool.Range(func(key, value any) bool {
		conn := value.(*grpc.ClientConn)
		if closeErr := conn.Close(); closeErr != nil {
			utils.Log.Printf("关闭到 %v 的 gRPC 连接失败,错误: %s", key, closeErr)
			// Keep the first failure instead of letting a later successful
			// close overwrite it with nil.
			if err == nil {
				err = closeErr
			}
		}
		return true // keep iterating so no connection is left open
	})
	sentinel.hub.Close()
	return err
}