使用 Python 配合 Redis 超越缓存
2014 年 8 月 18 日
使用 Redis 储存数据流
除了发布与订阅之外, Redis 还可以使用流来发布和订阅事件。 Redis 流 是一个非常大的话题, 但使用它只需要 掌握少量命令 。 从 Python 来看, 这些命令的用法都是非常简单的, 我将一一向你说明。
下面的代码将把三次大脚兽的目击事件添加到流里面。
import asyncio import aioredis async def main(): redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8') await asyncio.gather( add_to_stream(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'), add_to_stream(redis, 2, 'Sighting near the Columbia River', 'Class A'), add_to_stream(redis, 3, 'Chased by a tall hairy creature', 'Class A')) redis.close() await redis.wait_closed() def add_to_stream(redis, id, title, classification): return redis.xadd('bigfoot:sightings:stream', { 'id': id, 'title': title, 'classification': classification }) asyncio.run(main()) |
这段代码中最重要的就是第 17 行和第 18 行, 它使用了 redis.xadd
函数将一次目击事件的字段添加到流里面。
每个新添加的流事件都有一个唯一标识符, 其中包含自 1970 年开始的时间戳(毫秒)和一个用破折号连接的序列号。 例如, 当我写这篇文章的时候, 1970 年 1 月 1 日(Unix纪元)午夜已经过去了 1,593,120,357,193 毫秒(1.59千兆秒)。 因此当我运行上面这段代码的时候, 命令将创建出 ID 为 1593120357193-0
的事件。
我们在添加事件的时候可以使用 *
来代替具体的 ID , 这样 Redis 就会根据当前时间来自动生成事件的 ID , 这也是 redis.xadd
函数的默认行为。
正如接下来的代码所示, 在读取流元素的时候, 我们需要设置一个起始 ID 。 你可以看到, 在第 10 行, 程序将变量 last_id
设置成了 0-0
, 这个 ID 代表流的起始位置。
import asyncio import aioredis from pprint import pp async def main(): redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf8') last_id = '0-0' while True: events = await redis.xread(['bigfoot:sightings:stream'], timeout=0, count=5, latest_ids=[last_id]) for key, id, fields in events: pp(fields) last_id = id asyncio.run(main()) |
程序的第 12 行使用 redis.xread
函数从流中请求最多 5 个 0-0
之后的事件。 该调用将返回一个列表, 然后程序将对其进行循环和解构, 以获得事件的字段和标识符。 事件的标识符会被储存起来, 以便将来调用 redis.xread
时可以获得新的事件并在有需要时重新读取之前读取过的旧事件。