作者:廖祥俐
本文主要介绍了JGroups的基本知识,及结合JGroups构建一个集群中具有缓存同步的节点示例,并对使用JGroups需要注意的一些地方进行指出(主要是在集群中,节点相互感知不到的网络设置问题)
JGroups是一个可靠的群组通讯Java工具包。它基于IP组播(IP multicast),但在可靠性,组成员管理上对它作了扩展。
JGroups的可靠性体现在:
1,对所有接收者的消息的无丢失传输(通过丢失消息的重发) 2,大消息的分割传输和重组 3,消息的顺序发送和接收 4,原子性:消息要么被所有接收者接收,要么全不
JavaGroups的成员关系管理体现在:
1,可以知道组内有哪些成员 2,成员的加入,离开,掉线等的通知
JavaGroups的主要功能特征:
在pom.xml
中添加:
<dependency>
<groupId>org.jgroups</groupId>
<artifactId>jgroups</artifactId>
<version>3.5.1.Final</version>
</dependency>
注:JDK1.6支持的最高版本为3.5.x,不支持3.6.x及以上版本的JGroups
JGroups 使用 信道(JChannel)作为连接到组、发送和接收消息的主要API,创建一个信道有以下几种方式,通常是采用默认值(JGroups自带的udp.xml
),或者自己传入protocol
相关的配置文件。
一个节点程序要加入一个集群发送消息,则这个程序必须先创建一个信道,并通过集群提供的名字进行连接,(集群的名字可以理解为一个key,通过这个key进行分组)。信道建立完成后,成员可以通过该信道进行发送和接收其它组成员的消息,组内成员也可通过断开连接离开集群。信道也可以重新被利用:断开连接后可通过重新连接连接到群组。
每个信道都有一个唯一的地址,任何一个成员也有一个唯一的地址。信道总是知道群组所有成员的地址(注:需要查看群组成员的具体物理地址也可以通过信道去执行),可以通过信道获取群组所有成员的地址列表,这个列表又称作视图(View)。
成员无论是加入还是离开群组,或者错误被检查到,都会产生视图(View)上的变化,在此会创建一个新的视图,并发送到其它成员,让所有成员保持同步。
信道使用的属性经常通过XML文件提供,也可以通过字符串、URL、DOM树等方式进行提供。
JChannel的状态转移图如下所示:
视图用来保存群组中的成员列表,以及创建人,如下代码所示:
public class View implements Comparable<View>, Streamable, Iterable<Address> {
...
protected ViewId view_id;
protected Address[] members;
...
}
其中ViewId
的结构如下:
public class ViewId implements Comparable<ViewId>, Streamable {
protected Address creator; // Address of the creator of this view
protected long id=0; // Lamport time of the view
...
}
注:id
表示群组创建以来加入该视图的第id
个成员
JGroups中封装了消息类,包含一个字节缓冲区、发送和接受者地址,如下:
public class Message implements Streamable {
protected Address dest_addr; // 目标地址
protected Address src_addr; // 源地址
protected byte[] buf; // 消息字节长度
protected int offset; // 有效消息的偏移量(一般为0)
protected int length; // 消息长度(一般为buf.length)
...
}
当接受者地址为null的时候,默认是往群组所有成员进行组播。
成员在怎样去接收消息呢?通过设置信道上的接收器,在有消息过来的时候,即可进行处理,具体的处理逻辑,在Receiver 中。Receiver 以一个回调的形式进行监听群组的状态变化,如视图发生改变,有消息过来,或者同步状态等
channel.setReceiver(Receiver r)
Receiver接口的源码如下:
public interface Receiver extends MessageListener, MembershipListener {
}
public interface MessageListener {
void receive(Message msg);
void getState(OutputStream output) throws Exception;
void setState(InputStream input) throws Exception;
}
public interface MembershipListener {
void viewAccepted(View new_view);
void suspect(Address suspected_mbr);
void block();
void unblock();
}
JGroups实现了一个空的Receiver
实例:ReceiverAdapter
,默认对回调不进行任何处理。
需求:在一个集群里面,节点通过JGroups相互感知,每个节点管理本机的Memcached,当节点对本机Memcache进行set或者remove操作时,向集群广播这一消息,集群中的节点在接收到消息之后,也进行相应的set或者remove操作,完成集群节点间缓存的同步。
具体的一个同步缓存节点,可以定义如下:
public class SyncCacheNode extends ReceiverAdapter implements SyncCache {
@Resource(name="musicrep_message_local_memcached") private MemcacheManager memcacheManager; // 本机的Memcache
@Value("${musicrep.synCache.group.key}") private String groupKey; // 信道名字
@Value("${musicrep.synCache.group.protocol}") private String CONFIG_XML; // 协议配置
private Channel ch;
private View view;
private int rank=-1; // 当前节点在集群节点的排名
private int cluster_size=-1; // 集群大小
private Map<Address, PhysicalAddress> state = new ConcurrentHashMap<Address, PhysicalAddress>(); // 集群状态 address/实际ip地址,注:需线程安全
private static final int retry = 5;
@SuppressWarnings("resource")
public void start(String name) throws Exception {
System.setProperty("java.net.preferIPv4Stack", "true"); // 禁用Ipv6
InputStream is = this.getClass().getClassLoader().getResourceAsStream(CONFIG_XML);
ch = new JChannel(is).name(name);
ch.setDiscardOwnMessages(true); // 不要接收到自己发出的消息
ch.setReceiver(this);
ch.connect(groupKey);
ch.getState(null, 1000); // 同步状态,从第一个节点处把当前集群的信息同步
}
public void stop(){
ch.close();
}
public String clusterInfo(){
...
return sb.toString();
}
@Override
public void receive(Message msg){
try{
Request req = (Request)Util.objectFromByteBuffer(msg.getBuffer()); // 反序列化
logger.info(req.getCacheKey() + "," + memcacheManager.get(req.getCacheKey()));
switch (req.getType()) {
case SET:
{
memcacheManager.set(req.getCacheKey(), req.getCacheObj()); // set 对象
}
break;
case REMOVE:
{
memcacheManager.remove(req.getCacheKey()); // remove 对象
}
default:
break;
}
} catch(Exception e) {
logger.error("exception receiving message from " + msg.getSrc(), e);
}
}
public void getState(OutputStream output) throws Exception {
Util.objectToStream(state, new DataOutputStream(output));
}
@SuppressWarnings("unchecked")
public void setState(InputStream input) throws Exception {
Map<Address, PhysicalAddress> memMap=(Map<Address, PhysicalAddress>)Util.objectFromStream(new DataInputStream(input));
state.clear();
state.putAll(memMap);
}
@Override
public void viewAccepted(View view){ // 监听视图(view)变化
List<Address> left_members = this.view!=null && view !=null ?
Util.leftMembers(this.view.getMembers(), view.getMembers()) : null; // 找出离开的节点成员
this.view = view;
Address local_addr = ch.getAddress();
setCluster_size(view.size());
List<Address> members = view.getMembers();
int old_rank = rank;
for(int i=0; i < members.size(); i++) {
Address tmp=members.get(i);
if(tmp.equals(local_addr)) {
rank=i;
break;
}
}
for(Address address : members){ // 更新集群状态
PhysicalAddress physicalAddress = (PhysicalAddress) ch.down(new Event(Event.GET_PHYSICAL_ADDRESS, address)); // 获取对应的物理地址
state.put(address, physicalAddress);
}
if(old_rank == -1 || old_rank != rank){ // rank状态发生了变化,记录一下
logger.info("my rank is " + rank);
}
if(left_members != null && !left_members.isEmpty()) {
for(Address mbr: left_members) {
handleLeftMember(mbr); // 处理离开的成员,在此只是简单的移除并log
}
}
logger.info(clusterInfo());
}
private void handleLeftMember(Address mbr) {
state.remove(mbr); // 集群状态中删除该成员,暂时先只这么做
logger.info("address : " + mbr + " left the cluster, can not recieve the message...");
}
@Override
public boolean clearCache(String key) {
try{
memcacheManager.remove(key);
Request clear = new Request(Type.REMOVE, key, null); // 包装一个消息结构
byte[] buf = Util.objectToByteBuffer(clear);
{
ch.send(new Message(null, buf)); // 组播发送
}
// for(Address address : state.keySet()){ // 点对点发送clear消息
// ch.send(new Message(address, buf));
// }
return true;
} catch(Exception e) {
logger.error("error clear cache", e);
return false;
}
}
@Override
public boolean setCache(String key, Object value) {
try{
memcacheManager.set(key, value);
Request set = new Request(Type.SET, key, value); // 包装一个消息结构
byte[] buf = Util.objectToByteBuffer(set);
for(Address address : state.keySet()){ // 点对点发送clear消息
ch.send(new Message(address, buf));
}
return true;
} catch (Exception e){
logger.error("error set cache", e);
return false;
}
}
public int getCluster_size() {
return cluster_size;
}
public void setCluster_size(int cluster_size) {
this.cluster_size = cluster_size;
}
}
JGroups
的jar包里面提供测试网络,在两台机器上分别运行以下两个测试代码,首先
java org.jgroups.tests.McastReceiverTest
然后
java org.jgroups.tests.McastSenderTest
IPv6禁用的两种方式:
1,启动参数禁用:
java -Djava.net.preferIPv4Stack=true
2,程序代码禁用:
System.setProperty("java.net.preferIPv4Stack", "true"); // 禁用Ipv6
TTL的作用是限制IP数据包在计算机网络中的存在的时间,在使用JGroups
,很容易会采用JGroups包里自带的协议配置文件,如udp.xml
,里面对于ttl
的配置:
<UDP
mcast_port="${jgroups.udp.mcast_port:45588}"
ip_ttl="0"
tos="8"
... />
这样的代码在本机(windows)测试是没有问题的,开启两个JGroups
程序,它们相互之间能够通过连接同一个信道进行通信,然而,在不同机器,即使在同一个子网里面,会发现机器通过信道无法相互发现。
这里可以看看ttl
值的作用:
TTL字段由IP数据包的发送者设置,在IP数据包从源到目的的整个转发路径上,每经过一个路由器,路由器都会修改这个TTL字段值,具体的做法是把该TTL的值减1,然后再将IP包转发出去。如果在IP包到达目的IP之前,TTL减少为0,路由器将会丢弃收到的TTL=0的IP包并向IP包的发送者发送 ICMP time exceeded消息。
所以当节点相互无法发现,而又属于同一个子网时,可以通过更改默认的ttl
值(更改默认的配置文件),然后查看运行结果。
在使用JGroups的时候,希望能对信道的物理地址进行获取定位,然而,JGroups的文档中及很多接口并没有涉及如何获取物理地址的方法,从StackOverFlow上,获取这么一种方法:
public void receive(Message msg) {
String srcIp;
Address addr = msg.getSrc();
PhysicalAddress physicalAddr = (PhysicalAddress)channel.down(new Event(Event.GET_PHYSICAL_ADDRESS, addr));
if(physicalAddr instanceof IpAddress) {
IpAddress ipAddr = (IpAddress)physicalAddr;
InetAddress inetAddr = ipAddr.getIpAddress();
srcIp = inetAddr.getHostAddress();
}
}
This is a hack, but it works. The JGroups team has stated that this is dangerous as they could change the underlying code at any time, so use with caution.
网易云大礼包:https://www.163yun.com/gift
本文来自网易实践者社区,经作者廖祥俐授权发布