最近2018中文字幕在日韩欧美国产成人片_国产日韩精品一区二区在线_在线观看成年美女黄网色视频_国产精品一区三区五区_国产精彩刺激乱对白_看黄色黄大色黄片免费_人人超碰自拍cao_国产高清av在线_亚洲精品电影av_日韩美女尤物视频网站

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Hadoop2.6.0學(xué)習(xí)筆記(五)自定義InputFormat和RecordReader

魯春利的工作筆記,誰說程序員不能有文藝范?

創(chuàng)新互聯(lián)專注于二道企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站,商城開發(fā)。二道網(wǎng)站建設(shè)公司,為二道等地區(qū)提供建站服務(wù)。全流程定制網(wǎng)站制作,專業(yè)設(shè)計,全程項目跟蹤,創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)


 

TextInputFormat提供了對文本文件的處理方式,通過InputSplit進行分片(FileSplit),每一個分片分別new一個LineRecordReader進行讀取解析,解析得到的每一行以的形式傳給Mapper的map()函數(shù)。

應(yīng)用示例:隨機生成100個小數(shù)并求最大值。

MapReduce自帶的輸入類型都是基于HDFS的,本示例不從HDFS讀取數(shù)據(jù),而是從內(nèi)存中生成100個小數(shù),然后求最大值。

自定義InputFormat

package com.lucl.hadoop.mapreduce.rand;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * @author luchunli
 * @description 自定義InputFormat
 */
public class RandomInputFormat extends InputFormat {

    public static float [] floatValues = null;
    
    /** 自定義分片規(guī)則 **/
    @Override
    public List getSplits(JobContext context) throws IOException,
            InterruptedException {
        // 初始化數(shù)組的長度
        int NumOfValues = context.getConfiguration().getInt("lucl.random.nums", 100);
        floatValues = new float[NumOfValues];
        
        Random random = new Random ();
        for (int i = 0; i < NumOfValues; i++) {
            floatValues[i] = random.nextFloat();
        }
        System.out.println("生成的隨機數(shù)的值如下:");
        for (float f : floatValues) {
            System.out.println(f);
        }
        System.out.println("====================");
        
        // 如下代碼表示指定兩個map task來處理這100個小數(shù),每個map task處理50個小數(shù)
        // 初始化split分片數(shù)目,split分片的數(shù)量等于map任務(wù)的數(shù)量,但是也可以通過配置參數(shù)mapred.map.tasks來指定
        // 如果該參數(shù)和splits的切片數(shù)不一致時,map task的數(shù)目如何確定,后續(xù)再通過代碼分析
        int NumSplits = context.getConfiguration().getInt("mapreduce.job.maps", 2);
        int begin = 0;
        // Math.floor是為了下取整,這里是100剛好整除,如果是99的話Math.floor的值是49.0
        // 50
        int length = (int)Math.floor(NumOfValues / NumSplits);    
        // end = 49,第一個split的范圍就是0~49
        int end = length - 1;    
        
        // 默認的FileInputFormat類的getSplits方法中是通過文件數(shù)目和blocksize進行分的,
        // 文件超過一個塊會分成多個split,否則一個文件一個split分片
        List splits = new ArrayList();
        for (int i = 0; i < NumSplits - 1; i++) {    // 2個splits分片,分別為0和1
            RandomInputSplit split = new RandomInputSplit(begin, end);
            splits.add(split);
            
            // begin是上一個split切片的下一個值
            begin = end + 1;        // 50
            // 切片的長度不變,結(jié)束位置為起始位置+分片的長度,而數(shù)組下標是從0開始的,
            // 因此結(jié)束位置應(yīng)該是begin加長度-1
            end = begin + (length - 1);    // 50 + (50 -1) = 99
        }
        RandomInputSplit split = new RandomInputSplit(begin, end);
        splits.add(split);
        
        /**
         * 
         *     通過默認的TextInputFormat來實現(xiàn)的時候,如果有兩個小文件,則splits=2,參見:
         *     http://luchunli.blog.51cto.com/2368057/1676185
         * 
         */                  return splits;     }     @Override     public RecordReader createRecordReader(InputSplit split,             TaskAttemptContext context) throws IOException, InterruptedException {         return new RandomRecordReader();     } }

自定義InputSplit

package com.lucl.hadoop.mapreduce.rand;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
/**
 * @author luchunli
 * @description 
 *     自定義InputSplit,參照了{@link org.apache.hadoop.mapreduce.lib.input.Filesplit}
 *  
 *     FileSplit是針對HDFS上文件的實現(xiàn),因此其屬性包括文件絕對路徑(Path)、分片起始位置(start)、  *     分片長度(length)、副本信息(保存Block復(fù)本數(shù)據(jù)到的主機數(shù)組)。  *     
 *  自定義的InputSplit是針對內(nèi)存中的數(shù)組數(shù)據(jù)進行的處理,因此無需記錄文件路徑及副本信息,只需要記錄對數(shù)組分片的起始位置、分片長度即可。  * 
 */ public class RandomInputSplit extends InputSplit implements Writable {     private int start;     private int end;     private ArrayWritable floatArray = new ArrayWritable(FloatWritable.class);          public RandomInputSplit () {}          /**      * Constructs a split       *       * @param start      * @param end       *      */     public RandomInputSplit (int start, int end) {         this.start = start;         this.end = end;                  int len = this.end - this.start + 1;         int index = start;         FloatWritable [] result = new FloatWritable[len];         for (int i = 0; i < len; i++) {             float f = RandomInputFormat.floatValues[index];             FloatWritable fw = new FloatWritable(f);                          result[i] = fw;                          index++;         }         floatArray.set(result);          //        System.out.println("查看分片數(shù)據(jù):"); //        for (FloatWritable fw : (FloatWritable[])floatArray.toArray()) { //            System.out.println(fw.get()); //        } //        System.out.println("=====================");     }     @Override     public long getLength() throws IOException, InterruptedException {         return this.end - this.start;     }     @Override     public String[] getLocations() throws IOException, InterruptedException {         return new String[]{"dnode1", "dnode2"};     }     @Override     public void readFields(DataInput in) throws IOException {         this.start = in.readInt();         this.end = in.readInt();         this.floatArray.readFields(in);             }     @Override     public void write(DataOutput out) throws IOException {         out.writeInt(this.getStart());         out.writeInt(this.getEnd());         this.floatArray.write(out);     }     public int getStart() {         return start;     }     public void setStart(int start) {         this.start = start;     }     public int getEnd() {         return end;     }     public void setEnd(int end) {         this.end = end;     }     public ArrayWritable getFloatArray() {         return floatArray;     }     @Override     public String toString() {         return this.getStart() + "-" + this.getEnd();     } }

自定義RecordReader

package com.lucl.hadoop.mapreduce.rand;

import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * @author luchunli
 * @description 自定義RecordReader
 *
 */
public class RandomRecordReader extends RecordReader {
    private int start;
    private int end;
    private int index;

    private IntWritable key = null;
    private ArrayWritable value = null;
    private RandomInputSplit rsplit = null;
    
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.rsplit = (RandomInputSplit)split;
        this.start = this.rsplit.getStart();
        this.end = this.rsplit.getEnd();
        this.index = this.start;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (null == key) {
            key = new IntWritable();
        }
        if (null == value) {
            value = new ArrayWritable(FloatWritable.class);
        }
        if (this.index <= this.end) {
            key.set(this.index);
            value = rsplit.getFloatArray();
            index = end + 1;
            return true;
        } 
        return false;
    }

    @Override
    public IntWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public ArrayWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (this.index == this.end) {
            return 0F;
        }
        return Math.min(1.0F, (this.index - this.start) / (float)(this.end - this.start));
    }

    @Override
    public void close() throws IOException {
        // ......
    }

}

實現(xiàn)Mapper

package com.lucl.hadoop.mapreduce.rand;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author luchunli
 * @description Mapper
 */
public class RandomMapper extends Mapper {
    private static final IntWritable one = new IntWritable(1);
    
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 為了查看當前map是在那臺機器上執(zhí)行的,在該機器上創(chuàng)建個隨機文件,
        // 執(zhí)行完成后到DN節(jié)點對應(yīng)目錄下查看即可
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddhhmmss");
        File file = new File("/home/hadoop", "Mapper-" + format.format(new Date()));
        if (!file.exists()) {
            file.createNewFile();
        }
    }
    
    @Override
    protected void map(IntWritable key, ArrayWritable value, Context context)
            throws IOException, InterruptedException {
        FloatWritable [] floatArray = (FloatWritable[])value.toArray();
        float maxValue = floatArray[0].get();
        float tmp = 0;
        
        for (int i = 1; i < floatArray.length; i++) {
            tmp = floatArray[i].get();
            if (tmp > maxValue) {
                maxValue = tmp;
            }
        }
        
        // 這里必須要保證多個map輸出的key是一樣的,否則reduce處理時會認為是不同的數(shù)據(jù),
        // shuffle會分成多個組,導(dǎo)致每個map task算出一個最大值
        context.write(one, new FloatWritable(maxValue));
    }
}

實現(xiàn)Reducer

package com.lucl.hadoop.mapreduce.rand;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author luchunli
 * @description Rducer
 */
public class RandomReducer extends Reducer {
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddhhmmss");
        // 為了查看當前reduce是在那臺機器上執(zhí)行的,在該機器上創(chuàng)建個隨機文件
        File file = new File("/home/hadoop", "Reducer-" + format.format(new Date()));
        if (!file.exists()) {
            file.createNewFile();
        }
    }
    
    @Override
    protected void reduce(IntWritable key, Iterable value, Context context)
            throws IOException, InterruptedException {
        Iterator it = value.iterator();
        float maxValue = 0;
        float tmp = 0;
        if (it.hasNext()) {
            maxValue = it.next().get();
        } else {
            context.write(new Text("The max value is : "), new FloatWritable(maxValue));
            return;
        }
        
        while (it.hasNext()) {
            tmp = it.next().get();
            if (tmp > maxValue) {
                maxValue = tmp;
            }
        }
        context.write(new Text("The max value is : "), new FloatWritable(maxValue));
    }
}

定義驅(qū)動類

package com.lucl.hadoop.mapreduce.rand;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;

/**
 * @author luchunli
 * @description MapReduce自帶的輸入類都是基于HDFS的,如下示例代碼不用從HDFS上面讀取內(nèi)容,
 * 而是在內(nèi)存里面隨機生成100個(0-1)float類型的小數(shù),然后求這100個小數(shù)的最大值。
 */
public class RandomDriver extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new RandomDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        conf.set("lucl.random.nums", "100");
        conf.set("mapreduce.job.maps", "2");
        
        Job job = Job.getInstance(getConf(), this.getClass().getSimpleName());
        
        job.setJarByClass(RandomDriver.class);
        
        job.setInputFormatClass(RandomInputFormat.class);
        
        job.setMapperClass(RandomMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(FloatWritable.class);
        
        job.setReducerClass(RandomReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        
        return job.waitForCompletion(true) ? 0 : 1;
    }
    
}

打包運行

[hadoop@nnode code]$ hadoop jar RandomMR.jar /201512020027
15/12/02 00:28:07 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
生成的隨機數(shù)的值如下:
0.020075738
0.700349
0.9617876
0.8286018
0.03357637
0.55033255
0.112645924
0.43312508
0.33184355
0.6960902
0.23912054
0.8523424
0.4133852
0.028242588
0.9031814
0.39397871
0.38278967
0.5842654
0.4569224
0.4008881
0.2230537
0.92889327
0.20127994
0.09574646
0.23173904
0.4365906
0.11567855
0.027944028
0.6965957
0.78311944
0.2365641
0.8575301
0.07472658
0.5219022
0.9409952
0.7122519
0.8722465
0.30288923
0.51773626
0.91211754
0.93172425
0.38484365
0.44844115
0.24589789
0.83361626
0.40983224
0.9444963
0.12061542
0.8446641
0.5303581
0.11295539
0.094003916
0.11822218
0.4997149
0.98296344
0.48746037
0.31420535
0.1151396
0.7904337
0.80005115
0.18344402
0.8171619
0.8749699
0.48023254
0.0044505
0.43879867
0.22367835
0.62924916
0.6998315
0.222148
0.7392884
0.4174865
0.4528237
0.70034826
0.3057149
0.29177833
0.22782367
0.8182611
0.46680295
0.4778521
0.6365823
0.43971914
0.27055055
0.26839674
0.5263245
0.8824649
0.51182485
0.20494783
0.7679403
0.31936407
0.13476872
0.47281688
0.3402111
0.28706527
0.038203478
0.7351879
0.6165404
0.41761196
0.5229257
0.7284225
====================
15/12/02 00:28:08 INFO mapreduce.JobSubmitter: number of splits:2
15/12/02 00:28:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448981819300_0014
15/12/02 00:28:09 INFO impl.YarnClientImpl: Submitted application application_1448981819300_0014
15/12/02 00:28:09 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448981819300_0014/
15/12/02 00:28:09 INFO mapreduce.Job: Running job: job_1448981819300_0014
15/12/02 00:28:38 INFO mapreduce.Job: Job job_1448981819300_0014 running in uber mode : false
15/12/02 00:28:38 INFO mapreduce.Job:  map 0% reduce 0%
15/12/02 00:29:13 INFO mapreduce.Job:  map 100% reduce 0%
15/12/02 00:29:32 INFO mapreduce.Job:  map 100% reduce 100%
15/12/02 00:29:32 INFO mapreduce.Job: Job job_1448981819300_0014 completed successfully
15/12/02 00:29:32 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=26
                FILE: Number of bytes written=323256
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=520
                HDFS: Number of bytes written=31
                HDFS: Number of read operations=7
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=2
                Launched reduce tasks=1
                Data-local map tasks=2
                Total time spent by all maps in occupied slots (ms)=64430
                Total time spent by all reduces in occupied slots (ms)=16195
                Total time spent by all map tasks (ms)=64430
                Total time spent by all reduce tasks (ms)=16195
                Total vcore-seconds taken by all map tasks=64430
                Total vcore-seconds taken by all reduce tasks=16195
                Total megabyte-seconds taken by all map tasks=65976320
                Total megabyte-seconds taken by all reduce tasks=16583680
        Map-Reduce Framework
                Map input records=2
                Map output records=2
                Map output bytes=16
                Map output materialized bytes=32
                Input split bytes=520
                Combine input records=0
                Combine output records=0
                Reduce input groups=1
                Reduce shuffle bytes=32
                Reduce input records=2
                Reduce output records=1
                Spilled Records=4
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=356
                CPU time spent (ms)=1940
                Physical memory (bytes) snapshot=513851392
                Virtual memory (bytes) snapshot=2541506560
                Total committed heap usage (bytes)=257171456
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=0
        File Output Format Counters 
                Bytes Written=31
[hadoop@nnode code]$

查看輸出結(jié)果

[hadoop@nnode code]$ hdfs dfs -ls /201512020027
Found 2 items
-rw-r--r--   2 hadoop hadoop          0 2015-12-02 00:29 /201512020027/_SUCCESS
-rw-r--r--   2 hadoop hadoop         31 2015-12-02 00:29 /201512020027/part-r-00000
[hadoop@nnode code]$ hdfs dfs -text /201512020027/part-r-00000
The max value is :      0.98296344
[hadoop@nnode code]$

網(wǎng)站名稱:Hadoop2.6.0學(xué)習(xí)筆記(五)自定義InputFormat和RecordReader
文章網(wǎng)址:http://fisionsoft.com.cn/article/jdohdj.html