Etcd Java(etcd4j客户端)使用小记 --- 持久化监听模型实现

未来已来2018-09-18 13:02

本文来自网易云社区


作者:乔安然

etcd是CoreOS开发的分布式高可用键值存储系统。随着CoreOS和K8s等项目在开源社区日益火热,etcd组件也渐渐为开发人员所关注。
etcd也是受到ZooKeeper与doozer启发而催生的项目,除了拥有类似功能,更专注于以下四点。
  • 简单:基于HTTP+JSON的API让你用curl就可以轻松使用(V3版本不再使用JSON)。
  • 安全:可选SSL客户认证机制。
  • 快速:每个实例每秒支持一千次写操作。
  • 可信:使用Raft算法充分实现了分布式。
在项目对比etcd和zookeeper之后,etcd更轻型容易部署安装使用,zk特性比较丰富,但已老态龙钟,需要点新鲜选择。在去年我党生日迎来了etcd v3(使用gRPC、改变key ttl使用租约等),蛋疼的发现java客户端etcd4j不支持v3版本,v2版本目前可以满足我们需求,继续使用etcd,后续会关注etcd4j更新。本文基于etcd v2版本使用。
etcd事件监听
etcd没有提供被动监听的实现,我们可以主动轮训监听key的变化。如果想监听其子节点可以通过recursive=true参数
“curl http://127.0.0.1:2379/v2/keys/foo?wait=true”
对/foo的改变会受到通知和返回相关变化事件
HTTP/1.1 200 OK
    Content-Type: application/json
    X-Etcd-Cluster-Id: e88d54f6225f06ad
    X-Etcd-Index: 271
    X-Raft-Index: 872202
    X-Raft-Term: 5
    Date: Sat, 26 Sep 2015 08:43:17 GMT
    Transfer-Encoding: chunked
{
"action": 
"set",
"node": {
"createdIndex": 7,
"key": "/foo",
"modifiedIndex": 7,
"value": "bar"
},
"prevNode": {
"createdIndex": 6,
"key": "/foo",
"modifiedIndex": 6,
"value": "bar"
}
}
etcd中的数据变化(包括目录和key、value变化)相关类型事件:set、get、create、update、delete、expire、compareAndSwap、compareAndDelete。在返回的http response中的action属性就是事件类型,如上图所示。
etcd记录下最近1000次事件变化,使用index我们可以watch其key在过去发生的变化。使用node的modifiedIndex+1就可以监听下一次事件:
curl 'http://127.0.0.1:2379/v2/keys/foo?wait=true&waitIndex=8'
但当事件突发比如1秒内产生几千条事件,事件监听处理比较慢或者未监听是发生了客户端事件丢失。当我们index超过etcd记录的返回,就返回如下消息:
{ "errorCode" : 401 , "message" : "The event in requested index is outdated and cleared" , "cause" : "the requested history has been cleared [1008/8]" , "index" : 2007 }
官方文档中推荐使用X-Etcd-Index+1作为waitIndex代替使用node的modifiedIndex+1:
1. X-Etcd-Index代表etcd当前index,为所有key共享。单挑递增总是大于或等于modifiedIndex,而这个modifiedIndex是etcd已经存储事件的index
2. modifiedIndex和X-Etcd-Index之间不代表有事件发生,当fetch 相关key不会有事件返回。
使用modifiedIndex+1只是功能表示后续监听,它比X-Etcd-Index+1小,很可能相关事件已经被清除,可能会受到401EventIndexCleared 错误。在初始监听或断开重新监听,index应该使用X-Etcd-Index+1,而不是modifiedIndex+1。
上述只能监听一次事件变化,Etcd还提供流式监听,在curl的时候加stream=true参数,会和etcd服务端建议http长连接,后续的每个事件都会通过这个http chunk推送给客户端。相对比一次性监听,更简洁,可靠性更高。但有一问题,流式监听监听从命令发出之后的时间,先前的时间是监听不到的,如果再加waitIndex参数,waitIndex小于或等于启动监听时etcd的X-Etcd-Index,只能接受到满足条件的事件,后续不再接收,只能收到一个事件。waitIndex大于X-Etcd-Index无效。
Etcd4j并没有支持流式监听,而且流式监听无法做到监听时的客户端和服务端数据同步。我们使用Etcd4j实现监听持久化,实现如下特性:
1. 事件类型简单化,etcd的set、get、create、update、delete、expire、compareAndSwap、compareAndDelete事件类型,简化成Add、Upate、Removed三种类型
2. 支持目录、key全量变化监听,支持子节点变化监听。例如etcd删除目录时子节点也随之删除,只产生删除目录事件,而子节点删除也需要通知其删除事件。
3. 需要解决事件突发(超过etcd记录event数量)造成监听index丢失,重新再同步需要保证数据一致、同步前后的变化通知上层监听。

事件和处理接口定义:
public class EtcdEvent {

public enum Type{
added, removed, updated
}

private EtcdEvent.Type type;
private T preValue;
private T value;
private String key;
private Throwable cause;
private long index;

public EtcdEvent(Type type, T preValue, T value, String key, Throwable cause, long index) {
this.type = type;
this.preValue = preValue;
this.value = value;
this.key = key;
this.cause = cause;
this.index = index;
}
}

{

void handle(EtcdEvent event);
}


watch接口定义
public interface Watch {

// 是否属于该watch的范围,是则处理建构事件通知上层EtcdEventHandler
Watch accept(EtcdKeysResponse response);

void stop();

//用于初始同步和401异常再同步
Watch sync();

boolean isSync();

// 当前watch的index
long currentIndex()

}
当前watch有三种实现。
1、ValueWatch 针对单一key的value值变化监听
2、DirectoryWatch 针对目录的变化监听,不支持子节点
3、EtcdNodeWatch 针对某一key及子节点(包括目录和值变化)监听

WatchService是根据syncIndex定时监听etcd变化,将其获取结果交给注册的watch进行处理。
启动流程:
1、确保监听的根目录存在,并获取初始X-Etcd-Index
protected void ensureDirectoryExists() {
try {
logger.debug("attempting to put directory {}", directory);
EtcdKeysResponse response = client.get().putDir(directory).prevExist(false).send().get();
logger.debug("put directory {} successful", directory);

lock.writeRunnable(()->{
syncIndex.set(response.node.modifiedIndex);
etcdIndex.set(response.node.modifiedIndex);
});
logger.debug("directory {} index recorded", directory);
} catch (EtcdException ee) {
logger.debug("put directory {} failed, it exists", directory);
lock.writeRunnable(()->{
syncIndex.set(ee.index);
etcdIndex.set(ee.index);
});
logger.debug("directory {} index recorded", directory);
} catch (Exception e) {
throw new CommonException(String.format("could not create directory %s", directory), e);
}
}

2. 开始watch节点,使用ScheduledExecutorService提供FiexDelay任务,该任务就是监听变化
将返回的reponse交给watch处理。如果发生异常,让所有watch重新同步
protected void startWatchingNodes(){
logger.debug("start watch nodes thread.");
future = this.executorService.scheduleWithFixedDelay(()->{
long currentIndex = syncIndex.get();
try{
EtcdKeysResponse response =
client.get().getDir(directory)
.recursive()
.sorted()
.waitForChange(currentIndex + 1)
.timeout(timeoutInMs, TimeUnit.MILLISECONDS)
.send()
.get();
lock.writeRunnable(()->{
etcdIndex.set(response.etcdIndex);
if(syncIndex.compareAndSet(currentIndex, response.node.modifiedIndex)){
indexDelta.set(etcdIndex.get() - syncIndex.get());
watchList.forEach(watch -> watch.accept(response));
}
}).run();
} catch (EtcdException e) {
logger.warn("watch nodes exception, directory: "+ directory, e);
ensureDirectoryExists();
lock.writeRunnable(()->{
etcdIndex.set(e.index);
syncIndex.set(watchList.stream().mapToLong(w -> w.sync().currentIndex())
.min().orElse(e.index));
indexDelta.set(etcdIndex.get() - syncIndex.get());
}).run();

} catch (TimeoutException e) {
logger.debug("etcd watch timeout , directory:{}, waitIndex:{}", directory, currentIndex + 1);
} catch (Exception e) {
logger.warn("exception thrown while watching "+ directory + ", waitIndex:" + (currentIndex+1), e);
}


},1,1, TimeUnit.MILLISECONDS);

}

注册watch流程
1、调用watch.sync初始同步
2、将watch添加到watch列表中
3、更新watchService中SyncIndex,以watch列表中最小的index为主。
我们看下EtcdNodeWatch的实现
public class EtcdNodeWatch extends AbstractWatch {
//该watch的目录范围
private String directory;
//正则匹配,判断watchService的监听是否属于其范围
private Pattern keyMatcher;
//存储当前节点情况
private Map> nodeMap = new HashMap<>();
。。。
}
同步实现:
@Override
public Watch sync() {
isSync = false;
try {
EtcdKeysResponse response = etcdClient.get().get(directory).dir().recursive().sorted().send().get();

currentIndex = response.etcdIndex;

// 初始加载diretcory下所有节点,加载到nodeMap中
if(response.node.dir){
for(EtcdNode etcdNode : response.node.nodes){
loadEtcdNode(etcdNode, currentIndex);
}
}

//同步前后nodemap中的数据变化需要通知上层
nodeMap = nodeMap.entrySet().stream().filter((entry) ->{
// etcdIndex相同数据无变化
if(entry.getValue().loadIndex == currentIndex){
return true;
}
//nodeMap etcdIndex不一样,说明在etcd中已不存在该节点,统一removed处理。
EtcdValue etcdValue = entry.getValue();
EtcdEvent event = EtcdEvent.builder()
.withType(EtcdEvent.Type.removed)
.withPreValue(etcdValue.value)
.withIndex(currentIndex)
.withKey(entry.getKey())
.build();
fireEvent(eventHandler, event);
return false;
}).collect(Collectors.toMap(e -> e.getKey(), e->e.getValue()));

isSync = true;

} catch (EtcdException e) {
currentIndex = e.index;
// key不存在,说明已被删除,nodeMap中所有都要清理,并通知上层removed
if(e.errorCode == EtcdErrorCode.KeyNotFound){
logger.debug("directory:{} watch sync KeyNotFound , will clear all value and fire removed event.",directory);


for (Map.Entry> entry : nodeMap.entrySet()){
EtcdValue etcdValue = entry.getValue();
EtcdEvent event = EtcdEvent.builder()
.withType(EtcdEvent.Type.removed)
.withPreValue(etcdValue.value)
.withIndex(currentIndex)
.withKey(entry.getKey())
.build();
fireEvent(eventHandler, event);
}
nodeMap.clear();
isSync = true;
}else{
logger.warn("directory watch sync EtcdException:", e);
}
} catch (IOException | TimeoutException | EtcdAuthenticationException e) {
logger.error(format("failed to sync watch for directory %s", directory), e);
}catch (Throwable t){
logger.error("sync watch for director " + directory + " error.", t);
}
return this;
}
构造事件,将etcd事件统一成Add、Update、Removed类型
@Override
protected Optional buildEvent(ObjectMapper mapper, String directory, EtcdKeysResponse response) {
EtcdNode nodeValue = response.node;
EtcdNode preNodeValue = response.prevNode;
if(nodeValue == null && preNodeValue == null){
return Optional.empty();
}
long etcdIndex = response.node.modifiedIndex;

switch (response.action){
case create:
case set:
if(filterValueChange(nodeValue, preNodeValue)){
return Optional.of(EtcdEvent.builder()
.withType(preNodeValue == null ? EtcdEvent.Type.added : EtcdEvent.Type.updated)
.withValue(nodeValue)
.withPreValue(preNodeValue)
.withIndex(etcdIndex)
.withKey(removeDirectory(directory, nodeValue.key))
.build());
}
break;
case expire:
case delete:
case compareAndDelete:
return Optional.of(EtcdEvent.builder()
.withType(EtcdEvent.Type.removed)
.withValue(nodeValue)
.withPreValue(preNodeValue)
.withIndex(etcdIndex)
.withKey(removeDirectory(directory, response.prevNode.key))
.build());
case compareAndSwap:
case update:
return Optional.of(EtcdEvent.builder()
.withType(EtcdEvent.Type.updated)
.withValue(nodeValue)
.withPreValue(preNodeValue)
.withIndex(etcdIndex)
.withKey(removeDirectory(directory, response.node.key))
.build());
default:
break;

}
return Optional.empty();

}
accept处理
@Override
public Watch accept(EtcdKeysResponse response) {

//判断是否属于该watch范围
if(!keyMatcher.matcher(key(response)).matches()){
return this;
}

//构建事件通知上层EtcdEventHandler
long responseIndex = indexResponse(response);
if(currentIndex < responseIndex){
buildEvent(null, directory, response).ifPresent(etcdEvent -> nodeMap.compute(etcdEvent.getKey(), (key, entry) ->{
switch (etcdEvent.getType()){
case added:
case updated:
fireEvent(eventHandler, etcdEvent);
return new EtcdValue((EtcdNode) etcdEvent.getValue(), etcdEvent.getIndex(), response.etcdIndex);
case removed:
// 目录删除,需要清理子节点
if(((EtcdNode)etcdEvent.getValue()).isDir()){
Iterator keyIterator = nodeMap.keySet().iterator();
while (keyIterator.hasNext()){
if(keyIterator.next().startsWith(key)){
keyIterator.remove();
}
}
}
fireEvent(eventHandler, etcdEvent);
return null;
default:
throw new IllegalStateException("unknown event type "+etcdEvent.getType().name());
}
}));
currentIndex = responseIndex;

}else{
logger.debug("filtering event for {}", key(response));
logger.debug("response index {} current index {}", responseIndex, currentIndex);
}
return this;
}



本文来自网易云社区,经作者乔安然授权发布

网易云免费体验馆0成本体验20+款云产品!

更多网易研发、产品、运营经验分享请访问网易云社区