私はユーザ名のデータフレームをどのスレッドに投稿したのか、それらの投稿のタイムスタンプを持っています。スレッドの最初のユーザーは誰だったのか、何時だったのかを把握するために何をしようとしているのですか?最初の投稿がスレッドでグループを行い、次にタイムスタンプで分を行うことであることがわかります。しかしそれはユーザー名を削除します。どのようにグループを使用してユーザー名を保持するのですか?グループを使用しているときにSparkで未使用の列を保持していますか?
3
A
答えて
1
これは、HiveContextとHive named_struct関数を使用して1つのgroupByで実行できます。トリックはminです。左から順に列を評価し、現在の列が等しい場合は次の行に移動するだけで、構造体でminが機能します。したがって、この場合、タイムスタンプの列を比較するだけですが、min関数が結果を吐き出した後にアクセスする名前を含む構造体を作成します。
data = [
('user', 'thread', 'ts'),
('ryan', 1, 1234),
('bob', 1, 2345),
('bob', 2, 1234),
('john', 2, 2223)
]
header = data[0]
rdd = sc.parallelize(data[1:])
df = sqlContext.createDataFrame(rdd, header)
df.registerTempTable('table')
sql = """
SELECT thread, min(named_struct('ts', ts, 'user', user)) as earliest
FROM table
GROUP BY thread
"""
grouped = sqlContext.sql(sql)
final = grouped.selectExpr('thread', 'earliest.user as user', 'earliest.ts as timestamp')
1
これは、row_number()ウィンドウ関数を使用して行うことができます。これは、他のすべての列をそのまま維持します。 withColumnを使用して、 "thread_user_order"のような新しい列を作成し、その値はrow_number()PARTITION BYスレッドORDER BY tsである必要があります。 次に、"thread_user_order" == 1をフィルタリングします。あなたが順序でフィールドをソートして、一度に2つの列を維持する構造体のソート順を利用することができます
df.withColumn("thread_user_order", row_number().over(Window.partitionBy(col("thread")).orderBy(col("ts")))).where(col("thread_user_order").equalTo(1))
1
:ここ
は、いくつかの擬似コードです。その後、min
と呼ぶと、最初にタイムスタンプでソートされ、次に2回ネストされたときにユーザー名がソートされます。
user_time = functions.struct(df.timestamp, df.username).alias('user_time')
min_thread_users_df = df.select(df.thread, user_time).groupby('thread').agg(
functions.min('user_time').alias('user_time')).select(
'thread', 'user_time.username', 'user_time.timestamp')
関連する問題
- 1. spark udfを使用して既存の列から新しい列を作成し、グループを使用します。
- 2. 名前を保持している列名の配列を使用してSparkデータフレームを集約します。
- 3. ナビゲーションコントローラーを使用しているときにヘッダービューを保持
- 4. shlex.splitを使用しているときに引用符を保持する
- 5. Spark:ストリーミングクエリでイベントタイムスライディングウィンドウを使用しているときの問題
- 6. Sparkストリーミングを使用してrddをHbaseに保存すると、
- 7. ファイヤーバード1でグループを使用しているときのエラー
- 8. グループ化でカスタムマップサプライヤを使用しているときのClassCastException
- 9. IFRAMEを使用しているときに$ _SESSIONを保持する
- 10. PHPのGDlib imagecopyresampledを使用しているときに、PNGイメージの透明度を保持できますか?
- 11. Djangoグループをフレンドリストとして使用できますか?
- 12. 複数のレイヤーを使用しているときに未使用のCALayerメモリを解放できません
- 13. IndexedDBを使用していくつのオブジェクト(列)を使用できますか?
- 14. 共用体を使用しているときにグループ化する
- 15. clj-kafkaを使用して保持時間を把握していますか?
- 16. sparkでunionを効率的に使用しています
- 17. Pandas.read_excelでコンバータを使用しているときに列インデックスを使用できますか
- 18. Sparkを使用してIndexedRowMatrixを作成するときに重複する列
- 19. dist_pkgdata_DATAを使用しているときにデータファイルのプレフィックスを保持しますか?
- 20. .LINQを使用してLINQを使用した保持の保持
- 21. Spark UDAF - 入力タイプとしてジェネリックを使用していますか?
- 22. タイプアリアを使用して配列のテンプレートを保持する
- 23. spark-redshift - Spark 2.1.0を使用してエラーを保存しました。
- 24. サービスとしてsparkを使用することはできますか?
- 25. グループ内のシフトを使用して新しい列を作成
- 26. コアデータエンティティを列挙型として使用していますか?
- 27. eachとimage.onloadを使用して配列の順序を保持します
- 28. __blockを使用してもデータを保持できません
- 29. グーグルビジュアライゼーションAPIでグループ化を使用して書式を保持する
- 30. routeconfigを使用しているときに、 'annotations'のプロパティが未定義です。
ユーザー名別 – eliasah