2012-04-13 4 views
2

Java-Client 'HECTOR'を使用して、Cassandraに格納されたデータに対して簡単なmap-reduceジョブを実行しようとしています。Hectorを使用してCassandraデータにmapreduceを実行する

この美しいblogpostで説明されているhadoop-wordcountの例は既に正常に実行されています。私もHadoop Supportの記事を読んでいます。

しかし、私がやりたいことは、実装の面で少し違います(wordcountの例では、mapreduce-site.xmlが記述されているスクリプトを使用しています)。私は誰かが、私がカザンドラデータの 'HECTOR'からローカルではなく、分散モードでmap-reduceジョブを実行する方法を理解するのを手伝ってほしいです。

私のコードはローカルモードでmap-reduceジョブを成功させます。しかし、私が望むのは、それらを分散モードで実行し、結果をcassandraキースペースに新しいColumnFamilyとして書き込むことです。

私は分散モードでそれを実行するための
$PATH_TO_HADOOP/conf/mapred-site.xml
(上記のブログ投稿で述べたように)このどこかに設定する必要があるかもしれませんが、私はどこを知りません。ここで

ここに私のコードだ

public class test_forum implements Tool { 

private String KEYSPACE = "test_forum"; 
private String COLUMN_FAMILY ="posts"; 
private String OUTPUT_COLUMN_FAMILY = "output_post_count"; 
private static String CONF_COLUMN_NAME = "text"; 


public int run(String[] strings) throws Exception { 

    Configuration conf = new Configuration(); 

    conf.set(CONF_COLUMN_NAME, "text"); 
    Job job = new Job(conf,"test_forum"); 

    job.setJarByClass(test_forum.class); 
    job.setMapperClass(TokenizerMapper.class); 
    job.setReducerClass(ReducerToCassandra.class); 

    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(IntWritable.class); 

    job.setOutputKeyClass(ByteBuffer.class); 
    job.setOutputValueClass(List.class); 

    job.setOutputFormatClass(ColumnFamilyOutputFormat.class); 
    job.setInputFormatClass(ColumnFamilyInputFormat.class); 


    System.out.println("Job Set"); 


    ConfigHelper.setRpcPort(job.getConfiguration(), "9160"); 
    ConfigHelper.setInitialAddress(job.getConfiguration(), "localhost"); 
    ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner"); 

    ConfigHelper.setInputColumnFamily(job.getConfiguration(),KEYSPACE,COLUMN_FAMILY); 
    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY); 

    SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes("text"))); 

    ConfigHelper.setInputSlicePredicate(job.getConfiguration(),predicate); 

    System.out.println("running job now.."); 

    boolean success = job.waitForCompletion(true); 

    return success ? 0:1; //To change body of implemented methods use File | Settings | File Templates. 

} 



public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable> 
{ 
    private final static IntWritable one = new IntWritable(1); 
    private Text word = new Text(); 
    private ByteBuffer sourceColumn; 
    protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) 
    throws IOException, InterruptedException 
    { 
     sourceColumn = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME)); 
    } 

    public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException 
    { 



     IColumn column = columns.get(sourceColumn); 

     if (column == null) { 
      return; 
     } 

     String value = ByteBufferUtil.string(column.value()); 
     System.out.println("read " + key + ":" + value + " from " + context.getInputSplit()); 

     StringTokenizer itr = new StringTokenizer(value); 

     while (itr.hasMoreTokens()) 
     { 
      word.set(itr.nextToken()); 
      context.write(word, one); 
     } 
    } 


} 

    public static class ReducerToCassandra extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> 
{ 
    private ByteBuffer outputKey; 

    public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
    { 
     int sum = 0; 

     byte[] keyBytes = word.getBytes(); 
     outputKey = ByteBuffer.wrap(Arrays.copyOf(keyBytes, keyBytes.length)); 


     for (IntWritable val : values) 
      sum += val.get(); 

     System.out.println(word.toString()+" -> "+sum); 
     context.write(outputKey, Collections.singletonList(getMutation(word, sum))); 

    } 

    private static Mutation getMutation(Text word, int sum) 
    { 
     Column c = new Column(); 
     c.setName(Arrays.copyOf(word.getBytes(), word.getLength())); 
     c.setValue(ByteBufferUtil.bytes(String.valueOf(sum))); 
     c.setTimestamp(System.currentTimeMillis()); 

     Mutation m = new Mutation(); 
     m.setColumn_or_supercolumn(new ColumnOrSuperColumn()); 
     m.column_or_supercolumn.setColumn(c); 
     System.out.println("Mutating"); 
     return m; 

    } 

} 




public static void main(String[] args) throws Exception, ClassNotFoundException, InterruptedException { 

    System.out.println("Working..!"); 

    int ret=ToolRunner.run(new Configuration(), new test_forum(), args); 

    System.out.println("Done..!"); 

    System.exit(ret); 

} 

}

私が手警告は、次のとおりです。

WARN - JobClient     - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
WARN - JobClient     - No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). 

しかし、コードが正常に実行マップ-減らすタスクを実行し、しかし、私は知りませんデータはどこに書き込まれますか?

EDIT:出力のためにcassandraにcolumnFamilyを作成していませんでした。したがって、それは書いていませんでした。だから、唯一の問題は分散モードでそれを実行する方法です。

ありがとうございます。

答えて

2

あなたのクラスでjarファイルを作成しましたか?

クラスタ上でジョブクラスを伝播できるようにするには、Hadoopにjarが必要です。実行しなかった場合、「ジョブジャーファイルセットなし」というエラーが説明され、分散モードで実行できない理由が説明されています。 "hadoop jar ..."コマンドでジョブを起動し、jarの依存関係(少なくともapache-cassandra!)を追加してください。あなたの仕事を提出するときには、あなたのcassandraサーバーが立ち上がってリリーフポートを聞いていなければなりません。

ところで、HadoopとCassandraはHectorを必要としません。 ColumnFamilyInputFormat(およびColumnFamilyOutputFormat)は、独自の方法でCassandraにデータを読み書きする方法を示していません。そのため、RpcPortInitialAdressPartionnerを設定しなければなりませんでした。

最後の注意:ColumnFamilyOutputFormatは出力列ファミリを作成しません。既に存在していなければなりません。そうでなければ、書き込み時にエラーが発生します。このことができます

希望、

ブノワ

関連する問題