本文共 8710 字,大约阅读时间需要 29 分钟。
在网上看到太多千篇一律的zookeeper相关的文章,都是定义,没有一个是有完整代码的,这对自己学习zk十分困难,其实要用zk实现主备切换、负载均衡其实没有自己想象的那么难,只需要了解zk的基本特性即可。在这里贴上自己写的代码与自己的理解,大家多多指教! 一、思路 软负载说简单点,就是将Client端的请求均匀的分配到不同 的server端。下面我们来说说实现的基本思路: 注册:首先你需要确定一个父节点,在这里父节点的名称暂且就叫/parentNode;每个server端启动时首先向zk的集群的父节点/parentNode下去注册一个临时的子节点,这样当有N台server时,注册的子节点就是/parentNode/server1、/parentNode/server2.........、/parentNode/serverN。节点的数据就存每台server的ip 与port,这样你服务端不管是用rmi协议还是http协议,都可以向这个服务器发送请求了。做完这些你就成功了一半,继续加油哦~ 获取服务列表:在Client端实现轮询分发的功能,实现Watcher接口,这会让你实时的监控服务端的变化。首先去获取父节点/parentNode下所有的子节点,得到之前存的ip与port,然后将这些列表缓存到一map中,这里就叫serverUrlCacheMap。由于实现了Watcher接口,当父节点发生变化时zk 的集群会通知Client端,此时Client端只要重新获取父节点下所有子节点的数据,重新缓存即可 轮询分发:定义一个全局变量index ,每次发起请求时,直接去serverUrlCacheMap中获取这个编号的URL,然后发送给server,就可达到轮询分发的功能。 二、代码示例 说了这么多,相信大家有点思路了,不明白也没关系,下面的代码会给大家解释的很详细,在这里你需要安装配置一个zk的集群。网上随便搜一搜都有好多资料 服务端: package com.newcosoft.lsmp.bank.server; import java.rmi.Naming; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; import java.rmi.server.UnicastRemoteObject; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.newcosoft.lsmp.bank.client.config.BankConfig; import com.newcosoft.lsmp.bank.client.constant.BankConstant; import com.newcosoft.lsmp.bank.client.zookeeper.BankZookHelper; import com.newcosoft.lsmp.bank.server.biz.front.rmi.RmiService; import com.newcosoft.lsmp.bank.server.config.BankDatabaseConfig; import com.newcosoft.lsmp.bank.server.util.DefaultBeanFactory; import com.newcosoft.util.StringUtils; public class LsmpBankStart { private static Logger logger = LoggerFactory.getLogger(LsmpBankStart.class); private ZooKeeper zk; public static void main(String[] args) { try { // 启动时,先注册zk节点,重点在这里 new LsmpBankStart().registerZook(args[0], args[1]); // 以下为你项目启动的核心代码,这里我项目的server端使用的是rmi协议,可以不用看了 DefaultBeanFactory bf = DefaultBeanFactory .getInstance("/spring/springContext_lsmp_bank.xml"); RmiService rmiService = (RmiService) bf.getBean("business"); int rmiPort = Integer.valueOf(args[1]); createRegistry(rmiPort); System.setProperty("java.rmi.server.hostname", args[0]); UnicastRemoteObject.exportObject(rmiService, rmiPort); String rmiUrl = "rmi://" + args[0] + ":" + rmiPort + "/" + args[2]; Naming.rebind(rmiUrl, rmiService); } catch (Exception e) { logger.error("LsmpBankStart failed!", e); System.exit(1); } } // 初始化zk的连接 private void initZk() { try { if (zk == null || !zk.getState().isAlive()) { synchronized (this) { if (zk != null) { zk.close(); } // 重新建立连接 zk = new ZooKeeper(BankConfig.getInstance().getZookURL(), BankConstant.HA_SESSION_TIMEOUT, this); while (zk.getState() != ZooKeeper.States.CONNECTED) { Thread.sleep(3000); } } } } catch (Exception e) { logger.error("zk初始化连接异常:" + e.toString()); } } private void registerZook(String ip, String port) { // 初始化zk initZk(); // 父节点路径 String parentNode = BankConfig.getInstance().getBankServerParentNode(); String[] nodeList = parentNode.split("/"); String nodePath = ""; // 循环创建持久父节点 for (String node : nodeList) { if (!StringUtils.isEmpty(node)) { nodePath = nodePath + "/" + node; BankZookHelper.createNode(zk, nodePath, node, CreateMode.PERSISTENT); } } // 创建临时子节点 //BankZookHelper为封装的zookeeper一些基础API的类 BankZookHelper.createNode(zk, nodePath + "/" + ip, ip + ":" + port, CreateMode.EPHEMERAL); } } 客户端: package com.newcosoft.lsmp.bank.client.zookeeper; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.newcosoft.lsmp.bank.client.config.BankConfig; import com.newcosoft.lsmp.bank.client.constant.BankConstant; import com.newcosoft.lsmp.bank.client.rmi.BankServerRMIClient; public class LoadBalanceZookHelper implements Watcher { private static Logger logger = LoggerFactory .getLogger(LoadBalanceZookHelper.class); private ZooKeeper zk; // 可用的服务列表 private List serverUrlList; // 获取service 的索引 private AtomicInteger index = new AtomicInteger(0); // 缓存的BankServerRMIClient private Map bankServerRMIClientCacheMap = new ConcurrentHashMap(); public LoadBalanceZookHelper() { // 初始化zk initZk(); cacheBankServerRMIClients(); } private static class SingletonHolder { static final LoadBalanceZookHelper INSTANCE = new LoadBalanceZookHelper(); } public static LoadBalanceZookHelper getInstance() { return SingletonHolder.INSTANCE; } // 轮询分发服务 public BankServerRMIClient distributeServer() { if (index.get() >= Integer.MAX_VALUE) { index.set(0); } int a = index.get() % serverUrlList.size(); // 自增长 index.incrementAndGet(); String url = serverUrlList.get(a); if (logger.isDebugEnabled()) { logger.debug("请求的URL:" + url); } // TODO 删掉syso System.out.println("请求的URL:" + url); return bankServerRMIClientCacheMap.get(url); } // 注册rmi ,并缓存 private void registerRmiAndCache(List serverUrlList) { for (String url : serverUrlList) { String host[] = url.split(":"); BankServerRMIClient bankServerRMIClient = new BankServerRMIClient( host[0], Integer.parseInt(host[1])); bankServerRMIClientCacheMap.put(url, bankServerRMIClient); } } // 初始化,缓存rmi客户端服务 public void cacheBankServerRMIClients() { // 获取到所有的 serverURL serverUrlList = getServerUrlList(BankConfig.getInstance() .getBankServerParentNode()); if (logger.isDebugEnabled()) { logger.debug("重新缓存可用的服务列表,serverUrlList:" + serverUrlList); } if (serverUrlList.size() > 0 && serverUrlList != null) { // 注册并缓存 registerRmiAndCache(serverUrlList); } else { logger.error("可用的服务列表为空"); throw new RuntimeException("可用的服务列表为空"); } } // 获取zk连接 private void initZk() { try { if (zk == null || !zk.getState().isAlive()) { synchronized (this) { if (zk != null) { zk.close(); } // 重新建立连接 zk = new ZooKeeper(BankConfig.getInstance().getZookURL(), BankConstant.HA_SESSION_TIMEOUT, this); while (zk.getState() != ZooKeeper.States.CONNECTED) { Thread.sleep(3000); } } } } catch (Exception e) { logger.error("zk初始化连接异常:" + e.toString()); } } // 获取可用的服务列表 public List getServerUrlList(String parentNodePath) { List serverList = new ArrayList(); try { // 获取所有子节点 List nodePathList = zk.getChildren(parentNodePath, true); if (nodePathList.size() > 0 && nodePathList != null) { for (String hostPath : nodePathList) { hostPath = parentNodePath + "/" + hostPath; // 获取子节点数据URL String data = new String(zk.getData(hostPath, true, null)); serverList.add(data); } } } catch (KeeperException e) { logger.error(e.toString()); } catch (InterruptedException e) { logger.error(e.toString()); } return serverList; } // 本来有个很简单的办法,直接将原来的缓存去掉,然后在重新注册,但是由于注册rmi底层也是用所 tcp/ip协议,由于这个协议比较耗时, 所以使用以下方式。请大家根据实际情况来处理 // 节点发生变化时,重新缓存rmi 客户端服务 ,简单来说就是更新缓存 public void reCacheBankServerRMIClients() { // 将原来的serverURL 赋值给 List oldServerUrlList = serverUrlList; // 获取最新的 serverURL,保留住 serverUrlList = getServerUrlList(BankConfig.getInstance() .getBankServerParentNode()); if (logger.isDebugEnabled()) { logger.debug("子节点发生变化,重新获取的服务的URL:" + serverUrlList); } // 将最新的serverURL赋值给 List newServerUrlList = serverUrlList; List reducedUrlList = new ArrayList(); reducedUrlList.addAll(oldServerUrlList); List incrementUrlList = new ArrayList(); incrementUrlList.addAll(newServerUrlList); // 获取down掉服务的的URL reducedUrlList.removeAll(newServerUrlList); // 获取新增服务的URL incrementUrlList.removeAll(oldServerUrlList); if (reducedUrlList.size() > 0 && reducedUrlList != null) { for (String reducedUrl : reducedUrlList) { // 将down掉的URL的服务从缓存中减掉 bankServerRMIClientCacheMap.remove(reducedUrl); } if (logger.isDebugEnabled()) { logger.debug("去除掉的服务的URL:" + reducedUrlList); } } if (incrementUrlList.size() > 0 && incrementUrlList != null) { // 将新增的URL重新注册rmi 并缓存住 registerRmiAndCache(incrementUrlList); if (logger.isDebugEnabled()) { logger.debug("新增的服务的URL:" + incrementUrlList); } } } @Override public void process(WatchedEvent event) { if (event.getState() == KeeperState.Expired) { logger.debug("触发了回话过期事件"); // 重新连接zk initZk(); // 重新缓存 reCacheBankServerRMIClients(); } // if (event.getState() == KeeperState.SyncConnected) { // logger.debug("触发了断开重连事件"); // // 重新缓存 // reCacheBankServerRMIClients(); // } if (event.getType() == EventType.NodeChildrenChanged) { logger.debug("触发了子节点变化事件"); // 重新缓存 reCacheBankServerRMIClients(); } } public static void main(String[] args) { LoadBalanceZookHelper.getInstance(); } }转载地址:http://abnsi.baihongyu.com/