MapReduce 编程实践


MapReduce 采用了「分而治之」的思想。在分布式计算中,MapReduce 框架负责处理并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等复杂问题,把处理过程高度抽象为两个函数:map,把一个任务分解成多个任务;reduce,将任务的处理结果汇总。MapReduce 处理的数据集(或任务)必须具备这样的特点:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。

在 Hadoop 中,每个 MapReduce 任务都被初始化为一个 Job,每个 Job 又可以分为两种阶段:map 阶段和 reduce 阶段,分别用两个函数表示。map 函数接收一个 <key, value> 形式的输入,同样产生一个 <key, value> 形式的输出;Hadoop 函数接收一个如 <key, List> 形式的输入,然后对 list 进行处理,每个 reduce 产生 0 或 1 个输出,reduce 的输出也是 <key, value> 形式的。

数据去重

编写测试文件并上传

cat

put

上传 jar 包,执行程序

jar

查看输出文件

cat

数据排序

编写测试文件并上传

cat

put

上传 jar 包,执行程序

jar

查看输出文件

cat

单表关联

编写测试文件并上传

cat

上传 jar 包,执行程序

jar

查看输出文件

cat

参考代码

数据去重

public class RemoveDeduplication {

    /**
     * map 将输入中的 value 复制到输出数据的 key 上,并直接输出
     */
    public static class Map extends Mapper<Object, Text, Text, Text> {
        // 每行数据
        private static Text line = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            line = value;
            context.write(line, new Text(""));
        }

    }

    /**
     * reduce 将输入中的 key 复制到输出数据的 key 上,并直接输出
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 这句话很关键
        // conf.set("mapred.job.tracker", "192.168.1.2:9001");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Data Deduplication <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Data Deduplication");
        job.setJarByClass(RemoveDeduplication.class);

        // 设置 Map、Combine 和 Reduce 处理类
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        // 设置输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 设置输入和输出目录
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

数据排序

public class DataSort {

    /**
     * map 将输入中的 value 化成 IntWritable 类型,作为输出的 key
     */
    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
        private static IntWritable data = new IntWritable();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1));
        }

    }

    /**
     * reduce 将输入中的 key 复制到输出数据的 key 上,然后根据输入的 value-list 中元素的个数决定 key 的输出次数。
     * 用全局 lineNum 来代表 key 的位次
     */
    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private static IntWritable lineNum = new IntWritable(1);

        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(lineNum, key);
                lineNum = new IntWritable(lineNum.get() + 1);
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 这句话很关键
        // conf.set("mapred.job.tracker", "192.168.1.2:9001");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Data Sort <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Data Sort");
        job.setJarByClass(DataSort.class);

        // 设置 Map 和 Reduce 处理类
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // 设置输出类型
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入和输出目录
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

单表关联

public class SimpleTableJoin {
    public static int time = 0;

    /**
     * map 将输出分割 child 和 parent,然后正序输出一次作为右表,
     * 反序输出一次作为左表,需要注意的是在输出的 value 中必须加上左右表的区别标识。
     */
    public static class Map extends Mapper<Object, Text, Text, Text> {

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String childName;
            String parentName;
            // 左右表标识
            String relationType;

            // 输入的一行预处理文本
            StringTokenizer itr = new StringTokenizer(value.toString());
            String[] values = new String[2];
            if (itr.hasMoreTokens()) {
                values[0] = itr.nextToken();
            }
            if (itr.hasMoreTokens()) {
                values[1] = itr.nextToken();
            }

            if (values[0].compareTo("child") != 0) {
                childName = values[0];
                parentName = values[1];
                // 输出左表
                relationType = "1";
                context.write(new Text(values[1]), new Text(relationType + "+" + childName + "+" + parentName));
                // 输出右表
                relationType = "2";
                context.write(new Text(values[0]), new Text(relationType + "+" + childName + "+" + parentName));
            }
        }

    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // 输出表头
            if (0 == time) {
                context.write(new Text("grandchild"), new Text("grandparent"));
                time++;
            }
            int grandchildnum = 0;
            String[] grandchild = new String[10];
            int grandparentnum = 0;
            String[] grandparent = new String[10];

            for (Text value : values) {
                String record = value.toString();
                int len = record.length();
                int i = 2;
                if (0 == len) {
                    continue;
                }
                // 取得左右表标识
                char relationType = record.charAt(0);
                // 定义孩子和父母变量
                StringBuilder childName = new StringBuilder();
                StringBuilder parentName = new StringBuilder();
                // 获取 value-list 中 value 的 child
                while (record.charAt(i) != '+') {
                    childName.append(record.charAt(i));
                    i++;
                }
                i = i + 1;

                // 获取 value-list 中 value 的 parent
                while (i < len) {
                    parentName.append(record.charAt(i));
                    i++;
                }
                // 左表,取出 child 放入 grandchildren
                if ('1' == relationType) {
                    grandchild[grandchildnum] = childName.toString();
                    grandchildnum++;
                }
                // 右表,取出 parent 放入 grandparent
                if ('2' == relationType) {
                    grandparent[grandparentnum] = parentName.toString();
                    grandparentnum++;
                }
            }

            // grandchild 和 grandparent 数组求笛卡尔儿积
            if (0 != grandchildnum && 0 != grandparentnum) {
                for (int m = 0; m < grandchildnum; m++) {
                    for (int n = 0; n < grandparentnum; n++) {
                        // 输出结果
                        context.write(new Text(grandchild[m]), new Text(grandparent[n]));
                    }
                }
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 这句话很关键
        // conf.set("mapred.job.tracker", "192.168.1.2:9001");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Single Table Join <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Single Table Join");
        job.setJarByClass(SimpleTableJoin.class);

        // 设置 Map 和 Reduce 处理类
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // 设置输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 设置输入和输出目录
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

参考链接

CentOS 系统下的 Hadoop 集群(第 9 期)_MapReduce 初级案例_wang_zhenwei 的博客 -CSDN 博客


文章作者: sleepingraven
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 sleepingraven !
评论
  目录