DataFrame和RDD互操作的两种方式:
1) 反射方式:使用 case class。前提:事先需要知道字段名和字段类型。
2) 编程方式:使用 Row + StructType。适用于第一种方式不能满足要求(事先不知道列)的场景。
3) 选型:优先考虑第一种(反射)方式。
package com.imooc.spark

import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Interoperation between DataFrame and RDD.
 *
 * Demonstrates the two ways to convert an RDD to a DataFrame:
 *  1. Reflection (case class) — requires the columns and types to be known up front.
 *  2. Programmatic (Row + StructType) — for when the columns are not known in advance.
 */
object DataFrameRDDApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DataFrameRDDApp")
      .master("local[2]")
      .getOrCreate()

    //inferReflection(spark)
    program(spark)

    spark.stop()
  }

  /**
   * Programmatic approach: build the schema explicitly with StructType and
   * map each input line to a generic Row. Use this when the columns are not
   * known ahead of time.
   *
   * @param spark the active SparkSession
   */
  def program(spark: SparkSession): Unit = {
    // RDD ==> DataFrame
    val rdd = spark.sparkContext.textFile("file:///Users/arthurlance/data/infos.txt")

    // Each input line is expected to be "id,name,age" — parse into a generic Row.
    // NOTE(review): line(0).toInt / line(2).toInt will throw on malformed input;
    // acceptable for example code, but real code should validate.
    val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))

    // The schema must match the Row field order and types exactly.
    val structType = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))

    val infoDF = spark.createDataFrame(infoRDD, structType)
    infoDF.printSchema()
    infoDF.show()

    // Query via the DataFrame API.
    infoDF.filter(infoDF.col("age") > 30).show()

    // Query via SQL against a temporary view.
    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }

  /**
   * Reflection approach: map each line to the Info case class and let Spark
   * infer the schema from it. Requires the columns and their types to be
   * known at compile time.
   *
   * @param spark the active SparkSession
   */
  def inferReflection(spark: SparkSession): Unit = {
    // RDD ==> DataFrame
    val rdd = spark.sparkContext.textFile("file:///Users/arthurlance/data/infos.txt")

    // NOTE: the implicit conversions are required for .toDF()
    import spark.implicits._
    val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()

    infoDF.show()

    infoDF.filter(infoDF.col("age") > 30).show()

    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }

  /** Schema carrier for the reflection-based conversion: id, name, age. */
  case class Info(id: Int, name: String, age: Int)

}