SparkSQL编程——DataSet(3)

    • 一、DataSet
    • 二、何时使用DataSet
    • 三、创建DataSet
          • 1.从RDD创建DataSet
          • 2.从DataFrame转换DataSet
    • 四、DataSet的操作
    • 五、RDD、DataFrame和DateSet
          • 1.三者共性
          • 2.三者区别
          • 3.三者的相互转换
    • 六、使用IDEA编写SparkSQL程序
    • 七、用户自定义函数
      • 1.自定义UDF函数
      • 2.自定义UDAF函数
          • ①弱类型的UDAF
          • ②强类型的UDAF
    • 八、数据的加载与保存
      • 1.通用方法
          • ①指定保存
          • ② 指定文件
    • 九、where to go

一、DataSet

  • Dataset 是 Dataframe API的一个扩展,是Spark最新的数据抽象。Dataset是类型安全的结构化API,用于在Java和Scala中编写静态类型的代码。Dataset API在Python和R中不可用,因为这些语言是动态类型的。
  • Dataframe是Dataset的特列,DataFrame=Dataset[Row] ,所以可以通过as方法将Dataframe转换为Dataset。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息我都用Row来表示。DataSet是强类型的。比如可以有 Dataset[Car],Dataset[Person].
  • DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。就跟JSON对象和类对象之间的类比。
  • 通常样例类被用来在Dataset中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称。

二、何时使用DataSet

  • DataFrame 的Row类型是Spark内部已经优化的类型,且在各种语言都能使用。当使用Dataset API 时,将Spark Row格式的每一行转换为指定的特定领域类型的对象(case类或 Java 类),此转换会有性能的下降。你可能会想,如果在使用Dataset时损失性能,那为什么我们还要使用它们呢?有下面几个主要原因:
    • 当你要执行的操作无法使用DataFrame操作表示时
    • 如果需要类型安全,并且愿意牺牲一定性能来实现它
  • 最常用的应用场景可能是先用DataFrame和再用Dataset,这可以手动在性能和类型安全之间进行权衡。这在有些情况时是很有用的,比如当基于DataFrame执行的提取、转换和加载 (ETL) 转换作业之后,想将数据送入驱动器并使用单机库操作时,或者是当需要在Spark SQL 中执行过滤和进一步操作之前,进行每行分析的预处理转换操作的时候。

三、创建DataSet

  • 创建DataSet最最常用的两种方法: 从RDD创建DataSet从DataFrame转换。两种方式都需要使用样例类,方便类型转换。
1.从RDD创建DataSet
  • 首先回顾下使用case class 创建的DataFrame。
    import spark.implicits._
    case class People(name: String, age: Int)
    val peopleRDD = sc.textFile("/Users/xxx/opt/module/spark/examples/src/main/resources/people.json")
    
    peopleRDD.map{
       x => val para = x.split(" ")
       (para(0),para(1).trim.toInt)
    }.toDF
    
  • DataSet的创建一般也使用case class,Dataframe是Dataset的特列,DataFrame=Dataset[Row]
    import spark.implicits._
    case class People(name: String, age: Int)
    val peopleRDD =sc.textFile("/Users/xxx/opt/module/spark/examples/src/main/resources/people.json")
    
    peopleRDD.map{
    	x => val para = x.split(" ")
    	People(para(0),para(1).trim.toInt)
    }.toDS
    

    可以看出DataSet创建方式和DataFrame类似,区别就是DataFrame是Row类型的,而DataSet可以是其他类型的
2.从DataFrame转换DataSet
  • 从DataFrame转换DataSet比较简单,需要,我们使用as方法将其强制转换为指定的行类型,例如:
    df.as[People] // df 是一个DataFrame,People 是指定的类型

四、DataSet的操作

  • Dataset和DataFrame拥有完全相同的成员函数,Dataset上的转换/行动操作与DataFrame相同,区别只是每一行的数据类型不同。

五、RDD、DataFrame和DateSet

1.三者共性

  • 1、RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利
  • 2、三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算。
  • 3、三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出。
  • 4、三者都有partition的概念
  • 5、三者有许多共同的函数,如filter,排序等
  • 6、在对DataFrame和Dataset进行操作许多操作都需要这个包进行支持import spark.implicits._
  • 7、DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型
2.三者区别
  • 1、RDD:RDD一般和spark mlib同时使用,不支持sparksql操作

  • 2、DataFrame:

    • 与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过getAS方法解析或者模式匹配才能获取各个字段的值。
    • DataFrame与Dataset一般不与spark mlib同时使用
    • DataFrame与Dataset均支持sparksql的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作。
    • DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然
  • 3、Dataset:

    • Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。
    • DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息。
3.三者的相互转换

六、使用IDEA编写SparkSQL程序

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object HelloWorld {

  def main(args: Array[String]) {
    //创建SparkConf()并设置App名称
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    val df = spark.read.json("data/people.json")

    // Displays the content of the DataFrame to stdout
    df.show()

    df.filter($"age" > 21).show()

    df.createOrReplaceTempView("persons")

    spark.sql("SELECT * FROM persons where age > 21").show()

    spark.stop()
  }
}

七、用户自定义函数

  • 自定函数分数分为两种:自定义UDF函数自定义UDAF函数
  • 参考连接:Spark官网案例

1.自定义UDF函数

  • 自定义UDF函数函数比较简单,使用spark.udf.register注册函数,并编写函数体,如求字符字符串长度的UDF的函数strLen,注册完成后,使用方法同其他函数,直接调用即可:
    spark.udf.register("strLen", (str: String) => str.length())

2.自定义UDAF函数

  • 自定义UDAF函数比较复杂,又分为强类型的UDAF弱类型的UDAF函数
①弱类型的UDAF
  • 弱类型的UDAF 即 Untyped User-Defined Aggregate Functions。弱类型的函数需要继承UserDefinedAggregateFunction类,同时实现对应的抽象聚合方法。

  • 示例:定义一个求平均值的UDAF函数

    ///Users have to extend the UserDefinedAggregateFunction abstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:
    
    import org.apache.spark.sql.expressions.MutableAggregationBuffer
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.SparkSession
    
    object MyAverage extends UserDefinedAggregateFunction {
      // Data types of input arguments of this aggregate function
      def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
      // Data types of values in the aggregation buffer
      def bufferSchema: StructType = {
        StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
      }
      // The data type of the returned value
      def dataType: DataType = DoubleType
      // Whether this function always returns the same output on the identical input
      def deterministic: Boolean = true
      // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
      // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
      // the opportunity to update its values. Note that arrays and maps inside the buffer are still
      // immutable.
      def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0L
        buffer(1) = 0L
      }
      // Updates the given aggregation buffer `buffer` with new input data from `input`
      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0)) {
          buffer(0) = buffer.getLong(0) + input.getLong(0)
          buffer(1) = buffer.getLong(1) + 1
        }
      }
      // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
        buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
      }
      // Calculates the final result
      def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
    }
    
    // Register the function to access it
    spark.udf.register("myAverage", MyAverage)
    
    val df = spark.read.json("examples/src/main/resources/employees.json")
    df.createOrReplaceTempView("employees")
    df.show()
    // +-------+------+
    // |   name|salary|
    // +-------+------+
    // |Michael|  3000|
    // |   Andy|  4500|
    // | Justin|  3500|
    // |  Berta|  4000|
    // +-------+------+
    
    val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
    result.show()
    // +--------------+
    // |average_salary|
    // +--------------+
    // |        3750.0|
    // +--------------+
    
②强类型的UDAF
  • 强类型的UDAF 即 Untyped User-Defined Aggregate Functions。强类型的函数需要继承Aggregator类,同时实现对应的抽象聚合方法。
  • 示例:定义一个求平均值的UDAF函数
    Type-Safe User-Defined Aggregate Functions
    User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like:
    
    import org.apache.spark.sql.expressions.Aggregator
    import org.apache.spark.sql.Encoder
    import org.apache.spark.sql.Encoders
    import org.apache.spark.sql.SparkSession
    
    case class Employee(name: String, salary: Long)
    case class Average(var sum: Long, var count: Long)
    
    object MyAverage extends Aggregator[Employee, Average, Double] {
      // A zero value for this aggregation. Should satisfy the property that any b + zero = b
      def zero: Average = Average(0L, 0L)
      // Combine two values to produce a new value. For performance, the function may modify `buffer`
      // and return it instead of constructing a new object
      def reduce(buffer: Average, employee: Employee): Average = {
        buffer.sum += employee.salary
        buffer.count += 1
        buffer
      }
      // Merge two intermediate values
      def merge(b1: Average, b2: Average): Average = {
        b1.sum += b2.sum
        b1.count += b2.count
        b1
      }
      // Transform the output of the reduction
      def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
      // Specifies the Encoder for the intermediate value type
      def bufferEncoder: Encoder[Average] = Encoders.product
      // Specifies the Encoder for the final output value type
      def outputEncoder: Encoder[Double] = Encoders.scalaDouble
    }
    
    val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
    ds.show()
    // +-------+------+
    // |   name|salary|
    // +-------+------+
    // |Michael|  3000|
    // |   Andy|  4500|
    // | Justin|  3500|
    // |  Berta|  4000|
    // +-------+------+
    
    // Convert the function to a `TypedColumn` and give it a name
    val averageSalary = MyAverage.toColumn.name("average_salary")
    val result = ds.select(averageSalary)
    result.show()
    // +--------------+
    // |average_salary|
    // +--------------+
    // |        3750.0|
    // +--------------+ 
    

八、数据的加载与保存

1.通用方法

①指定保存
  • Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作。修改配置项spark.sql.sources.default,可修改默认数据源格式。

    val df = spark.read.load("examples/src/main/resources/users.parquet") 
    df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
    
  • 当数据源格式不是parquet格式文件时,需要手动指定数据源的格式。数据源格式需要指定全名(例如:org.apache.spark.sql.parquet),如果数据源格式为内置格式,则只需要指定简称定json, parquet, jdbc, orc, libsvm, csv, text来指定数据的格式。可以通过SparkSession提供的read.load方法用于通用加载数据,使用write和save保存数据。

    val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
    peopleDF.write.format("parquet").save("hdfs://hadoop102:9000/namesAndAges.parquet")
    
  • 除此之外,可以直接运行SQL在文件上:

    val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop102:9000/namesAndAges.parquet`")
    peopleDF.write.format("parquet").save("hdfs://hadoop102:9000/namesAndAges.parquet")
    
    peopleDF.show()
    //+----+-------+
    //| age|   name|
    //+----+-------+
    //|null|Michael|
    //|  30|   Andy|
    //|  19| Justin|
    //+----+-------+
    
② 指定文件
  • 可以采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式。需要注意的是,这些保存模式不使用任何锁定,不是原子操作。此外,当使用Overwrite方式执行时,在输出新数据之前原数据就已经被删除。SaveMode详细介绍如下:
    SaveMode.ErrorIfExists(default):如果文件存在,则报错
    SaveMode.Append:追加
    SaveMode.Overwrite:覆写
    SaveMode.Ignore:数据存在,则忽略

九、where to go

第四章:SparkStreaming编程

更多推荐

Spark指南——第三章:SparkSQL编程——DataSet(3)