软件世界网 购物 网址 三丰软件 | 小说 美女秀 图库大全 游戏 笑话 | 下载 开发知识库 新闻 开发 图片素材
多播视频美女直播
↓电视,电影,美女直播,迅雷资源↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
移动开发 架构设计 编程语言 Web前端 互联网
开发杂谈 系统运维 研发管理 数据库 云计算 Android开发资料
  软件世界网 -> 云计算 -> MapReduce实现二阶矩阵相乘 -> 正文阅读

[云计算]MapReduce实现二阶矩阵相乘


二阶矩阵相乘公式
[img]http://img.blog.csdn.net/20160323005658613

上例中的C11=A11*B11+A12*B21+A13*B31=1*3+0*2+2*1=5、C12=A11*B12+A12*B22+A13*B32=1*1+0*1+2*0=1


分析
 因为分布式计算的特点,需要找到相互独立的计算过程,以便能够在不同的节点上进行计算而不会彼此影响。根据矩
阵乘法的公式,C中各个元素的计算都是相互独立的,即各个cij在计算过程中彼此不影响。这样的话,在Map阶段可
以把计算所需要的元素都集中到同一个key中,然后,在Reduce阶段就可以从中解析出各个元素来计算cij。  另外,
以a11为例,它将会在c11、c12...c1p的计算中使用,以b11为例,它将会在c11、c21...cm1的计算中使用,也就是说,在Map阶段,当我们从HDFS取出一行记录时,如
果该记录是A的元素,则需要存储成p个<key, value>对,并且这p个key互不相同;如果该记录是B的元素,则需要存
储成m个<key, value>对,同样的,m个key也应互不相同;但同时,用于存放计算cij的ai1、ai2……ain和b1j、
b2j……bnj的<key, value>对的key应该都是相同的,这样才能被传递到同一个Reduce中。



设计
普遍有一个共识是:数据结构+算法=程序,所以在编写代码之前需要先理清数据存储结构和处理数据的算法。

Map阶段

在Map阶段,需要做的是进行数据准备。把来自矩阵A的元素aij,标识成p条<key, value>的形式,key="i,k",(其中
k=1,2,...,p),value="A,j,Aij";把来自矩阵B的元素bij,标识成m条<key, value>形式,key="k,j"(其中
k=1,2,...,m),value="B,i,Bij"。  经过处理,用于计算cij需要的a、b就转变为有相同key("i,j")的数据对,通过value
中"A"、"B"能区分元素是来自矩阵A还是矩阵B,以及具体的位置(在矩阵A的第几列,在矩阵B的第几行)。

Shuffle阶段
这个阶段是Hadoop自动完成的阶段,具有相同key的value被分到同一个list中,形成<key,list(value)>对,再传递给Reduce。

Reduce阶段 
在Reduce阶段,有两个问题需要解决:
a. 当前的<key, list(value)>对是为了计算矩阵C的哪个元素?因为map阶段对数据的处理,key(i,j)中的数据对,就
是其在矩阵C中的位置,第i行j列。

b. list中的每个value是来自矩阵A或矩阵B的哪个位置?这个也在map阶段进行了标记,对于value(x,y,z),只需要找
到y相同的来自不同矩阵(即x分别为A和B)的两个元素,取z相乘,然后加和即可。

矩阵的两种表示方式

矩阵常用的两种表示方式,第一种是原始的表示方式,第二种是稀疏矩阵(只存储非0的元素)的表示方式。
第一种:使用最原始的表示方式,相同行内不同列数据通过","分割,不同行通过换行分割; 

第二种:通过行列表示法,即文件中的每行数据有三个元素通过分隔符分割,第一个元素表示行,第二个元素表示
列,第三个元素表示数据。这种方式对于可以不列出为0的元素,即可以减少稀疏矩阵的数据量。 
[img]http://img.blog.csdn.net/20160323012708637



编写代码:
第一种数据结构
查看源数据:
[hadoop@master ~]$ hadoop fs -cat /user/hdfs/matrix/B
10,15
0,2
11,9
[hadoop@master ~]$ hadoop fs -cat /user/hdfs/matrix/A
1,2,3
4,5,0
7,8,9
10,11,12

MartrixMultiply:
package com.oner.mr.matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.oner.mr.util.HdfsDAO;

public class MartrixMultiply {

	public static class MatrixMapper extends
			Mapper<LongWritable, Text, Text, Text> {

		private String flag;// A or B;
		private int m = 4;// 矩阵A的行数
		private int p = 2;// 矩阵B的列数
		private int rowIndexA = 1; // 矩阵A,当前在第几行
		private int rowIndexB = 1; // 矩阵B,当前在第几行

		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			FileSplit split = (FileSplit) context.getInputSplit();
			flag = split.getPath().getName();// // 得到读取的矩阵名称
		}

		@Override
		protected void map(LongWritable k1, Text v1, Context context)
				throws IOException, InterruptedException {
			// 切分每行数据
			String[] fields = MainRun.DELIMITER.split(v1.toString());

			if (flag.equals("A")) {// 如果读的是矩阵A,则fields格式为{1,2,3},数组长度为3
				for (int k = 1; k <= p; k++) {// p表示矩阵B的列数
					Text key = new Text(rowIndexA + "," + k);//
					for (int j = 0; j < fields.length; j++) {// j代表矩阵A的当前列,fields.length表示矩阵A的列数,等于矩阵B的行数
						Text value = new Text("A," + (j + 1) + "," + fields[j]);// v的值为
						context.write(key, value);// 输出的数据格式key为(i,k),value为(A,j,Aij)。
						System.out.println(key.toString() + "  "
								+ value.toString());
					}

				}
				rowIndexA++; // 每执行一次map方法,矩阵向下移动一行

			} else if (flag.equals("B")) {// 如果读的是B,fields的格式为{10,15},数组长度为2
				for (int k = 1; k <= m; k++) {// m表示矩阵A的行数
					for (int j = 0; j < fields.length; j++) {// fields.length表示矩阵B的列数
						Text key = new Text(k + "," + (j + 1));
						Text value = new Text("B:" + rowIndexB + ","
								+ fields[j]);
						context.write(key, value);// 输出的数据格式key为(k,j),value为(B,i,Bij)。
						System.out.println(key.toString() + "  "
								+ value.toString());
					}
				}
				rowIndexB++;// 每执行一次map方法,矩阵向下移动一行
			}
		}

	}

	public static class MatrixReducer extends
			Reducer<Text, Text, Text, LongWritable> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			Map<String, String> mapA = new HashMap<String, String>();
			Map<String, String> mapB = new HashMap<String, String>();

			System.out.print(key.toString() + ":");

			for (Text value : values) {
				String val = value.toString();
				System.out.print("(" + val + ")");

				if (val.startsWith("A")) {
					String[] kv = MainRun.DELIMITER.split(val.substring(2));// 得到A,j,Aij中的j,Aij
					mapA.put(kv[0], kv[1]);// 将j作为key,Aij作为value存入mapA

					// System.out.println("A:" + kv[0] + "," + kv[1]);

				} else if (val.startsWith("B")) {
					String[] kv = MainRun.DELIMITER.split(val.substring(2));// 得到B,j,Bij中的i,Bij
					mapB.put(kv[0], kv[1]);// 将i作为key,Bij作为value存入mapB

					// System.out.println("B:" + kv[0] + "," + kv[1]);
				}
			}

			long result = 0;
			Iterator<String> mkeys = mapA.keySet().iterator();// 得到mapA所有的键集合
			while (mkeys.hasNext()) {
				String mkey = mkeys.next();
				if (mapB.get(mkey) == null) {// 因为mkey取的是mapA的key集合,所以只需要判断mapB是否存在即可。
					continue;
				}
				result += Long.parseLong(mapA.get(mkey))
						* Long.parseLong(mapB.get(mkey));
			}
			context.write(key, new LongWritable(result));
			System.out.println();

			// System.out.println("C:" + key.toString() + "," + result);

		}
	}

	public static void run(Map<String, String> path) throws IOException,
			ClassNotFoundException, InterruptedException {

		Configuration conf = new Configuration();

		String input = path.get("input");
		String input1 = path.get("input1");
		String input2 = path.get("input2");
		String output = path.get("output");

		HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf);
		hdfs.rmr(input);
		hdfs.mkdirs(input);
		hdfs.copyFile(path.get("A"), input1);
		hdfs.copyFile(path.get("B"), input2);

		Job job = Job.getInstance(conf);
		job.setJarByClass(MainRun.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.setMapperClass(MatrixMapper.class);
		job.setReducerClass(MatrixReducer.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// 加载2个输入数据集
		FileOutputFormat.setOutputPath(job, new Path(output));

		job.waitForCompletion(true);
	}
}

MainRun:
package com.oner.mr.matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

/*
 * 驱动程序
 */
public class MainRun {

	public static final String HDFS = "hdfs://master:9000";
	public static final Pattern DELIMITER = Pattern.compile("[\t,]");

	public static void main(String[] args) throws ClassNotFoundException,
			IOException, InterruptedException {
		martrixMultiply();
	}

	private static void martrixMultiply() throws ClassNotFoundException,
			IOException, InterruptedException {
		Map<String, String> path = new HashMap<String, String>();
		path.put("A", "/home/hadoop/logfile/matrix/A.csv");// 本地的数据文件
		path.put("B", "/home/hadoop/logfile/matrix/B.csv");
		path.put("input", HDFS + "/user/hdfs/matrix");// HDFS的目录
		path.put("input1", HDFS + "/user/hdfs/matrix/A");
		path.put("input2", HDFS + "/user/hdfs/matrix/B");
		path.put("output", HDFS + "/user/hdfs/matrix/output");

		MartrixMultiply.run(path);
	}

}

打成jar包后运行:hadoop jar matrix.jar com.oner.mr.matrix.MainRun
查看结果:
[hadoop@master ~]$ hadoop fs -cat /user/hdfs/matrix/output/part-r-00000
1,1	43
1,2	46
2,1	40
2,2	70
3,1	169
3,2	202
4,1	232
4,2	280
绘图演示结果:
[img]http://img.blog.csdn.net/20160328181328923



第二种数据结构
MainRun:
package com.oner.mr.sparsematrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

/*
 * 驱动程序
 */
public class MainRun {

	public static final String HDFS = "hdfs://master:9000";
	public static final Pattern DELIMITER = Pattern.compile("[\t,]");

	public static void main(String[] args) throws ClassNotFoundException,
			IOException, InterruptedException {
		sparseMartrixMultiply();
	}

	private static void sparseMartrixMultiply() throws ClassNotFoundException,
			IOException, InterruptedException {
		Map<String, String> path = new HashMap<String, String>();
		path.put("A", "/home/hadoop/logfile/matrix2/A.csv");// 本地的数据文件
		path.put("B", "/home/hadoop/logfile/matrix2/B.csv");
		path.put("input", HDFS + "/user/hdfs/matrix2");// HDFS的目录
		path.put("input1", HDFS + "/user/hdfs/matrix2/A");
		path.put("input2", HDFS + "/user/hdfs/matrix2/B");
		path.put("output", HDFS + "/user/hdfs/matrix2/output");

		MartrixMultiply.run(path);
	}

}

MartrixMultiply:
package com.oner.mr.sparsematrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.oner.mr.util.HdfsDAO;

public class MartrixMultiply {

	public static class SparseMatrixMapper extends
			Mapper<LongWritable, Text, Text, Text> {

		private String flag;// A or B;
		private int m = 4;// 矩阵A的行数
		private int p = 2;// 矩阵B的列数

		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			FileSplit split = (FileSplit) context.getInputSplit();
			flag = split.getPath().getName();// // 得到读取的矩阵名称
		}

		@Override
		protected void map(LongWritable k1, Text v1, Context context)
				throws IOException, InterruptedException {
			// 切分每行数据
			String[] fields = MainRun.DELIMITER.split(v1.toString());

			if ("A".equals(flag)) {
				for (int i = 1; i <= p; i++) {
					context.write(new Text(fields[0] + "," + i), new Text("A,"
							+ fields[1] + "," + fields[2]));
				}
			} else if ("B".equals(flag)) {
				for (int i = 1; i <= m; i++) {
					context.write(new Text(i + "," + fields[1]), new Text("B,"
							+ fields[0] + "," + fields[2]));
				}
			}

		}

	}

	public static class SparseMatrixReducer extends
			Reducer<Text, Text, Text, LongWritable> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			Map<String, String> mapA = new HashMap<String, String>();
			Map<String, String> mapB = new HashMap<String, String>();

			for (Text value : values) {
				String val = value.toString();

				if (val.startsWith("A")) {
					String[] kv = MainRun.DELIMITER.split(val.substring(2));// 得到A,j,Aij中的j,Aij
					mapA.put(kv[0], kv[1]);// 将j作为key,Aij作为value存入mapA

				} else if (val.startsWith("B")) {
					String[] kv = MainRun.DELIMITER.split(val.substring(2));// 得到B,j,Bij中的i,Bij
					mapB.put(kv[0], kv[1]);// 将i作为key,Bij作为value存入mapB

				}
			}

			long result = 0;
			// 可能在mapA中存在在mapB中不存在的key,或相反情况
			// 因为,数据定义的时候使用的是稀疏矩阵的定义
			// 所以,这种只存在于一个map中的key,说明其对应元素为0,不影响结果
			Iterator<String> mkeys = mapA.keySet().iterator();// 得到mapA所有的键集合
			while (mkeys.hasNext()) {
				String mkey = mkeys.next();
				if (mapB.get(mkey) == null) {// 因为mkey取的是mapA的key集合,所以只需要判断mapB是否存在即可。
					continue;
				}
				result += Long.parseLong(mapA.get(mkey))
						* Long.parseLong(mapB.get(mkey));
			}
			context.write(key, new LongWritable(result));

		}
	}

	public static void run(Map<String, String> path) throws IOException,
			ClassNotFoundException, InterruptedException {

		Configuration conf = new Configuration();

		String input = path.get("input");
		String input1 = path.get("input1");
		String input2 = path.get("input2");
		String output = path.get("output");

		HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf);
		hdfs.rmr(input);
		hdfs.mkdirs(input);
		hdfs.copyFile(path.get("A"), input1);
		hdfs.copyFile(path.get("B"), input2);

		Job job = Job.getInstance(conf);
		job.setJarByClass(MainRun.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.setMapperClass(SparseMatrixMapper.class);
		job.setReducerClass(SparseMatrixReducer.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// 加载2个输入数据集
		FileOutputFormat.setOutputPath(job, new Path(output));

		job.waitForCompletion(true);
	}
}

打成jar包运行:hadoop jar matrix

查看结果:
[hadoop@master matrix2]$ hadoop fs -cat /user/hdfs/matrix2/output/part-r-00000
1,1	43
1,2	46
2,1	40
2,2	70
3,1	169
3,2	202
4,1	232
4,2	280

......显示全文...
    点击查看全文


上一篇文章      下一篇文章      查看所有文章
2016-03-28 21:49:19  
云计算 最新文章
CentOS7上安装Zabbix(快速安装监控工具Zab
十分钟搭建NeuralStyle服务
solr入门之拼写纠错深入研究及代码Demo
3个netty5的例子,简单介绍netty的用法
RedhatOpenshift云平台注册使用
Akka框架——第一节:并发编程简介
Hadoop实战:Linux报tmp磁盘存储不足
linux安装thrift
感觉快更快规划计划高考韩国
solr相似匹配
360图书馆 软件开发资料 文字转语音 购物精选 软件下载 美食菜谱 新闻资讯 电影视频 小游戏 Chinese Culture 股票 租车
生肖星座 三丰软件 视频 开发 短信 中国文化 网文精选 搜图网 美图 阅读网 多播 租车 短信 看图 日历 万年历 2018年1日历
2018-1-17 17:16:31
多播视频美女直播
↓电视,电影,美女直播,迅雷资源↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  软件世界网 --