# 数据注入和流处理
# 1.数据注入和流处理
Ignite数据加载和流处理功能可以以可扩展以及容错的方式将大量或者持续/流式数据注入集群。在一个中等规模的集群中,数据注入Ignite的速度可以轻易地达到每秒百万级。
数据加载
从像Ignite持久化存储或者第三方存储这样的数据源中进行数据的预加载,在下面的数据加载章节中有详细描述。
工作方式
- 客户端节点通过Ignite数据流处理器向Ignite缓存中注入有限的或者持续的数据流;
- 数据在Ignite数据节点间自动分区,并在数据节点间均匀分布;
- 数据流可以在Ignite数据节点上以并置的方式直接并行处理;
- 客户端也可以在数据流上执行并发的SQL查询。
数据流处理器
数据流处理器是通过IgniteDataStreamer
API定义的,它可以往Ignite数据流缓存中注入大量的持续不断的数据流,数据流处理器对于所有流入Ignite的数据以可扩展和容错的方式提供了至少一次保证。
查询数据
可以和Ignite的SQL、TEXT以及基于谓词的缓存查询一起使用Ignite数据索引能力的全部功能来在数据流中进行查询。
与已有的流处理技术集成
Ignite可以与各种主要的流处理技术和框架集成,比如Kafka、Camel、Storm或者JMS,从而为基于Ignite的架构带来更强大的流处理功能。
# 2.数据加载
Ignite提供了若干种技术,用于从第三方数据库或者其他的数据源进行初始化数据加载。
# 2.1.概述
用标准的缓存put(...)
和putAll(...)
操作加载大量的数据通常是比较低效的。Ignite提供了IgniteDataStreamer
API来与主要的流技术集成,还有IgniteCache
API,它们有助于以一个更高效的方式将大量数据注入Ignite缓存。
# 2.2.IgniteDataStreamer
数据流处理器是通过IgniteDataStreamer
API定义的,它可以将大量的连续数据注入Ignite缓存。数据流处理器以可扩展和容错的方式在数据被发送到集群节点之前通过把数据形成批次来获得高性能。
注意
数据流处理器可以随时用于大量数据加载,包括启动时的预加载。
想了解更多信息请参照数据流处理器。
# 2.3.IgniteCache.loadCache()
如果数据由第三方数据库持久化,Ignite需要将数据从磁盘上预加载到内存中,应用才能使用SQL等更高级的功能。
Ignite原生持久化
Ignite的原生持久化不需要在重启时将数据预热到内存,集群会直接处理磁盘上的数据,因此基于IgniteCache.loadCache()
的加载技术和原生持久化无关。
要从比如关系数据库这样的第三方存储中预加载数据可以使用IgniteCache.loadCache()
方法,它可以在不传入要加载的所有键的情况下进行缓存的数据加载。
在每个节点上IgniteCache.loadCache()
方法都会委托给CacheStore.loadCache()
方法,如果只想在本地节点上加载,可以用IgniteCache.localLoadCache()
方法。
注意
对于分区缓存以及像关系数据库这样的第三方存储,如果键没有映射到某个节点,不管是主还是备,都会被自动丢弃。 这与Ignite持久化存储无关,因为每个节点只会存储属于自己的数据。
下面是如何使用第三方存储的CacheStore.loadCache()
实现的示例,对于CacheStore
的完整示例,可以参照第三方存储章节。
public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
...
// This method is called whenever "IgniteCache.loadCache()" or
// "IgniteCache.localLoadCache()" methods are called.
@Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
if (args == null || args.length == 0 || args[0] == null)
throw new CacheLoaderException("Expected entry count parameter is not provided.");
final int entryCnt = (Integer)args[0];
Connection conn = null;
try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
try (ResultSet rs = st.executeQuery()) {
int cnt = 0;
while (cnt < entryCnt && rs.next()) {
Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
clo.apply(person.getId(), person);
cnt++;
}
}
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to load values from cache store.", e);
}
}
...
}
分区感知的数据加载
如上所述,同样的查询会在所有节点上执行,因为每个节点都会迭代所有的结果集,忽略掉不属于自己的数据,效率不是很高。如果数据库中的每条记录都保存分区ID,这个情况会有所改善。可以通过org.apache.ignite.cache.affinity.Affinity
接口来获得要存储在缓存中的任何数据的分区ID。
下面的代码片段可以获得每个要存储在缓存中的Person
对象的分区ID。
IgniteCache cache = ignite.cache(cacheName);
Affinity aff = ignite.affinity(cacheName);
for (int personId = 0; personId < PERSONS_CNT; personId++) {
// Get partition ID for the key under which person is stored in cache.
int partId = aff.partition(personId);
Person person = new Person(personId);
person.setPartitionId(partId);
// Fill other fields.
cache.put(personId, person);
}
当Person
对象能够分区感知,每个节点就可以只查询属于自己所属分区的数据。要做到这一点,可以将一个Ignite实例注入CacheStore,然后用它来确定本地节点所属的分区。
下面的代码片段演示了用Affinity
来只加载本地分区的数据,注意示例代码是单线程的,不过它可以通过分区ID高效地并行化。
public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
// Will be automatically injected.
@IgniteInstanceResource
private Ignite ignite;
...
// This mehtod is called whenever "IgniteCache.loadCache()" or
// "IgniteCache.localLoadCache()" methods are called.
@Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
Affinity aff = ignite.affinity(cacheName);
ClusterNode locNode = ignite.cluster().localNode();
try (Connection conn = connection()) {
for (int part : aff.primaryPartitions(locNode))
loadPartition(conn, part, clo);
for (int part : aff.backupPartitions(locNode))
loadPartition(conn, part, clo);
}
}
private void loadPartition(Connection conn, int part, IgniteBiInClosure<Long, Person> clo) {
try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where partId=?")) {
st.setInt(1, part);
try (ResultSet rs = st.executeQuery()) {
while (rs.next()) {
Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
clo.apply(person.getId(), person);
}
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to load values from cache store.", e);
}
}
...
}
注意
注意键和分区的映射依赖于affinity函数中配置的分区数量(参照org.apache.ignite.cache.affinity.AffinityFunction
)。如果affinity函数配置改变,数据库中存储的分区ID必须相应地更新。
性能改进
为了维护一致性和持久性,Ignite的原生持久化支持预写日志,预写日志默认是开启的。不过这会影响数据预加载的性能,因此建议在数据预加载时禁用WAL,加载完成启用WAL,具体可以看WAL的Java API文档,以及SQL的ALTER TABLE文档。
# 3.数据流处理器
# 3.1.概述
数据流处理器是通过IgniteDataStreamer
API定义的,用于将大量的持续数据流注入Ignite缓存。数据流处理器以可扩展和容错的方式,为所有注入Ignite的流式数据提供至少一次保证。
数据流处理器不参与事务。
# 3.2.IgniteDataStreamer
快速地将大量的数据流注入Ignite的主要抽象是IgniteDataStreamer
,在内部它会适当地将数据整合成批次然后将这些批次与缓存这些数据的节点并置在一起。
高速加载是通过如下技术获得的:
- 映射到同一个集群节点上的数据会作为一个批次保存在缓冲区中;
- 多个缓冲区可以同时共存;
- 为了避免内存溢出,数据流处理器有一个缓冲区的最大数,它们可以并行的处理;
要将数据加入数据流处理器,调用IgniteDataStreamer.addData(...)
方法即可。
// Get the data streamer reference and stream data.
try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer("myStreamCache")) {
// Stream entries.
for (int i = 0; i < 100000; i++)
stmr.addData(i, Integer.toString(i));
}
允许覆写
数据流处理器默认不会覆写已有的数据,这意味着如果遇到一个缓存内已有的数据,它会跳过该数据。这是最有效且高性能的模式,因为数据流处理器不需要在后台考虑数据的版本。
如果预想到数据在数据流缓存中可能存在以及希望覆写它,设置IgniteDataStreamer.allowOverwrite(true)
即可。
流处理器,CacheStore以及AllowOverwrite
AllowOverwrite
属性如果是false
(默认),第三方存储的更新会被忽略。
只有当AllowOverwrite
为true
时,第三方存储才会被更新。
# 3.3.StreamReceiver
对于希望执行一些自定义的逻辑而不仅仅是添加新数据的情况,可以利用一下StreamReceiver
API。
流接收器可以以并置的方式直接在缓存该数据条目的节点上对数据流做出反应,可以在数据进入缓存之前修改数据或者在数据上添加任何的预处理逻辑。
注意
注意StreamReceiver
不会自动地将数据加入缓存,需要显式地调用任意的cache.put(...)
方法。
# 3.4.StreamTransformer
StreamTransformer
是一个StreamReceiver
的简单实现,它会基于之前的值来修改数据流缓存中的数据。更新是并置的,即它会明确地在数据缓存的集群节点上发生。
在下面的例子中,通过StreamTransformer
在文本流中为每个发现的确切的单词增加一个计数。
# 3.5.StreamVisitor
StreamVisitor
也是StreamReceiver
的一个方便实现,它会访问流中的每个键值组。注意,访问器不会更新缓存。如果键值组需要存储在缓存内,那么需要显式地调用任意的cache.put(...)
方法。
在下面的示例中,有两个缓存:marketData
和instruments
,收到market数据的瞬间就会将它们放入marketData
缓存的流处理器,映射到特定market数据的集群节点上的marketData
的流处理器的StreamVisitor
就会被调用,在分别收到market数据后就会用最新的市场价格更新instrument
缓存。
注意,根本不会更新marketData
缓存,它一直是空的,只是直接在数据将要存储的集群节点上简单利用了market数据的并置处理能力。
CacheConfiguration<String, Double> mrktDataCfg = new CacheConfiguration<>("marketData");
CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>("instCache");
// Cache for market data ticks streamed into the system.
IgniteCache<String, Double> mrktData = ignite.getOrCreateCache(mrktDataCfg);
// Cache for financial instruments.
IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg);
try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer("marketData")) {
// Note that we do not populate 'marketData' cache (it remains empty).
// Instead we update the 'instruments' cache based on the latest market price.
mktStmr.receiver(StreamVisitor.from((cache, e) -> {
String symbol = e.getKey();
Double tick = e.getValue();
Instrument inst = instCache.get(symbol);
if (inst == null)
inst = new Instrument(symbol);
// Update instrument price based on the latest market tick.
inst.setHigh(Math.max(inst.getLatest(), tick);
inst.setLow(Math.min(inst.getLatest(), tick);
inst.setLatest(tick);
// Update instrument cache.
instCache.put(symbol, inst);
}));
// Stream market data into Ignite.
for (Map.Entry<String, Double> tick : marketData)
mktStmr.addData(tick);
}
18624049226