Skip to content

SQL处理

1.介绍

Ignite是一个兼容于ANSI-99、可水平扩展和容错的分布式SQL数据库,根据使用场景,数据在整个集群中是以分区或者复制的模式进行分发。

作为SQL数据库,Ignite支持所有DML命令,包括SELECT、UPDATE、INSERT和DELETE语句,并且还实现了与分布式系统相关的DDL命令的子集。

在外部工具和应用中通过使用JDBCODBC驱动,可以像处理任何其他支持SQL的存储一样与Ignite交互。Java、.NET和C++开发者还可以使用原生SQL API

在内部,SQL表具有与键-值缓存相同的数据结构,这意味着可以更改数据的分区分布,并利用关联并置技术获得更好的性能。

Ignite的SQL引擎使用H2数据库来解析和优化查询并生成执行计划。

1.1.分布式查询

分区表的查询以分布式方式执行:

  • 对该查询进行解析,并分为多个映射查询和一个汇总查询;
  • 所有映射查询都在数据所在的所有节点上执行;
  • 所有节点都向查询发起方提供本地执行的结果集,查询发起方会将各个结果集汇总为最终结果。

也可以强制查询在本地进行处理,即在执行查询的节点上的数据子集上执行。

1.2.本地查询

如果在复制表上执行查询,将会在本地数据上执行。

2.理解模式

2.1.概述

Ignite具有若干默认模式,并支持创建自定义模式。

默认两个模式可用:

  • SYS模式:其中包含许多和集群各种信息有关的系统视图,不能在此模式中创建表,更多信息请参见系统视图章节的介绍;
  • PUBLIC模式:未指定模式时的默认模式。

在以下场景中,可以创建自定义模式:

  • 可以在集群配置中指定自定义模式,请参见自定义模式
  • Ignite为通过编程接口或XML配置创建的每个缓存创建一个模式,具体请参见缓存和模式名

2.2.PUBLIC模式

如果需要且未指定模式时,默认会使用PUBLIC模式。例如,当通过JDBC接入集群而未显式设置模式时,就会使用PUBLIC模式。

2.3.自定义模式

可以通过IgniteConfigurationsqlSchemas属性设置自定义模式,启动集群之前在配置中指定模式列表,然后在运行时就可以在这些模式中创建对象。

下面是带有两个自定义模式的配置示例:

xml
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    <property name="sqlConfiguration">
        <bean class="org.apache.ignite.configuration.SqlConfiguration">
            <property name="sqlSchemas">
                <list>
                    <value>MY_SCHEMA</value>
                    <value>MY_SECOND_SCHEMA</value>
                </list>
            </property>
        </bean>
    </property>
</bean>
java
IgniteConfiguration cfg = new IgniteConfiguration();

SqlConfiguration sqlCfg = new SqlConfiguration();

sqlCfg.setSqlSchemas("MY_SCHEMA", "MY_SECOND_SCHEMA" );

cfg.setSqlConfiguration(sqlCfg);
csharp
var cfg = new IgniteConfiguration
{
    SqlSchemas = new[]
    {
        "MY_SCHEMA",
        "MY_SECOND_SCHEMA"
    }
};

要接入指定的模式,比如通过JDBC驱动,那么可以在连接串中指定模式名:

jdbc:ignite:thin://127.0.0.1/MY_SCHEMA

2.4.缓存和模式名

当创建带有可查询字段的缓存时,可以通过SQL API来对缓存的数据进行维护,在SQL层面,每个缓存对应一个独立的模式,模式的名字等同于缓存的名字。

简单来说,当通过SQL API创建了一个表,可以通过编程接口将其当做键-值缓存访问,而对应的缓存名,可以通过CREATE TABLE语句的WITH子句中的CACHE_NAME参数进行指定:

sql
CREATE TABLE City (
  ID INT(11),
  Name CHAR(35),
  CountryCode CHAR(3),
  District CHAR(20),
  Population INT(11),
  PRIMARY KEY (ID, CountryCode)
) WITH "backups=1, CACHE_NAME=City";

更多信息请参见CREATE TABLE章节的介绍。

如果未指定这个参数,缓存名定义为如下格式(大写格式):

SQL_<SCHEMA_NAME>_<TABLE_NAME>

3.定义索引

除了常规的DDL命令,比如CREATE/DROP INDEX,开发者还可以使用SQL API来定义索引。

提示

索引的功能是通过ignite-indexing模块提供的,所以如果通过Java代码启动Ignite,需要将这个模块加入类路径

Ignite会自动为每个缓存的主键和关联键字段创建索引,当在值对象的字段上创建索引时,Ignite会创建一个由索引字段和主键字段组成的组合索引。在SQL的角度,该索引由2列组成:索引列和主键列。

3.1.使用SQL创建索引

具体请参见CREATE INDEX章节的内容。

3.2.使用注解配置索引

索引和可查询字段,在代码上,可以通过@QuerySqlField注解进行配置。在下面的示例中,Ignite的SQL引擎会在idsalary字段上创建索引:

java
public class Person implements Serializable {
    /** Indexed field. Will be visible to the SQL engine. */
    @QuerySqlField(index = true)
    private long id;

    /** Queryable field. Will be visible to the SQL engine. */
    @QuerySqlField
    private String name;

    /** Will NOT be visible to the SQL engine. */
    private int age;

    /**
     * Indexed field sorted in descending order. Will be visible to the SQL engine.
     */
    @QuerySqlField(index = true, descending = true)
    private float salary;
}
csharp
class Person
{
    // Indexed field. Will be visible to the SQL engine.
    [QuerySqlField(IsIndexed = true)] public long Id;

    //Queryable field. Will be visible to the SQL engine
    [QuerySqlField] public string Name;

    //Will NOT be visible to the SQL engine.
    public int Age;

    /** Indexed field sorted in descending order.
      * Will be visible to the SQL engine. */
    [QuerySqlField(IsIndexed = true, IsDescending = true)]
    public float Salary;
}

SQL查询中,类型名会被用作表名,这时,表名为Person(使用的模式名和定义见模式章节的介绍)。

idsalary都是索引字段,id为升序排列,而salary为倒序排列。

如果不希望索引一个字段,但是希望在SQL查询中使用该列,那么该字段需要加上该注解,但是不需要index = true参数,这样的字段叫做可查询字段,在上例中,name定义为可查询字段

age字段既不是可查询字段,也不是一个索引字段,因此在SQL查询中是无法访问的。

定义索引字段后,还需要注册索引类型

运行时更新索引和可查询字段

如果希望运行时管理索引或者对象字段的可见性,需要使用CREATE/DROP INDEX命令。

3.2.1.索引嵌套对象

使用注解,对象的嵌套字段也可以被索引和查询。比如,考虑一个Person对象内部有一个Address对象:

java
public class Person {
    /** Indexed field. Will be visible for SQL engine. */
    @QuerySqlField(index = true)
    private long id;

    /** Queryable field. Will be visible for SQL engine. */
    @QuerySqlField
    private String name;

    /** Will NOT be visible for SQL engine. */
    private int age;

    /** Indexed field. Will be visible for SQL engine. */
    @QuerySqlField(index = true)
    private Address address;
}

Address类的结构如下:

java
public class Address {
    /** Indexed field. Will be visible for SQL engine. */
    @QuerySqlField (index = true)
    private String street;

    /** Indexed field. Will be visible for SQL engine. */
    @QuerySqlField(index = true)
    private int zip;
}

在上面的示例中,Address类的所有字段都加上了@QuerySqlField(index = true)注解,Person类的Address对象,也加上了该注解。

这样就可以执行下面的SQL语句:

java
QueryCursor<List<?>> cursor = personCache.query(new SqlFieldsQuery( "select * from Person where street = 'street1'"));

注意在SQL语句的WHERE条件中不需要指定address.street,这是因为Address类的字段会被合并到Person中,这样就可以简单地在查询中直接访问Address中的字段。

警告

如果在嵌套对象上创建了索引,就不能在这个表上执行UPDATE或者INSERT语句。

3.2.2.注册索引类型

定义索引和可查询字段之后,需要将它们及其所属的对象类型一起注册到SQL引擎中。

要指定应建立索引的类型,需要在CacheConfiguration.setIndexedTypes()方法中传递相应的键-值对,如下例所示:

java
// Preparing configuration.
CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<>();

// Registering indexed type.
ccfg.setIndexedTypes(Long.class, Person.class);
csharp
var ccfg = new CacheConfiguration
{
    QueryEntities = new[]
    {
        new QueryEntity(typeof(long), typeof(Person))
    }
};

此方法仅接受成对的类型:一个键类,一个值类,基本类型需要用包装器类型传入。

预定义字段

除了用@QuerySqlField注解标注的所有字段,每个表都有两个特别的预定义字段:_key_val,它表示到整个键对象和值对象的引用。这很有用,比如当它们中的一个是基本类型并且希望用它的值进行过滤时,执行SELECT * FROM Person WHERE _key = 100查询即可。

注意

因为有二进制编组器,不需要将索引类型类加入集群节点的类路径中,SQL引擎不需要对象反序列化就可以钻取索引和可查询字段的值。

3.2.3.组合索引

当查询条件复杂时可以使用多字段索引来加快查询的速度,这时可以用@QuerySqlField.Group注解。如果希望一个字段参与多个组合索引时也可以将多个@QuerySqlField.Group注解加入orderedGroups中。

比如,下面的Person类中age字段加入了名为age_salary_idx的组合索引,它的分组序号是0并且降序排列,同一个组合索引中还有一个字段salary,它的分组序号是3并且升序排列。最重要的是salary字段还是一个单列索引(除了orderedGroups声明之外,还加上了index = true)。分组中的order不需要是什么特别的数值,它只是用于分组内的字段排序。

java
public class Person implements Serializable {
    /** Indexed in a group index with "salary". */
    @QuerySqlField(orderedGroups = { @QuerySqlField.Group(name = "age_salary_idx", order = 0, descending = true) })

    private int age;

    /** Indexed separately and in a group index with "age". */
    @QuerySqlField(index = true, orderedGroups = { @QuerySqlField.Group(name = "age_salary_idx", order = 3) })
    private double salary;
}
csharp
class Person
{
    [QuerySqlField(IndexGroups = new[] {"age_salary_idx"})]
    public int Age;

    [QuerySqlField(IsIndexed = true, IndexGroups = new[] {"age_salary_idx"})]
    public double Salary;
}

注意

@QuerySqlField.Group放在@QuerySqlField(orderedGroups={...})外面是无效的。

3.3.使用查询实体配置索引

索引和字段也可以通过org.apache.ignite.cache.QueryEntity进行配置,它便于利用Spring进行基于XML的配置。

在上面基于注解的配置中涉及的所有概念,对于基于QueryEntity的方式也都有效,此外,如果类型的字段通过@QuerySqlField进行了配置并且通过CacheConfiguration.setIndexedTypes注册过的,在内部也会被转换为查询实体。

下面的示例显示的是如何定义单一字段索引、组合索引和可查询字段:

xml
<bean class="org.apache.ignite.configuration.IgniteConfiguration" id="ignite.cfg">
    <property name="cacheConfiguration">
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <property name="name" value="Person"/>
            <!-- Configure query entities -->
            <property name="queryEntities">
                <list>
                    <bean class="org.apache.ignite.cache.QueryEntity">
                        <!-- Setting  the type of the key -->
                        <property name="keyType" value="java.lang.Long"/>

                        <property name="keyFieldName" value="id"/>

                        <!-- Setting type of the value -->
                        <property name="valueType" value="org.apache.ignite.examples.Person"/>

                        <!-- Defining fields that will be either indexed or queryable.
                             Indexed fields are added to the 'indexes' list below.-->
                        <property name="fields">
                            <map>
                                <entry key="id" value="java.lang.Long"/>
                                <entry key="name" value="java.lang.String"/>
                                <entry key="salary" value="java.lang.Float "/>
                            </map>
                        </property>
                        <!-- Defining indexed fields.-->
                        <property name="indexes">
                            <list>
                                <!-- Single field (aka. column) index -->
                                <bean class="org.apache.ignite.cache.QueryIndex">
                                    <constructor-arg value="name"/>
                                </bean>
                                <!-- Group index. -->
                                <bean class="org.apache.ignite.cache.QueryIndex">
                                    <constructor-arg>
                                        <list>
                                            <value>id</value>
                                            <value>salary</value>
                                        </list>
                                    </constructor-arg>
                                    <constructor-arg value="SORTED"/>
                                </bean>
                            </list>
                        </property>
                    </bean>
                </list>
            </property>
        </bean>
    </property>
</bean>
java
CacheConfiguration<Long, Person> cache = new CacheConfiguration<Long, Person>("myCache");

QueryEntity queryEntity = new QueryEntity();

queryEntity.setKeyFieldName("id").setKeyType(Long.class.getName()).setValueType(Person.class.getName());

LinkedHashMap<String, String> fields = new LinkedHashMap<>();
fields.put("id", "java.lang.Long");
fields.put("name", "java.lang.String");
fields.put("salary", "java.lang.Long");

queryEntity.setFields(fields);

queryEntity.setIndexes(Arrays.asList(new QueryIndex("name"),
        new QueryIndex(Arrays.asList("id", "salary"), QueryIndexType.SORTED)));

cache.setQueryEntities(Arrays.asList(queryEntity));
csharp
var cacheCfg = new CacheConfiguration
{
    Name = "myCache",
    QueryEntities = new[]
    {
        new QueryEntity
        {
            KeyType = typeof(long),
            KeyFieldName = "id",
            ValueType = typeof(dotnet_helloworld.Person),
            Fields = new[]
            {
                new QueryField
                {
                    Name = "id",
                    FieldType = typeof(long)
                },
                new QueryField
                {
                    Name = "name",
                    FieldType = typeof(string)
                },
                new QueryField
                {
                    Name = "salary",
                    FieldType = typeof(long)
                },
            },
            Indexes = new[]
            {
                new QueryIndex("name"),
                new QueryIndex(false, QueryIndexType.Sorted, new[] {"id", "salary"})
            }
        }
    }
};
Ignition.Start(new IgniteConfiguration
{
    CacheConfiguration = new[] {cacheCfg}
});

SQL查询中会使用valueType的简称作为表名,这时,表名为Person(模式名的用法和定义请参见理解模式章节的内容)。

QueryEntity定义之后,就可以执行下面的查询了:

java
SqlFieldsQuery qry = new SqlFieldsQuery("SELECT id, name FROM Person" + "WHERE id > 1500 LIMIT 10");

运行时更新索引和可查询字段

如果希望运行时管理索引或者对象字段的可见性,需要使用CREATE/DROP INDEX命令。

3.4.配置索引内联值

正确的索引内联值有助于增加索引字段上的查询速度,关于如何选择正确的内联值,请参见增加索引内联值章节的介绍。

大多数情况下,只需要为可变长度字段的索引设置内联值,比如字符串或者数组,默认值是10。

可通过如下方式修改默认值:

  • 单独为每个索引配置内联值;
  • 通过CacheConfiguration.sqlIndexMaxInlineSize属性为缓存内的所有索引配置内联值;
  • 通过IGNITE_MAX_INDEX_PAYLOAD_SIZE系统属性为集群内的所有索引配置内联值。

配置将按照上面的顺序依次生效。

可以为每个索引单独配置内联值,这会覆盖默认值。如果要为开发者定义的索引设置内联值,可以用下面的方法之一,该值以字节数为单位。

注解方式

java
@QuerySqlField(index = true, inlineSize = 13)
private String country;
csharp
[QuerySqlField(IsIndexed = true, IndexInlineSize = 13)]
public string Country { get; set; }

QueryEntity方式

java
QueryIndex idx = new QueryIndex("country");
idx.setInlineSize(13);
queryEntity.setIndexes(Arrays.asList(idx));
csharp
var qe = new QueryEntity
{
    Indexes = new[]
    {
        new QueryIndex
        {
            InlineSize = 13
        }
    }
};

CREATE INDEX命令

如果使用的是CREATE INDEX命令,那么可以使用INLINE_SIZE选项来配置内联值:

sql
create index country_idx on Person (country) INLINE_SIZE 13;

3.5.自定义键

如果只使用预定义的SQL数据类型作为缓存键,那么就没必要对和DML相关的配置做额外的操作,这些数据类型在GridQueryProcessor#SQL_TYPES常量中进行定义,列举如下:

  • 所有的基本类型及其包装器,除了charCharacter
  • String;
  • BigDecimal;
  • byte[];
  • java.util.Date, java.sql.Date, java.sql.Timestamp;
  • java.util.UUID

不过如果决定引入复杂的自定义缓存键,那么在DML语句中要指向这些字段就需要:

  • QueryEntity中定义这些字段,与在值对象中配置字段一样;
  • 使用新的配置参数QueryEntitty.setKeyFields(..)来对键和值进行区分。

下面的例子展示了如何实现:

xml
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    <property name="cacheConfiguration">
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <property name="name" value="personCache"/>
            <!-- Configure query entities -->
            <property name="queryEntities">
                <list>
                    <bean class="org.apache.ignite.cache.QueryEntity">
                        <!-- Registering key's class. -->
                        <property name="keyType" value="CustomKey"/>
                        <!-- Registering value's class. -->
                        <property name="valueType" value="org.apache.ignite.examples.Person"/>
                        <!-- Defining all the fields that will be accessible from DML. -->
                        <property name="fields">
                            <map>
                                <entry key="firstName" value="java.lang.String"/>
                                <entry key="lastName" value="java.lang.String"/>
                                <entry key="intKeyField" value="java.lang.Integer"/>
                                <entry key="strKeyField" value="java.lang.String"/>
                            </map>
                        </property>
                        <!-- Defining the subset of key's fields -->
                        <property name="keyFields">
                            <set>
                                <value>intKeyField</value>
                                <value>strKeyField</value>
                            </set>
                        </property>
                    </bean>
                </list>
            </property>
        </bean>
    </property>
</bean>
java
// Preparing cache configuration.
CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<Long, Person>("personCache");

// Creating the query entity.
QueryEntity entity = new QueryEntity("CustomKey", "Person");

// Listing all the queryable fields.
LinkedHashMap<String, String> fields = new LinkedHashMap<>();

fields.put("intKeyField", Integer.class.getName());
fields.put("strKeyField", String.class.getName());

fields.put("firstName", String.class.getName());
fields.put("lastName", String.class.getName());

entity.setFields(fields);

// Listing a subset of the fields that belong to the key.
Set<String> keyFlds = new HashSet<>();

keyFlds.add("intKeyField");
keyFlds.add("strKeyField");

entity.setKeyFields(keyFlds);

// End of new settings, nothing else here is DML related

entity.setIndexes(Collections.<QueryIndex>emptyList());

cacheCfg.setQueryEntities(Collections.singletonList(entity));

ignite.createCache(cacheCfg);
csharp
var ccfg = new CacheConfiguration
{
    Name = "personCache",
    QueryEntities = new[]
    {
        new QueryEntity
        {
            KeyTypeName = "CustomKey",
            ValueTypeName = "Person",
            Fields = new[]
            {
                new QueryField
                {
                    Name = "intKeyField",
                    FieldType = typeof(int),
                    IsKeyField = true
                },
                new QueryField
                {
                    Name = "strKeyField",
                    FieldType = typeof(string),
                    IsKeyField = true
                },
                new QueryField
                {
                    Name = "firstName",
                    FieldType = typeof(string)
                },
                new QueryField
                {
                    Name = "lastName",
                    FieldType = typeof(string)
                }
            }
        },
    }
};

哈希值自动计算和equals实现

如果自定义键可以被序列化为二进制形式,那么Ignite会自动进行哈希值的计算并且实现equals方法。

但是,如果键类型是Externalizable类型,那么就无法序列化为二进制形式,那么就需要自行实现hashCodeequals方法,具体请参见使用二进制对象章节的介绍。

4.使用SQL API

除了使用JDBC驱动,Java开发者还可以使用Ignite的SQL API来访问和修改Ignite中存储的数据。

SqlFieldsQuery类是执行SQL查询和处理结果集的接口,SqlFieldsQuery通过IgniteCache.query(SqlFieldsQuery)方法执行,然后会返回一个游标。

4.1.配置可查询字段

如果希望使用SQL语句来查询缓存,需要定义值对象的哪些字段是可查询的,可查询字段是数据模型中SQL引擎可以处理的字段。

提示

如果使用JDBC或者SQL工具建表,则不需要定义可查询字段。

提示

索引的功能是通过ignite-indexing模块提供的,所以如果通过Java代码启动Ignite,需要将这个模块加入类路径

在Java中,可查询字段可以通过两种方式来定义:

  • 使用注解;
  • 通过查询实体定义。

4.1.1.@QuerySqlField注解

要让某个字段可查询,需要在值类定义的对应字段上加注@QuerySqlField注解,然后调用CacheConfiguration.setIndexedTypes(…​)方法。

java
class Person implements Serializable {
    /** Indexed field. Will be visible to the SQL engine. */
    @QuerySqlField(index = true)
    private long id;

    /** Queryable field. Will be visible to the SQL engine. */
    @QuerySqlField
    private String name;

    /** Will NOT be visible to the SQL engine. */
    private int age;

    /**
     * Indexed field sorted in descending order. Will be visible to the SQL engine.
     */
    @QuerySqlField(index = true, descending = true)
    private float salary;
}

public static void main(String[] args) {
    Ignite ignite = Ignition.start();
    CacheConfiguration<Long, Person> personCacheCfg = new CacheConfiguration<Long, Person>();
    personCacheCfg.setName("Person");

    personCacheCfg.setIndexedTypes(Long.class, Person.class);
    IgniteCache<Long, Person> cache = ignite.createCache(personCacheCfg);
}
csharp
class Person
{
    // Indexed field. Will be visible to the SQL engine.
    [QuerySqlField(IsIndexed = true)] public long Id;

    //Queryable field. Will be visible to the SQL engine
    [QuerySqlField] public string Name;

    //Will NOT be visible to the SQL engine.
    public int Age;

    /**
      * Indexed field sorted in descending order.
      * Will be visible to the SQL engine.
    */
    [QuerySqlField(IsIndexed = true, IsDescending = true)]
    public float Salary;
}

public static void SqlQueryFieldDemo()
{
    var cacheCfg = new CacheConfiguration
    {
        Name = "cacheName",
        QueryEntities = new[]
        {
            new QueryEntity(typeof(int), typeof(Person))
        }
    };

    var ignite = Ignition.Start();
    var cache = ignite.CreateCache<int, Person>(cacheCfg);
}

4.1.2.查询实体

可以通过QueryEntity类来定义可查询字段,查询实体可以通过XML来配置:

xml
<bean class="org.apache.ignite.configuration.IgniteConfiguration" id="ignite.cfg">
    <property name="cacheConfiguration">
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <property name="name" value="Person"/>
            <!-- Configure query entities -->
            <property name="queryEntities">
                <list>
                    <bean class="org.apache.ignite.cache.QueryEntity">
                        <!-- Setting  the type of the key -->
                        <property name="keyType" value="java.lang.Long"/>

                        <property name="keyFieldName" value="id"/>

                        <!-- Setting type of the value -->
                        <property name="valueType" value="org.apache.ignite.examples.Person"/>

                        <!-- Defining fields that will be either indexed or queryable.
                             Indexed fields are added to the 'indexes' list below.-->
                        <property name="fields">
                            <map>
                                <entry key="id" value="java.lang.Long"/>
                                <entry key="name" value="java.lang.String"/>
                                <entry key="salary" value="java.lang.Float "/>
                            </map>
                        </property>
                        <!-- Defining indexed fields.-->
                        <property name="indexes">
                            <list>
                                <!-- Single field (aka. column) index -->
                                <bean class="org.apache.ignite.cache.QueryIndex">
                                    <constructor-arg value="name"/>
                                </bean>
                                <!-- Group index. -->
                                <bean class="org.apache.ignite.cache.QueryIndex">
                                    <constructor-arg>
                                        <list>
                                            <value>id</value>
                                            <value>salary</value>
                                        </list>
                                    </constructor-arg>
                                    <constructor-arg value="SORTED"/>
                                </bean>
                            </list>
                        </property>
                    </bean>
                </list>
            </property>
        </bean>
    </property>
</bean>
java
class Person implements Serializable {
    private long id;

    private String name;

    private int age;

    private float salary;
}

public static void main(String[] args) {
    Ignite ignite = Ignition.start();
    CacheConfiguration<Long, Person> personCacheCfg = new CacheConfiguration<Long, Person>();
    personCacheCfg.setName("Person");

    QueryEntity queryEntity = new QueryEntity(Long.class, Person.class)
            .addQueryField("id", Long.class.getName(), null).addQueryField("age", Integer.class.getName(), null)
            .addQueryField("salary", Float.class.getName(), null)
            .addQueryField("name", String.class.getName(), null);

    queryEntity.setIndexes(Arrays.asList(new QueryIndex("id"), new QueryIndex("salary", false)));

    personCacheCfg.setQueryEntities(Arrays.asList(queryEntity));

    IgniteCache<Long, Person> cache = ignite.createCache(personCacheCfg);
}
csharp
private class Person
{
    public long Id;

    public string Name;

    public int Age;

    public float Salary;
}

public static void QueryEntitiesDemo()
{
    var personCacheCfg = new CacheConfiguration
    {
        Name = "Person",
        QueryEntities = new[]
        {
            new QueryEntity
            {
                KeyType = typeof(long),
                ValueType = typeof(Person),
                Fields = new[]
                {
                    new QueryField("Id", typeof(long)),
                    new QueryField("Name", typeof(string)),
                    new QueryField("Age", typeof(int)),
                    new QueryField("Salary", typeof(float))
                },
                Indexes = new[]
                {
                    new QueryIndex("Id"),
                    new QueryIndex(true, "Salary"),
                }
            }
        }
    };
    var ignite = Ignition.Start();
    var personCache = ignite.CreateCache<int, Person>(personCacheCfg);
}

4.2.查询

要在缓存上执行查询,简单地创建一个SqlFieldsQuery对象,将查询字符串传给构造方法,然后执行cache.query(…​)即可。注意在下面的示例中,Person缓存必须配置为对SQL引擎可见

java
IgniteCache<Long, Person> cache = ignite.cache("Person");

SqlFieldsQuery sql = new SqlFieldsQuery(
        "select concat(firstName, ' ', lastName) from Person");

// Iterate over the result set.
try (QueryCursor<List<?>> cursor = cache.query(sql)) {
    for (List<?> row : cursor)
        System.out.println("personName=" + row.get(0));
}
csharp
var cache = ignite.GetCache<long, Person>("Person");

var sql = new SqlFieldsQuery("select concat(FirstName, ' ', LastName) from Person");

using (var cursor = cache.Query(sql))
{
    foreach (var row in cursor)
    {
        Console.WriteLine("personName=" + row[0]);
    }
}
cpp
Cache<int64_t, Person> cache = ignite.GetOrCreateCache<int64_t, Person>("Person");

// Iterate over the result set.
// SQL Fields Query can only be performed using fields that have been listed in "QueryEntity" been of the config!
QueryFieldsCursor cursor = cache.Query(SqlFieldsQuery("select concat(firstName, ' ', lastName) from Person"));
while (cursor.HasNext())
{
    std::cout << "personName=" << cursor.GetNext().GetNext<std::string>() << std::endl;
}

SqlFieldsQuery会返回一个游标,然后可以用游标来迭代匹配SQL查询的结果集。

4.2.1.本地执行

如果要强制一个查询在本地执行,可以使用SqlFieldsQuery.setLocal(true)方法。这时,查询是在执行查询的节点的本地数据上执行,这意味着查询的结果集是不完整的,所以使用这个模式前要了解这个限制。

4.2.2.WHERE子句的子查询

INSERTMERGE语句中的SELECT查询,以及由UPDATEDELETE操作生成的SELECT查询也是分布式的,可以以并置或非并置的模式执行。

但是,如果WHERE子句中有一个子查询,那么其只能以并置的方式执行。

比如,考虑下面的查询:

sql
DELETE FROM Person WHERE id IN
    (SELECT personId FROM Salary s WHERE s.amount > 2000);

SQL引擎会生成一个SELECT查询,来获取要删除的条目列表。该查询是分布式的,在整个集群中执行,大致如下:

sql
SELECT _key, _val FROM Person WHERE id IN
    (SELECT personId FROM Salary s WHERE s.amount > 2000);

但是,IN子句中的子查询(SELECT personId FROM Salary …​)并不是分布式的,只能在节点的本地可用数据集上执行。

4.3.插入、更新、删除和合并

使用SqlFieldsQuery可以执行DML命令来修改数据:

java
IgniteCache<Long, Person> cache = ignite.cache("personCache");

cache.query(
        new SqlFieldsQuery("INSERT INTO Person(id, firstName, lastName) VALUES(?, ?, ?)")
                .setArgs(1L, "John", "Smith"))
        .getAll();
java
IgniteCache<Long, Person> cache = ignite.cache("personCache");

cache.query(new SqlFieldsQuery("UPDATE Person set lastName = ? " + "WHERE id >= ?")
        .setArgs("Jones", 2L)).getAll();
java
IgniteCache<Long, Person> cache = ignite.cache("personCache");

cache.query(new SqlFieldsQuery("DELETE FROM Person " + "WHERE id >= ?").setArgs(2L))
        .getAll();
java
IgniteCache<Long, Person> cache = ignite.cache("personCache");

cache.query(new SqlFieldsQuery("MERGE INTO Person(id, firstName, lastName)"
        + " values (1, 'John', 'Smith'), (5, 'Mary', 'Jones')")).getAll();

当使用SqlFieldsQuery来执行DDL语句时,必须调用query(…​)方法返回的游标的getAll()方法。

4.4.指定模式

通过SqlFieldsQuery执行的任何SELECT语句,默认都是在PUBLIC模式下解析的。但是如果表不在这个模式下,需要调用SqlFieldsQuery.setSchema(…​)来指定模式,这样语句就在指定的模式下执行了。

java
SqlFieldsQuery sql = new SqlFieldsQuery("select name from City").setSchema("PERSON");
csharp
var sqlFieldsQuery = new SqlFieldsQuery("select name from City") {Schema = "PERSON"};
cpp
// SQL Fields Query can only be performed using fields that have been listed in "QueryEntity" been of the config!
SqlFieldsQuery sql = SqlFieldsQuery("select name from City");
sql.SetSchema("PERSON");

另外,也可以在语句中指定模式:

java
SqlFieldsQuery sql = new SqlFieldsQuery("select name from Person.City");

4.5.创建表

可以向SqlFieldsQuery传递任何受支持的DDL语句,如下所示:

java
IgniteCache<Long, Person> cache = ignite
        .getOrCreateCache(new CacheConfiguration<Long, Person>().setName("Person"));

// Creating City table.
cache.query(new SqlFieldsQuery(
        "CREATE TABLE City (id int primary key, name varchar, region varchar)")).getAll();
csharp
var cache = ignite.GetOrCreateCache<long, Person>(
    new CacheConfiguration
    {
        Name = "Person"
    }
);

//Creating City table
cache.Query(new SqlFieldsQuery("CREATE TABLE City (id int primary key, name varchar, region varchar)"));
cpp
Cache<int64_t, Person> cache = ignite.GetOrCreateCache<int64_t, Person>("Person");

// Creating City table.
cache.Query(SqlFieldsQuery("CREATE TABLE City (id int primary key, name varchar, region varchar)"));

在SQL模式方面,上述代码的执行结果,创建了下面的表:

  • Person模式中的Person表(如果之前未创建);
  • Person模式中的City表。

要查询City表,可以使用两种方式:select * from Person.Citynew SqlFieldsQuery("select * from City").setSchema("PERSON")(注意大写)。

4.6.取消查询

有两种方式可以取消长时间运行的查询。

第一种方式是设置查询执行超时:

java
SqlFieldsQuery query = new SqlFieldsQuery("SELECT * from Person");

// Setting query execution timeout
query.setTimeout(10_000, TimeUnit.SECONDS);
csharp
var query = new SqlFieldsQuery("select * from Person") {Timeout = TimeSpan.FromSeconds(10)};

第二个方式是调用QueryCursor.close()来终止查询:

java
SqlFieldsQuery query = new SqlFieldsQuery("SELECT * FROM Person");

// Executing the query
QueryCursor<List<?>> cursor = cache.query(query);

// Halting the query that might be still in progress.
cursor.close();
csharp
var qry = new SqlFieldsQuery("select * from Person");
var cursor = cache.Query(qry);

//Executing query

//Halting the query that might be still in progress
cursor.Dispose();

4.7.示例

Ignite的源代码中有一个直接可以运行的SqlDmlExample,其演示了所有上面提到过的DML操作的使用。

5.分布式关联

分布式关联是指SQL语句中通过关联子句组合了两个或者更多的分区表,如果这些表关联在分区列(关联键)上,该关联称为并置关联,否则称为非并置关联

并置关联更高效,因为其可以高效地在集群节点间分布。

Ignite默认将每个关联查询都视为并置关联,并按照并置的模式执行。

警告

如果查询是非并置的,需要通过SqlFieldsQuery.setDistributedJoins(true)来开启查询执行的非并置模式,否则查询的结果集会是不正确的。

警告

如果经常关联表,那么建议将表在同一个列(关联表的列)上进行分区。

非并置的关联仅适用于无法使用并置关联的场景。

5.1.并置关联

下图解释了并置关联的执行过程,一个并置关联(Q)会被发给存储与查询条件匹配的数据的所有节点,然后查询在每个节点的本地数据集上执行(E(Q)),结果集(R)会在查询的发起节点(客户端节点)聚合:

5.2.非并置关联

如果以非并置模式执行查询,则SQL引擎将在存储与查询条件匹配的数据的所有节点上本地执行查询。但是因为数据不是并置的,所以每个节点将通过发送广播或单播请求从其他节点拉取缺失的数据(本地不存在),下图描述了此过程:

如果关联是在主键或关联键上,则节点将发送单播请求,因为这时节点知道缺失数据的位置。否则节点将发送广播请求。出于性能原因,广播和单播请求都被汇总为批次。

通过设置JDBC/ODBC参数,或通过调用SqlFieldsQuery.setDistributedJoins(true)使用SQL API,可以启用非并置查询执行模式。

警告

如果对复制表中的列使用非并置关联,则该列必须有索引。否则会抛出异常。

6.SQL事务

警告

支持SQL事务当前处于测试阶段,生产环境建议使用键-值事务。

6.1.概述

配置为TRANSACTIONAL_SNAPSHOT原子化模式的缓存支持SQL事务。TRANSACTIONAL_SNAPSHOT模式是Ignite缓存的多版本并发控制(MVCC)实现,关于MVCC的更多信息以及当前的限制,请参见多版本并发控制章节的内容。

关于Ignite支持的事务语法,请参见事务章节的内容。

6.2.启用MVCC

在缓存配置中使用TRANSACTIONAL_SNAPSHOT原子化模式可以为缓存开启MVCC,如果使用CREATE TABLE命令建表,可以在命令的WITH子句中指定原子化模式参数。

xml
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    <property name="cacheConfiguration">
        <bean class="org.apache.ignite.configuration.CacheConfiguration">

            <property name="name" value="myCache"/>

            <property name="atomicityMode" value="TRANSACTIONAL_SNAPSHOT"/>

        </bean>
    </property>
</bean>
java
CacheConfiguration cacheCfg = new CacheConfiguration<>();
cacheCfg.setName("myCache");

cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
csharp
var cacheCfg = new CacheConfiguration
{
    Name = "myCache",
    AtomicityMode = CacheAtomicityMode.TransactionalSnapshot
};
sql
CREATE TABLE Person WITH "ATOMICITY=TRANSACTIONAL_SNAPSHOT"

6.3.限制

6.3.1.跨缓存事务

TRANSACTIONAL_SNAPSHOT模式是缓存级的,因此不允许在一个事务中的缓存具有不同的原子化模式,如果要在一个事务中覆盖多张表,那么所有的相关表都要使用TRANSACTIONAL_SNAPSHOT模式创建。

6.3.2.嵌套事务

通过JDBC/ODBC连接参数,Ignite支持三种模式用于处理嵌套的SQL事务。

JDBC连接串示例:

jdbc:ignite:thin://127.0.0.1/?nestedTransactionsMode=COMMIT

当事务中发生了嵌套的事务,系统的行为取决于nestedTransactionsMode参数:

  • ERROR:如果遇到嵌套事务,会抛出错误并且包含的事务会回滚,这是默认的行为;
  • COMMIT:包含事务会被挂起,嵌套事务启动后如果遇到COMMIT语句会被提交。包含事务中的其余语句会作为隐式事务执行;
  • IGNORE不要使用这个模式,嵌套事务的开始会被忽略,嵌套事务中的语句会作为包含事务的一部分执行,并且随着嵌套事务的提交而提交所有的变更,包含事务的剩余语句会作为隐式事务执行。

7.自定义SQL函数

Ignite的SQL引擎支持通过额外用Java编写的自定义SQL函数,来扩展ANSI-99规范定义的SQL函数集。

一个自定义SQL函数仅仅是一个加注了@QuerySqlFunction注解的公共静态方法。

java
// Defining a custom SQL function.
public class MyFunctions {
    @QuerySqlFunction
    public static int sqr(int x) {
        return x * x;
    }
}

持有自定义SQL函数的类需要使用setSqlFunctionClasses(...)方法在某个CacheConfiguration中注册。

java
// Preparing a cache configuration.
CacheConfiguration cfg = new CacheConfiguration();

// Registering the class that contains custom SQL functions.
cfg.setSqlFunctionClasses(MyFunctions.class);

经过了上述配置的缓存部署之后,在SQL查询中就可以调用自定义函数了,如下所示:

java
// Preparing the query that uses customly defined 'sqr' function.
SqlFieldsQuery query = new SqlFieldsQuery(
  "SELECT name FROM Blocks WHERE sqr(size) > 100");

// Executing the query.
cache.query(query).getAll();

类注册

在自定义SQL函数可能要执行的所有节点上,通过CacheConfiguration.setSqlFunctionClasses(...)注册的类都需要添加到类路径中,否则在自定义函数执行时会抛出ClassNotFoundException异常。

8.JDBC驱动

Ignite提供了JDBC驱动,可以通过标准的SQL语句处理分布式数据,比如从JDBC端直接进行SELECTINSERTUPDATEDELETE

目前,Ignite支持两种类型的驱动,轻量易用的JDBC Thin模式驱动以及以客户端节点形式与集群进行交互的JDBC客户端驱动

8.1.JDBC Thin模式驱动

JDBC Thin模式驱动是Ignite提供的默认轻量级驱动,要使用这种驱动,只需要将ignite-core-{version}.jar加入应用的类路径即可。

驱动会接入集群的一个节点然后将所有的请求转发给它进行处理。节点会处理分布式的查询以及结果集的汇总,然后将结果集反馈给客户端应用。

JDBC连接串可以有两种模式:URL查询模式以及分号模式:

// URL query pattern
jdbc:ignite:thin://<hostAndPortRange0>[,<hostAndPortRange1>]...[,<hostAndPortRangeN>][/schema][?<params>]

hostAndPortRange := host[:port_from[..port_to]]

params := param1=value1[&param2=value2]...[&paramN=valueN]

// Semicolon pattern
jdbc:ignite:thin://<hostAndPortRange0>[,<hostAndPortRange1>]...[,<hostAndPortRangeN>][;schema=<schema_name>][;param1=value1]...[;paramN=valueN]
  • host:必需,它定义了要接入的集群节点主机地址;
  • port_from:打开连接的端口范围的起始点,如果忽略此参数默认为10800
  • port_to:可选,如果忽略此参数则等同于port_from
  • schema:要访问的模式名,默认是PUBLIC,这个名字对应于SQL的ANSI-99标准,不加引号是大小写不敏感的,加引号是大小写敏感的。如果使用了分号模式,模式可以通过参数名schema定义;
  • <params>:可选。

驱动类名为org.apache.ignite.IgniteJdbcThinDriver,比如,下面就是如何打开到集群节点的连接,监听地址为192.168.0.50:

java
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");

// Open the JDBC connection.
Connection conn = DriverManager.getConnection("jdbc:ignite:thin://192.168.0.50");

如果通过bash接入则JDBC URL需要加引号

如果通过bash环境接入,则连接URL需要加" ",比如:"jdbc:ignite:thin://[address]:[port];user=[username];password=[password]"

8.1.1.参数

下表列出了JDBC连接串支持的所有参数:

属性名描述默认值
userSQL连接的用户名,如果服务端开启了认证则此参数为必需。关于如何开启认证和创建用户,可以分别参见认证创建用户的文档。ignite
passwordSQL连接的密码,如果服务端开启了认证则此参数为必需。关于如何开启认证和创建用户,可以分别参见认证创建用户的文档。ignite
distributedJoins对于非并置数据是否使用分布式关联false
enforceJoinOrder是否在查询中强制表的关联顺序,如果配置为true,查询优化器在关联中不会对表进行重新排序。false
collocated如果SQL语句包含按主键或关联键对结果集进行分组的GROUP BY子句,可以将此参数设置为true。当Ignite执行分布式查询时,会向单个集群节点发送子查询,如果事先知道待查询的数据是在同一个节点上并置在一起的,并且是按主键或关联键分组的,那么Ignite通过在参与查询的每个节点本地分组数据来实现显著的性能和网络优化。false
replicatedOnly查询是否只包含复制表,这是一个潜在的可能提高性能的提示。false
autoCloseServerCursor当拿到最后一个结果集时是否自动关闭服务端游标。开启之后,对ResultSet.close()的调用就不需要网络访问,这样会改进性能。但是,如果服务端游标已经关闭,在调用ResultSet.getMetadata()方法时会抛出异常,这时为什么默认值为false的原因。false
partitionAwareness启用分区感知模式,该模式中,驱动会尝试确定要查询的数据所在的节点,然后把请求发给这些节点。false
partitionAwarenessSQLCacheSize驱动为优化而在本地保留的不同SQL查询数。当第一次执行查询时,驱动会接收正在查询的表的分区分布,并将其保存以备将来在本地使用。下次查询此表时,驱动使用该分区分布来确定要查询的数据的位置,以便将查询直接发送到正确的节点。当集群拓扑发生变更时,此包含SQL查询的本地存储将失效。此参数的最佳值应等于要执行的不同SQL查询的数量。1000
partitionAwarenessPartitionDistributionsCacheSize表示分区分布的不同对象的数量,驱动在本地保留以进行优化。具体请参见partitionAwarenessSQLCacheSize参数的说明。当集群拓扑发生变更时,持有分区分布对象的本地存储将失效。此参数的最佳值应等于要在查询中使用的不同表(缓存组)的数量。1000
socketSendBuffer发送套接字缓冲区大小,如果配置为0,会使用操作系统默认值。0
socketReceiveBuffer接收套接字缓冲区大小,如果配置为0,会使用操作系统默认值。0
tcpNoDelay是否使用TCP_NODELAY选项。true
lazy查询延迟执行。Ignite默认会将所有的结果集放入内存然后将其返回给客户端。对于不太大的结果集,这样会提供较好的性能,并且使内部的数据库锁时间最小化,因此提高了并发能力。但是如果相对于可用内存来说结果集过大,那么会导致频繁的GC暂停甚至OutOfMemoryError,如果使用这个标志,可以提示Ignite延迟加载结果集,这样可以在不大幅降低性能的前提下,最大限度地减少内存的消耗。false
skipReducerOnUpdate开启服务端的更新特性。当Ignite执行DML操作时,首先,它会获取所有受影响的中间行给查询发起方进行分析(通常被称为汇总方),然后会准备一个更新值的批次发给远程节点。这个方式可能影响性能,如果一个DML操作需要移动大量数据时,还可能会造成网络堵塞。使用这个标志可以提示Ignite在对应的远程节点上进行中间行的分析和更新。默认值为false,这意味着会首先获取中间行然后发给查询发起方。false

关于和安全有关的参数,请参见使用SSL章节的介绍。

8.1.2.连接串示例

  • jdbc:ignite:thin://myHost:接入myHost,其它比如端口为10800等都是默认值;
  • jdbc:ignite:thin://myHost:11900:接入myHost,自定义端口为11900,其它为默认值;
  • jdbc:ignite:thin://myHost:11900;user=ignite;password=ignite:接入myHost,自定义端口为11900,并且带有用于认证的用户凭据;
  • jdbc:ignite:thin://myHost:11900;distributedJoins=true&autoCloseServerCursor=true:接入myHost,自定义端口为11900,开启了分布式关联和autoCloseServerCursor优化;
  • jdbc:ignite:thin://myHost:11900/myschema;:接入myHost,自定义端口为11900,模式为MYSCHEMA
  • jdbc:ignite:thin://myHost:11900/"MySchema";lazy=false:接入myHost,自定义端口为11900,模式为MySchema(模式名区分大小写),并且禁用了查询的延迟执行。

8.1.3.多端点

在连接串中配置多个连接端点也是可以的,这样如果连接中断会开启自动故障转移,JDBC驱动会从列表中随机选择一个地址接入。如果之前的连接中断,驱动会选择另一个地址直到连接恢复,如果所有的端点都不可达,JDBC会停止重连并且抛出异常。

下面的示例会显示如何通过连接串传递3个地址:

java
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");

// Open the JDBC connection passing several connection endpoints.
Connection conn = DriverManager.getConnection(
  "jdbc:ignite:thin://192.168.0.50:101,192.188.5.40:101, 192.168.10.230:101");

8.1.4.分区感知

警告

分区感知是一个试验性特性,API和设计架构在正式发布之前可能会变更。

分区感知是一个可使JDBC驱动“感知”集群中分区分布的功能。它使得驱动可以选择持有待查询数据的节点,并将查询直接发送到那些节点(如果在驱动的配置中提供了节点的地址)。分区感知可以提高使用关联键的查询的平均性能。

没有分区感知时,JDBC驱动将连接到某个节点,然后所有查询都通过该节点执行。如果数据分布在其他节点上,则必须在集群内重新路由查询,这会增加一个额外的网络波动。分区感知通过将查询直接发送到正确的节点来消除该波动。

要使用分区感知功能,需要在连接属性中提供所有服务端节点的地址,驱动会将请求直接发送到存储查询所请求数据的节点。

警告

注意,当前需要在连接属性中提供所有服务端节点的地址,因为在打开连接后驱动不会自动加载它们。这意味着如果新的服务端节点加入集群,需要将节点的地址添加到连接属性中,然后重新连接驱动,否则驱动将无法直接向该节点发送请求。

要开启分区感知,需要将partitionAwareness=true参数添加到连接串中,然后提供多个服务端节点的地址。

java
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");

Connection conn = DriverManager
        .getConnection("jdbc:ignite:thin://192.168.0.50,192.188.5.40,192.168.10.230?partitionAwareness=true");

提示

分区感知功能只能使用默认的关联函数。

8.1.5.集群配置

为了接收和处理来自JDBC Thin驱动转发过来的请求,一个节点需要绑定到一个本地网络端口10800,然后监听入站请求。

通过ClientConnectorConfiguration,可以对参数进行修改:

java
IgniteConfiguration cfg = new IgniteConfiguration()
    .setClientConnectorConfiguration(new ClientConnectorConfiguration());
xml
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  <property name="clientConnectorConfiguration">
    <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration" />
  </property>
</bean>

其支持如下的参数:

参数名描述默认值
host绑定的主机名或者IP地址,如果配置为null,会使用localHostnull
port绑定的TCP端口,如果指定的端口已被占用,Ignite会使用portRange属性来查找其它可用的端口。10800
portRange定义尝试绑定的端口数量,比如,如果端口配置为10800并且端口范围为100,Ignite会从10800开始,在[10800,10900]范围内查找可用端口。100
maxOpenCursorsPerConnection每个连接打开的服务端游标的最大数量。128
threadPoolSize线程池中负责请求处理的线程数量。max(8,CPU核数)
socketSendBufferSizeTCP套接字发送缓冲区大小,如果配置为0,会使用操作系统默认值。0
socketReceiveBufferSizeTCP套接字接收缓冲区大小,如果配置为0,会使用操作系统默认值。0
tcpNoDelay是否使用TCP_NODELAY选项。true
idleTimeout客户端连接空闲超时时间。在空闲超过配置的超时时间后,客户端与服务端的连接会断开。如果该参数配置为0或者负值,空闲超时会被禁用。0
isJdbcEnabled是否允许JDBC访问。true
isThinClientEnabled是否允许瘦客户端访问。true
sslEnabled如果开启SSL,只允许SSL客户端连接。一个节点只允许一种连接模式:SSL或普通,一个节点无法同时接收两种模式的客户端连接,但是这个参数集群中的各个节点可以不同。false
useIgniteSslContextFactory在Ignite配置中是否使用SSL上下文工厂(具体可以看IgniteConfiguration.sslContextFactory)。true
sslClientAuth是否需要客户端认证。false
sslContextFactory提供节点侧SSL的Factory<SSLContext>实现的类名。null

JDBC Thin模式驱动并非线程安全

JDBC对象中的ConnectionStatementResultSet不是线程安全的。因此不能在多线程中使用一个JDBC连接的Statement和ResultSet。 JDBC Thin模式驱动防止并发,如果检测到了并发访问,那么会抛出SQLException,消息为:Concurrent access to JDBC connection is not allowed [ownThread=<guard_owner_thread_name>,curThread=<current_thread_name>]",SQLSTATE="08006

8.1.6.使用SSL

JDBC Thin模式驱动可以使用SSL来保护与集群之间的通信,集群端和驱动端必须同时配置SSL,集群配置方面,请参见瘦客户端和JDBC/ODBC的SSL/TLS章节的介绍。

JDBC驱动中开启SSL,需要在连接串中传递sslMode=require参数,并且提供密钥库和信任库参数:

java
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");

String keyStore = "keystore/node.jks";
String keyStorePassword = "123456";

String trustStore = "keystore/trust.jks";
String trustStorePassword = "123456";

try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?sslMode=require"
        + "&sslClientCertificateKeyStoreUrl=" + keyStore + "&sslClientCertificateKeyStorePassword="
        + keyStorePassword + "&sslTrustCertificateKeyStoreUrl=" + trustStore
        + "&sslTrustCertificateKeyStorePassword=" + trustStorePassword)) {

    ResultSet rs = conn.createStatement().executeQuery("select 10");
    rs.next();
    System.out.println(rs.getInt(1));
} catch (Exception e) {
    e.printStackTrace();
}

下表列出了和SSL/TLS连接有关的参数:

参数名描述默认值
sslMode开启SSL连接。可用的模式为:1.require:在客户端开启SSL协议,只有SSL连接才可以接入。2.disable:在客户端禁用SSL协议,只支持普通连接。disable
sslProtocol安全连接的协议名,如果未指定,会使用TLS协议。协议实现由JSSE提供:SSLv3 (SSL), TLSv1 (TLS), TLSv1.1, TLSv1.2TLS
sslKeyAlgorithm用于创建密钥管理器的密钥管理器算法。注意多数情况使用默认值即可。算法实现由JSSE提供:PKIX (X509或SunPKIX), SunX509
sslClientCertificateKeyStoreUrl客户端密钥存储库文件的url,这是个强制参数,因为没有密钥管理器SSL上下文无法初始化。如果sslModerequire并且未通过属性文件指定密钥存储库 URL,那么会使用JSSE属性javax.net.ssl.keyStore的值。JSSE系统属性javax.net.ssl.keyStore的值。
sslClientCertificateKeyStorePassword客户端密钥存储库密码。如果sslModerequire并且未通过属性文件指定密钥存储库密码,那么会使用JSSE属性javax.net.ssl.keyStorePassword的值。JSSE属性javax.net.ssl.keyStorePassword的值。
sslClientCertificateKeyStoreType用于上下文初始化的客户端密钥存储库类型。如果sslModerequire并且未通过属性文件指定密钥存储库类型,那么会使用JSSE属性javax.net.ssl.keyStoreType的值。JSSE属性javax.net.ssl.keyStoreType的值,如果属性未定义,默认值为JKS。
sslTrustCertificateKeyStoreUrltruststore文件的URL。这是个可选参数,但是sslTrustCertificateKeyStoreUrlsslTrustAll必须配置一个。如果sslModerequire并且未通过属性文件指定truststore文件URL,那么会使用JSSE属性javax.net.ssl.trustStore的值。JSSE系统属性javax.net.ssl.trustStore的值。
sslTrustCertificateKeyStorePasswordtruststore密码。如果sslModerequire并且未通过属性文件指定truststore密码,那么会使用JSSE属性javax.net.ssl.trustStorePassword的值。JSSE系统属性javax.net.ssl.trustStorePassword的值。
sslTrustCertificateKeyStoreTypetruststore类型。如果sslModerequire并且未通过属性文件指定truststore类型,那么会使用JSSE属性javax.net.ssl.trustStoreType的值。JSSE系统属性javax.net.ssl.trustStoreType的值。如果属性未定义,默认值为JKS。
sslTrustAll禁用服务端的证书验证。配置为true信任任何服务端证书(撤销的、过期的或者自签名的SSL证书)。注意,如果不能完全信任网络(比如公共互联网),不要在生产中启用该选项。false
sslFactoryFactory<SSLSocketFactory>的自定义实现的类名,如果sslModerequire并且指定了该工厂类,自定义的工厂会替换JSSE的默认值,这时其它的SSL属性也会被忽略。null

默认实现基于JSSE,并且需要处理两个Java密钥库文件。

  • sslClientCertificateKeyStoreUrl:客户端认证密钥库文件,其持有客户端的密钥和证书;
  • sslTrustCertificateKeyStoreUrl:可信证书密钥库文件,包含用于验证服务器证书的证书信息。

信任库是可选参数,但是sslTrustCertificateKeyStoreUrl或者sslTrustAll必须配置两者之一。

使用sslTrustAll参数

如果生产环境位于不完全可信网络(尤其是公共互联网),不要开启此选项。

如果希望使用自己的实现或者通过某种方式配置SSLSocketFactory,可以使用驱动的sslFactory参数,这是一个包含Factory<SSLSocketFactory>接口实现的类名字符串,该类对于JDBC驱动的类加载器必须可用。

8.2.Ignite DataSource

DataSource对象可用作部署对象,其可以通过JNDI命名服务按逻辑名定位。Ignite JDBC驱动的org.apache.ignite.IgniteJdbcThinDataSource实现了JDBC的DataSource接口,这样就可以使用DataSource接口了。

除了通用的DataSource属性外,IgniteJdbcThinDataSource还支持所有可以传递给JDBC连接字符串的Ignite特有属性。例如,distributedJoins属性可以通过IgniteJdbcThinDataSource#setDistributedJoins()方法进行调整。

具体请参见IgniteJdbcThinDataSource的javadoc。

8.3.示例

要处理集群中的数据,需要使用下面的一种方式来创建一个JDBCConnection对象:

java
// Open the JDBC connection via DriverManager.
Connection conn = DriverManager.getConnection("jdbc:ignite:thin://192.168.0.50");

或者:

java
// Or open connection via DataSource.
IgniteJdbcThinDataSource ids = new IgniteJdbcThinDataSource();
ids.setUrl("jdbc:ignite:thin://127.0.0.1");
ids.setDistributedJoins(true);

Connection conn = ids.getConnection();

之后就可以执行SELECTSQL查询了:

java
// Query people with specific age using prepared statement.
PreparedStatement stmt = conn.prepareStatement("select name, age from Person where age = ?");

stmt.setInt(1, 30);

ResultSet rs = stmt.executeQuery();

while (rs.next()) {
    String name = rs.getString("name");
    int age = rs.getInt("age");
    // ...
}

此外,可以使用DML语句对数据进行修改。

8.3.1.INSERT

java
// Insert a Person with a Long key.
PreparedStatement stmt = conn.prepareStatement("INSERT INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");

stmt.setInt(1, 1);
stmt.setString(2, "John Smith");
stmt.setInt(3, 25);

stmt.execute();

8.3.2.MERGE

java
// Merge a Person with a Long key.
PreparedStatement stmt = conn.prepareStatement("MERGE INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");

stmt.setInt(1, 1);
stmt.setString(2, "John Smith");
stmt.setInt(3, 25);

stmt.executeUpdate();

8.3.3.UPDATE

java
// Update a Person.
conn.createStatement().
  executeUpdate("UPDATE Person SET age = age + 1 WHERE age = 25");

8.3.4.DELETE

java
conn.createStatement().execute("DELETE FROM Person WHERE age = 25");

8.4.流处理

Ignite的JDBC驱动可以通过SET STREAMING命令对流化数据进行批量处理,具体可以看SET STREAMING的相关内容。

8.5.错误码

Ignite的JDBC驱动将错误码封装进了java.sql.SQLException类,它简化了应用端的错误处理。可以使用java.sql.SQLException.getSQLState()方法获取错误码,该方法会返回一个包含预定义ANSI SQLSTATE错误码的字符串:

java
PreparedStatement ps;

try {
    ps = conn.prepareStatement("INSERT INTO Person(id, name, age) values (1, 'John', 'unparseableString')");
} catch (SQLException e) {
    switch (e.getSQLState()) {
    case "0700B":
        System.out.println("Conversion failure");
        break;

    case "42000":
        System.out.println("Parsing error");
        break;

    default:
        System.out.println("Unprocessed error: " + e.getSQLState());
        break;
    }
}

下表中列出了Ignite目前支持的所有ANSI SQLSTATE错误码,未来这个列表可能还会扩展:

代码描述
0700B转换失败(比如,一个字符串表达式无法解析成数值或者日期)
0700E无效的事务隔离级别
08001驱动接入集群失败
08003连接意外地处于关闭状态
08004连接被集群拒绝
08006通信中发生I/O错误
22004不允许的空值
22023不支持的参数类型
23000违反了数据完整性约束
24000无效的结果集状态
0A000不支持的操作
40001并发更新冲突,具体请参见并发更新章节的介绍。
42000查询解析异常
50000Ignite内部错误,这个代码不是ANSI定义的,属于Ignite特有的错误,获取java.sql.SQLException的错误信息可以了解更多的细节

9.JDBC客户端驱动

9.1.JDBC客户端驱动

JDBC客户端节点模式驱动使用客户端节点连接接入集群,这要求开发者提供一个完整的Spring XML配置作为JDBC连接串的一部分,然后拷贝下面所有的jar文件到应用或者SQL工具的类路径中:

  • {IGNITE_HOME}\libs目录下的所有jar文件;
  • {IGNITE_HOME}\ignite-indexing{IGNITE_HOME}\ignite-spring目录下的所有jar文件;

这个驱动很重,而且可能不支持Ignite的最新SQL特性,但是因为它底层使用客户端节点连接,它可以执行分布式查询,然后在应用端直接对结果进行汇总。

JDBC连接URL的规则如下:

jdbc:ignite:cfg://[<params>@]<config_url>

其中:

  • <config_url>是必需的,表示指向Ignite客户端节点配置文件的任意合法URL,当驱动试图建立到集群的连接时,这个节点会在Ignite JDBC客户端节点驱动中启动;
  • <params>是可选的,格式如下:
param1=value1:param2=value2:...:paramN=valueN

驱动类名为org.apache.ignite.IgniteJdbcDriver,比如下面的代码,展示了如何打开一个到集群的JDBC连接:

java
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcDriver");

// Open JDBC connection (cache name is not specified, which means that we use default cache).
Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://file:///etc/config/ignite-jdbc.xml");

安全连接

关于如何保护JDBC客户端驱动的更多信息,请参见高级安全的相关文档。

9.1.1.支持的参数

属性描述默认值
cache缓存名,如果未定义会使用默认的缓存,区分大小写
nodeId要执行的查询所在节点的Id,对于在本地查询是有用的
local查询只在本地节点执行,这个参数和nodeId参数都是通过指定节点来限制数据集false
collocated优化标志,当Ignite执行一个分布式查询时,它会向单个的集群节点发送子查询,如果提前知道要查询的数据已经被并置到同一个节点,Ignite会有显著的性能提升和拓扑优化false
distributedJoins可以在非并置的数据上使用分布式关联。false
streaming通过INSERT语句为本链接开启批量数据加载模式,具体可以参照后面的流模式相关章节。false
streamingAllowOverwrite通知Ignite对于重复的已有键,覆写它的值而不是忽略它们,具体可以参照后面的流模式相关章节。false
streamingFlushFrequency超时时间,毫秒,数据流处理器用于刷新数据,数据默认会在连接关闭时刷新,具体可以参照后面的流模式相关章节。0
streamingPerNodeBufferSize数据流处理器的每节点缓冲区大小,具体可以参照后面的流模式相关章节。1024
streamingPerNodeParallelOperations数据流处理器的每节点并行操作数。具体可以参照后面的流模式相关章节。16
transactionsAllowed目前已经支持了ACID事务,但是仅仅在键-值API层面,在SQL层面Ignite支持原子性,还不支持事务一致性,这意味着使用这个功能的时候驱动可能抛出Transactions are not supported这样的异常。但是,有时需要使用事务语法(即使不需要事务语义),比如一些BI工具会一直强制事务行为,也需要将该参数配置为true以避免异常。false
multipleStatementsAllowedJDBC驱动可以同时处理多个SQL语句并且返回多个ResultSet对象,如果该参数为false,多个语句的查询会返回错误。false
lazy查询延迟执行。Ignite默认会将所有的结果集放入内存然后将其返回给客户端,对于不太大的结果集,这样会提供较好的性能,并且使内部的数据库锁时间最小化,因此提高了并发能力。但是,如果相对于可用内存来说结果集过大,那么会导致频繁的GC暂停,甚至OutOfMemoryError,如果使用这个标志,可以提示Ignite延迟加载结果集,这样可以在不大幅降低性能的前提下,最大限度地减少内存的消耗。false
skipReducerOnUpdate开启服务端的更新特性。当Ignite执行DML操作时,首先,它会获取所有受影响的中间行给查询发起方进行分析(通常被称为汇总),然后会准备一个更新值的批量发给远程节点。这个方式可能影响性能,如果一个DML操作会移动大量数据条目时,还可能会造成网络堵塞。使用这个标志可以提示Ignite在对应的远程节点上进行中间行的分析和更新。默认值为false,这意味着会首先获取中间行然后发给查询发起方。false

9.1.2.流模式

使用JDBC驱动,可以以流模式(批处理模式)将数据注入Ignite集群。这时驱动会在内部实例化IgniteDataStreamer然后将数据传给它。要激活这个模式,可以在JDBC连接串中增加streaming参数并且设置为true

java
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcDriver");

// Opening connection in the streaming mode.
Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://streaming=true@file:///etc/config/ignite-jdbc.xml");

目前,流模式只支持INSERT操作,对于想更快地将数据预加载进缓存的场景非常有用。JDBC驱动定义了多个连接参数来影响流模式的行为,这些参数已经在上述的参数表中列出。

缓存名

确保在JDBC连接字符串中通过cache=参数为流操作指定目标缓存。如果未指定缓存或缓存与流式DML语句中使用的表不匹配,则更新会被忽略。

这些参数几乎覆盖了IgniteDataStreamer的所有常规配置,这样就可以根据需要更好地调整流处理器。关于如何配置流处理器可以参考流处理器的相关文档来了解更多的信息。

基于时间的刷新

默认情况下,当要么连接关闭,要么达到了streamingPerNodeBufferSize,数据才会被刷新,如果希望按照时间的方式来刷新,那么可以调整streamingFlushFrequency参数。

java
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcDriver");

// Opening a connection in the streaming mode and time based flushing set.
Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://streaming=true:streamingFlushFrequency=1000@file:///etc/config/ignite-jdbc.xml");

PreparedStatement stmt = conn.prepareStatement(
  "INSERT INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");

// Adding the data.
for (int i = 1; i < 100000; i++) {
      // Inserting a Person object with a Long key.
      stmt.setInt(1, i);
      stmt.setString(2, "John Smith");
      stmt.setInt(3, 25);

      stmt.execute();
}

conn.close();

// Beyond this point, all data is guaranteed to be flushed into the cache.

9.2.示例

要处理集群中的数据,需要使用下面的一种方式来创建一个JDBCConnection对象:

java
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcDriver");

// Open JDBC connection (cache name is not specified, which means that we use default cache).
Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://file:///etc/config/ignite-jdbc.xml");

之后就可以执行SELECTSQL查询了:

java
// Query names of all people.
ResultSet rs = conn.createStatement().executeQuery("select name from Person");

while (rs.next()) {
    String name = rs.getString(1);
}
java
// Query people with specific age using prepared statement.
PreparedStatement stmt = conn.prepareStatement("select name, age from Person where age = ?");

stmt.setInt(1, 30);

ResultSet rs = stmt.executeQuery();

while (rs.next()) {
    String name = rs.getString("name");
    int age = rs.getInt("age");
}

此外,可以使用DML语句对数据进行修改。

9.2.1.INSERT

java
// Insert a Person with a Long key.
PreparedStatement stmt = conn.prepareStatement("INSERT INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");

stmt.setInt(1, 1);
stmt.setString(2, "John Smith");
stmt.setInt(3, 25);

stmt.execute();

9.2.2.MERGE

java
// Merge a Person with a Long key.
PreparedStatement stmt = conn.prepareStatement("MERGE INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");

stmt.setInt(1, 1);
stmt.setString(2, "John Smith");
stmt.setInt(3, 25);

stmt.executeUpdate();

9.2.3.UPDATE

java
// Update a Person.
conn.createStatement().
  executeUpdate("UPDATE Person SET age = age + 1 WHERE age = 25");

9.2.4.DELETE

java
conn.createStatement().execute("DELETE FROM Person WHERE age = 25");

10.ODBC驱动

10.1.ODBC驱动

10.1.1.概述

Ignite包括一个ODBC驱动,可以通过标准SQL查询和原生ODBC API查询和修改存储于分布式缓存中的数据。

要了解ODBC的细节,可以参照ODBC开发者参考

Ignite的ODBC驱动实现了ODBC API的3.0版。

10.1.2.集群配置

Ignite的ODBC驱动在Windows中被视为一个动态库,在Linux中被视为一个共享对象,应用不会直接加载它,而是在必要时使用一个驱动加载器API来加载和卸载ODBC驱动。

Ignite的ODBC驱动在内部使用TCP来接入Ignite集群,集群范围的连接参数可以通过IgniteConfiguration.clientConnectorConfiguration属性来配置:

xml
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
    <property name="clientConnectorConfiguration">
        <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration"/>
    </property>
</bean>
java
IgniteConfiguration cfg = new IgniteConfiguration();

ClientConnectorConfiguration clientConnectorCfg = new ClientConnectorConfiguration();
cfg.setClientConnectorConfiguration(clientConnectorCfg);

客户端连接器配置支持下面的参数:

属性描述默认值
host绑定的主机名或者IP地址,如果为null,会绑定localhostnull
port绑定的TCP端口,如果指定的端口被占用,Ignite会使用portRange属性寻找其它的可用端口。10800
portRange定义尝试绑定的端口范围。比如port配置为10800并且portRange100,那么服务端会按照顺序去尝试绑定[10800, 10900]范围内的端口,直到找到可用的端口。100
maxOpenCursorsPerConnection单个连接可以同时打开的最大游标数。128
threadPoolSize线程池中负责请求处理的线程数。MAX(8, CPU核数)
socketSendBufferSizeTCP套接字发送缓冲区大小,如果配置为0,会使用系统默认值0
socketReceiveBufferSizeTCP套接字接收缓冲区大小,如果配置为0,会使用系统默认值。0
tcpNoDelay是否使用TCP_NODELAY选项。true
idleTimeout客户端连接的空闲超时时间。如果空闲时间超过配置的超时时间,客户端会自动断开与服务端的连接。如果该参数配置为0或者为负值,空闲超时会被禁用。0
isOdbcEnabled是否允许通过ODBC访问。true
isThinClientEnabled是否允许通过瘦客户端访问。true

可以通过如下方式修改参数:

xml
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    <!-- Enabling ODBC. -->
    <property name="clientConnectorConfiguration">
        <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
            <property name="host" value="127.0.0.1"/>
            <property name="port" value="10800"/>
            <property name="portRange" value="5"/>
            <property name="maxOpenCursorsPerConnection" value="512"/>
            <property name="socketSendBufferSize" value="65536"/>
            <property name="socketReceiveBufferSize" value="131072"/>
            <property name="threadPoolSize" value="4"/>
        </bean>
    </property>
</bean>
java
IgniteConfiguration cfg = new IgniteConfiguration();
...
ClientConnectorConfiguration clientConnectorCfg = new ClientConnectorConfiguration();

clientConnectorCfg.setHost("127.0.0.1");
clientConnectorCfg.setPort(12345);
clientConnectorCfg.setPortRange(2);
clientConnectorCfg.setMaxOpenCursorsPerConnection(512);
clientConnectorCfg.setSocketSendBufferSize(65536);
clientConnectorCfg.setSocketReceiveBufferSize(131072);
clientConnectorCfg.setThreadPoolSize(4);

cfg.setClientConnectorConfiguration(clientConnectorCfg);
...

通过ClientListenerProcessor从ODBC驱动端建立的连接也是可以配置的,关于如何从驱动端修改连接的配置,可以看这里

10.1.3.线程安全

Ignite ODBC驱动的当前实现仅在连接层提供了线程安全,这意味着如果没有额外的同步处理,多线程无法访问同一个连接。不过可以为每个线程创建独立的连接,然后同时使用。

10.1.4.环境要求

Ignite的ODBC驱动官方在如下环境中进行了测试:

OSWindows(XP及以上,32位和64位版本)
Windows Server(2008及以上,32位和64位版本)
Ubuntu(14.x和15.x,64位)
C++编译器MS Visual C++ (10.0及以上), g++ (4.4.0及以上)
Visual Studio2010及以上

10.1.5.构建ODBC驱动

在Windows中,Ignite提供了预构建的32位和64位驱动的安装器,因此如果只是想在Windows中安装驱动,那么直接看下面的安装驱动章节就可以了。

对于Linux环境,安装之前还是需要进行构建,因此如果使用的是Linux或者使用Windows但是仍然想自己构建驱动,那么往下看。

Ignite的ODBC驱动的源代码随着Ignite版本一起发布,在使用之前可以自行构建。

因为ODBC驱动是用C++编写的,因此它是作为Ignite C++的一部分提供的,并且依赖于一些C++库,具体点说依赖于utilsbinaryIgnite库,这就意味着,在构建ODBC驱动本身之前,需要先构建它们。

这里假定使用的是二进制版本,如果使用的是源代码版本,那么需要将所有使用的%IGNITE_HOME%\platforms\cpp替换为%IGNITE_HOME%\modules\platforms\cpp

10.1.5.1.在Windows上构建

如果要在Windows上构建ODBC驱动,需要MS Visual Studio 2010及以后的版本。一旦打开了Ignite方案%IGNITE_HOME%\platforms\cpp\project\vs\ignite.sln(或者ignite_86.sln,32位平台),在方案浏览器中点击odbc项目,然后选择“Build”,Visual Studio会自动地检测并且构建所有必要的依赖。

.sln文件的路径可能会有所不同,具体取决于是从源文件还是从二进制文件进行构建。如果在%IGNITE_HOME%\platforms\cpp\project\vs\中找不到.sln文件,可以尝试在%IGNITE_HOME%\modules\platforms\cpp\project\vs\中查找。

注意

如果使用VS 2015及以后的版本(MSVC14.0及以后),需要将legacy_stdio_definitions.lib作为额外的库加入odbc项目的链接器配置以构建项目。要在IDE中将库文件加入链接器,可以打开项目节点的上下文菜单,选择Properties,然后在Project Properties对话框中,选择Linker,然后编辑Linker Input,这时就可以将legacy_stdio_definitions.lib加入分号分割的列表中。

构建过程完成之后,会生成ignite.odbc.dll文件,对于64位版本,位于%IGNITE_HOME%\platforms\cpp\project\vs\x64\Release中,对于32位版本,位于%IGNITE_HOME%\platforms\cpp\project\vs\Win32\Release中。

注意

确认为系统使用相应的驱动(32位或64位)。

10.1.5.2.在Windows中构建安装器

为了简化安装,构建完驱动之后可能想构建安装器,Ignite使用WiX工具包来生成ODBC的安装器,因此需要下载并安装WiX,记得一定要把Wix工具包的bin目录加入PATH变量中。

一切就绪之后,打开终端然后定位到%IGNITE_HOME%\platforms\cpp\odbc\install目录,按顺序执行如下的命令来构建安装器:

bash
candle.exe ignite-odbc-amd64.wxs
light.exe -ext WixUIExtension ignite-odbc-amd64.wixobj
bash
candle.exe ignite-odbc-x86.wxs
light.exe -ext WixUIExtension ignite-odbc-x86.wixobj

完成之后,目录中会出现ignite-odbc-amd64.msiignite-odbc-x86.msi文件,然后就可以使用它们进行安装了。

10.1.5.3.在Linux上构建

在一个基于Linux的操作系统中,如果要构建及使用Ignite ODBC驱动,需要安装选择的ODBC驱动管理器,Ignite ODBC驱动已经使用UnixODBC进行了测试。

环境要求

  • C++编译器;
  • cmake 3.6+;
  • JDK;
  • openssl,包括头文件;
  • unixODBC。

下面列出了几种流行发行版的安装说明:

bash
sudo apt-get install -y build-essential cmake openjdk-11-jdk unixodbc-dev libssl-dev
bash
sudo yum install -y epel-release
sudo yum install -y java-11-openjdk-devel cmake3 unixODBC-devel openssl-devel make gcc-c++
bash
sudo yum install -y java-11-openjdk-devel cmake3 unixODBC-devel openssl-devel make gcc-c++

提示

JDK只用于构建过程,并不会用于ODBC驱动。

构建ODBC驱动

  • 为cmake创建一个构建目录,将其称为${CPP_BUILD_DIR}
  • (可选)选择安装目录前缀(默认为/usr/local),将其称为${CPP_INSTALL_DIR}
  • 通过如下命令构建和安装驱动:
shell
cd ${CPP_BUILD_DIR}
cmake -DCMAKE_BUILD_TYPE=Release -DWITH_ODBC=ON ${IGNITE_HOME}/platforms/cpp -DCMAKE_INSTALL_PREFIX=${CPP_INSTALL_DIR}
make
sudo make install
shell
cd ${CPP_BUILD_DIR}
cmake3 -DCMAKE_BUILD_TYPE=Release -DWITH_ODBC=ON  ${IGNITE_HOME}/platforms/cpp -DCMAKE_INSTALL_PREFIX=${CPP_INSTALL_DIR}
make
sudo make install

构建过程完成后,可以通过如下命令找到ODBC驱动位于何处:

bash
whereis libignite-odbc

路径很可能是:/usr/local/lib/libignite-odbc.so

10.1.6.安装ODBC驱动

要使用ODBC驱动,首先要在系统中进行注册,因此ODBC驱动管理器必须能找到它。

10.1.6.1.在Windows上安装

在32位的Windows上需要使用32位版本的驱动,而在64位的Windows上可以使用64位和32位版本的驱动,也可以在64位的Windows上同时安装32位和64位版本的驱动,这样32位和64位的应用都可以使用驱动。

使用安装器进行安装

注意

首先要安装微软的Microsoft Visual C++ 2010 Redistributable 32位或者64位包。

这是最简单的方式,也是建议的方式,只需要启动指定版本的安装器即可:

  • 32位:%IGNITE_HOME%\platforms\cpp\bin\odbc\ignite-odbc-x86.msi
  • 64位:%IGNITE_HOME%\platforms\cpp\bin\odbc\ignite-odbc-amd64.msi

手动安装

要在Windows上手动安装ODBC驱动,首先要为驱动在文件系统中选择一个目录,选择一个位置后就可以把驱动放在哪并且确保所有的驱动依赖可以被解析,也就是说,它们要么位于%PATH%,要么和驱动DLL位于同一个目录。

之后,就需要使用%IGNITE_HOME%/platforms/cpp/odbc/install目录下的安装脚本之一,注意,执行这些脚本很可能需要管理员权限。

bash
install_x86 <absolute_path_to_32_bit_driver>
bash
install_amd64 <absolute_path_to_64_bit_driver> [<absolute_path_to_32_bit_driver>]
10.1.6.2.在Linux上安装

要在Linux上构建和安装ODBC驱动,首先需要安装ODBC驱动管理器,Ignite ODBC驱动已经和UnixODBC进行了测试。

如果已经构建完成并且执行了make install命令,libignite-odbc.so很可能会位于/usr/local/lib,要在ODBC驱动管理器中安装ODBC驱动并且可以使用,需要按照如下的步骤进行操作:

  • 确保链接器可以定位ODBC驱动的所有依赖。可以使用ldd命令像如下这样进行检查(假定ODBC驱动位于/usr/local/lib):ldd /usr/local/lib/libignite-odbc.so,如果存在到其它库的无法解析的链接,需要将这些库文件所在的目录添加到LD_LIBRARY_PATH
  • 编辑$IGNITE_HOME/platforms/cpp/odbc/install/ignite-odbc-install.ini文件,并且确保Apache Ignite段的Driver参数指向libignite-odbc.so所在的位置;
  • 要安装Ignite的ODBC驱动,可以使用如下的命令:odbcinst -i -d -f $IGNITE_HOME/platforms/cpp/odbc/install/ignite-odbc-install.ini,要执行这条命令,很可能需要root权限。

到现在为止,Ignite的ODBC驱动已经安装好了并且可以用了,可以像其它ODBC驱动一样,连接、使用。

10.2.连接串和DSN

10.2.1.连接串格式

Ignite的ODBC驱动支持标准的连接串格式,下面是正常的语法:

connection-string ::= empty-string[;] | attribute[;] | attribute; connection-string
empty-string ::=
attribute ::= attribute-keyword=attribute-value | DRIVER=[{]attribute-value[}]
attribute-keyword ::= identifier
attribute-value ::= character-string

简单来说,连接串就是一个字符串,其中包含了用分号分割的参数。

10.2.2.支持的参数

Ignite的ODBC驱动可以使用一些连接串/DSN参数,所有的参数都是大小写不敏感的,因此ADDRESSAddressaddress都是有效的参数名,并且指向的是同一个参数。如果参数未指定,会使用默认值,其中的一个例外是ADDRESS属性,如果未指定,会使用SERVERPORT属性代替:

属性关键字描述默认值
ADDRESS要连接的远程节点的地址,格式为:<host>[:<port>]。比如:localhost, example.com:12345, 127.0.0.1, 192.168.3.80:5893,如果指定了这个属性,SERVERPORT将会被忽略。
SERVER要连接的节点地址,如果指定了ADDRESS属性,本属性会被忽略。
PORT节点的OdbcProcessor监听的端口,如果指定了ADDRESS属性,本属性会被忽略。10800
USERSQL连接的用户名。如果服务端开启了认证,该参数为必需。“”
PASSWORDSQL连接的密码。如果服务端开启了认证,该参数为必需。“”
SCHEMA模式名。PUBLIC
DSN要连接的DSN名
PAGE_SIZE数据源的响应中返回的行数,默认值会适用于大多数场景,小些的值会导致获取数据变慢,大些的值会导致驱动的额外内存占用,以及获取下一页时的额外延迟。1024
DISTRIBUTED_JOINS为在ODBC连接上执行的所有查询开启非并置的分布式关联特性。false
ENFORCE_JOIN_ORDER强制SQL查询中表关联顺序,如果设置为true,查询优化器在关联时就不会对表进行再排序。false
PROTOCOL_VERSION使用的ODBC协议版本,目前支持如下的版本:2.1.0、2.1.5、2.3.0、2.3.2和2.5.0,因为向后兼容,也可以使用协议的早期版本。2.3.0
REPLICATED_ONLY配置查询只在全复制的表上执行,这是个提示,用于更高效地执行。false
COLLOCATED如果SQL语句包含按主键或关联键对结果集进行分组的GROUP BY子句,可以将此参数设置为true。当Ignite执行分布式查询时,会向单个集群节点发送子查询,如果事先知道待查询的数据是在同一个节点上并置在一起的,并且是按主键或关联键分组的,那么Ignite通过在参与查询的每个节点本地分组数据来实现显著的性能和网络优化。false
LAZY查询延迟执行。Ignite默认会将所有的结果集放入内存然后将其返回给客户端,对于不太大的结果集,这样会提供较好的性能,并且使内部的数据库锁时间最小化,因此提高了并发能力。但是,如果相对于可用内存来说结果集过大,那么会导致频繁的GC暂停,甚至OutOfMemoryError,如果使用这个标志,可以提示Ignite延迟加载结果集,这样可以在不大幅降低性能的前提下,最大限度地减少内存的消耗。false
SKIP_REDUCER_ON_UPDATE开启服务端的更新特性。当Ignite执行DML操作时,首先,它会获取所有受影响的中间行给查询发起方进行分析(通常被称为汇总),然后会准备一个更新值的批量发给远程节点。这个方式可能影响性能,如果一个DML操作会移动大量数据条目时,还可能会造成网络堵塞。使用这个标志可以提示Ignite在对应的远程节点上进行中间行的分析和更新。默认值为false,这意味着会首先获取中间行然后发给查询发起方。false
SSL_MODE确定服务端是否需要SSL连接。可以根据需要使用require或者disable
SSL_KEY_FILE指定包含服务端SSL私钥的文件名。
SSL_CERT_FILE指定包含SSL服务器证书的文件名。
SSL_CA_FILE指定包含SSL服务器证书颁发机构(CA)的文件名。

10.2.3.连接串示例

下面的串,可以用于SQLDriverConnectODBC调用,来建立与Ignite节点的连接。

properties
DRIVER={Apache Ignite};
ADDRESS=localhost:10800;
SCHEMA=somecachename;
USER=yourusername;
PASSWORD=yourpassword;
SSL_MODE=[require|disable];
SSL_KEY_FILE=<path_to_private_key>;
SSL_CERT_FILE=<path_to_client_certificate>;
SSL_CA_FILE=<path_to_trusted_certificates>
DRIVER={Apache Ignite};ADDRESS=localhost:10800;CACHE=yourCacheName
DRIVER={Apache Ignite};ADDRESS=localhost:10800
DSN=MyIgniteDSN
DRIVER={Apache Ignite};ADDRESS=example.com:12901;CACHE=MyCache;PAGE_SIZE=4096

10.2.4.配置DSN

如果要使用DSN(数据源名)来进行连接,可以使用同样的参数。

要在Windows上配置DSN,需要使用一个叫做odbcad32(32位x86系统)/odbcad64(64位)的系统工具,这是一个ODBC数据源管理器。

安装DSN工具时,如果使用的是预构建的msi文件,一定要先安装Microsoft Visual C++ 2010(32位x86,或者64位x64)。

要启动这个工具,打开Control panel->Administrative Tools->数据源(ODBC),当ODBC数据源管理器启动后,选择Add...->Apache Ignite,然后以正确的方式配置DSN。

在Linux上配置DSN,需要找到odbc.ini文件,这个文件的位置各个发行版有所不同,依赖于发行版使用的驱动管理器,比如,如果使用unixODBC,那么可以执行如下的命令来输出系统级的ODBC相关信息:

shell
odbcinst -j

使用SYSTEM DATA SOURCESUSER DATA SOURCES属性,可以定位odbc.ini文件。

找到odbc.ini文件之后,可以用任意编辑器打开它,然后像下面这样添加DSN片段:

[DSN Name]
description=<Insert your description here>
driver=Apache Ignite
<Other arguments here...>

10.3.查询和修改数据

10.3.1.概述

本章会详细描述如何接入Ignite集群,如何使用ODBC驱动执行各种SQL查询。

在实现层,Ignite的ODBC驱动使用SQL字段查询来获取Ignite缓存中的数据,这意味着通过ODBC只可以访问这些集群配置中定义的字段。

另外,ODBC驱动支持DML,这意味着通过ODBC连接不仅仅可以读取数据,还可以修改数据。

提示

这里是完整的ODBC示例

10.3.2.配置Ignite集群

第一步,需要对集群节点进行配置,这个配置需要包含缓存的配置以及定义了QueryEntities的属性。如果应用(当前场景是ODBC驱动)要通过SQL语句进行数据的查询和修改,QueryEntities是必须的,或者,也可以使用DDL创建表。

cpp
SQLHENV env;

// Allocate an environment handle
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);

// Use ODBC ver 3
SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, reinterpret_cast<void*>(SQL_OV_ODBC3), 0);

SQLHDBC dbc;

// Allocate a connection handle
SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);

// Prepare the connection string
SQLCHAR connectStr[] = "DSN=My Ignite DSN";

// Connecting to Ignite Cluster.
SQLDriverConnect(dbc, NULL, connectStr, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_COMPLETE);

SQLHSTMT stmt;

// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

SQLCHAR query1[] = "CREATE TABLE Person ( "
    "id LONG PRIMARY KEY, "
    "firstName VARCHAR, "
    "lastName VARCHAR, "
  	"salary FLOAT) "
    "WITH \"template=partitioned\"";

SQLExecDirect(stmt, query1, SQL_NTS);

SQLCHAR query2[] = "CREATE TABLE Organization ( "
    "id LONG PRIMARY KEY, "
    "name VARCHAR) "
    "WITH \"template=partitioned\"";

SQLExecDirect(stmt, query2, SQL_NTS);

SQLCHAR query3[] = "CREATE INDEX idx_organization_name ON Organization (name)";

SQLExecDirect(stmt, query3, SQL_NTS);
xml
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">
  <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">

    <!-- Enabling ODBC. -->
    <property name="odbcConfiguration">
      <bean class="org.apache.ignite.configuration.OdbcConfiguration"/>
    </property>

    <!-- Configuring cache. -->
    <property name="cacheConfiguration">
      <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
          <property name="name" value="Person"/>
          <property name="cacheMode" value="PARTITIONED"/>
          <property name="atomicityMode" value="TRANSACTIONAL"/>
          <property name="writeSynchronizationMode" value="FULL_SYNC"/>

          <property name="queryEntities">
            <list>
              <bean class="org.apache.ignite.cache.QueryEntity">
                <property name="keyType" value="java.lang.Long"/>
                <property name="keyFieldName" value="id"/>
                <property name="valueType" value="Person"/>

                <property name="fields">
                  <map>
                    <entry key="firstName" value="java.lang.String"/>
                    <entry key="lastName" value="java.lang.String"/>
                    <entry key="salary" value="java.lang.Double"/>
                  </map>
                </property>
              </bean>
            </list>
          </property>
        </bean>

        <bean class="org.apache.ignite.configuration.CacheConfiguration">
          <property name="name" value="Organization"/>
          <property name="cacheMode" value="PARTITIONED"/>
          <property name="atomicityMode" value="TRANSACTIONAL"/>
          <property name="writeSynchronizationMode" value="FULL_SYNC"/>

          <property name="queryEntities">
            <list>
              <bean class="org.apache.ignite.cache.QueryEntity">
                <property name="keyType" value="java.lang.Long"/>
                <property name="keyFieldName" value="id"/>
                <property name="valueType" value="Organization"/>

                <property name="fields">
                  <map>
                    <entry key="id" value="java.lang.Integer"/>
                    <entry key="name" value="java.lang.String"/>
                  </map>
                </property>

                <property name="indexes">
                    <list>
                        <bean class="org.apache.ignite.cache.QueryIndex">
                            <constructor-arg value="name"/>
                        </bean>
                    </list>
                </property>
              </bean>
            </list>
          </property>
        </bean>
      </list>
    </property>
  </bean>
</beans>

从上述配置中可以看出,定义了两个缓存,包含了PersonOrganization类型的数据,它们都列出了使用SQL可以读写的字段和索引。

10.3.3.接入集群

配置好然后启动集群,就可以从ODBC驱动端接入了。如何做呢?准备一个有效的连接串然后连接时将其作为一个参数传递给ODBC驱动就可以了。

另外,也可以像下面这样使用一个预定义的DSN来接入。

cpp
SQLHENV env;

// Allocate an environment handle
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);

// Use ODBC ver 3
SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, reinterpret_cast<void*>(SQL_OV_ODBC3), 0);

SQLHDBC dbc;

// Allocate a connection handle
SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);

// Prepare the connection string
SQLCHAR connectStr[] = "DSN=My Ignite DSN";

// Connecting to Ignite Cluster.
SQLRETURN ret = SQLDriverConnect(dbc, NULL, connectStr, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_COMPLETE);

if (!SQL_SUCCEEDED(ret))
{
  SQLCHAR sqlstate[7] = { 0 };
  SQLINTEGER nativeCode;

  SQLCHAR errMsg[BUFFER_SIZE] = { 0 };
  SQLSMALLINT errMsgLen = static_cast<SQLSMALLINT>(sizeof(errMsg));

  SQLGetDiagRec(SQL_HANDLE_DBC, dbc, 1, sqlstate, &nativeCode, errMsg, errMsgLen, &errMsgLen);

  std::cerr << "Failed to connect to Apache Ignite: "
            << reinterpret_cast<char*>(sqlstate) << ": "
            << reinterpret_cast<char*>(errMsg) << ", "
            << "Native error code: " << nativeCode
            << std::endl;

  // Releasing allocated handles.
  SQLFreeHandle(SQL_HANDLE_DBC, dbc);
  SQLFreeHandle(SQL_HANDLE_ENV, env);

  return;
}

10.3.4.查询数据

都准备好后,就可以使用ODBC API执行SQL查询了。

cpp
SQLHSTMT stmt;

// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

SQLCHAR query[] = "SELECT firstName, lastName, salary, Organization.name FROM Person "
  "INNER JOIN \"Organization\".Organization ON Person.orgId = Organization.id";
SQLSMALLINT queryLen = static_cast<SQLSMALLINT>(sizeof(queryLen));

SQLRETURN ret = SQLExecDirect(stmt, query, queryLen);

if (!SQL_SUCCEEDED(ret))
{
  SQLCHAR sqlstate[7] = { 0 };
  SQLINTEGER nativeCode;

  SQLCHAR errMsg[BUFFER_SIZE] = { 0 };
  SQLSMALLINT errMsgLen = static_cast<SQLSMALLINT>(sizeof(errMsg));

  SQLGetDiagRec(SQL_HANDLE_DBC, dbc, 1, sqlstate, &nativeCode, errMsg, errMsgLen, &errMsgLen);

  std::cerr << "Failed to perfrom SQL query upon Apache Ignite: "
            << reinterpret_cast<char*>(sqlstate) << ": "
            << reinterpret_cast<char*>(errMsg) << ", "
            << "Native error code: " << nativeCode
            << std::endl;
}
else
{
  // Printing the result set.
  struct OdbcStringBuffer
  {
    SQLCHAR buffer[BUFFER_SIZE];
    SQLLEN resLen;
  };

  // Getting a number of columns in the result set.
  SQLSMALLINT columnsCnt = 0;
  SQLNumResultCols(stmt, &columnsCnt);

  // Allocating buffers for columns.
  std::vector<OdbcStringBuffer> columns(columnsCnt);

  // Binding colums. For simplicity we are going to use only
  // string buffers here.
  for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
    SQLBindCol(stmt, i + 1, SQL_C_CHAR, columns[i].buffer, BUFFER_SIZE, &columns[i].resLen);

  // Fetching and printing data in a loop.
  ret = SQLFetch(stmt);
  while (SQL_SUCCEEDED(ret))
  {
    for (size_t i = 0; i < columns.size(); ++i)
      std::cout << std::setw(16) << std::left << columns[i].buffer << " ";

    std::cout << std::endl;

    ret = SQLFetch(stmt);
  }
}

// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);

列绑定

在上例中,所有的列都绑定到SQL_C_CHAR,这意味着获取时所有的值都会被转换成字符串,这样做是为了简化,获取时进行值转换是非常慢的,因此默认的做法应该是与存储采用同样的方式进行获取。

10.3.5.插入数据

要将新的数据插入集群,ODBC端可以使用INSERT语句。

cpp
SQLHSTMT stmt;

// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

SQLCHAR query[] =
	"INSERT INTO Person (id, orgId, firstName, lastName, resume, salary) "
	"VALUES (?, ?, ?, ?, ?, ?)";

SQLPrepare(stmt, query, static_cast<SQLSMALLINT>(sizeof(query)));

// Binding columns.
int64_t key = 0;
int64_t orgId = 0;
char name[1024] = { 0 };
SQLLEN nameLen = SQL_NTS;
double salary = 0.0;

SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, &key, 0, 0);
SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, &orgId, 0, 0);
SQLBindParameter(stmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR,	sizeof(name), sizeof(name), name, 0, &nameLen);
SQLBindParameter(stmt, 4, SQL_PARAM_INPUT, SQL_C_DOUBLE, SQL_DOUBLE, 0, 0, &salary, 0, 0);

// Filling cache.
key = 1;
orgId = 1;
strncpy(name, "John", sizeof(name));
salary = 2200.0;

SQLExecute(stmt);
SQLMoreResults(stmt);

++key;
orgId = 1;
strncpy(name, "Jane", sizeof(name));
salary = 1300.0;

SQLExecute(stmt);
SQLMoreResults(stmt);

++key;
orgId = 2;
strncpy(name, "Richard", sizeof(name));
salary = 900.0;

SQLExecute(stmt);
SQLMoreResults(stmt);

++key;
orgId = 2;
strncpy(name, "Mary", sizeof(name));
salary = 2400.0;

SQLExecute(stmt);

// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);

下面,是不使用预编译语句插入Organization数据:

cpp
SQLHSTMT stmt;

// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

SQLCHAR query1[] = "INSERT INTO \"Organization\".Organization (id, name)
    VALUES (1L, 'Some company')";

SQLExecDirect(stmt, query1, static_cast<SQLSMALLINT>(sizeof(query1)));

SQLFreeStmt(stmt, SQL_CLOSE);

SQLCHAR query2[] = "INSERT INTO \"Organization\".Organization (id, name)
    VALUES (2L, 'Some other company')";

  SQLExecDirect(stmt, query2, static_cast<SQLSMALLINT>(sizeof(query2)));

// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);

错误检查

为了简化,上面的代码没有进行错误检查,但是在生产环境中不要这样做。

10.3.6.更新数据

下面使用UPDATE语句更新存储在集群中的部分人员的工资信息:

cpp
void AdjustSalary(SQLHDBC dbc, int64_t key, double salary)
{
  SQLHSTMT stmt;

  // Allocate a statement handle
  SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

  SQLCHAR query[] = "UPDATE Person SET salary=? WHERE id=?";

  SQLBindParameter(stmt, 1, SQL_PARAM_INPUT,
      SQL_C_DOUBLE, SQL_DOUBLE, 0, 0, &salary, 0, 0);

  SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SLONG,
      SQL_BIGINT, 0, 0, &key, 0, 0);

  SQLExecDirect(stmt, query, static_cast<SQLSMALLINT>(sizeof(query)));

  // Releasing statement handle.
  SQLFreeHandle(SQL_HANDLE_STMT, stmt);
}

...
AdjustSalary(dbc, 3, 1200.0);
AdjustSalary(dbc, 1, 2500.0);

10.3.7.删除数据

最后,使用DELETE语句删除部分记录:

cpp
void DeletePerson(SQLHDBC dbc, int64_t key)
{
  SQLHSTMT stmt;

  // Allocate a statement handle
  SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

  SQLCHAR query[] = "DELETE FROM Person WHERE id=?";

  SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT,
      0, 0, &key, 0, 0);

  SQLExecDirect(stmt, query, static_cast<SQLSMALLINT>(sizeof(query)));

  // Releasing statement handle.
  SQLFreeHandle(SQL_HANDLE_STMT, stmt);
}

...
DeletePerson(dbc, 1);
DeletePerson(dbc, 4);

10.3.8.通过参数数组进行批处理

Ignite的ODBC驱动支持在DML语句中通过参数数组进行批处理。

还是使用上述插入数据的示例,但是只调用一次SQLExecute:

cpp
SQLHSTMT stmt;

// Allocating a statement handle.
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

SQLCHAR query[] =
	"INSERT INTO Person (id, orgId, firstName, lastName, resume, salary) "
	"VALUES (?, ?, ?, ?, ?, ?)";

SQLPrepare(stmt, query, static_cast<SQLSMALLINT>(sizeof(query)));

// Binding columns.
int64_t key[4] = {0};
int64_t orgId[4] = {0};
char name[1024 * 4] = {0};
SQLLEN nameLen[4] = {0};
double salary[4] = {0};

SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, key, 0, 0);
SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, orgId, 0, 0);
SQLBindParameter(stmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR,	1024, 1024, name, 0, &nameLen);
SQLBindParameter(stmt, 4, SQL_PARAM_INPUT, SQL_C_DOUBLE, SQL_DOUBLE, 0, 0, salary, 0, 0);

// Filling cache.
key[0] = 1;
orgId[0] = 1;
strncpy(name, "John", 1023);
salary[0] = 2200.0;
nameLen[0] = SQL_NTS;

key[1] = 2;
orgId[1] = 1;
strncpy(name + 1024, "Jane", 1023);
salary[1] = 1300.0;
nameLen[1] = SQL_NTS;

key[2] = 3;
orgId[2] = 2;
strncpy(name + 1024 * 2, "Richard", 1023);
salary[2] = 900.0;
nameLen[2] = SQL_NTS;

key[3] = 4;
orgId[3] = 2;
strncpy(name + 1024 * 3, "Mary", 1023);
salary[3] = 2400.0;
nameLen[3] = SQL_NTS;

// Asking the driver to store the total number of processed argument sets
// in the following variable.
SQLULEN setsProcessed = 0;
SQLSetStmtAttr(stmt, SQL_ATTR_PARAMS_PROCESSED_PTR, &setsProcessed, SQL_IS_POINTER);

// Setting the size of the arguments array. This is 4 in our case.
SQLSetStmtAttr(stmt, SQL_ATTR_PARAMSET_SIZE, reinterpret_cast<SQLPOINTER>(4), 0);

// Executing the statement.
SQLExecute(stmt);

// Releasing the statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);

注意

注意这种类型的批处理目前只支持INSERT、UPDATE、 DELETE、和MERGE语句,还不支持SELECT,data-at-execution功能也不支持通过参数数组进行批处理。

10.3.9.流处理

Ignite的ODBC驱动可以通过SET STREAMING命令对流化数据进行批量处理,具体可以看SET STREAMING的相关内容。

注意

流处理模式中,参数数组和data-at-execution参数是不支持的。

10.4.规范

10.4.1.概述

ODBC定义了若干接口一致性级别,在本章中可以知道Ignite的ODBC驱动支持了哪些特性。

10.4.2.核心接口一致性

特性支持程度备注
通过调用SQLAllocHandleSQLFreeHandle来分配和释放所有处理器类型
使用SQLFreeStmt函数的所有形式
通过调用SQLBindCol,绑定列结果集
通过调用SQLBindParameterSQLNumParams,处理动态参数,包括参数数组,只针对输入方向,
指定绑定偏移量
使用数据执行对话框,涉及SQLParamDataSQLPutData的调用
管理游标和游标名部分实现了SQLCloseCursor,Ignite不支持命名游标
通过调用SQLColAttributeSQLDescribeColSQLNumResultColsSQLRowCount,访问结果集的描述(元数据)
通过调用目录函数SQLColumnsSQLGetTypeInfoSQLStatisticsSQLStatistics查询数据字典部分不支持SQLStatistics
通过调用SQLConnectSQLDataSourcesSQLDisconnectSQLDriverConnect管理数据源和连接,通过SQLDrivers获取驱动的信息,不管支持ODBC那个级别。
通过调用SQLExecDirectSQLExecuteSQLPrepare预编译和执行SQL语句。
通过调用SQLFetch,或者将FetchOrientation参数设置为SQL_FETCH_NEXT之后调用SQLFetchScroll,获取一个结果集或者多行数据中的一行,只能向前
通过调用SQLGetData,获得一个未绑定的列
通过调用SQLGetConnectAttrSQLGetEnvAttrSQLGetStmtAttr,获取所有属性的当前值,或者通过调用SQLSetConnectAttrSQLSetEnvAttrSQLSetStmtAttr,将所有属性赋为默认值,以及为某个属性赋为非默认值。部分并不支持所有属性
通过调用SQLCopyDescSQLGetDescFieldSQLGetDescRecSQLSetDescFieldSQLSetDescRec,操作描述符的某字段。
通过调用SQLGetDiagFieldSQLGetDiagRec,获得诊断信息。
通过调用SQLGetFunctionsSQLGetInfo,检测驱动兼容性,以及通过调用SQLNativeSql,在发送到数据源之前检测SQL语句中的任何文本代换的结果
使用SQLEndTran的语法提交一个事务,驱动的核心级别不需要支持真事务,因此,应用无法指定SQL_ROLLBACK或者为SQL_ATTR_AUTOCOMMIT连接属性指定SQL_AUTOCOMMIT_OFF
调用SQLCancel取消数据执行对话框,以及多线程环境中,在另一个线程中取消ODBC函数的执行,核心级别的接口一致性不需要支持函数的异步执行,也不需要使用SQLCancel取消一个ODBC函数的异步执行。平台和ODBC驱动都不需要多线程地同时自主活动,不过在多线程环境中,ODBC驱动必须是线程安全的,从应用来的请求的序列化是实现这个规范的一致的方式,即使它导致了一系列的性能问题。当前的ODBC驱动实现不支持异步执行
通过调用SQLSpecialColumns获得表的行标识符SQL_BEST_ROWID部分当前的实现总是返回空

10.4.3.Level1接口一致性

特性支持程度备注
指定数据库表和视图的模式(使用两部分命名)。
ODBC函数调用的真正异步执行,在给定的连接上,适用的函数要么是全同步的,要么是全异步的。
使用可滚动的游标,调用SQLFetchScroll时使用FetchOrientation参数而不是SQL_FETCH_NEXT,可以在方法内访问结果集而不是只能向前。
通过调用SQLPrimaryKeys获得表的主键。部分目前返回空结果集。
使用存储过程,通过调用SQLProcedureColumnsSQLProcedures,使用ODBC的转义序列进行存储过程数据字典的查询以及存储过程的调用。
通过调用SQLBrowseConnect,通过交互式浏览可用的服务器接入一个数据源。
使用ODBC函数而不是SQL语句来执行某个数据库操作:带有SQL_POSITIONSQL_REFRESHSQLSetPos
通过调用SQLMoreResults,访问由批处理和存储过程生成的多结果集的内容。
划定跨越多个ODBC函数的事务边界,获得真正的原子性以及在SQLEndTran中指定SQL_ROLLBACK的能力。Ignite SQL不支持事务

10.4.4.Level2接口一致性

特性支持程度备注
使用三部分命名的数据库表和视图。Ignite SQL不支持catalog。
通过调用SQLDescribeParam描述动态参数。
不仅仅使用输入参数,还使用输出参数以及输入/输出参数,还有存储过程的结果。Ignite SQL不支持输出参数。
使用书签,通过在第0列上调用SQLDescribeColSQLColAttribute获得书签;通过调用SQLFetchScroll时将参数FetchOrientation配置为SQL_FETCH_BOOKMARK,在书签上进行获取;通过调用SQLBulkOperations时将参数配置为SQL_UPDATE_BY_BOOKMARKSQL_DELETE_BY_BOOKMARKSQL_FETCH_BY_BOOKMARK可以进行书签的更新、删除和获取操作。Ignite SQL不支持书签。
通过调用SQLColumnPrivilegesSQLForeignKeysSQLTablePrivileges获取数据字典的高级信息。部分SQLForeignKeys已经实现,但是返回空的结果集。
通过在SQLBulkOperations中使用SQL_ADD或者在SQLSetPos中使用SQL_DELETESQL_UPDATE,使用ODBC函数而不是SQL语句执行额外的数据库操作。
为某个语句开启ODBC函数的异步执行。
通过调用SQLSpecialColumns获得表的SQL_ROWVER列标识符。部分已实现,但是返回空结果集。
SQL_ATTR_CONCURRENCY语句参数配置除了SQL_CONCUR_READ_ONLY以外的至少一个值。
登录请求以及SQL查询的超时功能(SQL_ATTR_LOGIN_TIMEOUTSQL_ATTR_QUERY_TIMEOUT)。部分SQL_ATTR_QUERY_TIMEOUT支持已实现,SQL_ATTR_LOGIN_TIMEOUT还未实现。
修改默认隔离级别的功能,在隔离级别为序列化时支持事务的功能。Ignite SQL不支持事务。

10.4.5.函数支持

函数名支持程度一致性级别
SQLAllocHandleCore
SQLBindColCore
SQLBindParameterCore
SQLBrowseConnectLevel1
SQLBulkOperationsLevel1
SQLCancelCore
SQLCloseCursorCore
SQLColAttributeCore
SQLColumnPrivilegesLevel2
SQLColumnsCore
SQLConnectCore
SQLCopyDescCore
SQLDataSourcesN/ACore
SQLDescribeColCore
SQLDescribeParamLevel2
SQLDisconnectCore
SQLDriverConnectCore
SQLDriversN/ACore
SQLEndTran部分Core
SQLExecDirectCore
SQLExecuteCore
SQLFetchCore
SQLFetchScrollCore
SQLForeignKeys部分Level2
SQLFreeHandleCore
SQLFreeStmtCore
SQLGetConnectAttr部分Core
SQLGetCursorNameCore
SQLGetDataCore
SQLGetDescFieldCore
SQLGetDescRecCore
SQLGetDiagFieldCore
SQLGetDiagRecCore
SQLGetEnvAttr部分Core
SQLGetFunctionsCore
SQLGetInfoCore
SQLGetStmtAttr部分Core
SQLGetTypeInfoCore
SQLMoreResultsLevel1
SQLNativeSqlCore
SQLNumParamsCore
SQLNumResultColsCore
SQLParamDataCore
SQLPrepareCore
SQLPrimaryKeys部分Level1
SQLProcedureColumnsLevel1
SQLProceduresLevel1
SQLPutDataCore
SQLRowCountCore
SQLSetConnectAttr部分Core
SQLSetCursorNameCore
SQLSetDescFieldCore
SQLSetDescRecCore
SQLSetEnvAttr部分Core
SQLSetPosLevel1
SQLSetStmtAttr部分Core
SQLSpecialColumns部分Core
SQLStatisticsCore
SQLTablePrivilegesLevel2
SQLTablesCore

10.4.6.环境属性一致性

特性支持程度一致性级别
SQL_ATTR_CONNECTION_POOLING可选
SQL_ATTR_CP_MATCH可选
SQL_ATTR_ODBC_VERCore
SQL_ATTR_OUTPUT_NTS可选

10.4.7.连接属性一致性

特性支持程度一致性级别
SQL_ATTR_ACCESS_MODECore
SQL_ATTR_ASYNC_ENABLELevel1/Level2
SQL_ATTR_AUTO_IPDLevel2
SQL_ATTR_AUTOCOMMITLevel1
SQL_ATTR_CONNECTION_DEADLevel1
SQL_ATTR_CONNECTION_TIMEOUTLevel2
SQL_ATTR_CURRENT_CATALOGLevel2
SQL_ATTR_LOGIN_TIMEOUTLevel2
SQL_ATTR_ODBC_CURSORSCore
SQL_ATTR_PACKET_SIZELevel2
SQL_ATTR_QUIET_MODECore
SQL否_ATTR_TRACECore
SQL_AT否TR_TRACEFILECore
SQL_AT否TR_TRANSLATE_LIBCore
SQL_ATTR_TRANSLATE_OPTIONCore
SQL_ATTR_TXN_ISOLATIONLevel1/Level2

10.4.8.语句属性一致性

特性支持程度一致性级别
SQL_ATTR_APP_PARAM_DESC部分Core
SQL_ATTR_APP_ROW_DESC部分Core
SQL_ATTR_ASYNC_ENABLELevel1/Level2
SQL_ATTR_CONCURRENCYLevel1/Level2
SQL_ATTR_CURSOR_SCROLLABLELevel1
SQL_ATTR_CURSOR_SENSITIVITYLevel2
SQL_ATTR_CURSOR_TYPELevel1/Level2
SQL_ATTR_ENABLE_AUTO_IPDLevel2
SQL_ATTR_FETCH_BOOKMARK_PTRLevel2
SQL_ATTR_IMP_PARAM_DESC部分Core
SQL_ATTR_IMP_ROW_DESC部分Core
SQL_ATTR_KEYSET_SIZELevel2
SQL_ATTR_MAX_LENGTHLevel1
SQL_ATTR_MAX_ROWSLevel1
SQL_ATTR_METADATA_IDCore
SQL_ATTR_NOSCANCore
SQL_ATTR_PARAM_BIND_OFFSET_PTRCore
SQL_ATTR_PARAM_BIND_TYPECore
SQL_ATTR_PARAM_OPERATION_PTRCore
SQL_ATTR_PARAM_STATUS_PTRCore
SQL_ATTR_PARAMS_PROCESSED_PTRCore
SQL_ATTR_PARAMSET_SIZECore
SQL_ATTR_QUERY_TIMEOUTLevel2
SQL_ATTR_RETRIEVE_DATALevel1
SQL_ATTR_ROW_ARRAY_SIZECore
SQL_ATTR_ROW_BIND_OFFSET_PTRCore
SQL_ATTR_ROW_BIND_TYPECore
SQL_ATTR_ROW_NUMBERLevel1
SQL_ATTR_ROW_OPERATION_PTRLevel1
SQL_ATTR_ROW_STATUS_PTRCore
SQL_ATTR_ROWS_FETCHED_PTRCore
SQL_ATTR_SIMULATE_CURSORLevel2
SQL_ATTR_USE_BOOKMARKSLevel2

10.4.9.描述符头字段一致性

特性支持程度一致性级别
SQL_DESC_ALLOC_TYPECore
SQL_DESC_ARRAY_SIZECore
SQL_DESC_ARRAY_STATUS_PTRCore/Level1
SQL_DESC_BIND_OFFSET_PTRCore
SQL_DESC_BIND_TYPECore
SQL_DESC_COUNTCore
SQL_DESC_ROWS_PROCESSED_PTRCore

10.4.10.描述符记录字段一致性

特性支持程度一致性级别
SQL_DESC_AUTO_UNIQUE_VALUELevel2
SQL_DESC_BASE_COLUMN_NAMECore
SQL_DESC_BASE_TABLE_NAMELevel1
SQL_DESC_CASE_SENSITIVECore
SQL_DESC_CATALOG_NAMELevel2
SQL_DESC_CONCISE_TYPECore
SQL_DESC_DATA_PTRCore
SQL_DESC_DATETIME_INTERVAL_CODECore
SQL_DESC_DATETIME_INTERVAL_PRECISIONCore
SQL_DESC_DISPLAY_SIZECore
SQL_DESC_FIXED_PREC_SCALECore
SQL_DESC_INDICATOR_PTRCore
SQL_DESC_LABELLevel2
SQL_DESC_LENGTHCore
SQL_DESC_LITERAL_PREFIXCore
SQL_DESC_LITERAL_SUFFIXCore
SQL_DESC_LOCAL_TYPE_NAMECore
SQL_DESC_NAMECore
SQL_DESC_NULLABLECore
SQL_DESC_OCTET_LENGTHCore
SQL_DESC_OCTET_LENGTH_PTRCore
SQL_DESC_PARAMETER_TYPECore/Level2
SQL_DESC_PRECISIONCore
SQL_DESC_ROWVERLevel1
SQL_DESC_SCALECore
SQL_DESC_SCHEMA_NAMELevel1
SQL_DESC_SEARCHABLECore
SQL_DESC_TABLE_NAMELevel1
SQL_DESC_TYPECore
SQL_DESC_TYPE_NAMECore
SQL_DESC_UNNAMEDCore
SQL_DESC_UNSIGNEDCore
SQL_DESC_UPDATABLECore

10.4.11.SQL数据类型

下面是支持的SQL数据类型:

数据类型是否支持
SQL_CHAR
SQL_VARCHAR
SQL_LONGVARCHAR
SQL_WCHAR
SQL_WVARCHAR
SQL_WLONGVARCHAR
SQL_DECIMAL
SQL_NUMERIC
SQL_SMALLINT
SQL_INTEGER
SQL_REAL
SQL_FLOAT
SQL_DOUBLE
SQL_BIT
SQL_TINYINT
SQL_BIGINT
SQL_BINARY
SQL_VARBINARY
SQL_LONGVARBINARY
SQL_TYPE_DATE
SQL_TYPE_TIME
SQL_TYPE_TIMESTAMP
SQL_TYPE_UTCDATETIME
SQL_TYPE_UTCTIME
SQL_INTERVAL_MONTH
SQL_INTERVAL_YEAR
SQL_INTERVAL_YEAR_TO_MONTH
SQL_INTERVAL_DAY
SQL_INTERVAL_HOUR
SQL_INTERVAL_MINUTE
SQL_INTERVAL_SECOND
SQL_INTERVAL_DAY_TO_HOUR
SQL_INTERVAL_DAY_TO_MINUTE
SQL_INTERVAL_DAY_TO_SECOND
SQL_INTERVAL_HOUR_TO_MINUTE
SQL_INTERVAL_HOUR_TO_SECOND
SQL_INTERVAL_MINUTE_TO_SECOND
SQL_GUID

10.4.12.C数据类型

下面是支持的C数据类型:

数据类型是否支持
SQL_C_CHAR
SQL_C_WCHAR
SQL_C_SHORT
SQL_C_SSHORT
SQL_C_USHORT
SQL_C_LONG
SQL_C_SLONG
SQL_C_ULONG
SQL_C_FLOAT
SQL_C_DOUBLE
SQL_C_BIT
SQL_C_TINYINT
SQL_C_STINYINT
SQL_C_UTINYINT
SQL_C_BIGINT
SQL_C_SBIGINT
SQL_C_UBIGINT
SQL_C_BINARY
SQL_C_BOOKMARK
SQL_C_VARBOOKMARK
SQL_C_INTERVAL* (all interval types)
SQL_C_TYPE_DATE
SQL_C_TYPE_TIME
SQL_C_TYPE_TIMESTAMP
SQL_C_NUMERIC
SQL_C_GUID

10.5.数据类型

支持如下的SQL数据类型(规范中列出):

  • SQL_CHAR
  • SQL_VARCHAR
  • SQL_LONGVARCHAR
  • SQL_SMALLINT
  • SQL_INTEGER
  • SQL_FLOAT
  • SQL_DOUBLE
  • SQL_BIT
  • SQL_TINYINT
  • SQL_BIGINT
  • SQL_BINARY
  • SQL_VARBINARY
  • SQL_LONGVARBINARY
  • SQL_GUID
  • SQL_DECIMAL
  • SQL_TYPE_DATE
  • SQL_TYPE_TIMESTAMP
  • SQL_TYPE_TIME

10.6.错误码

要获取错误码, 可以使用SQLGetDiagRec()函数,它会返回一个ANSI SQL标准定义的错误码字符串,比如:

cpp
SQLHENV env;
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);

SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, reinterpret_cast<void*>(SQL_OV_ODBC3), 0);

SQLHDBC dbc;
SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);

SQLCHAR connectStr[] = "DRIVER={Apache Ignite};SERVER=localhost;PORT=10800;SCHEMA=Person;";
SQLDriverConnect(dbc, NULL, connectStr, SQL_NTS, 0, 0, 0, SQL_DRIVER_COMPLETE);

SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

SQLCHAR query[] = "SELECT firstName, lastName, resume, salary FROM Person";
SQLRETURN ret = SQLExecDirect(stmt, query, SQL_NTS);

if (ret != SQL_SUCCESS)
{
	SQLCHAR sqlstate[7] = "";
	SQLINTEGER nativeCode;

	SQLCHAR message[1024];
	SQLSMALLINT reallen = 0;

	int i = 1;
	ret = SQLGetDiagRec(SQL_HANDLE_STMT, stmt, i, sqlstate,
                      &nativeCode, message, sizeof(message), &reallen);

	while (ret != SQL_NO_DATA)
	{
		std::cout << sqlstate << ": " << message;

		++i;
		ret = SQLGetDiagRec(SQL_HANDLE_STMT, stmt, i, sqlstate,
                        &nativeCode, message, sizeof(message), &reallen);
	}
}

下表中列出了所有Ignite目前支持的错误码,该列表未来可能会扩展:

错误码描述
01S00无效连接串属性
01S02驱动程序不支持指定的值,并替换了一个类似的值
08001驱动接入集群失败
08002连接已经建立
08003未知原因导致的连接处于关闭状态
08004连接被集群踢出
08S01连接失败
22026字符串长度与数据执行对话框不匹配
23000违反完整性约束(比如主键重复、主键为空等等)
24000无效的游标状态
42000请求的语法错误
42S01表已经存在
42S02表不存在
42S11索引已经存在
42S12索引不存在
42S21列已经存在
42S22列不存在
HY000一般性错误,具体看错误消息
HY001内存分配错误
HY003无效的应用缓冲区类型
HY004无效的SQL数据类型
HY009无效的空指针使用
HY010函数调用顺序错误
HY090无效的字符串和缓冲区长度(比如长度为负或者为0)
HY092可选类型超范围
HY097列类型超范围
HY105无效的参数类型
HY106获取类型超范围
HYC00特性未实现
IM001函数不支持

11.多版本并发控制

警告

MVCC当前处于测试阶段。

11.1.概述

配置为TRANSACTIONAL_SNAPSHOT原子化模式的缓存,支持SQL事务以及键-值事务,并且为两种类型的事务开启了多版本并发控制(MVCC)。

11.2.多版本并发控制

多版本并发控制,是一种在多用户并发访问时,控制数据一致性的方法,MVCC实现了快照隔离保证,确保每个事务总是看到一致的数据快照。

每个事务在开始时会获得一个数据的一致性快照,并且只能查看和修改该快照中的数据。当事务更新一个条目时,Ignite验证其它事务没有更新该条目,并创建该条目的一个新的版本,新版本只有在事务成功提交时才对其它事务可见。如果该条目已被更新,当前事务会失败并抛出异常(关于如何处理更新冲突,请参见下面的并发更新章节的介绍)。

快照不是物理快照,而是由MVCC协调器生成的逻辑快照,MVCC协调器是协调集群中的事务活动的集群节点。协调器跟踪所有活动的事务,并在每个事务完成时得到通知。启用MVCC的缓存的所有操作都需要从协调器请求一个数据的快照。

11.3.开启MVCC

要为缓存开启MVCC,需要在缓存配置中使用TRANSACTIONAL_SNAPSHOT原子化模式,如果使用CREATE TABLE命令创建表,则需要在该命令的WITH子句中将原子化模式作为参数传进去。

xml
<bean class="org.apache.ignite.configuration.IgniteConfiguration">

    <property name="cacheConfiguration">
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <property name="name" value="myCache"/>
            <property name="atomicityMode" value="TRANSACTIONAL_SNAPSHOT"/>
        </bean>
    </property>
</bean>
sql
CREATE TABLE Person WITH "ATOMICITY=TRANSACTIONAL_SNAPSHOT"

提示

TRANSACTIONAL_SNAPSHOT模式只支持默认的并发模型(PESSIMISTIC)和默认的隔离级别(REPEATABLE_READ),具体可以看上面的并发模型和隔离级别章节。

11.4.并发更新

在一个事务中,如果一个条目先被读取然后被更新,那么就有一种可能性,即另一个事务可能在两个操作之间切入然后首先更新该条目,这时,当第一个事务试图更新该条目时就会抛出异常,然后该事务会被标记为只能回滚,这时开发者就需要进行事务重试。

那么怎么知道发生了冲突呢?

  • 如果使用了Java的事务API,会抛出CacheException异常(异常信息为Cannot serialize transaction due to write conflict (transaction is marked for rollback)),并且Transaction.rollbackOnly标志为true
  • 如果通过JDBC/ODBC驱动执行了SQL事务,那么会得到SQLSTATE:40001错误代码。
java
for(int i = 1; i <=5 ; i++) {
    try (Transaction tx = Ignition.ignite().transactions().txStart()) {
        System.out.println("attempt #" + i + ", value: " + cache.get(1));
        try {
            cache.put(1, "new value");
            tx.commit();
            System.out.println("attempt #" + i + " succeeded");
            break;
        } catch (CacheException e) {
            if (!tx.isRollbackOnly()) {
              // Transaction was not marked as "rollback only",
              // so it's not a concurrent update issue.
              // Process the exception here.
                break;
            }
        }
    }
}
java
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");

// Open JDBC connection.
Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1");

PreparedStatement updateStmt = null;
PreparedStatement selectStmt = null;

try {
    // starting a transaction
    conn.setAutoCommit(false);

    selectStmt = conn.prepareStatement("select name from Person where id = 1");
    selectStmt.setInt(1, 1);
    ResultSet rs = selectStmt.executeQuery();

    if (rs.next())
        System.out.println("name = " + rs.getString("name"));

    updateStmt = conn.prepareStatement("update Person set name = ? where id = ? ");

    updateStmt.setString(1, "New Name");
    updateStmt.setInt(2, 1);
    updateStmt.executeUpdate();

    // committing the transaction
    conn.commit();
} catch (SQLException e) {
    if ("40001".equals(e.getSQLState())) {
        // retry the transaction
    } else {
        // process the exception
    }
} finally {
    if (updateStmt != null) updateStmt.close();
    if (selectStmt != null) selectStmt.close();
}
csharp
for (var i = 1; i <= 5; i++)
{
    using (var tx = ignite.GetTransactions().TxStart())
    {
        Console.WriteLine($"attempt #{i}, value: {cache.Get(1)}");
        try
        {
            cache.Put(1, "new value");
            tx.Commit();
            Console.WriteLine($"attempt #{i} succeeded");
            break;
        }
        catch (CacheException)
        {
            if (!tx.IsRollbackOnly)
            {
                // Transaction was not marked as "rollback only",
                // so it's not a concurrent update issue.
                // Process the exception here.
                break;
            }
        }
    }
}
cpp
for (int i = 1; i <= 5; i++)
{
    Transaction tx = ignite.GetTransactions().TxStart();
    std::cout << "attempt #" << i << ", value: " << cache.Get(1) << std::endl;
    try {
        cache.Put(1, "new value");
        tx.Commit();
        std::cout << "attempt #" << i << " succeeded" << std::endl;
        break;
    }
    catch (IgniteError e)
    {
        if (!tx.IsRollbackOnly())
        {
            // Transaction was not marked as "rollback only",
            // so it's not a concurrent update issue.
            // Process the exception here.
            break;
        }
    }
}

11.5.限制

11.5.1.跨缓存事务

TRANSACTIONAL_SNAPSHOT模式是缓存级的,并且事务中涉及的缓存不允许有不同的原子化模式,因此,如果希望在一个SQL事务中覆盖多个表,则必须使用TRANSACTIONAL_SNAPSHOT模式创建所有表。

11.5.2.嵌套事务

通过JDBC/ODBC连接参数,Ignite支持三种模式来处理SQL的嵌套事务:

JDBC连接串:

jdbc:ignite:thin://127.0.0.1/?nestedTransactionsMode=COMMIT

当在一个事务中出现了嵌套事务,系统的行为依赖于nestedTransactionsMode参数:

  • ERROR:如果发生了嵌套事务,会抛出错误,包含事务会回滚,这是默认的行为;
  • COMMIT:包含事务会被提交,嵌套事务开始并且在遇到COMMIT语句时会提交,包含事务中的其余部分会作为隐式事务执行;
  • IGNORE不要使用这个模式,嵌套事务的开始会被忽略,嵌套事务中的语句会作为包含事务的一部分来执行,然后所有的变更会随着嵌套事务的提交而提交,包含事务中的其余部分会作为隐式事务执行。

11.5.3.持续查询

如果在开启了MVCC的缓存上使用持续查询,那么有些限制需要知道:

  • 当接收到更新事件时,在MVCC协调器得知更新之前,后续读取已更新键的操作可能会在一段时间内返回旧值。这是因为更新事件是从更新键的节点发送的,并且是在键更新后立即发送的。这时MVCC协调器可能不会立即感知该更新,因此,随后的读取可能会在这段时间内返回过时的信息。
  • 当使用持续查询时,每个节点单个事务可以更新的键数量是有限制的。更新后的值保存在内存中,如果更新太多,节点可能没有足够的内存来保存所有对象。为了避免内存溢出错误,每个事务在一个节点上只能最多更新20000个键(默认值)。如果超过此值,事务将抛出异常并回滚。可以通过指定IGNITE_MVCC_TX_SIZE_CACHING_THRESHOLD系统属性来修改该值。

11.5.4.其它的限制

开启MVCC的缓存,下面的特性是不支持的,这些限制后续的版本可能会解决:

18624049226