diff --git a/README.md b/README.md index 69623d2..f9993fc 100644 --- a/README.md +++ b/README.md @@ -11,3 +11,4 @@ examples. - [Docker](docker/README.md): Docker images, Dockerfiles, and container basics - [Keycloak](keycloak/README.md): Identity management, JWT authentication, Docker Compose setup +- [Kafka](kafka/README.md): Producer, Consumer, Docker Compose setup diff --git a/kafka/Kafka.slnx b/kafka/Kafka.slnx new file mode 100644 index 0000000..05e440f --- /dev/null +++ b/kafka/Kafka.slnx @@ -0,0 +1,4 @@ + + + + diff --git a/kafka/README.md b/kafka/README.md new file mode 100644 index 0000000..0084b13 --- /dev/null +++ b/kafka/README.md @@ -0,0 +1,81 @@ +# Kafka + +The aim of this project is to demonstrate how to implement a `Consumer` for +`Apache Kafka` in `.NET`. Project contains a sample producer and consumer +implementations. `Confluent.Kafka` `.NET` library and official `Apache Kafka` +docker image is used. + +Producer and Consumer samples are implemented in a way to demonstrate a topic +with multiple partitions, auto and manual offset committing. + +View [Apache Kafka](0) for more information on `Kafka` + +View [confluent-kafka-dotnet](1) for more information for consumer and producer +implementations. + +## Setup + +- Run compose file to start a working `Kafka` server +- Create a topic using following command +```cmd +/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic +--bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 +``` +- Start `Producer` app to send messages +- Start `Consumer` app to receive messages + +> [!NOTE] +> +> We currently faced an issue when all applications run in docker, since +> producer and consumer should wait for the server to be healthy but currently +> a valid health check could not be performed so we decided to only run `Kafka` +> server in docker + +For more information for docker setup, view [kafka](2) repository + +## Producer + +Producers are clients that send messages to Kafka topics. They create data and +push to `Kafka` server. Producers can specify the partition a message belongs +to, or it can be assigned by the Kafka server based on a given key or +automatically. + +Producers can send messages to multiple topics, and multiple producers can send +messages to the same topic. If topic has multiple partitions, messages with the +same key will always be in the same partition. + +## Consumer + +Consumers are clients that read messages from `Kafka` for given topic. Consumers +keep track of processed messages by committing an offset, which is stored in +topic partitions. + +Consumers can subscribe to multiple topics and can also be grouped. Each +consumer group will have their individual message offsets so it enables multiple +groups to subscribe to same topic. + +### Notes + +- Consumers have group id and offset is associated with that group id, if group + id is randomly generated every time, the offset will be reset for that group, + and all persisted messages will be read from the beginning + +- One or more consumer groups can be subscribed to a topic, and each consumer + group will have separate offsets. + +- For a partitioned topic, each consumer will be assigned to an individual + partition if a topic has 3 partitions and 5 consumers, 2 of the consumers will + be left idle + +- If number of partitions are more than consumer count in a group, a consumer + may have more than one partition assigned + +- If a consumer of a group fails, the partitions will be assigned to remaining + consumers by `Kafka` + +- `Kafka` moves the offset when committed, Auto-commit is set to true by default + but can be turned off and committed manually. + +[0]: https://kafka.apache.org/ +[1]: https://github.com/confluentinc/confluent-kafka-dotnet +[2]: https://github.com/apache/kafka/tree/trunk/docker/examples \ No newline at end of file diff --git a/kafka/compose.yml b/kafka/compose.yml new file mode 100644 index 0000000..eaebccd --- /dev/null +++ b/kafka/compose.yml @@ -0,0 +1,19 @@ +version: '3.8' + +services: + kafka: + image: apache/kafka:latest + container_name: kafka + ports: + - "9092:9092" + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_LOG_DIRS: /tmp/kraft-combined-logs + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 diff --git a/kafka/src/Consumer/Consumer.csproj b/kafka/src/Consumer/Consumer.csproj new file mode 100644 index 0000000..8c0e676 --- /dev/null +++ b/kafka/src/Consumer/Consumer.csproj @@ -0,0 +1,14 @@ + + + + Exe + net9.0 + enable + enable + + + + + + + diff --git a/kafka/src/Consumer/Program.cs b/kafka/src/Consumer/Program.cs new file mode 100644 index 0000000..524d8bb --- /dev/null +++ b/kafka/src/Consumer/Program.cs @@ -0,0 +1,46 @@ +using Confluent.Kafka; + +var config = new ConsumerConfig +{ + BootstrapServers = "localhost:9092", + GroupId = "1", + AutoOffsetReset = AutoOffsetReset.Earliest +}; +var consumerBuilder = new ConsumerBuilder(config); +var consumerWithNoCommitBuilder = new ConsumerBuilder(new ConsumerConfig(config) { EnableAutoCommit = false }); + +Task[] consumers = [ + Consume(consumerBuilder, 0), + Consume(consumerWithNoCommitBuilder, 1, manualCommit: true), + Consume(consumerWithNoCommitBuilder, 2) +]; + +await Task.WhenAll(consumers); + +Task Consume(ConsumerBuilder builder, int consumerId, + bool manualCommit = false +) => + Task.Run(() => + { + using var consumer = builder.Build(); + consumer.Subscribe("demo-topic"); + + while (true) + { + var result = consumer.Consume(TimeSpan.FromSeconds(5)); + if (result == null) continue; + + Console.WriteLine( + $"Consumer: {consumerId}," + + $"Partition: {result.Partition.Value}," + + $"Offset: {result.Offset}," + + $"Key: {result.Message.Key}," + + $"Value: {result.Message.Value}" + ); + + if (manualCommit) + { + consumer.Commit(result); + } + } + }); \ No newline at end of file diff --git a/kafka/src/Producer/Producer.csproj b/kafka/src/Producer/Producer.csproj new file mode 100644 index 0000000..8c0e676 --- /dev/null +++ b/kafka/src/Producer/Producer.csproj @@ -0,0 +1,14 @@ + + + + Exe + net9.0 + enable + enable + + + + + + + diff --git a/kafka/src/Producer/Program.cs b/kafka/src/Producer/Program.cs new file mode 100644 index 0000000..e4c01ef --- /dev/null +++ b/kafka/src/Producer/Program.cs @@ -0,0 +1,45 @@ +using Confluent.Kafka; + +var producerBuilder = new ProducerBuilder(new ProducerConfig() +{ + BootstrapServers = "localhost:9092", + Acks = Acks.All, +}); + +using var producer = producerBuilder.Build(); + +Task[] producers = [ + Produce(), + Produce(key: "1234") +]; + +await Task.WhenAll(producers); + +Task Produce( + string? key = default +) => Task.Run(async () => + { + while (true) + { + try + { + var result = await producer.ProduceAsync("demo-topic", new Message + { + Key = key ?? $"#{DateTime.Now:ddhhmmss}", + Value = "Message" + }); + Console.WriteLine( + $"Partition: {result.Partition.Value}," + + $"Offset: {result.Offset}," + + $"Key: {result.Message.Key}," + + $"Value: {result.Message.Value}" + ); + + await Task.Delay(2000); + } + catch (Exception e) + { + Console.WriteLine(e.Message); + } + } + });