Serverless 实战:3 分钟实现文本敏感词过滤
敏感词过滤是随着互联网社区一起发展起来的一种阻止网络犯罪和网络暴力的技术手段,通过对可能存在犯罪或网络暴力的关键词进行有针对性的筛查和屏蔽,能够防患于未然,将后果严重的犯罪行为扼杀于萌芽之中。
随着各种社交论坛的日益火爆,敏感词过滤逐渐成为了非常重要的功能。那么在 Serverless 架构下,利用 Python 语言,敏感词过滤又有那些新的实现呢?我们能否用最简单的方法实现一个敏感词过滤的 API 呢?
了解敏感过滤的几种方法
Replace 方法
敏感词过滤,其实在一定程度上是文本替换,以 Python 为例,我们可以通过 replace
来实现,首先准备一个敏感词库,然后通过 replace
进行敏感词替换:
复制代码
defworldFilter(keywords, text): foreveinkeywords: text = text.replace(eve,"***") returntext keywords = (" 关键词 1"," 关键词 2"," 关键词 3") content =" 这是一个关键词替换的例子,这里涉及到了关键词 1 还有关键词 2,最后还会有关键词 3。" print(worldFilter(keywords, content))
这种方法虽然操作简单,但是存在一个很大的问题:在文本和敏感词汇非常庞大的情况下,会出现很严重的性能问题。
举个例子,我们先修改代码进行基本的性能测试:
复制代码
importtime defworldFilter(keywords, text): foreveinkeywords: text = text.replace(eve,"***") returntext keywords =[" 关键词 "+ str(i)foriinrange(0,10000)] content =" 这是一个关键词替换的例子,这里涉及到了关键词 1 还有关键词 2,最后还会有关键词 3。"*1000 startTime = time.time() worldFilter(keywords, content) print(time.time()-startTime)
此时的输出结果是: 0.12426114082336426
,可以看到性能非常差。
正则表达方法
相较于 replace
,使用正则表达 re.sub
实现可能更加快速。
复制代码
importtime importre defworldFilter(keywords, text): returnre.sub("|".join(keywords),"***", text) keywords =[" 关键词 "+ str(i)foriinrange(0,10000)] content =" 这是一个关键词替换的例子,这里涉及到了关键词 1 还有关键词 2,最后还会有关键词 3。"*1000 startTime = time.time() worldFilter(keywords, content) print(time.time()-startTime)
增加性能测试之后,我们按照上面的方法进行改造测试,输出结果是 0.24773502349853516
。
对比这两个例子,我们会发现当前两种方法的性能差距不是很大,但是随着文本数量的增加,正则表达的优势会逐渐凸显,性能提升明显。
DFA 过滤敏感词
相对来说,DFA 过滤敏感词的效率会更高一些,例如我们把坏人、坏孩子、坏蛋作为敏感词,那么它们的树关系可以这样表达:
而 DFA 字典是这样表示的:
复制代码
{ '坏': { '蛋': { '\x00':0 }, '人': { '\x00':0 }, '孩': { '子': { '\x00':0 } } } }
使用这种树表示问题最大的好处就是可以降低检索次数、提高检索效率。其基本代码实现如下:
复制代码
importtime classDFAFilter(object): def__init__(self): self.keyword_chains = {}# 关键词链表 self.delimit ='\x00'# 限定 defparse(self, path): withopen(path, encoding='utf-8')asf: forkeywordinf: chars = str(keyword).strip().lower()# 关键词英文变为小写 ifnotchars:# 如果关键词为空直接返回 return level = self.keyword_chains foriinrange(len(chars)): ifchars[i]inlevel: level = level[chars[i]] else: ifnotisinstance(level, dict): break forjinrange(i, len(chars)): level[chars[j]] = {} last_level, last_char = level, chars[j] level = level[chars[j]] last_level[last_char] = {self.delimit:0} break ifi == len(chars) -1: level[self.delimit] =0 deffilter(self, message, repl="*"): message = message.lower() ret = [] start =0 whilestart < len(message): level = self.keyword_chains step_ins =0 forcharinmessage[start:]: ifcharinlevel: step_ins +=1 ifself.delimitnotinlevel[char]: level = level[char] else: ret.append(repl * step_ins) start += step_ins -1 break else: ret.append(message[start]) break else: ret.append(message[start]) start +=1 return''.join(ret) gfw = DFAFilter() gfw.parse("./sensitive_words") content =" 这是一个关键词替换的例子,这里涉及到了关键词 1 还有关键词 2,最后还会有关键词 3。"*1000 startTime = time.time() result = gfw.filter(content) print(time.time()-startTime)
这里的字典库是:
复制代码
withopen("./sensitive_words",'w')asf: f.write("\n".join( [" 关键词 "+ str(i)foriinrange(0,10000)]))
执行结果:
复制代码
0.06450581550598145
从中,我们可以看到性能又进一步得到了提升。
AC 自动机过滤敏感词算法
什么是 AC 自动机?简单来说,AC 自动机就是字典树 +kmp 算法 + 失配指针,一个常见的例子就是给出 n 个单词,再给出一段包含 m 个字符的文章,让你找出有多少个单词在文章里出现过。
代码实现:
复制代码
importtime classNode(object): def__init__(self): self.next = {} self.fail =None self.isWord =False self.word ="" classAcAutomation(object): def__init__(self): self.root = Node() # 查找敏感词函数 defsearch(self, content): p = self.root result = [] currentposition =0 whilecurrentposition < len(content): word = content[currentposition] whilewordinp.next ==Falseandp != self.root: p = p.fail ifwordinp.next: p = p.next[word] else: p = self.root ifp.isWord: result.append(p.word) p = self.root currentposition +=1 returnresult # 加载敏感词库函数 defparse(self, path): withopen(path, encoding='utf-8')asf: forkeywordinf: temp_root = self.root forcharinstr(keyword).strip(): ifcharnotintemp_root.next: temp_root.next[char] = Node() temp_root = temp_root.next[char] temp_root.isWord =True temp_root.word = str(keyword).strip() # 敏感词替换函数 defwordsFilter(self, text): """ :param ah: AC 自动机 :param text: 文本 :return: 过滤敏感词之后的文本 """ result = list(set(self.search(text))) forxinresult: m = text.replace(x,'*'* len(x)) text = m returntext acAutomation = AcAutomation() acAutomation.parse('./sensitive_words') startTime = time.time() print(acAutomation.wordsFilter(" 这是一个关键词替换的例子,这里涉及到了关键词 1 还有关键词 2,最后还会有关键词 3。"*1000)) print(time.time()-startTime)
词库同样是:
复制代码
withopen("./sensitive_words",'w')asf: f.write("\n".join( [" 关键词 "+ str(i)foriinrange(0,10000)]))
使用上面的方法,测试结果为 0.017391204833984375
。
敏感词过滤方法小结
根据上文的测试对比,我们可以发现在所有算法中,DFA 过滤敏感词性能最高,但是在实际应用中,DFA 过滤和 AC 自动机过滤各自有自己的适用场景,可以根据具体业务来选择。
实现敏感词过滤 API
想要实现敏感词过滤 API,就需要将代码部署到 Serverless 架构上,选择 API 网关与函数计算进行结合。以 AC 自动机过滤敏感词算法为例:我们只需要增加是几行代码就好:
复制代码
# -*- coding:utf-8 -*- importjson, uuid classNode(object): def__init__(self): self.next = {} self.fail =None self.isWord =False self.word ="" classAcAutomation(object): def__init__(self): self.root = Node() # 查找敏感词函数 defsearch(self, content): p = self.root result = [] currentposition =0 whilecurrentposition < len(content): word = content[currentposition] whilewordinp.next ==Falseandp != self.root: p = p.fail ifwordinp.next: p = p.next[word] else: p = self.root ifp.isWord: result.append(p.word) p = self.root currentposition +=1 returnresult # 加载敏感词库函数 defparse(self, path): withopen(path, encoding='utf-8')asf: forkeywordinf: temp_root = self.root forcharinstr(keyword).strip(): ifcharnotintemp_root.next: temp_root.next[char] = Node() temp_root = temp_root.next[char] temp_root.isWord =True temp_root.word = str(keyword).strip() # 敏感词替换函数 defwordsFilter(self, text): """ :param ah: AC 自动机 :param text: 文本 :return: 过滤敏感词之后的文本 """ result = list(set(self.search(text))) forxinresult: m = text.replace(x,'*'* len(x)) text = m returntext defresponse(msg, error=False): return_data = { "uuid": str(uuid.uuid1()), "error": error, "message": msg } print(return_data) returnreturn_data acAutomation = AcAutomation() path ='./sensitive_words' acAutomation.parse(path) defmain_handler(event, context): try: sourceContent = json.loads(event["body"])["content"] returnresponse({ "sourceContent": sourceContent, "filtedContent": acAutomation.wordsFilter(sourceContent) }) exceptExceptionase: returnresponse(str(e),True)
最后,为了方便本地测试,我们可以再增加以下代码:
复制代码
deftest(): event = { "requestContext": { "serviceId":"service-f94sy04v", "path":"/test/{path}", "httpMethod":"POST", "requestId":"c6af9ac6-7b61-11e6-9a41-93e8deadbeef", "identity": { "secretId":"abdcdxxxxxxxsdfs" }, "sourceIp":"14.17.22.34", "stage":"release" }, "headers": { "Accept-Language":"en-US,en,cn", "Accept":"text/html,application/xml,application/json", "Host":"service-3ei3tii4-251000691.ap-guangzhou.apigateway.myqloud.com", "User-Agent":"User Agent String" }, "body":"{\"content\":\" 这是一个测试的文本,我也就呵呵了\"}", "pathParameters": { "path":"value" }, "queryStringParameters": { "foo":"bar" }, "headerParameters": { "Refer":"10.0.2.14" }, "stageVariables": { "stage":"release" }, "path":"/test/value", "queryString": { "foo":"bar", "bob":"alice" }, "httpMethod":"POST" } print(main_handler(event,None)) if__name__ =="__main__": test()
完成之后,就可以进行测试运行,例如我的字典是:
复制代码
呵呵 测试
执行之后结果:
复制代码
{'uuid': '9961ae2a-5cfc-11ea-a7c2-acde48001122', 'error': False, 'message': {'sourceContent': '这是一个测试的文本,我也就呵呵了', 'filtedContent': '这是一个 ** 的文本,我也就 ** 了'}}
接下来,我们将代码部署到云端,新建 serverless.yaml
:
复制代码
sensitive_word_filtering: component:"@serverless/tencent-scf" inputs: name:sensitive_word_filtering codeUri:./ exclude: -.gitignore -.git/** -.serverless -.env handler:index.main_handler runtime:Python3.6 region:ap-beijing description:敏感词过滤 memorySize:64 timeout:2 events: -apigw: name:serverless parameters: environment:release endpoints: -path:/sensitive_word_filtering description:敏感词过滤 method:POST enableCORS:true param: -name:content position:BODY required:'FALSE' type:string desc:待过滤的句子
然后通过 sls --debug
进行部署,部署结果:
最后,通过 PostMan 进行测试:
总结
敏感词过滤是当前企业的普遍需求,通过敏感词过滤,我们可以在一定程度上遏制恶言恶语和违规言论的出现。在具体实现过程中,有两个方面需要额外主要:
-
敏感词库的获得问题:Github 上有很多敏感词库,其中包含了各种场景中的敏感词,大家可以自行搜索下载使用;
-
API 使用场景的问题:我们可以将这个 API 放置在社区跟帖系统、留言评论系统或者是博客发布系统中,这样可以防止出现敏感词汇,减少不必要的麻烦。
作者介绍:
刘宇,腾讯 Serverless 团队后台研发工程师。毕业于浙江大学,硕士研究生学历,曾在滴滴出行、腾讯科技做产品经理,本科开始有自主创业经历,是 Anycodes 在线编程的负责人(该软件累计下载量超 100 万次)。目前投身于 Serverless 架构研发,著书《Serverless 架构:从原理、设计到项目实战》,参与开发和维护多个 Serverless 组件,是活跃的 Serverless Framework 的贡献者,也曾多次公开演讲和分享 Serverless 相关技术与经验,致力于 Serverless 的落地与项目上云。