Data Streaming
The data streamer provides a faster and more efficient way to load, organize, and distribute data. Data streaming is available in all table views.

The data streamer provides an at-least-once delivery guarantee.
1. Using the Data Streamer API
The data streamer API uses a publisher-subscriber model to load large volumes of data into the cluster quickly and reliably. After creating a publisher, you can stream data entries into a table view, and the system distributes them across the cluster. How the data is processed is configured through a DataStreamerOptions object, which covers the page size, the auto-flush interval, the retry limit, and more.
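As a quick orientation, the following is a minimal Java sketch of that flow (our illustration, not one of the original examples), assuming the accounts table and the Account POJO used later in this section already exist; passing null for the options falls back to the defaults:

// Minimal end-to-end flow: open a publisher over a record view, submit
// wrapped entries, and wait for the streamer to flush them.
try (IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build()) {
    RecordView<Account> view = client.tables().table("accounts").recordView(Account.class);

    CompletableFuture<Void> streamerFut;

    try (var publisher = new SubmissionPublisher<DataStreamerItem<Account>>()) {
        // null options -> default page size, flush interval, and retry limit.
        streamerFut = view.streamData(publisher, null);
        publisher.submit(DataStreamerItem.of(new Account(1, "name1", 100, true)));
    }

    streamerFut.join(); // Completes after all submitted entries are flushed.
}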
1.1. Configuring the Data Streamer
Configure the DataStreamerOptions parameters to fine-tune how the data stream is processed:
Java:
DataStreamerOptions options = DataStreamerOptions.builder()
        .pageSize(1000)
        .perPartitionParallelOperations(1)
        .autoFlushInterval(1000)
        .retryLimit(16)
        .build();

C#:
var options = new DataStreamerOptions
{
    PageSize = 1000,
    RetryLimit = 8,
    AutoFlushInterval = TimeSpan.FromSeconds(3)
};

- pageSize: the number of entries processed in each page (block) of data;
- perPartitionParallelOperations: the number of parallel operations allowed per partition;
- autoFlushInterval: the interval, in milliseconds, at which the system automatically flushes incomplete buffers;
- retryLimit: the maximum number of retries for failed data submissions.
1.2. Streaming Data
When streaming data into the cluster, wrap each entry in an instance of the DataStreamerItem<T> class. The wrapper supports both PUT and REMOVE operations on the data:

- DataStreamerItem.of(entry) inserts a new entry;
- DataStreamerItem.removed(entry) removes an existing entry.

The wrapped entries can then be passed to the publisher and streamed into the table.
The following example shows how to use a RecordView to create a publisher, configure the data streamer, insert records into the accounts table, and then remove them:
Java:
public class RecordViewPojoDataStreamerExample {
    private static final int ACCOUNTS_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        // Assuming the 'accounts' table already exists.
        try (IgniteClient client = IgniteClient.builder()
                .addresses("127.0.0.1:10800")
                .build()) {
            RecordView<Account> view = client.tables().table("accounts").recordView(Account.class);

            streamAccountDataPut(view);
            streamAccountDataRemove(view);
        }
    }

    /**
     * Streaming data using DataStreamerOperationType#PUT operation type.
     */
    private static void streamAccountDataPut(RecordView<Account> view) {
        DataStreamerOptions options = DataStreamerOptions.builder()
                .pageSize(1000)
                .perPartitionParallelOperations(1)
                .autoFlushInterval(1000)
                .retryLimit(16)
                .build();

        CompletableFuture<Void> streamerFut;

        try (var publisher = new SubmissionPublisher<DataStreamerItem<Account>>()) {
            streamerFut = view.streamData(publisher, options);

            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            for (int i = 0; i < ACCOUNTS_COUNT; i++) {
                Account entry = new Account(i, "name" + i, rnd.nextLong(100_000), rnd.nextBoolean());
                publisher.submit(DataStreamerItem.of(entry));
            }
        }

        streamerFut.join();
    }

    /**
     * Streaming data using DataStreamerOperationType#REMOVE operation type.
     */
    private static void streamAccountDataRemove(RecordView<Account> view) {
        DataStreamerOptions options = DataStreamerOptions.builder()
                .pageSize(1000)
                .perPartitionParallelOperations(1)
                .autoFlushInterval(1000)
                .retryLimit(16)
                .build();

        CompletableFuture<Void> streamerFut;

        try (var publisher = new SubmissionPublisher<DataStreamerItem<Account>>()) {
            streamerFut = view.streamData(publisher, options);

            for (int i = 0; i < ACCOUNTS_COUNT; i++) {
                Account entry = new Account(i);
                publisher.submit(DataStreamerItem.removed(entry));
            }
        }

        streamerFut.join();
    }
}

C#:
using Apache.Ignite;
using Apache.Ignite.Table;

using var client = await IgniteClient.StartAsync(new("localhost"));

ITable? table = await client.Tables.GetTableAsync("accounts");
IRecordView<Account> view = table!.GetRecordView<Account>();

var options = new DataStreamerOptions
{
    PageSize = 10_000,
    AutoFlushInterval = TimeSpan.FromSeconds(1),
    RetryLimit = 32
};

await view.StreamDataAsync(GetAccountsToAdd(5_000), options);
await view.StreamDataAsync(GetAccountsToRemove(1_000), options);

async IAsyncEnumerable<DataStreamerItem<Account>> GetAccountsToAdd(int count)
{
    for (int i = 0; i < count; i++)
    {
        yield return DataStreamerItem.Create(
            new Account(i, $"Account {i}"));
    }
}

async IAsyncEnumerable<DataStreamerItem<Account>> GetAccountsToRemove(int count)
{
    for (int i = 0; i < count; i++)
    {
        yield return DataStreamerItem.Create(
            new Account(i, string.Empty), DataStreamerOperationType.Remove);
    }
}
public record Account(int Id, string Name);

1.3. Using Stream Receivers
The Ignite streaming API supports advanced streaming scenarios by letting you define server-side data processing logic in a receiver. Use a receiver when you need to process or transform data on the server, update multiple tables from a single data stream, or handle incoming data that does not match the table schema.
Because a receiver is schema-agnostic, you can stream data in any format; the receiver also has access to the full Ignite API through the DataStreamerReceiverContext.
The data streamer controls the flow of data by requesting items only when there is room in the partition buffers. DataStreamerOptions.perPartitionParallelOperations controls how many buffers can be allocated per partition. When the buffers are full, the streamer stops requesting more data until some of it has been processed. Additionally, if a resultSubscriber is specified, it applies backpressure to the streamer: if the subscriber is slow to consume results, the streamer lowers its request rate to the publisher accordingly.
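To illustrate the last point, here is a minimal Java sketch (our illustration, not from the original examples) of a result subscriber that consumes receiver results one at a time. Because it never requests more than one outstanding item, the streamer throttles its requests to the publisher accordingly:

// A rate-limiting result subscriber: requesting one item at a time applies
// backpressure to the streamer when result handling is slow.
Flow.Subscriber<Tuple> resultSubscriber = new Flow.Subscriber<>() {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // Start with a single outstanding result.
    }

    @Override
    public void onNext(Tuple result) {
        System.out.println("Processed: " + result); // Stand-in for slow result handling.
        subscription.request(1); // Request the next result only after this one is done.
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("All receiver results consumed.");
    }
};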
To use a receiver, implement the DataStreamerReceiver interface. The receiver's receive method handles each batch of items streamed to the server, so you can apply custom logic as needed and return results:
Java:
@Nullable CompletableFuture<List<R>> receive(
        List<T> page,
        DataStreamerReceiverContext ctx,
        @Nullable A arg);

C#:
ValueTask<IList<TResult>?> ReceiveAsync(
    IList<TItem> page,
    TArg arg,
    IDataStreamerReceiverContext context,
    CancellationToken cancellationToken);

- page: the current batch of data items to process;
- ctx (context): the receiver context, which provides access to the Ignite API;
- arg: an optional argument for passing custom parameters to the receiver logic.
1.4. Examples
1.4.1. Updating Multiple Tables
The following example shows how to implement a receiver that processes data containing customer and address information and updates two separate tables on the server:
- First, create a custom receiver that extracts data from the source items and writes it into two separate tables: customers and addresses.
Java:
private static class TwoTableReceiver implements DataStreamerReceiver<Tuple, Void, Void> {
    @Override
    public @Nullable CompletableFuture<List<Void>> receive(List<Tuple> page, DataStreamerReceiverContext ctx, @Nullable Void arg) {
        // List<Tuple> is the source data. Those tuples do not conform to any table and can have arbitrary data.
        RecordView<Tuple> customersTable = ctx.ignite().tables().table("customers").recordView();
        RecordView<Tuple> addressesTable = ctx.ignite().tables().table("addresses").recordView();

        for (Tuple sourceItem : page) {
            // For each source item, receiver extracts customer and address data and upserts it into respective tables.
            Tuple customer = Tuple.create()
                    .set("id", sourceItem.intValue("customerId"))
                    .set("name", sourceItem.stringValue("customerName"))
                    .set("addressId", sourceItem.intValue("addressId"));

            Tuple address = Tuple.create()
                    .set("id", sourceItem.intValue("addressId"))
                    .set("street", sourceItem.stringValue("street"))
                    .set("city", sourceItem.stringValue("city"));

            customersTable.upsert(null, customer);
            addressesTable.upsert(null, address);
        }

        return null;
    }
}

C#:
class TwoTableReceiver : IDataStreamerReceiver<IIgniteTuple, object?, object>
{
    public async ValueTask<IList<object>?> ReceiveAsync(
        IList<IIgniteTuple> page,
        object? arg,
        IDataStreamerReceiverContext context,
        CancellationToken cancellationToken)
    {
        IRecordView<IIgniteTuple> customerTable = (await context.Ignite.Tables.GetTableAsync("customers"))!.RecordBinaryView;
        IRecordView<IIgniteTuple> addressesTable = (await context.Ignite.Tables.GetTableAsync("addresses"))!.RecordBinaryView;

        foreach (IIgniteTuple sourceItem in page)
        {
            // For each source item, the receiver extracts customer and address data and upserts it into respective tables.
            var customer = new IgniteTuple
            {
                ["id"] = sourceItem["customerId"],
                ["name"] = sourceItem["customerName"],
                ["addressId"] = sourceItem["addressId"]
            };

            var address = new IgniteTuple
            {
                ["id"] = sourceItem["addressId"],
                ["street"] = sourceItem["street"],
                ["city"] = sourceItem["city"],
            };

            await customerTable.UpsertAsync(null, customer);
            await addressesTable.UpsertAsync(null, address);
        }

        return null;
    }
}

- Create a descriptor that references the receiver class. It is later passed to streamData together with the SubmissionPublisher, when the data is written:
Java:
DataStreamerReceiverDescriptor<Tuple, Void, Void> desc = DataStreamerReceiverDescriptor
        .builder(TwoTableReceiver.class)
        .build();

C#:
ReceiverDescriptor<IIgniteTuple, object?, object> desc = ReceiverDescriptor.Of(new TwoTableReceiver());

- Next, get the target table, partition the data, and stream it. In this example, the data is partitioned by customerId to ensure that the receiver is colocated with the customer data, which enables local writes. Then define how the key and the payload are extracted from each source item, and stream the data with a SubmissionPublisher.
Java:
// Example source data
List<Tuple> sourceData = IntStream.range(1, 10)
        .mapToObj(i -> Tuple.create()
                .set("customerId", i)
                .set("customerName", "Customer " + i)
                .set("addressId", i)
                .set("street", "Street " + i)
                .set("city", "City " + i))
        .collect(Collectors.toList());

CompletableFuture<Void> streamerFut;

RecordView<Tuple> customersTable = client.tables().table("customers").recordView();

// Extract the target table key from each source item; since the source has "customerId" but the target table uses "id", the function maps customerId to id accordingly.
Function<Tuple, Tuple> keyFunc = sourceItem -> Tuple.create().set("id", sourceItem.intValue("customerId"));

// Extract the data payload sent to the receiver. In this case, we use the entire source item as the payload.
Function<Tuple, Tuple> payloadFunc = Function.identity();

// Stream data using a publisher.
try (var publisher = new SubmissionPublisher<Tuple>()) {
    streamerFut = customersTable.streamData(
            publisher,
            desc,
            keyFunc,
            payloadFunc,
            null, // Optional receiver arguments
            null, // Result subscriber
            null // Options
    );

    for (Tuple item : sourceData) {
        publisher.submit(item);
    }
}

streamerFut.join();

C#:
IAsyncEnumerable<IIgniteTuple> sourceData = GetSourceData();
IRecordView<IIgniteTuple> customersTable = (await client.Tables.GetTableAsync("customers"))!.RecordBinaryView;

IAsyncEnumerable<object> streamerResults = customersTable.StreamDataAsync(
    sourceData,
    desc,
    x => new IgniteTuple { ["id"] = x["customerId"] },
    x => x,
    null,
    DataStreamerOptions.Default,
    CancellationToken.None);

await foreach (object result in streamerResults)
{
    // ...
}

static async IAsyncEnumerable<IIgniteTuple> GetSourceData()
{
    await Task.Yield(); // Simulate async enumeration.

    for (int i = 0; i < 10; i++)
    {
        yield return new IgniteTuple
        {
            ["customerId"] = i,
            ["customerName"] = $"Customer {i}",
            ["addressId"] = i,
            ["street"] = $"Street {i}",
            ["city"] = $"City {i}"
        };
    }
}

1.4.2. Distributed Computations
You can also combine the streamer with a receiver to perform distributed computations, such as per-item computations that return results, or MapReduce-style tasks.
The following example simulates a fraud-detection process, which typically involves compute-intensive processing of each transaction, for example with an ML model.
- First, create a custom receiver that performs the fraud-detection computation and returns the results:
Java:
private static class FraudDetectorReceiver implements DataStreamerReceiver<Tuple, Void, Tuple> {
    @Override
    public @Nullable CompletableFuture<List<Tuple>> receive(List<Tuple> page, DataStreamerReceiverContext ctx, @Nullable Void arg) {
        List<Tuple> results = new ArrayList<>(page.size());

        for (Tuple tx : page) {
            results.add(detectFraud(tx));
        }

        return CompletableFuture.completedFuture(results);
    }

    private static Tuple detectFraud(Tuple txInfo) {
        // Simulate fraud detection processing.
        double fraudRisk = Math.random();

        // Add result to the tuple and return.
        return txInfo.set("fraudRisk", fraudRisk);
    }
}

C#:
class FraudDetectorReceiver : IDataStreamerReceiver<IIgniteTuple, object?, IIgniteTuple>
{
    public async ValueTask<IList<IIgniteTuple>?> ReceiveAsync(
        IList<IIgniteTuple> page,
        object? arg,
        IDataStreamerReceiverContext context,
        CancellationToken cancellationToken)
    {
        var result = new List<IIgniteTuple>(page.Count);

        foreach (var tx in page)
        {
            IIgniteTuple resTuple = await DetectFraud(tx);
            result.Add(resTuple);
        }

        return result;
    }

    private static async Task<IIgniteTuple> DetectFraud(IIgniteTuple transaction)
    {
        // Simulate fraud detection logic - add a random risk score to the tuple.
        await Task.Delay(10);
        transaction["fraudRisk"] = Random.Shared.NextDouble();
        return transaction;
    }
}

- Next, stream a list of sample transactions into the cluster, using a dummy table that partitions the data by transaction ID, and run fraud detection with the FraudDetectorReceiver. Subscribe to the results in order to log each processed transaction, handle processing errors, and confirm when streaming completes:
Java:
public void runReceiverStreamProcessing() {
    // Source data is a list of financial transactions.
    // We distribute this processing across the cluster, then gather and return results.
    List<Tuple> sourceData = IntStream.range(1, 10)
            .mapToObj(i -> Tuple.create()
                    .set("txId", i)
                    .set("txData", "{some-json-data}"))
            .collect(Collectors.toList());

    DataStreamerReceiverDescriptor<Tuple, Void, Tuple> desc = DataStreamerReceiverDescriptor
            .builder(FraudDetectorReceiver.class)
            .build();

    CompletableFuture<Void> streamerFut;

    // Streaming requires a target table to partition data.
    // Use a dummy table for this scenario, because we are not going to store any data.
    TableDefinition txDummyTableDef = TableDefinition.builder("tx_dummy")
            .columns(column("id", ColumnType.INTEGER))
            .primaryKey("id")
            .build();

    Table dummyTable = client.catalog().createTable(txDummyTableDef);

    // Source data has "txId" field, but target dummy table has "id" column, so keyFunc maps "txId" to "id".
    Function<Tuple, Tuple> keyFunc = sourceItem -> Tuple.create().set("id", sourceItem.value("txId"));

    // Payload function is used to extract the payload (data that goes to the receiver) from the source item.
    // In our case, we want to use the whole source item as the payload.
    Function<Tuple, Tuple> payloadFunc = Function.identity();

    Flow.Subscriber<Tuple> resultSubscriber = new Flow.Subscriber<>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Tuple item) {
            System.out.println("Transaction processed: " + item);
        }

        @Override
        public void onError(Throwable throwable) {
            System.err.println("Error during streaming: " + throwable.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("Streaming completed.");
        }
    };

    try (var publisher = new SubmissionPublisher<Tuple>()) {
        streamerFut = dummyTable.recordView().streamData(
                publisher,
                desc,
                keyFunc,
                payloadFunc,
                null, // Arg
                resultSubscriber,
                null // Options
        );

        for (Tuple item : sourceData) {
            publisher.submit(item);
        }
    }

    streamerFut.join();
}

C#:
// Source data is a list of financial transactions.
// We want to distribute this processing across the cluster, then gather and return results
IAsyncEnumerable<IIgniteTuple> data = GetSourceData();

ReceiverDescriptor<IIgniteTuple, object?, IIgniteTuple> fraudDetectorReceiverDesc = ReceiverDescriptor.Of(new FraudDetectorReceiver());

// Streaming requires a target table to partition data.
// Use a dummy table for this scenario, because we are not going to store any data.
await client.Sql.ExecuteScriptAsync("CREATE TABLE IF NOT EXISTS TX_DUMMY (ID LONG)");
ITable dummyTable = await client.Tables.GetTableAsync("TX_DUMMY");

// Source data has "txId" field, but target dummy table has "id" column, so keyFunc maps "txId" to "id".
Func<IIgniteTuple, IIgniteTuple> keyFunc = tuple => new IgniteTuple { ["id"] = tuple["txId"] };

// Payload function is used to extract the payload (data that goes to the receiver) from the source item.
// In our case, we want to use the whole source item as the payload.
Func<IIgniteTuple, IIgniteTuple> payloadFunc = tuple => tuple;

IAsyncEnumerable<IIgniteTuple> results = dummyTable.RecordBinaryView.StreamDataAsync(
    data,
    fraudDetectorReceiverDesc,
    keyFunc,
    payloadFunc,
    receiverArg: null);

await foreach (IIgniteTuple processedTx in results)
{
    Console.WriteLine("Transaction processed: " + processedTx);
}

async IAsyncEnumerable<IIgniteTuple> GetSourceData()
{
    await Task.Yield(); // Simulate async data source.

    for (int i = 0; i < 1000; i++)
    {
        yield return new IgniteTuple
        {
            ["txId"] = i,
            ["txData"] = "{some-json-data}"
        };
    }
}

1.4.3. Custom Marshallers in .NET
In .NET, you can write a custom marshaller by implementing the IMarshaller interface.
For example, the following code shows how to use JsonMarshaller to serialize the data, the argument, and the results.
ITable? table = await client.Tables.GetTableAsync("my-table");
ReceiverDescriptor<MyData, MyArg, MyResult> receiverDesc = ReceiverDescriptor.Of(new MyReceiver());

IAsyncEnumerable<MyData> data = Enumerable
    .Range(1, 100)
    .Select(x => new MyData(x, $"Name {x}"))
    .ToAsyncEnumerable();

IAsyncEnumerable<MyResult> results = table!.RecordBinaryView.StreamDataAsync(
    data: data,
    receiver: receiverDesc,
    keySelector: dataItem => new IgniteTuple { ["id"] = dataItem.Id },
    payloadSelector: dataItem => dataItem,
    receiverArg: new MyArg("Some info"));

await foreach (MyResult result in results)
{
    Console.WriteLine(result);
}

public record MyData(int Id, string Name);

public record MyArg(string Info);

public record MyResult(MyData Data, MyArg Arg);

public class MyReceiver : IDataStreamerReceiver<MyData, MyArg, MyResult>
{
    public IMarshaller<MyData> PayloadMarshaller =>
        new JsonMarshaller<MyData>();

    public IMarshaller<MyArg> ArgumentMarshaller =>
        new JsonMarshaller<MyArg>();

    public IMarshaller<MyResult> ResultMarshaller =>
        new JsonMarshaller<MyResult>();

    public ValueTask<IList<MyResult>?> ReceiveAsync(IList<MyData> page, MyArg arg, IDataStreamerReceiverContext context, CancellationToken cancellationToken)
    {
        IList<MyResult> results = page
            .Select(data => new MyResult(data, arg))
            .ToList();

        return ValueTask.FromResult(results)!;
    }
}

2. Tracking Failed Entries
If the data streamer fails to process an entry, it collects the failed entries in a DataStreamerException. You can catch this exception and access the failed entries through the failedItems() method, as shown in the example below.
Both asynchronous errors during background streaming and immediate submission errors can be caught:
Java:
RecordView<Account> view = client.tables().table("accounts").recordView(Account.class);
CompletableFuture<Void> streamerFut;

// 'options' and 'rnd' are defined as in the earlier examples.
try (var publisher = new SubmissionPublisher<DataStreamerItem<Account>>()) {
    streamerFut = view.streamData(publisher, options)
            .exceptionally(e -> {
                System.out.println("Failed items during background streaming: " +
                        ((DataStreamerException)e.getCause()).failedItems());
                return null;
            });

    // Trying to insert an account record.
    Account entry = new Account(1, "Account name", rnd.nextLong(100_000), rnd.nextBoolean());
    publisher.submit(DataStreamerItem.of(entry));
} catch (DataStreamerException e) {
    // Handle entries that failed during submission.
    System.out.println("Failed items during submission: " + e.failedItems());
}

streamerFut.join();

C#:
ITable? table = await Client.Tables.GetTableAsync("my-table");
IRecordView<IIgniteTuple> view = table!.RecordBinaryView;
IList<IIgniteTuple> data = [new IgniteTuple { ["key"] = 1L, ["val"] = "v" }];
try
{
    await view.StreamDataAsync(data.ToAsyncEnumerable());
}
catch (DataStreamerException e)
{
    Console.WriteLine("Failed items: " + string.Join(",", e.FailedItems));
}

2.1. Tuning Memory Usage
Data streaming can require a significant amount of memory to process requests in an orderly fashion. Depending on your environment, you may need to increase or decrease the amount of memory reserved by the data streamer.
For each node in the cluster, the streamer reserves an amount of memory equal to pageSize (1000 entries by default) multiplied by perPartitionParallelOperations (1 by default). For example, with default settings and an average entry size of 1KB, a table with 10 partitions reserves 10MB.
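Expressed as a rough back-of-the-envelope formula (our reading of the figures above, not an official sizing rule):

reserved memory ≈ pageSize × perPartitionParallelOperations × partition count × average entry size
                = 1000 × 1 × 10 × 1 KB ≈ 10 MB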
You can adjust these options when creating the DataStreamerOptions object:
Java:
RecordView<Tuple> view = client.tables().table("accounts").recordView();
var publisher = new SubmissionPublisher<Tuple>();

var options = DataStreamerOptions.builder()
        .pageSize(10_000)
        .perPartitionParallelOperations(10)
        .build();

streamerFut = view.streamData(publisher, options);

C#:
// .NET streamer does not have a perPartitionParallelOperations option yet.
var options = new DataStreamerOptions
{
    PageSize = 10_000
};

In addition, the data streamer periodically flushes incomplete buffers so that entries do not sit in them indefinitely; a buffer may fill slowly, or never fill at all, for example when data is distributed unevenly.
This behavior is controlled by the autoFlushInterval property, which defaults to 5000 milliseconds. You can also configure the retryLimit parameter, which defines the maximum number of retries for failed submissions and defaults to 16.
