订阅
纠错
加入自媒体

基于Spark的数据分析实践

2019-06-19 09:55
EAWorld
关注

三、SparkSQL

Spark 从 1.3 版本开始原有 SchemaRDD 的基础上提供了类似Pandas DataFrame API。新的DataFrame API不仅可以大幅度降低普通开发者的学习门槛,同时还支持Scala、Java与Python三种语言。更重要的是,由于脱胎自SchemaRDD,DataFrame天然适用于分布式大数据场景。

一般的数据处理步骤:读入数据 -> 对数据进行处理 -> 分析结果  -> 写入结果

SparkSQL 结构化数据

处理结构化数据(如 CSV,JSON,Parquet 等);

把已经结构化数据抽象成 DataFrame (HiveTable);

非结构化数据通过 RDD.map.filter 转换成结构化进行处理;

按照列式数据库,只加载非结构化中可结构化的部分列(Hbase,MongoDB);

处理非结构化数据,不能简单的用 DataFrame 装载。而是要用 SparkRDD 把数据读入,在通过一系列的 Transformer Method 把非结构化的数据加工为结构化,或者过滤到不合法的数据。

SparkSQL DataFrame

SparkSQL 中一切都是 DataFrame,all in DataFrame. DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。如果熟悉 Python Pandas 库中的 DataFrame 结构,则会对 SparkSQL DataFrame 概念非常熟悉。

TextFile DataFrame

import.org.apache.spark.sql._//定义数据的列名称和类型valdt=StructType(List(id:String,name:String,gender:String,age:Int))
//导入user_info.csv文件并指定分隔符vallines = sc.textFile("/path/user_info.csv").map(_.split(","))
//将表结构和数据关联起来,把读入的数据user.csv映射成行,构成数据集valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt))
//通过SparkSession.createDataFrame()创建表,并且数据表表头val df= spark.createDataFrame(rowRDD, dt)

可左右滑动查看代码

读取规则数据文件作为DataFrame

SparkSession.Builder builder = SparkSession.builder()Builder.setMaster("local").setAppName("TestSparkSQLApp")SparkSession spark = builder.getOrCreate();SQLContext sqlContext = spark.sqlContext();
# 读取 JSON 数据,path 可为文件或者目录valdf=sqlContext.read().json(path);
# 读取 HadoopParquet 文件vardf=sqlContext.read().parquet(path);
# 读取 HadoopORC 文件vardf=sqlContext.read().orc(path);

可左右滑动查看代码

JSON 文件为每行一个 JSON 对象的文件类型,行尾无须逗号。文件头也无须[]指定为数组;SparkSQL 读取是只是按照每行一条 JSON Record序列化;

Parquet文件

Configurationconfig = new Configuration();ParquetFileReaderreader = ParquetFileReader.open(        HadoopInputFile.fromPath(new Path("hdfs:///path/file.parquet"),conf));Map<String, String>schema = reader.getFileMetaData().getKeyValueMetaData();String allFields= schema.get("org.apache.spark.sql.parquet.row.metadata");

可左右滑动查看代码

allFiedls 的值就是各字段的名称和具体的类型,整体是一个json格式进行展示。

读取 Hive 表作为 DataFrame

Spark2 API 推荐通过 SparkSession.Builder 的 Builder 模式创建 SparkContext。 Builder.getOrCreate() 用于创建 SparkSession,SparkSession 是 SparkContext 的封装。

在Spark1.6中有两个核心组件SQLcontext和HiveContext。SQLContext 用于处理在 SparkSQL 中动态注册的表,HiveContext 用于处理 Hive 中的表。

从Spark2.0以上的版本开始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可执行 Hive 中的表,也可执行内部注册的表;

在需要执行 Hive 表时,只需要在 SparkSession.Builder 中开启 Hive 支持即可(enableHiveSupport())。

SparkSession.Builder builder = SparkSession.builder().enableHiveSupport();SparkSession spark = builder.getOrCreate();SQLContext sqlContext = spark.sqlContext();

可左右滑动查看代码

// db 指 Hive 库中的数据库名,如果不写默认为 default

// tableName 指 hive 库的数据表名

sqlContext.sql(“select * from db.tableName”)

可左右滑动查看代码

SparkSQL ThriftServer

//首先打开 Hive 的 Metastore服务

hive$bin/hive –-service metastore –p 8093

可左右滑动查看代码

//把 Spark 的相关 jar 上传到hadoophdfs指定目录,用于指定sparkonyarn的依赖 jar

spark$hadoop fs –put jars/*.jar /lib/spark2

可左右滑动查看代码

// 启动 spark thriftserver 服务

spark$ sbin/start-thriftserver.sh --master yarn-client --driver-memory 1G --conf spark.yarn.jars=hdfs:///lib/spark2/*.jar

可左右滑动查看代码

当hdfs 上传了spark 依赖 jar 时,通过spark.yarn.jars 可看到日志 spark 无须每个job 都上传jar,可节省启动时间

19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar

可左右滑动查看代码

//通过 spark bin 下的 beeline 工具,可以连接到 spark ThriftServer(SparkOnHive)

bin/beeline -u jdbc:hive2://ip:10000/default -n hadoop

可左右滑动查看代码

-u 是指定 beeline 的执行驱动地址;

-n 是指定登陆到 spark Session 上的用户名称;

Beeline 还支持传入-e 可传入一行 SQL,

-e <query>                      query that should be executed

也可通过 –f 指定一个 SQL File,内部可用逗号分隔的多个 SQL(存储过程)

-f <exec file>                  script file that should be executed

SparkSQL Beeline 的执行效果展示

SparkSQL ThriftServer

对于 SparkSQL ThriftServer 服务,每个登陆的用户都有创建的 SparkSession,并且执行的对个 SQL 会通过时间顺序列表展示。

SparkSQL ThriftServer 服务可用于其他支持的数据库工具创建查询,也用于第三方的 BI 工具,如 tableau。

四、SparkSQL Flow

SparkSQL Flow 是以 SparkSQL 为基础,开发的统一的基于 XML 配置化的可执行一连串的 SQL 操作,这一连串的 SQL 操作定义为一个 Flow。下文开始 SparkSQL Flow 的介绍:

SparkSQL Flow 是基于 SparkSQL 开发的一种基于 XML 配置化的 SQL 数据流转处理模型。该模型简化了 SparkSQL 、Spark RDD的开发,并且降低开发了难度,适合了解数据业务但无法驾驭大数据以及 Spark 技术的开发者。

一个由普元技术部提供的基于 SparkSQL 的开发模型;

一个可二次定制开发的大数据开发框架,提供了灵活的可扩展 API;

一个提供了 对文件,数据库,NoSQL 等统一的数据开发视界语义;

基于 SQL 的开发语言和 XML 的模板配置,支持 Spark UDF 的扩展管理;

支持基于 Spark Standlone,Yarn,Mesos 资源管理平台;

支持开源、华为、星环等平台统一认证。

SparkSQL Flow 适合的场景:

批量 ETL;

非实时分析服务;

SparkSQL Flow XML 概览

Properties 内定义一组变量,可用于宏替换;

Methods 内可注册 udf 和 udaf 两种函数;

Prepare 内可定义前置 SQL,用于执行 source 前的 sql 操作;

Sources 内定义一个到多个数据表视图;

Transformer 内可定义 0 到多个基于 SQL 的数据转换操作(支持 join);

Targets 用于定义 1 到多个数据输出;

After 可定义 0到多个任务日志;

如你所见,source 的 type 参数用于区分 source 的类型,source 支持的种类直接决定SparkSQL Flow 的数据源加载广度;并且,根据 type 不同,source 也需要配置不同的参数,如数据库还需要 driver,url,user和 password 参数。

Transformer 是基于 source 定的数据视图可执行的一组转换 SQL,该 SQL 符合 SparkSQL 的语法(SQL99)。Transform 的 SQL 的执行结果被作为中间表命名为 table_name 指定的值。

Targets 为定义输出,table_name 的值需在 source 或者 Transformer 中定义。

<上一页  1  2  3  4  下一页>  
声明: 本文由入驻维科号的作者撰写,观点仅代表作者本人,不代表OFweek立场。如有侵权或其他问题,请联系举报。

发表评论

0条评论,0人参与

请输入评论内容...

请输入评论/评论长度6~500个字

您提交的评论过于频繁,请输入验证码继续

暂无评论

暂无评论

人工智能 猎头职位 更多
扫码关注公众号
OFweek人工智能网
获取更多精彩内容
文章纠错
x
*文字标题:
*纠错内容:
联系邮箱:
*验 证 码:

粤公网安备 44030502002758号