使用 Python 配合 Redis 超越缓存

使用 Redis 储存数据流

除了发布与订阅之外, Redis 还可以使用流来发布和订阅事件。 Redis 流 是一个非常大的话题, 但使用它只需要 掌握少量命令 。 从 Python 来看, 这些命令的用法都是非常简单的, 我将一一向你说明。

下面的代码将把三次大脚兽的目击事件添加到流里面。

 import asyncio
 import aioredis

 async def main():

   redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')

   await asyncio.gather(
     add_to_stream(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
     add_to_stream(redis, 2, 'Sighting near the Columbia River', 'Class A'),
     add_to_stream(redis, 3, 'Chased by a tall hairy creature', 'Class A'))

   redis.close()
   await redis.wait_closed()

 def add_to_stream(redis, id, title, classification):
   return redis.xadd('bigfoot:sightings:stream', {
     'id': id, 'title': title, 'classification': classification })

 asyncio.run(main())

这段代码中最重要的就是第 17 行和第 18 行, 它使用了 redis.xadd 函数将一次目击事件的字段添加到流里面。

每个新添加的流事件都有一个唯一标识符, 其中包含自 1970 年开始的时间戳(毫秒)和一个用破折号连接的序列号。 例如, 当我写这篇文章的时候, 1970 年 1 月 1 日(Unix纪元)午夜已经过去了 1,593,120,357,193 毫秒(1.59千兆秒)。 因此当我运行上面这段代码的时候, 命令将创建出 ID 为 1593120357193-0 的事件。

我们在添加事件的时候可以使用 * 来代替具体的 ID , 这样 Redis 就会根据当前时间来自动生成事件的 ID , 这也是 redis.xadd 函数的默认行为。

正如接下来的代码所示, 在读取流元素的时候, 我们需要设置一个起始 ID 。 你可以看到, 在第 10 行, 程序将变量 last_id 设置成了 0-0 , 这个 ID 代表流的起始位置。

 import asyncio
 import aioredis

 from pprint import pp

 async def main():

   redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf8')

   last_id = '0-0'
   while True:
     events = await redis.xread(['bigfoot:sightings:stream'], timeout=0, count=5, latest_ids=[last_id])
     for key, id, fields in events:
       pp(fields)
       last_id = id

 asyncio.run(main())

程序的第 12 行使用 redis.xread 函数从流中请求最多 5 个 0-0 之后的事件。 该调用将返回一个列表, 然后程序将对其进行循环和解构, 以获得事件的字段和标识符。 事件的标识符会被储存起来, 以便将来调用 redis.xread 时可以获得新的事件并在有需要时重新读取之前读取过的旧事件。