Skip to content

分布式计算

Ignite 3 提供了分布式计算 API,具备负载平衡和容错的能力,可以从 Java 和 .NET 客户端提交任务并执行。

计算任务在集群节点上执行业务代码,因此在提交计算任务之前,需要将要执行的代码部署到执行计算任务的节点。根据实际需求,可以在集群中的单个节点、多个节点或所有节点上执行任务。Ignite 还提供了用于异步任务执行的 API。

计算任务一般会处理存储在集群中的数据,因此任意选择节点可能会给集群增加负载,因为所需的数据可能在其他的节点上。解决方案是采用并置计算,这时任务将在数据所在的节点上执行。

计算任务将提交到集群中的某个节点,如果需要其他节点参与,将使用内部集群连接向它们发送请求。

在将业务对象发送到节点之前,必须先对其进行序列化处理。大多数场景(元组、POJO、原生类型)会自动处理,但在更复杂的场景中,可能需要编写自己的序列化逻辑。

1.执行作业

1.1.单节点执行

有时可能需要在集群中的某个节点上执行作业,做法有如下几种:

  • submitAsync():将作业发送到集群,并返回一个 Future,当提交作业执行时,该 Future 将与JobExecution对象一起完成;
  • executeAsync():将作业发送到集群,并返回一个Future,该 Future 将在作业执行结果准备就绪时完成;
  • execute():将作业发送到集群并等待作业执行的结果。

提交作业时,必须创建 JobTarget 对象。可以指向某个节点、或启动并置计算作业,该作业将在指定数据所在的节点上执行,可以使用以下方法:

  • JobTarget.anyNode():作业将在任意一个节点上执行;
  • JobTarget.node():作业将在指定节点上执行;
  • JobTarget.colocated():作业将在数据所在的节点上执行。

下面的示例假定NodeNameJob类已被部署到服务端,然后在集群中的任何节点上执行作业。

java
public static void example() throws ExecutionException, InterruptedException {
    IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();

    String executionResult = client.compute().execute(JobTarget.anyNode(client.clusterNodes()),
        JobDescriptor.builder(NodeNameJob.class).build(), null
    );

    System.out.println(executionResult);
}
csharp
ICompute compute = Client.Compute;
IList<IClusterNode> nodes = await Client.GetClusterNodesAsync();

IJobExecution<string> execution = await compute.SubmitAsync(
    JobTarget.AnyNode(nodes),
    new JobDescriptor<string, string>("org.example.NodeNameJob"),
    arg: "Hello");

string result = await execution.GetResultAsync();
cpp
using namespace ignite;

compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();

// Unit `unitName:1.1.1` contains NodeNameJob class.
auto job_desc = job_descriptor::builder("org.company.package.NodeNameJob")
	.deployment_units({deployment_unit{"unitName", "1.1.1"}})
	.build();

job_execution execution = comp.submit(job_target::any_node(nodes), job_desc, {std::string("Hello")}, {});
std::string result = execution.get_result()->get<std::string>();

1.2.多节点执行

要在多个节点上执行计算任务,方法与前述单节点执行的相同,不同点在于,无需创建JobTarget对象来指定执行节点,而是使用BroadcastJobTarget并指定执行任务的节点列表。

BroadcastJobTarget对象可以指定以下内容:

  • BroadcastJobTarget.nodes():作业将在列表中的所有节点上执行。
  • BroadcastJobTarget.table():作业将在持有指定表的分区的所有节点上执行。

可以通过设置节点列表来控制在哪些节点上执行任务。

java
public static void example() throws ExecutionException, InterruptedException {
    IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();

    String executionResult = client.compute().execute(
            BroadcastJobTarget.nodes(node(0), node(1)),
        JobDescriptor.builder(NodeNameJob.class).build(), null
    );

    System.out.println(executionResult);
}
csharp
ICompute compute = Client.Compute;
IList<IClusterNode> nodes = await Client.GetClusterNodesAsync();

IBroadcastExecution<string> execution = await compute.SubmitBroadcastAsync(
    BroadcastJobTarget.Nodes(nodes),
    new JobDescriptor<object, string>("org.example.NodeNameJob"),
    arg: "Hello");

foreach (IJobExecution<string> jobExecution in execution.JobExecutions)
{
    string jobResult = await jobExecution.GetResultAsync();
    Console.WriteLine($"Job result from node {jobExecution.Node}: {jobResult}");
}
cpp
using namespace ignite;

compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();

// Unit `unitName:1.1.1` contains NodeNameJob class.
auto job_desc = job_descriptor::builder("org.company.package.NodeNameJob")
	.deployment_units({deployment_unit{"unitName", "1.1.1"}})
	.build();

broadcast_execution execution = comp.submit_broadcast(broadcast_job_target::nodes(nodes), job_desc, {std::string("Hello")}, {});
for (auto &exec: execution.get_job_executions()) {
    std::string result = exec.get_result()->get<std::string>();
}

1.3.可能的状态和过渡

下图描述了作业状态的可能转换: 计算作业状态

下表列出了支持的作业状态:

状态描述可过渡至
Submitted作业已创建并发送到集群,但尚未执行。QueuedCanceled
Queued作业已添加到队列中,并等待队列执行。ExecutingCanceled
Executing作业正在执行中。CancelingCompletedQueued
Completed作业执行成功,并返回执行结果。
Failed任务在执行过程中意外终止。Queued
Canceling作业已收到取消命令,但仍在运行。CompletedCanceled
Canceled作业已成功取消。

如果所有任务执行线程都处于繁忙状态,则节点接收的新任务将根据其作业优先级放入任务队列中。Ignite 首先按优先级对所有传入的作业进行排序,然后按时间排序,首先执行较早排队的作业。

1.4.取消正在执行的作业

当节点收到处于Executing状态的任务的取消命令时,会立即向执行该任务的线程发送中断。通常这将导致作业立即被取消,但有时作业也可能会继续执行。如果发生这种情况,则作业将处于Canceling状态,根据正在执行的代码,作业可能会成功完成,在不间断操作完成后被取消,或者保持未完成状态(例如,如果代码卡在循环中)。可以使用JobExecution.stateAsync()方法跟踪作业所处的状态,并对状态更改做出反应。

为了取消计算作业,首先要创建CancelHandle并从中拿到一个令牌,然后可以使用此令牌取消计算作业:

java
CancelHandle cancelHandle = CancelHandle.create();
CancellationToken cancelToken = cancelHandle.token();

CompletableFuture<Void> execution = client.compute().executeAsync(JobTarget.anyNode(client.clusterNodes()), JobDescriptor.builder(NodeNameJob.class).build(), cancelToken, null);

cancelHandle.cancel();

取消作业的另一种方法是使用 SQL 的 KILL COMPUTE 命令,可以通过COMPUTE_JOBS系统视图拿到作业的ID。

2.作业配置

2.1.作业优先级

可以通过设置JobExecutionOptions.priority属性来指定作业优先级。优先级较高的作业将在优先级较低的作业之前排队(例如优先级为 4 的作业将在优先级为 2 的作业之前执行)。

java
public static void example() throws ExecutionException, InterruptedException {
    IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();

    // Create job execution options
    JobExecutionOptions options = JobExecutionOptions.builder().priority(1).build();

    String executionResult = client.compute().execute(JobTarget.anyNode(client.clusterNodes()),
            JobDescriptor.builder(NodeNameJob.class).options(options).build(), null
    );

    System.out.println(executionResult);
}
csharp
var options = JobExecutionOptions.Default with { Priority = 1 };

IJobExecution<string> execution = await Client.Compute.SubmitAsync(
    JobTarget.AnyNode(await Client.GetClusterNodesAsync()),
    new JobDescriptor<string, string>("org.example.NodeNameJob", Options: options),
    arg: "Hello");

string result = await execution.GetResultAsync();
cpp
using namespace ignite;

compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();

// Unit `unitName:1.1.1` contains NodeNameJob class.
auto job_desc = job_descriptor::builder("org.company.package.NodeNameJob")
	.deployment_units({deployment_unit{"unitName", "1.1.1"}})
	.build();

job_execution_options options{1, 0};
job_execution execution = comp.submit(job_target::any_node(nodes), job_desc, {std::string("Hello")}, std::move(options));
std::string result = execution.get_result()->get<std::string>();

2.2.作业重试

可以通过设置JobExecutionOptions.maxRetries属性来设置作业失败时重试的次数。如果设置该参数,则失败的作业将在转到Failed状态之前重试指定的次数。

java
public static void example() throws ExecutionException, InterruptedException {
    IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();

    // Create job execution options
    JobExecutionOptions options = JobExecutionOptions.builder().maxRetries(5).build();

    String executionResult = client.compute().execute(JobTarget.anyNode(client.clusterNodes()),
            JobDescriptor.builder(NodeNameJob.class).options(options).build(), null
    );

    System.out.println(executionResult);
}
csharp
var options = JobExecutionOptions.Default with { MaxRetries = 5 };

IJobExecution<string> execution = await Client.Compute.SubmitAsync(
    JobTarget.AnyNode(await Client.GetClusterNodesAsync()),
    new JobDescriptor<string, string>("org.example.NodeNameJob", Options: options),
    arg: "Hello");

string result = await execution.GetResultAsync();
cpp
using namespace ignite;

compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();

// Unit `unitName:1.1.1` contains NodeNameJob class.
auto job_desc = job_descriptor::builder("org.company.package.NodeNameJob")
	.deployment_units({deployment_unit{"unitName", "1.1.1"}})
	.build();

job_execution_options options{0, 5};
job_execution execution = comp.submit(job_target::any_node(nodes), job_desc, {std::string("Hello")}, std::move(options));
std::string result = execution.get_result()->get<std::string>();

3.作业故障转移

Ignite 3 实现了一套机制来处理任务执行过程中发生的问题,处理以下情况:

3.1.工作节点下线

如果工作节点关闭,协调器节点会将分配给下线节点的所有作业重新分配给其他可用的节点。如果未找到则作业将失败,并将向客户端发送异常。

3.2.协调器节点下线

如果工作节点检测到协调器节点下线,所有作业都将被取消。注意,某些作业可能需要很长时间才能取消。

3.3.客户端断连

如果协调器节点检测到客户端断开连接,所有作业都将被取消。注意,某些作业可能需要很长时间才能取消。

4.并置计算

在 Ignite 3 中,可以使用并置的JobTarget执行并置计算。该操作可以保证在数据所在的节点上执行计算任务,如果任务需要操作数据,这可以显著减少执行时间。

java
public static void example() throws ExecutionException, InterruptedException {
    IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();

    String executionResult = client.compute()
            .execute(
                    JobTarget.colocated("myTable", Tuple.create(Map.of("k", 1))),
                    JobDescriptor.builder(NodeNameJob.class).build(),
                    null
            );

    System.out.println(execution.resultAsync().get());
}
csharp
string table = "Person";
string key = "John";

IJobExecution<string> execution = await Client.Compute.SubmitAsync(
    JobTarget.Colocated(table, key),
    new JobDescriptor<string, string>("org.example.NodeNameJob"),
    arg: "Hello");

string result = await execution.GetResultAsync();
cpp
using namespace ignite;

compute comp = client.get_compute();
std::string table{"Person"};
std::string key{"John"};

// Unit `unitName:1.1.1` contains NodeNameJob class.
auto job_desc = job_descriptor::builder("org.company.package.NodeNameJob")
	.deployment_units({deployment_unit{"unitName", "1.1.1"}})
	.build();

job_execution execution = comp.submit(job_target::colocated(table, key), job_desc, {std::string("Hello")}, {});
std::string result = execution.get_result()->get<std::string>();

或者也可以通过创建BroadcastJobTarget.table()目标,然后 Ignite 将自动查找持有指定表的数据分区的所有节点,并在这些节点上执行计算作业。

以下示例在具有Person表分区的所有节点上执行相同的作业:

java
String executionResult = client.compute().execute(BroadcastJobTarget.table("Person"),
    JobDescriptor.builder(NodeNameJob.class).build(), null
);

System.out.println(executionResult);

5.MapReduce任务

Ignite 3 提供了一个 API,用于在集群中执行 MapReduce 操作,该操作会先拆分计算任务并分发在多个节点执行,然后汇总结果并将其返回给用户。

5.1.了解MapReduce任务

MapReduce 任务必须实现MapReduceTask接口,然后部署到对应的节点上才能执行。该接口提供了一种实现自定义映射和汇总逻辑的方法。接收任务的节点会成为协调器节点,该节点将负责将任务映射到其他节点,汇总其结果并将最终结果返回给客户端。

该类必须实现两个方法:splitAsyncreduceAsync

splitAsync()方法负责根据输入参数创建计算作业并将其映射到工作节点。该方法接收执行上下文和任务的参数,并返回一个CompletableFuture,其中包含将发送到工作节点的作业描述符列表。

reduceAsync()方法在汇总步骤中,当所有作业都已完成时被调用。该方法从工作节点接收已完成的作业结果的映射,并返回计算的最终结果。

5.2.创建映射类

所有 MapReduce 作业都必须提交到部署了这些类的节点,下面是一个示例:

java
private static class MapReduceNodeNameTask implements MapReduceTask<String, Object, String, String> {
        @Override
        public CompletableFuture<List<MapReduceJob<Object, String>>> splitAsync(TaskExecutionContext context, String args) {
            return completedFuture(context.ignite().clusterNodes().stream()
                    .map(node -> MapReduceJob.<Object, String>builder()
                            .jobDescriptor(JobDescriptor.builder(NodeNameJob.class).build())
                            .nodes(Set.of(node))
                            .args(args)
                            .build())
                    .collect(Collectors.toList()));
        }

        @Override
        public CompletableFuture<String> reduceAsync(TaskExecutionContext context, Map<UUID, String> results) {
            return completedFuture(results.values().stream()
                    .map(String.class::cast)
                    .collect(Collectors.joining(",")));
        }
    }

5.3.执行MapReduce任务

要执行 MapReduce 任务,请使用以下方法之一:

  • submitMapReduce():将 MapReduce 作业发送到集群,并返回可用于监控或修改计算任务执行的TaskExecution对象;
  • executeMapReduceAsync():将 MapReduce 作业发送到集群,并返回作为作业执行结果的Future
  • executeMapReduce():将作业发送到集群并等待作业执行的结果。
java
public static void example() throws ExecutionException, InterruptedException {
    IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();

    TaskDescriptor<String, String> taskDescriptor = TaskDescriptor.builder(MapReduceNodeNameTask.class).build();

    String executionResult = client.compute()..executeMapReduce(taskDescriptor, null);

    System.out.println(executionResult);
}
csharp
ICompute compute = Client.Compute;
var taskDescriptor = new TaskDescriptor<string, string>("com.example.MapReduceNodeNameTask");
ITaskExecution<string> exec = await compute.SubmitMapReduceAsync(taskDescriptor, "arg");
string result = await exec.GetResultAsync();
Console.WriteLine(result);

18624049226