好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Java 实现对 Hadoop 的操作

基本操作

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

 

@RunWith (JUnit4. class )

@DisplayName ( "Test using junit4" )

public class HadoopClientTest {

 

     private FileSystem fileSystem = null ;

 

     @BeforeEach

     public void init() throws URISyntaxException, IOException, InterruptedException {

         Configuration configuration = new Configuration();

 

         configuration.set( "dfs.replication" , "1" );

         configuration.set( "dfs.blocksize" , "64m" );

         fileSystem = FileSystem.get( new URI( "hdfs://hd-even-01:9000" ), configuration, "root" );

     }

     /**

      * 从本地复制文件到Hadoop

      *

      * @throws URISyntaxException

      * @throws IOException

      * @throws InterruptedException

      */

     @Test

     public void copyFileFromLocal() throws URISyntaxException, IOException, InterruptedException {

         // 上传文件

         fileSystem.copyFromLocalFile( new Path( "C:\\Users\\Administrator\\Desktop\\win10激活.txt" ), new Path( "/even1" ));

         // 关闭流,报错winUtils,因为使用了linux的tar包,如果windows要使用,则需要编译好这个winUtils包才能使用

         fileSystem.close();

     }

 

     /**

      * 从Hadoop下载文件到本地,下载需要配置Hadoop环境,并添加winutils到bin目录

      *

      * @throws URISyntaxException

      * @throws IOException

      * @throws InterruptedException

      */

     @Test

     public void copyFileToLocal() throws URISyntaxException, IOException, InterruptedException {

         // 下载文件

         fileSystem.copyToLocalFile( new Path( "/win10激活.txt" ), new Path( "E:/" ));

         // 关闭流,报错winUtils,因为使用了linux的tar包,如果windows要使用,则需要编译好这个winUtils包才能使用

         fileSystem.close();

     }

 

 

     /**

      * 创建文件夹

      *

      * @throws IOException

      */

     @Test

     public void hdfsMkdir() throws IOException {

         // 调用创建文件夹方法

         fileSystem.mkdirs( new Path( "/even1" ));

         // 关闭方法

         fileSystem.close();

     }

 

     /**

      * 移动文件/修改文件名

      */

     public void hdfsRename() throws IOException {

         fileSystem.rename( new Path( "" ), new Path( "" ));

         fileSystem.close();

     }

 

     /**

      * 删除文件/文件夹

      *

      * @throws IOException

      */

     @Test

     public void hdfsRm() throws IOException {

//        fileSystem.delete(new Path(""));

         // 第二个参数表示递归删除

         fileSystem.delete( new Path( "" ), true );

 

         fileSystem.close();

     }

 

     /**

      * 查看hdfs指定目录的信息

      *

      * @throws IOException

      */

     @Test

     public void hdfsLs() throws IOException {

         // 调用方法返回远程迭代器,第二个参数是把目录文件夹内的文件也列出来

         RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles( new Path( "/" ), true );

         while (listFiles.hasNext()) {

             LocatedFileStatus locatedFileStatus = listFiles.next();

 

             System.out.println( "文件路径:" + locatedFileStatus.getPath());

             System.out.println( "块大小:" + locatedFileStatus.getBlockSize());

             System.out.println( "文件长度:" + locatedFileStatus.getLen());

             System.out.println( "副本数量:" + locatedFileStatus.getReplication());

             System.out.println( "块信息:" + Arrays.toString(locatedFileStatus.getBlockLocations()));

         }

 

         fileSystem.close();

     }

 

     /**

      * 判断是文件还是文件夹

      */

     @Test

     public void findHdfs() throws IOException {

         // 1,展示状态信息

         FileStatus[] listStatus = fileSystem.listStatus( new Path( "/" ));

         // 2,遍历所有文件

         for (FileStatus fileStatus : listStatus) {

             if (fileStatus.isFile())

                 System.out.println( "是文件:" + fileStatus.getPath().getName());

             else if (fileStatus.isDirectory())

                 System.out.println( "是文件夹:" + fileStatus.getPath().getName());

         }

 

         fileSystem.close();

     }

 

}

文件读写

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.junit.Before;

import org.junit.Test;

import org.junit.jupiter.api.DisplayName;

import org.junit.runner.RunWith;

import org.junit.runners.JUnit4;

 

import java.io.*;

import java.net.URI;

import java.net.URISyntaxException;

import java.nio.charset.StandardCharsets;

import java.util.Arrays;

 

@RunWith (JUnit4. class )

@DisplayName ( "this is read write test!" )

public class HadoopReadWriteTest {

     FileSystem fileSystem = null ;

     Configuration configuration = null ;

     @Before

     public void init() throws URISyntaxException, IOException, InterruptedException {

         // 1,加载配置

         configuration = new Configuration();

         // 2,构建客户端

         fileSystem = FileSystem.get( new URI( "hdfs://hd-even-01:9000/" ), configuration, "root" );

     }

 

 

     @Test

     public void testReadData() throws IOException {

         // 1,获取hdfs文件流

         FSDataInputStream open = fileSystem.open( new Path( "/win10激活.txt" ));

         // 2,设置一次获取的大小

         byte [] bytes = new byte [ 1024 ];

         // 3,读取数据

         while (open.read(bytes) != - 1 )

             System.out.println(Arrays.toString(bytes));

 

         open.close();

         fileSystem.close();

     }

 

     /**

      * 使用缓存流

      *

      * @throws IOException

      */

     @Test

     public void testReadData1() throws IOException {

         FSDataInputStream open = fileSystem.open( new Path( "/win10激活.txt" ));

 

         // 使用缓冲流会快点

         BufferedReader bufferedReader = new BufferedReader( new InputStreamReader(open, StandardCharsets.UTF_8));

 

         String line = "" ;

 

         while ((line = bufferedReader.readLine()) != null ) {

             System.out.println(line);

         }

 

         bufferedReader.close();

         open.close();

         fileSystem.close();

     }

 

     /**

      * 指定偏移量来实现只读部分内容

      */

     @Test

     public void readSomeData() throws IOException {

         FSDataInputStream open = fileSystem.open( new Path( "/win10激活.txt" ));

 

 

         // 指定开始的index

         open.seek( 14 );

 

         // 指定读的多少

         byte [] bytes = new byte [ 5 ];

         while (open.read(bytes) != - 1 )

             System.out.println( new String(bytes));

 

         open.close();

         fileSystem.close();

 

     }

 

     /**

      * 流方式写数据

      * @throws IOException

      */

     @Test

     public void writeData() throws IOException {

         // 1,获取输出流

         FSDataOutputStream out = fileSystem.create( new Path( "/win11.txt" ), false );

 

         // 2,获取需要写的文件输入流

         FileInputStream in = new FileInputStream( new File( "C:\\Users\\Administrator\\Desktop\\xixi.txt" ));

 

         byte [] b = new byte [ 1024 ];

         int read = 0 ;

         while ((read = in.read(b)) != - 1 ) {

             out.write(b, 0 , read);

         }

         in.close();

         out.close();

         fileSystem.close();

     }

 

     /**

      * 直接写字符串

      */

     @Test

     public void writeData1() throws IOException {

         // 1,创建输出流

         FSDataOutputStream out = fileSystem.create( new Path( "/aibaobao.txt" ), false );

         // 2,写数据

         out.write( "wochaoaibaobao" .getBytes());

         // 3,关闭流

         IOUtils.closeStream(out);

         fileSystem.close();

     }

 

     /**

      * IOUtils方式上传

      *

      * @throws IOException

      */

     @Test

     public void putToHdfs() throws IOException {

         // 1,获取输入流

         FileInputStream in = new FileInputStream( new File( "C:\\Users\\Administrator\\Desktop\\xixi.txt" ));

         // 2,获取输出流

         FSDataOutputStream out = fileSystem.create( new Path( "/haddopPut.txt" ), false );

         // 3,拷贝

         IOUtils.copyBytes(in, out, configuration);

         // 4,关闭流

         IOUtils.closeStream(in);

         IOUtils.closeStream(out);

         fileSystem.close();

     }

 

     /**

      * IOUtils方式下载

      * @throws IOException

      */

     @Test

     public void getFromHdfs() throws IOException {

         // 1,获取输入流

         FSDataInputStream open = fileSystem.open( new Path( "/haddopPut.txt" ));

         // 2,获取输出流

         FileOutputStream out = new FileOutputStream( new File( "C:\\Users\\Administrator\\Desktop\\haddopPut.txt" ));

         // 3,拷贝

         IOUtils.copyBytes(open, out, configuration);

         // 4,关闭流

         IOUtils.closeStream(open);

         IOUtils.closeStream(out);

         fileSystem.close();

     }

}

到此,这篇关于 Java 实现对 Hadoop 的操作的文章就介绍到这了。更多相关 Java Hadoop 内容,请搜索以前的文章或继续浏览下面的相关文章。希望大家以后多多支持!

原文链接:https://blog.csdn.net/weixin_37581297/article/details/84349916

查看更多关于java实现对Hadoop的操作的详细内容...

  阅读:16次