Hadoop 2.6.0 Study Notes (5): Custom InputFormat and RecordReader
Lu Chunli's work notes. Who says programmers can't have a literary streak?
創(chuàng)新互聯(lián)專注于二道企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站,商城開發(fā)。二道網(wǎng)站建設(shè)公司,為二道等地區(qū)提供建站服務(wù)。全流程定制網(wǎng)站制作,專業(yè)設(shè)計,全程項目跟蹤,創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)
TextInputFormat handles plain text files: the input is divided into splits (FileSplit), and for each split a new LineRecordReader is created to read and parse it. Each parsed line is handed to the Mapper as a key-value pair, where the key is the line's byte offset within the file (LongWritable) and the value is the line's content (Text).
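For comparison with the custom format built below, here is a minimal sketch (my own illustration, not part of the original code) of the contract a Mapper sees under the default TextInputFormat: byte offset in, line text in.

package com.lucl.hadoop.mapreduce.rand;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper, only used to illustrate the <byte offset, line> pairs
// that TextInputFormat/LineRecordReader deliver.
public class EchoLineMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // offset: position of the line's first byte in the file; line: the line's content
        context.write(offset, line);
    }
}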
Example application: randomly generate 100 floats and find their maximum.
The input types that ship with MapReduce are all HDFS-based. This example does not read data from HDFS; instead it generates 100 floats in memory and then finds the maximum.
Custom InputFormat
package com.lucl.hadoop.mapreduce.rand;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * @author luchunli
 * @description Custom InputFormat
 */
public class RandomInputFormat extends InputFormat<IntWritable, ArrayWritable> {
    public static float[] floatValues = null;

    /** Custom split logic **/
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
        // Initialize the length of the array
        int NumOfValues = context.getConfiguration().getInt("lucl.random.nums", 100);
        floatValues = new float[NumOfValues];

        Random random = new Random();
        for (int i = 0; i < NumOfValues; i++) {
            floatValues[i] = random.nextFloat();
        }
        System.out.println("The generated random values are:");
        for (float f : floatValues) {
            System.out.println(f);
        }
        System.out.println("====================");

        // The code below assigns two map tasks to process the 100 floats, 50 per map task.
        // The number of splits equals the number of map tasks, and it can also be set through
        // the mapreduce.job.maps parameter; how the map task count is determined when that
        // parameter disagrees with the split count is left for a later code analysis.
        int NumSplits = context.getConfiguration().getInt("mapreduce.job.maps", 2);
        int begin = 0;
        // Math.floor rounds down; 100 divides evenly here, whereas 99 would give 49.0
        int length = (int) Math.floor(NumOfValues / NumSplits);    // 50
        // end = 49, so the first split covers 0~49
        int end = length - 1;

        // The default FileInputFormat.getSplits() splits by file count and block size:
        // a file larger than one block becomes several splits, otherwise one file is one split.
        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (int i = 0; i < NumSplits - 1; i++) {    // two splits, numbered 0 and 1
            RandomInputSplit split = new RandomInputSplit(begin, end);
            splits.add(split);
            // begin is the value right after the previous split
            begin = end + 1;    // 50
            // The split length stays the same, so the end position is start plus length;
            // since array indices start at 0, end is begin + length - 1.
            end = begin + (length - 1);    // 50 + (50 - 1) = 99
        }
        RandomInputSplit split = new RandomInputSplit(begin, end);
        splits.add(split);

        /**
         * With the default TextInputFormat, two small files would yield splits=2; see:
         * http://luchunli.blog.51cto.com/2368057/1676185
         **/
        return splits;
    }

    @Override
    public RecordReader<IntWritable, ArrayWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new RandomRecordReader();
    }
}
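As a quick way to see how getSplits() carves up the array, the following sketch (my own local check, not from the original post) drives the InputFormat with a JobContextImpl, bypassing job submission entirely; with 100 values and 2 maps it should print two splits, 0-49 and 50-99.

package com.lucl.hadoop.mapreduce.rand;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.task.JobContextImpl;

// Assumed local sanity check for the split logic; it never touches YARN or HDFS.
public class RandomInputFormatCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("lucl.random.nums", 100);   // array length
        conf.setInt("mapreduce.job.maps", 2);   // desired split count
        List<InputSplit> splits = new RandomInputFormat()
                .getSplits(new JobContextImpl(conf, new JobID("local", 1)));
        for (InputSplit split : splits) {
            System.out.println(split);          // expected: 0-49 and 50-99
        }
    }
}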
Custom InputSplit
package com.lucl.hadoop.mapreduce.rand;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

/**
 * @author luchunli
 * @description Custom InputSplit, modeled on {@link org.apache.hadoop.mapreduce.lib.input.FileSplit}
 **/
public class RandomInputSplit extends InputSplit implements Writable {
    private int start;
    private int end;
    private ArrayWritable floatArray = new ArrayWritable(FloatWritable.class);

    public RandomInputSplit() {}

    /**
     * Constructs a split
     *
     * @param start
     * @param end
     */
    public RandomInputSplit(int start, int end) {
        this.start = start;
        this.end = end;

        int len = this.end - this.start + 1;
        int index = start;

        FloatWritable[] result = new FloatWritable[len];
        for (int i = 0; i < len; i++) {
            float f = RandomInputFormat.floatValues[index];
            FloatWritable fw = new FloatWritable(f);
            result[i] = fw;
            index++;
        }
        floatArray.set(result);

        // System.out.println("Data in this split:");
        // for (FloatWritable fw : (FloatWritable[]) floatArray.toArray()) {
        //     System.out.println(fw.get());
        // }
        // System.out.println("=====================");
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
        return this.end - this.start;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        return new String[]{"dnode1", "dnode2"};
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.start = in.readInt();
        this.end = in.readInt();
        this.floatArray.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.getStart());
        out.writeInt(this.getEnd());
        this.floatArray.write(out);
    }

    public int getStart() {
        return start;
    }

    public void setStart(int start) {
        this.start = start;
    }

    public int getEnd() {
        return end;
    }

    public void setEnd(int end) {
        this.end = end;
    }

    public ArrayWritable getFloatArray() {
        return floatArray;
    }

    @Override
    public String toString() {
        return this.getStart() + "-" + this.getEnd();
    }
}
FileSplit is the implementation for files on HDFS, so its fields include the file's absolute path (Path), the split's start offset (start), the split length (length), and replica information (the array of hosts that hold the block replicas).
The custom InputSplit here operates on an in-memory array, so it does not need to record a file path or replica information; it only needs the start position and length of its slice of the array.
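Because the framework serializes each split with write() and rebuilds it on the task side with readFields(), a round-trip check like the sketch below (my own addition, using Hadoop's DataOutputBuffer/DataInputBuffer) can confirm the Writable implementation before a job is ever submitted.

package com.lucl.hadoop.mapreduce.rand;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

// Assumed round-trip test for the split's Writable methods (not part of the original post).
public class RandomInputSplitCheck {
    public static void main(String[] args) throws Exception {
        // The split constructor copies from this static array, so it must be filled first.
        RandomInputFormat.floatValues = new float[]{0.1f, 0.2f, 0.3f, 0.4f};
        RandomInputSplit original = new RandomInputSplit(0, 3);

        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);                        // serialize: start, end, float array

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        RandomInputSplit copy = new RandomInputSplit();
        copy.readFields(in);                        // deserialize into a fresh instance

        System.out.println(copy);                                      // expected: 0-3
        System.out.println(copy.getFloatArray().toStrings().length);   // expected: 4
    }
}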
Custom RecordReader
package com.lucl.hadoop.mapreduce.rand;

import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * @author luchunli
 * @description Custom RecordReader
 */
public class RandomRecordReader extends RecordReader<IntWritable, ArrayWritable> {
    private int start;
    private int end;
    private int index;
    private IntWritable key = null;
    private ArrayWritable value = null;
    private RandomInputSplit rsplit = null;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.rsplit = (RandomInputSplit) split;
        this.start = this.rsplit.getStart();
        this.end = this.rsplit.getEnd();
        this.index = this.start;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (null == key) {
            key = new IntWritable();
        }
        if (null == value) {
            value = new ArrayWritable(FloatWritable.class);
        }
        if (this.index <= this.end) {
            key.set(this.index);
            value = rsplit.getFloatArray();
            index = end + 1;
            return true;
        }
        return false;
    }

    @Override
    public IntWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public ArrayWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (this.index == this.end) {
            return 0F;
        }
        return Math.min(1.0F, (this.index - this.start) / (float) (this.end - this.start));
    }

    @Override
    public void close() throws IOException {
        // ......
    }
}
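To see that the reader emits exactly one record per split (the key is the split's start index, the value is the whole float array), a sketch along these lines (my own addition) can drive it outside MapReduce; since initialize() above never touches the TaskAttemptContext, passing null is enough for a local check.

package com.lucl.hadoop.mapreduce.rand;

import java.util.Arrays;

// Assumed local driver for the RecordReader (not part of the original post).
public class RandomRecordReaderCheck {
    public static void main(String[] args) throws Exception {
        RandomInputFormat.floatValues = new float[]{0.5f, 0.9f, 0.2f, 0.7f};
        RandomInputSplit split = new RandomInputSplit(0, 3);

        RandomRecordReader reader = new RandomRecordReader();
        reader.initialize(split, null);   // the reader ignores the context, so null suffices here
        while (reader.nextKeyValue()) {   // loops exactly once: the whole split is one record
            System.out.println(reader.getCurrentKey() + " -> "
                    + Arrays.toString(reader.getCurrentValue().toStrings()));
        }
        reader.close();
    }
}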
Implement the Mapper
package com.lucl.hadoop.mapreduce.rand;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author luchunli
 * @description Mapper
 */
public class RandomMapper extends Mapper<IntWritable, ArrayWritable, IntWritable, FloatWritable> {
    private static final IntWritable one = new IntWritable(1);

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // To see which machine the current map runs on, create a marker file there;
        // after the job finishes, check the corresponding directory on the DN nodes.
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddhhmmss");
        File file = new File("/home/hadoop", "Mapper-" + format.format(new Date()));
        if (!file.exists()) {
            file.createNewFile();
        }
    }

    @Override
    protected void map(IntWritable key, ArrayWritable value, Context context)
            throws IOException, InterruptedException {
        FloatWritable[] floatArray = (FloatWritable[]) value.toArray();
        float maxValue = floatArray[0].get();
        float tmp = 0;
        for (int i = 1; i < floatArray.length; i++) {
            tmp = floatArray[i].get();
            if (tmp > maxValue) {
                maxValue = tmp;
            }
        }
        // All maps must emit the same key; otherwise the reduce side treats the records as
        // different groups during shuffle, and each map task would end up with its own "maximum".
        context.write(one, new FloatWritable(maxValue));
    }
}
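The shared key is what forces both partial maxima into a single reduce group. A common alternative (my suggestion, not from the original post) is to key the map output on NullWritable, which carries the same grouping effect without serializing a meaningless 1 for every record; the sketch below assumes the driver and reducer key types are changed accordingly.

package com.lucl.hadoop.mapreduce.rand;

import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical variant: keying on NullWritable still sends every local maximum to one reduce group.
public class RandomNullKeyMapper extends Mapper<IntWritable, ArrayWritable, NullWritable, FloatWritable> {
    @Override
    protected void map(IntWritable key, ArrayWritable value, Context context)
            throws IOException, InterruptedException {
        FloatWritable[] values = (FloatWritable[]) value.toArray();
        float max = values[0].get();
        for (FloatWritable fw : values) {
            max = Math.max(max, fw.get());
        }
        context.write(NullWritable.get(), new FloatWritable(max));
        // The driver would then call job.setMapOutputKeyClass(NullWritable.class),
        // and the Reducer's input key type would change to NullWritable as well.
    }
}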
Implement the Reducer
package com.lucl.hadoop.mapreduce.rand;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author luchunli
 * @description Reducer
 */
public class RandomReducer extends Reducer<IntWritable, FloatWritable, Text, FloatWritable> {
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // To see which machine the current reduce runs on, create a marker file there.
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddhhmmss");
        File file = new File("/home/hadoop", "Reducer-" + format.format(new Date()));
        if (!file.exists()) {
            file.createNewFile();
        }
    }

    @Override
    protected void reduce(IntWritable key, Iterable<FloatWritable> value, Context context)
            throws IOException, InterruptedException {
        Iterator<FloatWritable> it = value.iterator();
        float maxValue = 0;
        float tmp = 0;
        if (it.hasNext()) {
            maxValue = it.next().get();
        } else {
            context.write(new Text("The max value is : "), new FloatWritable(maxValue));
            return;
        }
        while (it.hasNext()) {
            tmp = it.next().get();
            if (tmp > maxValue) {
                maxValue = tmp;
            }
        }
        context.write(new Text("The max value is : "), new FloatWritable(maxValue));
    }
}
Define the driver class
package com.lucl.hadoop.mapreduce.rand;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author luchunli
 * @description The input classes that ship with MapReduce are all HDFS-based. The code below
 *              does not read from HDFS; instead it generates 100 random floats in (0, 1) in
 *              memory and finds their maximum.
 */
public class RandomDriver extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new RandomDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        conf.set("lucl.random.nums", "100");
        conf.set("mapreduce.job.maps", "2");

        Job job = Job.getInstance(getConf(), this.getClass().getSimpleName());
        job.setJarByClass(RandomDriver.class);

        job.setInputFormatClass(RandomInputFormat.class);

        job.setMapperClass(RandomMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(FloatWritable.class);

        job.setReducerClass(RandomReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(args[0]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Package and run
[hadoop@nnode code]$ hadoop jar RandomMR.jar /201512020027
15/12/02 00:28:07 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
The generated random values are:
0.020075738
0.700349
0.9617876
0.8286018
0.03357637
0.55033255
0.112645924
0.43312508
0.33184355
0.6960902
0.23912054
0.8523424
0.4133852
0.028242588
0.9031814
0.39397871
0.38278967
0.5842654
0.4569224
0.4008881
0.2230537
0.92889327
0.20127994
0.09574646
0.23173904
0.4365906
0.11567855
0.027944028
0.6965957
0.78311944
0.2365641
0.8575301
0.07472658
0.5219022
0.9409952
0.7122519
0.8722465
0.30288923
0.51773626
0.91211754
0.93172425
0.38484365
0.44844115
0.24589789
0.83361626
0.40983224
0.9444963
0.12061542
0.8446641
0.5303581
0.11295539
0.094003916
0.11822218
0.4997149
0.98296344
0.48746037
0.31420535
0.1151396
0.7904337
0.80005115
0.18344402
0.8171619
0.8749699
0.48023254
0.0044505
0.43879867
0.22367835
0.62924916
0.6998315
0.222148
0.7392884
0.4174865
0.4528237
0.70034826
0.3057149
0.29177833
0.22782367
0.8182611
0.46680295
0.4778521
0.6365823
0.43971914
0.27055055
0.26839674
0.5263245
0.8824649
0.51182485
0.20494783
0.7679403
0.31936407
0.13476872
0.47281688
0.3402111
0.28706527
0.038203478
0.7351879
0.6165404
0.41761196
0.5229257
0.7284225
====================
15/12/02 00:28:08 INFO mapreduce.JobSubmitter: number of splits:2
15/12/02 00:28:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448981819300_0014
15/12/02 00:28:09 INFO impl.YarnClientImpl: Submitted application application_1448981819300_0014
15/12/02 00:28:09 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448981819300_0014/
15/12/02 00:28:09 INFO mapreduce.Job: Running job: job_1448981819300_0014
15/12/02 00:28:38 INFO mapreduce.Job: Job job_1448981819300_0014 running in uber mode : false
15/12/02 00:28:38 INFO mapreduce.Job:  map 0% reduce 0%
15/12/02 00:29:13 INFO mapreduce.Job:  map 100% reduce 0%
15/12/02 00:29:32 INFO mapreduce.Job:  map 100% reduce 100%
15/12/02 00:29:32 INFO mapreduce.Job: Job job_1448981819300_0014 completed successfully
15/12/02 00:29:32 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=26
		FILE: Number of bytes written=323256
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=520
		HDFS: Number of bytes written=31
		HDFS: Number of read operations=7
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=2
		Launched reduce tasks=1
		Data-local map tasks=2
		Total time spent by all maps in occupied slots (ms)=64430
		Total time spent by all reduces in occupied slots (ms)=16195
		Total time spent by all map tasks (ms)=64430
		Total time spent by all reduce tasks (ms)=16195
		Total vcore-seconds taken by all map tasks=64430
		Total vcore-seconds taken by all reduce tasks=16195
		Total megabyte-seconds taken by all map tasks=65976320
		Total megabyte-seconds taken by all reduce tasks=16583680
	Map-Reduce Framework
		Map input records=2
		Map output records=2
		Map output bytes=16
		Map output materialized bytes=32
		Input split bytes=520
		Combine input records=0
		Combine output records=0
		Reduce input groups=1
		Reduce shuffle bytes=32
		Reduce input records=2
		Reduce output records=1
		Spilled Records=4
		Shuffled Maps =2
		Failed Shuffles=0
		Merged Map outputs=2
		GC time elapsed (ms)=356
		CPU time spent (ms)=1940
		Physical memory (bytes) snapshot=513851392
		Virtual memory (bytes) snapshot=2541506560
		Total committed heap usage (bytes)=257171456
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=0
	File Output Format Counters
		Bytes Written=31
[hadoop@nnode code]$
Check the output
[hadoop@nnode code]$ hdfs dfs -ls /201512020027
Found 2 items
-rw-r--r--   2 hadoop hadoop          0 2015-12-02 00:29 /201512020027/_SUCCESS
-rw-r--r--   2 hadoop hadoop         31 2015-12-02 00:29 /201512020027/part-r-00000
[hadoop@nnode code]$ hdfs dfs -text /201512020027/part-r-00000
The max value is : 0.98296344
[hadoop@nnode code]$