SparkSQL编程模型:
第一步:
需要一个SQLContext对象,该对象是SparkSQL操作的入口
而构建一个SQLContext对象需要一个SparkContext
第二步:
构建好入口对象之后,要引入隐式转换的方法,作用是将读取到的各种文件转换成DataFrame,DataFrame是SparkSQL上进行统一操作的数据类型
第三步:
根据数据的格式,构建一个样例类。作用是提供将读取到的各种各样的数据类型隐式转换成一个统一的数据格式,方便编程
第四步:
使用SQLContext对象读取文件,并将其转换成DataFrame
第五步:
对数据进行相关操作。
1.DataFrame自带的操作方式。DataFrame提供了很多操作数据的方法,如where,select等
2.DSL方式。DSL其实使用的也是DataFrame提供的方法,但是在操作属性时可以方便的使用’ + 属性名的方式进行操作
3.将数据注册成表,通过SQL语句操作
object TextFile{ def main(args:Array[String]){ //第一步 //构建SparkContext对象,主要要使用new调用构造方法,否则就变成使用样例类的Apply方法了 val sc = new SparkContext() //构建SQLContext对象 val sqlContext = new SQLContext(sc) //第二步 import sqlContext.implicits._ //第三步 case Person(name:String,age:Int) //第四步,textFile从指定路径读取文件如果是集群模式要写hdfs文件地址;通过两个map操作将读取到的文件转换成Person类的对象,每一行对应一个Person对象;toDF将其转换成DataFrame val people = sc.textFile( "文件路径" ). map (_.split( "," )). map { case (name,age) => Person(name,age.toInt)}.toDF() //第五步 //DataFrame方法 println ( "------------------------DataFrame------------------------------------" ) //赛选出age>10的记录,然后只选择name属性,show方法将其输出 people.where(people( "age" ) > 10 ). select (people( "name" )).show() //DSL println ( "---------------------------DSL---------------------------------" ) people.where( ‘age > 10).select(‘ name).show() //SQL println ( "-----------------------------SQL-------------------------------" ) //将people注册成people表 people.registerTempTable( "people" ) //使用sqlContext的sql方法来写SQL语句 //查询返回的是RDD,所以对其进行collect操作,之后循环打印 sqlContext.sql( "select name from people where age > 10" ).collect.foreach( println ) //保存为parquet文件,之后的parquet演示会用到 people.saveAsParquet( "保存的路径" ) } }
parquet格式文件测试:
val sc = new SparkContext() val sql = new SQLContext(sc) import sql.implicits._ val parquet = sql.parquetFile(args (0 )) println ( "------------------------DataFrame------------------------------------" ) println (parquet.where(parquet( "age" ) > 10 ). select (parquet( "name" )).show()) println ( "---------------------------DSL---------------------------------" ) println (parquet.where( ‘age > 10).select(‘ name).show()) println ( "-----------------------------SQL-------------------------------" ) parquet.registerTempTable( "parquet" ) sql.sql( "select name from parquet where age > 10" ). map (p => "name:" + p (0 )).collect().foreach( println )
Json格式测试:
val sc = new SparkContext() val sql = new SQLContext(sc) import sql.implicits._ val json = sql.jsonFile(args (0 )) println ( "------------------------DataFrame------------------------------------" ) println (json.where(json( "age" ) > 10 ). select (json( "name" )).show()) println ( "---------------------------DSL---------------------------------" ) println (json.where( ‘age > 10).select(‘ name).show()) println ( "-----------------------------SQL-------------------------------" ) json.registerTempTable( "json" ) sql.sql( "select name from json where age > 10" ). map (p => "name:" + p (0 )).collect().foreach( println )
可以看到上面的代码几乎和读取文本文件的一模一样,只不顾sc在读取文件的时候使用了parquetFile/jsonFile方法,而之后的操作是一摸一样的
由于parquet和json数据读取进来就是一个可操作的格式并且会自动转换成DataFrame,所以省去了case class的定义步骤和toDF的操作
以上为SparkSQL API的简单使用
$(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘ ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i ‘).text(i)); }; $numbering.fadeIn(1700); }); });Spark(九) -- SparkSQL API编程
标签:sparksql
查看更多关于Spark(九) -- SparkSQL API编程的详细内容...