Skip to content

对象序列化

Ignite 3 提供了一种序列化 Java 对象和类型的方法,并在服务端和客户端之间发送数据。

1.基础类型

Ignite 会自动处理基础类型的序列化,例如,以下计算作业接受一个 Integer 并返回一个 Integer

java
class IntegerComputeJob implements ComputeJob<Integer, Integer> {
    @Override
    public @Nullable CompletableFuture<Integer> executeAsync(
        JobExecutionContext context, @Nullable Integer arg
    ) {
        return completedFuture(arg - 1);
    }
}

由于参数和结果都是基础类型并且是自动序列化的,因此不需要任何其他代码来处理序列化:

java
try (IgniteClient client = IgniteClient.builder().addresses("address/to/cluster:port").build()) {
    Integer result = client.compute().execute(
        JobTarget.anyNode(client.clusterNodes()),
        JobDescriptor.builder(IntegerComputeJob.class).build(),
        1
    );
}
csharp
using var client = await IgniteClient.StartAsync(
    new IgniteClientConfiguration("address/to/cluster:port"));

IJobExecution<int> jobExec = await client.Compute.SubmitAsync(
    JobTarget.AnyNode(await client.GetClusterNodesAsync()),
    new JobDescriptor<int, int>("org.example.IntegerComputeJob"),
    1);

int result = await jobExec.GetResultAsync();

2.元组

Ignite 是面向元组设计的,并自动处理元组序列化,例如以下计算任务接受元组并返回元组:

java
class TupleComputeJob implements ComputeJob<Tuple, Tuple> {
    @Override
    public @Nullable CompletableFuture<Tuple> executeAsync(JobExecutionContext context, @Nullable Tuple arg) {
        Tuple resultTuple = Tuple.copy(arg);
        resultTuple.set("col", "new value");

        return completedFuture(resultTuple);
    }
}

由于参数和结果都是元组,因此它们会自动序列化,不需要额外的处理:

java
try (IgniteClient client = IgniteClient.builder().addresses("address/to/cluster:port").build()) {
    Tuple resultTuple = client.compute().execute(
        JobTarget.anyNode(client.clusterNodes()),
        JobDescriptor.builder(TupleComputeJob.class).build(),
        Tuple.create().set("col", "value")
    );
}
csharp
using var client = await IgniteClient.StartAsync(
    new IgniteClientConfiguration("address/to/cluster:port"));

IJobExecution<IIgniteTuple> jobExec = await client.Compute.SubmitAsync(
    JobTarget.AnyNode(await client.GetClusterNodesAsync()),
    new JobDescriptor<IIgniteTuple, IIgniteTuple>("org.example.TupleComputeJob"),
    new IgniteTuple { ["col"] = "value" });

IIgniteTuple result = await jobExec.GetResultAsync();

3.用户对象

用户对象按以下方式自动编组:

  • 如果配置了自定义的编组器,则使用该编组器;
  • 在未定义编组器的情况下,用户 Java 对象会被编组为二进制元组;
  • 如果存在嵌套对象,则它们将递归编组为元组。

下面是使用自定义逻辑编组的用户对象示例(使用 ObjectMapper 做 JSON 序列化,可根据具体需求灵活处理)。

下面从计算任务代码开始,它应该是同一部署单元的一部分。

3.1.服务端

下面的代码显示了如何在服务端处理编组,使其可以正确地将数据发送到客户端并接收它们的响应:

  • 下面是将用作作业参数的自定义对象:

    java
    class ArgumentCustomServerObject {
        int arg1;
        String arg2;
    }
  • 使用ObjectMapper对象为其定义一个编组器:

    java
    final ObjectMapper MAPPER = new ObjectMapper();
    
    class ArgumentCustomServerObjectMarshaller implements Marshaller<ArgumentCustomServerObject, byte[]> {
        @Override
        public byte @Nullable [] marshal(@Nullable ArgumentCustomServerObject object) throws UnsupportedObjectTypeMarshallingException {
            try {
                return MAPPER.writeValueAsBytes(object);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }
    
        @Override
        public @Nullable ArgumentCustomServerObject unmarshal(byte @Nullable [] raw) throws UnsupportedObjectTypeMarshallingException {
            try {
                return MAPPER.readValue(raw, ArgumentCustomServerObject.class);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
  • 还需要创建另一个将用于存储计算作业结果的对象,以及相应的编组器:

    java
    class ResultCustomServerObject {
        int res1;
        String res2;
        long res3;
    }
    
    class ResultCustomServerObjectMarshaller implements Marshaller<ResultCustomServerObject, byte[]> {
        @Override
        public byte @Nullable [] marshal(@Nullable ResultCustomServerObject object) throws UnsupportedObjectTypeMarshallingException {
            try {
                return MAPPER.writeValueAsBytes(object);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }
    
        @Override
        public @Nullable ResultCustomServerObject unmarshal(byte @Nullable [] raw) throws UnsupportedObjectTypeMarshallingException {
            try {
                return MAPPER.readValue(raw, ResultCustomServerObject.class);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

上面的编组器定义了如何将相应的对象表示为byte[],以及如何从byte[]中读取这些对象。但是定义这些类并不能启用自定义序列化,因为需要指定在序列化对象时使用该编组器。在 Ignite 中,这是通过覆盖 ComputeJob 中的两个方法然后将其用作编组器的工厂方法来完成的:

下面的代码提供了在计算作业中实现编组器的示例:

java
class PojoComputeJob implements ComputeJob<ArgumentCustomServerObject, ResultCustomServerObject> {

    @Override
    public @Nullable CompletableFuture<ResultCustomServerObject> executeAsync(
        JobExecutionContext context,
        @Nullable ArgumentCustomServerObject arg
    ) {
        ResultCustomServerObject res = new ResultCustomServerObject();
        res.res1 = arg.arg1;
        res.res2 = arg.arg2;
        res.res3 = 1;

        return completedFuture(res);
    }

    @Override
    public Marshaller<ArgumentCustomServerObject, byte[]> inputMarshaller() {
        return new ArgumentCustomServerObjectMarshaller();
    }

    @Override
    public Marshaller<ResultCustomServerObject, byte[]> resultMarshaller() {
        return new ResultCustomServerObjectMarshaller();
    }
}

这样,Ignite 服务端将能够处理对所需对象的编组,并将其发送给客户端,以及解组客户端响应。

3.2.客户端

在客户端,处理传入对象和编组响应所需的代码大致相同:

  • 定义用于计算作业的自定义对象:

    java
    class ArgumentCustomClientObject {
        int arg1;
        String arg2;
    }
    csharp
    record ArgumentCustomClientObject(int arg1, string arg2);
  • 定义对象的编组器:

    java
    final ObjectMapper MAPPER = new ObjectMapper();
    
    class ArgumentCustomClientObjectMarshaller implements Marshaller<ArgumentCustomClientObject, byte[]> {
        @Override
        public byte @Nullable [] marshal(@Nullable ArgumentCustomClientObject object) throws UnsupportedObjectTypeMarshallingException {
            try {
                return MAPPER.writeValueAsBytes(object);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }
    
        @Override
        public @Nullable ArgumentCustomClientObject unmarshal(byte @Nullable [] raw) throws UnsupportedObjectTypeMarshallingException {
            try {
                return MAPPER.readValue(raw, ArgumentCustomClientObject.class);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
    csharp
    class MyJsonMarshaller<T> : IMarshaller<T>
    {
        public void Marshal(T obj, IBufferWriter<byte> writer)
        {
            using var utf8JsonWriter = new Utf8JsonWriter(writer);
            JsonSerializer.Serialize(utf8JsonWriter, obj);
        }
    
        public T Unmarshal(ReadOnlySpan<byte> bytes) =>
            JsonSerializer.Deserialize<T>(bytes)!;
    }
  • 对结果对象执行相同的操作:

    java
    class ResultCustomClientObject {
        int res1;
        String res2;
        long res3;
    }
    
    
    class ResultCustomClientObjectMarshaller implements Marshaller<ResultCustomClientObject, byte[]> {
        @Override
        public byte @Nullable [] marshal(@Nullable ResultCustomClientObject object) throws UnsupportedObjectTypeMarshallingException {
            try {
                return MAPPER.writeValueAsBytes(object);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }
    
        @Override
        public @Nullable ResultCustomClientObject unmarshal(byte @Nullable [] raw) throws UnsupportedObjectTypeMarshallingException {
            try {
                return MAPPER.readValue(raw, ResultCustomClientObject.class);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
    
    // ....
    csharp
    record ResultCustomClientObject(int res1, string res2, long res3);
    
    // Use the same generic MyJsonMarshaller class (see above) for the result object.

现在,所有编组器都已定义,可以使用自定义对象并在计算作业中处理参数和结果的编组:

java
try (IgniteClient client = IgniteClient.builder().addresses("address/to/cluster:port").build()) {
    // Marshalling example of pojo.
    ResultCustomClientObject resultPojo = client.compute().execute(
        JobTarget.anyNode(client.clusterNodes()),
        JobDescriptor.<ArgumentCustomClientObject, ResultCustomClientObject>builder(PojoComputeJob.class.getName())
            .argumentMarshaller(new ArgumentCustomClientObjectMarshaller())
            .resultMarshaller(new ResultCustomClientObjectMarshaller())
            .build(),
        new ArgumentCustomClientObject()
    );
}
csharp
using var client = await IgniteClient.StartAsync(
    new IgniteClientConfiguration("address/to/cluster:port"));

IJobExecution<ResultCustomClientObject> jobExec = await client.Compute.SubmitAsync(
    JobTarget.AnyNode(await client.GetClusterNodesAsync()),
    new JobDescriptor<ArgumentCustomClientObject, ResultCustomClientObject>("org.example.PojoComputeJob")
    {
        ArgMarshaller = new MyJsonMarshaller<ArgumentCustomClientObject>(),
        ResultMarshaller = new MyJsonMarshaller<ResultCustomClientObject>()
    },
    new ArgumentCustomClientObject(1, "abc"));

ResultCustomClientObject result = await jobExec.GetResultAsync();

18624049226