Presto必知必会

黎东
L、先森
2019-01-18 0 2682

1.      概述


Presto是一个分布式SQL查询引擎,用于查询分布在一个或多个不同数据源中的大数据集。


 


Presto通过使用分布式查询,可以快速高效的完成海量数据的查询。如果你需要处理TB或者PB级别的数据,那么你可能更希望借助于Hadoop和HDFS来完成这些数据的处理。作为Hive和Pig(Hive和Pig都是通过MapReduce的管道流来完成HDFS数据的查询)的替代者,Presto不仅可以访问HDFS,也可以操作不同的数据源,包括:RDBMS和其他的数据源(例如:Cassandra)。


Presto被设计为数据仓库和数据分析产品:数据分析、大规模数据聚集和生成报表。这些工作经常通常被认为是线上分析处理操作。


 


Presto是FaceBook开源的一个开源项目。Presto在FaceBook诞生,并且由FaceBook内部工程师和开源社区的工程师公共维护和改进。


 


2.      部署Presto


下载安装:


下载Presto server tarball, presto-server-0.152.tar.gz,将它解压。它包含一个顶级目录,presto-server-0.152,我们叫它安装目录。


 


Presto需要一个用于存储日志、本地元数据等的数据目录。建议在安装目录的外面创建一个数据目录。这样方便Presto进行升级。


 


配置Presto:


在安装目录中创建一个etc目录。在这个etc目录中放入以下配置信息:


l  节点属性:每个节点的环境配置信息


l  JVM 配置:JVM的命令行选项


l  配置属性:Presto server的配置信息


l  Catalog属性:configuration for Connectors(数据源)的配置信息


 


节点属性:


节点属性配置文件:etc/node.properties包含针对于每个节点的特定的配置信息。一个节点就是在一台机器上安装的Presto实例。这份配置文件一般情况下是在Presto第一次安装的时候,由部署系统创建的。一个etc/node.properties配置文件至少包含如下配置信息:


node.environment=production


node.id=ffffffff-ffff-ffff-ffff-ffffffffffff


node.data-dir=/var/lib/kylin/prestodata


 


针对上面的配置信息描述如下:


node.environment:集群名称。所有在同一个集群中的Presto节点必须拥有相同的集群名称。


node.id:每个Presto节点的唯一标示。每个节点的node.id都必须是唯一的。在Presto进行重启或者升级过程中每个节点的node.id必须保持不变。如果在一个节点上安装多个Presto实例(例如:在同一台机器上安装多个Presto节点),那么每个Presto节点必须拥有唯一的node.id。


node.data-dir:数据存储目录的位置(操作系统上的路径)。Presto将会把日期和数据存储在这个目录下。


 


JVM配置:


JVM配置文件,etc/jvm.config,包含一系列在启动JVM的时候需要使用的命令行选项。这份配置文件的格式是:一系列的选项,每行配置一个单独的选项。由于这些选项不在shell命令中使用。因此即使将每个选项通过空格或者其他的分隔符分开,java程序也不会将这些选项分开,而是作为一个命令行选项处理。(就想下面例子中的OnOutOfMemoryError选项)。


一个典型的etc/jvm.config配置文件如下:


-server

-Xmx16G

-XX:+UseG1GC

-XX:G1HeapRegionSize=32M

-XX:+UseGCOverheadLimit

-XX:+ExplicitGCInvokesConcurrent

-XX:+HeapDumpOnOutOfMemoryError

-XX:OnOutOfMemoryError=kill -9 %p

 


由于OutOfMemoryError将会导致JVM处于不一致状态,所以遇到这种错误的时候我们一般的处理措施就是记录下dump headp中的信息(用于debugging),然后强制终止进程。


 


Presto会将查询编译成字节码文件,因此Presto会生成很多class,因此我们我们应该增大Perm区的大小(在Perm中主要存储class)并且要允许Jvm class unloading。


 


配置属性:


Presto的配置文件:etc/config.properties包含了Presto server的所有配置信息。每个Presto server既是一个coordinator也是一个worker。但是在大型集群中,处于性能考虑,建议单独用一台机器作为coordinator。


一个coordinator的etc/config.properties应该至少包含以下信息:


coordinator=true

node-scheduler.include-coordinator=false

http-server.http.port=8080

query.max-memory=50GB

query.max-memory-per-node=1GB

discovery-server.enabled=true

discovery.uri=http://example.net:8080

以下是最基本的worker配置:


coordinator=false

http-server.http.port=8080

query.max-memory=50GB

query.max-memory-per-node=1GB

discovery.uri=http://example.net:8080

 


但是如果你用一台机器进行测试,那么这一台机器将会即作为coordinator,也作为worker。配置文件将会如下所示:


coordinator=true

node-scheduler.include-coordinator=true

http-server.http.port=8080

query.max-memory=5GB

query.max-memory-per-node=1GB

discovery-server.enabled=true

discovery.uri=http://example.net:8080

 


对配置项解释如下:


coordinator:指定是否允许Presto实例作为一个coordinator(接收来自客户端的查询请求,管理每个查询的执行过程)。

node-scheduler.include-coordinator:是否允许在coordinator服务中进行调度工作。对于大型的集群,在一个节点上的Presto server即作为coordinator又作为worker将会降低查询性能。因为如果一个服务器作为worker使用,大部分的资源都会被worker占用,那么就不会有足够的资源进行关键任务调度、管理和监控查询执行。

http-server.http.port:指定HTTP server的端口。Presto 使用 HTTP进行内部和外部的所有通讯。

query.max-memory=50GB:一个单独的查询任务使用的最大内存 (一个查询计划的某个执行部分会在一个特定的节点上执行)。 这个配置参数限制的GROUP BY语句中的Group的数目、JOIN关联中的右关联表的大小、ORDER BY语句中的行数和一个窗口函数中处理的行数。 该参数应该根据并发查询的数量和查询的复杂度进行调整。如果该参数设置的太低,很多查询将不能执行;但是如果设置的太高将会导致JVM把内存耗光。

query.max-memory-per-node=1GB:一个单独的查询任务使用任何一个节点的最大内存。

discovery-server.enabled:Presto 通过Discovery 服务来找到集群中所有的节点。为了能够找到集群中所有的节点,每一个Presto实例都会在启动的时候将自己注册到discovery服务。Presto为了简化部署,并且也不想再增加一个新的服务进程,Presto coordinator 可以运行一个内嵌在coordinator 里面的Discovery 服务。这个内嵌的Discovery 服务和Presto共享HTTP server并且使用同样的端口。

discovery.uri:Discovery server的URI。由于启用了Presto coordinator内嵌的Discovery 服务,因此这个uri就是Presto coordinator的uri。修改example.net:8080,根据你的实际环境设置该URI。注意:这个URI一定不能以“/“结尾。

 


日志级别:


日志配置文件:etc/log.properties。在这个配置文件中允许你根据不同的日志结构设置不同的日志级别。每个logger都有一个名字(通常是使用logger的类的全标示类名).Loggers通过名字中的“.“来表示层级和集成关系。 (像java里面的包). 如下面的log配置信息:


com.facebook.presto=INFO

这个将为com.facebook.presto.server和com.facebook.presto.hive设置最小的级别为INFO。一共有四个级别为:DEBUG, INFO, WARN和ERROR。


 


Catalog属性:


Presto通过connectors访问数据。这些connectors挂载在catalogs上。 connector 可以提供一个catalog中所有的schema和表。例如: Hive connector 将每个hive的database都映射成为一个schema,所以如果hive connector挂载到了名为hive的catalog,并且在hive的web有一张名为clicks的表,那么在Presto中可以通过hive.web.clicks来访问这张表。


 


通过在etc/catalog目录下创建catalog属性文件来完成catalogs的注册。例如:可以先创建一个etc/catalog/jmx.properties文件,文件中的内容如下,完成在jmxcatalog上挂载一个jmx connector:


connector.name=jmx

可以通过https://prestodb.io/docs/current/connector.html查看Connectors的详细配置选项。


 


 


下面介绍一下我的环境搭建的信息:


CDH 5.7.0集成环境,其中:


           Hadoop:2.6.0


           Hive:1.1.0


Presto版本:presto-server-0.152


我们这里演示Presto读取Hive中的表数据。


 


Presto部署的集群节点:


10.20.18.25     SZB-L0023778


10.20.18.28      SZB-L0023779


10.20.18.24     SZB-L0023780


 


SZB-L0023780节点作为coordinator节点,其余两个节点为Worker节点。


SZB-L0023780的配置文件内容为:


coordinator节点(SZB-L0023780)的配置为:


etc/config.properties文件内容为:


coordinator=true


node-scheduler.include-coordinator=false


http-server.http.port=8080


query.max-memory=8GB


query.max-memory-per-node=1GB


discovery-server.enabled=true


discovery.uri=http://SZB-L0023780:8080


 


etc/jvm.config文件内容为:


-server


-Xmx8G


-XX:+UseG1GC


-XX:G1HeapRegionSize=32M


-XX:+UseGCOverheadLimit


-XX:+ExplicitGCInvokesConcurrent


-XX:+HeapDumpOnOutOfMemoryError


-XX:OnOutOfMemoryError=kill -9 %p


 


etc/node.properties文件内容为:


node.environment=production


node.id=ffffffff-ffff-ffff-ffff-ffffffffffff


node.data-dir=/var/lib/kylin/prestodata


etc/catalog/hive.properties文件内容为:


connector.name=hive-cdh5


hive.metastore.uri=thrift://SZB-L0023778:9083


hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml


 


Worker节点(SZB-L0023778,SZB-L0023779)的配置为:


etc/config.properties文件内容为:


coordinator=false


http-server.http.port=8080


query.max-memory=8GB


query.max-memory-per-node=1GB


discovery.uri=http://SZB-L0023780:8080


 


etc/node.properties文件内容为(节点:SZB-L0023778):


node.environment=production


node.id=ffffffff-ffff-ffff-ffff-gggggggggggg


node.data-dir=/var/lib/kylin/prestodata


 


etc/node.properties文件内容为(节点:SZB-L0023778):


node.environment=production


node.id=ffffffff-ffff-ffff-ffff-hhhhhhhhhhhh


node.data-dir=/var/lib/kylin/prestodata


 


etc/jvm.config文件内容为:


-server


-Xmx8G


-XX:+UseG1GC


-XX:G1HeapRegionSize=32M


-XX:+UseGCOverheadLimit


-XX:+ExplicitGCInvokesConcurrent


-XX:+HeapDumpOnOutOfMemoryError


-XX:OnOutOfMemoryError=kill -9 %p


 


etc/catalog/hive.properties文件内容为:


connector.name=hive-cdh5


hive.metastore.uri=thrift://SZB-L0023778:9083


hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml


 


 


 


3.      运行Presto


在安装目录的bin/launcher文件,就是启动脚本。Presto可以使用如下命令作为一个后台进程启动:


bin/launcher start

另外,也可以在前台运行,日志和相关输出将会写入stdout/stderr(可以使用类似daemontools的工具捕捉这两个数据流):


bin/launcher run

运行bin/launcher --help,Presto将会列出支持的命令和命令行选项。另外可以通过运行时使用—verbose参数,来调试安装是否正确。


启动完之后,日志将会写在node.data-dir 配置目录的子目录var/log下,该目录下有如下文件:


launcher.log:这个日志文件由launcher创建,并且server的stdout和stderr都被重定向到了这个日志文件中。这份日志文件中只会有很少的信息,包括:在server日志系统初始化的时候产生的日志和JVM产生的诊断和测试信息。


 


server.log:这个是Presto使用的主要日志文件。一般情况下,该文件中将会包括server初始化失败时产生的相关信息。这份文件会被自动轮转和压缩。


 


http-request.log:这是HTTP请求的日志文件,包括server收到的每个HTTP请求信息,这份文件会被自动轮转和压缩。


 


 


4.      命令行接口访问Presto


Presto CLI为用户提供了一个用于查询的可交互终端窗口。CLI是一个 可执行 JAR文件, 这也就意味着你可以像UNIX终端窗口一样来使用CLI。


下载 presto-cli-0.152-executable.jar,重名名为 presto ,使用 chmod +x 命令设置可执行权限,然后执行:


./presto --server example.net:8080 --catalog hive --schema default

         使用 --help 选项运行CLI,可以看到可用的选项。


默认情况下,查询的结果是分页的。而这种分页的实现不需要你去编写什么代码,而是通过配置一系列的配置信息来实现的。你也可以通过将环境变量:PRESTO_PAGER 设置为你自己的程序名称(比如less,more)来自己实现分页或者也可以PRESTO_PAGER 的值设置为空,从而禁止分页。


 


 


示例如下:


presto  --server SZB-L0023780:8080 --catalog hive--schema default


presto:default>use zy_af_db;


presto: zy_af_db>select count(1) from ps_of_all;


   _col0  


-----------


 256656293


(1 row)


 


Query20160907_030802_00014_kr9n9, FINISHED, 2 nodes


Splits:828 total, 828 done (100.00%)


0:03 [257M rows, 55GB] [78.7M rows/s, 16.9GB/s]


 


 


5.      JDBC驱动访问Presto


通过使用JDBC驱动,可以访问Presto。下载presto-jdbc-0.152.jar并将这个jar文件添加到你的java应用程序的classpath中,Presto支持的URL格式如下:


jdbc:presto://host:port

jdbc:presto://host:port/catalog

jdbc:presto://host:port/catalog/schema

例如,可以使用下面的URL来连接运行在example.net服务器8080端口上的Presto的hive catalog中的sales schema:


jdbc:presto://example.net:8080/hive/sales

 


 


6.      Presto校验器


我们可以使用Presto Verifier 来将Presto与其他的数据库(例如:MySql)进行对比测试或者将两个Presto集群相互进行对比测试。如果我们需要对Presto进行二次开发,那么我们将会使用Presto Verifier不间断的与Presto的前一版本进行对比测试。


第一步:创建一个MySQL数据库,并且在数据库中用如下语句创建一个表:


CREATE TABLE verifier_queries(

    id INT NOT NULL AUTO_INCREMENT,

    suite VARCHAR(256) NOT NULL,

    name VARCHAR(256),

    test_catalog VARCHAR(256) NOT NULL,

    test_schema VARCHAR(256) NOT NULL,

    test_prequeries TEXT,

    test_query TEXT NOT NULL,

    test_postqueries TEXT,

    test_username VARCHAR(256) NOT NULL default 'verifier-test',

    test_password VARCHAR(256),

    control_catalog VARCHAR(256) NOT NULL,

    control_schema VARCHAR(256) NOT NULL,

    control_prequeries TEXT,

    control_query TEXT NOT NULL,

    control_postqueries TEXT,

    control_username VARCHAR(256) NOT NULL default 'verifier-test',

control_password VARCHAR(256),

session_properties_json VARCHAR(256),

    PRIMARY KEY (id)

);

第二步,创建一个属性文件,通过该属性文件来配置校验器:


suite=my_suite

query-database=jdbc:mysql://localhost:3306/my_database?user=my_username&password=my_password

control.gateway=jdbc:presto://localhost:8080

test.gateway=jdbc:presto://localhost:8081

thread-count=1

最后一步, 下载presto-verifier-0.152-executable.jar,并将其重命名为:verifier,通过命令:chmod +x为其赋予执行权限,然后运行:


./verifier config.properties

 


 


7.      管理Presto


7.1  Web接口


Presto提供了一个Web接口用于监控和管理查询。这个Web接口可以在Presto coordinator上通过HTTP访问,使用在etc/config.properties中配置的HTTP端口号。






如上的主页面显示了正在执行的查询数,正常活动的Worker数,排队的查询数,阻塞的查询数,并行度等等,以及每个查询的列表区域(包括查询的ID,查询语句,查询状态,用户名,数据源等等)。正在查询的排在最上面,紧接着依次为最近完成的查询,失败的查询等。


查询的状态有以下几种:


·        QUEUED – 查询以及被接受,正等待执行


·        PLANNING – 查询在计划中


·        STARTING – 查询已经开始执行


·        RUNNING – 查询已经运行,至少有一个task开始执行


·        BLOCKED – 查询被阻塞,并且在等待资源(缓存空间,内存,切片)


·        FINISHING – 查询正完成(比如commit forautocommit queries)


·        FINISHED – 查询已经完成(比如数据已经输出)


·        FAILED – 查询执行失败


如果你想查看一个查询更加详细信息,只要点击该查询的“Query ID”链接即可,部分内容如下:




 


 


每个查询中的Tasks,我们可以单击“ID”查询每个Task更多的信息。


 


 


查询的总概览中提供了一个“Kill Query”功能,另外也提供了三种可视化的功能,为Live Plan,Raw JSON和Split Timeline。比如我们可以从Split Timelinez中查询哪部分耗时比较多。


 


 


7.2  Presto调优


默认的Presto配置满足绝大部分的负载要求,下面的一些信息会帮助你解决Presto集群环境中的一些特殊的性能问题。


 


配置文件


task.info-refresh-max-wait


控制过期task信息,使用在调度中。增加这个值能够减少coordinator 的CPU负载,但是可能导致split调度不是最优的。


 


task.max-worker-threads


设置workers处理splits时使用的线程数。如果Worker的CPU利用率低并且所有的线程都在使用,那么增加这个值可以提高吞吐量,但是会增加堆内存空间大小。活动线程的数量可以通过


com.facebook.presto.execution.taskexecutor.runningsplitsJMX统计


 


distributed-joins-enabled


使用hash分布式join代替broadcast广播的join。分布式join需要根据join key的hash值来重新分布表,这可能比广播join慢,但是可以利用更大的表之间join操作。广播的join需要关接的右边的表加载到每一个节点的内存中,然而分布式join是将关联的右边的表加载到分布式内存中。我们可以在每一个查询中设置distributed_join 的session属性值来进行选择。


 


node-scheduler.network-topology


当调度split时,设置使用的网络拓扑。


当调度split时,“legacy”将忽略拓扑,“flat”将尝试在同一个节点调度split。


 


JVM配置


下面的参数可以帮助诊断GC问题:


-XX:+PrintGCApplicationConcurrentTime

-XX:+PrintGCApplicationStoppedTime

-XX:+PrintGCCause

-XX:+PrintGCDateStamps

-XX:+PrintGCTimeStamps

-XX:+PrintGCDetails

-XX:+PrintReferenceGC

-XX:+PrintClassHistogramAfterFullGC

-XX:+PrintClassHistogramBeforeFullGC

-XX:PrintFLSStatistics=2

-XX:+PrintAdaptiveSizePolicy

-XX:+PrintSafepointStatistics

-XX:PrintSafepointStatisticsCount=1

 


 


7.3  队列配置


队列规则定义在一个Json文件中,用于控制能够提交给Presto的查询的个数,以及每个队列中能够同时运行的查询的个数。用config.properties中的query.queue-config-file来指定Json配置文件的名字。


 


队列规则如果定义了多个队列,查询会按顺序依次进入不同的队列中。队列规则将按照顺序进行处理,并且使用第一个匹配上的规则。在以下的配置例子中,有5个队列模板,在user.${USER}队列中,${USER}表示着提交查询的用户名。同样的${SOURCE}表示提交查询的来源。


 


同样有如下的规则定义了哪一类查询会进入哪一个队列中:


·        第一条规则将使bob成为管理员,bob提交的查询进入admin队列。


·        第二条规则表示,来源包含pipeline的查询将首先进入用户的个人队列中,然后进入pipeline队列。当一个查询进入一个新的队列后,直到查询结束才会离开之前的队列。


·        最后一个规则包含所有的队列,将所有的查询加入到个人用户队列中


 


所有这些规则实现了这样的策略,bob是一个管理员,而其他用户需要遵循以下的限制:


1.    每个用户最多能同时运行5个查询,另外可以运行一个pipeline。


2.    最多能同时运行10个pipeline来源的查询。


3.    最多能同时运行100个其他查询。


 


{

  "queues": {

    "user.${USER}": {

      "maxConcurrent": 5,

      "maxQueued": 20

    },

    "user_pipeline.${USER}": {

      "maxConcurrent": 1,

      "maxQueued": 10

    },

    "pipeline": {

      "maxConcurrent": 10,

      "maxQueued": 100

    },

    "admin": {

      "maxConcurrent": 100,

      "maxQueued": 100

    },

    "global": {

      "maxConcurrent": 100,

      "maxQueued": 1000

    }

  },

  "rules": [

    {

      "user": "bob",

      "queues": ["admin"]

    },

    {

      "source": ".*pipeline.*",

      "queues": [

        "user_pipeline.${USER}",

        "pipeline",

        "global"

      ]

    },

    {

      "queues": [

        "user.${USER}",

        "global"

      ]

    }

  ]

}



大数据