hetu-core/hetu-docs/zh/connector/kafka.md

19 KiB
Raw Permalink Blame History

Kafka连接器

概述

此连接器允许在openLooKeng中将Apache Kafka主题用作表。每条消息在openLooKeng中显示为一行。

主题可以是实时的:当数据到达时,行将出现,当段被删除时,行将消失。如果在单个查询中多次访问同一个表(例如,执行自联接),这可能会导致奇怪的行为。

说明

Kafka代理最低支持版本为0.10.0。

配置

要配置Kafka连接器创建具有以下内容的目录属性文件etc/catalog/kafka.properties,并适当替换以下属性:

connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port

多Kafka集群

可以根据需要创建任意多的目录因此如果有额外的Kafka集群只需添加另一个不同的名称的属性文件到etc/catalog中(确保它以.properties结尾)。例如,如果将属性文件命名为sales.propertiesopenLooKeng将使用配置的连接器创建一个名为sales的目录。

配置属性

配置属性包括:

属性名称 说明
kafka.table-names 目录提供的所有表列表
kafka.default-schema 表的默认模式名
kafka.nodes Kafka集群节点列表
kafka.connect-timeout 连接Kafka集群超时
kafka.buffer-size Kafka读缓冲区大小
kafka.table-description-dir 包含主题描述文件的目录
kafka.hide-internal-columns 控制内部列是否是表模式的一部分
kerberos.on 是否开启Kerberos认证
user.password.auth.on 是否开启kafka用户密码认证
sasl.jaas.config 认证相关信息
java.security.krb5.conf krb5.conf文件路径
group.id kafka的groupID
security.protocol Kafka的安全认证协议
sasl.mechanism sasl机制
sasl.kerberos.service.name kafka服务运行时的kerberos principal name

kafka.table-names

此目录提供的所有表的逗号分隔列表。表名可以是非限定的(简单名称),并将被放入默认模式(见下文)中,或者用模式名称(<schema-name>.<table-name>)限定。

对于这里定义的每个表都可能存在一个表描述文件见下文。如果没有表描述文件则使用表名作为Kafka的主题名称且数据列不映射到表。该表仍将包含所有内部列见下文

此属性是必需的;没有默认值,并且必须至少定义一个表。

kafka.default-schema

定义将包含没有定义限定模式名称的所有表的模式。

此属性是可选的;默认值为default

kafka.nodes

Kafka数据节点的hostname:port对的逗号分隔列表。

此属性是必需的;没有默认值,并且必须至少定义一个表。

说明

openLooKeng必须仍然能够连接到群集的所有节点即使这里只指定了子集因为段文件可能只位于特定的节点上。

kafka.connect-timeout

连接数据节点超时。繁忙的Kafka集群在接受连接之前可能要花费一些时间当看到由于超时而导致的查询失败时增加该值是一种很好的策略。

此属性是可选的默认值为10秒10s)。

kafka.buffer-size

从Kafka读取数据的内部数据缓冲区大小。数据缓冲区必须至少能够容纳一条消息理想情况下可以容纳多条消息。每个工作节点和数据节点分配一个数据缓冲区。

此属性是可选的;默认值为64kb

kafka.table-description-dir

在openLooKeng部署中引用一个文件夹其中包含一个或多个JSON文件必须以.json结尾),其中包含表描述文件。

此属性是可选的;默认值为etc/kafka

kafka.hide-internal-columns

除了在表描述文件中定义数据列外,连接器还为每个表维护许多附加列。如果这些列是隐藏的,它们仍然可以在查询中使用,但是不会显示在DESCRIBE <table-name>SELECT *中。

此属性是可选的;默认值为true

kerberos.on

是否开启kerberos认证适用于开启了kerberos认证的集群如果在运行presto-kafka中的测试包请置为false因为测试程序使用内嵌Kafka不支持认证且该项与user.password.auth.on仅能选择一个若两者均为trueuser.password.auth.on将覆盖kerberos.on且会异常。

此属性是可选的;默认值为false

user.password.auth.on

是否开启用户密码认证适用于开启了用户密码认证的集群如果在运行presto-kafka中的测试包请置为false因为测试程序使用内嵌Kafka不支持认证且该项与kerberos.on仅能选择一个若两者均为trueuser.password.auth.on将覆盖kerberos.on且会异常。 若使用用户密码认证需要在jvm配置中增加-Djava.ext.dirs=$JAVA_HOME/jre/lib/ext:/Users/mac/apps/lib其中/Users/mac/apps/lib下需要放置kafka-client.jar否则会报出 loginModule not found的相关异常 此属性是可选的;默认值为false

sasl.jaas.config

Kafka的认证相关信息。 对于kerberos认证可能的值为sasl.jaas.config= com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true useTicketCache=true serviceName=kafka keyTab="/Users/mac/Desktop/user01.keytab" principal="user01@EXAMPLE.COM"; 对于用户密码认证可能的值为sasl.jaas.config= org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="producerpwd";

此属性是可选的;默认值为``。

java.security.krb5.conf

存放krb5.conf文件的路径要注意全局配置中也需要配置此选项例如部署后在jvm.config中配置,而在开发中需要在启动PrestoServer时使用"-D"参数配置。

此属性是可选的;默认值为``。

group.id

kafka的groupId。

此属性是可选的;默认值为``。

security.protocol

Kafka的安全协议。

此属性是可选的;默认值为SASL_PLAINTEXT

sasl.mechanism

sasl机制被用于客户端连接安全的机制。

此属性是可选的;默认值为GSSAPI

sasl.kerberos.service.name

kafka运行时的kerberos principal name。

此属性是可选的;默认值为kafka

内部列

对于每个已定义的表,连接器维护以下列:

列名 类型 说明
_partition_id BIGINT 包含该行的Kafka分区ID。
_partition_offset BIGINT 此行在Kafka分区内的偏移量。
_segment_start BIGINT 包含该行的段(包括该段)中最小的偏移量。这个偏移量是分区特定的。
_segment_end BIGINT 包含该行的段(不包括该段)中最大的偏移量。这个偏移量是分区特定的。这与下一个段(如果存在)的_segment_start的值相同。
_segment_count BIGINT 段内当前行的运行计数。对于未压缩的主题,_segment_start + _segment_count等于_partition_offset
_message_corrupt BOOLEAN 如果解码器无法解码此行的消息则为true。如果为true则应将消息映射的数据列视为无效。
_message VARCHAR 作为UTF-8编码的字符串的消息字节。这只对文本主题有用。
_message_length BIGINT 消息字节数。
_key_corrupt BOOLEAN 如果解码器无法解码此行的键则为true。如果为true则应将该键映射的数据列视为无效。
_key VARCHAR 作为UTF-8编码的字符串的键字节。这只对文本键有用。
_key_length BIGINT 键字节数。

对于没有表定义文件的表,_key_corrupt列和_message_corrupt列将始终为false

表定义文件

Kafka仅以字节消息的形式维护主题并让生产者和消费者定义如何解释消息。对于openLooKeng必须将这些数据映射到列中以便允许对数据进行查询。

说明

对于包含JSON数据的文本主题完全可以使用openLooKeng /functions/json来解析包含映射到UTF-8字符串中的字节的_message而不用任何表定义文件。但是这相当麻烦并且使得编写SQL查询变得很困难。

表定义文件由一个表的JSON定义组成。文件名可以任意但必须以.json结尾。

{
    "tableName": ...,
    "schemaName": ...,
    "topicName": ...,
    "key": {
        "dataFormat": ...,
        "fields": [
            ...
        ]
    },
    "message": {
        "dataFormat": ...,
        "fields": [
            ...
       ]
    }
}
字段 是否必填 类型 说明
tableName 必填 string 该文件定义的openLooKeng表名。
schemaName 可选 string 将包含表的模式。如果省略,则使用默认模式名称。
topicName 必填 string 映射的Kafka主题。
key 可选 JSON对象 映射到消息键的数据列的字段定义。
message 可选 JSON对象 映射到消息本身的数据列的字段定义。

Kafka中的键和消息

从Kafka 0.8版本开始,每个主题中的每条消息都可以有一个可选的键。表定义文件包含键和消息的节,用于将数据映射到表列。

表定义中的key字段和message字段均为必须包含两个字段的JSON对象

字段 是否必填 类型 说明
dataFormat 必填 string 选择该组字段的解码器。
fields 必填 JSON数组 字段定义列表。每个字段定义在openLooKeng表中创建一个新列。

每个字段定义都是一个JSON对象

{
    "name": ...,
    "type": ...,
    "dataFormat": ...,
    "mapping": ...,
    "formatHint": ...,
    "hidden": ...,
    "comment": ...
}
字段 是否必填 类型 说明
name 必填 string openLooKeng表中的列名。
type 必填 string 列的openLooKeng类型。
dataFormat 可选 string 选择该字段的列解码器。默认使用此行数据格式和列类型的默认解码器。
dataSchema 可选 string Avro模式所在的路径或URL。仅用于Avro解码器。
mapping 可选 string 列的映射信息。这是解码器特定的,见下文。
formatHint 可选 string 设置列解码器的列特定格式提示。
hidden 可选 boolean 将列对DESCRIBESELECT *隐藏。默认为false
comment 可选 string 添加列注释,该注释通过DESCRIBE显示。

键或消息的字段说明不受限制。

行解码

对于键和消息,使用解码器将消息和键数据映射到表列。

Kafka连接器包含以下的解码器

  • raw - 不解释Kafka消息将原始消息字节范围映射到表列
  • csv - Kafka消息被解释为逗号分隔消息字段映射到表列
  • json - Kafka消息解释为JSONJSON字段映射到表列
  • avro - Kafka消息按照Avro模式解析Avro字段映射到表列

说明

如果表没有定义文件,则使用dummy解码器,该解码器不暴露任何列。

raw解码器

Raw解码器支持从Kafka消息或键中读取原始基于字节值并将其转换为openLooKeng列。

对于字段,支持如下属性:

  • dataFormat-选择转换数据类型的宽度
  • type - openLooKeng数据类型支持的数据类型列表见下表
  • mapping - <start>[:<end>];要转换的字节的开始和结束位置(可选)

dataFormat属性选择转换的字节数。如果不填,则假定为BYTE。所有值都有符号。

支持的值为:

  • BYTE - 1字节
  • SHORT - 2字节大端序
  • INT - 4字节大端序
  • LONG - 8字节大端序
  • FLOAT - 4字节IEEE 754格式
  • DOUBLE - 8字节IEEE 754格式

type属性定义值映射到的openLooKeng数据类型。

根据分配给列的openLooKeng类型可以使用不同的dataFormat值

openLooKeng数据类型 允许dataFormat
BIGINT BYTESHORTINTLONG
INTEGER BYTESHORTINT
SMALLINT BYTESHORT
TINYINT BYTE
DOUBLE DOUBLEFLOAT
BOOLEAN BYTESHORTINTLONG
VARCHAR / VARCHAR(x) BYTE

mapping属性指定用于解码的键或消息中的字节范围。可以是1个或2个数字中间用冒号隔开<start>[:<end>])。

如果只给出起始位置:

  • 对于固定宽度类型,该列将对指定的dateFormat使用适当字节数(见上文)。
  • VARCHAR值被解码时,从起始位置到消息结尾的所有字节将被使用。

如果给出开始和结束位置,则:

  • 对于固定宽度类型,大小必须等于指定dataFormat所使用的字节数。
  • 对于VARCHAR,起始(包括)和结束(不包括)之间的所有字节都将被使用。

如果未指定mapping属性则等效于将起始位置设置为0而将结束位置设置为未定义。

数值数据类型(BIGINTINTEGERSMALLINTTINYINTDOUBLE)的解码方案非常简单。从输入消息中读取字节序列并根据以下任一条件进行解码:

  • 大端序编码integer类型
  • IEEE 754格式用于DOUBLE)。

dataFormat所隐含的已解码字节序列的长度。对于VARCHAR数据类型字节序列根据UTF-8编码进行解释。

csv解码器

CSV解码器将代表消息或键的字节转换为UTF-8编码的字符串然后将结果解释为CSV逗号分隔值行。

对于字段,必须定义typemapping属性:

  • type - openLooKeng数据类型支持的数据类型列表见下表
  • mapping - CSV记录中字段的索引

dataFormatformatHint不支持必须省略。下表列出了支持的openLooKeng类型可用于type和解码方案:

openLooKeng数据类型 解码规则
BIGINT INTEGER SMALLINT TINYINT 使用Java Long.parseLong()解码
DOUBLE 使用Java Double.parseDouble()解码
BOOLEAN “true”字符序列映射到true;其他字符序列映射到false
VARCHAR / VARCHAR(x) 原样使用

json解码器

JSON解码器根据4627将代表消息或键的字节转换为JSON。请注意消息或键必须转换为JSON对象而不是数组或简单类型。

对于字段,支持如下属性:

  • type - 列的openLooKeng类型。
  • dataFormat - 用于列的字段解码器。
  • mapping - 以斜杠分隔的字段名列表用于从JSON对象中选择字段
  • formatHint - 仅限custom-date-time,详见下文

JSON解码器支持多个字段解码器_default用于标准表列和许多基于日期和时间的类型的解码器。

下表列出了可如type中使用的openLooKeng数据类型和可通过dataFormat属性指定的匹配字段解码器。

openLooKeng数据类型 允许dataFormat
BIGINT INTEGER SMALLINT TINYINT DOUBLE BOOLEAN VARCHAR VARCHAR(x) 默认字段解码器(省略dataFormat属性)
TIMESTAMP TIMESTAMP WITH TIME ZONE TIME TIME WITH TIME ZONE custom-date-timeiso8601rfc2822milliseconds-since-epochseconds-since-epoch
DATE custom-date-timeiso8601rfc2822

默认字段译码器

这是标准的字段解码器支持所有的openLooKeng物理数据类型。通过JSON转换规则字段值将被强制转换为boolean值、long值、double值或string值。对于非基于日期/时间的列,应使用此解码器。

日期和时间解码器

如果需要将JSON对象中的值转换为openLooKeng DATETIMETIME WITH TIME ZONETIMESTAMPTIMESTAMP WITH TIME ZONE列,则需要通过字段定义的dataFormat属性选择特定的解码器。

  • iso8601 - 基于文本将文本字段解析为ISO8601时间戳。

  • rfc2822 - 基于文本,将文本字段解析为2822时间戳。

  • custom-date-time - 基于文本,根据通过formatHint属性指定的Joda格式模式解析一个文本字段。格式模式应符合https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html

  • milliseconds-since-epoch - 基于数字将文本或数字解释为自epoch时间以来的毫秒数。

  • seconds-since-epoch - 基于数字将文本或数字解释为自epoch时间以来的毫秒数。

对于TIMESTAMP WITH TIME ZONETIME WITH TIME ZONE数据类型如果解码值中存在时区信息则在openLooKeng值中使用时区。否则结果时区将被设置为UTC

avro解码器

Avro解码器根据模式转换表示Avro格式的消息或键的字节。消息必须嵌入Avro模式。openLooKeng不支持无模式Avro解码。

对于键/消息,使用avro解码器时,必须定义dataSchema。这应该指向需要解码的消息的有效Avro模式文件的位置。此位置可以是远程Web服务器例如dataSchema: 'http://example.org/schema/avro_data.avsc')或本地文件系统(例如:dataSchema: '/usr/local/schema/avro_data.avsc'。如果无法从openLooKeng协调节点访问此位置解码器将失败。

对于字段,支持如下属性:

  • name - openLooKeng表中的列名。
  • type - 列的openLooKeng类型。
  • mapping - 以斜杠分隔的字段名列表用于从Avro模式中选择字段如果mapping中指定的字段在原始Avro模式中不存在则读取操作将返回NULL。

下表列出了支持的openLooKeng类型可在type中用于等价的Avro字段类型。

openLooKeng数据类型 允许的Avro数据类型
BIGINT INTLONG
DOUBLE DOUBLEFLOAT
BOOLEAN BOOLEAN
VARCHAR / VARCHAR(x) STRING
VARBINARY FIXEDBYTES
ARRAY ARRAY
MAP MAP

Avro模式演进

Avro解码器支持向后兼容的模式演进特性。通过向后兼容性就可以使用较新的模式读取用较旧的模式创建的Avro数据。Avro模式中的任何更改也必须反映在openLooKeng的主题定义文件中。Avro模式中新增/重命名的字段必须有默认值。

Schema的演进行为如下

  • 新模式中增加的列:当表使用新模式时,使用旧模式创建的数据将产生默认值。
  • 新模式中移除的列:使用旧模式创建的数据将不再输出已移除列的数据。
  • 列在新的模式中被重命名:这等价于移除列并添加新列,当表使用新模式时,使用旧模式创建的数据将产生默认值。
  • 更改新模式中的列类型如果Avro支持该类型强制那么就会发生转换。不兼容的类型将引发错误。