Apache Kafka — jak działa log compaction.
Jedną z wielu bardzo ciekawych opcji jakie można wykorzystać w Apache Kafka jest kompaktowanie danych ( log compaction) . W tym wpisie wyjaśnie jak działa kompaktowanie i w jakich sytuacjach można je wykorzystać.
Zakładam że czytając to znasz już podstawy i wiesz jak działa Apache Kafka. Dodatkowo będzie potrzebny docker lub istniejący klaster.
Co to jest kompaktowanie ( log compaction) ?
Na stronie sjp.pl znajdziemy takie wyjaśnienie słowa kompaktowanie
w informatyce: procedura oczyszczania ze zbędnych danych plików zawierających pocztę elektroniczną
Apache Kafka ma niewiele wspólnego z pocztą elektreniczna, ale faktycznie potrafi usuwać zbędne dane z topic.
Log compaction is a mechanism to give finer-grained per-record retention, rather than the coarser-grained time-based retention. The idea is to selectively remove records where we have a more recent update with the same primary key. This way the log is guaranteed to have at least the last state for each key.
Powyższy opis znajdziemy w dokumentacji Apache Kafka. Jak należy go rozumieć ? Mówiąc bardzo prostymi słowami Kafka usuwa stare rekordy jeżeli pojawią sie nowsze dla danego klucza w danej partycji. Popatrzmy na poniższy przykład:
Log compaction usunie “wartosc1” i “wartosc2”, zostanie tylko “wartosc3” ponieważ jest ona ostatnią dla “klucz1” i ma najnowszy offset. W Apache Kafka nie możemy zmienić zawartości wiadomości, ale możemy zapisać nową wiadomość która będzie miała nowy offset. Dzięki kompatowaniu będziemy mogli w łatwy sposób sprawdzić jaka jest ostatnia, najbardziej aktualna wartość.
Gdzie więc sprawdzi sie kompaktowanie ? Wszedzie tam gdzie zainteresowani jesteśmy aktualną wartością lub stanem i nie potrzebujemy widzieć jak one zmieniały się w czasie. Interesuje nas tu i teraz.
Jak działa kompatowanie w praktyce.
Będziemy potrzebowali klastra Apache Kafka, można użyc obrazu dockera z https://github.com/wurstmeister/kafka-docker
Jeśli masz już działajacy klaster Apache Kafka możesz pominać poniższe komendy:
$ git clone https://github.com/wurstmeister/kafka-docker.git$ cd kafka-docker/$ sed -i s/192.168.99.100/localhost/g docker-compose-single-broker.yml$ docker-compose -f docker-compose-single-broker.yml up -d
Starting kafka-docker_zookeeper_1 … done
Starting kafka-docker_kafka_1 … done$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
f358003012df kafka-docker_kafka “start-kafka.sh” 5 minutes ago Up 3 seconds 0.0.0.0:32770->9092/tcp kafka-docker_kafka_1
5cefa0390a7e wurstmeister/zookeeper “/bin/sh -c ‘/usr/sb…” 4 weeks ago Up 4 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp kafka-docker_zookeeper_1$ docker exec -ti kafka-docker_kafka_1 /bin/bash
bash-4.4#
Kiedy mamy już działający klaster możemy wykonać następujące polecenie, które stworzy nam topic, warto zwrócić uwagę na konfigurację.
bash-4.4# kafka-topics.sh --create --zookeeper zookeeper:2181 --topic latest-value --replication-factor 1 --partitions 1 --config "cleanup.policy=compact" --config "delete.retention.ms=100" --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0.01"
Created topic latest-value.
Dodajmy kilka rekordów
bash-4.4# kafka-console-producer.sh --broker-list localhost:9092 --topic latest-value --property parse.key=true --property key.separator=:
>klucz1:10
>klucz1:20
>klucz2:5
>klucz3:6
>klucz1:30
Przeczytajmy to co mamy
bash-4.4# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic latest-value --property print.key=true --property key.separator=: --from-beginning
>klucz1:10
>klucz1:20
>klucz2:5
>klucz3:6
>klucz1:30
Zgodnie z teoria, powinniśmy mieć ostatnie zapisane wartości dla danego klucza, powinny być to: klucz1:30, klucz2:5,klucz3:6 . Dlaczego więc tak nie jest ? Ważny jest czas ( delete.retention.ms ), jeżeli przeczytamy odpowiednio szybko to co jest w topicu, to log compaction jeszcze nie skompaktował rekordów. Po pewnym czasie:
bash-4.4# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic latest-value --property print.key=true --property key.separator=: --from-beginning
klucz1:20
klucz2:5
klucz3:6
klucz1:30
Lepiej, ale ciagle mamy dwie wartości dla klucz1, dlaczego ? Popatrzymy jak to wygląda z poziomu plików
bash-4.4# cd /kafka/kafka-logs-*/latest-value-0/
bash-4.4# ls
00000000000000000000.index
00000000000000000000.index.deleted
00000000000000000000.log
00000000000000000000.log.deleted
00000000000000000000.timeindex
00000000000000000000.timeindex.deleted
00000000000000000001.index.deleted
00000000000000000001.log.deleted
00000000000000000001.snapshot
00000000000000000001.timeindex.deleted
00000000000000000002.index.deleted
00000000000000000002.log.deleted
00000000000000000002.snapshot
00000000000000000002.timeindex.deleted
00000000000000000003.index.deleted
00000000000000000003.log.deleted
00000000000000000003.snapshot
00000000000000000003.timeindex.deleted
00000000000000000004.index
00000000000000000004.log
00000000000000000004.snapshot
00000000000000000004.timeindex
leader-epoch-checkpoint
I po raz kolejny, czas jest bardzo ważny, po kilku chwilach już wygląda to tak
bash-4.4# ls
00000000000000000000.index 00000000000000000004.index
00000000000000000000.log 00000000000000000004.log
00000000000000000000.timeindex 00000000000000000004.snapshot
00000000000000000001.snapshot 00000000000000000004.timeindex
00000000000000000002.snapshot leader-epoch-checkpoint
00000000000000000003.snapshot
Co sie stało ? Wewnętrzny proces log cleaner przeniósł rekordy do jednego segmentu i usunął pozostałości, wszystkie pliki z nazwa *.deleted. Wróćmy teraz do kluczy i wartości, dlaczego mamy dwie wartości dla klucz1 ( 20 i 30 ) ?
bash-4.4# kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --deep-iteration --files 00000000000000000000.log
Dumping 00000000000000000000.log
Starting offset: 0
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1585774427745 size: 76 magic: 2 compresscodec: NONE crc: 3574414105 isvalid: true
| offset: 1 CreateTime: 1585774427745 keysize: 6 valuesize: 2 sequence: -1 headerKeys: [] key: klucz1 payload: 20
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 76 CreateTime: 1585774431402 size: 75 magic: 2 compresscodec: NONE crc: 3951877235 isvalid: true
| offset: 2 CreateTime: 1585774431402 keysize: 6 valuesize: 1 sequence: -1 headerKeys: [] key: klucz2 payload: 5
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 151 CreateTime: 1585774435705 size: 75 magic: 2 compresscodec: NONE crc: 367995597 isvalid: true
| offset: 3 CreateTime: 1585774435705 keysize: 6 valuesize: 1 sequence: -1 headerKeys: [] key: klucz3 payload: 6
bash-4.4# kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --deep-iteration --files 00000000000000000004.log
Dumping 00000000000000000004.log
Starting offset: 4
baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1585774439931 size: 76 magic: 2 compresscodec: NONE crc: 1888514047 isvalid: true
| offset: 4 CreateTime: 1585774439931 keysize: 6 valuesize: 2 sequence: -1 headerKeys: [] key: klucz1 payload: 30
Wewnętrzny proces w Apache Kafka potrzebuje minimum dwóch segmentów, segment 00000000000000000000.log jest tym w którym trzymane są skompaktowane dane ( klucz1:20, klucz2:5, klucz3:6 ). Drugi 00000000000000000004.log jest tym, który jest aktywny (klucz1:30 ) i nie został on skompaktowany. (The active segment will not be compacted even if all of its messages are older than the minimum compaction time lag.) Dodatkowo warto pamiętać że ostatnia cyfra segmentu jest to pierwszy offset ( 00000000000000000004.log — Starting offset: 4 baseOffset: 4 ).
Dopiszmy więc kolejną wartość dla klucz1
bash-4.4# kafka-console-producer.sh --broker-list localhost:9092 --topic latest-value --property parse.key=true --property key.separator=:
>klucz1:40
Odczekajmy chwile i dajmy szanse wewnętrznym procesom Apache Kafka.
bash-4.4# ls
00000000000000000000.index 00000000000000000001.snapshot 00000000000000000004.snapshot 00000000000000000005.snapshot
00000000000000000000.log 00000000000000000002.snapshot 00000000000000000005.index 00000000000000000005.timeindex
00000000000000000000.timeindex 00000000000000000003.snapshot 00000000000000000005.log leader-epoch-checkpoint
Jak widać segment 00000000000000000004.log został skompaktowany i usunięty, a aktywnym segmentem jest 00000000000000000005.log z klucz1:40 . Klucz1:30 jest teraz w 00000000000000000000.log .
bash-4.4# kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --deep-iteration --files 00000000000000000005.log
Dumping 00000000000000000005.log
Starting offset: 5
baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1585775426312 size: 76 magic: 2 compresscodec: NONE crc: 1534843419 isvalid: true
| offset: 5 CreateTime: 1585775426312 keysize: 6 valuesize: 2 sequence: -1 headerKeys: [] key: klucz1 payload: 40
Przeczytajmy dla pewności co jest w topicu.
bash-4.4# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic latest-value --property print.key=true --property key.separator=: --from-beginning
klucz2:5
klucz3:6
klucz1:30
klucz1:40
Podsumowując, klucz2:5 klucz3:6 i klucz1:30 są w 00000000000000000000.log, a klucz1:40 w 00000000000000000005.log, który jest aktywnym segmentem. Klucz1:20 został usunięty w momencie kompaktowania segmentu 000000000000000000004.log. Warto dodać, że kompaktowanie usunie również rekordy z wartością null, a skompaktowane rekordy zachowują swój offset.
Ciekawostka. Apache Kafka używa offset map do porównywania kluczy i wybiera wartość z najnowszych offsetem i na tym opiera się całe kompaktowanie.
Tomasz Gintowt jest Architektem/DevOps/DBA/Trenerem, głównie skupiony na dostarczaniu rozwiązań składowania i przetwarzania danych. Nie są mu obce wszelkiej maści bazy danych, systemy real-time data i streamingu. Obecnie pracuje z Apache Kafka, RabbitMQ, Elastic Stack i PostgreSQL. Organizator spotkań DataOps Poland.
https://www.linkedin.com/in/tomasz-gintowt/
https://dataops-academy.pl — kursy i szkolenia.