更新时间:2021-09-08 来源:黑马程序员 浏览量:
当case类不能提前定义的时候,就需要采用编程方式定义Schema信息,定义DataFrame主要包含3个步骤,具体如下:
(1)创建一个Row对象结构的RDD;
(2)基于StructType类型创建Schema;
(3)通过SparkSession提供的createDataFrame()方法来拼接Schema。
根据上述步骤,创建SparkSqlSchema. scala文件,使用编程方式定义Schema信息的具体代码如文件4-3所示。
文件4-3 SparkSqlSchema.scala
import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sq1.types. {IntegerType,StringType,StructField,StructType} import org.apache.spark.sql.(DataFrame,Row,Sparkession) object SparkSqlSchema { def main(args: Array[string]): Unit=( //1.创建SparkSession val spark: sparkSession=Sparksession.bullder() .appName ("SparkSq1Schema") .master ("1oca1[2]") .getOrCreate () //2.获取sparkConttext对象 val sc: SparkContext=spark.sparkContext //设置日志打印级别 sc.setLogLevel ( "WARN") //3.加载数据 val dataRDD:RDD[String]=sc.textFile("D://spark//person.txt") //4.切分每一行 val dataArrayRDD:RDD[ Array[string]]=dataRDD.map( .split(" ")) //5.加载数据到Row对象中 val personRDD:RDD[Row]= dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt)) //6.创建Schema val schema:StructType=StructType(Seq( StructField("id",IntegerType,false), StructField("name",StringType,false), StructField("age", IntegerType, false) )) //7.利用personRDD与Schema创建DataFrame val personDF:DataFrame=spark.createDataFrame(personRDD,schema) //8.DSL操作显示DataFrame的数据结果 personDF . show () //9.将DataFrame注册成表 personDF.createOrReplaceTempView ("t_person") //10.sq1语句操作 spark.sq1 ("select¥from t_ person") .show() //11.关闭资源 sc.stop() spark.stop ()
在文件4-3中,第9~23行代码表示将文件转换成为RDD的基本步骤,第25~29行代码即为编程方式定义Schema的核心代码,Spark SQL提供了Class StructType( val fields:Array[StructField])类来表示模式信息,生成一个StructType对象,需要提供fields作为输入参数,fields是个集合类型,StructField(name,dataTypenullable)参数分别表示为字段名称、字段数据类型、字段值是否允许为空值,根据person.txt文本数据文件分别设置id、name、age字段作为Schema,第31行代码表示通过调用spark.createDataFrame()方法将RDD和Schema进行合并转换为DataFrame,第33~40行代码即为操作DataFrame进行数据查询。