2017-10-20 17 views
0

私はJavaの専門家ではありませんが、私はJavaの基礎を理解しています。常に理解できるようにJavaコードを常に深く理解しようとしています。 それは本当にばかげた疑いかもしれませんが、私の心の中でそれを明確に理解するのが大好きです。
私の疑問はJavaに関するものなので、私はJavaコミュニティに投稿しています。ラップされたタイプはHadoopでどのように機能しますか?

私はhadoopを使って作業していますが、hadoopは独自の型を使用しています。これはJavaのプリミティブ型をラップして、シリアル化とデシリアライゼーションに基づいてネットワーク経由でデータを送信する効率を高めます。

私の混乱がここから始まる、私たちはHadoopのコードで、このコードのHadoopのタイプで

org.apache.hadoop.io.IntWritable; 
org.apache.hadoop.io.LongWritable; 
org.apache.hadoop.io.Text; 
org.apache.hadoop.mapreduce.Mapper; 

import java.io.IOException; 
public class WordCountMapper 
{ 
extends Mapper<LongWritable,Text,Text,IntWritable> 
@Override 
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ 
} 
} 
String line = value.toString(); 
for (String word : line.split(" ")){ 
if(word.length()>0){ 
context.write(new Text(word),new IntWritable(1)); 
} 

を実行している次のJavaコードを使用して処理することがHDFSでいくつかのデータを持っていると言うことができますが、このLongWritable、テキスト、IntWritableのようなものです。
文字列型のJavaで囲まれたText型をピックアップします(間違っている場合は修正してください)。私たちは、これらのパラメータは以下

import package i.e org.apache.hadoop.io.Text;にあるコードとの対話取得する方法上記のコードで私たちのメソッドマップにこれらのパラメータを渡していたときに、ここで
私の疑問は、テキストクラスコード

package org.apache.hadoop.io; 

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import java.nio.ByteBuffer; 
import java.nio.CharBuffer; 
import java.nio.charset.CharacterCodingException; 
import java.nio.charset.Charset; 
import java.nio.charset.CharsetDecoder; 
import java.nio.charset.CharsetEncoder; 
import java.nio.charset.CodingErrorAction; 
import java.nio.charset.MalformedInputException; 
import java.text.CharacterIterator; 
import java.text.StringCharacterIterator; 
import java.util.Arrays; 
import org.apache.avro.reflect.Stringable; 
import org.apache.commons.logging.Log; 
import org.apache.commons.logging.LogFactory; 
import org.apache.hadoop.classification.InterfaceAudience.Public; 
import org.apache.hadoop.classification.InterfaceStability.Stable; 



@Stringable 
@InterfaceAudience.Public 
@InterfaceStability.Stable 
public class Text 
    extends BinaryComparable 
    implements WritableComparable<BinaryComparable> 
{ 
    private static final Log LOG = LogFactory.getLog(Text.class); 

    private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal() 
    { 
    protected CharsetEncoder initialValue() { 
     return Charset.forName("UTF-8").newEncoder().onMalformedInput(CodingErrorAction.REPORT).onUnmappableCharacter(CodingErrorAction.REPORT); 
    } 
    }; 



    private static ThreadLocal<CharsetDecoder> DECODER_FACTORY = new ThreadLocal() 
    { 
    protected CharsetDecoder initialValue() { 
     return Charset.forName("UTF-8").newDecoder().onMalformedInput(CodingErrorAction.REPORT).onUnmappableCharacter(CodingErrorAction.REPORT); 
    } 
    }; 



    private static final byte[] EMPTY_BYTES = new byte[0]; 
    private byte[] bytes; 
    private int length; 

    public Text() 
    { 
    bytes = EMPTY_BYTES; 
    } 


    public Text(String string) 
    { 
    set(string); 
    } 

    public Text(Text utf8) 
    { 
    set(utf8); 
    } 


    public Text(byte[] utf8) 
    { 
    set(utf8); 
    } 




    public byte[] getBytes() 
    { 
    return bytes; 
    } 

    public int getLength() 
    { 
    return length; 
    } 








    public int charAt(int position) 
    { 
    if (position > length) return -1; 
    if (position < 0) { return -1; 
    } 
    ByteBuffer bb = (ByteBuffer)ByteBuffer.wrap(bytes).position(position); 
    return bytesToCodePoint(bb.slice()); 
    } 

    public int find(String what) { 
    return find(what, 0); 
    } 


    public int find(String what, int start) 
    { 
    try 
    { 
     ByteBuffer src = ByteBuffer.wrap(bytes, 0, length); 
     ByteBuffer tgt = encode(what); 
     byte b = tgt.get(); 
     src.position(start); 

     while (src.hasRemaining()) { 
     if (b == src.get()) { 
      src.mark(); 
      tgt.mark(); 
      boolean found = true; 
      int pos = src.position() - 1; 
      while (tgt.hasRemaining()) { 
      if (!src.hasRemaining()) { 
       tgt.reset(); 
       src.reset(); 
       found = false; 

      } 
      else if (tgt.get() != src.get()) { 
       tgt.reset(); 
       src.reset(); 
       found = false; 
      } 
      } 

      if (found) return pos; 
     } 
     } 
     return -1; 
    } 
    catch (CharacterCodingException e) { 
     e.printStackTrace(); } 
    return -1; 
    } 

    public void set(String string) 
    { 
    try 
    { 
     ByteBuffer bb = encode(string, true); 
     bytes = bb.array(); 
     length = bb.limit(); 
    } catch (CharacterCodingException e) { 
     throw new RuntimeException("Should not have happened " + e.toString()); 
    } 
    } 


    public void set(byte[] utf8) 
    { 
    set(utf8, 0, utf8.length); 
    } 

    public void set(Text other) 
    { 
    set(other.getBytes(), 0, other.getLength()); 
    } 






    public void set(byte[] utf8, int start, int len) 
    { 
    setCapacity(len, false); 
    System.arraycopy(utf8, start, bytes, 0, len); 
    length = len; 
    } 






    public void append(byte[] utf8, int start, int len) 
    { 
    setCapacity(length + len, true); 
    System.arraycopy(utf8, start, bytes, length, len); 
    length += len; 
    } 



    public void clear() 
    { 
    length = 0; 
    } 










    private void setCapacity(int len, boolean keepData) 
    { 
    if ((bytes == null) || (bytes.length < len)) { 
     if ((bytes != null) && (keepData)) { 
     bytes = Arrays.copyOf(bytes, Math.max(len, length << 1)); 
     } else { 
     bytes = new byte[len]; 
     } 
    } 
    } 



    public String toString() 
    { 
    try 
    { 
     return decode(bytes, 0, length); 
    } catch (CharacterCodingException e) { 
     throw new RuntimeException("Should not have happened " + e.toString()); 
    } 
    } 

    public void readFields(DataInput in) 
    throws IOException 
    { 
    int newLength = WritableUtils.readVInt(in); 
    setCapacity(newLength, false); 
    in.readFully(bytes, 0, newLength); 
    length = newLength; 
    } 

    public static void skip(DataInput in) throws IOException 
    { 
    int length = WritableUtils.readVInt(in); 
    WritableUtils.skipFully(in, length); 
    } 




    public void write(DataOutput out) 
    throws IOException 
    { 
    WritableUtils.writeVInt(out, length); 
    out.write(bytes, 0, length); 
    } 

    public boolean equals(Object o) 
    { 
    if ((o instanceof Text)) 
     return super.equals(o); 
    return false; 
    } 

されています上記のhadoopのコードを実行したときにHDFSのデータがマップメソッドで述べたパラメータを渡って流れるときに私が気づくかもしれません。
HDFSからの最初のデータセットがTextパラメータにヒットしたら、org.apache.hadoop.io.Textクラスの中をどのように流れますか?
どこから始まるのですか?(私は、クラスのsetメソッドから始まっていると仮定しています。これは、前述のマップメソッドと同じパラメータを持っているため、正しいですか?)
通常の文字列型からTextコードを入力しますか?

私の第二の疑問は、データがテキストタイプで保存されているとき、誰がそれを起動して倫理化を開始するかです。私は誰がこのwrite(DataOutput out)を呼び出し、データがネットワーク上のその宛先に到達するとreadFields(DataInput in)を呼び出すのでしょうか?
どのように動作しますか、どこを見なければなりませんか?

私が求めていることがはっきりしていることを希望します。

答えて

0

すべてのネットワーク操作やディスク操作と同様、すべてがバイトとして転送されます。 TextクラスはバイトをUTF-8に逆シリアル化します。 Writableはデータの表現方法を決定し、Comparablesはデータの順序付け方法を決定します。

ジョブに設定されているInputFormatは、マップに与えられたWritableまたはタスクを削減するものを決定します。 1つのマップタスクは各InputSplit

に開始され

Writablesに生のバイトストリームを分割して読み取る方法を決定

アンInputSplit

https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html

を参照してください。
関連する問題