Skip to content

数据流

要流式注入大量数据,需要使用数据流技术。数据流提供了一种更快、更高效的方式来加载、组织和分发数据。数据流在所有表视图中都可用。

数据流

数据流提供至少一次传输保证。

1.使用数据流 API

java
RecordView<Tuple> view = defaultTable().recordView();
CompletableFuture<Void> streamerFut;

try (var publisher = new SubmissionPublisher<CustomData>()) {
   streamerFut = view.<CustomData, String, Boolean>streamData(
           publisher,
           null,
           item -> Tuple.create().set("id", item.getId()),
           item -> item.serializeToString(),
           null,
           List.of(new DeploymentUnit("test", "1.0.0")),
           "org.foo.bar.StreamReceiver",
           "receiverArg1");

   publisher.submit(new CustomData(1, "x"));
}
csharp
public async Task TestBasicStreamingRecordBinaryView()
{
    var options = DataStreamerOptions.Default with { BatchSize = 10 };
    var data = Enumerable.Range(0, Count).Select(x => new IgniteTuple { ["id"] = 1L, ["name"] = "foo" }).ToList();

    await TupleView.StreamDataAsync(data.ToAsyncEnumerable(), options);
}

2.跟踪失败的条目

如果数据流无法提交数据,它会保存所有失败的项目,并将其在DataStreamerException#failedItems中返回,然后可以捕获异常以跟踪未发送到集群的条目:

java
RecordView<Tuple> view = defaultTable().recordView();
CompletableFuture<Void> streamerFut;


try (var publisher = new SubmissionPublisher<CustomData>()) {
   streamerFut = view.<CustomData, String, Boolean>streamData(
           publisher,
           null,
           item -> Tuple.create().set("id", item.getId()),
           item -> item.serializeToString(),
           null,
           List.of(new DeploymentUnit("test", "1.0.0")),
           "org.foo.bar.StreamReceiver",
           "receiverArg1")
   .exceptionally(e -> {
       // Handle entries that failed during background streaming
       System.out.println(((DataStreamerException)e.getCause()).failedItems())
   }
);


   publisher.submit(new CustomData(1, "x"));
}
catch (DataStreamerException e){
  Set<?> failedItems = e.failedItems()
  // Handle entries that failed during submission
  System.out.println(failedItems)
}

3.配置数据流的属性

所有数据流的参数都可以使用DataStreamerOptions对象进行配置,例如下面的代码将数据流设置为有 3 次重试:

java
RecordView<Tuple> view = client.tables().table("Person").recordView();
var publisher = new SubmissionPublisher<Tuple>();

var options = DataStreamerOptions.builder()
.retryLimit(3)
.build();

streamerFut = view.streamData(publisher, options);

3.1.优化内存使用

数据流可能需要大量内存才能有序地处理请求。根据实际环境的差异,可能需要增加或减少数据流预留的内存量。

对于集群中的每个节点,数据流保留的内存量等于batchSize(默认为 1000 个条目)乘以perNodeParallelOperations(默认为 4)。例如,具有默认参数且平均条目大小为 1KB 的 10 节点集群将预留 40MB 。

这些参数的修改方法如下:

java
RecordView<Tuple> view = client.tables().table("Person").recordView();
var publisher = new SubmissionPublisher<Tuple>();

var options = DataStreamerOptions.builder()
.batchSize(10000)
.perNodeParallelOperations(10)
.build();

streamerFut = view.streamData(publisher, options);

此外,数据流会定期刷新缓冲区,以避免消息长时间停留(特定缓冲区可能会缓慢填满或根本不会完全填满,具体取决于数据分布)。这是通过autoFlushInterval属性配置的(默认为 5000ms)。

18624049226