zookeeper的框架学习和使用——Curator的使用

zookeeper的框架学习和使用——Curator的使用

Scroll Down

Curator的使用

1.依赖导入

	 <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.11</version>
        </dependency>

2.连接服务端

1.重连策略

Curator存在RetryPolicy重连策略

RetryForever(int retryIntervalMs)一直重连
RetryOneTime(int sleepMsBetweenRetry)间隔重连 
RetryNTimes(int n, int sleepMsBetweenRetries)重连几次
RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries) 重试直到经过指定的时间
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)随着重试次数增加重试时间间隔变大,指数倍增长baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))。
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
BoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxSleepTimeMs, int maxRetries)  baseSleepTimeMs初始sleep时间,maxSleepTimeMs最大sleep时间,maxRetries最大重试次数

2.命名空间

Curator会创建命名空间这个节点后,其他的操作都是就基于命名空间。

package life.xp.distributed.curatordemo;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;

import java.util.concurrent.CountDownLatch;

/**
 * @ClassName: CuratorDemo
 * @Description: 基于Curator的zookeeper操作
 * @author: XP
 * @data: 2020/3/23
 */
public class CuratorDemo {
    public static final String zkServerIps = "IP:2181";
    public static final String path ="/curator/test";
    public static final CountDownLatch countDownLatch=new CountDownLatch(1);
    // Curator客户端
    public static CuratorFramework client = null;
    public static void main(String[] args) throws Exception {
        //重试策略 间隔重试时间为1秒 总共5次
        RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000, 5);
        //实例化客户端(建造者模式)
        client=CuratorFrameworkFactory.builder()
        //.authorization() 权限访问
        .connectString(zkServerIps)  // 放入zookeeper服务器ip
        .sessionTimeoutMs(10000).retryPolicy(retryPolicy)  // 设定会话时间以及重连策略
        .namespace("workspace").build();  // 设置命名空间以及开始建立连接

        // 启动Curator客户端
        client.start();
        //查看是否连接
        boolean started = client.isStarted();
        System.out.println("当前客户端的状态:" + (started ? "连接中..." : "已关闭..."));

        byte[] data = "this is a test data".getBytes();  // 节点数据
        String result = client.create().creatingParentsIfNeeded()  // 创建父节点,也就是会递归创建
                .withMode(CreateMode.PERSISTENT)  // 节点类型
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)  // 节点的acl权限
                .forPath(path, data);

        System.out.println(result + "节点,创建成功...");

        Thread.sleep(1000);

        // 关闭客户端
        client.close();

        // 获取当前客户端的状态
        started = client.isStarted();
        System.out.println("当前客户端的状态:" + (started ? "连接中..." : "已关闭..."));

    }


}

image.png

image.png

3.增删改查

基本客户端

package life.xp.distributed.curatordemo;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

import java.util.List;

/**
 * 使用curator连接zk
 */
public class CuratorConnect {
    // Curator客户端
    public CuratorFramework client = null;
    // 集群模式则是多个ip
    public static final String zkServerIps = "IP:2181";
    public static final String NAMESPACE ="workspace";
  

    public CuratorConnect() {

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);

        // 实例化Curator客户端
        client = CuratorFrameworkFactory.builder() // 使用工厂类来建造客户端的实例对象
                //.authorization() 权限访问
                    .connectString(zkServerIps)  // 放入zookeeper服务器ip
                    .sessionTimeoutMs(10000).retryPolicy(retryPolicy)  // 设定会话时间以及重连策略
                    .namespace(NAMESPACE).build();  // 设置命名空间以及开始建立连接

        // 启动Curator客户端
        client.start();
    }

    // 关闭zk客户端连接
    private void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

   
}

1.创建节点

client.create().creatingParentsIfNeeded() //创建父节点,也就是会递归创建:
  .withMode(CreateMode.PERSISTENT) //	节点类型
 .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //	节点的acl权限
forPath(nodePath, data) //	节点路径 节点数据

示例:

  CuratorConnect curatorConnect = new CuratorConnect();
        String path="/temp";
        byte[] defaultValue="0".getBytes();
        curatorConnect.client.create()
                .creatingParentContainersIfNeeded()
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .withMode(CreateMode.PERSISTENT)
                .forPath(path,defaultValue);
   curatorConnect.client.close();

image.png

2.修改节点

 curatorConnect.client //连接客户端
.setData() //修改数据
.withVersion(0) //乐观锁控制
forPath(nodePath, data)//	节点路径 节点数据

示例:

   //修改节点
        curatorConnect.client
                .setData()
                .withVersion(0)
                .forPath(path,defaultValue);

image.png

3.查询节点

 curatorConnect.client //连接客户端
.getData() //获取数据
.storingStatIn(stat) //存储版本信息
forPath(nodePath, data) //	节点路径 节点数据

示例:

  Stat stat = new Stat();
        byte[] bytes = curatorConnect.client
                .getData()
                .storingStatIn(stat)
                .forPath(path);
        System.out.println(new String(bytes));

image.png

4.删除

 curatorConnect.client//连接客户端
.delete()//删除数据
.guaranteed()//不管网路状态 如果删除失败会去一直删除
.withVersion(0)//乐观锁控制
forPath(nodePath)

示例:

curatorConnect.client
                .delete()
                .guaranteed()
                .deletingChildrenIfNeeded()
                .forPath(path);

image.png

5.子节点操作

 curatorConnect.client//连接客户端
.getChildren() //获取子节点
.forPath(nodePath)//节点路径

6.使用watcher

1.获取数据时设置watcher

 curatorConnect.client
	连接客户端
.getData()
	获取数据
.usingWatcher()
	使用watcher
		监听只会触发一次
.forPath(nodePath)
	节点路径

示例

 curatorConnect.client
                .getData()
                .usingWatcher((Watcher) watchedEvent -> {
                    System.out.println(watchedEvent);
                })
                .forPath(path);
                curatorConnect.client
                .setData()
                .withVersion(1)
                .forPath(path,defaultValue);

image.png

2.使用NodeCache

new NodeCache( curatorConnect.client,nodePath) 连接客户端 节点路径
nodeCache.start()
不加true
	第一次访问不会自动将节点数据放在Nodecache
true
	第一次访问会自动将节点数据放在Nodecache
不支持事件的监听:
nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {

            }
        });
监听父节点,子节点改变监听

示例:


        CountDownLatch countDownLatch=new CountDownLatch(1);
        NodeCache nodeCache =new NodeCache( curatorConnect.client,path);
        nodeCache.start(true);
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("节点改变了");
                countDownLatch.countDown();
            }
        });
        curatorConnect.client
                .setData()
                .withVersion(-1)
                .forPath(path,defaultValue);
        countDownLatch.await();
        curatorConnect.client.close();

image.png

3.使用PathChildrenCache

监听父节点,子节点改变监听:

new PathChildrenCache(curatorConnect.client,nodePath,true)
pathChildrenCache.start
	StartMode
		同步
			BUILD_INITIAL_CACHE
		异步
			NORMAL
			 POST_INITIALIZED_EVENT
				初始化就会触发化事件
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                
            }
        });

PathChildrenCacheEvent:

CHILD_ADDED
CHILD_UPDATED
CHILD_REMOVED
CONNECTION_SUSPENDED
CONNECTION_RECONNECTED
CONNECTION_LOST
INITIALIZED

示例:

PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorConnect.client, path,true);
        pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println(event);
            }
        });
                curatorConnect.client
                .setData()
                .withVersion(-1)
                .forPath(path,defaultValue);
        curatorConnect.client.close();

image.png

7.ACl

client.create().creatingParentsIfNeeded()
	 创建父节点,也就是会递归创建
.withACL()
	true将权限给与父目录
ZooDefs.Ids
	OPEN_ACL_UNSAFE
	CREATOR_ALL_ACL
	READ_ACL_UNSAFE
创建命名空间的时候,添加身份认证
	authorization(List<AuthInfo> authInfos)
	 authorization(String scheme, byte[] auth)

8.基于curator的统一配置管理

JsonUtils:

package cn.xxx.utils;



import java.util.List;



import com.fasterxml.jackson.core.JsonProcessingException;

import com.fasterxml.jackson.databind.JavaType;

import com.fasterxml.jackson.databind.ObjectMapper;



/**

 * JsonUtils 

 */

public class JsonUtils {



    // 定义jackson对象

    private static final ObjectMapper MAPPER = new ObjectMapper();



    /**

     * 将对象转换成json字符串。

     * <p>Title: pojoToJson</p>

     * <p>Description: </p>

     * @param data

     * @return

     */

    public static String objectToJson(Object data) {

    	try {

			String string = MAPPER.writeValueAsString(data);

			return string;

		} catch (JsonProcessingException e) {

			e.printStackTrace();

		}

    	return null;

    }

    

    /**

     * 将json结果集转化为对象

     * 

     * @param jsonData json数据

     * @param clazz 对象中的object类型

     * @return

     */

    public static <T> T jsonToPojo(String jsonData, Class<T> beanType) {

        try {

            T t = MAPPER.readValue(jsonData, beanType);

            return t;

        } catch (Exception e) {

        	e.printStackTrace();

        }

        return null;

    }

    

    /**

     * 将json数据转换成pojo对象list

     * <p>Title: jsonToList</p>

     * <p>Description: </p>

     * @param jsonData

     * @param beanType

     * @return

     */

    public static <T>List<T> jsonToList(String jsonData, Class<T> beanType) {

    	JavaType javaType = MAPPER.getTypeFactory().constructParametricType(List.class, beanType);

    	try {

    		List<T> list = MAPPER.readValue(jsonData, javaType);

    		return list;

		} catch (Exception e) {

			e.printStackTrace();

		}

    	

    	return null;

    }

    

}


假设是redis的配置文件需要统一管理:
RedisConfig:

package cn.xxx.utils;



public class RedisConfig {



	private String type;	// add 新增配置	update 更新配置	delete 删除配置

	private String url;		// 如果是add或update,则提供下载地址

	private String remark;	// 备注

	

	public String getType() {

		return type;

	}

	public void setType(String type) {

		this.type = type;

	}

	public String getUrl() {

		return url;

	}

	public void setUrl(String url) {

		this.url = url;

	}

	public String getRemark() {

		return remark;

	}

	public void setRemark(String remark) {

		this.remark = remark;

	}

}


客户端:

package cn.xxx.curatorcheckclient;

import cn.xxx.utils.JsonUtils;
import cn.xxx.utils.RedisConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryNTimes;

import java.util.concurrent.CountDownLatch;

public class Client1 {

	public CuratorFramework client = null;
	public static final String zkServerPath = "127.0.0.1:2181";

	public Client1() {
		RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
		client = CuratorFrameworkFactory.builder()
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
				.namespace("workspace").build();
		client.start();
	}

	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}

	public final static String CONFIG_NODE_PATH = "/super";
	public final static String SUB_PATH = "/redis-config";
	public static CountDownLatch countDown = new CountDownLatch(1);

	public static void main(String[] args) throws Exception {
		Client1 cto = new Client1();
		System.out.println("client1 启动成功...");

		final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);
		childrenCache.start(StartMode.BUILD_INITIAL_CACHE);

		// 添加监听事件
		childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
			@Override
			public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
				// 监听节点变化
				if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
					String configNodePath = event.getData().getPath();
					if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {
						System.out.println("监听到配置发生变化,节点路径为:" + configNodePath);

						// 读取节点数据
						String jsonConfig = new String(event.getData().getData());
						System.out.println("节点" + CONFIG_NODE_PATH + "的数据为: " + jsonConfig);

						// 从json转换配置
						RedisConfig redisConfig = null;
						if (StringUtils.isNotBlank(jsonConfig)) {
							redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);
						}

						// 配置不为空则进行相应操作
						if (redisConfig != null) {
							String type = redisConfig.getType();
							String url = redisConfig.getUrl();
							String remark = redisConfig.getRemark();
							// 判断事件
							if (type.equals("add")) {
								System.out.println("监听到新增的配置,准备下载...");
								// ... 连接ftp服务器,根据url找到相应的配置
								Thread.sleep(500);
								System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");
								// ... 下载配置到你指定的目录
								Thread.sleep(1000);
								System.out.println("下载成功,已经添加到项目中");
								// ... 拷贝文件到项目目录
							} else if (type.equals("update")) {
								System.out.println("监听到更新的配置,准备下载...");
								// ... 连接ftp服务器,根据url找到相应的配置
								Thread.sleep(500);
								System.out.println("开始下载配置文件,下载路径为<" + url + ">");
								// ... 下载配置到你指定的目录
								Thread.sleep(1000);
								System.out.println("下载成功...");
								System.out.println("删除项目中原配置文件...");
								Thread.sleep(100);
								// ... 删除原文件
								System.out.println("拷贝配置文件到项目目录...");
								// ... 拷贝文件到项目目录
							} else if (type.equals("delete")) {
								System.out.println("监听到需要删除配置");
								System.out.println("删除项目中原配置文件...");
							}

							// TODO 视情况统一重启服务
						}
					}
				}
			}
		});

		countDown.await();

		cto.closeZKClient();
	}

}


对应json格式

{"type":"add","url":"ftp://192.168.10.123/config/redis.xml","remark":"add"}

{"type":"update","url":"ftp://192.168.10.123/config/redis.xml","remark":"update"}
{"type":"delete","url":"","remark":"delete"}

在对应命名空间创建节点

create /workspace/super/redis-config 0

更新配置:

create /workspace/super/redis-config {"type":"update","url":"ftp://192.168.10.123/config/redis.xml","remark":"update"}

image.png