Build Your Own Big Data Analysis Engine by Extending Spark SQL
Spark SQL's Catalyst is genuinely interesting and well worth studying in depth. Today, though, let's talk about Spark's extension mechanisms. In my last Spark post I hacked the SQL parsing directly; this time we'll play by the rules and use the mechanisms Databricks provides for extending Spark...
First, a quick look at Spark SQL's overall execution flow. The input query is parsed into an unresolved logical plan, i.e. one with no metadata bound yet. The analyzer then resolves it against the catalog metadata using resolution rules, producing a resolved logical plan. Rule-based optimizations (RBO) turn that into an optimized logical plan. The logical plan is then converted into candidate physical plans, and the cost model (CBO) selects the physical execution plan that actually runs.
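These stages are easy to inspect on any Dataset through queryExecution. Below is a small sketch (not from the original post; the table p and its columns are just placeholders):

```scala
import org.apache.spark.sql.SparkSession

object PlanStages {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("plan stages").getOrCreate()
    import spark.implicits._
    Seq(("a", 1), ("b", 2)).toDF("name", "score").createTempView("p")

    val df = spark.sql("select name, score from p where score > 1")
    println(df.queryExecution.logical)        // parsed, unresolved logical plan
    println(df.queryExecution.analyzed)       // resolved logical plan, metadata bound
    println(df.queryExecution.optimizedPlan)  // logical plan after rule-based optimization
    println(df.queryExecution.executedPlan)   // physical plan chosen for execution
    spark.stop()
  }
}
```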
Today we'll walk through three extension examples.
Extending the parser
In this example we extend the parser to ban wildcard queries: select * is not allowed in a query. The parser code is as follows.
```scala
package wang.datahub.parser

import org.apache.spark.sql.catalyst.analysis.UnresolvedStar
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.types.{DataType, StructType}

class MyParser(parser: ParserInterface) extends ParserInterface {
  /**
   * Parse a string to a [[LogicalPlan]].
   */
  override def parsePlan(sqlText: String): LogicalPlan = {
    // Delegate to the underlying parser, then walk the plan and reject any
    // unresolved star (*) found in a projection.
    val logicalPlan = parser.parsePlan(sqlText)
    logicalPlan transform {
      case project @ Project(projectList, _) =>
        projectList.foreach { name =>
          if (name.isInstanceOf[UnresolvedStar]) {
            throw new RuntimeException("You must specify your project column set," +
              " * is not allowed.")
          }
        }
        project
    }
    logicalPlan
  }

  /**
   * Parse a string to an [[Expression]].
   */
  override def parseExpression(sqlText: String): Expression = parser.parseExpression(sqlText)

  /**
   * Parse a string to a [[TableIdentifier]].
   */
  override def parseTableIdentifier(sqlText: String): TableIdentifier =
    parser.parseTableIdentifier(sqlText)

  /**
   * Parse a string to a [[FunctionIdentifier]].
   */
  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
    parser.parseFunctionIdentifier(sqlText)

  /**
   * Parse a string to a [[StructType]]. The passed SQL string should be a comma separated
   * list of field definitions which will preserve the correct Hive metadata.
   */
  override def parseTableSchema(sqlText: String): StructType =
    parser.parseTableSchema(sqlText)

  /**
   * Parse a string to a [[DataType]].
   */
  override def parseDataType(sqlText: String): DataType = parser.parseDataType(sqlText)
}
```
Now let's test it:
```scala
package wang.datahub.parser

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.parser.ParserInterface

object MyParserApp {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "E:\\devlop\\envs\\hadoop-common-2.2.0-bin-master")

    type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface
    type ExtensionsBuilder = SparkSessionExtensions => Unit
    val parserBuilder: ParserBuilder = (_, parser) => new MyParser(parser)
    val extBuilder: ExtensionsBuilder = { e => e.injectParser(parserBuilder) }

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.master", "local[*]")
      .withExtensions(extBuilder)
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    import spark.implicits._
    val df = Seq(
      ( "First Value", 1, java.sql.Date.valueOf("2010-01-01")),
      ( "First Value", 4, java.sql.Date.valueOf("2010-01-01")),
      ("Second Value", 2, java.sql.Date.valueOf("2010-02-01")),
      ("Second Value", 9, java.sql.Date.valueOf("2010-02-01"))
    ).toDF("name", "score", "date_column")
    df.createTempView("p")

    // val df = spark.read.json("examples/src/main/resources/people.json")
    // df.toDF().write.saveAsTable("person")

    // custom parser
    spark.sql("select * from p").show()
  }
}
```
Running this, the select * query is rejected with the RuntimeException thrown by our parser, which matches our expectation.
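By the way, withExtensions is not the only way to register an extension: since Spark 2.2 you can also point the spark.sql.extensions configuration at a class with a no-arg constructor that implements SparkSessionExtensions => Unit. A minimal sketch, assuming we add a class called MyParserExtensions (a name made up for this example):

```scala
package wang.datahub.parser

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}

// Hypothetical helper class: registers MyParser through configuration
// instead of calling withExtensions in code.
class MyParserExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(e: SparkSessionExtensions): Unit = {
    e.injectParser((_, parser) => new MyParser(parser))
  }
}

object ConfigDrivenParserApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("config driven parser")
      .config("spark.sql.extensions", "wang.datahub.parser.MyParserExtensions")
      .getOrCreate()
    // select * is rejected by MyParser exactly as before
    spark.sql("select 1 as c").show()
  }
}
```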
Extending the optimizer
Next let's extend the optimizer. Databricks already ships many rule-based optimizations (RBO) by default, and the same mechanism makes it easy to build rules of our own. In this example we build a rather odd rule set that is deliberately not equivalent to the original query; it is purely for illustration.
For an expression of the form field + 0, the rules are:
- If the 0 appears on the left of the +, replace the whole expression with the right operand, i.e. 0+nr becomes nr.
- If the 0 appears on the right of the +, rewrite the 0 into 3, i.e. nr+0 becomes nr+3.
- If no 0 appears, leave the expression unchanged.
Here is the code:
```scala
package wang.datahub.optimizer

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Add, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object MyOptimizer extends Rule[LogicalPlan] {
  def apply(logicalPlan: LogicalPlan): LogicalPlan = {
    logicalPlan.transformAllExpressions {
      case Add(left, right) =>
        println("this this my add optimizer")
        if (isStaticAdd(left)) {
          right
        } else if (isStaticAdd(right)) {
          Add(left, Literal(3L))
        } else {
          Add(left, right)
        }
    }
  }

  private def isStaticAdd(expression: Expression): Boolean = {
    expression.isInstanceOf[Literal] && expression.asInstanceOf[Literal].toString == "0"
  }

  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "E:\\devlop\\envs\\hadoop-common-2.2.0-bin-master")
    val testSparkSession: SparkSession = SparkSession.builder().appName("Extra optimization rules")
      .master("local[*]")
      .withExtensions(extensions => {
        extensions.injectOptimizerRule(session => MyOptimizer)
      })
      .getOrCreate()
    testSparkSession.sparkContext.setLogLevel("ERROR")
    import testSparkSession.implicits._
    testSparkSession.experimental.extraOptimizations = Seq()

    Seq(-1, -2, -3).toDF("nr").write.mode("overwrite").json("./test_nrs")
    // val optimizedResult = testSparkSession.read.json("./test_nrs").selectExpr("nr + 0")
    testSparkSession.read.json("./test_nrs").createTempView("p")

    var sql = "select nr+0 from p"
    val t = testSparkSession.sql(sql)
    println(t.queryExecution.optimizedPlan)
    println(sql)
    t.show()

    sql = "select 0+nr from p"
    val u = testSparkSession.sql(sql)
    println(u.queryExecution.optimizedPlan)
    println(sql)
    u.show()

    sql = "select nr+8 from p"
    val v = testSparkSession.sql(sql)
    println(v.queryExecution.optimizedPlan)
    println(sql)
    v.show()

    // println(optimizedResult.queryExecution.optimizedPlan.toString())
    // optimizedResult.collect().map(row => row.getAs[Long]("(nr + 0)"))
    Thread.sleep(1000000)
  }
}
```
The run produces the following output:
```
this this my add optimizer
this this my add optimizer
this this my add optimizer
Project [(nr#12L + 3) AS (nr + CAST(0 AS BIGINT))#14L]
+- Relation[nr#12L] json
select nr+0 from p
this this my add optimizer
this this my add optimizer
this this my add optimizer
+------------------------+
|(nr + CAST(0 AS BIGINT))|
+------------------------+
|                       2|
|                       1|
|                       0|
+------------------------+
this this my add optimizer
Project [nr#12L AS (CAST(0 AS BIGINT) + nr)#21L]
+- Relation[nr#12L] json
select 0+nr from p
this this my add optimizer
+------------------------+
|(CAST(0 AS BIGINT) + nr)|
+------------------------+
|                      -1|
|                      -2|
|                      -3|
+------------------------+
this this my add optimizer
this this my add optimizer
this this my add optimizer
Project [(nr#12L + 8) AS (nr + CAST(8 AS BIGINT))#28L]
+- Relation[nr#12L] json
select nr+8 from p
this this my add optimizer
this this my add optimizer
this this my add optimizer
+------------------------+
|(nr + CAST(8 AS BIGINT))|
+------------------------+
|                       7|
|                       6|
|                       5|
+------------------------+
```
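Since the rule above is intentionally non-equivalent, it is worth noting what a semantics-preserving version would look like: simply drop the + 0 on whichever side it appears. A minimal sketch (not from the original post), using the same Catalyst API as the example above, and injectable via injectOptimizerRule in exactly the same way as MyOptimizer:

```scala
package wang.datahub.optimizer

import org.apache.spark.sql.catalyst.expressions.{Add, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A semantics-preserving variant: nr + 0 and 0 + nr both collapse to nr.
object EliminateAddZero extends Rule[LogicalPlan] {
  private def isZero(e: Expression): Boolean = e match {
    case Literal(value, _) => value == 0
    case _                 => false
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
    case Add(left, right) if isZero(right) => left
    case Add(left, right) if isZero(left)  => right
  }
}
```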
Extending strategies
SparkStrategies contains a set of concrete strategies, each of which extends the Strategy type defined in QueryPlanner. A Strategy takes a logical plan and produces a sequence of physical plans; through these strategies the planner turns the logical plan into a physical plan that can actually be executed. The code is as follows:
```scala
package wang.datahub.strategy

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

object MyStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = {
    println("Hello world!")
    // Returning Nil means this strategy does not handle the plan,
    // so the planner falls through to the built-in strategies.
    Nil
  }

  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "E:\\devlop\\envs\\hadoop-common-2.2.0-bin-master")
    val spark = SparkSession.builder().master("local").getOrCreate()
    spark.experimental.extraStrategies = Seq(MyStrategy)
    val q = spark.catalog.listTables.filter(t => t.name == "six")
    q.explain(true)
    spark.stop()
  }
}
```
When you run this, "Hello world!" is printed while q.explain(true) plans the query, showing that our strategy was evaluated.
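Instead of experimental.extraStrategies, a strategy can also be injected through the same SparkSessionExtensions mechanism we used for the parser and the optimizer rule. A minimal sketch (the object name here is my own, not from the original post):

```scala
package wang.datahub.strategy

import org.apache.spark.sql.SparkSession

object MyStrategyViaExtensions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("inject planner strategy")
      // injectPlannerStrategy takes a SparkSession => Strategy builder
      .withExtensions(e => e.injectPlannerStrategy(_ => MyStrategy))
      .getOrCreate()

    // Planning any query evaluates MyStrategy, so "Hello world!" is printed.
    spark.range(3).explain(true)
    spark.stop()
  }
}
```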
That's all for the extension mechanisms for now. Next I'm planning to talk briefly about RBO and CBO, together with a small feature I built earlier: estimating the execution time of a SQL query.
This article is reposted from the WeChat public account 「麒思妙想」, which you can follow via its QR code. Please contact the 麒思妙想 account before reposting this article.