# 分布式计算
# 1.分布式计算API
Ignite提供了一套API,用于在集群范围以容错和负载平衡的方式执行分布式计算。可以提交单个任务执行,也可以通过自动任务拆分实施MapReduce模式的并行计算,该API可以进行细粒度的控制。
# 1.1.获取计算接口实例
执行分布式计算的主要入口是计算接口,该接口可通过Ignite
实例获得:
该接口提供了在集群范围分发不同类型计算任务然后执行并置计算的方法。
# 1.2.指定计算节点集
每个计算接口的实例都是与一组执行任务的节点集相关联的。如果没有参数,ignite.compute()
返回的计算接口是与所有的服务端节点关联的,要获得与某节点集关联的实例,需要使用Ignite.compute(ClusterGroup group)
。在下面的示例中,计算接口只绑定到远程节点,即除了运行本代码的所有节点。
# 1.3.执行任务
Ignite提供了3个接口,用于实现具体的任务并通过计算接口执行。
IgniteRunnable
:一个java.lang.Runnable
的扩展,可用于实现没有输入参数和返回值的计算;IgniteCallable
:一个java.util.concurrent.Callable
的扩展,会有一个返回值;IgniteClosure
:一个函数式接口,可以接受一个参数,并且有返回值。
一个任务可以执行一次(在某个节点上),或者广播到所有的节点。
# 1.3.1.执行Runnable任务
要执行Runnable任务,需要使用计算接口的run(…)
方法,这时该任务会被发送到与计算接口关联的某个节点上。
# 1.3.2.执行Callable任务
要执行Callable任务,需要使用计算接口的call(…)
方法。
# 1.3.3.执行IgniteClosure任务
执行IgniteClosure
,需要调用计算接口的apply(…)
方法。该方法接受一个任务及其输入参数,该参数会在执行时传给IgniteClosure
。
# 1.3.4.执行广播任务
broadcast()
方法会在与计算实例相关联的所有节点上执行任务。
# 1.3.5.异步执行
前述所有方法都有对应的异步实现。
callAsync(…)
;runAsync(…)
;applyAsync(…)
;broadcastAsync(…)
。
异步方法会返回一个表示执行结果的IgniteFuture
,在下面的示例中,会异步执行一组Callable任务:
# 1.4.任务执行超时
任务执行可以配置一个超时时间,如果任务未在指定时间段内完成,该任务会被停止,并且由该任务生成的作业也都会被取消。
要执行带有超时限制的任务,需要使用计算接口的withTimeout(…)
方法。该方法会返回一个计算接口,然后以给定的时限执行第一个任务,后续的任务并没有超时限制,需要为每一个需要超时限制的任务调用withTimeout(…)
方法。
IgniteCompute compute = ignite.compute();
compute.withTimeout(300_000).run(() -> {
// your computation
// ...
});
# 1.5.本地节点内作业共享状态
通常来说在一个节点内的不同的计算作业之间共享状态是很有用的,为此Ignite在每个节点上提供了一个共享节点局部变量。
IgniteCluster cluster = ignite.cluster();
ConcurrentMap<String, Integer> nodeLocalMap = cluster.nodeLocalMap();
节点局部变量类似于非分布式的线程局部变量,它只会保持在本地节点上。节点局部变量可以用于计算任务在不同的执行中共享状态,也可以用于部署的服务。
在下面的示例中,作业每次在某个节点上执行时,都会在本地节点上增加一个计数器。结果是每个节点上的本地节点计数器会显示该作业在该节点上执行了多少次。
IgniteCallable<Long> job = new IgniteCallable<Long>() {
@IgniteInstanceResource
private Ignite ignite;
@Override
public Long call() {
// Get a reference to node local.
ConcurrentMap<String, AtomicLong> nodeLocalMap = ignite.cluster().nodeLocalMap();
AtomicLong cntr = nodeLocalMap.get("counter");
if (cntr == null) {
AtomicLong old = nodeLocalMap.putIfAbsent("counter", cntr = new AtomicLong());
if (old != null)
cntr = old;
}
return cntr.incrementAndGet();
}
};
# 1.6.从计算任务访问数据
如果计算任务需要访问Ignite中存储的数据,那么可以通过Ignite
实例实现:
注意,上面的示例可能不是最有效的方法。原因是与键1
相对应的Person
对象可能不在当前任务执行的节点上。这时对象是通过网络获取的,通过将计算与数据并置可以避免这种情况。
注意
如果要在IgniteCallable
和IgniteRunnable
任务中使用键和值对象,那么要确保相关的类定义部署在所有的节点上。
# 2.集群组
ClusterGroup
表示集群内节点的一个逻辑组。当希望把某操作限定在一个节点子集中时(而不是整个集群),可以在Ignite的许多API中使用该接口。例如希望仅在远程节点上部署服务,或者仅在具有某属性的节点集上执行作业等。
注意
注意IgniteCluster
接口也是一个集群组,只不过包括集群内的所有节点。
可以限制作业执行、服务部署、消息、事件以及其它任务只在部分集群组内执行,比如,下面的示例只把作业广播到远程节点(除了本地节点):
为了方便,Ignite也有一些预定义的集群组:
# 3.ExecutorService
Ignite提供了一个java.util.concurrent.ExecutorService
接口的分布式实现,该实现将任务提交到集群的服务端节点执行。任务在整个集群中负载平衡,只要集群中至少有一个节点,就可以保证任务得到执行。
ExecutorService
可以通过Ignite
实例获得:
// Get cluster-enabled executor service.
ExecutorService exec = ignite.executorService();
// Iterate through all words in the sentence and create jobs.
for (final String word : "Print words using runnable".split(" ")) {
// Execute runnable on some node.
exec.submit(new IgniteRunnable() {
@Override
public void run() {
System.out.println(">>> Printing '" + word + "' on this node from grid job.");
}
});
}
也可以限制作业在一个集群组中执行:
// A group for nodes where the attribute 'worker' is defined.
ClusterGroup workerGrp = ignite.cluster().forAttribute("ROLE", "worker");
// Get an executor service for the cluster group.
ExecutorService exec = ignite.executorService(workerGrp);
# 4.MapReduce API
# 4.1.概述
Ignite提供了用于执行简化的MapReduce操作的API。MapReduce范式基于以下假设:要执行的任务可以被拆分为多个作业(映射阶段),并分别执行每个作业,然后每个作业的结果汇总到最终结果中(汇总阶段)。
Ignite中,作业是根据预配置的负载平衡策略在节点间分配的,结果会被汇总在提交任务的节点上。
MapReduce范式由ComputeTask
接口提供。
提示
ComputeTask
仅在需要对作业到节点的映射或自定义故障转移逻辑进行细粒度控制时使用,对于其他情况,都建议使用简单的闭包。
# 4.2.ComputeTask接口
ComputeTask
接口提供了一种实现自定义映射和汇总逻辑的方法,该接口有3个方法:map(…)
、result()
和reduce()
。
map()
用于根据输入参数创建计算作业并将其映射到工作节点。该方法参数为要在其上运行任务的集群节点集合以及任务的输入参数。该方法会返回一个映射,其中作业为键,映射的工作节点为值,然后将作业发送到映射的节点并在其中执行。
result()
会在完成每个作业后调用,并返回一个ComputeJobResultPolicy
指示如何继续执行任务的实例。该方法参数为作业的结果以及到目前为止接收到的所有作业结果的列表,该方法可能返回以下值之一:
WAIT
:等待所有剩余工作完成(如果有);REDUCE
:立即进入汇总阶段,丢弃所有剩余的作业和尚未收到的结果;FAILOVER
:将作业故障转移到另一个节点(请参见容错章节的介绍)。
当所有作业都已完成(或result()
方法中某个作业返回REDUCE
策略)时,在汇总阶段中会调用reduce()
方法。该方法参数为具有所有完成结果的列表,并返回计算的最终结果。
# 4.3.执行计算任务
要执行计算任务,需调用IgniteCompute.execute(…)
方法,并将输入参数作为最后一个参数传入。
通过使用集群组,可以将作业的执行限制在节点的子集上。
# 4.4.处理作业故障
如果节点在任务执行期间故障,则为该节点安排的所有作业都会自动发送到另一个可用节点(由于内置的故障转移机制)。但是,如果作业引发异常,则可以将作业视为失败,然后将其转移到另一个节点以重新执行,该行为可通过在result(…)
方法中返回FAILOVER
实现:
@Override
public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
IgniteException err = res.getException();
if (err != null)
return ComputeJobResultPolicy.FAILOVER;
// If there is no exception, wait for all job results.
return ComputeJobResultPolicy.WAIT;
}
# 4.5.计算任务适配器
有几个辅助类,可以提供result(…)
和map(…)
的常用实现。
ComputeTaskAdapter
:其定义了一个默认的result(...)
方法实现,它在当一个作业抛出异常时返回一个FAILOVER
策略,否则会返回一个WAIT
策略,这样会等待所有的作业完成,并且有结果;ComputeTaskSplitAdapter
:其继承了ComputeTaskAdapter
,然后实现了map(...)
以将作业自动分配给节点。他引入了一个新的split(...)
方法,可以实现根据输入的数据生成作业的逻辑。
# 4.6.分布式任务会话
提示
该功能在.NET/C#/C++中不可用。
对于每个任务,Ignite会创建一个分布式会话,该会话保存有关任务的信息,并且对任务本身及其派生的所有作业都可见。可以使用此会话在作业之间共享属性,属性可以在作业执行之前或期间分配,并且可以按照设置它们的顺序对其他作业可见。
@ComputeTaskSessionFullSupport
private static class TaskSessionAttributesTask extends ComputeTaskSplitAdapter<Object, Object> {
@Override
protected Collection<? extends ComputeJob> split(int gridSize, Object arg) {
Collection<ComputeJob> jobs = new LinkedList<>();
// Generate jobs by number of nodes in the grid.
for (int i = 0; i < gridSize; i++) {
jobs.add(new ComputeJobAdapter(arg) {
// Auto-injected task session.
@TaskSessionResource
private ComputeTaskSession ses;
// Auto-injected job context.
@JobContextResource
private ComputeJobContext jobCtx;
@Override
public Object execute() {
// Perform STEP1.
// ...
// Tell other jobs that STEP1 is complete.
ses.setAttribute(jobCtx.getJobId(), "STEP1");
// Wait for other jobs to complete STEP1.
for (ComputeJobSibling sibling : ses.getJobSiblings())
try {
ses.waitForAttribute(sibling.getJobId(), "STEP1", 0);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Move on to STEP2.
// ...
return ...
}
});
}
return jobs;
}
@Override
public Object reduce(List<ComputeJobResult> results) {
// No-op.
return null;
}
}
# 4.7.计算任务示例
下面的示例演示了一个字符计数应用,他将一个给定的字符串拆分为单词,然后在单独的作业中计算每个单词的长度,作业会分发到所有的集群节点上。
# 5.负载平衡
Ignite会对由计算任务产生的作业以及通过分布式计算API提交的单个任务自动负载平衡,通过IgniteCompute.run(…)
和其他计算方法提交的单个任务会被视为单个作业的任务。
Ignite默认使用轮询算法(RoundRobinLoadBalancingSpi
),该算法在为计算任务指定的节点上按顺序分配作业。
提示
负载平衡不适用于并置计算。
负载平衡算法由IgniteConfiguration.loadBalancingSpi
属性控制。
# 5.1.轮询式负载均衡
RoundRobinLoadBalancingSpi
以轮询方式遍历并选择下一个可用的节点,可用节点是在执行任务获取计算实例时定义的。
轮询式负载平衡支持两种操作模式:任务级和全局级。
如果配置成任务级模式,当任务开始执行时实现会随机地选择一个节点,然后会顺序地迭代拓扑中所有的节点,对于任务拆分的大小等同于节点的数量时,这个模式保证所有的节点都会参与任务的执行。
警告
在任务级模式中需要启用以下事件类型:EVT_TASK_FAILED
、EVT_TASK_FINISHED
、EVT_JOB_MAPPED
。
如果配置成全局级模式,对于所有的任务都会维护一个节点的单一连续队列然后每次都会从队列中选择一个节点。这个模式中(不像每任务模式),当多个任务并发执行时,即使任务的拆分大小等同于节点的数量,同一个任务的某些作业仍然可能被赋予同一个节点。
默认使用全局级模式。
# 5.2.随机和加权式负载平衡
WeightedRandomLoadBalancingSpi
会为作业的执行从可用节点列表中随机选择一个节点,也可以选择为节点赋予权值,这样有更高权重的节点最终会使将作业分配给它的机会更多,所有节点的权重默认值都是10。
# 5.3.作业窃取
通常集群由很多计算机组成,这就可能存在配置不均衡的情况,这时开启JobStealingCollisionSpi
就会有助于避免作业聚集在过载的节点,因为它们将被未充分利用的节点窃取。
JobStealingCollisionSpi
可以将作业从高负载节点移动到低负载节点,当部分作业完成得很快,而其它的作业还在高负载节点中排队时,这个SPI就会非常有用,这时等待作业就会被移动到更快/低负载的节点。
JobStealingCollisionSpi
采用的是后负载技术,它可以在任务已经被调度在节点A执行后重新分配到节点B。
警告
如果要启用作业窃取,则必须将故障转移SPI配置为JobStealingFailoverSpi
。具体请参见容错。
下面是配置JobStealingCollisionSpi
的示例:
# 6.容错
Ignite支持作业的自动故障转移,当一个节点崩溃时,作业会被转移到其它可用节点再次执行。集群中只要有一个节点在线,作业就不会丢失。
全局故障转移策略由IgniteConfiguration.failoverSpi
属性控制。
可用的实现包括:
AlwaysFailoverSpi
:该实现会将一个故障的作业路由到另一个节点,这也是默认的模式。当来自一个计算任务的作业失败后,首先会尝试将故障的作业路由到该任务还没有被执行过的节点上,如果没有可用的节点,然后会试图将故障的作业路由到可能运行同一个任务中其它的作业的节点上,如果上述的尝试都失败了,那么该作业就不会被故障转移。
NeverFailoverSpi
:该实现不对失败的作业故障转移;
JobStealingFailoverSpi
:只有在希望启用作业窃取时,才需要使用这个实现。
# 7.作业调度
当作业到达目标节点时,会被提交到一个线程池并以随机顺序调度执行,但是通过配置CollisionSpi
可以更改作业顺序。CollisionSpi
接口提供了一种在每个节点调度作业执行的方法。
Ignite提供了CollisionSpi
接口的几种实现:
FifoQueueCollisionSpi
:在多个线程中进行简单的FIFO排序,这时默认的实现;PriorityQueueCollisionSpi
:按优先级排序;JobStealingFailoverSpi
:该实现用于开启作业窃取。
CollisionSpi
通过IgniteConfiguration.collisionSpi
属性来配置。
# 7.1.FIFO排序
FifoQueueCollisionSpi
提供了作业到达时的FIFO排序,作业以多线程模式执行。线程数由parallelJobsNumber
参数控制,默认值为CPU核数的2倍。
# 7.2.优先级排序
使用PriorityQueueCollisionSpi
可以为单独的作业分配优先级,因此高优先级的作业会比低优先级的作业先执行,也可以指定要处理作业的线程数。
任务优先级是在任务会话中通过grid.task.priority
属性配置的,如果任务未分配优先级,那么会使用默认优先级0。
public class MyUrgentTask extends ComputeTaskSplitAdapter<Object, Object> {
// Auto-injected task session.
@TaskSessionResource
private ComputeTaskSession taskSes = null;
@Override
protected Collection<ComputeJob> split(int gridSize, Object arg) {
// Set high task priority.
taskSes.setAttribute("grid.task.priority", 10);
List<ComputeJob> jobs = new ArrayList<>(gridSize);
for (int i = 1; i <= gridSize; i++) {
jobs.add(new ComputeJobAdapter() {
@Override
public Object execute() throws IgniteException {
//your implementation goes here
return null;
}
});
}
// These jobs will be executed with higher priority.
return jobs;
}
@Override
public Object reduce(List<ComputeJobResult> results) throws IgniteException {
return null;
}
}
# 8.计算和数据并置
并置计算是一种分布式数据处理模式,其会将在某数据集上执行的计算任务发送到待处理数据所在的节点,并且仅将计算结果返回。这样可以最大程度减少节点之间的数据传输,并可以显著缩短任务执行时间。
Ignite提供了几种执行并置计算的方法,所有这些方法都使用关联函数来确定数据的位置。
计算接口提供了affinityCall(…)
和affinityRun(…)
方法,可以通过键或分区将任务和数据并置在一起。
提示
affinityCall(…)
和affinityRun(…)
方法保证在任务执行期间,给定的键或分区中的数据在目标节点上是存在的。
# 8.1.通过键并置
要将计算任务发送到给定键所在的节点,可以使用以下方法:
IgniteCompute.affinityCall(String cacheName, Object key, IgniteCallable<R> job)
;IgniteCompute.affinityRun(String cacheName, Object key, IgniteRunnable job)
;
Ignite会调用配置好的关联函数来确定给定键的位置。
# 8.2.通过分区并置
affinityCall(Collection<String> cacheNames, int partId, IgniteCallable job)
和affinityRun(Collection<String> cacheNames, int partId, IgniteRunnable job)
会将任务发送到给定ID的分区所在的节点。当需要检索多个键的对象并且知道这些键属于同一分区时,这很有用。这时可以为每个键创建一个任务,而不是多个任务。
例如,假设要计算特定键子集的某字段的算术平均值。如果要分发计算,则可以按分区对键进行分组,并将每组键发送到分区所在的节点以获取值。组的数量(即任务的数量)不会超过分区的总数(默认为1024)。下面是说明此示例的代码段:
// this task sums up the values of the salary field for the given set of keys
private static class SumTask implements IgniteCallable<BigDecimal> {
private Set<Long> keys;
public SumTask(Set<Long> keys) {
this.keys = keys;
}
@IgniteInstanceResource
private Ignite ignite;
@Override
public BigDecimal call() throws Exception {
IgniteCache<Long, BinaryObject> cache = ignite.cache("person").withKeepBinary();
BigDecimal sum = new BigDecimal(0);
for (long k : keys) {
BinaryObject person = cache.localPeek(k, CachePeekMode.PRIMARY);
if (person != null)
sum = sum.add(new BigDecimal((float) person.field("salary")));
}
return sum;
}
}
public static void calculateAverage(Ignite ignite, Set<Long> keys) {
// get the affinity function configured for the cache
Affinity<Long> affinityFunc = ignite.affinity("person");
// this map stores collections of keys for each partition
HashMap<Integer, Set<Long>> partMap = new HashMap<>();
keys.forEach(k -> {
int partId = affinityFunc.partition(k);
Set<Long> keysByPartition = partMap.computeIfAbsent(partId, key -> new HashSet<Long>());
keysByPartition.add(k);
});
BigDecimal total = new BigDecimal(0);
IgniteCompute compute = ignite.compute();
List<String> caches = Arrays.asList("person");
// iterate over all partitions
for (Map.Entry<Integer, Set<Long>> pair : partMap.entrySet()) {
// send a task that gets specific keys for the partition
BigDecimal sum = compute.affinityCall(caches, pair.getKey().intValue(), new SumTask(pair.getValue()));
total = total.add(sum);
}
System.out.println("the average salary is " + total.floatValue() / keys.size());
}
如果要处理缓存中的所有数据,可以迭代缓存中的所有分区,并发送处理每个单独分区上存储的数据的任务。
// this task sums up the value of the 'salary' field for all objects stored in
// the given partition
public static class SumByPartitionTask implements IgniteCallable<BigDecimal> {
private int partId;
public SumByPartitionTask(int partId) {
this.partId = partId;
}
@IgniteInstanceResource
private Ignite ignite;
@Override
public BigDecimal call() throws Exception {
// use binary objects to avoid deserialization
IgniteCache<Long, BinaryObject> cache = ignite.cache("person").withKeepBinary();
BigDecimal total = new BigDecimal(0);
try (QueryCursor<Cache.Entry<Long, BinaryObject>> cursor = cache
.query(new ScanQuery<Long, BinaryObject>(partId).setLocal(true))) {
for (Cache.Entry<Long, BinaryObject> entry : cursor) {
total = total.add(new BigDecimal((float) entry.getValue().field("salary")));
}
}
return total;
}
}
性能考量
当要处理的数据量足够大时,并置计算有性能优势。在某些情况下,当数据量较小时,缓存查询可能会执行得更好。
# 8.3.EntryProcessor
EntryProcessor
用于在存储缓存条目的节点上处理该缓存条目并返回处理结果。对于EntryProcessor
,不必传输整个对象来执行操作,可以远程执行操作,并且只传输结果。
如果EntryProcessor
为不存在的条目设置值,则该条目将添加到缓存中。
对于给定的键,EntryProcessor
在一个锁内以原子方式执行。
18624049226