使用 Redis 实现登录会话

Note

本文摘录自即将出版的《Redis使用手册》, 详情请见: RedisGuide.com

为了方便用户, 网站一般都会为已登录的用户生成一个加密令牌, 然后把这个令牌分别储存在服务器端和客户端, 之后每当用户再次访问该网站的时候, 网站就可以通过验证客户端提交的令牌来确认用户的身份, 从而使得用户不必重复地执行登录操作。

另一方面, 为了防止用户因为长时间不输入密码而导致遗忘密码, 并且为了保证令牌的安全性, 网站一般都会为令牌设置一个过期期限(比如一个月), 当期限到达之后, 用户的会话就会过时, 而网站则会要求用户重新登录。

上面描述的这种使用令牌来避免重复登录的机制一般被称为登录会话(login session), 通过使用 Redis 的散列, 我们可以构建出代码清单 3-4 所示的登录会话程序。

代码清单 3-4 使用散列实现的登录会话程序: /hash/login_session.py

import random
from time import time  # 获取浮点数格式的 unix 时间戳
from hashlib import sha256

# 会话的默认过期时间
DEFAULT_TIMEOUT = 3600*24*30    # 一个月

# 储存会话令牌以及会话过期时间戳的散列
SESSION_TOKEN_HASH = "session::token"
SESSION_EXPIRE_TS_HASH = "session::expire_timestamp"

# 会话状态
SESSION_NOT_LOGIN = "SESSION_NOT_LOGIN"
SESSION_EXPIRED = "SESSION_EXPIRED"
SESSION_TOKEN_CORRECT = "SESSION_TOKEN_CORRECT"
SESSION_TOKEN_INCORRECT = "SESSION_TOKEN_INCORRECT"

def generate_token():
    """
    生成一个随机的会话令牌。
    """
    random_string = str(random.getrandbits(256)).encode('utf-8')
    return sha256(random_string).hexdigest()


class LoginSession:

    def __init__(self, client, user_id):
        self.client = client
        self.user_id = user_id

    def create(self, timeout=DEFAULT_TIMEOUT):
        """
        创建新的登录会话并返回会话令牌,
        可选的 timeout 参数用于指定会话的过期时间(以秒为单位)。
        """
        # 生成会话令牌
        user_token = generate_token()
        # 计算会话到期时间戳
        expire_timestamp = time()+timeout
        # 以用户 ID 为字段,将令牌和到期时间戳分别储存到两个散列里面
        self.client.hset(SESSION_TOKEN_HASH, self.user_id, user_token)
        self.client.hset(SESSION_EXPIRE_TS_HASH, self.user_id, expire_timestamp)
        # 将会话令牌返回给用户
        return user_token

    def validate(self, input_token):
        """
        根据给定的令牌验证用户身份。
        这个方法有四个可能的返回值,分别对应四种不同情况:
        1. SESSION_NOT_LOGIN —— 用户尚未登录
        2. SESSION_EXPIRED —— 会话已过期
        3. SESSION_TOKEN_CORRECT —— 用户已登录,并且给定令牌与用户令牌相匹配
        4. SESSION_TOKEN_INCORRECT —— 用户已登录,但给定令牌与用户令牌不匹配
        """
        # 尝试从两个散列里面取出用户的会话令牌以及会话的过期时间戳
        user_token = self.client.hget(SESSION_TOKEN_HASH, self.user_id)
        expire_timestamp = self.client.hget(SESSION_EXPIRE_TS_HASH, self.user_id)

        # 如果会话令牌或者过期时间戳不存在,那么说明用户尚未登录
        if (user_token is None) or (expire_timestamp is None):
            return SESSION_NOT_LOGIN

        # 将当前时间戳与会话的过期时间戳进行对比,检查会话是否已过期
        # 因为 HGET 命令返回的过期时间戳是字符串格式的
        # 所以在进行对比之前要先将它转换成原来的浮点数格式
        if time() > float(expire_timestamp):
            return SESSION_EXPIRED

        # 用户令牌存在并且未过期,那么检查它与给定令牌是否一致
        if input_token == user_token:
            return SESSION_TOKEN_CORRECT
        else:
            return SESSION_TOKEN_INCORRECT

    def destroy(self):
        """
        销毁会话。
        """
        # 从两个散列里面分别删除用户的会话令牌以及会话的过期时间戳
        self.client.hdel(SESSION_TOKEN_HASH, self.user_id)
        self.client.hdel(SESSION_EXPIRE_TS_HASH, self.user_id)

LoginSessioncreate() 方法首先会计算出随机的会话令牌以及会话的过期时间戳, 然后使用用户 ID 作为字段, 将令牌和过期时间戳分别储存到两个散列里面。

在此之后, 每当客户端向服务器发送请求并提交令牌的时候, 程序就会使用 validate() 方法验证被提交令牌的正确性: validate() 方法会根据用户的 ID , 从两个散列里面分别取出用户的会话令牌以及会话的过期时间戳, 然后通过一系列检查判断令牌是否正确以及会话是否过期。

最后, destroy() 方法可以在用户手动登出(logout)时调用, 它可以删除用户的会话令牌以及会话的过期时间戳, 让用户重新回到未登录状态。

在拥有 LoginSession 程序之后, 我们可以通过执行以下代码, 为用户 peter 创建出相应的会话令牌:

>>> from redis import Redis
>>> from login_session import LoginSession
>>>
>>> client = Redis(decode_responses=True)
>>> session = LoginSession(client, "peter")
>>>
>>> token = session.create()
>>> token
'3b000071e59fcdcaa46b900bb5c484f653de67055fde622f34c255a65bd9a561'

并通过以下代码, 验证给定令牌的正确性:

>>> session.validate("wrong_token")
'SESSION_TOKEN_INCORRECT'
>>>
>>> session.validate(token)
'SESSION_TOKEN_CORRECT'

然后在会话使用完毕之后, 通过执行以下代码来销毁会话:

>>> session.destroy()
>>>
>>> session.validate(token)
'SESSION_NOT_LOGIN'

图 3-16 展示了使用 LoginSession 程序在数据库里面创建多个会话时的样子。

图 3-16 登录会话程序数据结构示意图