一、前言

最近在使用datax同步数据表,由于好多个表,一开始每次都需要等一个执行完再执行下一个,这样明显效率很低,于是写了个Java方法来批量操作;
环境: linux服务器

二、Java代码

1
import java.io.BufferedReader;
2
import java.io.File;
3
import java.io.FileOutputStream;
4
import java.io.InputStreamReader;
5
import java.text.SimpleDateFormat;
6
import java.util.Date;
7
8
/**
9
 * @Author: Huang JX
10
 * @Date: 2021/11/11
11
 * @Description: datax 批量执行脚本,日志存放在当前目录的 DataxSyncLogs 中;
12
 * 注意:需要输入 datax 脚本的文件夹路径作为 main 函数的参数,如 home/xxx/datax/job/
13
 * <p>
14
 * 使用说明:
15
 * 1、去掉文件的包名package;
16
 * 2、将此Java文件放到 /datax/bin/目录下;
17
 * 3、javac DataxSyncUtil.java 
18
 * 4、java DataxSyncUtil [参数]datax脚本文件夹路径
19
 */
20
public class DataxSyncUtil {
21
    public static void main(String[] args) throws Exception {
22
        FileOutputStream out = null;
23
        StringBuffer sb = new StringBuffer();
24
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
25
        try {
26
            String jobPath = args[0];
27
            if (jobPath == null && "".equals(jobPath.trim())) {
28
                throw new Exception("请输入datax脚本文件夹路径");
29
            }
30
            String logDir = jobPath + "DataxSyncLogs";
31
            File logDirFile = new File(logDir);
32
            if (!logDirFile.exists()) {
33
                logDirFile.mkdir(); // 创建日志文件夹
34
            }
35
            String logFileName = logDir + "/datax_sync_" + sdf.format(new Date()) + ".log";
36
            File file = new File(logFileName);
37
            if (!file.exists())
38
                file.createNewFile();
39
            out = new FileOutputStream(file, true);
40
            sb.append("==========================================\n");
41
            sb.append("==========Datax Sync Job Start!==========\n");
42
            sb.append("==========================================\n");
43
            File f = new File(jobPath);
44
            if (!f.exists()) {
45
                sb.append(jobPath + " not exists\n");
46
                return;
47
            }
48
            System.out.print(sb.toString());
49
            out.write(sb.toString().getBytes("utf-8"));
50
            String cmdStr;
51
            File fa[] = f.listFiles();
52
            for (int i = 0; i < fa.length; i++) {
53
                sb = new StringBuffer();
54
                File fs = fa[i];
55
                if (!fs.isDirectory()) {
56
                    Process pr = null;
57
                    cmdStr = "python3 datax.py " + jobPath + fs.getName();
58
                    sb.append("start cmd: " + cmdStr + "\n");
59
                    System.out.print("start cmd: " + cmdStr + "\n");
60
                    pr = Runtime.getRuntime().exec(cmdStr);
61
                    BufferedReader in = new BufferedReader(new InputStreamReader(pr.getInputStream()));
62
                    String line;
63
                    while (true) {
64
                        if (!((line = in.readLine()) != null)) break;
65
                        sb.append(line + "\n");
66
                        System.out.print(line + "\n");
67
                    }
68
                    in.close();
69
                    pr.waitFor();
70
                    out.write(sb.toString().getBytes("utf-8"));
71
                }
72
            }
73
            sb.append("==========================================\n");
74
            sb.append("===========Datax Sync Job End!===========\n");
75
            sb.append("==========================================\n");
76
            System.out.print(sb.toString());
77
            out.write(sb.toString().getBytes("utf-8"));
78
        } finally {
79
            out.flush();
80
            out.close();
81
        }
82
    }
83
}

【注意】看一下注释的说明,将job的文件路径做为main函数的参数;

【更新记录】
1、2021-12-06 更新写日志方式,每跑完一个任务就写日志到文件中;

三、使用说明

  1. DataxSyncUtil拷到服务器中datax的bin目录下,如/home/xxx/datax/bin/,注意去除Java类的包名;
  2. 编译Java程序,javac DataxSyncUtil.java
  3. 执行Java程序,传入参数为放置datax脚本的文件夹,如java DataxSyncUtil /home/xxx/datax/job/
  4. 如果想在后台运行,则为nohup java DataxSyncUtil /home/xxx/datax/job/ &