ZooKeeper初探及使用场景


认识ZooKeeper

ZooKeeper是一个分布式调度协调服务,分布式应用的产生必定会带来多节点之间的协调问题,这个时候ZooKeeper就出现了。很多开源中间件都用到了ZooKeeper,比如KafKa就是强依赖于ZooKeeper.它自身也是一个分布式应用。

ZooKeeper节点特性

ZooKeeper共有4种节点,分别是持久节点,持久序号节点,临时节点,临时序号节点

特性 创建命令
持久节点 持久化保存的节点,默认创建 create /test
持久序号节点 持久化保存的节点,创建的时候会在路径上加上序号作为后缀比如/test会创建出/test00000001 create -s /test
临时节点 临时节点会在客户端删除的时候自动删除 create -e /test
临时序号节点 临时节点会在客户端删除的时候自动删除,创建的时候会在路径上加上序号作为后缀比如/test会创建出/test00000001 create -e -s /test
Container节点 Container节点目录下没有子节点会在zookeeper内部的定时任务轮训下自动删除,(刚开始的没有不算,只有有了子节点后被清除才会有之前的逻辑) create ‐c /test
TTL 节点 禁用,需在系统配置增加 zookeeper.extendedTypesEnabled=true, create ‐t 500 /test

Zookeeper ACL 权限控制

权限信息

  • 创建权限(c: create):授予权限的对象可以在数据节点下创建子节点
  • 更新权限(w: wirte):授予权限的对象可以更新该数据节点
  • 读取权限(r: read):授予权限的对象可以读取该节点的内容以及子节点的列表信息
  • 删除权限(d: delete):授予权限的对象可以删除该数据节点的子节点
  • 管理者权限(a: admin):授予权限的对象可以对该数据节点体进行 ACL 权限设置

ACL的设置有2种方式:

  1. 创建节点的时候直接设置

    create /test-node datatest digest:test:V28q/NynI4JI3Rk54h0r8O5kMug=:cdrwa
    
  2. 单独设置

    setAcl /test-node digest:test:V28q/NynI4JI3Rk54h0r8O5kMug=:cdrwa
    

    授权过后不能直接访问需要授权才能访问

    授权

    addauth digest test:test
    

    上诉授权是密文,如果想要明文授权我们需要先获取权限在进行授权

    addauth digest test:test
    create /test-node1 test auth:test:test:cdwra
    

    最后还有一种授权方式是对ip进行授权

    create /test-node2 data ip:192.168.186.131:cdwra
    setAcl /test-node2 ip:192.168.186.131:cdwra
    

超级管理员模式

启动脚本添加

-Dzookeeper.DigestAuthenticationProvider.superDigest=zk:N0YquoLgOZWu74hzsd3OJTZZUw0=

具体可看zookeeper安装博文

ZooKeeper集群

ZooKeeper集群主要解决的是高可用读写分离提高承载能力。ZooKeeper集群必须保证有至少三个节点,不然部署失败

集群角色

角色名称 功能
leader 主节点,领导者 写数据,通过选举产生
follower 从节点,追随者 读数据,主节点的备选节点,拥有投票权
observer 次级子节点,观察者 读数据,无投票权,不能选举为主节点,计算集群可用状态不会将其计算在内

数据提交机制

ZooKeeper是一个CP架构,它专注于的是一致性,为什么它拥有一致性?

流程:客户端会向服务端发起写请求,如果请求对象不是主节点,从节点会把请求转发给主节点,数据写进主节点,主节点发送请求proposal给从节点,从节点写进事务日志后给主节点一个ack,主节点统计ack数量,如果ack数量是集群半数以上,主节点又会发起一个commit请求给客户端,客户端记录事务提交,并把数据更新到内存中去,主节点告诉客户端我写好了,在同步数据这段时间,ZooKeeper对于客户端而言是不可用的,多以它不满足可用性,所以他是CP架构。

在集群运行过程当中如果有一个follower节点宕机,由于宕机节点没过半,集群仍然能正常服务。当leader收到新的客户端请求,此时无法同步给宕机的节点。造成数据不一致。为了解决这个问题,当节点启动时,第一件事情就是找当前的Leader,比对数据是否一致。不一致则开始同步,同步完成之后在进行对外提供服务。

如何比对Leader的数据版本呢?这里通过ZXID事物ID来确认。比Leader就需要同步。

ZXID: ZXID是一个长度64位的数字,其中低32位是按照数字递增,任何数据的变更都会导致,低32位的数字简单加1。高32位是leader周期编号,每当选举出一个新的leader时,新的leader就从本地事物日志中取出ZXID,然后解析出高32位的周期编号,进行加1,再将低32位的全部设置为0。这样就保证了每次新选举的leader后,保证了ZXID的唯一性而且是保证递增的。

集群选举机制

  • 选票pk成功的条件

    // FastLeaderElection.totalOrderPredicate
    return ((newEpoch > curEpoch) ||
                    ((newEpoch == curEpoch) &&
                    ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
    

    从代码可以看出

    选举成功3个条件符合其一

    1. 选举周期大的
    2. 周期相等选事务id大的
    3. 周期相等,事务id相等,选serverid大的,serverid就是配置文件配的集群server.*,也就是myid

    选举成功的条件

    // FastLeaderElection.termPredicate
    voteSet.hasAllQuorums()
    // hasAllQuorums
    !qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())
    // QuorumMaj.containsQuorum
    (ackSet.size() > half);
    // QuorumMaj
    half = votingMembers.size() / 2;
    

    从这段代码可以看出选举成功就是对某一节点的选票超过半数即为选举成功

    选举触发条件

    • 节点初始化启动时

      节点启动的时候会在集群中寻找是否有leader,找到了就与leader建立通信,自身变为follower或observer,没有找到Leader,当前节点记录为LOOKING,开始选举

    • 半数以上节点无法和Leader建立通信

      Leader一挂,只要有半数节点感知到Leader挂了,就所有follower进入到LOOKING状态,开始选举

    选举流程

    每个节点启动后都会默认给自己投票,收到其他节点会通过选票pk成功的条件判断谁获胜了,当前节点会更改自己的选票为获胜方,下一轮选举会发给其他节点,最后统计选票,选票超过半数即为LEADING,若自己节点不是LEADING节点就是FOLLOWER或OBSERVER。

    如果集群已经选举完成,新节点加入集群,会首先发送选票给其他节点,其他节点接收到选票后由于已经有LEADING,会把LEADING节点发送给新节点,新节点更改为FOLLOWER.

Q&A

  • 为什么建议集群奇数个节点?

    节省资源,3节点和4节点都是挂2个节点不可用,所以用4个节点站在可用性的角度来看没意义

  • 为什么要有半数机制?

    脑裂问题,假设你有4个节点,由于网络抖动分裂成了2个集群,然后这2个集群都会进行选举,会选举出2个leader,网络恢复后就有2个leader,肯定是有问题的,但半数机制就保证了这种问题不会发生,就算网络抖动也只会选举出一个leader

ZooKeeper使用场景

分布式锁

在单体应用中,我们可以用synchronized或者ReentrantLock来实现锁机制。但在分布式应用中不行,因为synchronized或者ReentrantLock只针对于一个JVM中可以使用。所以我们必须使用分布式锁来实现锁机制。可以使用Redis或ZooKeeper实现,Redis就不多说了。

实现分布式锁的主要逻辑就是利用临时序号节点,我创建一个临时序号节点,继续创建临时序号节点会自增最后形成一堆有序的节点,通过序号我们可以知道哪个节点在前哪个节点在后,序号最小的节点才有资格拿锁,后一个节点监听前一个节点,当前一个节点被删除了,我后一个节点就可以去尝试激活锁了。

这里为什么不直接每个节点监听加锁节点,加锁节点删除在找到最小序号去加锁呢?

这里主要是防止羊群效应,一旦加锁节点被释放就会有大量的节点被触发,然后反查Lock子节点可能会让zookeeper承受巨大的压力。

这里写一个简单的demo

private String server = "你的zookeeper地址";
private ZkClient zkClient;
private static final String rootPath = "/dm-lock";

public ZookeeperLock() {
    zkClient = new ZkClient(server, 5000, 20000);
    // 创建根节点
    if (!zkClient.exists(rootPath)) {
        zkClient.createPersistent(rootPath);
    }
}

/**
 * 加锁
 */
public Lock lock(String lockId, long timeout) {
    // 创建临时序号节点
    Lock lockNode = createLockNode(lockId);
    // 尝试拿锁
    lockNode = tryActiveLock(lockNode);
    // 没有拿到锁
    if (!lockNode.isActive()) {
        try {
            synchronized (lockNode) {
                // 把线程hang在这里然后等待监听事件触发,拿到锁,释放线程
                // 在tryActiveLockelse逻辑里面
                lockNode.wait(timeout);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    // 线程hang住的时间里面还没拿到锁就抛异常
    if (!lockNode.isActive()) {
        throw new RuntimeException("lock timeout");
    }
    return lockNode;
}

/**
 * 释放锁
 */
public void unlock(Lock lock) {
    if (lock.isActive()) {
        zkClient.delete(lock.getPath());
    }
}

/**
 * 尝试激活锁
 */
private Lock tryActiveLock(Lock lockNode) {

    //  获取根节点下面所有的排好序的子节点
    List<String> list = zkClient.getChildren(rootPath)
        .stream()
        .sorted()
        .map(p -> rootPath + "/" + p)
        .collect(Collectors.toList());

    // 取最小的节点,这个节点就应该是应该加锁的节点
    String firstNodePath = list.get(0);
    // 最小节点是不是当前节点,是就直接加锁
    if (firstNodePath.equals(lockNode.getPath())) {
        lockNode.setActive(true);
    } else {
        // 取得当前节点减一,要监听这个节点
        String upNodePath = list.get(list.indexOf(lockNode.getPath()) - 1);
        zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }

            // 前一个节点被释放回调
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                // 重新尝试拿锁
                Lock lock = tryActiveLock(lockNode);
                synchronized (lockNode) {
                    // 之前同步块的线程释放
                    if (lock.isActive()) {
                        lockNode.notify(); 
                    }
                }
                zkClient.unsubscribeDataChanges(upNodePath, this);
            }
        });
    }
    return lockNode;
}

/**
 * 创建临时序号节点
 */
public Lock createLockNode(String lockId) {
    String nodePath = zkClient.createEphemeralSequential(rootPath + "/" + lockId, "w");
    return new Lock(lockId, nodePath);
}

当然这里的代码还是会有一些问题的,比如重试机制的处理,幽灵节点的处理,这里只是更好的理解分布式锁,建议直接使用Curator客户端。

Curator分布式锁使用及源码

基本使用

// 配置类
@Configuration
public class CuratorConfig {
    @Bean(initMethod = "start")
    public CuratorFramework curatorFramework() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
        return CuratorFrameworkFactory.newClient(ZookeeperConstants.SERVER, retryPolicy);
    }
}
@Service
public class MutexLock {

    @Autowired
    CuratorFramework curatorFramework;

    public Object handler(Integer id) throws Exception {
        InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, "/product_"+id);
        try {
            interProcessMutex.acquire();
            // 模拟业务执行
            Thread.sleep((new Random()).nextInt(10)*1000);

        } catch (Exception e) {
            if (e instanceof RuntimeException) {
                throw e;
            }
        }finally {
            interProcessMutex.release();
        }
        return "ok";
    }
}

原理和之前使用的zkClient原理基本一致

  1. 请求进来,没有根节点,创建容器节点作为根节点,
  2. 在跟节点下创建临时序号节点
  3. 判断节点是不是最小的节点,是最小的节点直接获取锁,不是就监听前一个节点
  4. 获得锁的请求,处理完业务代码释放锁删除节点然后监听它的节点收到通知,走第三步

源码:

// 从interProcessMutex.acquire();进入代码
@Override
public void acquire() throws Exception
{
    if ( !internalLock(-1, null) )
    {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

    // 获取当前线程
    Thread currentThread = Thread.currentThread();
    // threadData是一个Map,key是当前线程,value是LocalData LocalData里面主要是加锁路径和加锁次数
    LockData lockData = threadData.get(currentThread);
    // 不是空说明当前线程已经获得锁了
    if ( lockData != null )
    {
        // re-entering
        // 这里对LocalData里面的lockCount自增,这里说明了锁的可重入特性
        lockData.lockCount.incrementAndGet();
        return true;
    }

    // 这里是加锁逻辑,主要方法是attemptLock
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    // 加锁成功
    if ( lockPath != null )
    {
        // 加锁信息放进threadData里面
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }

    return false;
}

attemptLock方法:

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    final long      startMillis = System.currentTimeMillis();
    final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int             retryCount = 0;

    String          ourPath = null;
    boolean         hasTheLock = false;
    boolean         isDone = false;
    while ( !isDone )
    {
        isDone = true;

        try
        {
            // 主要逻辑
            // 加锁逻辑
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            // 判断节点是不是最小的节点,是最小的节点直接获取锁,不是就监听前一个节点
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // gets thrown by StandardLockInternalsDriver when it can't find the lock node
            // this can happen when the session expires, etc. So, if the retry allows, just try it all again
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
            {
                isDone = false;
            }
            else
            {
                throw e;
            }
        }
    }

    if ( hasTheLock )
    {
        return ourPath;
    }

    return null;
}

加锁逻辑createsTheLock方法:

public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
    String ourPath;
    if ( lockNodeBytes != null )
    {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
    }
    else
    {
        // 这里会创建一个根节点是容器节点(子节点全部删完会删除父节点,子节点全部删了就没有必要维护父节点了,所以直接删除掉),还有子节点是临时序号节点
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    }
    return ourPath;
}

internalLockLoop:

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
    // 是否获取锁
    boolean     haveTheLock = false;
    boolean     doDelete = false;
    try
    {
        if ( revocable.get() != null )
        {
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }
        
        // 自旋直至获得锁,或者到达超时时间
        while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
        {
            // 获取所有子节点并排序
            List<String>        children = getSortedChildren();
            // 处理当前节点把父路径截取仅获取后面,sequenceNodeName在children列表中
            String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
            // 判断节点是否是最小
            PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            // 获取锁
            if ( predicateResults.getsTheLock() )
            {
                // 更新flag
                haveTheLock = true;
            }
            // 没有拿到锁开启监听
            else
            {
                // 监听节点
                String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                // 加锁等待,watcher里面会释放锁,继续while循环
                synchronized(this)
                {
                    try
                    {
                        // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                        // 监听
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        // 有millisToWait就等待一段时间
                        if ( millisToWait != null )
                        {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if ( millisToWait <= 0 )
                            {
                                doDelete = true;    // timed out - delete our node
                                break;
                            }
                        
                            wait(millisToWait);
                        }
                        // 一直等待
                        else
                        {
                            wait();
                        }
                    }
                    catch ( KeeperException.NoNodeException e )
                    {
                        // it has been deleted (i.e. lock released). Try to acquire again
                    }
                }
            }
        }
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    }
    finally
    {
        if ( doDelete )
        {
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}

getsTheLock方法:

// 判断节点是否是最小
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
    // 看sequenceNodeName在children中的位置
    int             ourIndex = children.indexOf(sequenceNodeName);
    validateOurIndex(sequenceNodeName, ourIndex);
    // maxLeases是1的逻辑在LockInternals的构造方法中。
    // maxLeases是1,只要ourIndex是0就表示最小,这里是互斥锁,如果是读写锁maxLeases就不是1后面会说到
    boolean         getsTheLock = ourIndex < maxLeases;
    // 要监听的节点如果拿到锁了不监听pathToWatch是null.否则监听前一个节点
    String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
    // 基本信息封装
    return new PredicateResults(pathToWatch, getsTheLock);
}

分布式注册中心

在这里,首先得了解注册中心是什么?单体应用升级到分布式应用,肯定会有成千上万个服务节点不同的客户端会调用这些服务节点,如果把这些服务节点配置到客户端的话会很复杂,这时候就需要一个中间件来帮助我们找到我们需要调用的服务,这就是服务发现。

一般注册中心需要一下几个功能,服务注册,服务发现,服务订阅

  • 服务注册:服务刚启动将自己的信息注册到注册中心
  • 服务发现:客户端通过注册中心找到自己要调用的服务,然后通过RPC进行调用
  • 服务订阅:能动态感知服务的变化

我们可以把注册的服务写成临时节点,只要服务一下线就会自动删除对应的服务。


文章作者: dm
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 dm !
评论
 上一篇
消息队列常见问题及解决方案 消息队列常见问题及解决方案
当前市场常见的几种消息中间件就有比如说RabbitMQ、RocketMQ、Kafka,他们都各有优势,下面会介绍他们之间的差别和优缺点。这里不针对某种消息队列,常见消息队列糅杂在一起谈谈 常用消息中间件简单介绍只是简单介绍几种常用消息中间件
2023-09-08
下一篇 
ZooKeeper安装及常用命令 ZooKeeper安装及常用命令
发现zookeeper3.5.3之后出了许多新特性,就趁此时机安装一下较新版本zookeeper,顺便记录一下安装步骤,方便以后查验。 首先准备zookeeper,在这里我使用的是centos7,java环境是1.8.安装的zookeepe
2023-08-01
  目录