

IDEA Development: Configuring Spark SQL with Simple Usage Example Code

1. Add the dependency

Add the following dependency to the pom.xml of your IDEA project.


<!-- Spark SQL dependency; note the Scala suffix and version number -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

2. Example code


package com.zf.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Spark01_SparkSql_Basic {

    def main(args: Array[String]): Unit = {

        // Create the context configuration object
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
        // Create the SparkSession object
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()

        // DataFrame
        val df: DataFrame = spark.read.json("datas/user.json")
        //df.show()

        // DataFrame => SQL

        //df.createOrReplaceTempView("user")
        //spark.sql("select * from user").show()
        //spark.sql("select age from user").show()
        //spark.sql("select avg(age) from user").show()

        // DataFrame => DSL

        // Transformations need the implicit conversion rules imported, otherwise they
        // will not compile, e.g. using $ to reference a column's value.
        // Note: "spark" here is not a package name, it is the SparkSession object.
        import spark.implicits._
        //df.select("age","username").show()
        //df.select($"age"+1).show()
        //df.select('age+1).show()

        // DataSet

        //val seq = Seq(1,2,3,4)
        //val ds: Dataset[Int] = seq.toDS()
        //ds.show()

        // RDD <=> DataFrame
        val rdd = spark.sparkContext.makeRDD(List((1, "张三", 10), (2, "李四", 20)))
        val df1: DataFrame = rdd.toDF("id", "name", "age")
        val rdd1: RDD[Row] = df1.rdd

        // DataFrame <=> DataSet
        val ds: Dataset[User] = df1.as[User]
        val df2: DataFrame = ds.toDF()

        // RDD <=> DataSet
        val ds1: Dataset[User] = rdd.map {
            case (id, name, age) => {
                User(id, name = name, age = age)
            }
        }.toDS()
        val rdd2: RDD[User] = ds1.rdd

        spark.stop()
    }

    case class User(id: Int, name: String, age: Int)

}
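
For reference, a minimal sketch of what datas/user.json might contain (the field names username and age are assumed from the DSL examples above, and the values are made up; Spark's JSON reader expects one JSON object per line):

{"username": "zhangsan", "age": 20}
{"username": "lisi", "age": 30}
{"username": "wangwu", "age": 40}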

PS: Below is another look at developing a Spark SQL program in IDEA.

Packaging and running the program in IDEA work the same way as for Spark Core; you only need to add a new item to the Maven dependencies:


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

I. Specifying the schema explicitly


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row

object Demo1 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().master("local").appName("UnderstandSparkSession").getOrCreate()

    // Create an RDD from the given path
    val personRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t"))

    // Declare the schema with StructType
    val schema = StructType(
      List(
        StructField("id", IntegerType),
        StructField("name", StringType),
        StructField("age", IntegerType)))

    // Map the RDD to an RDD of Row
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1), p(2).toInt))
    val personDF = spark.createDataFrame(rowRDD, schema)

    // Register a temporary view
    personDF.createOrReplaceTempView("t_person")

    // Run SQL
    val df = spark.sql("select * from t_person order by age desc limit 4")
    df.show()
    spark.stop()
  }
}
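
The examples in this and the following sections read D:\tmp_files\student.txt as a tab-separated file with id, name, and age columns. A made-up sketch of such a file (values are illustrative only, columns separated by tabs):

1	Tom	23
2	Mary	24
3	Mike	22
4	Jone	25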

II. Using a case class


import org.apache.spark.sql.SparkSession

// Using a case class
object Demo2 {

  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().master("local").appName("CaseClassDemo").getOrCreate()

    // Read the data from the given file into an RDD
    val lineRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t"))

    // Map the RDD onto the case class
    val studentRDD = lineRDD.map(x => Student(x(0).toInt, x(1), x(2).toInt))

    // Generate a DataFrame from the RDD; this requires importing the implicit conversions
    import spark.sqlContext.implicits._
    val studentDF = studentRDD.toDF

    // Register a temporary view
    studentDF.createOrReplaceTempView("student")

    // Run SQL
    spark.sql("select * from student").show()

    spark.stop()
  }
}

// The case class must be defined outside the object's main method
case class Student(stuID: Int, stuName: String, stuAge: Int)
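
Because the case class is in scope, the same data can also be loaded as a strongly typed Dataset instead of a DataFrame. A minimal sketch, reusing the Student case class and the assumed student.txt layout above:

import org.apache.spark.sql.{Dataset, SparkSession}

object Demo2Dataset {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("CaseClassDatasetDemo").getOrCreate()
    import spark.implicits._

    // Parse each tab-separated line into a Student and convert the RDD to a typed Dataset
    val studentDS: Dataset[Student] = spark.sparkContext
      .textFile("D:\\tmp_files\\student.txt")
      .map(_.split("\t"))
      .map(x => Student(x(0).toInt, x(1), x(2).toInt))
      .toDS()

    // Typed transformations work directly against the Student fields
    studentDS.filter(_.stuAge > 22).show()

    spark.stop()
  }
}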

III. Saving the data to a database


import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.Row
import java.util.Properties

object Demo3 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().master("local").appName("UnderstandSparkSession").getOrCreate()

    // Create an RDD from the given path
    val personRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t"))

    // Declare the schema with StructType
    val schema = StructType(
      List(
        StructField("id", IntegerType),
        StructField("name", StringType),
        StructField("age", IntegerType)))

    // Map the RDD to an RDD of Row
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1), p(2).toInt))

    val personDF = spark.createDataFrame(rowRDD, schema)

    // Register a temporary view
    personDF.createOrReplaceTempView("person")

    // Run SQL
    val df = spark.sql("select * from person")

    // Inspect the result
    //df.show()

    // Save the result to MySQL
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")
    props.setProperty("driver", "com.mysql.jdbc.Driver")
    df.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "student", props)
    spark.close()
  }
}
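
Note that the JDBC write above needs the MySQL driver on the classpath. A sketch of the Maven dependency matching the legacy com.mysql.jdbc.Driver used in the code (the exact version is only an example):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>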

The content above is reposted from:
https://blog.csdn.net/weixin_43520450/article/details/106093582
Author: 故明所以

This concludes the article on configuring and using Spark SQL in IDEA with simple example code. For more on developing Spark SQL in IDEA, search earlier articles or browse the related articles below, and thank you for your continued support!

