Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。可以将 Doris 表映射为 DataStream 或者 Table。
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.13_2.12</artifactId>
<version>1.0.3</version>
</dependency>
x-- 切测试库
use test_db;
-- 创建测试表flinktest
CREATE TABLE flinktest
(
siteid INT DEFAULT '10',
citycode SMALLINT,
username VARCHAR(32) DEFAULT '',
pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");
-- 插入样例数据
insert into flinktest values
(1,1,'jim',2),
(2,1,'grace',2),
(3,2,'tom',2),
(4,3,'bush',3),
(5,3,'helen',3);
-- 查看表数据情况
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 1 | 1 | jim | 2 |
| 5 | 3 | helen | 3 |
| 4 | 3 | bush | 3 |
| 3 | 2 | tom | 2 |
| 2 | 1 | grace | 2 |
+--------+----------+----------+------+
Doris Type | Flink Type |
---|---|
NULL_TYPE | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
CHAR | STRING |
LARGEINT | STRING |
VARCHAR | STRING |
DECIMALV2 | DECIMAL |
TIME | DOUBLE |
HLL | Unsupported datatype |
代码示例:
xxxxxxxxxx
package com.zenitera.bigdata.doris;
import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class Flink_stream_read_doris {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
Properties props = new Properties();
props.setProperty("fenodes", "hdt-dmcp-ops01:8130");
props.setProperty("username", "root");
props.setProperty("password", "123456");
props.setProperty("table.identifier", "test_db.flinktest");
env
.addSource(new DorisSourceFunction(new DorisStreamOptions(props), new SimpleListDeserializationSchema()))
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/*
代码控制台输出:
[4, 3, bush, 3]
[2, 1, grace, 2]
[1, 1, jim, 2]
[5, 3, helen, 3]
[3, 2, tom, 2]
*/
Flink 读写 Doris 数据主要有两种方式
代码示例:
xxxxxxxxxx
package com.zenitera.bigdata.doris;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
/**
* 使用 Flink 将 JSON 数据 写到Doris数据库
*/
public class Flink_stream_write_doris_json {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
Properties pro = new Properties();
pro.setProperty("format", "json");
pro.setProperty("strip_outer_array", "true");
env
.fromElements("{\"siteid\":\"10\", \"citycode\": \"1001\",\"username\": \"ww\",\"pv\":\"100\"}")
.addSink(DorisSink.sink(
new DorisExecutionOptions.Builder()
.setBatchIntervalMs(2000L)
.setEnableDelete(false)
.setMaxRetries(3)
.setStreamLoadProp(pro)
.build(),
new DorisOptions.Builder()
.setFenodes("hdt-dmcp-ops01:8130")
.setUsername("root")
.setPassword("123456")
.setTableIdentifier("test_db.flinktest")
.build())
);
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/*
代码执行前: 5 rows
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 1 | 1 | jim | 2 |
| 5 | 3 | helen | 3 |
| 4 | 3 | bush | 3 |
| 3 | 2 | tom | 2 |
| 2 | 1 | grace | 2 |
+--------+----------+----------+------+
代码执行后: 6 rows
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 2 | 1 | grace | 2 |
| 3 | 2 | tom | 2 |
| 5 | 3 | helen | 3 |
| 1 | 1 | jim | 2 |
| 10 | 1001 | ww | 100 |
| 4 | 3 | bush | 3 |
+--------+----------+----------+------+
*/
代码示例:
xxxxxxxxxx
package com.zenitera.bigdata.doris;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.*;
public class Flink_stream_write_doris_rowdata {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
LogicalType[] types = {new IntType(), new SmallIntType(), new VarCharType(), new BigIntType()};
String[] fields = {"siteid", "citycode", "username", "pv"};
env
.fromElements("{\"siteid\":\"100\", \"citycode\": \"1002\",\"username\": \"wang\",\"pv\":\"100\"}")
.map(json -> {
JSONObject obj = JSON.parseObject(json);
GenericRowData rowData = new GenericRowData(4);
rowData.setField(0, obj.getIntValue("siteid"));
rowData.setField(1, obj.getShortValue("citycode"));
rowData.setField(2, StringData.fromString(obj.getString("username")));
rowData.setField(3, obj.getLongValue("pv"));
return rowData;
})
.addSink(DorisSink.sink(
fields,
types,
new DorisExecutionOptions.Builder()
.setBatchIntervalMs(2000L)
.setEnableDelete(false)
.setMaxRetries(3)
.build(),
new DorisOptions.Builder()
.setFenodes("hdt-dmcp-ops01:8130")
.setUsername("root")
.setPassword("123456")
.setTableIdentifier("test_db.flinktest")
.build())
);
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/*
代码执行前: 6 rows
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 2 | 1 | grace | 2 |
| 3 | 2 | tom | 2 |
| 5 | 3 | helen | 3 |
| 1 | 1 | jim | 2 |
| 10 | 1001 | ww | 100 |
| 4 | 3 | bush | 3 |
+--------+----------+----------+------+
代码执行后: 7 rows
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 1 | 1 | jim | 2 |
| 2 | 1 | grace | 2 |
| 3 | 2 | tom | 2 |
| 5 | 3 | helen | 3 |
| 10 | 1001 | ww | 100 |
| 100 | 1002 | wang | 100 |
| 4 | 3 | bush | 3 |
+--------+----------+----------+------+
*/
Doris测试表:
xxxxxxxxxx
use test_db;
truncate table flinktest;
insert into flinktest values
(1,1,'aaa',1),
(2,2,'bbb',2),
(3,3,'ccc',3);
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 2 | 2 | bbb | 2 |
| 1 | 1 | aaa | 1 |
| 3 | 3 | ccc | 3 |
+--------+----------+----------+------+
3 rows in set (0.01 sec)
Flink-SQL代码示例:
xxxxxxxxxx
package com.zenitera.bigdata.doris;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class Flink_SQL_doris {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql("create table flink_0518(" +
" siteid int, " +
" citycode int, " +
" username string, " +
" pv bigint " +
")with(" +
" 'connector' = 'doris', " +
" 'fenodes' = 'hdt-dmcp-ops01:8130', " +
" 'table.identifier' = 'test_db.flinktest', " +
" 'username' = 'root', " +
" 'password' = '123456' " +
")");
tEnv.executeSql("insert into flink_0518(siteid, citycode, username, pv) values(4, 4, 'wangting', 4) ");
}
public static class Flink_0518 {
private Integer siteid;
private Integer citycode;
private String username;
private Long pv;
}
}
执行代码,执行完成后查看Doris对应表数据进行验证:
xxxxxxxxxx
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 3 | 3 | ccc | 3 |
| 2 | 2 | bbb | 2 |
| 1 | 1 | aaa | 1 |
| 4 | 4 | wangting | 4 |
+--------+----------+----------+------+
4 rows in set (0.01 sec)
xxxxxxxxxx
package com.zenitera.bigdata.doris;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class Flink_SQL_doris_read {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql("create table flink_0520(" +
" siteid int, " +
" citycode SMALLINT, " +
" username string, " +
" pv bigint " +
")with(" +
" 'connector' = 'doris', " +
" 'fenodes' = 'hdt-dmcp-ops01:8130', " +
" 'table.identifier' = 'test_db.flinktest', " +
" 'username' = 'root', " +
" 'password' = '123456' " +
")");
tEnv.sqlQuery("select * from flink_0520").execute().print();
}
}
/*
控制台输出信息:
+----+-------------+----------+---------------+---------+
| op | siteid | citycode | username | pv |
+----+-------------+----------+---------------+---------+
| +I | 1 | 1 | aaa | 1 |
| +I | 3 | 3 | ccc | 3 |
| +I | 2 | 2 | bbb | 2 |
| +I | 4 | 4 | wangting | 4 |
+----+-------------+----------+---------------+---------+
4 rows in set
*/