2017-01-14 7 views
1

私はspark sqlのソースとしてredisを使用しようとしていますが、rddを変換する方法に悩まされています。以下は私のコードです:redisをsparkのデータセットまたはデータフレームに変換する方法は?

RDD<Tuple2<String,String>> rdd1 = rc.fromRedisKV("user:*",3,redisConfig); 

    JavaRDD<Row> userRDD = rdd1.toJavaRDD().map(new Function<Tuple2<String,String>, Row>(){ 
     public Row call(Tuple2<String, String> tuple2) throws Exception { 
      System.out.println(tuple2._2); 
      return RowFactory.create(tuple2._2().split(",")); 
     } 
    }); 

    List<StructField> structFields = new ArrayList<StructField>(); 
    structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); 
    structFields.add(DataTypes.createStructField("sex", DataTypes.StringType, false)); 
    structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false)); 
    StructType structType = DataTypes.createStructType(structFields); 

    Dataset ds = spark.createDataFrame(userRDD, structType); 
    ds.createOrReplaceTempView("user"); 
    ds.printSchema(); 

    String sql = "select name, sex, age from user "; 

    List<Row> list2 = spark.sql(sql).collectAsList(); 

私は次の例外だ:私は次に何をすべきか見当がつかない

Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD 

を、助けてください!

答えて

0

最終的に私のコードに間違いはありませんが、アプリケーションのjarをSparkサーバーにアップロードする必要があります。

関連する問題