好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Java使用pulsar-flink-connector读取pulsar catalog元数据代码剖析

简介

通过 pulsar-flink-connector 读取 Apache Pulsar 中 namespaces、topics 的元数据信息。
pulsar-flink-connector 的 GitHub: https://github.com/streamnative/pulsar-flink

Maven

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

<!-- pulsar-flink-connector built for Scala 2.11 / Flink 1.12 -->
<dependency>
    <groupId>io.streamnative.connectors</groupId>
    <artifactId>pulsar-flink-connector-2.11-1.12</artifactId>
    <version>2.7.3</version>
</dependency>

<!-- JAR repositories -->
<repositories>
    <repository>
        <id>central</id>
        <layout>default</layout>
        <url>https://repo1.maven.org/maven2</url>
    </repository>
    <!-- NOTE(review): Bintray has been sunset; verify this repository still resolves. -->
    <repository>
        <id>bintray-streamnative-maven</id>
        <name>bintray</name>
        <url>https://dl.bintray.com/streamnative/maven</url>
    </repository>
</repositories>

CODE

使用PulsarMetadataReader获取元数据

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

package com.levi.demo;

 

import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;

import org.apache.pulsar.client.admin.PulsarAdminException;

import org.apache.pulsar.client.impl.auth.AuthenticationToken;

import org.apache.pulsar.client.impl.conf.ClientConfigurationData;

import org.apache.pulsar.common.schema.SchemaInfo;

import org.apache.pulsar.common.schema.SchemaType;

 

import java.io.IOException;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

 

/**

  * Test.

  *

  * @author levi

  * @version 1.0

  **/

public class Test {

 

     public static void main(String[] args)  {

         final ClientConfigurationData configurationData = new ClientConfigurationData();

         configurationData.setServiceUrl( "pulsar://127.0.0.1:6650" );

         //Your Pulsar Token

         final AuthenticationToken token =

                 new AuthenticationToken(

                         "eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx" );

         configurationData.setAuthentication(token);

 

         try ( final PulsarMetadataReader reader =

                      new PulsarMetadataReader( "http://127.0.0.1:8443" ,

                              configurationData,

                              "" ,

                              new HashMap(),

                              - 1 ,

                              - 1 )) {

             //获取namespaces

             final List<String> namespaces = reader.listNamespaces();

             System.out.println( "namespaces: " + namespaces.toString());

            

             for ( final String namespace : namespaces) {

                 //获取Topics

                 final List<String> topics = reader.getTopics(namespace);

                 System.out.println( "topic: " + topics.toString());

                

                 for (String topic : topics) {

                     //获取字段SchemaInfo

                     final SchemaInfo schemaInfo = reader.getPulsarSchema(topic);

                     final String name = schemaInfo.getName();

                     System.out.println( "SchemaName:" + name); //topicName

                     final SchemaType type = schemaInfo.getType();

                     System.out.println( "SchemaType:" + type.toString()); // "JSON"...

                     final Map<String, String> properties = schemaInfo.getProperties();

                     System.out.println(properties);

                     final String schemaDefinition = schemaInfo.getSchemaDefinition();

                     System.out.println(schemaDefinition); // Field info.

                 }

             }

 

         } catch (IOException | PulsarAdminException e) {

             e.printStackTrace();

         }

 

 

     }

 

 

}

 

到此这篇关于 Java 使用 pulsar-flink-connector 读取 pulsar catalog 元数据的文章就介绍到这了,更多相关 Java 读取 pulsar catalog 元数据内容请搜索以前的文章或继续浏览下面的相关文章,希望大家以后多多支持!

原文链接:https://www.cnblogs.com/levi125/p/14500436.html

查看更多关于Java使用pulsar-flink-connector读取pulsar catalog元数据代码剖的详细内容...

  阅读:11次