# 键-值API

# 1.基本缓存操作

# 1.1.获取缓存的实例

在缓存上的所有操作都是通过IgniteCache实例进行的,也可以在已有的缓存上拿到IgniteCache,也可以动态创建。

    # 1.2.动态创建缓存

    动态创建缓存方式如下:

      关于缓存的配置参数,请参见缓存配置章节的内容。

      在基线拓扑变更过程中调用创建缓存的方法,会抛出org.apache.ignite.IgniteCheckedException异常:

      javax.cache.CacheException: class org.apache.ignite.IgniteCheckedException: Failed to start/stop cache, cluster state change is in progress.
              at org.apache.ignite.internal.processors.cache.GridCacheUtils.convertToCacheException(GridCacheUtils.java:1323)
              at org.apache.ignite.internal.IgniteKernal.createCache(IgniteKernal.java:3001)
              at org.apache.ignite.internal.processors.platform.client.cache.ClientCacheCreateWithNameRequest.process(ClientCacheCreateWithNameRequest.java:48)
              at org.apache.ignite.internal.processors.platform.client.ClientRequestHandler.handle(ClientRequestHandler.java:51)
              at org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.onMessage(ClientListenerNioListener.java:173)
              at org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.onMessage(ClientListenerNioListener.java:47)
              at org.apache.ignite.internal.util.nio.GridNioFilterChain$TailFilter.onMessageReceived(GridNioFilterChain.java:278)
              at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:108)
              at org.apache.ignite.internal.util.nio.GridNioAsyncNotifyFilter$3.body(GridNioAsyncNotifyFilter.java:96)
              at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:119)
      
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.base/java.lang.Thread.run(Thread.java:834)
      

      如果拿到这个异常,可以进行重试。

      # 1.3.销毁缓存

      要在整个集群中删除一个缓存,需要调用destroy()方法:

        # 1.4.原子化操作

        拿到缓存实例后,就可以对其进行读写操作:

          提示

          putAll()putAll()这样的批量操作方法,是以原子化的模式按顺序执行,可能部分失败。发生这种情况时,会抛出包含了更新失败数据列表的CachePartialUpdateException异常。 如果希望在一个操作中更新条目的集合,建议考虑使用事务

          下面是更多基本缓存操作的示例:

            # 1.5.异步执行

            大多数缓存操作方法都有对应的异步执行模式,方法名带有Async后缀。

              异步操作会返回一个代表操作结果的对象,可以以阻塞或非阻塞的方式,等待操作的完成。

              以非阻塞的方式等待结果,可以使用IgniteFuture.listen()IgniteFuture.chain()方法注册一个回调,其会在操作完成后被调用。

                回调执行和线程池

                如果在将回调传递给IgniteFuture.listen()IgniteFuture.chain()方法时已完成异步操作,则该回调由调用线程同步执行。否则当操作完成时,回调将异步执行。

                异步计算操作的回调由Ignite公共线程池中的线程调用。从回调内部调用同步缓存和计算操作可能会由于线程池不足而导致死锁。要实现异步计算操作的嵌套执行,可以利用自定义线程池

                除非使用IgniteConfiguration#asyncContinuationExecutor配置了不同的执行器,异步缓存操作的回调由ForkJoinPool#commonPool调用。

                • 默认的执行器对于回调中的任意操作都是安全的;
                • 在Ignite 2.11版本中,默认的行为已经改变。在此之前,异步缓存操作的回调是由Ignite的系统线程池(所谓的"平行线程池")调用的;
                • 要恢复之前的行为,可以使用IgniteConfiguration.setAsyncContinuationExecutor(Runnable::run)
                  • 以前的行为可以提供小的性能改进,因为回调是在没有任何间接或调度的情况下执行的。
                  • UNSAFE:系统线程执行回调时缓存操作无法继续,如果从回调中调用其他缓存操作,则可能发生死锁。

                # 2.使用二进制对象

                # 2.1.概述

                在Ignite中,数据以二进制格式存储,然后在每次读取时再反序列化为对象,不过可以直接操作二进制对象避免反序列化。

                二进制对象是缓存数据的二进制表示的包装器,每个二进制对象都有field(name)方法(返回对应字段的值)和type()方法(提取对象的类型信息)。当只需要处理对象的部分字段而不需要反序列化整个对象时,二进制对象会很有用。

                处理二进制对象时不需要具体的类定义,不重启集群就可以动态修改对象的结构

                在所有支持的平台上,二进制对象格式都是统一的,包括Java、.NET和C++。可以启动一个Java版Ignite集群,然后使用.NET和C++客户端接入集群,然后在这些客户端上使用二进制对象而不需要持有类定义。

                限制

                1. 在内部二进制对象的类型和字段以ID来标识,该ID由对应字符串名字的哈希值计算得出,这意味着属性或者类型不能有同样的名字哈希,因此不允许使用具有相同名字哈希的字段或类型。但是,可以通过配置提供自定义的ID生成实现
                2. 同样的原因,BinaryObject格式在类的不同层次上也不允许有同样的属性名;
                3. 如果类实现了Externalizable接口,Ignite会使用OptimizedMarshallerOptimizedMarshaller会使用writeExternal()readExternal()来进行类对象的序列化和反序列化,这需要将实现Externalizable的类加入服务端节点的类路径中。

                # 2.2.启用缓存的二进制模式

                当从缓存中拿数据时,默认返回的是反序列化格式,要处理二进制格式,需要使用withKeepBinary()方法拿到缓存的实例,这个实例会尽可能返回二进制格式的对象。

                  注意并不是所有的对象都会转为二进制对象格式,下面的类不会进行转换(即toBinary(Object)方法返回原始对象,以及这些类的实例存储不会发生变化):

                  • 所有的基本类型(byteint等)及其包装类(ByteInteger等);
                  • 基本类型的数组(byte[]int[]等);
                  • String及其数组;
                  • UUID及其数组;
                  • Date及其数组;
                  • Timestamp及其数组;
                  • Enum及其数组;
                  • 对象的映射、数组和集合(但如果它们是可以转成二进制的,则内部对象将被重新转换)。

                  # 2.3.创建和修改二进制对象

                  二进制对象实例是不可变的,要更新字段或者创建新的二进制对象,需要使用二进制对象的建造器工具类,其可以在没有对象的类定义的前提下,修改二进制对象的字段。

                  限制

                  • 无法修改已有字段的类型;
                  • 无法变更枚举值的顺序,也无法在枚举值列表的开始或者中部添加新的常量,但是可以在列表的末尾添加新的常量。

                  二进制对象建造器实例获取方式如下:

                    通过这个方式创建的建造器没有任何字段,调用setField(…​)方法可以添加字段:

                    通过调用toBuilder()方法,也可以从一个已有的二进制对象上获得建造器实例,这时该二进制对象的所有字段都会复制到该建造器中。

                    在下面的示例中,会在服务端通过EntryProcessor机制更新一个对象,而不需要在该节点部署该对象类定义,也不需要完整对象的反序列化。

                      # 2.4.二进制类型和二进制字段

                      二进制对象持有其表示的对象的类型信息,类型信息包括字段名、字段类型和关联字段名。

                      每个字段的类型通过一个BinaryField对象来表示,获得BinaryField对象后,如果需要从集合中的每个对象读取相同的字段,则可以多次重用该对象。重用BinaryField对象比直接从每个二进制对象读取字段值要快,下面是使用二进制字段的示例:

                      Collection<BinaryObject> persons = getPersons();
                      
                      BinaryField salary = null;
                      double total = 0;
                      int count = 0;
                      
                      for (BinaryObject person : persons) {
                          if (salary == null) {
                              salary = person.type().field("salary");
                          }
                      
                          total += (float) salary.value(person);
                          count++;
                      }
                      
                      double avg = total / count;
                      

                      # 2.5.二进制对象的调整建议

                      Ignite为给定类型的每个二进制对象保留一个模式,该模式指定对象中的字段及其顺序和类型。模式在整个集群中复制,具有相同字段但顺序不同的二进制对象被认为具有不同的模式,因此建议以相同的顺序往二进制对象中添加字段。

                      空字段通常需要5个字节来存储,字段ID4个字节,字段长度1个字节。在内存方面,最好不要包含字段,也不要包含空字段。但是,如果不包括字段,则Ignite会为此对象创建一个新模式,该模式与包含该字段的对象的模式不同。如果有多个字段以随机组合设置为null,那么Ignite会为每种组合维护一个不同的二进制对象模式,这样Java堆可能会被二进制对象模式耗尽。最好为二进制对象提供几个模式,并以相同的顺序设置相同类型的相同字段集。通过提供相同的字段集(即使具有空值)来创建二进制对象时,选择其中一个,这也是需要为空字段提供字段类型的原因。

                      如果有一个子集的字段是可选的,但要么全部不存在,要么全部存在,那么也可以嵌套二进制对象,可以将它们放在单独的二进制对象中,该对象存储在父对象的字段下,或者设置为null。

                      如果有大量字段,这些字段在任何组合中都是可选的,并且通常为空,则可以将其存储在映射字段中,值对象中将有几个固定字段,还有一个映射用于其他属性。

                      # 2.6.配置二进制对象

                      在绝大多数场景中,无需配置二进制对象。但是如果需要更改类型和字段ID的生成或插入自定义序列化器,则可以通过配置来实现。

                      二进制对象的类型和字段由其ID标识,该ID由相对应的字符串名计算为哈希值,并将其存储在每个二进制对象中,可以在配置中定义自己的ID生成实现。

                      名字到ID的转换分为两个步骤。首先,由名字映射器转换类型名(类名)或字段名,然后由ID映射器计算ID。可以指定全局名字映射器,全局ID映射器和全局二进制序列化器,以及每个类型的映射器和序列化器。每个类型的配置均支持通配符,这时所提供的配置将应用于与类型名字模板匹配的所有类型。

                        # 3.使用缓存查询

                        # 3.1.概述

                        IgniteCache有几个查询方法,他们会接收Query类的子类,然后返回一个QueryCursor。可用的查询类型包括:ScanQueryIndexQueryTextQuery

                        Query表示在缓存上执行的分页查询的抽象,页面大小通过Query.setPageSize(…​)进行配置,默认值为1024

                        QueryCursor表示结果集,可以透明地按页迭代。当用户迭代到页尾时,QueryCursor会自动在后台请求下一页。对于不需要分页的场景,可以使用QueryCursor.getAll()方法,其会拿到所有的数据,并将其存储在一个集合中。

                        关闭游标

                        调用QueryCursor.getAll()方法时,游标会自动关闭。如果在循环中迭代游标,或者显式拿到Iterator,必须手动关闭游标,或者使用try-with-resources语句。

                        # 3.2.执行扫描查询

                        扫描查询是以分布式的方式从缓存中获取数据的简单搜索查询,如果执行时没有参数,扫描查询会从缓存中获取所有数据。

                          如果指定了谓语,扫描查询会返回匹配谓语的数据,谓语应用于远端节点:

                            扫描查询还支持可选的转换器闭包,可以在数据返回之前在服务端转换数据,比如,当只想从大对象中获取少量字段,以最小化网络传输时,这个功能就很有用。下面的示例显示如何只返回键,而不返回值:

                            IgniteCache<Integer, Person> cache = ignite.getOrCreateCache("myCache");
                            
                            // Get only keys for persons earning more than 1,000.
                            List<Integer> keys = cache.query(new ScanQuery<>(
                                    // Remote filter
                                    (IgniteBiPredicate<Integer, Person>) (k, p) -> p.getSalary() > 1000),
                                    // Transformer
                                    (IgniteClosure<Cache.Entry<Integer, Person>, Integer>) Cache.Entry::getKey).getAll();
                            

                            # 3.3.本地扫描查询

                            扫描查询默认是分布到所有节点上的,不过也可以只在本地执行查询,这时查询只会处理本地节点(查询执行的节点)上存储的数据。

                              # 3.4.执行索引查询

                              警告

                              实验性API,2.12版本引入,只支持Java API。

                              索引查询处理分布式索引并检索与指定查询匹配的缓存条目。QueryCursor按索引定义的顺序返回结果。IndexQuery适用于少量数据符合过滤条件的场景,这时ScanQuery不是最佳选择:它首先提取所有缓存条目,然后对它们应用过滤器。IndexQuery依赖索引树结构,过滤掉大部分条目,不提取所有的数据。

                              // Create index by 2 fields (orgId, salary).
                              LinkedHashMap<String,String> fields = new LinkedHashMap<>();
                                  fields.put("orgId", Integer.class.getName());
                                  fields.put("salary", Integer.class.getName());
                              
                              QueryEntity personEntity = new QueryEntity(Integer.class, Person.class)
                                  .setFields(fields)
                                  .setIndexes(Collections.singletonList(
                                      new QueryIndex(Arrays.asList("orgId", "salary"), QueryIndexType.SORTED)
                                          .setName("ORG_SALARY_IDX")
                                  ));
                              
                              CacheConfiguration<Integer, Person> ccfg = new CacheConfiguration<Integer, Person>("entityCache")
                                  .setQueryEntities(Collections.singletonList(personEntity));
                              
                              IgniteCache<Integer, Person> cache = ignite.getOrCreateCache(ccfg);
                              
                              // Find the persons who work in Organization 1.
                              QueryCursor<Cache.Entry<Integer, Person>> cursor = cache.query(
                                  new IndexQuery<Integer, Person>(Person.class, "ORG_SALARY_IDX")
                                      .setCriteria(eq("orgId", 1))
                              );
                              

                              索引查询条件在IndexQueryCriteriaBuilder中定义,配置条件的目的是建立一个有效的范围来遍历索引树。因此条件字段必须匹配指定的索引。例如如果有一个用(A, B)定义的索引,则有效的条件集是(A)和(A, B),单(B)字段的条件是无效的,因为字段(B)不是指定索引字段的前缀集,不可能用它构建一个狭窄的索引范围。

                              提示

                              条件由AND运算符连接。也可以对同一字段使用多个条件。

                              // Find the persons who work in Organization 1 and have salary more than 1,000.
                              QueryCursor<Cache.Entry<Integer, Person>> cursor = cache.query(
                                  new IndexQuery<Integer, Person>(Person.class, "ORG_SALARY_IDX")
                                      .setCriteria(eq("orgId", 1), gt("salary", 1000))
                              );
                              

                              索引名是一个可选的参数,这时Ignite会尝试使用指定的条件字段自行计算索引。

                              // Ignite finds suitable index "ORG_SALARY_IDX" by specified criterion field "orgId".
                              QueryCursor<Cache.Entry<Integer, Person>> cursor = cache.query(
                                  new IndexQuery<Integer, Person>(Person.class)
                                      .setCriteria(eq("orgId", 1))
                              );
                              

                              如果未指定查询条件,则执行指定索引的完整扫描。如果也未指定索引名,则使用主键索引。

                              # 3.4.1.附加过滤

                              IndexQuery还支持可选的谓词,与ScanQuery相同。它适用于在过滤器与索引树范围不匹配的情况下进行额外的缓存条目过滤。例如包含一些逻辑、OR操作或字段不属于索引。

                              // Find the persons who work in Organization 1 and whose name contains 'Vasya'.
                              QueryCursor<Cache.Entry<Integer, Person>> cursor = cache.query(
                                  new IndexQuery<Integer, Person>(Person.class)
                                      .setCriteria(eq("orgId", 1))
                                      .setFilter((k, v) -> v.getName().contains("Vasya"))
                              );
                              

                              # 3.5.相关主题

                              # 4.读修复

                              警告

                              这是个试验性API。

                              限制

                              一致性检查与下面的缓存配置不兼容:

                              • 没有副本的缓存;
                              • 近缓存;
                              • 使用通读模式的缓存。

                              读修复是指在正常读取操作期间修复主备数据之间不一致的技术。当用户操作读取了某个或某些键时,Ignite会检查给定键在所有备份副本中的值。

                              读修复模式旨在保持一致性。不过由于检查了备份副本,因此读操作的成本增加了约2倍。通常不建议一直使用此模式,而应一次性使用。

                              要启用读修复模式,需要获取一个开启了读修复的缓存实例,如下所示:

                              IgniteCache<Object, Object> cache =
                                  ignite.cache("my_cache").withReadRepair(ReadRepairStrategy.CHECK_ONLY);
                              
                              Object value = cache.get(42);
                              

                              # 4.1.策略

                              如果发现一致性冲突,拓扑中的值将根据所选策略替换为修复的值:

                              策略 描述
                              LWW 以最后写入(最新的条目)为准,当无法修复(无法检测最新的条目)时可能抛出IgniteException,主要包括:
                              同一主键找到了空值和非空值,空值(缺失条目)没有版本,所以它无法与版本化的条目比较;
                              条目版本号相同,但是有不同的值。
                              PRIMARY 以主分区的值为准
                              RELATIVE_MAJORITY 相对多数,以检测到更多次的值为准。适用于副本数为偶数(也是Ignite的典型场景),而不是绝对多数。当无法为某值找到比其他值更多次数时,可能会抛出IgniteException,比如给定5个副本(4个备份),假定值A找到2次,而XYZ只找到1次,则以A为准,但是如果A找到2次,而BX分别找到1次,则该策略就不知道以谁为准。
                              REMOVE 不一致的条目会被删除。
                              CHECK_ONLY 只执行检查。

                              # 4.2.事件

                              对于每个冲突,会记录违反一致性事件(如果在配置中启用了该事件),然后可以监听该事件获取冲突的通知。

                              关于如何监听事件,请参见处理事件的相关章节。

                              # 4.3.事务化缓存

                              值按照如下规则进行修复:

                              • 对于配置为TransactionConcurrency.OPTIMISTIC并发模型或TransactionIsolation.READ_COMMITTED隔离级别的事务自动处理;
                              • 对于配置为TransactionConcurrency.PESSIMISTIC并发模型和TransactionIsolation.READ_COMMITTED隔离级别的事务,在commit()阶段自动处理;

                              限制

                              如果事务中已经缓存了值,则读修复不能保证检查所有副本

                              例如,如果使用非TransactionIsolation.READ_COMMITTED隔离级别,并且已经读取了该值或执行了写入操作,则将获得缓存的值。

                              # 4.4.原子化缓存

                              会被自动修复。

                              限制

                              由于原子化缓存的性质,可以观察到假阳性结果。比如在缓存加载中尝试检查一致性可能会触发违反一致性异常。读修复的实现默认会尝试检查给定键3次,尝试次数可以通过IGNITE_NEAR_GET_MAX_REMAPS系统属性来修改。

                              18624049226

                              最后更新时间:: 11/25/2023, 3:51:28 PM