好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Java API如何实现向Hive批量导入数据

Java API实现向Hive批量导入数据

Java程序中产生的数据,如果导入oracle或者mysql库,可以通过jdbc连接insert批量操作完成,但是当前版本的hive并不支持批量insert操作,因为需要先将结果数据写入hdfs文件,然后插入Hive表中。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

package com.enn.idcard;

import java.io.IOException;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.SQLException;

import java.sql.Statement;

import java.util.ArrayList;

import java.util.List;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

/**

  * <p>Description: </p>

  * @author kangkaia

  * @date 2017年12月26日 下午1:42:24

  */

public class HiveJdbc {

     public static void main(String[] args) throws IOException {

         List<List> argList = new ArrayList<List>();

         List<String> arg = new ArrayList<String>();

         arg.add( "12345" );

         arg.add( "m" );

         argList.add(arg);

         arg = new ArrayList<String>();

         arg.add( "54321" );

         arg.add( "f" );

         argList.add(arg);

//      System.out.println(argList.toString());

         String dst = "/test/kk.txt" ;

         createFile(dst,argList);

         loadData2Hive(dst);

     }

 

     /**

      * 将数据插入hdfs中,用于load到hive表中,默认分隔符是"\001"

      * @param dst

      * @param contents

      * @throws IOException

      */

     public static void createFile(String dst , List<List> argList) throws IOException{

         Configuration conf = new Configuration();

         FileSystem fs = FileSystem.get(conf);

         Path dstPath = new Path(dst); //目标路径

         //打开一个输出流

         FSDataOutputStream outputStream = fs.create(dstPath);

         StringBuffer sb = new StringBuffer();

         for (List<String> arg:argList){

             for (String value:arg){

                 sb.append(value).append( "\001" );

             }

             sb.deleteCharAt(sb.length() - 4 ); //去掉最后一个分隔符

             sb.append( "\n" );

         }

         sb.deleteCharAt(sb.length() - 2 ); //去掉最后一个换行符

         byte [] contents =  sb.toString().getBytes();

         outputStream.write(contents);

         outputStream.close();

         fs.close();

         System.out.println( "文件创建成功!" );       

     }

     /**

      * 将HDFS文件load到hive表中

      * @param dst

      */

     public static void loadData2Hive(String dst) {

         String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver" ;

         String CONNECTION_URL = "jdbc:hive2://server-13:10000/default;auth=noSasl" ;

         String username = "admin" ;

         String password = "admin" ;

         Connection con = null ;

        

         try {

             Class.forName(JDBC_DRIVER);

             con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password);

             Statement stmt = con.createStatement();        

             String sql = " load data inpath '" +dst+ "' into table population.population_information " ;

            

             stmt.execute(sql);

             System.out.println( "loadData到Hive表成功!" );

         } catch (SQLException e) {

             e.printStackTrace();

         } catch (ClassNotFoundException e) {

             e.printStackTrace();

         } finally {

             // 关闭rs、ps和con

             if (con != null ){

                 try {

                     con.close();

                 } catch (SQLException e) {

                     e.printStackTrace();

                 }

             }

         }

     }   

}

注意:

本例使用mvn搭建,conf配置文件放在src/main/resources目录下。

Hive提供的默认文件存储格式有textfile、sequencefile、rcfile等。用户也可以通过实现接口来自定义输入输的文件格式。

在实际应用中,textfile由于无压缩,磁盘及解析的开销都很大,一般很少使用。Sequencefile以键值对的形式存储的二进制的格式,其支持针对记录级别和块级别的压缩。rcfile是一种行列结合的存储方式(text file和sequencefile都是行表[row table]),其保证同一条记录在同一个hdfs块中,块以列式存储。一般而言,对于OLTP而言,行表优势大于列表,对于OLAP而言,列表的优势大于行表,特别容易想到当做聚合操作时,列表的复杂度将会比行表小的多,虽然单独rcfile的列运算不一定总是存在的,但是rcfile的高压缩率确实减少文件大小,因此实际应用中,rcfile总是成为不二的选择,达观数据平台在选择文件存储格式时也大量选择了rcfile方案。

通过hdfs导入hive的表默认是textfile格式的,因此可以改变存储格式,具体方法是先创建sequencefile、rcfile等格式的空表,然后重新插入数据即可。

?

1

2

3

insert overwrite table seqfile_table select * from textfile_table;

……

insert overwrite table rcfile_table select * from textfile_table;

java 批量插入hive中转在HDFS

稍微修改了下,这文章是通过将数据存盘后,加载到HIVE.

模拟数据放到HDFS然后加载到HIVE,请大家记得添加HIVE JDBC依赖否则会报错。

加载前的数据表最好用外部表,否则会drop表的时候元数据会一起删除!

?

1

2

3

4

5

< dependency >

  < groupId >org.apache.hive</ groupId >

  < artifactId >hive-jdbc</ artifactId >

  < version >1.1.0</ version >

</ dependency >

代码

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

import java.io.IOException;

import java.net.URI;

import java.net.URISyntaxException;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.SQLException;

import java.sql.Statement;

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

public class Demo {

         public static void main(String[] args) throws Exception {

             List<List> argList = new ArrayList<List>();

             List<String> arg = new ArrayList<String>();

             arg.add( "12345" );

             arg.add( "m" );

             argList.add(arg);

             arg = new ArrayList<String>();

             arg.add( "54321" );

             arg.add( "f" );

             argList.add(arg);

//          System.out.println(argList.toString());

             String dst = "/test/kk.txt" ;

             createFile(dst,argList);

//          loadData2Hive(dst);

         }

         /**

          * 将数据插入hdfs中,用于load到hive表中,默认分隔符是"|"

          * @param dst

          * @param contents

          * @throws IOException

          * @throws Exception

          * @throws InterruptedException

          */

         public static void createFile(String dst , List<List> argList) throws IOException, InterruptedException, Exception{

             Configuration conf = new Configuration();

             FileSystem fs = FileSystem.get( new URI( "hdfs://hadoop:9000" ),conf, "root" );

             Path dstPath = new Path(dst); //目标路径

             //打开一个输出流

             FSDataOutputStream outputStream = fs.create(dstPath);

             StringBuffer sb = new StringBuffer();

             for (List<String> arg:argList){

                 for (String value:arg){

                     sb.append(value).append( "|" );

                 }

                 sb.deleteCharAt(sb.length() - 1 ); //去掉最后一个分隔符

                 sb.append( "\n" );

             }

             byte [] contents =  sb.toString().getBytes();

             outputStream.write(contents);

             outputStream.flush();;

             outputStream.close();

             fs.close();

             System.out.println( "文件创建成功!" );

            

         }

         /**

          * 将HDFS文件load到hive表中

          * @param dst

          */

         public static void loadData2Hive(String dst) {

             String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver" ;

             String CONNECTION_URL = "jdbc:hive2://hadoop:10000/default" ;

             String username = "root" ;

             String password = "root" ;

             Connection con = null ;

            

             try {

                 Class.forName(JDBC_DRIVER);

                 con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password);

                 Statement stmt = con.createStatement();

                

                 String sql = " load data inpath '" +dst+ "' into table test " ; //test 为插入的表

                

                 stmt.execute(sql);

                 System.out.println( "loadData到Hive表成功!" );

             } catch (SQLException e) {

                 e.printStackTrace();

             } catch (ClassNotFoundException e) {

                 e.printStackTrace();

             } finally {

                 // 关闭rs、ps和con

                 if (con != null ){

                     try {

                         con.close();

                     } catch (SQLException e) {

                         e.printStackTrace();

                     }

                 }

             }

         }

        

     }

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。

原文链接:https://blog.csdn.net/kangkangwanwan/article/details/78915134

查看更多关于Java API如何实现向Hive批量导入数据的详细内容...

  阅读:50次