<tbody id="xj1oc"></tbody>

    <bdo id="xj1oc"></bdo><bdo id="xj1oc"><optgroup id="xj1oc"><thead id="xj1oc"></thead></optgroup></bdo>

    現在的位置: 首頁 > 黃專家專欄 > 正文

    作業的提交和監控(二)

    2014年11月03日 黃專家專欄 ⁄ 共 4632字 ⁄ 字號 評論關閉

    文件分片

    函數

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
        Path jobSubmitDir) throws IOException,
        InterruptedException, ClassNotFoundException {
      JobConf jConf = (JobConf)job.getConfiguration();
      int maps;
      if (jConf.getUseNewMapper()) {
        maps = writeNewSplits(job, jobSubmitDir);
      } else {
        maps = writeOldSplits(jConf, jobSubmitDir);
      }
      return maps;
    }

    執行文件分片,并得到需要的 map 數目

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    public InputSplit[] getSplits(JobConf job, int numSplits)
      throws IOException {
      // 得到輸入文件的各種狀態
      FileStatus[] files = listStatus(job);
    
      // Save the number of input files in the job-conf
      // conf 中設置輸入文件的數目
      job.setLong(NUM_INPUT_FILES, files.length);
    
      // 計算總的大小
      long totalSize = 0;                           // compute total size
      for (FileStatus file: files) {                // check we have valid files
        if (file.isDir()) {
          throw new IOException("Not a file: "+ file.getPath());
        }
        totalSize += file.getLen();
      }
    
      // numSplits 傳進來的是 map 的數目
      // 獲得每一個分片的期望大小
      long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    
      // 獲得最小的分片大小,這個可以在 mapred.min.split.size 中設置
      long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
                              minSplitSize);
    
      // generate splits
      // 以下是生成分片的計算
      ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
      NetworkTopology clusterMap = new NetworkTopology();
      for (FileStatus file: files) {
        Path path = file.getPath();
        FileSystem fs = path.getFileSystem(job);
        long length = file.getLen();
        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
        // isSplitable 是判斷該文件是否可以分片
        // 一般情況下都是可以的,但是如果是 stream compressed 的方式,那么是不可以的
        if ((length != 0) && isSplitable(fs, path)) { 
          long blockSize = file.getBlockSize();
    
          // 計算每一個分片大小的實際函數
          // 得到真實的分片大小
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);
    
          long bytesRemaining = length;
    
          // 允許最后一個分片在 SPLIT_SLOP(默認 1.1) 比例之下
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[] splitHosts = getSplitHosts(blkLocations, 
                length-bytesRemaining, splitSize, clusterMap);
            // 加入分片
            splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                splitHosts));
            bytesRemaining -= splitSize;
          }
    
          // 加入最后一個分片
          // 這個比例最大不超過期望分片的 1.1
          if (bytesRemaining != 0) {
            splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
                       blkLocations[blkLocations.length-1].getHosts()));
          }
        } else if (length != 0) {
          String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
          splits.add(new FileSplit(path, 0, length, splitHosts));
        } else { 
          //Create empty hosts array for zero length files
          splits.add(new FileSplit(path, 0, length, new String[0]));
        }
      }
      LOG.debug("Total # of splits: " + splits.size());
      return splits.toArray(new FileSplit[splits.size()]);
    }
    
    protected long computeSplitSize(long goalSize, long minSize,
                                         long blockSize) {
      // 計算分片大小,很明顯
      // 這里設定了最大最小值,每一個分片大小在 minSize 和 blockSize 之間
      return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    這樣看,要想設置超過大于 block size 的也是可以的,只要將 minSize 設置很大即可 以上分片算法只是單純計算需要多少個 map ,根據設定的 mapred.map.tasks 計算出這個任務需要多少個 map 最終的 map 數目,可能和 mapred.map.tasks 不同

    但是這樣仍然會有一個問題,就是這個只是按照輸入文件的大小做邏輯的切分,但是如果文件中含有邊界(比如 Text 文件就是以行作為邊界),那么實際的劃分就不一定是這樣的。

    這個是由 RecordReader 實現的,它將某一個 split 解析成一個個 key 和 value 對

    我們看看實際的 TextInputFormat 類,它其實生成了 LineRecordReader

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    public LineRecordReader(Configuration job, FileSplit split,
        byte[] recordDelimiter) throws IOException {
      this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                      Integer.MAX_VALUE);
    
      // 得到文件開始和結束的位置
      start = split.getStart();
      end = start + split.getLength();
      final Path file = split.getPath();
      compressionCodecs = new CompressionCodecFactory(job);
      final CompressionCodec codec = compressionCodecs.getCodec(file);
    
      // open the file and seek to the start of the split
      FileSystem fs = file.getFileSystem(job);
      FSDataInputStream fileIn = fs.open(split.getPath());
    
      // skipFirstLine 表示跳過第一行
      boolean skipFirstLine = false;
      if (codec != null) {
        in = new LineReader(codec.createInputStream(fileIn), job,
              recordDelimiter);
        end = Long.MAX_VALUE;
      } else {
        if (start != 0) {
          // 如果開始的位置不是整個文件的開始
          // 那么,有可能是在行的中間, LineRecordReader 的處理方式是跳過這行,從下一行處理起
          skipFirstLine = true;
          --start;
          fileIn.seek(start);
        }
        in = new LineReader(fileIn, job, recordDelimiter);
      }
      if (skipFirstLine) {  // skip first line and re-establish "start".
          // 跳過第一行
        start += in.readLine(new Text(), 0,
                             (int)Math.min((long)Integer.MAX_VALUE, end - start));
      }
      this.pos = start;
    }
    
    public synchronized boolean next(LongWritable key, Text value)
      throws IOException {
    
      while (pos < end) {
        key.set(pos);
    
        // 在這里, 會處理一個完整行
        // 但是有可能最后一行的另外一個部分在另一個 split 里面
        // 但是 FSDataInputStream fileIn 作為一個抽象,這樣的操作使得對 Reader 透明了
        int newSize = in.readLine(value, maxLineLength,
                                  Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
                                           maxLineLength));
        if (newSize == 0) {
          return false;
        }
        pos += newSize;
        if (newSize < maxLineLength) {
          return true;
        }
    
        // line too long. try again
        LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
      }
    
      return false;
    }

    以上代碼我們可以知道,TextInputFormat 生成的 LineRecordReader 會根據行邊界來切分,避免了 split 邏輯分片不考慮邊界的情況。

    其實 SequenceFileInputFormat 輸入也同樣有邊界問題,這是根據創建時候的序列點來實現的。 具體代碼可以看 SequenceFileRecordReader 里面的實現

    抱歉!評論已關閉.

    黄色电影网址