zookeeper

BigData / 2021-01-19

简介

​ 分布式数据一致性解决方案

  • 维护配置信息
  • 分布式锁
  • 集群管理
  • 生成分布式唯一ID

优点

  1. 高性能,内存中存储数据
  2. 高可用,可配置集群
  3. 严格数据访问顺序

单机安装

官网下载

解压

tar zxvf

在zookeeper根目录下生成一个data文件夹用于存放内存数据持久化产生的数据

mkdir data

配置:conf目录下zoo_sample.cfg复制一份

cp zoo_sample.cfg zoo.cfg

编辑配置

vim zoo.cfg

配置数据目录,端口号


dataDir=/usr/local/zookeeper-3.6.2

clientPort=2181

  • tickTime:心跳,服务器与客户端心跳时间
  • initLimit:从节点第一次链接主节点能容忍的心跳次数
  • syncLimit:同步通信时限
  • dataDir:数据目录,持久化数据路径
  • clientPort:服务端暴露的端口

启动

bin/zkServer.sh start

停止

bin/zkServer.sh stop

状态

bin/zkServer.sh status

内部原理

  1. 选举机制

假设要部署五台zookeeper服务器,那么选票为3的服务器为master

部署第一台服务器,第一台服务器选票为1

部署第二台服务器,都投票给第二台,第二台2票

部署第三台服务器,都投票给第三台,第三台3票

  1. 半数机制

集群中半数机器存活集群可用,所以适合安装奇数台服务器

  1. 节点类型

数据模型

树形结构,每个节点叫znode,znode分两种

  • 持久:client和server断开,znode不删除,可以有子节点
  • 短暂:client和server断开,znode删除,不能有子节点。应用:判断client是否宕机。

znode大小有限制,至多1M,znode路径必须使用绝对路径,也就是路径一定是以/ 开头的。

znode三部分组成:

  • stat:状态
  • data:数据
  • children:子节点

znode序列化:可以指定每个znode名字会带数字,数字的顺序表示znode顺序。

分布式部署

把第一台服务器的zookeeper文件远程拷贝到其他的服务器

scp -r zookeeper root@host:/usr/local

在 zookeeper/data 下创建文件 myid,填写本机在集群中的ID,三个主机构成集群,则myid分别是1,2,3

vim myid

在每个主机的配置文件中添加server信息

vim conf/zoo.cfg

server.1=host:2888:3888
server.2=host:2888:3888
server.3=host:2888:3888

启动服务

bin/zkServer.sh start

操作命令

查看所有命令

jll

创建znode:

create [-e] [-s] path data

  • 什么参数也不加:永久性非序列化节点
  • -s:序列化节点
  • -e:临时节点

查看目录下znode

ls

修改节点数据

set path data

获取节点数据

get path

删除节点

delete path

迭代删除

rmr path

znode 节点属性

  • dataVersion:数据版本号, 每次修改增加1
  • cversion:子节点版本号,子节点有修改增加1
  • cZxid:znode创建的事务的id
  • ctime:创建时间
  • mtime:修改时间
  • ephemeralOwner:1:临时节点,0:非临时节点

watch机制

给znode设置监听,设置一个watcher只能用一次

给一个znode设置一个监听

get -s -w path

事件类型:

image.png

JavaAPI

框架:Curator

  • Curator-framework:对zookeeper底层api封装
  • curator-recipes:封装了一些高级特性

maven:

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.2.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>

代码:

// 参数:重试时间,重试最大次数
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);

        // 多个节点,分割,session超时,链接超时,重试策略
        CuratorFramework client = CuratorFrameworkFactory.newClient("host:2181", 1000, 1000, retryPolicy);

        client.start();

        // 删除节点
        client.delete().forPath("/hello2");

        // 创建节点同时创建上级znode,创建策略mode,路径数据。
        /**
         * mode:PERSISTENT永久节点
         *
         */
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/hello2", "data".getBytes(StandardCharsets.UTF_8));

        // 修改节点数据
        client.setData().forPath("/hello2", "newData".getBytes(StandardCharsets.UTF_8));

        // 获取数据
        byte[] hello2s = client.getData().forPath("/hello2");
        String hello2Str = new String(hello2s);
        System.out.println("hello2Str: " + hello2Str);

        // watch 监控机制,定义监听哪个节点
        TreeCache treeCache = new TreeCache(client, "/hello2");

        // 定义监听器
        treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {
            ChildData data = treeCacheEvent.getData();
            if (data != null){
                switch (treeCacheEvent.getType()){
                    case NODE_ADDED:
                        System.out.println("节点增加");
                        break;
                    case NODE_REMOVED:
                        System.out.println("节点删除");
                        break;
                    case NODE_UPDATED:
                        System.out.println("节点更新");
                        break;
                    default:
                        System.out.println("。。。。");
                        break;
                }
            }
        });

        // 开始监听
        treeCache.start();

        // 延长监听时间
        Thread.sleep(100000);

        client.close();