2016-10-19 14 views
1

私はSpark Datasetの分割アプリケーションを探していますが、これは前述のロジックと似ています。は、指定されたロジックでSpark Datasetを分割する方法があります

>>> import pandas as pd 
>>> import numpy as np 

>>> df1 = pd.DataFrame(np.random.randn(10, 4), columns=['a', 'b', 'c', 'd']) 
>>> df1 
      a   b   c   d 
0 -0.398502 -1.083682 0.850632 -1.443868 
1 -2.124333 1.093590 -0.338162 -1.414035 
2 0.753560 0.600687 -0.998277 -2.094359 
3 -0.635962 -0.291226 0.428961 1.158153 
4 -0.900506 -0.545226 -0.448178 -0.567717 
5 0.112911 0.351649 0.788940 2.071541 
6 -0.358625 0.500367 1.009819 -1.139761 
7 1.003608 0.246925 0.225138 -0.586727 
8 0.355274 -0.540685 1.482472 0.364989 
9 3.089610 -1.415088 -0.072107 -0.203137 
>>> 

>>> mask = df1.applymap(lambda x: x <-0.7) 
>>> 
>>> mask 
     a  b  c  d 
0 False True False True 
1 True False False True 
2 False False True True 
3 False False False False 
4 True False False False 
5 False False False False 
6 False False False True 
7 False False False False 
8 False False False False 
9 False True False False 
>>> mask.any(axis=1) 
0  True 
1  True 
2  True 
3 False 
4  True 
5 False 
6  True 
7 False 
8 False 
9  True 
dtype: bool 
>>> df1 = df1[-mask.any(axis=1)] 
>>> df1 
      a   b   c   d 
3 -0.635962 -0.291226 0.428961 1.158153 
5 0.112911 0.351649 0.788940 2.071541 
7 1.003608 0.246925 0.225138 -0.586727 
8 0.355274 -0.540685 1.482472 0.364989 
>>> 

はスパークでは、私はdf.filterかかわらず行ってますが、その試みてピックにのみマッチしますが、私の場合は、3 -4レベルにデータをフィルタリング(削除)する必要があります。上に示したレベルは1つだけです。これはフィルタリングの一種です。

答えて

-1

私は考え出しどのようにフィルタリングする必要があります、3つの層で、すべてのソリューションはdf.exceptコマンドです。 (私はJAVAででしたが、)データoffice_people.csv

+---+------------------+--------------------+--------------------+------+---------------+ 
| id|   full_name|   job_title|    email|gender|  ip_address| 
+---+------------------+--------------------+--------------------+------+---------------+ 
| 1| Walter Lawrence| Graphic Designer|[email protected]| Male| 179.89.185.194| 
| 2|  Mark Reynolds| Structural Engineer| [email protected]| Male| 23.192.227.122| 
| 3|  Gregory Jones|  Design Engineer| [email protected]| Male| 75.232.148.42| 
| 4|  Ashley Clark|  Staff Scientist| [email protected]|Female| 18.103.212.244| 
| 5| Dorothy Harvey|  Design Engineer| [email protected]|Female| 180.119.92.27| 
| 6|  Laura Allen|  Tax Accountant|[email protected]|Female| 194.60.142.75| 
| 7| Richard Knight|  Staff Scientist|[email protected]| Male| 5.25.210.201| 
| 8| Gregory Carpenter|Payment Adjustmen...|[email protected]| Male| 92.16.231.195| 
| 9|  Sean Thompson|    Teacher|[email protected]| Male| 4.216.52.79| 
| 10| Frances Stephens|Human Resources M...|[email protected]|Female| 181.11.246.116| 
| 11|  Louis Little| Nurse Practicioner| [email protected]| Male|209.135.198.222| 
| 12|  Frances Perry| Quality Engineer| [email protected]|Female| 173.162.64.208| 
| 13| Russell Hanson| Web Developer II|[email protected]| Male| 57.81.25.130| 
| 14| Michelle Wallace| Technical Writer| [email protected]|Female| 56.17.86.56| 
| 15| Keith Patterson|VP Product Manage...|[email protected]| Male| 252.146.42.238| 
| 16| Kathleen Howard|  Programmer III|[email protected]|Female| 235.163.98.206| 
| 17|Russell Cunningham|  VP Marketing|[email protected]| Male| 72.197.113.247| 
| 18|  Henry Dixon|  Developer III| [email protected]| Male| 63.144.255.192| 
| 19| Martha Jackson| Electrical Engineer|[email protected]|Female|167.209.159.139| 
| 20|  Sean Kelly|Systems Administr...| [email protected]| Male| 6.183.241.141| 
+---+------------------+--------------------+--------------------+------+---------------+ 
only showing top 20 rows 

Dataset<Row> people_non_Engineer = people.except(people_Engineer);は、単にマイルズ・ベイカー@あなたのアップデートを参照するには

import static org.apache.spark.sql.functions.col; 
import org.apache.spark.sql.Dataset; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.SparkSession; 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.Metadata; 
import org.apache.spark.sql.types.StructField; 
import org.apache.spark.sql.types.StructType; 

public class starthere { 

    public static void main(String[] args) { 
     // TODO Auto-generated method stub 
     SparkConf conf = new SparkConf().setMaster("local").setAppName("Example App"); 

     JavaSparkContext sc = new JavaSparkContext(conf); 

     // SQLContext sqlContext = new SQLContext(sc); 

     SparkSession spark = SparkSession 
       .builder() 
       .appName("Java Spark SQL Example") 
       .config("spark.some.config.option", "some-value") 
       .config("spark.sql.warehouse.dir", "file:///C:/tmp/") 
       .getOrCreate(); 

     StructType customSchema = new StructType(new StructField[] { 
       new StructField("id", DataTypes.IntegerType, true, Metadata.empty()), 
       new StructField("full_name", DataTypes.StringType, true, Metadata.empty()), 
       new StructField("job_title", DataTypes.StringType, true, Metadata.empty()), 
       new StructField("email", DataTypes.StringType, true, Metadata.empty()), 
       new StructField("gender", DataTypes.StringType, true, Metadata.empty()), 
       new StructField("ip_address", DataTypes.StringType, true, Metadata.empty()) 
      }); 

     Dataset<Row> people = spark.read() 
       .format("com.databricks.spark.csv") 
       .schema(customSchema)  
       .option("header", "true") 
       .load("office_people.csv"); 

     people.show(); 
     Dataset<Row> people_Engineer = people.filter(col("job_title").contains("Engineer")); 
     people_Engineer.show(); 
     Dataset<Row> people_non_Engineer = people.except(people_Engineer); 
     people_non_Engineer.show(); 
     Dataset<Row> people_Sys_Admin = people.filter(col("job_title").contains("Systems Admin")); 
     Dataset<Row> people_non_tech = people_non_Engineer.except(people_Sys_Admin); 
     people_non_tech.show(); 
     Dataset<Row> people_Tech_Writer = people.filter(col("job_title").contains("Technical Writer")); 
     Dataset<Row> people_non_tech_people = people_non_tech.except(people_Tech_Writer); 
     people_non_tech_people.show(); 
     } 
} 
0

RDD抽象化の前提のため、Sparkアプリケーションでは保存順序が非常に難しいです。あなたが取ることができる最善のアプローチは、ここでやったように、スパークAPIを使ってパンダのロジックを翻訳することです。残念ながら、私はあなたがすべての列に同じフィルター基準を適用できるとは思わないので、手動で複数の列の操作にマスクを翻訳しなければなりませんでした。このDatabricks blog postは、PandasからSparkへの移行に役立ちます。

import pandas as pd 
import numpy as np 
np.random.seed(1000) 
df1 = pd.DataFrame(np.random.randn(10, 4), columns=['a', 'b', 'c', 'd']) 
mask = df1.applymap(lambda x: x <-0.7) 
df2 = df1[-mask.any(axis=1)] 

私たちが望む結果は次のとおりです。

  a   b   c   d 
1 -0.300797 0.389475 -0.107437 -0.479983 
5 -0.334835 -0.099482 0.407192 0.919388 
6 0.312118 1.533161 -0.550174 -0.383147 
8 -0.326925 -0.045797 -0.304460 1.923010 

だからスパークでは、我々は、パンダのデータフレームを使用してデータフレームを作成し、正しい結果セットを取得するためにfilterを使用します。

df1_spark = sqlContext.createDataFrame(df1).repartition(10) 
df2_spark = df1_spark.filter(\ 
    (df1_spark.a > -0.7)\ 
& (df1_spark.b > -0.7)\ 
& (df1_spark.c > -0.7)\ 
& (df1_spark.d > -0.7)\ 
) 

正しい結果が得られます(注文が保存されていないことに注意してください)。

df2_spark.show() 
+-------------------+--------------------+--------------------+-------------------+ 
|     a|     b|     c|     d| 
+-------------------+--------------------+--------------------+-------------------+ 
|-0.3348354532115408| -0.0994816980097769| 0.40719210034152314| 0.919387539204449| 
| 0.3121180100663634| 1.5331610653579348| -0.5501738650283003|-0.3831474108842978| 
|-0.3007966727870205| 0.3894745542873072|-0.10743730169089667|-0.4799830753607686| 
| -0.326924675176391|-0.04579718800728687| -0.3044600616968845| 1.923010130400007| 
+-------------------+--------------------+--------------------+-------------------+ 

あなたは絶対にパンダを使用してマスクを作成するためにを必要とする場合は、オリジナルのパンダのデータフレームのインデックスを維持し、インデックス列に基づいて放送変数とフィルタリングを作成することにより、スパークから個々のレコードを削除する必要があります。ここにYMMVの例があります。

インデックスを追加します。

df1['index_col'] = df1.index 

df1 
      a   b   c   d index_col 
0 -0.804458 0.320932 -0.025483 0.644324   0 
1 -0.300797 0.389475 -0.107437 -0.479983   1 
2 0.595036 -0.464668 0.667281 -0.806116   2 
3 -1.196070 -0.405960 -0.182377 0.103193   3 
4 -0.138422 0.705692 1.271795 -0.986747   4 
5 -0.334835 -0.099482 0.407192 0.919388   5 
6 0.312118 1.533161 -0.550174 -0.383147   6 
7 -0.822941 1.600083 -0.069281 0.083209   7 
8 -0.326925 -0.045797 -0.304460 1.923010   8 
9 -0.078659 -0.582066 -1.617982 0.867261   9 

は、Spark放送変数にマスクを変換します

myIdx = sc.broadcast(df2.index.tolist()) 

スパークAPIを使用してデータフレームを作成し、変更します。

df1_spark.rdd.filter(lambda row: row and row['index_col'] not in myIdx.value).collect() 
df2_spark = df1_spark.rdd.filter(lambda row: row and row['index_col'] in myIdx.value).toDF() 

df2_spark.show() 
+-------------------+--------------------+--------------------+-------------------+---------+ 
|     a|     b|     c|     d|index_col| 
+-------------------+--------------------+--------------------+-------------------+---------+ 
|-0.3007966727870205| 0.3894745542873072|-0.10743730169089667|-0.4799830753607686|  1| 
|-0.3348354532115408| -0.0994816980097769| 0.40719210034152314| 0.919387539204449|  5| 
| 0.3121180100663634| 1.5331610653579348| -0.5501738650283003|-0.3831474108842978|  6| 
| -0.326924675176391|-0.04579718800728687| -0.3044600616968845| 1.923010130400007|  8| 
+-------------------+--------------------+--------------------+-------------------+---------+ 
+0

良いのフィルタのelse一部を分割して

は怒鳴る言及したと仮定すると。私はちょうどこのパズルを解決するためのいくつかのより良い例で試しました。ただ見ているだけです。 –

関連する問題