Creating a SeaTunnel Cluster on KubeSphere

11/22/2024 · kubernetes · kubesphere · Seatunnel · kylin

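After installing the Kubernetes cluster with KubeKey, the next step is to install SeaTunnel. Simply follow the official SeaTunnel documentation step by step. Once it is installed, check the pods with kubectl get pods -A.

Next, I needed to sync data from a Dameng (DM) database into ClickHouse, which requires installing the DM JDBC driver first.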

| Database | Driver class | JDBC URL | Driver download |
| --- | --- | --- | --- |
| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |

Then place the driver as the documentation describes:

# Tip

Warning: for license compliance, you have to provide the database driver yourself. Copy it to the $SEATUNNEL_HOME/lib/ directory to make it work.

e.g.

  • If you use MySQL, download mysql-connector-java-xxx.jar and copy it to $SEATUNNEL_HOME/lib/.
  • For Spark/Flink, also copy it to $SPARK_HOME/jars/ or $FLINK_HOME/lib/.

I put the driver in both lib and plugins/Jdbc/lib. You also need to add seatunnel-hadoop3-3.1.4-uber-2.3.1-optional.jar.

Reference: https://github.com/apache/seatunnel/issues/4454

Placing this jar under lib is enough.
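Since the DM driver ends up in two directories, a small shell helper can do the copying. This is a minimal sketch, assuming it runs wherever $SEATUNNEL_HOME is reachable (for example while building the SeaTunnel image); install_driver and the jar name DmJdbcDriver18.jar are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: copy a JDBC driver jar into both directories used in
# this setup, $SEATUNNEL_HOME/lib and $SEATUNNEL_HOME/plugins/Jdbc/lib.
# $1: path to the driver jar, $2: SeaTunnel home directory.
install_driver() {
  local jar="$1" home="$2" dir
  if [ ! -f "$jar" ]; then
    echo "driver jar not found: $jar" >&2
    return 1
  fi
  for dir in "$home/lib" "$home/plugins/Jdbc/lib"; do
    # Create the directory if needed, then copy; stop on the first failure.
    mkdir -p "$dir" && cp "$jar" "$dir/" || return 1
  done
}
```

For example, install_driver DmJdbcDriver18.jar "$SEATUNNEL_HOME", and then copy seatunnel-hadoop3-3.1.4-uber-2.3.1-optional.jar into $SEATUNNEL_HOME/lib/ only.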

After that, jobs can be submitted through the REST API.

# Return an overview of the Zeta cluster

Returns overview information about the Zeta engine cluster.

curl -X GET "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/overview?tag1=value1&tag2=value2"

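# Parameters

| Parameter | Required | Type | Description |
| --- | --- | --- | --- |
| tag key-value pairs | No | string | A set of tag values used to filter which nodes are included in the result. |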
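The tag filter is passed as ordinary query parameters. A minimal sketch of building the URL in shell; overview_url is a hypothetical helper, and it does not URL-encode keys or values, so keep tags to plain characters.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build the overview URL with optional tag filters.
# $1: host, $2: port, remaining args: key=value pairs (NOT URL-encoded).
overview_url() {
  local host="$1" port="$2" query="" pair
  shift 2
  for pair in "$@"; do
    # Join pairs with '&'; the first pair gets no leading separator.
    query="${query:+$query&}$pair"
  done
  # Append '?query' only when at least one tag was given.
  echo "http://$host:$port/hazelcast/rest/maps/overview${query:+?$query}"
}
```

Usage: curl -s "$(overview_url <IP_ADDRESS> <PORT> tag1=value1)".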

# Response

{
    "projectVersion": "2.3.5-SNAPSHOT",
    "gitCommitAbbrev": "DeadD0d0",
    "totalSlot": "0",
    "unassignedSlot": "0",
    "works": "1",
    "runningJobs": "0",
    "finishedJobs": "0",
    "failedJobs": "0",
    "cancelledJobs": "0"
}

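# Notes

  • When dynamic-slot is used, totalSlot and unassignedSlot in the response are always 0. Once a fixed slot count is configured, the cluster's total and unassigned slot counts are returned correctly.
  • When tag filters are added, works, totalSlot and unassignedSlot report metrics for the matching nodes only. Job-related metrics such as runningJobs are cluster-level and cannot be filtered by tag.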

# Return the thread stack dump of the current node

curl -X GET "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/thread-dump"

# Response

[
  {
    "threadName": "",
    "threadId": 0,
    "threadState": "",
    "stackTrace": ""
  }
]
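A thread dump can be long; a quick overview is the number of threads per state. This is a rough sketch using grep, sed and sort rather than a JSON parser, assuming the field layout shown above; thread_states is a hypothetical name.

```shell
#!/usr/bin/env bash
# Hypothetical helper: count threads per state in a thread-dump response
# read from stdin. Prints "STATE COUNT" lines, sorted by state name.
thread_states() {
  grep -o '"threadState"[[:space:]]*:[[:space:]]*"[A-Z_]*"' \
    | sed 's/.*"\([A-Z_]*\)"$/\1/' \
    | LC_ALL=C sort \
    | uniq -c \
    | awk '{print $2, $1}'
}
```

Usage: curl -s "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/thread-dump" | thread_states.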

# Query the currently running jobs

curl -X GET "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/running-jobs"

# Response

[
  {
    "jobId": "",
    "jobName": "",
    "jobStatus": "",
    "envOptions": {
    },
    "createTime": "",
    "jobDag": {
      "vertices": [
      ],
      "edges": [
      ]
    },
    "pluginJarsUrls": [
    ],
    "isStartWithSavePoint": false,
    "metrics": {
      "sourceReceivedCount": "",
      "sinkWriteCount": ""
    }
  }
]
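To feed job IDs into other calls (job details, stop-job), they can be pulled out of the running-jobs response. A grep-based sketch assuming the layout above; job_ids is hypothetical, and a real JSON tool would be more robust. The pattern accepts both quoted and unquoted IDs, so it also works on the submit-job response, and the awk step drops duplicates in case nested objects also carry a jobId in your version.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print each distinct jobId found in a JSON response
# read from stdin, one per line, in order of first appearance.
job_ids() {
  grep -o '"jobId"[[:space:]]*:[[:space:]]*"\{0,1\}[0-9]\{1,\}' \
    | grep -o '[0-9]\{1,\}$' \
    | awk '!seen[$0]++'
}
```

Usage: curl -s "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/running-jobs" | job_ids.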

# Query job details

curl -X GET "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/running-job/<JOB_ID>"

# Parameters

| Parameter | Required | Type | Description |
| --- | --- | --- | --- |
| jobId | Yes | long | Job ID |

# Response

{
  "jobId": "",
  "jobName": "",
  "jobStatus": "",
  "createTime": "",
  "jobDag": {
    "vertices": [
    ],
    "edges": [
    ]
  },
  "metrics": {
    "SourceReceivedCount": "",
    "SourceReceivedQPS": "",
    "SourceReceivedBytes": "",
    "SourceReceivedBytesPerSeconds": "",
    "SinkWriteCount": "",
    "SinkWriteQPS": "",
    "SinkWriteBytes": "",
    "SinkWriteBytesPerSeconds": "",
    "TableSourceReceivedCount": {},
    "TableSourceReceivedBytes": {},
    "TableSourceReceivedBytesPerSeconds": {},
    "TableSourceReceivedQPS": {},
    "TableSinkWriteCount": {},
    "TableSinkWriteQPS": {},
    "TableSinkWriteBytes": {},
    "TableSinkWriteBytesPerSeconds": {}
  },
  "finishedTime": "",
  "errorMsg": null,
  "envOptions": {
  },
  "pluginJarsUrls": [
  ],
  "isStartWithSavePoint": false
}

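The fields jobId, jobName, jobStatus, createTime, jobDag and metrics are always returned. The fields envOptions, pluginJarsUrls and isStartWithSavePoint are returned while the job is RUNNING. The fields finishedTime and errorMsg are returned once the job has ended, that is, when its status is no longer RUNNING (for example FINISHED or CANCELED).

If the job cannot be found, the response is: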

{
  "jobId" : ""
}
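Since the not-found case is signalled by an empty jobId in the response body, a script should check for it explicitly. A minimal sketch; job_exists is a hypothetical helper that only inspects the body, not the HTTP status.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read a running-job response from stdin and
# succeed only if its jobId is non-empty (i.e. the job was found).
job_exists() {
  ! grep -Eq '"jobId"[[:space:]]*:[[:space:]]*""'
}
```

Usage: curl -s "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/running-job/<JOB_ID>" | job_exists || echo "job not found".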

# Return information on all finished jobs

curl -X GET "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/finished-jobs/<STATUS>"

# Parameters

| Parameter | Required | Type | Description |
| --- | --- | --- | --- |
| status | Optional | string | Finished job status: FINISHED, CANCELED, FAILED or UNKNOWABLE |
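Since only four status values are listed, a helper can reject anything else before calling the server. A minimal sketch; finished_jobs_url is hypothetical, and it assumes that leaving out the status segment returns finished jobs of every status.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build the finished-jobs URL.
# $1: host, $2: port, $3 (optional): FINISHED, CANCELED, FAILED or UNKNOWABLE.
finished_jobs_url() {
  local base="http://$1:$2/hazelcast/rest/maps/finished-jobs"
  case "${3:-}" in
    "") echo "$base" ;;
    FINISHED|CANCELED|FAILED|UNKNOWABLE) echo "$base/$3" ;;
    *) echo "unsupported status: $3" >&2; return 1 ;;
  esac
}
```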

# Response

[
  {
    "jobId": "",
    "jobName": "",
    "jobStatus": "",
    "errorMsg": null,
    "createTime": "",
    "finishTime": "",
    "jobDag": "",
    "metrics": ""
  }
]

# Return system monitoring information

curl -X GET "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/system-monitoring-information"

# Response

[
  {
    "isMaster": "true",
    "host": "localhost",
    "port": "5801",
    "processors":"8",
    "physical.memory.total":"16.0G",
    "physical.memory.free":"16.3M",
    "swap.space.total":"0",
    "swap.space.free":"0",
    "heap.memory.used":"135.7M",
    "heap.memory.free":"440.8M",
    "heap.memory.total":"576.5M",
    "heap.memory.max":"3.6G",
    "heap.memory.used/total":"23.54%",
    "heap.memory.used/max":"3.73%",
    "minor.gc.count":"6",
    "minor.gc.time":"110ms",
    "major.gc.count":"2",
    "major.gc.time":"73ms",
    "load.process":"24.78%",
    "load.system":"60.00%",
    "load.systemAverage":"2.07",
    "thread.count":"117",
    "thread.peakCount":"118",
    "cluster.timeDiff":"0",
    "event.q.size":"0",
    "executor.q.async.size":"0",
    "executor.q.client.size":"0",
    "executor.q.client.query.size":"0",
    "executor.q.client.blocking.size":"0",
    "executor.q.query.size":"0",
    "executor.q.scheduled.size":"0",
    "executor.q.io.size":"0",
    "executor.q.system.size":"0",
    "executor.q.operations.size":"0",
    "executor.q.priorityOperation.size":"0",
    "operations.completed.count":"10",
    "executor.q.mapLoad.size":"0",
    "executor.q.mapLoadAllKeys.size":"0",
    "executor.q.cluster.size":"0",
    "executor.q.response.size":"0",
    "operations.running.count":"0",
    "operations.pending.invocations.percentage":"0.00%",
    "operations.pending.invocations.count":"0",
    "proxy.count":"8",
    "clientEndpoint.count":"0",
    "connection.active.count":"2",
    "client.connection.count":"0",
    "connection.count":"0"
  }
]
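To watch a single value such as heap.memory.used/max (for a simple alert, say), it can be picked out of the response with grep and sed. A sketch assuming every value is a quoted string as in the sample above; get_metric is hypothetical, it returns the first match (the first node in the array), and dots in the key act as regex wildcards, which is harmless for these keys.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the value of one metric from a
# system-monitoring-information response read from stdin.
# $1: metric key, e.g. processors or heap.memory.used/max
get_metric() {
  grep -o "\"$1\"[[:space:]]*:[[:space:]]*\"[^\"]*\"" \
    | head -n 1 \
    | sed 's/.*"\([^"]*\)"$/\1/'
}
```

Usage: curl -s "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/system-monitoring-information" | get_metric heap.memory.used/max.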

# Submit a job

# Sync Dameng to ClickHouse

curl -X POST "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/submit-job" \
    -H "Content-Type: application/json" \
    -d '{
        "env": {
            "parallelism": 4,
            "job.mode": "batch"
        },
        "source": [
            {
                "url": "jdbc:dm://<DB_IP>:<DB_PORT>",
                "driver": "dm.jdbc.driver.DmDriver",
                "user": "<USERNAME>",
                "password": "<PASSWORD>",
                "plugin_name": "Jdbc",
                "query": "SELECT * FROM DATABASE.TABLE"
            }
        ],
        "transform": [],
        "sink": [
            {
                "plugin_name": "Clickhouse",
                "host": "<CLICKHOUSE_HOST>:<CLICKHOUSE_PORT>",
                "database": "DATABASE",
                "table": "TABLE",
                "username": "<USERNAME>",
                "password": "<PASSWORD>"
            }
        ]
    }'
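Credentials typed directly into the curl body can end up in shell history. One alternative is to render the Dameng-to-ClickHouse body from environment variables and pipe it to curl. This is a sketch: the helper names and variable names (DM_HOST, CH_HOST and so on) are hypothetical, and the defaults 5236 and 8123 are the DM and ClickHouse HTTP ports used elsewhere in this post. Values are inserted verbatim, so a password containing " or \ would break the JSON.

```shell
#!/usr/bin/env bash
# Hypothetical helper: render a Dameng -> ClickHouse batch job body.
# All connection values come from shell/environment variables.
dm_to_clickhouse_body() {
  cat <<EOF
{
  "env": { "parallelism": ${PARALLELISM:-4}, "job.mode": "batch" },
  "source": [
    {
      "plugin_name": "Jdbc",
      "url": "jdbc:dm://${DM_HOST}:${DM_PORT:-5236}",
      "driver": "dm.jdbc.driver.DmDriver",
      "user": "${DM_USER}",
      "password": "${DM_PASSWORD}",
      "query": "${DM_QUERY}"
    }
  ],
  "transform": [],
  "sink": [
    {
      "plugin_name": "Clickhouse",
      "host": "${CH_HOST}:${CH_PORT:-8123}",
      "database": "${CH_DATABASE}",
      "table": "${CH_TABLE}",
      "username": "${CH_USER}",
      "password": "${CH_PASSWORD}"
    }
  ]
}
EOF
}

# Hypothetical helper: POST a job body read from stdin to submit-job.
# $1: SeaTunnel host, $2: REST port.
submit_job() {
  curl -sS -X POST "http://$1:$2/hazelcast/rest/maps/submit-job" \
    -H "Content-Type: application/json" \
    --data-binary @-
}
```

Usage: dm_to_clickhouse_body | submit_job <IP_ADDRESS> <PORT>.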

# Sync data from Dameng to MySQL

curl -X POST "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/submit-job" \
-H "Content-Type: application/json" \
-d '{
    "env": {
        "parallelism": 4,
        "job.mode": "batch"
    },
    "source": [
        {
            "url": "jdbc:dm://***.***.***.***:5236",
            "driver": "dm.jdbc.driver.DmDriver",
            "user": "****",
            "password": "****",
            "plugin_name": "Jdbc",
            "query": "SELECT * FROM KLJC_YX.ST_CHANNEL_USER_SEX_AGE_OD_YMDH",
            "split.key": "DAY_ID"
        }
    ],
    "transform": [],
    "sink": [
        {
            "plugin_name": "Jdbc",
            "driver": "com.mysql.cj.jdbc.Driver",
            "url": "jdbc:mysql://***.***.***.***:3306/DATABASE",
            "user": "****",
            "password": "****",
            "query": "INSERT INTO TABLE (ID, NAME, AGE, SEX, CREATE_TIME, UPDATE_TIME) VALUES (?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE NAME=VALUES(NAME), AGE=VALUES(AGE), SEX=VALUES(SEX), CREATE_TIME=VALUES(CREATE_TIME), UPDATE_TIME=VALUES(UPDATE_TIME);"
        }
    ]
}'
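The MySQL sink query above is an INSERT ... ON DUPLICATE KEY UPDATE upsert with one ? placeholder per source column, in source order. For wide tables the statement can be generated rather than typed. A sketch; mysql_upsert_sql is hypothetical and assumes the key column is the table's primary or unique key. MySQL 8.0.20 and later still accept VALUES() in this clause but deprecate it.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build a MySQL upsert for the Jdbc sink "query" option.
# $1: table, $2: key column (left out of the UPDATE list),
# remaining args: all columns in the order the source emits them.
mysql_upsert_sql() {
  local table="$1" key="$2" cols="" marks="" updates="" c
  shift 2
  for c in "$@"; do
    cols="${cols:+$cols, }$c"
    marks="${marks:+$marks, }?"
    if [ "$c" != "$key" ]; then
      updates="${updates:+$updates, }$c=VALUES($c)"
    fi
  done
  echo "INSERT INTO $table ($cols) VALUES ($marks) ON DUPLICATE KEY UPDATE $updates"
}
```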

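# Sync data to Oracle

Pay attention to the Oracle driver here: put it under plugins/Jdbc/lib, and use a driver version that matches your Oracle version. If you hit connection errors, try upgrading or downgrading the driver.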

curl -X POST "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/submit-job" \
-H "Content-Type: application/json" \
-d '{
    "env": {
        "parallelism": 4,
        "job.mode": "batch"
    },
    "source": [
        {
            "url": "jdbc:dm://***.***.***.***:5236",
            "driver": "dm.jdbc.driver.DmDriver",
            "user": "****",
            "password": "****",
            "plugin_name": "Jdbc",
            "query": "SELECT ID AS id, CHANNEL_ID AS name, DEST_CHANNEL_ID AS age, SEX_AGE_TYPE AS sex, CREATE_TIME AS create_time, UPDATE_TIME AS update_time FROM DATABASE.TABLE",
            "split.key": "create_time"
        }
    ],
    "transform": [],
    "sink": [
        {
            "plugin_name": "Jdbc",
            "driver": "oracle.jdbc.OracleDriver",
            "url": "jdbc:oracle:thin:@***.***.***.***:49161:XE",
            "user": "****",
            "password": "****",
            "query": "MERGE INTO DATABASE.TABLE target USING (SELECT ? AS id, ? AS name, ? AS age, ? AS sex, SYSDATE AS create_time, SYSDATE AS update_time FROM dual) source ON (target.id = source.id) WHEN MATCHED THEN UPDATE SET target.name = source.name, target.age = source.age, target.sex = source.sex, target.update_time = source.update_time WHEN NOT MATCHED THEN INSERT (id, name, age, sex, create_time, update_time) VALUES (source.id, source.name, source.age, source.sex, source.create_time, source.update_time)"
        }
    ]
}'
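Given the advice to match the Oracle driver to the database version, it also helps to make sure only one Oracle driver sits in plugins/Jdbc/lib; with several ojdbc jars it is unclear which one the JVM actually loads. A sketch; check_oracle_driver is hypothetical and assumes the usual ojdbc*.jar file naming.

```shell
#!/usr/bin/env bash
# Hypothetical check: print the single ojdbc*.jar under plugins/Jdbc/lib,
# or fail if there are none or more than one.
# $1: SeaTunnel home directory.
check_oracle_driver() {
  local dir="$1/plugins/Jdbc/lib" jars=() f
  for f in "$dir"/ojdbc*.jar; do
    # If the glob matches nothing, $f is the literal pattern and -e fails.
    [ -e "$f" ] && jars+=("$f")
  done
  case "${#jars[@]}" in
    0) echo "no ojdbc*.jar in $dir" >&2; return 1 ;;
    1) echo "${jars[0]##*/}" ;;
    *) echo "multiple Oracle drivers in $dir: ${jars[*]##*/}" >&2; return 1 ;;
  esac
}
```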

# Response

{
    "jobId": 733584788375666689,
    "jobName": "rest_api_test"
}

# Submit jobs in batch

curl -X POST "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/submit-jobs" \
-H "Content-Type: application/json" \
-d '[
  {
    "params":{
      "jobId":"123456",
      "jobName":"SeaTunnel-01"
    },
    "env": {
      "job.mode": "batch"
    },
    "source": [
      {
        "plugin_name": "FakeSource",
        "result_table_name": "fake",
        "row.num": 1000,
        "schema": {
          "fields": {
            "name": "string",
            "age": "int",
            "card": "int"
          }
        }
      }
    ],
    "transform": [
    ],
    "sink": [
      {
        "plugin_name": "Console",
        "source_table_name": ["fake"]
      }
    ]
  },
  {
    "params":{
      "jobId":"1234567",
      "jobName":"SeaTunnel-02"
    },
    "env": {
      "job.mode": "batch"
    },
    "source": [
      {
        "plugin_name": "FakeSource",
        "result_table_name": "fake",
        "row.num": 1000,
        "schema": {
          "fields": {
            "name": "string",
            "age": "int",
            "card": "int"
          }
        }
      }
    ],
    "transform": [
    ],
    "sink": [
      {
        "plugin_name": "Console",
        "source_table_name": ["fake"]
      }
    ]
  }
]'
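The batch body is a JSON array whose elements are single-job bodies, each with its own params object. If every job is kept in its own file, a hypothetical helper like batch_body can join them into one array; it only concatenates and does not validate the JSON.

```shell
#!/usr/bin/env bash
# Hypothetical helper: join single-job JSON files into the array body
# expected by submit-jobs. Args: paths to the job files, in order.
batch_body() {
  local f sep=""
  printf '['
  for f in "$@"; do
    printf '%s' "$sep"
    cat "$f" || return 1
    sep=","
  done
  printf ']\n'
}
```

Usage: batch_body job-01.json job-02.json | curl -sS -X POST "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/submit-jobs" -H "Content-Type: application/json" --data-binary @- (the file names are placeholders).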

# Response

[
  {
    "jobId": "123456",
    "jobName": "SeaTunnel-01"
  },{
    "jobId": "1234567",
    "jobName": "SeaTunnel-02"
  }
]

# Stop a job

isStopWithSavePoint controls whether the job is stopped with a savepoint (true) or without one (false).

curl -X POST "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/stop-job" \
-H "Content-Type: application/json" \
-d '{
    "jobId": 733584788375666689,
    "isStopWithSavePoint": false
}'

# Response

{
    "jobId": 733584788375666689
}

# Stop jobs in batch

curl -X POST "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/stop-jobs" \
-H "Content-Type: application/json" \
-d '
[
  {
    "jobId": 914020933369528379,
    "isStopWithSavePoint": false
  },
  {
    "jobId": 914020933369462843,
    "isStopWithSavePoint": false
  },
  {
    "jobId": 914021513472376888,
    "isStopWithSavePoint": false
  },
  {
    "jobId": 914021736243134522,
    "isStopWithSavePoint": false
  }
]'
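When many jobs need stopping, the array body can be generated from a list of job IDs. A minimal sketch, assuming every job should be stopped without a savepoint; stop_jobs_body is a hypothetical helper that rejects non-numeric IDs before printing anything.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build a stop-jobs body from numeric job IDs,
# stopping every job without a savepoint.
stop_jobs_body() {
  local id sep=""
  # Validate all IDs first so no partial JSON is printed on error.
  for id in "$@"; do
    case "$id" in
      ''|*[!0-9]*) echo "invalid job id: $id" >&2; return 1 ;;
    esac
  done
  printf '['
  for id in "$@"; do
    printf '%s{"jobId": %s, "isStopWithSavePoint": false}' "$sep" "$id"
    sep=", "
  done
  printf ']\n'
}
```

Usage: stop_jobs_body 914020933369528379 914020933369462843 | curl -sS -X POST "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/stop-jobs" -H "Content-Type: application/json" --data-binary @-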

# Response

[
  {
    "jobId": 881432421482889220
  },
  {
    "jobId": 881432456517910529
  }
]

# Encrypt configuration

curl -X POST "http://<IP_ADDRESS>:<PORT>/hazelcast/rest/maps/encrypt-config" \
-H "Content-Type: application/json" \
-d '{
    "env": {
        "parallelism": 1,
        "shade.identifier":"base64"
    },
    "source": [
        {
            "plugin_name": "MySQL-CDC",
            "schema" : {
                "fields": {
                    "name": "string",
                    "age": "int"
                }
            },
            "result_table_name": "fake",
            "parallelism": 1,
            "hostname": "127.0.0.1",
            "username": "seatunnel",
            "password": "seatunnel_password",
            "table-name": "inventory_vwyw0n"
        }
    ],
    "transform": [
    ],
    "sink": [
        {
            "plugin_name": "Clickhouse",
            "host": "localhost:8123",
            "database": "default",
            "table": "fake_all",
            "username": "seatunnel",
            "password": "seatunnel_password"
        }
    ]
}'

# Response

{
    "env": {
        "parallelism": 1,
        "shade.identifier": "base64"
    },
    "source": [
        {
            "plugin_name": "MySQL-CDC",
            "schema": {
                "fields": {
                    "name": "string",
                    "age": "int"
                }
            },
            "result_table_name": "fake",
            "parallelism": 1,
            "hostname": "127.0.0.1",
            "username": "c2VhdHVubmVs",
            "password": "c2VhdHVubmVsX3Bhc3N3b3Jk",
            "table-name": "inventory_vwyw0n"
        }
    ],
    "transform": [],
    "sink": [
        {
            "plugin_name": "Clickhouse",
            "host": "localhost:8123",
            "database": "default",
            "table": "fake_all",
            "username": "c2VhdHVubmVs",
            "password": "c2VhdHVubmVsX3Bhc3N3b3Jk"
        }
    ]
}
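With shade.identifier set to base64, the "encrypted" username and password in the response are plain Base64: c2VhdHVubmVs decodes back to seatunnel. The sketch below reproduces the transformation with the standard base64 tool, which is also a reminder that this is reversible obfuscation, not encryption. shade_base64 and unshade_base64 are hypothetical names; base64 -d assumes GNU coreutils or a recent macOS base64.

```shell
#!/usr/bin/env bash
# Hypothetical helpers mirroring the base64 shade: anyone holding the
# shaded config can recover the original values.
shade_base64()   { printf '%s' "$1" | base64; }
unshade_base64() { printf '%s' "$1" | base64 -d; }
```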

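# Update the tags of a running node

An update only applies to a single node, so send the request to that node's own ip:port. If the update succeeds, a "success" message is returned.

# Update node tags

# Request body

If the request body is a Map object, it updates the tags of the current node.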

{
  "tag1": "dev_1",
  "tag2": "dev_2"
}
# Response

If the update succeeds, the following response is returned:

{
  "status": "success",
  "message": "update node tags done."
}

# Remove node tags

# Request body

If the request body is an empty Map object, the tags of the current node are cleared.

{}
# Response

If the removal succeeds, the following response is returned:

{
  "status": "success",
  "message": "update node tags done."
}

# Invalid request parameters

  • If the request body is empty:
{
  "status": "fail",
  "message": "Request body is empty."
}
  • If the request body is not a Map object:
{
  "status": "fail",
  "message": "Invalid JSON format in request body."
}
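The bodies for updating and clearing tags differ only in whether the Map is empty, so one helper can produce both. This is a sketch: tags_body and update_tags are hypothetical, and the endpoint path /hazelcast/rest/maps/update-tags is an assumption because the URL is not shown in this section, so check it against the REST API docs for your SeaTunnel version. As noted above, target the node's own ip:port.

```shell
#!/usr/bin/env bash
# Hypothetical helper: render the tags body from key=value pairs.
# With no arguments it prints {}, which clears the node's tags.
tags_body() {
  local sep="" pair
  printf '{'
  for pair in "$@"; do
    # Split on the first '=': key before it, value after it.
    printf '%s"%s": "%s"' "$sep" "${pair%%=*}" "${pair#*=}"
    sep=", "
  done
  printf '}\n'
}

# Assumption: endpoint path /hazelcast/rest/maps/update-tags (not shown in
# the section above). $1: node host, $2: node port, rest: key=value tags.
update_tags() {
  local host="$1" port="$2"
  shift 2
  tags_body "$@" | curl -sS -X POST "http://$host:$port/hazelcast/rest/maps/update-tags" \
    -H "Content-Type: application/json" --data-binary @-
}
```

Usage: update_tags <NODE_IP> <NODE_PORT> tag1=dev_1 tag2=dev_2 to set tags, or update_tags <NODE_IP> <NODE_PORT> to clear them.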