数据流处理
1.概述
Ignite提供了一个数据流API,可用于将大量数据注入Ignite集群,流化的目的是高效地快速加载数据。数据流化之后会自组织,并以并行和分区感知的方式在节点间分布。
数据流API设计为可扩展的,并为流式注入Ignite的数据提供至少一次
传递语义,这意味着每个条目至少处理一次。
2.使用
数据流处理器与某个缓存关联,并提供用于将数据注入缓存的接口。
在典型场景中,拿到数据流处理器之后,就可以使用其中某个方法将数据流式注入缓存中,剩下的就交给Ignite负责了。
拿到某个缓存的数据流处理器的方法如下:
// Get the data streamer reference and stream data.
try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer("myCache")) {
// Stream entries.
for (int i = 0; i < 100000; i++)
stmr.addData(i, Integer.toString(i));
}
System.out.println("dataStreamerExample output:" + cache.get(99999));
在Ignite的Java版本中,数据流处理器是IgniteDataStreamer
接口的实现,IgniteDataStreamer
提供了一组addData(…)
方法来向缓存中添加键-值对,完整的方法列表,可以参见IgniteDataStreamer的javadoc。
using (var stmr = ignite.GetDataStreamer<int, string>("myCache"))
{
for (var i = 0; i < 1000; i++)
stmr.AddData(i, i.ToString());
}
流处理器可以以多线程的方式接收数据。
流式处理的最佳实践是数据预加载。
3.限制
数据流处理器无法保证:
- 默认除非全部成功完成,否则无法保证数据一致性;
- 数据立即加载,数据会被持有一段时间;
- 数据的顺序,数据加载到缓存的顺序可能与注入流处理器的顺序不同;
- 默认是用来处理外部存储的数据。
如果allowOverwite属性是false
(默认),那么要考虑:
- 流式处理的数据中不应有主键重复的数据;
- 数据流取消或者数据流节点故障可能导致数据不一致;
- 如果加载到持久化缓存中,同时创建快照可能包含不一致的数据,并且可能无法完全恢复。
Ignite数据流处理器的关键概念是数据流接收器和allowOverwite参数。
4.数据流接收器
Ignite的DataStreamer是一个编排器,它本身不写数据,该动作由StreamerReceiver执行。默认的接收器是为最快的负载和更少的网络请求而设计的,有了这个接收器,流处理器才会专注于主备数据的并行传输。
可以配置自己的接收器,具体参见流转换器和流访问器。在流接收器中实现的逻辑是在存储数据的节点上执行的。
try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer("myCache")) {
stmr.allowOverwrite(true);
stmr.receiver((StreamReceiver<Integer, String>) (cache, entries) -> entries.forEach(entry -> {
// do something with the entry
cache.put(entry.getKey(), entry.getValue());
}));
}
private class MyStreamReceiver : IStreamReceiver<int, string>
{
public void Receive(ICache<int, string> cache, ICollection<ICacheEntry<int, string>> entries)
{
foreach (var entry in entries)
{
// do something with the entry
cache.Put(entry.Key, entry.Value);
}
}
}
public static void StreamReceiverDemo()
{
var ignite = Ignition.Start();
using (var stmr = ignite.GetDataStreamer<int, string>("myCache"))
{
stmr.AllowOverwrite = true;
stmr.Receiver = new MyStreamReceiver();
}
}
将接收器更改为非默认值会更改数据分发算法。对于非默认接收器,流处理器仅向主节点接收器发送数据批次,并且主节点需要另一个请求来发送备份写入。
提示
流接收器不会自动将数据写入缓存,因此需要显式调用put
方法。
5.覆写数据
默认是不会覆写现有数据的。可以通过将数据流的allowOverwrite
属性设置为true
来更改该行为。由于默认接收器不会覆盖数据,因此会自动选择其他的接收器。任何非默认接收器都被视为覆写的,即相当于allowOverwrite
属性为true
,不过自定义的接收器可以使用putIfAbsent
方法来解决这个问题。
提示
当allowOverwrite
为false
(默认),更新是不会传播到外部存储(如果开启)的。
stmr.allowOverwrite(true);
stmr.AllowOverwrite = true;
5.1.StreamTransformer
StreamTransformer
是StreamReceiver
的简单实现,用于更新流中的数据。数据流转换器利用了并置的特性,并在将要存储数据的节点上更新数据。
在下面的示例中,使用StreamTransformer
为文本流中找到的每个不同单词增加一个计数:
String[] text = { "hello", "world", "hello", "Ignite" };
CacheConfiguration<String, Long> cfg = new CacheConfiguration<>("wordCountCache");
IgniteCache<String, Long> stmCache = ignite.getOrCreateCache(cfg);
try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
// Allow data updates.
stmr.allowOverwrite(true);
// Configure data transformation to count instances of the same word.
stmr.receiver(StreamTransformer.from((e, arg) -> {
// Get current count.
Long val = e.getValue();
// Increment count by 1.
e.setValue(val == null ? 1L : val + 1);
return null;
}));
// Stream words into the streamer cache.
for (String word : text)
stmr.addData(word, 1L);
}
class MyEntryProcessor : ICacheEntryProcessor<string, long, object, object>
{
public object Process(IMutableCacheEntry<string, long> e, object arg)
{
//get current count
var val = e.Value;
//increment count by 1
e.Value = val == 0 ? 1L : val + 1;
return null;
}
}
public static void StreamTransformerDemo()
{
var ignite = Ignition.Start(new IgniteConfiguration
{
DiscoverySpi = new TcpDiscoverySpi
{
LocalPort = 48500,
LocalPortRange = 20,
IpFinder = new TcpDiscoveryStaticIpFinder
{
Endpoints = new[]
{
"127.0.0.1:48500..48520"
}
}
}
});
var cfg = new CacheConfiguration("wordCountCache");
var stmCache = ignite.GetOrCreateCache<string, long>(cfg);
using (var stmr = ignite.GetDataStreamer<string, long>(stmCache.Name))
{
//Allow data updates
stmr.AllowOverwrite = true;
//Configure data transformation to count instances of the same word
stmr.Receiver = new StreamTransformer<string, long, object, object>(new MyEntryProcessor());
//stream words into the streamer cache
foreach (var word in GetWords())
{
stmr.AddData(word, 1L);
}
}
Console.WriteLine(stmCache.Get("a"));
Console.WriteLine(stmCache.Get("b"));
}
static IEnumerable<string> GetWords()
{
//populate words list somehow
return Enumerable.Repeat("a", 3).Concat(Enumerable.Repeat("b", 2));
}
5.2.StreamVisitor
StreamVisitor
也是StreamReceiver
的一个实现,它会访问流中的每个键-值对。
在下面的示例中,有两个缓存:marketData
和instruments
,收到market数据的瞬间就会将它们放入marketData
缓存的流处理器,映射到某market数据的集群节点上的marketData
的流处理器的StreamVisitor
就会被调用,在分别收到market数据后就会用最新的市场价格更新instrument
缓存。
注意,根本不会更新marketData
缓存,它一直是空的,只是直接在数据将要存储的集群节点上简单利用了market数据的并置处理能力。
static class Instrument {
final String symbol;
Double latest;
Double high;
Double low;
public Instrument(String symbol) {
this.symbol = symbol;
}
}
static Map<String, Double> getMarketData() {
//populate market data somehow
return new HashMap<>();
}
@Test
void streamVisitorExample() {
try (Ignite ignite = Ignition.start()) {
CacheConfiguration<String, Double> mrktDataCfg = new CacheConfiguration<>("marketData");
CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>("instruments");
// Cache for market data ticks streamed into the system.
IgniteCache<String, Double> mrktData = ignite.getOrCreateCache(mrktDataCfg);
// Cache for financial instruments.
IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg);
try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer("marketData")) {
// Note that we do not populate the 'marketData' cache (it remains empty).
// Instead we update the 'instruments' cache based on the latest market price.
mktStmr.receiver(StreamVisitor.from((cache, e) -> {
String symbol = e.getKey();
Double tick = e.getValue();
Instrument inst = instCache.get(symbol);
if (inst == null)
inst = new Instrument(symbol);
// Update instrument price based on the latest market tick.
inst.high = Math.max(inst.high, tick);
inst.low = Math.min(inst.low, tick);
inst.latest = tick;
// Update the instrument cache.
instCache.put(symbol, inst);
}));
// Stream market data into the cluster.
Map<String, Double> marketData = getMarketData();
for (Map.Entry<String, Double> tick : marketData.entrySet())
mktStmr.addData(tick);
}
}
}
class Instrument
{
public readonly string Symbol;
public double Latest { get; set; }
public double High { get; set; }
public double Low { get; set; }
public Instrument(string symbol)
{
this.Symbol = symbol;
}
}
private static Dictionary<string, double> getMarketData()
{
//populate market data somehow
return new Dictionary<string, double>
{
["foo"] = 1.0,
["foo"] = 2.0,
["foo"] = 3.0
};
}
public static void StreamVisitorDemo()
{
var ignite = Ignition.Start(new IgniteConfiguration
{
DiscoverySpi = new TcpDiscoverySpi
{
LocalPort = 48500,
LocalPortRange = 20,
IpFinder = new TcpDiscoveryStaticIpFinder
{
Endpoints = new[]
{
"127.0.0.1:48500..48520"
}
}
}
});
var mrktDataCfg = new CacheConfiguration("marketData");
var instCfg = new CacheConfiguration("instruments");
// Cache for market data ticks streamed into the system
var mrktData = ignite.GetOrCreateCache<string, double>(mrktDataCfg);
// Cache for financial instruments
var instCache = ignite.GetOrCreateCache<string, Instrument>(instCfg);
using (var mktStmr = ignite.GetDataStreamer<string, double>("marketData"))
{
// Note that we do not populate 'marketData' cache (it remains empty).
// Instead we update the 'instruments' cache based on the latest market price.
mktStmr.Receiver = new StreamVisitor<string, double>((cache, e) =>
{
var symbol = e.Key;
var tick = e.Value;
Instrument inst = instCache.Get(symbol);
if (inst == null)
{
inst = new Instrument(symbol);
}
// Update instrument price based on the latest market tick.
inst.High = Math.Max(inst.High, tick);
inst.Low = Math.Min(inst.Low, tick);
inst.Latest = tick;
});
var marketData = getMarketData();
foreach (var tick in marketData)
{
mktStmr.AddData(tick);
}
mktStmr.Flush();
Console.Write(instCache.Get("foo"));
}
}
5.配置数据流处理器线程池大小
数据流处理器线程池专用于处理来自数据流处理器的批次数据。
默认池大小为max(8, CPU总核数)
,使用IgniteConfiguration.setDataStreamerThreadPoolSize(…)
可以改变线程池的大小。
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="dataStreamerThreadPoolSize" value="10"/>
<!-- other properties -->
</bean>
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setDataStreamerThreadPoolSize(10);
Ignite ignite = Ignition.start(cfg);
18624049226