在Actor的生命周期中会调用几个方法,我们在需要时可以重写这些方法。
 ● prestart():在构造函数之后调用。
 ● postStop():在重启之前调用。
 ● preRestart(reason, message):默认情况下会调用postStop()。
 ● postRestart():默认情况下会调用preStart()。
一 生命周期
1.1 基本介绍
package com.shu;
import akka.actor.AbstractActor;
import scala.Option;
import java.util.Optional;
/**
 * @description: 生命周期ActorDemo
 * @author: shu
 * @createDate: 2022/12/10 11:33
 * @version: 1.0
 */
public class LifeActorDemo extends AbstractActor {
    /**
     * 在构造函数之后调用 ,可以完成一些初始化
     * @throws Exception
     * @throws Exception
     */
    @Override
    public void preStart() throws Exception, Exception {
        super.preStart();
        System.out.println("Life 初始化");
    }
    /**
     * 在重启之前调用
     * @throws Exception
     * @throws Exception
     */
    @Override
    public void postStop() throws Exception, Exception {
        super.postStop();
        System.out.println("Life 即将重启");
    }
    
    /**
     * 要注意的是preRestart和postRestart只在重启的时候才会被调用。它们默认调用了preStart和postStop,但是调用它们的时候就不再直接调用preStart和postStop了。
     * @param reason
     * @param message
     * @throws Exception
     * @throws Exception
     */
    @Override
    public void preRestart(Throwable reason, Option<Object> message) throws Exception, Exception {
        super.preRestart(reason, message);
        System.out.println("Life 即将重启 调用preStart初始化");
    }
    /**
     * 要注意的是preRestart和postRestart只在重启的时候才会被调用。它们默认调用了preStart和postStop,但是调用它们的时候就不再直接调用preStart和postStop了。
     * @param reason
     * @throws Exception
     */
    @Override
    public void postRestart(Throwable reason) throws Exception, Exception {
        super.postRestart(reason);
        System.out.println("Life 即将重启 调用postStop方法");
    }
    /**
     * 收到消息
     * @return
     */
    @Override
    public Receive createReceive() {
        return null;
    }
}
 
- 要注意的是preRestart和postRestart只在重启的时候才会被调用。它们默认调用了preStart和postStop,但是调用它们的时候就不再直接调用preStart和postStop了。
 - 这样我们就能够决定,到底是只在Actor启动或停止的时候调用一次preStart和postStop,还是每次重启一个Actor的时候就调用preStart和postStop。
 

1.2 自定义监督策略
重写Actor的supervisorStrategy方法
    /**
     * 可以制定系你的监督策越
     * @return
     */
    @Override
    public SupervisorStrategy supervisorStrategy() {
//         super.supervisorStrategy();
         return new OneForOneStrategy(2, Duration.create("1 minute"), PartialFunction.empty());
    }
 
1.3 终止或kill一个Actor
有多种不同的方法可以用来停止一个Actor,下面任一方法都可以停止Actor:
 ● 调用ActorSystem.stop(actorRef);
 ● 调用ActorContext.stop(actorRef);
 ● 给Actor发送一条PoisonPill消息,会在Actor完成消息处理后将其停止;
 ● 给Actor发送一条kill消息,会导致Actor抛出ActorKilledException异常
对比
- 调用context.stop或system.stop会导致Actor立即停止
 - 发送PoisonPill消息则会在Actor处理完消息后将其停止
 - 不同的是,kill不会马上直接停止Actor,而是会导致Actor抛出一个ActorKilledException,
 
1.4 生命周期监控和DeathWatch
- 监督机制描述了如何对子Actor的状态进行响应。
 - 而Actor也可以对其他任何Actor进行监督。
 - 通过调用context.watch(actorRef)注册后,Actor就能够监控另一个Actor的终止,而调用context.unwatch(actorRef)就可以取消监控注册。
 - 如果被监控的Actor停止了,负责监控的Actor就会收到一条Terminated(ActorRef)消息。
 
1.5 状态
我们已经介绍过,Actor能够安全地存储状态,它允许我们使用无锁的方式并发处理状态,现在我们就来介绍Actor如何通过不同的状态来改变它的行为。
1.5.1 在状态之间暂存消息(stash)
- Akka提供了一种叫做stash的机制来支持这一功能。stash消息会把消息暂存到一个独立的队列中,该队列中存储目前无法处理的消息
 - unstash则把消息从暂存队列中取出,放回邮箱队列中,Actor就能继续处理这些消息了。
 - 在我们实际开发中,比如终端不在线,需要上线后执行一些操作,我就可以用这个机制来解决这个问题,实际上就是把消息缓存到一个队列中,但是如果缓存过多会造成内存泄漏,邮箱拥挤。
 
        if(cantHandleMessage) {
            // 缓存消息
            stash();
        } else {
            // 处理消息
            handleMessage(message);
            // 取出消息
            unstash()
        } 
 
要注意的是,虽然stash()和unstash()在希望快速改变状态的时候使用起来非常方便,但是stash消息的状态一定要和某个时间限制进行绑定,否则就有可能填满邮箱。
案例
        private Boolean online = false;
        public PartialFunction receive() {
            return RecieveBuilder
                .match(GetRequest.class, x -> {
                    if(online) {
                          processMessage(x);
                    } else {
                        stash();
                    }
                })
                .match(Connected.class, x -> {
                    online = true;
                    unstash();
                )
                .match(Disconnected.class, x -> online = false)
                .build();
 
1.5.2 热交换(Hotswap):Become/Unbecome
Akka提供了become()和unbecome(),用于管理不同的行为,这一用法可以大大改善代码的可读性。在Actor的context()中,有两个方法:
 ● become(PartialFunction behavior):这个方法将receive块中定义的行为修改为一个新的PartialFunction。
 ● unbecome():这个方法将Actor的行为修改回默认行为。
        public PartialFunction receive() {
            return RecieveBuilder
                .match(GetRequest.class, x -> stash())
                .match(Connected.class, x -> {
                    context().become(online);
                    unstash();
                  })
                  .build();
        }
        final private PartialFunction<Object, BoxedUnit> online(
            final ActorRef another) {
                return RecieveBuilder
                    .match(GetRequest.class, x -> processMessage(x))
                    .build();
            }
 
每个状态的行为都定义在自己独立的PartialFunction中,在PartialFunction中,使用模式匹配来定义不同的行为。这样我们就能够互不影响地阅读Actor中不同状态的行为。
1.5.3 有限自动机(Finite State Machine, FSM)
和热交换很相似的是,FSM中也有状态以及基于状态的行为变化,跟热交换比起来,FSM是一个更重量级的抽象概念,需要更多的代码和类型才能够实现并运行。所以通常来说,热交换是一个更简单、可读性更高的选择。
        when(DISCONNECTED,
            matchEvent(FlushMsg.class, (msg, container) -> stay())
                .event(GetRequest.class, (msg, container) -> {
                    container.add(msg);
                    return stay();
                })
                .event(Tcp.Connected.class, (msg, container) -> {
                    if(container.getFirst() == null) {
                          return goTo(CONNECTED);
                    } else {
                          return goTo(CONNECTED_AND_PENDING);
                    }
                }));
        when(CONNECTED,
            matchEvent(FlushMsg.class, (msg, container) -> stay()) {
                .event(GetRequest.class, (msg, container) -> {
                    container.add(msg);
                    return goTo(CONNECTED_AND_PENDING);
                }));
        when(CONNECTED_AND_PENDING,
            matchEvent(FlushMsg.class, (msg, container) -> {
                container = new EventQueue();
                return stay();
            })
            .event(GetRequest.class, (msg, container) -> {
                container.add(msg);
                return goTo(CONNECTED_AND_PENDING);
            }));
        scala.PartialFunction pf = ReceiveBuilder.match(String.class,
            x -> System.out.println(x)).build();
        when(CONNECTED, pf);
 
1.7 案例
结构图
 纯属虚构,方便自己理解上面的知识(Java代码实现)
 
失败生命周期的处理
● prestart():在构造函数之后调用。
 ● postStop():在重启之前调用。
 ● preRestart(reason, message):默认情况下会调用postStop()。
 ● postRestart():默认情况下会调用preStart()。
 
 我们可以发现当Actor内部发生了错误,他并不是终止了程序而是重新启动。
自定义监督策越
    /**
     * 自定义监督策越
     */
    private static SupervisorStrategy strategy = new OneForOneStrategy(
                    10,
                    Duration.ofMinutes(1),
                    DeciderBuilder
                            .match(ArithmeticException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.resume())
                            .match(NullPointerException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.restart())
                            .match(IllegalArgumentException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.stop())
                            .matchAny(o -> (SupervisorStrategy.Directive) SupervisorStrategy.escalate())
                            .build());
    /**
     * 自定义策越
     * @return
     */
    @Override
    public SupervisorStrategy supervisorStrategy() {
        return strategy;
    }
 
10和Duration.create(1, TimeUnit.MINUTES)分别传递给maxNrOfRetries和withinTimeRange参数,这意味着策略每分钟重新启动一个子级最多10次。如果在withinTimeRange持续时间内重新启动计数超过maxNrOfRetries,则子 Actor 将停止。
基本效果图
- 服务端
 
- 上线
 - 请求数据
 - 下线
 

- 客服端
 

关键代码
- 服务端
 
package com.shu.terminal;
import akka.actor.*;
import akka.io.Tcp;
import akka.japi.Function;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.AskableActorRef;
import akka.util.Timeout;
import com.shu.meter.MeterDemoActor;
import pojo.Login;
import pojo.Logout;
import pojo.Meter;
import pojo.MeterRequest;
import scala.Option;
import scala.PartialFunction;
import scala.concurrent.Await;
import scala.concurrent.Future;
import static scala.compat.java8.FutureConverters.toJava;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
 * @description:
 * @author: shu
 * @createDate: 2022/12/10 14:52
 * @version: 1.0
 */
public class TerminalDemoActor extends AbstractActor {
    /**
     * 自定义监督策越
     */
    private static SupervisorStrategy strategy = new OneForOneStrategy(
                    10,
                    Duration.ofMinutes(1),
                    DeciderBuilder
                            .match(ArithmeticException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.resume())
                            .match(NullPointerException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.restart())
                            .match(IllegalArgumentException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.stop())
                            .matchAny(o -> (SupervisorStrategy.Directive) SupervisorStrategy.escalate())
                            .build());
    /**
     * 在线状态
     */
    private Boolean Online;
    /**
     * 在构造函数之后调用 ,可以完成一些初始化
     *
     * @throws Exception
     * @throws Exception
     */
    @Override
    public void preStart() throws Exception, Exception {
        super.preStart();
        System.out.println("Life 初始化");
    }
    /**
     * 在重启之前调用
     *
     * @throws Exception
     * @throws Exception
     */
    @Override
    public void postStop() throws Exception, Exception {
        super.postStop();
        System.out.println("Life 即将重启");
    }
    /**
     * 要注意的是preRestart和postRestart只在重启的时候才会被调用。它们默认调用了preStart和postStop,但是调用它们的时候就不再直接调用preStart和postStop了。
     *
     * @param reason
     * @param message
     * @throws Exception
     * @throws Exception
     */
    @Override
    public void preRestart(Throwable reason, Option<Object> message) throws Exception, Exception {
        super.preRestart(reason, message);
        System.out.println("Life 即将重启 调用preStart初始化");
    }
    /**
     * 自定义策越
     * @return
     */
    @Override
    public SupervisorStrategy supervisorStrategy() {
        return strategy;
    }
    /**
     * 收到小消息
     *
     * @return
     */
    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                // 连接成功
                .match(Login.class, x -> {
                    // 在线状态改变
                    setOnline(true);
                    // 回应消息,登录成功
                    sender().tell(1001, self());
                    System.out.println("收到登录请求");
                })
                // 连接成功
                .match(Logout.class, x -> {
                    // 在线状态改变
                    setOnline(false);
                    // 回应消息,登录成功
                    sender().tell(1002, self());
                    System.out.println("收到注销请求");
                })
                // 请求数据
                .match(MeterRequest.class, msg -> {
                    // 在线
                    if (getOnline()) {
                        // 获取消息
                        Future sFuture = new AskableActorRef(context().actorOf(Props.create(MeterDemoActor.class))).ask(msg,Timeout.apply(1000,TimeUnit.SECONDS) );
                        CompletionStage<Meter> cs = toJava(sFuture);
                        CompletableFuture<Meter> future = (CompletableFuture<Meter>) cs;
                        // 消息发送给客服端
                        if (future.get() != null) {
                            sender().tell(future.get(), self());
                        }
                    }
                })
                // 未找到消息
                .matchAny(o ->
                        sender().tell(new Status.Failure(new ClassNotFoundException()), self())
                )
                .build();
    }
    public Boolean getOnline() {
        return Online;
    }
    public void setOnline(Boolean online) {
        Online = online;
    }
}
 
- 客服端
 
package client;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.pattern.AskableActorSelection;
import akka.util.Timeout;
import pojo.*;
import java.util.Date;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static scala.compat.java8.FutureConverters.toJava;
/**
 * @description:
 * @author: shu
 * @createDate: 2022/12/10 18:22
 * @version: 1.0
 */
public class TerminalClient {
    private final ActorSystem system = ActorSystem.create("LocalSystem");
    private final ActorSelection remoteTerminal;
    public TerminalClient(String remoteAddress) {
        remoteTerminal = system.actorSelection("akka.tcp://terminal@" +
                remoteAddress+ "/user/terminal-server");
    }
    /**
     * 获取消息
     * @param key
     * @param value
     * @return
     */
    public CompletionStage getMeterInfo(String key, int value) {
        System.out.println(remoteTerminal);
        return toJava(new AskableActorSelection(remoteTerminal).ask(new MeterRequest(key, value), Timeout.apply(5000, TimeUnit.SECONDS)));
    }
    /**
     * 上线
     * @return
     */
    public CompletionStage sendLogin() {
        System.out.println(remoteTerminal);
        return toJava(new AskableActorSelection(remoteTerminal).ask(new Login("1001", new Date()), Timeout.apply(5000, TimeUnit.SECONDS)));
    }
    /**
     * 下线
     * @return
     */
    public CompletionStage sendLogout() {
        System.out.println(remoteTerminal);
        return toJava(new AskableActorSelection(remoteTerminal).ask(new Logout("1001", new Date()), Timeout.apply(5000, TimeUnit.SECONDS)));
    }
}
 
具体案例代码:https://github.com/Eason-shu/Akka
 demo03 ,demo04


















