参考资料
- https://doc.akka.io/libraries/akka-core/current/typed/actors.html#first-example
关于scala语法的注意事项
extends App
是个语法糖,等同于直接在伴生对象中编写main 方法- 对象是通过apply方法创建的,也可以通过对象的名称单独创建(此时实际上会调用apply方法)
- case class 样例类用于定义不可变类,可以用于模式匹配
- trait类似接口但是可以包括抽象方法,具体方法,子类。带有sealed表示只能在定义它的同一个文件中被继承,常用于更加安全的模式匹配,例如如果消息类型为sealed trait,则actor可以安全接受多种消息。
helloworld示例
代码的整体示意图如下
- HelloWorldMain创建ActorSystem,作为一个actorref指向HelloWorldMain actor。使用此引用向HelloWorldMain actor发送SayHello消息
- HelloWorldMain actor初始化Helloworld actor和HelloWorldBot,以及在收到SayHello消息后向HelloWorld actor发送Greet消息(其中带有HelloWorldBot的actorref)
- HelloWorld actor收到消息后向HelloWorldBot发送Greeted消息
- HelloWorldBot actor收到消息后greetingCounter计数增加,并向HelloWorld actor返回Greet消息。当greetingCounter超过max时暂停行为。
代码示例
//#imports
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
//#hello-world-actor
object HelloWorld {
// 使用样例类定义消息类型
final case class Greet(whom: String, replyTo: ActorRef[Greeted])
final case class Greeted(whom: String, from: ActorRef[Greet])
// Behaviors.receive函数接收一个函数作为参数,{}是为了容纳多行lambda表达式
def apply(): Behavior[Greet] = Behaviors.receive { (context, message) =>
context.log.info("Hello {}!", message.whom)
message.replyTo ! Greeted(message.whom, context.self) // 向message.replyTo发送消息Greeted,其中context.self是自身的actorref
Behaviors.same // 设置后续的消息处理逻辑不变
}
}
//#hello-world-bot
object HelloWorldBot {
def apply(max: Int): Behavior[HelloWorld.Greeted] = {
bot(0, max)
}
private def bot(greetingCounter: Int, max: Int): Behavior[HelloWorld.Greeted] =
Behaviors.receive { (context, message) =>
val n = greetingCounter + 1
context.log.info("Greeting {} for {}", n, message.whom)
if (n == max) {
Behaviors.stopped // 到达max次数后停止行为,避免无限循环
} else {
message.from ! HelloWorld.Greet(message.whom, context.self)
bot(n, max)
}
}
}
//#hello-world-main
object HelloWorldMain {
final case class SayHello(name: String)
def apply(): Behavior[SayHello] =
Behaviors.setup { context =>
val greeter = context.spawn(HelloWorld(), "greeter") // 初始化HelloWorld
Behaviors.receiveMessage { message => // 收到消息后创建HelloWorldBot
val replyTo = context.spawn(HelloWorldBot(max = 3), message.name)
greeter ! HelloWorld.Greet(message.name, replyTo)
Behaviors.same
}
}
def main(args: Array[String]): Unit = {
val system: ActorSystem[HelloWorldMain.SayHello] =
ActorSystem(HelloWorldMain(), "hello")
system ! HelloWorldMain.SayHello("World")
Thread.sleep(3000)
system.terminate()
}
}
chatroom示例
代码的整体示意图如下
- Main启动后,初始化chatRoom和Gabbler客户端。向ChatRoom发送GetSession消息(带有client actorref)
- chatRoom创建session actor,用来隔离会话
- chatRoom向client发送SessionGranted消息(带有session actorref)
- client(Gabbler)收到SessionGranted后向session actor发送PostMessage消息
- session 收到SessionGranted后向room发送PublishSessionMessage
- room返回NotifyClient给session
- 然后按照NotifyClient(带有MessagePosted)中的client actorref将MessagePosted转发给特定的client
- client收到MessagePosted之后完成并推出
整体的思路
- ChatRoom Actor:作为中央枢纽,负责管理所有的会话(Sessions)。每个连接到聊天室的客户端都会通过
GetSession
消息与 ChatRoom 交互,并获得一个专属的会话 Actor。 - Session Actor:每个客户端都有一个对应的 Session Actor,用于处理该客户端的消息收发、保持客户端状态等。
- client(Gabbler)Actor:模拟客户端行为,可以发送消息给 ChatRoom 或者其他客户端。
- 客户端间通信:通过 ChatRoom 转发消息来实现客户端间的通信。当一个客户端发送消息时,它实际上是将消息发送给了 ChatRoom,然后由 ChatRoom 将消息广播给所有其他在线的客户端。
代码示例
定义消息,这里实际上等同于定义actor之间的通信协议
- RoomCommand,用来获取session
- SessionEvent,用来管理session和发送message
- SessionCommand,用来发送message和通知client
object ChatRoom {
sealed trait RoomCommand
final case class GetSession(screenName: String, replyTo: ActorRef[SessionEvent]) extends RoomCommand
sealed trait SessionEvent
final case class SessionGranted(handle: ActorRef[PostMessage]) extends SessionEvent
final case class SessionDenied(reason: String) extends SessionEvent
final case class MessagePosted(screenName: String, message: String) extends SessionEvent
sealed trait SessionCommand
final case class PostMessage(message: String) extends SessionCommand
private final case class NotifyClient(message: MessagePosted) extends SessionCommand
}
ChatRoom actor部分
object ChatRoom {
// PublishSessionMessage消息将包含的ChatRoom消息传播到所有连接的客户端
private final case class PublishSessionMessage(screenName: String, message: String) extends RoomCommand
def apply(): Behavior[RoomCommand] =
chatRoom(List.empty)
private def chatRoom(sessions: List[ActorRef[SessionCommand]]): Behavior[RoomCommand] =
Behaviors.receive { (context, message) =>
message match {
// 如果收到GetSession,create a child actor for further interaction with the client
case GetSession(screenName, client) =>
val ses = context.spawn(
session(context.self, screenName, client),
name = URLEncoder.encode(screenName, StandardCharsets.UTF_8.name))
client ! SessionGranted(ses)
chatRoom(ses :: sessions) // ::用于将ses添加到sessions头。由于Akka 的行为是不可变的(每次更改状态都必须返回一个新的 behavior),所以通常通过 递归函数 + 参数携带状态 的方式来模拟“状态变化”
// 如果接收到 PublishSessionMessage 就向所有的session发送notification,每个session都带有client内容。等于是申请chatroom允许发送
case PublishSessionMessage(screenName, message) =>
val notification = NotifyClient(MessagePosted(screenName, message))
sessions.foreach(_ ! notification) // 将消息转发给session中的所有client
Behaviors.same
}
}
// 用于创建session actor,接受SessionCommand消息
private def session(
room: ActorRef[PublishSessionMessage],
screenName: String,
client: ActorRef[SessionEvent]): Behavior[SessionCommand] =
Behaviors.receiveMessage {
// 向room中的所有其他用户发送消息
case PostMessage(message) =>
room ! PublishSessionMessage(screenName, message)
Behaviors.same
// room发布消息通知client
case NotifyClient(message) =>
client ! message
Behaviors.same
}
}
客户端部分
object Gabbler {
import ChatRoom._
def apply(): Behavior[SessionEvent] =
Behaviors.setup { context =>
Behaviors.receiveMessage {
case SessionDenied(reason) =>
context.log.info("cannot start chat room session: {}", reason)
Behaviors.stopped
case SessionGranted(handle) =>
handle ! PostMessage("Hello World!")
Behaviors.same
case MessagePosted(screenName, message) =>
context.log.info("message has been posted by '{}': {}", screenName, message)
Behaviors.stopped
}
}
actorsystem入口
- 这里使用了Behaviors.setup。
Behaviors.setup
和Behaviors.receiveMessage
都是用于定义 Actor 行为的工厂方法,区别是Behaviors.setup
允许你在初始化阶段访问ActorContext
,而Behaviors.receiveMessage
不直接提供对上下文的访问,专注于消息处理逻辑 Main
Actor,对应于传统 Java 应用程序中的main
方法
object Main {
def apply(): Behavior[NotUsed] =
Behaviors.setup { context =>
val chatRoom = context.spawn(ChatRoom(), "chatroom")
val gabblerRef = context.spawn(Gabbler(), "gabbler")
context.watch(gabblerRef) //监控gabbler actor,如果gabbler 终止了,当前 Actor 将收到一个 Terminated(gabblerRef) 信号
chatRoom ! ChatRoom.GetSession("ol’ Gabbler", gabblerRef)
// 处理 Terminated 信号
Behaviors.receiveSignal {
case (_, Terminated(_)) =>
Behaviors.stopped
}
}
def main(args: Array[String]): Unit = {
ActorSystem(Main(), "ChatRoomDemo")
}
}
运行结果如下,按照预期逻辑,目前只有一个client,并且发送消息收到响应后推出