# 使用Connect API创建自定义Kafka Connect

如果正在使用Apache Kafka,那么就有机会使用Kafka Connect连接器以进行数据的流入和流出。虽然可用的连接器越来越多,但是这些已有的仍有可能无法满足需求,这时就可以使用Kafka的Connect API创建自定义的连接器。这套API提供了一种简便的方法来创建容错的Kafka生产者或消费者,以将数据流入和流出Kafka。

本文将介绍Kafka Connect框架的基本概念和架构。然后分为四个步骤来逐步介绍Kafka Connect的开发。本文将主要集中在源端连接器上,但是涵盖的许多概念也将适用于接收端连接器,另外也会介绍相关的最佳实践。

# 什么是Kafka Connect?

Kafka Connect专门用于将数据流入/流出Kafka。从高层次上讲,连接器是一项管理任务及其配置的作业。在幕后,Kafka Connect可以创建容错的Kafka生产者和消费者,并跟踪他们已写入或读取的Kafka记录的偏移量。

除此之外,Kafka Connect还提供了许多强大的功能。它们可以轻松配置为将无法处理或无效的消息路由到死信队列,在源端将消息写入Kafka或接收端从Kafka消费消息之前应用单消息转换,并与Confluent模式注册表集成进行自动化的模式注册和管理,并将数据转换为Avro或JSON等类型。通过利用现有的连接器,开发者可以快速创建容错的数据管道,以可靠的方式将数据从外部源流式传输到Kafka主题,或者从Kafka主题流到外部接收端,而这些都只需配置即可无需写代码。

每个连接器实例都可以将其工作分解为多个任务,从而使复制数据的工作并行化并提供可伸缩性。连接器实例启动任务时,它将传递每个任务必须的配置属性,这些配置和该任务的状态以及生产和消费的记录的最新偏移量一起,都会被任务保存在主题外部。由于任务不存储任何状态,因此可以随时停止、启动或重启。新启动的任务仅需从Kafka取得最新的偏移量,然后继续运行即可。

Kafka Connect可以以独立或分布式模式运行。在独立模式下Kafka Connect在单个工作节点上运行,即在一个JVM进程中执行连接器及其任务。在分布式模式下,连接器及其任务在多个工作节点之间达到平衡。一般建议在分布式模式下运行Kafka Connect,因为独立模式没有容错能力。

要以分布式模式启动连接器,需要向Kafka Connect的REST API发送一个POST请求。该请求将触发Kafka Connect在多个节点自动调度连接器和任务的执行。在工作节点出现故障或被添加到组时,Kafka将自动协调以重新平衡这些连接器和任务。

# Kafka Connect入门

Kafka本身不包含连接器,连接器需要单独下载,启动这些连接器也很容易,只需要和配置参数一起,向Kafka Connect的REST API发送一个POST请求即可。

如果要集成的技术还没有Kafka Connect,本文将指导开发者逐步完成Kafka Connect的开发。实际上创建连接器只需实现几个Kafka Connect接口,而Kafka Connect框架负责其余的工作,因此开发者只需专注于实现特定集成的逻辑即可。

# Kafka Connect API

通过实现Kafka Connect API提供的若干接口和抽象类,就可以接入Kafka Connect框架。比如一个基本的源端连接器,将需要提供以下三类扩展:SourceConnectorSourceTaskAbstractConfig。这些共同定义了自定义Kafka Connect的配置和运行时行为,下面还会介绍如何启动并运行这个自定义的Kafka Connect。

# 步骤1:定义配置属性

启动连接器后,它们将处理一系列的属性,包括允许连接器及其任务与外部接收端或源端进行通信的属性、设置并行任务的最大数量、指定数据流对应的Kafka主题、自定义连接器运行所必须的参数等。

首先会将配置值作为String实例传给连接器,例如可以看Connector#start的方法签名:

public abstract class Connector implements Versioned {
[...]
	public abstract void start(Map<String, String> props);
[...]
}

启动时传给连接器之后,可以通过将其传给Kafka Connect API提供的AbstractConfig类实例,解析为更合适的类型。开发连接器的第一步是创建一个继承于AbstractConfig的类,该类可用于定义类型以及每个属性的默认值、验证方式、建议和文档。

假设正在开发用于处理来自云存储的数据流的源端连接器,在启动该连接器所需的配置属性中,可能希望包括用于生成记录的Kafka主题、要导入的对象的键前缀的白名单,下面是这个配置类的示例:

public class CloudStorageSourceConnectorConfig extends AbstractConfig {

    public CloudStorageSourceConnectorConfig(Map originals) {
        super(configDef(), originals);
    }

    protected static ConfigDef configDef() {
        return new ConfigDef()
                .define("bucket",
                        ConfigDef.Type.STRING,
                        ConfigDef.Importance.HIGH,
                        "Name of the bucket to import objects from")
                .define("prefix.whitelist",
                        ConfigDef.Type.LIST,
                        ConfigDef.Importance.HIGH,
                        "Whitelist of object key prefixes")
                .define("topic",
                        ConfigDef.Type.STRING,
                        ConfigDef.Importance.HIGH,
                        "Name of Kafka topic to produce to");
    }
}

注意在该示例中,将prefix.whitelist属性定义为List类型。当将原始值的映射传递给AbstractConfig类时,将根据配置定义将配置属性解析为适当的类型。结果就是以后从连接器的配置实例中获取prefix.whitelist值时,就是List类型了,即使该值最初是以逗号分隔的String形式传给连接器的,例如path/to/file/1,path/to/file/2,path/to/file/3这样的。

至少,每个配置定义都将需要一个配置名、配置值类型、重要性级别、记录配置属性的简短描述以及大多数情况下的默认值。但是还可以利用更高级的功能,例如定义配置组、在启动时调用的验证器、向用户建议配置值的推荐器以及指定配置顺序、对其他配置的依赖等。实际上,最佳实践是尽可能地包括验证器、推荐器、组和默认值,以确保用户在配置错误时能立即获得反馈,并可以轻松理解可用的配置项及其逻辑分组。

完成配置类后,就可以将注意力转向启动连接器,下面是CloudStorageSourceConnector类的start方法的示例实现:

public class CloudStorageSourceConnector extends SourceConnector {

    private CloudStorageSourceConnectorConfig connectorConfig;

    @Override
    public void start(Map<String, String> props) {
        this.connectorConfig = new CloudStorageConnectorConfig(props);
        this.configProps = Collections.unmodifiableMap(props);
    }

   [...]
}

连接器启动时,将创建自定义配置类的新实例,该实例为Kafka Connect框架提供配置定义。如果缺少任何必需的配置或提供的配置类型不正确,验证器将自动触发启动失败并显示相应的错误消息。

# 步骤2:将配置属性传递给任务

下一步是实现Connector#taskConfigs方法,该方法将返回一个映射列表,其中包含每个任务将用于将数据流传输到Kafka或从Kafka流出的配置属性:

public abstract class Connector implements Versioned {
[...]
	public abstract List<Map<String, String>> taskConfigs(int maxTasks);
[...]
}

该方法接受一个要并行运行的最大任务数的int值,其从启动时提供的tasks.max配置属性中提取。

taskConfigs返回的List中的每个映射对应于任务使用的配置属性。根据连接器正在执行的工作类型,虽然所有任务接收相同的配置属性可能有意义,但是也可能希望不同的任务实例获得不同的属性。例如假设要划分对象键前缀的数量,以在多个正在运行的任务实例之间平均传输数据。如果给定具有3个键前缀的白名单,可以为3个任务实例中的每个实例提供一个键前缀以导入对象。然后,每个任务可以专注于具有特定键前缀的对象的流数据,从而将工作拆分为并行任务。

实现taskConfig时要牢记一些注意事项。首先需要提供tasks.max配置属性,以使用户能够限制并行运行的任务数,它提供taskConfig所返回列表的大小上限。其次,返回列表的大小将决定启动多少任务。例如,对于数据库连接器,可能希望每个任务都从单个表中提取数据。如果数据库相对简单并且只有2个表,那么即使传递给taskConfigsmaxTasks值大于2,也可以返回一个大小为2的列表,而如果有6个表但maxTasks值为2,那么将需要每个任务从3个表中提取。

为了帮助执行此分组,Kafka Connect API提供了ConnectorUtils#groupPartitions工具方法,该方法将目标元素列表划分为所需的组数。同样,在这个云存储示例中,可以实现taskConfig获取对象键前缀的白名单,根据maxTasks的值或者前缀白名单的大小对列表进行划分,并返回一个配置列表,每个配置包含不同的对象键前缀,下面是一个示例实现:

@Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List prefixes = connectorConfig.getList(PREFIX_WHITELIST_CONFIG);
        int numGroups = Math.min(prefixes.size(), maxTasks);
        List<List> groupedPrefixes = ConnectorUtils.groupPartitions(prefixes, numGroups);
        List<Map<String, String>> taskConfigs = new ArrayList<>(groupedPrefixes.size());
        
        for (List taskPrefixes : groupedPrefixes) {
            Map<String, String> taskProps = new HashMap<>(configProps);
            taskProps.put(TASK_PREFIXES, String.join(",", taskPrefixes));
            taskConfigs.add(taskProps);
        }

        return taskConfigs;
    }

启动时,Kafka Connect框架会将taskConfigs返回的列表中的每个配置映射传递给每个任务。

连接器还需要实现其他方法,但是这些方法的实现相对简单。Connector#stop使开发者在连接器停止之前有机会关闭可能打开的所有资源。尽管完成所需的操作很简单,但重要