PySparkで重複行を削除する

重複行を削除するためにはdrop_duplicatesdistinctメソッドを使用します。

distinctは全列のみを対象にしているのに対しdrop_duplicatesは引数を指定しなければdistinctと同じ、引数に対象とする列名を指定すれば指定した列のみで重複を判別して削除されます。このため以下コードではdrop_duplicatesのみを使用します。

引数を指定しない場合

+---+---+
| c0| c1|
+---+---+
|  1|  a|
|  1|  a|
|  1|  b|
+---+---+

このデータに対してdrop_duplicatesを適用してみます。

df = spark.createDataFrame([
        (1, "a"),
        (1, "a"),
        (1, "b")
    ],
    ["c0", "c1"])

df.drop_duplicates().show()

データが同じものが削除されます。

nullデータ

+----+----+
|  c0|  c1|
+----+----+
|null|   a|
|null|   a|
|null|   b|
|null|null|
|   1|   b|
+----+----+

nullを入れたデータも確認してみます。

df = spark.createDataFrame([
        (None, "a"),
        (None, "a"),
        (None, "b"),
        (None, None),
        (1, "b")
    ],
    ["c0", "c1"])

df.drop_duplicates().show()

nullに対しても値が入っている時と同じ様に処理されるようです。

+----+----+
|  c0|  c1|
+----+----+
|null|null|
|null|   b|
|null|   a|
|   1|   b|
+----+----+

重複判定の列を指定

+---+---+
| c0| c1|
+---+---+
|  1|  a|
|  1|  b|
|  2|  c|
+---+---+

というデータに対してc0列のみを対象にしてみます。

df = spark.createDataFrame([
        (1, "a"),
        (1, "b"),
        (2, "c")
    ],
    ["c0", "c1"])
df.drop_duplicates(["c0"]).show()

c0列のみを見て重複行がなくなっていることが分かります。

+---+---+
| c0| c1|
+---+---+
|  1|  a|
|  2|  c|
+---+---+

パフォーマンス

explainで確認すると列を指定した時と指定していない時でパフォーマンスに違いが出そうだったので、大きなデータを扱う場合には把握しておいた方が良さそうです。

列を指定しない場合

== Physical Plan ==
*(2) HashAggregate(keys=[c0#6778L, c1#6779], functions=[])
+- Exchange hashpartitioning(c0#6778L, c1#6779, 200), true, [id=#3086]
   +- *(1) HashAggregate(keys=[c0#6778L, c1#6779], functions=[])
      +- *(1) Scan ExistingRDD[c0#6778L,c1#6779]

列を指定した場合

列を指定するとSortが走り、Sortは分散処理出来ないのでパフォーマンスに大きく影響を与えそうです。

== Physical Plan ==
SortAggregate(key=[c0#6769L], functions=[first(c1#6770, false)])
+- *(2) Sort [c0#6769L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(c0#6769L, 200), true, [id=#3068]
      +- SortAggregate(key=[c0#6769L], functions=[partial_first(c1#6770, false)])
         +- *(1) Sort [c0#6769L ASC NULLS FIRST], false, 0
            +- *(1) Scan ExistingRDD[c0#6769L,c1#6770]