-2

私はすべてのjsons(top/total)で各placeNameのパーセンタイルを見つけることができるように、Total(総数)とTop Elements(フィルタ後の数)を取得しようとしています。評価> 3:外部結合を行う方法:Spark Scala SQLContext

// sc : An existing SparkContext. 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    val df = sqlContext.jsonFile("temp.txt") 
    //df.show() 


    val res = df.withColumn("visited", explode($"visited")) 

    val result = res.groupBy($"customerId", $"visited.placeName") 

Tried with joins : 
val result1 =res.groupBy($"customerId", $"visited.placeName").agg(count("*").alias("total")) 

val result2 = res 
.filter($"visited.rating" < 4) 
    .groupBy($"requestId", $"visited.placeName") 
    .agg(count("*").alias("top")) 

result1.show() 

result2.show() 
percentile = result1.join(result2, List("placeName","customerId"), "outer") 
sqlContext.sql("select top/total as percentile from temp groupBy placeName") 

ただし、私にエラーが発生します。

val result1 = result.withColumn("Top", getCount(res , true)) 
        .withColumn("Total",getCount(result, false)).show() 


    def getCount(df: DataFrame, flag: Boolean): Int { 
      if (flag == "true") return df.filter($"visited.rating" < 3).groupBy($"customerId", $"visited.placeName").agg(count("*")) 
      else return df.agg(count("*")) 
      } 

マイスキーマ:

は、私のようなUDF何かでこれを行うことができます

{ 
     "country": "France", 
     "customerId": "France001", 
     "visited": [ 
      { 
       "placeName": "US", 
       "rating": "2", 
       "famousRest": "N/A", 
       "placeId": "AVBS34" 

      }, 
       { 
       "placeName": "US", 
       "rating": "3", 
       "famousRest": "SeriousPie", 
       "placeId": "VBSs34" 

      }, 
       { 
       "placeName": "Canada", 
       "rating": "3", 
       "famousRest": "TimHortons", 
       "placeId": "AVBv4d" 

      }   
    ] 
} 

US top = 1 count = 3 
Canada top = 1 count = 3 


{ 
     "country": "Canada", 
     "customerId": "Canada012", 
     "visited": [ 
      { 
       "placeName": "UK", 
       "rating": "3", 
       "famousRest": "N/A", 
       "placeId": "XSdce2" 

      }, 


    ] 
} 
UK top = 1 count = 1 


{ 
     "country": "France", 
     "customerId": "France001", 
     "visited": [ 
      { 
       "placeName": "US", 
       "rating": "4.3", 
       "famousRest": "N/A", 
       "placeId": "AVBS34" 

      }, 
       { 
       "placeName": "US", 
       "rating": "3.3", 
       "famousRest": "SeriousPie", 
       "placeId": "VBSs34" 

      }, 
       { 
       "placeName": "Canada", 
       "rating": "4.3", 
       "famousRest": "TimHortons", 
       "placeId": "AVBv4d" 

      }   
    ] 
} 

US top = 2 count = 3 
Canada top = 1 count = 3 

だから、最後に私のようなものが必要です

PlaceName percentile 
US   57.14   (1+1+2)/(3+1+3) *100 
Canada  33.33   (1+1)/(3+3) *100 
UK   100    1*100 

スキーマ:

root 
|-- country: string(nullable=true) 
|-- customerId:string(nullable=true) 
|-- visited: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- placeId: string (nullable = true) 
| | |-- placeName: string (nullable = true) 
| | |-- famousRest: string (nullable = true) 
| | |-- rating: string (nullable = true) 

答えて

2

あなたが指定したコードでは、ソースがどのように構造化されているのか、この特定のエラーが発生する理由は明確ではありませんが、一般的にこのコードはリモートで有効ではありません。

  • getCountは重要ではありませんが重要な違いです。
  • getCountは、スコープ内にcolタイプが存在しないため、有効な機能ではありません。何らかの理由でo.a.s.sql.DataFrameの型エイリアスとして使用していない限り、これはコンパイルされません!
  • Sparkがネストされたアクション/変換をサポートしていない場合でも、Spark DataFrameでUDFを使用してクエリまたは集計を実行することはできません。
+0

私は意図したことを追加しようとしていました。別のクエリを使用して結合を試みました。val res = df.withColumn( "visited"、explode($ "visited")) 'val result1 = res.groupBy($" customerId "、$" visited.placeName ") .gg(count( "*")。エイリアス( "total")) val result2 = res .filter($ "visited.rating" <4) .groupBy($ "requestId"、$ "visited.placeName ") .agg(数(" * ")の別名。(" トップ」)) result1.show() result2.show() ' –

+0

と使用が加わり:'パーセンタイル= result1.join(結果2、リスト( "placeName"、 "customerId")、 "outer")sqlContext.sql( "temp groupBy placeNameからのパーセンタイルとしてトップ/トータルを選択") 'しかし、エラーも発生します。それがなぜpdfを試してみたかったのですか(カウントを追加するために(トップとトータル))。これにアプローチしてパーセンタイルを得る方法を教えてください –

関連する問題