回答
雪花算法(Snowflake Algorithm)是分布式环境下非常经典的一种 分布式唯一 ID 生成算法,由 Twitter 的工程师设计出来的。它的核心目标是 高效生成全局唯一的、趋势递增的 ID,而且生成速度快、并发性能好,完全可以满足大规模分布式系统的需求。
雪花算法生成的 ID 是一个 64 位的长整型数字,里面的每一部分都编码了特定的信息,比如时间戳、机器 ID 和序列号,这让它既能保证全局唯一性,又不会出现重复。而且,ID 是按照时间递增的,能在数据写入时优化索引性能。
详解
雪花算法生成的 64 为二进制 Id,可以分为如下几个部分
位数 | 字段名 | 说明 |
---|---|---|
1 bit | 符号位 | 恒为 0,保证生成的 ID 是正数。 |
41 bits | 时间戳 | 相对时间戳,记录从某个时间点到当前的毫秒数。41 位可以支持 69 年的时间范围, (2^41 / (1000 * 60 * 60 * 24 * 365) ≈ 69 ) |
10 bits | 机器 ID | 标识当前生成 ID 的机器或节点。10 位最多可以支持 1024 台机器 |
12 bits | 序列号 | 同一毫秒内生成的序列号,用于避免冲突。12 位支持同一毫秒内生成 4096 个唯一 ID |
雪花算法的特点可以用 "快、稳、准、省" 四个字来概括:
- 快:生成 ID 的速度极快。每毫秒内可以生成数千到上万个唯一 ID,非常适合高并发环境,比如电商大促或者日志记录。
- 稳:生成的 ID 保证全局唯一,采用时间戳、机器 ID 和序列号的组合方式,不同机器生成的 ID 不会冲突,稳如泰山。
- 准:ID 按照时间戳递增,趋势递增的特点非常适合数据库索引。用雪花算法生成的 ID 写入数据库时,可以减少索引分裂的风险,提升查询性能。
- 省:ID 是 64 位的长整型数字,占用空间小,比使用 UUID 等其他方式更节省存储空间。
虽然,雪花算法有这么多优点,但也有一些不足支持:
- 时钟依赖:雪花算法高度依赖系统时钟,如果机器时钟出现回拨,可能会导致 ID 冲突或者生成失败。所以,我们必须要对时钟回拨做出特殊处理,比如等待或者抛出异常等等。
- 机器 ID 配置复杂:在大规模集群中,需要提前规划好机器 ID 的分配,避免重复或冲突。
- 时间戳溢出问题:时间戳部分占 41 位,大约支持 69 年。如果系统运行时间超过这个范围,需要重新规划起始时间。
目前市面上有很多开源组件提供了雪花算法的实现,就没有必要造轮子了,比较常用的有:MyBatis-plus
、Hutool
。
扩展
目前使用雪花算法生成全局唯一 ID 的开源有蛮多的,比较出名的有百度的 Uid-Generator
和美团 Leaf
项目。下面简单介绍下这两个组件。
由于大明哥对这两个组件没有研究过,所以在网上找了一篇写的比较好的文章贴出来,下面内容摘自:https://chinalhr.github.io/post/uid-generator-scheme/
百度uid-generator项目
UidGenerator是由百度技术部开发的,通过Java实现, 基于Snowflake算法的唯一ID生成器。UidGenerator以组件形式工作在应用项目中,支持自定义workerId位数和初始化策略, 从而适用于docker等容器化环境下实例自动重启、漂移等场景。
UidGenerator项目地址:https://github.com/baidu/uid-generator
数据结构
UidGenerator生成的ID默认结构如下:
- sign(1bit):固定1bit符号标识,即生成的UID为正数。
- delta seconds (28 bits):当前时间,相对于时间基点"2016-05-20"的增量值,单位:秒,最多可支持约8.7年
- worker id (22 bits): 机器id,最多可支持约420w次机器启动。内置实现为在启动时由数据库分配,默认分配策略为用后即弃,后续可提供复用策略。
- sequence (13 bits): 每秒下的并发序列,13 bits可支持每秒8192个并发。
项目结构
├── BitsAllocator.java - Bit分配器(C)
├── UidGenerator.java - UID生成器接口(I)
├── buffer
│ ├── BufferPaddingExecutor.java - 填充RingBuffer的执行器(C)
│ ├── BufferedUidProvider.java - RingBuffer中UID的生产者(C)
│ ├── RejectedPutBufferHandler.java - 拒绝Put到RingBuffer的处理器(C)
│ ├── RejectedTakeBufferHandler.java - 拒绝从RingBuffer中Take的处理器(C)
│ └── RingBuffer.java - RingBuffer数据结构实现类(C)
├── exception - 相关的自定义异常类
├── impl
│ ├── CachedUidGenerator.java - 基于RingBuffer存储的UID生成器实现类(C)
│ └── DefaultUidGenerator.java - 默认UID生成器实现类(C)
├── utils - 工具类(日期,网络,线程池,枚举)
└── worker
├── DisposableWorkerIdAssigner.java - 一次性的WorkerId分配器(C)
├── WorkerIdAssigner.java - WorkerId分配器接口(I)
├── WorkerNodeType.java - 工作节点类型,分为容器与实体两种(E)
├── dao
│ └── WorkerNodeDAO.java - WorkerNode数据库访问接口
└── entity
└── WorkerNodeEntity.java - WorkerNode 实体类
项目依赖:
主要依赖了springframework的核心包,mybatis,Apache commons工具类,logback
表结构:
CREATE TABLE WORKER_NODE(
ID BIGINT NOT NULL AUTO_INCREMENT COMMENT 'auto increment id',
HOST_NAME VARCHAR(64) NOT NULL COMMENT 'host name',
PORT VARCHAR(64) NOT NULL COMMENT 'port',
TYPE INT NOT NULL COMMENT 'node type: ACTUAL or CONTAINER',
LAUNCH_DATE DATE NOT NULL COMMENT 'launch date',
MODIFIED TIMESTAMP NOT NULL COMMENT 'modified time',
CREATED TIMESTAMP NOT NULL COMMENT 'created time',
PRIMARY KEY(ID)
) COMMENT='DB WorkerID Assigner for UID Generator',ENGINE = INNODB;
从上面的项目目录来看,UidGenerator提供了两种实现方式,分别是实时ID生成方式的DefaultUidGenerator与预生成ID生成方式的CachedUidGenerator。
DefaultUidGenerator实现方式
UidGenerator接口如下
public interface UidGenerator {
/**
* 获取UID
*/
long getUID() throws UidGenerateException;
/**
* 解析格式化输出UID
*/
String parseUID(long uid);
}
DefaultUidGenerator实现了UidGenerator与InitializingBean,具体实现流程如下:
- Bean初始化,配置的timeBits,workerBits,seqBits位数与epochStr日期,因为最多可支持约8.7年,所以最好设置epochStr值为合适的日期。
- afterPropertiesSet方法初始化BitsAllocator设置timeBits,workerBits,seqBits等位数信息,初始化workId(每次初始化往WORKER_NODE表插入一条记录,返回的主键ID就是workId,所以最多可支持约420w次机器启动)。
- 核心方法nextId获取UID,具体方式如下
protected synchronized long nextId() {
//获取当前时间,单位秒,会校验是否超过最大的时间年限,超过则抛出异常
long currentSecond = getCurrentSecond();
// 如果出现时间回拨,就抛出异常
if (currentSecond < lastSecond) {
long refusedSeconds = lastSecond - currentSecond;
throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
}
// 同一秒内序列号自增,
if (currentSecond == lastSecond) {
sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
//超过最大序列,等待下一秒生成uid
if (sequence == 0) {
currentSecond = getNextSecond(lastSecond);
}
// 不同秒内序列号从0开始
} else {
sequence = 0L;
}
lastSecond = currentSecond;
// bits位移操作生成UID
return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);
}
总结:DefaultUidGenerator的实现与雪花算法实现对比改动较小,对生成ID结构的位数可自定义,对分布式环境下增减机器的WorkId分配更友好,但没有解决时间回拨问题,采用了简单的抛出异常的方式处理。
RingBuffer
RingBuffer本质上是一个数组,数组中每个项被称为slot。UidGenerator设计了两个RingBuffer,一个保存唯一ID,一个保存flag。RingBuffer的尺寸是2^n。
RingBuffer Of UID
RingBuffer Of UID是存储UID的RingBuffer 使用两个指针,Tail指针表示最新生成的UID,如果这个Tail指针追上了Cursor指针,此时RingBuffer已经满了,不允许再继续生成UID了。可以通过属性rejectedPutBufferHandler指定拒绝策略。
Cursor指针表示最后一个已经给消费的UID,如果Cursor指针追上了Tail指针,此时RingBuffer已经空了,不允许继续获取UID了。可以通过rejectedTakeBufferHandler指定拒绝策略。
RingBuffer Of Flag
RingBuffer Of Flag是存储Uid状态的RingBuffer,每个slot的值都是0或者1,0是CAN_PUT_FLAG(可以填充)的标志位,1是CAN_TAKE_FLAG(可以消费)的标识位。
配置参数
<!-- 以下为可选配置, 如未指定将采用默认值 -->
<!-- RingBuffer size扩容参数, 可提高UID生成的吞吐量. -->
<!-- 默认:3, 原bufferSize=8192, 扩容后bufferSize= 8192 << 3 = 65536 -->
<property name="boostPower" value="3"></property>
<!-- 指定何时向RingBuffer中填充UID, 取值为百分比(0, 100), 默认为50 -->
<!-- 举例: bufferSize=1024, paddingFactor=50 -> threshold=1024 * 50 / 100 = 512. -->
<!-- 当环上可用UID数量 < 512时, 将自动对RingBuffer进行填充补全 -->
<property name="paddingFactor" value="50"></property>
<!-- 另外一种RingBuffer填充时机, 在Schedule线程中, 周期性检查填充 -->
<!-- 默认:不配置此项, 即不使用Schedule线程. 如需使用, 请指定Schedule线程时间间隔, 单位:秒 -->
<property name="scheduleInterval" value="60"></property>
<!-- 拒绝策略: 当环已满, 无法继续填充时 -->
<!-- 默认无需指定, 将丢弃Put操作, 仅日志记录. 如有特殊需求, 请实现RejectedPutBufferHandler接口(支持Lambda表达式) -->
<property name="rejectedPutBufferHandler" ref="XxxxYourPutRejectPolicy"></property>
<!-- 拒绝策略: 当环已空, 无法继续获取时 -->
<!-- 默认无需指定, 将记录日志, 并抛出UidGenerateException异常. 如有特殊需求, 请实现RejectedTakeBufferHandler接口(支持Lambda表达式) -->
<property name="rejectedTakeBufferHandler" ref="XxxxYourTakeRejectPolicy"></property>
CacheLine补齐
RingBuffer的数据都是使用数组来存储的,在内存中是连续分配的,可最大程度利用CPU cache以提升性能。但同时会带来「伪共享」FalseSharing问题。tail和cursor变量如果直接用原生的AtomicLong类型,tail和cursor可能会缓存在同一个cacheLine中,多个线程读取该变量可能会引发CacheLine的RFO请求,反而影响性能,为了防止伪共享问题,填充了6个long类型的成员变量,加上long类型的value成员变量,刚好占满一个Cache Line(其中Java对象还有8byte的对象头),这种方式称之为CacheLine补齐。项目中使用了PaddedAtomicLong进行包装。
public class PaddedAtomicLong extends AtomicLong {
private static final long serialVersionUID = -3415778863941386253L;
/** Padded 6 long (48 bytes) */
public volatile long p1, p2, p3, p4, p5, p6 = 7L;
/**
* Constructors from {@link AtomicLong}
*/
public PaddedAtomicLong() {
super();
}
public PaddedAtomicLong(long initialValue) {
super(initialValue);
}
/**
* To prevent GC optimizations for cleaning unused padded references
*/
public long sumPaddingToPreventOptimization() {
return p1 + p2 + p3 + p4 + p5 + p6;
}
}
- RingBuffer填充时机
- 初始化预填充:初始化时预先填充满
- 即时填充:take消费,即时检查剩余可用slot量是否小于设定阈值,小于则进行填充
- 周期填充:通过Schedule线程,定时补全空闲slots。
CachedUidGenerator实现方式
CachedUidGenerator是基于RingBuffer去实现的。
CachedUidGenerator继承自DefaultUidGenerator,实现了DisposableBean。
初始化
- 调用父类DefaultUidGenerator的afterPropertiesSet方法初始化BitsAllocator与WorkId。
- 根据配置的boostPower与paddingFactor初始化RingBuffer,根据配置的scheduleInterval初始化bufferPaddingExecutor,初始化拒绝策略rejectedPutBufferHandler与rejectedTakeBufferHandler。
- 初始化填满RingBuffer中所有slot(初始化填充UID),此处调用bufferPaddingExecuto的paddingBuffer()方法。
- 开启Ringbuffer补丁线程(前提是配置了属性scheduleInterval)
RingBuffer.put(填充)
CachedUidGenerator生成UID的方式主要是通过paddingBuffer方法实现的,生成流程如下图所示,主要关注点是在满足填充新的唯一ID条件时,通过时间值递增得到新的时间值(lastSecond.incrementAndGet()),而不是System.currentTimeMillis()这种方式,解决了运行时时间回拨问题。
public void paddingBuffer() {
LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
// is still running
if (!running.compareAndSet(false, true)) {
LOGGER.info("Padding buffer is still running. {}", ringBuffer);
return;
}
boolean isFullRingBuffer = false;
while (!isFullRingBuffer) {
//调用nextIdsForOneSecond方法获取UID list,注意此处的时间使用lastSecond.incrementAndGet()方法时间自增的方式获取而不是System.currentTimeMillis()获取,解决了运行时时间回拨问题
List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet());
for (Long uid : uidList) {
//调用put方法加入RingBuffer of UID中
isFullRingBuffer = !ringBuffer.put(uid);
if (isFullRingBuffer) {
break;
}
}
}
// not running now
running.compareAndSet(true, false);
LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
}
/**
* 指定秒内获取UID List,Size为max sequence(0~max sequence)
*
* @param currentSecond
* @return UID list, size of {@link BitsAllocator#getMaxSequence()} + 1
*/
protected List<Long> nextIdsForOneSecond(long currentSecond) {
// Initialize result list size of (max sequence + 1)
int listSize = (int) bitsAllocator.getMaxSequence() + 1;
List<Long> uidList = new ArrayList<>(listSize);
// Allocate the first sequence of the second, the others can be calculated with the offset
long firstSeqUid = bitsAllocator.allocate(currentSecond - epochSeconds, workerId, 0L);
for (int offset = 0; offset < listSize; offset++) {
uidList.add(firstSeqUid + offset);
}
return uidList;
}
public synchronized boolean put(long uid) {
//获取Tail与Cursor指针
long currentTail = tail.get();
long currentCursor = cursor.get();
// 1. 判断Ring是否已满,满了执行Put拒绝策略
long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor);
if (distance == bufferSize - 1) {
rejectedPutHandler.rejectPutBuffer(this, uid);
return false;
}
// 2. check标志是否为CAN_PUT_FLAG
int nextTailIndex = calSlotIndex(currentTail + 1);
if (flags[nextTailIndex].get() != CAN_PUT_FLAG) {
rejectedPutHandler.rejectPutBuffer(this, uid);
return false;
}
// 3. 将UID插入下一个slots中
// 4. 设置标志为CAN_TAKE_FLAG
// 5. tail指针+1
slots[nextTailIndex] = uid;
flags[nextTailIndex].set(CAN_TAKE_FLAG);
tail.incrementAndGet();
return true;
}
RingBuffer.take(取值)
CachedUidGenerator获取UID通过RingBuffer.take()方法直接从RingBuffer中取出,获取流程如下图所示
public long take() {
// 获取当前Cursor指针与下一个可用Cursor指针并进行check
long currentCursor = cursor.get();
long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);
Assert.isTrue(nextCursor >= currentCursor, "Curosr can't move back");
// 1. 判断如果达到阈值,则以异步模式触发填充
long currentTail = tail.get();
if (currentTail - nextCursor < paddingThreshold) {
LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
nextCursor, currentTail - nextCursor);
bufferPaddingExecutor.asyncPadding();
}
// 2. 当前Cursor指针等于下一个可用Cursor指针,Ring为空,执行Take拒绝策略
if (nextCursor == currentCursor) {
rejectedTakeHandler.rejectTakeBuffer(this);
}
// 3. 检查下一个slot是否为CAN_TAKE_FLAG
int nextCursorIndex = calSlotIndex(nextCursor);
Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");
// 4. 从slot中获取UID
// 5. 设置标志为CAN_PUT_FLAG.
long uid = slots[nextCursorIndex];
flags[nextCursorIndex].set(CAN_PUT_FLAG);
return uid;
}
总结
百度uid-generator项目针对snowflake算法落地进行了一些改造,主要是对workId的获取上,对分布式集群环境下面,实例自动伸缩,docker容器化的场景,通过每次重启获取新的workId进行优化。提供了DefaultUidGenerator实时生成UID与CachedUidGenerator预生成UID两种方式。CachedUidGenerator通过借用未来时间来解决雪花算法sequence存在的并发限制,而且通过时间值递增的方式解决雪花算法存在的时间回拨问题。
美团Leaf项目
Leaf是美团技术部推出的一个分布式ID生成服务,通过Java实现, 基于数据库号段模式和snowflake算法模式的UID获取。
Leaf项目地址:https://github.com/Meituan-Dianping/Leaf
项目结构
这里主要关注core模块:
├── common - 公共包
│ ├── CheckVO.java
│ ├── PropertyFactory.java - leaf.properties配置获取
│ ├── Result.java - 统一返回值
│ ├── Status.java - 状态值
│ ├── Utils.java - 工具类
│ └── ZeroIDGen.java - 生成0号ID
├── IDGen.java - ID生成接口(I)
├── segment
│ ├── dao - 号段相关Dao操作
│ │ ├── IDAllocDao.java
│ │ ├── IDAllocMapper.java
│ │ └── impl
│ │ └── IDAllocDaoImpl.java
│ ├── model - 号段相关model
│ │ ├── LeafAlloc.java
│ │ ├── SegmentBuffer.java
│ │ └── Segment.java
│ └── SegmentIDGenImpl.java - 号段相关ID生成实现
└── snowflake
├── exception
│ ├── CheckLastTimeException.java
│ ├── CheckOtherNodeException.jav
│ └── ClockGoBackException.java
├── SnowflakeIDGenImpl.java - SnowFlakeID生成实现
└── SnowflakeZookeeperHolder.java - Zookeeper相关操作类
Leaf-segment号段模式
Leaf-segment号段模式是对上文的基于数据库号段模式的一种实现与优化。 每次获取一个segmen号段的值, 减轻数据库的压力 。利用 biz_tag字段来区分 业务,隔离影响,后续可以依据 biz_tag进行分库分表。
数据库表结构
CREATE TABLE `leaf_alloc` (
`biz_tag` varchar(128) NOT NULL DEFAULT '' COMMENT '业务key',
`max_id` bigint(20) NOT NULL DEFAULT '1' COMMENT '当前已经分配了的最大id',
`step` int(11) NOT NULL COMMENT '初始步长,也是动态调整的最小步长',
`description` varchar(256) DEFAULT NULL COMMENT '业务key的描述',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据库维护的更新时间',
PRIMARY KEY (`biz_tag`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
可知,在进行UID获取之前,需要先插入数据进行biz_tag的设置与其他数据的初始化。
实现方式
Leaf-segment的实现类是SegmentIDGenImpl
- 初始化
@Override
public boolean init() {
//同步biz_tag到cache中
updateCacheFromDb();
initOK = true;
//开启定时任务定时调度updateCacheFromDb()
updateCacheFromDbAtEveryMinute();
return initOK;
}
private void updateCacheFromDb() {
//获取数据库表中所有的biz_tag
List<String> dbTags = dao.getAllTags();
if (dbTags == null || dbTags.isEmpty()) {
return;
}
//缓存的biz_tag
List<String> cacheTags = new ArrayList<String>(cache.keySet());
Set<String> insertTagsSet = new HashSet<>(dbTags);
Set<String> removeTagsSet = new HashSet<>(cacheTags);
//将db中新加的biz_tags加入cache中,并初始化SegmentBuffer
for(int i = 0; i < cacheTags.size(); i++){
String tmp = cacheTags.get(i);
if(insertTagsSet.contains(tmp)){
insertTagsSet.remove(tmp);
}
}
for (String tag : insertTagsSet) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(tag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(tag, buffer);
}
//将db中被删除的biz_tag(已失效)从cache删除
for(int i = 0; i < dbTags.size(); i++){
String tmp = dbTags.get(i);
if(removeTagsSet.contains(tmp)){
removeTagsSet.remove(tmp);
}
}
for (String tag : removeTagsSet) {
cache.remove(tag);
}
}
SegmentIDGenImpl的init方法可以看出主要是对biz_tag的同步,同步到应用的缓存中并且初始化对应的Segment Buffer。并且会开启调度线程进行定时的cache刷新。
- 双Buffer
初始化cache的时候,会对每一个biz_tag初始化一个SegmentBuffer,通过其代码的实现可以看SegmentBuffer 中包含了一个号段数组,包含两个 Segment,第一个Segment已下发10%时,如果下一个Segment未更新,则另启一个更新线程去更新下一个Segment,这就是Leaf的双Buffer特性。
这样做有两个好处:
一是segment长度如果设置为服务高峰期发号QPS的600倍(10分钟),即使DB宕机,Leaf仍能持续发号10-20分钟不受影响。
二是每次请求都会判断下个号段的状态,从而更新此号段,避免偶发网络抖动影响下个号段的更新。
ublic class SegmentBuffer {
private String key;// 数据库的biz_tag
private Segment[] segments; //双buffer
private volatile int currentPos; //当前的使用的segment的index
private volatile boolean nextReady; //下一个segment是否处于可切换状态
private volatile boolean initOk; //是否初始化完成
private final AtomicBoolean threadRunning; //线程是否在运行中
private final ReadWriteLock lock;
private volatile int step;// 动态调整的step
private volatile int minStep; // 最小step
private volatile long updateTimestamp;// 更新时间戳
}
public class Segment {
private AtomicLong value = new AtomicLong(0);
private volatile long max;
private volatile int step;
private SegmentBuffer buffer;
}
- UID取值
UID通过调用get方法获取,具体实现逻辑大致如下
public Result get(final String key) {
//SegmentIDGenImpl是否初始化
...
//cache中是否已经缓存了对应biz_id的数据
if (cache.containsKey(key)) {
//检查SegmentBuffer中的Segment是否初始化
if (!buffer.isInitOk()) {
//初始化Segment
updateSegmentFromDb(key, buffer.getCurrent());
buffer.setInitOk(true);
}
}
return getIdFromSegmentBuffer(cache.get(key));
}
updateSegmentFromDb方法从数据库表中读取数据更新SegmentBuffer中的Segment。在第一次初始号段与第二次双buffer的异步准备另一个号段会被调用,注意前两次调用之后后续双buffer的异步准备另一个号段会动态调整申请号段的区间大小,调整规则主要跟号段申请频率有关,具体逻辑大致如下:
public void updateSegmentFromDb(String key, Segment segment) {
StopWatch sw = new Slf4JStopWatch();
SegmentBuffer buffer = segment.getBuffer();
LeafAlloc leafAlloc;
/**
* 第一次初始号段
* 1. 更新数据库中key对应记录的maxId(maxId=maxId+step)
* 2. 设置buffer的Step(步长)与MinStep
*/
if (!buffer.isInitOk()) {
leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
buffer.setStep(leafAlloc.getStep());
buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
}
/**
* 第二次初始号段(双Buffer异步准备)
* 1. 更新数据库中key对应记录的maxId(maxId=maxId+step)
* 2. 设置buffer的Step(步长)与MinStep
*/
else if (buffer.getUpdateTimestamp() == 0) {
leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(leafAlloc.getStep());
buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
}
/**
* 后续双Buffer异步准备
* 动态调整step
* 1. duration < 15 分钟 : step 变为原来的2倍, 最大为 MAX_STEP
* 2. 15分钟 <= duration < 30分钟 : nothing
* 3. duration >= 30 分钟 : 缩小step, 最小为DB中配置的step
* 更新数据库中key对应记录的maxId(maxId=maxId+step)
* 设置buffer的Step(步长)与MinStep
*/
else {
//更新时间差
long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
int nextStep = buffer.getStep();
if (duration < SEGMENT_DURATION) {
if (nextStep * 2 > MAX_STEP) {
//do nothing
} else {
nextStep = nextStep * 2;
}
} else if (duration < SEGMENT_DURATION * 2) {
//do nothing with nextStep
} else {
nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
}
logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
LeafAlloc temp = new LeafAlloc();
temp.setKey(key);
temp.setStep(nextStep);
leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(nextStep);
buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step为DB中的step
}
/**
* 准备当前Segment号段,设置value初始值与max/step值为buffer的max与step
*/
long value = leafAlloc.getMaxId() - buffer.getStep();
segment.getValue().set(value);
segment.setMax(leafAlloc.getMaxId());
segment.setStep(buffer.getStep());
sw.stop("updateSegmentFromDb", key + " " + segment);
}
最后的getIdFromSegmentBuffer方法会调用segment.getValue().getAndIncrement()从segment中获取value得到当前的UID返回并进行自增,这里在获取之前会判断当前Segment已经使用超过10%,如果超过会异步准备双buffer的另一个Segment。如果当前号段已经用完,切换另一个Segment号段使用。流程图如下图所示:
Leaf-snowflake模式
Leaf-snowflake数据结构上沿用了snowflake的设计,正数位(占1比特)+ 时间戳(占41比特)+ 机器ID(占5比特)+ 机房ID(占5比特)+ 自增值(占12比特)
,主要针对workId的生成与时间回拨问题进行了优化处理。
实现方式
- 初始化WorkId生成
Leaf-snowflake依靠Zookeeper生成workId[机器ID+ 机房ID],Leaf中workId是基于ZooKeeper的顺序Id来生成的,每个应用在使用Leaf-snowflake时,启动时都会都在Zookeeper中生成一个顺序Id,相当于一台机器对应一个顺序节点,也就是一个workId。
启动步骤如下:
- 启动Leaf-snowflake服务,连接Zookeeper,在leaf_forever父节点下检查自己是否已经注册过(是否有该顺序子节点)。
- 如果有注册过直接取回自己的workerID(zk顺序节点生成的int类型ID号),启动服务。
- 如果没有注册过,就在该父节点下面创建一个持久顺序节点,创建成功后取回顺序号当做自己的workerID号,启动服务。
弱化对Zookeeper的依赖:Leaf-snowflake除了每次会去ZooKeeper拿数据以外,也会在本机文件系统上缓存一个workerID文件。当ZooKeeper出现问题,而且机器需要重启,能保证服务能够正常启动,做到了对三方组件的弱依赖。一定程度上提高了SLA。
- 时间回拨问题解决
为了解决Snowflake的时间回拨问题,leaf主要从两个地方进行优化,一个是启动时对自身系统时间做校验,主要比较对象为系统节点历史记录的时间与其余节点的系统时间。启动流程如下:
服务启动时首先检查自己是否写过ZooKeeper leaf_forever节点:
- 若写过,则用自身系统时间与leaf_forever/${self}节点记录时间做比较,若小于leaf_forever/${self}时间则认为机器时间发生了大步长回拨,服务启动失败并报警。
- 若未写过,证明是新服务节点,直接创建持久节点leaf_forever/${self}并写入自身系统时间,接下来综合对比其余Leaf节点的系统时间来判断自身系统时间是否准确,具体做法是取leaf_temporary下的所有临时节点(所有运行中的Leaf-snowflake节点)的服务IP:Port,然后通过RPC请求得到所有节点的系统时间,计算sum(time)/nodeSize。
- 若abs( 系统时间-sum(time)/nodeSize ) < 阈值,认为当前系统时间准确,正常启动服务,同时写临时节点leaf_temporary/${self} 维持租约。
- 否则认为本机系统时间发生大步长偏移,启动失败并报警。
- 每隔一段时间(3s)上报自身系统时间写入leaf_forever/${self}。
二是运行时对时间回拨做了等待处理,时间偏差大小小于5ms,则等待两倍时间,还是小于则做异常处理,具体逻辑如下。
//发生了回拨,此刻时间小于上次发号时间
if (timestamp < lastTimestamp) {
long offset = lastTimestamp - timestamp;
if (offset <= 5) {
try {
//时间偏差大小小于5ms,则等待两倍时间
wait(offset << 1);//wait
timestamp = timeGen();
if (timestamp < lastTimestamp) {
//还是小于,抛异常并上报
throwClockBackwardsEx(timestamp);
}
} catch (InterruptedException e) {
throw e;
}
} else {
//throw
throwClockBackwardsEx(timestamp);
}
}
总结
Leaf-segment是基于数据库号段模式的优化,规避网络波动:通过双Buffer的预生成方式规避了网络抖动对UID生成的影响;容灾性高:DB不可用情况下一段时间依旧可以进行发号(时间与segment步长有关)。
Leaf-snowflake是基于Snowflake算法的优化,在workId的获取上,对分布式集群环境下面,实例自动伸缩场景,通过对zookeeper的弱依赖实现WorkId的获取。针对时间回拨问题处理对比百度uid-generator比较粗暴,做了启动校验与时间等待的处理。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。