E-commerce Recommendation System


This post walks through the main implementation steps of a product recommendation system, built as a chain of Hadoop MapReduce jobs (an item-based collaborative-filtering approach: user preference values multiplied by an item co-occurrence matrix).

1. Computing each user's preference value for each item

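Each line of data/userLog.log is parsed as comma-separated fields, of which the first three are used: userId, goodsId and the behavior type. The mapper turns the behavior into a preference weight (paySuccess and addReview 0.3, createOrder 0.2, addCar 0.15, anything else such as a plain browse 0.05) and emits it under the key user:goods; the reducer then sums all weights for the same pair with BigDecimal. A small sketch of the data flow (the log lines below are made up for illustration, assuming that three-field layout):

11,375,paySuccess   ->  map  ->  11:375  0.3
11,375,addCar       ->  map  ->  11:375  0.15
5,375,createOrder   ->  map  ->  5:375   0.2

step1 output (after the reducer):
11:375  0.45
5:375   0.2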

Code implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;

public class GoodsStep1 extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep1(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS1Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            // Map the user behavior (3rd field) to a preference value
            double like = 0.0;
            if (split.length >= 3) {
                String str = split[2].toLowerCase();
                if (str.equals("paysuccess")) { // payment succeeded
                    like = 0.3;
                } else if (str.equals("addreview")) { // review added
                    like = 0.3;
                } else if (str.equals("createorder")) { // order created
                    like = 0.2;
                } else if (str.equals("addcar")) { // added to cart
                    like = 0.15;
                } else { // browse (default)
                    like = 0.05;
                }
            }
            // key = user:goods, value = preference value of this action
            Text outkey = new Text(split[0] + ":" + split[1]);
            DoubleWritable outvalue = new DoubleWritable(like);
            context.write(outkey, outvalue);
        }
    }

    public static class GS1Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            // Use BigDecimal when summing to avoid floating-point precision loss
            BigDecimal sum = new BigDecimal(0.0);
            for (DoubleWritable value : values) {
                BigDecimal v = new BigDecimal(value.get());
                sum = sum.add(v);
            }
            DoubleWritable outvalue = new DoubleWritable(sum.doubleValue());
            context.write(key, outvalue);
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step1");
        job.setJarByClass(this.getClass());
        // Mapper: class and map output key/value types
        job.setMapperClass(GoodsStep1.GS1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        // Reducer: class and final output key/value types
        job.setReducerClass(GoodsStep1.GS1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and path (raw user log)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("data/userLog.log"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step1"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}

2. Organizing the preference data into a preference matrix

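Step 2 pivots step 1's output into one row per item. The mapper splits the user:goods key, re-keys the record by the goods id and carries userId:preference as the value; the reducer concatenates all of an item's entries into a comma-separated list, which forms one row of the preference matrix. Continuing the hypothetical numbers from step 1:

step1 output          step2 output
11:375  0.45    ->    375  11:0.45,5:0.2
5:375   0.2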

Code implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep2 extends Configured implements Tool {

    public static class GS2Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String uid_gid = key.toString();
            String[] split = uid_gid.split(":");
            // goods id becomes the output key
            Text outkey = new Text(split[1]);
            // value = userId:preference
            Text outvalue = new Text(split[0] + ":" + value.toString());
            context.write(outkey, outvalue);
        }
    }

    public static class GS2Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuffer buffer = new StringBuffer();
            for (Text value : values) {
                buffer.append(value.toString()).append(",");
            }
            buffer.setLength(buffer.length() - 1);
            context.write(key, new Text(buffer.toString()));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep2(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator","\t");
        Job job = Job.getInstance(conf, "step2");
        job.setJarByClass(this.getClass());
        // Mapper: class and map output key/value types
        job.setMapperClass(GoodsStep2.GS2Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer: class and final output key/value types
        job.setReducerClass(GoodsStep2.GS2Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and path (step1 output, key/value split on the tab)
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step1/part-r-00000"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step2"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}

3. Counting item co-occurrences

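Co-occurrence is computed in two jobs. The first one (the Cartesian product below) re-keys step 1's output by user, collects every item a user has interacted with, and emits every ordered pair of distinct items from that list; the second job counts how many times each pair appears across all users. A data sketch follows the first job's code below.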

Code implementation

Cartesian product of each user's items
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.ArrayList;


public class GoodsStep3 extends Configured implements Tool {
    public static class GS3Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String uid_gid = key.toString();
            String[] split = uid_gid.split(":");
            Text outkey = new Text(split[0]);
            Text outvalue = new Text(split[1]);
            context.write(outkey, outvalue);
        }
    }

    public static class GS3Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            ArrayList<String> list = new ArrayList<>();
            for (Text value:values) {
                list.add(value.toString());
            }

            for (String g1 : list) {
                for (String g2:list) {
                    if (!g1.equals(g2)) {
                        Text outkey = new Text(g1);
                        Text outvalue = new Text(g2);
                        context.write(outkey, outvalue);
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep3(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator","\t");
        Job job = Job.getInstance(conf, "step3");
        job.setJarByClass(this.getClass());
        // Mapper: class and map output key/value types
        job.setMapperClass(GS3Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer: class and final output key/value types
        job.setReducerClass(GS3Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and path (step1 output)
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step1/part-r-00000"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step3"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
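Step 3's output is one tab-separated item pair per line. Step 4 reads it with TextInputFormat, re-keys each pair as goods:goods with a count of 1, and its reducer sums the counts. A sketch with hypothetical pairs (two users both interacted with items 375 and 90):

step3 output          step4 output
375	90
90	375        ->     375:90  2
375	90               90:375  2
90	375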
Co-occurrence counting
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep4 extends Configured implements Tool {

    public static class GS4Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            String outkey = split[0] + ":" + split[1];
            context.write(new Text(outkey), new IntWritable(1));
        }
    }

    public static class GS4Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep4(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step4");
        job.setJarByClass(this.getClass());
        // Mapper: class and map output key/value types
        job.setMapperClass(GS4Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Reducer: class and final output key/value types
        job.setReducerClass(GS4Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input format and path (step3 output)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("src/main/resources/step3/part-r-00000"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step4"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}

4. Building the item co-occurrence matrix

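Step 5 folds step 4's pair counts into one row per item: the mapper splits the goods:goods key, keeps the first item as the key and emits goods:count as the value, and the reducer joins everything into a comma-separated list. Each output line is one row of the co-occurrence matrix, in the same format as the sample quoted later in GoodsStep6's comment (375	203:1,961:1,91:1,90:2,89:1). With the hypothetical counts from the previous sketch:

375:90  2      ->    375  90:2
90:375  2             90   375:2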

Code implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep5 extends Configured implements Tool {
    public static class GS5Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String goods = key.toString();
            String[] split = goods.split(":");
            // key = the first goods id, value = the second goods id:count
            Text outkey = new Text(split[0]);
            Text outvalue = new Text(split[1] + ":" + value.toString());
            context.write(outkey, outvalue);
        }
    }

    public static class GS5Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuffer buffer = new StringBuffer();
            for (Text value : values) {
                buffer.append(value.toString()).append(",");
            }
            buffer.setLength(buffer.length() - 1);
            context.write(key, new Text(buffer.toString()));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep5(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step5");
        job.setJarByClass(this.getClass());
        // Mapper: class and map output key/value types
        job.setMapperClass(GS5Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer: class and final output key/value types
        job.setReducerClass(GS5Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and path (step4 output)
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step4/part-r-00000"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step5"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}

5. Computing recommendation values

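Step 6 reads the step 2 (preference matrix) and step 5 (co-occurrence matrix) outputs together. Both are keyed by goods id, so for each item the reducer receives a mix of user:preference and goods:count values; it separates them by checking for a decimal point (preference values have one, co-occurrence counts do not), then multiplies every preference by every co-occurrence count, producing partial recommendation values keyed by user:co-occurring-goods. Using the sample rows quoted in the class comment below (item 375 with preferences 11:0.25, 5:0.25, 4:0.55 and co-occurrences 203:1, 90:2, ...):

4:90    1.1      (0.55 x 2)
4:203   0.55     (0.55 x 1)
11:90   0.5      (0.25 x 2)
...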

Code implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

public class GoodsStep6 extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep6(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Step 2 output:
     * 375	11:0.25,5:0.25,4:0.55
     * goods	user:preference
     * Step 5 output:
     * 375	203:1,961:1,91:1,90:2,89:1
     * goods	goods:co-occurrence count
     * Output of this step:
     * user:goods	recommendation value
     */
    public static class GS6Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            for (String str : split) {
                // key = goods id, value = either user:preference or goods:co-occurrence count
                context.write(key, new Text(str));
            }
        }
    }

    public static class GS6Reducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // preference entries [user -> preference value]
            HashMap<String, String> like = new HashMap<>();
            // co-occurrence entries [goods -> co-occurrence count]
            HashMap<String, String> same = new HashMap<>();
            for (Text value : values) {
                String data = value.toString();
                String[] split = data.split(":");
                // preference values contain a decimal point, co-occurrence counts do not
                if (split[1].contains(".")) {
                    like.put(split[0], split[1]);
                } else {
                    same.put(split[0], split[1]);
                }
            }
            for (Map.Entry<String, String> l : like.entrySet()) {
                for (Map.Entry<String, String> s : same.entrySet()) {
                    // user preference value
                    BigDecimal lvalue = new BigDecimal(l.getValue());
                    // co-occurrence count
                    BigDecimal svalue = new BigDecimal(s.getValue());
                    // output key = user:co-occurring goods
                    Text outkey = new Text(l.getKey() + ":" + s.getKey());
                    double outvalue = lvalue.multiply(svalue).doubleValue();
                    context.write(outkey, new DoubleWritable(outvalue));
                }
            }
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step6");
        job.setJarByClass(this.getClass());
        // Mapper: class and map output key/value types
        job.setMapperClass(GoodsStep6.GS6Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer: class and final output key/value types
        job.setReducerClass(GoodsStep6.GS6Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and paths (step2 and step5 outputs are read together)
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job, new Path("src/main/resources/step2"),
                new Path("src/main/resources/step5"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step6"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}

6. Accumulating recommendation values and cleaning the data

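Step 7 sums all partial recommendation values that share the same user:goods key (the same pair is produced from several item rows in step 6), again using BigDecimal. Steps 8 and 8_2 then clean the result so that items the user has already bought are not recommended again: step 8 extracts from the raw log the user:goods pairs with exactly one successful payment, and step 8_2 drops every recommendation whose pair also appears in step 8. A worked example follows the GoodsStep8_2 code below.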

Code implementation

Accumulating recommendation values
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;

public class GoodsStep7 extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep7(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS7Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static class GS7Reducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            BigDecimal sum = new BigDecimal(0.0);
            for (Text value : values) {
                BigDecimal v = new BigDecimal(value.toString());
                sum = sum.add(v);
            }
            DoubleWritable outvalue = new DoubleWritable(sum.doubleValue());
            context.write(key, outvalue);
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step7");
        job.setJarByClass(this.getClass());
        // Mapper: class and map output key/value types
        job.setMapperClass(GoodsStep7.GS7Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer: class and final output key/value types
        job.setReducerClass(GoodsStep7.GS7Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and path (step6 output)
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step6"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step7"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
Data cleaning
First, collect the user:goods pairs that have exactly one successful payment in the raw log
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep8 extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep8(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS8Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            boolean paySuccess = split[2].toLowerCase().equals("paysuccess");
            if (paySuccess) {
                context.write(new Text(split[0] + ":" + split[1]), new IntWritable(1));
            }
        }
    }

    public static class GS8Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int num = 0;
            for (IntWritable i : values) {
                num++;
            }
            // keep only the pairs with exactly one successful payment
            if (num == 1) context.write(key, new IntWritable(num));
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step8.1");
        job.setJarByClass(this.getClass());
        // Mapper: class and map output key/value types
        job.setMapperClass(GoodsStep8.GS8Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Reducer: class and final output key/value types
        job.setReducerClass(GoodsStep8.GS8Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input format and path (raw user log)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("data/userLog.log"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step8"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
Then remove those already-paid pairs from the accumulated recommendation values
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

public class GoodsStep8_2 extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep8_2(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS8_1Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static class GS8_1Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // step7 and step8 outputs are read together: a key that appears in both
            // (i.e. the user has already paid for this item) is dropped, while a key with a
            // single value (the recommendation value from step7) is kept.
            Iterator<Text> iter = values.iterator();
            Text outvalue = iter.next();
            if (!iter.hasNext()) {
                context.write(key, outvalue);
            }
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step8.2");
        job.setJarByClass(this.getClass());
        // Mapper: class and map output key/value types
        job.setMapperClass(GoodsStep8_2.GS8_1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer: class and final output key/value types
        job.setReducerClass(GoodsStep8_2.GS8_1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and paths (step7 and step8 outputs are read together)
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job, new Path("src/main/resources/step7"),
                new Path("src/main/resources/step8"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step8_2"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
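A sketch of the cleaning join (values hypothetical): step 7 and step 8 outputs are read together with KeyValueTextInputFormat, so the reducer sees all values that share one user:goods key.

from step7:  4:375  2.4       from step8:  4:375  1
from step7:  4:90   1.7

Key 4:375 has two values (a recommendation value plus the payment marker) and is dropped; key 4:90 has a single value and is written out. The final step8_2 file therefore only recommends items the user has not yet paid for.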

7. Writing the results to the database

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class GoodsStep9 {
    public static void main(String[] args) {
        try {
            toDB();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void toDB() throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Class.forName("com.mysql.cj.jdbc.Driver");
        Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/tmall?serverTimezone=Asia/Shanghai", "briup", "briup");
        Statement statement = conn.createStatement();
        FSDataInputStream open = fs.open(new Path("src/main/resources/step8_2/part-r-00000"));
        BufferedReader br = new BufferedReader(new InputStreamReader(open));
        String line;
        while ((line = br.readLine()) != null) {
            // e.g. 11:512	0.25
            // user:goods <TAB> recommendation value
            String[] str = line.split("\t");
            String[] uid_gid = str[0].split(":");
            // delete any previous recommendation for this user/goods pair, then insert the new one
            String sql = "delete from recommend where customerId = '" + uid_gid[0] + "' and bookId = '" + uid_gid[1] + "'";
            String sql2 = "insert into recommend(customerId, bookId, recommendNum) values ('"
                    + uid_gid[0] + "','" + uid_gid[1] + "'," + str[1] + ")";
            statement.addBatch(sql);
            statement.addBatch(sql2);
            statement.executeBatch();
        }
        br.close();
        statement.close();
        conn.close();
    }
}
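As a side note, the string-concatenated SQL above works for this internally generated data, but a PreparedStatement variant avoids quoting problems and closes its resources deterministically. A minimal sketch under the same assumptions (same recommend(customerId, bookId, recommendNum) table and the same step8_2 output path; the class name GoodsStep9Prepared is made up for illustration):

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class GoodsStep9Prepared {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Class.forName("com.mysql.cj.jdbc.Driver");
        try (FileSystem fs = FileSystem.get(conf);
             BufferedReader br = new BufferedReader(new InputStreamReader(
                     fs.open(new Path("src/main/resources/step8_2/part-r-00000"))));
             Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://127.0.0.1:3306/tmall?serverTimezone=Asia/Shanghai", "briup", "briup");
             PreparedStatement del = conn.prepareStatement(
                     "delete from recommend where customerId = ? and bookId = ?");
             PreparedStatement ins = conn.prepareStatement(
                     "insert into recommend(customerId, bookId, recommendNum) values (?, ?, ?)")) {
            String line;
            while ((line = br.readLine()) != null) {
                String[] str = line.split("\t");      // user:goods <TAB> recommendation value
                String[] uidGid = str[0].split(":");
                del.setString(1, uidGid[0]);          // remove any previous recommendation
                del.setString(2, uidGid[1]);
                del.executeUpdate();
                ins.setString(1, uidGid[0]);          // insert the fresh value
                ins.setString(2, uidGid[1]);
                ins.setDouble(3, Double.parseDouble(str[1]));
                ins.executeUpdate();
            }
        }
    }
}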

8. Chaining everything into a workflow

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GoodsMain extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsMain(), args);
            GoodsStep9.toDB();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
//        String inpath = new String("inpath");
//        Path path = new Path(conf.get(inpath));
        Path path = new Path("data/userLog.log");
        Path outpath =  new Path("src/main/resources/step1");
        Path outpath2 = new Path("src/main/resources/step2");
        Path outpath3 = new Path("src/main/resources/step3");
        Path outpath4 = new Path("src/main/resources/step4");
        Path outpath5 = new Path("src/main/resources/step5");
        Path outpath6 = new Path("src/main/resources/step6");
        Path outpath7 = new Path("src/main/resources/step7");
        Path outpath8 = new Path("src/main/resources/step8");
        Path outpath9 = new Path("src/main/resources/step8_2");
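        // (Added note, not in the original code) TextOutputFormat refuses to start a job whose
        // output directory already exists, and all of the step* paths above are fixed, so a
        // re-run needs the previous output removed first. One minimal way, assuming the default
        // FileSystem, would be:
        //   org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(conf);
        //   for (Path p : new Path[]{outpath, outpath2, outpath3, outpath4, outpath5,
        //           outpath6, outpath7, outpath8, outpath9}) {
        //       fs.delete(p, true);   // recursive delete of the previous run's output
        //   }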
        // Configure a Job for every MR step
        //step1
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setMapperClass(GoodsStep1.GS1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setReducerClass(GoodsStep1.GS1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,path);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,outpath);
        //step8
        Job job8 = Job.getInstance(conf);
        job8.setJarByClass(this.getClass());
        job8.setMapperClass(GoodsStep8.GS8Mapper.class);
        job8.setMapOutputKeyClass(Text.class);
        job8.setMapOutputValueClass(IntWritable.class);
        job8.setReducerClass(GoodsStep8.GS8Reducer.class);
        job8.setOutputKeyClass(Text.class);
        job8.setOutputValueClass(IntWritable.class);
        job8.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job8,path);
        job8.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job8,outpath8);
        //step2
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(this.getClass());
        job2.setMapperClass(GoodsStep2.GS2Mapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setReducerClass(GoodsStep2.GS2Reducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        job2.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job2,outpath);
        job2.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job2,outpath2);
        //step3
        Job job3 = Job.getInstance(conf);
        job3.setJarByClass(this.getClass());
        job3.setMapperClass(GoodsStep3.GS3Mapper.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setReducerClass(GoodsStep3.GS3Reducer.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);
        job3.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job3,outpath);
        job3.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job3,outpath3);
        //step4
        Job job4 = Job.getInstance(conf);
        job4.setJarByClass(this.getClass());
        job4.setMapperClass(GoodsStep4.GS4Mapper.class);
        job4.setMapOutputKeyClass(Text.class);
        job4.setMapOutputValueClass(IntWritable.class);
        job4.setReducerClass(GoodsStep4.GS4Reducer.class);
        job4.setOutputKeyClass(Text.class);
        job4.setOutputValueClass(IntWritable.class);
        job4.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job4,outpath3);
        job4.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job4,outpath4);
        //step5
        Job job5 = Job.getInstance(conf);
        job5.setJarByClass(this.getClass());
        job5.setMapperClass(GoodsStep5.GS5Mapper.class);
        job5.setMapOutputKeyClass(Text.class);
        job5.setMapOutputValueClass(Text.class);
        job5.setReducerClass(GoodsStep5.GS5Reducer.class);
        job5.setOutputKeyClass(Text.class);
        job5.setOutputValueClass(Text.class);
        job5.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job5,outpath4);
        job5.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job5,outpath5);
        //step6
        Job job6 = Job.getInstance(conf);
        job6.setJarByClass(this.getClass());
        job6.setMapperClass(GoodsStep6.GS6Mapper.class);
        job6.setMapOutputKeyClass(Text.class);
        job6.setMapOutputValueClass(Text.class);
        job6.setReducerClass(GoodsStep6.GS6Reducer.class);
        job6.setOutputKeyClass(Text.class);
        job6.setOutputValueClass(DoubleWritable.class);
        job6.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job6,outpath2,outpath5);
        job6.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job6,outpath6);
        //step7
        Job job7 = Job.getInstance(conf);
        job7.setJarByClass(this.getClass());
        job7.setMapperClass(GoodsStep7.GS7Mapper.class);
        job7.setMapOutputKeyClass(Text.class);
        job7.setMapOutputValueClass(Text.class);
        job7.setReducerClass(GoodsStep7.GS7Reducer.class);
        job7.setOutputKeyClass(Text.class);
        job7.setOutputValueClass(DoubleWritable.class);
        job7.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job7,outpath6);
        job7.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job7,outpath7);
        //step9
        Job job9 = Job.getInstance(conf);
        job9.setJarByClass(this.getClass());
        job9.setMapperClass(GoodsStep8_2.GS8_1Mapper.class);
        job9.setMapOutputKeyClass(Text.class);
        job9.setMapOutputValueClass(Text.class);
        job9.setReducerClass(GoodsStep8_2.GS8_1Reducer.class);
        job9.setOutputKeyClass(Text.class);
        job9.setOutputValueClass(Text.class);
        job9.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job9,outpath7,outpath8);
        job9.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job9,outpath9);

        // Wrap each Job in a ControlledJob
        ControlledJob cj = new ControlledJob(conf);
        cj.setJob(job);
        ControlledJob cj2 = new ControlledJob(conf);
        cj2.setJob(job2);
        ControlledJob cj3 = new ControlledJob(conf);
        cj3.setJob(job3);
        ControlledJob cj4 = new ControlledJob(conf);
        cj4.setJob(job4);
        ControlledJob cj5 = new ControlledJob(conf);
        cj5.setJob(job5);
        ControlledJob cj6 = new ControlledJob(conf);
        cj6.setJob(job6);
        ControlledJob cj7 = new ControlledJob(conf);
        cj7.setJob(job7);
        ControlledJob cj8 = new ControlledJob(conf);
        cj8.setJob(job8);
        ControlledJob cj9 = new ControlledJob(conf);
        cj9.setJob(job9);
        // Declare the dependencies between jobs
        cj2.addDependingJob(cj);
        cj3.addDependingJob(cj);
        cj4.addDependingJob(cj3);
        cj5.addDependingJob(cj4);
        cj6.addDependingJob(cj2);
        cj6.addDependingJob(cj5);
        cj7.addDependingJob(cj6);
        cj9.addDependingJob(cj7);
        cj9.addDependingJob(cj8);
        // Create the workflow controller and register all jobs
        JobControl jobs = new JobControl("work_flow");
        jobs.addJob(cj);
        jobs.addJob(cj2);
        jobs.addJob(cj3);
        jobs.addJob(cj4);
        jobs.addJob(cj5);
        jobs.addJob(cj6);
        jobs.addJob(cj7);
        jobs.addJob(cj8);
        jobs.addJob(cj9);
        // Start the controller thread: it runs all MR jobs according to the declared dependencies
        Thread t=new Thread(jobs);
        t.start();
        while (true) {
            if (jobs.allFinished()) {
                System.out.println("All jobs finished");
                System.out.println(jobs.getSuccessfulJobList());
                jobs.stop();
                return 0;
            } else if (jobs.getFailedJobList().size() > 0) {
                System.out.println("A job failed");
                System.out.println(jobs.getFailedJobList());
                jobs.stop();
                return -1;
            }
            // avoid a busy spin while the workflow is still running
            Thread.sleep(500);
        }
    }
}

Summary

The whole pipeline is: extract user preference values from the behavior log, pivot them into a preference matrix, count item co-occurrences and build the co-occurrence matrix, multiply preferences by co-occurrence counts, accumulate the products into recommendation values, filter out items the user has already paid for, and finally write the user/item/recommendation triples into the MySQL recommend table. GoodsMain wires all of these MapReduce jobs together with JobControl so the whole computation runs as one workflow.
