2016-03-20 10 views
2

だから、私は次の表を持っていると仮定します。意味:次のようなもの:SparkのSQL:DISTINCTのすべての値を取得するにはどうすればよいですか? - どのように多くとそれらの値私は名前ごとに異なる色のテーブルを取得したいと思い</p> <pre><code>Name | Color ------------------------------ John | Blue Greg | Red John | Yellow Greg | Red Greg | Blue </code></pre> <p>:

Name | Distinct | Values 
-------------------------------------- 
John | 2  | Blue, Yellow 
Greg | 2  | Red, Blue 

どのようにすればいいですか?

+0

'非常にsimple'は、指定しないようにミスを犯し、あなたのRDBMS、以下にsimple''から変えています:ここで私はこれを行うために作成したクラスです。だから回答を編集し、RDBMSを追加してください –

+0

@ThomasG公平なプラットフォームが指定されており、RDBMSではありません。 – zero323

+0

[PySpark 1.5.0では、列\ 'x \'の値に基づいて列\ y \のすべての項目をどのようにリストしますか?](http://stackoverflow.com/questions/36115411/)あなたのリストのすべての項目の列のyの値に基づく – zero323

答えて

3

collect_listは重複を削除せずにリストを表示します。 collect_setは自動的に重複 火花1.6.0はそれをチェックアウトするので、これだけ

select 
Name, 
count(distinct color) as Distinct, # not a very good name 
collect_set(Color) as Values 
from TblName 
group by Name 

この機能が実装されて削除されます。PySParkについては

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala

/** 
    * Aggregate function: returns a set of objects with duplicate elements eliminated. 
    * 
    * For now this is an alias for the collect_set Hive UDAF. 
    * 
    * @group agg_funcs 
    * @since 1.6.0 
    */ 
    def collect_set(columnName: String): Column = collect_set(Column(columnName)) 
0

を。私はR/Pandasのバックグラウンドから来ているので、実際にSpark Dataframesを使うのが少し楽になっています。データフレーム

  • にあなたのファイルを読む

    1. セットアップスパークSQLコンテキスト
    2. 一時テーブルとして、それが直接SQL構文に
    3. を使用して
    4. クエリをあなたのデータフレームを登録します。これを行うには

      結果をオブジェクトとして保存し、ファイルに出力する。あなたのことを実行する。

    class SQLspark(): 
    
    def __init__(self, local_dir='./', hdfs_dir='/users/', master='local', appname='spark_app', spark_mem=2): 
        self.local_dir = local_dir 
        self.hdfs_dir = hdfs_dir 
        self.master = master 
        self.appname = appname 
        self.spark_mem = int(spark_mem) 
        self.conf = (SparkConf() 
          .setMaster(self.master) 
          .setAppName(self.appname) 
          .set("spark.executor.memory", self.spark_mem)) 
        self.sc = SparkContext(conf=self.conf) 
        self.sqlContext = SQLContext(self.sc) 
    
    
    def file_to_df(self, input_file): 
        # import file as dataframe, all cols will be imported as strings 
        df = self.sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter", "\t").option("inferSchema", "true").load(input_file) 
        # # cache df object to avoid rebuilding each time 
        df.cache() 
        # register as temp table for querying, use 'spark_df' as table name 
        df.registerTempTable("spark_df") 
        return df 
    
    # you also cast a spark dataframe as a pandas df 
    def sparkDf_to_pandasDf(self, input_df): 
        pandas_df = input_df.toPandas() 
        return pandas_df 
    
    def find_distinct(self, col_name): 
        my_query = self.sqlContext.sql("""SELECT distinct {} FROM spark_df""".format(col_name)) 
        # now do your thing with the results etc 
        my_query.show() 
        my_query.count() 
        my_query.collect() 
    
    ############### 
    if __name__ == '__main__': 
    
    # instantiate class 
    # see function for variables to input 
    spark = TestETL(os.getcwd(), 'hdfs_loc', "local", "etl_test", 10) 
    
    
    # specify input file to process 
    tsv_infile = 'path/to/file' 
    
  • 関連する問題

     関連する問題