当前位置:主页 > 软件编程 > JAVA代码 >

Hadoop中的压缩与解压缩案例详解

时间:2022-11-16 09:15:30 | 栏目:JAVA代码 | 点击:

压缩主要关注点:压缩率,压缩速度,解压速度,是否可切片

一:压缩的作用

压缩技术能够减少底层HDFS读写字节数,减少磁盘IO,提升网络传输效率,因为磁盘IO和网络带宽是Hadoop的宝贵资源;特别是在运行MR程序时,I/O,网络数据传输,shuffle及Merge都要花大量的时间,因此压缩非常重要; 压缩是提升Hadoop运行效率的一种优化策略,使用得当可以提升效率,但是使用不当也可能降低效率

1.1:压缩的原则:

1、计算(运算)密集型任务:大量使用CPU去做数学运算,此时少用压缩
2、IO密集型任务:此时多用压缩         压缩需要消耗CPU资源

1.2:MR支持的压缩编码

DEFLATE                                                不支持切分

Gzip                                                         不支持切分

bzip2                                                         支持切分

LZO                非hadoop自带 安装           支持切分

Snappy                                                    不支持切分

为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器

  • org.apache.hadoop.io.compress.DefaultCodec
  • org.apache.hadoop.io.compress.GzipCodec
  • org.apache.hadoop.io.compress.BZip2Codec
  • com.hadoop.compression.lzo.LzopCodec
  • org.apache.hadoop.io.compress.SnappyCodec

1.3:压缩性能的比较

1.4:压缩方式的选择

1.4.1Gzip压缩

优点:压缩/解压缩比较快,处理Gzip格式文件和直接处理文本一样

缺点:不支持split

应用场景:

        当每个文件压缩之后在130M以内(1个块大小内),考虑。

1.4.2:Bzip2压缩

优点:比Gzip压缩率高,支持split

缺点:压缩/解压速度慢

应用场景:适合对速度要求不高,但需要较高的压缩率  

                   或者输出数据比较大,处理之后的数据需要压缩存档,同时对单个很大的文本文件像压缩减少存储空间,同时需支持split;

1.4.3LZO压缩

优点:压缩/解压缩比较快,合理的压缩率,支持split,是Hadoop最流行的压缩格式,在Linux系统下需要安装

缺点:压缩率比Gzip低一些,Hadoop本身不支持,为了支持split需要建立索引,还需要指定InputFormat为Lzo格式

应用场景:一个很大的文本文件,压缩之后还大于200M以上可以考虑,而且单个文件越大,LZO优点越明显;

1.4.4Snappy压缩

优点: 压缩速度和合理的压缩率

缺点:不支持split,压缩率比gzip低,Hadoop本身不支持需要安装

应用场景:当MapReduce作业的Map输出的数据比较大的时候,作为Map到Reduce的 中间数据的压缩格式,或者作为一个MapReduce作业的输出和另外一个MapReduce作业的输入。

压缩可以在MapReduce作用的任意阶段启用。

 二:MapReduce数据压缩

Map之前的输入端压缩 :(Hadoop自动检查文件扩展名如果扩展名能够匹配就会使用恰当的编解码方式对文件进行压缩和解压)

Mapper输出采用压缩:(可有效改善shuffle过程,此过程是资源消耗最多的环节)

注:(LZO是Hadoop通用的编解码器,其设计目标是达到与硬盘读取速度相当的压缩速度,因此速度是优先考虑的因素,其次是压缩率,LZO的压缩速度是Gzip的5倍,解压速度是Gzip的2倍)

Reducer输出压缩:压缩技术能够减少要存储的数据量,将i磁盘空间。

三:压缩的参数配置

io.compression.codecs   (在core-site.xml中配置)  (map输入前)

mapreduce.map.output.compress(在mapred-site.xml中配置)  (map到reduce)

mapreduce.map.output.compress.codec(在mapred-site.xml中配置)

mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置)(reduce输出)

mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置)

mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置)

如果压缩写到了配置文件中则,所有都会进行压缩,如果只是写到了当前程序中,只对当前程序起作用。

3.1:设置reduce输出端的压缩格式

//设置Reduced端输出压缩
FileOutputFormat.setCompressOutput(job,true);
//压缩的结果是BZip2Codec
FileOutputFormat.setOutputCompressorClass(job,BZip2Codec.class);
FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);

3.2:设置map输入的压缩方式

//开启map输出压缩
conf.setBoolean("mapreduce.map.output.compress",true);
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

四:文件的压缩与解压缩案例

public class FileCompress {
    public static void main(String[] args) throws IOException {
        //压缩传入两个参数 path  以及压缩格式
//        compress("E:\\a.txt","org.apache.hadoop.io.compress.BZip2Codec");
 
        decompress("E:\\a.txt.bz2");
 
    }
 
    private static void decompress(String path) throws IOException {
        //1:校验是否能解压    CompressionCodecFactory     A factory that will find the correct codec for a given filename.
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
        //This class encapsulates a streaming compression/decompression pair.
        CompressionCodec codec = factory.getCodec(new Path(path));
        if (codec == null){
            System.out.println("cannot find codec for file " + path);
            return;
        }
        //2 获取普通输入流,再获取解压输入流
        FileInputStream fis = new FileInputStream(new File(path));
        //允许客户端 重新定义输入流
        CompressionInputStream cis = codec.createInputStream(fis);
        //3:获取输出流
        FileOutputStream fos = new FileOutputStream(new File(path + ".decodec"));
 
        //4 将压缩输入流写入输出流
        IOUtils.copyBytes(cis , fos, new Configuration());
        //5:关闭资源
        IOUtils.closeStream(fos);
        IOUtils.closeStream(cis);
        IOUtils.closeStream(fis);
 
    }
 
    private static void compress(String path, String method) throws IOException {
        //1:获取输入流
        FileInputStream fis = new FileInputStream(path);
        //2:获取压缩编码器   编解码器就是算吗
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
        CompressionCodec codec = factory.getCodecByName(method);
        //3:获取普通输出流,获取压缩输出流     获取编码器的扩展名
        FileOutputStream fos = new FileOutputStream(new File(path + codec.getDefaultExtension()));
        CompressionOutputStream cos = codec.createOutputStream(fos);
        //4:赋值输入流到流输出流
        IOUtils.copyBytes(fis,cos,new Configuration());
        //5 关闭资源
        IOUtils.closeStream(cos);
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
 
    }
}

您可能感兴趣的文章:

相关文章