
r sparklyr spark_apply error: org.apache.spark.sql.AnalysisException: Reference 'id' is ambiguous

I am trying to use spark_apply on a Spark cluster to compute k-means on data grouped by two columns. The data is queried from Hive and looks like this:

> samplog1 
# Source: lazy query [?? x 6] 
# Database: spark_connection 
            id  time1 latitude longitude   timestamp hr 
            <chr>  <dbl> <dbl>  <dbl>    <chr> <int> 
1 fffc68e3-866e-4be5-b1bc-5d21b89622ae 1.509338e+12 1.373545 104.1265 2017-10-30 04:29:59  4 
2 fffc7412-deb1-4587-9c22-29ca833865ed 1.509332e+12 5.701320 117.4892 2017-10-30 02:49:47  2 
3 fffd16d5-83f1-4ea1-95de-34b1fcad392b 1.509338e+12 5.334012 100.2172 2017-10-30 04:25:44  4 
4 fffc68e3-866e-4be5-b1bc-5d21b89622ae 1.509338e+12 1.373545 104.1265 2017-10-30 04:29:44  4 
5 fffd16d5-83f1-4ea1-95de-34b1fcad392b 1.509332e+12 5.334061 100.2173 2017-10-30 02:58:30  2 
6 fffd16d5-83f1-4ea1-95de-34b1fcad392b 1.509339e+12 5.334012 100.2172 2017-10-30 04:55:41  4 
7 fffc7412-deb1-4587-9c22-29ca833865ed 1.509339e+12 5.729879 117.5787 2017-10-30 04:49:07  4 
8 fffc68e3-866e-4be5-b1bc-5d21b89622ae 1.509340e+12 1.373545 104.1265 2017-10-30 05:02:08  5 
9 fffc7412-deb1-4587-9c22-29ca833865ed 1.509325e+12 5.701320 117.4892 2017-10-30 00:53:12  0 
10 fffc7412-deb1-4587-9c22-29ca833865ed 1.509336e+12 5.670300 117.4990 2017-10-30 04:08:12  4 

The function I pass to spark_apply is shown below. The intent is that it receives the data grouped by id and hr, computes k-means for each group, works out the fraction of rows each cluster accounts for (the confidence), and returns the center with the highest membership and confidence:

kms <- function(idLogs){
  tryCatch({
    km <- sparklyr::ml_kmeans(idLogs, centers = 3, features = c("latitude","longitude"))

    km1 <- copy_to(sc, km$centers, overwrite = T)

    cluster <- sdf_predict(km)

    clustCounts <- cluster %>%
      group_by(prediction) %>%
      tally %>%
      mutate(conf = n/sum(n),
             prediction = prediction + 1)

    clustCounts1 <- merge(clustCounts, km1, by.x = 3, by.y = 0)

    clustCounts1 <- copy_to(sc, clustCounts1, overwrite = T)

    clustCounts2 <- clustCounts1 %>% filter(., conf == max(conf)) %>% select(latitude, longitude, conf)

    return(data.frame(clustCounts2))
  }, error = function(e) {
    return(
      data.frame(string_id = c(0), string_categories = c("error"))
    )
  })
}

I am calling it as follows:

spark_apply(x = samplog1, f = kms, group_by = c("id","hr")) 

However, I get an error about the ambiguous 'id' column:

Error: org.apache.spark.sql.AnalysisException: Reference 'id' is ambiguous, could be: id#1569, id#1571.; 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:171) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4$$anonfun$26.apply(Analyzer.scala:470) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4$$anonfun$26.apply(Analyzer.scala:470) 
at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4.applyOrElse(Analyzer.scala:470) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4.applyOrElse(Analyzer.scala:466) 
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335) 
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335) 
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69) 
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334) 
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321) 
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332) 
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:108) 
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:118) 
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:122) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:122) 
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10.applyOrElse(Analyzer.scala:466) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10.applyOrElse(Analyzer.scala:346) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) 
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:346) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:327) 
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83) 
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80) 
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) 
at scala.collection.immutable.List.foldLeft(List.scala:84) 
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80) 
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72) 
at scala.collection.immutable.List.foreach(List.scala:318) 
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72) 
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:37) 
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:37) 
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:35) 
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133) 
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2141) 
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:721) 
at org.apache.spark.sql.DataFrame.selectExpr(DataFrame.scala:754) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at sparklyr.Invoke$.invoke(invoke.scala:102) 
at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97) 
at sparklyr.StreamHandler$.read(stream.scala:62) 
at sparklyr.BackendHandler.channelRead0(handler.scala:52) 
at sparklyr.BackendHandler.channelRead0(handler.scala:14) 
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
at io.netty.handler.codec.ByteToM 

From what I have read, this error can occur when joining data frames that share an id column. In this case I am not joining any data frames. The only possible culprit is the merge() call, but neither of the data frames involved in it has an id column. I am new to sparklyr and spark_apply, so I appreciate that I may have written my function completely wrong. In case other problems are apparent, the entire script is posted below.
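For illustration only (this is not part of my script), the usual way Spark produces this error is when two tables that both expose an id column are joined and id is then referenced without a qualifier; the table names logs_a and logs_b here are hypothetical:

# both logs_a and logs_b contain an `id` column; `id` in the SELECT is unqualified
DBI::dbGetQuery(sc, "SELECT id FROM logs_a a JOIN logs_b b ON a.hr = b.hr")

Spark cannot tell whether id means a.id or b.id, hence the AnalysisException. Nothing like that appears explicitly in my code, though.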

Sys.setenv(HIVE_HOME="/opt/cloudera/parcels/CDH/lib/hive/") 

kms <- function(idLogs){
  tryCatch({
    km <- sparklyr::ml_kmeans(idLogs, centers = 3, features = c("latitude","longitude"))

    km1 <- copy_to(sc, km$centers, overwrite = T)

    cluster <- sdf_predict(km)

    clustCounts <- cluster %>%
      group_by(prediction) %>%
      tally %>%
      mutate(conf = n/sum(n),
             prediction = prediction + 1)

    clustCounts1 <- merge(clustCounts, km1, by.x = 3, by.y = 0)

    clustCounts1 <- copy_to(sc, clustCounts1, overwrite = T)

    clustCounts2 <- clustCounts1 %>% filter(., conf == max(conf)) %>% select(latitude, longitude, conf)

    return(data.frame(clustCounts2))
  }, error = function(e) {
    return(
      data.frame(string_id = c(0), string_categories = c("error"))
    )
  })
}

sc <- spark_connect(master = "yarn-client",
                    version = "1.6.0",
                    spark_home = '/opt/cloudera/parcels/CDH/lib/spark/')

tbl_change_db(sc, "clustergps") 

samplog <- tbl(sc, "part6") 

samplog <- mutate(samplog, timestamp = from_unixtime(time1/1000)) 

samplog <- mutate(samplog, hr = hour(timestamp)) 

samplog1 <- samplog %>% filter(id == 'fffd16d5-83f1-4ea1-95de-34b1fcad392b' |
                               id == 'fffc7412-deb1-4587-9c22-29ca833865ed' |
                               id == 'fffc68e3-866e-4be5-b1bc-5d21b89622ae')


likelyLocs <- spark_apply(samplog1, kms, group_by = list("id","hr")) 

Answer


So, just to feed back on this: I was able to resolve the problem by setting the "columns" parameter of spark_apply, which defines the names of the output columns. I set it to an arbitrary vector of character values.
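A minimal sketch of that fix, assuming kms() returns latitude, longitude and conf for each group; the exact column names, and whether the grouping columns also need to be listed, are assumptions and may depend on the sparklyr version:

# hypothetical column names; they only need to describe the rows the function returns
likelyLocs <- spark_apply(
  x = samplog1,
  f = kms,
  group_by = c("id", "hr"),
  columns = c("id", "hr", "latitude", "longitude", "conf")
)

The names themselves did not seem to matter, only that the output schema was spelled out explicitly rather than inferred.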
