PySpark Pandas_Udf()

Pyspark Pandas Udf



Transformacja PySpark DataFrame jest możliwa przy użyciu funkcji pandas_udf(). Jest to funkcja zdefiniowana przez użytkownika, która jest stosowana w PySpark DataFrame ze strzałką. Możemy wykonać operacje wektoryzowane za pomocą pandas_udf(). Można to zaimplementować, przekazując tę ​​funkcję jako dekorator. Zagłębmy się w ten przewodnik, aby poznać składnię, parametry i różne przykłady.

Temat treści:

Jeśli chcesz wiedzieć o instalacji PySpark DataFrame i modułu, przejdź przez to artykuł .







Pyspark.sql.functions.pandas_udf()

Funkcja pandas_udf () jest dostępna w module sql.functions w PySpark, który można zaimportować za pomocą słowa kluczowego „from”. Służy do wykonywania operacji wektorowych na naszej PySpark DataFrame. Ta funkcja jest realizowana jak dekorator poprzez przekazanie trzech parametrów. Następnie możemy utworzyć funkcję zdefiniowaną przez użytkownika, która zwraca dane w formacie wektorowym (tak jak używamy do tego serii/NumPy) za pomocą strzałki. W ramach tej funkcji jesteśmy w stanie zwrócić wynik.



Struktura i składnia:



Najpierw spójrzmy na strukturę i składnię tej funkcji:

@pandas_udf (typ danych)
def nazwa_funkcji (operacja) -> format_konwersji:
oświadczenie zwrotne

Tutaj nazwa_funkcji to nazwa naszej zdefiniowanej funkcji. Typ danych określa typ danych zwracany przez tę funkcję. Wynik możemy zwrócić za pomocą słowa kluczowego „return”. Wszystkie operacje są wykonywane wewnątrz funkcji z przypisaniem strzałki.





Pandas_udf (funkcja i typ zwrotu)

  1. Pierwszym parametrem jest funkcja zdefiniowana przez użytkownika, która jest do niego przekazywana.
  2. Drugi parametr służy do określenia typu danych zwracanych przez funkcję.

Dane:

W całym tym przewodniku do demonstracji używamy tylko jednej ramki danych PySpark. Wszystkie funkcje zdefiniowane przez użytkownika są stosowane w tej ramce danych PySpark. Upewnij się, że najpierw utworzysz tę ramkę DataFrame w swoim środowisku po zainstalowaniu PySpark.



zaimportuj pyspark

z pyspark.sql zaimportuj SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

z pyspark.sql.functions zaimportuj pandas_udf

z importu pyspark.sql.types *

importuj pandy jako panda

# szczegóły warzyw

warzywo =[{ 'typ' : 'warzywo' , 'nazwa' : 'pomidor' , „lokalizuj_kraj” : 'USA' , 'ilość' : 800 },

{ 'typ' : 'owoc' , 'nazwa' : 'banan' , „lokalizuj_kraj” : 'CHINY' , 'ilość' : 20 },

{ 'typ' : 'warzywo' , 'nazwa' : 'pomidor' , „lokalizuj_kraj” : 'USA' , 'ilość' : 800 },

{ 'typ' : 'warzywo' , 'nazwa' : 'Mango' , „lokalizuj_kraj” : 'JAPONIA' , 'ilość' : 0 },

{ 'typ' : 'owoc' , 'nazwa' : 'cytrynowy' , „lokalizuj_kraj” : 'INDIE' , 'ilość' : 1700 },

{ 'typ' : 'warzywo' , 'nazwa' : 'pomidor' , „lokalizuj_kraj” : 'USA' , 'ilość' : 1200 },

{ 'typ' : 'warzywo' , 'nazwa' : 'Mango' , „lokalizuj_kraj” : 'JAPONIA' , 'ilość' : 0 },

{ 'typ' : 'owoc' , 'nazwa' : 'cytrynowy' , „lokalizuj_kraj” : 'INDIE' , 'ilość' : 0 }

]

# utwórz ramkę danych rynku z powyższych danych

market_df = linuxhint_spark_app.createDataFrame(warzywa)

market_df.show()

Wyjście:

Tutaj tworzymy tę ramkę danych z 4 kolumnami i 8 wierszami. Teraz używamy pandas_udf() do tworzenia funkcji zdefiniowanych przez użytkownika i zastosowania ich do tych kolumn.

Pandas_udf() z różnymi typami danych

W tym scenariuszu tworzymy funkcje zdefiniowane przez użytkownika za pomocą pandas_udf() i stosujemy je w kolumnach oraz wyświetlamy wyniki za pomocą metody select(). W każdym przypadku używamy pandas.Series podczas wykonywania operacji wektorowych. To traktuje wartości kolumn jako jednowymiarową tablicę, a operacja jest wykonywana na kolumnie. W samym dekoratorze określamy typ zwracanej funkcji.

Przykład 1: Pandas_udf() z typem String

Tutaj tworzymy dwie funkcje zdefiniowane przez użytkownika z typem zwracanym string, aby przekonwertować wartości kolumny typu string na wielkie i małe litery. Na koniec stosujemy te funkcje w kolumnach „type” i „locate_country”.

# Konwertuj kolumnę typu na wielkie litery za pomocą pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

return i.str.upper()

# Konwertuj kolumnę kraj_lokalizacji na małe litery za pomocą pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

return i.str.lower()

# Wyświetl kolumny za pomocą select()

market_df.select( 'typ' ,type_upper_case( 'typ' ), „zlokalizuj_kraj” ,
country_lower_case( „zlokalizuj_kraj” )).pokazywać()

Wyjście:

Wyjaśnienie:

Funkcja StringType() jest dostępna w module pyspark.sql.types. Zaimportowaliśmy już ten moduł podczas tworzenia PySpark DataFrame.

  1. Po pierwsze, UDF (funkcja zdefiniowana przez użytkownika) zwraca łańcuchy wielkimi literami za pomocą funkcji str.upper(). Funkcja str.upper() jest dostępna w strukturze danych serii (ponieważ konwertujemy na serie ze strzałką wewnątrz funkcji), która konwertuje podany ciąg znaków na wielkie litery. Na koniec ta funkcja jest stosowana do kolumny „type”, która jest określona w metodzie select(). Wcześniej wszystkie ciągi w kolumnie typu były pisane małymi literami. Teraz są one zamieniane na wielkie litery.
  2. Po drugie, UDF zwraca łańcuchy wielkimi literami za pomocą funkcji str.lower(). Funkcja str.lower() jest dostępna w strukturze danych serii, która konwertuje podany ciąg znaków na małe litery. Na koniec ta funkcja jest stosowana do kolumny „type”, która jest określona w metodzie select(). Wcześniej wszystkie ciągi w kolumnie typu były pisane wielkimi literami. Teraz są one zamieniane na małe litery.

Przykład 2: Pandas_udf() z typem Integer

Stwórzmy funkcję UDF, która konwertuje kolumnę liczb całkowitych PySpark DataFrame na serię Pandas i dodajemy 100 do każdej wartości. Przekaż kolumnę „ilość” do tej funkcji wewnątrz metody select().

# Dodaj 100

@pandas_udf(typ_liczby_całkowitej())

def add_100(i: panda.Series) -> panda.Series:

zwróć i+ 100

# Przekaż kolumnę ilości do powyższej funkcji i wyświetl.

market_df.select( 'ilość' ,dodaj_100( 'ilość' )).pokazywać()

Wyjście:

Wyjaśnienie:

Wewnątrz funkcji UDF iterujemy wszystkie wartości i konwertujemy je na serie. Następnie dodajemy 100 do każdej wartości w Serii. Na koniec przekazujemy do tej funkcji kolumnę „ilość” i widzimy, że do wszystkich wartości dodaje się 100.

Pandas_udf() z różnymi typami danych przy użyciu Groupby() i Agg()

Przyjrzyjmy się przykładom przekazywania UDF do zagregowanych kolumn. Tutaj wartości kolumn są najpierw grupowane za pomocą funkcji groupby(), a agregacja odbywa się za pomocą funkcji agg(). Przekazujemy nasz UDF wewnątrz tej funkcji agregującej.

Składnia:

pyspark_dataframe_object.groupby( 'grupowanie_kolumny' ).agg(UDF
(pyspark_dataframe_object[ 'kolumna' ]))

Tutaj wartości w kolumnie grupowania są grupowane jako pierwsze. Następnie agregacja jest wykonywana na każdych zgrupowanych danych w odniesieniu do naszego UDF.

Przykład 1: Pandas_udf() z Aggregate Mean()

Tutaj tworzymy funkcję zdefiniowaną przez użytkownika z zwracanym typem float. Wewnątrz funkcji obliczamy średnią za pomocą funkcji mean(). Ten UDF jest przekazywany do kolumny „ilość”, aby uzyskać średnią ilość dla każdego typu.

# zwraca średnią/średnią

@pandas_udf( 'platforma' )

def średnia_funkcja(i: panda.Series) -> float:

zwróć i.mean()

# Przekaż kolumnę ilości do funkcji, grupując kolumnę typu.

market_df.groupby( 'typ' ).agg(średnia_funkcja(market_df[ 'ilość' ])).pokazywać()

Wyjście:

Grupujemy na podstawie elementów w kolumnie „typ”. Tworzą się dwie grupy – „owocowa” i „warzywna”. Dla każdej grupy obliczana i zwracana jest średnia.

Przykład 2: Pandas_udf() z Aggregate Max() i Min()

Tutaj tworzymy dwie funkcje zdefiniowane przez użytkownika z zwracanym typem integer (int). Pierwszy UDF zwraca wartość minimalną, a drugi UDF zwraca wartość maksymalną.

# pandas_udf, które zwracają wartość minimalną

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

powrót i.min()

# pandas_udf, które zwracają maksymalną wartość

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

zwróć i.max()

# Przekaż kolumnę ilości do min_ pandas_udf, grupując kraj_lokalizacji.

market_df.groupby( „zlokalizuj_kraj” ).agg(min_(market_df[ 'ilość' ])).pokazywać()

# Przekaż kolumnę ilości do max_ pandas_udf, grupując kraj_lokalizacji.

market_df.groupby( „zlokalizuj_kraj” ).agg(max_(market_df[ 'ilość' ])).pokazywać()

Wyjście:

Aby zwrócić wartości minimalne i maksymalne, używamy funkcji min() i max() w typie zwracanym funkcji UDF. Teraz grupujemy dane w kolumnie „locate_country”. Powstają cztery grupy („CHINY”, „INDIE”, „JAPONIA”, „USA”). Dla każdej grupy zwracamy maksymalną ilość. Podobnie zwracamy minimalną ilość.

Wniosek

Zasadniczo pandas_udf () służy do wykonywania operacji wektorowych na naszej PySpark DataFrame. Widzieliśmy, jak utworzyć funkcję pandas_udf() i zastosować ją do ramki danych PySpark. Aby lepiej zrozumieć, omówiliśmy różne przykłady, biorąc pod uwagę wszystkie typy danych (łańcuchowe, zmiennoprzecinkowe i całkowite). Możliwe jest użycie pandas_udf() z groupby() poprzez funkcję agg().