# Ignite持续查询

持续查询可以监控缓存中数据的变化,启动后就会收到符合查询条件的数据变化的通知。

所有更新事件都会精确一次传播给查询中注册的本地监听器。

还可以指定一个远程过滤器,以缩小监听更新的条目范围。

# 1.本地监听器

当缓存发生更新(数据的插入、更新和删除)后,一个事件就会发给持续查询的本地监听器,之后应用就可以做出相应的反应,本地监听器是在发起查询的节点上执行的。

注意,如果启动时没有本地监听器,持续查询会抛出异常。

    # 2.初始查询

    当要执行持续查询时,在将持续查询注册在集群中以及开始接收更新之前,可以选择指定一个初始化查询。其可以通过ContinuousQuery.setInitialQuery(Query)方法进行设置。

    和扫描查询一样,持续查询通过query()方法执行然后返回一个游标。设置初始查询后,可以使用该游标迭代初始查询的结果。

      # 3.远程过滤器

      过滤器会计算每条更新,然后评估该更新是否需要传播给该查询的本地监听器。如果过滤器返回true,那么本地监听器会收到更新的通知。

      出于冗余原因,将对数据的主备版本(如果配置了备份)都执行过滤器。因此可以将远程过滤器用作更新事件的远程监听器。

        提示

        为了使用远程过滤器,要确保过滤器的类定义在服务端节点可用,这有两个途径:

        • 将类文件加入每个服务端节点的类路径中;
        • 开启对等类加载

        # 4.远程转换器

        持续查询默认会将整个更新后的对象发送给应用端的监听器,这会导致网络的过度使用,如果传输的对象很大,更是如此。另外,应用通常更希望得到更新对象的字段的子集,而不是整个对象。

        为了解决这个问题,可以使用带有转换器的持续查询,转换器是一个在远程节点上针对每个更新执行的函数,然后只会返回转换的结果。

        IgniteCache<Integer, Person> cache = ignite.getOrCreateCache("myCache");
        
        // Create a new continuous query with a transformer.
        ContinuousQueryWithTransformer<Integer, Person, String> qry = new ContinuousQueryWithTransformer<>();
        
        // Factory to create transformers.
        Factory factory = FactoryBuilder.factoryOf(
            // Return one field of a complex object.
            // Only this field will be sent over to the local listener.
            (IgniteClosure<CacheEntryEvent, String>)
                event -> ((Person)event.getValue()).getName()
        );
        
        qry.setRemoteTransformerFactory(factory);
        
        // Listener that will receive transformed data.
        qry.setLocalListener(names -> {
            for (String name : names)
                System.out.println("New person name: " + name);
        });
        

        提示

        为了使用转换器,要确保转换器的类定义在服务端节点可用,这有两个途径:

        • 将类文件加入每个服务端节点的类路径中;
        • 开启对等类加载

        # 5.事件传递保证

        持续查询的实现保证一个事件只会传递给客户端的本地监听器一次的精确一次语义。

        主备节点都会维护一个更新队列,该队列持有事件,这些事件由服务端的持续查询处理,但尚未传递给客户端。假设主节点故障或集群拓扑因故发生变更,每个备份节点都会将其更新队列的内容刷新到客户端,以确保将每个事件都传递到客户端的本地监听器。

        Ignite管理一个特殊的分区级更新计数器,该计数器会避免重复的通知。一旦某个分区中的条目更新,该分区的计数器将在主备节点上都递增。该计数器的值也与事件通知一起发送到客户端。因此,客户端可以跳过已经处理的事件。客户端确认收到事件后,主节点和备份节点将从其备份队列中删除此事件的记录。

        # 6.示例

        下面的示例演示了持续查询的典型使用:

        18624049226

        最后更新时间:: 11/25/2023, 3:51:28 PM