今天教大家借助一款框架快速实现一个数据库,这个框架就是Calcite,下面会带大家通过两个例子快速教会大家怎么实现,一个是可以通过 SQL 语句的方式可以直接查询文件内容,第二个是模拟 Mysql 查询功能,以及最后告诉大家怎么实现 SQL 查询 Kafka 数据。
Calcite
Calcite 是一个用于优化异构数据源的查询处理的可插拔基础框架(他是一个框架),可以将任意数据(Any data, Anywhere)DML 转换成基于 SQL 的 DML 引擎,并且我们可以选择性的使用它的部分功能。
Calcite能干什么
使用 SQL 访问内存中某个数据 使用 SQL 访问某个文件的数据 跨数据源的数据访问、聚合、排序等(例如 Mysql 和 Redis 数据源中的数据进行join)当我们需要自建一个数据库的时候,数据可以为任何格式的,比如text、word、xml、mysql、es、csv、第三方接口数据等等,我们只有数据,我们想让这些数据支持 SQL 形式动态增删改查。
另外,像Hive、Drill、Flink、Phoenix 和 Storm 等项目中,数据处理系统都是使用 Calcite 来做 SQL 解析和查询优化,当然,还有部分用来构建自己的 JDBC driver。
名词解释
Token就是将标准 SQL(可以理解为Mysql)关键词以及关键词之间的字符串截取出来,每一个token,会被封装为一个SqlNode,SqlNode会衍生很多子类,比如Select会被封装为SqlSelect,当前 SqlNode 也能反解析为 SQL 文本。
RelDataTypeField某个字段的名称和类型信息
RelDataType多个 RelDataTypeField 组成了 RelDataType,可以理解为数据行
Table一个完整的表的信息
Schema所有元数据的组合,可以理解为一组 Table 或者库的概念
开始使用
1. 引入包
< dependency > < groupId > org .apache .calcite groupId > < artifactId > calcite - core artifactId > 目前最新版本 2022 - 09 - 10 日更新 --> < version > 1.32 .0 version > dependency >
2. 创建model.json文件和表结构csv
model.json 里面主要描述或者说告诉 Calcite 如何创建 Schema,也就是告诉框架怎么创建出库。
{ "version" : "1.0" , "defaultSchema" : "CSV" , "schemas" : [ { "name" : "CSV" , "type" : "custom" , "factory" : "csv.CsvSchemaFactory" , "operand" : { "directory" : "csv" } } ] }
接下来还需要定义一个 csv 文件,用来定义表结构。
NAME : string , MONEY : string aixiaoxian , 10000 万 xiaobai , 10000 万 adong , 10000 万 maomao , 10000 万 xixi , 10000 万 zizi , 10000 万 wuwu , 10000 万 kuku , 10000 万
整个项目的结构大概就是这样:
3. 实现Schema的工厂类
在上述文件中指定的包路径下去编写 CsvSchemaFactory 类,实现 SchemaFactory 接口,并且实现里面唯一的方法 create 方法,创建Schema(库)。
public class CsvSchemaFactory implements SchemaFactory { @Override
public Schema create ( SchemaPlus parentSchema , String name , Map < String , Object > operand ) { final String directory = ( String ) operand .get ( "directory" ) ; File directoryFile = new File ( directory ) ; return new CsvSchema ( directoryFile , "scannable" ) ; } }
4. 自定义Schma类
有了 SchemaFactory,接下来需要自定义 Schema 类。
自定义的 Schema 需要实现 Schema 接口,但是直接实现要实现的方法太多,我们去实现官方的 AbstractSchema 类,这样就只需要实现一个方法就行(如果有其他定制化需求可以实现原生接口)。
核心的逻辑就是createTableMap方法,用于创建出 Table 表。
他会扫描指定的Resource下面的所有 csv 文件,将每个文件映射成Table对象,最终以map形式返回,Schema接口的其他几个方法会用到这个对象。
@Override
protected Map < String , Table > getTableMap ( ) { if ( tableMap == null ) { tableMap = createTableMap ( ) ; } return tableMap ; } private Map < String , Table > createTableMap ( ) { final Source baseSource = Sources .of ( directoryFile ) ; File [ ] files = directoryFile .listFiles ( ( dir , name ) -> { final String nameSansGz = trim ( name , ".gz" ) ; return nameSansGz .endsWith ( ".csv" ) ; } ) ; if ( files == null ) { System .out .println ( "directory " + directoryFile + " not found" ) ; files = new File [ 0 ] ; } final ImmutableMap .Builder < String , Table > builder = ImmutableMap .builder ( ) ; for ( File file : files ) { Source source = Sources .of ( file ) ; final Source sourceSansCsv = source .trimOrNull ( ".csv" ) ; if ( sourceSansCsv != null ) { final Table table = createTable ( source ) ; builder .put ( sourceSansCsv .relative ( baseSource ) .path ( ) , table ) ; } } return builder .build ( ) ; }
5. 自定义 Table
Schema 有了,并且数据文件 csv 也映射成 Table 了,一个 csv 文件对应一个 Table。
接下来我们去自定义 Table,自定义 Table 的核心是我们要定义字段的类型和名称,以及如何读取 csv文件。
先获取数据类型和名称,即单表结构,从csv文件头中获取(当前文件头需要我们自己定义,包括规则我们也可以定制化)。
public abstract class CsvTable extends AbstractTable { protected final Source source ; protected final @Nullable RelProtoDataType protoRowType ; private @Nullable RelDataType rowType ; private @Nullable List < RelDataType > fieldTypes ; CsvTable ( Source source , @Nullable RelProtoDataType protoRowType ) { this .source = source ; this .protoRowType = protoRowType ; } @Override
public RelDataType getRowType ( RelDataTypeFactory typeFactory ) { if ( protoRowType != null ) { return protoRowType .apply ( typeFactory ) ; } if ( rowType == null ) { rowType = CsvEnumerator .deduceRowType ( ( JavaTypeFactory ) typeFactory , source , null ) ; } return rowType ; } public List < RelDataType > getFieldTypes ( RelDataTypeFactory typeFactory ) { if ( fieldTypes == null ) { fieldTypes = new ArrayList <> ( ) ; CsvEnumerator .deduceRowType ( ( JavaTypeFactory ) typeFactory , source , fieldTypes ) ; } return fieldTypes ; } public static RelDataType deduceRowType ( JavaTypeFactory typeFactory , Source source , @Nullable List < RelDataType > fieldTypes ) { final List < RelDataType > types = new ArrayList <> ( ) ; final List < String > names = new ArrayList <> ( ) ; try ( CSVReader reader = openCsv ( source ) ) { String [ ] strings = reader .readNext ( ) ; if ( strings == null ) { strings = new String [ ] { "EmptyFileHasNoColumns:boolean" } ; } for ( String string : strings ) { final String name ; final RelDataType fieldType ; final int colon = string .indexOf ( ':' ) ; if ( colon >= 0 ) { name = string .substring ( 0 , colon ) ; String typeString = string .substring ( colon + 1 ) ; Matcher decimalMatcher = DECIMAL_TYPE_PATTERN .matcher ( typeString ) ; if ( decimalMatcher .matches ( ) ) { int precision = Integer .parseInt ( decimalMatcher .group ( 1 ) ) ; int scale = Integer .parseInt ( decimalMatcher .group ( 2 ) ) ; fieldType = parseDecimalSqlType ( typeFactory , precision , scale ) ; } else { switch ( typeString ) { case "string" : fieldType = toNullableRelDataType ( typeFactory , SqlTypeName .VARCHAR ) ; break ; case "boolean" : fieldType = toNullableRelDataType ( typeFactory , SqlTypeName .BOOLEAN ) ; break ; case "byte" : fieldType = toNullableRelDataType ( typeFactory , SqlTypeName .TINYINT ) ; break ; case "char" : fieldType = toNullableRelDataType ( typeFactory , SqlTypeName .CHAR ) ; break ; case "short" : fieldType = toNullableRelDataType ( typeFactory , SqlTypeName .SMALLINT ) ; break ; case "int" : fieldType = toNullableRelDataType ( typeFactory , SqlTypeName .INTEGER ) ; break ; case "long" : fieldType = toNullableRelDataType ( typeFactory , SqlTypeName .BIGINT ) ; break ; case "float" : fieldType = toNullableRelDataType ( typeFactory , SqlTypeName .REAL ) ; break ; case "double" : fieldType = toNullableRelDataType ( typeFactory , SqlTypeName .DOUBLE ) ; break ; case "date" : fieldType = toNullableRelDataType ( typeFactory , SqlTypeName .DATE ) ; break ; case "timestamp" : fieldType = toNullableRelDataType ( typeFactory , SqlTypeName .TIMESTAMP ) ; break ; case "time" : fieldType = toNullableRelDataType ( typeFactory , SqlTypeName .TIME ) ; break ; default : LOGGER .warn ( "Found unknown type: {} in file: {} for column: {}. Will assume the type of " + "column is string." , typeString , source .path ( ) , name ) ; fieldType = toNullableRelDataType ( typeFactory , SqlTypeName .VARCHAR ) ; break ; } } } else { name = string ; fieldType = typeFactory .createSqlType ( SqlTypeName .VARCHAR ) ; } names .add ( name ) ; types .add ( fieldType ) ; if ( fieldTypes != null ) { fieldTypes .add ( fieldType ) ; } } } catch ( IOException e ) { } if ( names .isEmpty ( ) ) { names .add ( "line" ) ; types .add ( typeFactory .createSqlType ( SqlTypeName .VARCHAR ) ) ; } return typeFactory .createStructType ( Pair .zip ( names , types ) ) ; } }
获取文件中的数据,上面把Table的表结构字段名称和类型都获取到了以后,就剩最后一步了,获取文件中的数据。我们需要自定义一个类,实现 ScannableTable 接口,并且实现里面唯一的方法 scan 方法,其实本质上就是读文件,然后把文件的每一行的数据和上述获取的 fileType 进行匹配。
@Override
public Enumerable < Object [ ] > scan ( DataContext root ) { JavaTypeFactory typeFactory = root .getTypeFactory ( ) ; final List < RelDataType > fieldTypes = getFieldTypes ( typeFactory ) ; final List < Integer > fields = ImmutableIntList .identity ( fieldTypes .size ( ) ) ; final AtomicBoolean cancelFlag = DataContext .Variable .CANCEL_FLAG .get ( root ) ; return new AbstractEnumerable < @Nullable Object [ ] > ( ) { @Override
public Enumerator < @Nullable Object [ ] > enumerator ( ) { return new CsvEnumerator <> ( source , cancelFlag , false , null , CsvEnumerator .arrayConverter ( fieldTypes , fields , false ) ) ; } } ; } public CsvEnumerator ( Source source , AtomicBoolean cancelFlag , boolean stream , @Nullable String @Nullable [ ] filterValues , RowConverter < E > rowConverter ) { this .cancelFlag = cancelFlag ; this .rowConverter = rowConverter ; this .filterValues = filterValues == null ? null : ImmutableNullableList .copyOf ( filterValues ) ; try { this .reader = openCsv ( source ) ; this .reader .readNext ( ) ; } catch ( IOException e ) { throw new RuntimeException ( e ) ; } } @Override
public E current ( ) { return castNonNull ( current ) ; } @Override
public boolean moveNext ( ) { try { outer : for ( ; ; ) { if ( cancelFlag .get ( ) ) { return false ; } final String [ ] strings = reader .readNext ( ) ; if ( strings == null ) { current = null ; reader .close ( ) ; return false ; } if ( filterValues != null ) { for ( int i = 0 ; i < strings .length ; i ++ ) { String filterValue = filterValues .get ( i ) ; if ( filterValue != null ) { if ( ! filterValue .equals ( strings [ i ] ) ) { continue outer ; } } } } current = rowConverter .convertRow ( strings ) ; return true ; } } catch ( IOException e ) { throw new RuntimeException ( e ) ; } } protected @Nullable Object convert ( @Nullable RelDataType fieldType , @Nullable String string ) { if ( fieldType == null || string == null ) { return string ; } switch ( fieldType .getSqlTypeName ( ) ) { case BOOLEAN : if ( string .length ( ) == 0 ) { return null ; } return Boolean .parseBoolean ( string ) ; case TINYINT : if ( string .length ( ) == 0 ) { return null ; } return Byte .parseByte ( string ) ; case SMALLINT : if ( string .length ( ) == 0 ) { return null ; } return Short .parseShort ( string ) ; case INTEGER : if ( string .length ( ) == 0 ) { return null ; } return Integer .parseInt ( string ) ; case BIGINT : if ( string .length ( ) == 0 ) { return null ; } return Long .parseLong ( string ) ; case FLOAT : if ( string .length ( ) == 0 ) { return null ; } return Float .parseFloat ( string ) ; case DOUBLE : if ( string .length ( ) == 0 ) { return null ; } return Double .parseDouble ( string ) ; case DECIMAL : if ( string .length ( ) == 0 ) { return null ; } return parseDecimal ( fieldType .getPrecision ( ) , fieldType .getScale ( ) , string ) ; case DATE : if ( string .length ( ) == 0 ) { return null ; } try { Date date = TIME_FORMAT_DATE .parse ( string ) ; return ( int ) ( date .getTime ( ) / DateTimeUtils .MILLIS_PER_DAY ) ; } catch ( ParseException e ) { return null ; } case TIME : if ( string .length ( ) == 0 ) { return null ; } try { Date date = TIME_FORMAT_TIME .parse ( string ) ; return ( int ) date .getTime ( ) ; } catch ( ParseException e ) { return null ; } case TIMESTAMP : if ( string .length ( ) == 0 ) { return null ; } try { Date date = TIME_FORMAT_TIMESTAMP .parse ( string ) ; return date .getTime ( ) ; } catch ( ParseException e ) { return null ; } case VARCHAR : default : return string ; } }
6. 最后
至此我们需要准备的东西:库、表名称、字段名称、字段类型都有了,接下来我们去写我们的 SQL 语句查询我们的数据文件。
创建好几个测试的数据文件,例如上面项目结构中我创建 2 个 csv 文件USERINFO.csv、ASSET.csv,然后创建测试类。
这样跑起来,就可以通过 SQL 语句的方式直接查询数据了。
public class Test { public static void main ( String [ ] args ) throws SQLException { Connection connection = null ; Statement statement = null ; try { Properties info = new Properties ( ) ; info .put ( "model" , Sources .of ( Test .class .getResource ( "/model.json" ) ) .file ( ) .getAbsolutePath ( ) ) ; connection = DriverManager .getConnection ( "jdbc:calcite:" , info ) ; statement = connection .createStatement ( ) ; print ( statement .executeQuery ( "select * from asset " ) ) ; print ( statement .executeQuery ( " select * from userinfo " ) ) ; print ( statement .executeQuery ( " select age from userinfo where name ='aixiaoxian' " ) ) ; print ( statement .executeQuery ( " select * from userinfo where age >60 " ) ) ; print ( statement .executeQuery ( " select * from userinfo where name like 'a%' " ) ) ; } finally { connection .close ( ) ; } } private static void print ( ResultSet resultSet ) throws SQLException { final ResultSetMetaData metaData = resultSet .getMetaData ( ) ; final int columnCount = metaData .getColumnCount ( ) ; while ( resultSet .next ( ) ) { for ( int i = 1 ; ; i ++ ) { System .out .print ( resultSet .getString ( i ) ) ; if ( i < columnCount ) { System .out .print ( ", " ) ; } else { System .out .println ( ) ; break ; } } } } }
查询结果:
这里在测试的时候踩到2个坑,大家如果自己实验的时候可以避免下。
Calcite 默认会把你的 SQL 语句中的表名和类名全部转换为大写,因为默认的 csv(其他文件也一样)文件的名称就是表名,除非你自定义规则,所以你的文件名要写成大写。
Calcite 有一些默认的关键字不能用作表名,不然会查询失败,比如我刚开始定的user.csv就一直查不出来,改成USERINFO就可以了,这点和Mysql 的内置关键字差不多,也可以通过个性化配置去改。
演示Mysql
首先,还是先准备Calcite 需要的东西:库、表名称、字段名称、字段类型。
如果数据源使用Mysql的话,这些都不用我们去 JAVA 服务中去定义,直接在 Mysql 客户端创建好,这里直接创建两张表用于测试,就和我们的csv文件一样。
CREATE TABLE `USERINFO1` ( `NAME` varchar ( 255 ) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL , `AGE` int DEFAULT NULL ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb3 ; CREATE TABLE `ASSET` ( `NAME` varchar ( 255 ) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL , `MONEY` varchar ( 255 ) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb3 ;
上述csv 案例中的 SchemaFactory 以及 Schema 这些都不需要创建,因为 Calcite 默认提供了 Mysql 的 Adapter适配器。
其实,上述两步都不需要做,我们真正要做的是,告诉Calcite 你的 JDBC 的连接信息就行了,也是在 model.json 文件中定义。
{ "version" : "1.0" , "defaultSchema" : "Demo" , "schemas" : [ { "name" : "Demo" , "type" : "custom" , "factory" : "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory" , "operand" : { "jdbcDriver" : "com.mysql.cj.jdbc.Driver" , "jdbcUrl" : "jdbc:mysql://localhost:3306/irving" , "jdbcUser" : "root" , "jdbcPassword" : "123456" } } ] }
在项目中引入 Mysql 的驱动包
< dependency > < groupId > mysql groupId > < artifactId > mysql - connector - java artifactId > < version > 8.0 .30 version > dependency >
写好测试类,这样直接就相当于完成了所有的功能了。
public class TestMysql { public static void main ( String [ ] args ) throws SQLException { Connection connection = null ; Statement statement = null ; try { Properties info = new Properties ( ) ; info .put ( "model" , Sources .of ( TestMysql .class .getResource ( "/mysqlmodel.json" ) ) .file ( ) .getAbsolutePath ( ) ) ; connection = DriverManager .getConnection ( "jdbc:calcite:" , info ) ; statement = connection .createStatement ( ) ; statement .executeUpdate ( " insert into userinfo1 values ('xxx',12) " ) ; print ( statement .executeQuery ( "select * from asset " ) ) ; print ( statement .executeQuery ( " select * from userinfo1 " ) ) ; print ( statement .executeQuery ( " select age from userinfo1 where name ='aixiaoxian' " ) ) ; print ( statement .executeQuery ( " select * from userinfo1 where age >60 " ) ) ; print ( statement .executeQuery ( " select * from userinfo1 where name like 'a%' " ) ) ; } finally { connection .close ( ) ; } } private static void print ( ResultSet resultSet ) throws SQLException { final ResultSetMetaData metaData = resultSet .getMetaData ( ) ; final int columnCount = metaData .getColumnCount ( ) ; while ( resultSet .next ( ) ) { for ( int i = 1 ; ; i ++ ) { System .out .print ( resultSet .getString ( i ) ) ; if ( i < columnCount ) { System .out .print ( ", " ) ; } else { System .out .println ( ) ; break ; } } } } }
查询结果:
Mysql实现原理
上述我们在 model.json 文件中指定了org.apache.calcite.adapter.jdbc.JdbcSchema$Factory类,可以看下这个类的代码。
这个类是把 Factory 和 Schema 写在了一起,其实就是调用schemafactory类的create方法创建一个 schema 出来,和我们上面自定义的流程是一样的。
其中JdbcSchema类也是 Schema 的子类,所以也会实现getTable方法(这个我们上述也实现了,我们当时是获取表结构和表的字段类型以及名称,是从csv文件头中读文件的),JdbcSchema的实现是通过连接 Mysql 服务端查询元数据信息,再将这些信息封装成 Calcite需要的对象格式。
这里同样要注意 csv方式的2个注意点,大小写和关键字问题。
public static JdbcSchema create ( SchemaPlus parentSchema , String name , Map < String , Object > operand ) { DataSource dataSource ; try { final String dataSourceName = ( String ) operand .get ( "dataSource" ) ; if ( dataSourceName != null ) { dataSource = AvaticaUtils .instantiatePlugin ( DataSource .class , dataSourceName ) ; } else { final String jdbcUrl = ( String ) requireNonNull ( operand .get ( "jdbcUrl" ) , "jdbcUrl" ) ; final String jdbcDriver = ( String ) operand .get ( "jdbcDriver" ) ; final String jdbcUser = ( String ) operand .get ( "jdbcUser" ) ; final String jdbcPassword = ( String ) operand .get ( "jdbcPassword" ) ; dataSource = dataSource ( jdbcUrl , jdbcDriver , jdbcUser , jdbcPassword ) ; } } catch ( Exception e ) { throw new RuntimeException ( "Error while reading dataSource" , e ) ; } String jdbcCatalog = ( String ) operand .get ( "jdbcCatalog" ) ; String jdbcSchema = ( String ) operand .get ( "jdbcSchema" ) ; String sqlDialectFactory = ( String ) operand .get ( "sqlDialectFactory" ) ; if ( sqlDialectFactory == null || sqlDialectFactory .isEmpty ( ) ) { return JdbcSchema .create ( parentSchema , name , dataSource , jdbcCatalog , jdbcSchema ) ; } else { SqlDialectFactory factory = AvaticaUtils .instantiatePlugin ( SqlDialectFactory .class , sqlDialectFactory ) ; return JdbcSchema .create ( parentSchema , name , dataSource , factory , jdbcCatalog , jdbcSchema ) ; } } @Override public @Nullable Table getTable ( String name ) { return getTableMap ( false ) .get ( name ) ; } private synchronized ImmutableMap < String , JdbcTable > getTableMap ( boolean force ) { if ( force || tableMap == null ) { tableMap = computeTables ( ) ; } return tableMap ; } private ImmutableMap < String , JdbcTable > computeTables ( ) { Connection connection = null ; ResultSet resultSet = null ; try { connection = dataSource .getConnection ( ) ; final Pair < @Nullable String , @Nullable String > catalogSchema = getCatalogSchema ( connection ) ; final String catalog = catalogSchema .left ; final String schema = catalogSchema .right ; final Iterable < MetaImpl .MetaTable > tableDefs ; Foo threadMetadata = THREAD_METADATA .get ( ) ; if ( threadMetadata != null ) { tableDefs = threadMetadata .apply ( catalog , schema ) ; } else { final List < MetaImpl .MetaTable > tableDefList = new ArrayList <> ( ) ; final DatabaseMetaData metaData = connection .getMetaData ( ) ; resultSet = metaData .getTables ( catalog , schema , null , null ) ; while ( resultSet .next ( ) ) { final String catalogName = resultSet .getString ( 1 ) ; final String schemaName = resultSet .getString ( 2 ) ; final String tableName = resultSet .getString ( 3 ) ; final String tableTypeName = resultSet .getString ( 4 ) ; tableDefList .add ( new MetaImpl .MetaTable ( catalogName , schemaName , tableName , tableTypeName ) ) ; } tableDefs = tableDefList ; } final ImmutableMap .Builder < String , JdbcTable > builder = ImmutableMap .builder ( ) ; for ( MetaImpl .MetaTable tableDef : tableDefs ) { final String tableTypeName2 = tableDef .tableType == null ? null : tableDef .tableType .toUpperCase ( Locale .ROOT ) .replace ( ' ' , '_' ) ; final TableType tableType = Util .enumVal ( TableType .OTHER , tableTypeName2 ) ; if ( tableType == TableType .OTHER && tableTypeName2 != null ) { System .out .println ( "Unknown table type: " + tableTypeName2 ) ; } final JdbcTable table = new JdbcTable ( this , tableDef .tableCat , tableDef .tableSchem , tableDef .tableName , tableType ) ; builder .put ( tableDef .tableName , table ) ; } return builder .build ( ) ; } catch ( SQLException e ) { throw new RuntimeException ( "Exception while reading tables" , e ) ; } finally { close ( connection , null , resultSet ) ; } }
SQL执行流程
OK,到这里基本上两个简单的案例已经演示好了,最后补充一下整个Calcite架构和整个 SQL 的执行流程。
整个流程如下:SQL解析(Parser)=> SQL校验(Validator)=> SQL查询优化(optimizer)=> SQL生成 => SQL执行
SQL Parser
所有的 SQL 语句在执行前都需要经历 SQL 解析器解析,解析器的工作内容就是将 SQL 中的 Token 解析成抽象语法树,每个树的节点都是一个 SqlNode,这个过程其实就是 Sql Text => SqlNode 的过程。
我们前面的 Demo 没有自定义 Parser,是因为 Calcite 采用了自己默认的 Parser(SqlParserImpl)。
SqlNode
SqlNode是整个解析中的核心,比如图中你可以发现,对于每个比如select、from、where关键字之后的内容其实都是一个SqlNode。
parserConfig方法主要是设置 SqlParserFactory 的参数,比如我们上面所说的我本地测试的时候踩的大小写的坑,就可以在这里设置。
直接调用setCaseSensitive=false即不会将 SQL 语句中的表名列名转为大写,下面是默认的,其他的参数可以按需配置。
SQL Validator
SQL 语句先经过 Parser,然后经过语法验证器,注意 Parser 并不会验证语法的正确性。
其实 Parser 只会验证 SQL 关键词的位置是否正确,我们上述2个 Parser 的例子中都没有创建 schema 和 table 这些,但是如果这样写,那就会报错,这个错误就是 parser 检测后抛出来的(ParseLocationErrorTest)。
真正的校验在 validator 中,会去验证查询的表名是否存在,查询的字段是否存在,类型是否匹配,这个过程比较复杂,默认的 validator 是SqlValidatorImpl。
查询优化比如关系代数,比如什么投影、笛卡尔积这些,Calcite提供了很多内部的优化器,也可以实现自己的优化器。
适配器Calcite 是不包含存储层的,所以提供一种适配器的机制来访问外部的数据存储或者存储引擎。
最后,进阶官网里面写了未来会支持Kafka适配器到公共Api中,到时候使用起来就和上述集成Mysql一样方便,但是现在还没有支持,我这里给大家提供个自己实现的方式,这样就可以通过 SQL 的方式直接查询 Kafka 中的 Topic 数据等信息。
这里我们内部集成实现了KSQL的能力,查询结果是OK的。
还是像上述步骤一样,我们需要准备库、表名称、字段名称、字段类型、数据源(多出来的地方)。
自定义Sql解析,之前我们都没有自定义解析,这里需要自定义解析,是因为我需要动态解析sql的where条件里面的partation。
配置解析器,就是之前案例中提到的配置大小写之类的 创建解析器,使用的默认SqlParseImpl 开始解析,生成AST,我们可以基于生成的SqlNode做一些业务相关的校验和参数解析适配器获取数据源
public class KafkaConsumerAdapter { public static List < KafkaResult > executor ( KafkaSqlInfo kafkaSql ) { Properties props = new Properties ( ) ; props .put ( CommonClientConfigs .BOOTSTRAP_SERVERS_CONFIG , kafkaSql .getSeeds ( ) ) ; props .put ( ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class .getCanonicalName ( ) ) ; props .put ( ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class .getCanonicalName ( ) ) ; props .put ( ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" ) ; KafkaConsumer < String , String > consumer = new KafkaConsumer <> ( props ) ; List < TopicPartition > topics = new ArrayList <> ( ) ; for ( Integer partition : kafkaSql .getPartition ( ) ) { TopicPartition tp = new TopicPartition ( kafkaSql .getTableName ( ) , partition ) ; topics .add ( tp ) ; } consumer .assign ( topics ) ; for ( TopicPartition tp : topics ) { Map < TopicPartition , Long > offsets = consumer .endOffsets ( Collections .singleton ( tp ) ) ; long position = 500 ; if ( offsets .get ( tp ) .longValue ( ) > position ) { consumer .seek ( tp , offsets .get ( tp ) .longValue ( ) - 500 ) ; } else { consumer .seek ( tp , 0 ) ; } } List < KafkaResult > results = new ArrayList <> ( ) ; boolean flag = true ; while ( flag ) { ConsumerRecords < String , String > records = consumer .poll ( Duration .ofMillis ( 100 ) ) ; for ( ConsumerRecord < String , String > record : records ) { KafkaResult result = new KafkaResult ( ) ; result .setPartition ( record .partition ( ) ) ; result .setOffset ( record .offset ( ) ) ; result .setMsg ( record .value ( ) ) ; result .setKey ( record .key ( ) ) ; results .add ( result ) ; } if ( ! records .isEmpty ( ) ) { flag = false ; } } consumer .close ( ) ; return results ; } }
执行查询,就可以得到我们想要的效果了。
public class TestKafka { public static void main ( String [ ] args ) throws Exception { KafkaService kafkaService = new KafkaService ( ) ; KafkaSqlInfo sqlInfo = kafkaService .parseSql ( "select * from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 " ) ; List < KafkaResult > results = KafkaConsumerAdapter .executor ( sqlInfo ) ; query ( sqlInfo .getTableName ( ) , results , sqlInfo .getSql ( ) ) ; sqlInfo = kafkaService .parseSql ( "select * from `cmdb-calltopo` where `partition` in (0,1,2) AND msg like '%account%' limit 1000 " ) ; results = KafkaConsumerAdapter .executor ( sqlInfo ) ; query ( sqlInfo .getTableName ( ) , results , sqlInfo .getSql ( ) ) ; sqlInfo = kafkaService .parseSql ( "select count(*) AS addad from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 " ) ; results = KafkaConsumerAdapter .executor ( sqlInfo ) ; query ( sqlInfo .getTableName ( ) , results , sqlInfo .getSql ( ) ) ; } private static void query ( String tableName , List < KafkaResult > results , String sql ) throws Exception { String model = createTempJson ( ) ; KafkaTableSchema .generateSchema ( tableName , results ) ; Properties info = new Properties ( ) ; info .setProperty ( "lex" , Lex .JAVA .toString ( ) ) ; Connection connection = DriverManager .getConnection ( Driver .CONNECT_STRING_PREFIX + "model=inline:" + model , info ) ; Statement st = connection .createStatement ( ) ; ResultSet result = st .executeQuery ( sql ) ; ResultSetMetaData rsmd = result .getMetaData ( ) ; List < Map < String , Object >> ret = new ArrayList <> ( ) ; while ( result .next ( ) ) { Map < String , Object > map = new LinkedHashMap <> ( ) ; for ( int i = 1 ; i <= rsmd .getColumnCount ( ) ; i ++ ) { map .put ( rsmd .getColumnName ( i ) , result .getString ( rsmd .getColumnName ( i ) ) ) ; } ret .add ( map ) ; } result .close ( ) ; st .close ( ) ; connection .close ( ) ; } private static void print ( ResultSet resultSet ) throws SQLException { final ResultSetMetaData metaData = resultSet .getMetaData ( ) ; final int columnCount = metaData .getColumnCount ( ) ; while ( resultSet .next ( ) ) { for ( int i = 1 ; ; i ++ ) { System .out .print ( resultSet .getString ( i ) ) ; if ( i < columnCount ) { System .out .print ( ", " ) ; } else { System .out .println ( ) ; break ; } } } } private static String createTempJson ( ) throws IOException { JSONObject object = new JSONObject ( ) ; object .put ( "version" , "1.0" ) ; object .put ( "defaultSchema" , "QAKAFKA" ) ; JSONArray array = new JSONArray ( ) ; JSONObject tmp = new JSONObject ( ) ; tmp .put ( "name" , "QAKAFKA" ) ; tmp .put ( "type" , "custom" ) ; tmp .put ( "factory" , "kafka.KafkaSchemaFactory" ) ; array .add ( tmp ) ; object .put ( "schemas" , array ) ; return object .toJSONString ( ) ; } }
生成临时的model.json,之前是基于文件,现在基于text字符串,mode=inline模式
设置我的表结构、表名称、字段名、字段类型等,并放置在内存中,同时将适配器查询出来的数据也放进去table里面
获取连接,执行查询,完美!
原文地址:https://mp.weixin.qq测试数据/s/Ppr_9DzbQAYe3cXu8K3vcQ