PySpark Odczyt.Parquet()

Pyspark Odczyt Parquet



W PySpark funkcja write.parquet() zapisuje DataFrame do pliku Parquet, a read.parquet() odczytuje plik Parquet do PySpark DataFrame lub dowolnego innego źródła danych. Aby szybko i wydajnie przetwarzać kolumny w Apache Spark, musimy skompresować dane. Kompresja danych oszczędza naszą pamięć, a wszystkie kolumny są konwertowane na płaski poziom. Oznacza to, że istnieje przechowywanie na poziomie płaskiej kolumny. Plik, który je przechowuje, jest znany jako plik PARQUET.

W tym przewodniku skupimy się głównie na odczytywaniu/ładowaniu pliku parquet do PySpark DataFrame/SQL za pomocą funkcji read.parquet() dostępnej w klasie pyspark.sql.DataFrameReader.

Temat treści:







Pobierz plik parkietu



Odczytaj plik Parquet do ramki danych PySpark



Odczytaj plik Parquet do PySpark SQL





Pyspark.sql.DataFrameReader.parquet()

Ta funkcja służy do odczytywania pliku Parquet i ładowania go do PySpark DataFrame. Pobiera ścieżkę/nazwę pliku pliku parkietu. Możemy po prostu użyć funkcji read.parquet(), ponieważ jest to funkcja ogólna.

Składnia:



Zobaczmy składnię read.parquet():

spark_app.read.parquet(nazwa_pliku.parquet/ścieżka)

Najpierw zainstaluj moduł PySpark za pomocą polecenia pip:

pip zainstaluj pyspark

Pobierz plik parkietu

Aby odczytać plik parkietu, potrzebne są dane, w których plik parkietu jest generowany z tych danych. W tej części zobaczymy, jak wygenerować plik parkietu z PySpark DataFrame.

Stwórzmy PySpark DataFrame z 5 rekordami i zapiszmy to w pliku parkietu „industry_parquet”.

zaimportuj pyspark

z pyspark.sql zaimportuj SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

# utwórz ramkę danych, która przechowuje szczegóły branżowe

industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Rolnictwo' ,Powierzchnia= 'USA' ,
Ocena= 'Gorący' ,Ogółem_pracowników= 100 ),

Wiersz(Typ= 'Rolnictwo' ,Powierzchnia= 'Indie' ,Ocena= 'Gorący' ,Ogółem_pracowników= 200 ),

Wiersz(Typ= 'Rozwój' ,Powierzchnia= 'USA' ,Ocena= 'Ciepły' ,Ogółem_pracowników= 100 ),

Wiersz(Typ= 'Edukacja' ,Powierzchnia= 'USA' ,Ocena= 'Fajny' ,Ogółem_pracowników= 400 ),

Wiersz(Typ= 'Edukacja' ,Powierzchnia= 'USA' ,Ocena= 'Ciepły' ,Ogółem_pracowników= 20 )

])

# Rzeczywista ramka danych

Industry_df.show()

# Wpisz Industry_df do pliku Parquet

industry_df.coalesce( 1 ).write.parkiet( „parkiet_przemysłowy” )

Wyjście:

To jest DataFrame, która zawiera 5 rekordów.

Tworzony jest plik Parquet dla poprzedniej ramki DataFrame. Tutaj nasza nazwa pliku z rozszerzeniem to „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet”. Używamy tego pliku w całym samouczku.

Odczytaj plik Parquet do ramki danych PySpark

Mamy plik parkietu. Przeczytajmy ten plik za pomocą funkcji read.parquet() i załadujmy go do PySpark DataFrame.

zaimportuj pyspark

z pyspark.sql zaimportuj SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

# Wczytaj plik parkietu do obiektu dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Wyświetl dataframe_from_parquet-DataFrame

dataframe_from_parquet.show()

Wyjście:

Wyświetlamy DataFrame za pomocą metody show(), która została utworzona z pliku parquet.

Zapytania SQL z plikiem Parquet

Po załadowaniu do DataFrame możliwe jest utworzenie tabel SQL i wyświetlenie danych obecnych w DataFrame. Musimy utworzyć TEMPORARY VIEW i użyć poleceń SQL, aby zwrócić rekordy z DataFrame, która jest utworzona z pliku Parquet.

Przykład 1:

Utwórz tymczasowy widok o nazwie „Sektory” i użyj polecenia SELECT, aby wyświetlić rekordy w DataFrame. Możesz się do tego odnieść instruktaż który wyjaśnia, jak utworzyć WIDOK w Spark – SQL.

zaimportuj pyspark

z pyspark.sql zaimportuj SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

# Wczytaj plik parkietu do obiektu dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Utwórz widok z powyższego pliku parkietu o nazwie - „Sektory”

dataframe_from_parquet.createOrReplaceTempView( „Sektory” )

# Zapytanie, aby wyświetlić wszystkie rekordy z sektorów

linuxhint_spark_app.sql( „wybierz * z sektorów” ).pokazywać()

Wyjście:

Przykład 2:

Korzystając z poprzedniego WIDOKU, napisz zapytanie SQL:

  1. Aby wyświetlić wszystkie rekordy z sektorów należących do „Indii”.
  2. Aby wyświetlić wszystkie rekordy z Sektorów z pracownikiem, który jest większy niż 100.
# Zapytanie, aby wyświetlić wszystkie rekordy z sektorów należących do „Indii”.

linuxhint_spark_app.sql( „wybierz * z sektorów, w których obszar = „Indie”” ).pokazywać()

# Zapytanie, aby wyświetlić wszystkie rekordy z sektorów z pracownikami większymi niż 100

linuxhint_spark_app.sql( „wybierz * z sektorów, w których liczba_pracowników>100” ).pokazywać()

Wyjście:

Jest tylko jeden rekord z obszarem „Indie” i dwa rekordy z pracownikami większymi niż 100.

Odczytaj plik Parquet do PySpark SQL

Najpierw musimy utworzyć WIDOK za pomocą polecenia CREATE. Używając słowa kluczowego „ścieżka” w zapytaniu SQL, możemy odczytać plik parkietu do Spark SQL. Po ścieżce musimy określić nazwę pliku/lokalizację pliku.

Składnia:

spark_app.sql( „UTWÓRZ WIDOK TYMCZASOWY nazwa_widoku UŻYWAJĄC OPCJI parkietu (ścieżka „ nazwa_pliku.parkiet ')' )

Przykład 1:

Utwórz tymczasowy widok o nazwie „Sektor2” i wczytaj do niego plik parkietu. Za pomocą funkcji sql() napisz zapytanie wybierające, aby wyświetlić wszystkie rekordy obecne w widoku.

zaimportuj pyspark

z pyspark.sql zaimportuj SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( „Wskazówka dotycząca Linuksa” .getOrCreate()

# Wczytaj plik Parquet do SparkSQL

linuxhint_spark_app.sql( „UTWÓRZ WIDOK TYMCZASOWY Sektor2 UŻYWAJĄC OPCJI parkietu (ścieżka „ część-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )

# Zapytanie o wyświetlenie wszystkich rekordów z Sector2

linuxhint_spark_app.sql( „wybierz * z sektora 2” ).pokazywać()

Wyjście:

Przykład 2:

Użyj poprzedniego WIDOKU i napisz zapytanie, aby wyświetlić wszystkie rekordy z oceną „Hot” lub „Cool”.

# Zapytanie, aby wyświetlić wszystkie rekordy z Sektora 2 z oceną - Gorąca lub Chłodna.

linuxhint_spark_app.sql( „wybierz * z Sektora 2, gdzie Ocena=„Gorąca” LUB Ocena=„Zimna”” ).pokazywać()

Wyjście:

Istnieją trzy rekordy z oceną „Hot” lub „Cool”.

Wniosek

W PySpark funkcja write.parquet() zapisuje DataFrame do pliku Parquet. Funkcja read.parquet() odczytuje plik parkietu do PySpark DataFrame lub dowolnego innego źródła danych. Nauczyliśmy się, jak wczytać plik parkietu do PySpark DataFrame i do tabeli PySpark. W ramach tego samouczka omówiliśmy również, jak tworzyć tabele z PySpark DataFrame i filtrować dane za pomocą klauzuli WHERE.