博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
DataFrame和RDD互操作的两种方式:
阅读量:5105 次
发布时间:2019-06-13

本文共 1964 字,大约阅读时间需要 6 分钟。

DataFrame和RDD互操作的两种方式:

1)反射:case class 前提:事先需要知道你的字段、字段类型
2)编程:Row 如果第一种情况不能满足你的要求(事先不知道列)
3) 选型:优先考虑第一种

 

1 package com.imooc.spark 2  3 import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType} 4 import org.apache.spark.sql.{Row, SparkSession} 5  6 /** 7  * DataFrame和RDD的互操作 8  */ 9 object DataFrameRDDApp {10 11   def main(args: Array[String]) {12 13     val spark = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()14 15     //inferReflection(spark)16 17     program(spark)18 19     spark.stop()20   }21 22   def program(spark: SparkSession): Unit = {23     // RDD ==> DataFrame24     val rdd = spark.sparkContext.textFile("file:///Users/arthurlance/data/infos.txt")25 26     val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))27 28     val structType = StructType(Array(StructField("id", IntegerType, true),29       StructField("name", StringType, true),30       StructField("age", IntegerType, true)))31 32     val infoDF = spark.createDataFrame(infoRDD,structType)33     infoDF.printSchema()34     infoDF.show()35 36 37     //通过df的api进行操作38     infoDF.filter(infoDF.col("age") > 30).show39 40     //通过sql的方式进行操作41     infoDF.createOrReplaceTempView("infos")42     spark.sql("select * from infos where age > 30").show()43   }44 45   def inferReflection(spark: SparkSession) {46     // RDD ==> DataFrame47     val rdd = spark.sparkContext.textFile("file:///Users/arthurlance/data/infos.txt")48 49     //注意:需要导入隐式转换50     import spark.implicits._51     val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()52 53     infoDF.show()54 55     infoDF.filter(infoDF.col("age") > 30).show56 57     infoDF.createOrReplaceTempView("infos")58     spark.sql("select * from infos where age > 30").show()59   }60 61   case class Info(id: Int, name: String, age: Int)62 63 }

 

转载于:https://www.cnblogs.com/arthurLance/p/10626821.html

你可能感兴趣的文章
MySql update inner join!MySql跨表更新 多表update sql语句?如何将select出来的部分数据update到另一个表里面?...
查看>>
我最宏大的个人愿望
查看>>
北漂周记--第5记--拼命编程
查看>>
比赛总结一
查看>>
SpringBoot项目打包
查看>>
JSP的3种方式实现radio ,checkBox,select的默认选择值
查看>>
Linux操作系统 和 Windows操作系统 的区别
查看>>
《QQ欢乐斗地主》山寨版
查看>>
文件流的使用以及序列化和反序列化的方法使用
查看>>
Android-多线程AsyncTask
查看>>
第一个Spring冲刺周期团队进展报告
查看>>
C++函数基础知识
查看>>
红黑树 c++ 实现
查看>>
Android 获取网络链接类型
查看>>
报表服务框架:WEB前端UI
查看>>
5.9UDP客户端服务器-基于OK6410
查看>>
java自学基础、项目实战网站推荐
查看>>
软件包的使用
查看>>
linux中启动与终止lnmp的脚本
查看>>
gdb中信号的处理[转]
查看>>