Spark SQL和DataFrame指南[中]

2017-01-05 11:10:17来源:oschina作者:openthings人点击

翻译自: http://spark.apache.org/docs/1.3.0/sql-programming-guide.html


概述(Overview)

SparkSQL是Spark的一个模块,用于结构化数据处理。它提供了一个编程的抽象被称为DataFrames,也可以作为分布式SQL查询引擎。


DataFrames

DataFrame是一种以命名列方式组织的分布式数据集。它概念上相当于关系型数据库中的表,或者R/Python中的数据帧,但是具有更丰富的优化。有很多方式可以构造出一个DataFrame,例如:结构化数据文件,Hive中的tables,外部数据库或者存在的RDDs。


DataFrame的API适用于Scala、Java和Python。


该页上所有的例子使用Spark分布式中的样本数据,可以运行在spark-shell或者pysparkshell中。


入口点:SQLContext

SparkSQL中所有功能的入口点是SQLContext类,或者它子类中的一个。为了创建一个基本的SQLContext,你所需要的是一个SparkContext。

valsc:SparkContext//AnexistingSparkContext.


valsqlContext=neworg.apache.spark.sql.SQLContext(sc)//thisisusedtoimplicitlyconvertanRDDtoaDataFrame.


importsqlContext.implicits._

除了基本的SQLContext,你还可以创建一个HiveContext,它提供了基本的SQLContext的所提供的功能的超集。这些功能中包括附加的特性,可以编写查询,使用更完全的HiveQL解析器,访问HiveUDFs,能够从Hive表中读取数据。想要使用HiveContext,你不需要有一个存在的Hive步骤,并且所有SQLContext可用的数据源仍旧可用。HiveContext只是单独打包,以避免包含默认Sparkbuild中的所有Hive依赖。如果这些依赖对于你的应用不是一个问题,那么推荐使用Spark1.3版本的HiveContext。


使用spark.sql.dialect选项,可以选择SQL的具体变种,用它来解析查询。这个参数可以使用SQLContext上的setConf方法或者在SQL中使用一组key=value命令。对于SQLContext,唯一可以的dialect是“sql”,它可以使用SparkSQL提供的一个简单的SQL解析器。在HiveContext中,默认的是“hiveql”,尽管“sql”也是可用的。因为HiveOL解析器更加完整,在大多数情况下,推荐使用这个。


创建DataFrames

使用SQLContext,应用可以从一个已经存在的RDD、Hive表或者数据源中创建DataFrames。


例如,以下根据一个JSON文件创建出一个DataFrame:


valsc:SparkContext//AnexistingSparkContext.


valsqlContext=neworg.apache.spark.sql.SQLContext(sc)//CreatetheDataFrame


valdf=sqlContext.jsonFile("examples/src/main/resources/people.json")//ShowthecontentoftheDataFrame


df.show()


//agename


//nullMichael


//30Andy


//19Justin


//Printtheschemainatreeformat


df.printSchema()


//root//|--age:long(nullable=true)


//|--name:string(nullable=true)


//Selectonlythe"name"column


df.select("name").show()


//name


//Michael


//Andy


//Justin


//Selecteverybody,butincrementtheageby1


//df.select("name",df("age")+1).show()//官方文档这样的,但是测试时发现这样编译不通过。下面的形式可以


df.select(df("name"),df("age")+1).show()


//name(age+1)


//Michaelnull


//Andy31


//Justin20


//Selectpeopleolderthan21


df.filter(df("age")>21).show()


//agename


//30Andy


//Countpeoplebyage


df.groupBy("age").count().show()


//agecount


//null1


//191


//301


以编程方式运行SQL查询

SQLContext中的sql函数使应用可以以编程方式运行SQL查询,并且将结果以DataFrame形式返回。


valsqlContext=...//AnexistingSQLContext


valdf=sqlContext.sql("SELECT*FROMtable")


RRDs之间的互操作(InteroperatingwithRDDs)


SparkSQL支持两种不同的方法,用于将存在的RDDs转换成DataFrames。第一种方法使用反射来推断包含特定类型的对象的RDD的模式。在写Spark应用时,当你已知schema的情况下,这种基于反射的方式使得代码更加简介,并且效果更好。


创建DataFrames的第二种方法是通过编程接口,它允许你构建一个模式,然后将其应用到现有的RDD上。这种方式更加的繁琐,它允许你构建一个DataFrame当列以及类型未知,直到运行时才能知道时。


使用反射推断模式

SparkSQL中的Scala接口支持自动地将包含case类的RDD转换成DataFrame。case类定义了表的模式,case类的参数的名称使用反射来读取,然后称为列的名称。case类还可以嵌套或者包含复杂的类型,例如Sequences或者Arrays。这个RDD可以隐式地转换为DataFrame,然后注册成表,表可以在后续SQL语句中使用


//scisanexistingSparkContext.


valsqlContext=neworg.apache.spark.sql.SQLContext(sc)


//thisisusedtoimplicitlyconvertanRDDtoaDataFrame.


importsqlContext.implicits._//Definetheschemausingacaseclass.


//Note:CaseclassesinScala2.10cansupportonlyupto22fields.Toworkaroundthislimit,


//youcanusecustomclassesthatimplementtheProductinterface.


caseclassPerson(name:String,age:Int)//CreateanRDDofPersonobjectsandregisteritasatable.


valpeople=sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()


people.registerTempTable("people")//SQLstatementscanberunbyusingthesqlmethodsprovidedbysqlContext.


valteenagers=sqlContext.sql("SELECTnameFROMpeopleWHEREage>=13ANDage<=19")//TheresultsofSQLqueriesareDataFramesandsupportallthenormalRDDoperations.


//Thecolumnsofarowintheresultcanbeaccessedbyordinal.


teenagers.map(t=>"Name:"+t(0)).collect().foreach(println)


1、使用case类定义schema


2、创建一个SQLContext


3、导入sqlContext.implicits._,用于隐式地将RDD转换成DataFrame


4、创建一个DataFrame,并将它注册成表。


5、使用sqlContext提供的sql方法,就可以使用SQL语句来查询了。查询后返回的结果是DataFrame,它支持所有的RDD操作


以编程方式指定模式

当case类不能提前定义时(例如,记录的结构被编码在一个String中,或者不同的用户会将文本数据集和字段进行不同的解析和投影),DataFrame可以使用以下三步,以编程的方式实现:


1.CreateanRDDofRowsfromtheoriginalRDD;


2.CreatetheschemarepresentedbyaStructTypematchingthestructureofRowsintheRDDcreatedinStep1.


3.ApplytheschematotheRDDofRowsviacreateDataFramemethodprovidedbySQLContext.


1.从原有的RDD中创建行的RDD。


2.创建一个由StructType表示的模式,StructType符合由步骤1创建的RDD的行的结构。


3.通过SQLContext提供的createDataFrame方法,将模式应用于行的RDD。


Forexample:


//scisanexistingSparkContext.


valsqlContext=neworg.apache.spark.sql.SQLContext(sc)//CreateanRDD


valpeople=sc.textFile("examples/src/main/resources/people.txt")//Theschemaisencodedinastring


valschemaString="nameage"//ImportSparkSQLdatatypesandRow.


importorg.apache.spark.sql._//Generatetheschemabasedonthestringofschema


valschema=StructType(schemaString.split("").map(fieldName=>StructField(fieldName,StringType,true)))//ConvertrecordsoftheRDD(people)toRows.


valrowRDD=people.map(_.split(",")).map(p=>Row(p(0),p(1).trim))//ApplytheschematotheRDD.


valpeopleDataFrame=sqlContext.createDataFrame(rowRDD,schema)//RegistertheDataFramesasatable.


peopleDataFrame.registerTempTable("people")//SQLstatementscanberunbyusingthesqlmethodsprovidedbysqlContext.


valresults=sqlContext.sql("SELECTnameFROMpeople")//TheresultsofSQLqueriesareDataFramesandsupportallthenormalRDDoperations.


//Thecolumnsofarowintheresultcanbeaccessedbyordinal.


results.map(t=>"Name:"+t(0)).collect().foreach(println)

-----------------------------------------------------------


//mycode


importorg.apache.spark._


importorg.apache.spark.sql._


importorg.apache.spark.sql.types.{StructType,StructField,StringType}


valconf=newSparkConf().setMaster("local").setAppName("XX")


valsc=newSparkContext(conf)


valsqlContext=neworg.apache.spark.sql.SQLContext(sc)


valschemaString="fullNameage"


valschema=StructType(schemaString.split("").map(fieldName=>StructField(fieldName,StringType,true)))


valrowRDD=sc.textFile("data/people.txt").map(_.split("")).map(p=>Row(p(0),p(1).trim))


valpeopleDataFrame=sqlContext.createDataFrame(rowRDD,schema)


peopleDataFrame.registerTempTable("people")


valyoung=sqlContext.sql("select*frompeoplewhereage<25")


young.show()


数据源(DataSources)

SparkSQL支持通过DataFrame接口在多种数据源上进行操作。一个DataFrame可以如同一个标准的RDDs那样进行操作,还可以注册成临时的表。将一个DataFrame注册成临时表允许你在它的数据上运行SQL查询。本节介绍使用Spark数据源装载和保存数据的常用方法,使用Spark数据源保存数据。然后进入可用于内置数据源的特定选项。


通用的加载/保存功能

在最简单的形式中,默认的数据源(parquet除非通过spark.sql.sources.default另外进行配置)将被用于所有的操作。


valdf=sqlContext.load("people.parquet") df.select("name","age").save("namesAndAges.parquet")


手动指定选项

你还可以手动指定数据源,这些数据源将与任何额外的选项一同使用,你希望将这些选项传入到数据源中。数据源是通过它们的全名来指定的(如org.apache.spark.sql.parquet),但是对于内置的数据源,你也可以使用简短的名称(json,parquet,jdbc)。任何类型的DataFrames使用这些语法可以转化成其他的数据源:


valdf=sqlContext.load("people.json","json")


df.select("name","age").save("namesAndAges.parquet","parquet")


保存模式

Save操作可以可选择性地接收一个SaveModel,如果数据已经存在了,指定如何处理已经存在的数据。意识到这些保存模式没有利用任何锁,也不是原子的,这很重要。因此,如果有多个写入者试图往同一个地方写入,这是不安全的。此外,当执行一个Overwrite,在写入新的数据之前会将原来的数据进行删除。


Scala/Java


Python


Meaning




SaveMode.ErrorIfExists(default)


"error"(default)


WhensavingaDataFrametoadatasource,ifdataalreadyexists,anexceptionisexpectedtobethrown.

当往一个数据源中保存一个DataFrame,如果数据已经存在,会抛出一个异常。




SaveMode.Append


"append"


WhensavingaDataFrametoadatasource,ifdata/tablealreadyexists,contentsoftheDataFrameareexpectedtobeappendedtoexistingdata.

当往一个数据源中保存一个DataFrame,如果data/table已经存在,DataFrame的内容会追加到已经存在的数据后面。




SaveMode.Overwrite


"overwrite"


OverwritemodemeansthatwhensavingaDataFrametoadatasource,ifdata/tablealreadyexists,existingdataisexpectedtobeoverwrittenbythecontentsoftheDataFrame.

Overwrite模式意味着当向数据源中保存一个DataFrame时,如果data/table已经存在了,已经存在的数据会被DataFrame中内容覆盖掉。




SaveMode.Ignore


"ignore"


IgnoremodemeansthatwhensavingaDataFrametoadatasource,ifdataalreadyexists,thesaveoperationisexpectedtonotsavethecontentsoftheDataFrameandtonotchangetheexistingdata.Thisissimilartoa`CREATETABLEIFNOTEXISTS`inSQL.

Ignore模式意味着当向数据源中保存一个DataFrame时,如果数据已经存在,save操作不会将DataFrame的内容进行保存,也不会修改已经存在的数据。这与SQL中的`CREATETABLEIFNOTEXISTS`相似。




保存为持久化表

当与HiveContext一起工作时,DataFrames也可以使用saveAsTable命令保存为持久化的表。不像registerTempTable命令,saveAsTable会将DataFrame的内容进行物化,并且在HiveMetastore中创建一个指向数据的指针。持久化表会仍旧存在即使你的Spark程序重新启动。只要你保持连接到相同的元存储(metastore)。一个持久化表的DataFrame可以通过调用SQLContext上的带有表的名称的table方法来创建。


默认情况下,saveAsTable会创建一个“管理表(managedtable)”,意味着元存储控制数据的位置。当一个表被删除后,managedtable会自动地删除它们的数据。


ParquetFiles

Parquet 是一种柱状的格式,被许多其他数据处理系统所支持。SparkSQL支持度对Parquet文件的读和写,自动保存原有数据的模式。


以编程方式加载数据

LoadingDataProgrammatically


Usingthedatafromtheaboveexample:


使用上面例子中的数据:


//sqlContextfromthepreviousexampleisusedinthisexample.


//ThisisusedtoimplicitlyconvertanRDDtoaDataFrame.


importsqlContext.implicits._valpeople:RDD[Person]=...


//AnRDDofcaseclassobjects,fromthepreviousexample.//TheRDDisimplicitlyconvertedtoaDataFramebyimplicits,allowingittobestoredusingParquet.


people.saveAsParquetFile("people.parquet")//Readintheparquetfilecreatedabove.Parquetfilesareself-describingsotheschemaispreserved.


//TheresultofloadingaParquetfileisalsoaDataFrame.


valparquetFile=sqlContext.parquetFile("people.parquet")//ParquetfilescanalsoberegisteredastablesandthenusedinSQLstatements.


parquetFile.registerTempTable("parquetFile")


valteenagers=sqlContext.sql("SELECTnameFROMparquetFileWHEREage>=13ANDage<=19")


teenagers.map(t=>"Name:"+t(0)).collect().foreach(println)


分区发现

在系统中,如Hive,使用表分区是一个常见的优化途径。在一个分区表中,数据经常存储在不同的目录中,对每一个分区目录中的路径中,对分区列的值进行编码。Parquet数据源现在可以自动地发现并且推断出分区的信息。例如,我们可以将之前使用的人口数据存储成下列目录结构的分区表,两个额外的列,gender和country作为分区列:


path└──to└──table├──gender=male│├──...│││├──country=US││└──data.parquet│├──country=CN││└──data.parquet│└──...└──gender=female├──...│├──country=US│└──data.parquet├──country=CN│└──data.parquet└──...


通过向SQLContext.parquetFile或者SQLContext.load中传入path/to/table,SparkSQL会自动地从路径中提取分区信息。现在返回的DataFrame模式变成:


root|--name:string(nullable=true)|--age:long(nullable=true)|--gender:string(nullable=true)|--country:string(nullable=true)


注意到分区列的数据类型自动被推断出来。目前支持数字的数据类型和string类型。


模式合并

像ProtocolBuffer,Avro和Thrift那样,Parquet还支持模式演化。用户可以从一个简单的模式开始,并且根据需要逐渐地向模式中添加更多的列。这样,用户最终可能会有多个不同但是具有相互兼容的模式的Parquet文件。Parquet数据源现在可以自动地发现这种情况,并且将所有这些文件的模式进行合并。


//sqlContextfromthepreviousexampleisusedinthisexample


.//ThisisusedtoimplicitlyconvertanRDDtoaDataFrame.


importsqlContext.implicits._//CreateasimpleDataFrame,storedintoapartitiondirectory


valdf1=sparkContext.makeRDD(1to5).map(i=>(i,i*2)).toDF("single","double")


df1.saveAsParquetFile("data/test_table/key=1")//CreateanotherDataFrameinanewpartitiondirectory,


//addinganewcolumnanddroppinganexistingcolumn


valdf2=sparkContext.makeRDD(6to10).map(i=>(i,i*3)).toDF("single","triple")


df2.saveAsParquetFile("data/test_table/key=2")//Readthepartitionedtable


valdf3=sqlContext.parquetFile("data/test_table")


df3.printSchema()//Thefinalschemaconsistsofall3columnsintheParquetfilestogether


//withthepartiioningcolumnappearedinthepartitiondirectorypaths.


//root


//|--single:int(nullable=true)


//|--double:int(nullable=true)


//|--triple:int(nullable=true)


//|--key:int(nullable=true)


配置

Parquet的配置可以使用SQLContext的setConf来设置或者通过使用SQL运行SETkey=value命令



PropertyName


Default


Meaning




spark.sql.parquet.binaryAsString


false


SomeotherParquet-producingsystems,inparticularImpalaandolderversionsofSparkSQL,donotdifferentiatebetweenbinarydataandstringswhenwritingouttheParquetschema.ThisflagtellsSparkSQLtointerpretbinarydataasastringtoprovidecompatibilitywiththesesystems.

其他的一些产生Parquet的系统,特别是Impala和SparkSQL的老版本,当将Parquet模式写出时不会区分二进制数据和字符串。这个标志告诉SparkSQL将二进制数据解析成字符串,以提供对这些系统的兼容。




spark.sql.parquet.int96AsTimestamp


true


SomeParquet-producingsystems,inparticularImpala,storeTimestampintoINT96.SparkwouldalsostoreTimestampasINT96becauseweneedtoavoidprecisionlostofthenanosecondsfield.ThisflagtellsSparkSQLtointerpretINT96dataasatimestamptoprovidecompatibilitywiththesesystems.

其他的一些产生Parquet的系统,特别是Impala,将时间戳存储为INT96的形式。Spark也将时间戳存储为INT96,因为我们要避免纳秒级字段的精度的损失。这个标志告诉SparkSQL将INT96数据解析为一个时间戳,以提供对这些系统的兼容。




spark.sql.parquet.cacheMetadata


true


TurnsoncachingofParquetschemametadata.Canspeedupqueryingofstaticdata.

打开Parquet模式的元数据的缓存。能够加快对静态数据的查询。




spark.sql.parquet.compression.codec


gzip


SetsthecompressioncodecusewhenwritingParquetfiles.Acceptablevaluesinclude:uncompressed,snappy,gzip,lzo.

设置压缩编码解码器,当写入一个Parquet文件时。可接收的值包括:uncompressed,snappy,gzip,lzo




spark.sql.parquet.filterPushdown


false


TurnonParquetfilterpushdownoptimization.ThisfeatureisturnedoffbydefaultbecauseofaknownbuginParuet1.6.0rc3(PARQUET-136).However,ifyourtabledoesn'tcontainanynullablestringorbinarycolumns,it'sstillsafetoturnthisfeatureon.

打开Parquet过滤器的后进先出存储的优化。这个功能默认是被关闭的,因为一个Parquet中的一个已知的bug1.6.0rc3(PARQUET-136)。然而,如果你的表中不包含任何的可为空的(nullable)字符串或者二进制列,那么打开这个功能是安全的。




spark.sql.hive.convertMetastoreParquet


true


Whensettofalse,SparkSQLwillusetheHiveSerDeforparquettablesinsteadofthebuiltinsupport.

当设置成false,SparkSQL会为parquet表使用HiveSerDe(Serialize/Deserilize)而不是内置的支持。




JSON数据集

SparkSQL可以自动推断出JSON数据集的模式,将它作为DataFrame进行加载。这个转换可以通过使用SQLContext中的下面两个方法中的任意一个来完成。


•jsonFile-从一个JSON文件的目录中加载数据,文件中的每一个行都是一个JSON对象。


•jsonRDD-从一个已经存在的RDD中加载数据,每一个RDD的元素是一个包含一个JSON对象的字符串。


注意,作为jsonFile提供deep文件不是一个典型的JSON文件。每一行必须包含一个分开的独立的有效JSON对象。因此,常规的多行JSON文件通常会失败。


//scisanexistingSparkContext.


valsqlContext=neworg.apache.spark.sql.SQLContext(sc)//AJSONdatasetispointedtobypath.


//Thepathcanbeeitherasingletextfileoradirectorystoringtextfiles.


valpath="examples/src/main/resources/people.json"


//CreateaDataFramefromthefile(s)pointedtobypath


valpeople=sqlContext.jsonFile(path)//TheinferredschemacanbevisualizedusingtheprintSchema()method.


people.printSchema()


//root


//|--age:integer(nullable=true)


//|--name:string(nullable=true)//RegisterthisDataFrameasatable.


people.registerTempTable("people")//SQLstatementscanberunbyusingthesqlmethodsprovidedbysqlContext.


valteenagers=sqlContext.sql("SELECTnameFROMpeopleWHEREage>=13ANDage<=19")//Alternatively,aDataFramecanbecreatedforaJSONdatasetrepresentedby


//anRDD[String]storingoneJSONobjectperstring.


valanotherPeopleRDD=sc.parallelize("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}"""::Nil)


valanotherPeople=sqlContext.jsonRDD(anotherPeopleRDD)


Hive表

SparkSQL还支持对存储在ApacheHive中的数据的读和写。但是,因为Hive有大量的依赖,它不包含在默认的Sparkassembly中。对Hive的支持是通过在Spark的build中添加-Phive和-Phive-thriftserver标志来完成。这个命令构建了一个新的assemblyjar,它包含Hive。注意,这个HIveassemblyjar还必须出现在所有的worker节点,因为为了访问存储在Hive中的数据,它们会访问Hive的序列化和反序列化库(SerDes)。


Hive的配置是通过将你的hive-site.xml文件放到conf/下来完成。


当使用Hive时,必须构建一个HiveContext,它继承自SQLContext,对寻找MetaStore中的表和使用HiveQL编写查询提供了支持。没有部署Hive的用户也可以创建一个HiveContext。当没有通过hive-site.xml进行配置,context会在当前目录下自动创建一个metastore_db和warehouse。


//scisanexistingSparkContext.


valsqlContext=neworg.apache.spark.sql.hive.HiveContext(sc)sqlContext.sql("CREATETABLEIFNOTEXISTSsrc(keyINT,valueSTRING)")


sqlContext.sql("LOADDATALOCALINPATH'examples/src/main/resources/kv1.txt'INTOTABLEsrc")//QueriesareexpressedinHiveQL


sqlContext.sql("FROMsrcSELECTkey,value").collect().foreach(println)


JDBCToOtherDatabases

SparkSQL还包含了一个数据源,它可以使用JDBC从其他数据库中读取数据。这个功能应该优先使用JdbcRDD。这是因为结果是作为DataFrame返回的,并且在SparkSQL中可以简单的被使用或加入其他数据源。JDBC数据源也能够轻松地被Java或者Python来使用,因为它不需要用户提供一个ClassTag(注意,这不同于SparkSQLJDBC服务,它允许其他应用使用SparkSQL运行查询)。


要开始使用时,你需要为你的特定数据块在Spark的类路径中添加JDBC驱动。例如,从SparkShell中连接到postgres,你需要运行以下的命令:


SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jarbin/spark-shell


通过使用数据源的API,可以将远程数据库中的表加载为DataFrame或者SparkSQL临时表。以下的选项是被支持的:



PropertyName


Meaning




url


TheJDBCURLtoconnectto.

要连接到的JDBCURL




dbtable


TheJDBCtablethatshouldberead.Notethatanythingthatisvalidina`FROM`clauseofaSQLquerycanbeused.Forexample,insteadofafulltableyoucouldalsouseasubqueryinparentheses.

需要读取的JDBC表。注意,SQL查询中的“From”子句中的任何部分都是可以使用的。例如,你可以在括号中使用子查询,而不是一个完整的表。




driver


TheclassnameoftheJDBCdriverneededtoconnecttothisURL.ThisclasswithbeloadedonthemasterandworkersbeforerunninganJDBCcommandstoallowthedrivertoregisteritselfwiththeJDBCsubsystem.

需要连接到的URL的JDBC驱动的类名。这个类会在运行一个JDBC命令之前被加载到master和workers上,允许驱动注册自己和JDBC子系统。




partitionColumn,lowerBound,upperBound,numPartitions


Theseoptionsmustallbespecifiedifanyofthemisspecified.Theydescribehowtopartitionthetablewhenreadinginparallelfrommultipleworkers.partitionColumnmustbeanumericcolumnfromthetableinquestion.

如果这些选项中的一个被指定了,那么所有的选项必须被指定。它们描述了当从多个workers上并行读取时如何进行分区。partitionColumn必须是表中的一个数字列。




valjdbcDF=sqlContext.load("jdbc",Map("url"->"jdbc:postgresql:dbserver","dbtable"->"schema.tablename"))


故障排除

*在客户端session以及所有executors上,JDBC驱动类必须对原来的类加载器是可见的。这是因为Java的DriverManager类进行安全检查,这导致打开一个连接时,它会忽略所有对于原始来加载器不可见的驱动。一个方便的方法是在所有的worker节点上修改compute_classpath.sh以包含驱动的JARs。


*一些数据库,例如H2,将所有的名称转化成大写的。在SparkSQL中你需要使用大写来指定那些名称。


性能调节(PerformanceTuning)


对于一些工作负载来说,通过将数据缓存到内存中或者打开一些实验性的选项,可能会提高性能。


在内存中缓存数据

SparkSQL可以使用内存中的柱状格式来对表进行缓存,通过调用sqlContext.cacheTable("tableName")或者rdataFrame.cache()。然后SparkSQL会扫描仅仅需要的列以及自动地调节压缩,以减少内存使用和GC压力。你可以调用sqlContext.uncacheTable("tableName")来将表从内存中移除。内存缓存的配置可以使用SQLContext上的setConf或者通过使用SQL执行SETkey=value命令的方式来完成。



PropertyName


Default


Meaning




spark.sql.inMemoryColumnarStorage.compressed


true


WhensettotrueSparkSQLwillautomaticallyselectacompressioncodecforeachcolumnbasedonstatisticsofthedata.

当设置为true时,SparkSQL会根据统计出的各项数据为每一个列选择一种压缩编解码器。




spark.sql.inMemoryColumnarStorage.batchSize


10000


Controlsthesizeofbatchesforcolumnarcaching.Largerbatchsizescanimprovememoryutilizationandcompression,butriskOOMswhencachingdata.

为列状缓存控制批处理大小。较大的批大小可以提高内存的利用率和压缩,但是缓存数据时有OOMs的风险。



其他的配置选项

以下选项也可以用来调节执行查询时的性能。随着更多的优化被自动地执行,这些选项有可能会在将来的版本中被弃用,



PropertyName


Default


Meaning




spark.sql.autoBroadcastJoinThreshold


10485760(10MB)


Configuresthemaximumsizeinbytesforatablethatwillbebroadcasttoallworkernodeswhenperformingajoin.Bysettingthisvalueto-1broadcastingcanbedisabled.NotethatcurrentlystatisticsareonlysupportedforHiveMetastoretableswherethecommand`ANALYZETABLECOMPUTESTATISTICSnoscan`hasbeenrun.




spark.sql.codegen


false


Whentrue,codewillbedynamicallygeneratedatruntimeforexpressionevaluationinaspecificquery.Forsomequerieswithcomplicatedexpressionthisoptioncanleadtosignificantspeed-ups.However,forsimplequeriesthiscanactuallyslowdownqueryexecution.




spark.sql.shuffle.partitions


200


Configuresthenumberofpartitionstousewhenshufflingdataforjoinsoraggregations.



分布式SQL引擎

SparkSQL也可以使用它的JDBC/ODBC或者命令行接口作为分布式查询引擎。以这种模式,终端用户或者应用可以直接与SparkSQL交互来运行SQL查询,而不需要写任何代码。


运行ThriftJDBC/ODBC服务

这里实现的ThriftJDBC/ODBC相当于Hive0.13中的HiveServer2.你可以用Spark或者Hive0.13自带的beeline脚本来测试JDBC服务。


在Spark目录中运行下面的命令来启动JDBC/ODBCserver:


./sbin/start-thriftserver.sh


该脚本接受所有的bin/spark-submit命令行选项,外加一个--hiveconf选项来指定Hive属性。可能执行./sbin/start-thriftserver.sh--help来查看完整的可用选项列表。默认情况下,该服务会监听localhost:10000。你可以通过任一环境变量覆盖这个bahaviour,也就是:


exportHIVE_SERVER2_THRIFT_PORT=


exportHIVE_SERVER2_THRIFT_BIND_HOST=./sbin/start-thriftserver.sh/--master/...


或者系统属性


./sbin/start-thriftserver.sh/--hiveconfhive.server2.thrift.port=/--hiveconfhive.server2.thrift.bind.host=/--master...


现在你可以使用beeline来测试ThriftJDBC/ODBC服务:


./bin/beeline


使用beeline来连接JDBC/ODBC:


beeline>!connectjdbc:hive2://localhost:10000


Beeline会向你询问用户名和密码。在非安全模式下,仅仅输入你机子的用户名以及一个空白的密码。对于安全模式,请按照beeline文档给出的指示。


通过将hive-site.xml文件放到conf/下来完成Hive的配置。


你也可以使用Hive自带的beeline脚本。


ThriftJDBC服务还支持通过HTTP传输发送ThriftRPC消息。使用下列设置作为系统属性或者通过conf/中hive-site.xml文件来启用HTTP模式:


hive.server2.transport.mode-Setthistovalue:httphive.server2.thrift.http.port-HTTPportnumberfolistenon;defaultis10001hive.server2.http.endpoint-HTTPendpoint;defaultiscliservice


为了测试,使用beeline以http模式连接JDBC/ODBC服务:


beeline>!connectjdbc:hive2://:/?hive.server2.transport.mode=http;hive.server2.thrift.http.path=


运行SparkSQLCLI

SparkSQLCLI是一种方便的工具用来以本地模式运行Hivemetastore服务,并且从命令行输入执行查询。注意,SparkSQLCLI不能和ThriftJDBC服务进行会话。


在Spark目录下,执行以下命令来启动SparkSQLCLI:


./bin/spark-sql


Hive的配置已经完成,通过将hive-site.xml文件放入conf/中。你可以运行./bin/spark-sql--help来查看完整的可用选项的列表。


迁移指南

从SparkSQL1.0-1.2升级到1.3UpgradingfromSparkSQL1.0-1.2to1.3。


在Spark1.3中,我们从SparkSQL中移除了“Alpha”标签,作为其中的一部分,对可用的APIs进行了清理。从Spark1.3以后,SparkSQL会与1.X系列中的其他版本提供二进制兼容。这种兼容性保证不包括的APIs明确地标记为不稳定。(也就是DeveloperAPI或者rExperimental)


重命名SchemaRDD为DataFrame

当升级到SparkSQL1.3后,用户可以发现最大的改变是SchemaRDD重命名为DataFrame。这主要是因为DataFrame不再直接继承自RDD。而是自己实现了RDDs提供的大多数功能。DataFrames仍旧可以通过调用.rdd方法来转化成RDDs。


Scala中有一种类型别名,从SchemaRDD到DataFrame为一些用例提供了源的兼容性。但还是建议用户更新它们的代码来使用DataFrame。Java和Python用户需要更新它们的代码。


Java和ScalaAPIs的统一

Spark1.3之前,有单独的Java兼容性类(JavaSQLContext和JavaSchemaRDD)映射成ScalaAPI。在Spark1.3中,JavaAPI和ScalaAPI进行了统一。任意语言的用户应该使用SQLContext和DataFrame。通常,这些类会尝试使用两种语言中都可用的类型(即,Array而不是语言特定的集合)。在有些情况下,如果没有相同的类型存在,则会使用函数重载来替代。


此外,Java指定的类型API被移除了。Scala和Java的用户应该使用inorg.apache.spark.sql.types中的类来以编程方式描述模式。


隐式转换的隔离以及dsl包的删除(只有Scala)

在Spark1.3之前许多代码的例子以importsqlContext._开始,它会将sqlContext中所有的函数引入到scope中。在Spark1.3中,我们将在SQLContext内部的RDDs转换成DataFrames转成成对象进行了隐式转化的隔离。用户现在需要写importsqlContext.implicits._。


此外,隐式转换现在只有组成Rroducts的RDDs参数带有一个toDF方法,而不是自动地应用。


当使用DSL(现在替代为DataFrameAPI)内部的方法时,用户之前会importorg.apache.spark.sql.catalyst.dsl。现在使用公共的dataframe方法API应该用importorg.apache.spark.sql.functions._。


为DataType删除在org.apache.spark.sql中的类型别名(只有Scala)

Spark1.3为DataType删除了出现在根本的sql包中的类型别名。现在,用户应该引入org.apache.spark.sql.types中的类。


UDF注册移到sqlContext.udf中(Java和Scala)

用于注册UDFs的函数,不是用于DataFrameDSL就是SQL,已经被移动了SQLContext中的udf对象中。


sqlCtx.udf.register("strLen",(s:String)=>s.length())


Python的UDF注册没有改变。


Python中的DataTypes不再是单例了(PythonDataTypesNoLongerSingletons)。

当使用Python中的DataTypes,你需要创建它们(i.e.StringType()),而不是引用一个单例。


与ApacheHive的兼容性

SparkSQL被设计出来用于兼容HiveMetastore,SerDes以及UDFs。目前的SparkSQL是基于Hive0.12.0和0.13.1。


部署在现有的Hive仓库中(DeployinginExistingHiveWarehouses)

TheSparkSQLThrigtJDBC服务被设计出来“立即可用”的兼容现有的Hive安装。你不需要修改已经存在的HiveMetastore或者更改数据位置或者表分区。


支持的Hive特性

SparkSQL支持大量的Hive特性,例如:


·Hivequerystatements,including:


oSELECT


oGROUPBY


oORDERBY


oCLUSTERBY


oSORTBY


·AllHiveoperators,including:


oRelationaloperators(=,⇔,==,<>,<,>,>=,<=,etc)


oArithmeticoperators(+,-,*,/,%,etc)


oLogicaloperators(AND,&&,OR,||,etc)


oComplextypeconstructors


oMathematicalfunctions(sign,ln,cos,etc)


oStringfunctions(instr,length,printf,etc)


·Userdefinedfunctions(UDF)


·Userdefinedaggregationfunctions(UDAF)


·Userdefinedserializationformats(SerDes)


·Joins


oJOIN


o{LEFT|RIGHT|FULL}OUTERJOIN


oLEFTSEMIJOIN


oCROSSJOIN


·Unions


·Sub-queries


oSELECTcolFROM(SELECTa+bAScolfromt1)t2


·Sampling


·Explain


·Partitionedtables


·View


·AllHiveDDLFunctions,including:


oCREATETABLE


oCREATETABLEASSELECT


oALTERTABLE


·MostHiveDatatypes,including:


oTINYINT


oSMALLINT


oINT


oBIGINT


oBOOLEAN


oFLOAT


oDOUBLE


oSTRING


oBINARY


oTIMESTAMP


oDATE


oARRAY<>


oMAP<>


oSTRUCT<>


不支持的Hive功能

下面是不支持的Hive特性的列表。大多数特性很少会在Hive部署中用到。


MajorHiveFeatures


·Tableswithbuckets:bucketisthehashpartitioningwithinaHivetablepartition.SparkSQLdoesn’tsupportbucketsyet.


·带有buckets的Table:bucket是Hive表分区中的哈希分区。SparkSQL不支持buckets。


EsotericHiveFeatures


*UNIONtype*Uniquejoin*Columnstatisticscollecting:SparkSQLdoesnotpiggybackscanstocollectcolumnstatisticsatthemomentandonlysupportspopulatingthesizeInBytesfieldofthehivemetastore.


HiveInput/OutputFormats


·FileformatforCLI:ForresultsshowingbacktotheCLI,SparkSQLonlysupportsTextOutputFormat.


·Hadooparchive


·CLI文件格式:对于结果,显示回CLI,SparkSQL只支持TextOutputFormat


·Hadoop存档


HiveOptimizations

少量的Hive优化没有包含在Spark中。由于SparkSQL的内存计算模型它们中的有一些(例如索引)是次要的。其他的一些会来的SparkSQL版本中加入。


·Blocklevelbitmapindexesandvirtualcolumns


·Automaticallydeterminethenumberofreducersforjoinsandgroupbys:CurrentlyinSparkSQL,youneedtocontrolthedegreeofparallelismpost-shuffleusing“SETspark.sql.shuffle.partitions=[num_tasks];”.


·Meta-dataonlyquery:Forqueriesthatcanbeansweredbyusingonlymetadata,SparkSQLstilllaunchestaskstocomputetheresult.


·Skewdataflag:SparkSQLdoesnotfollowtheskewdataflagsinHive.


·STREAMTABLEhintinjoin:SparkSQLdoesnotfollowtheSTREAMTABLEhint.


·Mergemultiplesmallfilesforqueryresults:iftheresultoutputcontainsmultiplesmallfiles,HivecanoptionallymergethesmallfilesintofewerlargefilestoavoidoverflowingtheHDFSmetadata.SparkSQLdoesnotsupportthat.


·块级别的位图索引和虚拟列(用来创建索引)


·自动为joins和groupbys决定reducers的数量:目前在SparkSQL中,你需要使用“SETspark.sql.shuffle.partitions=[num_tasks];”来控制并行的post-shuffle的度。


·仅元数据查询:对于仅使用元数据来回答的查询,SparkSQL还是启动任务来计算结果。


·偏斜数据标志:SparkSQL不遵循Hive中的偏斜数据标志。


·join中的STREAMTABLEhint:SparkSQL不遵循STREAMTABLEhint。


·为查询结果合并多个小文件:如果结果输出中包含多个小文件,Hive可以选择性地将多个小文件合并成更少的更大的文件,避免HDFS元数据的溢出。SparkSQL不支持这些。


数据类型

SparkSQL和DataFrames支持以下的数据类型:


·Numerictypes


oByteType:Represents1-bytesignedintegernumbers.Therangeofnumbersisfrom-128to127.


oShortType:Represents2-bytesignedintegernumbers.Therangeofnumbersisfrom-32768to32767.


oIntegerType:Represents4-bytesignedintegernumbers.Therangeofnumbersisfrom-2147483648to2147483647.


oLongType:Represents8-bytesignedintegernumbers.Therangeofnumbersisfrom-9223372036854775808to9223372036854775807.


oFloatType:Represents4-bytesingle-precisionfloatingpointnumbers.


oDoubleType:Represents8-bytedouble-precisionfloatingpointnumbers.


oDecimalType:Representsarbitrary-precisionsigneddecimalnumbers.Backedinternallybyjava.math.BigDecimal.ABigDecimalconsistsofanarbitraryprecisionintegerunscaledvalueanda32-bitintegerscale.


·Stringtype


oStringType:Representscharacterstringvalues.


·Binarytype


oBinaryType:Representsbytesequencevalues.


·Booleantype


oBooleanType:Representsbooleanvalues.


·Datetimetype


oTimestampType:Representsvaluescomprisingvaluesoffieldsyear,month,day,hour,minute,andsecond.


oDateType:Representsvaluescomprisingvaluesoffieldsyear,month,day.


·Complextypes


oArrayType(elementType,containsNull):RepresentsvaluescomprisingasequenceofelementswiththetypeofelementType.containsNullisusedtoindicateifelementsinaArrayTypevaluecanhavenullvalues.


oMapType(keyType,valueType,valueContainsNull):Representsvaluescomprisingasetofkey-valuepairs.ThedatatypeofkeysaredescribedbykeyTypeandthedatatypeofvaluesaredescribedbyvalueType.ForaMapTypevalue,keysarenotallowedtohavenullvalues.valueContainsNullisusedtoindicateifvaluesofaMapTypevaluecanhavenullvalues.


oStructType(fields):RepresentsvalueswiththestructuredescribedbyasequenceofStructFields(fields).


§StructField(name,dataType,nullable):RepresentsafieldinaStructType.Thenameofafieldisindicatedbyname.ThedatatypeofafieldisindicatedbydataType.nullableisusedtoindicateifvaluesofthisfieldscanhavenullvalues.


SparkSQL中所有的数据类型的都在包packageorg.apache.spark.sql.types中,你可以这样访问它们:


importorg.apache.spark.sql.types._


Datatype


ValuetypeinScala


APItoaccessorcreateadatatype




ByteType


Byte


ByteType




ShortType


Short


ShortType




IntegerType


Int


IntegerType




LongType


Long


LongType




FloatType


Float


FloatType




DoubleType


Double


DoubleType




DecimalType


java.math.BigDecimal


DecimalType




StringType


String


StringType




BinaryType


Array[Byte]


BinaryType




BooleanType


Boolean


BooleanType




TimestampType


java.sql.Timestamp


TimestampType




DateType


java.sql.Date


DateType




ArrayType


scala.collection.Seq


ArrayType(elementType,[containsNull]) Note:ThedefaultvalueofcontainsNullistrue.




MapType


scala.collection.Map


MapType(keyType,valueType,[valueContainsNull]) Note:ThedefaultvalueofvalueContainsNullistrue.




StructType


org.apache.spark.sql.Row


StructType(fields) Note:fieldsisaSeqofStructFields.Also,twofieldswiththesamenamearenotallowed.




StructField


ThevaluetypeinScalaofthedatatypeofthisfield(Forexample,IntforaStructFieldwiththedatatypeIntegerType)


StructField(name,dataType,nullable)

最新文章

123

最新摄影

微信扫一扫

第七城市微信公众平台