Rust 版本的 peerstore 落地实践

在 rust-libp2p 中,当协议想要获取 peer_id 所对应的地址时,需要实现 NetworkBehaviour 的 addresses_of_peer 方法。与之不同的是,go-libp2p 使用 peerstore 来存储了 peer_id 与 address 之间的关系,因此我们可以参考它来实现一个 rust 版本的 peerstore。

实现构想

首先,由于我们的启动核心是 swarm,那 peerstore 会作为其中的一个属性。其次,在 go-libp2p-peerstore 中,peerstore 主要存放的数据为三块:地址信息的 AddrBook,公私钥信息的 KeyBook,协议信息的 ProtoBook;我们可以将三者结合在一起组成一个新的 struct,取名为 PeerRecord,放在以 peer_id 为 key 的 Hashmap 中。

垃圾回收机制

GC 存在的意义就是防止 hashmap 过度膨胀。由于网络状态时时刻刻都在发生变化,peer 之间的连接也可能随之变化,而 peerstore 有一个重要的作用就是存放 peer 的地址信息。如果不对已经失效的 peer 信息进行清理,就会影响到 peerstore 的工作效率。

目前实现的效果是,在 swarm 的 start 方法中使用 task::spawn 启动一个任务,创建一个 mpsc 的 channel,使用 select 语法等待管道传来的消息或者 task 等待10分钟的逻辑完成,针对某些地址,如果已经超出 ttl 的限制,清理当前地址;同时,如果当前 peer_id 的地址集合中不存在任何的地址信息,就将其从 Hashmap 中移除。

虽然 GC 机制的存在,使 Hashmap 不会无限制扩容,良好地帮助了系统的运行。但是仍然有些不足的地方,考虑如下这种情况:

对于 KAD 协议来说,peer 需要不停地进行迭代查询已知节点,逐步填充自己的 KBucket,这是一个耗时的过程。如果在 gc 时,将较早查询到的节点地址信息从 peerstore 中移除了,那么又需要重新启用迭代查询去获取地址,因此我们在 PeerRecord 中添加了一个 bool 值 pinned。GC 时会判断这条记录的类别,如果 pinned 为 true,就会跳过清理的步骤。

序列化与持久化

对每一个 peer 来说,因为某些原因需要下线或停机时,存放在 peerstore 里的节点信息是不应该被丢弃的。而对于需要持久化的数据,也需要进行序列化操作,便于存放。

在 libp2p-rs 中,主循环的调用也是通过 task::spawn 启动的。当 swarm 接收到 close 的消息时,将会退出事件处理的循环,并向运行 peerstore 的 gc 线程发送一个 close() 的事件,结束 gc 的过程。接下来调用 peerstore的save_data() 方法,将数据使用 serde 序列化成 json 格式,并使用 std::io 将序列化后的数据存放到根目录的 txt 文件中。

方法分析

以 GC 方法进行解析:

1. swarm 主循环 spawn 运行 task,每十分钟触发一次 select。

2. Hashmap 被 Arc 包裹,可以通过 lock() 获取,保证并发安全。

3. 如果该 peer 的信息不是通过 kad 获取的,调用 retain 筛选未超出 ttl 时限的地址。

4. 如果当前 peer 的地址数据已经清空,从 hashmap 中移除这个 peer。

   // swarm/lib.rs
    // The GC task is to remove all expired addresses from the peer store
        task::spawn(async move {
            log::info!("starting Peerstore GC...");
            loop {
                let either = future::select(rx.next(), task::sleep(PEERSTORE_GC_PURGE_INTERVAL).boxed()).await;
                match either {
                    Either::Left((_, _)) => break,
                    Either::Right((_, _)) => peer_store.remove_expired_addrs(),
                }
            }
            log::info!("quitting Peerstore GC...");
        });
        
    // core/peerstore.rs
    /// Removes all expired address.
    pub fn remove_expired_addrs(&self) {
        let mut to_remove = vec![];
        let mut guard = self.inner.lock().unwrap();
        for (peer, pr) in guard.iter_mut() {
            if !pr.pinned {
                log::debug!("GC attempt for {:?}", peer);
                pr.addrs.retain(|record| record.expiry.elapsed() < record.ttl);
                // delete this peer if no addr at all
                if pr.addrs.is_empty() {
                    log::debug!("remove {:?} from peerstore", peer);
                    to_remove.push(peer.clone());
                }
            }
        }

        for peer in to_remove {
            guard.remove(&peer);
        }
    }