重複行を削除するためにはdrop_duplicates
かdistinct
メソッドを使用します。
distinct
は全列のみを対象にしているのに対しdrop_duplicates
は引数を指定しなければdistinct
と同じ、引数に対象とする列名を指定すれば指定した列のみで重複を判別して削除されます。このため以下コードではdrop_duplicates
のみを使用します。
引数を指定しない場合
+---+---+
| c0| c1|
+---+---+
| 1| a|
| 1| a|
| 1| b|
+---+---+
このデータに対してdrop_duplicates
を適用してみます。
df = spark.createDataFrame([
(1, "a"),
(1, "a"),
(1, "b")
],
["c0", "c1"])
df.drop_duplicates().show()
データが同じものが削除されます。
nullデータ
+----+----+
| c0| c1|
+----+----+
|null| a|
|null| a|
|null| b|
|null|null|
| 1| b|
+----+----+
nullを入れたデータも確認してみます。
df = spark.createDataFrame([
(None, "a"),
(None, "a"),
(None, "b"),
(None, None),
(1, "b")
],
["c0", "c1"])
df.drop_duplicates().show()
nullに対しても値が入っている時と同じ様に処理されるようです。
+----+----+
| c0| c1|
+----+----+
|null|null|
|null| b|
|null| a|
| 1| b|
+----+----+
重複判定の列を指定
+---+---+
| c0| c1|
+---+---+
| 1| a|
| 1| b|
| 2| c|
+---+---+
というデータに対してc0
列のみを対象にしてみます。
df = spark.createDataFrame([
(1, "a"),
(1, "b"),
(2, "c")
],
["c0", "c1"])
df.drop_duplicates(["c0"]).show()
c0
列のみを見て重複行がなくなっていることが分かります。
+---+---+
| c0| c1|
+---+---+
| 1| a|
| 2| c|
+---+---+
パフォーマンス
explainで確認すると列を指定した時と指定していない時でパフォーマンスに違いが出そうだったので、大きなデータを扱う場合には把握しておいた方が良さそうです。
列を指定しない場合
== Physical Plan ==
*(2) HashAggregate(keys=[c0#6778L, c1#6779], functions=[])
+- Exchange hashpartitioning(c0#6778L, c1#6779, 200), true, [id=#3086]
+- *(1) HashAggregate(keys=[c0#6778L, c1#6779], functions=[])
+- *(1) Scan ExistingRDD[c0#6778L,c1#6779]
列を指定した場合
列を指定するとSortが走り、Sortは分散処理出来ないのでパフォーマンスに大きく影響を与えそうです。
== Physical Plan ==
SortAggregate(key=[c0#6769L], functions=[first(c1#6770, false)])
+- *(2) Sort [c0#6769L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(c0#6769L, 200), true, [id=#3068]
+- SortAggregate(key=[c0#6769L], functions=[partial_first(c1#6770, false)])
+- *(1) Sort [c0#6769L ASC NULLS FIRST], false, 0
+- *(1) Scan ExistingRDD[c0#6769L,c1#6770]