Creating a SeaTunnel Cluster on KubeSphere
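After installing the Kubernetes cluster with KubeKey, install SeaTunnel by following the official SeaTunnel documentation step by step. Once it is installed, check the pods with `kubectl get pods -A`.
Next, I need to sync data from a Dameng (DM) database into ClickHouse. The first step is to install the JDBC driver: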
Database | Driver class | JDBC URL | Driver download |
---|---|---|---|
dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
Then place the driver as the documentation describes:
# Tip
Warn: for license compliance, you have to provide the database driver yourself. Copy it to the `$SEATUNNEL_HOME/lib/` directory in order to make it work.
For example:
- If you use MySQL, download `mysql-connector-java-xxx.jar` and copy it to `$SEATUNNEL_HOME/lib/`.
- For Spark/Flink, also copy it to `$SPARK_HOME/jars/` or `$FLINK_HOME/lib/`.
I put the driver in both `lib` and `plugins/Jdbc/lib`. You also need to add `seatunnel-hadoop3-3.1.4-uber-2.3.1-optional.jar` (see https://github.com/apache/seatunnel/issues/4454). Putting it under `lib` is enough.
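The driver placement above can be scripted. Below is a minimal sketch that assumes a POSIX shell run on the filesystem where SeaTunnel is installed. The function name `install_jdbc_driver` is hypothetical. It copies one driver jar into both `lib` and `plugins/Jdbc/lib`, which is what I did for the DM driver. The Hadoop uber jar only needs to go into `lib`, so copy that one separately.

```shell
# Hypothetical helper: copy one JDBC driver jar into both SeaTunnel lib directories.
install_jdbc_driver() {
  # $1 = driver jar, $2 = SeaTunnel home (defaults to $SEATUNNEL_HOME)
  jar="$1"
  home="${2:-${SEATUNNEL_HOME:-}}"
  if [ ! -f "$jar" ] || [ -z "$home" ]; then
    echo "usage: install_jdbc_driver <driver.jar> [seatunnel_home]" >&2
    return 1
  fi
  # Same jar in the engine lib directory and in the Jdbc connector's own lib directory.
  for dir in "$home/lib" "$home/plugins/Jdbc/lib"; do
    mkdir -p "$dir" && cp "$jar" "$dir/" || return 1
  done
}
```

For example, `install_jdbc_driver ./DmJdbcDriver18.jar /opt/seatunnel`. Both the jar name and the install path are placeholders. On Kubernetes the jars must end up inside the SeaTunnel container, for example in the image or in a mounted volume, so run this wherever that filesystem is prepared.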
After that, jobs can be submitted through the REST API.
# Get the Zeta cluster overview
Returns overview information about the Zeta engine cluster.
```shell
curl -X GET "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/overview?tag1=value1&tag2=value2"
```
# Parameters
Name | Required | Type | Description |
---|---|---|---|
tag key-value pairs | No | string | A set of tags; only nodes matching these tags are included in the result. |
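Tag filters are passed as query-string pairs, as in the `tag1=value1&tag2=value2` example above. A small helper can assemble that URL. The name `overview_url` is hypothetical. This sketch does no URL encoding, so it assumes tag keys and values contain only URL-safe characters.

```shell
# Hypothetical helper: build the overview URL with optional tag filters.
# Arguments after the base URL are key=value pairs; nothing is URL-encoded.
overview_url() {
  url="$1/hazelcast/rest/maps/overview"
  shift
  sep='?'
  for pair in "$@"; do
    url="$url$sep$pair"
    sep='&'
  done
  printf '%s\n' "$url"
}
```

For example, `curl -s "$(overview_url http://10.0.0.1:5801 tag1=dev_1)"`, where the address is hypothetical.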
# Response
```json
{
  "projectVersion": "2.3.5-SNAPSHOT",
  "gitCommitAbbrev": "DeadD0d0",
  "totalSlot": "0",
  "unassignedSlot": "0",
  "works": "1",
  "runningJobs": "0",
  "finishedJobs": "0",
  "failedJobs": "0",
  "cancelledJobs": "0"
}
```
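# Notes
- When `dynamic-slot` is used, `totalSlot` and `unassignedSlot` in the response are always 0. With a fixed slot count, the cluster's total and unassigned slot counts are returned correctly.
- With tag filters, `works`, `totalSlot` and `unassignedSlot` cover only the matching nodes. Job-related metrics such as `runningJobs` are cluster-wide and cannot be filtered by tags.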
# Get the thread dump of the current node
```shell
curl -X GET "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/thread-dump"
```
# Response
```json
[
  {
    "threadName": "",
    "threadId": 0,
    "threadState": "",
    "stackTrace": ""
  }
]
```
# List running jobs
```shell
curl -X GET "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/running-jobs"
```
# Response
```json
[
  {
    "jobId": "",
    "jobName": "",
    "jobStatus": "",
    "envOptions": {},
    "createTime": "",
    "jobDag": {
      "vertices": [],
      "edges": []
    },
    "pluginJarsUrls": [],
    "isStartWithSavePoint": false,
    "metrics": {
      "sourceReceivedCount": "",
      "sinkWriteCount": ""
    }
  }
]
```
# Get job details
```shell
curl -X GET "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/running-job/<JOB_ID>"
```
# Parameters
Name | Required | Type | Description |
---|---|---|---|
jobId | Yes | long | Job ID |
# Response
```json
{
  "jobId": "",
  "jobName": "",
  "jobStatus": "",
  "createTime": "",
  "jobDag": {
    "vertices": [],
    "edges": []
  },
  "metrics": {
    "SourceReceivedCount": "",
    "SourceReceivedQPS": "",
    "SourceReceivedBytes": "",
    "SourceReceivedBytesPerSeconds": "",
    "SinkWriteCount": "",
    "SinkWriteQPS": "",
    "SinkWriteBytes": "",
    "SinkWriteBytesPerSeconds": "",
    "TableSourceReceivedCount": {},
    "TableSourceReceivedBytes": {},
    "TableSourceReceivedBytesPerSeconds": {},
    "TableSourceReceivedQPS": {},
    "TableSinkWriteCount": {},
    "TableSinkWriteQPS": {},
    "TableSinkWriteBytes": {},
    "TableSinkWriteBytesPerSeconds": {}
  },
  "finishedTime": "",
  "errorMsg": null,
  "envOptions": {},
  "pluginJarsUrls": [],
  "isStartWithSavePoint": false
}
```
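The `jobId`, `jobName`, `jobStatus`, `createTime`, `jobDag` and `metrics` fields are always returned. `envOptions`, `pluginJarsUrls` and `isStartWithSavePoint` are returned while the job is RUNNING. `finishedTime` and `errorMsg` are returned once the job has ended, that is, when its status is no longer RUNNING (for example FINISHED or CANCELED).
If the job cannot be found, the response is:
```json
{
  "jobId": ""
}
```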
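An ended job reports a status other than RUNNING, and an unknown job returns only an empty `jobId`. That means the detail endpoint can be polled until a job ends. This minimal sketch assumes `curl` and `sed` are available and that the response keeps the `"jobStatus": "..."` layout shown above; a JSON tool such as `jq` would be more robust. The function names are hypothetical. The final states it stops on are the statuses the finished-jobs endpoint accepts (FINISHED, CANCELED, FAILED, UNKNOWABLE).

```shell
# Hypothetical helpers: poll GET running-job/<JOB_ID> until the job reaches a final state.

# Print the jobStatus of one job (empty if the job is unknown or the request failed).
job_status() {
  # $1 = base URL such as http://10.0.0.1:5801, $2 = job id
  curl -s "$1/hazelcast/rest/maps/running-job/$2" |
    tr -d '\n' |
    sed -n 's/.*"jobStatus"[[:space:]]*:[[:space:]]*"\([A-Za-z_]*\)".*/\1/p'
}

# Print the final status; return 0 for FINISHED, 1 for other final states, 2 if not found.
wait_for_job() {
  # $3 = seconds between polls (default 10)
  while :; do
    status=$(job_status "$1" "$2")
    case "$status" in
      "")
        echo "job $2 not found (or request failed)" >&2
        return 2 ;;
      FINISHED|CANCELED|FAILED|UNKNOWABLE)
        printf '%s\n' "$status"
        if [ "$status" = FINISHED ]; then return 0; else return 1; fi ;;
      *)
        # Still RUNNING (or another intermediate state): wait and poll again.
        sleep "${3:-10}" ;;
    esac
  done
}
```

For example, `wait_for_job http://10.0.0.1:5801 733584788375666689 30` checks every 30 seconds. The address is hypothetical.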
# List finished jobs
```shell
curl -X GET "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/finished-jobs/<STATUS>"
```
# Parameters
Name | Required | Type | Description |
---|---|---|---|
status | No | string | Status of the finished jobs to return: FINISHED, CANCELED, FAILED or UNKNOWABLE |
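Because the status segment is optional, a helper can check it against the four values in the table before calling the endpoint. This sketch uses a hypothetical function, `finished_jobs`. It assumes that calling `/finished-jobs` with no status returns finished jobs of every status, which is what the "optional" column suggests.

```shell
# Hypothetical helper: list finished jobs, optionally filtered by status.
finished_jobs() {
  # $1 = base URL, $2 = optional status from the table above
  case "${2:-}" in
    ""|FINISHED|CANCELED|FAILED|UNKNOWABLE) ;;
    *) echo "unsupported status: $2" >&2; return 1 ;;
  esac
  # ${2:+/$2} appends "/<STATUS>" only when a status was given.
  curl -s "$1/hazelcast/rest/maps/finished-jobs${2:+/$2}"
}
```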
# Response
```json
[
  {
    "jobId": "",
    "jobName": "",
    "jobStatus": "",
    "errorMsg": null,
    "createTime": "",
    "finishTime": "",
    "jobDag": "",
    "metrics": ""
  }
]
```
# Get system monitoring information
```shell
curl -X GET "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/system-monitoring-information"
```
# Response
```json
[
  {
    "isMaster": "true",
    "host": "localhost",
    "port": "5801",
    "processors": "8",
    "physical.memory.total": "16.0G",
    "physical.memory.free": "16.3M",
    "swap.space.total": "0",
    "swap.space.free": "0",
    "heap.memory.used": "135.7M",
    "heap.memory.free": "440.8M",
    "heap.memory.total": "576.5M",
    "heap.memory.max": "3.6G",
    "heap.memory.used/total": "23.54%",
    "heap.memory.used/max": "3.73%",
    "minor.gc.count": "6",
    "minor.gc.time": "110ms",
    "major.gc.count": "2",
    "major.gc.time": "73ms",
    "load.process": "24.78%",
    "load.system": "60.00%",
    "load.systemAverage": "2.07",
    "thread.count": "117",
    "thread.peakCount": "118",
    "cluster.timeDiff": "0",
    "event.q.size": "0",
    "executor.q.async.size": "0",
    "executor.q.client.size": "0",
    "executor.q.client.query.size": "0",
    "executor.q.client.blocking.size": "0",
    "executor.q.query.size": "0",
    "executor.q.scheduled.size": "0",
    "executor.q.io.size": "0",
    "executor.q.system.size": "0",
    "executor.q.operations.size": "0",
    "executor.q.priorityOperation.size": "0",
    "operations.completed.count": "10",
    "executor.q.mapLoad.size": "0",
    "executor.q.mapLoadAllKeys.size": "0",
    "executor.q.cluster.size": "0",
    "executor.q.response.size": "0",
    "operations.running.count": "0",
    "operations.pending.invocations.percentage": "0.00%",
    "operations.pending.invocations.count": "0",
    "proxy.count": "8",
    "clientEndpoint.count": "0",
    "connection.active.count": "2",
    "client.connection.count": "0",
    "connection.count": "0"
  }
]
```
# Submit a job
# Sync from Dameng to ClickHouse
```shell
curl -X POST "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/submit-job" \
  -H "Content-Type: application/json" \
  -d '{
    "env": {
      "parallelism": 4,
      "job.mode": "batch"
    },
    "source": [
      {
        "url": "jdbc:dm://<DB_IP>:<DB_PORT>",
        "driver": "dm.jdbc.driver.DmDriver",
        "user": "<USERNAME>",
        "password": "<PASSWORD>",
        "plugin_name": "Jdbc",
        "query": "SELECT * FROM DATABASE.TABLE"
      }
    ],
    "transform": [],
    "sink": [
      {
        "plugin_name": "Clickhouse",
        "host": "<CLICKHOUSE_HOST>:<CLICKHOUSE_PORT>",
        "database": "DATABASE",
        "table": "TABLE",
        "username": "<USERNAME>",
        "password": "<PASSWORD>"
      }
    ]
  }'
```
# Sync data from Dameng to MySQL
```shell
curl -X POST "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/submit-job" \
  -H "Content-Type: application/json" \
  -d '{
    "env": {
      "parallelism": 4,
      "job.mode": "batch"
    },
    "source": [
      {
        "url": "jdbc:dm://***.***.***.***:5236",
        "driver": "dm.jdbc.driver.DmDriver",
        "user": "****",
        "password": "****",
        "plugin_name": "Jdbc",
        "query": "SELECT * FROM KLJC_YX.ST_CHANNEL_USER_SEX_AGE_OD_YMDH",
        "split.key": "DAY_ID"
      }
    ],
    "transform": [],
    "sink": [
      {
        "plugin_name": "Jdbc",
        "driver": "com.mysql.cj.jdbc.Driver",
        "url": "jdbc:mysql://***.***.***.***:3306/DATABASE",
        "user": "****",
        "password": "****",
        "query": "INSERT INTO TABLE (ID, NAME, AGE, SEX, CREATE_TIME, UPDATE_TIME) VALUES (?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE NAME=VALUES(NAME), AGE=VALUES(AGE), SEX=VALUES(SEX), CREATE_TIME=VALUES(CREATE_TIME), UPDATE_TIME=VALUES(UPDATE_TIME);"
      }
    ]
  }'
```
# Sync data from Dameng to Oracle
Pay attention to the Oracle driver. Put it under `plugins/Jdbc/lib`, and use a driver version that matches your Oracle version. If you run into connection errors, try upgrading or downgrading the driver.
```shell
curl -X POST "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/submit-job" \
  -H "Content-Type: application/json" \
  -d '{
    "env": {
      "parallelism": 4,
      "job.mode": "batch"
    },
    "source": [
      {
        "url": "jdbc:dm://***.***.***.***:5236",
        "driver": "dm.jdbc.driver.DmDriver",
        "user": "****",
        "password": "****",
        "plugin_name": "Jdbc",
        "query": "SELECT ID AS id, CHANNEL_ID AS name, DEST_CHANNEL_ID AS age, SEX_AGE_TYPE AS sex, CREATE_TIME AS create_time, UPDATE_TIME AS update_time FROM DATABASE.TABLE",
        "split.key": "create_time"
      }
    ],
    "transform": [],
    "sink": [
      {
        "plugin_name": "Jdbc",
        "driver": "oracle.jdbc.OracleDriver",
        "url": "jdbc:oracle:thin:@***.***.***.***:49161:XE",
        "user": "****",
        "password": "****",
        "query": "MERGE INTO DATABASE.TABLE target USING (SELECT ? AS id, ? AS name, ? AS age, ? AS sex, SYSDATE AS create_time, SYSDATE AS update_time FROM dual) source ON (target.id = source.id) WHEN MATCHED THEN UPDATE SET target.name = source.name, target.age = source.age, target.sex = source.sex, target.update_time = source.update_time WHEN NOT MATCHED THEN INSERT (id, name, age, sex, create_time, update_time) VALUES (source.id, source.name, source.age, source.sex, source.create_time, source.update_time)"
      }
    ]
  }'
```
# Response
```json
{
  "jobId": 733584788375666689,
  "jobName": "rest_api_test"
}
```
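For larger jobs like the three above, keeping the body in a file is easier than quoting it inside `-d '...'`, because `curl --data-binary @file` sends the file unchanged. The sketch below uses a hypothetical function, `submit_job_file`, and is not an official client. It posts such a file and prints the `jobId` from the response. It reads the id with `sed`, assuming the `{"jobId": ..., "jobName": ...}` shape shown above.

```shell
# Hypothetical helper: submit a job definition stored in a JSON file, print the jobId.
submit_job_file() {
  # $1 = base URL such as http://10.0.0.1:5801, $2 = path to the job JSON
  [ -f "$2" ] || { echo "no such file: $2" >&2; return 1; }
  curl -s -X POST "$1/hazelcast/rest/maps/submit-job" \
    -H "Content-Type: application/json" \
    --data-binary @"$2" |
    tr -d '\n' |
    # jobId may be a bare number or a quoted string; keep only the digits.
    sed -n 's/.*"jobId"[[:space:]]*:[[:space:]]*"\{0,1\}\([0-9][0-9]*\).*/\1/p'
}
```

For example, `job_id=$(submit_job_file http://10.0.0.1:5801 dm_to_clickhouse.json)`, where the file holds one of the request bodies above (address and file name are hypothetical). You can then pass the captured id to the running-job and stop-job endpoints.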
# Submit jobs in batch
```shell
curl -X POST "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/submit-jobs" \
  -H "Content-Type: application/json" \
  -d '[
    {
      "params": {
        "jobId": "123456",
        "jobName": "SeaTunnel-01"
      },
      "env": {
        "job.mode": "batch"
      },
      "source": [
        {
          "plugin_name": "FakeSource",
          "result_table_name": "fake",
          "row.num": 1000,
          "schema": {
            "fields": {
              "name": "string",
              "age": "int",
              "card": "int"
            }
          }
        }
      ],
      "transform": [],
      "sink": [
        {
          "plugin_name": "Console",
          "source_table_name": ["fake"]
        }
      ]
    },
    {
      "params": {
        "jobId": "1234567",
        "jobName": "SeaTunnel-02"
      },
      "env": {
        "job.mode": "batch"
      },
      "source": [
        {
          "plugin_name": "FakeSource",
          "result_table_name": "fake",
          "row.num": 1000,
          "schema": {
            "fields": {
              "name": "string",
              "age": "int",
              "card": "int"
            }
          }
        }
      ],
      "transform": [],
      "sink": [
        {
          "plugin_name": "Console",
          "source_table_name": ["fake"]
        }
      ]
    }
  ]'
```
# Response
```json
[
  {
    "jobId": "123456",
    "jobName": "SeaTunnel-01"
  },
  {
    "jobId": "1234567",
    "jobName": "SeaTunnel-02"
  }
]
```
# Stop a job
`isStopWithSavePoint` controls whether the job is stopped with a savepoint.
```shell
curl -X POST "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/stop-job" \
  -H "Content-Type: application/json" \
  -d '{
    "jobId": 733584788375666689,
    "isStopWithSavePoint": false
  }'
```
# Response
```json
{
  "jobId": 733584788375666689
}
```
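A minimal sketch, using a hypothetical function `stop_job`, that builds the stop-job body from a numeric job id and a `true`/`false` savepoint flag. It rejects anything else before the request reaches the server, so the body is always valid JSON.

```shell
# Hypothetical helper: stop one job through POST stop-job.
stop_job() {
  # $1 = base URL, $2 = numeric job id, $3 = true to stop with a savepoint (default false)
  case "$2" in
    ""|*[!0-9]*) echo "job id must be numeric: $2" >&2; return 1 ;;
  esac
  case "${3:-false}" in
    true|false) ;;
    *) echo "savepoint flag must be true or false" >&2; return 1 ;;
  esac
  curl -s -X POST "$1/hazelcast/rest/maps/stop-job" \
    -H "Content-Type: application/json" \
    -d "{\"jobId\": $2, \"isStopWithSavePoint\": ${3:-false}}"
}
```

For example, `stop_job http://10.0.0.1:5801 733584788375666689 true` stops that job with a savepoint. The address is hypothetical.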
# Stop jobs in batch
```shell
curl -X POST "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/stop-jobs" \
  -H "Content-Type: application/json" \
  -d '[
    {
      "jobId": 914020933369528379,
      "isStopWithSavePoint": false
    },
    {
      "jobId": 914020933369462843,
      "isStopWithSavePoint": false
    },
    {
      "jobId": 914021513472376888,
      "isStopWithSavePoint": false
    },
    {
      "jobId": 914021736243134522,
      "isStopWithSavePoint": false
    }
  ]'
```
# Response
```json
[
  {
    "jobId": 881432421482889220
  },
  {
    "jobId": 881432456517910529
  }
]
```
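Writing the stop-jobs array by hand gets tedious when there are many jobs. This sketch uses two hypothetical functions, `stop_jobs_body` and `stop_jobs`. It generates the array from job ids passed as arguments and posts it. As in the example above, `isStopWithSavePoint` is fixed to `false`.

```shell
# Hypothetical helpers: build and send a stop-jobs body for several job ids.
stop_jobs_body() {
  # Validate every id first so no partial body is printed.
  for id in "$@"; do
    case "$id" in
      ""|*[!0-9]*) echo "not a numeric job id: $id" >&2; return 1 ;;
    esac
  done
  sep=''
  printf '['
  for id in "$@"; do
    printf '%s{"jobId": %s, "isStopWithSavePoint": false}' "$sep" "$id"
    sep=', '
  done
  printf ']\n'
}

stop_jobs() {
  # $1 = base URL, remaining arguments = job ids
  base="$1"; shift
  body=$(stop_jobs_body "$@") || return 1
  curl -s -X POST "$base/hazelcast/rest/maps/stop-jobs" \
    -H "Content-Type: application/json" \
    -d "$body"
}
```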
# Encrypt a configuration
```shell
curl -X POST "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/encrypt-config" \
  -H "Content-Type: application/json" \
  -d '{
    "env": {
      "parallelism": 1,
      "shade.identifier": "base64"
    },
    "source": [
      {
        "plugin_name": "MySQL-CDC",
        "schema": {
          "fields": {
            "name": "string",
            "age": "int"
          }
        },
        "result_table_name": "fake",
        "parallelism": 1,
        "hostname": "127.0.0.1",
        "username": "seatunnel",
        "password": "seatunnel_password",
        "table-name": "inventory_vwyw0n"
      }
    ],
    "transform": [],
    "sink": [
      {
        "plugin_name": "Clickhouse",
        "host": "localhost:8123",
        "database": "default",
        "table": "fake_all",
        "username": "seatunnel",
        "password": "seatunnel_password"
      }
    ]
  }'
```
# Response
In this example the configuration comes back unchanged except that the `username` and `password` values are encoded as selected by `shade.identifier`.
```json
{
  "env": {
    "parallelism": 1,
    "shade.identifier": "base64"
  },
  "source": [
    {
      "plugin_name": "MySQL-CDC",
      "schema": {
        "fields": {
          "name": "string",
          "age": "int"
        }
      },
      "result_table_name": "fake",
      "parallelism": 1,
      "hostname": "127.0.0.1",
      "username": "c2VhdHVubmVs",
      "password": "c2VhdHVubmVsX3Bhc3N3b3Jk",
      "table-name": "inventory_vwyw0n"
    }
  ],
  "transform": [],
  "sink": [
    {
      "plugin_name": "Clickhouse",
      "host": "localhost:8123",
      "database": "default",
      "table": "fake_all",
      "username": "c2VhdHVubmVs",
      "password": "c2VhdHVubmVsX3Bhc3N3b3Jk"
    }
  ]
}
```
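With `shade.identifier` set to `base64`, the returned values are plain Base64 of the originals, and the standard `base64` tool can reproduce and reverse them. The point of the check: this identifier encodes rather than encrypts, so anyone who can read the config can decode the credentials. The commands assume GNU coreutils `base64`; on some BSD/macOS versions the decode flag is `-D`.

```shell
# printf '%s' avoids the trailing newline that echo would add to the input.
printf '%s' 'seatunnel' | base64                # c2VhdHVubmVs
printf '%s' 'seatunnel_password' | base64       # c2VhdHVubmVsX3Bhc3N3b3Jk
printf '%s' 'c2VhdHVubmVs' | base64 -d; echo    # seatunnel
```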
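# Update tags of a running node
An update applies to a single node only, so send the request to that node's own `ip:port`. If the update succeeds, a "success" status is returned.
# Update node tags
# Request body
If the request body is a Map object, it is used to update the current node's tags.
```json
{
  "tag1": "dev_1",
  "tag2": "dev_2"
}
```
# Response
If the update succeeds, the following response is returned:
```json
{
  "status": "success",
  "message": "update node tags done."
}
```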
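# Remove node tags
# Request body
If the request body is an empty Map object, the current node's tags are cleared.
```json
{}
```
# Response
If the removal succeeds, the following response is returned:
```json
{
  "status": "success",
  "message": "update node tags done."
}
```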
# Invalid request parameters
- If the request body is empty:

```json
{
  "status": "fail",
  "message": "Request body is empty."
}
```

- If the request body is not a Map object:

```json
{
  "status": "fail",
  "message": "Invalid JSON format in request body."
}
```
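The body rules above can be checked on the client before sending: a Map updates tags, an empty Map clears them, and anything else is rejected. This note does not show the endpoint path or the HTTP method. The sketch therefore uses a hypothetical function, `update_node_tags`, that takes the target node's full URL as an argument, and it assumes POST. Confirm both against the REST API documentation for your SeaTunnel version.

```shell
# Hypothetical helper: send a tag update ({"k": "v", ...}) or a tag removal ({}) to one node.
# ASSUMPTION: POST; $1 must be the full tags URL on the target node's own ip:port.
update_node_tags() {
  # $1 = full URL on the target node, $2 = JSON object body
  case "$2" in
    \{*\}) ;;
    *) echo "body must be a JSON object, e.g. {\"tag1\": \"dev_1\"} or {}" >&2; return 1 ;;
  esac
  curl -s -X POST "$1" -H "Content-Type: application/json" -d "$2"
}
```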