对象序列化
Ignite 3 提供了一种序列化 Java 对象和类型的方法,并在服务端和客户端之间发送数据。
1.基础类型
Ignite 会自动处理基础类型的序列化,例如,以下计算作业接受一个 Integer
并返回一个 Integer
:
class IntegerComputeJob implements ComputeJob<Integer, Integer> {
@Override
public @Nullable CompletableFuture<Integer> executeAsync(
JobExecutionContext context, @Nullable Integer arg
) {
return completedFuture(arg - 1);
}
}
由于参数和结果都是基础类型并且是自动序列化的,因此不需要任何其他代码来处理序列化:
try (IgniteClient client = IgniteClient.builder().addresses("address/to/cluster:port").build()) {
Integer result = client.compute().execute(
JobTarget.anyNode(client.clusterNodes()),
JobDescriptor.builder(IntegerComputeJob.class).build(),
1
);
}
using var client = await IgniteClient.StartAsync(
new IgniteClientConfiguration("address/to/cluster:port"));
IJobExecution<int> jobExec = await client.Compute.SubmitAsync(
JobTarget.AnyNode(await client.GetClusterNodesAsync()),
new JobDescriptor<int, int>("org.example.IntegerComputeJob"),
1);
int result = await jobExec.GetResultAsync();
2.元组
Ignite 是面向元组设计的,并自动处理元组序列化,例如以下计算任务接受元组并返回元组:
class TupleComputeJob implements ComputeJob<Tuple, Tuple> {
@Override
public @Nullable CompletableFuture<Tuple> executeAsync(JobExecutionContext context, @Nullable Tuple arg) {
Tuple resultTuple = Tuple.copy(arg);
resultTuple.set("col", "new value");
return completedFuture(resultTuple);
}
}
由于参数和结果都是元组,因此它们会自动序列化,不需要额外的处理:
try (IgniteClient client = IgniteClient.builder().addresses("address/to/cluster:port").build()) {
Tuple resultTuple = client.compute().execute(
JobTarget.anyNode(client.clusterNodes()),
JobDescriptor.builder(TupleComputeJob.class).build(),
Tuple.create().set("col", "value")
);
}
using var client = await IgniteClient.StartAsync(
new IgniteClientConfiguration("address/to/cluster:port"));
IJobExecution<IIgniteTuple> jobExec = await client.Compute.SubmitAsync(
JobTarget.AnyNode(await client.GetClusterNodesAsync()),
new JobDescriptor<IIgniteTuple, IIgniteTuple>("org.example.TupleComputeJob"),
new IgniteTuple { ["col"] = "value" });
IIgniteTuple result = await jobExec.GetResultAsync();
3.用户对象
用户对象按以下方式自动编组:
- 如果配置了自定义的编组器,则使用该编组器;
- 在未定义编组器的情况下,用户 Java 对象会被编组为二进制元组;
- 如果存在嵌套对象,则它们将递归编组为元组。
下面是使用自定义逻辑编组的用户对象示例(使用 ObjectMapper
做 JSON 序列化,可根据具体需求灵活处理)。
下面从计算任务代码开始,它应该是同一部署单元的一部分。
3.1.服务端
下面的代码显示了如何在服务端处理编组,使其可以正确地将数据发送到客户端并接收它们的响应:
下面是将用作作业参数的自定义对象:
javaclass ArgumentCustomServerObject { int arg1; String arg2; }
使用
ObjectMapper
对象为其定义一个编组器:javafinal ObjectMapper MAPPER = new ObjectMapper(); class ArgumentCustomServerObjectMarshaller implements Marshaller<ArgumentCustomServerObject, byte[]> { @Override public byte @Nullable [] marshal(@Nullable ArgumentCustomServerObject object) throws UnsupportedObjectTypeMarshallingException { try { return MAPPER.writeValueAsBytes(object); } catch (JsonProcessingException e) { throw new RuntimeException(e); } } @Override public @Nullable ArgumentCustomServerObject unmarshal(byte @Nullable [] raw) throws UnsupportedObjectTypeMarshallingException { try { return MAPPER.readValue(raw, ArgumentCustomServerObject.class); } catch (IOException e) { throw new RuntimeException(e); } } }
还需要创建另一个将用于存储计算作业结果的对象,以及相应的编组器:
javaclass ResultCustomServerObject { int res1; String res2; long res3; } class ResultCustomServerObjectMarshaller implements Marshaller<ResultCustomServerObject, byte[]> { @Override public byte @Nullable [] marshal(@Nullable ResultCustomServerObject object) throws UnsupportedObjectTypeMarshallingException { try { return MAPPER.writeValueAsBytes(object); } catch (JsonProcessingException e) { throw new RuntimeException(e); } } @Override public @Nullable ResultCustomServerObject unmarshal(byte @Nullable [] raw) throws UnsupportedObjectTypeMarshallingException { try { return MAPPER.readValue(raw, ResultCustomServerObject.class); } catch (IOException e) { throw new RuntimeException(e); } } }
上面的编组器定义了如何将相应的对象表示为byte[]
,以及如何从byte[]
中读取这些对象。但是定义这些类并不能启用自定义序列化,因为需要指定在序列化对象时使用该编组器。在 Ignite 中,这是通过覆盖 ComputeJob
中的两个方法然后将其用作编组器的工厂方法来完成的:
下面的代码提供了在计算作业中实现编组器的示例:
class PojoComputeJob implements ComputeJob<ArgumentCustomServerObject, ResultCustomServerObject> {
@Override
public @Nullable CompletableFuture<ResultCustomServerObject> executeAsync(
JobExecutionContext context,
@Nullable ArgumentCustomServerObject arg
) {
ResultCustomServerObject res = new ResultCustomServerObject();
res.res1 = arg.arg1;
res.res2 = arg.arg2;
res.res3 = 1;
return completedFuture(res);
}
@Override
public Marshaller<ArgumentCustomServerObject, byte[]> inputMarshaller() {
return new ArgumentCustomServerObjectMarshaller();
}
@Override
public Marshaller<ResultCustomServerObject, byte[]> resultMarshaller() {
return new ResultCustomServerObjectMarshaller();
}
}
这样,Ignite 服务端将能够处理对所需对象的编组,并将其发送给客户端,以及解组客户端响应。
3.2.客户端
在客户端,处理传入对象和编组响应所需的代码大致相同:
定义用于计算作业的自定义对象:
javaclass ArgumentCustomClientObject { int arg1; String arg2; }
csharprecord ArgumentCustomClientObject(int arg1, string arg2);
定义对象的编组器:
javafinal ObjectMapper MAPPER = new ObjectMapper(); class ArgumentCustomClientObjectMarshaller implements Marshaller<ArgumentCustomClientObject, byte[]> { @Override public byte @Nullable [] marshal(@Nullable ArgumentCustomClientObject object) throws UnsupportedObjectTypeMarshallingException { try { return MAPPER.writeValueAsBytes(object); } catch (JsonProcessingException e) { throw new RuntimeException(e); } } @Override public @Nullable ArgumentCustomClientObject unmarshal(byte @Nullable [] raw) throws UnsupportedObjectTypeMarshallingException { try { return MAPPER.readValue(raw, ArgumentCustomClientObject.class); } catch (IOException e) { throw new RuntimeException(e); } } }
csharpclass MyJsonMarshaller<T> : IMarshaller<T> { public void Marshal(T obj, IBufferWriter<byte> writer) { using var utf8JsonWriter = new Utf8JsonWriter(writer); JsonSerializer.Serialize(utf8JsonWriter, obj); } public T Unmarshal(ReadOnlySpan<byte> bytes) => JsonSerializer.Deserialize<T>(bytes)!; }
对结果对象执行相同的操作:
javaclass ResultCustomClientObject { int res1; String res2; long res3; } class ResultCustomClientObjectMarshaller implements Marshaller<ResultCustomClientObject, byte[]> { @Override public byte @Nullable [] marshal(@Nullable ResultCustomClientObject object) throws UnsupportedObjectTypeMarshallingException { try { return MAPPER.writeValueAsBytes(object); } catch (JsonProcessingException e) { throw new RuntimeException(e); } } @Override public @Nullable ResultCustomClientObject unmarshal(byte @Nullable [] raw) throws UnsupportedObjectTypeMarshallingException { try { return MAPPER.readValue(raw, ResultCustomClientObject.class); } catch (IOException e) { throw new RuntimeException(e); } } } // ....
csharprecord ResultCustomClientObject(int res1, string res2, long res3); // Use the same generic MyJsonMarshaller class (see above) for the result object.
现在,所有编组器都已定义,可以使用自定义对象并在计算作业中处理参数和结果的编组:
try (IgniteClient client = IgniteClient.builder().addresses("address/to/cluster:port").build()) {
// Marshalling example of pojo.
ResultCustomClientObject resultPojo = client.compute().execute(
JobTarget.anyNode(client.clusterNodes()),
JobDescriptor.<ArgumentCustomClientObject, ResultCustomClientObject>builder(PojoComputeJob.class.getName())
.argumentMarshaller(new ArgumentCustomClientObjectMarshaller())
.resultMarshaller(new ResultCustomClientObjectMarshaller())
.build(),
new ArgumentCustomClientObject()
);
}
using var client = await IgniteClient.StartAsync(
new IgniteClientConfiguration("address/to/cluster:port"));
IJobExecution<ResultCustomClientObject> jobExec = await client.Compute.SubmitAsync(
JobTarget.AnyNode(await client.GetClusterNodesAsync()),
new JobDescriptor<ArgumentCustomClientObject, ResultCustomClientObject>("org.example.PojoComputeJob")
{
ArgMarshaller = new MyJsonMarshaller<ArgumentCustomClientObject>(),
ResultMarshaller = new MyJsonMarshaller<ResultCustomClientObject>()
},
new ArgumentCustomClientObject(1, "abc"));
ResultCustomClientObject result = await jobExec.GetResultAsync();
18624049226