目录
- 复习
- 本节内容
- 正文
- 什么是Actor模型
- 如何应用
- 创建Actor基类
- 创建RootActor
- 创建AkkaContext
- 创建ConnectActorManager和ConnectActor
- 生成actor并发送消息给它
- 课后作业
- 结尾
复习
上一节我们使用gradle构建了一个多模块系统。
并且在登录服启动了Netty服务,监听config文件中配置的端口。
在client模块中使用Netty创建一个客户端连接到登录服,并且发送并接收协议。
本节内容
本节我们学习一下Actor模型。并将其应用在我们的登录服中。
我们将会为每一个连接上来的客户端生成一个actor专门用于处理该连接发送上来的请求数据。
正文
什么是Actor模型
Actor模型是一种强大的并发计算模型。
在Actor模型中,一个Actor是一个最基本的计算单元,它可以看作是一个个独立的实体,它们之间毫无关联,但是可以通过消息来通信。
一个Actor收到其他Actor的信息后,可以根据需要作出各种响应。每个Actor的数据相互隔离,使用消息传递的方式来进行并发操作,避免了多线程编程中常见的同步和共享内存问题,从而提高了程序的可靠性和可伸缩性。
使用Actor模型最好的一点在于,为Actor逻辑进行编码时,无需过多考虑多线程并发问题,因为它是通过消息驱动的且其内部数据只能由其本身进行修改,而每个Actor的消息处理是串行的,所以每个Actor内部的数据不会被并发修改。
出bug的情况较少,对新人友好,意味着可以多招新人程序员来开发以节约项目成本。
如何应用
要在java中使用Actor模型,可以使用akka的actor库进行开发。
Kilim是一个比akka更轻量化的actor库,但是社区规模相对较小。
本项目选择使用akka的actor库。
implementation group: 'com.typesafe.akka', name: 'akka-actor-typed_3', version: '2.8.5'
创建Actor基类
因为我们项目中将会有多种不同功能的actor出现,我们先定一个actor基类用于规范所有actor。
创建BaseMsg和BaseActor两个类
/**
* 消息基类 所有Actor消息的基类
*/
public class BaseMsg implements Serializable {
}
/**
* Actor基类
*/
@Slf4j
public abstract class BaseActor extends AbstractBehavior<BaseMsg> {
public BaseActor(ActorContext<BaseMsg> context) {
super(context);
}
@Override
public Receive<BaseMsg> createReceive() {
ReceiveBuilder<BaseMsg> builder = newReceiveBuilder();
builder.onMessage(BaseMsg.class, this::onBaseMsg);
return builder.build();
}
private Behavior<BaseMsg> onBaseMsg(BaseMsg msg) {
log.info("receive base msg. {}", msg.toString());
return this;
}
}
BaseActor继承与AbstractBehavior,它接收所有BaseMsg类型的消息,当收到BaseMsg消息时打印receive base msg.
创建RootActor
RootActor是一个用于初始化ActorSystem和创建其他Actor的守护Actor
public class RootActor extends BaseActor{
public RootActor(ActorContext<BaseMsg> context) {
super(context);
}
public static Behavior<BaseMsg> create() {
return Behaviors.setup(RootActor::new);
}
}
创建AkkaContext
AkkaContext用于储存ActorSystem上下文,并提供创建Actor的方法。
final public class AkkaContext {
private static ActorSystem<BaseMsg> ACTOR_SYSTEM;
public static void initActorSystem() {
ACTOR_SYSTEM = ActorSystem.create(RootActor.create(), "ActorSystem");
}
public static ActorRef<BaseMsg> createActor(Behavior<BaseMsg> behavior, String name) {
return ACTOR_SYSTEM.systemActorOf(behavior, name, Props.empty());
}
}
服务器启动时调用initActorSystem初始化akka上下文。需要生成Actor则调用createActor。
创建ConnectActorManager和ConnectActor
在登录服中,我们会为每一个连接分配一个Actor,称之为ConnectActor,用于接收各个连接发送过来的消息,并串行地处理消息逻辑。
为了管理ConnectActor,我们需要一个ConnectActorManager用于存储每一个连接与connectActor的映射关系。
首先我们需要为每一个连接分配一个唯一id,称之为connectId。
修改LoginNettyHandler中负责处理建立连接逻辑的接口channelActive
// 登录服ctx自定义属性
private static final AttributeKey<HashMap<String, Object>> loginContextAttr = AttributeKey.valueOf("login");
/**
* 建立连接
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
String ip = address.getAddress().getHostAddress();
HashMap<String, Object> context = this.getContextAttrMap(ctx);
if (context == null) {
context = new HashMap<>();
ctx.channel().attr(loginContextAttr).set(context);
}
// 先只用uuid的64位来做connectId,只要不超过64位理论上不会重复
long connectId = UUID.randomUUID().getLeastSignificantBits();
context.put("connectId", connectId);
context.put("ip", ip);
if (ctx.channel().isActive()) {
log.info("创建连接—成功:ip = {},connectId = {}", ip, connectId);
}
}
/**
* 从ctx中获取自定义属性参数
*/
private HashMap<String, Object> getContextAttrMap(ChannelHandlerContext ctx) {
return ctx.channel().attr(loginContextAttr).get();
}
我们为每一个连接设置了一个存放自定义属性的HashMap,然后使用UUID生成了一个connectId存入其中。
然后我们创建一个ConnectActor类,和RootActor类类似,继承BaseActor类,并实现create方法。
public class ConnectActor extends BaseActor {
private long connectId;
private ChannelHandlerContext ctx;
public ConnectActor(ActorContext<BaseMsg> context, long connectId, ChannelHandlerContext ctx) {
super(context);
this.connectId = connectId;
this.ctx = ctx;
}
/**
* 生成一个ConnectActor的行为
*/
public static Behavior<BaseMsg> create(long connectId, ChannelHandlerContext ctx) {
return Behaviors.setup(context -> new ConnectActor(context, connectId, ctx));
}
}
创建ConnectActorManager类,用于管理所有的ConnectActor。
@Component
public class ConnectActorManager {
private final Map<Long, ActorRef<BaseMsg>> connectActorMap = new ConcurrentHashMap<>();
public ActorRef<BaseMsg> getConnectActor(long connectId) {
return connectActorMap.get(connectId);
}
public static ConnectActorManager getInstance() {
return SpringUtils.getBean(ConnectActorManager.class);
}
/**
* 创建一个ConnectActor
*/
public ActorRef<BaseMsg> createConnectActor(long connectId, ChannelHandlerContext ctx) {
ActorRef<BaseMsg> actor = AkkaContext.createActor(ConnectActor.create(connectId, ctx), String.valueOf(connectId));
connectActorMap.put(connectId, actor);
return actor;
}
}
在ConnectActorManager类中,我们定义了一个ConcurrentHashMap,用于存放所有的ConnectActor,因为Netty接收数据是多线程的,所以可能有多个线程同时对其进行读写操作,所以使用ConcurrentHashMap保证线程安全。
生成actor并发送消息给它
我们修改LoginNettyHandler类,处理读取数据的接口channelRead0.
@Override
protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws Exception {
HashMap<String, Object> context = this.getContextAttrMap(ctx);
long connectId = (long)context.get("connectId");
ConnectActorManager actorManager = SpringUtils.getBean(ConnectActorManager.class);
ActorRef<BaseMsg> connectActor = actorManager.getConnectActor(connectId);
if (connectActor == null) {
connectActor = actorManager.createConnectActor(connectId, ctx);
}
log.info(new String(msg));
BaseMsg baseMsg = new BaseMsg();
connectActor.tell(baseMsg);
ctx.channel().writeAndFlush(msg);
}
当接收到数据时,我们从ctx中获得建立连接时生成的connectId,再用其去ConnectActorManager中获取对应的connectActor,没有则创建一个新的。
然后我们创建一个消息BaseMsg,将其发送给connectActor进行处理。
最后我们将数据原封不动地返回回去。
运行一下试试。
可以看到我们的服务器收到了test消息,并且给ConnectActor发送了一个BaseMsg消息。
课后作业
现在我们可以给ConnectActor发送消息了,是不是可以将BaseMsg细分成更多不一样的消息,用于处理ctx的不同类型的数据。
我预计为BaseMsg创建不同的子类,用来通知ConnectActor来做出不同的行为。
ClientUpMsg:客户端上行数据消息;
ConnectClosedMsg:客户端断开消息;
但是我不会在这里将代码展示出来,我希望读者们可以自己去实现这两个消息,只需要打印出不同的日志就行。
结尾
本节我们讲了Actor模型并且在实战中使用了它。
Actor模型是一个非常好用的并发计算模型,有了它可以使开发者不用过多关心并发问题和数据共享问题,只需要专注于业务逻辑的开发即可。
希望本节的内容能对你有所帮助,有任何问题可以评论或私信。
感兴趣的同学可以关注该专栏,我会持续更新更多关于游戏服务器开发的内容。