海量日志分析的预处理

背景

前段时间,接入了公司部分日志流量,大约每分钟30万的请求日志。计划做一个恶意URL检测的系统,分2个阶段:

第一阶段:基于黑规则检测
第二阶段:基于机器学习检测

期间参考过很多文章,大部分都在讨论怎么去发现恶意请求,这无可厚非。但是实际环境中恶意请求的占比又是多少呢?观察我们的日志发现90%的请求都是正常请求,其中一些请求非常明显(一看就是没问题那种),例如:
/1/api/new_games?v=0.06540659188776337

/5tBARviOXjSbscQH9AzRiw==/109951164132869215.jpg?param=36y36

/Active_Page/js/vue.min.js

/api.php?format=json&t=1

如果不做一定的过滤,明显正常的请求进入检测引擎,即影响处理效率又浪费资源,很容易造成KAFKA数据积压。所以本文算是一个预热吧,这里重点和大家探讨滤白的问题,希望能起到抛砖引玉的作用。后面我会陆续分享其他2个阶段,目前项目正在紧张开发中。

0×01. 数据处理

过滤策略如下:

1.静态请求,URL后缀为:
[".jpeg", ".gif", ".jpg", ".png", ".js", ".css", ".bmp", ".ico", ".woff", ".woff2", ".ttf", ".eot", ".apk", ".json", ".zip", ".rar", ".html", ".ico", ".swf", ".exe", ".dat", ".txt", ".cgi", ".ts”]

2.参数值为纯数字、纯字母、或者字母数字组合
3.参数值值包含:数字、字母、-、_
4.参数值长度小于5
5.参数为空的GET POST请求
6.http返回状态码为200

策略1:静态请求处理

获取URL后缀名,判断是否在后缀白名单中,部分代码如下:

#判断是否静态连接
    @staticmethod
    def filter_static(url):
        suffix = Common.get_url_ext(url)
        if suffix.lower() in Config.STATIC_SUFFIXES:
            return True
        return False

    #获取url文件后缀
    @staticmethod
    def get_url_ext(url):
        try:
            path = urlparse.urlparse(url).path
            return os.path.splitext(path)[1]
        except:
            return False   

策略2:参数值为纯数字、纯字母、或者字母数字组合

#str.isdigit(): True 只包含数字
    if paramVal.isdigit():
        return True
    #str.isalpha():True 只包含字母
    if paramVal.isalpha():
        return True
    #str.isalnum():True 只包含字母或者数字
    if paramVal.isalnum():
        return True

策略2:参数值包含数字、字母、-、_

思路1:将字符串泛化:

  • [a-zA-Z]泛化为 A

  • [0-9]泛化为 N

  • [-_]泛化为 C

  • 其他字符泛化为 T

  • 字符串去重,判断去重后的结果中是否包含T

例如:test123
泛化后:AAAANNN
去重后:AN

思路2:词法分析

简单来说就是,定义5种合法状态,如下所示:


TK_STRING = 1
TK_INTEGER = 2
TK_UNDER = 3 # _
TK_STRAIGHT = 4 # –

然后扫描字符串,每个字符的前后字符都是合法状态,一旦出现非合法字符则标记为可疑,否则标白。

0×02. 遇到的问题

1.数据格式化

我们的日志都是使用Packetbeat抓取的,默认的格式比较复杂,其中有很多用不着的数据字段,故格式化后的 数据格式如下图:

{
    "@timestamp": "2019-06-14T15:04:40.864956011+08:00",
    "cap_ip": "",
    "cap_source": "D8:9D:67:13:EE:E2",
    "cap_timens": 168524000,
    "connectIP": "109.70.282.173",
    "dst": "123.126.104.7:80",
    "geo_city": "",
    "geo_country": "",
    "geo_sla": "0.000",
    "geo_slo": "0.000",
    "kafka_pid": 1,
    "raw_time": "2019-06-14T15:04:40+08:00",
    "realIP": "109.70.282.173",
    "realUrl": "/apiV2/profile/newsListAjax",
    "request.hc": 1,
    "request.host": "mp.xxxx.com",
    "request.method": "GET",
    "request.url": "/apiV2/profile/newsListAjax?xpt=cHBhZzkxNTMyNWUwZTJhMkBzb2h1LmNvbQ%3D%3D&pageNumber=2952&pageSize=25",
    "request.user-agent": "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)",
    "response.body": "{\"status\":1,\"msg\":\"error\"}",
    "response.code": 200,
    "response.content-length": "26",
    "response.content-type": "image/webp; charset=UTF-8",
    "src": "109.70.282.173:29819",
    "status.code": 1,
    "time": "2019-06-14 15:04:40",
    "unix_time": 1560495880
}

2.参数提取

GET请求相对简单些,废话不多说,代码如下:

def getQueryString(request):
    data = []
    try:
        url = request["request.url"]
        result = urlparse.urlparse(url)
        query = result.query
        #urlparse.parse_qsl解析url请求切割参数时,遇到';'会截断,导致获取的参数值缺失';'后面的内容
        if ";" in query:
            query = re.sub(r';', '@@@@', query)
        params = urlparse.parse_qsl(query, True)
        for k, v in params:
            if not v:
                continue
            #恢复分号
            if '@@@@' in v:
                v = re.sub(r'@@@@', ';', v)
            if paramValFilter(v):
                continue
            data.append(v)
    except Exception, e:
        print "parse query error:", e, request

POST请求参数格式比较复杂,我们这里基本都是JSON格式,而且大多为深度JSON,需要递归提取:

def getDeepJsonVal(data, result=[]):
    if isinstance(data, dict):
        for key, value in data.items():
            getDeepJsonVal(value, result)
    elif isinstance(data, list):
        for value in data:
            getDeepJsonVal(value)
    else:
        if isinstance(data, unicode) and  ('{' in data or '[' in data):
            try:
                getDeepJsonVal(simplejson.loads(data))
            except:
                if not paramValFilter(data):
                    result.append(data)
        else:
            if not paramValFilter(data):
                result.append(data)
    return result

3.参数值中包含中文,直接跳过,不做处理

def filterChinese(check_str): # 过滤中文
        for ch in check_str.decode('utf-8'):
            if u'\u4e00' <= ch <= u'\u9fff':
                return True
        return False

0×03.结果分析:

处理了10万条线上请求数据,标白的请求数为:44656,过滤掉了45%的请求,效果比较理想,甚至有点出乎我的意料,Very Nice!
代码传送门:https://github.com/skskevin/UrlDetect