programing

두 개의 PySpark 데이터 프레임 연결

closeapi 2023. 8. 14. 22:53
반응형

두 개의 PySpark 데이터 프레임 연결

두 개의 PySpark 데이터 프레임을 그 중 하나에만 있는 일부 열과 연결하려고 합니다.

from pyspark.sql.functions import randn, rand

df_1 = sqlContext.range(0, 10)

+--+
|id|
+--+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+--+

df_2 = sqlContext.range(11, 20)

+--+
|id|
+--+
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+--+

df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal_2"))

이제 세 번째 데이터 프레임을 생성하려고 합니다.저는 판다 같은 것을 원합니다.concat:

df_1.show()
+---+--------------------+--------------------+
| id|             uniform|              normal|
+---+--------------------+--------------------+
|  0|  0.8122802274304282|  1.2423430583597714|
|  1|  0.8642043127063618|  0.3900018344856156|
|  2|  0.8292577771850476|  1.8077401259195247|
|  3|   0.198558705368724| -0.4270585782850261|
|  4|0.012661361966674889|   0.702634599720141|
|  5|  0.8535692890157796|-0.42355804115129153|
|  6|  0.3723296190171911|  1.3789648582622995|
|  7|  0.9529794127670571| 0.16238718777444605|
|  8|  0.9746632635918108| 0.02448061333761742|
|  9|   0.513622008243935|  0.7626741803250845|
+---+--------------------+--------------------+

df_2.show()
+---+--------------------+--------------------+
| id|             uniform|            normal_2|
+---+--------------------+--------------------+
| 11|  0.3221262660507942|  1.0269298899109824|
| 12|  0.4030672316912547|   1.285648175568798|
| 13|  0.9690555459609131|-0.22986601831364423|
| 14|0.011913836266515876|  -0.678915153834693|
| 15|  0.9359607054250594|-0.16557488664743034|
| 16| 0.45680471157575453| -0.3885563551710555|
| 17|  0.6411908952297819|  0.9161177183227823|
| 18|  0.5669232696934479|  0.7270125277020573|
| 19|   0.513622008243935|  0.7626741803250845|
+---+--------------------+--------------------+

#do some concatenation here, how?

df_concat.show()

| id|             uniform|              normal| normal_2   |
+---+--------------------+--------------------+------------+
|  0|  0.8122802274304282|  1.2423430583597714| None       |
|  1|  0.8642043127063618|  0.3900018344856156| None       |
|  2|  0.8292577771850476|  1.8077401259195247| None       |
|  3|   0.198558705368724| -0.4270585782850261| None       |
|  4|0.012661361966674889|   0.702634599720141| None       |
|  5|  0.8535692890157796|-0.42355804115129153| None       |
|  6|  0.3723296190171911|  1.3789648582622995| None       |
|  7|  0.9529794127670571| 0.16238718777444605| None       |
|  8|  0.9746632635918108| 0.02448061333761742| None       |
|  9|   0.513622008243935|  0.7626741803250845| None       |
| 11|  0.3221262660507942|  None              | 0.123      |
| 12|  0.4030672316912547|  None              |0.12323     |
| 13|  0.9690555459609131|  None              |0.123       |
| 14|0.011913836266515876|  None              |0.18923     |
| 15|  0.9359607054250594|  None              |0.99123     |
| 16| 0.45680471157575453|  None              |0.123       |
| 17|  0.6411908952297819|  None              |1.123       |
| 18|  0.5669232696934479|  None              |0.10023     |
| 19|   0.513622008243935|  None              |0.916332123 |
+---+--------------------+--------------------+------------+

그게 가능한가요?

존재하지 않는 열을 생성하고 호출할 수 있습니다.unionAll스파크 1.6 이하):

from pyspark.sql.functions import lit

cols = ['id', 'uniform', 'normal', 'normal_2']    

df_1_new = df_1.withColumn("normal_2", lit(None)).select(cols)
df_2_new = df_2.withColumn("normal", lit(None)).select(cols)

result = df_1_new.union(df_2_new)

# To remove the duplicates:

result = result.dropDuplicates()
df_concat = df_1.union(df_2)

할 수 , 이에는 "" " " " " " " " 를 사용할 수 . " " " " 를 사용할 수 있습니다.withColumn()를 만들다normal_1그리고.normal_2

unionByName은 스파크 2.3.0부터 사용할 수 있는 스파크 내장 옵션입니다.

스파크 버전 3.1.0에서는 누락된 열을 처리하기 위해 기본값이 False로 설정된 allowMissingColumns 옵션이 있습니다.두 데이터 프레임에 동일한 열 집합이 없더라도 이 함수는 작동하여 결과 데이터 프레임에서 결측 열 값을 null로 설정합니다.

df_1.unionByName(df_2, allowMissingColumns=True).show()

+---+--------------------+--------------------+--------------------+
| id|             uniform|              normal|            normal_2|
+---+--------------------+--------------------+--------------------+
|  0|  0.8122802274304282|  1.2423430583597714|                null|
|  1|  0.8642043127063618|  0.3900018344856156|                null|
|  2|  0.8292577771850476|  1.8077401259195247|                null|
|  3|   0.198558705368724| -0.4270585782850261|                null|
|  4|0.012661361966674889|   0.702634599720141|                null|
|  5|  0.8535692890157796|-0.42355804115129153|                null|
|  6|  0.3723296190171911|  1.3789648582622995|                null|
|  7|  0.9529794127670571| 0.16238718777444605|                null|
|  8|  0.9746632635918108| 0.02448061333761742|                null|
|  9|   0.513622008243935|  0.7626741803250845|                null|
| 11|  0.3221262660507942|                null|  1.0269298899109824|
| 12|  0.4030672316912547|                null|   1.285648175568798|
| 13|  0.9690555459609131|                null|-0.22986601831364423|
| 14|0.011913836266515876|                null|  -0.678915153834693|
| 15|  0.9359607054250594|                null|-0.16557488664743034|
| 16| 0.45680471157575453|                null| -0.3885563551710555|
| 17|  0.6411908952297819|                null|  0.9161177183227823|
| 18|  0.5669232696934479|                null|  0.7270125277020573|
| 19|   0.513622008243935|                null|  0.7626741803250845|
+---+--------------------+--------------------+--------------------+

unionByName을 사용하여 다음을 수행할 수 있습니다.

df = df_1.unionByName(df_2)

unionByName은 Spark 2.3.0부터 사용할 수 있습니다.

유지하는 것을 더 일반적으로 위해 지 열 유 면 려 하 으 로 적 일 두 반 보 을 다 ▁to ▁both 려 ▁make 하 ▁indf1그리고.df2:

import pyspark.sql.functions as F

# Keep all columns in either df1 or df2
def outter_union(df1, df2):

    # Add missing columns to df1
    left_df = df1
    for column in set(df2.columns) - set(df1.columns):
        left_df = left_df.withColumn(column, F.lit(None))

    # Add missing columns to df2
    right_df = df2
    for column in set(df1.columns) - set(df2.columns):
        right_df = right_df.withColumn(column, F.lit(None))

    # Make sure columns are ordered the same
    return left_df.union(right_df.select(left_df.columns))

여러 pyspark 데이터 프레임을 하나로 연결하려면 다음과 같이 하십시오.

from functools import reduce

reduce(lambda x,y:x.union(y), [df_1,df_2])

그리고 [df_1, df_2]의 목록을 임의의 길이의 목록으로 바꿀 수 있습니다.

여전히 유용할 경우를 대비해 한 가지 방법이 있습니다. 파이썬 버전 2.7.12의 pyspark 셸에서 실행했으며 스파크 설치는 버전 2.0.1이었습니다.

PS: df_1df_2에 대해 다른 시드를 사용하라는 뜻이었던 것 같고 아래 코드는 그것을 반영합니다.

from pyspark.sql.types import FloatType
from pyspark.sql.functions import randn, rand
import pyspark.sql.functions as F

df_1 = sqlContext.range(0, 10)
df_2 = sqlContext.range(11, 20)
df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", rand(seed=11).alias("uniform"), randn(seed=28).alias("normal_2"))

def get_uniform(df1_uniform, df2_uniform):
    if df1_uniform:
        return df1_uniform
    if df2_uniform:
        return df2_uniform

u_get_uniform = F.udf(get_uniform, FloatType())

df_3 = df_1.join(df_2, on = "id", how = 'outer').select("id", u_get_uniform(df_1["uniform"], df_2["uniform"]).alias("uniform"), "normal", "normal_2").orderBy(F.col("id"))

다음은 출력 결과입니다.

df_1.show()
+---+-------------------+--------------------+
| id|            uniform|              normal|
+---+-------------------+--------------------+
|  0|0.41371264720975787|  0.5888539012978773|
|  1| 0.7311719281896606|  0.8645537008427937|
|  2| 0.1982919638208397| 0.06157382353970104|
|  3|0.12714181165849525|  0.3623040918178586|
|  4| 0.7604318153406678|-0.49575204523675975|
|  5|0.12030715258495939|  1.0854146699817222|
|  6|0.12131363910425985| -0.5284523629183004|
|  7|0.44292918521277047| -0.4798519469521663|
|  8| 0.8898784253886249| -0.8820294772950535|
|  9|0.03650707717266999| -2.1591956435415334|
+---+-------------------+--------------------+

df_2.show()
+---+-------------------+--------------------+
| id|            uniform|            normal_2|
+---+-------------------+--------------------+
| 11| 0.1982919638208397| 0.06157382353970104|
| 12|0.12714181165849525|  0.3623040918178586|
| 13|0.12030715258495939|  1.0854146699817222|
| 14|0.12131363910425985| -0.5284523629183004|
| 15|0.44292918521277047| -0.4798519469521663|
| 16| 0.8898784253886249| -0.8820294772950535|
| 17| 0.2731073068483362|-0.15116027592854422|
| 18| 0.7784518091224375| -0.3785563841011868|
| 19|0.43776394586845413| 0.47700719174464357|
+---+-------------------+--------------------+

df_3.show()
+---+-----------+--------------------+--------------------+                     
| id|    uniform|              normal|            normal_2|
+---+-----------+--------------------+--------------------+
|  0| 0.41371265|  0.5888539012978773|                null|
|  1|  0.7311719|  0.8645537008427937|                null|
|  2| 0.19829196| 0.06157382353970104|                null|
|  3| 0.12714182|  0.3623040918178586|                null|
|  4|  0.7604318|-0.49575204523675975|                null|
|  5|0.120307155|  1.0854146699817222|                null|
|  6| 0.12131364| -0.5284523629183004|                null|
|  7| 0.44292918| -0.4798519469521663|                null|
|  8| 0.88987845| -0.8820294772950535|                null|
|  9|0.036507078| -2.1591956435415334|                null|
| 11| 0.19829196|                null| 0.06157382353970104|
| 12| 0.12714182|                null|  0.3623040918178586|
| 13|0.120307155|                null|  1.0854146699817222|
| 14| 0.12131364|                null| -0.5284523629183004|
| 15| 0.44292918|                null| -0.4798519469521663|
| 16| 0.88987845|                null| -0.8820294772950535|
| 17| 0.27310732|                null|-0.15116027592854422|
| 18|  0.7784518|                null| -0.3785563841011868|
| 19| 0.43776396|                null| 0.47700719174464357|
+---+-----------+--------------------+--------------------+

위의 답변은 매우 우아합니다.저는 이 함수를 오래 전에 썼는데, 거기서 저는 두 개의 데이터 프레임을 별개의 열로 연결하는 데 어려움을 겪었습니다.

데이터 프레임 sdf1 및 sdf2가 있다고 가정합니다.

from pyspark.sql import functions as F
from pyspark.sql.types import *

def unequal_union_sdf(sdf1, sdf2):
    s_df1_schema = set((x.name, x.dataType) for x in sdf1.schema)
    s_df2_schema = set((x.name, x.dataType) for x in sdf2.schema)

    for i,j in s_df2_schema.difference(s_df1_schema):
        sdf1 = sdf1.withColumn(i,F.lit(None).cast(j))

    for i,j in s_df1_schema.difference(s_df2_schema):
        sdf2 = sdf2.withColumn(i,F.lit(None).cast(j))

    common_schema_colnames = sdf1.columns
    sdk = \
        sdf1.select(common_schema_colnames).union(sdf2.select(common_schema_colnames))
    return sdk 

sdf_concat = unequal_union_sdf(sdf1, sdf2) 

여기에서는 두 데이터 프레임 간에 모든 레코드를 통합하려고 합니다.pyspark에서 panda concat 방법처럼 축 0을 따라 2개의 데이터 프레임을 포합하는 간단한 unionByName 방법을 사용합니다.

이제 열 ID, 균일, 정규의 df1과 열 ID, 균일 및 정규의 df2가 있다고 가정합니다.열 ID, 균일, 정규, 정규_2의 세 번째 df3를 얻으려면 다음과 같이 하십시오.존재하지 않는 열을 2dfs(df1 및 df2) 중 하나에 추가해야 합니다.

# Add df1 columns which are not in df2 in df2 as null columns
for column in df1.columns:
    if column not in df2.columns:
         df2 = df2.withColumn(column, lit(None))

# Add df2 columns which are not in df1 in df1 as null columns
for column in df2.columns:
    if column not in df1.columns:
         df1 = df1.withColumn(column, lit(None))

df3 = df1.unionByName(df2)

Pyspark unionByName 문서

이 정도면 충분할 겁니다...

from pyspark.sql.types import FloatType
from pyspark.sql.functions import randn, rand, lit, coalesce, col
import pyspark.sql.functions as F

df_1 = sqlContext.range(0, 6)
df_2 = sqlContext.range(3, 10)
df_1 = df_1.select("id", lit("old").alias("source"))
df_2 = df_2.select("id")

df_1.show()
df_2.show()
df_3 = df_1.alias("df_1").join(df_2.alias("df_2"), df_1.id == df_2.id, "outer")\
  .select(\
    [coalesce(df_1.id, df_2.id).alias("id")] +\
    [col("df_1." + c) for c in df_1.columns if c != "id"])\
  .sort("id")
df_3.show()

pyspark에서 판다 추가 기능을 구현하려고 했는데, 열 수만 조건이 달라도 2개 이상의 데이터 프레임을 콘택할 수 있는 사용자 지정 기능을 만든 것은 데이터 프레임의 이름이 같으면 데이터 유형이 동일해야 한다는 것입니다.

나는 두 개의 데이터 프레임을 병합하는 사용자 정의 함수를 작성했습니다.

def append_dfs(df1,df2):
    list1 = df1.columns
    list2 = df2.columns
    for col in list2:
        if(col not in list1):
            df1 = df1.withColumn(col, F.lit(None))
    for col in list1:
        if(col not in list2):
            df2 = df2.withColumn(col, F.lit(None))
    return df1.unionByName(df2)

용도:

  1. 두 개의 데이터 프레임을 연결합니다.

    final_df = append_dfs(df1,df2)

    1. 2개 이상의 데이터 프레임 연결

    final_df = append_dfs(dfs(df1,df2),df3)

예:

df1:

enter image description here

df2:

enter image description here

결과=dfs_dfs(df1,df2)

결과:

+------+---+-------+---------+
|  Name|Age|Married|  Address|
+------+---+-------+---------+
|   Jai| 27|   true|     null|
|Princi| 24|  false|     null|
|Gaurav| 22|  false|     null|
|  Anuj| 22|   true|     null|
|   Jai| 27|   null|    Delhi|
|Princi| 24|   null|   Kanpur|
|Gaurav| 22|   null|Allahabad|
|  Anuj| 22|   null|     null|
+------+---+-------+---------+

이것이 유용하기를 바랍니다.

저는 다음과 같은 방법으로 이 문제를 해결할 것입니다.

from pyspark.sql import SparkSession
df_1.createOrReplaceTempView("tab_1")
df_2.createOrReplaceTempView("tab_2")
df_concat=spark.sql("select tab_1.id,tab_1.uniform,tab_1.normal,tab_2.normal_2  from tab_1 tab_1 left join tab_2 tab_2 on tab_1.uniform=tab_2.uniform\
                union\
                select tab_2.id,tab_2.uniform,tab_1.normal,tab_2.normal_2  from tab_2 tab_2 left join tab_1 tab_1 on tab_1.uniform=tab_2.uniform")
df_concat.show()
import pyspark.pandas as ps
ps.concat([df_1.pandas_api('id'),df_2.pandas_api('id')])

아웃:

+-------------------+--------------------+--------------------+
|            uniform|              normal|            normal_2|
+-------------------+--------------------+--------------------+
|0.03422639313807285| 0.45800664187768786|                null|
| 0.3654625958161396| 0.16420866768809156|                null|
| 0.4175019040792016| -1.0451987154313813|                null|
|0.16452185994603707|  0.8306551181802446|                null|
|0.18141810315190554|  1.3792681955381285|                null|
| 0.9697474945375325|  0.5991404703866096|                null|
|0.34084319330900115| -1.4298752463301871|                null|
| 0.4725977369833597|-0.19668314148825305|                null|
| 0.5996723933366402| -0.6950946460326453|                null|
| 0.6396141227834357|-0.07706276399864868|                null|
|0.03422639313807285|                null| 0.45800664187768786|
| 0.3654625958161396|                null| 0.16420866768809156|
| 0.4175019040792016|                null| -1.0451987154313813|
|0.18141810315190554|                null|  1.3792681955381285|
|0.49595620559530806|                null|-0.02511484514022299|
| 0.9697474945375325|                null|  0.5991404703866096|
| 0.4725977369833597|                null|-0.19668314148825305|
| 0.5996723933366402|                null| -0.6950946460326453|
| 0.6396141227834357|                null|-0.07706276399864868|
+-------------------+--------------------+--------------------+

두 개 이상의 데이터 프레임을 연결할 수 있습니다.판다 데이터 프레임 변환을 사용하는 문제를 발견했습니다.

연결하려는 스파크 데이터 프레임이 3개 있다고 가정합니다.

코드는 다음과 같습니다.

list_dfs = []
list_dfs_ = []

df = spark.read.json('path_to_your_jsonfile.json',multiLine = True)
df2 = spark.read.json('path_to_your_jsonfile2.json',multiLine = True)
df3 = spark.read.json('path_to_your_jsonfile3.json',multiLine = True)

list_dfs.extend([df,df2,df3])

for df in list_dfs : 

    df = df.select([column for column in df.columns]).toPandas()
    list_dfs_.append(df)

list_dfs.clear()

df_ = sqlContext.createDataFrame(pd.concat(list_dfs_))

언급URL : https://stackoverflow.com/questions/37332434/concatenate-two-pyspark-dataframes

반응형