From ad1e3c9f17f5df954fb4be5fdc2fe031a1a49e15 Mon Sep 17 00:00:00 2001
From: dncsvr
Date: Wed, 11 Feb 2026 17:26:29 +0300
Subject: [PATCH 1/5] init `issue/hello-kafka`

---
 README.md       | 1 +
 kafka/README.md | 1 +
 2 files changed, 2 insertions(+)
 create mode 100644 kafka/README.md

diff --git a/README.md b/README.md
index 69623d2..f9993fc 100644
--- a/README.md
+++ b/README.md
@@ -11,3 +11,4 @@ examples.
 - [Docker](docker/README.md): Docker images, Dockerfiles, and container basics
 - [Keycloak](keycloak/README.md): Identity management, JWT authentication,
   Docker Compose setup
+- [Kafka](kafka/README.md): Producer, Consumer, Docker Compose setup
diff --git a/kafka/README.md b/kafka/README.md
new file mode 100644
index 0000000..2eb9d10
--- /dev/null
+++ b/kafka/README.md
@@ -0,0 +1 @@
+# Kafka

From f6b4427a6e181a3e81e2622d3f17057b890302fe Mon Sep 17 00:00:00 2001
From: dncsvr
Date: Thu, 12 Feb 2026 01:50:12 +0300
Subject: [PATCH 2/5] add sample Consumer and Producer

- add compose file for kafka
---
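Note: the consumer below polls in an endless loop and is never closed, so the
broker only drops it from the group after the session times out. A possible
shutdown pattern, as a sketch that is not part of this commit (the config
values mirror the sample):

```csharp
using Confluent.Kafka;

using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

using var consumer = new ConsumerBuilder<string, string>(new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "1",
    AutoOffsetReset = AutoOffsetReset.Earliest
}).Build();

consumer.Subscribe("demo-topic");

try
{
    while (!cts.IsCancellationRequested)
    {
        // blocks until a message arrives or the token is cancelled
        var result = consumer.Consume(cts.Token);
        Console.WriteLine($"Key: {result.Message.Key}, Value: {result.Message.Value}, Offset: {result.Offset}");
    }
}
catch (OperationCanceledException)
{
    // Ctrl+C pressed; fall through to Close()
}
finally
{
    // leave the group cleanly so partitions are reassigned immediately
    consumer.Close();
}
```
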
 kafka/Kafka.slnx                   |  4 ++++
 kafka/compose.yml                  | 19 +++++++++++++++++++
 kafka/src/Consumer/Consumer.csproj | 14 ++++++++++++++
 kafka/src/Consumer/Program.cs      | 22 ++++++++++++++++++++++
 kafka/src/Producer/Producer.csproj | 14 ++++++++++++++
 kafka/src/Producer/Program.cs      | 27 +++++++++++++++++++++++++++
 6 files changed, 100 insertions(+)
 create mode 100644 kafka/Kafka.slnx
 create mode 100644 kafka/compose.yml
 create mode 100644 kafka/src/Consumer/Consumer.csproj
 create mode 100644 kafka/src/Consumer/Program.cs
 create mode 100644 kafka/src/Producer/Producer.csproj
 create mode 100644 kafka/src/Producer/Program.cs

diff --git a/kafka/Kafka.slnx b/kafka/Kafka.slnx
new file mode 100644
index 0000000..05e440f
--- /dev/null
+++ b/kafka/Kafka.slnx
@@ -0,0 +1,4 @@
+<Solution>
+  <Project Path="src/Consumer/Consumer.csproj" />
+  <Project Path="src/Producer/Producer.csproj" />
+</Solution>
diff --git a/kafka/compose.yml b/kafka/compose.yml
new file mode 100644
index 0000000..eaebccd
--- /dev/null
+++ b/kafka/compose.yml
@@ -0,0 +1,19 @@
+version: '3.8'
+
+services:
+  kafka:
+    image: apache/kafka:latest
+    container_name: kafka
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_NODE_ID: 1
+      KAFKA_PROCESS_ROLES: broker,controller
+      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
+      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
+      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
+      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
diff --git a/kafka/src/Consumer/Consumer.csproj b/kafka/src/Consumer/Consumer.csproj
new file mode 100644
index 0000000..8c0e676
--- /dev/null
+++ b/kafka/src/Consumer/Consumer.csproj
@@ -0,0 +1,14 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net9.0</TargetFramework>
+    <ImplicitUsings>enable</ImplicitUsings>
+    <Nullable>enable</Nullable>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="Confluent.Kafka" Version="2.*" />
+  </ItemGroup>
+
+</Project>
diff --git a/kafka/src/Consumer/Program.cs b/kafka/src/Consumer/Program.cs
new file mode 100644
index 0000000..9494b50
--- /dev/null
+++ b/kafka/src/Consumer/Program.cs
@@ -0,0 +1,22 @@
+using Confluent.Kafka;
+
+using var consumer = new ConsumerBuilder<string, string>(new ConsumerConfig
+{
+    BootstrapServers = "localhost:9092",
+    GroupId = "1",
+    AutoOffsetReset = AutoOffsetReset.Earliest
+}).Build();
+
+consumer.Subscribe("demo-topic");
+
+while (true)
+{
+    var result = consumer.Consume(TimeSpan.FromSeconds(5));
+    if (result == null)
+    {
+        Console.WriteLine("Awaiting Message");
+        continue;
+    }
+
+    Console.WriteLine($"Key: {result.Message.Key}, Value: {result.Message.Value}, Offset: {result.Offset}");
+}
diff --git a/kafka/src/Producer/Producer.csproj b/kafka/src/Producer/Producer.csproj
new file mode 100644
index 0000000..8c0e676
--- /dev/null
+++ b/kafka/src/Producer/Producer.csproj
@@ -0,0 +1,14 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net9.0</TargetFramework>
+    <ImplicitUsings>enable</ImplicitUsings>
+    <Nullable>enable</Nullable>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="Confluent.Kafka" Version="2.*" />
+  </ItemGroup>
+
+</Project>
diff --git a/kafka/src/Producer/Program.cs b/kafka/src/Producer/Program.cs
new file mode 100644
index 0000000..6310ec0
--- /dev/null
+++ b/kafka/src/Producer/Program.cs
@@ -0,0 +1,27 @@
+using Confluent.Kafka;
+
+using var producer = new ProducerBuilder<string, string>(new ProducerConfig()
+{
+    BootstrapServers = "localhost:9092",
+    Acks = Acks.All,
+}).Build();
+
+while (true)
+{
+    try
+    {
+        var result = await producer.ProduceAsync("demo-topic", new Message<string, string>
+        {
+            Key = Guid.NewGuid().ToString(),
+            Value = $"Message #{DateTime.Now:yyyyMMddhhmmss}"
+        });
+
+        Console.WriteLine($"Key:{result.Message.Key}, Value: {result.Message.Value}, Offset: {result.Offset}");
+
+        await Task.Delay(2000);
+    }
+    catch (Exception e)
+    {
+        Console.WriteLine(e.Message);
+    }
+}
\ No newline at end of file

From bad72414177020a125400235b98151ec6fa274d4 Mon Sep 17 00:00:00 2001
From: dncsvr
Date: Thu, 12 Feb 2026 17:25:30 +0300
Subject: [PATCH 3/5] improve sample to use a topic with multiple partitions
 and multiple consumers

---
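Note: the README line added here creates the topic with kafka-topics.sh from
inside the container. The same can be done from .NET with the library's admin
client; a sketch, not part of this commit, with the broker address, topic name
and partition count mirroring the samples:

```csharp
using Confluent.Kafka;
using Confluent.Kafka.Admin;

using var admin = new AdminClientBuilder(new AdminClientConfig
{
    BootstrapServers = "localhost:9092"
}).Build();

try
{
    // mirrors: kafka-topics.sh --create --topic demo-topic --partitions 3 --replication-factor 1
    await admin.CreateTopicsAsync(new[]
    {
        new TopicSpecification { Name = "demo-topic", NumPartitions = 3, ReplicationFactor = 1 }
    });
}
catch (CreateTopicsException e) when (e.Results.All(r => r.Error.Code == ErrorCode.TopicAlreadyExists))
{
    // equivalent of --if-not-exists: an existing topic is not an error here
}
```
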
 kafka/README.md               |  7 +++++++
 kafka/src/Consumer/Program.cs | 33 +++++++++++++++++++++------------
 kafka/src/Producer/Program.cs | 22 ++++++++++++++++------
 3 files changed, 44 insertions(+), 18 deletions(-)

diff --git a/kafka/README.md b/kafka/README.md
index 2eb9d10..3aa36c4 100644
--- a/kafka/README.md
+++ b/kafka/README.md
@@ -1 +1,8 @@
 # Kafka
+
+
+Create a topic using the following command
+
+```cmd
+/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic <topic-name> --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
+```
\ No newline at end of file
diff --git a/kafka/src/Consumer/Program.cs b/kafka/src/Consumer/Program.cs
index 9494b50..bcb49b1 100644
--- a/kafka/src/Consumer/Program.cs
+++ b/kafka/src/Consumer/Program.cs
@@ -1,22 +1,31 @@
 using Confluent.Kafka;
 
-using var consumer = new ConsumerBuilder<string, string>(new ConsumerConfig
+var consumerBuilder = new ConsumerBuilder<string, string>(new ConsumerConfig
 {
     BootstrapServers = "localhost:9092",
     GroupId = "1",
     AutoOffsetReset = AutoOffsetReset.Earliest
-}).Build();
+});
 
-consumer.Subscribe("demo-topic");
+var consumers = Enumerable.Repeat(0, 3).Select((_, index) => Execute(consumerBuilder, index));
 
-while (true)
-{
-    var result = consumer.Consume(TimeSpan.FromSeconds(5));
-    if (result == null)
+await Task.WhenAll(consumers);
+
+Task Execute(ConsumerBuilder<string, string> builder, int consumerId) =>
+    Task.Run(() =>
     {
-        Console.WriteLine("Awaiting Message");
-        continue;
-    }
+        using var consumer = builder.Build();
+        consumer.Subscribe("demo-topic");
+
+        while (true)
+        {
+            var result = consumer.Consume(TimeSpan.FromSeconds(5));
+            if (result == null) continue;
 
-    Console.WriteLine($"Key: {result.Message.Key}, Value: {result.Message.Value}, Offset: {result.Offset}");
-}
+            Console.Write($"Consumer: {consumerId},");
+            Console.Write($"Partition: {result.Partition.Value},");
+            Console.Write($"Offset: {result.Offset},");
+            Console.Write($"Key: {result.Message.Key},");
+            Console.WriteLine($"Value: {result.Message.Value}");
+        }
+    });
\ No newline at end of file
diff --git a/kafka/src/Producer/Program.cs b/kafka/src/Producer/Program.cs
index 6310ec0..1e2e97f 100644
--- a/kafka/src/Producer/Program.cs
+++ b/kafka/src/Producer/Program.cs
@@ -1,24 +1,34 @@
 using Confluent.Kafka;
 
-using var producer = new ProducerBuilder<string, string>(new ProducerConfig()
+var producerBuilder = new ProducerBuilder<string, string>(new ProducerConfig()
 {
     BootstrapServers = "localhost:9092",
     Acks = Acks.All,
-}).Build();
-
+});
+
+var producer = producerBuilder.Build();
 
 while (true)
 {
     try
     {
         var result = await producer.ProduceAsync("demo-topic", new Message<string, string>
         {
-            Key = Guid.NewGuid().ToString(),
-            Value = $"Message #{DateTime.Now:yyyyMMddhhmmss}"
+            Key = $"#{DateTime.Now:yyyyMMddhhmmss}",
+            Value = "Message"
         });
+        Console.WriteLine($"Key:{result.Message.Key}, Value: {result.Message.Value}, Offset: {result.Offset}");
 
+        await Task.Delay(500);
+
+        result = await producer.ProduceAsync("demo-topic", new Message<string, string>
+        {
+            Key = "1234",
+            Value = $"Message"
+        });
         Console.WriteLine($"Key:{result.Message.Key}, Value: {result.Message.Value}, Offset: {result.Offset}");
 
-        await Task.Delay(2000);
+        await Task.Delay(500);
     }
     catch (Exception e)
     {

From 2f8837f1c294819c1e3041de9ed8ea21f8e4864d Mon Sep 17 00:00:00 2001
From: dncsvr
Date: Thu, 12 Feb 2026 18:13:41 +0300
Subject: [PATCH 4/5] begin writing documentation

- refactor producer and consumer sample
---
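Note: all three consumers join group "1", so offsets are tracked per
partition; the partition served by consumer 2 is never committed and, with
`AutoOffsetReset.Earliest`, is re-read from the beginning on the next start.
The synchronous Commit call on consumer 1 also costs a broker round-trip per
message; a commonly used alternative (a sketch, not part of this commit) keeps
auto-commit on but only lets it persist offsets that were explicitly stored
after processing:

```csharp
using Confluent.Kafka;

using var consumer = new ConsumerBuilder<string, string>(new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "1",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = true,       // commit stored offsets in the background
    EnableAutoOffsetStore = false  // but only store offsets marked as processed
}).Build();

consumer.Subscribe("demo-topic");

while (true)
{
    var result = consumer.Consume(TimeSpan.FromSeconds(5));
    if (result == null) continue;

    // handle the message first ...
    Console.WriteLine($"{result.Partition.Value}/{result.Offset}: {result.Message.Value}");

    // ... then mark it as processed; the next auto-commit persists it
    consumer.StoreOffset(result);
}
```
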
 kafka/README.md               | 33 ++++++++++++++++++++-
 kafka/src/Consumer/Program.cs | 33 +++++++++++++++------
 kafka/src/Producer/Program.cs | 56 ++++++++++++++++++++---------------
 3 files changed, 88 insertions(+), 34 deletions(-)

diff --git a/kafka/README.md b/kafka/README.md
index 3aa36c4..cdd14de 100644
--- a/kafka/README.md
+++ b/kafka/README.md
@@ -1,8 +1,39 @@
 # Kafka
 
+We used `Confluent.Kafka` `.NET` library and `apache/kafka` docker image in this
+demo project
+
+## Setup
 
 Create a topic using the following command
 
 ```cmd
 /opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic <topic-name> --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
-```
\ No newline at end of file
+```
+
+## Consumer
+
+
+### Notes
+
+- Consumers have group id and offset is associated with the group id, if group
+  id is randomly generated everytime, the offset will be reset for that group,
+  and all persisted messages will be read from the beginning
+
+- One ore more consumer groups can be subscribed to a topic, and each conmsumer
+  group will have seperate offsets.
+
+- For a partioned topic, each consumer will be assigned to an individual
+  partition if a topic has 3 partitions and 5 consumers, 2 of the consumers will
+  be left idle
+
+- If number of partitions are more than consumer count in a group, a consumer may
+  have more than one partitions assigned
+
+- If a consumer of a group fails, the partitions will be assigned to remaining
+  consumers by kafka
+
+- Messages with same key will exists in the same partition
+
+- Kafka moves the offset when commited, Auto commit is set true by default, can
+  be turned of an manually committed.
diff --git a/kafka/src/Consumer/Program.cs b/kafka/src/Consumer/Program.cs
index bcb49b1..524d8bb 100644
--- a/kafka/src/Consumer/Program.cs
+++ b/kafka/src/Consumer/Program.cs
@@ -1,17 +1,25 @@
 using Confluent.Kafka;
 
-var consumerBuilder = new ConsumerBuilder<string, string>(new ConsumerConfig
+var config = new ConsumerConfig
 {
     BootstrapServers = "localhost:9092",
     GroupId = "1",
     AutoOffsetReset = AutoOffsetReset.Earliest
-});
+};
+var consumerBuilder = new ConsumerBuilder<string, string>(config);
+var consumerWithNoCommitBuilder = new ConsumerBuilder<string, string>(new ConsumerConfig(config) { EnableAutoCommit = false });
 
-var consumers = Enumerable.Repeat(0, 3).Select((_, index) => Execute(consumerBuilder, index));
+Task[] consumers = [
+    Consume(consumerBuilder, 0),
+    Consume(consumerWithNoCommitBuilder, 1, manualCommit: true),
+    Consume(consumerWithNoCommitBuilder, 2)
+];
 
 await Task.WhenAll(consumers);
 
-Task Execute(ConsumerBuilder<string, string> builder, int consumerId) =>
+Task Consume(ConsumerBuilder<string, string> builder, int consumerId,
+    bool manualCommit = false
+) =>
     Task.Run(() =>
     {
         using var consumer = builder.Build();
@@ -22,10 +30,17 @@ Task Execute(ConsumerBuilder<string, string> builder, int consumerId) =>
             var result = consumer.Consume(TimeSpan.FromSeconds(5));
             if (result == null) continue;
 
-            Console.Write($"Consumer: {consumerId},");
-            Console.Write($"Partition: {result.Partition.Value},");
-            Console.Write($"Offset: {result.Offset},");
-            Console.Write($"Key: {result.Message.Key},");
-            Console.WriteLine($"Value: {result.Message.Value}");
+            Console.WriteLine(
+                $"Consumer: {consumerId}," +
+                $"Partition: {result.Partition.Value}," +
+                $"Offset: {result.Offset}," +
+                $"Key: {result.Message.Key}," +
+                $"Value: {result.Message.Value}"
+            );
+
+            if (manualCommit)
+            {
+                consumer.Commit(result);
+            }
         }
     });
\ No newline at end of file
diff --git a/kafka/src/Producer/Program.cs b/kafka/src/Producer/Program.cs
index 1e2e97f..e4c01ef 100644
--- a/kafka/src/Producer/Program.cs
+++ b/kafka/src/Producer/Program.cs
@@ -6,32 +6,40 @@
     Acks = Acks.All,
 });
 
-var producer = producerBuilder.Build();
+using var producer = producerBuilder.Build();
 
-while (true)
-{
-    try
-    {
-        var result = await producer.ProduceAsync("demo-topic", new Message<string, string>
-        {
-            Key = $"#{DateTime.Now:yyyyMMddhhmmss}",
-            Value = "Message"
-        });
-        Console.WriteLine($"Key:{result.Message.Key}, Value: {result.Message.Value}, Offset: {result.Offset}");
+Task[] producers = [
+    Produce(),
+    Produce(key: "1234")
+];
 
-        await Task.Delay(500);
+await Task.WhenAll(producers);
 
-        result = await producer.ProduceAsync("demo-topic", new Message<string, string>
+Task Produce(
+    string? key = default
+) => Task.Run(async () =>
+    {
+        while (true)
         {
-            Key = "1234",
-            Value = $"Message"
-        });
-        Console.WriteLine($"Key:{result.Message.Key}, Value: {result.Message.Value}, Offset: {result.Offset}");
+            try
+            {
+                var result = await producer.ProduceAsync("demo-topic", new Message<string, string>
+                {
+                    Key = key ?? $"#{DateTime.Now:ddhhmmss}",
+                    Value = "Message"
+                });
+                Console.WriteLine(
+                    $"Partition: {result.Partition.Value}," +
+                    $"Offset: {result.Offset}," +
+                    $"Key: {result.Message.Key}," +
+                    $"Value: {result.Message.Value}"
+                );
 
-        await Task.Delay(500);
-    }
-    catch (Exception e)
-    {
-        Console.WriteLine(e.Message);
-    }
-}
\ No newline at end of file
+                await Task.Delay(2000);
+            }
+            catch (Exception e)
+            {
+                Console.WriteLine(e.Message);
+            }
+        }
+    });

From d78bbfb689c031b8c4eb3c5114e8704f559d3c52 Mon Sep 17 00:00:00 2001
From: dncsvr
Date: Thu, 12 Feb 2026 23:19:06 +0300
Subject: [PATCH 5/5] update documentation

---
 kafka/README.md | 133 +++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 113 insertions(+), 20 deletions(-)

diff --git a/kafka/README.md b/kafka/README.md
index cdd14de..0084b13 100644
--- a/kafka/README.md
+++ b/kafka/README.md
@@ -1,39 +1,132 @@
 # Kafka
 
-We used `Confluent.Kafka` `.NET` library and `apache/kafka` docker image in this
-demo project
+The aim of this project is to demonstrate how to implement a `Consumer` for
+`Apache Kafka` in `.NET`. The project contains sample producer and consumer
+implementations. The `Confluent.Kafka` `.NET` library and the official
+`Apache Kafka` docker image are used.
 
-## Setup
+Producer and Consumer samples are implemented in a way that demonstrates a
+topic with multiple partitions and both auto and manual offset committing.
 
-Create a topic using the following command
+View [Apache Kafka][0] for more information on `Kafka`.
+
+View [confluent-kafka-dotnet][1] for more information on consumer and producer
+implementations.
+
+## Setup
+
+- Run the compose file to start a working `Kafka` server
+- Create a topic using the following command
 
 ```cmd
-/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic <topic-name> --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
+/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic <topic-name> \
+  --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
 ```
+
+- Start `Producer` app to send messages
+- Start `Consumer` app to receive messages
+
+> [!NOTE]
+>
+> We faced an issue when all applications run in docker: the producer and
+> consumer should wait for the `Kafka` server to be healthy, but a valid
+> health check could not be performed, so we decided to run only the `Kafka`
+> server in docker.
+
+For more information on the docker setup, view the [kafka][2] repository.
+
+## Producer
+
+Producers are clients that send messages to Kafka topics. They create data and
+push it to the `Kafka` server. Producers can specify the partition a message
+belongs to, or let it be assigned by the Kafka server based on a given key or
+automatically.
+
+Producers can send messages to multiple topics, and multiple producers can send
+messages to the same topic. If a topic has multiple partitions, messages with
+the same key will always be in the same partition.
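+
+A minimal produce call in the spirit of `src/Producer/Program.cs`; the key and
+value below are placeholders:
+
+```csharp
+using Confluent.Kafka;
+
+using var producer = new ProducerBuilder<string, string>(new ProducerConfig
+{
+    BootstrapServers = "localhost:9092"
+}).Build();
+
+// messages with the same key always land in the same partition
+var result = await producer.ProduceAsync("demo-topic", new Message<string, string>
+{
+    Key = "some-key",
+    Value = "some-value"
+});
+
+Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
+```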
$"#{DateTime.Now:ddhhmmss}", + Value = "Message" + }); + Console.WriteLine( + $"Partition: {result.Partition.Value}," + + $"Offset: {result.Offset}," + + $"Key: {result.Message.Key}," + + $"Value: {result.Message.Value}" + ); - await Task.Delay(500); - } - catch (Exception e) - { - Console.WriteLine(e.Message); - } -} \ No newline at end of file + await Task.Delay(2000); + } + catch (Exception e) + { + Console.WriteLine(e.Message); + } + } + }); From d78bbfb689c031b8c4eb3c5114e8704f559d3c52 Mon Sep 17 00:00:00 2001 From: dncsvr Date: Thu, 12 Feb 2026 23:19:06 +0300 Subject: [PATCH 5/5] update documentation --- kafka/README.md | 74 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 58 insertions(+), 16 deletions(-) diff --git a/kafka/README.md b/kafka/README.md index cdd14de..0084b13 100644 --- a/kafka/README.md +++ b/kafka/README.md @@ -1,39 +1,81 @@ # Kafka -We used `Confluent.Kafka` `.NET` library and `apache/kafka` docker image in this -demo project +The aim of this project is to demonstrate how to implement a `Consumer` for +`Apache Kafka` in `.NET`. Project contains a sample producer and consumer +implementations. `Confluent.Kafka` `.NET` library and official `Apache Kafka` +docker image is used. -## Setup +Producer and Consumer samples are implemented in a way to demonstrate a topic +with multiple partitions, auto and manual offset committing. + +View [Apache Kafka](0) for more information on `Kafka` -Create a topic using following command +View [confluent-kafka-dotnet](1) for more information for consumer and producer +implementations. +## Setup + +- Run compose file to start a working `Kafka` server +- Create a topic using following command ```cmd -/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 +/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic +--bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 ``` +- Start `Producer` app to send messages +- Start `Consumer` app to receive messages + +> [!NOTE] +> +> We currently faced an issue when all applications run in docker, since +> producer and consumer should wait for the server to be healthy but currently +> a valid health check could not be performed so we decided to only run `Kafka` +> server in docker + +For more information for docker setup, view [kafka](2) repository + +## Producer + +Producers are clients that send messages to Kafka topics. They create data and +push to `Kafka` server. Producers can specify the partition a message belongs +to, or it can be assigned by the Kafka server based on a given key or +automatically. + +Producers can send messages to multiple topics, and multiple producers can send +messages to the same topic. If topic has multiple partitions, messages with the +same key will always be in the same partition. ## Consumer +Consumers are clients that read messages from `Kafka` for given topic. Consumers +keep track of processed messages by committing an offset, which is stored in +topic partitions. + +Consumers can subscribe to multiple topics and can also be grouped. Each +consumer group will have their individual message offsets so it enables multiple +groups to subscribe to same topic. 
 
 ### Notes
 
-- Consumers have group id and offset is associated with the group id, if group
-  id is randomly generated everytime, the offset will be reset for that group,
-  and all persisted messages will be read from the beginning
+- Consumers have a group id and the offset is associated with that group id; if
+  the group id is randomly generated every time, the offset will be reset for
+  that group, and all persisted messages will be read from the beginning
 
-- One ore more consumer groups can be subscribed to a topic, and each conmsumer
-  group will have seperate offsets.
+- One or more consumer groups can be subscribed to a topic, and each consumer
+  group will have separate offsets.
 
-- For a partioned topic, each consumer will be assigned to an individual
-  partition if a topic has 3 partitions and 5 consumers, 2 of the consumers will
-  be left idle
+- For a partitioned topic, each consumer will be assigned to an individual
+  partition; if a topic has 3 partitions and 5 consumers, 2 of the consumers
+  will be left idle
 
-- If number of partitions are more than consumer count in a group, a consumer may
-  have more than one partitions assigned
+- If the number of partitions is greater than the consumer count in a group, a
+  consumer may have more than one partition assigned
 
 - If a consumer of a group fails, the partitions will be assigned to remaining
-  consumers by kafka
+  consumers by `Kafka`
 
-- Messages with same key will exists in the same partition
-
-- Kafka moves the offset when commited, Auto commit is set true by default, can
-  be turned of an manually committed.
+- `Kafka` moves the offset when it is committed; auto-commit is enabled by
+  default but can be turned off in favor of manual commits.
+
+[0]: https://kafka.apache.org/
+[1]: https://github.com/confluentinc/confluent-kafka-dotnet
+[2]: https://github.com/apache/kafka/tree/trunk/docker/examples
\ No newline at end of file