# 使用MapReduce实现join操作

2016-12-30 09:55:06来源:oschina作者:小萝卜_人点击

用户数据文件users.txt（字段以制表符\t分隔），列：userid、name：

1	LiXiaolong
2	JetLi
3	Zhangsan
4	Lisi
5	Wangwu

登录日志文件login_logs.txt（字段以制表符\t分隔），列：userid、登录时间、登录IP：

1	2015-06-07 15:10:18	192.168.137.101
3	2015-06-07 15:12:18	192.168.137.102
3	2015-06-07 15:18:36	192.168.137.102
1	2015-06-07 15:22:38	192.168.137.101
1	2015-06-07 15:26:11	192.168.137.103

期望计算结果：

1	LiXiaolong	2015-06-07 15:10:18	192.168.137.101
1	LiXiaolong	2015-06-07 15:22:38	192.168.137.101
1	LiXiaolong	2015-06-07 15:26:11	192.168.137.103
3	Zhangsan	2015-06-07 15:12:18	192.168.137.102
3	Zhangsan	2015-06-07 15:18:36	192.168.137.102

计算思路：

1) 在map阶段根据输入分片判断记录来自哪个文件，给数据打上来源标记（"u#"表示用户表，"l#"表示日志表），并以连接键userid作为map输出的key；

2) 在reduce阶段将来自不同表的数据区分开，然后做笛卡尔乘积，输出结果；

实现代码：

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class ReduceJoinDemo {
public static final String DELIMITER = "/t"; // 字段分隔符
static class MyMappper extends Mapper {
@Override
protected void map(LongWritable key, Text value,
Mapper.Context context)
throws IOException, InterruptedException {
FileSplit split = (FileSplit) context.getInputSplit();
String filePath = split.getPath().toString();
// 获取记录字符串
String line = value.toString();
// 抛弃空记录
if (line == null || line.trim().equals("")) return;
String[] values = line.split(DELIMITER);
// 处理user.txt数据
if (filePath.contains("users.txt")) {
if (values.length < 2) return;
context.write(new Text(values[0]), new Text("u#" + values[1]));
}
if (values.length < 3) return;
context.write(new Text(values[0]), new Text("l#" + values[1] + DELIMITER + values[2]));
}
}
}
static class MyReducer extends Reducer {
@Override
protected void reduce(Text key, Iterable values,
Reducer.Context context)
throws IOException, InterruptedException {
String val = tval.toString();
if(val.startsWith("u#")) {
} else if(val.startsWith("l#")) {
}
} for (String u : linkU) {
for (String l : linkL) {
context.write(key, new Text(u + DELIMITER + l));
}
}
}
}
private final static String FILE_IN_PATH = "hdfs://cluster1/join/in";
private final static String FILE_OUT_PATH = "hdfs://cluster1/join/out";
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
Configuration conf = getHAContiguration();
// 删除已存在的输出目录
FileSystem fileSystem = FileSystem.get(new URI(FILE_OUT_PATH), conf);
if (fileSystem.exists(new Path(FILE_OUT_PATH))) {
fileSystem.delete(new Path(FILE_OUT_PATH), true);
}
Job job = Job.getInstance(conf, "Reduce Join Demo");
job.setMapperClass(MyMappper.class);
job.setJarByClass(ReduceJoinDemo.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH));
job.waitForCompletion(true);
}
private static Configuration getHAContiguration() {
Configuration conf = new Configuration();
conf.setStrings("dfs.nameservices", "cluster1");
// 必须配置，可以通过该类获取当前处于active状态的namenode
return conf;
}
}

当join的两个表中有一个表数据量不大，可以轻松加载到各节点内存中时，可以使用DistributedCache将小表的数据加载到分布式缓存，然后MapReduce框架会缓存数据分发到需要执行map任务的节点上，在map节点上直接调用本地的缓存文件参与计算。在Map端完成join操作，可以降低网络传输到Reduce端的数据流量，有利于提高整个作业的执行效率。

计算思路：

1) 作业提交时通过分布式缓存把小表users.txt分发到各计算节点；

2) 在map任务的setup阶段将缓存文件中的用户数据加载进内存Map；

3) 在map阶段逐条读取日志记录，直接与内存中的用户数据完成连接并输出，无需reduce阶段。

实现代码：

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Scanner;
import java.util.StringTokenizer;
import org.apache.commons.collections.map.HashedMap;
public class DistributedCacheDemo {
public static final String DELIMITER = "/t"; // 字段分隔符
static class MyMappper extends Mapper {
private Map userMaps = new HashedMap();

@Override
protected void setup(Mapper.Context context) throws IOException ,InterruptedException {
//可以通过localCacheFiles获取本地缓存文件的路径
//Configuration conf = context.getConfiguration();
//Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(conf);

//此处使用快捷方式users.txt访问
String line;
while((line = br.readLine()) != null) {
//map端加载缓存数据
String[] splits = line.split(DELIMITER);
if(splits.length < 2) continue;
userMaps.put(splits[0], splits[1]);
}
};

@Override
protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
// 获取记录字符串
String line = value.toString();
// 抛弃空记录
if (line == null || line.trim().equals("")) return;

String[] values = line.split(DELIMITER);
if(values.length < 3) return;

String name = userMaps.get(values[0]);
Text t_key = new Text(values[0]);
Text t_value = new Text(name + DELIMITER + values[1] + DELIMITER + values[2]);
context.write(t_key, t_value);
}
}
private final static String FILE_IN_PATH = "hdfs://cluster1/join/in/login_logs.txt";
private final static String FILE_OUT_PATH = "hdfs://cluster1/join/out";
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
Configuration conf = getHAConfiguration();
// 删除已存在的输出目录
FileSystem fileSystem = FileSystem.get(new URI(FILE_OUT_PATH), conf);
if (fileSystem.exists(new Path(FILE_OUT_PATH))) {
fileSystem.delete(new Path(FILE_OUT_PATH), true);
}
//添加分布式缓存文件 可以在map或reduce中直接通过users.txt链接访问对应缓存文件

Job job = Job.getInstance(conf, "Map Distributed Cache Demo");
job.setMapperClass(MyMappper.class);
job.setJarByClass(DistributedCacheDemo.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH));
job.waitForCompletion(true);
}
private static Configuration getHAConfiguration() {
Configuration conf = new Configuration();
conf.setStrings("dfs.nameservices", "cluster1");
//必须配置，可以通过该类获取当前处于active状态的namenode
return conf;
}
}

使用HiveQL可以轻松完成该任务，只需使用表连接语句，hive会自动生成并优化mapreduce程序来执行查询操作。

实现步骤：

2) 创建users外部表：create external table users(userid int, name string) row format delimited fields terminated by '\t' location '/join/in/users';

通常情况下我们会使用hive来帮助我们完成join操作，map-join和reduce-join用于实现一些复杂的、特殊的需求。此外还有一种实现方式：SemiJoin，这是一种介于map-join和reduce-join之间的方法，就是在map端过滤掉一些数据，在网络中只传输参与连接的数据不参与连接的数据不必在网络中进行传输，从而减少了shuffle的网络传输量，使整体效率得到提高。

执行效率：map-join>SemiJoin>reduce-join。