最佳实践|RDS&POLARDB归档到X-Pack Spark计算


1

业务背景

部分RDS和POLARDB For MySQL的用户曾遇到如下场景:当一张表的数据达到几千万时,你查询一次所花的时间会变多。
这时候采取水平分表的策略,水平拆分是将同一个表的数据进行分块保存到不同的数据库中,这些数据库中的表结构完全相同。

本文将介绍如何把这些水平分表的表归档到

X-Pack Spark数仓

,做统一的大数据计算。

X-Pack Spark服务通过外部计算资源的方式,为Redis、Cassandra、MongoDB、HBase、RDS存储服务提供

复杂分析、流式处理及入库、机器学习

的能力,从而更好的解决用户数据处理相关场景问题。


2

RDS & POLARDB

分表归档到X-Pack Spark步骤



一键关联POLARDB到Spark集群

一键关联主要是做好spark访问RDS & POLARDB的准备工作。



POLARDB表存储


在database ‘test1’中每5分钟生成一张表,这里假设为表 ‘test1’、’test2’、’test2’、… 

具体的建表语句如下:

*请左右滑动阅览

 CREATE TABLE `test1` ( `a` int(11) NOT NULL,
`b` time DEFAULT NULL,
`c` double DEFAULT NULL,
PRIMARY KEY (`a`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8



归档到Spark的调试


x-pack spark提供交互式查询模式支持直接在控制台提交sql、python脚本、scala code来调试。
1、首先创建一个交互式查询的session,在其中添加mysql-connector的jar包。
*请左右滑动阅览

 wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/mysql-connector-java-5.1.34.jar

2、创建交互式查询

以pyspark为例,下面是具体归档demo的代码:
*请左右滑动阅览

spark.sql("drop table sparktest").show()
# 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致
spark.sql("
CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "
"
USING parquet PARTITIONED BY (dt ,hh ,mm )").show()

#本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区
# CREATE TABLE `test1` (
# `a` int(11) NOT NULL,
# `b` time DEFAULT NULL,
# `c` double DEFAULT NULL,
# PRIMARY KEY (`a`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8
for num in range(1, 4):
#构造polardb的表名
dbtable = "
test1." + "test" + str(num)
#spark外表关联polardb对应的表
externalPolarDBTableNow = spark.read \
.format("
jdbc") \
.option("
driver", "com.mysql.jdbc.Driver") \
.option("
url", "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306") \
.option("
dbtable", dbtable) \
.option("
user", "name") \
.option("
password", "xxx*") \
.load().registerTempTable("
polardbTableTemp")
#生成本次polardb表数据要写入的spark表的分区信息
(dtValue, hhValue, mmValue) = ("
20191015", "13", str(05 * num))
#执行导数据sql
spark.sql("
insert into sparktest partition(dt= %s ,hh= %s , mm=%s ) "
"
select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show()
#删除临时的spark映射polardb表的catalog
spark.catalog.dropTempView("
polardbTableTemp")
#查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除
spark.sql("
show partitions sparktest").show(1000, False)
spark.sql("
select count(*) from sparktest").show()



归档作业上生产


交互式查询定位为临时查询及调试,生产的作业还是建议使用spark作业的方式运行,使用文档参考。这里以pyspark作业为例:


/polardb/polardbArchiving.py 内容如下:
*请左右滑动阅览

# -*- coding: UTF-8 -*-

from __future__ import print_function

import sys
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("PolardbArchiving") \
.enableHiveSupport() \
.getOrCreate()

spark.sql("drop table sparktest").show()
# 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致
spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "
"USING parquet PARTITIONED BY (dt ,hh ,mm )").show()

#本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区
# CREATE TABLE `test1` (
# `a` int(11) NOT NULL,
# `b` time DEFAULT NULL,
# `c` double DEFAULT NULL,
# PRIMARY KEY (`a`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8
for num in range(1, 4):
#构造polardb的表名
dbtable = "test1." + "test" + str(num)
#spark外表关联polardb对应的表
externalPolarDBTableNow = spark.read \
.format("jdbc") \
.option("driver", "com.mysql.jdbc.Driver") \
.option("url", "jdbc:mysql://pc-.mysql.polardb.rds.aliyuncs.com:3306") \
.option("dbtable", dbtable) \
.option("user", "ma,e") \
.option("password", "xxx*") \
.load().registerTempTable("polardbTableTemp")
#生成本次polardb表数据要写入的spark表的分区信息
(dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num))
#执行导数据sql
spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s ) "
"select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show()
#删除临时的spark映射polardb表的catalog
spark.catalog.dropTempView("polardbTableTemp")
#查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除
spark.sql("show partitions sparktest").show(1000, False)
spark.sql("select count(*) from sparktest").show()
spark.stop()



扫描下方 :arrow_down:二维码


了解关于


X-Pack Spark计算服务



的更多信息

双十一还不知道买什么?

阿里云数据库双11

爆款直降

这份购物清单 :arrow_down:给你拿去!  

不过【

阿里云


终于放出了双十一预热活动的推广。
价格也是非常惊人的优惠!

  • 1核2G:
    86/年,229/3年(折算1年76元)。

  • 2核4G:
    799/3年,一年只要266元。
    平时近万元!
    强推这款!
    这次我买的就是这款。

拼团地址:


http://suo.im/59kDxw


,如有需要,直接扫码也可以参与拼团。

没有套路,买就是划算



“爱码士”们赶紧买起来吧 !


点击





阅读原文” 


直达会场