Jak zaimplementować strumieniowanie danych w czasie rzeczywistym w Pythonie

Jak Zaimplementowac Strumieniowanie Danych W Czasie Rzeczywistym W Pythonie



Opanowanie implementacji strumieniowania danych w czasie rzeczywistym w Pythonie jest niezbędną umiejętnością w dzisiejszym świecie opartym na danych. W tym przewodniku omówiono podstawowe kroki i niezbędne narzędzia umożliwiające wykorzystanie strumieniowania danych w czasie rzeczywistym z zachowaniem autentyczności w języku Python. Od wyboru odpowiedniego frameworka, takiego jak Apache Kafka lub Apache Pulsar, po pisanie kodu w języku Python w celu łatwego gromadzenia, przetwarzania i skutecznej wizualizacji danych, zdobędziemy umiejętności potrzebne do konstruowania sprawnych i wydajnych kanałów danych w czasie rzeczywistym.

Przykład 1: Implementacja strumieniowania danych w czasie rzeczywistym w Pythonie

Wdrożenie strumieniowania danych w czasie rzeczywistym w Pythonie ma kluczowe znaczenie w dzisiejszym świecie i wieku opartym na danych. W tym szczegółowym przykładzie przeprowadzimy przez proces budowania systemu strumieniowego przesyłania danych w czasie rzeczywistym przy użyciu Apache Kafka i Python w Google Colab.







Aby zainicjować przykład przed rozpoczęciem kodowania, niezbędne jest zbudowanie określonego środowiska w Google Colab. Pierwszą rzeczą, którą musimy zrobić, to zainstalować niezbędne biblioteki. Do integracji Kafki używamy biblioteki „kafka-python”.



! pypeć zainstalować kafka-python


To polecenie instaluje bibliotekę „kafka-python”, która udostępnia funkcje Pythona i powiązania dla Apache Kafka. Następnie importujemy wymagane biblioteki dla naszego projektu. Import wymaganych bibliotek, w tym „KafkaProducer” i „KafkaConsumer” to klasy z biblioteki „kafka-python”, które pozwalają nam na interakcję z brokerami Kafki. JSON to biblioteka Pythona do pracy z danymi JSON, których używamy do serializacji i deserializacji wiadomości.



z kafka import KafkaProducer, KafkaConsumer
importuj jsona


Stworzenie producenta Kafki





Jest to ważne, ponieważ producent platformy Kafka wysyła dane do tematu platformy Kafka. W naszym przykładzie tworzymy producenta, który będzie wysyłał symulowane dane w czasie rzeczywistym do tematu zwanego „tematem w czasie rzeczywistym”.

Tworzymy instancję „KafkaProducer”, która określa adres brokera Kafka jako „localhost:9092”. Następnie używamy „value_serializer”, funkcji serializującej dane przed wysłaniem ich do Kafki. W naszym przypadku funkcja lambda koduje dane w formacie JSON zakodowanym w formacie UTF-8. Teraz zasymulujmy dane w czasie rzeczywistym i wyślijmy je do tematu Kafki.



producent = KafkaProducer ( serwery startowe = „host lokalny: 9092” ,
serializator wartości =lambda v: json.dumps ( W ) .kodować ( „utf-8” ) )
# Symulowane dane w czasie rzeczywistym
dane = { „id_czujnika” : 1 , 'temperatura' : 25,5 , 'wilgotność' : 60.2 }
# Wysyłanie danych do tematu
producent.wyślij ( „temat w czasie rzeczywistym” , dane )


W tych wierszach definiujemy słownik „danych”, który reprezentuje symulowane dane z czujnika. Następnie używamy metody „wyślij”, aby opublikować te dane w „temacie w czasie rzeczywistym”.

Następnie chcemy utworzyć konsumenta Kafki, a konsument Kafki odczytuje dane z tematu Kafki. Tworzymy konsumenta, który będzie konsumował i przetwarzał wiadomości w „temacie czasu rzeczywistego”. Tworzymy instancję „KafkaConsumer”, podając temat, który chcemy wykorzystać, np. (temat czasu rzeczywistego) i adres brokera Kafki. Następnie „value_deserializer” to funkcja deserializująca dane otrzymane od Kafki. W naszym przypadku funkcja lambda dekoduje dane w formacie JSON zakodowanym w formacie UTF-8.

konsument = KafkaKonsument ( „temat w czasie rzeczywistym” ,
serwery startowe = „host lokalny: 9092” ,
wartość_deserializer =lambda x: json.loads ( dekodowanie x ( „utf-8” ) ) )


Używamy pętli iteracyjnej, aby w sposób ciągły konsumować i przetwarzać wiadomości z tematu.

# Odczyt i przetwarzanie danych w czasie rzeczywistym
Do wiadomość W konsument:
dane = wartość.wiadomości
wydrukować ( F „Otrzymane dane: {dane}” )


Pobieramy wartość każdej wiadomości i nasze symulowane dane z czujnika w pętli i drukujemy je na konsoli. Uruchomienie producenta i konsumenta Kafki polega na uruchomieniu tego kodu w Google Colab i indywidualnym wykonaniu komórek kodu. Producent przesyła symulowane dane do tematu Kafki, a konsument odczytuje i drukuje otrzymane dane.


Analiza danych wyjściowych podczas działania kodu

Będziemy obserwować dane w czasie rzeczywistym, które są produkowane i zużywane. Format danych może się różnić w zależności od naszej symulacji lub rzeczywistego źródła danych. W tym szczegółowym przykładzie omawiamy cały proces konfigurowania systemu strumieniowego przesyłania danych w czasie rzeczywistym przy użyciu Apache Kafka i Python w Google Colab. Wyjaśnimy każdą linijkę kodu i jej znaczenie w budowaniu tego systemu. Przesyłanie strumieniowe danych w czasie rzeczywistym to potężna funkcja, a ten przykład służy jako podstawa dla bardziej złożonych aplikacji w świecie rzeczywistym.

Przykład 2: Implementacja strumieniowania danych w czasie rzeczywistym w Pythonie przy użyciu danych giełdowych

Zróbmy kolejny unikalny przykład implementacji strumieniowania danych w czasie rzeczywistym w Pythonie przy użyciu innego scenariusza; tym razem skupimy się na danych giełdowych. Tworzymy system strumieniowego przesyłania danych w czasie rzeczywistym, który rejestruje zmiany cen akcji i przetwarza je przy użyciu Apache Kafka i Python w Google Colab. Jak pokazano w poprzednim przykładzie, zaczynamy od skonfigurowania naszego środowiska w Google Colab. Najpierw instalujemy wymagane biblioteki:

! pypeć zainstalować kafka-python yfinanse


Tutaj dodajemy bibliotekę „yfinance”, która pozwala nam uzyskać dane giełdowe w czasie rzeczywistym. Następnie importujemy niezbędne biblioteki. W dalszym ciągu używamy klas „KafkaProducer” i „KafkaConsumer” z biblioteki „kafka-python” do interakcji z platformą Kafka. Importujemy JSON, aby pracować z danymi JSON. Używamy również „yfinance”, aby uzyskać dane giełdowe w czasie rzeczywistym. Importujemy również bibliotekę „time”, aby dodać opóźnienie czasowe w celu symulacji aktualizacji w czasie rzeczywistym.

z kafka import KafkaProducer, KafkaConsumer
importuj jsona
importuj finanse Jak yf
import czas


Teraz tworzymy producenta Kafki dla danych giełdowych. Nasz producent platformy Kafka otrzymuje w czasie rzeczywistym dane dotyczące akcji i wysyła je do tematu platformy Kafka o nazwie „cena akcji”.

producent = KafkaProducer ( serwery startowe = „host lokalny: 9092” ,
serializator wartości =lambda v: json.dumps ( W ) .kodować ( „utf-8” ) )

chwila PRAWDA:
stock = yf.Ticker ( „AAPL” ) # Przykład: akcje Apple Inc
stock_data = stock.historia ( okres = „1d” )
ostatnia_cena = dane_magazynowe [ 'Zamknąć' ] .I loc [ - 1 ]
dane = { 'symbol' : „AAPL” , 'cena' : Ostatnia cena }
producent.wyślij ( 'Cena akcji' , dane )
czas spać ( 10 ) # Symuluj aktualizacje w czasie rzeczywistym co 10 sekund


Tworzymy instancję „KafkaProducer” z adresem brokera Kafka w tym kodzie. Wewnątrz pętli używamy słowa „yfinance”, aby uzyskać najnowszy kurs akcji Apple Inc. („AAPL”). Następnie wyodrębniamy ostatnią cenę zamknięcia i wysyłamy ją do tematu „cena akcji”. Ostatecznie wprowadzamy opóźnienie czasowe, aby symulować aktualizacje w czasie rzeczywistym co 10 sekund.

Stwórzmy konsumenta Kafki, który będzie odczytywał i przetwarzał dane dotyczące cen akcji z tematu „cena akcji”.

konsument = KafkaKonsument ( 'Cena akcji' ,
serwery startowe = „host lokalny: 9092” ,
wartość_deserializer =lambda x: json.loads ( dekodowanie x ( „utf-8” ) ) )

Do wiadomość W konsument:
stock_data = wiadomość.wartość
wydrukować ( F „Otrzymano dane giełdowe: {stock_data['symbol']} - Cena: {stock_data['price']}' )


Ten kod jest podobny do konfiguracji konsumenckiej z poprzedniego przykładu. W sposób ciągły odczytuje i przetwarza komunikaty z tematu „cena akcji” oraz drukuje symbol akcji i cenę na konsoli. Komórki kodu wykonujemy sekwencyjnie, np. jedna po drugiej w Google Colab, aby uruchomić producenta i konsumenta. Producent otrzymuje i wysyła aktualizacje cen akcji w czasie rzeczywistym, podczas gdy konsument odczytuje i wyświetla te dane.

! pypeć zainstalować kafka-python yfinanse
z kafka import KafkaProducer, KafkaConsumer
importuj jsona
importuj finanse Jak yf
import czas
producent = KafkaProducer ( serwery startowe = „host lokalny: 9092” ,
serializator wartości =lambda v: json.dumps ( W ) .kodować ( „utf-8” ) )

chwila PRAWDA:
stock = yf.Ticker ( „AAPL” ) # Akcje Apple Inc
stock_data = stock.historia ( okres = „1d” )
ostatnia_cena = dane_magazynowe [ 'Zamknąć' ] .I loc [ - 1 ]

dane = { 'symbol' : „AAPL” , 'cena' : Ostatnia cena }

producent.wyślij ( 'Cena akcji' , dane )

czas spać ( 10 ) # Symuluj aktualizacje w czasie rzeczywistym co 10 sekund
konsument = KafkaKonsument ( 'Cena akcji' ,
serwery startowe = „host lokalny: 9092” ,
wartość_deserializer =lambda x: json.loads ( dekodowanie x ( „utf-8” ) ) )

Do wiadomość W konsument:
stock_data = wiadomość.wartość
wydrukować ( F „Otrzymano dane giełdowe: {stock_data['symbol']} - Cena: {stock_data['price']}' )


W analizie wyników po uruchomieniu kodu będziemy obserwować aktualizacje cen akcji Apple Inc. w czasie rzeczywistym, które są produkowane i konsumowane.

Wniosek

W tym wyjątkowym przykładzie zademonstrowaliśmy implementację strumieniowania danych w czasie rzeczywistym w Pythonie przy użyciu Apache Kafka i biblioteki „yfinance” do przechwytywania i przetwarzania danych giełdowych. Dokładnie wyjaśniliśmy każdą linijkę kodu. Przesyłanie strumieniowe danych w czasie rzeczywistym można zastosować w różnych dziedzinach w celu tworzenia rzeczywistych aplikacji w finansach, IoT i nie tylko.