W tym przewodniku skupimy się głównie na odczytywaniu/ładowaniu pliku parquet do PySpark DataFrame/SQL za pomocą funkcji read.parquet() dostępnej w klasie pyspark.sql.DataFrameReader.
Temat treści:
Odczytaj plik Parquet do ramki danych PySpark
Odczytaj plik Parquet do PySpark SQL
Pyspark.sql.DataFrameReader.parquet()
Ta funkcja służy do odczytywania pliku Parquet i ładowania go do PySpark DataFrame. Pobiera ścieżkę/nazwę pliku pliku parkietu. Możemy po prostu użyć funkcji read.parquet(), ponieważ jest to funkcja ogólna.
Składnia:
Zobaczmy składnię read.parquet():
spark_app.read.parquet(nazwa_pliku.parquet/ścieżka)Najpierw zainstaluj moduł PySpark za pomocą polecenia pip:
pip zainstaluj pyspark
Pobierz plik parkietu
Aby odczytać plik parkietu, potrzebne są dane, w których plik parkietu jest generowany z tych danych. W tej części zobaczymy, jak wygenerować plik parkietu z PySpark DataFrame.
Stwórzmy PySpark DataFrame z 5 rekordami i zapiszmy to w pliku parkietu „industry_parquet”.
zaimportuj pysparkz pyspark.sql zaimportuj SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()
# utwórz ramkę danych, która przechowuje szczegóły branżowe
industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Rolnictwo' ,Powierzchnia= 'USA' ,
Ocena= 'Gorący' ,Ogółem_pracowników= 100 ),
Wiersz(Typ= 'Rolnictwo' ,Powierzchnia= 'Indie' ,Ocena= 'Gorący' ,Ogółem_pracowników= 200 ),
Wiersz(Typ= 'Rozwój' ,Powierzchnia= 'USA' ,Ocena= 'Ciepły' ,Ogółem_pracowników= 100 ),
Wiersz(Typ= 'Edukacja' ,Powierzchnia= 'USA' ,Ocena= 'Fajny' ,Ogółem_pracowników= 400 ),
Wiersz(Typ= 'Edukacja' ,Powierzchnia= 'USA' ,Ocena= 'Ciepły' ,Ogółem_pracowników= 20 )
])
# Rzeczywista ramka danych
Industry_df.show()
# Wpisz Industry_df do pliku Parquet
industry_df.coalesce( 1 ).write.parkiet( „parkiet_przemysłowy” )
Wyjście:
To jest DataFrame, która zawiera 5 rekordów.
Tworzony jest plik Parquet dla poprzedniej ramki DataFrame. Tutaj nasza nazwa pliku z rozszerzeniem to „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet”. Używamy tego pliku w całym samouczku.
Odczytaj plik Parquet do ramki danych PySpark
Mamy plik parkietu. Przeczytajmy ten plik za pomocą funkcji read.parquet() i załadujmy go do PySpark DataFrame.
zaimportuj pysparkz pyspark.sql zaimportuj SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()
# Wczytaj plik parkietu do obiektu dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# Wyświetl dataframe_from_parquet-DataFrame
dataframe_from_parquet.show()
Wyjście:
Wyświetlamy DataFrame za pomocą metody show(), która została utworzona z pliku parquet.
Zapytania SQL z plikiem Parquet
Po załadowaniu do DataFrame możliwe jest utworzenie tabel SQL i wyświetlenie danych obecnych w DataFrame. Musimy utworzyć TEMPORARY VIEW i użyć poleceń SQL, aby zwrócić rekordy z DataFrame, która jest utworzona z pliku Parquet.
Przykład 1:
Utwórz tymczasowy widok o nazwie „Sektory” i użyj polecenia SELECT, aby wyświetlić rekordy w DataFrame. Możesz się do tego odnieść instruktaż który wyjaśnia, jak utworzyć WIDOK w Spark – SQL.
zaimportuj pysparkz pyspark.sql zaimportuj SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()
# Wczytaj plik parkietu do obiektu dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# Utwórz widok z powyższego pliku parkietu o nazwie - „Sektory”
dataframe_from_parquet.createOrReplaceTempView( „Sektory” )
# Zapytanie, aby wyświetlić wszystkie rekordy z sektorów
linuxhint_spark_app.sql( „wybierz * z sektorów” ).pokazywać()
Wyjście:
Przykład 2:
Korzystając z poprzedniego WIDOKU, napisz zapytanie SQL:
- Aby wyświetlić wszystkie rekordy z sektorów należących do „Indii”.
- Aby wyświetlić wszystkie rekordy z Sektorów z pracownikiem, który jest większy niż 100.
linuxhint_spark_app.sql( „wybierz * z sektorów, w których obszar = „Indie”” ).pokazywać()
# Zapytanie, aby wyświetlić wszystkie rekordy z sektorów z pracownikami większymi niż 100
linuxhint_spark_app.sql( „wybierz * z sektorów, w których liczba_pracowników>100” ).pokazywać()
Wyjście:
Jest tylko jeden rekord z obszarem „Indie” i dwa rekordy z pracownikami większymi niż 100.
Odczytaj plik Parquet do PySpark SQL
Najpierw musimy utworzyć WIDOK za pomocą polecenia CREATE. Używając słowa kluczowego „ścieżka” w zapytaniu SQL, możemy odczytać plik parkietu do Spark SQL. Po ścieżce musimy określić nazwę pliku/lokalizację pliku.
Składnia:
spark_app.sql( „UTWÓRZ WIDOK TYMCZASOWY nazwa_widoku UŻYWAJĄC OPCJI parkietu (ścieżka „ nazwa_pliku.parkiet ')' )Przykład 1:
Utwórz tymczasowy widok o nazwie „Sektor2” i wczytaj do niego plik parkietu. Za pomocą funkcji sql() napisz zapytanie wybierające, aby wyświetlić wszystkie rekordy obecne w widoku.
zaimportuj pysparkz pyspark.sql zaimportuj SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()
# Wczytaj plik Parquet do SparkSQL
linuxhint_spark_app.sql( „UTWÓRZ WIDOK TYMCZASOWY Sektor2 UŻYWAJĄC OPCJI parkietu (ścieżka „ część-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )
# Zapytanie o wyświetlenie wszystkich rekordów z Sector2
linuxhint_spark_app.sql( „wybierz * z sektora 2” ).pokazywać()
Wyjście:
Przykład 2:
Użyj poprzedniego WIDOKU i napisz zapytanie, aby wyświetlić wszystkie rekordy z oceną „Hot” lub „Cool”.
# Zapytanie, aby wyświetlić wszystkie rekordy z Sektora 2 z oceną - Gorąca lub Chłodna.linuxhint_spark_app.sql( „wybierz * z Sektora 2, gdzie Ocena=„Gorąca” LUB Ocena=„Zimna”” ).pokazywać()
Wyjście:
Istnieją trzy rekordy z oceną „Hot” lub „Cool”.
Wniosek
W PySpark funkcja write.parquet() zapisuje DataFrame do pliku Parquet. Funkcja read.parquet() odczytuje plik parkietu do PySpark DataFrame lub dowolnego innego źródła danych. Nauczyliśmy się, jak wczytać plik parkietu do PySpark DataFrame i do tabeli PySpark. W ramach tego samouczka omówiliśmy również, jak tworzyć tabele z PySpark DataFrame i filtrować dane za pomocą klauzuli WHERE.