# 数据流处理
# 1.概述
Ignite提供了一个数据流API,可用于将大量连续的数据流注入Ignite集群,数据流API支持容错和线性扩展,并为注入Ignite的数据提供了至少一次保证,这意味着每个条目至少会被处理一次。
数据通过与缓存关联的数据流处理器流式注入到缓存中。数据流处理器自动缓冲数据并将其分组成批次以提高性能,并将其并行发送到多个节点。
数据流API提供以下功能:
- 添加到数据流处理器的数据将在节点之间自动分区和分布;
- 可以以并置方式并发处理数据;
- 客户端可以在注入数据时对数据执行并发SQL查询。
# 2.数据流处理器
数据流处理器与特定的缓存关联,并提供用于将数据注入缓存的接口。
在典型场景中,用户拿到数据流处理器之后,会使用其中某个方法将数据流式注入缓存中,而Ignite根据分区规则对数据条目进行批处理,从而避免不必要的数据移动。
拿到特定缓存的数据流处理器的方法如下:
在Ignite的Java版本中,数据流处理器是IgniteDataStreamer
接口的实现,IgniteDataStreamer
提供了一组addData(…)
方法来向缓存中添加键-值对,完整的方法列表,可以参见IgniteDataStreamer的javadoc。
# 3.覆写已有的数据
数据流处理器默认不会覆盖已有的数据,通过将allowOverwrite
属性配置为true
,可以修改该行为。
提示
如果allowOverwrite
配置为false
(默认),更新不会传播到外部存储(如果开启)。
# 4.处理数据
如果需要在添加新数据之前执行自定义逻辑,则可以使用数据流接收器。在将数据存储到缓存之前,数据流接收器用于以并置方式处理数据,其中实现的逻辑会在存储数据的节点上执行。
提示
注意数据流接收器不会自动将数据注入缓存,需要显式地调用put(…)
方法之一。
# 4.1.StreamTransformer
StreamTransformer
是StreamReceiver
的简单实现,用于更新流中的数据。数据流转换器利用了并置的特性,并在将要存储数据的节点上更新数据。
在下面的示例中,使用StreamTransformer
为文本流中找到的每个不同单词增加一个计数:
# 4.2.StreamVisitor
StreamVisitor
也是StreamReceiver
的一个方便实现,它会访问流中的每个键-值对,但不会更新缓存。如果键-值对需要存储在缓存内,那么需要显式地调用任意的put(...)
方法。
在下面的示例中,有两个缓存:marketData
和instruments
,收到market数据的瞬间就会将它们放入marketData
缓存的流处理器,映射到特定market数据的集群节点上的marketData
的流处理器的StreamVisitor
就会被调用,在分别收到market数据后就会用最新的市场价格更新instrument
缓存。
注意,根本不会更新marketData
缓存,它一直是空的,只是直接在数据将要存储的集群节点上简单利用了market数据的并置处理能力。
# 5.配置数据流处理器线程池大小
数据流处理器线程池专用于处理来自数据流处理器的消息。
默认池大小为max(8, CPU总核数)
,使用IgniteConfiguration.setDataStreamerThreadPoolSize(…)
可以改变线程池的大小。
18624049226