Menyiapkan Kafka di Docker untuk Pengembangan Lokal

· 6 min read
Menyiapkan Kafka di Docker untuk Pengembangan Lokal
Photo by Boitumelo / Unsplash

Kafka adalah platform streaming yang digunakan untuk memproses data secara real-time. Di dunia di mana data adalah rajanya, Kafka adalah alat yang berharga untuk dipelajari oleh pengembang dan insinyur data. Namun, menyiapkan Kafka secara lokal dapat membuat frustasi dan menghambat pembelajaran. Dalam artikel ini, saya akan menunjukkan kepada Anda cara tercepat menyiapkan Kafka untuk pengembangan menggunakan Docker, dan juga menunjukkan kepada Anda bagaimana pengaturan ini dapat mendukung koneksi secara lokal dan dari container Docker lokal lainnya.

Tutorial ini mengasumsikan bahwa Anda memiliki pengetahuan tentang penggunaan Docker dan docker-compose untuk pengembangan. Jika Anda baru mengenal Docker, saya sarankan untuk membaca artikel ini terlebih dahulu. Jika Anda perlu menginstal Docker, ikuti petunjuk di sini.

Ikhtisar Kafka

Diagram di bawah ini menunjukkan ikhtisar tingkat tinggi tentang Kafka untuk pemula. (Masih banyak lagi yang ada di Kafka, seperti Zookeeper, Consumer Groups, Partitions, dll. tapi kita akan tinggalkan itu untuk lain waktu.)

Sekilas tentang Kafka

Sekilas tentang Kafka

Kafka mengkategorikan data ke dalam topik. Topik adalah kategori atau nama feed tempat rekaman dipublikasikan.

Produser mempublikasikan pesan ke topik tertentu. Pesannya bisa dalam format apa pun, dengan JSON dan Avro menjadi opsi populer. Misalnya, platform media sosial mungkin menggunakan produser untuk mempublikasikan pesan ke topik yang disebut postingan setiap kali pengguna membuat postingan.

Konsumen berlangganan suatu topik untuk mengonsumsi rekaman yang diterbitkan oleh produsen. Dalam contoh media sosial, mungkin ada konsumen yang diatur untuk menggunakan topik postingan untuk melakukan pemeriksaan keamanan pada postingan sebelum dipublikasikan ke feed global, dan konsumen lain mungkin secara asinkron mengirimkan pemberitahuan ke pengikut pengguna.

Menyiapkan Kafka di Docker

Kami akan menggunakan gambar bitnami untuk Kafka dan Zookeeper. Saya lebih suka ini daripada gambar wurstmeister karena lebih mudah diatur dan lebih aktif dipelihara. Kami juga akan menggunakan alat untuk mengelola container kami.docker-compose

Buat file bernama docker-compose.yml dan tambahkan konten berikut:

# docker-compose.yml
version: "3.7"
services:
  zookeeper:
    restart: always
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper-volume:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    restart: always
    image: docker.io/bitnami/kafka:3.3
    ports:
      - "9093:9093"
    volumes:
      - "kafka-volume:/bitnami"
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
volumes:
  kafka-volume:
  zookeeper-volume:

. Hal ini karena Kafka bergantung pada Zookeeper untuk menyimpan metadata tentang topik dan partisi. Untuk tujuan pengembangan, Anda tidak perlu berinteraksi dengannya dan Anda dapat mengabaikannya dengan aman untuk sementara waktu.. Lihat bahwa kita juga telah mendefinisikan layanan bitnami/kafkazookeeper

Untuk menjalankannya, jalankan saja docker-compose up -d dan Anda akan melihat keluaran berikut:

$ docker-compose up -d
Creating network "kafka-on-docker_default" with the default driver
Creating volume "kafka-on-docker_kafka-volume" with default driver
Creating volume "kafka-on-docker_zookeeper-volume" with default driver
Creating kafka-on-docker_zookeeper_1 ... done
Creating kafka-on-docker_kafka_1     ... done

Berinteraksi dengan kontainer Kafka dari mesin lokal Anda

kafkacat menawarkan antarmuka baris perintah sederhana untuk berinteraksi dengan Kafka. Ini adalah alat yang hebat untuk digunakan untuk memeriksa kewarasan apakah Kafka sedang berjalan. Untuk menginstal kafkacat, ikuti petunjuk dari bergantung pada sistem operasi Anda.https://github.com/edenhill/kcat

Untuk memastikan apakah Kafka sedang berjalan, jalankan perintah berikut untuk mencantumkan semua topik yang saat ini ada di Kafka:

$ kcat -b localhost:9093 -L  # list all topics currently in kafka
Metadata for all topics (from broker 1: localhost:9093/1):
 1 brokers:
  broker 1 at localhost:9093 (controller)
 0 topics:

Perhatikan bahwa kami menggunakan localhost:9093 dan bukan port default 9092. Ini karena kami menggunakan port yang terekspos ke mesin lokal kami.

Untuk menguji produser, jalankan perintah berikut:

$ kcat -b localhost:9093 -t test-topic -P  # producer
one line per message
another line

Pembatas default antar pesan adalah baris baru. Setelah selesai, tekan tidak akan berhasil, Anda harus mencoba lagi.)ctrl-d untuk mengirim pesan. (Untuk diketahui - mengklik ctrl+c

Untuk membaca pesan yang Anda buat, jalankan perintah berikut untuk memulai konsumen:

$ kcat -b localhost:9093 -t test-topic -C  # consumer
one line per message
another line
% Reached end of topic test-topic [0] at offset 2

Menerbitkan teks sembarangan bukanlah hal yang kita inginkan, jadi mari kita coba memublikasikan pesan JSON saja. Untuk membuat ini lebih mudah untuk ekstensi, kami akan menulis beberapa skrip Python untuk menghasilkan dan menggunakan pesan.

Untuk memulai, Anda harus menginstal perpustakaan sebagai gantinya. dan instal dari , atau jika Anda menjalankannya lebih mahir, gunakan kafka-python. Anda dapat melakukannya dengan menjalankan pip install kafka-pythonvirtualenvrequirements.txt

# producer.py
from kafka import KafkaProducer
from datetime import datetime
import json
producer = KafkaProducer(
    bootstrap_servers=['localhost:9093'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
producer.send('posts', {'author': 'choyiny', 'content': 'Kafka is cool!', 'created_at': datetime.now().isoformat()})

Di bawah ini adalah contoh konsumen Python yang berlangganan post dan mencetak setiap nilai.

# consumer.py
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
    'posts',
    bootstrap_servers=['localhost:9093'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
# note that this for loop will block forever to wait for the next message
for message in consumer:
    print(message.value)

Menghubungkan ke Kafka dari container Docker lain

Jika Anda membuat pesan dengan topik Kafka, kemungkinan besar Anda telah merencanakan tujuan pesan tersebut. Misalnya saja di atas cuplikan menunjukkan bagaimana Anda dapat menulis konsumen python Anda sendiri. Namun terkadang, Anda ingin memindahkan pesan Anda ke orang lain database seperti Clickhouse atau Elasticsearch untuk pemrosesan atau visualisasi lebih lanjut.

Meskipun Anda dapat menyisipkan pesan langsung ke Clickhouse, untuk solusi yang lebih terukur, Anda dapat menggunakan penyisipan massal meminimalkan transaksi. (Sekali lagi, ini adalah topik untuk lain waktu... tujuan kami sebenarnya adalah mengaktifkan dan menjalankan POC.)

Mari kita tunjukkan contoh integrasi Kafka yang di-Docker ke Clickhouse, sebuah database OLAP. (Apa Clickhouse akan menjadi artikel untuk lain waktu...)

Untuk mempelajari lebih lanjut tentang mesin tabel Clickhouse, lihat Menggunakan mesin tabel Kafka.

Buat file bernama docker-compose.yml dan tambahkan konten berikut:

# docker-compose.yml
version: "3.7"
services:
  clickhouse:
    restart: always
    image: clickhouse/clickhouse-server
    ports:
      - "8123:8123"
      - "9000:9000"
    volumes:
      - "clickhouse-volume:/var/lib/clickhouse/"
volumes:
  clickhouse-volume:

Sekarang kita siap untuk terhubung ke Clickhouse. Jalankan perintah berikut untuk memulai wadah Clickhouse:

$ docker-compose -f clickhouse.docker-compose.yml exec clickhouse clickhouse-client
ClickHouse client version 22.11.2.30 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 22.11.2 revision 54460.

Warnings:
 * Linux is not using a fast clock source. Performance can be degraded. Check /sys/devices/system/clocksource/clocksource0/current_clocksource

0cd6f3269407 :)

Ini adalah shell interaktif yang memungkinkan Anda menjalankan perintah SQL. Untuk membuat tabel, jalankan perintah berikut:

-- create messages queue
CREATE TABLE default.message_queue
(
  created_at DateTime,
  content String,
  author String
)
ENGINE = Kafka(
  'kafka:9092',
  'posts',
  'clickhouse',
  'JSONEachRow'
) settings kafka_thread_per_consumer = 1, kafka_num_consumers = 1;

Perhatikan bagaimana kami menggunakan kafka:9092 sebagai string koneksi, bukan localhost:9093 saat menyambung ke Kafka secara lokal. Ini adalah karena kami menghubungkannya melalui jaringan Docker internal.

Mari buat beberapa tabel lagi untuk memvisualisasikan data:

-- create messages table
CREATE TABLE default.messages
(
  created_at DateTime,
  content String,
  author String
)
ENGINE = MergeTree
ORDER BY created_at;

-- create materialized view
CREATE MATERIALIZED VIEW default.messages_mv
TO default.messages
AS SELECT * FROM default.message_queue;

messages.

Menghubungkan semuanya bersama-sama

Untuk aplikasi demo, kita akan membuat titik akhir API POST /posts dengan Python Flask, dan alih-alih menyimpannya ke database secara langsung, kita akan memproduksinya ke topik Kafka < /span>. Selain itu, karena kami telah menyiapkan Clickhouse untuk menggunakan pesan dari Kafka, kami akan dapat melihat pesan tersebut di UI Clickhouse.repo demo iniposts. Ikuti di bawah ini atau dengan mengkloning

Diagram urutan untuk contoh aplikasi kita

Diagram urutan untuk contoh aplikasi kita

Berikut kode langsung untuk titik akhir API:

# app.py
# fun fact: This snippet was generated entirely by Copilot
from flask import Flask, request
from kafka import KafkaProducer
from datetime import datetime
import json

app = Flask(__name__)
producer = KafkaProducer(
    bootstrap_servers=['kafka:9093'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.route('/posts', methods=['POST'])
def create_post():
    post = request.get_json()
    # clickhouse can only parse strings without milliseconds
    post['created_at'] = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    producer.send('posts', post)
    return 'ok'

Untuk menjalankannya, Anda dapat menggunakan perintah berikut:

$ flask run

Setelah dijalankan, Anda dapat mencoba titik akhir dengan menjalankan perintah curl:

$ curl -X POST -H "Content-Type: application/json" -d '{"author": "choyiny", "content": "Kafka is cool!"}' http://localhost:5000/posts

Akan lebih baik jika berhasil pada percobaan pertama, namun ini adalah perintah yang dapat Anda gunakan untuk mengeluarkan log dari Clickhouse untuk memeriksa kesalahan:

$ docker-compose exec clickhouse tail -f /var/log/clickhouse-server/clickhouse-server.log

Untuk menghapus pesan yang rusak dari topik kafka, cukup hapus seluruh topik dengan perintah ini:

$ docker-compose exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic posts

Idealnya, tidak ada kesalahan. Saya akan menyerahkannya sebagai latihan kepada pembaca untuk terhubung ke Clickhouse lagi dan memeriksa apakah data dari Kafka sedang digunakan.