1 change: 1 addition & 0 deletions README.md
@@ -11,3 +11,4 @@ examples.
- [Docker](docker/README.md): Docker images, Dockerfiles, and container basics
- [Keycloak](keycloak/README.md): Identity management, JWT authentication,
Docker Compose setup
- [Kafka](kafka/README.md): Producer, Consumer, Docker Compose setup
4 changes: 4 additions & 0 deletions kafka/Kafka.slnx
@@ -0,0 +1,4 @@
<Solution>
<Project Path="src/Consumer/Consumer.csproj" Id="14e3fc49-7c8e-4aaa-8e65-911a4d261fd9" />
<Project Path="src/Producer/Producer.csproj" Id="d4876fc2-e56f-4c2b-89bb-0906f5d57fdb" />
</Solution>
81 changes: 81 additions & 0 deletions kafka/README.md
@@ -0,0 +1,81 @@
# Kafka

This project demonstrates how to implement a producer and a consumer for
`Apache Kafka` in `.NET`. It contains sample producer and consumer
implementations, built with the `Confluent.Kafka` `.NET` library and the
official `Apache Kafka` Docker image.

The producer and consumer samples demonstrate a topic with multiple partitions,
as well as automatic and manual offset committing.

See [Apache Kafka][0] for more information on `Kafka`.

See [confluent-kafka-dotnet][1] for more information on the consumer and
producer implementations.

## Setup

- Run the compose file (`docker compose up -d`) to start a working `Kafka`
server
- Create a topic using the following command
```sh
/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic <topic-name> \
  --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
```
- Start `Producer` app to send messages
- Start `Consumer` app to receive messages

> [!NOTE]
>
> We ran into an issue when running all applications in Docker: the producer
> and the consumer should wait for the server to be healthy, but a reliable
> health check could not be configured, so we decided to run only the `Kafka`
> server in Docker.

For more information on the Docker setup, see the [kafka][2] repository.

## Producer

Producers are clients that send messages to Kafka topics. They create data and
push it to the `Kafka` server. A producer can specify the partition a message
belongs to, or let the client assign one automatically, either from the
message key or by spreading keyless messages across partitions.

Producers can send messages to multiple topics, and multiple producers can send
messages to the same topic. If a topic has multiple partitions, messages with
the same key will always land in the same partition.
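
The keying behavior above can be sketched with `Confluent.Kafka` as follows.
This is a minimal illustration, not the project's `Producer` app; the topic
name and broker address are assumptions matching the compose setup:

```csharp
using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using var producer = new ProducerBuilder<string, string>(config).Build();

// Let the client derive the partition from the key:
// every message keyed "order-42" goes to the same partition.
await producer.ProduceAsync("demo-topic",
    new Message<string, string> { Key = "order-42", Value = "created" });

// Or target a specific partition explicitly.
await producer.ProduceAsync(new TopicPartition("demo-topic", new Partition(1)),
    new Message<string, string> { Key = "order-42", Value = "shipped" });
```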

## Consumer

Consumers are clients that read messages from `Kafka` for a given topic.
Consumers keep track of processed messages by committing an offset per topic
partition.

Consumers can subscribe to multiple topics and can also be grouped. Each
consumer group maintains its own message offsets, so multiple groups can
independently subscribe to the same topic.
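
A sketch of two independent groups reading the same topic (the group ids here
are illustrative, not part of the sample apps):

```csharp
using Confluent.Kafka;

// Two consumers with different GroupIds: each group tracks its own
// committed offsets, so both receive every message on the topic.
var analytics = new ConsumerBuilder<string, string>(new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "analytics",
    AutoOffsetReset = AutoOffsetReset.Earliest
}).Build();

var billing = new ConsumerBuilder<string, string>(new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "billing",
    AutoOffsetReset = AutoOffsetReset.Earliest
}).Build();

analytics.Subscribe("demo-topic");
billing.Subscribe("demo-topic");
```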

### Notes

- Consumers have a group id, and offsets are associated with that group id. If
the group id is randomly generated on every run, the offsets for that group are
reset and all persisted messages are read again from the beginning

- One or more consumer groups can subscribe to a topic, and each consumer
group keeps its own separate offsets.

- For a partitioned topic, each consumer in a group is assigned individual
partitions; if a topic has 3 partitions and the group has 5 consumers, 2 of the
consumers are left idle

- If the number of partitions is greater than the number of consumers in a
group, a consumer may be assigned more than one partition

- If a consumer in a group fails, its partitions are reassigned to the
remaining consumers by `Kafka`

- `Kafka` advances the committed offset only when a commit happens. Auto-commit
is enabled by default, but it can be turned off in favor of manual commits.
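
The manual-commit pattern from the last note can be sketched like this. It is
a simplified fragment (group id and topic are assumptions), not the project's
`Consumer` app; committing only after processing gives at-least-once delivery,
since a crash before `Commit` causes the message to be redelivered:

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "manual-commit-demo",   // illustrative group id
    EnableAutoCommit = false,         // take over offset management
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("demo-topic");

var result = consumer.Consume(TimeSpan.FromSeconds(5));
if (result != null)
{
    Console.WriteLine(result.Message.Value); // process first...
    consumer.Commit(result);                 // ...then advance the offset
}
```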

[0]: https://kafka.apache.org/
[1]: https://github.com/confluentinc/confluent-kafka-dotnet
[2]: https://github.com/apache/kafka/tree/trunk/docker/examples
19 changes: 19 additions & 0 deletions kafka/compose.yml
@@ -0,0 +1,19 @@

services:
kafka:
image: apache/kafka:latest
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
14 changes: 14 additions & 0 deletions kafka/src/Consumer/Consumer.csproj
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net9.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.13.0" />
</ItemGroup>

</Project>
46 changes: 46 additions & 0 deletions kafka/src/Consumer/Program.cs
@@ -0,0 +1,46 @@
using Confluent.Kafka;

var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "1",
AutoOffsetReset = AutoOffsetReset.Earliest
};
var consumerBuilder = new ConsumerBuilder<string, string>(config);
var consumerWithNoCommitBuilder = new ConsumerBuilder<string, string>(new ConsumerConfig(config) { EnableAutoCommit = false });

Task[] consumers = [
Consume(consumerBuilder, 0),
Consume(consumerWithNoCommitBuilder, 1, manualCommit: true),
Consume(consumerWithNoCommitBuilder, 2)
];

await Task.WhenAll(consumers);

Task Consume(ConsumerBuilder<string, string> builder, int consumerId,
bool manualCommit = false
) =>
Task.Run(() =>
{
using var consumer = builder.Build();
consumer.Subscribe("demo-topic");

while (true)
{
var result = consumer.Consume(TimeSpan.FromSeconds(5));
if (result == null) continue;

Console.WriteLine(
$"Consumer: {consumerId}, " +
$"Partition: {result.Partition.Value}, " +
$"Offset: {result.Offset}, " +
$"Key: {result.Message.Key}, " +
$"Value: {result.Message.Value}"
);

if (manualCommit)
{
consumer.Commit(result);
}
}
});
14 changes: 14 additions & 0 deletions kafka/src/Producer/Producer.csproj
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net9.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.13.0" />
</ItemGroup>

</Project>
45 changes: 45 additions & 0 deletions kafka/src/Producer/Program.cs
@@ -0,0 +1,45 @@
using Confluent.Kafka;

var producerBuilder = new ProducerBuilder<string, string>(new ProducerConfig()
{
BootstrapServers = "localhost:9092",
Acks = Acks.All,
});

using var producer = producerBuilder.Build();

Task[] producers = [
Produce(),
Produce(key: "1234")
];

await Task.WhenAll(producers);

Task Produce(
string? key = default
) => Task.Run(async () =>
{
while (true)
{
try
{
var result = await producer.ProduceAsync("demo-topic", new Message<string, string>
{
Key = key ?? $"#{DateTime.Now:ddhhmmss}",
Value = "Message"
});
Console.WriteLine(
$"Partition: {result.Partition.Value}, " +
$"Offset: {result.Offset}, " +
$"Key: {result.Message.Key}, " +
$"Value: {result.Message.Value}"
);

await Task.Delay(2000);
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
}
});