资料:zookeeper入门源码

基本概念

大数据生态系统里很多组件的命名都是某种动物,例如Hadoop是🐘,hive是🐝,zookeeper就是动物园管理者,是管理大数据生态系统各组件的管理员。

zookeeper是经典的分布式数据一致性解决方案,致力于为分布式应用提供一个高性能,高可用,且具有严格顺序访问控制能力的分布式协调存储服务。

应用场景

  • 维护配置信息
    Java编程经常会遇到配置项,例如数据库的user、password等,通常配置信息会放在配置文件中,再把配置文件放在服务器上。当需要修改配置信息时,要去服务器上修改对应的配置文件,但在分布式系统中很多服务器都需要使用该配置文件,因此必须保证该配置服务的高可用性和各台服务器上配置的一致性。通常会将配置文件部署在一个集群上,但一个集群涉及的服务器数量是很庞大的,如果一台台服务器逐个修改配置文件是效率很低且危险的,因此需要一种服务可以高效快速且可靠地完成配置项的更改工作。
    zookeeper就可以提供这种服务,使用Zab一致性协议保证一致性。hbase中客户端就是连接zookeeper获得必要的hbase集群的配置信息才可以进一步操作。在开源消息队列Kafka中,也使用zookeeper来维护broker的信息。在dubbo中也广泛使用zookeeper管理一些配置来实现服务治理。

  • 分布式锁服务
    一个集群是一个分布式系统,由多台服务器组成。为了提高并发度和可靠性,在多台服务器运行着同一种服务。当多个服务在运行时就需要协调各服务的进度,有时候需要保证当某个服务在进行某个操作时,其他的服务都不能进行该操作,即对该操作进行加锁,如果当前机器故障,释放锁并fall over到其他机器继续执行。

  • 集群管理
    zookeeper会将服务器加入/移除的情况通知给集群中其他正常工作的服务器,以及即使调整存储和计算等任务的分配和执行等,此外zookeeper还会对故障的服务器做出诊断并尝试修复。

  • 生成分布式唯一ID
    在过去的单库单表系统中,通常使用数据库字段自带的auto_increment熟悉自动为每条记录生成一个唯一的id。但分库分表后就无法依靠该属性来标识一个唯一的记录。此时可以使用zookeeper在分布式环境下生成全局唯一性id。每次要生成一个新id时,创建一个持久顺序结点,创建操作返回的结点序号,即为新id,然后把比自己结点小的删除。

设计目标

  • 高性能
    将数据存储在内存中,直接服务于客户端的所有非事务请求,尤其适合读为主的应用场景
  • 高可用
    一般以集群方式对外提供服务,每台机器都会在内存中维护当前的服务状态,每台机器之间都保持通信。只要集群中超过一般机器都能正常工作,那么整个集群就能够正常对外服务。
  • 严格顺序访问
    对于客户端的每个更新请求,zookeeper都会生成全局唯一的递增编号,这个编号反应了所有事务操作的先后顺序。

数据结构

zookeeper的数据结点可以视为树状结构(或者目录),树中各节点成为znode,一个znode可以有多个子结点。zookeeper结点在结构上表现为树状,使用路径来定位某个znode。
znode兼具文件和目录两种特点,既像文件一样维护着数据、元信息、ACL、时间戳等数据结构,又像目录一样可以作为路径标识的一部分。

znode大体上分为三部分(使用get命令查看)
①结点的数据
②结点的子结点
③结点的状态

结点类型(在创建时被确定且不能更改)
①临时结点:生命周期依赖于创建它的会话,会话结束结点将被自动删除。临时结点不允许拥有子结点。
②持久化结点:生命周期不依赖于会话,只有在客户端显式执行删除操作时才被删除。

安装(我恨linux)

使用centos8虚拟机,先创建一个zookeeper用户

zookeeper是基于jdk的,先安装jdk1.8:安装JDK

安装zookeeper(我是真的烦Linux 找个下载地址找一年 还慢死):

wget  http://mirrors.hust.edu/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

30kb/s 慢慢等吧

终于下载完了,tar -xzvf zookeeper-3.4.14.tar.gz进行解压

然后进入zookeeper的conf配置目录,复制zoo_sample.cfg 命名zoo.cfg。然后回到上级目录,创建一个data目录。

然后进入conf目录,使用vi命令编辑刚才复制的zoo.cfg配置文件,打开后按a进入编辑模式。

修改datadir为刚才创建的data目录,然后按esc,再按:到文件末尾,使用wq保存退出。

卧槽…居然成功啦
cd ..返回zookeeper根目录,再cd bin进入bin目录,./zkServer.sh start 启动zookeeper,等待一段时间后,./zkServer.sh status查看zookeeper是否成功启动,Mode显示的是当前是单机状态

通过客户端连接本机zookeeper服务./zkCli.sh,登录成功后会打印日志信息

关闭zookeeper:

基本命令

新增结点

//-s为有序结点,-e为临时结点
create [-s] [-e] path data 

先启动服务器

确实已经启动

通过客户端连接服务器

  • 创建持久化结点
    path是/hadoop 数据是“123456” 不加选项默认是持久化结点

    通过get读取数据

    持久化结点与当前会话无关,quit退出会话后再次登陆,get数据还在

  • 创建持久化有序结点
    创建路径为/a,会自动补为/a0000000001

  • 创建临时结点
    临时结点在会话结束后会被删除

    quit结束会话再次登陆,不能读取到数据

  • 创建临时有序结点(用于分布式锁)

更新结点

更新结点的命令是set,可以直接进行修改

修改/hadoop的值从123456变为i want offer

成功

也可以基于版本号更改,类似CAS机制,当传入数据版本号和当前不一致时拒绝修改,初始版本号dataVersion为0,每次修改后会加1

当前版本为1,修改时使用1版本号,修改完变为2

版本号为2时,使用3会失败


删除结点

使用delete命令,同样可以传入版本号,如果版本号不符合也不会执行
版本号为2时,使用3删除失败,使用2删除成功

如果当前结点下有子结点,不能删除,如果要删除需要使用rmr

查看结点

使用get查看结点的属性和数据

cZxid 数据结点创建时的事务id
ctime 数据结点创建时间
mZxid 数据结点最后一次更新时的事务id
mtime 数据结点最后一次更新的时间
pZxid 子结点最后一次修改的事务id
cversion 子结点的更改次数
dataVersion 结点数据更改次数
aclVersion 结点ACL的更改次数
ephemeralOwner 如果是临时结点,表示会话的sessionID;如果是持久结点值为0
dataLength 数据内容长度
numChildren 子结点数

使用stat只返回属性 没有数据

查看结点列表
使用ls pathls2 path,ls2是ls的增强,除了列出子结点还有当前结点的属性

监听器(维护配置信息)

使用get path watch,监听器只能使用一次
左边客户端使用监听器,右边客户端更改了数据,左边监听到了数据的更改

也可以使用stat path watch

还可以使用ls/ls2 path watch 可以监听到子结点

ACL权限控制

类似linux针对文件的权限控制
acl权限控制使用scheme:id:permission标识,
①scheme :权限模式

  • world 只有一个用户,代表登陆zookeeper的所有人(默认)
  • ip 对客户端使用ip地址认证
  • auth 使用已添加认证的用户认证
  • digest 使用用户名:密码方式认证

②id :授权对象
③permission :授予的权限

  • create 简写c 可以创建子结点
  • delete 简写d 可以删除子结点(仅下一级)
  • read 简写r 可以读取结点数据以及显示子结点
  • write 简写w 可以设置结点数据
  • admin 简写a 可以设置节点访问控制列表权限

例如setAcl /hadoop ip:192.168.2.142:crwda 表示ip地址为192.168.2.142的客户端对/hadoop有全部权限
getAcl path 读取权限
setAcl path acl 设置权限
addauth schema suth 添加认证用户

world授权模式

setAcl path world:anyone:<acl>
例如取消c创建权限,就不能再创建子结点

取消d权限,不能删除子结点

ip授权模式

命令setAcl path ip:<ip>:<acl>
要用两个虚拟机,这个就不演示了…
主要就是限制客户端ip地址的

auth模式

`addauth digest <user>:<password> `添加认证用户
`setAcl <path> auth:<user>:<acl> 	` 授权


另一台客户端如果没有添加该用户就不能读取

digest授权模式

setAcl <path> digest:<user>:<password>:<acl>
这里的密码是经过SHA1及BASE64处理的密文
通过echo -n sjh:sjh2019. | openssl dgst -binary -sha1 | openssl base64生成密文,复制该结果

添加结点/node4,使用digest模式授权,因为之前添加过sjh密码sjh019.的用户,所以可以访问

  • 多种授权模式
    同一个结点可以使用多种授权模式,用逗号隔开就行,例
  • 超级管理员
    假设超级管理员的账号是super:admin,先要为其生成密文:

    得到密文xQJmxLMiHGwaqBvst5y6rkB6HQs=
    在zookeeper目录下/bin/zkServer.sh服务器脚本文件,找到这一行

    "-Dzookeeper.DigestAuthenticationProvider.superDigest=super:xQJmxLMiHGwaqBvst5y6rkB6HQs="加入这一句

    修改完按esc ,按:, 输入wq保存退出
    重启

    使用客户端登陆,创建/node6,取消创建子结点权限

    此时不能创建子结点

    添加超级管理员用户,此时可以成功添加子结点

IDEA操作zookeeper

操作流程

  • 连接到zookeeper服务器(好害怕连不上- - 。。。昨天GitHub连上了,不知道今天能不能再撞一次运气。。连不上这篇文章也就到此为止了。。。)
  • 定期向服务器发送心跳,否则会过期
  • 会话处于活动状态就可以获取/设置znode
  • 所有任务完成后,断开连接

连接到zookeeper

创建一个新的Java工程,导入一下jar包

妈的连了快俩小时终于搞定了!!!

导完jar包后还需要导入一个log4j文件,创建一个连接zookeeper的测试类

public class zookeeperTest {

    public static void main(String[] args) {
        ZooKeeper zooKeeper = null;
        try{
            //创建一个计数器对象
            CountDownLatch countDownLatch=new CountDownLatch(1);
            //第一个参数是服务器ip和端口号,第二个参数是客户端与服务器的会话超时时间单位ms,第三个参数是监视器对象
            zooKeeper=new ZooKeeper("192.168.2.142:2181", 5000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if(event.getState()==Event.KeeperState.SyncConnected){
                        System.out.println("连接创建成功");
                        //通知主线程解除阻塞
                        countDownLatch.countDown();
                    }
                }
            });
            //主线程阻塞,等待连接对象的创建成功
            countDownLatch.await();
            System.out.println("会话编号"+zooKeeper.getSessionId());
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if(zooKeeper!=null) {
                try {
                    zooKeeper.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

由于连接是异步的,所以用countdownlatch确保连接成功再继续
一直失败!
最后关了防火墙。。。就成功了!!!


创建结点

在Linux客户端先创建一个/create结点

在IDEA创建一个子结点/node1

public class zkCreate {

    private static final String IP="192.168.2.142:2181";
    private static ZooKeeper zooKeeper;

    @Before
    public void connect() throws Exception{
        //创建一个计数器对象
        CountDownLatch countDownLatch=new CountDownLatch(1);
        //第一个参数是服务器ip和端口号,第二个参数是客户端与服务器的会话超时时间单位ms,第三个参数是监视器对象
        zooKeeper=new ZooKeeper(IP, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if(event.getState()==Event.KeeperState.SyncConnected){
                    System.out.println("连接创建成功");
                    //通知主线程解除阻塞
                    countDownLatch.countDown();
                }
            }
        });
        //主线程阻塞,等待连接对象的创建成功
        countDownLatch.await();
    }

    @After
    public void close() throws Exception{
        zooKeeper.close();
    }

    @Test
    public void create1() throws Exception{
        //同步创建结点
        // 参数1 结点路径
        // 参数2 结点数据
        // 参数3权限列表 OPEN_ACL_UNSAFE代表world方式授权 cdrwa
        // 参数4 结点类型 persistent表示持久化结点
        zooKeeper.create("/create/node1","i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
}

运行后在Linux客户端查看

创建一个只有读权限的数据

	@Test
    public void create2() throws Exception{
        //  OPEN_ACL_UNSAFE代表world方式授权 r只能读
        zooKeeper.create("/create/node2","i want offer".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

运行后查询

自定义设置权限列表
使用world模式,设置有创建和删除权限

	@Test
    public void create3() throws Exception{
        //自定义方式设置权限
        List<ACL> acls=new ArrayList<>();
        Id id=new Id("world","anyone");
        acls.add(new ACL(ZooDefs.Perms.CREATE,id));
        acls.add(new ACL(ZooDefs.Perms.DELETE,id));
        zooKeeper.create("/create/node4","i want offer".getBytes(), acls, CreateMode.PERSISTENT);
    }

ip模式授权

@Test
    public void create4() throws Exception{
        //ip方式设置权限
        List<ACL> acls=new ArrayList<>();
        Id id=new Id("ip","192.168.2.142");
        acls.add(new ACL(ZooDefs.Perms.CREATE,id));
        zooKeeper.create("/create/node4","i want offer".getBytes(), acls, CreateMode.PERSISTENT);
    }

auth方式授权

@Test
    public void create5() throws Exception{
        //auth方式设置权限
        zooKeeper.addAuthInfo("digest","sjh:sjh2019.".getBytes());
        zooKeeper.create("/create/node5","i want offer".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
    }

digest方式授权

 @Test
    public void create6() throws Exception{
        //digest方式设置权限
        List<ACL> acls=new ArrayList<>();
        Id id=new Id("digest","sjh:base64和sha1加密后的密码");
        acls.add(new ACL(ZooDefs.Perms.CREATE,id));
        zooKeeper.create("/create/node5","i want offer".getBytes(), acls, CreateMode.PERSISTENT);
    }

创建持久化有序结点

@Test
    public void create7() throws Exception{
        //持久化有序结点
        String s = zooKeeper.create("/create/node7", "i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println(s);
    }

运行结果:

创建临时结点

@Test
    public void create8() throws Exception{
        //创建临时结点
        String s = zooKeeper.create("/create/node8", "i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println(s);
    }

创建临时有序结点

@Test
    public void create9() throws Exception{
        //创建临时结点
        String s = zooKeeper.create("/create/node9", "i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(s);
    }

异步创建结点

	 @Test
    public void create10() throws Exception{
        //异步创建结点
        zooKeeper.create("/create/node11", "i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                System.out.println("创建状态: "+rc);//0表示创建成功
                System.out.println("path: "+path);//结点路径
                System.out.println("name: "+name);//结点路径
                System.out.println("ctx: "+ctx);//上下文
            }
        },"context");
    }

运行结果,由于是异步的,可能还没有输出就结束了

调用sleep方法让线程休眠,等待zookeeper操作执行完毕

成功:


更新结点

先查询/create/node1结点的当前值为i wangt offer

同步更新结点:

	@Test
    public void set1() throws Exception{
        //同步更新结点
        //第一个参数 结点路径
        //第二个参数 要修改的值
        //第三个参数 数据版本 -1代表版本号不参与更新
        zooKeeper.setData("/create/node1","2020GetGoodOffer".getBytes(),-1);
    }

运行完成后,再次查询该结点,值已经更改为2020GetGoodOffer

异步修改结点

@Test
    public void set2() throws Exception{
        //异步更新结点
        //第一个参数 结点路径
        //第二个参数 要修改的值
        //第三个参数 数据版本 -1代表版本号不参与更新
        //第四个参数 匿名回调函数
        //第五个参数 上下文参数
        zooKeeper.setData("/create/node1", "2020 Get Offer!!!".getBytes(), -1, new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                System.out.println("rc: "+rc);//0表示成功
                System.out.println("path: "+path);
                System.out.println("ctx: "+ctx);
                System.out.println("stat: "+stat);
            }
        },"context");
        Thread.sleep(1000);
    }

结果:

删除结点

同步删除

@Test
    public void del1() throws Exception{
        //同步删除数据
        //第一个参数表示删除结点的路径
        //第二个参数表示删除结点的数据版本 -1表示删除时不考虑版本信息
        zooKeeper.delete("/create/node1",-1);
    }

此时数据已不存在

异步删除

 @Test
    public void del2() throws Exception{
        //异步删除数据
        zooKeeper.delete("/create/node2", -1, new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                System.out.println("rc: "+rc);//0表示成功
                System.out.println("path: "+path);
                System.out.println("ctx: "+ctx);
            }
        },"context");
        Thread.sleep(1000);
    }

运行结果


查看结点

同步查看结点

@Test
    public void get1() throws Exception{
        //同步读取数据
        //第一个参数是路径
        //第二个参数是watch,先填false(以后在讲)
        //第三个参数用于获取结点属性
        Stat stat = new Stat();
        byte[] data = zooKeeper.getData("/create/node3", false, stat);
        System.out.println(new String(data));//数据
        System.out.println(stat);//属性
    }

结果

异步查看结点

@Test
    public void get2() throws Exception{
        //异步读取数据
        //第一个参数是路径
        //第二个参数是watch,先填false(以后在讲)
        //第三个参数是匿名回调函数
        Stat stat = new Stat();
        zooKeeper.getData("/create/node3", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                System.out.println("rc: "+rc);//0表示成功
                System.out.println("path: "+path);//结点路径
                System.out.println("ctx: "+ctx);//上下文参数
                System.out.println(new String(data));//数据
                System.out.println(stat);//属性
            }
        },"context");
        Thread.sleep(1000);
    }

运行结果:


查看子结点

创建测试数据

同步

	@Test
    public void getChild1() throws Exception{
        //同步
        //第一个参数是父路径
        //第二个参数是watch,先填false(以后在讲)
        List<String> children = zooKeeper.getChildren("/create/father", false);
        for(String str:children)
            System.out.println(str);
    }

运行结果:

异步

@Test
    public void getChild2() throws Exception{
        //异步
        //第一个参数是父路径
        //第二个参数是watch,先填false(以后在讲)
        //第三个参数是匿名回调函数
        zooKeeper.getChildren("/create/father", false, new AsyncCallback.ChildrenCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, List<String> children) {
                System.out.println("rc: "+rc);//0表示成功
                System.out.println("path: "+path);//结点路径
                System.out.println("ctx: "+ctx);//上下文参数
                System.out.println(children);//子结点信息
            }
        },"context");
        Thread.sleep(1000);
    }

运行结果


检查结点是否存在

同步

	@Test
    public void exists1() throws Exception{
        //同步判断
        //第一个参数是路径
        //第二个参数是watch,先填false(以后在讲)
        Stat exists = zooKeeper.exists("/create/null", false);
        System.out.println(exists==null?"不存在":"存在");
    }

运行结果:

异步

@Test
    public void exists2() throws Exception{
        //异步判断
        //第一个参数是路径
        //第二个参数是watch,先填false(以后在讲)
        //第三个参数是匿名回调函数
        //第四个参数上下文
        zooKeeper.exists("/create/null", false, new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                System.out.println("rc: "+rc);//0表示成功
                System.out.println("path: "+path);//结点路径
                System.out.println("ctx: "+ctx);//上下文参数
                System.out.println(stat==null?"不存在":"存在");
            }
        },"context");
        Thread.sleep(1000);
    }

运行结果:


更多推荐

【菜鸟教程】Zookeeper基础入门【上】