A Deep Dive into the commons-compress Compression Library

Introduction

Apache Commons Compress is a subproject of Apache Commons: a Java library dedicated to compression and archiving (project page: https://commons.apache.org/proper/commons-compress/). At the time of writing the latest stable version is 1.20, and the Maven dependency is:

<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-compress</artifactId>
    <version>1.20</version>
</dependency>

Parallel Compression

Example Code

public static void compressFileList(String zipOutName, List<String> fileNameList) throws IOException, ExecutionException, InterruptedException {
    // ThreadFactoryBuilder comes from Guava; the "%d" gives each worker thread a unique name
    ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("compressFileList-pool-%d").build();
    ExecutorService executor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20), factory);
    ParallelScatterZipCreator parallelScatterZipCreator = new ParallelScatterZipCreator(executor);
    OutputStream outputStream = new FileOutputStream(zipOutName);
    ZipArchiveOutputStream zipArchiveOutputStream = new ZipArchiveOutputStream(outputStream);
    zipArchiveOutputStream.setEncoding("UTF-8");
    // Wrap each file's input stream in a ZipArchiveEntry and submit it to parallelScatterZipCreator
    for (String fileName : fileNameList) {
        File inFile = new File(fileName);
        final InputStreamSupplier inputStreamSupplier = () -> {
            try {
                return new FileInputStream(inFile);
            } catch (FileNotFoundException e) {
                e.printStackTrace();
                // NullInputStream (commons-io) stands in for a file that disappeared before compression
                return new NullInputStream(0);
            }
        };
        ZipArchiveEntry zipArchiveEntry = new ZipArchiveEntry(inFile.getName());
        zipArchiveEntry.setMethod(ZipArchiveEntry.DEFLATED);
        zipArchiveEntry.setSize(inFile.length());
        zipArchiveEntry.setUnixMode(UnixStat.FILE_FLAG | 436);
        parallelScatterZipCreator.addArchiveEntry(zipArchiveEntry, inputStreamSupplier);
    }
    // writeTo blocks until every entry has been compressed, then merges the results into the archive
    parallelScatterZipCreator.writeTo(zipArchiveOutputStream);
    zipArchiveOutputStream.close();
    outputStream.close();
    log.info("ParallelCompressUtil->ParallelCompressUtil-> info:{}", JSONObject.toJSONString(parallelScatterZipCreator.getStatisticsMessage()));
}
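For reference, here is a minimal sketch of a hypothetical caller; the file paths and archive name are placeholders, and java.util.Arrays / java.util.List are assumed to be imported:

public static void main(String[] args) throws Exception {
    // Hypothetical input files; replace with real paths before running.
    List<String> files = Arrays.asList("/tmp/a.log", "/tmp/b.log", "/tmp/c.log");
    compressFileList("/tmp/logs.zip", files);
}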

ParallelScatterZipCreator

This class is the core of parallel compression. Let's look at the source:

[Screenshot: ParallelScatterZipCreator source]
As the source shows, this class parallelizes multi-file compression by submitting tasks to a thread pool; each task compresses its entry, and the compressed outputs are then merged and written to the final output file. Let's keep going.

[Screenshot: ParallelScatterZipCreator constructors]
The constructor takes three parameters:
The first is the ExecutorService. If you let the class build its own pool, it creates one whose core and maximum sizes both equal the number of available processors, i.e. all of them, so the self-built pool is not recommended: it can hog the whole machine during compression and affect other programs. Note also that the pool is shut down once the compression finishes.
The second is the intermediate backing store, which you can think of as the temporary files generated during compression; they are deleted when compression completes. The default is fine here.
The third is the compression level, backed by the Deflater algorithm from the java.util.zip package. The available levels are Deflater.DEFAULT_COMPRESSION (-1), Deflater.NO_COMPRESSION (0), and everything from Deflater.BEST_SPEED (1) up to Deflater.BEST_COMPRESSION (9).
The default is the default compression level; pick whichever suits your needs.
The final compressed output goes through the ZipArchiveOutputStream class, which can write to a File, to any OutputStream, or to a SeekableByteChannel.
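Putting those pieces together, here is a hedged sketch of a fully customized setup; the thread count, temp-file prefix, and output file name are arbitrary choices for illustration, and it assumes the three-argument constructor shown in the screenshot above (executor, backing-store supplier, compression level):

// Bounded pool instead of the default "all available processors" pool.
ExecutorService pool = Executors.newFixedThreadPool(4);

// ScatterGatherBackingStoreSupplier / FileBasedScatterGatherBackingStore live in
// org.apache.commons.compress.parallel; each worker gets its own temp-file backing store.
ScatterGatherBackingStoreSupplier backingStore =
        () -> new FileBasedScatterGatherBackingStore(File.createTempFile("parallelscatter", ".tmp"));

ParallelScatterZipCreator creator =
        new ParallelScatterZipCreator(pool, backingStore, Deflater.BEST_SPEED);

// ZipArchiveOutputStream can target a File, an OutputStream, or a SeekableByteChannel.
try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(new File("out.zip"))) {
    zos.setEncoding("UTF-8");
    // ... addArchiveEntry(...) calls as in the example above ...
    creator.writeTo(zos);   // also throws InterruptedException / ExecutionException
}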


In the for loop above, each file to be compressed is wrapped into a ZipArchiveEntry and handed to the thread pool via parallelScatterZipCreator.addArchiveEntry(zipArchiveEntry, inputStreamSupplier), so multiple entries are processed concurrently:

for (String fileName : fileNameList) {
    File inFile = new File(fileName);
    final InputStreamSupplier inputStreamSupplier = () -> {
        try {
            return new FileInputStream(inFile);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            return new NullInputStream(0);
        }
    };
    ZipArchiveEntry zipArchiveEntry = new ZipArchiveEntry(inFile.getName());
    zipArchiveEntry.setMethod(ZipArchiveEntry.DEFLATED);
    zipArchiveEntry.setSize(inFile.length());
    zipArchiveEntry.setUnixMode(UnixStat.FILE_FLAG | 436);
    parallelScatterZipCreator.addArchiveEntry(zipArchiveEntry, inputStreamSupplier);
}

ZipArchiveEntry extends ZipEntry from the java.util.zip package and adds support for extra fields. The method is the compression method, for which ZipArchiveEntry.DEFLATED is the usual choice; size is the size of the input stream being compressed; and setting the unixMode makes the resulting archive friendly to command-line unzip.
After each file, i.e. its input stream, is wrapped into a ZipArchiveEntry, addArchiveEntry submits it to the thread pool.
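One small readability note on the entry setup: 436 is just the decimal form of the octal permission 0664 (rw-rw-r--), so writing the mode with an octal literal may be clearer. The snippet below is a sketch with the same effect:

ZipArchiveEntry entry = new ZipArchiveEntry(inFile.getName());
entry.setMethod(ZipArchiveEntry.DEFLATED);   // compression method: DEFLATE
entry.setSize(inFile.length());              // uncompressed size of the payload
// UnixStat.FILE_FLAG marks a regular file; 0664 (octal) == 436 (decimal), i.e. rw-rw-r--
entry.setUnixMode(UnixStat.FILE_FLAG | 0664);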

 /**
     * Adds an archive entry to this archive.
     * <p>
     * This method is expected to be called from a single client thread
     * </p>
     *
     * @param zipArchiveEntry The entry to add.
     * @param source          The source input stream supplier
     */

    public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
        submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
    }
    /**
     * Creates a Callable that compresses the given zip entry.
     */
    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry,
        final InputStreamSupplier source) {
        final int method = zipArchiveEntry.getMethod();
        if (method == ZipMethod.UNKNOWN_CODE) {
            throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
        }
        final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
        return new Callable<ScatterZipOutputStream>() {
            @Override
            public ScatterZipOutputStream call() throws Exception {
                ScatterZipOutputStream scatterStream = tlScatterStreams.get();
                // compress this entry and add it to the queue inside the per-thread ScatterZipOutputStream
                scatterStream.addArchiveEntry(zipArchiveEntryRequest);
                return scatterStream;
            }
        };
    }
    
    /**
     * Submit a callable for compression.
     *
     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
     *
     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
     * @since 1.19
     */
    public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
        futures.add(es.submit(callable));
    }

From this source we can see that addArchiveEntry turns each entry into a Callable task and submits it to the thread pool, while the returned Future is added to a queue of futures inside ParallelScatterZipCreator. Each submitted task compresses its entry into the Queue<CompressedEntry> held by a ScatterZipOutputStream; when ParallelScatterZipCreator.writeTo is eventually called, every CompressedEntry is taken from those queues and written to the final output.
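Because createCallable and submitStreamAwareCallable are both public (the javadoc above even mentions callables "possibly wrapped by caller"), the two-step form can be used instead of addArchiveEntry, for example to time each compression job. A minimal sketch, reusing zipArchiveEntry and inputStreamSupplier from the earlier loop:

// Equivalent to addArchiveEntry, but lets the caller wrap the task, e.g. with timing.
Callable<ScatterZipOutputStream> task =
        parallelScatterZipCreator.createCallable(zipArchiveEntry, inputStreamSupplier);
parallelScatterZipCreator.submitStreamAwareCallable(() -> {
    long start = System.currentTimeMillis();
    try {
        return task.call();
    } finally {
        log.info("compressed {} in {} ms", zipArchiveEntry.getName(), System.currentTimeMillis() - start);
    }
});

The ScatterZipOutputStream methods that these tasks end up calling are shown next.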

/**
 * Compresses the entry's payload and adds the result to the queue of CompressedEntry items.
 */
public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException {
    try (final InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) {
        streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod());
    }
    items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(),
            streamCompressor.getBytesWrittenForLastEntry(), streamCompressor.getBytesRead()));
}
/**
 * Finally writes the compressed content to the output stream.
 */
public void writeTo(final ZipArchiveOutputStream target) throws IOException {
    backingStore.closeForWriting();
    try (final InputStream data = backingStore.getInputStream()) {
        for (final CompressedEntry compressedEntry : items) {
            try (final BoundedInputStream rawStream = new BoundedInputStream(data,
                    compressedEntry.compressedSize)) {
                target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
            }
        }
    }
}
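As a closing note on the statistics logged at the end of the first example: getStatisticsMessage() returns a ScatterStatistics object (valid once writeTo has returned) that splits the work into the parallel compression phase and the single-threaded merge phase. A small sketch of reading it directly:

// How long the workers spent compressing vs. how long the final merge took.
ScatterStatistics stats = parallelScatterZipCreator.getStatisticsMessage();
log.info("compression took {} ms, merging took {} ms",
        stats.getCompressionElapsed(), stats.getMergingElapsed());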

To wrap up, here is a flow diagram of the whole process:

[Flow diagram: entries submitted via addArchiveEntry to the thread pool, compressed into per-thread ScatterZipOutputStream stores, then merged by writeTo into the ZipArchiveOutputStream]