一、前言
最近在使用datax同步数据表,由于好多个表,一开始每次都需要等一个执行完再执行下一个,这样明显效率很低,于是写了个Java方法来批量操作;
环境:Linux 服务器
二、Java代码
1 | import java.io.BufferedReader; |
2 | import java.io.File; |
3 | import java.io.FileOutputStream; |
4 | import java.io.InputStreamReader; |
5 | import java.text.SimpleDateFormat; |
6 | import java.util.Date; |
7 | |
/**
 * Runs every DataX job script found in a directory, one after another, and logs the output.
 *
 * <p>Each regular file directly inside the job directory is executed as
 * {@code python3 datax.py <job file>}; sub-directories are skipped. Everything a job prints
 * (stdout and stderr) is echoed to standard output and appended to a log file named
 * {@code datax_sync_<yyyyMMddHHmmss>.log} inside a {@code DataxSyncLogs} folder that is created
 * in the job directory.
 *
 * <p>Usage:
 * <ol>
 *   <li>Remove the {@code package} declaration from this file, if there is one.</li>
 *   <li>Copy this file into the {@code datax/bin/} directory and run it from there:
 *       {@code datax.py} is resolved relative to the working directory.</li>
 *   <li>{@code javac DataxSyncUtil.java}</li>
 *   <li>{@code java DataxSyncUtil /home/xxx/datax/job/}</li>
 * </ol>
 *
 * <p>Author: Huang JX, 2021/11/11.
 */
public class DataxSyncUtil {

    /** Name of the log folder created inside the job directory. */
    private static final String LOG_DIR_NAME = "DataxSyncLogs";

    /** Charset used when writing the log file. */
    private static final String LOG_CHARSET = "utf-8";

    /** Separator line used in the start/end banners. */
    private static final String BANNER_RULE = "==========================================\n";

    /**
     * Entry point: executes all job files in the given directory sequentially.
     *
     * <p>If the given path is not an existing directory, a message is printed and the method
     * returns without creating any log.
     *
     * @param args {@code args[0]} is the path of the directory holding the DataX job files
     * @throws IllegalArgumentException if no (or a blank) job directory is given
     * @throws Exception if the log cannot be written, a job cannot be started, or the thread
     *     is interrupted while waiting for a job
     */
    public static void main(String[] args) throws Exception {
        // Check the argument count first: indexing args[0] blindly fails with an
        // ArrayIndexOutOfBoundsException instead of the intended message.
        if (args == null || args.length == 0 || args[0] == null || args[0].trim().isEmpty()) {
            throw new IllegalArgumentException("请输入datax脚本文件夹路径");
        }
        String jobPath = args[0];

        // Validate the job directory before touching the file system, so the message is
        // actually shown. isDirectory() also rejects a path that points at a regular file.
        File jobDir = new File(jobPath);
        if (!jobDir.isDirectory()) {
            System.out.print(jobPath + " not exists\n");
            return;
        }

        // Resolve via File(parent, child) so a missing trailing '/' in jobPath is harmless.
        File logDir = new File(jobDir, LOG_DIR_NAME);
        if (!logDir.isDirectory() && !logDir.mkdirs()) {
            throw new java.io.IOException("cannot create log directory: " + logDir.getPath());
        }
        String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        File logFile = new File(logDir, "datax_sync_" + timestamp + ".log");

        // try-with-resources closes the log even when a job fails, and never touches a
        // stream that was not opened.
        try (FileOutputStream log = new FileOutputStream(logFile, true)) {
            emit(log, BANNER_RULE + "==========Datax Sync Job Start!==========\n" + BANNER_RULE);

            File[] entries = jobDir.listFiles();
            if (entries != null) { // null means the directory could not be listed
                for (File entry : entries) {
                    // Skips sub-directories, including the log folder created above.
                    if (!entry.isDirectory()) {
                        runJob(entry, log);
                    }
                }
            }

            emit(log, BANNER_RULE + "===========Datax Sync Job End!===========\n" + BANNER_RULE);
        }
    }

    /**
     * Runs one job file through {@code python3 datax.py} and streams its output to the log.
     *
     * @param jobFile the DataX job file to execute
     * @param log open log stream to append to
     * @throws java.io.IOException if the process cannot be started or the log cannot be written
     * @throws InterruptedException if interrupted while waiting for the process to finish
     */
    private static void runJob(File jobFile, FileOutputStream log)
            throws java.io.IOException, InterruptedException {
        String jobFilePath = jobFile.getPath();
        emit(log, "start cmd: python3 datax.py " + jobFilePath + "\n");

        // Passing the arguments separately keeps paths containing spaces intact.
        ProcessBuilder builder = new ProcessBuilder("python3", "datax.py", jobFilePath);
        // Merge stderr into stdout: an unread stderr pipe can fill up and block the child,
        // and error output belongs in the log anyway.
        builder.redirectErrorStream(true);
        Process process = builder.start();

        // Process output is decoded with the platform default charset.
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                emit(log, line + "\n");
            }
        }

        int exitCode = process.waitFor();
        if (exitCode != 0) {
            emit(log, "cmd exited with code " + exitCode + ": " + jobFilePath + "\n");
        }
    }

    /**
     * Prints the text to standard output and appends it to the log file.
     *
     * @param log open log stream to append to
     * @param text text to emit, including any trailing newline
     * @throws java.io.IOException if the log cannot be written
     */
    private static void emit(FileOutputStream log, String text) throws java.io.IOException {
        System.out.print(text);
        log.write(text.getBytes(LOG_CHARSET));
    }
}
【注意】请先阅读代码注释中的使用说明,并将 job 脚本所在的文件夹路径作为 main 函数的参数传入;
【更新记录】
1、2021-12-06 更新写日志方式,每跑完一个任务就写日志到文件中;
三、使用说明
- 把 `DataxSyncUtil.java` 拷到服务器中 datax 的 bin 目录下,如 `/home/xxx/datax/bin/`,注意去除 Java 类的包名;
- 编译 Java 程序:`javac DataxSyncUtil.java`
- 执行 Java 程序,传入参数为放置 datax 脚本的文件夹,如:`java DataxSyncUtil /home/xxx/datax/job/`
- 如果想在后台运行,则为:`nohup java DataxSyncUtil /home/xxx/datax/job/ &`