时间:2021-06-10 08:26:49 | 栏目:Redis | 点击:次
在分布式系统中,各个进程(本文使用进程来描述分布式系统中的运行主体,它们可以在同一个物理节点上也可以在不同的物理节点上)相互之间通常是需要协调进行运作的,有时是不同进程所处理的数据有依赖关系,必须按照一定的次序进行处理,有时是在一些特定的时间需要某个进程处理某些事务等等,人们通常会使用分布式锁、选举算法等技术来协调各个进程之间的行为。因为分布式系统本身的复杂特性,以及对于容错性的要求,这些技术通常是重量级的,比如 Paxos 算法,欺负选举算法,ZooKeeper 等,侧重于消息的通信而不是共享内存,通常也是出了名的复杂和难以理解,当在具体的实现和实施中遇到问题时都是一个挑战。
Redis 经常被人们认为是一种 NoSQL 软件,但其本质上是一种分布式的数据结构服务器软件,提供了一个分布式的基于内存的数据结构存储服务。在实现上,仅使用一个线程来处理具体的内存数据结构,保证它的数据操作命令的原子特性;它同时还支持基于 Lua 的脚本,每个 Redis 实例使用同一个 Lua 解释器来解释运行 Lua 脚本,从而 Lua 脚本也具备了原子特性,这种原子操作的特性使得基于共享内存模式的分布式系统的协调方式成了可能,而且具备了很大的吸引力,和复杂的基于消息的机制不同,基于共享内存的模式对于很多技术人员来说明显容易理解的多,特别是那些已经了解多线程或多进程技术的人。在具体实践中,也并不是所有的分布式系统都像分布式数据库系统那样需要严格的模型的,而所使用的技术也不一定全部需要有坚实的理论基础和数学证明,这就使得基于 Redis 来实现分布式系统的协调技术具备了一定的实用价值,实际上,人们也已经进行了不少尝试。本文就其中的一些协调技术进行介绍。
signal/wait 操作
在分布式系统中,有些进程需要等待其它进程的状态的改变,或者通知其它进程自己的状态的改变,比如,进程之间有操作上的依赖次序时,就有进程需要等待,有进程需要发射信号通知等待的进程进行后续的操作,这些工作可以通过 Redis 的 Pub/Sub 系列命令来完成,比如:
import random
single_cast_script="""
local channels = redis.call('pubsub', 'channels', ARGV[1]..'*');
if #channels == 0
then
return 0;
end;
local index= math.mod(math.floor(tonumber(ARGV[2])), #channels) + 1;
return redis.call( 'publish', channels[index], ARGV[3]); """
def wait_single( channel, myid):
return wait( channel + myid )
def signal_single( channel, data):
rand_num = int(random.random() * 65535)
return rc.eval( single_cast_script, 0, channel, str(rand_num), str(data) )
锁的一个简单直接的实现方法就是用 SET NX 命令设置一个设定了存活周期 TTL 的 Key 来获取锁,通过删除 Key 来释放锁,通过存活周期来保证避免死锁。不过这个方法存在单点故障风险,如果部署了 master/slave 节点,则在特定条件下可能会导致安全性方面的冲突,比如:
在 Redlock 算法中,通过类似于下面这样的命令进行加锁:
SET resource_name my_random_value NX PX 30000
if redis.call("get",KEYS[1]) == ARGV[1] then return
redis.call("del",KEYS[1])else return 0end
Redlock 算法不需要保证 Redis 节点之间的时钟是同步的(不论是物理时钟还是逻辑时钟),这点和传统的一些基于同步时钟的分布式锁算法有所不同。Redlock 算法的具体的细节可以参阅 Redis 的官方文档,以及文档中列出的多种语言版本的实现。
选举算法
在分布式系统中,经常会有些事务是需要在某个时间段内由一个进程来完成,或者由一个进程作为 leader 来协调其它的进程,这个时候就需要用到选举算法,传统的选举算法有欺负选举算法(霸道选举算法)、环选举算法、Paxos 算法、Zab 算法 (ZooKeeper) 等,这些算法有些依赖于消息的可靠传递以及时钟同步,有些过于复杂,难以实现和验证。新的 Raft 算法相比较其它算法来说已经容易了很多,不过它仍然需要依赖心跳广播和逻辑时钟,leader 需要不断地向 follower 广播消息来维持从属关系,节点扩展时也需要其它算法配合。
选举算法和分布式锁有点类似,任意时刻最多只能有一个 leader 资源。当然,我们也可以用前面描述的分布式锁来实现,设置一个 leader 资源,获得这个资源锁的为 leader,锁的生命周期过了之后,再重新竞争这个资源锁。这是一种竞争性的算法,这个方法会导致有比较多的空档期内没有 leader 的情况,也不好实现 leader 的连任,而 leader 的连任是有比较大的好处的,比如 leader 执行任务可以比较准时一些,查看日志以及排查问题的时候也方便很多,如果我们需要一个算法实现 leader 可以连任,那么可以采用这样的方法:
import redis
rc = redis.Redis()
local_selector = 0def master():
global local_selector
master_selector = rc.incr('master_selector')
if master_selector == 1:
# initial / restarted
local_selector = master_selector
else:
if local_selector > 0: # I'm the master before
if local_selector > master_selector: # lost, maybe the db is fail-overed.
local_selector = 0
else: # continue to be the master
local_selector = master_selector
if local_selector > 0: # I'm the current master
rc.expire('master_selector', 20) return local_selector > 0
这个算法鼓励连任,只有当前的 leader 发生故障或者执行某个任务所耗时间超过了任期、或者 Redis 节点发生故障恢复之后才需要重新选举出新的 leader。在 master/slave 模式下,如果 master 节点发生故障,某个 slave 节点提升为新的 master 节点,即使当时 master_selector 值尚未能同步成功,也不会导致出现两个 leader 的情况。如果某个 leader 一直连任,则 master_selector 的值会一直递增下去,考虑到 master_selector 是一个 64 位的整型类型,在可预见的时间内是不可能溢出的,加上每次进行 leader 更换的时候 master_selector 会重置为从 1 开始,这种递增的方式是可以接受的,但是碰到 Redis 客户端(比如 Node.js)不支持 64 位整型类型的时候就需要针对这种情况作处理。如果当前 leader 进程处理时间超过了任期,则其它进程可以重新生成新的 leader 进程,老的 leader 进程处理完毕事务后,如果新的 leader 的进程经历的任期次数超过或等于老的 leader 进程的任期次数,则可能会出现两个 leader 进程,为了避免这种情况,每个 leader 进程在处理完任期事务之后都应该检查一下自己的处理时间是否超过了任期,如果超过了任期,则应当先设置 local_selector 为 0 之后再调用 master 检查自己是否是 leader 进程。
消息队列
消息队列是分布式系统之间的通信基本设施,通过消息可以构造复杂的进程间的协调操作和互操作。Redis 也提供了构造消息队列的原语,比如 Pub/Sub 系列命令,就提供了基于订阅/发布模式的消息收发方法,但是 Pub/Sub 消息并不在 Redis 内保持,从而也就没有进行持久化,适用于所传输的消息即使丢失了也没有关系的场景。
如果要考虑到持久化,则可以考虑 list 系列操作命令,用 PUSH 系列命令(LPUSH, RPUSH 等)推送消息到某个 list,用 POP 系列命令(LPOP, RPOP,BLPOP,BRPOP 等)获取某个 list 上的消息,通过不同的组合方式可以得到 FIFO,FILO,比如:
import redis
rc = redis.Redis()
def fifo_push(q, data):
rc.lpush(q, data)
def fifo_pop(q):
return rc.rpop(q)
def filo_push(q, data):
rc.lpush(q, data)
def filo_pop(q):
return rc.lpop(q)
def safe_fifo_push(q, data):
rc.lpush(q, data)
def safe_fifo_pop(q, cache):
msg = rc.rpoplpush(q, cache) # check and do something on msg
rc.lrem(cache, 1) # remove the msg in cache list. return msg