执行事务
Ignite 3 中的所有查询都是事务性的。可以将显式事务作为任何表和 SQL API 调用的第一个参数提供。如果未提供显式事务,则将为每个调用创建一个隐式事务。
1.事务生命周期
创建事务时,发起事务的节点将作为事务协调器
,协调器查找所需的主分区,并将读写请求发送到包含主分区的节点。为了事务正常执行,集群中的所有节点必须具有相似的时间,不能超过schemaSync.maxClockSkew
。
如果数据未被其他事务锁定,则节点将获取相关数据的锁,并尝试在事务中修改数据。操作完成后锁被释放。这样多个事务可以在同一个分区上工作,同时更改不同的数据。此外,某些操作可能会提前对数据进行短期锁定,以确保作正确进行。
如果事务中涉及的分区的主副本的节点失败,则事务最终会自动回滚,Ignite 将在提交尝试时返回TransactionException
。
2.事务隔离和并发
Ignite 中的所有读写事务会在第一次读取或写入访问期间获取锁,并在事务提交或回滚之前一直持有锁。所有读写事务都是序列化的,因此只要锁仍然存在,其他事务就无法更改锁定的数据,但是只读事务仍然可以读取数据。
2.1.死锁预防
Ignite 3 使用WAIT_DIE
死锁预防算法。当新的事务请求已被其他事务锁定的数据时,该事务将被取消,并使用相同的时间戳重试事务操作。如果事务较旧,则不会取消该事务并等待释放锁。
3.执行事务
以下是显式事务的代码示例:
KeyValueView<Long, Account> accounts =
table.keyValueView(Mapper.of(Long.class), Mapper.of(Account.class));
accounts.put(null, 42, new Account(16_000));
var tx = client.transactions().begin();
Account account = accounts.get(tx, 42);
account.balance += 500;
accounts.put(tx, 42, account);
assert accounts.get(tx, 42).balance == 16_500;
tx.rollback();
assert accounts.get(tx, 42).balance == 16_000;
var accounts = table.GetKeyValueView<long, Account>();
await accounts.PutAsync(transaction: null, 42, new Account(16_000));
await using ITransaction tx = await client.Transactions.BeginAsync();
(Account account, bool hasValue) = await accounts.GetAsync(tx, 42);
account = account with { Balance = account.Balance + 500 };
await accounts.PutAsync(tx, 42, account);
Debug.Assert((await accounts.GetAsync(tx, 42)).Value.Balance == 16_500);
await tx.RollbackAsync();
Debug.Assert((await accounts.GetAsync(null, 42)).Value.Balance == 16_000);
public record Account(decimal Balance);
auto accounts = table.get_key_value_view<account, account>();
account init_value(42, 16'000);
accounts.put(nullptr, {42}, init_value);
auto tx = client.get_transactions().begin();
std::optional<account> res_account = accounts.get(&tx, {42});
res_account->balance += 500;
accounts.put(&tx, {42}, res_account);
assert(accounts.get(&tx, {42})->balance == 16'500);
tx.rollback();
assert(accounts.get(&tx, {42})->balance == 16'000);
4.事务管理
还可以使用runInTransaction
管理事务,该 API 将自动完成如下任务:
- 事务将启动并替换为闭包;
- 如果闭包执行期间没有抛出异常,则提交该事务;
- 如果出现可恢复的错误,将重试事务,闭包必须是纯功能性代码,即不会引起副作用。
以下是将资金从一个账户转移到另一个账户并处理可能的透支的事务示例:
igniteTransactions.runInTransaction(tx -> {
CompletableFuture<Tuple> fut1 = view.getAsync(tx, Tuple.create().set("accountId", 1));
CompletableFuture<Tuple> fut2 = view.getAsync(tx, Tuple.create().set("accountId", 2)); // Read second balance concurrently
if (fut1.join().doubleValue("balance") - amount < 0) {
tx.rollback();
return;
}
view.upsert(tx, Tuple.create().set("accountId", 1).set("balance", fut1.join().doubleValue("balance") - amount));
view.upsert(tx, Tuple.create().set("accountId", 2).set("balance", fut2.join().doubleValue("balance") + amount);
});
5.只读事务
在发起事务时,也可以将事务配置为只读事务。在这些事务中无法执行数据修改,但它们也不持有锁,并且可以在非主分区上执行,从而进一步提高其性能。只读事务始终在发起时检查数据,即使新数据已写入数据库。
以下是进行只读事务的方法:
var tx = client.transactions().begin(new TransactionOptions().readOnly(true));
int balance = accounts.get(tx, 42).balance;
tx.commit();
提示
只读事务在特定时间读取数据,如果此后写入了新数据,则旧数据仍将存储在版本存储中,并且在低水位线之前可用。如果在事务期间达到低水位线,则数据将一直可用,直到事务结束。
18624049226