PySpark Odczyt JSON()

Pyspark Odczyt Json



Podczas pracy z PySpark DataFrames musi być przechowywany w PySpark DataFrame, jeśli chcesz przetwarzać dane JSON. Po przechowywaniu w DataFrame możemy zastosować różne operacje i metody na danych. Ponadto istnieje wiele korzyści, jeśli przekonwertujemy JSON na PySpark DataFrame, ponieważ jest to proste i możemy przekształcać/partycjonować dane w prostszy sposób.

Temat treści:

Wczytywanie JSON do ramki danych PySpark za pomocą Pandas.read_json()







Odczytywanie JSON do PySpark DataFrame za pomocą Spark.read.json()



Odczytywanie JSON do PySpark DataFrame przy użyciu PySpark SQL



W tym samouczku przyjrzymy się, jak wczytać JSON do PySpark DataFrame za pomocą pandas.read_json(), spark.read.json() i spark.sql. We wszystkich scenariuszach przyjrzymy się różnym przykładom, biorąc pod uwagę różne formaty JSON.





Zainstaluj bibliotekę PySpark przed zaimplementowaniem poniższych przykładów.

pip zainstaluj pyspark

Po udanej instalacji możesz zobaczyć dane wyjściowe w następujący sposób:



Wczytywanie JSON do ramki danych PySpark za pomocą Pandas.read_json()

W PySpark metoda createDataFrame() służy do bezpośredniego tworzenia DataFrame. Tutaj musimy tylko przekazać plik/ścieżkę JSON do pliku JSON za pomocą metody pandas.read_json() . Ta metoda read_json() przyjmuje nazwę pliku/ścieżkę dostępną w module Pandas. Dlatego konieczne jest zaimportowanie i wykorzystanie modułu Pandas.

Składnia:

spark_app.createDataFrame(pandy.read_json( „nazwa_pliku.json” ))

Przykład:

Stwórzmy plik JSON o nazwie „student_skill.json”, który zawiera 2 rekordy. Tutaj klucze/kolumny to „Student 1” i „Student 2”. Wiersze to nazwa, wiek, umiejętność1 i umiejętność2.

zaimportuj pyspark

importować pandy

z pyspark.sql zaimportuj SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

# Używanie pandas.read_json()

Candidate_skills = linuxhint_spark_app.createDataFrame(pandas.read_json( „umiejętność_ucznia.json” ))

umiejętności_kandydata.show()

Wyjście:

Widzimy, że dane JSON są konwertowane do PySpark DataFrame z określonymi kolumnami i wierszami.

2. Odczyt JSON do PySpark DataFrame za pomocą Spark.read.json()

Read.json() to metoda podobna do read_json() w Pandach. Tutaj read.json() pobiera ścieżkę do JSON lub bezpośrednio do pliku JSON i bezpośrednio ładuje go do PySpark DataFrame. W tym scenariuszu nie ma potrzeby używania metody createDataFrame(). Jeśli chcesz czytać wiele plików JSON jednocześnie, musimy przekazać listę nazw plików JSON przez listę oddzieloną przecinkami. Wszystkie rekordy JSON są przechowywane w pojedynczej ramce DataFrame.

Składnia:

Pojedynczy plik — spark_app.read.json( „nazwa_pliku.json” )

Wiele plików — spark_app.read.json([ „plik1.json” , „plik2.json” ,...])

Scenariusz 1: Odczyt JSON z pojedynczą linią

Jeśli Twój plik JSON jest w formacie rekord1, rekord2, rekord3… (pojedynczy wiersz), możemy nazwać go JSON z pojedynczymi wierszami. Spark przetwarza te rekordy i przechowuje je w PySpark DataFrame jako wiersze. Każdy rekord jest wierszem w PySpark DataFrame.

Stwórzmy plik JSON o nazwie „candidate_skills.json”, który zawiera 3 rekordy. Wczytaj ten kod JSON do ramki danych PySpark.

zaimportuj pyspark

z pyspark.sql zaimportuj SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

# Wczytaj plik Candidate_skills.json do PySpark DataFrame

kandydaci_umiejętności = linuxhint_spark_app.read.json( „kandydat_umiejętności.json” )

umiejętności_kandydata.show()

Wyjście:

Widzimy, że dane JSON są konwertowane do PySpark DataFrame z określonymi rekordami i nazwami kolumn.

Scenariusz 2: Odczyt JSON z wieloma wierszami

Jeśli Twój plik JSON ma wiele wierszy, musisz użyć metody read.option().json() w celu przekazania parametru multiline, który musi być ustawiony na true. To pozwala nam załadować JSON z wieloma wierszami do PySpark DataFrame.

opcja odczytu ( „wieloliniowy” , 'PRAWDA' .json( „nazwa_pliku.json” )

Stwórzmy plik JSON o nazwie „multi.json”, który zawiera 3 rekordy. Wczytaj ten kod JSON do ramki danych PySpark.

zaimportuj pyspark

z pyspark.sql zaimportuj SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

# Wczytaj multi.json (zawierający wiele wierszy) do PySpark DataFrame

kandydat_umiejętności = linuxhint_spark_app.read.option( „wieloliniowy” , 'PRAWDA' .json( 'multi.json' )

umiejętności_kandydata.show()

Wyjście:

Scenariusz 3: Odczyt wielu plików JSON

Omówiliśmy już w początkowej fazie tego samouczka dotyczące wielu plików JSON. Jeśli chcesz czytać wiele plików JSON na raz i przechowywać je w jednym PySpark DataFrame, musimy przekazać listę nazw plików do metody read.json() .

Utwórzmy dwa pliki JSON o nazwach „candidate_skills.json” i „candidate_skills2.json” i załadujmy je do PySpark DataFrame.

Plik „candidate_skills.json” zawiera trzy rekordy.

Plik „candidate_skill2.json” zawiera tylko jeden rekord.

zaimportuj pyspark

z pyspark.sql zaimportuj SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

# Jednoczesne odczytywanie plików kandydata_umiejętności i kandydata_umiejętności2 w PySpark DataFrame

Candidate_skills = linuxhint_spark_app.read.json([ „kandydat_umiejętności.json” , „kandydat_umiejętności2.json” ])

umiejętności_kandydata.show()

Wyjście:

Wreszcie DataFrame zawiera cztery rekordy. Pierwsze trzy rekordy należą do pierwszego JSON, a ostatnie do drugiego JSON.

Odczytywanie JSON do PySpark DataFrame za pomocą Spark.read.json()

Read.json() to metoda podobna do read_json() w Pandach. Tutaj read.json() pobiera ścieżkę do JSON lub bezpośrednio do pliku JSON i ładuje go bezpośrednio do PySpark DataFrame. W tym scenariuszu nie ma potrzeby używania metody createDataFrame(). Jeśli chcesz czytać wiele plików JSON jednocześnie, musimy przekazać listę nazw plików JSON przez listę oddzieloną przecinkami. Wszystkie rekordy JSON są przechowywane w pojedynczej ramce DataFrame.

Składnia:

Pojedynczy plik — spark_app.read.json( „nazwa_pliku.json” )

Wiele plików — spark_app.read.json([ „plik1.json” , „plik2.json” ,...])

Scenariusz 1: Odczyt JSON z pojedynczą linią

Jeśli Twój plik JSON jest w formacie rekord1, rekord2, rekord3… (pojedynczy wiersz), możemy nazwać go JSON z pojedynczymi wierszami. Spark przetwarza te rekordy i przechowuje je w PySpark DataFrame jako wiersze. Każdy rekord jest wierszem w PySpark DataFrame.

Stwórzmy plik JSON o nazwie „candidate_skills.json”, który zawiera 3 rekordy. Wczytaj ten kod JSON do ramki danych PySpark.

zaimportuj pyspark

z pyspark.sql zaimportuj SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

# Wczytaj plik Candidate_skills.json do PySpark DataFrame

kandydaci_umiejętności = linuxhint_spark_app.read.json( „kandydat_umiejętności.json” )

umiejętności_kandydata.show()

Wyjście:

Widzimy, że dane JSON są konwertowane do PySpark DataFrame z określonymi rekordami i nazwami kolumn.

Odczytywanie JSON do PySpark DataFrame przy użyciu PySpark SQL

Możliwe jest utworzenie tymczasowego widoku naszych danych JSON przy użyciu PySpark SQL. Bezpośrednio możemy udostępnić JSON w momencie tworzenia widoku tymczasowego. Spójrz na następującą składnię. Następnie możemy użyć polecenia SELECT, aby wyświetlić PySpark DataFrame.

Składnia:

spark_app.sql( „UTWÓRZ WIDOK TYMCZASOWY NAZWA_WIDOKU PRZY UŻYCIU OPCJI json (ścieżka „nazwa_pliku.json”)” )

Tutaj „VIEW_NAME” to widok danych JSON, a „file_name” to nazwa pliku JSON.

Przykład 1:

Rozważmy plik JSON użyty w poprzednich przykładach — „candidate_skills.json”. Wybierz wszystkie wiersze z DataFrame za pomocą SELECT z operatorem „*”. Tutaj * wybiera wszystkie kolumny z PySpark DataFrame.

zaimportuj pyspark

importować pandy

z pyspark.sql zaimportuj SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

# Używanie spark.sql do tworzenia VIEW z JSON

umiejętności_kandydata = linuxhint_spark_app.sql( „UTWÓRZ WIDOK TYMCZASOWY Candidate_data UŻYWAJĄC OPCJI json (ścieżka „candidate_skills.json”)” )

# Użyj zapytania SELECT, aby wybrać wszystkie rekordy z Candidate_data.

linuxhint_spark_app.sql( „WYBIERZ * z danych_kandydata” ).pokazywać()

Wyjście:

Całkowita liczba rekordów w PySpark DataFrame (odczytanych z JSON) to 3.

Przykład 2:

Teraz przefiltruj rekordy w PySpark DataFrame na podstawie kolumny wieku. Użyj operatora „większy niż” na wieku, aby uzyskać wiersze z wiekiem większym niż 22.

# Użyj zapytania SELECT, aby wybrać rekordy z wiekiem > 22.

linuxhint_spark_app.sql( „WYBIERZ * z danych_kandydata, gdzie wiek>22” ).pokazywać()

Wyjście:

W PySpark DataFrame jest tylko jeden rekord z wiekiem większym niż 22.

Wniosek

Poznaliśmy trzy różne sposoby wczytywania JSON do PySpark DataFrame. Najpierw nauczyliśmy się, jak używać metody read_json() dostępnej w module Pandas do odczytywania JSON do PySpark DataFrame. Następnie nauczyliśmy się odczytywać jedno-/wieloliniowe pliki JSON za pomocą metody spark.read.json() z opcją(). Aby odczytać wiele plików JSON na raz, musimy przekazać tej metodzie listę nazw plików. Za pomocą PySpark SQL plik JSON jest wczytywany do widoku tymczasowego, a DataFrame jest wyświetlana przy użyciu zapytania SELECT.