执行事务
Ignite 中的所有查询都是事务性的。可以将显式事务作为任何表和 SQL API 调用的第一个参数提供。如果未提供显式事务,则将为每个调用创建一个隐式事务。
1.事务生命周期
创建事务时,发起事务的节点将作为事务协调器,协调器查找所需的分区,并将读写请求发送到包含主分区的节点。为了事务正常执行,集群中的所有节点必须具有相似的时间,不能超过schemaSync.maxClockSkewMillis。
如果数据未被其他事务锁定,则节点将获取相关数据的锁,并尝试在事务中修改数据。操作完成后锁被释放。这样多个事务可以在同一个分区上工作,同时更改不同的数据。此外,某些操作可能会提前对数据进行短期锁定,以确保作正确进行。
如果事务中涉及的分区的主副本的节点故障,则事务最终会自动回滚,Ignite 将在提交尝试时返回TransactionException。
2.事务隔离和并发
Ignite 中的所有读写事务会在第一次读取或写入访问期间获取锁,并在事务提交或回滚之前一直持有锁。所有读写事务都是序列化的,因此只要锁仍然存在,其他事务就无法更改锁定的数据,但是只读事务仍然可以读取数据。
2.1.死锁预防
Ignite 使用WAIT_DIE死锁预防算法。当新的事务请求已被其他事务锁定的数据时,该事务将被取消,并使用相同的时间戳重试事务操作。如果事务较旧,则不会取消该事务并等待释放锁。
3.执行事务
以下是显式事务的代码示例:
KeyValueView<Long, Account> accounts =
table.keyValueView(Mapper.of(Long.class), Mapper.of(Account.class));
accounts.put(null, 42, new Account(16_000));
var tx = client.transactions().begin();
Account account = accounts.get(tx, 42);
account.balance += 500;
accounts.put(tx, 42, account);
assert accounts.get(tx, 42).balance == 16_500;
tx.rollback();
assert accounts.get(tx, 42).balance == 16_000;var accounts = table.GetKeyValueView<long, Account>();
await accounts.PutAsync(transaction: null, 42, new Account(16_000));
await using ITransaction tx = await client.Transactions.BeginAsync();
(Account account, bool hasValue) = await accounts.GetAsync(tx, 42);
account = account with { Balance = account.Balance + 500 };
await accounts.PutAsync(tx, 42, account);
Debug.Assert((await accounts.GetAsync(tx, 42)).Value.Balance == 16_500);
await tx.RollbackAsync();
Debug.Assert((await accounts.GetAsync(null, 42)).Value.Balance == 16_000);
public record Account(decimal Balance);auto accounts = table.get_key_value_view<account, account>();
account init_value(42, 16'000);
accounts.put(nullptr, {42}, init_value);
auto tx = client.get_transactions().begin();
std::optional<account> res_account = accounts.get(&tx, {42});
res_account->balance += 500;
accounts.put(&tx, {42}, res_account);
assert(accounts.get(&tx, {42})->balance == 16'500);
tx.rollback();
assert(accounts.get(&tx, {42})->balance == 16'000);4.事务管理
还可以使用runInTransaction方法管理事务,其会自动完成如下任务:
- 事务将启动并替换为闭包;
- 如果闭包执行期间没有抛出异常,则提交该事务;
- 如果出现可恢复的错误,将重试事务,闭包必须是纯功能性代码,即不会引起副作用。
事务可以以同步方式,也可以以异步方式执行。
下面是同步更新账户余额的示例:
client.transactions().runInTransaction(tx -> {
Account acct = accounts.get(tx, key);
if (acct != null) {
acct.balance += 200.0d;
}
accounts.put(tx, key, acct);
});下面是相同的逻辑,以异步的方式执行:
CompletableFuture<Void> future = client.transactions().runInTransactionAsync(tx ->
accounts.getAsync(tx, key)
.thenCompose(acct -> {
acct.balance += 300.0d;
return accounts.putAsync(tx, key, acct);
})
);
future.join();5.只读事务
在发起事务时,也可以将事务配置为只读事务。在这些事务中无法执行数据修改,但它们也不持有锁,并且可以在非主分区上执行,从而进一步提高其性能。只读事务始终在发起时检查数据,即使新数据已写入数据库。
以下是执行只读事务的方法:
var tx = client.transactions().begin(new TransactionOptions().readOnly(true));
int balance = accounts.get(tx, 42).balance;
tx.commit();await using var tx = await client.Transactions.BeginAsync(
new TransactionOptions { ReadOnly = true });
var account = await accounts.GetAsync(tx, 42);
int balance = account.Value.Balance;
await tx.CommitAsync();auto tx_opts = transaction_options()
.set_read_only(true);
auto tx = m_client.get_transactions().begin(tx_opts);
record_view.get(&tx, 42);
tx.commit();提示
只读事务在特定时间读取数据,如果此后写入了新数据,则旧数据仍将存储在版本存储中,并且在低水位线之前可用。如果在事务期间达到低水位线,则数据将一直可用,直到事务结束。
6.事务超时
有时如果事务花费的时间过长,最好放弃事务,当达到超时时,事务会自动回滚。
以下是配置事务超时的方法:
KeyValueView<Long, Account> accounts =
table.keyValueView(Mapper.of(Long.class), Mapper.of(Account.class));
var tx = client.transactions().begin(new TransactionOptions().timeoutMillis(10000));
accounts.put(tx, 42, account);
tx.commit();await using var tx = await Client.Transactions.BeginAsync(
new TransactionOptions { TimeoutMillis = 10_000 });
await accounts.PutAsync(tx, 42, account);
await tx.CommitAsync();auto accounts = table.get_key_value_view<account, account>();
auto tx_opts = transaction_options()
.set_timeout_millis(10000);
auto tx = m_client.get_transactions().begin(tx_opts);
record_view.insert(&tx, 42);
tx.commit();18624049226
