Konwertowanie PySpark DataFrame na JSON

Konwertowanie Pyspark Dataframe Na Json



Przesyłanie ustrukturyzowanych danych za pomocą JSON jest możliwe i zużywa mało pamięci. W porównaniu do PySpark RDD lub PySpark DataFrame, JSON zużywa mało pamięci i serializacji, co jest możliwe dzięki JSON. Jesteśmy w stanie przekonwertować PySpark DataFrame na JSON za pomocą metody pyspark.sql.DataFrameWriter.json(). Oprócz tego istnieją dwa inne sposoby konwersji DataFrame do JSON.

Temat treści:

Rozważmy prosty PySpark DataFrame we wszystkich przykładach i przekonwertujmy go na JSON przy użyciu wspomnianych funkcji.







Wymagany moduł:

Zainstaluj bibliotekę PySpark w swoim środowisku, jeśli nie jest jeszcze zainstalowana. Możesz skorzystać z następującego polecenia, aby go zainstalować:



pip zainstaluj pyspark

PySpark DataFrame do JSON przy użyciu To_json() z ToPandas()

Metoda to_json() jest dostępna w module Pandas, który konwertuje Pandas DataFrame na JSON. Możemy skorzystać z tej metody, jeśli przekonwertujemy naszą PySpark DataFrame na Pandas DataFrame. W celu konwersji PySpark DataFrame na Pandas DataFrame używana jest metoda toPandas(). Zobaczmy składnię metody to_json() wraz z jej parametrami.



Składnia:





dataframe_object.toPandas().to_json(orientacja,indeks,...)
  1. Orient służy do wyświetlania przekonwertowanego JSON jako żądanego formatu. Zajmuje „rekordy”, „tabelę”, „wartości”, „kolumny”, „indeks”, „podział”.
  2. Indeks służy do dołączania/usuwania indeksu z przekonwertowanego ciągu JSON. Jeśli jest ustawiony na „Prawda”, wyświetlane są indeksy. W przeciwnym razie indeksy nie będą wyświetlane, jeśli orientacja jest „split” lub „table”.

Przykład 1: Orientuj jako „Rekordy”

Utwórz „skills_df” PySpark DataFrame z 3 wierszami i 4 kolumnami. Przekonwertuj tę ramkę DataFrame na format JSON, określając parametr orient jako „rekordy”.

zaimportuj pyspark

importować pandy

z pyspark.sql zaimportuj SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

# dane umiejętności z 3 wierszami i 4 kolumnami

umiejętności =[{ 'ID' : 123 , 'osoba' : 'Miód' , 'umiejętność' : 'obraz' , 'nagroda' : 25000 },

{ 'ID' : 112 , 'osoba' : „Mouni” , 'umiejętność' : 'taniec' , 'nagroda' : 2000 },

{ 'ID' : 153 , 'osoba' : „Tulasi” , 'umiejętność' : 'czytanie' , 'nagroda' : 1200 }

]

# utwórz ramkę danych umiejętności z powyższych danych

skill_df = linuxhint_spark_app.createDataFrame(umiejętności)

# Rzeczywiste dane dotyczące umiejętności

skill_df.show()

# Konwertuj na JSON za pomocą to_json() z orientem jako „rekordami”

json_skills_data = skill_df.toPandas().to_json(orient= 'dokumentacja' )

print(json_skills_data)

Wyjście:



+---+------+-----+--------+

| identyfikator|osoba|nagroda| umiejętność|

+---+------+-----+--------+

| 123 | Kochanie| 25000 |malowanie|

| 112 | Mouni| 2000 | tańczyć|

| 153 |Tulasi| 1200 | czytanie|

+---+------+-----+--------+

[{ 'ID' : 123 , 'osoba' : 'Miód' , 'nagroda' : 25000 , 'umiejętność' : 'obraz' },{ 'ID' : 112 , 'osoba' : „Mouni” , 'nagroda' : 2000 , 'umiejętność' : 'taniec' },{ 'ID' : 153 , 'osoba' : „Tulasi” , 'nagroda' : 1200 , 'umiejętność' : 'czytanie' }]

Widzimy, że PySpark DataFrame jest konwertowany na tablicę JSON ze słownikiem wartości. Tutaj klucze reprezentują nazwę kolumny, a wartość reprezentuje wartość wiersza/komórki w PySpark DataFrame.

Przykład 2: Orientuj jako „Podziel”

Format JSON zwracany przez „split” orient zawiera nazwy kolumn, które zawierają listę kolumn, listę indeksów i listę danych. Poniżej przedstawiono format „podzielonego” orientu.

# Konwertuj na JSON za pomocą to_json() z orientem jako „split”

json_skills_data = skill_df.toPandas().to_json(orient= 'podział' )

print(json_skills_data)

Wyjście:

{ 'kolumny' :[ 'ID' , 'osoba' , 'nagroda' , 'umiejętność' ], 'indeks' :[ 0 , 1 , 2 ], 'dane' :[[ 123 , 'Miód' , 25000 , 'obraz' ],[ 112 , „Mouni” , 2000 , 'taniec' ],[ 153 , „Tulasi” , 1200 , 'czytanie' ]]}

Przykład 3: Orientuj jako „Indeks”

Tutaj każdy wiersz z PySpark DataFrame jest wycofywany w postaci słownika z kluczem jako nazwą kolumny. Dla każdego słownika pozycja indeksu jest określona jako klucz.

# Konwertuj na JSON za pomocą to_json() z orientem jako „indeks”

json_skills_data = skill_df.toPandas().to_json(orient= 'indeks' )

print(json_skills_data)

Wyjście:

{ „0” :{ 'ID' : 123 , 'osoba' : 'Miód' , 'nagroda' : 25000 , 'umiejętność' : 'obraz' }, „1” :{ 'ID' : 112 , 'osoba' : „Mouni” , 'nagroda' : 2000 , 'umiejętność' : 'taniec' }, „2” :{ 'ID' : 153 , 'osoba' : „Tulasi” , 'nagroda' : 1200 , 'umiejętność' : 'czytanie' }}

Przykład 4: Orientuj jako „Kolumny”

Kolumny są kluczem dla każdego rekordu. Każda kolumna zawiera słownik, który przyjmuje wartości kolumn wraz z numerami indeksów.

# Konwertuj na JSON za pomocą to_json() z orientem jako „kolumny”

json_skills_data = skill_df.toPandas().to_json(orient= „kolumny” )

print(json_skills_data)

Wyjście:

{ 'ID' :{ „0” : 123 , „1” : 112 , „2” : 153 }, 'osoba' :{ „0” : 'Miód' , „1” : „Mouni” , „2” : „Tulasi” }, 'nagroda' :{ „0” : 25000 , „1” : 2000 , „2” : 1200 }, 'umiejętność' :{ „0” : 'obraz' , „1” : 'taniec' , „2” : 'czytanie' }}

Przykład 5: Orientuj jako „Wartości”

Jeśli potrzebujesz tylko wartości w JSON, możesz wybrać orientację „wartości”. Wyświetla każdy wiersz na liście. Wreszcie wszystkie listy są przechowywane na liście. Ten kod JSON jest typu listy zagnieżdżonej.

# Konwertuj na JSON za pomocą to_json() z orientem jako „wartościami”

json_skills_data = skill_df.toPandas().to_json(orient= „wartości” )

print(json_skills_data)

Wyjście:

[[ 123 , 'Miód' , 25000 , 'obraz' ],[ 112 , „Mouni” , 2000 , 'taniec' ],[ 153 , „Tulasi” , 1200 , 'czytanie' ]]

Przykład 6: Orientacja jako „stół”

Orient „tabelowy” zwraca JSON, który zawiera schemat z nazwami pól wraz z typami danych kolumn, indeksem jako kluczem podstawowym i wersją Pandas. Nazwy kolumn z wartościami są wyświetlane jako „dane”.

# Konwertuj na JSON za pomocą to_json() z orientem jako „tabelą”

json_skills_data = skill_df.toPandas().to_json(orient= 'tabela' )

print(json_skills_data)

Wyjście:

{ 'schemat' :{ „pola” :[{ 'nazwa' : 'indeks' , 'typ' : 'liczba całkowita' },{ 'nazwa' : 'ID' , 'typ' : 'liczba całkowita' },{ 'nazwa' : 'osoba' , 'typ' : 'strunowy' },{ 'nazwa' : 'nagroda' , 'typ' : 'liczba całkowita' },{ 'nazwa' : 'umiejętność' , 'typ' : 'strunowy' }], 'główny klucz' :[ 'indeks' ], „wersja_pandy” : „1.4.0” }, 'dane' :[{ 'indeks' : 0 , 'ID' : 123 , 'osoba' : 'Miód' , 'nagroda' : 25000 , 'umiejętność' : 'obraz' },{ 'indeks' : 1 , 'ID' : 112 , 'osoba' : „Mouni” , 'nagroda' : 2000 , 'umiejętność' : 'taniec' },{ 'indeks' : 2 , 'ID' : 153 , 'osoba' : „Tulasi” , 'nagroda' : 1200 , 'umiejętność' : 'czytanie' }]}

Przykład 7: Z parametrem indeksu

Najpierw przekazujemy parametr index, ustawiając go na „True”. Zobaczysz, że dla każdej wartości kolumny pozycja indeksu jest zwracana jako klucz w słowniku.

W drugim wyjściu zwracane są tylko nazwy kolumn („kolumny”) i rekordy („dane”) bez pozycji indeksu, ponieważ indeks jest ustawiony na „Fałsz”.

# Konwertuj na JSON za pomocą to_json() z index=True

json_skills_data = skill_df.toPandas().to_json(index=True)

print(json_skills_data, ' \N ' )

# Konwertuj na JSON za pomocą to_json() z index=False

json_skills_data= umiejętności_df.toPandas().to_json(index=Fałsz,orient= 'podział' )

print(json_skills_data)

Wyjście:

{ 'ID' :{ „0” : 123 , „1” : 112 , „2” : 153 }, 'osoba' :{ „0” : 'Miód' , „1” : „Mouni” , „2” : „Tulasi” }, 'nagroda' :{ „0” : 25000 , „1” : 2000 , „2” : 1200 }, 'umiejętność' :{ „0” : 'obraz' , „1” : 'taniec' , „2” : 'czytanie' }}

{ 'kolumny' :[ 'ID' , 'osoba' , 'nagroda' , 'umiejętność' ], 'dane' :[[ 123 , 'Miód' , 25000 , 'obraz' ],[ 112 , „Mouni” , 2000 , 'taniec' ],[ 153 , „Tulasi” , 1200 , 'czytanie' ]]

PySpark DataFrame do JSON przy użyciu ToJSON()

Metoda toJSON() służy do konwersji ramki danych PySpark na obiekt JSON. Zasadniczo zwraca ciąg JSON otoczony listą. The [„{kolumna:wartość,…}”,…. ] to format zwracany przez tę funkcję. Tutaj każdy wiersz z PySpark DataFrame jest zwracany jako słownik z nazwą kolumny jako kluczem.

Składnia:

dataframe_object.toJSON()

Możliwe jest przekazanie parametrów, takich jak indeks, etykiety kolumn i typ danych.

Przykład:

Utwórz „skills_df” PySpark DataFrame z 5 wierszami i 4 kolumnami. Przekonwertuj tę ramkę DataFrame na format JSON przy użyciu metody toJSON().

zaimportuj pyspark

z pyspark.sql zaimportuj SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

# dane umiejętności z 5 wierszami i 4 kolumnami

umiejętności =[{ 'ID' : 123 , 'osoba' : 'Miód' , 'umiejętność' : 'obraz' , 'nagroda' : 25000 },

{ 'ID' : 112 , 'osoba' : „Mouni” , 'umiejętność' : „muzyka/taniec” , 'nagroda' : 2000 },

{ 'ID' : 153 , 'osoba' : „Tulasi” , 'umiejętność' : 'czytanie' , 'nagroda' : 1200 },

{ 'ID' : 173 , 'osoba' : 'Biegł' , 'umiejętność' : 'muzyka' , 'nagroda' : 2000 },

{ 'ID' : 43 , 'osoba' : „Kamala” , 'umiejętność' : 'czytanie' , 'nagroda' : 10000 }

]

# utwórz ramkę danych umiejętności z powyższych danych

skill_df = linuxhint_spark_app.createDataFrame(umiejętności)

# Rzeczywiste dane dotyczące umiejętności

skill_df.show()

# Konwertuj na tablicę JSON

json_skills_data = umiejętności_df.toJSON().zbierz()

print(json_skills_data)

Wyjście:

+---+------+-----+-----------+

| identyfikator|osoba|nagroda| umiejętność|

+---+------+-----+-----------+

| 123 | Kochanie| 25000 | malowanie|

| 112 | Mouni| 2000 |muzyka/taniec|

| 153 |Tulasi| 1200 | czytanie|

| 173 | Biegł| 2000 | muzyka|

| 43 |Kamala| 10000 | czytanie|

+---+------+-----+-----------+

[ '{'id':123,'person':'Honey','prize':25000,'skill':'malowanie'}' , '{'id':112,'person':'Mouni','prize':2000,'skill':'muzyka/taniec'}' , '{'id':153,'person':'Tulasi','prize':1200,'skill':'czytanie'}' , '{'id':173,'person':'Ran','prize':2000,'skill':'muzyka'}' , '{'id':43,'person':'Kamala','prize':10000,'skill':'czytanie'}' ]

W PySpark DataFrame jest 5 wierszy. Wszystkie te 5 wierszy jest zwracanych jako słownik łańcuchów oddzielonych przecinkami.

PySpark DataFrame do JSON przy użyciu Write.json()

Metoda write.json() jest dostępna w PySpark, która zapisuje/zapisuje PySpark DataFrame do pliku JSON. Przyjmuje nazwę pliku/ścieżkę jako parametr. Zasadniczo zwraca JSON w wielu plikach (plikach podzielonych na partycje). Aby połączyć je wszystkie w jednym pliku, możemy użyć metody koalescencji().

Składnia:

dataframe_object.coalesce( 1 ).write.json('nazwa_pliku')
  1. Tryb dołączania – dataframe_object.write.mode('dołącz').json('nazwa_pliku')
  2. Tryb nadpisywania – dataframe_object.write.mode('nadpisywanie').json('nazwa_pliku')

Możliwe jest dołączenie/zastąpienie istniejącego JSON. Za pomocą metody write.mode() możemy dołączyć dane przekazując „append” lub nadpisać istniejące dane JSON przekazując do tej funkcji „overwrite”.

Przykład 1:

Utwórz „skills_df” PySpark DataFrame z 3 wierszami i 4 kolumnami. Zapisz tę ramkę danych w formacie JSON.

zaimportuj pyspark

importować pandy

z pyspark.sql zaimportuj SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

# dane umiejętności z 3 wierszami i 4 kolumnami

umiejętności =[{ 'ID' : 123 , 'osoba' : 'Miód' , 'umiejętność' : 'obraz' , 'nagroda' : 25000 },

{ 'ID' : 112 , 'osoba' : „Mouni” , 'umiejętność' : 'taniec' , 'nagroda' : 2000 },

{ 'ID' : 153 , 'osoba' : „Tulasi” , 'umiejętność' : 'czytanie' , 'nagroda' : 1200 }

]

# utwórz ramkę danych umiejętności z powyższych danych

skill_df = linuxhint_spark_app.createDataFrame(umiejętności)

# write.json()

skill_df.coalesce( 1 ).write.json( „dane_umiejętności” )

Plik JSON:

Widzimy, że folder skill_data zawiera partycjonowane dane JSON.

Otwórzmy plik JSON. Widzimy, że wszystkie wiersze z PySpark DataFrame są konwertowane na JSON.

W PySpark DataFrame jest 5 wierszy. Wszystkie te 5 wierszy jest zwracanych jako słownik łańcuchów oddzielonych przecinkami.

Przykład 2:

Utwórz ramkę danych PySpark „skills2_df” z jednym wierszem. Dołącz jeden wiersz do poprzedniego pliku JSON, określając tryb jako „dołącz”.

zaimportuj pyspark

importować pandy

z pyspark.sql zaimportuj SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

umiejętności2 =[{ 'ID' : 78 , 'osoba' : „Maryja” , 'umiejętność' : 'jazda konna' , 'nagroda' : 8960 }

]

# utwórz ramkę danych umiejętności z powyższych danych

umiejętności2_df = linuxhint_spark_app.createDataFrame(umiejętności2)

# write.json() z trybem dołączania.

umiejętności2_df.write.mode( 'dodać' .json( „dane_umiejętności” )

Plik JSON:

Widzimy podzielone na partycje pliki JSON. Pierwszy plik zawiera pierwsze rekordy DataFrame, a drugi zawiera drugi rekord DataFrame.

Wniosek

Istnieją trzy różne sposoby konwersji PySpark DataFrame na JSON. Najpierw omówiliśmy metodę to_json(), która konwertuje do JSON, konwertując PySpark DataFrame na Pandas DataFrame z różnymi przykładami, biorąc pod uwagę różne parametry. Następnie wykorzystaliśmy metodę toJSON(). Na koniec nauczyliśmy się, jak używać funkcji write.json() do zapisywania ramki danych PySpark w formacie JSON. Ta funkcja umożliwia dołączanie i nadpisywanie.