2016-07-21 5 views
0

私はスパークするのが新しく、recommendProductsForUsersの出力をHbaseテーブルに保存したいと思っています。私は、保存するためにJavaPairRDDとsaveAsNewAPIHadoopDatasetを使用することを示す例(https://sparkkb.wordpress.com/2015/05/04/save-javardd-to-hbase-using-saveasnewapihadoopdataset-spark-api-java-coding/)を見つけました。スパークを保存するMatrixFactorizationModelお勧めの製品をHbaseに投稿

JavaRDD<Tuple2<Object, Rating[]>>JavaPairRDD<ImmutableBytesWritable, Put>に変換するには、どうすればsaveAsNewAPIHadoopDatasetを使用できますか?

//Loads the data from hdfs 
    MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(), trainedDataPath); 

//Get recommendations for all users 
    JavaRDD<Tuple2<Object, Rating[]>> ratings3 = sameModel.recommendProductsForUsers(noOfProductsToReturn).toJavaRDD(); 
+0

モデルや推奨事項を保存しますか? – eliasah

+0

@eliasah私は推奨事項を保存したい – Ani

答えて

0

これは私が上記の問題を解決した方法です。これが誰かに役立つことを願っています。

JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts1 = ratings3 
        .mapToPair(new PairFunction<Tuple2<Object, Rating[]>, ImmutableBytesWritable, Put>() { 

         @Override 
         public Tuple2<ImmutableBytesWritable, Put> call(Tuple2<Object, Rating[]> arg0) 
           throws Exception { 
          Rating[] userAndProducts = arg0._2; 
          System.out.println("***********" + userAndProducts.length + "**************"); 
          List<Item> items = new ArrayList<Item>(); 
          Put put = null 
          String recommendedProduct = "";       
          for (Rating r : userAndProducts) { 

//Some logic here to convert Ratings into appropriate put command 
// recommendedProduct = r.product; 

} 

          put.addColumn(Bytes.toBytes("recommendation"), Bytes.toBytes("product"),Bytes.toBytes(recommendedProduct));      Bytes.toBytes("product"),Bytes.toBytes(response.getItems().toString())); 

          return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put); 
         } 
        }); 

      System.out.println("*********** Number of records in JavaPairRdd: "+ hbasePuts1.count() +"**************"); 
      hbasePuts1.saveAsNewAPIHadoopDataset(newApiJobConfig.getConfiguration()); 
      jsc.stop();   
0

mapToPairを使用すると、それはこのように書き

JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = javaRDD.mapToPair(
    new PairFunction<Tuple2<Object, Rating[]>, ImmutableBytesWritable, Put>() { 
@Override 
public Tuple2<ImmutableBytesWritable, Put> call(Tuple2<Object, Rating[]> row) throws Exception { 

    Put put = new Put(Bytes.toBytes(row.getString(0))); 
    put.add(Bytes.toBytes("columFamily"), Bytes.toBytes("columnQualifier1"), Bytes.toBytes(row.getString(1))); 
    put.add(Bytes.toBytes("columFamily"), Bytes.toBytes("columnQualifier2"), Bytes.toBytes(row.getString(2))); 

     return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);  
} 
}); 

は、あなたが、コンストラクタ内の行のキーとそれを供給プットの新しいインスタンスをcretneし、各列に追加呼び出す:あなたは例を提供し、同じソースから(私は手でタイプを変更しました)。次に作成されたputを返します。

0

我々は単なるソーススプライスマシンをオープンしており、MLIBと照会とストレージをスプライスマシンに統合した例があります。これが助けになるかどうかはわかりませんが、私にあなたに知らせると思っていました。ポストのため

http://community.splicemachine.com/use-spark-libraries-splice-machine/

おかげで、非常にクール。

+0

ありがとう、これは非常に興味深い、これを検討します。 – Ani

関連する問題