软件世界网 购物 网址 三丰软件 | 小说 美女秀 图库大全 游戏 笑话 | 下载 开发知识库 新闻 开发 图片素材
多播视频美女直播
↓电视,电影,美女直播,迅雷资源↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
移动开发 架构设计 编程语言 Web前端 互联网
开发杂谈 系统运维 研发管理 数据库 云计算 Android开发资料
  软件世界网 -> 云计算 -> MapReduce实现二阶矩阵相乘 -> 正文阅读
云计算 最新文章
CentOS7上安装Zabbix(快速安装监控工具Zab
十分钟搭建NeuralStyle服务
solr入门之拼写纠错深入研究及代码Demo
3个netty5的例子,简单介绍netty的用法
RedhatOpenshift云平台注册使用
Akka框架——第一节:并发编程简介
Hadoop实战:Linux报tmp磁盘存储不足
linux安装thrift
感觉快更快规划计划高考韩国
solr相似匹配

[云计算]MapReduce实现二阶矩阵相乘

  2016-03-28 21:49:21

二阶矩阵相乘公式


上例中的C11=A11*B11+A12*B21+A13*B31=1*3+0*2+2*1=5、C12=A11*B12+A12*B22+A13*B32=1*1+0*1+2*0=1


分析
 因为分布式计算的特点,需要找到相互独立的计算过程,以便能够在不同的节点上进行计算而不会彼此影响。根据矩
阵乘法的公式,C中各个元素的计算都是相互独立的,即各个cij在计算过程中彼此不影响。这样的话,在Map阶段可
以把计算所需要的元素都集中到同一个key中,然后,在Reduce阶段就可以从中解析出各个元素来计算cij。  另外,
以a11为例,它将会在c11、c12...c1p的计算中使用,以b11为例,它将会在c11、c21...cm1的计算中使用,也就是说,在Map阶段,当我们从HDFS取出一行记录时,如
果该记录是A的元素,则需要存储成p个<key, value>对,并且这p个key互不相同;如果该记录是B的元素,则需要存
储成m个<key, value>对,同样的,m个key也应互不相同;但同时,用于存放计算cij的ai1、ai2……ain和b1j、
b2j……bnj的<key, value>对的key应该都是相同的,这样才能被传递到同一个Reduce中。



设计
普遍有一个共识是:数据结构+算法=程序,所以在编写代码之前需要先理清数据存储结构和处理数据的算法。

Map阶段

在Map阶段,需要做的是进行数据准备。把来自矩阵A的元素aij,标识成p条<key, value>的形式,key="i,k",(其中
k=1,2,...,p),value="A,j,Aij";把来自矩阵B的元素bij,标识成m条<key, value>形式,key="k,j"(其中
k=1,2,...,m),value="B,i,Bij"。  经过处理,用于计算cij需要的a、b就转变为有相同key("i,j")的数据对,通过value
中"A"、"B"能区分元素是来自矩阵A还是矩阵B,以及具体的位置(在矩阵A的第几列,在矩阵B的第几行)。

Shuffle阶段
这个阶段是Hadoop自动完成的阶段,具有相同key的value被分到同一个list中,形成<key,list(value)>对,再传递给Reduce。

Reduce阶段 
在Reduce阶段,有两个问题需要解决:
a. 当前的<key, list(value)>对是为了计算矩阵C的哪个元素?因为map阶段对数据的处理,key(i,j)中的数据对,就
是其在矩阵C中的位置,第i行j列。

b. list中的每个value是来自矩阵A或矩阵B的哪个位置?这个也在map阶段进行了标记,对于value(x,y,z),只需要找
到y相同的来自不同矩阵(即x分别为A和B)的两个元素,取z相乘,然后加和即可。

矩阵的两种表示方式

矩阵常用的两种表示方式,第一种是原始的表示方式,第二种是稀疏矩阵(只存储非0的元素)的表示方式。
第一种:使用最原始的表示方式,相同行内不同列数据通过","分割,不同行通过换行分割; 

第二种:通过行列表示法,即文件中的每行数据有三个元素通过分隔符分割,第一个元素表示行,第二个元素表示
列,第三个元素表示数据。这种方式对于可以不列出为0的元素,即可以减少稀疏矩阵的数据量。 




编写代码:
第一种数据结构
查看源数据:
[hadoop@master ~]$ hadoop fs -cat /user/hdfs/matrix/B
10,15
0,2
11,9
[hadoop@master ~]$ hadoop fs -cat /user/hdfs/matrix/A
1,2,3
4,5,0
7,8,9
10,11,12

MartrixMultiply:
package com.oner.mr.matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.oner.mr.util.HdfsDAO;

public class MartrixMultiply {

	public static class MatrixMapper extends
			Mapper<LongWritable, Text, Text, Text> {

		private String flag;// A or B;
		private int m = 4;// 矩阵A的行数
		private int p = 2;// 矩阵B的列数
		private int rowIndexA = 1; // 矩阵A,当前在第几行
		private int rowIndexB = 1; // 矩阵B,当前在第几行

		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			FileSplit split = (FileSplit) context.getInputSplit();
			flag = split.getPath().getName();// // 得到读取的矩阵名称
		}

		@Override
		protected void map(LongWritable k1, Text v1, Context context)
				throws IOException, InterruptedException {
			// 切分每行数据
			String[] fields = MainRun.DELIMITER.split(v1.toString());

			if (flag.equals("A")) {// 如果读的是矩阵A,则fields格式为{1,2,3},数组长度为3
				for (int k = 1; k <= p; k++) {// p表示矩阵B的列数
					Text key = new Text(rowIndexA + "," + k);//
					for (int j = 0; j < fields.length; j++) {// j代表矩阵A的当前列,fields.length表示矩阵A的列数,等于矩阵B的行数
						Text value = new Text("A," + (j + 1) + "," + fields[j]);// v的值为
						context.write(key, value);// 输出的数据格式key为(i,k),value为(A,j,Aij)。
						System.out.println(key.toString() + "  "
								+ value.toString());
					}

				}
				rowIndexA++; // 每执行一次map方法,矩阵向下移动一行

			} else if (flag.equals("B")) {// 如果读的是B,fields的格式为{10,15},数组长度为2
				for (int k = 1; k <= m; k++) {// m表示矩阵A的行数
					for (int j = 0; j < fields.length; j++) {// fields.length表示矩阵B的列数
						Text key = new Text(k + "," + (j + 1));
						Text value = new Text("B:" + rowIndexB + ","
								+ fields[j]);
						context.write(key, value);// 输出的数据格式key为(k,j),value为(B,i,Bij)。
						System.out.println(key.toString() + "  "
								+ value.toString());
					}
				}
				rowIndexB++;// 每执行一次map方法,矩阵向下移动一行
			}
		}

	}

	public static class MatrixReducer extends
			Reducer<Text, Text, Text, LongWritable> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			Map<String, String> mapA = new HashMap<String, String>();
			Map<String, String> mapB = new HashMap<String, String>();

			System.out.print(key.toString() + ":");

			for (Text value : values) {
				String val = value.toString();
				System.out.print("(" + val + ")");

				if (val.startsWith("A")) {
					String[] kv = MainRun.DELIMITER.split(val.substring(2));// 得到A,j,Aij中的j,Aij
					mapA.put(kv[0], kv[1]);// 将j作为key,Aij作为value存入mapA

					// System.out.println("A:" + kv[0] + "," + kv[1]);

				} else if (val.startsWith("B")) {
					String[] kv = MainRun.DELIMITER.split(val.substring(2));// 得到B,j,Bij中的i,Bij
					mapB.put(kv[0], kv[1]);// 将i作为key,Bij作为value存入mapB

					// System.out.println("B:" + kv[0] + "," + kv[1]);
				}
			}

			long result = 0;
			Iterator<String> mkeys = mapA.keySet().iterator();// 得到mapA所有的键集合
			while (mkeys.hasNext()) {
				String mkey = mkeys.next();
				if (mapB.get(mkey) == null) {// 因为mkey取的是mapA的key集合,所以只需要判断mapB是否存在即可。
					continue;
				}
				result += Long.parseLong(mapA.get(mkey))
						* Long.parseLong(mapB.get(mkey));
			}
			context.write(key, new LongWritable(result));
			System.out.println();

			// System.out.println("C:" + key.toString() + "," + result);

		}
	}

	public static void run(Map<String, String> path) throws IOException,
			ClassNotFoundException, InterruptedException {

		Configuration conf = new Configuration();

		String input = path.get("input");
		String input1 = path.get("input1");
		String input2 = path.get("input2");
		String output = path.get("output");

		HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf);
		hdfs.rmr(input);
		hdfs.mkdirs(input);
		hdfs.copyFile(path.get("A"), input1);
		hdfs.copyFile(path.get("B"), input2);

		Job job = Job.getInstance(conf);
		job.setJarByClass(MainRun.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.setMapperClass(MatrixMapper.class);
		job.setReducerClass(MatrixReducer.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// 加载2个输入数据集
		FileOutputFormat.setOutputPath(job, new Path(output));

		job.waitForCompletion(true);
	}
}

MainRun:
package com.oner.mr.matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

/*
 * 驱动程序
 */
public class MainRun {

	public static final String HDFS = "hdfs://master:9000";
	public static final Pattern DELIMITER = Pattern.compile("[\t,]");

	public static void main(String[] args) throws ClassNotFoundException,
			IOException, InterruptedException {
		martrixMultiply();
	}

	private static void martrixMultiply() throws ClassNotFoundException,
			IOException, InterruptedException {
		Map<String, String> path = new HashMap<String, String>();
		path.put("A", "/home/hadoop/logfile/matrix/A.csv");// 本地的数据文件
		path.put("B", "/home/hadoop/logfile/matrix/B.csv");
		path.put("input", HDFS + "/user/hdfs/matrix");// HDFS的目录
		path.put("input1", HDFS + "/user/hdfs/matrix/A");
		path.put("input2", HDFS + "/user/hdfs/matrix/B");
		path.put("output", HDFS + "/user/hdfs/matrix/output");

		MartrixMultiply.run(path);
	}

}

打成jar包后运行:hadoop jar matrix.jar com.oner.mr.matrix.MainRun
查看结果:
[hadoop@master ~]$ hadoop fs -cat /user/hdfs/matrix/output/part-r-00000
1,1	43
1,2	46
2,1	40
2,2	70
3,1	169
3,2	202
4,1	232
4,2	280
绘图演示结果:




第二种数据结构
MainRun:
package com.oner.mr.sparsematrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

/*
 * 驱动程序
 */
public class MainRun {

	public static final String HDFS = "hdfs://master:9000";
	public static final Pattern DELIMITER = Pattern.compile("[\t,]");

	public static void main(String[] args) throws ClassNotFoundException,
			IOException, InterruptedException {
		sparseMartrixMultiply();
	}

	private static void sparseMartrixMultiply() throws ClassNotFoundException,
			IOException, InterruptedException {
		Map<String, String> path = new HashMap<String, String>();
		path.put("A", "/home/hadoop/logfile/matrix2/A.csv");// 本地的数据文件
		path.put("B", "/home/hadoop/logfile/matrix2/B.csv");
		path.put("input", HDFS + "/user/hdfs/matrix2");// HDFS的目录
		path.put("input1", HDFS + "/user/hdfs/matrix2/A");
		path.put("input2", HDFS + "/user/hdfs/matrix2/B");
		path.put("output", HDFS + "/user/hdfs/matrix2/output");

		MartrixMultiply.run(path);
	}

}

MartrixMultiply:
package com.oner.mr.sparsematrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.oner.mr.util.HdfsDAO;

public class MartrixMultiply {

	public static class SparseMatrixMapper extends
			Mapper<LongWritable, Text, Text, Text> {

		private String flag;// A or B;
		private int m = 4;// 矩阵A的行数
		private int p = 2;// 矩阵B的列数

		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			FileSplit split = (FileSplit) context.getInputSplit();
			flag = split.getPath().getName();// // 得到读取的矩阵名称
		}

		@Override
		protected void map(LongWritable k1, Text v1, Context context)
				throws IOException, InterruptedException {
			// 切分每行数据
			String[] fields = MainRun.DELIMITER.split(v1.toString());

			if ("A".equals(flag)) {
				for (int i = 1; i <= p; i++) {
					context.write(new Text(fields[0] + "," + i), new Text("A,"
							+ fields[1] + "," + fields[2]));
				}
			} else if ("B".equals(flag)) {
				for (int i = 1; i <= m; i++) {
					context.write(new Text(i + "," + fields[1]), new Text("B,"
							+ fields[0] + "," + fields[2]));
				}
			}

		}

	}

	public static class SparseMatrixReducer extends
			Reducer<Text, Text, Text, LongWritable> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			Map<String, String> mapA = new HashMap<String, String>();
			Map<String, String> mapB = new HashMap<String, String>();

			for (Text value : values) {
				String val = value.toString();

				if (val.startsWith("A")) {
					String[] kv = MainRun.DELIMITER.split(val.substring(2));// 得到A,j,Aij中的j,Aij
					mapA.put(kv[0], kv[1]);// 将j作为key,Aij作为value存入mapA

				} else if (val.startsWith("B")) {
					String[] kv = MainRun.DELIMITER.split(val.substring(2));// 得到B,j,Bij中的i,Bij
					mapB.put(kv[0], kv[1]);// 将i作为key,Bij作为value存入mapB

				}
			}

			long result = 0;
			// 可能在mapA中存在在mapB中不存在的key,或相反情况
			// 因为,数据定义的时候使用的是稀疏矩阵的定义
			// 所以,这种只存在于一个map中的key,说明其对应元素为0,不影响结果
			Iterator<String> mkeys = mapA.keySet().iterator();// 得到mapA所有的键集合
			while (mkeys.hasNext()) {
				String mkey = mkeys.next();
				if (mapB.get(mkey) == null) {// 因为mkey取的是mapA的key集合,所以只需要判断mapB是否存在即可。
					continue;
				}
				result += Long.parseLong(mapA.get(mkey))
						* Long.parseLong(mapB.get(mkey));
			}
			context.write(key, new LongWritable(result));

		}
	}

	public static void run(Map<String, String> path) throws IOException,
			ClassNotFoundException, InterruptedException {

		Configuration conf = new Configuration();

		String input = path.get("input");
		String input1 = path.get("input1");
		String input2 = path.get("input2");
		String output = path.get("output");

		HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf);
		hdfs.rmr(input);
		hdfs.mkdirs(input);
		hdfs.copyFile(path.get("A"), input1);
		hdfs.copyFile(path.get("B"), input2);

		Job job = Job.getInstance(conf);
		job.setJarByClass(MainRun.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.setMapperClass(SparseMatrixMapper.class);
		job.setReducerClass(SparseMatrixReducer.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// 加载2个输入数据集
		FileOutputFormat.setOutputPath(job, new Path(output));

		job.waitForCompletion(true);
	}
}

打成jar包运行:hadoop jar matrix

查看结果:
[hadoop@master matrix2]$ hadoop fs -cat /user/hdfs/matrix2/output/part-r-00000
1,1	43
1,2	46
2,1	40
2,2	70
3,1	169
3,2	202
4,1	232
4,2	280

上一篇文章      下一篇文章      查看所有文章
2016-03-28 21:49:19  
360图书馆 论文大全 母婴/育儿 软件开发资料 网页快照 文字转语音 购物精选 软件 美食菜谱 新闻中心 电影下载 小游戏 Chinese Culture
生肖星座解梦 三沣玩客 拍拍 视频 开发 Android开发 站长 古典小说 网文精选 搜图网 天下美图 中国文化英文 多播视频 装修知识库
2017-1-23 14:34:13
多播视频美女直播
↓电视,电影,美女直播,迅雷资源↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  软件世界网 --