Java中使用akka手记二 Cluster
基础知识
-
2.3.2的cluster还有些想实现的东西没有实现,包括:actor分区(负载均衡) actor handoff(临时故障时的actor处理) actor重新平衡(增减节点时有用) actor状态复制机制(类似做M/S作用)
-
2.3.2的cluster已经有的能力有:节点-集群-leader节点; membership; gossip协议同步状态; VECTOR CLOCKS保障顺序; 失败检测-节点不可达算法; seed节点-新节点加入时可以向这些节点发消息,但也是可以向任意成员发的; membership生命周期有joining up leaving exiting removed unreachable几种状态。
依赖
maven中添加akka-cluster包:
1 2 3 4 5
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.10</artifactId>
<version>2.3.1</version>
</dependency>
配置
下面的配置启用了Cluster。application.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
auto-down-unreachable-after = 10s
}
}
这里面定义的seed节点,用来作为cluster的初始化和加入点。要跨机器的话,就要修改这里的127.0.0.1了。
代码
下面是一个使用cluster的actor实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
public class SimpleClusterListener extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
Cluster cluster = Cluster.get(getContext().system());
//subscribe to cluster changes
@Override
public void preStart() {
//#subscribe
cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(),
MemberEvent.class, UnreachableMember.class);
//#subscribe
}
//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
}
@Override
public void onReceive(Object message) {
-
这个actor把自己注册了一个cluster事件。当cluster上有风吹草动时,都会收到消息。
-
运行这段代码只需要运行后面代码中的SimpleClusterApp。
加入种子节点
-
一开始能够预料的节点们被叫做种子节点(seed nodes),有节点加入的时候,会等种子节点的返回确认才算是加入成功。
-
两种方式指定seed nodes的位置,一种是在application.xml中,另一种是在jvm的参数里。
自动和手动down机
-
被失败检测出来不可达的节点,会被leader进行处理,也可以手动搞下来。
-
akka.cluster.atuo-down-unreadchable-after=10s 10秒不可达就自动关
-
也可以写代码 Cluster.get(system).down(address)
-
网络分裂时,这个自动down有可能会出现脑裂。
cluster的事件
- ClusterEvent.MemberUp
- ClusterEvent.MemberExited
- ClusterEvent.MemberRemoved
- ClusterEvent.UnreachableMember
- ClusterEvent.ReachableMember
代码
-
sample.cluster.simple.main启动了system。
-
一共三个actorSystem被启动,端口为2551 2552 0,0的时候会是随机最大端口。
-
application.conf里定义了2551与2552为seed nodes,所以2551与2552先组成了cluster,0加入的时候会收到2551和2552的确认。
-
gossip协议的功劳让一个状态值可能会被重复传递。
原创文章如转载,请注明:转载自五四陈科学院[http://www.54chen.com]
Posted by 54chen java,akka