本文来自网易云社区
作者:乔安然
HTTP/1.1 200 OK
Content-Type: application/json
X-Etcd-Cluster-Id: e88d54f6225f06ad
X-Etcd-Index: 271
X-Raft-Index: 872202
X-Raft-Term: 5
Date: Sat, 26 Sep 2015 08:43:17 GMT
Transfer-Encoding: chunked
{
"action":
"set",
"node": {
"createdIndex": 7,
"key": "/foo",
"modifiedIndex": 7,
"value": "bar"
},
"prevNode": {
"createdIndex": 6,
"key": "/foo",
"modifiedIndex": 6,
"value": "bar"
}
}
/**
 * Immutable description of a single change observed on an etcd key.
 *
 * <p>The type parameter (lost in the original listing, where {@code T} was used without
 * being declared) is the type of the values carried by the event.
 *
 * @param <T> type of the current and previous values
 */
public class EtcdEvent<T> {

    /** Kind of change the event describes. */
    public enum Type {
        added, removed, updated
    }

    private final EtcdEvent.Type type;
    private final T preValue;
    private final T value;
    private final String key;
    private final Throwable cause;
    private final long index;

    /**
     * Creates an event.
     *
     * @param type     kind of change
     * @param preValue value before the change, may be {@code null}
     * @param value    value after the change, may be {@code null}
     * @param key      key the change applies to
     * @param cause    failure associated with the event, may be {@code null}
     * @param index    index associated with the change
     */
    public EtcdEvent(Type type, T preValue, T value, String key, Throwable cause, long index) {
        this.type = type;
        this.preValue = preValue;
        this.value = value;
        this.key = key;
        this.cause = cause;
        this.index = index;
    }

    /**
     * Returns a new builder; fields that are never set keep their Java default values.
     *
     * @param <T> type of the values carried by the built event
     * @return an empty builder
     */
    public static <T> Builder<T> builder() {
        return new Builder<>();
    }

    /** @return kind of change */
    public EtcdEvent.Type getType() {
        return type;
    }

    /** @return value before the change, may be {@code null} */
    public T getPreValue() {
        return preValue;
    }

    /** @return value after the change, may be {@code null} */
    public T getValue() {
        return value;
    }

    /** @return key the change applies to */
    public String getKey() {
        return key;
    }

    /** @return failure associated with the event, may be {@code null} */
    public Throwable getCause() {
        return cause;
    }

    /** @return index associated with the change */
    public long getIndex() {
        return index;
    }

    /**
     * Fluent builder for {@link EtcdEvent}.
     *
     * @param <T> type of the values carried by the built event
     */
    public static final class Builder<T> {
        private EtcdEvent.Type type;
        private T preValue;
        private T value;
        private String key;
        private Throwable cause;
        private long index;

        private Builder() {
        }

        public Builder<T> withType(EtcdEvent.Type type) {
            this.type = type;
            return this;
        }

        public Builder<T> withPreValue(T preValue) {
            this.preValue = preValue;
            return this;
        }

        public Builder<T> withValue(T value) {
            this.value = value;
            return this;
        }

        public Builder<T> withKey(String key) {
            this.key = key;
            return this;
        }

        public Builder<T> withCause(Throwable cause) {
            this.cause = cause;
            return this;
        }

        public Builder<T> withIndex(long index) {
            this.index = index;
            return this;
        }

        /** @return an event populated with the values set so far */
        public EtcdEvent<T> build() {
            return new EtcdEvent<>(type, preValue, value, key, cause, index);
        }
    }
}
{
void handle(EtcdEvent event);
}
public interface Watch {
// 是否属于该watch的范围,是则处理建构事件通知上层EtcdEventHandler
Watch accept(EtcdKeysResponse response);
void stop();
//用于初始同步和401异常再同步
Watch sync();
boolean isSync();
// 当前watch的index
long currentIndex()
}
/**
 * Tries to create {@code directory} (only if it does not exist yet) and records the
 * resulting index in both {@code syncIndex} and {@code etcdIndex}.
 *
 * <p>On success the created node's {@code modifiedIndex} is recorded; when the put is
 * rejected with an {@link EtcdException} the index carried by the exception is recorded
 * instead.
 *
 * @throws CommonException if the request fails with anything other than an
 *                         {@link EtcdException}
 */
protected void ensureDirectoryExists() {
    try {
        logger.debug("attempting to put directory {}", directory);
        EtcdKeysResponse response = client.get().putDir(directory).prevExist(false).send().get();
        logger.debug("put directory {} successful", directory);
        // The wrapper returned by writeRunnable has to be run explicitly, as the other
        // call sites in this class do; previously it was created and discarded, so the
        // indexes were never updated.
        // NOTE(review): assumes writeRunnable only wraps the action -- confirm.
        lock.writeRunnable(()->{
            syncIndex.set(response.node.modifiedIndex);
            etcdIndex.set(response.node.modifiedIndex);
        }).run();
        logger.debug("directory {} index recorded", directory);
    } catch (EtcdException ee) {
        logger.debug("put directory {} failed, it exists", directory);
        lock.writeRunnable(()->{
            syncIndex.set(ee.index);
            etcdIndex.set(ee.index);
        }).run();
        logger.debug("directory {} index recorded", directory);
    } catch (Exception e) {
        throw new CommonException(String.format("could not create directory %s", directory), e);
    }
}
/**
 * Schedules the repeating task that waits for changes under {@code directory} and
 * forwards them to every registered watch.
 *
 * <p>Each run waits for the change following {@code syncIndex}. When a response arrives
 * and {@code syncIndex} has not been moved in the meantime, the indexes are advanced and
 * the response is offered to each watch. An {@link EtcdException} triggers recovery:
 * the directory is re-ensured, every watch is re-synchronized and {@code syncIndex}
 * restarts from the smallest watch index.
 *
 * <p>Nothing is allowed to escape the task: {@code scheduleWithFixedDelay} suppresses all
 * further executions once a run throws, which would stop the watcher silently.
 */
protected void startWatchingNodes(){
    logger.debug("start watch nodes thread.");
    future = this.executorService.scheduleWithFixedDelay(()->{
        long currentIndex = syncIndex.get();
        try{
            EtcdKeysResponse response =
                    client.get().getDir(directory)
                            .recursive()
                            .sorted()
                            .waitForChange(currentIndex + 1)
                            .timeout(timeoutInMs, TimeUnit.MILLISECONDS)
                            .send()
                            .get();
            lock.writeRunnable(()->{
                etcdIndex.set(response.etcdIndex);
                // Only dispatch when nobody moved syncIndex while we were waiting.
                if(syncIndex.compareAndSet(currentIndex, response.node.modifiedIndex)){
                    indexDelta.set(etcdIndex.get() - syncIndex.get());
                    watchList.forEach(watch -> watch.accept(response));
                }
            }).run();
        } catch (EtcdException e) {
            logger.warn("watch nodes exception, directory: "+ directory, e);
            try {
                ensureDirectoryExists();
                lock.writeRunnable(()->{
                    etcdIndex.set(e.index);
                    syncIndex.set(watchList.stream().mapToLong(w -> w.sync().currentIndex())
                            .min().orElse(e.index));
                    indexDelta.set(etcdIndex.get() - syncIndex.get());
                }).run();
            } catch (Exception recoveryFailure) {
                // An exception thrown from inside a catch block is not handled by the
                // sibling catch clauses below; swallow-and-log here so the next
                // scheduled run can retry the recovery.
                logger.warn("recovery after watch exception failed, directory: " + directory, recoveryFailure);
            }
        } catch (TimeoutException e) {
            logger.debug("etcd watch timeout , directory:{}, waitIndex:{}", directory, currentIndex + 1);
        } catch (Exception e) {
            logger.warn("exception thrown while watching "+ directory + ", waitIndex:" + (currentIndex+1), e);
        }
    },1,1, TimeUnit.MILLISECONDS);
}
public class EtcdNodeWatch extends AbstractWatch {
//该watch的目录范围
private String directory;
//正则匹配,判断watchService的监听是否属于其范围
private Pattern keyMatcher;
//存储当前节点情况
private Map> nodeMap = new HashMap<>();
。。。
}
@Override
public Watch sync() {
isSync = false;
try {
EtcdKeysResponse response = etcdClient.get().get(directory).dir().recursive().sorted().send().get();
currentIndex = response.etcdIndex;
// 初始加载diretcory下所有节点,加载到nodeMap中
if(response.node.dir){
for(EtcdNode etcdNode : response.node.nodes){
loadEtcdNode(etcdNode, currentIndex);
}
}
//同步前后nodemap中的数据变化需要通知上层
nodeMap = nodeMap.entrySet().stream().filter((entry) ->{
// etcdIndex相同数据无变化
if(entry.getValue().loadIndex == currentIndex){
return true;
}
//nodeMap etcdIndex不一样,说明在etcd中已不存在该节点,统一removed处理。
EtcdValue etcdValue = entry.getValue();
EtcdEvent event = EtcdEvent.builder()
.withType(EtcdEvent.Type.removed)
.withPreValue(etcdValue.value)
.withIndex(currentIndex)
.withKey(entry.getKey())
.build();
fireEvent(eventHandler, event);
return false;
}).collect(Collectors.toMap(e -> e.getKey(), e->e.getValue()));
isSync = true;
} catch (EtcdException e) {
currentIndex = e.index;
// key不存在,说明已被删除,nodeMap中所有都要清理,并通知上层removed
if(e.errorCode == EtcdErrorCode.KeyNotFound){
logger.debug("directory:{} watch sync KeyNotFound , will clear all value and fire removed event.",directory);
for (Map.Entry> entry : nodeMap.entrySet()){
EtcdValue etcdValue = entry.getValue();
EtcdEvent event = EtcdEvent.builder()
.withType(EtcdEvent.Type.removed)
.withPreValue(etcdValue.value)
.withIndex(currentIndex)
.withKey(entry.getKey())
.build();
fireEvent(eventHandler, event);
}
nodeMap.clear();
isSync = true;
}else{
logger.warn("directory watch sync EtcdException:", e);
}
} catch (IOException | TimeoutException | EtcdAuthenticationException e) {
logger.error(format("failed to sync watch for directory %s", directory), e);
}catch (Throwable t){
logger.error("sync watch for director " + directory + " error.", t);
}
return this;
}
/**
 * Translates an etcd response into an event, if the response describes a change this
 * watch reports.
 *
 * <ul>
 *   <li>{@code create}/{@code set}: {@code added} when there is no previous node,
 *       otherwise {@code updated}; skipped when {@code filterValueChange} rejects it.</li>
 *   <li>{@code expire}/{@code delete}/{@code compareAndDelete}: {@code removed}.</li>
 *   <li>{@code compareAndSwap}/{@code update}: {@code updated}.</li>
 * </ul>
 *
 * @param mapper    not used by this implementation
 * @param directory prefix stripped from the node key via {@code removeDirectory}
 * @param response  the etcd response to translate
 * @return the event, or empty when the response carries no node or maps to no event
 */
@Override
protected Optional<EtcdEvent> buildEvent(ObjectMapper mapper, String directory, EtcdKeysResponse response) {
    EtcdNode nodeValue = response.node;
    EtcdNode preNodeValue = response.prevNode;
    // The index is read from the current node, so a response without one cannot be
    // turned into an event (previously this case hit a NullPointerException whenever
    // only prevNode was present).
    if(nodeValue == null){
        return Optional.empty();
    }
    long etcdIndex = nodeValue.modifiedIndex;
    switch (response.action){
        case create:
        case set:
            if(filterValueChange(nodeValue, preNodeValue)){
                return Optional.of(EtcdEvent.builder()
                        .withType(preNodeValue == null ? EtcdEvent.Type.added : EtcdEvent.Type.updated)
                        .withValue(nodeValue)
                        .withPreValue(preNodeValue)
                        .withIndex(etcdIndex)
                        .withKey(removeDirectory(directory, nodeValue.key))
                        .build());
            }
            break;
        case expire:
        case delete:
        case compareAndDelete:
            // Prefer the previous node's key as before, but fall back to the current
            // node when the response carries no previous node.
            String removedKey = preNodeValue != null ? preNodeValue.key : nodeValue.key;
            return Optional.of(EtcdEvent.builder()
                    .withType(EtcdEvent.Type.removed)
                    .withValue(nodeValue)
                    .withPreValue(preNodeValue)
                    .withIndex(etcdIndex)
                    .withKey(removeDirectory(directory, removedKey))
                    .build());
        case compareAndSwap:
        case update:
            return Optional.of(EtcdEvent.builder()
                    .withType(EtcdEvent.Type.updated)
                    .withValue(nodeValue)
                    .withPreValue(preNodeValue)
                    .withIndex(etcdIndex)
                    .withKey(removeDirectory(directory, nodeValue.key))
                    .build());
        default:
            break;
    }
    return Optional.empty();
}
/**
 * Applies an etcd response to this watch: when the response key is in scope and its
 * index is newer than {@code currentIndex}, the matching event is fired and
 * {@code nodeMap} is updated accordingly.
 *
 * @param response the etcd response to examine
 * @return this watch
 */
@Override
public Watch accept(EtcdKeysResponse response) {
    // Ignore responses outside the scope of this watch.
    if(!keyMatcher.matcher(key(response)).matches()){
        return this;
    }
    long responseIndex = indexResponse(response);
    if(currentIndex >= responseIndex){
        logger.debug("filtering event for {}", key(response));
        logger.debug("response index {} current index {}", responseIndex, currentIndex);
        return this;
    }
    // Build the event and pass it up to the EtcdEventHandler.
    buildEvent(null, directory, response).ifPresent(etcdEvent -> applyEvent(etcdEvent, response));
    currentIndex = responseIndex;
    return this;
}

/**
 * Fires {@code etcdEvent} and mirrors it into {@code nodeMap}.
 *
 * <p>The map is updated with plain {@code put}/{@code remove} calls rather than from
 * inside a {@code Map.compute} remapping function, because removing the children of a
 * deleted directory modifies other entries of the same map, which a remapping function
 * must not do.
 */
private void applyEvent(EtcdEvent etcdEvent, EtcdKeysResponse response) {
    String key = etcdEvent.getKey();
    switch (etcdEvent.getType()){
        case added:
        case updated:
            fireEvent(eventHandler, etcdEvent);
            nodeMap.put(key, new EtcdValue((EtcdNode) etcdEvent.getValue(), etcdEvent.getIndex(), response.etcdIndex));
            break;
        case removed:
            // A removed directory takes its children with it. instanceof also guards
            // against an event that carries no current node.
            Object removedNode = etcdEvent.getValue();
            if(removedNode instanceof EtcdNode && ((EtcdNode) removedNode).isDir()){
                // Match on "<key>/" so that siblings which merely share the prefix
                // (e.g. "foobar" next to "foo") are kept.
                String childPrefix = key.endsWith("/") ? key : key + "/";
                Iterator<String> keyIterator = nodeMap.keySet().iterator();
                while (keyIterator.hasNext()){
                    if(keyIterator.next().startsWith(childPrefix)){
                        keyIterator.remove();
                    }
                }
            }
            fireEvent(eventHandler, etcdEvent);
            nodeMap.remove(key);
            break;
        default:
            throw new IllegalStateException("unknown event type "+etcdEvent.getType().name());
    }
}
本文来自网易云社区,经作者乔安然授权发布
网易云免费体验馆,0成本体验20+款云产品!
更多网易研发、产品、运营经验分享请访问网易云社区。