程序中的Hive具体是干什么用的呢?

一般需要设置为 nonstrict

Hive是基于Hadoop平台的数仓工具,具有海量数据存储、水平可扩展、离线批量处理的优点,解决了传统关系型数仓不能支持海量数据存储、水平可扩展性等问题,但是由于Hive数据存储和数据处理是依赖于HDFS和MapReduce,因此在Hive进行数据离线批量处理时,需将查询语言先转换成MR任务,由MR批量处理返回结果,所以Hive没法满足数据实时查询分析的需求。

hivesql优化 hive优化实战hivesql优化 hive优化实战


hivesql优化 hive优化实战


hivesql优化 hive优化实战


Hive是由FaceBook研发并开源,当时FaceBook使用Oracle作为数仓,由于数据量越来越大,Oracle数仓性能越来越,没法实现海量数据的离线批量分析,因此基于Hadoop研发Hive,并开源给Apacha。

Pig与HIVE工具类似,都可以用类sql语言对数据进行处理。但是他们应用场景有区别,Pig用于数据仓库数据的ETL,HIVE用于数仓数据分析。

从架构图当中,可看出Hive并没有完成数据的存储和处理,它是由HDFS完成数据存储,MR完成数据处理,其只是提供了用户查询语言的能力。Hive支持类sql语言,这种SQL称为Hivesql。用户可用Hivesql语言查询,其驱动可将Hivesql语言转换成MR任务,完成数据处理。

【Hive的访问接口】

CLI:是hive提供的命令行工具

HWI:是Hive的web访问接口

JDBC/ODBC:是两种的标准的应用程序编程访问接口

Thrift :提供异构语言,进行远程RPC调用Hive的能力。

因此Hiv具备丰富的访问接口能力,几乎能满足各种但是执行开发应用场景需求。

【Driver】

是HIVE比较核心的驱动模块,包含编译器、优化器、执行器,职责为把用户输入的Hivesql转换成MR数据处理任务

【Metastore】

是HIVE的元数据存储模块,数据的访问和查找,必须要先访问元数据。Hive中的元数据一般使用单独的关系型数据库存储,常用的是Mysql,为了确保高可用,Mysql元数据库还需主备部署。

架构图上面Karmasphere、Hue、Qubole也是访问HIVE的工具,其中Qubole可远程访问HIVE,相当于HIVE作为一种公有云服务,用户可通过互联网访问Hive服务。

Hive在使用过程中出现了一些不稳定问题,由此发展出了Hive HA机制,

使用Hive SQL插入动态分区的Parquet表OOM异常分析

是否开启动态分区功能,默认 false 关闭。

1.异常描述

当运行“INSERT ... SELECT”语句向 Parquet 或者 ORC 格式的表中插入数据时,如果启用了动态分区,你可能会碰到以下错误,而导致作业无常执行。

Hive 客户端:

(可左右滑动)

YARN 的 8088 中查看具体 map task 报错:

(可左右滑动)

2.异常分析

Parquet 和 ORC 是列式批处理文件格式。这些格式要求在写入文件之前将批次的行(batches of rows)缓存在内存中。在执行 INSERT 语句时,动态分区目前的实现是:至少为每个动态分区目录打开一个文件写入器(file writer)。由于这些缓冲区是按分区维护的,因此在运行时所需的内存量随着分区数量的增加而增加。所以经常会导致 mappers 或 reducers 的 OOM,具体取决于打开的文件写入器(file writer)的数量。

通过 INSERT 语句插入数据到动态分区表中,也可能会超过 HDFS 同时打开文件数的限制。

如果没有 join 或聚合,INSERT ... SELECT 语句会被转换为只有 map 任务的作业。mapper 任务会读取输入记录然后将它们发送到目标分区目录。在这种情况下,每个 mapper 必须为遇到的每个动态分区创建一个新的文件写入器(file writer)。mapper 在运行时所需的内存量随着它遇到的分区数量的增加而增加。

3.异常重现与解决

3.1.生成动态分区的几个参数说明

hive.exec.dynamic.partition

默认值:false

使用动态分区时候,该参数必须设置成 true;

hive.exec.dynamic.partition.mode

默认值:strict

动态分区的模式,默认 strict,表示必须指定至少一个分区为静态分区,nonstrict 模式表示允许所有的分区字段都可以使用动态分区。

hive.exec.max.dynamic.partitions.pernode

默认值:100

在每个执行 MR 的上,可以创建多少个动态分区。

该参数需要根据实际的数据来设定。

比如:源数据中包含了一年的数据,即 day 字段有 365 个值,那么该参数就需要设置成大于 365,如果使用默认值 100,则会报错。

hive.exec.max.dynamic.partitions

默认值:1000

在所有执行 MR 的上,一共可以创建多少个动态分区。

同上参数解释。

hive.exec.max.created.files

默认值:100000 {VALUE._col3} {VALUE._col4}0

整个 MR Job 中,可以创建多少个 HDFS 文件。 任务的物理内存分配值,常见设置为 1GB,2GB,4GB 等。

mapreduce.map.ja.opts

map 任务的 Ja 堆栈大小设置,一般设置为小于等于上面那个值的 75%,这样可以保证 map 任务有足够的堆栈外内存空间。

mapreduce.input.fileinputformat.split.maxsize

mapreduce.input.fileinputformat.split.minsize

这个两个参数联合起来用,主要是为了方便控制 mapreduce 的 map 数量。比如我设置为 1073741824,就是为了让每个 map 处理 1GB 的文件。

3.2.一个例子

Fayson 在前两天给人调一个使用 Hive SQL 插入动态分区的 Parquet 表时,总是报错 OOM,也是折腾了很久。以下我们来看看整个过程。

1.首先我们看看执行脚本的内容,基本其实就是使用 Hive 的 insert 语句将文本数据表插入到另外一张 parquet 表中,当然使用了动态分区。

2.我们看看原始数据文件,是文本文件,一共 120 个,每个 30GB 大小,总共不多 3.6TB。

3.我们看看报错

4.因为是一个只有 map 的 mapreduce 任务,当我们从 YARN 的 8088 观察这个作业时可以发现,基本没有一个 map 能够执行成功,全部都是失败的。报上面的错误。

5.把 从 2GB 增大到 4GB,8GB,16GB,相应 mapreduce.map.ja.opts 增大到 3GB,6GB,12GB。依旧报错 OOM。

6.后面又将 mapreduce.input.fileinputformat.split.maxsize 从 1GB,减少为 512MB,256MB,从而增大 map 数量,缩小单个 map 处理文件的大小。依旧报错 OOM。

7.启用 hive.optimize.sort.dynamic.partition,增加 reduce 过程,作业执行成功。

8.查看结果文件大约 1.2TB,约为输入文件的三分之一。一共 1557 个分区,的分区文件为 2GB。

4.异常总结

对于这个异常,我们建议有以下三种方式来处理:

1.启用 hive.optimize.sort.dynamic.partition,将其设置为 true。通过这个优化,这个只有 map 任务的 mapreduce 会引入 reduce 过程,这样动态分区的那个字段比如日期在传到 reducer 时会被排序。由于分区字段是排序的,因此每个 reducer 只需要保持一个文件写入器(file writer)随时处于打开状态,在收到来自特定分区的所有行后,关闭记录写入器(record writer),从而减小内存压力。这种优化方式在写 parquet 文件时使用的内存要相对少一些,但代价是要对分区字段进行排序。

2.第二种方式就是增加每个 mapper 的内存分配,即增大 和 mapreduce.map.ja.opts,这样所有文件写入器(filewriter)缓冲区对应的内存会更充沛。

3.将查询分解为几个较小的查询,以减少每个查询创建的分区数量。这样可以让每个 mapper 打开较少的文件写入器(file writer)。

备注:

默认情况下,Hive 为每个打开的 Parquet 文件缓冲区(file buffer)分配 128MB。这个 buffer 大小由参数 parquet.block.size 控制。为获得性能,parquet 的 buffer size 需要与 HDFS 的 block size 保持对齐(比如相等),从而使每个 parquet 文件在单个 HDFS 的块中,以便每个 I/O 请求都可以读取整个数据文件,而无需通过网络传输访问后续的 block。

参考:

spark SQL和hive到底什么关系

而整个结果数据的产生只需要4分钟左右的时间,比如以下方式:将结果以textfile存入hdfs:

Spark SQL解决了这两个问题。

Map Reduce

,Spark SQL在Hive兼容层面仅依赖HQL parser、Hive Metastore和Hive SerDe。也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。执行生成和优化都由Catalyst负责。借助Scala的模式匹配等函数式语言特性,利用Catalyst开发执行优化策略比Hive要简洁得多。去年Spark summit上Catalyst的作者Michael Arust对Catalyst做了一个简要介绍:2013 | Spark Summit。

spark SQL和hive到底什么关系

Spark SQL解决了这两个问题。

,Spark SQL在Hive兼容层面仅依赖HQL parser、Hive Metastore和Hive SerDe。也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。执行生成和优化都由Catalyst负责。借助Scala的模式匹配等函数式语言特性,利用Catalyst开发执行优化策略比Hive要简洁得多。去年Spark summit上Catalyst的作者Michael Arust对Catalyst做了一个简要介绍:2013 | Spark Summit。

历史上存在的原理,以前都是使用hive来构建数据仓库,所以存在大量对hive所管理的数据查询的需求。而hive、shark、sparlSQL都可以进行hive的数据查询。shark是使用了hive的sql语法解析器和优化器,修改了执行器,使之物理执行过程是跑在spark上;而sparkSQL是使用了自身的语法解析器、优化器和执行器,同时sparkSQL还扩展了接口,不单单支持hive数据的查询,可以进行多种数据源的数据查询。

历史上存在的原理,以前都是使用hive来构建数据仓库,所以存在大量对hive所管理的数据查询的需求。而hive、shark、sparlSQL都可以进行hive的数据查询。shark是使用了hive的sql语法解析器和优化器,修改了执行器,使之物理执行过程是跑在spark上;而sparkSQL是使用了自身的语法解析器、优化器和执行器,同时sparkSQL还扩展了接口,不单单支持hive数据的查询,可以进行多种数据源的数据查询。

hive中有数据 select count()显示为0的问题

input format: org.apache.hadoop.mapred.TextInputFormat

检查表的时候

type: int

sql语句:

显示为

sql语句:

显示为

后面加上限制1条可以正常查询。

执行下语句后,正常查询可以正常显示。

以上设置原理为

这是hive中的一个优化参数导致的,对于一些使用频率可能很高的sql会进行查询优化,会将这个参数[hive.query.using.stats]设置为true(默认是false),这样的话,Hive在执行某些查询时,例如select count(1),只利用元数据存储中保存的状态信息返回结果,从而提高了响应速度

如何生成mapreduce任务来处理这条sql

type: bigint

Hive中在做多表关联时,由于Hive的SQL优化引擎还不够强大,表的关联顺序不同往往导致产生不同数量的MapReduce作业数。这时就需要通过分析执行对SQL进行调整,以获得最少的MapReduce作业数。举一个例子(案例基于Hive 0.6.0):

create table ljn1(

k1 bigint,

k2 String,

v1 int

)condition expressions:;

create table ljn2(

k1 bigint,

v2 int

);

create table ljn3(

k1 bigint,

v3 int

);

create table ljn4(

k1 bigint,

v4 int

);

create table ljn5(

k1 bigint,

v5 int

);

create table ljn6(

k2 string,

v6 int

);

然后看一下下面这个SQL的执行:

explain

select a.v1

from

ljn1 a

left outer join ljn2 b on (a.k1 = b.k1)

left outer join ljn3 c on (a.k1 = c.k1)

left outer join ljn4 d on (a.k1 = d.k1)

left outer join ljn6 e on (a.k2 = e.k2)

left outer join ljn5 f on (a.k1 = f.k1);

STAGE DEPENDENCIES:

Stage-5 is a root stage

Stage-1 depends on stages: Stage-5

Stage-2 depends on stages: Stage-1

Stage-0 is a root stage

STAGE PLANS:

Stage: Stage-5

Alias -> Map Operator Tree:

aTableScan

alias: a

key expressions:

Map-reduce partition columns:

tag: 0

value expressions:

expr: k2

type: string

expr: v1

alias: b

key expressions:

Map-reduce partition columns:

tag: 1

Join Operator

condition map:

Left Outer Join0 to 1

0 {VALUE._col0} {VALUE._col1} {VALUE._col2}

1handleSkewJoin: false

outputColumnNames: _col0, _col1, _col2

File Output Operator

compressed: true

GlobalTableId: 0

table:

input format: org.apache.hadoop.mapred.SequenceFileInputFormat

output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

Stage: Stage-1

Alias -> Map Operator Tree:

$INTNAME

key expressions:

Map-reduce partition columns:

tag: 0

value expressions:

expr: _col1

type: string

expr: _col2

cTableScan

alias: c

key expressions:

Map-reduce partition columns:

tag: 1

dTableScan

alias: d

key expressions:

Map-reduce partition columns:

tag: 2

fTableScan

alias: f

key expressions:

Map-reduce partition columns:

tag: 3

Join Operator

condition map:

Left Outer Join0 to 1

Left Outer Join0 to 2

Left Outer Join0 to 3

12

3handleSkewJoin: false

outputColumnNames: _col3, _col4

File Output Operator

compressed: true

GlobalTableId: 0

table:

input format: org.apache.hadoop.mapred.SequenceFileInputFormat

output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

Stage: Stage-2

Alias -> Map Operator Tree:

$INTNAME

key expressions:

expr: _col3

type: string

Map-reduce partition columns:

expr: _col3

type: string

tag: 0

value expressions:

expr: _col4

eTableScan

alias: e

key expressions:

expr: k2

type: string

Map-reduce partition columns:

expr: k2

type: string

tag: 1

Join Operator

condition map:

Left Outer Join0 to 1

0 {VALUE._col10}

1handleSkewJoin: false

outputColumnNames: _col10

Select Operator

expressions:

expr: _col10

outputColumnNames: _col0

File Output Operator

compressed: true

GlobalTableId: 0

table:

output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

Stage: Stage-0

Fetch Operator

limit: -1

常规来讲,这个SQL非常简单,a表是主表,与其他表左外关联用到了k1和k2两个关联键,使用两个MapReduce作业完全可以搞定。但是这个SQL的执行却给出了3个作业:(Stage-0用做数据的最终展示,该作业可以忽略不计)第1个作业(Stage-5)是a表与b表关联;第2个作业(Stage-1)是第1个作业的中间结果再与c、d、f三表关联;第3个作业(Stage-2)是第2个作业的中间结果再与e表关联。

有点搞不懂了吧,第1和第2个作业明明可以合并在一起来完成的呀!其实我也搞不懂,从执行中看不出原由。而且如果这个SQL去掉c或者e其中的一个关联表,第1和第2个作业就可以合并在一起!很奇妙,我没有深入探究,应该是Hive的规则优化器还不够完美。

总之,遇到这种多表关联的情况一定要记得看一下执行,看看Hive是不是生成了多余的作业。如果Hive真的犯傻生成了多余的作业,就要尝试改变一下SQL的写法。通常是将关联键相同的表放在一起,如果还不行就再引入子查询。例如上面这个例子改为如下SQL就可以只生成2个作业了:

explain

select t.v1

from

(select a.k2,a.v1

from

ljn1 a

left outer join ljn2 b on (a.k1 = b.k1)

left outer join ljn3 c on (a.k1 = c.k1)

left outer join ljn4 d on (a.k1 = d.k1)

left outer join ljn5 f on (a.k1 = f.k1)

) t

left outer join ljn6 e on (t.k2 = e.k2)

;STAGE DEPENDENCIES:

Stage-1 is a root stage

Stage-2 depends on stages: Stage-1

Stage-0 is a root stage

STAGE PLANS:

Stage: Stage-1

Alias -> Map Operator Tree:

t:a

TableScan

alias: a

key expressions:

Map-reduce partition columns:

tag: 0

value expressions:

expr: k2

type: string

expr: v1

t:b

TableScan

alias: b

key expressions:

Map-reduce partition columns:

tag: 1

t:c

TableScan

alias: c

key expressions:

Map-reduce partition columns:

tag: 2

t:d

TableScan

alias: d

key expressions:

Map-reduce partition columns:

tag: 3

t:f

TableScan

alias: f

key expressions:

Map-reduce partition columns:

tag: 4

Join Operator

condition map:

Left Outer Join0 to 1

Left Outer Join0 to 2

Left Outer Join0 to 3

Left Outer Join0 to 4

0 {VALUE._col1} {VALUE._col2}

12

handleSkewJoin: false

outputColumnNames: _col1, _col2

Select Operator

expressions:

expr: _col1

type: string

expr: _col2

outputColumnNames: _col0, _col1

File Output Operator

compressed: true

GlobalTableId: 0

table:

input format: org.apache.hadoop.mapred.SequenceFileInputFormat

output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

Stage: Stage-2

Alias -> Map Operator Tree:

$INTNAME

key expressions:

type: string

Map-reduce partition columns:

type: string

tag: 0

value expressions:

expr: _col1

eTableScan

alias: e

key expressions:

expr: k2

如何查看hive引擎 hive.execution.engine

expr: _col0

Hive中在做多表关联时,由于Hive的SQL优化引擎还不够强大,表的关联顺序不同往往导致产生不同数量的MapReduce作业数。这时就需要通过分析执行对SQL进行调整,以获得最少的MapReduce作业数。

进行sort merge bucket map join时,需要设置的属性为:

【Hive】Hive Join 介绍

保存后将文件到集群所有

[TOC]

Hive 中的 Join 只支持等值 Join,也就是说 Join on 中的 on 里面表之间连接条件只能是 = ,不能是 <,> 等符号。此外,on中的等值连接之间只能是 and,不能是or。

Hive 执行引擎会将 HQL “翻译” 成为map-reduce 任务,在执行表的 Join 作时,如果多个表中每个表都使用同一个列进行连接(出现在 Join on 子句中),则只会生成一个 MR Job:

三个表 a、b、c 都分别使用了同一个字段进行连接,亦即同一个字段同时出现在两个 Join 子句中,从而只生成一个 MR Job。

如果多表中,其中存在一个表使用了至少 2 个字段进行连接(同一个表的至少2个列出现在 Join 子句中),则会至少生成 2 个MR Job:

三个表基于 2 个字段进行连接,这两个字段 b.key1 和 b.key2 同时出现在 b 表中。连接的过程是这样的:首先 a 和 b 表基于a.key 和 b.key1 进行连接,对应着个 MR Job;表 a 和 b 连接的结果,再和 c 进行连接,对应着第二个 MR Job。

这是因为 Map 输出时候以 Join on 条件中的列为 key,如果 Join 有多个关联键,则以这些关联键的组合作为 key,Map 根据 Key 分发数据给 Reduce 端,具体的 Join 是在 Reduce 作中完成,因此,如果多表基于不同的列做 Join,则无法在一轮 MR 任务中将所有相关数据 shuffle 到同一个 Reduce 作中。

Hive 支持常用的 SQL Join 语句,例如内连接、左外连接、右外连接以及 Hive 的 map 端连接。其中 map 端连接是用于优化 Hive 连接查询的一个重要技巧。

先准备三张表。

employee员工表:

dept部门表一般默认值足够了,除非你的数据量非常大,需要创建的文件数大于 100000,可根据实际情况加以调整。:

薪水表:

多张表进行内连接作时,只有所有表中与 on 条件中相匹配的数据才会显示,类似取交集。

JOIN 作符 左边表中符合 where 条件的所有记录都会被保留 ,JOIN 作符右边表中如果没有符合 on 后面连接条件的记录,则从右边表中选出的列为NULL,如果没有 where 条件,则左边表中的记录都被保留。

对于大量的数据,在编写 SQL 时尽量用 where 条件过滤掉不符合条件的数据是有益的。但是对于左外连接和右外连接, where 条件是在 on 条件执行之后才会执行,on 条件会产生一个临时表,where 条件是对这个临时表进行过滤 。

因此为了优化 Hive SQL 执行的效率, 在需要使用外连接的场景,如果是要条件查询后才连接应该把查询件放置于 on 后,如果是想再连接完毕后才筛选就应把条件放置于 where 后面,对主表的筛选要用 where 条件 。

特别要注意的是,如果是需要对主表过滤之后再和从表做左关联,将主表写成子查询的形式,可以减少主表的数据量 :

RIGHT OUTER JOIN,与 LEFT OUTER JOIN 相对, JOIN 作符右边表中符合where 条件的所有记录都会被保留 ,JOIN 作符左边表中如果没有符合 on 后面连接条件的记录,则从左边表中选出的列为 NULL。

保留满足 where 条件的两个表的数据,类似并集,没有符合连接条件的字段使用 NULL 填充。

以 LEFT SEMI JOIN 关键字前面的表为主表,返回主表的 KEY 也在副表中的记录。在早期的 Hive 版本中,不支持标准 SQL 中的 IN 或 EXISTS 的功能,可以使用LEFT SEMI JOIN 实现类似的功能。

需要强调的是:

笛卡尔积是一种连接,表示左边表的行数乘以右边表的行数。

Hive中的 Join 可分为 Common Join(Reduce 阶段完成 join)和 Map Join(Map 阶段完成 join)。

如果不指定 Map Join 或者不符合 Map Join 的条件,那么 Hive 解析器会默认把执行 Common Join,即在 Reduce 阶段完成 join。整个过程包含 Map、Shuffle、Reduce 阶段。

以下面 HQL 为例,图解其过程:

Hive 0.7 之前,需要使用 hint 提示 / + mapjoin(table) / 才会执行Map Join,否则执行 Common Join,但在 0.7 版本之后,默认自动会转换 Map Join,由参数hive.auto.convert.join 来控制,默认为 true。

如上图中的流程, 首先Task A 在客户端本地执行,负责扫描小表 b 的数据,将其转换成一个 HashTable 的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache 中。

接下来是 Task B,该任务是一个没有 Reduce 的 MR,启动 MapTasks 扫描大表 a,在 Map 阶段,根据 a 的每一条记录去和 DistributeCache 中 b 表对应的 HashTable 关联,并直接输出结果。

由于 MapJoin 没有 Reduce,所以由 Map 直接输出结果文件,有多少个 Map Task,就有多少个结果文件。

Map Join 效率比 Common Join 效率好,但总会有“小表”条件不满足的时候。这就需要 bucket map join 了。

Bucket map join 需要待连接的两个表在连接字段上进行分桶(每个分桶对应hdfs上的一个文件),而且小表的桶数需要时大表桶数的倍数。

建立分桶表的例子:

这样,my_user 表就对应 32 个桶,数据根据 uid 的 hash value 与32 取余,然后被分发导不同的桶中。

如果两个表在连接字段上分桶,则可以执行 bucket map join 了,具体的:

对于 bucket map join 中的两个表,如果每个桶内分区字段也是有序的,则还可以进行 sort merge bucket map join。

建表语句为:

Join 的过程中,Map 结束之后,会将相同的 Key 的数据 shuffle 到同一个 Reduce中,如果数据分布均匀的话,每个Reduce 处理的数据量大体上是比较均衡的,但是若明显存在数据倾斜的时候,会出现某些 Reducer 处理的数据量过大,从而使得该的处理时间过长,成为瓶颈。

大表与大表关联,如果其中一张表的多是空值或者 0 比较多,容易 shuffle 给一个reduce,造成运行慢。

这种情况可以对异常值赋一个随机值来分散 key,均匀分配给多个 reduce 去执行,比如:

当 key 值都是有效值时,解决办法为:

设置以下参数:

Hive 在运行的时候无法判断哪个 key 会产生倾斜,所以使用 hive.skewjoin.key 参数控制倾斜的阈值,如果超过这个值,新的值会发送给那些还没有达到的 reduce,一般可以设置成待处理的总记录数/reduce 个数的 2-4 倍。

1. Hive基础二(join原理和机制,join的几种类型,数据倾斜简单处理)

2. Hive:JOIN及JOIN优化

3. hive中关于常见数据倾斜的处理

hive和sparksql的区别

标准查询关键字执行顺序为 from->on->where->group by->hing->order by ,on 是先对表进行筛选后再关联的,left 关联则 on 只对右表有效,左表都要选出来。

历史上存在的原理,以前都是使用hive来构建数据仓库,所以存在大量对hive所管理的数据查询的需求。而hive、shark、sparlSQL都可以进行hive的数据type: int查询。shark是使用了hive的sql语法解析器和优化器,修改了执行器,使之物理执行过程是跑在spark上;而sparkSQL是使用了自身的语法解析器、优化器和执行器,同时sparkSQL还扩展了接口,不单单支持hive数据的查询,可以进行多种数据源的数据查询。

Hive on Tez

sort order: +

Hortonworks 在2014年左右发布了 Stinger Initiative,并进行社区分享,为的是让 Hive 支持更多 SQL,并实现更好的性能。

Tez 是 Apache 开源的支持 DAG(有向无环图) 作业的计算框架,是支持 Hadoop 2.x 的重要引擎。它源于 MapReduce 框架,核心思想是将 Map 和 Reduce 两个作进一步拆分,分解后的元作可以任意灵活组合,产生新的作,这些作经过一些控制程序组装后,可形成一个大的 DAG 作业。

Tez 将 Map task 和 Reduce task 进一步拆分为如下图所示:

Tez 的 task 由 Input、processor、output 阶段组成,可以表达所有复杂的 map、reduce 作,如下图:

Tez 可以将多个有依赖的作业转换为一个作业(只需写一次 HDFS,中间环节较少),从而这样一来当两边 bucket 要做局部 join 的时候,只需要用类似 merge sort 算法中的 merge 作一样把两个 bucket 顺序遍历一遍即可完成,小表的数据可以每次只读取一部分,然后还是用大表一行一行的去匹配,这样的join 没有限制内存的大小. 并且也可以执行全外连接。大大提升 DAG 作业的性能。Tez 已被 Hortonworks 用于 Hive 引擎的优化,经测试一般小任务比 Hive MR 的 2-3 倍速度左右,大任务 7-10 倍左右,情况不同效果不同。

Tez + Hive 仍采用 MapReduce 计算框架,但对 DAG 的作业依赖关系进行了裁剪,并将多个小作业合并成一个大作业,不仅减少了计算量,而且写 HDFS 次数也大大减少。

影响数据检索效率的几个因素

Reduce Output Operator

影响数据检索效率的几个因素

bTableScan

数据检索有两种主要形态。种是纯数据库型的。典型的结构是一个关系型数据,比如 mysql。用户通过 SQL 表达出所需要的数据,mysql 把 SQL 翻译成物理的数据检索动作返回结果。第二种形态是现在越来越流行的大数据玩家的。典型的结构是有一个分区的数据存储,最初这种存储就是原始的 HDFS,后来开逐步有人在 HDFS 上加上索引的支持,或者干脆用 Elasticsearc 这样的数据存储。然后在存储之上有一个分布式的实时计算层,比如 Hive 或者 Spark SQL。用户用 Hive SQL 提交给计算层,计算层从存储里拉取出数据,进行计算之后返回给用户。这种大数据的起初是因为 SQL 有很多 ad-hoc 查询是满足不了的,干脆让用户自己写 map/reduce 想怎么算都可以了。但是后来玩大了之后,越来越多的人觉得这些 Hive 之类的方案查询效率怎么那么低下啊。于是一个又一个项目开始去优化这些大数据计算框架的查询性能。这些优化手段和经典的数据库优化到今天的手段是没有什么两样的,很多公司打着搞计算引擎的旗号干着重新发明数据库的活。所以,回归本质,影响数据检索效率的就那么几个因素。我们不妨来看一看。

数据检索干的是什么事情

=> 加载 => 变换

找到所需要的数据,把数据从远程或者磁盘加载到内存中。按照规则进行变换,比如按某个字段group by,取另外一个字段的sum之类的计算。

影响效率的四个因素

读取更少的数据

数据本地化,充分遵循底层硬件的限制设计架构

更多的机器

更高效率的计算和计算的物理实现

原则上的四点描述是非常抽象的。我们具体来看这些点映射到实际的数据库中都是一些什么样的优化措施。

读取更少的数据

数据越少,检索需要的时间当然越少了。在考虑所有技术手段之前,最有效果的恐怕是从业务的角度审视一下我们是否需要从那么多的数据中检索出结果来。有没有可能用更少的数据达到同样的效果。减少的数据量的两个手段,聚合和抽样。如果在入库之前把数据就做了聚合或者抽样,是不是可以极大地减少查询所需要的时间,同时效果上并无多少异呢?极端情况下,如果需要的是一天的总访问量,比如有1个亿。查询的时候去数1亿行肯定快不了。但是如果统计好了一天的总访问量,查询的时候只需要取得一条记录就可以知道今天有1个亿的人访问了。

索引是一种非常常见的减少数据读取量的策略了。一般的按行存储的关系型数据库都会有一个主键。用这个主键可以非常快速的查找到对应的行。KV存储也是这样,按照Key可以快速地找到对应的Value。可以理解为一个Hashmap。但是一旦查询的时候不是用主键,而是另外一个字段。那么最糟糕的情况就是进行一次全表的扫描了,也就是把所有的数据都读取出来,然后看要的数据到底在哪里,这就不可能快了。减少数据读取量的方案就是,建立一个类似字典一样的查找表,当我们找 username=wentao 的时候,可以列举出所有有 wentao 作为用户名的行的主键。然后拿这些主键去行存储(就是那个hashmap)里捞数据,就一捞一个准了。

谈到索引就不得不谈一下一个查询使用了两个字段,如何使用两个索引的问题。mysql的行为可以代表大部分主流数据库的处理方式:

基本上来说,经验表明有多个单字段的索引,数据库会选一的来使用。其余字段的过滤仍然是通过数据读取到内存之后,用predicate去判断的。也就是无法减少数据的读取量。

在这个方面基于inverted index的数据就非常有特点。一个是Elasticsearch为代表的lucene系的数据库。另外一个是新锐的druid数据库。

效果就是,这些数据库可以把单字段的filter结果缓存起来。多个字段的查询可以把之前缓存的结果直接拿过来做 AND 或者 OR 作。

索引存在的必要是因为主存储没有提供直接的快速的能力。如果访问的就是数据库的主键,那么需要读取的数据也就非常少了。另外一个变种就是支持遍历的主键,比如hbase的rowkey。如果查询的是一个基于rowkey的范围,那么像hbase这样的数据库就可以支持只读取到这个范围内的数据,而不用读取不再这个范围内的额外数据,从而提高速度。这种加速的方式就是利用了主存储自身的物理分布的特性。另外一个更常见的场景就是 partition。比如 mysql 或者 tgresql 都支持分区表的概念。当我们建立了分区表之后,查找的条件如果可以过滤出分区,那么可以大幅减少需要读取的数据量。比 partition 更细粒度一些的是 clustered index。它其实不是一个索引(二级索引),它是改变了数据在主存储内的排列方式,让相同clustered key的数据彼此紧挨着放在一起,从而在查询的时候避免扫描到无关的数据。比 partition 更粗一些的是分库分表分文件。比如我们可以一天建立一张表,查询的时候先到表,再执行 SQL。比如 graphite 给每个 metric 创建一个文件存放采集来的 data point,查询的时候给定metric 就可以到一个文件,然后只读取这个文件的数据。

另外还有一点就是按行存储和按列存储的区别。按列存储的时候,每个列是一个的文件。查询用到了哪几个列就打开哪几个列的文件,没有用到的列的数据碰都不会碰到。反观按行存储,一张中的所有字段是彼此紧挨在磁盘上的。一个表如果有100个字段,哪怕只选取其中的一个字段,在扫描磁盘的时候其余99个字段的数据仍然会被扫描到的。

考虑一个具体的案例,时间序列数据。如何使用读取更少的数据的策略来提高检索的效率呢?首先,我们可以保证入库的时间粒度,维度粒度是正好是查询所需要的。如果查询需要的是5分钟数据,但是入库的是1分钟的,那么就可以先聚合成5分钟的再存入数据库。对于主存储的物理布局选择,如果查询总是针对一个时间范围的。那么把 timestamp 做为 hbase 的 rowkey,或者 mysql 的 clustered index 是合适。这样我们按时间过滤的时候,选择到的是一堆连续的数据,不用读取之后再过滤掉不符合条件的数据。但是如果在一个时间范围内有很多中数据,比如1万个IP,那么即便是查1个IP的数据也需要把1万个IP的数据都读取出来。所以可以把 IP 维度也编码到 rowkey 或者 clustered index 中。但是如另外还有一个维度是 OS,那么查询的时候 IP 维度的 rowkey 是没有帮助的,仍然是要把所有的数据都查出来。这就是仅依靠主存储是无法满足各种查询条件下都能够读取更少的数据的原因。所以,二级索引是必要的。我们可以把时间序列中的所有维度都拿出来建立索引,然后查询的时候如果指定了维度,就可以用二级索引把真正需要读取的数据过滤出来。但是实践中,很多数据库并不因为使用了索引使得查询变快了,有的时候反而变得更慢了。对于 mysql 来说,存储时间序列的方式是按时间做 partition,不对维度建立任何索引。查询的时候只过滤出对应的 partition,然后进行全 partition 扫描,这样会快过于使用二级索引到行之后再去读取主存储的查询方式。究其原因,就是数据本地化的问题了。

[page]

数据本地化

数据本地化的实质是软件工程师们要充分尊重和理解底层硬件的限制,并且用各种手段规避问题化利用手里的硬件资源。本地化有很多种形态

最常见的理解的本地化问题是网络问题。我们都知道网络带宽不是无限的,比本地磁盘慢多了。如果可能尽量不要通过网络去访问数据。即便要访问,也应该一次抓取多一些数据,而不是一次搞一点,然后搞很多次。因为网络连接和来回的开销是非常高的。这就是 data locality 的问题。我们要把计算尽可能的靠近数据,减少网络上传输的数据量。

这种带宽引起的本地化问题,还有很多。网络比硬盘慢,硬盘比内存慢,内存比L2缓存慢。做到的数据库可以让计算完全发生在 L2 缓存内,尽可能地避免频繁地在内存和L2之间倒腾数据。

另外一种形态的问题化问题是磁盘的顺序读和随机读的问题。当数据彼此靠近地物理存放在磁盘上的时候,顺序读取一批是非常快的。如果需要随机读取多个不连续的硬盘位置,磁头就要来回移动从而使得读取速度快速下降。即便是 SSD 硬盘,顺序读也是要比随机读快的。

基于尽可能让数据读取本地化的原则,检索应该尽可能地使用顺序读而不是随机读。如果可以的话,把主存储的row key或者clustered index设计为和查询提交一样的。时间序列如果都是按时间查,那么按时间做的row key可以非常高效地以顺序读的方式把数据拉取出来。类似地,按列存储的数据如果要把一个列的数据都取出来加和的话,可以非常快地用顺序读的方式加载出来。

二级索引的访问方式典型的随机读。当查询条件经过了二级索引查找之后得到一堆的主存储的 key,那么就需要对每个 key 进行一次随机读。即便彼此仅靠的key可以用顺序读做一些优化,总体上来说仍然是随机读的模式。这也就是为什么时间序列数据在 mysql 里建立了索引反而比没有建索引还要慢的原因。

为了尽可能的利用顺序读,人们就开始想各种办法了。前面提到了 mysql 里的一行数据的多个列是彼此紧靠地物理存放的。那么如果我们把所需要的数据建成多个列,那么一次查询就可以批量获得更多的数据,减少随机读取的次数。也就是把之前的一些行变为列的方式来存放,减少行的数量。这种做法的经典案例就是时间序列数据,比如可以一分钟存一行数据,每一秒的值变成一个列。那么行的数量可以变成之前的1/60。

但是这种行变列的做法在按列存储的数据库里就不能直接照搬了,有些列式数据库有column family的概念,不同的设置在物理上存放可能是在一起的也可能是分开的。对于 Elasticsearch 来说,要想减少行的数量,让一行多pack一些数据进去,一种做法就是利用 nested document。内部 Elasticsearch 可以保证一个 document 下的所有的 nested document是物理上靠在一起放在同一个 lucene 的 segment 内。

网络的data locality就比较为人熟知了。map reduce的大数据计算模式就是利用map在数据的本地把数据先计算,往往计算的结果可以比原数据小很多。然后再通过网络传输汇总后做 reduce 计算。这样就节省了大量网络传输数据的时间浪费和资源消耗。现在 Elasticsearch 就支持在每个 data node 上部署 spark。由 spark 在每个 data node 上做计算。而不用把数据都查询出来,用网络传输到 spark 集群里再去计算。这种数据库和计算集群的混合部署是高性能的关键。类似的还有 storm 和 kafka 之间的关系。

网络的data locality还有一个老大难问题就是分布式大数据下的多表join问题。如果只是查询一个分布式表,那么把计算用 map reduce 表达就没有多大问题了。但是如果需要同时查询两个表,就意味着两个表可能不是在物理上同样均匀分布的。一种最简单的策略就是找出两张表中最小的那张,然后把表的内容广播到每个上,再做join。复杂一些的是对两个单表做 map reduce,然后按照相同的 key 把部分计算的结果汇集在一起。第三种策略是保证数据分布的方式,让两张表查询的时候需要用到的数据总在一起。没有完美的方案,也不大可能有完美的方案。除非有一天网络带宽可以大到忽略不计的地步。

更多的机器

这个就没有什么好说的了。多一倍的机器就多一倍的 CPU,可以同时计算更多的数据。多一倍的机器就多一倍的磁头,可以同时扫描更多的字节数。很多大数据框架的故事就是讲如何如何通过 scale out解决无限大的问题。但是值得注意的是,集群可以无限大,数据可以无限多,但是口袋里的银子不会无限多的。堆机器解决问题比升级大型机是要便宜,但是机器堆多了也是非常昂贵的。特别是 Hive 这些从一开始就是分布式多机的检索方案,刚开始的时候效率并不高。堆机器是一个乘数,当数据库本来单机性能不高的时候,乘数大并不能起到决定性的作用。

更高效的计算和计算实现

检索的过程不仅仅是磁盘扫描,它还包括一个可简单可复杂的变换过程。使用 hyperloglog,count min-sketch等有损算法可以极大地提高统计计算的性能。数据库的join也是一个经常有算法创新的地方。

计算实现就是算法是用C++实现的还是用ja,还是python实现的。用ja是用大Integer实现的,还是小int实现的。不同的语言的实现方式会有一些固定的开销。不是说快就一定要C++,但是 python 写 for 循环是显然没有指望的。任何数据检索的环节只要包含 python/ruby 这些语言的逐条 for 循环就一定快不起来了。

结论

希望这四点可以被记住,成为一种指导性的优化数据检索效率的思维框架。无论你是设计一个mysql表结构,还是优化一个spark sql的应用。从这四个角度想想,都有哪些环节是在拖后腿的,手上的工具有什么样的参数可以调整,让随机读变成顺序读,表结构怎么样设计可以最小化数据读取的量。要做到这一点,你必须非常非常了解工具的底层实现。而不是盲目的相信,xx数据库是的数据库,所以它一定很快之类的。如果你不了解你手上的数据库或者计算引擎,当它快的时候你不知道为何快,当它慢的时候你就更加无从优化了。