提交 b61f590d 编写于 作者: Z zhangsan

The commit code in ClusterZKDataMonitor's 'process' method,fixed the collector...

The commit code in ClusterZKDataMonitor's 'process' method,fixed the collector online/offline bug. By 'client.getChildren\(event.getPath\(\), true\);', collector can't get the deleted node. In fact,ClusterZKDataMonitor listening zookeeper 'onNodeChanged' event by use 'if \(listeners.containsKey\(event.getPath\(\)\)\)'.
上级 7834d7bd
......@@ -4,6 +4,8 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
......@@ -46,22 +48,30 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
List<String> paths;
try {
paths = client.getChildren(event.getPath(), true);
ClusterDataListener listener = listeners.get(event.getPath());
Set<String> remoteNodes = new HashSet<String>();
Set<String> notifiedNodes = listener.getAddresses();
if (CollectionUtils.isNotEmpty(paths)) {
for (String serverPath : paths) {
Stat stat = new Stat();
byte[] data = client.getData(event.getPath() + "/" + serverPath, true, stat);
String dataStr = new String(data);
if (stat.getCzxid() == stat.getMzxid()) {
String addressValue = serverPath + dataStr;
remoteNodes.add(addressValue);
if (!notifiedNodes.contains(addressValue)) {
logger.info("path children has been created, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr);
listeners.get(event.getPath()).addAddress(serverPath + dataStr);
listeners.get(event.getPath()).serverJoinNotify(serverPath + dataStr);
} else {
logger.info("path children has been changed, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr);
listeners.get(event.getPath()).removeAddress(serverPath + dataStr);
listeners.get(event.getPath()).serverQuitNotify(serverPath + dataStr);
listener.addAddress(addressValue);
listener.serverJoinNotify(addressValue);
}
}
}
for (String address : notifiedNodes) {
if (remoteNodes.isEmpty() || !remoteNodes.contains(address)) {
logger.info("path children has been changed, path and data: {}", event.getPath() + "/" + address);
listener.removeAddress(address);
listener.serverQuitNotify(address);
}
}
} catch (ZookeeperClientException e) {
logger.error(e.getMessage(), e);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册