更新时间:2021-10-29 来源:黑马程序员 浏览量:
SparkSQL不仅能够查询MySQL数据库中的数据,还可以向表中插人新的数据,实现方式的具体代码如文件4-5所示。
文件4-5 SparkSqlToMysql.scala
import java.util.Properties import org.apachen.spark.rdd.RDD import org.apache.spark.sq1.{DataFrame, SparkSession} //创建样例类Person case class Person (id: Int, name:String,age: Int) object SparkSqlToMysql { def main(args:ArrayL String]): Unit ={ //1.创建sparkSession对象 val spark: SparkSession=sparksession.builder() .appNamne("SparksqIToMysql") .master("local[2]") . getOrCreate() //2.创建数据 val data=spark.sparkContext .patgoarrav("3,wangwu,22","4,zhaoliu,26")) //3.按MySQL列名切分数据 val arRRD:RRD[Arey[String]] =data.map(_.split(",")") //4.RDD关联Person样例类 val personRDD:RDD[Person]= arrRDD.map(x=>Person(x(0).toInt,x(1),x(2).toInt). //导人隐式转换 import spark.implicits_ //5.将RDD转换成DataFrame val personDF:DataFrame=personRDD.toDF() //6.设置JDBC配置参数 val prop =new Properties() prop.setProperty("user","root") prop.setProperty("password","123456") prop.setProperty("driver","com.mysql.jdbc.Driver") //7.写入数据 personDF.write.mode("append").jdbc( "jdbc:mysql://192.168.121.134:3306/spark","spark.person",prop) personDF.show() } }
在文件4-5中,第5行代码首先创建case class Person样例类;第9~ 12行代码用来创建SparkSession对象;第14~15行代码则通过spark.SparkContext.parallelize( )方法创建一个RDD,该RDD值表示两个person数据;第17~24行代码表示将数据按照逗号切分并匹配case class Person中的字段用于转换成DataFrame对象;第26~29行代码表示设置JDBC配置参数,访问MySQL数据库;第31行代码personDF. write. mode()方法表示设置写人数据方式,该参数append是一个枚举类型,枚举参数分别有append、overwriteerrorIfExistsignore4个值,分别表示为追加、覆盖、表如果存在即报错(该值为默认值)、忽略新保存的数据。
运行文件4-5中的代码,返回sQLyog工具查看当前数据表,数据表内容如图4-7所示。