Zookeeper的原理和使用——基于java的客户端使用

Zookeeper的原理和使用——基于java的客户端使用

Scroll Down

1.zookeeper基于java的客户端使用

1.导入相关依赖

 	<dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.9</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.1</version>
        </dependency>

2.创建连接

package life.xp.distributed.zkdemo;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @ClassName: ZkAPI
 * @Description: zookeeper java基本api
 * @author: XP
 * @data: 2020/3/22
 */
public class ZkAPI implements Watcher{
    public static final String ADDRESS = "IP:2181";
    public static final CountDownLatch countDownLatch=new CountDownLatch(1);
    public static void main(String[] args) throws InterruptedException {
        ZooKeeper zooKeeper = null;
        try {
            //创建连接
             zooKeeper = new ZooKeeper(ADDRESS,5000,new ZkAPI());
            //让主线程等待zookeeper的watcher响应
            countDownLatch.await();
            //连接状态
            System.out.println(zooKeeper.getState());
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            zooKeeper.close();
        }
    }

    public void process(WatchedEvent watchedEvent) {

        //如果收到了服务端的响应事件,连接成功
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            System.out.println("进入watcher");
            countDownLatch.countDown();
        }
    }
}

image.png

3.对节点的增删改查

1.新增

//创建节点  节点名称 节点值 权限 节点类型
zooKeeper.create("/java","0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

image.png

2.查询

//节点状态
Stat stat = new Stat();
//获取数据 节点 watcher 节点状态
byte[] data = zooKeeper.getData(java.lang.String.valueOf(path), null, stat);
System.out.println(new String(data));

image.png

3.修改

Stat stat = new Stat();
zooKeeper.setData(path,"888".getBytes(),stat.getVersion());
byte[] data = zooKeeper.getData(java.lang.String.valueOf(path), null, stat);
System.out.println(new String(data));       

image.png

4.删除

Stat stat = new Stat();
stat.setVersion(1);
zooKeeper.delete(path,stat.getVersion());

image.png

4.zookeeper的watcher机制

image.png
zookeeper的watcher监听机制:我们基于zookeeper节点的事件操作(增删改),可以绑定watcher对对应事件进行监听,做出对应的操作
watcher 特性:当数据发生变化的时候, zookeeper 会产生一个 watcher 事件,并且会发送到客户端。但是客户端只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置 watcher 的客户端不会再次收到消息。(watcher 是一次性的操作)。可以通过循环监听去达到永久监听效果
注册watcher事件:

getData、Exists、getChildren

触发watcher事件:

create、delete、setData

watcher事件的类型

None (-1), 客户端链接状态发生变化的时候,会收到 none 的事件
NodeCreated (1), 创建节点的事件。
NodeDeleted (2), 删除节点的事件
NodeDataChanged (3), 节点数据发生变更
NodeChildrenChanged (4); 子节点被创建、被删除、会发生事件触发

绑定事件:

getData、Exists、getChildren

1.通过exists绑定path

package life.xp.distributed.zkdemo;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

/**
 * @ClassName: ZkAPI
 * @Description: zookeeper java基本api
 * @author: XP
 * @data: 2020/3/22
 */
public class ZkAPIWatch implements Watcher{
    public static final String ADDRESS = "IP:2181";
    public static final String path ="/java";
    public static final CountDownLatch countDownLatch=new CountDownLatch(1);
    public static void main(String[] args) throws InterruptedException {
        ZooKeeper zooKeeper = null;
        try {
            //创建连接
             zooKeeper = new ZooKeeper(ADDRESS,5000,new ZkAPIWatch());

            zooKeeper.exists(path,new ZkAPIWatch());
             //创建节点  节点名称 节点值 权限 节点类型
            zooKeeper.create("/java","0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//            节点状态

            //让主线程等待zookeeper的watcher响应
            countDownLatch.await();
            //连接状态
            System.out.println(zooKeeper.getState());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            zooKeeper.close();
            System.out.println(zooKeeper.getState());
        }
    }

    public void process(WatchedEvent watchedEvent) {

        //如果收到了服务端的响应事件,连接成功
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            System.out.println("进入watcher");
            System.out.println(watchedEvent.getType());
        }

        //
        if(Event.EventType.NodeCreated==watchedEvent.getType()){
            System.out.println("节点创建");
            countDownLatch.countDown();
        }
        if(Event.EventType.NodeDataChanged==watchedEvent.getType()){
            System.out.println("数据修改");
            countDownLatch.countDown();
        }
        if(Event.EventType.NodeDeleted==watchedEvent.getType()){
            System.out.println("节点删除");
            countDownLatch.countDown();
        }
        if(Event.EventType.NodeChildrenChanged==watchedEvent.getType()){
            System.out.println("子节点创建");
            countDownLatch.countDown();
        }
    }
}

效果
image.png

2.通过getData绑定path

package life.xp.distributed.zkdemo;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

/**
 * @ClassName: ZkAPI
 * @Description: zookeeper java基本api
 * @author: XP
 * @data: 2020/3/22
 */
public class ZkAPIWatch implements Watcher{
    public static final String ADDRESS = "IP:2181";
    public static final String path ="/java";
    public static final CountDownLatch countDownLatch=new CountDownLatch(1);
    public static ZooKeeper zooKeeper = null;
    public static void main(String[] args) throws InterruptedException {

        try {
            //创建连接
             zooKeeper = new ZooKeeper(ADDRESS,5000,new ZkAPIWatch());

//            zooKeeper.exists(path,new ZkAPIWatch());

             //创建节点  节点名称 节点值 权限 节点类型
            zooKeeper.create(path,"0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zooKeeper.getData(path,new ZkAPIWatch(),new Stat());
            //getData应该是与节点的不同线程操作
            zooKeeper.setData(path,"666".getBytes(),-1);

            //让主线程等待zookeeper的watcher响应
            countDownLatch.await();
            //连接状态
            System.out.println(zooKeeper.getState());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            zooKeeper.close();
            System.out.println(zooKeeper.getState());
        }
    }

    public void process(WatchedEvent watchedEvent) {

        //如果收到了服务端的响应事件,连接成功
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            System.out.println("进入watcher");
            System.out.println(watchedEvent.getType());
        }

        //
        if(Event.EventType.NodeCreated==watchedEvent.getType()){
            System.out.println("节点创建");
            countDownLatch.countDown();
        }
        if(Event.EventType.NodeDataChanged==watchedEvent.getType()){
            System.out.println("数据修改");
            countDownLatch.countDown();
        }
        if(Event.EventType.NodeDeleted==watchedEvent.getType()){
            System.out.println("节点删除");
            countDownLatch.countDown();
        }
        if(Event.EventType.NodeChildrenChanged==watchedEvent.getType()){
            System.out.println("子节点创建");
            countDownLatch.countDown();
        }
    }

}

image.png

3.通过getChildren

监听children的事件变化

package life.xp.distributed.zkdemo;

import org.apache.zookeeper.*;

import java.util.concurrent.CountDownLatch;

/**
 * @ClassName: ZkAPI
 * @Description: zookeeper java基本api
 * @author: XP
 * @data: 2020/3/22
 */
public class ZkAPIWatchChildren implements Watcher{
    public static final String ADDRESS = "IP:2181";
    public static final String path ="/java/test";
    public static final CountDownLatch countDownLatch=new CountDownLatch(1);
    public static ZooKeeper zooKeeper = null;
    public static void main(String[] args) throws InterruptedException {

        try {
            //创建连接
             zooKeeper = new ZooKeeper(ADDRESS,5000,new ZkAPIWatchChildren());

//            zooKeeper.exists(path,new ZkAPIWatch());
            zooKeeper.getChildren("/java",new ZkAPIWatchChildren());
             //创建节点  节点名称 节点值 权限 节点类型
            zooKeeper.create(path,"0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            //让主线程等待zookeeper的watcher响应
            countDownLatch.await();
            //连接状态
            System.out.println(zooKeeper.getState());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            zooKeeper.close();
            System.out.println(zooKeeper.getState());
        }
    }

    public void process(WatchedEvent watchedEvent) {

        //如果收到了服务端的响应事件,连接成功
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            System.out.println("进入watcher");
            System.out.println(watchedEvent.getType());
        }

        //
        if(Event.EventType.NodeCreated==watchedEvent.getType()){
            System.out.println("节点创建");
            countDownLatch.countDown();
        }
        if(Event.EventType.NodeDataChanged==watchedEvent.getType()){
            System.out.println("数据修改");
            countDownLatch.countDown();
        }
        if(Event.EventType.NodeDeleted==watchedEvent.getType()){
            System.out.println("节点删除");
            countDownLatch.countDown();
        }
        if(Event.EventType.NodeChildrenChanged==watchedEvent.getType()){
            System.out.println("子节点创建");
            countDownLatch.countDown();
        }
    }

}

效果:
image.png

5.基于zookeeper的分布式锁

分布式锁:解决跨进程之间的资源共享的问题,防止同一时间有其他应用操作同一个资源而出现的数据问题,常见场景:秒杀,抢红包
zookeeper分布式锁原理:利用节点的唯一性和有序性,简单的说:就是需要争抢资源的统一在一个目录下创建有序的临时节点(防止惊群效应),然后判断自己是否是这群节点里面最小的,如果是则相当于拿到了锁,进行资源的操作,使用完,删除节点(释放锁);其他不是最小的则进行等待,然后通过watcher监听比自己小的节点的改变,继续去判断。
简单的例子:

package life.xp.distributed.zkdemo;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @ClassName: ZkLock
 * @Description: 简单的分布式锁
 * @author: XP
 * @data: 2020/3/22
 */
public class ZkLock implements Lock, Watcher {
    private static final String ADDRESS = "103.40.23.38:2181";
    private static final String ROOT_LOCK ="/locks";
    private static ZooKeeper zooKeeper = null;
    private byte[] defaultValue="0".getBytes();
    /**等待的前一个锁*/
    private String WAIT_LOCK;
    /**当前的锁*/
    private String CURRENT_LOCK;

    private CountDownLatch countDownLatch;
    public ZkLock() {
        //初始化Zk
            try {
                zooKeeper= new ZooKeeper(ADDRESS,5000,this);
                Stat exists = zooKeeper.exists(ROOT_LOCK, false);
                if(exists==null){
                    //如果没有 锁目录就创建
                    zooKeeper.create(ROOT_LOCK,defaultValue, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }

    }

    @Override
    public void lock() {
        //尝试获取锁
        if(this.tryLock()){
            System.out.println(Thread.currentThread().getName()+"--->获取锁成功,编号为"+CURRENT_LOCK);
            return;
        }
        //没有获取锁需要进行监听
        try {
            waitLock(WAIT_LOCK);
            System.out.println(Thread.currentThread().getName()+"执行了到这儿");
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    /**
     * @Author xp
     * @Description 设置监听阻塞
     * @Date 23:52 2020/3/22
     * @Param [wait_lock]
     * @return void
     **/
    private boolean waitLock(String wait_lock) throws KeeperException, InterruptedException {
        Stat stat=zooKeeper.exists(wait_lock,true);
        //监听当前节点的上一节点
        if(stat!=null){
            System.out.println(Thread.currentThread().getName()+"--->等待锁编号为"+wait_lock+"释放");
            countDownLatch=new CountDownLatch(1);
            countDownLatch.await();
            System.out.println(Thread.currentThread().getName()+"--->获取锁成功");
        }
        return true;

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        try {
            //1.创建临时节点目录
            CURRENT_LOCK = zooKeeper.create(ROOT_LOCK + "/", defaultValue, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(Thread.currentThread().getName()+"--->尝试获取锁,编号为"+CURRENT_LOCK);
            //2.拿到zookeeper所目录下的子节点集合
            List<String> childrens = zooKeeper.getChildren(ROOT_LOCK, false);
            //3.将节点进行排序,判断最小的是否为自己的目录
            SortedSet<String> sortedSet=new TreeSet<String>();

            for (String children: childrens){
                sortedSet.add(ROOT_LOCK+"/"+children);
            }
            String minPath = sortedSet.first();
            //4.如果不是则将上个路径存入 为了监听
            SortedSet<String> lessLock = sortedSet.headSet(CURRENT_LOCK);
            if(CURRENT_LOCK.equals(minPath)){
                return true;
            }
            if(!lessLock.isEmpty()){
                WAIT_LOCK=lessLock.last();
            }

        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        System.out.println(Thread.currentThread().getName()+"--->释放锁"+CURRENT_LOCK);
        try {
            //释放锁
            zooKeeper.delete(CURRENT_LOCK,-1);
            CURRENT_LOCK=null;
            zooKeeper.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }

    }

    @Override
    public Condition newCondition() {
        return null;
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println(Thread.currentThread().getName()+"进入watcher");
        //如果释放锁 则释放线程去获取
        if (this.countDownLatch!=null){
            this.countDownLatch.countDown();
        }
    }
}


测试:

package life.xp.distributed.zkdemo;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

class TestMyZkLock{
    public static void main(String[] args) throws IOException {
        CountDownLatch countDownLatch=new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                try {
                    //模拟10个应用同时操作资源
                    countDownLatch.await();
                    ZkLock zkLock = new ZkLock();
                    zkLock.lock();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            },"Thread-"+i).start();
            countDownLatch.countDown();
        }
        System.in.read();

    }
}

6.总结

zookeeper基于java的api也是比较人性化和简单的,主要特性是watcher机制,简单的来说就是一个类似事件的发布与订阅的方式,通知客户端节点的事务改变,基于这个我们可以实现分布式锁,利用的是有序(防止惊群)和临时(防止死锁)的分布式锁,是监听上个节点的变化,最小的获取锁。