2017-02-25 6 views
1

imはreduce関数でPageRankNode(2つのフィールドを持つ)のiterableを入力値として受け取り、それを優先度キューに追加するmapreduceプログラムを作成します。各オブジェクトを反復して優先度キューに追加すると、結果の優先度キューには最後に追加されたオブジェクトのみが含まれます。 しかし、私は同じタイプの新しいオブジェクトを作成し、優先度のキューに追加すると予想どおりに動作しているようです。新しいオブジェクトを追加する優先度キューと既に作成済みの追加

私はなぜこれが起こっているのだろうか? 以下のサンプルが動作します。しかし、 "topPages.add(新しいPageRankNode(pageNode.pageName、pageNode.pageRank))"の代わりに、 "topPages.add(pageNode)"を使用すると、期待どおりに動作しません。

優先度キューのコンパレータの実装も以下に追加されています。

private Comparator<PageRankNode> comparator= new PageNodeComparator(); 
    private PriorityQueue<PageRankNode> topPages= new PriorityQueue<PageRankNode>(100,comparator); 

public void reduce(NullWritable key,Iterable<PageRankNode> pageNodes,Context context) throws IOException,InterruptedException{ 
    for(PageRankNode pageNode:pageNodes){ 
     //topPages.add(pageNode); 
     topPages.add(new PageRankNode(pageNode.pageName,pageNode.pageRank)); 
     if(topPages.size()>100){ 
      topPages.poll(); 
     } 
    } 
    PageRankNode pageNode; 
    while(!topPages.isEmpty()){ 
     pageNode=topPages.poll(); 
     context.write(NullWritable.get(),new Text(pageNode.pageName+":"+pageNode.pageRank)); 
    } 

} 
public class PageNodeComparator implements Comparator<PageRankNode>{ 

    public int compare(PageRankNode x,PageRankNode y){ 
     if(x.pageRank < y.pageRank){ 
      return -1; 
     } 
     if(x.pageRank > y.pageRank){ 
      return 1; 
     } 
     return 0; 
    } 
} 

答えて

1

これを正しく診断するのに十分な情報を提供していないと思います。 reduceメソッドにInterruptedExceptionがあることがわかります。これは、複数のスレッドでこれを実行している可能性があることを示唆しています。

私は同じことをして、その出力が期待どおりの小さなプログラムを書いた。

import java.util.Arrays; 
import java.util.Comparator; 
import java.util.PriorityQueue; 

public class Main { 
    private static Comparator<PageRankNode> comparator = new PageNodeComparator(); 
    private static PriorityQueue<PageRankNode> topPages = new PriorityQueue<PageRankNode>(100, comparator); 

    public static void main(String[] args) { 
    reduce(Arrays.asList(
     new PageRankNode("A", 1000), 
     new PageRankNode("B", 1500), 
     new PageRankNode("C", 500), 
     new PageRankNode("D", 700), 
     new PageRankNode("E", 7000), 
     new PageRankNode("F", 60) 
    )); 
    } 

    public static void reduce(Iterable<PageRankNode> pageNodes) { 
    for(PageRankNode pageNode : pageNodes) { 
     //topPages.add(pageNode); 
     topPages.add(new PageRankNode(pageNode.pageName, pageNode.pageRank)); 
     if(topPages.size() > 100) { 
     topPages.poll(); 
     } 
    } 
    PageRankNode pageNode; 
    while(!topPages.isEmpty()) { 
     pageNode = topPages.poll(); 
     System.out.println(pageNode.pageName); 
    } 
    } 

    public static class PageRankNode { 
    private String pageName; 
    private int pageRank; 

    public PageRankNode(String pageName, int pageRank) { 
     this.pageName = pageName; 
     this.pageRank = pageRank; 
    } 
    } 

    public static class PageNodeComparator implements Comparator<PageRankNode> { 

    @Override 
    public int compare(PageRankNode x, PageRankNode y) { 
     if(x.pageRank < y.pageRank) { 
     return -1; 
     } 
     if(x.pageRank > y.pageRank) { 
     return 1; 
     } 
     return 0; 
    } 
    } 
} 

出力は次のとおりです。

F 
C 
D 
A 
B 
E 
関連する問題