Apacha Kafka dla początkujących.

Tomasz Gintowt
7 min readJun 30, 2020

TL;DR W artykule znajdziesz podstawowe informacje o Apache Kafka oraz kilka praktycznych przykładów.

https://kafka.apache.org/intro

Apache Kafka jest rozproszoną platformą do przesyłania milionów wiadomości. Projekt powstał w firmie LinkedIn i jest napisany w języku Scala. Podbiją świat IT swoją niezawodnością, skalowalnością i dziś większość skomplikowanych projektów, nie tylko tych z big data, ma w swoich ekosystemie Apache Kafka.

Pojęcia, które trzeba poznać:

  • Broker — jest to instancja Apache Kafka, może to być fizyczny serwer, POD w k8s czy też wirtualna maszyna.
  • Klaster — Kilka brokerów połączonych ze sobą.
  • Topic — Podstawowa jednostka, a właściwie nazwa służącą do opisania grupy logów.
  • Partycja — Każdy topic zawiera minimum jedną partycję, zazwyczaj kilka lub kilkanaście. Mówiąc niskopoziomowo, jest to plik na dysku brokera, do którego zapisywane są logi.

>>Zapisz sie na kurs — Apache Kafka dla początkujacych <<

W tym momencie warto zauważyć, że w Apache Kafka posługujemy się najczęściej pojęciem logu, chociaż czasem zamiennie używa się też słowa wiadomość lub event. Na początku może być to mylące. Dlaczego log ? Na pewno widziałeś wiele logów w swoich życiu i wiesz, że najnowsze dane są zapisywane na końcu, dopisywane do już istniejących. Tak, też dzieje się w przypadku Apache Kafka. Każdy nowy log(wiadomość/event) jest dopisywany na końcu pliku.

Apache Kafka ekosystem

Niezbędnym elementem, każdego klastra Apache Kafka jest Zookeeper. Minimum jeden, a zazwyczaj trzy nody, żeby zapewnić wysoką dostępność i tolerancje na awarie. Apache Kafka trzyma w Zookeeper metadane o topic, borker, consumer. Nie ma tam danych, które wysyłasz, tylko metadane klastra. Jako ciekawostkę dodam, że trwają prace nad przeniesieniem metadanych trzymanych w Zookeeper do Apache Kafka ( KIP-500 ) .

Tak wyglądają relację pomiędzy producer, broker i consumer.

Każdy producer może pisać do każdego brokera oraz każdy consumer może konsumować wiadomości z każdego brokera. W tym momencie dochodzimy do topic, jest to nazwa pewnej grupy wiadomości. Jeden producer może pisać do wielu topic, tak samo consumer może czytać dane z wielu topic. Topic jest podzielony na partycje, a każda partycja zawiera jakaś cześć wiadomości, które wysłałeś. Po co są partycje ? Żeby zwiększyć przepustowość. Załóżmy, że producer wysyła dane do topic, który ma 3 partycje a każda z nim jest na innym borker. Tym samym wysyłamy dane do trzech różnych serwerów, przez trzy różne karty sieciowe, procesory i na różne dyski. W teorii mamy trzykrotnie większą przepustowość. Oczywiście jest to sytuacja idealna, nie dzieje się tak jednak zawsze.

Jeśli powyższa cześć jest trudną do zrozumienia za pierwszym razem, nie przejmuj się, czytaj dalej i wykonaj polecenia.

Czasem zdarza się, że jakiś broker będzie niedostępny a tym samym wiadomości wysłane do niego. Żeby, rozwiązać ten problem Apache Kafka używa replikacji, zapewniając tym samym wysoką dostępność danych oraz niezawodność. Replikacja to duplikaty danych, które są przechowywane na innym brokerze. “Replication factor” to parametr, który definiujemy tworząc topic i może on być zmieniany nawet dla istniejących topic.

(Źródło — http://kafka.apache.org/documentation.html#introduction)

W Kafka posługujemy się pojęciem lidera partycji uznajemy, że jest to nasze źródło prawdy, a repliki są tylko kopiami zapasowymi na wypadek awarii. Każda wiadomość posiada offest, który jest numer pozycji w logu.

Podsumujmy. Producer wysyła wiadomości do broker, czyli do serwera lub kontenera. Wiadomości trafiają do odpowiedniego topic, który składa się z jednej lub więcej partycji, czyli fizycznego pliku na dysku. Proste prawda :)

Praktyka. Żeby, wykonać polecenia pokazane poniżej potrzebujesz docker oraz docker compose.

Link do repo.

https://github.com/wurstmeister/kafka-docker

$git clone https://github.com/wurstmeister/kafka-docker
$cd kafka-docker
$ sed -i 's@KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100@KAFKA_LISTENERS: PLAINTEXT://:9092@' docker-compose.yml
$docker-compose up -d
Recreating kafka-docker_kafka_1 ... done
Starting kafka-docker_zookeeper_1 ... done
$docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
139f5cc240a1 kafka-docker_kafka "start-kafka.sh" 6 seconds ago Up 3 seconds 0.0.0.0:32768->9092/tcp kafka-docker_kafka_1
91eaf22d2251 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 3 months ago Up 4 seconds 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp kafka-docker_zookeeper_1
$ docker exec -ti kafka-docker_kafka_1 bash
bash-4.4#

Teraz, kiedy już jesteśmy w kontenerze, popatrzmy jak wygląda klaster i załóżmy nowy topic i wyślijmy kilka wiadomości.

bash-4.4# kafka-topics.sh --zookeeper zookeeper --list
bash-4.4#
bash-4.4# kafka-topics.sh --zookeeper zookeeper --create -topic test --partitions 1 --replication-factor 1
Created topic test.
bash-4.4# kafka-topics.sh --zookeeper zookeeper --describe
Topic: test PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001

Stworzyliśmy topic o nazwie “test” z jedną partycją i jedną repliką. Teraz wyślijmy kilka wiadomości.

bash-4.4# kafka-console-producer.sh --broker-list kafka:9092 --topic test
>wiadomosc1
>wiadomosc2
>wiadomosc3
>^C

Przeczytajmy te wiadomości.

bash-4.4# kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning
wiadomosc1
wiadomosc2
wiadomosc3
^CProcessed a total of 3 messages
bash-4.4#

Wiemy, że mamy trzy wiadomości, teraz zobaczmy jak to wygląda z poziomu systemu plików.

bash-4.4# cd /kafka/kafka-logs-*/test-0/
bash-4.4# ls test-0/
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex leader-epoch-checkpoint

Nasze wiadomości są w pliku *.log

kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --deep-iteration --files 00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1593463177181 size: 78 magic: 2 compresscodec: NONE crc: 841198575 isvalid: true
| offset: 0 CreateTime: 1593463177181 keysize: -1 valuesize: 10 sequence: -1 headerKeys: [] payload: wiadomosc1
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 78 CreateTime: 1593463179641 size: 78 magic: 2 compresscodec: NONE crc: 195960679 isvalid: true
| offset: 1 CreateTime: 1593463179641 keysize: -1 valuesize: 10 sequence: -1 headerKeys: [] payload: wiadomosc2
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 156 CreateTime: 1593463182242 size: 78 magic: 2 compresscodec: NONE crc: 738383461 isvalid: true
| offset: 2 CreateTime: 1593463182242 keysize: -1 valuesize: 10 sequence: -1 headerKeys: [] payload: wiadomosc3

Jak widzisz, offest to nic innego jak numer ID wiadomości, jest też zapisany czas jej stworzenia CreateTime. Łatwo go rozszyfrować przy użyciu polecenia.

bash-4.4# date -d @1593463182242
Sat Nov 1 21:44:02 UTC +52464

Na samym końcu jest treść wiadomości, która wysłaliśmy.

Tak jak wspomniałem na samym początku, topic to nic innego jak fizyczny plik na dysku do ktorego zapisywane są dane.

Teraz zróbmy test i przekonajmy się, dlaczego kafka jest tak szybka i wydajna. Przy pierwszej próbie wyślijmy milion wiadomości do jednego topicu z ilością partycji 1.

kafka-producer-perf-test.sh --print-metrics  --topic test --num-records 1000000 --record-size 100 --throughput 15000000 --producer-props acks=1 bootstrap.servers=kafka:9092 buffer.memory=67108864 compression.type=none batch.size=81961000000 records sent, 164203.612479 records/sec (15.66 MB/sec), 1655.62 ms avg latency, 2448.00 ms max latency, 1893 ms 50th, 2411 ms 95th, 2436 ms 99th, 2446 ms 99.9th.

Teraz, zwiększmy ilość partycji z 1 do 9. Przypominam, że ilość partycji można zwiększać, ale nie zmniejszać.

bash-4.4#  kafka-topics.sh --zookeeper zookeeper --alter --topic test --partitions 9
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
bash-4.4# kafka-topics.sh --zookeeper zookeeper --describe
Topic: test PartitioneCount: 9 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: test Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: test Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: test Partition: 3 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: test Partition: 4 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: test Partition: 5 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: test Partition: 6 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: test Partition: 7 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: test Partition: 8 Leader: 1001 Replicas: 1001 Isr: 1001

Z poziomu plików też mamy więcej katalogów o nazwie test-*

bash-4.4# cd /kafka/kafka-logs-*/
bash-4.4# ls test-* -d
test-0 test-1 test-2 test-3 test-4 test-5 test-6 test-7 test-8

W Apache Kafka partycje liczymy od zera, więc partycją numer 1 jest test-0. Dzięki temu będziemy mogli zapisywać i odczytywać nie z jednego pliku a z ośmiu. Jeśli, dodamy jeszcze kolejne brokery i partycje zostaną przeniesione na te borkery będziemy mogli czytać i zapisywać z dużo większą prędkością. Porównajmy wyniki zapisu dla jednej partycji i dziewięciu.

bash-4.4#kafka-producer-perf-test.sh --print-metrics  --topic test --num- ecords 1000000 --record-size 100 --throughput 15000000 --producer-propsype=none batch.size=8196s=kafka:9092 buffer.memory=67108864 compression.ty
1000000 records sent, 298596.595999 records/sec (28.48 MB/sec), 70.31 ms avg latency, 419.00 ms max latency, 60 ms 50th, 180 ms 95th, 232 ms 99th, 254 ms 99.9th.

Przy jednej partycji było to 15.66 MB/sec, przy dziewięciu 28.48 MB/sec. Test jest o tyle nie miarodajny, że ciągle korzystamy z tego samego dockera, czyli dyski/CPU/RAM. Jeżeli dodamy kolejne brokery, które będą fizycznie odseparowane to prędkość, będzie jeszcze większa.

Gratuluje, właśnie wysłałeś swój pierwszy milion wiadomości :) W ramach wstępu to by było na tyle.

Posłuchaj jak zarządzamy klastrem Apache Kafka, który przesyła miliardy wiadomości dziennie.

Tomasz Gintowt jest Architektem/DevOps/DBA, głównie skupiony na dostarczaniu rozwiązań składowania i przetwarzania danych. Nie są mu obce wszelkiej maści bazy danych, systemy real-time data i streamingu. Obecnie pracuje z Apache Kafka, RabbitMQ, Elastic Stack i PostgreSQL. Prywatnie fan ciasta marchewkowego.

https://www.linkedin.com/in/tomasz-gintowt/

--

--

Tomasz Gintowt

Architect, DevOps, SysOps, and DBA. Currently, I’m an IT Systems Engineer working with Apache Kafka, RabbitMQ, PostgreSQL, Elastic Stack in Real-Time Data Team.