微服务-实用篇
- 一、微服务治理
- 1.微服务远程调用
- 2.Eureka注册中心
- Eureka的作用:
- 搭建EurekaServer服务
- Client服务注册
- 服务发现
- Ribbon负载均衡策略配置
- Ribbon配置饥饿加载
 
- 3.nacos注册中心
- 使用nacos注册中心服务
- nacos区域负载均衡
- nacos环境隔离-namespace
- Nacos和Eureka的对比
- nacos配置管理
- 配置管理步骤
- 配置热更新
 
- 多环境配置共享
 
- 4.http客户端Feign
- Feigin的使用步骤
- Feign的日志配置
- Feign的性能优化
 
- 5.统一网关Gateway
- 作用
- 搭建网关服务
- 路由的过滤器配置
- 全局过滤器
- 过滤器链执行顺序
 
 
- 二、异步通信
- 1.什么是AMQP?
- 2.部署RabbitMQ
- 3.SpingAMQP如何发送消息
- 4.SpingAMQP如何接收消息
- 5.WorkQueue模型
- 6.交换机
- 7.FanoutExchange
- 8.DirectExchange
- 9.TopicExchange
- 10.消息转换器
 
- 三、分布式搜索
- 1.初识elasticsearch
- 2.安装elastic
- 3.索引库的操作
- 4.文档操作
- 5.RestClient
- RestClient的初步使用
- RestClient增删改查
 
- 6.elasticsearch搜索功能
- DSL查询语法
 
- 7.RestClient查询文档
- 8.聚合
- 9.RestClient实现聚合
- 10.自动补全
- 11.RestClient实现自动补全
- 12.数据同步
- 13.ES集群
- 13.ES集群
 
 
一、微服务治理
1.微服务远程调用
-  springCloud提供了RestTemplate,可以发起远程http协议的调用 
-  使用 -  注入bean @Bean public RestTemplate restTemplate() { return new RestTemplate(); }
-  restTemplate使用 @RestController @RequestMapping("order") public class OrderController { @Autowired private OrderService orderService; @Autowired private RestTemplate restTemplate; private static final String UserBaseApiURL = "http://localhost:8081/user/"; @GetMapping("{orderId}") public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) { // 根据id查询订单并返回 Order order = orderService.queryOrderById(orderId); // 远程调用查询用户 User user = restTemplate.getForObject(UserBaseApiURL + order.getUserId(), User.class); order.setUser(user); return order; } }
 
-  
2.Eureka注册中心
-  Eureka的作用:-  消费者该如何获取服务提供者具体信息? - 服务提供者启动时向eureka注册自己的信息
- eureka保存这些信息
- 消费者根据服务名称向eureka拉取提供者信息
 
-  如果有多个服务提供者,消费者该如何选择? - 服务消费者利用负载均衡算法,从服务列表中挑选一个
 
-  消费者如何感知服务提供者的健康状态? - 服务者会每隔30秒向EurekaServer发送心跳请求,报告健康状态
- eureka会更新记录服务列表信息,心跳不正常会被剔除
- 消费者就可以拉取到最新的信息
 
 
-  
-  搭建EurekaServer服务-  创建项目引入依赖 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency>
-  编写application.yml配置文件 server: port: 10086 spring: application: name: eureka-server # 服务名 eureka: client: service-url: # 服务地址 defaultZone: http://127.0.0.1:10086/eureka/
-  在启动类上开启自动装配 @EnableEurekaServer // 开启eureka自动装配 @SpringBootApplication public class EurekaApplication { public static void main(String[] args) { SpringApplication.run(EurekaApplication.class , args); } }
 
-  
-  Client服务注册-  引入eureka-client依赖 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency>
-  在application.yml中配置eureka地址 spring: datasource: url: jdbc:mysql://localhost:3306/cloud_order?useSSL=false username: root password: 123456 driver-class-name: com.mysql.jdbc.Driver application: name: orderservice # 服务名 eureka: client: service-url: # 服务地址 defaultZone: http://127.0.0.1:10086/eureka/
 
-  
-  服务发现-  给RestTemplate添加@LoadBalanced注解 @Bean @LoadBalanced public RestTemplate restTemplate() { return new RestTemplate(); }
-  用服务提供者的服务名称远程调用 @RestController @RequestMapping("order") public class OrderController { @Autowired private OrderService orderService; @Autowired private RestTemplate restTemplate; private static final String UserBaseApiURL = "http://userservice/user/"; @GetMapping("{orderId}") public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) { // 根据id查询订单并返回 Order order = orderService.queryOrderById(orderId); // 远程调用查询用户 User user = restTemplate.getForObject(UserBaseApiURL + order.getUserId(), User.class); order.setUser(user); return order; } }
 
-  
-  Ribbon负载均衡策略配置-  
    -  代码方式,注入IRule实例bean @Bean public IRule randomIRule() { return new RandomRule(); }
 
-  
-  2.配置文件方式: 在application.yml文件中添加配置项 
 userservice: ribbon: NFLoadBalancerRuleClassName: com.netfix.loadbalancer.RandomRule
-  
    
-  Ribbon配置饥饿加载-  ribbon默认采用懒加载,即第一次访问时才会去创建LoadBalanceClient,请求时间会很长 
-  在application.yml中开启饥饿加载 ribbon: eager-load: enabled: true # 开启饥饿加载 clients: - userservice # 指定对userservice这个服务饥饿加载
 
-  
3.nacos注册中心
-  使用nacos注册中心服务-  引入依赖 <!-- nacos的管理依赖 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.2.5.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> <!-- nacos客户端依赖包 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency>
-  修改application.yml文件nacos配置 spring: application: name: orderservice # 服务名 cloud: nacos: server-addr: localhost:8848 # nacos端口号
 
-  
-  nacos区域负载均衡-  nacos默认负载均衡策略为服务轮询,一个服务往往会在多地部署多个实例,相同区域内的服务之间相互调用时长消耗更短,因此区域内优先调用比较合理,nacos提供了这样的负载均衡策略,不过区域内的策略为随机策略,当区域内没有可用服务时再访问其他区域的可用服务。 
-  application.yml配置区域 spring: application: name: userservice # 服务名 cloud: nacos: server-addr: localhost:8848 discovery: cluster-name: HZ # 集群名称
-  为Ribbon配置区域优先策略 userservice: ribbon: NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule
 
-  
-  nacos环境隔离-namespace-  不同的时期有不同的环境,比如开发时需要开发环境,namespace就可以进行环境隔离,不同环境之间的服务无法调用。 
-  application.yaml 配置namespace spring: application: name: orderservice # 服务名 cloud: nacos: server-addr: localhost:8848 discovery: cluster-name: HZ # 集群名称 namespace: fd3445b3-8e01-446a-91d5-03ab8b5a7205 # 环境id(从nacos控制台查看)
 
-  
-  Nacos和Eureka的对比-  共同点 - 都支持服务注册和服务拉取
- 都支持服务提供者心跳方式做健康检测
 
-  区别 -  Nacos支持服务端主动监测提供者状态:临时实例采用心跳模式,非临时实例采用主动健康监测。 spring: application: name: orderservice # 服务名 cloud: nacos: server-addr: localhost:8848 discovery: cluster-name: HZ # 集群名称 ephemeral: false # 非临时实例
-  临时实例心跳不正常会被剔除,非临时实例则不会被剔除。 
-  Nacos支持服务列表变更的消息推送模式,服务列表更新及时。 
-  Nacos集群默认采用AP方式,当集群中存在非临时实例时,采用CP模式;Eureka采用AP模式。 
 
-  
 
-  
-  nacos配置管理-  配置管理步骤-  在控制台添加配置文件  
-  添加依赖 <!-- nacos配置管理依赖 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency>
-  在bootstrap.yml文件中配置 spring: application: name: userservice profiles: active: dev # 开发环境 cloud: nacos: server-addr: localhost:8848 config: file-extension: yaml # 文件后缀
-  使用@Value注解注入 @Value("${pattern.format}") private String format;
 
-  
-  配置热更新-  通过@Value方式注入的配置属性需要在类上添加@RefreshScope注解即可实现配置热更新 @Slf4j @RestController @RequestMapping("/user") @RefreshScope // 配置热更新注解 public class UserController { @Autowired private UserService userService; // 配置属性注入 @Value("${pattern.format}") private String format; }
-  通过@ConfigurationProperties方式注入的属性自动热更新 @Component @ConfigurationProperties(prefix = "pattern") @Data public class PatternProperties { private String format; }
 
-  
-  多环境配置共享- 微服务会从nacos读取的配置文件: 
      - [服务名]-[spring.proflle.active].yaml
- [服务名].yaml,默认配置,多环境共享
 
- 优先级 
      - [服务名]-[环境].yaml > [服务名].yaml > 本地配置
 
 
- 微服务会从nacos读取的配置文件: 
      
 
-  
4.http客户端Feign
-  Feigin的使用步骤-  引入依赖 <!-- fegin依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency>
-  启动类添加@EnableFeignClients注解 @MapperScan("cn.itcast.order.mapper") @SpringBootApplication @EnableFeignClients public class OrderApplication { public static void main(String[] args) { SpringApplication.run(OrderApplication.class, args); } }
-  编写FeignClient接口 @FeignClient("userservice") public interface UserClient { @GetMapping("user/{id}") User findById(@PathVariable("id") Long id); }
-  使用FeignClient中定义的方法代替RestTemplate @RestController @RequestMapping("order") public class OrderController { @Autowired private OrderService orderService; @Autowired private UserClient userClient; @GetMapping("{orderId}") public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) { // 根据id查询订单并返回 Order order = orderService.queryOrderById(orderId); // 远程调用查询用户 User user = userClient.findById(order.getUserId()); order.setUser(user); return order; } }
 
-  
-  Feign的日志配置-  方式一是配置文件 feign: client: config: default: # 这里是default就是全局配置,如果是写服务名,则针对某个微服务的配置 logger-level: FULL # 日志级别
-  方式二是JAVA代码配置类 public class FeignClientConfiguration { @Bean public Logger.Level feignLogLevel(){ return Logger.Level.FULL; } }@MapperScan("cn.itcast.order.mapper") @SpringBootApplication // 在启动类上添加的配置类属于全局配置 @EnableFeignClients(defaultConfiguration = FeignAutoConfiguration.class) public class OrderApplication { public static void main(String[] args) { SpringApplication.run(OrderApplication.class, args); } }// 如果需要对某一服务进行配置在服务接口上添加即可 @FeignClient(value = "userservice" , configuration = FeignClientConfiguration.class) public interface UserClient { @GetMapping("user/{id}") User findById(@PathVariable("id") Long id); }
 
-  
-  Feign的性能优化-  引入依赖 <dependency> <groupId>io.github.openfeign</groupId> <artifactId>feign-httpclient</artifactId> </dependency>
-  配置连接池 feign: client: config: default: # 这里是default就是全局配置,如果是写服务名,则针对某个微服务的配置 logger-level: FULL # 日志级别 httpclient: enabled: true # 开启feign对HttpClient的支持 max-connections: 200 # 最大连接数 max-connections-per-route: 50 # 每个路径的最大连接数
 
-  
5.统一网关Gateway
-  作用- 对用户请求做身份验证,权限认证
- 将用户请求路由到微服务,并实现负载均衡
- 将用户请求做限流
 
-  搭建网关服务-  创建新的module,引入SpringCloudGateWay的依赖和服务发现依赖 <dependencies> <!-- nacos服务发现依赖 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <!-- 网关gateway依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway</artifactId> </dependency> </dependencies>
-  编写路由配置及nacos地址 server: port: 10010 spring: application: name: gateway cloud: nacos: server-addr: localhost:80 # nacos地址 gateway: routes: # 网关路由配置 - id: user-service # 路由id 自定义 uri: lb://userservice # 路由的目标地址 lb是负载均衡 predicates: # 路由断言 - Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求 - id: order-service uri: lb://orderservice predicates: # 路由断言 - Path=/order/** # 这个是按照路径匹配,只要以/order/开头就符合要求
 
-  
-  路由的过滤器配置-  为某个服务添加过滤器 spring: application: name: gateway cloud: nacos: server-addr: localhost:8848 # nacos地址 gateway: routes: # 网关路由配置 - id: user-service # 路由id 自定义 uri: lb://userservice # 路由的目标地址 lb是负载均衡 predicates: # 路由断言 - Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求 - id: order-service uri: lb://orderservice predicates: # 路由断言 - Path=/order/** # 这个是按照路径匹配,只要以/order/开头就符合要求 filters: - AddRequestHeader=color, blue # 局部过滤器
-  添加默认过滤器(全局) spring: application: name: gateway cloud: nacos: server-addr: localhost:8848 # nacos地址 gateway: routes: # 网关路由配置 - id: user-service # 路由id 自定义 uri: lb://userservice # 路由的目标地址 lb是负载均衡 predicates: # 路由断言 - Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求 - id: order-service uri: lb://orderservice predicates: # 路由断言 - Path=/order/** # 这个是按照路径匹配,只要以/order/开头就符合要求 default-filters: - AddRequestHeader=color, blue # 全局过滤器
 
-  
-  全局过滤器@Order(1) // 过滤器等级 越低优先值越高 @Component public class AuthorizeFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); HttpHeaders headers = request.getHeaders(); String token = headers.getFirst("token"); if (token != null && token.equals("abc")) { return chain.filter(exchange); } exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED); return exchange.getResponse().setComplete(); } }
-  过滤器链执行顺序- 每一个过滤器都必须指定一个int类型的order值,order值越小,优先级越高,执行顺序越靠前。
- GlobalFilter通过实现Ordered接口,或者添加@Order注解来指定Order值,由我们自己指定
- 路由过滤器和defaultFilter的order由Spring指定,默认是按照声明顺序从1递增
- 当过滤器的order值一样时,会按照default>路由过滤器>GlobalFilter的循序执行
 
二、异步通信
1.什么是AMQP?
- 应用层消息通信的一种协议,与语言和平台无关
2.部署RabbitMQ
RabbitMQ部署指南.md
3.SpingAMQP如何发送消息
-  引入AMQP的Starter依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-  配置RabbitMQ地址 spring: rabbitmq: addresses: 192.168.88.101 # 地址名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast password: 123321
-  新建测试类 @RunWith(SpringRunner.class) @SpringBootTest public class SpringAMQPTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { String queueName = "simple.queue"; String message = "hello , Spring amqp!"; rabbitTemplate.convertAndSend(queueName , message); } }
4.SpingAMQP如何接收消息
-  配置地址 spring: rabbitmq: addresses: 192.168.88.101 # 地址名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast password: 123321
-  新建类 @Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") // 在启动前要确保该队列存在! public void listenSimpleQueue(String msg) { System.out.println("消费者1接收到消息 = " + msg); } }
5.WorkQueue模型
-  多个模型绑定到一个队列,用一个消息会被一个消费者处理 
-  通过设置prefetch来控制消费者预取的消息数量 logging: pattern: dateformat: MM-dd HH:mm:ss:SSS spring: rabbitmq: addresses: 192.168.88.101 # 地址名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast password: 123321 listener: simple: prefetch: 1 # 修改消费者提前把握的最大数量
-  示例: -  设置发布50条消息 @Test public void testSimpleWorkQueue() { String queueName = "simple.queue"; for (int i = 0; i < 50; i++) { String message = "hello , Spring amqp! - " + (i + 1); rabbitTemplate.convertAndSend(queueName , message); } }
-  两个接收者 @Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg) { System.out.println("消费者1接收到消息 = " + msg); } @RabbitListener(queues = "simple.queue") public void listenSimpleQueue2(String msg) throws InterruptedException { System.err.println("消费者2接收到消息 = " + msg); Thread.sleep(500); } }
 
-  
6.交换机
- 交换机的作用是什么? 
  - 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExChange的会将消息路由到每个绑定的队列
 
7.FanoutExchange
-  特点: - 会将交换机接收的消息转发给绑定的所有队列,所有与之监听的消费者全部都会收到消息
 
-  编写配置类 @Configuration public class FanoutConfig { // 声明FanoutExchange交换机 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("itcast.fanout"); } // 声明第一个队列 @Bean public Queue queue1() { return new Queue("fanout.queue1"); } // 绑定队列1和交换机 @Bean public Binding bindingQueue1(Queue queue1 , FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue1).to(fanoutExchange); } // 声明第二个队列 @Bean public Queue queue2() { return new Queue("fanout.queue2"); } // 绑定队列2和交换机 @Bean public Binding bindingQueue2(Queue queue2 , FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue2).to(fanoutExchange); } }
-  编写消费者代码 @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到消息 = " + msg); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) throws InterruptedException { System.err.println("消费者2接收到消息 = " + msg); }
8.DirectExchange
-  特点: - 该交换机需要设置key值,当该交换机收到消息时,交换机会转发给指定key值的队列。存在多个相同的key值时,则群发
 
-  案例 -  编写接收者,绑定交换机(注解方式) @RabbitListener(bindings = @QueueBinding( value = @Queue( name = "direct.queue1"), exchange = @Exchange( name = "itcast.direct" , type = ExchangeTypes.DIRECT), key = {"red" , "blue" } )) public void listenDirectQueue1(String msg) { System.out.println("消费者1接收到消息 = " + msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue( name = "direct.queue2"), exchange = @Exchange( name = "itcast.direct" , type = ExchangeTypes.DIRECT), key = {"red" , "pink" } )) public void listenDirectQueue2(String msg) throws InterruptedException { System.err.println("消费者2接收到消息 = " + msg); }
-  编写发布者 @Test public void testDirectQueue() { String queueName = "simple.queue"; String message = "hello , Spring amqp!"; // 交换机name String exchangeName = "itcast.direct"; rabbitTemplate.convertAndSend(exchangeName , "red" , message); }
 
-  
9.TopicExchange
-  特点 - TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。
- Queue与Exchange指定BindingKey时可以使用通配符: 
    - #:代指0个或多个单词
- *:代指一个单词
 
 
-  示例: -  编写接收者 @RabbitListener(bindings = @QueueBinding( value = @Queue( name = "topic.queue1"), exchange = @Exchange( name = "itcast.topic" , type = ExchangeTypes.TOPIC), key = {"china.*" } )) public void listenTopicQueue1(String msg) { System.out.println("消费者1接收到消息 = " + msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue( name = "topic.queue2"), exchange = @Exchange( name = "itcast.topic" , type = ExchangeTypes.TOPIC), key = {"china.*" } )) public void listenTopicQueue2(String msg) { System.err.println("消费者2接收到消息 = " + msg); }
-  编写发送者 @Test public void testTopicQueue() { String queueName = "simple.queue"; String message = "hello , Spring amqp!"; // 交换机name String exchangeName = "itcast.topic"; rabbitTemplate.convertAndSend(exchangeName , "china.news" , message); }
 
-  
10.消息转换器
-  RabbieMQ是可以传递java对象的,通过MessageConverter实现,但是默认是JDk的序列化,最好为其配置JSON对象转化器,注意发布与接收方必须使用相同的MessageConverter 
-  案例 -  消息发布方 -  引入JackSon依赖 <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.16.1</version> </dependency>
-  注入Bean @Bean // 注入json消息转化器 public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
-  发布消息 @Test public void ObjectQueue() { String queueName = "object.queue"; HashMap<String, String> hashMap = new HashMap<>(); hashMap.put("name" , "小强"); hashMap.put("age" , "18"); rabbitTemplate.convertAndSend(queueName , hashMap); }
 
-  
-  消息接收方 -  引入JackSon依赖 <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.16.1</version> </dependency>
-  注入Bean @Bean // 注入json消息转化器 public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
-  定义队列 @Bean public Queue ObjectQueue() { return new Queue("object.queue"); }
-  接收消息 @RabbitListener(queues = "object.queue") public void listenObjectQueue2(HashMap<String , Object> msg) { System.err.println("消费者接收到消息 = " + msg); }
 
-  
 
-  
三、分布式搜索
1.初识elasticsearch
-  什么是elasticsearch? - 一个开源的分布式搜索引擎,可以用来实现搜索、日志统计、分析、系统监控等功能
 
-  什么是elastic stack (ELK)? - 是指以elasticsearch为核心的技术栈,包括beats、logstash、kibana、elasticsearch
 
2.安装elastic
安装elasticsearch.md
3.索引库的操作
-  mapping属性 - type:字段数据类型,常见的简单类型有: 
    - 字符串:text(可分词文本)、keyword(精确值,例如:品牌、国家)
- 数值:long、integer、short、byte、double、float
- 布尔:boolean
- 日期:date
- 对象:object
 
- index:是否创建索引,默认为true
- analyzer:使用哪种分词器
- properties:该字段的子字段
 
- type:字段数据类型,常见的简单类型有: 
    
-  索引库操作 -  创建索引库 PUT /索引库名 # 创建索引库 PUT /heima { "mappings": { "properties": { "info": { "type": "text", "analyzer": "ik_smart" }, "email" : { "type": "keyword", "index": false }, "name" : { "properties": { "firstName" : { "type" : "keyword", "index" : false }, "lastName" : { "type" : "keyword", "index" : false } } } } } }
-  查看索引库 GET /索引库名 # 查询索引库 GET /heima
-  删除索引库 DELETE /索引库名 # 删除索引库 DELETE /黑马
-  添加字段 PUT /索引库名/_mapping # 添加新字段 PUT /heima/_mapping { "properties": { "color" : { "type" : "keyword", "index" : false } } }
 
-  
4.文档操作
-  添加文档 -  模板 POST /索引库名/_doc/文档id { "字段1" : "值1", "字段2" : "值2", "字段3" : { "子属性1" : "值3", "子属性2" : "值4", } }
-  示例 # 添加文档 POST /heima/_doc/1 { "info" : "尚硅谷,让天下没有学完的技术!", "email" : "1482939313@qq.com", "name" : { "firstName" : "尚", "lastName" : "硅谷" } }
 
-  
-  查看文档 -  模板 GET /索引库名/_doc/文档id
-  示例 GET /heima/_doc/1
 
-  
-  删除文档 -  模板 DELETE /索引库名/_doc/文档id
-  示例 DELETE /heima/_doc/1
 
-  
-  修改文档 -  方式一:全量修改 -  特点 id存在则修改,不存在则创建 
-  模板 PUT /索引库名/_doc/文档id { "字段1" : "值1", "字段2" : "值2", }
 
-  
-  方式二:局部修改 -  模板 POST /索引库名/_update/文档id { "doc" : { "字段1" : "值1", "字段2" : "值2", } }
 
-  
 
-  
5.RestClient
-  RestClient的初步使用-  引入依赖 <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.12.1</version> </dependency><!-- 注意版本控制 --> <properties> <java.version>1.8</java.version> <elasticsearch.version>7.12.1</elasticsearch.version> </properties>
-  初始化RestHighLevelClient package cn.itcast.hotel; @SpringBootTest class HotelDemoApplicationTests { private RestHighLevelClient client; @BeforeEach void setUp() { this.client = new RestHighLevelClient(RestClient.builder( HttpHost.create("http://192.168.88.101:9200") )); } @AfterEach void tearDown() throws IOException { this.client.close(); } @Test void contextLoads() { System.out.println(this.client); } }
 
-  
-  RestClient增删改查-  操作索引库 -  创建索引库 // 新建索引 @Test public void addIndex() throws IOException { // 创建Request对象 CreateIndexRequest request = new CreateIndexRequest("hotel"); // 准备请求参数 RestClientConstant.HOTELTEMPLATE 为新建索引的json结构 request.source(RestClientConstant.HOTELTEMPLATE , XContentType.JSON); // 发送请求 client.indices().create(request , RequestOptions.DEFAULT); }
-  删除和判断索引库 // 删除索引 @Test public void deleteIndex() throws IOException { // 创建Request对象 DeleteIndexRequest request = new DeleteIndexRequest("hotel"); // 发送请求 client.indices().delete(request , RequestOptions.DEFAULT); } // 判断索引库是否存在 @Test public void existsIndex() throws IOException { // 创建Request对象 GetIndexRequest request = new GetIndexRequest("hotel"); // 发送请求 boolean exists = client.indices().exists(request, RequestOptions.DEFAULT); System.out.println( exists ? "索引库存在" : "索引库不存在"); }
 
-  
-  操作文档 -  新增文档 // 新增文档 @Test public void addDocument() throws IOException { // 获取信息 Hotel hotel = hotelService.getById(36934L); HotelDoc hotelDoc = new HotelDoc(hotel); // 创建Request对象 并设置id IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString()); // 设置请求 request.source(JSON.toJSONString(hotelDoc) , XContentType.JSON); // 发送请求 client.index(request , RequestOptions.DEFAULT); }
-  查询文档 // 新增文档 @Test public void findDocument() throws IOException { // 创建Request对象 并设置id GetRequest request = new GetRequest("hotel").id("36934"); // 发送请求 GetResponse response = client.get(request, RequestOptions.DEFAULT); String json = response.getSourceAsString(); HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class); System.out.println("hotelDoc = " + hotelDoc); }
-  更新文档 // 更新文档 @Test public void updateDocument() throws IOException { // 创建Request对象 并设置id UpdateRequest request = new UpdateRequest("hotel", "36934"); // 设置更新信息 每两个参数为一对 key,value request.doc( "city" , "美国" ); // 发送请求 client.update(request , RequestOptions.DEFAULT); }
-  删除文档 // 删除文档 @Test public void deleteDocument() throws IOException { // 创建Request对象 并设置id DeleteRequest req = new DeleteRequest("hotel", "36934"); // 发送请求 client.delete(req , RequestOptions.DEFAULT); }
-  批量操作 // 批量操作 @Test public void bulkDocument() throws IOException { // 获取数据 List<Hotel> hotelList = hotelService.list(); // 创建Request对象 BulkRequest bulkRequest = new BulkRequest(); // 写入 hotelList.stream().forEach( item -> { HotelDoc hotelDoc = new HotelDoc(item); bulkRequest.add(new IndexRequest("hotel") .id(item.getId().toString()).source(JSON.toJSONString(hotelDoc) , XContentType.JSON)); } ); // 发送请求 client.bulk(bulkRequest , RequestOptions.DEFAULT); }
 
-  
 
-  
6.elasticsearch搜索功能
-  DSL查询语法-  基本语法 GET /indexName/_search { "query" : { "查询类型" : { "查询条件" : "条件值" } } }
-  查询所有 GET /hotel/_search { "query" : { “match_all” : { } } }
-  全文检索查询 -  会对查询text进行分词,查询倒排索引 
-  match和multi_match的区别: match查询单字段,multi_match根据多个字段查询,参与查询字段越多,查询性能越差 
 # match查询 GET /hotel/_search { "query": { "match": { "all": "上海如家" } } } # multi_match查询 GET /hotel/_search { "query": { "multi_match": { "query": "上海如家", "fields": ["brand" , "name" , "business"] } } }
-  
-  精确查询 # term查询 GET /hotel/_search { "query": { "term": { "brand": { "value": "如家" } J } } } # range查询 范围 GET /hotel/_search { "query": { "range": { "price": { "gte": 400, "lte": 500 } } } }
-  地理查询 -  geo_bounding_box:查询geo_point值落在某个矩形范围的所有文档 # geo_bounding_box 边界框查询 GET /hotel/_search { "query": { "geo_bounding_box" : { "location" : { "top_left" : { "lat" : 31.1, "lon" : 121.5 }, "bottom_right" : { "lat" : 30.9, "lon" : 121.7 } } } } }
-  geo_distance:查询到指定中心点小于某个距离值的所有文档 # geo_distance 距离中心点查询 GET /hotel/_search { "query": { "geo_distance" : { "distance" : "15km", "location" : "31.21,121.5" } } }
 
-  
-  复合查询 -  function score query 可以修改文档的相关性算分(query socre),根据新得到的算分排序。 
-  function score query定义的三要素 - 过滤条件:哪些文档要加分
- 算分函数:如何计算function score
- 加权方式:funcation score 与 query score如何运算
 # function sorce GET /hotel/_search { "query": { "function_score": { "query": { "match": { "all": "上海" } }, "functions": [ { "filter": { "term": { "brand": "万豪" } }, "weight": 10 } ], "boost_mode": "multiply" } } }
-  复合查询Boolean Query是一个或多个查询子句的组合。子查询的组合方式有: - must:必须匹配每个子查询,类似“与”
- should:选择性匹配子查询,类似“或”
- must_not:必须不匹配,不参与算分,类似“非”
- filter:必须匹配,不参与算分
 # 搜索名字包含“如家”,价格不高于400,在坐标31.21,121.5周围10km范围内的酒店 # 搜索名字包含“如家”,价格不高于400,在坐标31.21,121.5周围10km范围内的酒店 GET /hotel/_search { "query": { "bool": { "must": [ { "match": { "name": "如家" } } ], "must_not": [ { "range": { "price": { "gt": 400 } } } ], "filter": [ { "geo_distance": { "distance": "10km", "location": { "lat": 31.21, "lon": 121.5 } } } ] } } }
 
-  
-  排序 -  elasticsearch默认根据相关度算分,也可以指定 
-  简单类型 GET /hotel/_search { "query": { "match": { "all": "北京" } }, "sort": [ { "price": { "order": "desc" } } ] }
-  地理坐标 GET /hotel/_search { "query": { "match": { "all": "上海" } }, "sort": [ { "_geo_distance": { "location": "31.21,121.5", "order": "asc", "unit": "km" } } ] }
 
-  
-  分页 -  elasticsearch默认只返回top10的数据 GET /hotel/_search { "query": { "match_all": {} }, "from": 10, // 分页开始页 "size": 10 // 分页数量 }
 
-  
-  高亮 GET /hotel/_search { "query": { "match": { "all": "如家" } }, "highlight": { "fields": { "name": { "require_field_match": "false" } } } }
 
-  
7.RestClient查询文档
    @Test
    public void query() throws IOException {
        // 创建request对象
        SearchRequest searchRequest = new SearchRequest("hotel");
        // 构造DSL语句
//        searchRequest.source().query(QueryBuilders.matchAllQuery());
        searchRequest.source().query(QueryBuilders.matchQuery("all" , "上海"))
                .sort("price" , SortOrder.ASC)
                .highlighter(new HighlightBuilder().field("name").requireFieldMatch(false))
                .from(0)
                .size(50);
        // 发送请求
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println("查询结果总条数为:" + response.getHits().getTotalHits() + "条");
        System.out.println("高亮部分为:" + response.getHits().getHits()[0].getHighlightFields());
        List<HotelDoc> hotelDocList = parseRestClintResponse(response, HotelDoc.class);
        hotelDocList.stream().forEach(System.out :: println);
    }
    private <T> List<T> parseRestClintResponse(SearchResponse response , Class<T> clazz) {
        SearchHits hits = response.getHits();
        ArrayList<T> list = new ArrayList<>();
        SearchHit[] hitsArray = hits.getHits();
        Arrays.stream(hitsArray).forEach( item -> list.add(JSON.parseObject(item.getSourceAsString() , clazz)));
        return list;
    }
8.聚合
-  什么是聚合? - 聚合可以实现对文档数据的统计、分析、运算
 
-  聚合的分类 - 桶聚合:用来对文档作分组 
    - TermAggregation:按照文档字段值分组
- Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
 
- 度量聚合:用以计算一些值,比如:最大值、最小值、avg等 
    - Avg:平均值
- Max:最大值
- Min:最小值
- Stats:同时求Avg、Max、Min、sum等
 
- 管道聚合:其他聚合的结果为基础做聚合
 
- 桶聚合:用来对文档作分组 
    
-  示例: -  DSL实现Bucket聚合 GET /hotel/_search { "query": { "range": { "price": { "lte": 500 } } }, "size" : 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "size": 100, "order": { "_count": "asc" } } } } }
-  DSL实现Metrics GET /hotel/_search { "size": 0, "aggs": { "brandAggs": { "terms": { "field": "brand", "size": 30, "order": { "score_aggs.avg": "desc" } }, "aggs": { "score_aggs": { "stats": { "field": "score" } } } } } }
 
-  
9.RestClient实现聚合
    /**
     * 聚合查询
     */
    @Test
    public void queryAggregation() throws IOException {
        SearchRequest request = new SearchRequest("hotel");
        request.source().size(0);
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg").field("brand").size(20).order(BucketOrder.aggregation("scoreStats.avg" , false))
                .subAggregation(AggregationBuilders.stats("scoreStats")
                        .field("score")));
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        Aggregations aggregations = response.getAggregations();
        Terms brandAgg = aggregations.get("brandAgg");
        List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
        buckets.stream().forEach( item -> {
            System.out.print(item.getKeyAsString());
            Aggregations aggregations1 = item.getAggregations();
            List<Aggregation> aggregations2 = aggregations1.asList();
            Aggregation aggregation = aggregations2.get(0);
            // 解析出aggregation内的内容
            if (aggregation instanceof Stats) {
                Stats stats = (Stats) aggregation;
                double avgScore = stats.getAvg();
                double minScore = stats.getMin();
                double maxScore = stats.getMax();
                long count = stats.getCount();
                System.out.println("平均分:" + avgScore + ", 最低分:" + minScore + ", 最高分:" + maxScore + ", 总数:" + count);
            }
                }
        );
    }
10.自动补全
-  自定义分词器 // 酒店数据索引库 PUT /hotel { "settings": { "analysis": { "analyzer": { "text_anlyzer": { "tokenizer": "ik_max_word", "filter": "py" }, "completion_analyzer": { "tokenizer": "keyword", "filter": "py" } }, "filter": { "py": { "type": "pinyin", "keep_full_pinyin": false, "keep_joined_full_pinyin": true, "keep_original": true, "limit_first_letter_length": 16, "remove_duplicated_term": true, "none_chinese_pinyin_tokenize": false } } } }, "mappings": { "properties": { "id":{ "type": "keyword" }, "name":{ "type": "text", "analyzer": "text_anlyzer", "search_analyzer": "ik_smart", "copy_to": "all" }, "address":{ "type": "keyword", "index": false }, "price":{ "type": "integer" }, "score":{ "type": "integer" }, "brand":{ "type": "keyword", "copy_to": "all" }, "city":{ "type": "keyword" }, "starName":{ "type": "keyword" }, "business":{ "type": "keyword", "copy_to": "all" }, "location":{ "type": "geo_point" }, "pic":{ "type": "keyword", "index": false }, "all":{ "type": "text", "analyzer": "text_anlyzer", "search_analyzer": "ik_smart" }, "suggestion":{ "type": "completion", "analyzer": "completion_analyzer" } } } }// 自动补全查询 POST /hotel/_search { "suggest": { "suggestions": { "text": "sd", "completion": { "field": "suggestion", "skip_duplicates": true, "size": 10 } } } }
11.RestClient实现自动补全
    /**
     * 自动补全
     */
    @Test
    public void autoComplete() throws IOException {
        SearchRequest request = new SearchRequest("hotel");
        request.source()
                .suggest(new SuggestBuilder().addSuggestion(
                        "suggestions",
                        SuggestBuilders
                                .completionSuggestion("suggestion")
                                .size(10)
                                .prefix("sd")
                        ));
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        Suggest suggest = response.getSuggest();
        CompletionSuggestion suggestion = suggest.getSuggestion("suggestions");
        List<CompletionSuggestion.Entry.Option> options = suggestion.getOptions();
        options.stream().forEach(item -> {
            System.out.println(item.getHit());
        });
    }
12.数据同步
-  数据同步的几种实现方式 - 同步调用:实现简单,粗暴、业务耦合度高
- 异步通知:低耦合,实现难度一般、依赖mp的可靠性
- 监听binlog:完全解除服务间耦合、开始binlog增加数据库负担、实现复杂度高
 
-  利用MQ实现mysql与elasticsearch数据同步 -  定义常量类 public class MqConstant { // 交换机 public static final String HOTEL_EXCHANGE_TOPIC = "hotel.exchange.topic"; // update队列 public static final String HOTEL_UPDATE_QUEUE = "hotel.update"; // delete队列 public static final String HOTEL_DELETE_QUEUE = "hotel.delete"; }
-  在消费者微服务中声明exchange、queue、RoutingKey(rabbitMQ懒加载,只有消费者在监听,不监听不会创建交换机队列等) @Configuration public class MqConfig { // 交换机 @Bean public TopicExchange topicExchange() { return new TopicExchange(MqConstant.HOTEL_EXCHANGE_TOPIC , true , false); } // 更新队列 @Bean(name = "updateQueue") public Queue updateQueue(){ return new Queue(MqConstant.HOTEL_UPDATE_QUEUE , true); } // 删除队列 @Bean(name = "deleteQueue") public Queue deleteQueue(){ return new Queue(MqConstant.HOTEL_DELETE_QUEUE , true); } // 绑定队列交换机 @Bean public Binding bindingUpdateQueue(@Qualifier("updateQueue") Queue updateQueue , TopicExchange topicExchange){ return BindingBuilder.bind(updateQueue).to(topicExchange).with(MqConstant.HOTEL_UPDATE_QUEUE); } // 绑定队列交换机 @Bean public Binding bindingDeleteQueue(@Qualifier("deleteQueue") Queue deleteQueue , TopicExchange topicExchange){ return BindingBuilder.bind(deleteQueue).to(topicExchange).with(MqConstant.HOTEL_DELETE_QUEUE); } }
-  生产者发布 @RestController @RequestMapping("hotel") @CrossOrigin("*") public class HotelController { @Autowired private IHotelService hotelService; @Autowired private RabbitTemplate rabbitTemplate; @PutMapping() public void updateById(@RequestBody Hotel hotel){ if (hotel.getId() == null) { throw new InvalidParameterException("id不能为空"); } hotelService.updateById(hotel); rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_UPDATE_QUEUE , hotel.getId()); } @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_DELETE_QUEUE , id); } }
-  消费者监听 @Component public class HotelListener { @Autowired private IHotelService hotelService; // 更新监听 @RabbitListener(queues = MqConstant.HOTEL_UPDATE_QUEUE) public void updateListen(Long id) { System.out.println("更新es文档..."); hotelService.updateEs(id); } // 删除监听 @RabbitListener(queues = MqConstant.HOTEL_DELETE_QUEUE) public void deleteListen(Long id) { System.out.println("删除es文档..."); hotelService.deleteEs(id); } }
 
-  
13.ES集群
- 各节点的职责 
  - master eligible节点的作用是什么? 
    - 参与集群选主
- 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求
 
- data节点的作用是什么? 
    - 数据CRUD
 
- coordinator节点的作用是什么? 
    - 路由请求到其他节点
- 合并查询到的结果,返回用户
 
 
- master eligible节点的作用是什么? 
    
- 分布式新增如何确定分配分片? 
  - coordinating node根据id做hash运算,得到结果对shard数量取余,余数就是对应的分片
 
- 分布式查询 
  - 分散阶段:coordinator node将查询请求分发给不同分片
- 收集阶段:将查询结果汇总到coordinator node,整理并返回给用户
 
- ES集群的故障转移 
  - 集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其他节点,确保数据安全,这叫故障转移
 
nt.HOTEL_DELETE_QUEUE);
 }
 }
```
-  生产者发布 @RestController @RequestMapping("hotel") @CrossOrigin("*") public class HotelController { @Autowired private IHotelService hotelService; @Autowired private RabbitTemplate rabbitTemplate; @PutMapping() public void updateById(@RequestBody Hotel hotel){ if (hotel.getId() == null) { throw new InvalidParameterException("id不能为空"); } hotelService.updateById(hotel); rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_UPDATE_QUEUE , hotel.getId()); } @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_DELETE_QUEUE , id); } }
-  消费者监听 @Component public class HotelListener { @Autowired private IHotelService hotelService; // 更新监听 @RabbitListener(queues = MqConstant.HOTEL_UPDATE_QUEUE) public void updateListen(Long id) { System.out.println("更新es文档..."); hotelService.updateEs(id); } // 删除监听 @RabbitListener(queues = MqConstant.HOTEL_DELETE_QUEUE) public void deleteListen(Long id) { System.out.println("删除es文档..."); hotelService.deleteEs(id); } }
13.ES集群
- 各节点的职责 
  - master eligible节点的作用是什么? 
    - 参与集群选主
- 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求
 
- data节点的作用是什么? 
    - 数据CRUD
 
- coordinator节点的作用是什么? 
    - 路由请求到其他节点
- 合并查询到的结果,返回用户
 
 
- master eligible节点的作用是什么? 
    
- 分布式新增如何确定分配分片? 
  - coordinating node根据id做hash运算,得到结果对shard数量取余,余数就是对应的分片
 
- 分布式查询 
  - 分散阶段:coordinator node将查询请求分发给不同分片
- 收集阶段:将查询结果汇总到coordinator node,整理并返回给用户
 
- ES集群的故障转移 
  - 集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其他节点,确保数据安全,这叫故障转移
 

![成功解决[!] CocoaPods could not find compatible versions for pod “sqflite“](https://img-blog.csdnimg.cn/direct/a53e7a685c7d48b59b0317669cf9d0cb.png)

















