Akka 学习(四)Remote Actor

news2025/6/22 18:01:53

目录

  • 一 介绍
    • 1.1 Remote Actor
    • 1.2 适用场景
    • 1.3 踩坑点
  • 二 实战
    • 2.1 需求
    • 2.2 Java 版本
      • 2.2.1 效果图
      • 2.2.2 实体类
      • 2.2.3 服务端Actor 处理
      • 2.2.4 服务端配置文件
      • 2.2.5 客服端Actor处理
      • 2.2.6 客服端配置文件
      • 2.2.7 测试
    • 2.3 Scala 版本
      • 2.3.1 效果
      • 2.2.3 服务端Actor处理
      • 2.3.4 客户端Actor处理
      • 2.3.5 测试

一 介绍

1.1 Remote Actor

虽然Akka在单机上可以运行上百万的Actor,但出于容错、负载均衡、灰度发布、提高并行度等等原因,我们仍然需要能在多个不同的服务器上运行Actor。所以Akka提供了akka-remoting的扩展包,屏蔽底层网络传输的细节,让上层以及其简单的方式使用远程的Actor调度。
Akka Remoting 是一个以点对点方式连接 actor 系统的通信模块,它是 Akka 集群的基础。远程处理的设计由两个(相关的)设计决策驱动:

  1. 相关系统之间的通信是对称的:如果系统 A 可以连接到系统 B,那么系统 B 也必须能够独立连接到系统 A。
  2. 通信系统的角色在连接模式方面是对称的:没有只接受连接的系统,也没有只发起连接的系统。

这些决定的结果是不可能安全地创建具有预定义角色的纯客户端-服务器设置(违反假设 2)。对于客户端-服务器设置,最好使用 HTTP 或 Akka I/O。
重要提示:使用涉及网络地址转换、负载平衡器或 Docker 容器的设置违反了假设 1,除非在网络配置中采取额外步骤以允许相关系统之间的对称通信。在这种情况下,Akka 可以配置为绑定到与用于在 Akka 节点之间建立连接的地址不同的网络地址。请参阅NAT 后面或 Docker 容器中的 Akka。

1.2 适用场景

  • remoting的存在其实是为akka cluster做底层支持的,通常并不会直接去使用remoting的包。但为了了解cluster的底层原理,还是有必要看下remoting。
  • 同时,remoting被设计为Peer-to-Peer而非Client-Server,所以不适用于基于后者的系统开发,比如我们无法在一个provider为local的Actor里去查找一个remote actor发送消息,必须两者均为remote actor,才满足对等。

1.3 踩坑点

  • Akka版本需要与Scala版本匹配

maven仓库地址:
image.png
注意版本匹配,不然会疯狂报错,运行不起来
我的版本:
scala:2.13.0
Akka版本:2.13

  • 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.hc</groupId>
  <artifactId>ActorDemo03</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <name>ActorDemo03</name>
  <description>Demo project for Spring Boot</description>

  <properties>
    <java.version>1.8</java.version>
    <scala.version>2.11.7</scala.version>
  </properties>



  <dependencies>

    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor -->
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-actor_2.13</artifactId>
      <version>2.5.23</version>
    </dependency>


    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-remote_2.13</artifactId>
      <version>2.5.23</version>
    </dependency>

    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-testkit_2.13</artifactId>
      <version>2.5.23</version>
    </dependency>

    <dependency>
      <groupId>org.scala-lang.modules</groupId>
      <artifactId>scala-java8-compat_2.11</artifactId>
      <version>1.0.2</version>
    </dependency>







    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.13.2</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.testng</groupId>
      <artifactId>testng</artifactId>
      <version>RELEASE</version>
      <scope>compile</scope>
    </dependency>


  </dependencies>



  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>8</source>
          <target>8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>



</project>

二 实战

2.1 需求

epub_22651331_22.jpg

  • 我们还将创建一个数据库客户端,用于展示如何请求服务器,以及如何从远程Actor中获取Future。
  • 服务器端的服务接收到客户端的请求后将返回Future。
  • 这样我们就已经编写了一个可以使用的键值存储数据库(和redis很类似)以及一个可以使用该数据库的远程客户端。

2.2 Java 版本

2.2.1 效果图

  • 服务端启动前

image.png

  • 客服端启动

image.png

  • 服务端收到请求后

image.png

2.2.2 实体类

  • SetRequest
package pojo;

import java.io.Serializable;

/**
 * @description: 设置消息
 * @author: shu
 * @createDate: 2022/11/28 11:51
 * @version: 1.0
 */
public class SetRequest implements Serializable {
    public final String key;
    public final Object value;
    public SetRequest(String key, Object value) {
        this.key = key;
        this.value = value;
    }

}

  • GetRequest
package pojo;

import java.io.Serializable;

/**
 * @description: 获取消息
 * @author: shu
 * @createDate: 2022/11/28 11:52
 * @version: 1.0
 */
public class GetRequest implements Serializable {
    public final String key;
    public GetRequest(String key) {
        this.key = key;
    }
}
  • 错误消息
package pojo;

import java.io.Serializable;

/**
 * @description:
 * @author: shu
 * @createDate: 2022/11/28 11:52
 * @version: 1.0
 */
public class KeyNotFoundException extends Exception implements
        Serializable {
    public final String key;
    public KeyNotFoundException(String key) {
        this.key = key;
    }
}

2.2.3 服务端Actor 处理


import akka.actor.*;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import pojo.GetRequest;
import pojo.KeyNotFoundException;
import pojo.SetRequest;

import java.util.HashMap;
import java.util.Map;

/**
 * @description:
 * @author: shu
 * @createDate: 2022/11/28 12:31
 * @version: 1.0
 */
public class RequestActor extends AbstractActor {

    LoggingAdapter log = Logging.getLogger(getContext().system(), this);


    protected final Map<String, Object> map = new HashMap<>();

    @Override
    public void preStart() throws Exception {
        log.info("ToFindRemoteActor is starting");
    }

    @Override
    public Receive createReceive() {
        return  ReceiveBuilder.create()
                // 设置消息
                .match(SetRequest.class, message -> {
                    // 打印消息
                    log.info("Received Set request: {}", message.key);
                    // 缓存消息
                    map.put(message.key, message.value);
                    // 回应消息
                    sender().tell(new Status.Success(message.key), self());
                })
                // 得到消息
                .match(GetRequest.class, message -> {
                    // 打印日志
                    log.info("Received Get request: {}", message.key);
                    // 获取消息
                    Object value = (Object) map.get(message.key);
                    Object response = (value!= null)
                            ? value
                            : new Status.Failure(new KeyNotFoundException(message.key));
                    // 响应消息
                    sender().tell(response, self());
                })
                // 未找到消息
                .matchAny(o ->
                        sender().tell(new Status.Failure(new ClassNotFoundException()), self())
                )
                .build();

    }


    /**
     * 测试
     * @param args
     */
    public static void main(String[] args) {

//        Config config = ConfigFactory.parseString(
//                        "akka.remote.netty.tcp.port=" + 2551)
//                .withFallback(ConfigFactory.load("application.conf"));

        // Create an Akka system
        ActorSystem system = ActorSystem.create("akkademy");

        // Create an actor
        ActorRef ref = system.actorOf(Props.create(RequestActor.class), "akkademy-db");

        System.out.println(ref);

    }



}

2.2.4 服务端配置文件

image.png

akka {
  stdout-loglevel = "DEBUG"
  loglevel = "DEBUG"
  actor {
  provider = "akka.remote.RemoteActorRefProvider"
}
remote {
  enabled-transports = ["akka.remote.netty.tcp"]
  netty.tcp {
  hostname = "127.0.0.1"
}
}
log-sent-messages = on
log-received-messages = on
}

2.2.5 客服端Actor处理

package client;



import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.pattern.AskableActorSelection;
import akka.util.Timeout;
import pojo.GetRequest;
import pojo.SetRequest;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import static scala.compat.java8.FutureConverters.toJava;


/**
* @description:
* @author: shu
* @createDate: 2022/11/28 16:10
* @version: 1.0
*/
public class JClient {
    private final ActorSystem system = ActorSystem.create("LocalSystem");
    private final ActorSelection remoteDb;

    public JClient(String remoteAddress) {
        remoteDb = system.actorSelection("akka.tcp://akkademy@" +
                                         remoteAddress + "/user/akkademy-db");
    }

    /**
* 缓存消息
* @param key
* @param value
* @return
*/
    public CompletionStage set(String key, Object value) {
        return toJava(new AskableActorSelection(remoteDb).ask(new SetRequest(key, value), Timeout.apply(5000, TimeUnit.SECONDS)));
    }

    /**
* 获取缓存消息
* @param key
* @return
*/
    public CompletionStage get(String key){
        return   toJava(new AskableActorSelection(remoteDb).ask(new GetRequest(key), Timeout.apply(5000, TimeUnit.SECONDS)));
    }

}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import client.JClient;
import org.junit.Test;
import pojo.SetRequest;

import java.util.concurrent.CompletableFuture;
public class AkkademyDbTest {

    /**
     * 测试注意需要放在不同的两个项目进行测试,不然会Caused by: java.net.BindException: Address already in use: bind
     * @throws Exception
     */
    @Test
    public void itShouldSetRecord() throws Exception {
            JClient client = new JClient("127.0.0.1:2552");
                client.set("123", 123);
                Integer result = (Integer) ((CompletableFuture) client.
                        get("123")).get();
                System.out.println("获取的结果:"+result);
                assert(result == 123);

        }


}

2.2.6 客服端配置文件

akka {
  stdout-loglevel = "DEBUG"
  loglevel = "DEBUG"
  actor {
  provider = "akka.remote.RemoteActorRefProvider"
}
remote {
  enabled-transports = ["akka.remote.netty.tcp"]
  netty.tcp {
  hostname = "127.0.0.1"
  port = 0
}
log-sent-messages = on
log-received-messages = on
}
}

2.2.7 测试

注意:需要放在两个不一样的配置项目中

  1. 服务端项目启动,等待请求的到来,注意配置文件
  2. 客服端项目启动,发送AKKA请求给服务端
  3. 服务端收到客服端的请求,缓存请求数据,把缓存结果返回给客服端

2.3 Scala 版本

2.3.1 效果

  • 启动服务前

image.png

  • 客户端启动

image.png

  • 服务端收到请求

image.png

2.2.3 服务端Actor处理

import akka.actor.{Actor, Status}
import akka.event.{Logging, LoggingAdapter}

import scala.collection.convert.ImplicitConversions.`map AsJavaMap`
import scala.collection.mutable

/**
 * @description:
 * @author: shu
 * @createDate: 2022/11/28 12:41
 * @version: 1.0
 */
class ScalaRequest extends Actor {
  protected val log: LoggingAdapter = Logging.getLogger(context.system, this)
   val map: mutable.Map[String, Object] = new mutable.HashMap[String, Object]

  override def receive = {

    case SetRequest(key, value) =>
      log.info("received SetRequest - key: {} value: {}", key, value)
      map.put(key, value)
      sender() ! Status.Success

    case GetRequest(key) =>
      log.info("received GetRequest - key: {}", key)
      val response: Option[Object] = map.get(key)

      response match{
        case Some(x) => sender() ! x
        case None => sender() ! Status.Failure(new KeyNotFoundException(key))
      }

    case o => Status.Failure(new ClassNotFoundException)
  }

}


case class SetRequest(key: String, value: Object)
case class GetRequest(key: String)
case class KeyNotFoundException(key: String) extends Exception

配置文件跟Java一样

2.3.4 客户端Actor处理

import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.duration.DurationInt
import scala.language.postfixOps

/**
 * @description:
 * @author: shu
 * @createDate: 2022/12/1 11:43
 * @version: 1.0
 */
class SClient(remoteAddress: String) {
  private implicit val timeout = Timeout(2 seconds)
  private implicit val system = ActorSystem("LocalSystem")
  private val remoteDb = system.actorSelection(
    s"akka.tcp://akkademy@$remoteAddress/user/akkademy-db")
  def set(key: String, value: Object) = {
    remoteDb ? SetRequest(key, value)
  }
  def get(key: String) = {
    remoteDb ? GetRequest(key)
  }
}



case class SetRequest(key: String, value: Object)
case class GetRequest(key: String)
case class KeyNotFoundException(key: String) extends Exception
import scala.concurrent.duration.DurationInt
import scala.concurrent.Await
import scala.language.postfixOps

/**
 * @description:
 * @author: shu
 * @createDate: 2022/12/1 11:44
 * @version: 1.0
 */

object Main extends App {
  val client = new SClient("127.0.0.1:2552")
  client.set("123", new Integer(123))
  val futureResult = client.get("123")
  val result = Await.result(futureResult, 10  seconds)

}

2.3.5 测试

注意:需要放在两个不一样的配置项目中

  1. 服务端项目启动,等待请求的到来,注意配置文件
  2. 客服端项目启动,发送AKKA请求给服务端
  3. 服务端收到客服端的请求,缓存请求数据,把缓存结果返回给客服端

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/70985.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用 Excel 数据透视表深入研究数据分析

问题 1(文章数据在底部) 为美国选民案例研究创建一个数据透视表,并用它来回答以下问题: A) 有多少个州的选民人口百分比低于 55%?哪些州? 答:有5个州的选民人数低于55%,分别是得克萨斯州、阿肯色州、俄克拉荷马州、夏威夷州和西弗吉尼亚州。 步骤:根据以下结果,创建…

基于jsp+java+ssm的社会保险信息管理系统-计算机毕业设计

项目介绍 课题研究的基本内容及预期目标或成果 用户注册与登录功能&#xff0c;在单位注册功能中有申请管理功能&#xff0c;填写具体信息。 系统管理员&#xff1a; 1&#xff09;个人密码修改&#xff1a;实现了管理员用户密码信息的修改。 2&#xff09;参保人员管理&a…

ORACE dbca创建报错Oracle system identifier(SID) “orcl“

最近项目需要通过备份恢复oracle实例&#xff0c;必须使用orcl&#xff0c;通过dbca创建实例是提示如下报错&#xff1a; 查看日志&#xff0c;$ORACLE_HOME/cfgtoollogs/dbca/dbcaui.log EVERE: [FATAL] A database instance with Oracle system identifier(SID) "orcl&…

零基础入门推荐系统 - 新闻推荐 - 实操2

内容导航: 零基础入门推荐系统 - 新闻推荐 - 实操2比赛数据分析:用户属性分析:训练集和测试集中分别有多少用户&#xff1f;用户城市分布有什么规律&#xff1f;平均每个用户会点击多少个文章&#xff1f;点击来源与文章点击次数是否存在关联&#xff1f;用户行为分析:零基础入…

【车载开发系列】UDS诊断---读取周期标识符($0x2A)

【车载开发系列】UDS诊断—读取周期标识符&#xff08;$0x2A&#xff09; UDS诊断---读取周期标识符&#xff08;$0x2A&#xff09;【车载开发系列】UDS诊断---读取周期标识符&#xff08;$0x2A&#xff09;一.概念定义二.报文格式1&#xff09;请求报文2&#xff09;初始响应3…

[附源码]计算机毕业设计课程在线测评系统Springboot程序

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

打破信息壁垒,提升业务水平,纷享销客CRM带给木链科技不一样的体验

步入数字化时代&#xff0c;企业业务模式和员工工作方式日新月异&#xff0c;传统协作方式很显然已经难以适应当前的需求&#xff0c;企业亟需一种新的面向信息化的协作方式&#xff0c;以提高工作效率&#xff0c;提升业务水平。 这样的挑战也发生在工业互联网安全企业&#…

Oracle一次获取多个序列值

Oracle一次获取多个序列值SQL 语句一次获取多个序列值获取序列中的多个值connect by level 生成多行数据JDBC 一次获取多个序列值MyBatis 一次获取多个序列值SQL 语句一次获取多个序列值 获取序列中的多个值 创建序列 CREATE SEQUENCE test_user_seq;获取一个序列值 SELECT…

Linux系统(Centos 7)配置DNS客户端

配置DNS客户端 DNS 客户端的配置非常简单&#xff0c;假设本地首选DNS服务器的IP地址为192.168.10.1&#xff0c;备用DNS 服务器的IP地址为192.168.10.2&#xff0c;则 DNS客户端的设置如下。 配置Windows 客户端 打开“Intermet 协议&#xff08;TCP/IP)”属性对话框&a…

8_3、Java基本语法之线程的生命周期与同步

一、线程的生命周期 JDK中用Thread.State类定义了线程的几种状态 要想实现多线程&#xff0c;必须在主线程中创建新的线程对象。Java语言使用Thread类 及其子类的对象来表示线程&#xff0c;在它的一个完整的生命周期中通常要经历如下的五种状态&#xff1a; 新建&#xff1a; …

基于ARIMA、SVM、随机森林销售的时间序列预测

如今DT&#xff08;数据技术&#xff09;时代&#xff0c;数据变得越来越重要&#xff0c;其核心应用“预测”也成为互联网行业以及产业变革的重要力量。最近我们被客户要求撰写关于销售时间序列预测的研究报告&#xff0c;包括一些图形和统计输出。对于零售行业来说&#xff0…

Elsevier(爱思唯尔)LaTex 模板详细说明

Elsevier 模板的使用 官方网站提供的 Latex Instructions&#xff0c;Elsevier 模板下载地址&#xff1a;elsarticle-template.zip [ 如果不了解文档类的作用&#xff0c;可以参考&#xff1a;documentclass ] Elsevier 提供了 3 种自定义的文档类&#xff1a; elsarticle…

大厂软件测试流程完整版

目 1.概述 1.1目的 有效的保证软件质量&#xff1b; 有效的制定不同测试类型&#xff08;软件系统测试、音频主观性测试、Field Trial、专项测试、自动化测试、性能测试、用户体验测试&#xff09;的软件测试计划&#xff1b; 按照计划进行测试&#xff0c;发现软件中存在…

Session | web应用的session机制、session的实现原理

目录 一&#xff1a;web应用的session机制 二&#xff1a;session的实现原理 一&#xff1a;web应用的session机制 &#xff08;1&#xff09;什么是会话&#xff1f; ①会话对应的英语单词&#xff1a;session ②用户打开浏览器&#xff0c;进行一系列操作&#xff0c;然后…

[附源码]计算机毕业设计楼盘销售管理系统Springboot程序

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

session,cookie,token详解

session,cookie,token详解 1.session 1.1 session的作用是什么 session的作用是用于保存每个用户的专用信息&#xff1b;当用户访问时&#xff0c;服务器都会为每个用户分配唯一的Session ID&#xff0c;而且当访问其他程序时可以从用户的session中取出该用户的数据为用户服务。…

将 Cpar 文件导入 2019 版的 Carsim 后,无法打开 video+plot 是什么问题?

大家在进行联合仿真的过程中&#xff0c;首先要将你的 Carsim 右上角的锁打开&#xff01; 解锁之后要明确仿真动画&#xff08;video&#xff09;和图像&#xff08;plot&#xff09;只有在联合仿真运行完了之后才会有&#xff0c;这个时候需要点击 Simulink 模型界面那个绿色…

Elasticsearch 基本操作

&#x1f449; Elasticsearch 基本操作 &#x1f48e; 1  RESTful REST 指的是一组架构约束条件和原则。满足这些约束条件和原则的应用程序或设计就是 RESTful。Web 应用程序最重要的 REST 原则是&#xff0c;客户端和服务器之间的交互在请求之间是无状态的。从客户端到服务器…

基于改进量子粒子群算法的电力系统经济调度(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️❤️&#x1f4a5;&#x1f4a5;&#x1f4a5; &#x1f389;作者研究&#xff1a;&#x1f3c5;&#x1f3c5;&#x1f3c5;本科计算机专业&#xff0c;研究生电气学硕…