简介
通过 pulsar -flink-connector 读取到 Apache pulsar 中的namespaces、topics的 元数据 信息。
pulsar-flink-connector 的 github: https://github测试数据/streamnative/pulsar-flink
Maven
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
<dependency> <groupId>io.streamnative.connectors</groupId> <artifactId>pulsar-flink-connector- 2.11 - 1.12 </artifactId> <version> 2.7 . 3 </version> </dependency>
<!-- JAR repositories --> <repositories> <repository> <id>central</id> <layout> default </layout> <url>https: //repo1.maven.org/maven2</url> </repository> <repository> <id>bintray-streamnative-maven</id> <name>bintray</name> <url>https: //dl.bintray测试数据/streamnative/maven</url> </repository> </repositories> |
CODE
使用PulsarMetadataReader获取元数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
package com.levi.demo;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar测试数据mon.schema.SchemaInfo; import org.apache.pulsar测试数据mon.schema.SchemaType;
import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map;
/** * Test. * * @author levi * @version 1.0 **/ public class Test {
public static void main(String[] args) { final ClientConfigurationData configurationData = new ClientConfigurationData(); configurationData.setServiceUrl( "pulsar://127.0.0.1:6650" ); //Your Pulsar Token final AuthenticationToken token = new AuthenticationToken( "eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx" ); configurationData.setAuthentication(token);
try ( final PulsarMetadataReader reader = new PulsarMetadataReader( "http://127.0.0.1:8443" , configurationData, "" , new HashMap(), - 1 , - 1 )) { //获取namespaces final List<String> namespaces = reader.listNamespaces(); System.out.println( "namespaces: " + namespaces.toString());
for ( final String namespace : namespaces) { //获取Topics final List<String> topics = reader.getTopics(namespace); System.out.println( "topic: " + topics.toString());
for (String topic : topics) { //获取字段SchemaInfo final SchemaInfo schemaInfo = reader.getPulsarSchema(topic); final String name = schemaInfo.getName(); System.out.println( "SchemaName:" + name); //topicName final SchemaType type = schemaInfo.getType(); System.out.println( "SchemaType:" + type.toString()); // "JSON"... final Map<String, String> properties = schemaInfo.getProperties(); System.out.println(properties); final String schemaDefinition = schemaInfo.getSchemaDefinition(); System.out.println(schemaDefinition); // Field info. } }
} catch (IOException | PulsarAdminException e) { e.printStackTrace(); }
}
} |
到此这篇关于 Java 使用pulsar-flink-connector读取pulsar catalog 元数据的文章就介绍到这了,更多相关Java读取pulsar catalog元数据内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
原文链接:https://HdhCmsTestcnblogs测试数据/levi125/p/14500436.html
查看更多关于Java使用pulsar-flink-connector读取pulsar catalog元数据代码剖的详细内容...