数据流
要流式注入大量数据,需要使用数据流技术。数据流提供了一种更快、更高效的方式来加载、组织和分发数据。数据流在所有表视图中都可用。
数据流提供至少一次传输保证。
1.使用数据流 API
java
RecordView<Tuple> view = defaultTable().recordView();
CompletableFuture<Void> streamerFut;
try (var publisher = new SubmissionPublisher<CustomData>()) {
streamerFut = view.<CustomData, String, Boolean>streamData(
publisher,
null,
item -> Tuple.create().set("id", item.getId()),
item -> item.serializeToString(),
null,
List.of(new DeploymentUnit("test", "1.0.0")),
"org.foo.bar.StreamReceiver",
"receiverArg1");
publisher.submit(new CustomData(1, "x"));
}
csharp
public async Task TestBasicStreamingRecordBinaryView()
{
var options = DataStreamerOptions.Default with { BatchSize = 10 };
var data = Enumerable.Range(0, Count).Select(x => new IgniteTuple { ["id"] = 1L, ["name"] = "foo" }).ToList();
await TupleView.StreamDataAsync(data.ToAsyncEnumerable(), options);
}
2.跟踪失败的条目
如果数据流无法提交数据,它会保存所有失败的项目,并将其在DataStreamerException#failedItems
中返回,然后可以捕获异常以跟踪未发送到集群的条目:
java
RecordView<Tuple> view = defaultTable().recordView();
CompletableFuture<Void> streamerFut;
try (var publisher = new SubmissionPublisher<CustomData>()) {
streamerFut = view.<CustomData, String, Boolean>streamData(
publisher,
null,
item -> Tuple.create().set("id", item.getId()),
item -> item.serializeToString(),
null,
List.of(new DeploymentUnit("test", "1.0.0")),
"org.foo.bar.StreamReceiver",
"receiverArg1")
.exceptionally(e -> {
// Handle entries that failed during background streaming
System.out.println(((DataStreamerException)e.getCause()).failedItems())
}
);
publisher.submit(new CustomData(1, "x"));
}
catch (DataStreamerException e){
Set<?> failedItems = e.failedItems()
// Handle entries that failed during submission
System.out.println(failedItems)
}
3.配置数据流的属性
所有数据流的参数都可以使用DataStreamerOptions
对象进行配置,例如下面的代码将数据流设置为有 3 次重试:
java
RecordView<Tuple> view = client.tables().table("Person").recordView();
var publisher = new SubmissionPublisher<Tuple>();
var options = DataStreamerOptions.builder()
.retryLimit(3)
.build();
streamerFut = view.streamData(publisher, options);
3.1.优化内存使用
数据流可能需要大量内存才能有序地处理请求。根据实际环境的差异,可能需要增加或减少数据流预留的内存量。
对于集群中的每个节点,数据流保留的内存量等于batchSize
(默认为 1000 个条目)乘以perNodeParallelOperations
(默认为 4)。例如,具有默认参数且平均条目大小为 1KB 的 10 节点集群将预留 40MB 。
这些参数的修改方法如下:
java
RecordView<Tuple> view = client.tables().table("Person").recordView();
var publisher = new SubmissionPublisher<Tuple>();
var options = DataStreamerOptions.builder()
.batchSize(10000)
.perNodeParallelOperations(10)
.build();
streamerFut = view.streamData(publisher, options);
此外,数据流会定期刷新缓冲区,以避免消息长时间停留(特定缓冲区可能会缓慢填满或根本不会完全填满,具体取决于数据分布)。这是通过autoFlushInterval
属性配置的(默认为 5000ms)。
18624049226