博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于zookeeper的MySQL主主负载均衡的简单实现
阅读量:4561 次
发布时间:2019-06-08

本文共 14728 字,大约阅读时间需要 49 分钟。

1.先上原理图

2.说明

两个mysql采用主主同步的方式进行部署。

在安装mysql的服务器上安装客户端(目前是这么做,以后想在zookeeper扩展集成),客户端实时监控mysql应用的可用性,可用时想zookeepercreateNode,当网络不可用或者mysql应用不可用时,建立的znode消失。

在客户端,通过改造proxool数据库连接池的方式,在建立连接之前,从zookeeper中去取真实的数据库URL,如果有多个URL,即有多个服务时,采用随机算法去拿连接(以后准备扩展权重)。当连接不可用时,数据库连接池将重建连接,这时候又回去zookeeper拿连接,因为agent建立的临时znode消失了,就不能拿到已经失效的url了。

这个方案只是初步的实验和实现了,还有很多后续的问题,主要为了解决lvs+keepalived只能在同一个区域内的问题。

3.部分实现

  1).agent

  

/** * 数据库可用性检测 * @author tomsnail * @date 2015年4月3日 上午10:11:51 */public class TestMySQL {    public static boolean test(String url){                 Connection conn = null;         Statement stmt = null;         ResultSet rs  = null;         String sql = ConfigHelp.getLocalConifg("jdbc_inventory.house-keeping-test-sql", "select 0");            try {                Class.forName(ConfigHelp.getLocalConifg("jdbc_inventory.driver-class", "com.mysql.jdbc.Driver"));// 动态加载mysql驱动                conn = DriverManager.getConnection(url);                stmt = conn.createStatement();                rs = stmt.executeQuery(sql);                while (rs.next()) {                }                return true;            } catch (SQLException e) {                e.printStackTrace();            } catch (Exception e) {                e.printStackTrace();            } finally {                try {                    if(rs!=null){                        rs.close();                    }                    if(stmt!=null){                        stmt.close();                    }                    if(conn!=null)                        conn.close();                } catch (SQLException e) {                    e.printStackTrace();                }            }        return false;    }}
/** * zookeeper客户端 * @author tomsnail * @date 2015年4月3日 上午10:11:51 */public class TestServer {    private static final Logger logger = LoggerFactory            .getLogger(TestServer.class);    private static ZooKeeper zk;        private String path;    //同步锁    private Lock _lock = new ReentrantLock();        // 用于等待 SyncConnected 事件触发后继续执行当前线程    private CountDownLatch latch = new CountDownLatch(1);        public TestServer() {        zk = connectServer();        new Thread(new Runnable() {            @Override            public void run() {                while (true) {                    try {                        Thread.currentThread().sleep(3000);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    //logger.info("check zk...");                    _lock.lock();                    if (zk != null) {                        if (zk.getState().isAlive()                                && zk.getState().isConnected()) {                            //logger.info("zk is ok");                            _lock.unlock();                            continue;                        }                    }                    close();                    logger.info("reConnectServer ...");                    zk = connectServer();                    logger.info("reConnectServer ok");                    _lock.unlock();                }            }            private void close() {                if(zk!=null){                    try {                        zk.close();                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    zk = null;                }            }        }).start();    }    // 连接 ZooKeeper 服务器    private ZooKeeper connectServer() {        ZooKeeper zk = null;        try {            zk = new ZooKeeper(ConfigHelp.ZK_CONNECTION_STRING,                    ConfigHelp.ZK_SESSION_TIMEOUT, new Watcher() {                        @Override                        public void process(WatchedEvent event) {                            if (event.getState() == Event.KeeperState.SyncConnected) {                                latch.countDown(); // 唤醒当前正在执行的线程                            }                        }                    });            latch.await(); // 使当前线程处于等待状态        } catch (Exception e) {            logger.error("", e);        }        if (zk != null) {            try {                Stat stat = zk.exists(ConfigHelp.ZK_ROOT_PATH, false);                if (stat == null) {                    String path = zk.create(ConfigHelp.ZK_ROOT_PATH,                            "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,                            CreateMode.PERSISTENT); // 创建一个临时性且有序的 ZNode                    logger.info("create zookeeper node ({})", path);                }                stat = zk.exists(ConfigHelp.ZK_RMI_PATH, false);                if (stat == null) {                    String path = zk.create(ConfigHelp.ZK_RMI_PATH,                            "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,                            CreateMode.PERSISTENT); // 创建一个临时性且有序的 ZNode                    logger.info("create zookeeper node ({})", path);                }            } catch (Exception e) {                e.printStackTrace();            }        }        return zk;    }    // 创建 ZNode    public void createNode(String url) {        _lock.lock();        try {            byte[] data = url.getBytes();            path = zk.create(ConfigHelp.ZK_RMI_PATH + "/", data,                    ZooDefs.Ids.OPEN_ACL_UNSAFE,                    CreateMode.EPHEMERAL_SEQUENTIAL); // 创建一个临时性且有序的 ZNode            logger.info("create zookeeper node ({} => {})", path, url);        } catch (Exception e) {            logger.error("", e);            e.printStackTrace();        }        _lock.unlock();    }        public void deleteNode(String url){        _lock.lock();        try {            Stat stat = zk.exists(path, false);            if(stat!=null){                zk.delete(url, stat.getVersion());            }        } catch (Exception e) {            e.printStackTrace();        }        _lock.unlock();    }}
/** * 数据库检测测试主类 * @author tomsnail * @date 2015年4月3日 上午10:11:51 */public class TestMain {        private static TestServer testServer = new TestServer();    public static void main(String[] args) {        String url = ConfigHelp.getLocalConifg("jdbc_inventory.driver-url", "select 0");        boolean isOK = false;        while(true){            if(TestMySQL.test(url)){                if(isOK){                                    }else{                    testServer.createNode(url);//建立znode                }                isOK = true;            }else{                isOK = false;                testServer.deleteNode(url);//删除znode            }                        try {                Thread.currentThread().sleep(2000);            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }}

 

  2).proxool

/** * zookeeper信息定义类 * @author tomsnail * @date 2015年4月2日 下午6:49:13 */public class ZkInfoDefinition {        public static final String PREFIX_ZK = "zookeeper";        public static final String ZK_URL = "zkUrl";        public static final String ZK_SESSION_TIMEOUT = "sessionTimeout";        public static final String ZK_PATH = "zkPath";        public static final String ZK_ENABLE = "zkEnable";    public static String zkUrl="192.168.102.1:31315";        public static int sessionTimeout = 5000;        public static boolean isEnable = false;        public static String zkPath = "/root/db";    public String getZkUrl() {        return zkUrl;    }    public void setZkUrl(String zkUrl) {        this.zkUrl = zkUrl;    }    public int getSessionTimeout() {        return sessionTimeout;    }    public void setSessionTimeout(int sessionTimeout) {        this.sessionTimeout = sessionTimeout;    }    public String getZkPath() {        return zkPath;    }    public void setZkPath(String zkPath) {        this.zkPath = zkPath;    }    public ZkInfoDefinition(String zkUrl, int sessionTimeout, String zkPath) {        super();        this.zkUrl = zkUrl;        this.sessionTimeout = sessionTimeout;        this.zkPath = zkPath;    }    public ZkInfoDefinition(){            }}
/** * zookeeper客户端 * @author tomsnail * @date 2015年4月3日 上午10:15:11 */public class ZkClient {       private static final Logger logger = LoggerFactory.getLogger(ZkClient.class);               // 用于等待 SyncConnected 事件触发后继续执行当前线程        private CountDownLatch latch = new CountDownLatch(1);             // 定义一个 volatile 成员变量,用于保存最新的 RMI 地址(考虑到该变量或许会被其它线程所修改,一旦修改后,该变量的值会影响到所有线程)        private volatile List
dataList = new ArrayList
(); private Lock _lock = new ReentrantLock(); private static ZooKeeper zk; private LBUrl lbUrl; public ZkClient(){ this(new BasicLBUrl()); } // 构造器 public ZkClient(LBUrl lbUrl) { this.lbUrl = lbUrl; zk = connectServer(); // 连接 ZooKeeper 服务器并获取 ZooKeeper 对象 watchNode(); new Thread(new Runnable() { @Override public void run() { while (true) { try { Thread.currentThread().sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } _lock.lock(); if (zk != null) { if (zk.getState().isAlive() && zk.getState().isConnected()) { _lock.unlock(); continue; } } if(zk!=null){ try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } zk = null; } zk = connectServer(); _lock.unlock(); } } }).start(); } // 查找 URL 服务 public String getUrl() { if (dataList!=null&&dataList.size()>0) { return this.lbUrl.getUrl(dataList); } return null; } public List
getUrls(){ return dataList; } // 连接 ZooKeeper 服务器 private ZooKeeper connectServer() { ZooKeeper zk = null; try { zk = new ZooKeeper(ZkInfoDefinition.zkUrl, ZkInfoDefinition.sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); // 唤醒当前正在执行的线程 } } }); latch.await(); // 使当前线程处于等待状态 } catch (Exception e) { logger.error("", e); } return zk; } // 观察 /registry 节点下所有子节点是否有变化 private void watchNode() { _lock.lock(); if(zk!=null&&zk.getState().isAlive()&&zk.getState().isConnected()){ }else{ if(zk!=null){ try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } zk = null; } zk = connectServer(); } try { List
nodeList = zk.getChildren(ZkInfoDefinition.zkPath, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeChildrenChanged) { watchNode(); // 若子节点有变化,则重新调用该方法(为了获取最新子节点中的数据) } } }); List
dataList = new ArrayList
(); // 用于存放 /registry 所有子节点中的数据 for (String node : nodeList) { byte[] data = zk.getData(ZkInfoDefinition.zkPath + "/" + node, false, null); // 获取 /registry 的子节点中的数据 dataList.add(new String(data)); } logger.debug("node data: {}", dataList); this.dataList = dataList; } catch (Exception e) { logger.error("", e); } _lock.unlock(); } public static void main(String[] args) { ZkClient client = new ZkClient(); System.out.println(client.getUrl()); }}
View Code
/** * 从zookeeper获得URL连接操作类 * @author tomsnail * @date 2015年4月2日 下午6:56:06 */public class ZkUrlOperation {        private static final ZkUrlOperation instance = new ZkUrlOperation();    private static ZkInfoDefinition zkInfoDefinition;        private static ZkClient zkClient;        private static final byte[] _lock = new byte[0];        private  ZkUrlOperation(){            }        public static ZkUrlOperation getInstance(){        return instance;    }        public  void addZkInfoDefinition(ZkInfoDefinition zkInfoDefinition){        ZkUrlOperation.zkInfoDefinition = zkInfoDefinition;    }        public  void addZkInfoDefinition(String key,String value){        if(ZkUrlOperation.zkInfoDefinition==null){            ZkUrlOperation.zkInfoDefinition = new ZkInfoDefinition();        }        if(key.contains(ZkInfoDefinition.ZK_PATH)){            ZkUrlOperation.zkInfoDefinition.setZkPath(value);        }        if(key.contains(ZkInfoDefinition.ZK_SESSION_TIMEOUT)){            ZkUrlOperation.zkInfoDefinition.setSessionTimeout(Integer.valueOf(value));;        }        if(key.contains(ZkInfoDefinition.ZK_URL)){            ZkUrlOperation.zkInfoDefinition.setZkUrl(value);;        }        if(key.contains(ZkInfoDefinition.ZK_ENABLE)){            ZkUrlOperation.zkInfoDefinition.isEnable = Boolean.valueOf(value);        }    }            public String getUrl(){        synchronized (_lock) {            if(zkInfoDefinition.isEnable){                if(zkClient==null){                    zkClient = new ZkClient();                }                                String url = zkClient.getUrl();                return url;            }else{                return "";            }                    }                    }        public boolean isAvailUrl(String url){        synchronized (_lock) {            if(zkInfoDefinition.isEnable){                if(zkClient==null){                    zkClient = new ZkClient();                }                List
urls = zkClient.getUrls(); for(int i=0;i
View Code

 

转载于:https://www.cnblogs.com/TomSnail/p/4389297.html

你可能感兴趣的文章
数据库事务
查看>>
xe7 控件升级
查看>>
TFrame bug
查看>>
刚学习的如何才能自信的拍美美的婚纱照呢(要结婚啦)
查看>>
M51文件注释
查看>>
关于临界资源访问互斥量的死锁问题
查看>>
django-view层
查看>>
异步加载JS的方法。
查看>>
golang-gorm框架支持mysql json类型
查看>>
【tool】白盒测试
查看>>
Linux 下的 scp
查看>>
理解同步,异步和延迟脚本
查看>>
Checklist: 2019 05.01 ~ 06.30
查看>>
Binary XML file : Error inflating class com.esri.android.map.MapView
查看>>
grep,awk和sed
查看>>
.NET Core WebAPI IIS 部署问题
查看>>
SystemTap 静态探针安装包
查看>>
redis五种数据类型的使用
查看>>
浏览器全屏之requestFullScreen全屏与F11全屏
查看>>
软件包管理:rpm命令管理-安装升级与卸载
查看>>