使用JGroups构建缓存同步节点

猪小花1号2018-09-05 09:41

作者:廖祥俐


本文主要介绍了JGroups的基本知识,及结合JGroups构建一个集群中具有缓存同步的节点示例,并对使用JGroups需要注意的一些地方进行指出(主要是在集群中,节点相互感知不到的网络设置问题)


1, JGroups的基本介绍

JGroups是一个可靠的群组通讯Java工具包。它基于IP组播(IP multicast),但在可靠性,组成员管理上对它作了扩展。

JGroups的可靠性体现在:

1,对所有接收者的消息的无丢失传输(通过丢失消息的重发) 2,大消息的分割传输和重组 3,消息的顺序发送和接收 4,原子性:消息要么被所有接收者接收,要么全不

JavaGroups的成员关系管理体现在:

1,可以知道组内有哪些成员 2,成员的加入,离开,掉线等的通知

JavaGroups的主要功能特征:

  • 组的创建与删除。组成员能在LAN或WAN环境内互相发送消息
  • 组的成员加入或离开
  • 组成员的检测和通知:加入,离开,掉线
  • 检测与移除已掉线的成员
  • 消息的组播 (member-to-group或point-to-multipoint)
  • 消息的点对点发送 (member-to-member或point-to-point)
  • 支持UDP (IP Multicast), TCP, JMS等传输协议
  • 免费开放源代码


2,基本概念

JGroups的官网地址 JGroups的源码地址

pom.xml中添加:

    <dependency>
        <groupId>org.jgroups</groupId>
        <artifactId>jgroups</artifactId>
        <version>3.5.1.Final</version>
    </dependency>

注:JDK1.6支持的最高版本为3.5.x,不支持3.6.x及以上版本的JGroups


1)基本概念:信道

JGroups 使用 信道(JChannel)作为连接到组、发送和接收消息的主要API,创建一个信道有以下几种方式,通常是采用默认值(JGroups自带的udp.xml),或者自己传入protocol相关的配置文件。

一个节点程序要加入一个集群发送消息,则这个程序必须先创建一个信道,并通过集群提供的名字进行连接,(集群的名字可以理解为一个key,通过这个key进行分组)。信道建立完成后,成员可以通过该信道进行发送和接收其它组成员的消息,组内成员也可通过断开连接离开集群。信道也可以重新被利用:断开连接后可通过重新连接连接到群组。

每个信道都有一个唯一的地址,任何一个成员也有一个唯一的地址。信道总是知道群组所有成员的地址(注:需要查看群组成员的具体物理地址也可以通过信道去执行),可以通过信道获取群组所有成员的地址列表,这个列表又称作视图(View)。

成员无论是加入还是离开群组,或者错误被检查到,都会产生视图(View)上的变化,在此会创建一个新的视图,并发送到其它成员,让所有成员保持同步。

信道使用的属性经常通过XML文件提供,也可以通过字符串、URL、DOM树等方式进行提供。

JChannel的状态转移图如下所示:


2)基本概念:视图

视图用来保存群组中的成员列表,以及创建人,如下代码所示:

public class View implements Comparable<View>, Streamable, Iterable<Address> {
...
 protected ViewId    view_id;
protected Address[] members;
...
}

其中ViewId的结构如下:

public class ViewId implements Comparable<ViewId>, Streamable {
protected Address creator;   // Address of the creator of this view
    protected long    id=0;      // Lamport time of the view
...
}

注:id表示群组创建以来加入该视图的第id个成员


3)基本概念:消息

JGroups中封装了消息类,包含一个字节缓冲区、发送和接受者地址,如下:

public class Message implements Streamable {
    protected Address          dest_addr; // 目标地址
    protected Address          src_addr; // 源地址
    protected byte[]           buf; // 消息字节长度
    protected int              offset; // 有效消息的偏移量(一般为0)
    protected int              length; // 消息长度(一般为buf.length)
...
}

当接受者地址为null的时候,默认是往群组所有成员进行组播。


4)基本概念:接收器

成员在怎样去接收消息呢?通过设置信道上的接收器,在有消息过来的时候,即可进行处理,具体的处理逻辑,在Receiver 中。Receiver 以一个回调的形式进行监听群组的状态变化,如视图发生改变,有消息过来,或者同步状态等

channel.setReceiver(Receiver r)

Receiver接口的源码如下:

public interface Receiver extends MessageListener, MembershipListener {
}
public interface MessageListener {
 void          receive(Message msg);
void getState(OutputStream output) throws Exception;
void setState(InputStream input) throws Exception;
}
public interface MembershipListener {
void viewAccepted(View new_view);
void suspect(Address suspected_mbr);
void block();
void unblock();
}

JGroups实现了一个空的Receiver实例:ReceiverAdapter,默认对回调不进行任何处理。


3,使用

需求:在一个集群里面,节点通过JGroups相互感知,每个节点管理本机的Memcached,当节点对本机Memcache进行set或者remove操作时,向集群广播这一消息,集群中的节点在接收到消息之后,也进行相应的set或者remove操作,完成集群节点间缓存的同步。

具体的一个同步缓存节点,可以定义如下:

public class SyncCacheNode extends ReceiverAdapter implements SyncCache {

@Resource(name="musicrep_message_local_memcached") private MemcacheManager memcacheManager; // 本机的Memcache
@Value("${musicrep.synCache.group.key}") private String groupKey; // 信道名字
@Value("${musicrep.synCache.group.protocol}") private String CONFIG_XML; // 协议配置

private Channel ch;

private View view;
private int rank=-1; // 当前节点在集群节点的排名
private int cluster_size=-1; // 集群大小
private Map<Address, PhysicalAddress> state = new ConcurrentHashMap<Address, PhysicalAddress>(); // 集群状态 address/实际ip地址,注:需线程安全

private static final int retry = 5;

@SuppressWarnings("resource")
public void start(String name) throws Exception {
System.setProperty("java.net.preferIPv4Stack", "true"); // 禁用Ipv6
InputStream is = this.getClass().getClassLoader().getResourceAsStream(CONFIG_XML);
ch = new JChannel(is).name(name);
ch.setDiscardOwnMessages(true); // 不要接收到自己发出的消息
ch.setReceiver(this);
ch.connect(groupKey);
ch.getState(null, 1000); // 同步状态,从第一个节点处把当前集群的信息同步
}

public void stop(){
ch.close();
}
public String clusterInfo(){
...
return sb.toString();
}

@Override
public void receive(Message msg){
try{
Request req = (Request)Util.objectFromByteBuffer(msg.getBuffer()); // 反序列化
logger.info(req.getCacheKey() + "," + memcacheManager.get(req.getCacheKey()));
switch (req.getType()) {
case SET:
{
memcacheManager.set(req.getCacheKey(), req.getCacheObj()); // set 对象
}
break;
case REMOVE:
{
memcacheManager.remove(req.getCacheKey()); // remove 对象
}
default:
break;
}
} catch(Exception e) {
logger.error("exception receiving message from " + msg.getSrc(), e);
}
}

public void getState(OutputStream output) throws Exception {
Util.objectToStream(state, new DataOutputStream(output));
}

@SuppressWarnings("unchecked")
public void setState(InputStream input) throws Exception {
Map<Address, PhysicalAddress> memMap=(Map<Address, PhysicalAddress>)Util.objectFromStream(new DataInputStream(input));
state.clear();
state.putAll(memMap);
}

@Override
public void viewAccepted(View view){ // 监听视图(view)变化
List<Address> left_members = this.view!=null && view !=null ?
Util.leftMembers(this.view.getMembers(), view.getMembers()) : null; // 找出离开的节点成员
this.view = view;
Address local_addr = ch.getAddress();
setCluster_size(view.size());
List<Address> members = view.getMembers();
int old_rank = rank;
for(int i=0; i < members.size(); i++) {
Address tmp=members.get(i);
if(tmp.equals(local_addr)) {
rank=i;
break;
}
}
for(Address address : members){ // 更新集群状态
PhysicalAddress physicalAddress = (PhysicalAddress) ch.down(new Event(Event.GET_PHYSICAL_ADDRESS, address)); // 获取对应的物理地址
state.put(address, physicalAddress);
}
if(old_rank == -1 || old_rank != rank){ // rank状态发生了变化,记录一下
logger.info("my rank is " + rank);
}

if(left_members != null && !left_members.isEmpty()) {
for(Address mbr: left_members) {
handleLeftMember(mbr); // 处理离开的成员,在此只是简单的移除并log
}
}
logger.info(clusterInfo());
}

private void handleLeftMember(Address mbr) {
state.remove(mbr); // 集群状态中删除该成员,暂时先只这么做
logger.info("address : " + mbr + " left the cluster, can not recieve the message...");
}

@Override
public boolean clearCache(String key) {
try{
memcacheManager.remove(key);
Request clear = new Request(Type.REMOVE, key, null); // 包装一个消息结构
byte[] buf = Util.objectToByteBuffer(clear);
{
ch.send(new Message(null, buf)); // 组播发送
}
// for(Address address : state.keySet()){ // 点对点发送clear消息
// ch.send(new Message(address, buf));
// }
return true;
} catch(Exception e) {
logger.error("error clear cache", e);
return false;
}

}

@Override
public boolean setCache(String key, Object value) {
try{
memcacheManager.set(key, value);
Request set = new Request(Type.SET, key, value); // 包装一个消息结构
byte[] buf = Util.objectToByteBuffer(set);
for(Address address : state.keySet()){ // 点对点发送clear消息
ch.send(new Message(address, buf));
}
return true;
} catch (Exception e){
logger.error("error set cache", e);
return false;
}
}

public int getCluster_size() {
return cluster_size;
}

public void setCluster_size(int cluster_size) {
this.cluster_size = cluster_size;
}
}


4,配置的一些注意点

测试网络是否支持多播

JGroups的jar包里面提供测试网络,在两台机器上分别运行以下两个测试代码,首先

java org.jgroups.tests.McastReceiverTest

然后

java org.jgroups.tests.McastSenderTest

IPv6的禁用

IPv6禁用的两种方式:

1,启动参数禁用:

java -Djava.net.preferIPv4Stack=true

2,程序代码禁用:

System.setProperty("java.net.preferIPv4Stack", "true"); // 禁用Ipv6

ttl值的设置(坑)

TTL的作用是限制IP数据包在计算机网络中的存在的时间,在使用JGroups,很容易会采用JGroups包里自带的协议配置文件,如udp.xml,里面对于ttl的配置:

<UDP
         mcast_port="${jgroups.udp.mcast_port:45588}"
         ip_ttl="0"
         tos="8"
... />

这样的代码在本机(windows)测试是没有问题的,开启两个JGroups程序,它们相互之间能够通过连接同一个信道进行通信,然而,在不同机器,即使在同一个子网里面,会发现机器通过信道无法相互发现。

这里可以看看ttl值的作用:

TTL字段由IP数据包的发送者设置,在IP数据包从源到目的的整个转发路径上,每经过一个路由器,路由器都会修改这个TTL字段值,具体的做法是把该TTL的值减1,然后再将IP包转发出去。如果在IP包到达目的IP之前,TTL减少为0,路由器将会丢弃收到的TTL=0的IP包并向IP包的发送者发送 ICMP time exceeded消息。

所以当节点相互无法发现,而又属于同一个子网时,可以通过更改默认的ttl值(更改默认的配置文件),然后查看运行结果。


JGroups获取物理地址

在使用JGroups的时候,希望能对信道的物理地址进行获取定位,然而,JGroups的文档中及很多接口并没有涉及如何获取物理地址的方法,从StackOverFlow上,获取这么一种方法

public void receive(Message msg) {
    String srcIp;   
    Address addr = msg.getSrc();

    PhysicalAddress physicalAddr = (PhysicalAddress)channel.down(new Event(Event.GET_PHYSICAL_ADDRESS, addr));

    if(physicalAddr instanceof IpAddress) {
        IpAddress ipAddr = (IpAddress)physicalAddr;
        InetAddress inetAddr = ipAddr.getIpAddress();
        srcIp = inetAddr.getHostAddress();
    }
}

This is a hack, but it works. The JGroups team has stated that this is dangerous as they could change the underlying code at any time, so use with caution.




网易云大礼包:https://www.163yun.com/gift

本文来自网易实践者社区,经作者廖祥俐授权发布