# 500行SQL快速实现UCF

## 写在前面话

UCF通常是User-based Collaborative Filtering的简写；大体的算法思路是根据用户行为计算相似群体（邻居），为用户推荐其邻居喜好的内容。思路是不是很简单？那废话不多说，先撸个SQL。

## SQL

-- Step 1: user-user neighbour table via cosine similarity on co-consumed items:
--   sim(u1, u2) = |I(u1) ∩ I(u2)| / sqrt(|I(u1)| * |I(u2)|)
-- Keep at most 30 neighbours per user, and only pairs with sim > 0.1.
select uid1, uid2, sim
from (
    select uid1
        ,uid2
        ,cnt12 / sqrt(cnt1 * cnt2) sim
        ,row_number() over(partition by uid1 order by cnt12 / sqrt(cnt1 * cnt2) desc) sim_rn
    from (
        -- co-occurrence: number of items both users interacted with
        select a.uid uid1
            ,b.uid uid2
            ,count(a.iid) cnt12
        from tb_behavior a
        join tb_behavior b
        on a.iid = b.iid
        -- fix: the comparison operator was lost in rendering; exclude self-pairs
        where a.uid <> b.uid
        group by a.uid, b.uid
    ) a12
    join (select uid, count(iid) cnt1 from tb_behavior group by uid) a1
    on a12.uid1 = a1.uid
    join (select uid, count(iid) cnt2 from tb_behavior group by uid) a2
    -- fix: cnt2 must be looked up by uid2 (original joined uid1 twice,
    -- so cnt2 == cnt1 and the similarity denominator was wrong)
    on a12.uid2 = a2.uid
) tb_neighbour
where sim > 0.1 and sim_rn <= 30

-- Step 2: recommend to each user the items their neighbours consumed,
-- excluding items the user has already seen; keep top 500 per user.
select uid1, iid
from (
    select uid1
        ,iid
        ,max(sim) score  -- item score = similarity of the closest neighbour that consumed it
        ,row_number() over(partition by uid1 order by max(sim) desc) user_rn
    from tb_neighbour a12
    join (select uid, iid from tb_behavior) a2
    on a12.uid2 = a2.uid
    join (select uid, collect_set(iid) iids1 from tb_behavior group by uid) a1
    on a12.uid1 = a1.uid
    -- fix: Hive function is array_contains, not "array_contaions"
    where not array_contains(iids1, a2.iid)
    group by uid1, iid
) tb_rec
where user_rn <= 500

## 思考

• 1.join引起的数据倾斜问题：tb_neighbour表很大，往往热点物品会占据80%的曝光和消费记录，如何解决？
• 2.增量更新问题：上面的框架，tb_behavior表每次都是全量计算，是否能改造成增量更新邻居表和推荐结果，并减少计算时间呢？

## join引起的数据倾斜问题

-- Skew mitigation: cap every item at 50000 randomly-sampled behaviour rows
-- before the self-join, so a hot item (80% of traffic) cannot blow up the
-- co-occurrence join. Per-user totals (cnt1/cnt2) still come from the full table.
with tb_behavior_sample as (
    select uid, iid
    from (
        select uid
            ,iid
            ,row_number() over(partition by iid order by rand()) feed_rn
        from tb_behavior
    ) bh
    where feed_rn <= 50000
)

select uid1, uid2, sim
from (
    select uid1
        ,uid2
        ,cnt12 / sqrt(cnt1 * cnt2) sim
        ,row_number() over(partition by uid1 order by cnt12 / sqrt(cnt1 * cnt2) desc) sim_rn
    from (
        select a.uid uid1
            ,b.uid uid2
            ,count(a.iid) cnt12
        from tb_behavior_sample a
        join tb_behavior_sample b
        on a.iid = b.iid
        -- fix: restore the '<>' operator lost in rendering
        where a.uid <> b.uid
        group by a.uid, b.uid
    ) a12
    join (select uid, count(iid) cnt1 from tb_behavior group by uid) a1
    on a12.uid1 = a1.uid
    join (select uid, count(iid) cnt2 from tb_behavior group by uid) a2
    -- fix: cnt2 is keyed by uid2, not uid1
    on a12.uid2 = a2.uid
) tb_neighbour
where sim > 0.1 and sim_rn <= 30

## 增量更新问题

-- Incremental (kappa-style) neighbour update: merge the stored counters with
-- counters computed from the new partition only, then rescore the union.
-- NOTE(review): this requires tb_neighbour to persist cnt12/cnt1/cnt2 columns,
-- not just (uid1, uid2, sim) — confirm the table schema matches.
with tb_behavior_sample_incr as (
    -- same skew guard as the full job, applied to the increment only
    select uid, iid
    from (
        select uid
            ,iid
            ,row_number() over(partition by iid order by rand()) feed_rn
        from tb_behavior_incr
    ) bh
    where feed_rn <= 50000
)

insert overwrite table tb_neighbour
select uid1, uid2, sim
from (
    select uid1
        ,uid2
        ,sum(cnt12) / sqrt(sum(cnt1) * sum(cnt2)) sim
        ,row_number() over(partition by uid1 order by sum(cnt12) / sqrt(sum(cnt1) * sum(cnt2)) desc) sim_rn
    from (
        -- counters accumulated in previous runs
        select uid1, uid2, cnt12, cnt1, cnt2
        from tb_neighbour
        union all
        -- counters contributed by the new increment.
        -- fix: the original referenced cnt1/cnt2 that were not in scope inside
        -- this branch and joined both per-user counts on uid1 outside; the
        -- per-user counts must be joined here, keyed by a.uid and b.uid.
        select a.uid uid1
            ,b.uid uid2
            ,count(a.iid) cnt12
            ,max(a1.cnt1) cnt1
            ,max(a2.cnt2) cnt2
        from tb_behavior_sample_incr a
        join tb_behavior_sample_incr b
        on a.iid = b.iid
        join (select uid, count(iid) cnt1 from tb_behavior_incr group by uid) a1
        on a.uid = a1.uid
        join (select uid, count(iid) cnt2 from tb_behavior_incr group by uid) a2
        on b.uid = a2.uid
        -- fix: restore the '<>' operator lost in rendering
        where a.uid <> b.uid
        group by a.uid, b.uid
    ) a12
    group by uid1, uid2
) tb_merged
where sim > 0.1 and sim_rn <= 30

kappa架构初次计算即是增量，不断累积每次增量的结果更新tb_neighbour；相当于lambda初始全量计算的一种回放，直至追到最新的时间分区。

-- Incrementally maintain each user's consumed-item list as a comma-joined
-- string: union the new increment's iids with the previously stored list,
-- re-concatenate per user, and cap the result at the first 10000 items via
-- substring_index. NOTE(review): which 10000 survive depends on collect_list
-- ordering, which Hive does not guarantee — the truncation is best-effort.
-- NOTE(review): the statement reads and overwrites tb_user_consume in one
-- job; confirm the engine materializes the read before the overwrite.
insert overwrite table tb_user_consume
select uid,substring_index(concat_ws(",",collect_list(iids1)),",",10000) iids1
from (
select uid,concat_ws(",",collect_set(cast(iid as string))) iids1
from tb_behavior_incr
union all
select uid,iids1
from tb_user_consume
) a
group by uid

-- Incremental recommendation: score only items appearing in the new behaviour
-- partition, excluding anything in the user's accumulated consume list.
select uid1, iid
from (
    select uid1
        ,iid
        ,max(sim) score  -- item score = similarity of the closest neighbour that consumed it
        ,row_number() over(partition by uid1 order by max(sim) desc) user_rn
    from tb_neighbour a12
    join (select uid, cast(iid as string) iid from tb_behavior_incr) a2
    on a12.uid2 = a2.uid
    join (select uid, split(iids1, ",") iids1 from tb_user_consume) a1
    on a12.uid1 = a1.uid
    -- fix: Hive function is array_contains, not "array_contaions"
    where not array_contains(iids1, a2.iid)
    group by uid1, iid
) tb_rec
where user_rn <= 500