自定义InputFormat合并小文件
案例需求
无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。
案例分析
小文件的优化无非以下几种方式:
- 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
- 在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并
- 在mapreduce处理时,可采用combineInputFormat提高效率
案例实现
本节实现的是上述第二种方式
- 首先继承FileInputFormat类自定义MyInputFormat方法,在自定义的MyInputFormat类中需重写RecordReader方法。
- 然后自定义MyRecordReader继承RecordReader,在自定义的MyRecordReader类中需重新initialize(初始化方法)、nextKeyValue(该方法用于获取 <k1,v1>,读取源数据转换为<k1,v1>)、getCurrentKey(返回k1)、getCurrentValue(返回v1)、getProgress(获取文件读取进度)、close(释放资源)等方法。
- 然后在MyInputFormat类中实例化自定义的MyRecordReader类并传递源数据,设置文件是否允许被切割(本案例不允许被切割)。
- Mapper代码按照正常逻辑实现,可以省略Reduce代码,最后主类JobMain中设置文件的读取类为自定义的MyInputFormat。
程序的核心机制:
- 自定义一个InputFormat
- 改写RecordReader,实现一次读取一个完整文件封装为KV
- 在输出时使用SequenceFileOutPutFormat输出合并文件
// 第一步
package myinput_format;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class MyRecordReader extends RecordReader<NullWritable, BytesWritable> {
private Configuration configuration = null;
private FileSplit fileSplit = null;
// 定义标志位判断文件是否处理完成 ---false表示文件没有被读取完
private boolean processed = false;
private BytesWritable bytesWritable = new BytesWritable();
private FileSystem fileSystem = null;
private FSDataInputStream inputStream = null;
// 初始化
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
// inputSplit----文件切片包含文件名信息
fileSplit = (FileSplit)inputSplit;
// 获取Configuration 对象 ---向上提取(以便nextKeyValue函数使用)
configuration = taskAttemptContext.getConfiguration();
}
// 该方法用于获取 <k1,v1>,读取源数据转换为<k1,v1>
/*
* k1-------->NullWritable
* v1-------->BytesWritable
* */
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
// 判断文件是否读取完成
if (!processed){
// 1.获取源文件字节输入流
// 1.1 获取源文件的文件系统(FileSystem)
fileSystem = FileSystem.get(configuration);
// 1.2 通过(FileSystem)获取文件字节输入流 ---源文件路径
inputStream = fileSystem.open(fileSplit.getPath());
// 2.获取源文件数据到普通的字节数组(byte[])
long a = fileSplit.getLength();
byte[] bytes = new byte[(int) a];
// byte[] bytes = new byte[(int) fileSplit.getLength() ];// 需要容纳下小文件字节大小,强行装换为int类型
//2.1源文件数据读取到上述自定义的字节数组
IOUtils.readFully(inputStream,bytes,0,(int) fileSplit.getLength());
// 3.普通的字节数组转换封装到BytesWritable,得到 v1
bytesWritable.set(bytes,0,(int) fileSplit.getLength());
processed = true;
return true;
}
return false;
}
//返回 k1
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}
//返回 v1
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return bytesWritable;
}
// 获取文件读取进度
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
// 进行资源释放
@Override
public void close() throws IOException {
// 关闭输入流
inputStream.close();
fileSystem.close();
}
}
// 第二步
package myinput_format;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class MyInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
// RecordReader抽象类返回子类,需要自定义LineRecordReader类
@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
// 1.创建自定义RecordReader对象
MyRecordReader myRecordReader = new MyRecordReader();
// 2.将源数据InputSplit和TaskAttemptContext传递给自定义RecordReader对象
myRecordReader.initialize(inputSplit,taskAttemptContext);
return myRecordReader;
}
// 设置文件是否可以被切割(功能实现小文件合并设置为不可被切割)
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}
// 第三步
package myinput_format;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text,BytesWritable> {
@Override
protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
// <k1,v1>转为<k2,v2>
// 1.获取文件名作为k2
// 根据 context 获取文件切片--其中包含文件名
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String fileName = fileSplit.getPath().getName();
// 2.v2就是v1写入上下文
context.write(new Text(fileName),value);
}
}
// 第四步
package myinput_format;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class JobMain extends Configured implements Tool {
//该方法用于指定一个job任务
@Override
public int run(String[] strings) throws Exception {
// 1.创建 job任务对象 两个参数 1.configuration 2.job任务名称
Job job = Job.getInstance(super.getConf(), "input_format");
// 打包运行出错添加
job.setJarByClass(JobMain.class);
// 2.配置 job任务对象(八个步骤)
// 2.1 读取文件 ---指定读取类
job.setInputFormatClass(MyInputFormat.class);
// 指点源文件路径
MyInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/hadoop_mapreduce/myinput_format"));
// 2.2 进入指定map阶段处理方式和数据类型
// 设置map阶段用的类
job.setMapperClass(SequenceFileMapper.class);
// 设置Map阶段K2的类型 --- 单词(字符串)
job.setMapOutputKeyClass(Text.class);
// 设置Map阶段V2的类型 ---
job.setMapOutputValueClass(BytesWritable.class);
// 2.3(4,5,6) 进入Shuffle阶段 --先采用默认方式处理
// 2.7 指定Reduce阶段的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
// 2.8 设置输出类型 -- 保存在二进制文件中
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// 设置输出路径
// TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop01:9000/wordcount_out"));
// 判断目标目录是否存在,存在则删除
Path path = new Path("hdfs://hadoop01:9000/hadoop_mapreduce/myinput_format_out");
SequenceFileOutputFormat.setOutputPath(job,path);
// 获取hdfs文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000/hadoop_mapreduce/"), new Configuration());
// --本地测试--
// FileSystem fileSystem = FileSystem.get(new URI("file:///D:\\output"), new Configuration());
// 判断目录是否存在
boolean exists = fileSystem.exists(path);
if (exists){
// 删除目标目录
fileSystem.delete(path,true);
}
// 等待任务结束
boolean bl = job.waitForCompletion(true);
return bl ? 0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// 启动 job任务 --记录任务执行状态 0表示成功
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
自定义outputFormat格式输出文件
案例需求
现在有一些订单的评论数据,需求,将订单的好评与差评进行区分开来,将最终的数据分开到不同的文件夹下面去,数据内容参见资料文件夹,其中数据第九个字段表示好评,中评,差评。0:好评,1:中评,2:差评
案例分析
程序的关键点是要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义outputformat来实现。
案例实现
- 首先继承FileOutputFormat类自定义MyOutputFormat类,在自定义的MyOutputFormat类中需重写RecordWriter方法。
- 然后自定义MyRecordWriter继承RecordWriter,在自定义的MyRecordWriter类中需重新write(具体输出数据的方法)、close(释放资源)等方法。
- 然后在MyOutputFormat类中实例化自定义的MyRecotdWriter类(有参构造传递)传递源数据
- Mapper代码按照正常逻辑实现,可以省略Reduce代码,最后主类JobMain中设置文件的输出类为自定义的MyOutputFormat。
// 第一步
package myoutput_format;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
public class MyRecordWriter extends RecordWriter<Text,NullWritable> {
private FSDataOutputStream goodCommentsOutputStream;
private FSDataOutputStream badCommentsOutputStream;
public MyRecordWriter() {
}
public MyRecordWriter(FSDataOutputStream goodCommentsOutputStream, FSDataOutputStream badCommentsOutputStream) {
this.goodCommentsOutputStream = goodCommentsOutputStream;
this.badCommentsOutputStream = badCommentsOutputStream;
}
/**
*
* @param text 行文本内容
* @param nullWritable
* @throws IOException
* @throws InterruptedException
*/
@Override
public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
//1:从行文本数据中获取第9个字段
String[] split = text.toString().split("\t");
String numStr = split[9];
//2:根据字段的值,判断评论的类型,然后将对应的数据写入不同的文件夹文件中
if(Integer.parseInt(numStr) <= 1){
//好评或者中评
goodCommentsOutputStream.write(text.toString().getBytes());
// 添加换行符
goodCommentsOutputStream.write("\r\n".getBytes());
}else{
//差评
badCommentsOutputStream.write(text.toString().getBytes());
badCommentsOutputStream.write("\r\n".getBytes());
}
}
@Override
public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
IOUtils.closeStream(goodCommentsOutputStream);
IOUtils.closeStream(badCommentsOutputStream);
}
}
// 第二步
package myoutput_format;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
// 1.获取目标文件输出流(两个)
FileSystem fileSystem = FileSystem.get(taskAttemptContext.getConfiguration());
// 文件写入路径
FSDataOutputStream goodCommentsOutputStream = fileSystem.create(new Path("hdfs://hadoop01:9000/hadoop_mapreduce/good_comments/good_comments.txt"));
FSDataOutputStream badCommentsOutputStream = fileSystem.create(new Path("hdfs://hadoop01:9000/hadoop_mapreduce/bad_comments/bad_comments.txt"));
// 2.传递给----->MyRecotdWriter类使用(有参构造传递)
MyRecordWriter myRecordWriter = new MyRecordWriter(goodCommentsOutputStream,badCommentsOutputStream);
return myRecordWriter;
}
}
// 第三步
package myoutput_format;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MyOutputFormatMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value,NullWritable.get());
}
}
// 第四步
package myoutput_format;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class JobMain extends Configured implements Tool {
//该方法用于指定一个job任务
@Override
public int run(String[] strings) throws Exception {
// 1.创建 job任务对象 两个参数 1.configuration 2.job任务名称
Job job = Job.getInstance(super.getConf(), "output_format");
// 打包运行出错添加
job.setJarByClass(JobMain.class);
// 2.配置 job任务对象(八个步骤)
// 2.1 读取文件 ---指定读取类
job.setInputFormatClass(TextInputFormat.class);
// 指点源文件路径
TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/hadoop_mapreduce/myoutput_format"));
// 2.2 进入指定map阶段处理方式和数据类型
// 设置map阶段用的类
job.setMapperClass(MyOutputFormatMapper.class);
// 设置Map阶段K2的类型 --- 单词(字符串)
job.setMapOutputKeyClass(Text.class);
// 设置Map阶段V2的类型 ---
job.setMapOutputValueClass(NullWritable.class);
// 2.3(4,5,6) 进入Shuffle阶段 --先采用默认方式处理
// 2.7 指定Reduce阶段的数据类型
// 2.8 设置输出类型 -- 保存在二进制文件中
job.setOutputFormatClass(MyOutputFormat.class);
// 设置输出路径
// TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop01:9000/wordcount_out"));
// 判断目标目录是否存在,存在则删除
Path path = new Path("hdfs://hadoop01:9000/hadoop_mapreduce/myoutput_format_out");
SequenceFileOutputFormat.setOutputPath(job,path);
// 获取hdfs文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000/hadoop_mapreduce/"), new Configuration());
// --本地测试--
// FileSystem fileSystem = FileSystem.get(new URI("file:///D:\\output"), new Configuration());
// 判断目录是否存在
boolean exists = fileSystem.exists(path);
if (exists){
// 删除目标目录
fileSystem.delete(path,true);
}
// 等待任务结束
boolean bl = job.waitForCompletion(true);
return bl ? 0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// 启动 job任务 --记录任务执行状态 0表示成功
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}