spark-streamingsql, 使用SQL操作Spark流

分享于 

7分钟阅读

GitHub

  繁體 雙語
Manipulate Spark-streaming by SQL
  • 源代码名称:spark-streamingsql
  • 源代码网址:http://www.github.com/Intel-bigdata/spark-streamingsql
  • spark-streamingsql源代码文档
  • spark-streamingsql源代码下载
  • Git URL:
    git://www.github.com/Intel-bigdata/spark-streamingsql.git
    Git Clone代码到本地:
    git clone http://www.github.com/Intel-bigdata/spark-streamingsql
    Subversion代码到本地:
    $ svn co --depth empty http://www.github.com/Intel-bigdata/spark-streamingsql
    Checked out revision 1.
    $ cd repo
    $ svn up trunk
    
    针对( 不再维护 Spark,因为 Spark 2.0支持结构化流式流)的流式

    基于 CatalystSpark流为 Apache Spark 提供了的流式 SQL,旨在支持数据流的SQL风格查询。 我们的目标是通过桥接结构化数据查询和流处理之间的差距来提高催化剂的进度。

    我们为 Apache Spark ( 下文称为流式 SQL ) 提供的流式SQL提供:

    • 基于扩展时间的流和表数据的SQL支持,基于扩展时间的窗口聚合和 Join。
    • DStream和SQL之间易于相互操作。
    • 流源的外部源API支持。

    快速启动

    创建 StreamSQLContext

    StreamSQLContext 是所有流式sql相关功能的主要入口点。 StreamSQLContext 可以通过以下方式创建:

    valssc:StreamingContextvalsqlContext:SQLContextvalstreamSqlContext=newStreamSQLContext(ssc, sqlContext)

    或者你可以使用 HiveContext 获得完整的配置单元语义支持,例如:

    valssc:StreamingContextvalhiveContext:HiveContextvalstreamSqlContext=newStreamSQLContext(ssc, hiveContext)
    在DStreams上运行 SQL
    caseclassPerson(name: String, age: String)// Create an DStream of Person objects and register it as a stream.valpeople:DStream[Person] = ssc.socketTextStream(serverIP, serverPort)
    . map(_.split(","))
    . map(p =>Person(p(0), p(1).toInt))valschemaPeopleStream= streamSqlContext.createSchemaDStream(people)
    schemaPeopleStream.registerAsTable("people")valteenagers= sql("SELECT name FROM people WHERE age> = 10 && age <= 19")// The results of SQL queries are themselves DStreams and support all the normal operationsteenagers.map(t =>"Name: "+ t(0)).print()
    ssc.start()
    ssc.awaitTerminationOrTimeout(30*1000)
    ssc.stop()
    流关系 Join
    valuserStream:DStream[User]
    streamSqlContext.registerDStreamAsTable(userStream, "user")valitemStream:DStream[Item]
    streamSqlContext.registerDStreamAsTable(itemStream, "item")
    sql("SELECT * FROM user JOIN item ON user.id = item.id").print()valhistoryItem:DataFramehistoryItem.registerTempTable("history")
    sql("SELECT * FROM user JOIN item ON user.id = history.id").print()
    基于时间的窗口 Join/聚合
    sql(
     """ |SELECT t.word, COUNT(t.word) |FROM (SELECT * FROM test) OVER (WINDOW '9' SECONDS, SLIDE '3' SECONDS) AS t |GROUP BY t.word""".stripMargin)
    sql(
     """ |SELECT * FROM | user1 OVER (WINDOW '9' SECONDS, SLIDE '6' SECONDS) AS u |JOIN | user2 OVER (WINDOW '9' SECONDS, SLIDE '6' SECONDS) AS v |ON u.id = v.id |WHERE u.id> 1 and u.id <3 and v.id> 1 and v.id <3""".stripMargin)

    注意:对于基于时间的窗口 Join,窗口的大小和滑动大小对于所有连接的流都应该是相同的。 这就是Spark流的局限性。

    Kafka的外部源API支持
    streamSqlContext.command(
     """ |CREATE TEMPORARY TABLE t_kafka ( | word string |) |USING org.apache.spark.sql.streaming.sources.KafkaSource |OPTIONS( | zkQuorum"localhost:2181", | groupId"test", | topics"aa:1", | messageToRow"org.apache.spark.sql.streaming.examples.MessageDelimiter")""".stripMargin)

    有关更多示例,请检查示例

    如何构建和部署

    流SQL是使用rtc构建的,你可以使用相关命令在 test/compile/package. 中

    在 spark-1.1 上构建流 SQL,你可以在中将Spark版本更改为你想要的版本,当前流式的SQL可以使用Spark版本 1.3 +。

    要使用流式 SQL,将打包的jar 放入你的环境中,Spark可以访问它,你可以使用 spark-submit --jars 或者其他方法。

    常见问题解答

    Q1.当前版本版本中提供了哪些类型的接口?

    当前版本仅支持 Scala DSL编程模型。 目前不支持 spark sql CLI和JDBC驱动程序。

    Q2.是否支持现有表的架构推理?

    是,可以使用SparkSQL从 static 源获取架构并应用到流子句中。

    Q3。它遵循的SQL标准是什么类型的?

    spark-streamsql覆盖依赖 SparkSQL,它可以支持DMLs大部分部分和一些 ddl。

    Q4.我可以在 spark streamsql中运行链式SQL查询?

    Curretly不支持这样的功能。

    Q5.它识别配置单元 Metastore?

    是,可以使用HiveContext初始化 StreamSQLContext,以获得 spark-streamsql的配置单元支持。

    如何在 spark-streamsql ( 说 UDTF,UDAF,UDF。) 中运行定制函数?

    是,你可以通过StreamSQLContext来创建 register UDF。

    Q7可以将( 覆盖) 查询结果插入表或者外部存储( HBase )?

    不支持,你需要通过 Scala 代码处理这个问题。

    Q8.它支持最新版本的Spark?

    不,pre发行版只支持 Spark 1.4.1,目前我们仍在支持 1.5. x 和 1.6. x.

    联系人:Jerry,Grace,[Jiangang Duan] ( mailto:jiangang.duan@intel.com ), Hao。

    这里项目是在Apache许可版本 2.0下开放的。


    MAN  str  spa  spark  Spark Streaming  
    相关文章