# 数据流处理

# 1.概述

Ignite提供了一个数据流API,可用于将大量连续的数据流注入Ignite集群,数据流API支持容错和线性扩展,并为注入Ignite的数据提供了至少一次保证,这意味着每个条目至少会被处理一次。

数据通过与缓存关联的数据流处理器流式注入到缓存中。数据流处理器自动缓冲数据并将其分组成批次以提高性能,并将其并行发送到多个节点。

数据流API提供以下功能:

  • 添加到数据流处理器的数据将在节点之间自动分区和分布;
  • 可以以并置方式并发处理数据;
  • 客户端可以在注入数据时对数据执行并发SQL查询。

# 2.数据流处理器

数据流处理器与某个缓存关联,并提供用于将数据注入缓存的接口。

在典型场景中,用户拿到数据流处理器之后,会使用其中某个方法将数据流式注入缓存中,而Ignite根据分区规则对数据条目进行批处理,从而避免不必要的数据移动。

拿到某个缓存的数据流处理器的方法如下:

    在Ignite的Java版本中,数据流处理器是IgniteDataStreamer接口的实现,IgniteDataStreamer提供了一组addData(…​)方法来向缓存中添加键-值对,完整的方法列表,可以参见IgniteDataStreamer的javadoc。

    # 3.覆写已有的数据

    数据流处理器默认不会覆盖已有的数据,通过将allowOverwrite属性配置为true,可以修改该行为。

      提示

      如果allowOverwrite配置为false(默认),更新不会传播到外部存储(如果开启)。

      # 4.处理数据

      如果需要在添加新数据之前执行自定义逻辑,则可以使用数据流接收器。在将数据存储到缓存之前,数据流接收器用于以并置方式处理数据,其中实现的逻辑会在存储数据的节点上执行。

        提示

        注意数据流接收器不会自动将数据注入缓存,需要显式地调用put(…​)方法之一。

        警告

        要在远端节点执行的接收器类定义必须在该节点可用,这可通过2种方式实现:

        # 4.1.StreamTransformer

        StreamTransformerStreamReceiver的简单实现,用于更新流中的数据。数据流转换器利用了并置的特性,并在将要存储数据的节点上更新数据。

        在下面的示例中,使用StreamTransformer为文本流中找到的每个不同单词增加一个计数:

          # 4.2.StreamVisitor

          StreamVisitor也是StreamReceiver的一个方便实现,它会访问流中的每个键-值对,但不会更新缓存。如果键-值对需要存储在缓存内,那么需要显式地调用任意的put(...)方法。

          在下面的示例中,有两个缓存:marketDatainstruments,收到market数据的瞬间就会将它们放入marketData缓存的流处理器,映射到某market数据的集群节点上的marketData的流处理器的StreamVisitor就会被调用,在分别收到market数据后就会用最新的市场价格更新instrument缓存。

          注意,根本不会更新marketData缓存,它一直是空的,只是直接在数据将要存储的集群节点上简单利用了market数据的并置处理能力。

            # 5.配置数据流处理器线程池大小

            数据流处理器线程池专用于处理来自数据流处理器的消息。

            默认池大小为max(8, CPU总核数),使用IgniteConfiguration.setDataStreamerThreadPoolSize(…​)可以改变线程池的大小。

              18624049226

              最后更新时间:: 4/29/2022, 2:39:27 PM