
Data Streaming

Data streaming provides a faster and more efficient way to load, organize, and distribute your data. Data streaming is available in all table views.

Note: the data streamer provides at-least-once delivery guarantees.

1. Using the Data Streamer API

The data streamer API uses a publisher-subscriber model to load large amounts of data into the cluster quickly and reliably. Once you create a publisher, you can stream data entries into a table view, and the system distributes them across the cluster. You can configure how the data is handled through a DataStreamerOptions object, including the batch size, the auto-flush interval, the retry limit, and more.

1.1. Configuring the Data Streamer

You can fine-tune how the streamer handles your data by configuring the DataStreamerOptions parameters:

java
DataStreamerOptions options = DataStreamerOptions.builder()
        .pageSize(1000)
        .perPartitionParallelOperations(1)
        .autoFlushInterval(1000)
        .retryLimit(16)
        .build();
csharp
var options = new DataStreamerOptions
{
    PageSize = 1000,
    RetryLimit = 8,
    AutoFlushInterval = TimeSpan.FromSeconds(3)
};
  • pageSize: the number of entries processed in each page (block) of data;
  • perPartitionParallelOperations: the number of parallel operations allowed per partition;
  • autoFlushInterval: the interval, in milliseconds, at which the system automatically flushes incomplete buffers;
  • retryLimit: the maximum number of retries for a failed data submission.

1.2. Streaming Data

When streaming data into the cluster, wrap each entry in an instance of the DataStreamerItem<T> class. The wrapper lets you perform PUT and REMOVE operations on the data:

  • DataStreamerItem.of(entry) inserts a new entry;
  • DataStreamerItem.removed(entry) removes an existing entry.

The wrapped data can then be passed to the publisher and streamed into the table.

The example below shows how to use a RecordView to create a publisher, configure the data streamer, insert records into the accounts table, and then remove them:

java
public class RecordViewPojoDataStreamerExample {
    private static final int ACCOUNTS_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        /**
         * Assuming the 'accounts' table already exists.
         */
        try (IgniteClient client = IgniteClient.builder()
                .addresses("127.0.0.1:10800")
                .build()) {
            RecordView<Account> view = client.tables().table("accounts").recordView(Account.class);

            streamAccountDataPut(view);
            streamAccountDataRemove(view);
        }
    }

    /**
     * Streaming data using DataStreamerOperationType#PUT operation type.
     */
    private static void streamAccountDataPut(RecordView<Account> view) {
        DataStreamerOptions options = DataStreamerOptions.builder()
                .pageSize(1000)
                .perPartitionParallelOperations(1)
                .autoFlushInterval(1000)
                .retryLimit(16)
                .build();

        CompletableFuture<Void> streamerFut;
        try (var publisher = new SubmissionPublisher<DataStreamerItem<Account>>()) {
            streamerFut = view.streamData(publisher, options);
            ThreadLocalRandom rnd = ThreadLocalRandom.current();
            for (int i = 0; i < ACCOUNTS_COUNT; i++) {
                Account entry = new Account(i, "name" + i, rnd.nextLong(100_000), rnd.nextBoolean());
                publisher.submit(DataStreamerItem.of(entry));
            }
        }
        streamerFut.join();
    }

    /**
     * Streaming data using DataStreamerOperationType#REMOVE operation type
     */
    private static void streamAccountDataRemove(RecordView<Account> view) {
        DataStreamerOptions options = DataStreamerOptions.builder()
                .pageSize(1000)
                .perPartitionParallelOperations(1)
                .autoFlushInterval(1000)
                .retryLimit(16)
                .build();

        CompletableFuture<Void> streamerFut;
        try (var publisher = new SubmissionPublisher<DataStreamerItem<Account>>()) {
            streamerFut = view.streamData(publisher, options);
            for (int i = 0; i < ACCOUNTS_COUNT; i++) {
                Account entry = new Account(i);
                publisher.submit(DataStreamerItem.removed(entry));
            }
        }
        streamerFut.join();
    }
}
csharp
using Apache.Ignite;
using Apache.Ignite.Table;

using var client = await IgniteClient.StartAsync(new("localhost"));
ITable? table = await client.Tables.GetTableAsync("accounts");
IRecordView<Account> view = table!.GetRecordView<Account>();

var options = new DataStreamerOptions
{
    PageSize = 10_000,
    AutoFlushInterval = TimeSpan.FromSeconds(1),
    RetryLimit = 32
};

await view.StreamDataAsync(GetAccountsToAdd(5_000), options);
await view.StreamDataAsync(GetAccountsToRemove(1_000), options);

async IAsyncEnumerable<DataStreamerItem<Account>> GetAccountsToAdd(int count)
{
    for (int i = 0; i < count; i++)
    {
        yield return DataStreamerItem.Create(
            new Account(i, $"Account {i}"));
    }
}

async IAsyncEnumerable<DataStreamerItem<Account>> GetAccountsToRemove(int count)
{
    for (int i = 0; i < count; i++)
    {
        yield return DataStreamerItem.Create(
            new Account(i, string.Empty), DataStreamerOperationType.Remove);
    }
}

public record Account(int Id, string Name);

1.3. Using a Streamer Receiver

The Ignite streaming API supports advanced streaming scenarios by letting you define server-side data processing logic in a receiver. Use a receiver when you need to process or transform data on the server, update multiple tables from a single data stream, or handle incoming data that does not match the table schema.

Because a receiver is schema-agnostic, you can stream data in any format; the receiver also has access to the entire Ignite API through the DataStreamerReceiverContext.

The data streamer controls the flow of data by requesting items only when there is room in a partition buffer. DataStreamerOptions.perPartitionParallelOperations controls how many buffers can be allocated per partition. When the buffers are full, the streamer stops requesting more data until some of it has been processed. In addition, if a resultSubscriber is specified, it applies backpressure to the streamer: if the subscriber is slow to consume results, the streamer lowers its request rate to the publisher accordingly.
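
For illustration, the sketch below shows a result subscriber with bounded demand: because it requests one result at a time, the streamer slows down its requests to the publisher accordingly. This is only a minimal sketch; the streamData overload that accepts a result subscriber appears in the examples later in this section.

java
Flow.Subscriber<Tuple> slowSubscriber = new Flow.Subscriber<>() {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        subscription.request(1); // bounded demand instead of Long.MAX_VALUE
    }

    @Override
    public void onNext(Tuple result) {
        System.out.println("Processed result: " + result); // consume one result...
        subscription.request(1);                           // ...then ask for the next one
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
    }
};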

To use a receiver, implement the DataStreamerReceiver interface. The receiver's receive method handles every batch of items streamed to the server, so you can apply custom logic as needed and return results (see also the minimal sketch after the parameter list below):

java
@Nullable CompletableFuture<List<R>> receive(
        List<T> page,
        DataStreamerReceiverContext ctx,
        @Nullable A arg);
csharp
ValueTask<IList<TResult>?> ReceiveAsync(
        IList<TItem> page,
        TArg arg,
        IDataStreamerReceiverContext context,
        CancellationToken cancellationToken);
  • page: the batch of data items currently being processed;
  • ctx: the receiver context, used to interact with the Ignite API;
  • arg: an optional argument for passing custom parameters to the receiver logic.
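
As a minimal sketch of this contract (the type parameters and logic are illustrative only, not part of the examples below), a hypothetical receiver that upper-cases each incoming string and returns one result per item could look like this:

java
private static class UpperCaseReceiver implements DataStreamerReceiver<String, Void, String> {
    @Override
    public @Nullable CompletableFuture<List<String>> receive(
            List<String> page, DataStreamerReceiverContext ctx, @Nullable Void arg) {
        // Transform each item in the batch and return one result per item.
        List<String> results = new ArrayList<>(page.size());
        for (String item : page) {
            results.add(item.toUpperCase());
        }
        return CompletableFuture.completedFuture(results);
    }
}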

1.4. Examples

1.4.1. Updating Multiple Tables

The following example shows how to implement a receiver that processes data containing customer and address information and updates two separate tables on the server:

  1. First, create a custom receiver that extracts customer and address data from the source items and writes it into two separate tables, customers and addresses:
java
private static class TwoTableReceiver implements DataStreamerReceiver<Tuple, Void, Void> {
    @Override
    public @Nullable CompletableFuture<List<Void>> receive(List<Tuple> page, DataStreamerReceiverContext ctx, @Nullable Void arg) {
        // List<Tuple> is the source data. Those tuples do not conform to any table and can have arbitrary data.
        RecordView<Tuple> customersTable = ctx.ignite().tables().table("customers").recordView();
        RecordView<Tuple> addressesTable = ctx.ignite().tables().table("addresses").recordView();

        for (Tuple sourceItem : page) {
            // For each source item, the receiver extracts customer and address data and upserts it into the respective tables.
            Tuple customer = Tuple.create()
                    .set("id", sourceItem.intValue("customerId"))
                    .set("name", sourceItem.stringValue("customerName"))
                    .set("addressId", sourceItem.intValue("addressId"));

            Tuple address = Tuple.create()
                    .set("id", sourceItem.intValue("addressId"))
                    .set("street", sourceItem.stringValue("street"))
                    .set("city", sourceItem.stringValue("city"));

            customersTable.upsert(null, customer);
            addressesTable.upsert(null, address);
        }

        return null;
    }
}
csharp
class TwoTableReceiver : IDataStreamerReceiver<IIgniteTuple, object?, object>
{
   public async ValueTask<IList<object>?> ReceiveAsync(
      IList<IIgniteTuple> page,
      object? arg,
      IDataStreamerReceiverContext context,
      CancellationToken cancellationToken)
   {
      IRecordView<IIgniteTuple> customerTable = (await context.Ignite.Tables.GetTableAsync("customers"))!.RecordBinaryView;
      IRecordView<IIgniteTuple> addressesTable = (await context.Ignite.Tables.GetTableAsync("addresses"))!.RecordBinaryView;

      foreach (IIgniteTuple sourceItem in page)
      {
            // For each source item, the receiver extracts customer and address data and upserts it into respective tables.
            var customer = new IgniteTuple
            {
               ["id"] = sourceItem["customerId"],
               ["name"] = sourceItem["customerName"],
               ["addressId"] = sourceItem["addressId"]
            };

            var address = new IgniteTuple
            {
               ["id"] = sourceItem["addressId"],
               ["street"] = sourceItem["street"],
               ["city"] = sourceItem["city"],
            };

            await customerTable.UpsertAsync(null, customer);
            await addressesTable.UpsertAsync(null, address);
      }

      return null;
   }
}
  2. Create a descriptor that references the receiver class. The descriptor is later passed to the streamData call together with the SubmissionPublisher:
java
DataStreamerReceiverDescriptor<Tuple, Void, Void> desc = DataStreamerReceiverDescriptor
        .builder(TwoTableReceiver.class)
        .build();
csharp
ReceiverDescriptor<IIgniteTuple, object?, object> desc = ReceiverDescriptor.Of(new TwoTableReceiver());
  3. Next, get the target table, partition the data, and stream it into the cluster. In this example the data is partitioned by customerId, which colocates the receiver with the customer data and enables local writes. Then define how the key and the payload are extracted from the source items, and stream the data with a SubmissionPublisher:
java
// Example source data
List<Tuple> sourceData = IntStream.range(1, 10)
        .mapToObj(i -> Tuple.create()
                .set("customerId", i)
                .set("customerName", "Customer " + i)
                .set("addressId", i)
                .set("street", "Street " + i)
                .set("city", "City " + i))
        .collect(Collectors.toList());

CompletableFuture<Void> streamerFut;

RecordView<Tuple> customersTable = client.tables().table("customers").recordView();

// Extract the target table key from each source item; since the source has "customerId" but the target table uses "id", the function maps customerId to id accordingly.
Function<Tuple, Tuple> keyFunc = sourceItem -> Tuple.create().set("id", sourceItem.intValue("customerId"));

// Extract the data payload sent to the receiver. In this case, we use the entire source item as the payload.
Function<Tuple, Tuple> payloadFunc = Function.identity();

// Stream data using a publisher.
try (var publisher = new SubmissionPublisher<Tuple>()) {
   streamerFut = customersTable.streamData(
            publisher,
            desc,
            keyFunc,
            payloadFunc,
            null, // Optional receiver arguments
            null, // Result subscriber
            null // Options
   );

   for (Tuple item : sourceData) {
      publisher.submit(item);
   }
}

streamerFut.join();
csharp
IAsyncEnumerable<IIgniteTuple> sourceData = GetSourceData();

IRecordView<IIgniteTuple> customersTable = (await client.Tables.GetTableAsync("customers"))!.RecordBinaryView;

IAsyncEnumerable<object> streamerResults = customersTable.StreamDataAsync(
   sourceData,
   desc,
   x => new IgniteTuple { ["id"] = x["customerId"] },
   x => x,
   null,
   DataStreamerOptions.Default,
   CancellationToken.None);

await foreach (object result in streamerResults)
{
   // ...
}

static async IAsyncEnumerable<IIgniteTuple> GetSourceData()
{
   await Task.Yield(); // Simulate async enumeration.

   for (int i = 0; i < 10; i++)
   {
      yield return new IgniteTuple
      {
            ["customerId"] = i,
            ["customerName"] = $"Customer {i}",
            ["addressId"] = i,
            ["street"] = $"Street {i}",
            ["city"] = $"City {i}"
      };
   }
}

1.4.2. Distributed Computations

You can also use the streamer together with a receiver to perform distributed computations, such as per-item computations that return results or MapReduce-style tasks.

The example below simulates a fraud-detection process, which typically involves intensive per-transaction processing with an ML model.

  1. First, create a custom receiver that runs the fraud-detection computation and returns the results:
java
private static class FraudDetectorReceiver implements DataStreamerReceiver<Tuple, Void, Tuple> {
    @Override
    public @Nullable CompletableFuture<List<Tuple>> receive(List<Tuple> page, DataStreamerReceiverContext ctx, @Nullable Void arg) {
        List<Tuple> results = new ArrayList<>(page.size());

        for (Tuple tx : page) {
            results.add(detectFraud(tx));
        }

        return CompletableFuture.completedFuture(results);
    }

    private static Tuple detectFraud(Tuple txInfo) {
        // Simulate fraud detection processing.
        double fraudRisk = Math.random();

        // Add result to the tuple and return.
        return txInfo.set("fraudRisk", fraudRisk);
    }
}
csharp
class FraudDetectorReceiver : IDataStreamerReceiver<IIgniteTuple, object?, IIgniteTuple>
{
   public async ValueTask<IList<IIgniteTuple>?> ReceiveAsync(
      IList<IIgniteTuple> page,
      object? arg,
      IDataStreamerReceiverContext context,
      CancellationToken cancellationToken)
   {
      var result = new List<IIgniteTuple>(page.Count);

      foreach (var tx in page)
      {
            IIgniteTuple resTuple = await DetectFraud(tx);
            result.Add(resTuple);
      }

      return result;
   }

   private static async Task<IIgniteTuple> DetectFraud(IIgniteTuple transaction)
   {
      // Simulate fraud detection logic - add a random risk score to the tuple.
      await Task.Delay(10);
      transaction["fraudRisk"] = Random.Shared.NextDouble();
      return transaction;
   }
}
  2. Next, stream a list of sample transactions into the cluster, using a dummy table that partitions the data by transaction ID and the FraudDetectorReceiver to perform fraud detection. Subscribe to the results to log every processed transaction, handle processing errors, and get notified when streaming completes:
java
public void runReceiverStreamProcessing() {
    // Source data is a list of financial transactions.
    // We distribute this processing across the cluster, then gather and return results.
    List<Tuple> sourceData = IntStream.range(1, 10)
            .mapToObj(i -> Tuple.create()
                    .set("txId", i)
                    .set("txData", "{some-json-data}"))
            .collect(Collectors.toList());

     DataStreamerReceiverDescriptor<Tuple, Void, Tuple> desc = DataStreamerReceiverDescriptor
             .builder(FraudDetectorReceiver.class)
             .build();

     CompletableFuture<Void> streamerFut;

     // Streaming requires a target table to partition data.
     // Use a dummy table for this scenario, because we are not going to store any data.
     TableDefinition txDummyTableDef = TableDefinition.builder("tx_dummy")
             .columns(column("id", ColumnType.INTEGER))
             .primaryKey("id")
             .build();

     Table dummyTable = client.catalog().createTable(txDummyTableDef);

     // Source data has "txId" field, but target dummy table has "id" column, so keyFunc maps "txId" to "id".
     Function<Tuple, Tuple> keyFunc = sourceItem -> Tuple.create().set("id", sourceItem.value("txId"));

     // Payload function is used to extract the payload (data that goes to the receiver) from the source item.
     // In our case, we want to use the whole source item as the payload.
     Function<Tuple, Tuple> payloadFunc = Function.identity();

     Flow.Subscriber<Tuple> resultSubscriber = new Flow.Subscriber<>() {
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
             subscription.request(Long.MAX_VALUE);
         }

         @Override
         public void onNext(Tuple item) {
             System.out.println("Transaction processed: " + item);
         }

         @Override
         public void onError(Throwable throwable) {
             System.err.println("Error during streaming: " + throwable.getMessage());
         }

         @Override
         public void onComplete() {
             System.out.println("Streaming completed.");
         }
     };

     try (var publisher = new SubmissionPublisher<Tuple>()) {
         streamerFut = dummyTable.recordView().streamData(
                 publisher,
                 desc,
                 keyFunc,
                 payloadFunc,
                 null, // Arg
                 resultSubscriber,
                 null // Options
         );

         for (Tuple item : sourceData) {
             publisher.submit(item);
         }
     }

     streamerFut.join();
 }
csharp
// Source data is a list of financial transactions.
// We want to distribute this processing across the cluster, then gather and return results
IAsyncEnumerable<IIgniteTuple> data = GetSourceData();

ReceiverDescriptor<IIgniteTuple, object?, IIgniteTuple> fraudDetectorReceiverDesc = ReceiverDescriptor.Of(new FraudDetectorReceiver());

// Streaming requires a target table to partition data.
// Use a dummy table for this scenario, because we are not going to store any data.
await client.Sql.ExecuteScriptAsync("CREATE TABLE IF NOT EXISTS TX_DUMMY (ID LONG)");

ITable dummyTable = await client.Tables.GetTableAsync("TX_DUMMY");

// Source data has "txId" field, but target dummy table has "id" column, so keyFunc maps "txId" to "id".
Func<IIgniteTuple, IIgniteTuple> keyFunc = tuple => new IgniteTuple { ["id"] = tuple["txId"] };

// Payload function is used to extract the payload (data that goes to the receiver) from the source item.
// In our case, we want to use the whole source item as the payload.
Func<IIgniteTuple, IIgniteTuple> payloadFunc = tuple => tuple;

IAsyncEnumerable<IIgniteTuple> results = dummyTable.RecordBinaryView.StreamDataAsync(
   data,
   fraudDetectorReceiverDesc,
   keyFunc,
   payloadFunc,
   receiverArg: null);

await foreach (IIgniteTuple processedTx in results)
{
   Console.WriteLine("Transaction processed: " + processedTx);
}

async IAsyncEnumerable<IIgniteTuple> GetSourceData()
{
   await Task.Yield(); // Simulate async data source.

   for (int i = 0; i < 1000; i++)
   {
      yield return new IgniteTuple
      {
            ["txId"] = i,
            ["txData"] = "{some-json-data}"
      };
   }
}

1.4.3. Custom Marshallers in .NET

In .NET, you can write a custom marshaller by implementing the IMarshaller interface.

For example, the code below shows how to use a JsonMarshaller to serialize the data, the argument, and the results:

csharp
ITable? table = await client.Tables.GetTableAsync("my-table");

ReceiverDescriptor<MyData, MyArg, MyResult> receiverDesc = ReceiverDescriptor.Of(new MyReceiver());

IAsyncEnumerable<MyData> data = Enumerable
    .Range(1, 100)
    .Select(x => new MyData(x, $"Name {x}"))
    .ToAsyncEnumerable();

IAsyncEnumerable<MyResult> results = table!.RecordBinaryView.StreamDataAsync(
    data: data,
    receiver: receiverDesc,
    keySelector: dataItem => new IgniteTuple { ["id"] = dataItem.Id },
    payloadSelector: dataItem => dataItem,
    receiverArg: new MyArg("Some info"));

await foreach (MyResult result in results)
{
    Console.WriteLine(result);
}

public record MyData(int Id, string Name);

public record MyArg(string Info);

public record MyResult(MyData Data, MyArg Arg);

public class MyReceiver : IDataStreamerReceiver<MyData, MyArg, MyResult>
{
    public IMarshaller<MyData> PayloadMarshaller =>
        new JsonMarshaller<MyData>();

    public IMarshaller<MyArg> ArgumentMarshaller =>
        new JsonMarshaller<MyArg>();

    public IMarshaller<MyResult> ResultMarshaller =>
        new JsonMarshaller<MyResult>();

    public ValueTask<IList<MyResult>?> ReceiveAsync(IList<MyData> page, MyArg arg, IDataStreamerReceiverContext context, CancellationToken cancellationToken)
    {
        IList<MyResult> results = page
            .Select(data => new MyResult(data, arg))
            .ToList();

        return ValueTask.FromResult(results)!;
    }
}

2. Tracking Failed Entries

If the data streamer fails to process an entry, it collects the failed entries into a DataStreamerException. You can catch this exception and access the failed entries through the failedItems() method, as shown in the example below.

You can catch both asynchronous errors raised during background streaming and errors raised immediately on submission:

java
RecordView<Account> view = client.tables().table("accounts").recordView(Account.class);

CompletableFuture<Void> streamerFut;

try (var publisher = new SubmissionPublisher<DataStreamerItem<Account>>()) {
    streamerFut = view.streamData(publisher, options)
            .exceptionally(e -> {
                System.out.println("Failed items during background streaming: " +
                        ((DataStreamerException) e.getCause()).failedItems());
                return null;
            });

    /** Trying to insert an account record. */
    Account entry = new Account(1, "Account name", rnd.nextLong(100_000), rnd.nextBoolean());
    publisher.submit(DataStreamerItem.of(entry));
} catch (DataStreamerException e) {
      /** Handle entries that failed during submission. */
      System.out.println("Failed items during submission: " + e.failedItems());
}

streamerFut.join();
csharp
ITable? table = await Client.Tables.GetTableAsync("my-table");
IRecordView<IIgniteTuple> view = table!.RecordBinaryView;
IList<IIgniteTuple> data = [new IgniteTuple { ["key"] = 1L, ["val"] = "v" }];

try
{
    await view.StreamDataAsync(data.ToAsyncEnumerable());
}
catch (DataStreamerException e)
{
    Console.WriteLine("Failed items: " + string.Join(",", e.FailedItems));
}

2.1. Adjusting Memory Usage

Data streaming can require a significant amount of memory to process requests in an orderly manner. Depending on your environment, you may need to increase or decrease the amount of memory the streamer reserves.

For every node in the cluster, the streamer reserves an amount of memory equal to pageSize (1000 entries by default) multiplied by perPartitionParallelOperations (1 by default) for each partition. For example, a table with 10 partitions and an average entry size of 1 KB reserves roughly 10 MB with the default settings.
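
As a back-of-the-envelope check of that figure (a sketch with illustrative names, not Ignite API):

java
long pageSize = 1000;        // entries per page (default)
long parallelOps = 1;        // perPartitionParallelOperations (default)
long partitions = 10;        // partitions in the target table
long avgEntryBytes = 1024;   // assumed average entry size: 1 KB

long reservedBytes = pageSize * parallelOps * partitions * avgEntryBytes;
System.out.printf("%.1f MB%n", reservedBytes / 1024.0 / 1024.0); // ~9.8 MB, i.e. roughly 10 MB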

You can adjust these options when creating the DataStreamerOptions object:

java
RecordView<Tuple> view = client.tables().table("accounts").recordView();
var publisher = new SubmissionPublisher<Tuple>();

var options = DataStreamerOptions.builder()
        .pageSize(10_000)
        .perPartitionParallelOperations(10)
        .build();

streamerFut = view.streamData(publisher, options);
csharp
// .NET streamer does not have a perPartitionParallelOperations option yet.
var options = new DataStreamerOptions
{
    PageSize = 10_000
};

In addition, the data streamer periodically flushes incomplete buffers so that entries do not sit in them indefinitely; buffers may fill slowly, or never fill at all, when data is distributed unevenly.

This behavior is controlled by the autoFlushInterval property, which defaults to 5000 milliseconds. You can also configure the retryLimit parameter to set the maximum number of retries for failed submissions; the default is 16.
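
For reference, a minimal sketch that sets these two options to their documented defaults explicitly:

java
DataStreamerOptions options = DataStreamerOptions.builder()
        .autoFlushInterval(5000) // milliseconds (documented default)
        .retryLimit(16)          // maximum retries for failed submissions (default)
        .build();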
