Producer and Consumer for Kafka in .NET: An Exploration

CodeGuru content and product recommendations are editorially independent. We may make money when you click on links to our partners. Learn More.

In one of my previous articles, “New to Big Data? Start with Kafka,” I wrote an introduction to Kafka, a big data messaging system. In this article, let us explore setting up a test Kafka broker on a Windows machine, create a Kafka producer, and create a Kafka consumer using the .NET framework.

Setting Up a Test Kafka Broker on Windows

Kafka is a system that is designed to run on a Linux machine. You also can set up a test Kafka broker on a Windows machine and use it to create sample producers and consumers. To create the .NET producer and consumer, I have set up a test Kafka environment on a Wwindows machine. Follow this link to set it up; it has step-by-step instructions.

Once the setup is done, start the zookeeper and Kafka broker as instructed.

.NET Provider for Kafka

To successfully connect to a Kafka broker from a .NET framework application, you will need a Kafka provider for .NET. There are many available on the NuGet packages. In this article, I am using the Kafka provider named kafka-net.dll, as shown in Figure 1.

Kafka1
Figure 1: Installing kafka-net.dll

Note: This provider will work on .NET applications using the 4.5 and above framework because it uses the async and await features of C#.

.NET Producer: A Sample

In this section, let us create a sample console application that will be a producer to pump in the payload to a Kafka broker. Kafka messages will be stored into specific topics so the data will be produced to the one mentioned in your code. Following is the C# producer code.

using KafkaNet;
using KafkaNet.Model;
using KafkaNet.Protocol;
using System;

namespace Producer
{
   class Program
   {
      static void Main(string[] args)
      {
         var options = new KafkaOptions
            (new Uri("http://localhost:9092"));
         var router = new BrokerRouter(options);

         var client = new KafkaNet.Producer(router);
         client.SendMessageAsync("testtopic", new[]
            { new Message("Hi Hello! Welcome to Kafka!") }).Wait();

         Console.ReadLine();
      }
   }
}

.NET Consumer: A Sample

Let us now create a consumer to consume the messages that are available in the Kafka broker from a specific topic.

using System;
using System.Text;
using KafkaNet;
using KafkaNet.Model;

namespace Consumer
{
   class Program
   {
      static void Main(string[] args)
      {
         var options = new KafkaOptions(new Uri("http://localhost:9092"),
            new Uri("http://localhost:9092"));
         var router = new BrokerRouter(options);
         var consumer = new KafkaNet.Consumer(new ConsumerOptions("testtopic",
            new BrokerRouter(options)));

         //Consume returns a blocking IEnumerable (ie: never ending stream)
         foreach (var message in consumer.Consume())
         {
            Console.WriteLine("Response: P{0},O{1} : {2}",
               message.Meta.PartitionId, message.Meta.Offset,
               Encoding.UTF8.GetString(message.Value));
         }
      }
   }
}

Now, run the producer to post the message to the local Kafka server. This should add the given message to the topic “testtopic”. Later, run the consumer to pull the messages from the topic “testtopic”. Along with the message, it also will give back information such as the offset id and partition id of the consumed message. Figure 2 shows the consumed message from the Kafka broker, along with the offset and partition information.

Kafka2
Figure 2: Receiving a response from Kafka

Consume Messages from a Particular Offset

A message on the Kafka server can be uniquely identified by combining the partition id and the offset id. If you have to reprocess the messages that you have already consumed, there is an option where you can provide the offset id and the partition id from where the message consumption will start with. Following is a sample code to highlight reprocessing from a given offset and partition.

         OffsetPosition[] offsetPositions = new OffsetPosition[]
         {
            new OffsetPosition()
            {
               Offset = 10,
               PartitionId = 0
            }
         };

         var consumer = new KafkaNet.Consumer(new ConsumerOptions("test1",
            new BrokerRouter(options)), offsetPositions);

I hope this article enlightened the readers on how to produce messages to a Kafka broker and consume the same messages, all using .NET applications.

Happy reading!

More by Author

Get the Free Newsletter!

Subscribe to Developer Insider for top news, trends & analysis

Must Read