2025-01-22  阅读(125)
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://www.skjava.com/mianshi/baodian/detail/2805335292

回答

雪花算法(Snowflake Algorithm)是分布式环境下非常经典的一种 分布式唯一 ID 生成算法,由 Twitter 的工程师设计出来的。它的核心目标是 高效生成全局唯一的、趋势递增的 ID,而且生成速度快、并发性能好,完全可以满足大规模分布式系统的需求。

雪花算法生成的 ID 是一个 64 位的长整型数字,里面的每一部分都编码了特定的信息,比如时间戳、机器 ID 和序列号,这让它既能保证全局唯一性,又不会出现重复。而且,ID 是按照时间递增的,能在数据写入时优化索引性能。

详解

雪花算法生成的 64 为二进制 Id,可以分为如下几个部分

位数 字段名 说明
1 bit 符号位 恒为 0,保证生成的 ID 是正数。
41 bits 时间戳 相对时间戳,记录从某个时间点到当前的毫秒数。41 位可以支持 69 年的时间范围, (2^41 / (1000 * 60 * 60 * 24 * 365) ≈ 69
10 bits 机器 ID 标识当前生成 ID 的机器或节点。10 位最多可以支持 1024 台机器
12 bits 序列号 同一毫秒内生成的序列号,用于避免冲突。12 位支持同一毫秒内生成 4096 个唯一 ID

雪花算法的特点可以用 "快、稳、准、省" 四个字来概括:

  • :生成 ID 的速度极快。每毫秒内可以生成数千到上万个唯一 ID,非常适合高并发环境,比如电商大促或者日志记录。
  • :生成的 ID 保证全局唯一,采用时间戳、机器 ID 和序列号的组合方式,不同机器生成的 ID 不会冲突,稳如泰山。
  • :ID 按照时间戳递增,趋势递增的特点非常适合数据库索引。用雪花算法生成的 ID 写入数据库时,可以减少索引分裂的风险,提升查询性能。
  • :ID 是 64 位的长整型数字,占用空间小,比使用 UUID 等其他方式更节省存储空间。

虽然,雪花算法有这么多优点,但也有一些不足支持:

  • 时钟依赖:雪花算法高度依赖系统时钟,如果机器时钟出现回拨,可能会导致 ID 冲突或者生成失败。所以,我们必须要对时钟回拨做出特殊处理,比如等待或者抛出异常等等。
  • 机器 ID 配置复杂:在大规模集群中,需要提前规划好机器 ID 的分配,避免重复或冲突。
  • 时间戳溢出问题:时间戳部分占 41 位,大约支持 69 年。如果系统运行时间超过这个范围,需要重新规划起始时间。

目前市面上有很多开源组件提供了雪花算法的实现,就没有必要造轮子了,比较常用的有:MyBatis-plusHutool

扩展

目前使用雪花算法生成全局唯一 ID 的开源有蛮多的,比较出名的有百度的 Uid-Generator 和美团 Leaf 项目。下面简单介绍下这两个组件。

由于大明哥对这两个组件没有研究过,所以在网上找了一篇写的比较好的文章贴出来,下面内容摘自:https://chinalhr.github.io/post/uid-generator-scheme/

百度uid-generator项目

UidGenerator是由百度技术部开发的,通过Java实现, 基于Snowflake算法的唯一ID生成器。UidGenerator以组件形式工作在应用项目中,支持自定义workerId位数和初始化策略, 从而适用于docker等容器化环境下实例自动重启、漂移等场景。

UidGenerator项目地址:https://github.com/baidu/uid-generator

数据结构

UidGenerator生成的ID默认结构如下:

  • sign(1bit):固定1bit符号标识,即生成的UID为正数。
  • delta seconds (28 bits):当前时间,相对于时间基点"2016-05-20"的增量值,单位:秒,最多可支持约8.7年
  • worker id (22 bits): 机器id,最多可支持约420w次机器启动。内置实现为在启动时由数据库分配,默认分配策略为用后即弃,后续可提供复用策略。
  • sequence (13 bits): 每秒下的并发序列,13 bits可支持每秒8192个并发。

项目结构

├── BitsAllocator.java      - Bit分配器(C)
 ├── UidGenerator.java      - UID生成器接口(I)
 ├── buffer
 │   ├── BufferPaddingExecutor.java    - 填充RingBuffer的执行器(C)
 │   ├── BufferedUidProvider.java    - RingBuffer中UID的生产者(C)
 │   ├── RejectedPutBufferHandler.java  - 拒绝Put到RingBuffer的处理器(C)
 │   ├── RejectedTakeBufferHandler.java  - 拒绝从RingBuffer中Take的处理器(C)
 │   └── RingBuffer.java      - RingBuffer数据结构实现类(C)
 ├── exception            - 相关的自定义异常类
 ├── impl
 │   ├── CachedUidGenerator.java    - 基于RingBuffer存储的UID生成器实现类(C)
 │   └── DefaultUidGenerator.java    - 默认UID生成器实现类(C)
 ├── utils                 - 工具类(日期,网络,线程池,枚举)
 └── worker                
     ├── DisposableWorkerIdAssigner.java  - 一次性的WorkerId分配器(C)
     ├── WorkerIdAssigner.java    - WorkerId分配器接口(I)
     ├── WorkerNodeType.java    - 工作节点类型,分为容器与实体两种(E)
     ├── dao
     │   └── WorkerNodeDAO.java    - WorkerNode数据库访问接口
     └── entity
         └── WorkerNodeEntity.java    - WorkerNode 实体类

项目依赖

主要依赖了springframework的核心包,mybatis,Apache commons工具类,logback

表结构:

CREATE TABLE WORKER_NODE(
ID BIGINT NOT NULL AUTO_INCREMENT COMMENT 'auto increment id',
HOST_NAME VARCHAR(64) NOT NULL COMMENT 'host name',
PORT VARCHAR(64) NOT NULL COMMENT 'port',
TYPE INT NOT NULL COMMENT 'node type: ACTUAL or CONTAINER',
LAUNCH_DATE DATE NOT NULL COMMENT 'launch date',
MODIFIED TIMESTAMP NOT NULL COMMENT 'modified time',
CREATED TIMESTAMP NOT NULL COMMENT 'created time',
PRIMARY KEY(ID)
) COMMENT='DB WorkerID Assigner for UID Generator',ENGINE = INNODB;

从上面的项目目录来看,UidGenerator提供了两种实现方式,分别是实时ID生成方式的DefaultUidGenerator与预生成ID生成方式的CachedUidGenerator。

DefaultUidGenerator实现方式

UidGenerator接口如下

public interface UidGenerator {

    /**
     * 获取UID
     */
    long getUID() throws UidGenerateException;

    /**
     * 解析格式化输出UID
     */
    String parseUID(long uid);
}

DefaultUidGenerator实现了UidGenerator与InitializingBean,具体实现流程如下:

  1. Bean初始化,配置的timeBits,workerBits,seqBits位数与epochStr日期,因为最多可支持约8.7年,所以最好设置epochStr值为合适的日期。
  2. afterPropertiesSet方法初始化BitsAllocator设置timeBits,workerBits,seqBits等位数信息,初始化workId(每次初始化往WORKER_NODE表插入一条记录,返回的主键ID就是workId,所以最多可支持约420w次机器启动)。
  3. 核心方法nextId获取UID,具体方式如下
protected synchronized long nextId() {
       //获取当前时间,单位秒,会校验是否超过最大的时间年限,超过则抛出异常
        long currentSecond = getCurrentSecond();

        // 如果出现时间回拨,就抛出异常
        if (currentSecond < lastSecond) {
            long refusedSeconds = lastSecond - currentSecond;
            throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
        }

        // 同一秒内序列号自增,
        if (currentSecond == lastSecond) {
            sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
            //超过最大序列,等待下一秒生成uid
            if (sequence == 0) {
                currentSecond = getNextSecond(lastSecond);
            }

        // 不同秒内序列号从0开始
        } else {
            sequence = 0L;
        }

        lastSecond = currentSecond;

        // bits位移操作生成UID
        return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);
    }

总结:DefaultUidGenerator的实现与雪花算法实现对比改动较小,对生成ID结构的位数可自定义,对分布式环境下增减机器的WorkId分配更友好,但没有解决时间回拨问题,采用了简单的抛出异常的方式处理。

RingBuffer

RingBuffer本质上是一个数组,数组中每个项被称为slot。UidGenerator设计了两个RingBuffer,一个保存唯一ID,一个保存flag。RingBuffer的尺寸是2^n。

RingBuffer Of UID

RingBuffer Of UID是存储UID的RingBuffer 使用两个指针,Tail指针表示最新生成的UID,如果这个Tail指针追上了Cursor指针,此时RingBuffer已经满了,不允许再继续生成UID了。可以通过属性rejectedPutBufferHandler指定拒绝策略。

Cursor指针表示最后一个已经给消费的UID,如果Cursor指针追上了Tail指针,此时RingBuffer已经空了,不允许继续获取UID了。可以通过rejectedTakeBufferHandler指定拒绝策略。

RingBuffer Of Flag

RingBuffer Of Flag是存储Uid状态的RingBuffer,每个slot的值都是0或者1,0是CAN_PUT_FLAG(可以填充)的标志位,1是CAN_TAKE_FLAG(可以消费)的标识位。

配置参数

  <!-- 以下为可选配置, 如未指定将采用默认值 -->
    <!-- RingBuffer size扩容参数, 可提高UID生成的吞吐量. --> 
    <!-- 默认:3, 原bufferSize=8192, 扩容后bufferSize= 8192 << 3 = 65536 -->
    <property name="boostPower" value="3"></property> 
    
    <!-- 指定何时向RingBuffer中填充UID, 取值为百分比(0, 100), 默认为50 -->
    <!-- 举例: bufferSize=1024, paddingFactor=50 -> threshold=1024 * 50 / 100 = 512. -->
    <!-- 当环上可用UID数量 < 512时, 将自动对RingBuffer进行填充补全 -->
    <property name="paddingFactor" value="50"></property> 
    
    <!-- 另外一种RingBuffer填充时机, 在Schedule线程中, 周期性检查填充 -->
    <!-- 默认:不配置此项, 即不使用Schedule线程. 如需使用, 请指定Schedule线程时间间隔, 单位:秒 -->
    <property name="scheduleInterval" value="60"></property> 
    
    <!-- 拒绝策略: 当环已满, 无法继续填充时 -->
    <!-- 默认无需指定, 将丢弃Put操作, 仅日志记录. 如有特殊需求, 请实现RejectedPutBufferHandler接口(支持Lambda表达式) -->
    <property name="rejectedPutBufferHandler" ref="XxxxYourPutRejectPolicy"></property> 
    
    <!-- 拒绝策略: 当环已空, 无法继续获取时 -->
    <!-- 默认无需指定, 将记录日志, 并抛出UidGenerateException异常. 如有特殊需求, 请实现RejectedTakeBufferHandler接口(支持Lambda表达式) -->
    <property name="rejectedTakeBufferHandler" ref="XxxxYourTakeRejectPolicy"></property>

CacheLine补齐

RingBuffer的数据都是使用数组来存储的,在内存中是连续分配的,可最大程度利用CPU cache以提升性能。但同时会带来「伪共享」FalseSharing问题。tail和cursor变量如果直接用原生的AtomicLong类型,tail和cursor可能会缓存在同一个cacheLine中,多个线程读取该变量可能会引发CacheLine的RFO请求,反而影响性能,为了防止伪共享问题,填充了6个long类型的成员变量,加上long类型的value成员变量,刚好占满一个Cache Line(其中Java对象还有8byte的对象头),这种方式称之为CacheLine补齐。项目中使用了PaddedAtomicLong进行包装。

public class PaddedAtomicLong extends AtomicLong {
    private static final long serialVersionUID = -3415778863941386253L;

    /** Padded 6 long (48 bytes) */
    public volatile long p1, p2, p3, p4, p5, p6 = 7L;

    /**
     * Constructors from {@link AtomicLong}
     */
    public PaddedAtomicLong() {
        super();
    }

    public PaddedAtomicLong(long initialValue) {
        super(initialValue);
    }

    /**
     * To prevent GC optimizations for cleaning unused padded references
     */
    public long sumPaddingToPreventOptimization() {
        return p1 + p2 + p3 + p4 + p5 + p6;
    }

}

  • RingBuffer填充时机
    1. 初始化预填充:初始化时预先填充满
    2. 即时填充:take消费,即时检查剩余可用slot量是否小于设定阈值,小于则进行填充
    3. 周期填充:通过Schedule线程,定时补全空闲slots。

CachedUidGenerator实现方式

CachedUidGenerator是基于RingBuffer去实现的。

CachedUidGenerator继承自DefaultUidGenerator,实现了DisposableBean。

初始化

  1. 调用父类DefaultUidGenerator的afterPropertiesSet方法初始化BitsAllocator与WorkId。
  2. 根据配置的boostPower与paddingFactor初始化RingBuffer,根据配置的scheduleInterval初始化bufferPaddingExecutor,初始化拒绝策略rejectedPutBufferHandler与rejectedTakeBufferHandler。
  3. 初始化填满RingBuffer中所有slot(初始化填充UID),此处调用bufferPaddingExecuto的paddingBuffer()方法。
  4. 开启Ringbuffer补丁线程(前提是配置了属性scheduleInterval)

RingBuffer.put(填充)

CachedUidGenerator生成UID的方式主要是通过paddingBuffer方法实现的,生成流程如下图所示,主要关注点是在满足填充新的唯一ID条件时,通过时间值递增得到新的时间值(lastSecond.incrementAndGet()),而不是System.currentTimeMillis()这种方式,解决了运行时时间回拨问题。

public void paddingBuffer() {
        LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);

        // is still running
        if (!running.compareAndSet(false, true)) {
            LOGGER.info("Padding buffer is still running. {}", ringBuffer);
            return;
        }
    
        boolean isFullRingBuffer = false;
        while (!isFullRingBuffer) {
            //调用nextIdsForOneSecond方法获取UID list,注意此处的时间使用lastSecond.incrementAndGet()方法时间自增的方式获取而不是System.currentTimeMillis()获取,解决了运行时时间回拨问题
            List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet());
            for (Long uid : uidList) {
                //调用put方法加入RingBuffer of UID中
                isFullRingBuffer = !ringBuffer.put(uid);
                if (isFullRingBuffer) {
                    break;
                }
            }
        }

        // not running now
        running.compareAndSet(true, false);
        LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
    }
 
 /**
     * 指定秒内获取UID List,Size为max sequence(0~max sequence)
     * 
     * @param currentSecond
     * @return UID list, size of {@link BitsAllocator#getMaxSequence()} + 1
     */
    protected List<Long> nextIdsForOneSecond(long currentSecond) {
        // Initialize result list size of (max sequence + 1)
        int listSize = (int) bitsAllocator.getMaxSequence() + 1;
        List<Long> uidList = new ArrayList<>(listSize);

        // Allocate the first sequence of the second, the others can be calculated with the offset
        long firstSeqUid = bitsAllocator.allocate(currentSecond - epochSeconds, workerId, 0L);
        for (int offset = 0; offset < listSize; offset++) {
            uidList.add(firstSeqUid + offset);
        }

        return uidList;
    }
    
    
 public synchronized boolean put(long uid) {
      //获取Tail与Cursor指针
        long currentTail = tail.get();
        long currentCursor = cursor.get();

        // 1. 判断Ring是否已满,满了执行Put拒绝策略
        long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor);
        if (distance == bufferSize - 1) {
            rejectedPutHandler.rejectPutBuffer(this, uid);
            return false;
        }

        // 2. check标志是否为CAN_PUT_FLAG
        int nextTailIndex = calSlotIndex(currentTail + 1);
        if (flags[nextTailIndex].get() != CAN_PUT_FLAG) {
            rejectedPutHandler.rejectPutBuffer(this, uid);
            return false;
        }

        // 3. 将UID插入下一个slots中
        // 4. 设置标志为CAN_TAKE_FLAG
        // 5. tail指针+1
        slots[nextTailIndex] = uid;
        flags[nextTailIndex].set(CAN_TAKE_FLAG);
        tail.incrementAndGet();
        return true;
    }

RingBuffer.take(取值)

CachedUidGenerator获取UID通过RingBuffer.take()方法直接从RingBuffer中取出,获取流程如下图所示

public long take() {
        // 获取当前Cursor指针与下一个可用Cursor指针并进行check
        long currentCursor = cursor.get();
        long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);
        Assert.isTrue(nextCursor >= currentCursor, "Curosr can't move back");

        // 1. 判断如果达到阈值,则以异步模式触发填充
        long currentTail = tail.get();
        if (currentTail - nextCursor < paddingThreshold) {
            LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
                    nextCursor, currentTail - nextCursor);
            bufferPaddingExecutor.asyncPadding();
        }

        // 2. 当前Cursor指针等于下一个可用Cursor指针,Ring为空,执行Take拒绝策略
        if (nextCursor == currentCursor) {
            rejectedTakeHandler.rejectTakeBuffer(this);
        }

        // 3. 检查下一个slot是否为CAN_TAKE_FLAG
        int nextCursorIndex = calSlotIndex(nextCursor);
        Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");

        // 4. 从slot中获取UID
        // 5. 设置标志为CAN_PUT_FLAG.
        long uid = slots[nextCursorIndex];
        flags[nextCursorIndex].set(CAN_PUT_FLAG);
        return uid;
    }

总结

百度uid-generator项目针对snowflake算法落地进行了一些改造,主要是对workId的获取上,对分布式集群环境下面,实例自动伸缩,docker容器化的场景,通过每次重启获取新的workId进行优化。提供了DefaultUidGenerator实时生成UID与CachedUidGenerator预生成UID两种方式。CachedUidGenerator通过借用未来时间来解决雪花算法sequence存在的并发限制,而且通过时间值递增的方式解决雪花算法存在的时间回拨问题。

美团Leaf项目

Leaf是美团技术部推出的一个分布式ID生成服务,通过Java实现, 基于数据库号段模式和snowflake算法模式的UID获取。

Leaf项目地址:https://github.com/Meituan-Dianping/Leaf

项目结构

这里主要关注core模块:

├── common                - 公共包
│   ├── CheckVO.java          
│   ├── PropertyFactory.java      - leaf.properties配置获取
│   ├── Result.java            - 统一返回值
│   ├── Status.java            - 状态值
│   ├── Utils.java            - 工具类
│   └── ZeroIDGen.java          - 生成0号ID  
├── IDGen.java              - ID生成接口(I)
├── segment                
│   ├── dao                - 号段相关Dao操作
│   │   ├── IDAllocDao.java
│   │   ├── IDAllocMapper.java
│   │   └── impl
│   │       └── IDAllocDaoImpl.java
│   ├── model              - 号段相关model
│   │   ├── LeafAlloc.java
│   │   ├── SegmentBuffer.java
│   │   └── Segment.java
│   └── SegmentIDGenImpl.java      - 号段相关ID生成实现
└── snowflake              
    ├── exception
    │   ├── CheckLastTimeException.java
    │   ├── CheckOtherNodeException.jav
    │   └── ClockGoBackException.java
    ├── SnowflakeIDGenImpl.java      - SnowFlakeID生成实现
    └── SnowflakeZookeeperHolder.java  - Zookeeper相关操作类

Leaf-segment号段模式

Leaf-segment号段模式是对上文的基于数据库号段模式的一种实现与优化。 每次获取一个segmen号段的值, 减轻数据库的压力 。利用 biz_tag字段来区分 业务,隔离影响,后续可以依据 biz_tag进行分库分表。

数据库表结构

CREATE TABLE `leaf_alloc` (
  `biz_tag` varchar(128) NOT NULL DEFAULT '' COMMENT '业务key',
  `max_id` bigint(20) NOT NULL DEFAULT '1' COMMENT '当前已经分配了的最大id',
  `step` int(11) NOT NULL COMMENT '初始步长,也是动态调整的最小步长',
  `description` varchar(256) DEFAULT NULL COMMENT '业务key的描述',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据库维护的更新时间',
  PRIMARY KEY (`biz_tag`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

可知,在进行UID获取之前,需要先插入数据进行biz_tag的设置与其他数据的初始化。

实现方式

Leaf-segment的实现类是SegmentIDGenImpl

  • 初始化
@Override
    public boolean init() {
        //同步biz_tag到cache中
        updateCacheFromDb();
        initOK = true;
        //开启定时任务定时调度updateCacheFromDb()
        updateCacheFromDbAtEveryMinute();
        return initOK;
    }

private void updateCacheFromDb() {
        //获取数据库表中所有的biz_tag
            List<String> dbTags = dao.getAllTags();
            if (dbTags == null || dbTags.isEmpty()) {
                return;
            }
        //缓存的biz_tag
            List<String> cacheTags = new ArrayList<String>(cache.keySet());
        Set<String> insertTagsSet = new HashSet<>(dbTags);
            Set<String> removeTagsSet = new HashSet<>(cacheTags);
            //将db中新加的biz_tags加入cache中,并初始化SegmentBuffer
            for(int i = 0; i < cacheTags.size(); i++){
                String tmp = cacheTags.get(i);
                if(insertTagsSet.contains(tmp)){
                    insertTagsSet.remove(tmp);
                }
            }
            for (String tag : insertTagsSet) {
                SegmentBuffer buffer = new SegmentBuffer();
                buffer.setKey(tag);
                Segment segment = buffer.getCurrent();
                segment.setValue(new AtomicLong(0));
                segment.setMax(0);
                segment.setStep(0);
                cache.put(tag, buffer);
            }
            //将db中被删除的biz_tag(已失效)从cache删除
            for(int i = 0; i < dbTags.size(); i++){
                String tmp = dbTags.get(i);
                if(removeTagsSet.contains(tmp)){
                    removeTagsSet.remove(tmp);
                }
            }
            for (String tag : removeTagsSet) {
                cache.remove(tag);
            }
      
    }

SegmentIDGenImpl的init方法可以看出主要是对biz_tag的同步,同步到应用的缓存中并且初始化对应的Segment Buffer。并且会开启调度线程进行定时的cache刷新。

  • 双Buffer

初始化cache的时候,会对每一个biz_tag初始化一个SegmentBuffer,通过其代码的实现可以看SegmentBuffer 中包含了一个号段数组,包含两个 Segment,第一个Segment已下发10%时,如果下一个Segment未更新,则另启一个更新线程去更新下一个Segment,这就是Leaf的双Buffer特性。

这样做有两个好处:

一是segment长度如果设置为服务高峰期发号QPS的600倍(10分钟),即使DB宕机,Leaf仍能持续发号10-20分钟不受影响。

二是每次请求都会判断下个号段的状态,从而更新此号段,避免偶发网络抖动影响下个号段的更新。

ublic class SegmentBuffer {
    private String key;// 数据库的biz_tag
    private Segment[] segments; //双buffer
    private volatile int currentPos; //当前的使用的segment的index
    private volatile boolean nextReady; //下一个segment是否处于可切换状态
    private volatile boolean initOk; //是否初始化完成
    private final AtomicBoolean threadRunning; //线程是否在运行中
    private final ReadWriteLock lock;

    private volatile int step;// 动态调整的step
    private volatile int minStep; // 最小step
    private volatile long updateTimestamp;// 更新时间戳
}

public class Segment {
    private AtomicLong value = new AtomicLong(0);
    private volatile long max;
    private volatile int step;
    private SegmentBuffer buffer;
}

  • UID取值

UID通过调用get方法获取,具体实现逻辑大致如下

public Result get(final String key) {
        //SegmentIDGenImpl是否初始化
        ...
        //cache中是否已经缓存了对应biz_id的数据
        if (cache.containsKey(key)) {
            //检查SegmentBuffer中的Segment是否初始化
                    if (!buffer.isInitOk()) {
                            //初始化Segment
                            updateSegmentFromDb(key, buffer.getCurrent());
                            buffer.setInitOk(true);
                }
        }
            return getIdFromSegmentBuffer(cache.get(key));
}

updateSegmentFromDb方法从数据库表中读取数据更新SegmentBuffer中的Segment。在第一次初始号段与第二次双buffer的异步准备另一个号段会被调用,注意前两次调用之后后续双buffer的异步准备另一个号段会动态调整申请号段的区间大小,调整规则主要跟号段申请频率有关,具体逻辑大致如下:

public void updateSegmentFromDb(String key, Segment segment) {
        StopWatch sw = new Slf4JStopWatch();
        SegmentBuffer buffer = segment.getBuffer();
        LeafAlloc leafAlloc;
        /**
         * 第一次初始号段
         * 1. 更新数据库中key对应记录的maxId(maxId=maxId+step)
         * 2. 设置buffer的Step(步长)与MinStep
         */
        if (!buffer.isInitOk()) {
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
            buffer.setStep(leafAlloc.getStep());
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
        }
        /**
         * 第二次初始号段(双Buffer异步准备)
         * 1. 更新数据库中key对应记录的maxId(maxId=maxId+step)
         * 2. 设置buffer的Step(步长)与MinStep
         */
        else if (buffer.getUpdateTimestamp() == 0) {
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
            buffer.setUpdateTimestamp(System.currentTimeMillis());
            buffer.setStep(leafAlloc.getStep());
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
        }
        /**
         * 后续双Buffer异步准备
         * 动态调整step
         *      1. duration < 15 分钟 : step 变为原来的2倍, 最大为 MAX_STEP
         *      2. 15分钟 <= duration < 30分钟 : nothing
         *      3. duration >= 30 分钟 : 缩小step, 最小为DB中配置的step
         * 更新数据库中key对应记录的maxId(maxId=maxId+step)
         * 设置buffer的Step(步长)与MinStep
         */
        else {
            //更新时间差
            long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
            int nextStep = buffer.getStep();
            if (duration < SEGMENT_DURATION) {
                if (nextStep * 2 > MAX_STEP) {
                    //do nothing
                } else {
                    nextStep = nextStep * 2;
                }
            } else if (duration < SEGMENT_DURATION * 2) {
                //do nothing with nextStep
            } else {
                nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
            }
            logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
            LeafAlloc temp = new LeafAlloc();
            temp.setKey(key);
            temp.setStep(nextStep);
            leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
            buffer.setUpdateTimestamp(System.currentTimeMillis());
            buffer.setStep(nextStep);
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step为DB中的step
        }

        /**
         * 准备当前Segment号段,设置value初始值与max/step值为buffer的max与step
         */
        long value = leafAlloc.getMaxId() - buffer.getStep();
        segment.getValue().set(value);
        segment.setMax(leafAlloc.getMaxId());
        segment.setStep(buffer.getStep());
        sw.stop("updateSegmentFromDb", key + " " + segment);
    }

最后的getIdFromSegmentBuffer方法会调用segment.getValue().getAndIncrement()从segment中获取value得到当前的UID返回并进行自增,这里在获取之前会判断当前Segment已经使用超过10%,如果超过会异步准备双buffer的另一个Segment。如果当前号段已经用完,切换另一个Segment号段使用。流程图如下图所示:

Leaf-snowflake模式

Leaf-snowflake数据结构上沿用了snowflake的设计,正数位(占1比特)+ 时间戳(占41比特)+ 机器ID(占5比特)+ 机房ID(占5比特)+ 自增值(占12比特),主要针对workId的生成与时间回拨问题进行了优化处理。

实现方式

  • 初始化WorkId生成

Leaf-snowflake依靠Zookeeper生成workId[机器ID+ 机房ID],Leaf中workId是基于ZooKeeper的顺序Id来生成的,每个应用在使用Leaf-snowflake时,启动时都会都在Zookeeper中生成一个顺序Id,相当于一台机器对应一个顺序节点,也就是一个workId。

启动步骤如下:

  1. 启动Leaf-snowflake服务,连接Zookeeper,在leaf_forever父节点下检查自己是否已经注册过(是否有该顺序子节点)。
  2. 如果有注册过直接取回自己的workerID(zk顺序节点生成的int类型ID号),启动服务。
  3. 如果没有注册过,就在该父节点下面创建一个持久顺序节点,创建成功后取回顺序号当做自己的workerID号,启动服务。

弱化对Zookeeper的依赖:Leaf-snowflake除了每次会去ZooKeeper拿数据以外,也会在本机文件系统上缓存一个workerID文件。当ZooKeeper出现问题,而且机器需要重启,能保证服务能够正常启动,做到了对三方组件的弱依赖。一定程度上提高了SLA。

  • 时间回拨问题解决

为了解决Snowflake的时间回拨问题,leaf主要从两个地方进行优化,一个是启动时对自身系统时间做校验,主要比较对象为系统节点历史记录的时间与其余节点的系统时间。启动流程如下:

服务启动时首先检查自己是否写过ZooKeeper leaf_forever节点:

  1. 若写过,则用自身系统时间与leaf_forever/${self}节点记录时间做比较,若小于leaf_forever/${self}时间则认为机器时间发生了大步长回拨,服务启动失败并报警。
  2. 若未写过,证明是新服务节点,直接创建持久节点leaf_forever/${self}并写入自身系统时间,接下来综合对比其余Leaf节点的系统时间来判断自身系统时间是否准确,具体做法是取leaf_temporary下的所有临时节点(所有运行中的Leaf-snowflake节点)的服务IP:Port,然后通过RPC请求得到所有节点的系统时间,计算sum(time)/nodeSize。
  3. 若abs( 系统时间-sum(time)/nodeSize ) < 阈值,认为当前系统时间准确,正常启动服务,同时写临时节点leaf_temporary/${self} 维持租约。
  4. 否则认为本机系统时间发生大步长偏移,启动失败并报警。
  5. 每隔一段时间(3s)上报自身系统时间写入leaf_forever/${self}。

二是运行时对时间回拨做了等待处理,时间偏差大小小于5ms,则等待两倍时间,还是小于则做异常处理,具体逻辑如下。

//发生了回拨,此刻时间小于上次发号时间
 if (timestamp < lastTimestamp) {
          
            long offset = lastTimestamp - timestamp;
            if (offset <= 5) {
                try {
                  //时间偏差大小小于5ms,则等待两倍时间
                    wait(offset << 1);//wait
                    timestamp = timeGen();
                    if (timestamp < lastTimestamp) {
                       //还是小于,抛异常并上报
                        throwClockBackwardsEx(timestamp);
                      }    
                } catch (InterruptedException e) {  
                   throw  e;
                }
            } else {
                //throw
                throwClockBackwardsEx(timestamp);
            }
        }

总结

Leaf-segment是基于数据库号段模式的优化,规避网络波动:通过双Buffer的预生成方式规避了网络抖动对UID生成的影响;容灾性高:DB不可用情况下一段时间依旧可以进行发号(时间与segment步长有关)。

Leaf-snowflake是基于Snowflake算法的优化,在workId的获取上,对分布式集群环境下面,实例自动伸缩场景,通过对zookeeper的弱依赖实现WorkId的获取。针对时间回拨问题处理对比百度uid-generator比较粗暴,做了启动校验与时间等待的处理。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文