Presto必知必会
1. 概述
Presto是一个分布式SQL查询引擎,用于查询分布在一个或多个不同数据源中的大数据集。
Presto通过使用分布式查询,可以快速高效的完成海量数据的查询。如果你需要处理TB或者PB级别的数据,那么你可能更希望借助于Hadoop和HDFS来完成这些数据的处理。作为Hive和Pig(Hive和Pig都是通过MapReduce的管道流来完成HDFS数据的查询)的替代者,Presto不仅可以访问HDFS,也可以操作不同的数据源,包括:RDBMS和其他的数据源(例如:Cassandra)。
Presto被设计为数据仓库和数据分析产品:数据分析、大规模数据聚集和生成报表。这些工作经常通常被认为是线上分析处理操作。
Presto是FaceBook开源的一个开源项目。Presto在FaceBook诞生,并且由FaceBook内部工程师和开源社区的工程师公共维护和改进。
2. 部署Presto
下载安装:
下载Presto server tarball, presto-server-0.152.tar.gz,将它解压。它包含一个顶级目录,presto-server-0.152,我们叫它安装目录。
Presto需要一个用于存储日志、本地元数据等的数据目录。建议在安装目录的外面创建一个数据目录。这样方便Presto进行升级。
配置Presto:
在安装目录中创建一个etc目录。在这个etc目录中放入以下配置信息:
l 节点属性:每个节点的环境配置信息
l JVM 配置:JVM的命令行选项
l 配置属性:Presto server的配置信息
l Catalog属性:configuration for Connectors(数据源)的配置信息
节点属性:
节点属性配置文件:etc/node.properties包含针对于每个节点的特定的配置信息。一个节点就是在一台机器上安装的Presto实例。这份配置文件一般情况下是在Presto第一次安装的时候,由部署系统创建的。一个etc/node.properties配置文件至少包含如下配置信息:
node.environment=production
node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
node.data-dir=/var/lib/kylin/prestodata
针对上面的配置信息描述如下:
node.environment:集群名称。所有在同一个集群中的Presto节点必须拥有相同的集群名称。
node.id:每个Presto节点的唯一标示。每个节点的node.id都必须是唯一的。在Presto进行重启或者升级过程中每个节点的node.id必须保持不变。如果在一个节点上安装多个Presto实例(例如:在同一台机器上安装多个Presto节点),那么每个Presto节点必须拥有唯一的node.id。
node.data-dir:数据存储目录的位置(操作系统上的路径)。Presto将会把日期和数据存储在这个目录下。
JVM配置:
JVM配置文件,etc/jvm.config,包含一系列在启动JVM的时候需要使用的命令行选项。这份配置文件的格式是:一系列的选项,每行配置一个单独的选项。由于这些选项不在shell命令中使用。因此即使将每个选项通过空格或者其他的分隔符分开,java程序也不会将这些选项分开,而是作为一个命令行选项处理。(就想下面例子中的OnOutOfMemoryError选项)。
一个典型的etc/jvm.config配置文件如下:
-server
-Xmx16G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:OnOutOfMemoryError=kill -9 %p
由于OutOfMemoryError将会导致JVM处于不一致状态,所以遇到这种错误的时候我们一般的处理措施就是记录下dump headp中的信息(用于debugging),然后强制终止进程。
Presto会将查询编译成字节码文件,因此Presto会生成很多class,因此我们我们应该增大Perm区的大小(在Perm中主要存储class)并且要允许Jvm class unloading。
配置属性:
Presto的配置文件:etc/config.properties包含了Presto server的所有配置信息。每个Presto server既是一个coordinator也是一个worker。但是在大型集群中,处于性能考虑,建议单独用一台机器作为coordinator。
一个coordinator的etc/config.properties应该至少包含以下信息:
coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8080
query.max-memory=50GB
query.max-memory-per-node=1GB
discovery-server.enabled=true
discovery.uri=http://example.net:8080
以下是最基本的worker配置:
coordinator=false
http-server.http.port=8080
query.max-memory=50GB
query.max-memory-per-node=1GB
discovery.uri=http://example.net:8080
但是如果你用一台机器进行测试,那么这一台机器将会即作为coordinator,也作为worker。配置文件将会如下所示:
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=5GB
query.max-memory-per-node=1GB
discovery-server.enabled=true
discovery.uri=http://example.net:8080
对配置项解释如下:
coordinator:指定是否允许Presto实例作为一个coordinator(接收来自客户端的查询请求,管理每个查询的执行过程)。
node-scheduler.include-coordinator:是否允许在coordinator服务中进行调度工作。对于大型的集群,在一个节点上的Presto server即作为coordinator又作为worker将会降低查询性能。因为如果一个服务器作为worker使用,大部分的资源都会被worker占用,那么就不会有足够的资源进行关键任务调度、管理和监控查询执行。
http-server.http.port:指定HTTP server的端口。Presto 使用 HTTP进行内部和外部的所有通讯。
query.max-memory=50GB:一个单独的查询任务使用的最大内存 (一个查询计划的某个执行部分会在一个特定的节点上执行)。 这个配置参数限制的GROUP BY语句中的Group的数目、JOIN关联中的右关联表的大小、ORDER BY语句中的行数和一个窗口函数中处理的行数。 该参数应该根据并发查询的数量和查询的复杂度进行调整。如果该参数设置的太低,很多查询将不能执行;但是如果设置的太高将会导致JVM把内存耗光。
query.max-memory-per-node=1GB:一个单独的查询任务使用任何一个节点的最大内存。
discovery-server.enabled:Presto 通过Discovery 服务来找到集群中所有的节点。为了能够找到集群中所有的节点,每一个Presto实例都会在启动的时候将自己注册到discovery服务。Presto为了简化部署,并且也不想再增加一个新的服务进程,Presto coordinator 可以运行一个内嵌在coordinator 里面的Discovery 服务。这个内嵌的Discovery 服务和Presto共享HTTP server并且使用同样的端口。
discovery.uri:Discovery server的URI。由于启用了Presto coordinator内嵌的Discovery 服务,因此这个uri就是Presto coordinator的uri。修改example.net:8080,根据你的实际环境设置该URI。注意:这个URI一定不能以“/“结尾。
日志级别:
日志配置文件:etc/log.properties。在这个配置文件中允许你根据不同的日志结构设置不同的日志级别。每个logger都有一个名字(通常是使用logger的类的全标示类名).Loggers通过名字中的“.“来表示层级和集成关系。 (像java里面的包). 如下面的log配置信息:
com.facebook.presto=INFO
这个将为com.facebook.presto.server和com.facebook.presto.hive设置最小的级别为INFO。一共有四个级别为:DEBUG, INFO, WARN和ERROR。
Catalog属性:
Presto通过connectors访问数据。这些connectors挂载在catalogs上。 connector 可以提供一个catalog中所有的schema和表。例如: Hive connector 将每个hive的database都映射成为一个schema,所以如果hive connector挂载到了名为hive的catalog,并且在hive的web有一张名为clicks的表,那么在Presto中可以通过hive.web.clicks来访问这张表。
通过在etc/catalog目录下创建catalog属性文件来完成catalogs的注册。例如:可以先创建一个etc/catalog/jmx.properties文件,文件中的内容如下,完成在jmxcatalog上挂载一个jmx connector:
connector.name=jmx
可以通过https://prestodb.io/docs/current/connector.html查看Connectors的详细配置选项。
下面介绍一下我的环境搭建的信息:
CDH 5.7.0集成环境,其中:
Hadoop:2.6.0
Hive:1.1.0
Presto版本:presto-server-0.152
我们这里演示Presto读取Hive中的表数据。
Presto部署的集群节点:
10.20.18.25 SZB-L0023778
10.20.18.28 SZB-L0023779
10.20.18.24 SZB-L0023780
SZB-L0023780节点作为coordinator节点,其余两个节点为Worker节点。
SZB-L0023780的配置文件内容为:
coordinator节点(SZB-L0023780)的配置为:
etc/config.properties文件内容为:
coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8080
query.max-memory=8GB
query.max-memory-per-node=1GB
discovery-server.enabled=true
discovery.uri=http://SZB-L0023780:8080
etc/jvm.config文件内容为:
-server
-Xmx8G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:OnOutOfMemoryError=kill -9 %p
etc/node.properties文件内容为:
node.environment=production
node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
node.data-dir=/var/lib/kylin/prestodata
etc/catalog/hive.properties文件内容为:
connector.name=hive-cdh5
hive.metastore.uri=thrift://SZB-L0023778:9083
hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
Worker节点(SZB-L0023778,SZB-L0023779)的配置为:
etc/config.properties文件内容为:
coordinator=false
http-server.http.port=8080
query.max-memory=8GB
query.max-memory-per-node=1GB
discovery.uri=http://SZB-L0023780:8080
etc/node.properties文件内容为(节点:SZB-L0023778):
node.environment=production
node.id=ffffffff-ffff-ffff-ffff-gggggggggggg
node.data-dir=/var/lib/kylin/prestodata
etc/node.properties文件内容为(节点:SZB-L0023778):
node.environment=production
node.id=ffffffff-ffff-ffff-ffff-hhhhhhhhhhhh
node.data-dir=/var/lib/kylin/prestodata
etc/jvm.config文件内容为:
-server
-Xmx8G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:OnOutOfMemoryError=kill -9 %p
etc/catalog/hive.properties文件内容为:
connector.name=hive-cdh5
hive.metastore.uri=thrift://SZB-L0023778:9083
hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
3. 运行Presto
在安装目录的bin/launcher文件,就是启动脚本。Presto可以使用如下命令作为一个后台进程启动:
bin/launcher start
另外,也可以在前台运行,日志和相关输出将会写入stdout/stderr(可以使用类似daemontools的工具捕捉这两个数据流):
bin/launcher run
运行bin/launcher --help,Presto将会列出支持的命令和命令行选项。另外可以通过运行时使用—verbose参数,来调试安装是否正确。
启动完之后,日志将会写在node.data-dir 配置目录的子目录var/log下,该目录下有如下文件:
launcher.log:这个日志文件由launcher创建,并且server的stdout和stderr都被重定向到了这个日志文件中。这份日志文件中只会有很少的信息,包括:在server日志系统初始化的时候产生的日志和JVM产生的诊断和测试信息。
server.log:这个是Presto使用的主要日志文件。一般情况下,该文件中将会包括server初始化失败时产生的相关信息。这份文件会被自动轮转和压缩。
http-request.log:这是HTTP请求的日志文件,包括server收到的每个HTTP请求信息,这份文件会被自动轮转和压缩。
4. 命令行接口访问Presto
Presto CLI为用户提供了一个用于查询的可交互终端窗口。CLI是一个 可执行 JAR文件, 这也就意味着你可以像UNIX终端窗口一样来使用CLI。
下载 presto-cli-0.152-executable.jar,重名名为 presto ,使用 chmod +x 命令设置可执行权限,然后执行:
./presto --server example.net:8080 --catalog hive --schema default
使用 --help 选项运行CLI,可以看到可用的选项。
默认情况下,查询的结果是分页的。而这种分页的实现不需要你去编写什么代码,而是通过配置一系列的配置信息来实现的。你也可以通过将环境变量:PRESTO_PAGER 设置为你自己的程序名称(比如less,more)来自己实现分页或者也可以PRESTO_PAGER 的值设置为空,从而禁止分页。
示例如下:
presto --server SZB-L0023780:8080 --catalog hive--schema default
presto:default>use zy_af_db;
presto: zy_af_db>select count(1) from ps_of_all;
_col0
-----------
256656293
(1 row)
Query20160907_030802_00014_kr9n9, FINISHED, 2 nodes
Splits:828 total, 828 done (100.00%)
0:03 [257M rows, 55GB] [78.7M rows/s, 16.9GB/s]
5. JDBC驱动访问Presto
通过使用JDBC驱动,可以访问Presto。下载presto-jdbc-0.152.jar并将这个jar文件添加到你的java应用程序的classpath中,Presto支持的URL格式如下:
jdbc:presto://host:port
jdbc:presto://host:port/catalog
jdbc:presto://host:port/catalog/schema
例如,可以使用下面的URL来连接运行在example.net服务器8080端口上的Presto的hive catalog中的sales schema:
jdbc:presto://example.net:8080/hive/sales
6. Presto校验器
我们可以使用Presto Verifier 来将Presto与其他的数据库(例如:MySql)进行对比测试或者将两个Presto集群相互进行对比测试。如果我们需要对Presto进行二次开发,那么我们将会使用Presto Verifier不间断的与Presto的前一版本进行对比测试。
第一步:创建一个MySQL数据库,并且在数据库中用如下语句创建一个表:
CREATE TABLE verifier_queries(
id INT NOT NULL AUTO_INCREMENT,
suite VARCHAR(256) NOT NULL,
name VARCHAR(256),
test_catalog VARCHAR(256) NOT NULL,
test_schema VARCHAR(256) NOT NULL,
test_prequeries TEXT,
test_query TEXT NOT NULL,
test_postqueries TEXT,
test_username VARCHAR(256) NOT NULL default 'verifier-test',
test_password VARCHAR(256),
control_catalog VARCHAR(256) NOT NULL,
control_schema VARCHAR(256) NOT NULL,
control_prequeries TEXT,
control_query TEXT NOT NULL,
control_postqueries TEXT,
control_username VARCHAR(256) NOT NULL default 'verifier-test',
control_password VARCHAR(256),
session_properties_json VARCHAR(256),
PRIMARY KEY (id)
);
第二步,创建一个属性文件,通过该属性文件来配置校验器:
suite=my_suite
query-database=jdbc:mysql://localhost:3306/my_database?user=my_username&password=my_password
control.gateway=jdbc:presto://localhost:8080
test.gateway=jdbc:presto://localhost:8081
thread-count=1
最后一步, 下载presto-verifier-0.152-executable.jar,并将其重命名为:verifier,通过命令:chmod +x为其赋予执行权限,然后运行:
./verifier config.properties
7. 管理Presto
7.1 Web接口
Presto提供了一个Web接口用于监控和管理查询。这个Web接口可以在Presto coordinator上通过HTTP访问,使用在etc/config.properties中配置的HTTP端口号。
如上的主页面显示了正在执行的查询数,正常活动的Worker数,排队的查询数,阻塞的查询数,并行度等等,以及每个查询的列表区域(包括查询的ID,查询语句,查询状态,用户名,数据源等等)。正在查询的排在最上面,紧接着依次为最近完成的查询,失败的查询等。
查询的状态有以下几种:
· QUEUED – 查询以及被接受,正等待执行
· PLANNING – 查询在计划中
· STARTING – 查询已经开始执行
· RUNNING – 查询已经运行,至少有一个task开始执行
· BLOCKED – 查询被阻塞,并且在等待资源(缓存空间,内存,切片)
· FINISHING – 查询正完成(比如commit forautocommit queries)
· FINISHED – 查询已经完成(比如数据已经输出)
· FAILED – 查询执行失败
如果你想查看一个查询更加详细信息,只要点击该查询的“Query ID”链接即可,部分内容如下:
每个查询中的Tasks,我们可以单击“ID”查询每个Task更多的信息。
查询的总概览中提供了一个“Kill Query”功能,另外也提供了三种可视化的功能,为Live Plan,Raw JSON和Split Timeline。比如我们可以从Split Timelinez中查询哪部分耗时比较多。
7.2 Presto调优
默认的Presto配置满足绝大部分的负载要求,下面的一些信息会帮助你解决Presto集群环境中的一些特殊的性能问题。
配置文件
task.info-refresh-max-wait
控制过期task信息,使用在调度中。增加这个值能够减少coordinator 的CPU负载,但是可能导致split调度不是最优的。
task.max-worker-threads
设置workers处理splits时使用的线程数。如果Worker的CPU利用率低并且所有的线程都在使用,那么增加这个值可以提高吞吐量,但是会增加堆内存空间大小。活动线程的数量可以通过
com.facebook.presto.execution.taskexecutor.runningsplitsJMX统计
distributed-joins-enabled
使用hash分布式join代替broadcast广播的join。分布式join需要根据join key的hash值来重新分布表,这可能比广播join慢,但是可以利用更大的表之间join操作。广播的join需要关接的右边的表加载到每一个节点的内存中,然而分布式join是将关联的右边的表加载到分布式内存中。我们可以在每一个查询中设置distributed_join 的session属性值来进行选择。
node-scheduler.network-topology
当调度split时,设置使用的网络拓扑。
当调度split时,“legacy”将忽略拓扑,“flat”将尝试在同一个节点调度split。
JVM配置
下面的参数可以帮助诊断GC问题:
-XX:+PrintGCApplicationConcurrentTime
-XX:+PrintGCApplicationStoppedTime
-XX:+PrintGCCause
-XX:+PrintGCDateStamps
-XX:+PrintGCTimeStamps
-XX:+PrintGCDetails
-XX:+PrintReferenceGC
-XX:+PrintClassHistogramAfterFullGC
-XX:+PrintClassHistogramBeforeFullGC
-XX:PrintFLSStatistics=2
-XX:+PrintAdaptiveSizePolicy
-XX:+PrintSafepointStatistics
-XX:PrintSafepointStatisticsCount=1
7.3 队列配置
队列规则定义在一个Json文件中,用于控制能够提交给Presto的查询的个数,以及每个队列中能够同时运行的查询的个数。用config.properties中的query.queue-config-file来指定Json配置文件的名字。
队列规则如果定义了多个队列,查询会按顺序依次进入不同的队列中。队列规则将按照顺序进行处理,并且使用第一个匹配上的规则。在以下的配置例子中,有5个队列模板,在user.${USER}队列中,${USER}表示着提交查询的用户名。同样的${SOURCE}表示提交查询的来源。
同样有如下的规则定义了哪一类查询会进入哪一个队列中:
· 第一条规则将使bob成为管理员,bob提交的查询进入admin队列。
· 第二条规则表示,来源包含pipeline的查询将首先进入用户的个人队列中,然后进入pipeline队列。当一个查询进入一个新的队列后,直到查询结束才会离开之前的队列。
· 最后一个规则包含所有的队列,将所有的查询加入到个人用户队列中
所有这些规则实现了这样的策略,bob是一个管理员,而其他用户需要遵循以下的限制:
1. 每个用户最多能同时运行5个查询,另外可以运行一个pipeline。
2. 最多能同时运行10个pipeline来源的查询。
3. 最多能同时运行100个其他查询。
{
"queues": {
"user.${USER}": {
"maxConcurrent": 5,
"maxQueued": 20
},
"user_pipeline.${USER}": {
"maxConcurrent": 1,
"maxQueued": 10
},
"pipeline": {
"maxConcurrent": 10,
"maxQueued": 100
},
"admin": {
"maxConcurrent": 100,
"maxQueued": 100
},
"global": {
"maxConcurrent": 100,
"maxQueued": 1000
}
},
"rules": [
{
"user": "bob",
"queues": ["admin"]
},
{
"source": ".*pipeline.*",
"queues": [
"user_pipeline.${USER}",
"pipeline",
"global"
]
},
{
"queues": [
"user.${USER}",
"global"
]
}
]
}