时间:2022-08-05 11:43:09 | 栏目:JAVA代码 | 点击:次
RsyncFile.java
import lombok.NoArgsConstructor; import lombok.SneakyThrows; import java.io.*; import java.util.ArrayList; import java.util.Date; import java.util.concurrent.*; /** * @ClassName RsyncFile * @Descriptiom TODO rsync多线程同步迁移数据 * @Author KING * @Date 2019/11/25 09:17 * @Version 1.2.2 * rsync -vzrtopg --progress --delete //镜像同步 **/ @NoArgsConstructor public class RsyncFile implements Runnable{ private static final int availProcessors = Runtime.getRuntime().availableProcessors(); //构造以cpu核心数为核心池,cpu线程数为最大池,超时时间为1s,线程队列为大小为无界的安全阻塞线程队列,拒绝策略为DiscardOldestPolicy()的线程池。(同步数据当然不能丢下拒绝任务) private ExecutorService ThreadPool = new ThreadPoolExecutor(availProcessors >> 1, availProcessors, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); //保存扫描得到的文件列表 private static ArrayList<String> fileNameList = new ArrayList<String>(); private String shellname; private String filename; private String userip; private CountDownLatch countDownLatch; private static int deep = 0; public RsyncFile(String ShellName, String filename, String UserIP, CountDownLatch countDownLatch) { this.shellname = ShellName; this.filename = filename; this.userip = UserIP; this.countDownLatch = countDownLatch; } public static void main(String[] args) { try { new RsyncFile().Do(args[0],args[1],Integer.parseInt(args[2])); }catch (ArrayIndexOutOfBoundsException e){ System.out.println(e); System.out.println("Error , args send fault"); System.out.println("please send localAddress remote username @ remote IP or hostname and catalogue"); System.out.println("like this [ /home/test/ root@node1:/test/ 1 ]"); }catch(NumberFormatException e1){ System.out.println(e1); System.out.println("please input Right Directory depth, this number must be int"); System.out.println("like this [ /home/test/ root@node1:/test/ 1 ]"); } } @SneakyThrows private void Do(String content,String UserIP,int setdeep){ System.out.println("开始执行"); System.out.println("开始时间:" + new 
Date()); Long a = System.nanoTime(); File file = new File(content); System.out.println("开始扫描本地指定目录"); GetAllFile(file,setdeep);//按深度扫描非空文件夹和文件 System.out.println("扫描本地目录完成"); //给脚本赋予权限 String [] cmd={"/bin/sh","-c","chmod 755 ./do*"}; Runtime.getRuntime().exec(cmd); //创建远端目录操作 System.out.println("开始创建远端目录结构"); //一次计数锁用于保证目录创建完成 CountDownLatch doDirLock = new CountDownLatch(1); ThreadPool.execute(new RsyncFile("./doDirc.sh",content,UserIP,doDirLock)); doDirLock.await(); System.out.println("创建远端目录结构完成"); //开始同步工作 System.out.println("开始执行同步工作"); System.out.println("同步的文件夹或文件总数: " + fileNameList.size()); System.out.println("正在同步。。。。。"); //fileNameList.size()次计数锁用于保证数据同步完成(保证计时准确) CountDownLatch rsyncLock = new CountDownLatch(fileNameList.size()); System.out.println(fileNameList.size()); for (String fileName:fileNameList) { //除去文件名中与UserIP重复的文件路径 String RemoteDir = UserIP.concat(fileName.replace(content, "")); System.out.println("要同步的本地目录或文件: " + fileName); System.out.println("要同步的远端目录或文件: " + RemoteDir); ThreadPool.execute(new RsyncFile("./doRync.sh",fileName, RemoteDir,rsyncLock)); } rsyncLock.await(); System.out.println("执行同步工作完成"); //开始文件校验工作 System.out.println("执行文件校验及重传"); //一次计数锁用于保证校验完成 CountDownLatch chechSumLock = new CountDownLatch(1); ThreadPool.execute(new RsyncFile("./doChecksum.sh",content,UserIP,chechSumLock)); chechSumLock.await(); System.out.println("文件校验及重传完成"); ThreadPool.shutdown(); Long b = System.nanoTime(); Long aLong = (b - a)/1000000L; System.out.println("处理时间" + aLong + "ms"); System.out.println("结束时间:" + new Date()); } /** * 执行rsync脚本的线程方法,使用PrintWriter来与linux Terminal交互 */ @Override public void run() { try { String command=shellname.concat(" ").concat(filename).concat(" ").concat(userip); File wd = new File("/bin"); Process process = null; process = Runtime.getRuntime().exec("/bin/bash", null, wd); if (process != null) { InputStream is = process.getInputStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 
PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(process.getOutputStream())), true); //切换到当前class文件所在目录 out.println("cd " + System.getProperty("user.dir")); out.println(command); out.println("exit"); StringBuilder sb = new StringBuilder(); String line; while ((line = reader.readLine()) != null) { sb.append(line + System.lineSeparator()); } process.waitFor(); reader.close(); out.close(); process.destroy(); System.out.println("result:" + sb.toString()); }else { System.out.println("找不到系统bash工具,请检查系统是否异常,并为系统创建/bin/sh的bash工具软连接"); } } catch (Exception e) { System.err.println(e.getMessage()); }finally { //倒记数锁释放一次 countDownLatch.countDown(); } } /**遍历指定的目录并能指定深度 * @param file 指定要遍历的目录 * @param setDeep 设定遍历深度 */ @SneakyThrows private static void GetAllFile(File file, int setDeep) { if(file != null){ if(file.isDirectory() && deep<setDeep){ deep++; File f[] = file.listFiles(); if(f != null) { int length = f.length; for(int i = 0; i < length; i++) GetAllFile(f[i],setDeep); deep--; } } else { if(file.isDirectory()){ //如果为目录末尾添加 / 保证rsync正常处理 fileNameList.add(file.getAbsolutePath().concat("/")); }else { fileNameList.add(file.getAbsolutePath()); } } } } }
doDir.sh
#!/bin/bash
# Creates the directory structure on the remote side without copying any files:
# --include='*/' keeps directories, --exclude='*' drops everything else.
#   $1  local source directory
#   $2  remote destination (user@host:/path/)
# rsync's stderr is merged into the pipe so errors also land in the log file.
rsync -av --include='*/' --exclude='*' "$1" "$2" 2>&1 | tee -a /tmp/rsync.log
echo "创建目录结构操作"
doRsync.sh
#!/bin/bash
# Transfers one file or directory to the remote side with compression and
# itemized change output.
#   $1  local source file or directory
#   $2  remote destination (user@host:/path/...)
# pipefail makes the script's exit status reflect rsync's, not tee's.
set -o pipefail
# rsync's stderr is merged into the pipe so errors also land in the log file.
rsync -avzi --stats --progress "$1" "$2" 2>&1 | tee -a /tmp/rsync.log
doChecksum.sh
#!/bin/bash
# Verification pass: -c compares files by checksum instead of size/mtime and
# re-transfers anything that differs.
#   $1  local source directory
#   $2  remote destination (user@host:/path/)
# pipefail makes the script's exit status reflect rsync's, not tee's.
set -o pipefail
# rsync's stderr is merged into the pipe so errors also land in the log file.
rsync -acvzi --stats --progress "$1" "$2" 2>&1 | tee -a /tmp/checksum.log
rsync输出日志说明如下
YXcstpoguax  path/to/file
|||||||||||
||||||||||`- x: The extended attribute information changed
|||||||||`-- a: The ACL information changed
||||||||`--- u: The u slot is reserved for future use
|||||||`---- g: Group is different
||||||`----- o: Owner is different
|||||`------ p: Permissions are different
||||`------- t: Modification time is different
|||`-------- s: Size is different
||`--------- c: Different checksum (for regular files), or
||              changed value (for symlinks, devices, and special files)
|`---------- the file type:
|            f: for a file,
|            d: for a directory,
|            L: for a symlink,
|            D: for a device,
|            S: for a special file (e.g. named sockets and fifos)
`----------- the type of update being done:
             <: file is being transferred to the remote host (sent)
             >: file is being transferred to the local host (received)
             c: local change/creation for the item, such as:
                - the creation of a directory
                - the changing of a symlink,
                - etc.
             h: the item is a hard link to another item (requires --hard-links).
             .: the item is not being updated (though it might have
                attributes that are being modified)
             *: means that the rest of the itemized-output area contains
                a message (e.g. "deleting")