# 键-值数据网格
# 1.数据网格
Ignite针对越来越火的水平扩展概念而构建,具有实时按需增加节点的能力。它可以线性扩展至几百个节点,通过数据位置的强语义以及关联数据路由来降低冗余数据噪声。
Ignite数据网格是一个分布式内存键-值存储
,它可以视为一个分布式的分区化哈希映射,每个集群节点都持有所有数据的一部分,这意味着随着集群节点(服务端)的增加,就可以缓存更多的数据。
与其它键值存储系统不同,Ignite通过可插拔的哈希算法来决定数据的位置,每个客户端都可以通过一个哈希函数决定一个键属于哪个节点,而不需要任何特定的映射服务器或者协调器节点。
Ignite数据网格支持本地、复制、分区模式的数据集,可以使用标准SQL语法方便地进行跨数据集查询,同时还支持在数据中进行分布式SQL关联。
Ignite数据网格轻量快速,是目前在集群中支持数据的事务性和原子性的最快的实现之一。
数据一致性
只要集群处于在线状态,即使节点故障或拓扑发生变化,Ignite都会保证不同节点之间的数据始终保持一致。
JCache (JSR107)
Ignite实现了JCache
(JSR107)规范。
# 2.超越JCache
# 2.1.概述
Ignite是**JCache(JSR107)**规范的一个实现,JCache为数据访问提供了简单易用且功能强大的API。不过规范忽略了任何有关数据分布以及一致性的细节来允许开发商在自己的实现中有足够的自由度。
可以通过JCache实现:
- 基本缓存操作
- ConcurrentMap API
- 并行处理(EntryProcessor)
- 事件和度量
- 可插拔的持久化
在JCache之外,Ignite还提供了ACID事务,数据查询的能力(包括SQL),各种内存模型、查询、事务等。
# 2.2.IgniteCache
IgniteCache
接口是Ignite缓存实现的一个入口,提供了保存和获取数据,执行查询,包括SQL,迭代和扫描等等的方法。IgniteCache
是基于**JCache(JSR107)**的,所以在非常基本的API上可以减少到javax.cache.Cache
接口,不过IgniteCache
还提供了JCache规范之外的、有用的功能,比如数据加载,查询,异步模型等。
可以从Ignite
中直接获得IgniteCache
的实例:
Ignite ignite = Ignition.ignite();
// Obtain instance of cache named "myCache".
// Note that different caches may have different generics.
IgniteCache<Integer, String> cache = ignite.cache("myCache");
缓存名限制
因为这个已知的问题,缓存名不能包含文件名中禁止的字符(例如Linux上的/
或Windows上的\
)。
动态缓存
也可以动态地创建缓存实例,这时Ignite会在所有的符合条件的集群节点中创建和部署该缓存。动态缓存启动后,它也会自动部署到新加入的符合条件的节点上。
Ignite ignite = Ignition.ignite();
CacheConfiguration cfg = new CacheConfiguration();
cfg.setName("myCache");
cfg.setAtomicityMode(TRANSACTIONAL);
// Create cache with given name, if it does not exist.
IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cfg);
XML配置
在缓存节点上定义的基于Spring的XML配置的所有缓存同时也会自动地在所有的集群节点上创建和部署(不需要在每个集群节点上指定同样的配置)。
# 2.3.基本操作
下面是一些JCache基本原子操作的例子:
死锁
如果批量(比如IgniteCache#putAll
、IgniteCache#invokeAll
等)操作以并行方式执行,那么键应该是有序的,以避免死锁,应使用TreeMap
而不是HashMap
以保证一致性、有序性,注意这个对于ATOMIC
和TRANSACTIONAL
模式的缓存都是一样的。
# 2.4.EntryProcessor
当在缓存中执行puts
和updates
操作时,通常需要在网络中发送完整的状态数据,而EntryProcessor
可以直接在主节点上处理数据,只需要传输增量数据而不是全量数据。
此外,可以在EntryProcessor
中嵌入自定义逻辑,比如,获取之前缓存的数据然后加1。
原子性
EntryProcessor
通过给键加锁以原子性方式执行。
# 2.5.异步支持
和Ignite中的所有API一样,IgniteCache
实现了IgniteAsynchronousSupport
接口,因此可以以异步的方式使用。
// Enable asynchronous mode.
IgniteCache<String, Integer> asyncCache = ignite.cache("mycache").withAsync();
// Asynhronously store value in cache.
asyncCache.getAndPut("1", 1);
// Get future for the above invocation.
IgniteFuture<Integer> fut = asyncCache.future();
// Asynchronously listen for the operation to complete.
fut.listenAsync(f -> System.out.println("Previous cache value: " + f.get()));
# 2.6.将Ignite作为JCache实现
如果创建缓存实例时使用JCache管理器,JCache可以接受一个特定的供应商配置。如果从另一个JCache实现上移植,在不修改已有代码的前提下,就可以利用Ignite提供的分布式缓存的优势。
下面是使用JCache管理器时,如何进行Ignite缓存配置的示例:
// Get or create a cache manager.
CacheManager cacheMgr = Caching.getCachingProvider().getCacheManager();
// This is an Ignite configuration object (org.apache.ignite.configuration.CacheConfiguration).
CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>();
// Specify cache mode and/or any other Ignite-specific configuration properties.
cfg.setCacheMode(CacheMode.PARTITIONED);
// Create a cache based on the configuration created above.
Cache<Integer, String> cache = cacheMgr.createCache("aCache", cfg);
// Cache operations
Integer key = 1;
String value = "11";
cache.put(key, value");
System.out.println(cache.get(key));
另外,CachingProvider.getCacheManager(..)
方法可以接受一个特定的URI,它指向一个Ignite的xml格式配置文件,比如:
// Get or create a cache manager.
CacheManager cacheMgr = Caching.getCachingProvider().getCacheManager(
Paths.get("path/to/ignite/xml/configuration/file").toUri(), null);
// Get a cache defined in the configuration file provided above.
Cache<Integer, String> cache = cacheMgr.getCache("myCache");
// Cache operations
Integer key = 1;
String value = "11";
cache.put(key, value");
System.out.println(cache.get(key));
注意,上述的Cache<K, V>
实例只支持javax.cache.Cache
类型。
要使用Ignite提供的扩展功能,比如SQL、扫描或者持续查询,需要将javax.cache.Cache
实例转换成IgniteCache
实例,这样的:
// Get or create a cache manager.
CacheManager cacheMgr = Caching.getCachingProvider().getCacheManager(
Paths.get("path/to/ignite/xml/configuration/file").toUri(), null);
// Get a cache defined in the configuration file provided above.
Cache<Integer, String> cache = cacheMgr.getCache("myCache");
// Obtain org.apache.ignite.IgniteCache instance.
IgniteCache igniteCache = (IgniteCache) cache;
// Ignite specific cache operations
//......
# 3.缓存配置
为了满足应用的需求,Ignite可以对缓存的行为进行各种各样的配置,缓存的属性由CacheConfiguration
类定义,该类通过IgniteConfiguration.setCacheConfiguration()
方法传递。它定义了在集群中启动一个缓存所有必要的配置参数,在一个集群中,通过不同的名字,可以有多个缓存,Ignite缓存可以配置如下的特性:
分区和复制
:Ignite提供了3种不同的缓存操作模式:PARTITIONED
、REPLICATED
、LOCAL
,每个缓存只能配置一个模式,缓存模式由CacheMode
枚举定义。
分区丢失策略
:当某些分区由于与之对应的主或备节点故障而丢失时,可以配置缓存的工作方式。
主备副本
:在PARTITIONED
模式中,数据的主键归属的节点被称为主节点,对于缓存的数据,可以可选地配置任意数量的备份节点。
缓存组
:可以在单个缓存组中对缓存进行配置,这样可以共享各种内部结构,从而提高拓扑事件的处理能力以及减少总体内存的使用。
缓存模板
:如果希望用和集群中已有缓存相同的配置创建新的缓存时,缓存模板会非常有用,这样不用定义一长串的配置参数就可以创建一个缓存。
# 3.1.分区和复制
# 3.1.1.概述
Ignite提供了三种不同的缓存操作模式,分区
、复制
和本地
。缓存模型可以为每个缓存单独配置,缓存模型是通过CacheMode
枚举定义的。
数据分区内部实现
如果想了解Ignite分区的内部实现,以及再平衡的工作机制,可以看这个Wiki页面。
# 3.1.2.分区模式
PARTITIONED
模式是扩展性最好的分布式缓存模式,也是默认的模式。这种模式下,所有数据被均等地分布在分区中,所有的分区也被均等地拆分在相关的节点中,实际上就是为缓存的数据创建了一个巨大的内存内分布式存储。这个方式可以在所有节点上只要匹配总可用存储(内存和磁盘)就可以存储尽可能多的数据,因此,可以在集群的所有节点的内存中可以存储TB级的数据。也就是说,只要有足够多的节点,就可以存储足够多的数据。
与REPLICATED
模式不同,它更新是很昂贵的,因为集群内的每个节点都需要更新,而PARTITIONED
模式更新就很廉价,因为对于每个键只需要更新一个主节点(可选择一个或者多个备份节点),不过读取变得较为昂贵,因为只有特定节点才持有缓存的数据。
为了避免额外的数据移动,总是访问恰好缓存有要访问的数据的节点是很重要的,这个方法叫做关联并置,当工作在分区化缓存时强烈建议使用。
注意
分区化缓存适合于数据量很大而更新频繁的场合。
下图描述了一个分区缓存的简单视图,实际上,键A赋予了运行在JVM1上的节点,键B赋予了运行在JVM3上的节点等等。
下面的配置章节显示了如何配置缓存模式的例子。
# 3.1.3.复制模式
REPLICATED
模式中,所有数据都被复制到集群内的每个节点,因为每个节点都有效所以这个缓存模式提供了最大的数据可用性。不过这个模式每个数据更新都要传播到其它所有节点,因而会对性能和可扩展性产生影响。
Ignite中,复制缓存的实现类似于分区缓存,每个键都有一个主拷贝而且在集群内的其它节点也会有备份。比如下图中,对于键A,运行于JVM1的节点为主节点,但是同时它还存储了其它数据的拷贝(B、C、D)。
因为相同的数据被存储在所有的集群节点中,复制缓存的大小受到节点的有效存储(内存和磁盘)的限制。这个模式适用于读缓存比写缓存频繁的多而且数据集较小的场景,如果应用超过80%的时间用于查找缓存,那么就要考虑使用复制
缓存模式了。
注意
复制缓存适用于数据集不大而且更新不频繁的场合。
# 3.1.4.本地模式
LOCAL
模式是最轻量的模式,因为没有数据分布到其它节点。它适用于或者数据是只读的,或者需要定期刷新的场景中。当缓存数据失效需要从持久化存储中加载数据时,它也可以工作于通读
模式。除了分布化以外,本地缓存包括了分布式缓存的所有功能,比如自动数据回收、过期、磁盘交换、数据查询以及事务。
# 3.1.5.配置
缓存可以每个缓存分别配置,通过设置CacheConfiguration
的cacheMode
属性实现:
# 3.2.分区丢失策略
在整个集群的生命周期中,所有的主节点以及持有这些分区数据的备份节点全部故障,导致分区数据丢失,是有可能的,这会导致部分数据丢失,这个需要根据实际需要进行处理。比如,有些应用认为这是严重的问题,会阻止所有到这个分区的写操作,而其它的应用可能忽略掉这个问题,因为数据可以重新加载。
策略
Ignite支持如下的分区丢失策略:
READ_ONLY_SAFE
:所有缓存/表的写操作都会抛出异常,在线分区的读操作是可以的,丢失分区的读操作会抛出异常;READ_ONLY_ALL
:包括丢失分区在内的所有分区都是可以读的,所有分区的写操作都会抛出异常,从丢失的分区读取数据的结果是未定义的,而且不同的节点返回结果可能不同;READ_WRITE_SAFE
:在线分区的读写都是可以的,丢失分区的读写操作都会抛出异常;READ_WRITE_ALL
:所有的读写都会继续进行,就像所有分区都处于一致状态那样(就像没有分区丢失),从丢失分区的读操作是未定义的,并且不同节点返回结果可能不同;IGNORE
:该模式不会标记丢失的分区为丢失状态,假定没有发生分区丢失,并且立即清除分区丢失状态。从技术上来说,分区不会被加入lostPartitions
列表,这是与READ_WRITE_ALL
模式的主要区别。从丢失的分区读取数据的结果是未定义的,而且不同的节点返回结果可能不同,IGNORE
模式为默认模式。
分区丢失策略是在缓存层级进行配置的:
分区丢失事件处理
Ignite提供了一些功能来处理分区丢失事件。
首先,确保订阅了EVT_CACHE_REBALANCE_PART_DATA_LOST
事件,这样在分区丢失时会收到通知。
Ignite ignite = Ignition.ignite();
// Local listener that listenes to local events.
IgnitePredicate<CacheEvent> locLsnr = evt -> {
System.out.println("Received event [evt=" + evt.name() + "]");
return true; // Continue listening.
};
// Subscribe to specified cache events occuring on local node.
ignite.events().localListen(locLsnr,
EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
下一步,如果业务逻辑希望某时获取所有的丢失分区,那么要使用IgniteCache.lostPartitions()
方法。
// Cache reference
IgniteCache cache;
// Getting a list of the lost partitions.
Collection<Integer> lostPartitions = cache.lostPartitions();
// Performing some actions upon the partitions.
...
最后,如果确认所有的丢失分区都已经恢复(比如之前所有的主备节点故障之后,存储在Ignite持久化中的该分区数据,重新导入),那么需要使用Ignite.resetLostPartitions(...)
方法清除lost
标志,以使所有分区恢复正常:
// Clear partition's lost state and moves caches to a normal mode.
ignite.resetLostPartitions(Arrays.asList("cache1", "cache2"));
// check that there are no more lost partitions
boolean lostPartiion = cache.lostPartitions().isEmpty()
# 3.3.主备副本
# 3.3.1.概述
在PARTITIONED
模式下,数据的主键归属的节点叫做这些主键的主节点,对于缓存的数据,也可以可选地配置任意多个备份节点。如果副本数量大于0,那么Ignite会自动地为每个独立的键赋予备份节点,比如,如果副本数量为1,那么数据网格内缓存的每个键都会有2个备份,一主一备。
注意
因为性能原因备份默认是被关闭的。
# 3.3.2.配置备份
备份可以通过CacheConfiguration
的backups
属性进行配置,如下:
# 3.3.3.同步和异步备份
CacheWriteSynchronizationMode
枚举可以用来配置主备之间同步和异步更新。写同步模式告诉Ignite在完成写或者提交之前客户端节点是否要等待来自远程节点的响应。
同步写模式可以设置为下面的三种之一:
同步写模式 | 描述 |
---|---|
FULL_SYNC | 客户端节点要等待所有相关远程节点的写入或者提交完成(主和备)。 |
FULL_ASYNC | 客户端不需要等待参与节点的响应。这时远程节点可能会在任何缓存写入方法完成或Transaction.commit() 方法完成后稍微更新其状态。 |
PRIMARY_SYNC | 这是默认模式,客户端节点会等待主节点的写或者提交完成,但不会等待备份节点的更新完成。 |
缓存数据一致性
注意不管那种写同步模式,在使用事务时缓存数据都会保持在所有相关节点上的完整一致性。
写同步模式可以通过CacheConfiguration
的writeSynchronizationMode
属性进行配置,像下面这样:
# 3.4.缓存组
对于集群中的缓存来说,总有一个开销,即缓存被拆分为分区后其状态必须在每个集群节点上进行跟踪以满足系统的需要。
比如,每个节点会维护一个叫做分区映射的数据结构,它驻留在Java堆中,并且消耗了部分空间,当拓扑变更(新节点加入集群或者某个节点离开)事件触发时,分区映射会在集群节点间进行交换,并且如果开启了Ignite的原生持久化,那么对于每个分区来说,都会在磁盘上打开一个文件进行读写,因此,如果有更多的缓存和分区:
- 分区映射就会占用更多的Java堆,每个缓存都有自己的分区映射;
- 新节点加入集群就会花费更多的时间;
- 节点离开集群也会因为再平衡花费更多的时间;
- 打开中的分区文件就会更多从而影响检查点的性能;
通常,如果只有几十甚至几百个缓存时,不用担心这些问题,但是如果增长到上千时,这类问题就会凸显。
要避免这个情况,可以考虑使用缓存组,一个组内的缓存会共享各种内部数据结构比如上面提到的分区映射,这样,会提高拓扑事件处理的效率以及降低整体的内存使用量。注意,从API上来看,缓存是不是组的一部分并没有什么区别。
通过配置CacheConfiguration
的groupName
属性可以创建一个缓存组,示例如下:
从上例来看,Person
和Organization
都属于group1
。
键-值对如何区分?
将缓存分配给一个缓存组之后,它的数据就会被存储于共享分区的内部数据结构中,插入缓存中的每个键都会附加该键所属的缓存的唯一ID,这个ID是从缓存名派生的。这些都是透明的,从而使得Ignite可以混合存储于共享分区、B+树以及分区文件中的缓存数据。
将缓存分组的原因很简单,如果决定对1000个缓存进行分组,那么存储分区数据、分区映射以及打开的Ignite持久化分区文件数量都会变为原来的千分之一。
分组适用于所有场景么?
虽然有这么多的好处,但是它可能影响读操作和索引的性能,这是由于所有的数据和索引都混合在一个共享的数据结构(分区映射、B+树)中,查询的时间变长导致的。 因此,如果集群有数十个和数百个节点和缓存,并且由于内部结构、检查点性能下降和/或节点到集群的连接速度较慢而遇到Java堆使用增加的情况,可以考虑使用缓存组。
# 3.5.缓存模板
# 3.5.1.概述
模板是CacheConfiguration
类的实例,可以通过Ignite.addCacheConfiguration(...)
方法在集群中注册,之后可以作为创建新缓存的基础,从模板创建的缓存会继承模板的所有属性。
如果希望用和集群中已有缓存相同的配置创建新的缓存时,缓存模板会非常有用,这样不用定义一长串的配置参数就可以创建一个缓存。目前,Ignite中的模板支持CREATE TABLE和REST命令。如果使用了模板名,Ignite就会用模板中的所有配置来实例化新缓存。
# 3.5.2.定义缓存模板
要创建一个缓存模板,可以定义一个缓存配置然后将其加入Ignite
实例,如下所示。如果希望在XML文件中定义缓存模板,需要在模板名之后加一个*
号,用于区别这个配置是一个模板而不是一个实际的缓存。
在集群中注册缓存模板后,就可以用它创建相同配置的缓存。
# 3.5.3.基于模板创建缓存
可以使用下面的命令基于模板创建缓存:
使用REST命令
http://host:port/ignite?cmd=getorcreate&cacheName=mynewCache&templateName=myCacheTemplate
在这个命令中,需要将host
和port
改成实际的值。
使用CREATE TABLE命令
CREATE TABLE IF NOT EXISTS Person (
id int,
city_id int,
name varchar,
age int,
company varchar,
PRIMARY KEY (id, city_id)
) WITH "template=myCacheTemplate, key_type=PersonKey, value_type=MyPerson";
# 4.缓存查询
# 4.1.概述
Ignite提供了非常优雅的查询API,支持基于谓词的扫描查询、SQL查询(ANSI-99兼容)、文本查询。对于SQL查询,Ignite提供了内存索引,因此所有的数据检索都是非常快的。
Ignite也通过IndexingSpi
和SpiQuery
类提供对自定义索引的支持。
# 4.2.主要的抽象
IgniteCache
有若干个查询方法,这些方法可以获得一些Query
的子类以及返回QueryCursor
。
查询
Query
抽象类表示一个在分布式缓存上执行的抽象分页查询。可以通过Query.setPageSize(...)
方法设置返回游标的每页大小(默认值是1024
)。
查询游标
QueryCursor
表示查询的结果集,可以透明地进行一页一页地迭代。每当迭代到每页的最后时,会自动地在后台请求下一页的数据,当不需要分页时,可以使用QueryCursor.getAll()
方法,它会获得整个查询结果集然后存储在集合里。
关闭游标
如果调用了QueryCursor.getAll()
方法,游标会自动关闭。如果通过for循环迭代一个游标或者显式地获得Iterator
,必须显式地关闭或者使用AutoCloseable
语法。
# 4.3.扫描查询
扫描查询可以通过用户定义的谓词以分布式的形式进行缓存的查询。
扫描查询还支持可选的转换器闭包,它可以在服务端节点在将数据发送到客户端之前对其进行转换。这个很有用,比如,当只是希望从一个大的对象获取少量字段时,这样可以最小化网络的数据传输量,下面的示例显示了如何只获取对象的键,而不发送对象的值。
# 4.4.SQL查询
Ignite的SQL查询请参照SQL网格的相关章节。
# 4.5.文本查询
Ignite也支持通过Lucene索引实现的基于文本的查询。
文本查询:
IgniteCache<Long, Person> cache = ignite.cache("mycache");
// Query for all people with "Master Degree" in their resumes.
TextQuery txt = new TextQuery(Person.class, "Master Degree");
try (QueryCursor<Entry<Long, Person>> masters = cache.query(txt)) {
for (Entry<Long, Person> e : cursor)
System.out.println(e.getValue().toString());
}
# 4.6.通过注解进行查询的配置
索引可以在代码中通过@QuerySqlField
注解进行配置,来告诉Ignite那个类型要被索引,键-值对可以传入CacheConfiguration.setIndexedTypes(MyKey.class, MyValue.class)
方法。注意这个方法只会接受成对的类型,一个是键类型,一个是值类型。
public class Person implements Serializable {
/** Person ID (indexed). */
@QuerySqlField(index = true)
private long id;
/** Organization ID (indexed). */
@QuerySqlField(index = true)
private long orgId;
/** First name (not-indexed). */
@QuerySqlField
private String firstName;
/** Last name (not indexed). */
@QuerySqlField
private String lastName;
/** Resume text (create LUCENE-based TEXT index for this field). */
@QueryTextField
private String resume;
/** Salary (indexed). */
@QuerySqlField(index = true)
private double salary;
...
}
# 4.7.使用QueryEntity进行查询配置
索引和字段也可以通过org.apache.ignite.cache.QueryEntity
进行配置,它便于通过Spring使用XML进行配置,详细信息可以参照JavaDoc。它与@QuerySqlField
注解是等价的,因为在内部类注解会被转换成查询实体。
# 5.近缓存
分区缓存和复制缓存也可以通过近缓存
前移,它是堆内存中一个较小的本地缓存,可以用来存储最近或者最频繁访问的数据。和分区缓存一样,可以控制近缓存的大小以及回收策略。
近缓存可以通过在Ignite.createNearCache(NearCacheConfiguration)
中传入NearCacheConfiguration
或者通过调用Ignite.getOrCreateNearCache(String, NearCacheConfiguration)
方法在客户端节点直接创建。使用Ignite.getOrCreateCache(CacheConfiguration, NearCacheConfiguration)
,可以在动态启动一个分布式缓存的同时为其创建一个近缓存。
通常只要用了Ignite的关联并置,近缓存就不应该用了。如果计算与相应的分区化缓存节点是并置的,那么近缓存也不需要了,因为所有数据只在分区缓存的本地才有效。不过有时无法将计算任务发送给远端节点,这时近缓存就可以显著提高应用的扩展性和总体性能。
事务
近缓存是完全事务性的,当服务端的数据发生改变时会自动地获得更新或者失效。
服务端节点的近缓存
当以非并置的方式访问服务器端的PARTITIONED
缓存中的数据时,也可以通过CacheConfiguration.setNearConfiguration(...)
方法在服务端节点上配置近缓存。
# 5.1.配置
CacheConfiguration
中与近缓存有关的大部分参数都会继承于服务端的配置,比如,如果服务端缓存有一个ExpiryPolicy
配置,近缓存中的条目也会基于同样的策略。
下表中列出的参数是不会从服务端配置中继承的,是通过NearCacheConfiguration
对象单独提供的:
setter方法 | 描述 | 默认值 |
---|---|---|
setNearEvictionPolicy(CacheEvictionPolicy) | 近缓存回收策略 | 无 |
setNearStartSize(int) | 缓存初始大小 | 375,000 |
# 6.持续查询
# 6.1.持续查询
持续查询可以监控缓存中数据的变化,启动后就会收到符合查询条件的数据变化的通知。
持续查询的功能是通过ContinuousQuery
类体现的,下面会详细描述。
持续查询和MVCC
对于开启了MVCC的缓存,持续查询有一些功能限制。
事务化SQL和MVCC的测试版
在Ignite的2.7版本中,事务化SQL和MVCC功能以测试版本形式发布,允许开发者试用并分享反馈,该功能不建议用在生产环境。
# 6.1.1.初始查询
当要执行持续查询时,在将持续查询注册在集群中以及开始接收更新之前,可以选择指定一个初始化查询。
初始化查询可以通过ContinuousQuery.setInitialQuery(Query)
方法进行设置,并且可以是任意查询类型,包括扫描查询,SQL查询和文本查询。
# 6.1.2.远程过滤器
这个过滤器在给定键对应的主和备节点上执行,然后评估更新是否需要作为一个事件传播给该查询的本地监听器。
如果过滤器返回true
,那么本地监听器就会收到通知,否则事件会被忽略。产生更新的特定主和备节点,会在主/备节点以及应用端执行的本地监听器之间,减少不必要的网络流量。
远程过滤器可以通过ContinuousQuery.setRemoteFilter(CacheEntryEventFilter<K, V>)
方法进行设置。
# 6.1.3.本地监听器
当缓存被修改时(一个条目被插入、更新或者删除),更新对应的事件就会发送给持续查询的本地监听器,之后应用就可以做出对应的反应。
当事件通过了远程过滤器,它们就会被发送给客户端,通知哪里的本地监听器。
本地监听器是通过ContinuousQuery.setLocalListener(CacheEntryUpdatedListener<K, V>)
方法设置的。
# 6.1.4.远程转换器
持续查询默认会将整个更新后的对象发送给应用端的监听器,这会导致网络的过度使用,如果传输的对象很大,更是如此。另外,应用通常更希望得到更新对象的字段的子集,而不是整个对象。
为了解决这个问题,可以使用ContinuousQueryWithTransformer
,它可以为每个更新的对象配置一个自定义的转换器工厂,它会在远程节点执行,然后只将转换后的结果发给监听器。
// Create a new continuous query with the transformer.
ContinuousQueryWithTransformer qry = new ContinuousQueryWithTransformer();
// Factory to create transformers.
Factory factory = FactoryBuilder.factoryOf(
// Return one field of a complex object.
// Only this field will be sent over to a local listener over the network.
(IgniteClosure<CacheEntryEvent, String>)
event -> ((Organization)event.getValue()).name());
qry.setRemoteTransformerFactory(factory);
// Listener that will receive transformed data.
qry.setLocalListener(names -> {
for (Object name : names)
System.out.println("New organization name: " + name);
});
ContinuousQueryWithTransformer
的使用示例,GitHub上有。
# 6.2.事件传递保证
持续查询的实现会明确地保证,一个事件只会传递给客户端的本地监听器一次。
除了主节点,在每个备份节点维护一个更新队列也是可能的。如果主节点故障或者由于某些其它原因拓扑发生了改变,之后每个备份节点会刷新它的内部队列的内容给客户端,确保事件都会被传递给客户端的本地监听器。
当所有的备份节点都刷新它们的队列给客户端时,为了避免重复通知,Ignite会为每个分区维护一个更新计数器。当某个分区的一个条目已经更新,这个分区的计数器在主节点和备份节点都会增加。这个计数器的值会和事件通知一起发给客户端,该客户端还维护该映射的副本。如果客户端收到了一个更新,对应的计数小于它的本地映射,这个更新会被视为重复然后被丢弃。
一旦客户端确认一个事件已经收到,主节点和备份节点会从它们的备份队列中删除该事件的记录。
# 6.3.示例
关于描述持续查询如何使用的完整示例,已经随着Ignite的二进制包一起发布,名为CacheContinuousQueryExample
,相关的代码在GitHub上也有。
# 7.关联并置
数据和计算以及数据和数据的并置可以显著地提升应用的性能和可扩展性。
# 7.1.数据与数据的并置
在许多情况下,如果不同的缓存键被同时访问,那么将它们并置在一起是很有利的。通常来说业务逻辑需要访问不止一个的缓存键,通过将它们并置在一起可以确保具有同一个affinityKey
的所有键都会缓存在同一个处理节点上,从而避免从远程节点获取数据的昂贵网络开销。
例如,有一个Person
和Company
对象,然后希望将Person
对象和其工作的Company
对象并置在一起。
具体做法是,用于缓存Person
对象的缓存键应该有一个属性加注了@AffinityKeyMapped
注解,它会提供用于并置的Company
键的值,为了方便也可以选用AffinityKey
类。
当在缓存上使用SQL以及创建SQL表时,需要通过CREATE TABLE的AFFINITY_KEY
参数定义关联键。
Scala中的注解
注意,如果Scala的case class用于键类并且它的构造函数参数之一加注了@AffinityKeyMapped
注解,默认这个注解并不会正确地用于生成的字段,因此也就不会被Ignite识别。要覆盖这个行为,可以使用@field
元注解而不是@AffinityKeyMapped
(看下面的示例)。
SQL关联
当在分区缓存的数据上执行分布式SQL关联时,一定要确保关联的键是并置的。
# 7.2.数据和计算的并置
也可以向缓存数据的节点发送计算任务,这个概念叫做数据和计算的并置,它可以向特定的节点发送整个工作单元。
要将数据和计算并置在一起,需要使用IgniteCompute.affinityRun(...)
和IgniteCompute.affinityCall(...)
方法。
下面的例子显示了如何和上面提到的缓存Person
和Company
对象的同一个集群节点进行并置计算:
# 7.3.IgniteCompute和EntryProcessor
IgniteCompute.affinityRun(...)
和IgniteCache.invoke(...)
方法都提供了数据和计算并置的能力。主要的不同在于invoke(...)
方法是原子的并且执行时在键上加了锁,无法从EntryProcessor
逻辑内部访问其它的键,因为它会触发一个死锁。
而affinityRun(...)
和affinityCall(...)
不持有任何锁。比如,在这些方法内可以开启多个事务或者执行缓存查询而不用担心死锁,这时Ignite会自动检测处理是并置的然后对事务采用优化过的轻量级一阶段提交而不是二阶段提交。
注意
关于IgniteCache.invoke(...)
方法的更多信息,请参照EntryProcessor文档。
# 7.4.关联函数
分区的映射控制一个分区缓存在哪个网格节点或者哪些节点上。AffinityFunction
是一个可插拔的API用于确定网格中分区到节点的一个理想映射。当集群拓扑发生变化时,分区到节点的映射可能变成次优解(不同于关联函数提供的理想分布),直到再平衡结束。
Ignite提供了RendezvousAffinityFunction
,这个函数允许分区到节点的映射有点区别(即一些节点可能比其它节点持有稍微多一点的分区数量)。不过它保证当拓扑发生变化时,分区只会迁移到一个新加入的节点或者只来自一个离开的节点,集群内已有的节点间不会发生数据的交换。
注意,缓存关联函数不会直接映射键和节点,它映射的是键和分区。分区只是来自一个有限集合的简单的数字(0-默认的1024)。在键映射到它们的分区之后(即获得了它们的分区号),已有的分区到节点的映射会用于当前的拓扑版本,如果拓扑发生变化,这个键到分区的映射就会失效。
下面的代码片段显示了如何自定义和配置一个关联函数:
AffinityFunction
是一个可插拔的API,也可以提供这个函数的自定义实现,AffinityFunction
API的三个主要方法是:
partitions()
:获取一个缓存的分区总数量,集群启动之后无法改变。partition(...)
:给定一个键,这个方法确定一个键属于哪个分区,这个映射在时间上不会改变。assignPartitions(...)
:这个方法在拓扑发生变化时每次都会被调用,这个方法对于给定的拓扑返回一个分区到节点的映射。
映射的故障安全
集群中分区的安排要遵循主备副本不在同一台物理机上,要确保不发生这样的情况,可以调整RendezvousAffinityFunction
的excludeNeighbors
参数。
有时将一个分区的主备副本放在不同的机架上也是很有用的。这时可以为每个节点赋予一个特别的属性然后在RendezvousAffinityFunction
上使用AffinityBackupFilter
属性来排除同一个机架中被分配用于备份副本的节点。
此外,可以用ClusterNodeAttributeAffinityBackupFilter
创建AffinityBackupFilter
,该AffinityBackupFilter
根据节点对某些环境变量(例如AVAILABILITY_ZONE
)的值来分离主备副本。有关如何配置此属性的更多信息,请参见ClusterNodeAttributeAffinityBackupFilter.java
类的javadoc。
# 7.5.关联键映射器
CacheAffinityKeyMapper
是一个可插拔的API,负责为一个缓存键获取关联键。通常缓存键本身就用于关联键,不过为了与其它的缓存键并置,有时改变一个缓存键的关联是很重要的。
CacheAffinityKeyMapper
的主要方法是affinityKey(key)
,它会为一个缓存键返回一个affinityKey
。Ignite默认会查找加注@CacheAffinityKeyMapped
注解的所有属性和方法。如果没有找到这样的属性或者方法,那么缓存键本身就会用做关联键。如果找到了这样的属性或者方法,那么这个属性或者方法的值就会从CacheAffinityKeyMapper.affinityKey(key)
方法返回,这样只要需要,就可以指定一个替代的关联键,而不是缓存键本身。
# 8.事务
# 8.1.事务
# 8.1.1.原子化模式
Ignite支持几种类型的缓存操作,事务
模式和原子
模式,原子模式
支持多个原子性操作,一次一个。事务模式
中,可以将一个或多个键上的多个缓存操作组成单个逻辑操作(称为事务)。在这些键上的所有操作都不会有其它的操作干扰,并且要么全部成功,要么全部失败。操作没有部分执行。
原子化模式由CacheAtomicityMode
枚举定义(TRANSACTIONAL
、TRANSACTIONAL_SNAPSHOT
、ATOMIC
),可以通过CacheConfiguration
的atomicityMode
属性进行配置。
TRANSACTIONAL
和TRANSACTIONAL_SNAPSHOT
模式支持完全兼容ACID的事务,TRANSACTIONAL
模式只支持基于Java API的键-值事务,这个模式的事务有不同的并发模型和隔离级别。
TRANSACTIONAL_SNAPSHOT
模式同时支持键-值事务和SQL事务,它是通过多版本并发控制(MVCC)来支持两种类型的事务的,后面会详细介绍多版本并发控制以及这个模式的限制。
事务化SQL和MVCC的测试版
在Ignite的2.7版本中,事务化SQL和MVCC功能以测试版本形式发布,允许开发者试用并分享反馈,该功能不建议用在生产环境。
只有在需要支持ACID操作时,才需要开启TRANSACTIONAL
和TRANSACTIONAL_SNAPSHOT
模式。
ATOMIC
模式因为避免了事务锁,所以性能更好,但是仍然提供了单个数据的原子性和一致性。ATOMIC
模式的另一个不同是批量写,比如putAll(...)
和removeAll(...)
方法不在一个事务中执行,因此可能部分失败并抛出CachePartialUpdateException
,它里面包含了更新失败的键列表。
原子化模式是在CacheAtomicityMode
枚举中定义的,
性能
注意当使用ATOMIC
模式时,事务是被禁用的,因为不需要事务,因此可以获得更高的性能和吞吐量。
# 8.1.2.IgniteTransactions
IgniteTransactions
接口包括了启动和结束事务的功能,以及订阅监听器或者获得指标数据。
跨缓存事务
可以在一个事务中组合来自不同缓存的多个操作。注意它可以在一个事务中更新不同类型的缓存,比如复制
和分区
缓存。
近缓存事务
近缓存是完全事务化的(TRANSACTIONAL
模式),当数据在服务端改变时,会自动地获得更新或者失效。TRANSACTIONAL_SNAPSHOT
模式不支持近缓存。
警告
不能在具有不同原子化模式的多个缓存上执行事务。单个事务中包含的所有缓存都必须具有相同的原子化模式(TRANSACTIONAL
或TRANSACTIONAL_SNAPSHOT
)。
可以像下面这样获得IgniteTransactions
的一个实例:
Ignite ignite = Ignition.ignite();
IgniteTransactions transactions = ignite.transactions();
下面是一个事务如何在Ignite中执行的例子:
try (Transaction tx = transactions.txStart()) {
Integer hello = cache.get("Hello");
if (hello == 1)
cache.put("Hello", 11);
cache.put("World", 22);
tx.commit();
}
事务对象生命周期
一个Transaction
对象用完之后需要关闭,要保证不出差错,可以这么做:
- 在
try-with-resources
语句中启动Transaction
,它最终会调用close()方法; - 使用
finally
代码块,手工调用tx.close()
方法。
事务化方法
当缓存开启事务时,并不是IgniteCache
API中的所有方法都支持事务,需要看它的方法签名,如果有throws TransactionException
,那么它就满足ACID原则,可以安全地用于分布式事务。
# 8.1.3.2阶段提交(2PC)
Ignite在事务中使用2阶段提交(2PC)的协议,不过也会尽可能地使用一阶段提交。在一个事务中当数据更新时,在调用commit()
方法之前,Ignite会在本地事务映射中保持事务状态,这时只要需要,数据都会被传输到参与事务的远程节点。
顾名思义,有两个阶段:准备和提交,具体可以参照如下文章:
- Apache Ignite事务架构:2阶段提交协议
- Apache Ignite事务架构:并发模型和隔离级别
- Apache Ignite事务架构:故障和恢复
- Apache Ignite事务架构:Ignite持久化的事务处理
- Apache Ignite事务架构:第三方持久化的事务处理
或者,也可以看下面的资料,了解事务子系统的内部实现。
ACID完整性
Ignite提供了完整的ACID(原子性、一致性、隔离性和持久性)兼容事务来确保一致性。
第三方持久化提交
对于使用第三方持久化的缓存事务更新,该更新会从发起事务的节点写入底层数据库,即使该节点是客户端节点也是如此,因此要确保该节点对底层数据库具有写权限。
警告
如果事务更新使用第三方持久化数据库的多个缓存,则每个数据库将按照顺序提交。如果任何数据库未能提交更新,例如当一个节点故障时,所有以前的数据库将无法回滚这些更新。 因此不建议在单个事务中更新具有不同外部持久化存储的缓存。此外如果需要使用事务更新具有第三方持久化存储的缓存,请确保所有缓存使用相同的数据库。
# 8.1.4.死锁检测
当处理分布式事务时必须要遵守的主要规则是参与一个事务的键的锁,必须按照同样的顺序获得,违反这个规则就可能导致分布式死锁。
Ignite无法避免分布式死锁,而是有一个内建的功能来使调试和解决这个问题更容易。
在下面的代码片段中,事务启动时带有超时限制,如果到期,死锁检测过程就会试图查找一个触发这个超时的可能的死锁。当超过超时时间时,无论死锁如何,都会向应用层抛出CacheException
,并将TransactionTimeoutException
作为其触发原因。不过如果检测到了一个死锁,返回的TransactionTimeoutException
的触发原因会是TransactionDeadlockException
(至少涉及死锁的一个事务)。
try (Transaction tx = ignite.transactions().txStart(TransactionConcurrency.PESSIMISTIC,
TransactionIsolation.READ_COMMITTED, 300, 0)) {
cache.put(1, 1);
cache.put(2, 1);
tx.commit();
}
catch (CacheException e) {
if (e.getCause() instanceof TransactionTimeoutException &&
e.getCause().getCause() instanceof TransactionDeadlockException)
System.out.println(e.getCause().getCause().getMessage());
}
TransactionDeadlockException
里面包含了有用的信息,有助于找到导致死锁的原因。
Deadlock detected:
K1: TX1 holds lock, TX2 waits lock.
K2: TX2 holds lock, TX1 waits lock.
Transactions:
TX1 [txId=GridCacheVersion [topVer=74949328, time=1463469328421, order=1463469326211, nodeOrder=1], nodeId=ad68354d-07b8-4be5-85bb-f5f2362fbb88, threadId=73]
TX2 [txId=GridCacheVersion [topVer=74949328, time=1463469328421, order=1463469326210, nodeOrder=1], nodeId=ad68354d-07b8-4be5-85bb-f5f2362fbb88, threadId=74]
Keys:
K1 [key=1, cache=default]
K2 [key=2, cache=default]
死锁检测是一个多步过程,根据集群中节点的数量、键以及可能导致死锁涉及的事务数,可能需要做多次迭代。一个死锁检测的发起者是发起事务并且出现TransactionTimeoutException
错误的那个节点,这个节点会检查是否发生了死锁,通过与其它远程节点交换请求/响应,并且准备一个与死锁有关的、由TransactionDeadlockException
提供的报告,每个这样的消息(请求/响应)都会被称为一次迭代。
因为死锁检测过程不结束,事务就不会回滚,有时,如果希望对于事务回滚有一个可预测的时间,调整一下参数还是有意义的(下面会描述)。
IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS
:指定死锁检测过程迭代的最大数,如果这个属性的值小于等于0,死锁检测会被禁用(默认为1000);IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT
:指定死锁检测机制的超时时间(默认为1分钟)。
注意如果迭代次数太少,可能获得一个不完整的死锁检测报告。
注意
如果想彻底避免死锁,可以看下面的无死锁事务章节。
# 8.1.5.无死锁事务
对于乐观
的可序列化
事务,锁不是按顺序获得的。该模式中键可以按照任何顺序访问,因为事务锁是通过一个额外的检查以并行的方式获得的,这使得Ignite可以避免死锁。
这里需要引入几个概念来描述可序列化
的事务锁的工作方式。Ignite中的每个事务都会被赋予一个叫做XidVersion
的可比较的版本号,事务提交时该事务中修改的每个条目都会被赋予一个叫做EntryVersion
的新的版本号,一个版本号为XidVersionA
的乐观可序列化
事务在如下情况下会抛出TransactionOptimisticException
异常而失败:
- 有一个进行中的
悲观
的或者非可序列化乐观
事务在可序列化
事务中的一个条目上持有了一个锁; - 有另外一个进行中的版本号为
XidVersionB
的乐观可序列化
事务,在XidVersionB > XidVersionA
时以及这个事务在可序列化
事务中的一个条目上持有了一个锁; - 在该
乐观可序列化
事务获得所有必要的锁时,存在在提交之前的版本与当前版本不同的条目;
注意
在一个高并发环境中,乐观锁可能出现高事务失败率,而悲观锁如果锁被事务以一个不同的顺序获得可能导致死锁。
不过在一个同质化的环境中,乐观可序列化锁对于大的事务可能提供更好的性能,因为网络交互的数量只取决于事务相关的节点的数量,而不取决于事务中的键的数量。
# 8.1.6.失败事务处理
如下的异常可能导致事务失败:
异常名称 | 描述 |
---|---|
由TransactionTimeoutException 触发的CacheException | 事务超时会触发TransactionTimeoutException ,解决办法 :可以增加超时时间或者缩短事务执行时间 |
TransactionDeadlockException 触发TransactionTimeoutException ,再触发CacheException | 事务死锁会触发这个异常,解决办法 :使用死锁检测机制调试和修正死锁,或者切换到乐观序列化事务(无死锁事务)。 |
TransactionOptimisticException | 某种原因的乐观事务失败会抛出这个异常,大多数情况下,该异常发生在事务试图并发更新数据的场景中。解决办法 :重新执行事务。 |
TransactionRollbackException | 事务自动或者手动回滚时,可能抛出这个异常,这时,数据状态是一致的。解决方法 :因为数据状态是一致的,所以可以对事务进行重试。 |
TransactionHeuristicException | 这是一个不太可能发生的异常,由Ignite中意想不到的内部错误或者通信错误导致,该异常存在于事务子系统无法预知的不确定场景中,目前没有被合理地处理。解决办法 :如果出现该异常,数据可能不一致,这时需要对数据进行重新加载,或者报告给Ignite开发社区。 |
事务重试
对于乐观和悲观事务的重试,都是合理的,即使因为网络或者节点故障导致事务失败,Ignite仍然会利用备份或者磁盘上的可用数据,来保证数据一致性。 但是,不要在应用层无限地进行事务重试,包括在有限的时间内这样做也不好。
# 8.1.7.长时间运行事务终止
在Ignite集群中,部分事件会触发分区映射的交换过程以及数据的再平衡,来保证整个集群的数据分布,这个事件的一个例子就是集群拓扑变更事件,它会在新节点加入或者已有节点离开时触发,还有,新的缓存或者SQL表创建时,也会触发分区映射的交换。
当分区映射交换开始时,Ignite会在特定的阶段拿到一个全局锁,在未完成的事务并行执行时无法获得锁,这些事务会阻止分区映射交换进程,从而阻断一些新节点加入进程这样的一些操作。
使用TransactionConfiguration.setTxTimeoutOnPartitionMapExchange(...)
方法,可以配置长期运行事务阻断分区映射交换的最大时间,时间一到,所有的未完成事务都会回滚,让分区映射交换进程先完成。
下面的示例显示如何配置超时时间:
如果事务因为超时而回滚,可以捕获并且处理TransactionTimeoutException
。
# 8.1.8.集成JTA
Ignite可以通过TransactionConfiguration#setTxManagerFactory
方法配置一个JTA事务管理器搜索类,事务管理器工厂是一个工厂,它给Ignite提供了一个JTA事务管理器的实例。
Ignite提供了一个CacheJndiTmFactory
工厂,它是一个通过JNDI名字查找事务管理器的现成的事务管理器工厂实现。
设置了之后,在事务化缓存中的每一次缓存操作,Ignite都会检查是否存在一个进行中的JTA事务。如果JTA事务开启了,Ignite也会开启一个事务然后通过自己的XAResource
内部实现将其加入JTA事务,Ignite事务将与相应的JTA事务一起准备、提交或回滚。
下面是一个在Ignite中使用JTA事务管理器的示例:
// Get an instance of JTA transaction manager.
TMService tms = appCtx.getComponent(TMService.class);
// Get an instance of Ignite cache.
IgniteCache<String, Integer> cache = cache();
UserTransaction jtaTx = tms.getUserTransaction();
// Start JTA transaction.
jtaTx.begin();
try {
// Do some cache operations.
cache.put("key1", 1);
cache.put("key2", 2);
// Commit the transaction.
jtaTx.commit();
}
finally {
// Rollback in a case of exception.
if (jtaTx.getStatus() == Status.STATUS_ACTIVE)
jtaTx.rollback();
}
# 8.2.并发模型和隔离级别
当原子化模式配置为TRANSACTIONAL
时,Ignite对事务支持乐观
和悲观
的并发模型。并发模型决定了何时获得一个条目级的事务锁-在访问数据时或者在prepare
阶段。锁定可以防止对一个对象的并发访问。比如,当试图用悲观锁更新一个ToDo列表项时,服务端会在该对象上置一个锁,在提交或者回滚该事务之前,其它的事务或者操作都无法更新该条目。不管在一个事务中使用那种并发模型,在提交之前都存在事务中的所有条目被锁定的时刻。
隔离级别定义了并发事务如何"看"以及处理针对同一个键的操作。Ignite支持读提交
、可重复读
、可序列化
隔离级别。
并发模型和隔离级别的所有组合都是可以同时使用的。下面是针对Ignite提供的每一个并发-隔离组合的行为和保证的描述。
# 8.2.1.悲观事务
在悲观
事务中,锁是在第一次读或者写访问期间获得(取决于隔离级别)然后被事务持有直到其被提交或者回滚。该模式中,锁首先在主节点获得然后在准备阶段提升至备份节点。下面的隔离级别可以配置为悲观
并发模型。
读提交
:数据被无锁地读取并且不会被事务本身缓存。如果缓存配置允许,数据是可能从一个备份节点中读取的。在这个隔离级别中,可以有所谓的非可重复读,因为当在自己的事务中读取数据两次时,一个并发事务可以改变该数据。锁只有在第一次写访问时才会获得(包括EntryProcessor
调用)。这意味着事务中已经读取的一个条目在该事务提交时可能有一个不同的值,这种情况是不会抛出异常的。可重复读
:获得条目锁以及第一次读或者写访问时从主节点获得数据,然后就存储在本地事务映射中。之后对同一数据的所有连续访问都是本地化的,并且返回最后一次读或者被更新的事务值。这意味着没有其它的并发事务可以改变锁定的数据,这样就获得了事务的可重复读。可序列化
:在悲观
模式中,这个隔离级别与可重复读
是一样的工作方式。
注意,在悲观
模式中,锁的顺序是很重要的。此外Ignite可以按照指定的顺序依次并且准确地获得锁。
性能考量
设想拓扑中有三个节点(A、B、C),并且在事务中针对键[1, 2, 3, 4, 5, 6]执行一个putAll
。假定这些键以如下形式映射到节点:{A: 1, 4}, {B: 2, 5}, {C: 3, 6},因为Ignite在悲观
模式中无法改变获得锁的顺序,它会产生6次连续地网络往返:[A, B, C, A, B, C]。在键的锁定顺序对于一个事务的语义不重要的情况下,将键按照分区进行分组然后将在一个分区的键一起锁定是明智的。这在一个大的事务中可以显著地降低网络消息的数量。在这个示例中,如果对于一个putAll
键按照如下的方式排序:[1, 4, 2, 5, 3, 6],之后只需要3次的连续网络访问。
拓扑变化约束
注意,如果至少获取了一个悲观事务锁,则在提交或回滚事务之前,将无法更改缓存拓扑。因此应该避免长时间持有事务锁。
# 8.2.2.乐观事务
在乐观事务中,条目锁是在二阶段提交的准备
阶段从主节点获得的,然后提升至备份节点,该锁在事务提交时被释放。如果回滚事务没有试图做提交,是不会获得锁的。下面的隔离级别可以与乐观
并发模型配置在一起。
读提交
:应该作用于缓存的改变是在源节点上收集的,然后事务提交后生效。事务数据无锁地读取并且不会在事务中缓存。如果缓存配置允许,该数据是可能从备份节点中读取的。在这个隔离级别中,可以有一个所谓的非可重复读,因为在自己的事务中读取数据两次时另一个事务可以修改数据。这个模式组合在第一次读或者写操作后如果条目值被修改是不会做校验的,并且不会抛出异常。可重复读
:这个隔离级别的事务的工作方式类似于乐观
的读提交
的事务,只有一个不同:读取值缓存于发起节点并且所有的后续读保证都是本地化的。这个模式组合在第一次读或者写操作后如果条目值被修改是不会做校验的,并且不会抛出异常。可序列化
:在第一次读访问之后会存储一个条目的版本,如果Ignite引擎检测到发起事务中的条目只要有一个被修改,Ignite就会在提交阶段放弃该事务,这是在提交阶段对网格内的事务中记载的条目的版本进行内部检查实现的。简而言之,这意味着Ignite如果在一个事务的提交阶段检测到一个冲突,就会放弃这个事务并且抛出TransactionOptimisticException
异常以及回滚已经做出的任何改变,开发者应该处理这个异常并且重试该事务。
// Re-try transaction finite amount of time.
int retryCount = 10;
int retries = 0;
// Start transaction in optimistic mode with serializable isolation level.
while (retries < retryCount) {
retries++;
try (Transaction tx =
ignite.transactions().txStart(TransactionConcurrency.OPTIMISTIC,
TransactionIsolation.SERIALIZABLE)) {
// Modify cache entries as part of this transaction.
....
// commit transaction.
tx.commit();
// Transaction succeeded. Leave the while loop.
break;
}
catch (TransactionOptimisticException e) {
// Transaction has failed. Retry.
}
}
这里另外一个需要注意的重要的点是,即使一个条目只是简单地读取(没有改变,cache.put(...)
),一个事务仍然可能失败,因为该条目的值对于发起事务中的逻辑很重要。
注意,对于读提交
和可重复读
事务,键的顺序是很重要的,因为这些模式中锁也是按顺序获得的。
乐观可序列化事务的重试
对于乐观可序列化事务的失败,重试是个好办法,因为事务试图更新的数据, 可能被并发地修改,因此,需要通过重试逻辑来处理TransactionOptimisticException
异常,只不过需要合理地限制重试的次数。
# 8.2.3.读一致性
为了在悲观模式下实现完全的读一致性,需要获取读锁。这意味着只有在悲观的可重复读(或可序列化)事务中,悲观模式下的读之间的完全一致性才能够实现。
当使用乐观事务时,通过禁止读之间的潜在冲突,可以实现完全的读一致性。此行为由乐观的可序列化模式提供。但是,请注意,在提交发生之前,用户仍然可以读取部分事务状态,因此事务逻辑必须对其进行保护。只有在提交阶段,在任何冲突的情况下,才会抛出TransactionOptimisticException
,这将允许开发者重试事务。
警告
如果没有使用悲观的可重复读或者可序列化事务,或者也没有使用乐观的序列化事务,那么就可以看到部分的事务状态,这意味着如果一个事务更新对象A和B,那么另一个事务可能看到A的新值和B的旧值。
# 8.3.多版本并发控制
# 8.3.1.概述
配置为TRANSACTIONAL_SNAPSHOT
原子化模式的缓存,支持SQL事务以及键-值事务,并且为两种类型的事务开启了多版本并发控制(MVCC)。
事务化SQL和MVCC的测试版
在Ignite的2.7版本中,事务化SQL和MVCC功能以测试版本形式发布,允许开发者试用并分享反馈,该功能不建议用在生产环境。
# 8.3.2.多版本并发控制
多版本并发控制,是一种在多用户并发访问时,控制数据一致性的方法,MVCC实现了快照隔离保证,确保每个事务总是看到一致的数据快照。
每个事务在开始时会获得一个数据的一致性快照,并且只能查看和修改该快照中的数据。当事务更新一个条目时,Ignite验证其它事务没有更新该条目,并创建该条目的一个新的版本。新版本只有在事务成功提交时才对其它事务可见。
快照不是物理快照,而是由MVCC协调器生成的逻辑快照,MVCC协调器是协调集群中的事务活动的集群节点。协调器跟踪所有活动的事务,并在每个事务完成时得到通知。启用MVCC的缓存的所有操作都需要从协调器请求一个数据的快照。
# 8.3.3.开启MVCC
要为缓存开启MVCC,需要在缓存配置中使用TRANSACTIONAL_SNAPSHOT
原子化模式,如果使用CREATE TABLE
命令创建表,则需要在该命令的WITH
子句中将原子化模式作为参数传进去。
警告
TRANSACTIONAL_SNAPSHOT
模式只支持默认的并发模型(悲观
)和默认的隔离级别(可重复读
),具体可以看上面的并发模型和隔离级别章节。
# 8.3.4.并发更新
在一个事务中,如果一个条目先被读取然后被更新,那么就有一种可能性,即另一个事务可能在两个操作之间切入然后首先更新该条目,这时,当第一个事务试图更新该条目时就会抛出异常,然后该事务会被标记为只能回滚
,这时开发者就需要进行事务重试。
那么怎么知道发生了冲突呢?
- 如果使用了Java的事务API,会抛出
CacheException
异常(异常信息为Cannot serialize transaction due to write conflict (transaction is marked for rollback)
),并且Transaction.rollbackOnly
标志为true
; - 如果通过JDBC/ODBC驱动执行了SQL事务,那么会得到
SQLSTATE:40001
错误代码。
# 8.3.5.限制
跨缓存事务
TRANSACTIONAL_SNAPSHOT
模式是缓存级的,并且事务中涉及的缓存不允许有不同的原子化模式,因此,如果希望在一个SQL事务中覆盖多个表,则必须使用TRANSACTIONAL_SNAPSHOT
模式创建所有表。
嵌套事务
通过JDBC/ODBC连接参数,Ignite支持三种模式来处理SQL的嵌套事务:
JDBC连接串:
jdbc:ignite:thin://127.0.0.1/?nestedTransactionsMode=COMMIT
当在一个事务中出现了嵌套事务,系统的行为依赖于nestedTransactionsMode
参数:
ERROR
:如果发生了嵌套事务,会抛出错误,包含事务会回滚,这是默认的行为;COMMIT
:包含事务会被提交,嵌套事务开始并且在遇到COMMIT
语句时会提交,包含事务中的其余部分会作为隐式事务执行;IGNORE
:不要使用这个模式,嵌套事务的开始会被忽略,嵌套事务中的语句会作为包含事务的一部分来执行,然后所有的变更会随着嵌套事务的提交而提交,包含事务中的其余部分会作为隐式事务执行。
持续查询
如果在开启了MVCC的缓存上使用持续查询,那么有些限制需要知道:
- 当接收到更新事件时,在MVCC协调器得知更新之前,后续读取已更新键的操作可能会在一段时间内返回旧值。这是因为更新事件是从更新键的节点发送的,并且是在键更新后立即发送的。这时MVCC协调器可能不会立即感知该更新,因此,随后的读取可能会在这段时间内返回过时的信息。
- 当使用持续查询时,每个节点单个事务可以更新的键数量是有限制的。更新后的值保存在内存中,如果更新太多,节点可能没有足够的内存来保存所有对象。为了避免内存溢出错误,每个事务在一个节点上只能最多更新20000个键(默认值)。如果超过此值,事务将抛出异常并回滚。可以通过指定
IGNITE_MVCC_TX_SIZE_CACHING_THRESHOLD
系统属性来修改该值。
其它的限制
开启MVCC的缓存,下面的特性是不支持的,这些限制后续的版本可能会解决:
- 近缓存;
- 过期策略;
- 事件;
- 缓存拦截器;
- 第三方持久化;
- 堆内缓存;
- 显式锁;
- localEvict()和localPeek()方法。
# 9.锁
缓存事务会隐式地获得锁,不过有些情况下显式锁是很有用的。IgniteCache
API的lock()
方法会返回一个java.util.concurrent.locks.Lock
的实例,它可以在任意给定的键上定义显式的分布式锁,也可以通过IgniteCache.lockAll()
方法给集合对象加锁。
IgniteCache<String, Integer> cache = ignite.cache("myCache");
// Create a lock for the given key
Lock lock = cache.lock("keyLock");
try {
// Acquire the lock
lock.lock();
cache.put("Hello", 11);
cache.put("World", 22);
}
finally {
// Release the lock
lock.unlock();
}
原子化模式
Ignite中,只有在TRANSACTIONAL
原子化模式中才支持锁,它可以通过CacheConfiguration
的atomicityMode
属性进行配置。
锁和事务
显式锁是非事务性的,不能在事务中使用(会抛出异常)。如果确实需要在事务中使用显式锁,那么需要使用事务的TransactionConcurrency.PESSIMISTIC
并发控制,它会为相关的缓存数据获得显式锁。
# 10.数据再平衡
# 10.1.概述
当一个新节点加入集群时,已有节点会放弃一部分缓存条目的所有权转交给新的节点,以使整个网格在任何时候都保持键的均等平衡。
如果新的节点成为一些分区的主节点或者备份节点,它会从该分区之前的主节点获取数据,或者从该分区的备份节点之一获取数据。一旦分区全部载入新的节点,旧节点就会被标记为过时然后该节点在所有当前的事务完成之后最终会被退出。因此,在一些很短的时间段,在拓扑发生变化之后,有一种情况是在缓存中对于一个键备份的数量可能比事先配置的多。不过一旦再平衡完成,额外的备份会被删除。
# 10.2.再平衡模式
下面的再平衡模式是在CacheRebalanceMode
枚举中定义的:
缓存再平衡模式 | 描述 |
---|---|
SYNC | 同步再平衡模式,直到所有必要的数据全部从其它有效节点加载完毕分布式缓存才会启动,这意味着所有对缓存的开放API的调用都会阻塞直到再平衡结束 |
ASYNC | 异步平衡模式,分布式缓存会马上启动,然后在后台会从其它节点加载所有必要的数据 |
NONE | 该模式下不会发生再平衡,这意味着要么在访问数据时从持久化存储载入,要么数据被显式地填充。 |
默认启用ASYNC
再平衡模式,要使用其它的再平衡模式,可以像下面这样设置CacheConfiguration
的rebalanceMode
属性:
# 10.3.再平衡线程池调节
IgniteConfiguration
提供了一个setRebalanceThreadPoolSize
方法,它可以从Ignite的系统线程池中获取一定数量的线程用于数据再平衡。每当一个节点需要向远程节点发送一批数据时,或者需要处理来自相反方向的一批数据时,都会从池中获取一个系统线程,这个远程节点既可能是一个分区的主节点,也可能是备份节点。在批次发送/接收完以及处理完之后,该线程就会被释放。
默认只会有一个线程用于再平衡。这基本上意味着在一个特定的时间点只有一个线程用于从一个节点到另一节点传输批量数据,或者处理来自远端的批量数据。举例来说,如果集群有两个节点和一个缓存,那么所有缓存的分区都会一个一个地按照顺序进行再平衡。如果集群有两个节点和两个不同的缓存,那么这些缓存会以并行的方式进行再平衡,但是在一个特定的时间点,就像上面解释的那样,只会处理属于某一个特定缓存的批量数据。
注意
每个缓存的分区数量不会影响再平衡的性能,有影响的是数据的总量,再平衡线程池大小以及下面章节列出的其它参数。
根据系统中缓存的数量以及缓存中存储的数据量,如果再平衡线程池的大小为1
,要将所有的数据再平衡至一个节点上,会花费很长的时间。要加快预加载的进程,可以根据需要增加IgniteConfiguration.setRebalanceThreadPoolSize
的值。
假定将IgniteConfiguration.setRebalanceThreadPoolSize
的值设为4然后考虑上述的示例,再平衡的过程大致如下:
- 如果集群有两个节点和一个缓存,那么缓存的分区逻辑上会分为4个不同的组,它们会以并行的方式进行处理,每个线程处理一个组,属于一个特定组的分区会按照顺序一个一个地进行再平衡。
- 如果集群有两个节点和两个不同的缓存,那么每个缓存的分区逻辑上都会分为四个不同的组(每个缓存都会有四个组,总共8个组),然后会有四个不同的线程并行地处理这些组,不过在一个特定的时间点,就像上面解释的那样,只有属于一个特定组(总共8个)的数据会被处理。
警告
在内部,系统线程池广泛用于和缓存有关的所有操作(put,get等),SQL引擎和其它模块,因此将IgniteConfiguration.setRebalanceThreadPoolSize
设置为一个很大的值会显著提高再平衡的性能,但是会影响应用的吞吐量。
# 10.4.再平衡消息限流
当再平衡器将数据从一个节点传输到另一个节点时,它会将整个数据集拆分为多个批次然后将每一个批次作为一个单独的消息进行发送。如果数据集很大那么就会有很多的消息要发送,CPU和网络就会过度的消耗,这时在再平衡消息之间进行等待是合理的,以使由于再平衡过程导致的性能下降冲击最小化。这个时间间隔由IgniteConfiguration
的rebalanceThrottle
属性控制,它的默认值是0,意味着在消息之间没有暂停,注意单个消息的大小也可以通过rebalanceBatchSize
属性进行设置(默认值是512K)。
比如,如果希望再平衡器间隔100ms每个消息发送2MB数据,需要提供如下的配置:
# 10.5.配置
缓存的再平衡行为可以通过下面的配置属性进行配置:
CacheConfiguration
setter方法 | 描述 | 默认值 |
---|---|---|
setRebalanceMode | 分布式缓存的再平衡模式,具体请参见再平衡模式章节 | ASYNC |
setRebalanceDelay | 当节点加入或者离开(故障)集群时,再平衡应该自动启动的延迟时间(毫秒),如果打算在节点离开拓扑后重启节点,或者打算在同时/一个个启动多个节点的过程中,所有节点都启动完成之前不进行重新分区或者再平衡,也可以推迟。 | 0,无延迟 |
setRebalanceOrder | 完成再平衡的顺序,只有SYNC 和ASYNC 再平衡模式的缓存才可以将再平衡顺序设置为非0值,具有更小值的缓存再平衡会被首先完成,再平衡默认是无序的 | 0 |
IgniteConfiguration:
setter方法 | 描述 | 默认值 |
---|---|---|
setRebalanceThreadPoolSize | 用于再平衡的线程最大值 | 1,对整个集群的操作影响最小 |
setRebalanceThrottle | 可以看上面的再平衡消息限流章节。 | 0,无间隔 |
setRebalanceBatchSize | 单个再平衡消息的大小(字节),再平衡算法会在发送数据之前将每个节点的整个数据集拆分成多个批次。 | 512K |
setRebalanceBatchesPrefetchCount | 为了达到更好的性能,数据提供者节点会在再平衡开始时提供不止一个批次然后在下一个请求时提供一个新的批次。这个方法会设置再平衡开始时数据提供者节点产生的批次的数量 | 2 |
setRebalanceTimeout | 节点间正在交换的等待再平衡消息的超时时间 | 10秒 |
# 11.拓扑验证
拓扑验证器用于验证集群拓扑对于未来的缓存操作是否有效。
拓扑验证器在每次集群拓扑发生变化时都会被调用(或者新节点加入或者已有节点故障或者其它的)。如果没有配置拓扑验证器,那么集群拓扑会被认为一直有效。
当TopologyValidator.validate(Collection)
方法返回true时,那么对于特定的缓存以及在这个缓存上的所有有效操作拓扑都会被认为是有效的,否则,该缓存上的所有更新操作都会抛出如下异常:
CacheException
:所有试图更新的操作都会抛出(put,remove等)IgniteException
:试图进行事务提交的操作会抛出
返回false以及声明拓扑无效后,当下一次拓扑发生变化时拓扑验证器可以返回正常状态。
示例:
...
for (CacheConfiguration cCfg : iCfg.getCacheConfiguration()) {
if (cCfg.getName() != null) {
if (cCfg.getName().equals(CACHE_NAME_1))
cCfg.setTopologyValidator(new TopologyValidator() {
@Override public boolean validate(Collection<ClusterNode> nodes) {
return nodes.size() == 2;
}
});
else if (cCfg.getName().equals(CACHE_NAME_2))
cCfg.setTopologyValidator(new TopologyValidator() {
@Override public boolean validate(Collection<ClusterNode> nodes) {
return nodes.size() >= 2;
}
});
}
}
...
在这个例子中,对缓存允许更新操作情况如下:
CACHE_NAME_1
:集群具有两个节点时;CACHE_NAME_2
:集群至少有两个节点时;
配置
拓扑验证器通过CacheConfiguration.setTopologyValidator(TopologyValidator)
方法既可以用代码也可以用XML进行配置。
# 12.读修复
注意
这是一个实验性API!
读修复
是指在正常读取操作期间修复主备数据之间不一致的技术。当用户操作读取了一个或多个特定键时,Ignite会检查给定键在所有备份副本中的值。
注意
读修复模式旨在保持一致性。不过由于检查了备份副本,因此读操作的成本增加了约2倍。通常不建议一直使用此模式,而应一次性使用。
要启用读修复模式,需要获取一个开启了读修复的缓存实例,如下所示:
IgniteCache<Object, Object> cache = ignite.cache("my_cache").withReadRepair();
Object value = cache.get(10);
警告
一致性检查与下面的缓存配置不兼容:
- 没有备份的缓存;
- 本地缓存;
- 近缓存;
- 开启通读的缓存。
# 12.1.事务化缓存
拓扑中的值将替换为最新版本的值。
- 对于配置为
TransactionConcurrency.OPTIMISTIC
并发模型或TransactionIsolation.READ_COMMITTED
隔离级别的事务自动处理; - 对于配置为
TransactionConcurrency.PESSIMISTIC
并发模型和TransactionIsolation.READ_COMMITTED
隔离级别的事务,在commit()
阶段自动处理;
当检测到备份不一致时,Ignite将生成一个违反一致性事件(如果在配置中启用了该事件),通过监听此事件可以获取有关不一致问题的通知。关于如果进行事件监听,请参见本地和远程事件的介绍。
警告
如果事务中已经缓存了值,则读修复不能保证检查所有副本
。
例如,如果使用非TransactionIsolation.READ_COMMITTED
隔离级别,并且已经读取了该值或执行了写入操作,则将获得缓存的值。
# 12.2.原子化缓存
如果发现差异,则抛出违反一致性异常。
注意
由于原子化缓存的性质,可以观察到假阳性结果。比如在缓存加载中尝试检查一致性可能会触发违反一致性异常。读修复的实现会尝试检查给定键三次,尝试次数可以通过IGNITE_NEAR_GET_MAX_REMAPS
系统属性来修改。
警告
注意不会为原子缓存记录违反一致性事件。
18624049226