Konwertowanie PySpark DataFrame na CSV

Konwertowanie Pyspark Dataframe Na Csv



Przyjrzyjmy się czterem różnym scenariuszom konwersji PySpark DataFrame do CSV. Bezpośrednio używamy metody write.csv() do konwersji PySpark DataFrame na CSV. Za pomocą funkcji to_csv() konwertujemy ramkę danych PySpark Pandas do formatu CSV. Może to być również możliwe poprzez konwersję do tablicy NumPy.

Temat treści:

Jeśli chcesz dowiedzieć się więcej o PySpark DataFrame i instalacji modułu, przejdź przez to artykuł .







PySpark DataFrame do CSV przez konwersję do Pandas DataFrame

To_csv() to metoda dostępna w module Pandas, która konwertuje Pandas DataFrame na CSV. Najpierw musimy przekonwertować naszą PySpark DataFrame na Pandas DataFrame. Służy do tego metoda toPandas(). Zobaczmy składnię to_csv() wraz z jej parametrami.



Składnia:



pandas_dataframe_obj.to_csv(ścieżka/ „nazwa_pliku.csv” , nagłówek ,indeks,kolumny,tryb...)
  1. Musimy określić nazwę pliku pliku CSV. Jeśli chcesz zapisać pobrany plik CSV w określonej lokalizacji na komputerze, możesz również określić ścieżkę wraz z nazwą pliku.
  2. Kolumny są uwzględniane, jeśli nagłówek jest ustawiony na „Prawda”. Jeśli nie potrzebujesz kolumn, ustaw nagłówek na „Fałsz”.
  3. Indeksy są określone, jeśli indeks jest ustawiony na „Prawda”. Jeśli nie potrzebujesz indeksów, ustaw indeks na „Fałsz”.
  4. Parametr Columns pobiera listę nazw kolumn, w której możemy określić, które konkretnie kolumny mają zostać wyodrębnione do pliku CSV.
  5. Możemy dodać rekordy do CSV za pomocą parametru mode. Dołącz – służy do tego „a”.

Przykład 1: Z parametrami nagłówka i indeksu

Utwórz „skills_df” PySpark DataFrame z 3 wierszami i 4 kolumnami. Przekonwertuj tę ramkę DataFrame na plik CSV, najpierw konwertując ją na ramkę DataFrame Pandas.





zaimportuj pyspark

z pyspark.sql zaimportuj SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

# dane umiejętności z 3 wierszami i 4 kolumnami

umiejętności =[{ 'ID' : 123 , 'osoba' : 'Miód' , 'umiejętność' : 'obraz' , 'nagroda' : 25000 },

{ 'ID' : 112 , 'osoba' : „Mouni” , 'umiejętność' : 'taniec' , 'nagroda' : 2000 },

{ 'ID' : 153 , 'osoba' : „Tulasi” , 'umiejętność' : 'czytanie' , 'nagroda' : 1200 }

]

# utwórz ramkę danych umiejętności z powyższych danych

skill_df = linuxhint_spark_app.createDataFrame(umiejętności)

skill_df.show()

# Konwertuj skill_df na pandas DataFrame

pandas_skills_df= umiejętności_df.toPandas()

print(pandas_skills_df)

# Konwertuj tę ramkę danych na plik csv z nagłówkiem i indeksem

pandas_skills_df.to_csv( „pandas_skills1.csv” , nagłówek =Prawda, indeks=Prawda)

Wyjście:



Widzimy, że PySpark DataFrame jest konwertowany na Pandas DataFrame. Zobaczmy, czy jest konwertowany do CSV z nazwami kolumn i indeksami:

Przykład 2: Dołącz dane do pliku CSV

Utwórz jeszcze jedną ramkę PySpark DataFrame z 1 rekordem i dołącz ją do pliku CSV utworzonego w ramach naszego pierwszego przykładu. Upewnij się, że musimy ustawić nagłówek na „False” wraz z parametrem mode. W przeciwnym razie nazwy kolumn są również dołączane jako wiersze.

zaimportuj pyspark

z pyspark.sql zaimportuj SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

umiejętności =[{ 'ID' : 90 , 'osoba' : „Bhargaw” , 'umiejętność' : 'czytanie' , 'nagroda' : 12000 }

]

# utwórz ramkę danych umiejętności z powyższych danych

skill_df = linuxhint_spark_app.createDataFrame(umiejętności)

# Konwertuj skill_df na pandas DataFrame

pandas_skills_df= umiejętności_df.toPandas()

# Dodaj tę ramkę danych do pliku pandas_skills1.csv

pandas_skills_df.to_csv( „pandas_skills1.csv” , tryb = 'A' , nagłówek = Fałsz)

Wyjście CSV:

Widzimy, że do pliku CSV został dodany nowy wiersz.

Przykład 3: Z parametrem Columns

Miejmy ten sam DataFrame i przekonwertujmy go na CSV z dwiema kolumnami: „osoba” i „nagroda”.

zaimportuj pyspark

z pyspark.sql zaimportuj SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

# dane umiejętności z 3 wierszami i 4 kolumnami

umiejętności =[{ 'ID' : 123 , 'osoba' : 'Miód' , 'umiejętność' : 'obraz' , 'nagroda' : 25000 },

{ 'ID' : 112 , 'osoba' : „Mouni” , 'umiejętność' : 'taniec' , 'nagroda' : 2000 },

{ 'ID' : 153 , 'osoba' : „Tulasi” , 'umiejętność' : 'czytanie' , 'nagroda' : 1200 }

]

# utwórz ramkę danych umiejętności z powyższych danych

skill_df = linuxhint_spark_app.createDataFrame(umiejętności)

# Konwertuj skill_df na pandas DataFrame

pandas_skills_df= umiejętności_df.toPandas()

# Konwertuj tę ramkę danych na plik csv z określonymi kolumnami

pandas_skills_df.to_csv( „pandas_skills2.csv” , kolumny=[ 'osoba' , 'nagroda' ])

Wyjście CSV:

Widzimy, że w pliku CSV istnieją tylko kolumny „osoba” i „nagroda”.

PySpark Pandas DataFrame do pliku CSV przy użyciu metody To_Csv().

To_csv() to metoda dostępna w module Pandas, która konwertuje Pandas DataFrame na CSV. Najpierw musimy przekonwertować naszą PySpark DataFrame na Pandas DataFrame. Służy do tego metoda toPandas(). Zobaczmy składnię to_csv() wraz z jej parametrami:

Składnia:

pyspark_pandas_dataframe_obj.to_csv(ścieżka/ „nazwa_pliku.csv” , nagłówek ,indeks,kolumny,...)
  1. Musimy określić nazwę pliku pliku CSV. Jeśli chcesz zapisać pobrany plik CSV w określonej lokalizacji na komputerze, możesz również określić ścieżkę wraz z nazwą pliku.
  2. Kolumny są uwzględniane, jeśli nagłówek jest ustawiony na „Prawda”. Jeśli nie potrzebujesz kolumn, ustaw nagłówek na „Fałsz”.
  3. Indeksy są określone, jeśli indeks jest ustawiony na „Prawda”. Jeśli nie potrzebujesz indeksów, ustaw indeks na „Fałsz”.
  4. Parametr columns pobiera listę nazw kolumn, w której możemy określić, które konkretnie kolumny zostaną wyodrębnione do pliku CSV.

Przykład 1: Z parametrem Columns

Utwórz ramkę DataFrame PySpark Pandas z 3 kolumnami i przekonwertuj ją na CSV za pomocą to_csv() z kolumnami „osoba” i „nagroda”.

z pyspark importuj pandy

pyspark_pandas_dataframe=pandas.DataFrame({ 'ID' :[ 90 , 78 , 90 , 57 ], 'osoba' :[ 'Miód' , „Mouni” , 'samego siebie' , „radha” ], 'nagroda' :[ 1 , 2 , 3 , 4 ]})

print(pyspark_pandas_dataframe)

# Konwertuj tę ramkę danych na plik csv z określonymi kolumnami

pyspark_pandas_dataframe.to_csv( „pyspark_pandas1” , kolumny=[ 'osoba' , 'nagroda' ])

Wyjście:

Widzimy, że PySpark Pandas DataFrame jest konwertowany na CSV z dwiema partycjami. Każda partycja zawiera 2 rekordy. Ponadto kolumny w pliku CSV zawierają tylko „osoba” i „nagroda”.

Plik partycji 1:

Plik partycji 2:

Przykład 2: Z parametrem nagłówka

Użyj poprzedniej DataFrame i określ parametr nagłówka, ustawiając go na „True”.

z pyspark importuj pandy

pyspark_pandas_dataframe=pandas.DataFrame({ 'ID' :[ 90 , 78 , 90 , 57 ], 'osoba' :[ 'Miód' , „Mouni” , 'samego siebie' , „radha” ], 'nagroda' :[ 1 , 2 , 3 , 4 ]})

# Konwertuj tę ramkę danych na plik csv z nagłówkiem.

pyspark_pandas_dataframe.to_csv( „pyspark_pandas2” , nagłówek = Prawda)

Wyjście CSV:

Widzimy, że PySpark Pandas DataFrame jest konwertowany na CSV z dwiema partycjami. Każda partycja zawiera 2 rekordy z nazwami kolumn.

Plik partycji 1:

Plik partycji 2:

PySpark Pandas DataFrame do CSV przez konwersję do tablicy NumPy

Mamy opcję konwersji PySpark Pandas DataFrame na CSV poprzez konwersję do tablicy Numpy. To_numpy() to metoda dostępna w module PySpark Pandas, która konwertuje ramkę DataFrame PySpark Pandas na tablicę NumPy.

Składnia:

pyspark_pandas_dataframe_obj.to_numpy()

Nie przyjmie żadnych parametrów.

Korzystanie z metody Tofile().

Po konwersji do tablicy NumPy możemy użyć metody tofile() do konwersji NumPy na CSV. Tutaj przechowuje każdy rekord w nowej kolumnie komórki w pliku CSV.

Składnia:

array_obj.to_numpy(nazwa pliku/ścieżka,sep=’’)

Pobiera nazwę pliku lub ścieżkę pliku CSV i separator.

Przykład:

Utwórz PySpark Pandas DataFrame z 3 kolumnami i 4 rekordami i przekonwertuj go na CSV, najpierw konwertując go na tablicę NumPy.

z pyspark importuj pandy

pyspark_pandas_dataframe=pandas.DataFrame({ 'ID' :[ 90 , 78 , 90 , 57 ], 'osoba' :[ 'Miód' , „Mouni” , 'samego siebie' , „radha” ], 'nagroda' :[ 1 , 2 , 3 , 4 ]})

# Konwertuj powyższą ramkę DataFrame na tablicę numpy

przekonwertowane = pyspark_pandas_dataframe.to_numpy()

wydrukować (przekonwertowany)

# Używanie tofile()

przekonwertowany.tofile( „przekonwertowany 1.csv” , wrzesień = ',' )

Wyjście:

[[ 90 'Miód' 1 ]

[ 78 „Mouni” 2 ]

[ 90 'samego siebie' 3 ]

[ 57 „radha” 4 ]]

Widzimy, że PySpark Pandas DataFrame jest konwertowany na tablicę NumPy (12 wartości). Jeśli widzisz dane CSV, każda wartość komórki jest zapisywana w nowej kolumnie.

PySpark DataFrame do pliku CSV przy użyciu metody Write.Csv().

Metoda write.csv() przyjmuje jako parametr nazwę/ścieżkę pliku, w którym musimy zapisać plik CSV.

Składnia:

dataframe_object.coalesce( 1 ).write.csv( 'Nazwa pliku' )

W rzeczywistości plik CSV jest zapisywany jako partycje (więcej niż jedna). Aby się tego pozbyć, łączymy wszystkie podzielone na partycje pliki CSV w jeden. W tym scenariuszu używamy funkcji koalescencji(). Teraz widzimy tylko jeden plik CSV ze wszystkimi wierszami z PySpark DataFrame.

Przykład:

Rozważ PySpark DataFrame z 4 rekordami mającymi 4 kolumny. Zapisz tę ramkę DataFrame w pliku CSV z plikiem o nazwie „market_details”.

zaimportuj pyspark

z pyspark.sql zaimportuj SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

# dane rynkowe z 4 wierszami i 4 kolumnami

rynek =[{ 'Środek' : 'mz-001' , „m_imię” : 'ABC' , 'm_miasto' : 'Delhi' , „m_stan” : 'Delhi' },

{ 'Środek' : 'mz-002' , „m_imię” : „XYZ” , 'm_miasto' : 'patna' , „m_stan” : „szczęście” },

{ 'Środek' : 'mz-003' , „m_imię” : „PQR” , 'm_miasto' : „floryda” , „m_stan” : 'jeden' },

{ 'Środek' : 'mz-004' , „m_imię” : 'ABC' , 'm_miasto' : 'Delhi' , „m_stan” : „szczęście” }

]



# utwórz ramkę danych rynku z powyższych danych

market_df = linuxhint_spark_app.createDataFrame(rynek)

# Rzeczywiste dane rynkowe

market_df.show()

# write.csv()

market_df.coalesce( 1 ).write.csv( 'szczegóły_rynku' )

Wyjście:

Sprawdźmy plik:

Otwórz ostatni plik, aby zobaczyć rekordy.

Wniosek

Poznaliśmy cztery różne scenariusze konwersji PySpark DataFrame na CSV z przykładami, biorąc pod uwagę różne parametry. Podczas pracy z PySpark DataFrame masz dwie możliwości konwersji tej ramki DataFrame na CSV: jedną metodą jest użycie metody write() , a drugą metoda to_csv() poprzez konwersję do Pandas DataFrame. Jeśli pracujesz z PySpark Pandas DataFrame, możesz także wykorzystać to_csv() i tofile() poprzez konwersję do tablicy NumPy.