此篇博客主要记录一下商品推荐系统的主要实现过程。
一、获取用户对商品的偏好值
代码实现
package zb.grms;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.math.BigDecimal;
/**
 * Step 1: turns raw behaviour-log lines into one summed preference value per
 * {@code user:item} pair.
 *
 * <p>Each input line is split on {@code ","}; fields 0 and 1 form the output key
 * and field 2 (the action name) is mapped to a preference weight.
 */
public class GoodsStep1 extends Configured implements Tool {
    public static void main(String[] args) {
        int status;
        try {
            status = ToolRunner.run(new GoodsStep1(), args);
        } catch (Exception e) {
            e.printStackTrace();
            status = 1;
        }
        // Propagate success/failure to the calling shell.
        System.exit(status);
    }

    /** Emits {@code user:item -> preference weight} for every usable log line. */
    public static class GS1Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            if (split.length < 2) {
                // Blank or malformed line: no user/item pair to key on.
                return;
            }
            // Convert the action into a preference value; lines without an action field weigh 0.
            double like = split.length >= 3 ? preferenceOf(split[2]) : 0.0;
            // key = user:item, value = preference
            context.write(new Text(split[0] + ":" + split[1]), new DoubleWritable(like));
        }

        /**
         * Maps an action name (case-insensitive) to its preference weight.
         * Any unrecognised action is treated as a page view.
         */
        private static double preferenceOf(String action) {
            if (action.equalsIgnoreCase("paysuccess")) { // payment succeeded
                return 0.3;
            }
            if (action.equalsIgnoreCase("addreview")) { // review added
                return 0.3;
            }
            if (action.equalsIgnoreCase("createorder")) { // order created
                return 0.2;
            }
            if (action.equalsIgnoreCase("addcar")) { // added to cart
                return 0.15;
            }
            return 0.05; // browse
        }
    }

    /** Sums all preference weights of one {@code user:item} key. */
    public static class GS1Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            // BigDecimal.valueOf uses the canonical decimal text of the double (0.3 stays 0.3),
            // whereas new BigDecimal(double) would carry the binary representation error.
            BigDecimal sum = BigDecimal.ZERO;
            for (DoubleWritable value : values) {
                sum = sum.add(BigDecimal.valueOf(value.get()));
            }
            context.write(key, new DoubleWritable(sum.doubleValue()));
        }
    }

    /**
     * Configures and runs the job.
     *
     * @return 0 when the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step1");
        job.setJarByClass(this.getClass());
        // Mapper and its output key/value types.
        job.setMapperClass(GoodsStep1.GS1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        // Reducer and the final output key/value types.
        job.setReducerClass(GoodsStep1.GS1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and path.
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("data/userLog.log"));
        // Output format and path.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step1"));
        // Run and report the real outcome instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
二、将偏好数据整理成偏好矩阵
代码实现
package zb.grms;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * Step 2: regroups the step-1 output ({@code user:item <TAB> preference}) by item,
 * producing {@code item <TAB> user:preference,user:preference,...}.
 */
public class GoodsStep2 extends Configured implements Tool {
    /** Re-keys each record by item id and carries {@code user:preference} as the value. */
    public static class GS2Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = key.toString().split(":");
            if (split.length < 2) {
                // Key is not of the form user:item; nothing to regroup.
                return;
            }
            // Item id becomes the output key; user id plus preference becomes the value.
            context.write(new Text(split[1]), new Text(split[0] + ":" + value.toString()));
        }
    }

    /** Joins all {@code user:preference} values of one item with commas. */
    public static class GS2Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder joined = new StringBuilder();
            for (Text value : values) {
                joined.append(value.toString()).append(",");
            }
            // Drop the trailing comma (reduce is only called with at least one value).
            joined.setLength(joined.length() - 1);
            context.write(key, new Text(joined.toString()));
        }
    }

    public static void main(String[] args) {
        int status;
        try {
            status = ToolRunner.run(new GoodsStep2(), args);
        } catch (Exception e) {
            e.printStackTrace();
            status = 1;
        }
        System.exit(status);
    }

    /**
     * Configures and runs the job.
     *
     * @return 0 when the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator","\t");
        Job job = Job.getInstance(conf, "step2");
        job.setJarByClass(this.getClass());
        // Mapper and its output key/value types.
        job.setMapperClass(GoodsStep2.GS2Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer and the final output key/value types.
        job.setReducerClass(GoodsStep2.GS2Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and path (output of step 1).
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step1/part-r-00000"));
        // Output format and path.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step2"));
        // Run and report the real outcome instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
三、统计商品共现次数
代码实现
笛卡尔积
package zb.grms;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.ArrayList;
/**
 * Step 3: for every user, emits each ordered pair of distinct items that user
 * touched (a per-user cartesian product without the diagonal).
 * Input is the step-1 output keyed by {@code user:item}.
 */
public class GoodsStep3 extends Configured implements Tool {
    /** Splits the {@code user:item} key into user (output key) and item (output value). */
    public static class GS3Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = key.toString().split(":");
            if (split.length < 2) {
                // Key is not of the form user:item.
                return;
            }
            context.write(new Text(split[0]), new Text(split[1]));
        }
    }

    /** Writes {@code item1 <TAB> item2} for every pair of different items of one user. */
    public static class GS3Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // The values iterable can only be walked once, so buffer the items first.
            ArrayList<String> items = new ArrayList<>();
            for (Text value : values) {
                items.add(value.toString());
            }
            for (String first : items) {
                for (String second : items) {
                    if (first.equals(second)) {
                        continue; // an item does not co-occur with itself
                    }
                    context.write(new Text(first), new Text(second));
                }
            }
        }
    }

    public static void main(String[] args) {
        int status;
        try {
            status = ToolRunner.run(new GoodsStep3(), args);
        } catch (Exception e) {
            e.printStackTrace();
            status = 1;
        }
        System.exit(status);
    }

    /**
     * Configures and runs the job.
     *
     * @return 0 when the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator","\t");
        Job job = Job.getInstance(conf, "step3");
        job.setJarByClass(this.getClass());
        // Mapper and its output key/value types.
        job.setMapperClass(GS3Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer and the final output key/value types.
        job.setReducerClass(GS3Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and path (output of step 1).
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step1/part-r-00000"));
        // Output format and path.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step3"));
        // Run and report the real outcome instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
共现次数
package zb.grms;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * Step 4: counts how often each ordered item pair from step 3 occurs,
 * producing {@code item1:item2 <TAB> count}.
 */
public class GoodsStep4 extends Configured implements Tool {
    /** Emits {@code item1:item2 -> 1} for every tab-separated pair line. */
    public static class GS4Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            if (split.length < 2) {
                // Not an "item<TAB>item" line.
                return;
            }
            context.write(new Text(split[0] + ":" + split[1]), new IntWritable(1));
        }
    }

    /** Adds up the partial counts of one item pair. */
    public static class GS4Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable count : values) {
                // Sum the carried value rather than counting records, so the result
                // stays correct if partial sums are ever pre-aggregated by a combiner.
                sum += count.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) {
        int status;
        try {
            status = ToolRunner.run(new GoodsStep4(), args);
        } catch (Exception e) {
            e.printStackTrace();
            status = 1;
        }
        System.exit(status);
    }

    /**
     * Configures and runs the job.
     *
     * @return 0 when the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step4");
        job.setJarByClass(this.getClass());
        // Mapper and its output key/value types.
        job.setMapperClass(GS4Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Reducer and the final output key/value types.
        job.setReducerClass(GS4Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input format and path (output of step 3).
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("src/main/resources/step3/part-r-00000"));
        // Output format and path.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step4"));
        // Run and report the real outcome instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
四、获取商品共现矩阵
代码实现
package zb.grms;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * Step 5: builds the item co-occurrence matrix rows from the step-4 counts,
 * producing {@code item <TAB> otherItem:count,otherItem:count,...}.
 */
public class GoodsStep5 extends Configured implements Tool {
    /** Re-keys {@code item1:item2 -> count} as {@code item1 -> item2:count}. */
    public static class GS5Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = key.toString().split(":");
            if (split.length < 2) {
                // Key is not of the form item1:item2.
                return;
            }
            // Key is the first item; value is "second item:count".
            context.write(new Text(split[0]), new Text(split[1] + ":" + value.toString()));
        }
    }

    /** Joins all {@code otherItem:count} values of one item with commas. */
    public static class GS5Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder joined = new StringBuilder();
            for (Text value : values) {
                joined.append(value.toString()).append(",");
            }
            // Drop the trailing comma (reduce is only called with at least one value).
            joined.setLength(joined.length() - 1);
            context.write(key, new Text(joined.toString()));
        }
    }

    public static void main(String[] args) {
        int status;
        try {
            status = ToolRunner.run(new GoodsStep5(), args);
        } catch (Exception e) {
            e.printStackTrace();
            status = 1;
        }
        System.exit(status);
    }

    /**
     * Configures and runs the job.
     *
     * @return 0 when the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step5");
        job.setJarByClass(this.getClass());
        // Mapper and its output key/value types.
        job.setMapperClass(GS5Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer and the final output key/value types.
        job.setReducerClass(GS5Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and path (output of step 4).
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step4/part-r-00000"));
        // Output format and path.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step5"));
        // Run and report the real outcome instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
五、获取推荐值
代码实现
package zb.grms;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
/**
 * Step 6: multiplies the preference matrix (step 2) with the co-occurrence
 * matrix (step 5) to obtain partial recommendation values.
 */
public class GoodsStep6 extends Configured implements Tool {
    public static void main(String[] args) {
        int status;
        try {
            status = ToolRunner.run(new GoodsStep6(), args);
        } catch (Exception e) {
            e.printStackTrace();
            status = 1;
        }
        System.exit(status);
    }

    /**
     * Splits the comma-separated value of each input line into single entries.
     *
     * <pre>
     * Step-2 input:  375   11:0.25,5:0.25,4:0.55          (item   user:preference)
     * Step-5 input:  375   203:1,961:1,91:1,90:2,89:1     (item   item:co-occurrence count)
     * Job output:    user:item   recommendation value
     * </pre>
     */
    public static class GS6Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            for (String entry : value.toString().split(",")) {
                // key = item, value = either "user:preference" or "item:count"
                context.write(key, new Text(entry));
            }
        }
    }

    /**
     * For one item, multiplies every user preference with every co-occurrence
     * count and emits {@code user:coOccurringItem -> product}.
     */
    public static class GS6Reducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // user -> preference
            Map<String, BigDecimal> like = new HashMap<>();
            // co-occurring item -> count
            Map<String, BigDecimal> same = new HashMap<>();
            for (Text value : values) {
                String[] split = value.toString().split(":");
                if (split.length < 2) {
                    // Not an "id:number" entry.
                    continue;
                }
                // Preferences are written as doubles (always contain '.'),
                // co-occurrence counts as plain integers.
                BigDecimal number = new BigDecimal(split[1]);
                if (split[1].contains(".")) {
                    like.put(split[0], number);
                } else {
                    same.put(split[0], number);
                }
            }
            for (Map.Entry<String, BigDecimal> l : like.entrySet()) {
                for (Map.Entry<String, BigDecimal> s : same.entrySet()) {
                    // key = user:co-occurring item
                    Text outkey = new Text(l.getKey() + ":" + s.getKey());
                    double outvalue = l.getValue().multiply(s.getValue()).doubleValue();
                    context.write(outkey, new DoubleWritable(outvalue));
                }
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @return 0 when the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step6");
        job.setJarByClass(this.getClass());
        // Mapper and its output key/value types.
        job.setMapperClass(GoodsStep6.GS6Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer and the final output key/value types.
        job.setReducerClass(GoodsStep6.GS6Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and paths (outputs of step 2 and step 5).
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job, new Path("src/main/resources/step2"),
                new Path("src/main/resources/step5"));
        // Output format and path.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step6"));
        // Run and report the real outcome instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
六、推荐值累加及数据清洗
代码实现
推荐值累加
package zb.grms;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.math.BigDecimal;
/**
 * Step 7: sums the partial recommendation values from step 6 per
 * {@code user:item} key.
 */
public class GoodsStep7 extends Configured implements Tool {
    public static void main(String[] args) {
        int status;
        try {
            status = ToolRunner.run(new GoodsStep7(), args);
        } catch (Exception e) {
            e.printStackTrace();
            status = 1;
        }
        System.exit(status);
    }

    /** Identity mapper: forwards each key/value pair unchanged. */
    public static class GS7Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    /** Adds up all values of one key; values are parsed from their decimal text. */
    public static class GS7Reducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            BigDecimal total = BigDecimal.ZERO;
            for (Text value : values) {
                total = total.add(new BigDecimal(value.toString()));
            }
            context.write(key, new DoubleWritable(total.doubleValue()));
        }
    }

    /**
     * Configures and runs the job.
     *
     * @return 0 when the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step7");
        job.setJarByClass(this.getClass());
        // Mapper and its output key/value types.
        job.setMapperClass(GoodsStep7.GS7Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer and the final output key/value types.
        job.setReducerClass(GoodsStep7.GS7Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and path (output of step 6).
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step6"));
        // Output format and path.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step7"));
        // Run and report the real outcome instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
数据清洗
统计已经支付成功一次的数据
package zb.grms;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * Step 8.1: extracts from the raw behaviour log the {@code user:item} pairs
 * that have exactly one successful payment.
 */
public class GoodsStep8 extends Configured implements Tool {
    public static void main(String[] args) {
        int status;
        try {
            status = ToolRunner.run(new GoodsStep8(), args);
        } catch (Exception e) {
            e.printStackTrace();
            status = 1;
        }
        System.exit(status);
    }

    /** Emits {@code user:item -> 1} for every log line whose action is "paysuccess". */
    public static class GS8Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            if (split.length < 3) {
                // Blank or malformed line: there is no action field to inspect.
                return;
            }
            if (split[2].equalsIgnoreCase("paysuccess")) {
                context.write(new Text(split[0] + ":" + split[1]), new IntWritable(1));
            }
        }
    }

    /** Writes a key only when it has exactly one successful payment. */
    public static class GS8Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int num = 0;
            for (IntWritable count : values) {
                num += count.get();
            }
            // NOTE(review): pairs paid for more than once are NOT written, so the later
            // filtering step will not remove them from the recommendations. Confirm
            // whether "at least once" was intended here.
            if (num == 1) {
                context.write(key, new IntWritable(num));
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @return 0 when the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step8.1");
        job.setJarByClass(this.getClass());
        // Mapper and its output key/value types.
        job.setMapperClass(GoodsStep8.GS8Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Reducer and the final output key/value types.
        job.setReducerClass(GoodsStep8.GS8Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input format and path (raw behaviour log).
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("data/userLog.log"));
        // Output format and path.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step8"));
        // Run and report the real outcome instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
在整理出来的数据中去除统计出来支付成功的
package zb.grms;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.Iterator;
/**
 * Step 8.2: removes already-purchased {@code user:item} pairs (step 8 output)
 * from the summed recommendations (step 7 output).
 */
public class GoodsStep8_2 extends Configured implements Tool {
    public static void main(String[] args) {
        int status;
        try {
            status = ToolRunner.run(new GoodsStep8_2(), args);
        } catch (Exception e) {
            e.printStackTrace();
            status = 1;
        }
        System.exit(status);
    }

    /** Identity mapper: forwards each key/value pair unchanged. */
    public static class GS8_1Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    /**
     * Keeps a key only when it carries a single value and that value is a
     * recommendation score.
     */
    public static class GS8_1Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator<Text> iter = values.iterator();
            // Copy the text: Hadoop reuses the Text instance while iterating.
            String only = iter.next().toString();
            if (iter.hasNext()) {
                // Key appears in both inputs: recommended but already purchased.
                return;
            }
            // Step 7 writes doubles (always contain '.'), step 8 writes integer counts.
            // A lone integer means "purchased but never recommended" and must not be
            // emitted as if it were a recommendation score.
            if (!only.contains(".")) {
                return;
            }
            context.write(key, new Text(only));
        }
    }

    /**
     * Configures and runs the job.
     *
     * @return 0 when the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step8.2");
        job.setJarByClass(this.getClass());
        // Mapper and its output key/value types.
        job.setMapperClass(GoodsStep8_2.GS8_1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer and the final output key/value types.
        job.setReducerClass(GoodsStep8_2.GS8_1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and paths (outputs of step 7 and step 8).
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job, new Path("src/main/resources/step7"),
                new Path("src/main/resources/step8"));
        // Output format and path.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step8_2"));
        // Run and report the real outcome instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
七、写入数据库
package zb.grms;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
/**
 * Step 9: loads the final recommendation file (output of step 8.2) into the
 * {@code recommend} table of the MySQL database.
 */
public class GoodsStep9 {
    public static void main(String[] args) {
        try {
            toDB();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Reads {@code user:item <TAB> value} lines and, for each one, replaces the
     * matching row in {@code recommend} (delete followed by insert).
     *
     * <p>Lines that do not have both a {@code user:item} key and a value are skipped.
     *
     * @throws Exception if the file cannot be read, the driver cannot be loaded,
     *                   a value is not numeric, or a database call fails
     */
    public static void toDB() throws Exception {
        Configuration conf = new Configuration();
        // FileSystem.get hands out a shared instance, so it is deliberately not closed here.
        FileSystem fs = FileSystem.get(conf);
        Class.forName("com.mysql.cj.jdbc.Driver");
        String deleteSql = "delete from recommend where customerId = ? and bookId = ?";
        String insertSql = "insert into recommend(customerId, bookId, recommendNum) values (?, ?, ?)";
        // NOTE(review): connection URL and credentials are hard-coded; consider externalising them.
        // try-with-resources closes the connection, statements and reader on every path.
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/tmall?serverTimezone=Asia/Shanghai", "briup", "briup");
             PreparedStatement delete = conn.prepareStatement(deleteSql);
             PreparedStatement insert = conn.prepareStatement(insertSql);
             FSDataInputStream open = fs.open(new Path("src/main/resources/step8_2/part-r-00000"));
             BufferedReader br = new BufferedReader(new InputStreamReader(open, StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                // Example line: "11:512<TAB>0.25"  ->  user:item, recommendation value
                String[] str = line.split("\t");
                if (str.length < 2) {
                    continue;
                }
                String[] uid_gid = str[0].split(":");
                if (uid_gid.length < 2) {
                    continue;
                }
                // Bound parameters instead of string concatenation: file content can no
                // longer alter the SQL statement.
                delete.setString(1, uid_gid[0]);
                delete.setString(2, uid_gid[1]);
                delete.executeUpdate();
                insert.setString(1, uid_gid[0]);
                insert.setString(2, uid_gid[1]);
                insert.setDouble(3, Double.parseDouble(str[1]));
                insert.executeUpdate();
            }
        }
    }
}
八、工作流
package zb.grms;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class GoodsMain extends Configured implements Tool {
public static void main(String[] args) {
try {
ToolRunner.run(new GoodsMain(), args);
GoodsStep9.toDB();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public int run(String[] strings) throws Exception {
Configuration conf = getConf();
// String inpath = new String("inpath");
// Path path = new Path(conf.get(inpath));
Path path = new Path("data/userLog.log");
Path outpath = new Path("src/main/resources/step1");
Path outpath2 = new Path("src/main/resources/step2");
Path outpath3 = new Path("src/main/resources/step3");
Path outpath4 = new Path("src/main/resources/step4");
Path outpath5 = new Path("src/main/resources/step5");
Path outpath6 = new Path("src/main/resources/step6");
Path outpath7 = new Path("src/main/resources/step7");
Path outpath8 = new Path("src/main/resources/step8");
Path outpath9 = new Path("src/main/resources/step8_2");
//获取所有mr步骤job配置
//step1
Job job = Job.getInstance(conf);
job.setJarByClass(this.getClass());
job.setMapperClass(GoodsStep1.GS1Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setReducerClass(GoodsStep1.GS1Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,path);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,outpath);
//step8
Job job8 = Job.getInstance(conf);
job8.setJarByClass(this.getClass());
job8.setMapperClass(GoodsStep8.GS8Mapper.class);
job8.setMapOutputKeyClass(Text.class);
job8.setMapOutputValueClass(IntWritable.class);
job8.setReducerClass(GoodsStep8.GS8Reducer.class);
job8.setOutputKeyClass(Text.class);
job8.setOutputValueClass(IntWritable.class);
job8.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job8,path);
job8.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job8,outpath8);
//step2
Job job2 = Job.getInstance(conf);
job2.setJarByClass(this.getClass());
job2.setMapperClass(GoodsStep2.GS2Mapper.class);
job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputValueClass(Text.class);
job2.setReducerClass(GoodsStep2.GS2Reducer.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
job2.setInputFormatClass(KeyValueTextInputFormat.class);
KeyValueTextInputFormat.addInputPath(job2,outpath);
job2.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job2,outpath2);
//step3bookId
Job job3 = Job.getInstance(conf);
job3.setJarByClass(this.getClass());
job3.setMapperClass(GoodsStep3.GS3Mapper.class);
job3.setMapOutputKeyClass(Text.class);
job3.setMapOutputValueClass(Text.class);
job3.setReducerClass(GoodsStep3.GS3Reducer.class);
job3.setOutputKeyClass(Text.class);
job3.setOutputValueClass(Text.class);
job3.setInputFormatClass(KeyValueTextInputFormat.class);
KeyValueTextInputFormat.addInputPath(job3,outpath);
job3.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job3,outpath3);
//step4
Job job4 = Job.getInstance(conf);
job4.setJarByClass(this.getClass());
job4.setMapperClass(GoodsStep4.GS4Mapper.class);
job4.setMapOutputKeyClass(Text.class);
job4.setMapOutputValueClass(IntWritable.class);
job4.setReducerClass(GoodsStep4.GS4Reducer.class);
job4.setOutputKeyClass(Text.class);
job4.setOutputValueClass(IntWritable.class);
job4.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job4,outpath3);
job4.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job4,outpath4);
//step5
Job job5 = Job.getInstance(conf);
job5.setJarByClass(this.getClass());
job5.setMapperClass(GoodsStep5.GS5Mapper.class);
job5.setMapOutputKeyClass(Text.class);
job5.setMapOutputValueClass(Text.class);
job5.setReducerClass(GoodsStep5.GS5Reducer.class);
job5.setOutputKeyClass(Text.class);
job5.setOutputValueClass(Text.class);
job5.setInputFormatClass(KeyValueTextInputFormat.class);
KeyValueTextInputFormat.addInputPath(job5,outpath4);
job5.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job5,outpath5);
//step6
Job job6 = Job.getInstance(conf);
job6.setJarByClass(this.getClass());
job6.setMapperClass(GoodsStep6.GS6Mapper.class);
job6.setMapOutputKeyClass(Text.class);
job6.setMapOutputValueClass(Text.class);
job6.setReducerClass(GoodsStep6.GS6Reducer.class);
job6.setOutputKeyClass(Text.class);
job6.setOutputValueClass(DoubleWritable.class);
job6.setInputFormatClass(KeyValueTextInputFormat.class);
KeyValueTextInputFormat.setInputPaths(job6,outpath2,outpath5);
job6.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job6,outpath6);
//step7
Job job7 = Job.getInstance(conf);
job7.setJarByClass(this.getClass());
job7.setMapperClass(GoodsStep7.GS7Mapper.class);
job7.setMapOutputKeyClass(Text.class);
job7.setMapOutputValueClass(Text.class);
job7.setReducerClass(GoodsStep7.GS7Reducer.class);
job7.setOutputKeyClass(Text.class);
job7.setOutputValueClass(DoubleWritable.class);
job7.setInputFormatClass(KeyValueTextInputFormat.class);
KeyValueTextInputFormat.setInputPaths(job7,outpath6);
job7.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job7,outpath7);
//step9
Job job9 = Job.getInstance(conf);
job9.setJarByClass(this.getClass());
job9.setMapperClass(GoodsStep8_2.GS8_1Mapper.class);
job9.setMapOutputKeyClass(Text.class);
job9.setMapOutputValueClass(Text.class);
job9.setReducerClass(GoodsStep8_2.GS8_1Reducer.class);
job9.setOutputKeyClass(Text.class);
job9.setOutputValueClass(Text.class);
job9.setInputFormatClass(KeyValueTextInputFormat.class);
KeyValueTextInputFormat.setInputPaths(job9,outpath7,outpath8);
job9.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job9,outpath9);
//创建可控作业
ControlledJob cj = new ControlledJob(conf);
cj.setJob(job);
ControlledJob cj2 = new ControlledJob(conf);
cj2.setJob(job2);
ControlledJob cj3 = new ControlledJob(conf);
cj3.setJob(job3);
ControlledJob cj4 = new ControlledJob(conf);
cj4.setJob(job4);
ControlledJob cj5 = new ControlledJob(conf);
cj5.setJob(job5);
ControlledJob cj6 = new ControlledJob(conf);
cj6.setJob(job6);
ControlledJob cj7 = new ControlledJob(conf);
cj7.setJob(job7);
ControlledJob cj8 = new ControlledJob(conf);
cj8.setJob(job8);
ControlledJob cj9 = new ControlledJob(conf);
cj9.setJob(job9);
//添加作业间的依赖关系
cj2.addDependingJob(cj);
cj3.addDependingJob(cj);
cj4.addDependingJob(cj3);
cj5.addDependingJob(cj4);
cj6.addDependingJob(cj2);
cj6.addDependingJob(cj5);
cj7.addDependingJob(cj6);
cj9.addDependingJob(cj7);
cj9.addDependingJob(cj8);
//创建工作流,创建控制器
JobControl jobs = new JobControl("work_flow");
jobs.addJob(cj);
jobs.addJob(cj2);
jobs.addJob(cj3);
jobs.addJob(cj4);
jobs.addJob(cj5);
jobs.addJob(cj6);
jobs.addJob(cj7);
jobs.addJob(cj8);
jobs.addJob(cj9);
//启动控制器-》一键完成所有mr计算任务
Thread t=new Thread(jobs);
t.start();
while(true){
if(jobs.allFinished()){
System.out.println("作业全部完成");
System.out.println(jobs.getSuccessfulJobList());
jobs.stop();
return 0;
}else if(jobs.getFailedJobList().size()>0) {
System.out.println("任务失败");
System.out.println(jobs.getFailedJobList());
jobs.stop();
return -1;
}
}
}
}