Temat treści:
Wczytywanie JSON do ramki danych PySpark za pomocą Pandas.read_json()
Odczytywanie JSON do PySpark DataFrame za pomocą Spark.read.json()
Odczytywanie JSON do PySpark DataFrame przy użyciu PySpark SQL
W tym samouczku przyjrzymy się, jak wczytać JSON do PySpark DataFrame za pomocą pandas.read_json(), spark.read.json() i spark.sql. We wszystkich scenariuszach przyjrzymy się różnym przykładom, biorąc pod uwagę różne formaty JSON.
Zainstaluj bibliotekę PySpark przed zaimplementowaniem poniższych przykładów.
pip zainstaluj pysparkPo udanej instalacji możesz zobaczyć dane wyjściowe w następujący sposób:
Wczytywanie JSON do ramki danych PySpark za pomocą Pandas.read_json()
W PySpark metoda createDataFrame() służy do bezpośredniego tworzenia DataFrame. Tutaj musimy tylko przekazać plik/ścieżkę JSON do pliku JSON za pomocą metody pandas.read_json() . Ta metoda read_json() przyjmuje nazwę pliku/ścieżkę dostępną w module Pandas. Dlatego konieczne jest zaimportowanie i wykorzystanie modułu Pandas.
Składnia:
spark_app.createDataFrame(pandy.read_json( „nazwa_pliku.json” ))Przykład:
Stwórzmy plik JSON o nazwie „student_skill.json”, który zawiera 2 rekordy. Tutaj klucze/kolumny to „Student 1” i „Student 2”. Wiersze to nazwa, wiek, umiejętność1 i umiejętność2.
zaimportuj pyspark
importować pandy
z pyspark.sql zaimportuj SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()
# Używanie pandas.read_json()
Candidate_skills = linuxhint_spark_app.createDataFrame(pandas.read_json( „umiejętność_ucznia.json” ))
umiejętności_kandydata.show()
Wyjście:
Widzimy, że dane JSON są konwertowane do PySpark DataFrame z określonymi kolumnami i wierszami.
2. Odczyt JSON do PySpark DataFrame za pomocą Spark.read.json()
Read.json() to metoda podobna do read_json() w Pandach. Tutaj read.json() pobiera ścieżkę do JSON lub bezpośrednio do pliku JSON i bezpośrednio ładuje go do PySpark DataFrame. W tym scenariuszu nie ma potrzeby używania metody createDataFrame(). Jeśli chcesz czytać wiele plików JSON jednocześnie, musimy przekazać listę nazw plików JSON przez listę oddzieloną przecinkami. Wszystkie rekordy JSON są przechowywane w pojedynczej ramce DataFrame.
Składnia:
Pojedynczy plik — spark_app.read.json( „nazwa_pliku.json” )Wiele plików — spark_app.read.json([ „plik1.json” , „plik2.json” ,...])
Scenariusz 1: Odczyt JSON z pojedynczą linią
Jeśli Twój plik JSON jest w formacie rekord1, rekord2, rekord3… (pojedynczy wiersz), możemy nazwać go JSON z pojedynczymi wierszami. Spark przetwarza te rekordy i przechowuje je w PySpark DataFrame jako wiersze. Każdy rekord jest wierszem w PySpark DataFrame.
Stwórzmy plik JSON o nazwie „candidate_skills.json”, który zawiera 3 rekordy. Wczytaj ten kod JSON do ramki danych PySpark.
zaimportuj pyspark
z pyspark.sql zaimportuj SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()
# Wczytaj plik Candidate_skills.json do PySpark DataFrame
kandydaci_umiejętności = linuxhint_spark_app.read.json( „kandydat_umiejętności.json” )
umiejętności_kandydata.show()
Wyjście:
Widzimy, że dane JSON są konwertowane do PySpark DataFrame z określonymi rekordami i nazwami kolumn.
Scenariusz 2: Odczyt JSON z wieloma wierszami
Jeśli Twój plik JSON ma wiele wierszy, musisz użyć metody read.option().json() w celu przekazania parametru multiline, który musi być ustawiony na true. To pozwala nam załadować JSON z wieloma wierszami do PySpark DataFrame.
opcja odczytu ( „wieloliniowy” , 'PRAWDA' .json( „nazwa_pliku.json” )Stwórzmy plik JSON o nazwie „multi.json”, który zawiera 3 rekordy. Wczytaj ten kod JSON do ramki danych PySpark.
zaimportuj pyspark
z pyspark.sql zaimportuj SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()
# Wczytaj multi.json (zawierający wiele wierszy) do PySpark DataFrame
kandydat_umiejętności = linuxhint_spark_app.read.option( „wieloliniowy” , 'PRAWDA' .json( 'multi.json' )
umiejętności_kandydata.show()
Wyjście:
Scenariusz 3: Odczyt wielu plików JSON
Omówiliśmy już w początkowej fazie tego samouczka dotyczące wielu plików JSON. Jeśli chcesz czytać wiele plików JSON na raz i przechowywać je w jednym PySpark DataFrame, musimy przekazać listę nazw plików do metody read.json() .
Utwórzmy dwa pliki JSON o nazwach „candidate_skills.json” i „candidate_skills2.json” i załadujmy je do PySpark DataFrame.
Plik „candidate_skills.json” zawiera trzy rekordy.
Plik „candidate_skill2.json” zawiera tylko jeden rekord.
zaimportuj pyspark
z pyspark.sql zaimportuj SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()
# Jednoczesne odczytywanie plików kandydata_umiejętności i kandydata_umiejętności2 w PySpark DataFrame
Candidate_skills = linuxhint_spark_app.read.json([ „kandydat_umiejętności.json” , „kandydat_umiejętności2.json” ])
umiejętności_kandydata.show()
Wyjście:
Wreszcie DataFrame zawiera cztery rekordy. Pierwsze trzy rekordy należą do pierwszego JSON, a ostatnie do drugiego JSON.
Odczytywanie JSON do PySpark DataFrame za pomocą Spark.read.json()
Read.json() to metoda podobna do read_json() w Pandach. Tutaj read.json() pobiera ścieżkę do JSON lub bezpośrednio do pliku JSON i ładuje go bezpośrednio do PySpark DataFrame. W tym scenariuszu nie ma potrzeby używania metody createDataFrame(). Jeśli chcesz czytać wiele plików JSON jednocześnie, musimy przekazać listę nazw plików JSON przez listę oddzieloną przecinkami. Wszystkie rekordy JSON są przechowywane w pojedynczej ramce DataFrame.
Składnia:
Pojedynczy plik — spark_app.read.json( „nazwa_pliku.json” )Wiele plików — spark_app.read.json([ „plik1.json” , „plik2.json” ,...])
Scenariusz 1: Odczyt JSON z pojedynczą linią
Jeśli Twój plik JSON jest w formacie rekord1, rekord2, rekord3… (pojedynczy wiersz), możemy nazwać go JSON z pojedynczymi wierszami. Spark przetwarza te rekordy i przechowuje je w PySpark DataFrame jako wiersze. Każdy rekord jest wierszem w PySpark DataFrame.
Stwórzmy plik JSON o nazwie „candidate_skills.json”, który zawiera 3 rekordy. Wczytaj ten kod JSON do ramki danych PySpark.
zaimportuj pyspark
z pyspark.sql zaimportuj SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()
# Wczytaj plik Candidate_skills.json do PySpark DataFrame
kandydaci_umiejętności = linuxhint_spark_app.read.json( „kandydat_umiejętności.json” )
umiejętności_kandydata.show()
Wyjście:
Widzimy, że dane JSON są konwertowane do PySpark DataFrame z określonymi rekordami i nazwami kolumn.
Odczytywanie JSON do PySpark DataFrame przy użyciu PySpark SQL
Możliwe jest utworzenie tymczasowego widoku naszych danych JSON przy użyciu PySpark SQL. Bezpośrednio możemy udostępnić JSON w momencie tworzenia widoku tymczasowego. Spójrz na następującą składnię. Następnie możemy użyć polecenia SELECT, aby wyświetlić PySpark DataFrame.
Składnia:
spark_app.sql( „UTWÓRZ WIDOK TYMCZASOWY NAZWA_WIDOKU PRZY UŻYCIU OPCJI json (ścieżka „nazwa_pliku.json”)” )Tutaj „VIEW_NAME” to widok danych JSON, a „file_name” to nazwa pliku JSON.
Przykład 1:
Rozważmy plik JSON użyty w poprzednich przykładach — „candidate_skills.json”. Wybierz wszystkie wiersze z DataFrame za pomocą SELECT z operatorem „*”. Tutaj * wybiera wszystkie kolumny z PySpark DataFrame.
zaimportuj pysparkimportować pandy
z pyspark.sql zaimportuj SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()
# Używanie spark.sql do tworzenia VIEW z JSON
umiejętności_kandydata = linuxhint_spark_app.sql( „UTWÓRZ WIDOK TYMCZASOWY Candidate_data UŻYWAJĄC OPCJI json (ścieżka „candidate_skills.json”)” )
# Użyj zapytania SELECT, aby wybrać wszystkie rekordy z Candidate_data.
linuxhint_spark_app.sql( „WYBIERZ * z danych_kandydata” ).pokazywać()
Wyjście:
Całkowita liczba rekordów w PySpark DataFrame (odczytanych z JSON) to 3.
Przykład 2:
Teraz przefiltruj rekordy w PySpark DataFrame na podstawie kolumny wieku. Użyj operatora „większy niż” na wieku, aby uzyskać wiersze z wiekiem większym niż 22.
# Użyj zapytania SELECT, aby wybrać rekordy z wiekiem > 22.linuxhint_spark_app.sql( „WYBIERZ * z danych_kandydata, gdzie wiek>22” ).pokazywać()
Wyjście:
W PySpark DataFrame jest tylko jeden rekord z wiekiem większym niż 22.
Wniosek
Poznaliśmy trzy różne sposoby wczytywania JSON do PySpark DataFrame. Najpierw nauczyliśmy się, jak używać metody read_json() dostępnej w module Pandas do odczytywania JSON do PySpark DataFrame. Następnie nauczyliśmy się odczytywać jedno-/wieloliniowe pliki JSON za pomocą metody spark.read.json() z opcją(). Aby odczytać wiele plików JSON na raz, musimy przekazać tej metodzie listę nazw plików. Za pomocą PySpark SQL plik JSON jest wczytywany do widoku tymczasowego, a DataFrame jest wyświetlana przy użyciu zapytania SELECT.