TiDB-flink-banner

Author: Fengbiao Liang (TiDB Cloud Solution Engineer at PingCAP)
Editors: Calvin Weng, Tom Dewan

TiDB is a distributed SQL database that supports Hybrid Transactional and Analytical Processing (HTAP) workloads. Apache Flink is the most popular, open-source computing framework. It provides high-throughput, low-latency data computing and exactly-once semantics.

If you use TiDB with Flink, you may have had to quickly ingest streaming data from Apache Flink to TiDB. However, if you use Apache Flink’s default configuration, data ingestion performance will be limited. To achieve higher performance, you must: 

  • Enable bulk insert.
  • Establish a high concurrency connection.
  • Run the SQL ON DUPLICATE KEY UPDATE clause.

In this article, we share two ways you can achieve high-performance data ingestion by adjusting some parameters and code.

Test environment

Our test environment was:

  • Apache Flink: V1.13.5
  • Programing language: Scala V2.12
  • MySQL JDBC Connector: V8.0.27
  • Database: TiDB V5.3.0

Sample test cases 

There are different approaches to achieving high performance in this scenario.  We will recommend two of them, both of which enable BULK INSERT. The sample code is written in Scala.   

Solution 1: Use the Flink SQL statement

The Flink SQL statement is the easiest way to enable BULK INSERT from other data sources to TiDB. 

  1. Create the SQL table:
    var createTableStatement = """
           | CREATE TABLE order_item (
           | id BIGINT,
           | productId INT,
           | name STRING,
           | price DECIMAL(8,2),
           | cnt INT,
           | PRIMARY KEY (`id`) NOT ENFORCED
           | ) WITH (
           | 'connector' = 'jdbc',
           | 'url' = 'jdbc:mysql://<tidb_addr>:4000/?useServerPrepStmts=true
           | &cachePrepStmts=true&rewriteBatchedStatements=true', 
           | 'driver' = 'com.mysql.jdbc.Driver',
           | 'table-name' = 't_order_item',
           | 'username' = 'root',
           | 'password' = '',
           | 'sink.buffer-flush.max-rows' = '200',
           | 'sink.buffer-flush.interval' = '3',
           | 'sink.parallelism' = '200' ) 
          |""".stripMargin

    The SQL statement above creates a table in Apache Flink that maps to the TiDB table. For better performance, note the following:

    Parameter Comment
    rewriteBatchedStatements Rewrite single statement to batch; set to true
    sink.buffer-flush.max-rows Batch size of a bulk insert
    sink.parallelism Number of concurrent connections to TiDB
  2. Execute the insert SQL statement.

    You can execute the following code to insert data to TiDB. The first line creates a table using the SQL statement from step 1.

    The second line is the data from a data source table named mock_orderitem. When the code runs, it generates the insert SQL statement. For example: insert into t (a) values (10), (11), (12) on duplicate key update a = values(a);

    tableEnv.executeSql(createTableStatement); 
    tableEnv.executeSql("""INSERT INTO order_item (
          | id, productId, name, price, cnt)  
          | select id, productId, name, price, cnt 
          | FROM mock_orderitem""".stripMargin)
    

Solution 2: Use JDBC sink in your code

Developers favor this solution because it’s highly flexible. You can customize the data source, structure, and format. The sample code shown below: 

  1. Sets TiDB connection parameters.
  2. Uses “JdbcStatementBuilder” to generate SQL statements dynamically.
  3. Executes the SQL statement with multi-threads.
val executionOptions = JdbcExecutionOptions.builder.withBatchIntervalMs(3).withBatchSize(200).build 
val connectionOptions = (new JdbcConnectionOptions.JdbcConnectionOptionsBuilder) .withUrl("jdbc:mysql://<tidb_addr>:4000/?useServerPrepStmts=true&cachePrepStmts=true&rewriteBatchedStatements=true")
.withDriverName("com.mysql.jdbc.Driver").withUsername("root").withPassword("").build
var insertSQL = "INSERT INTO t_order_item (id, productId, name, price, cnt) values (?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE id=values(id)"
val sb: JdbcStatementBuilder[OrderItem] = new JdbcStatementBuilder[OrderItem] {
override def accept(ps: PreparedStatement, t: OrderItem): Unit = {
ps.setLong(1, t.id);
ps.setInt(2, t.productId);
ps.setString(3, t.name);
ps.setInt(4, t.price);
ps.setInt(5, t.cnt);
}
}
val mySink = JdbcSink.sink(insertSQL, sb, executionOptions, connectionOptions) sourceDataStream.addSink(mySink).setParallelism(200) env.execute("TiDB Bulk Insert Job")

Note the following parameters in your code:

Parameter Comment
withBatchIntervalMs(ms)IU Interval for bulk insert
withBatchSize(number) Batch size of a bulk insert.
setParallelism(number) Number of concurrent connections to TiDB

Note:  

If you use the ON DUPLICATE KEY UPDATE clause in your SQL statement, please pay attention to the statement syntax. You should use the VALUES(col_name) function to refer to column values. For example: 

INSERT INTO table (id, name) values (?, ?) ON DUPLICATE KEY UPDATE id=values(id)

Summary

If you are writing a high-performance data ingestion program, we recommend that you read this article and test the parameters carefully. If you want to check whether BULK INSERT works, enable the TiDB general log to view the SQL statements. 

If you run into issues, feel free to join our community on Slack and TiDB Internals to share them with us. You can also request a demo for these methods from PingCAP.

Fengbiao Liang

About the Author

Fengbiao Liang

More From Fengbiao Liang

Subscribe to Stay Informed!

tidb-white

TiDB

TiDB is effortlessly scalable, open, and trusted to meet the real-time needs of the digital enterprise

cloud_icon-01

TiDB Cloud

Get the massive scale and resiliency of TiDB databases in a fully managed cloud service