.NET客户端
Ignite 3 的客户端通过标准套接字连接接入集群。与 Ignite 2 不同,Ignite 3 中没有单独的胖和瘦客户端,只有瘦
客户端。
客户端不会成为集群拓扑的一部分,不保存任何数据,也不会用来承载计算任务。
1.入门
1.1.环境要求
运行 C# 瘦客户端需要 .NET 8.0 或更高版本。
1.2.安装
C# 客户端可通过 NuGet
安装,然后使用add package
命令将其导入项目中:
dotnet add package Ignite.Ignite --version 3.0.0
2.接入集群
首先要使用IgniteClient
初始化客户端:
var clientCfg = new IgniteClientConfiguration
{
Endpoints = { "127.0.0.1" }
};
using var client = await IgniteClient.StartAsync(clientCfg);
3.认证
身份认证信息的传递方式如下:
var cfg = new IgniteClientConfiguration("127.0.0.1:10800")
{
Authenticator = new BasicAuthenticator
{
Username = "myUser",
Password = "myPassword"
}
};
IIgniteClient client = await IgniteClient.StartAsync(cfg);
4.SQL API
Ignite 3 聚焦于 SQL,SQL API 是处理数据的主要方式。Ignite 支持的SQL语法,具体请参见SQL 参考章节的内容,以下是发送 SQL 请求的方法:
IResultSet<IIgniteTuple> resultSet = await client.Sql.ExecuteAsync(transaction: null, "select name from tbl where id = ?", 42);
List<IIgniteTuple> rows = await resultSet.ToListAsync();
IIgniteTuple row = rows.Single();
Debug.Assert(row["name"] as string == "John Doe");
4.1.SQL脚本
SQL API 默认一次执行一个 SQL 语句,如果要执行一组 SQL 语句,需要使用executeScript()
方法,这些语句将按顺序执行:
string script =
"CREATE TABLE IF NOT EXISTS Person (id int primary key, city_id int, name varchar, age int, company varchar);" +
"INSERT INTO Person (1,3, 'John', 43, 'Sample')";
await Client.Sql.ExecuteScriptAsync(script);
提示
当第一页准备好返回时,每个语句的执行都被视为完成。因此在处理大型数据集时,SELECT 语句可能会受到同一脚本中后续语句的影响。
5.事务
Ignite 3 中的所有表操作都是事务性的,可以将显式事务作为任何 Table 和 SQL API 调用的第一个参数传入。如果未提供显式事务,则将为每个调用创建一个隐式事务。
以下是显式事务的使用方法:
public class Account
{
public long Id { get; set; }
public long Balance { get; set; }
[NotMapped]
public Guid UnmappedId { get; set; }
}
var accounts = table.GetKeyValueView<long, Account>();
await accounts.PutAsync(transaction: null, 42, new Account(16_000));
await using ITransaction tx = await client.Transactions.BeginAsync();
(Account account, bool hasValue) = await accounts.GetAsync(tx, 42);
account = account with { Balance = account.Balance + 500 };
await accounts.PutAsync(tx, 42, account);
Debug.Assert((await accounts.GetAsync(tx, 42)).Value.Balance == 16_500);
await tx.RollbackAsync();
Debug.Assert((await accounts.GetAsync(null, 42)).Value.Balance == 16_000);
public record Account(decimal Balance);
6.Table API
要操作某个表,需要获取该表的特定视图并调用其方法,只能使用 SQL API 创建表。
使用表时,可以使用内置的 Tuple 类型,这是一组底层的键值对,或者将数据映射到自己定义的业务类型以实现强类型访问。
6.1.获取Table实例
使用IgniteTables.table(String)
方法可以获取表的实例,还可以使用IgniteTables.tables()
方法列出所有现有表。
var existingTables = await Client.Tables.GetTablesAsync();
var firstTable = existingTables[0];
var myTable = await Client.Tables.GetTableAsync("MY_TABLE");
6.2.Table的基本操作
拿到Table
实例后,就需要选择一个视图来确定如何操作数据。
6.2.1.二进制记录视图
该视图可用于直接操作表元组。
IRecordView<IIgniteTuple> view = table.RecordBinaryView;
IIgniteTuple fullRecord = new IgniteTuple
{
["id"] = 42,
["name"] = "John Doe"
};
await view.UpsertAsync(transaction: null, fullRecord);
IIgniteTuple keyRecord = new IgniteTuple { ["id"] = 42 };
(IIgniteTuple value, bool hasValue) = await view.GetAsync(transaction: null, keyRecord);
Debug.Assert(hasValue);
Debug.Assert(value.FieldCount == 2);
Debug.Assert(value["id"] as int? == 42);
Debug.Assert(value["name"] as string == "John Doe");
6.2.2.记录视图
该视图映射到一个业务类型上,可以使用映射到表元组的用户对象来操作表。
var pocoView = table.GetRecordView<Poco>();
await pocoView.UpsertAsync(transaction: null, new Poco(42, "John Doe"));
var (value, hasValue) = await pocoView.GetAsync(transaction: null, new Poco(42));
Debug.Assert(hasValue);
Debug.Assert(value.Name == "John Doe");
public record Poco(long Id, string? Name = null);
6.2.3.二进制键-值视图
该视图可以分别使用键和值元组来操作表。
IKeyValueView<IIgniteTuple, IIgniteTuple> kvView = table.KeyValueBinaryView;
IIgniteTuple key = new IgniteTuple { ["id"] = 42 };
IIgniteTuple val = new IgniteTuple { ["name"] = "John Doe" };
await kvView.PutAsync(transaction: null, key, val);
(IIgniteTuple? value, bool hasValue) = await kvView.GetAsync(transaction: null, key);
Debug.Assert(hasValue);
Debug.Assert(value.FieldCount == 1);
Debug.Assert(value["name"] as string == "John Doe");
6.2.4.键-值视图
该视图映射到一个业务类型上,可以使用映射到表元组的键对象和值对象来操作表。
IKeyValueView<long, Poco> kvView = table.GetKeyValueView<long, Poco>();
await kvView.PutAsync(transaction: null, 42, new Poco(Id: 0, Name: "John Doe"));
(Poco? value, bool hasValue) = await kvView.GetAsync(transaction: null, 42);
Debug.Assert(hasValue);
Debug.Assert(value.Name == "John Doe");
public record Poco(long Id, string? Name = null);
7.数据流
要流式注入大量数据,需要使用数据流机制。数据流提供了一种更快、更高效的方式来加载、组织和优化分发数据,数据流在所有表视图中都可用。 数据流提供至少一次传输保证。
7.1.使用数据流 API
public async Task TestBasicStreamingRecordBinaryView()
{
var options = DataStreamerOptions.Default with { BatchSize = 10 };
var data = Enumerable.Range(0, Count).Select(x => new IgniteTuple { ["id"] = 1L, ["name"] = "foo" }).ToList();
await TupleView.StreamDataAsync(data.ToAsyncEnumerable(), options);
}
8.客户端指标
.NET 客户端的指标使用System.Diagnostics.Metrics
API通过名为Apache.Ignite
的计量暴露。例如下面是用 dotnet-counters 工具访问 Ignite 指标的方法:
dotnet-counters monitor --counters Apache.Ignite,System.Runtime --process-id PID
还可以通过创建监听器在代码中获取指标:
var listener = new MeterListener();
listener.InstrumentPublished = (instrument, meterListener) =>
{
if (instrument.Meter.Name == "Apache.Ignite")
{
meterListener.EnableMeasurementEvents(instrument);
}
};
listener.SetMeasurementEventCallback<int>(
(instrument, measurement, tags, state) => Console.WriteLine($"{instrument.Name}: {measurement}"));
listener.Start();
8.1.可用的.NET指标
指标名 | 描述 |
---|---|
connections-active | 当前的在线连接数 |
connections-established | 已建立的连接数 |
connections-lost | 丢失的连接数 |
connections-lost-timeout | 由于超时而丢失的连接数 |
handshakes-failed | 握手失败的次数 |
handshakes-failed-timeout | 由于超时而失败的握手次数 |
requests-active | 当前的线上请求数 |
requests-sent | 发送的请求数 |
requests-completed | 已完成的请求数。收到响应后,请求即完成 |
requests-retried | 请求重试次数 |
requests-failed | 失败的请求数 |
bytes-sent | 发送的字节数 |
bytes-received | 接收的字节数 |
streamer-batches-sent | 发送的数据流批次数 |
streamer-items-sent | 发送的数据流项目数 |
streamer-batches-active | 当前线上的数据流批次数 |
streamer-items-queued | 当前正在排队的数据流项目数 |
18624049226