tensorflow-serving secondary development – adding model traffic monitoring

We deploy models with tensorflow-serving: a file-sync mechanism distributes each model into the tensorflow-serving model directory, and tensorflow-serving hot-loads the latest N versions automatically, or loads exactly the versions we pin.

Because loaded models consume a lot of memory, models that are no longer used need to be taken offline. Since we do not know which model versions clients are still calling, taking a model offline is a risky operation.

Approach

We want an easy way to see how much traffic each model in tensorflow-serving is receiving, to help decide whether a model can be safely taken offline.

tfs (tensorflow-serving) has a built-in Prometheus endpoint, but its existing metrics do not meet this need: we want key metrics such as request count and QPS broken down by {model_name, model_version}.

So we do a small amount of secondary development on tfs and add custom instrumentation at the GRPC entry points.

Writing the code

Check out tfs from GitHub at a specific branch. I chose the latest r2.4 branch; you can see my changes (liangdong commits) here: https://github.com/owenliang/serving/commits/r2.4

Google's codebase uses the bazel build toolchain, which plays a role similar to Java's maven. In my tests it builds without any proxy.

Prometheus HTTP endpoint

tfs already serves an HTTP port on which the RESTful inference API is registered, along with the Prometheus scrape API.

Source file: tensorflow_serving/model_servers/http_server.cc, relevant code:

  // Register handler for prometheus metric endpoint.
  if (monitoring_config.prometheus_config().enable()) {
    std::shared_ptr<PrometheusExporter> exporter =
        std::make_shared<PrometheusExporter>();
    net_http::RequestHandlerOptions prometheus_request_options;
    PrometheusConfig prometheus_config = monitoring_config.prometheus_config();
    auto path = prometheus_config.path().empty()
                    ? PrometheusExporter::kPrometheusPath
                    : prometheus_config.path();
    server->RegisterRequestHandler(
        path,
        [exporter, path](net_http::ServerRequestInterface* req) {
          ProcessPrometheusRequest(exporter.get(), path, req);
        },
        prometheus_request_options);
  }

PrometheusExporter exports all instrumentation data registered in the process. That data is maintained by the monitoring library in the TensorFlow core, and this is where we will register our custom metrics later.

The ProcessPrometheusRequest function handles the HTTP request and calls the exporter to render the response in Prometheus format:

void ProcessPrometheusRequest(PrometheusExporter* exporter, const string& path,
                              net_http::ServerRequestInterface* req) {
  std::vector<std::pair<string, string>> headers;
  headers.push_back({"Content-Type", "text/plain"});
  string output;
  Status status;
  // Check if url matches the path.
  if (req->uri_path() != path) {
    output = absl::StrFormat("Unexpected path: %s. Should be %s",
                             req->uri_path(), path);
    status = Status(error::Code::INVALID_ARGUMENT, output);
  } else {
    status = exporter->GeneratePage(&output);
  }

In other words, the Prometheus endpoint itself needs no changes; it automatically serializes every metric registered in the process. All we have to do is add instrumentation in the code.

Implementing the instrumentation helper

Tracing through the code, we can find existing instrumentation to use as a reference and learn how to register metrics with the tensorflow monitoring library.

Source file: tensorflow_serving/servables/tensorflow/util.cc

Here is a Counter:

auto* example_count_total = monitoring::Counter<1>::New(
    "/tensorflow/serving/request_example_count_total",
    "The total number of tensorflow.Examples.", "model");
  • The N in Counter<N> is the number of labels the metric supports (1 in this example).
  • /tensorflow/serving/request_example_count_total: the metric name; it is transformed slightly in the final output (the slashes become colons, as shown below).
  • The total number of tensorflow.Examples.: the metric's help text.
  • model: the key of the first label.

This produces Prometheus Counter data like the following:

:tensorflow:serving:request_example_count_total{model="xxxxxx"}  100312

We imitate it and create a counter that counts calls per {model_name, model_version}:

auto* model_call_count = monitoring::Counter<2>::New(
    "/tensorflow/serving/model_call_count",
    "The call counter of each model-version", "model_name", "model_version");

Then wrap it in a small recording helper:

void RecordModelCall(const string& model_name, int64 model_version) {
  std::string model_version_s = absl::StrFormat("%lld", model_version);
  model_call_count->GetCell(model_name, model_version_s)->IncrementBy(1);
}
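
Because the helper is defined in util.cc but called from other translation units (predict_util.cc, classifier.cc, regressor.cc), it also needs a declaration in the matching header. A minimal sketch, assuming it goes into tensorflow_serving/servables/tensorflow/util.h next to the other Record* helpers (check my commit for the exact placement):

// In tensorflow_serving/servables/tensorflow/util.h (sketch):
// Increments the per-{model_name, model_version} call counter defined in util.cc.
void RecordModelCall(const string& model_name, int64 model_version);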

Every time a given {model_name, model_version} is called over GRPC we increment the metric by 1, so later we get output like this:

# TYPE :tensorflow:serving:model_call_count counter
:tensorflow:serving:model_call_count{model_name="half_plus_two",model_version="123"} 2

Inserting the instrumentation calls

Now we call the helper from each GRPC entry point. Three commonly used interfaces need to be covered:

  • classifier: the classification interface
  • predict: the prediction interface
  • regressor: the regression interface

The three interfaces only differ in the inference scenario; each one ends up running a model.

predict

Instrument the predict path in tensorflow_serving/servables/tensorflow/predict_util.cc, inside RunPredict:

namespace internal {
Status RunPredict(
    const RunOptions& run_options, const MetaGraphDef& meta_graph_def,
    const absl::optional<int64>& servable_version,
    const internal::PredictResponseTensorSerializationOption option,
    Session* session, const PredictRequest& request, PredictResponse* response,
    const thread::ThreadPoolOptions& thread_pool_options) {
 
  ...
 
 
  MakeModelSpec(request.model_spec().name(), signature_name, servable_version,
                response->mutable_model_spec());
 
  std::vector<std::pair<string, Tensor>> input_tensors;
  std::vector<string> output_tensor_names;
  std::vector<string> output_tensor_aliases;
  TF_RETURN_IF_ERROR(PreProcessPrediction(signature, request, &input_tensors,
                                          &output_tensor_names,
                                          &output_tensor_aliases));
  std::vector<Tensor> outputs;
  RunMetadata run_metadata;
  const uint64 start_microseconds = EnvTime::NowMicros();
  TF_RETURN_IF_ERROR(session->Run(run_options, input_tensors,
                                  output_tensor_names, {}, &outputs,
                                  &run_metadata, thread_pool_options));
  const uint64 end_microseconds = EnvTime::NowMicros();
  RecordRuntimeLatency(request.model_spec().name(), /*api=*/"Predict",
                       /*runtime=*/"TF1",
                       end_microseconds - start_microseconds);
  // The new instrumentation point:
  RecordModelCall(response->model_spec().name(), response->model_spec().version().value());

Note that we read the model_version that MakeModelSpec filled into the response, not the one from the request: the client is allowed to ask loosely for the latest version, while the response carries the concrete version the server actually selected.
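
To make the difference concrete, here is a small sketch (the literal values are made up) of what the two ModelSpec messages contain by the time we record the call:

  // Request:  the client may leave the version unset, meaning "latest":
  //   model_spec { name: "half_plus_two" }
  // Response: MakeModelSpec has written back the version actually served:
  //   model_spec { name: "half_plus_two" version { value: 123 } }
  // So the counter must be incremented from the response, not the request:
  RecordModelCall(response->model_spec().name(),
                  response->model_spec().version().value());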

classifier

Source file: tensorflow_serving/servables/tensorflow/classifier.cc; the change is the same:

Status RunClassify(const RunOptions& run_options,
                   const MetaGraphDef& meta_graph_def,
                   const absl::optional<int64>& servable_version,
                   Session* session, const ClassificationRequest& request,
                   ClassificationResponse* response,
                   const thread::ThreadPoolOptions& thread_pool_options) {
  SignatureDef signature;
  TF_RETURN_IF_ERROR(GetClassificationSignatureDef(request.model_spec(),
                                                   meta_graph_def, &signature));
 
  std::unique_ptr<ClassifierInterface> classifier_interface;
  TF_RETURN_IF_ERROR(CreateFlyweightTensorFlowClassifier(
      run_options, session, &signature, thread_pool_options,
      &classifier_interface));
 
  MakeModelSpec(request.model_spec().name(),
                request.model_spec().signature_name(), servable_version,
                response->mutable_model_spec());
  // The new instrumentation point:
  RecordModelCall(response->model_spec().name(), response->model_spec().version().value());

regressor

Source file: tensorflow_serving/servables/tensorflow/regressor.cc; the change:

  MakeModelSpec(request.model_spec().name(),
                request.model_spec().signature_name(), servable_version,
                response->mutable_model_spec());
  
  RecordModelCall(response->model_spec().name(), response->model_spec().version().value());

Building

To build tfs you want a machine with at least 32 cores and 32 GB of RAM; do not attempt the build on anything smaller. Have docker installed. No proxy is needed, and you do not need the self-build Dockerfile approach described in the official tfs docs.

We also do not need to learn the bazel toolchain: tfs ships a universal script, tools/run_in_docker.sh, which mounts the host source directory into a docker container and builds inside it. The bazel toolchain it downloads and the cached object files stay in the mounted host directory, so subsequent builds are fast.

Start the build with:

sh tools/run_in_docker.sh bazel build tensorflow_serving/model_servers:tensorflow_model_server

This simply runs bazel build tensorflow_serving/model_servers:tensorflow_model_server inside the docker container; nothing special.

The build covers roughly 7000 targets. The first run downloads some dependencies, and the compilation phase saturates every CPU on the machine.

The intermediate and final build outputs produced inside the container stay in the host's mounted directory, i.e. the current directory.

Verifying

On the host, run again:

sh tools/run_in_docker.sh bash

This drops you into a bash shell inside the docker container; then change into the source directory inside the container (it maps exactly to the host's source directory):

cd /root/liangdong/serving/

To enable the tfs Prometheus endpoint, write a monitoring config file:

# install vim first
apt update && apt install vim
root@10-253-0-22:~/liangdong/serving# cat monitor.config 
prometheus_config {
  enable: true,
  path: "/monitoring/prometheus/metrics"
}

Then start tensorflow-serving with the test model bundled in the source tree:

nohup /root/liangdong/serving/bazel-bin/tensorflow_serving/model_servers/tensorflow_model_server --model_base_path=/root/liangdong/serving/tensorflow_serving/servables/tensorflow/testdata/saved_model_half_plus_two_cpu/ --model_name=half_plus_two --rest_api_port=8501 --monitoring_config_file=./monitor.config &

Then call the predict endpoint:

curl -d '{"instances": [1.0, 2.0, 5.0]}' -X POST http://localhost:8501/v1/models/half_plus_two:predict

Then check the Prometheus data:

root@10-253-0-22:~/liangdong/serving# curl localhost:8501/monitoring/prometheus/metrics 2>/dev/null | grep model_call_count
# TYPE :tensorflow:serving:model_call_count counter
:tensorflow:serving:model_call_count{model_name="half_plus_two",model_version="123"} 1

The instrumentation works.

Packaging the image

Back on the host, the compiled binary can be found in the output directory. It was built inside an ubuntu container, so it will not run on the host unless the host is also ubuntu:

[root@10-253-0-22 serving]# ll bazel-bin/tensorflow_serving/model_servers/tensorflow_model_server
-r-xr-xr-x 1 root root 223328328 Jan 11 18:27 bazel-bin/tensorflow_serving/model_servers/tensorflow_model_server

So we package this binary into a clean ubuntu image for production use. Prepare an empty directory, copy the binary in, and write a Dockerfile:

[root@10-253-0-22 build]# ll
total 218100
-rw-r--r-- 1 root root        80 Jan 11 14:09 Dockerfile
-r-xr-xr-x 1 root root 223328328 Jan 11 18:27 tensorflow_model_server
[root@10-253-0-22 build]# cat Dockerfile 
FROM ubuntu:18.04
 
ADD tensorflow_model_server /usr/bin/tensorflow_model_server

Then run docker build to produce the tensorflow-serving image.
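
For completeness, a minimal sketch of that step (the image name and tag are just examples; use whatever fits your registry):

docker build -t tensorflow-serving-traffic:r2.4 .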

Configuring Prometheus scraping

Our tfs instances run in K8S, so we can use Prometheus's role: pod service discovery to enumerate all PODs, read the scrape port and URI from pod annotations, and relabel them into the final scrape targets (__address__ and __metrics_path__).

Here is a concrete example; adapt it to your own setup.

Add annotations to the POD:

    metadata:
      annotations:
        prometheus.io/path: /monitoring/prometheus/metrics
        prometheus.io/port: '810'
        prometheus.io/scrape: 'true'

Add a scrape job to Prometheus:

    - job_name: kubernetes-pods
      honor_timestamps: true
      scrape_interval: 30s
      scrape_timeout: 10s
      metrics_path: /metrics
      scheme: http
      kubernetes_sd_configs:
      - role: pod
      relabel_configs:
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        separator: ;
        regex: true
        replacement: $1
        action: keep
      - source_labels: [__meta_kubernetes_namespace]
        separator: ;
        regex: (.*monitoring.*)
        replacement: $1
        action: drop
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
        separator: ;
        regex: (.+)
        target_label: __metrics_path__
        replacement: $1
        action: replace
      - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
        separator: ;
        regex: ([^:]+)(?::\d+)?;(\d+)
        target_label: __address__
        replacement: $1:$2
        action: replace
      - separator: ;
        regex: __meta_kubernetes_pod_label_(.+)
        replacement: $1
        action: labelmap
      - source_labels: [__meta_kubernetes_namespace]
        separator: ;
        regex: (.*)
        target_label: namespace
        replacement: $1
        action: replace
      - source_labels: [__meta_kubernetes_pod_name]
        separator: ;
        regex: (.*)
        target_label: pod_name
        replacement: $1
        action: replace
      - source_labels: [__meta_kubernetes_pod_ip]
        separator: ;
        regex: (.*)
        target_label: pod_ip
        replacement: $1
        action: replace
      - source_labels: [__meta_kubernetes_host_ip]
        separator: ;
        regex: (.*)
        target_label: host_ip
        replacement: $1
        action: replace
      - source_labels: [__meta_kubernetes_pod_container_name]
        separator: ;
        regex: (.*)
        target_label: container_name
        replacement: $1
        action: replace

Tell Prometheus to reload its configuration:

curl http://<prometheus address>/-/reload

Then write a PromQL query to check each model's traffic. The idea: for each {model_name, model_version}, compute the average QPS of each hour over the past 24 hours, and take the maximum of those as the model's peak QPS for the day:

max_over_time( sum( rate(:tensorflow:serving:model_call_count{model_name!="",model_version!=""}[60m]) ) by (model_name,model_version)[86400s:3600s] )

The same result can be fetched remotely as JSON through the Prometheus HTTP API:

curl 'http://10.42.35.48:9090/api/v1/query?query=max_over_time(+sum(+rate(%3atensorflow%3aserving%3amodel_call_count%7bmodel_name!%3d%22%22%2cmodel_version!%3d%22%22%7d%5b60m%5d)+)+by+(model_name%2cmodel_version)%5b86400s%3a3600s%5d+)'
{
    "status": "success",
    "data": {
        "resultType": "vector",
        "result": [{
            "metric": {
                "model_name": "hj_mtl",
                "model_version": "1609892356"
            },
            "value": [1610689547.017, "0.983193552715281"]
        }]
    }
}

This gives each {model_name, model_version}'s maximum hourly average QPS for the day.
