目录
- 前言
 - 1. 基本知识
 - 2. 模板
 - 3. Demo
 
前言
分布式ID的生成推荐阅读:分布式ID生成方法的超详细分析(全)
1. 基本知识
Snowflake 算法是一种用于生成全局唯一 ID 的分布式算法,最初由 Twitter 设计并开源
它被设计用于解决分布式系统中生成唯一 ID 的需求,特别是在微服务架构和大规模分布式系统中
Snowflake 算法的核心思想是利用时间戳、工作机器 ID 和序列号来生成全局唯一的 64 位长整型 ID
其核心的组成部分如下:
-  
时间戳(Timestamp):通常以毫秒为单位,用于标识生成 ID 的时间点。时间戳的精度对 ID 的唯一性至关重要
 -  
工作机器 ID(Worker ID):用于标识不同的工作机器或节点。在分布式系统中,每个节点需要有唯一的标识
 -  
数据中心 ID(Datacenter ID):用于标识不同的数据中心。在大规模分布式系统中,可能存在多个数据中心,每个数据中心需要有唯一的标识
 -  
序列号(Sequence Number):用于解决同一时间戳下生成多个 ID 的冲突问题。序列号通常是一个自增的数字,通过与一定的位数掩码进行位运算来确保不会溢出
 
Snowflake 算法的作用:
-  
生成全局唯一 ID:Snowflake 算法可以在分布式系统中生成全局唯一的 ID,确保不同节点生成的 ID 不会冲突
 -  
适用于分布式环境:由于Snowflake算法只依赖于机器的时钟和网络通信,因此非常适合在分布式环境中使用
 -  
简单且高效:Snowflake 算法的实现相对简单,且性能高效,可以快速生成唯一 ID
 
2. 模板
以下模板带有注释
实现了一个 Snowflake 类,通过调用 generate 方法可以生成唯一的 Snowflake ID
Snowflake ID 是一个 64 位长整型,包含了时间戳、数据中心 ID、工作机器 ID 和序列号等信息
import time
class SnowFlake(object):
    def __init__(self, worker_id, datacenter_id, sequence=0):
        self.worker_id = worker_id  # 用于标识不同的工作机器
        self.datacenter_id = datacenter_id  # 用于标识不同的数据中心
        self.sequence = sequence  # 序列号,用于解决并发生成的 ID 冲突
        self.tw_epoch = 1288834974657  # Twitter Snowflake epoch (in milliseconds),Snowflake 算法的起始时间点
        
        # Bit lengths,用于计算位数
        self.worker_id_bits = 5  # 5位,最大值为31
        self.datacenter_id_bits = 5  # 5位,最大值为31
        self.max_worker_id = -1 ^ (-1 << self.worker_id_bits)  # 最大工作机器 ID
        self.max_datacenter_id = -1 ^ (-1 << self.datacenter_id_bits)  # 最大数据中心 ID
        self.sequence_bits = 12  # 12位,支持的最大序列号数
        self.sequence_mask = -1 ^ (-1 << self.sequence_bits)  # 序列号掩码,用于生成序列号
        
        # Create initial timestamp,初始化上一次生成 ID 的时间戳
        self.last_timestamp = self.current_timestamp()
        
        # Check worker_id and datacenter_id values,检查工作机器 ID 和数据中心 ID 的取值范围
        if self.worker_id > self.max_worker_id or self.worker_id < 0:
            raise ValueError(f"Worker ID must be between 0 and {self.max_worker_id}")
        if self.datacenter_id > self.max_datacenter_id or self.datacenter_id < 0:
            raise ValueError(f"Datacenter ID must be between 0 and {self.max_datacenter_id}")
    
    @staticmethod
    def current_timestamp():
        return int(time.time() * 1000)  # 获取当前时间戳,单位为毫秒
    
    def generate(self):
        timestamp = self.current_timestamp()  # 获取当前时间戳
        
        if timestamp < self.last_timestamp:  # 如果当前时间戳小于上一次生成 ID 的时间戳
            raise ValueError("Clock moved backwards. Refusing to generate ID for {} milliseconds".format(
                self.last_timestamp - timestamp))  # 抛出异常,时钟回拨
        
        if timestamp == self.last_timestamp:  # 如果当前时间戳等于上一次生成 ID 的时间戳
            self.sequence = (self.sequence + 1) & self.sequence_mask  # 增加序列号,并与序列号掩码进行与运算,防止溢出
            if self.sequence == 0:  # 如果序列号归零
                timestamp = self.wait_next_millis(self.last_timestamp)  # 等待下一毫秒
        else:
            self.sequence = 0  # 时间戳变化,序列号重置为零
        
        self.last_timestamp = timestamp  # 更新上一次生成 ID 的时间戳
        
        # Generate Snowflake ID,生成 Snowflake ID
        _id = ((timestamp - self.tw_epoch) << (self.worker_id_bits + self.datacenter_id_bits)) | (
                self.datacenter_id << self.worker_id_bits) | self.worker_id << self.sequence_bits | self.sequence  # 使用时间戳、数据中心 ID、工作机器 ID 和序列号生成 ID
        return f"{_id:016d}"  # 返回 64 位长整型 ID 的字符串表示,补齐到16位长度
    
    def wait_next_millis(self, last_timestamp):
        timestamp = self.current_timestamp()  # 获取当前时间戳
        while timestamp <= last_timestamp:  # 循环直到获取到下一毫秒的时间戳
            timestamp = self.current_timestamp()
        return timestamp  # 返回下一毫秒的时间戳
 
3. Demo
结合以上模板,放一个调用的过程:
# 示例用法
if __name__ == "__main__":
    # 假设有两个数据中心,每个数据中心有两个工作机器
    worker_id = 1
    datacenter_id = 1
    
    snowflake = SnowFlake(worker_id, datacenter_id)
    
    # 生成10个ID
    for i in range(10):
        print(snowflake.generate())
 
截图如下:










![[蓝桥杯 2015 省 B] 生命之树](https://img-blog.csdnimg.cn/direct/7bd7daa8c9504bcbaf2b9794db0e8d9d.png)







