Producer and Consumer for Kafka in .NET: An Exploration

In one of my previous articles, "New to Big Data? Start with Kafka," I wrote an introduction to Kafka, a big data messaging system. In this article, let us explore setting up a test Kafka broker on a Windows machine, create a Kafka producer, and create a Kafka consumer using the .NET framework.

Setting Up a Test Kafka Broker on Windows

Kafka is a system that is designed to run on a Linux machine. You also can set up a test Kafka broker on a Windows machine and use it to create sample producers and consumers. To create the .NET producer and consumer, I have set up a test Kafka environment on a Wwindows machine. Follow this link to set it up; it has step-by-step instructions.

Once the setup is done, start the zookeeper and Kafka broker as instructed.

.NET Provider for Kafka

To successfully connect to a Kafka broker from a .NET framework application, you will need a Kafka provider for .NET. There are many available on the NuGet packages. In this article, I am using the Kafka provider named kafka-net.dll, as shown in Figure 1.

Kafka1
Figure 1: Installing kafka-net.dll

Note: This provider will work on .NET applications using the 4.5 and above framework because it uses the async and await features of C#.

.NET Producer: A Sample

In this section, let us create a sample console application that will be a producer to pump in the payload to a Kafka broker. Kafka messages will be stored into specific topics so the data will be produced to the one mentioned in your code. Following is the C# producer code.

using KafkaNet;
using KafkaNet.Model;
using KafkaNet.Protocol;
using System;

namespace Producer
{
   class Program
   {
      static void Main(string[] args)
      {
         var options = new KafkaOptions
            (new Uri("http://localhost:9092"));
         var router = new BrokerRouter(options);

         var client = new KafkaNet.Producer(router);
         client.SendMessageAsync("testtopic", new[]
            { new Message("Hi Hello! Welcome to Kafka!") }).Wait();

         Console.ReadLine();
      }
   }
}

.NET Consumer: A Sample

Let us now create a consumer to consume the messages that are available in the Kafka broker from a specific topic.

using System;
using System.Text;
using KafkaNet;
using KafkaNet.Model;

namespace Consumer
{
   class Program
   {
      static void Main(string[] args)
      {
         var options = new KafkaOptions(new Uri("http://localhost:9092"),
            new Uri("http://localhost:9092"));
         var router = new BrokerRouter(options);
         var consumer = new KafkaNet.Consumer(new ConsumerOptions("testtopic",
            new BrokerRouter(options)));

         //Consume returns a blocking IEnumerable (ie: never ending stream)
         foreach (var message in consumer.Consume())
         {
            Console.WriteLine("Response: P{0},O{1} : {2}",
               message.Meta.PartitionId, message.Meta.Offset,
               Encoding.UTF8.GetString(message.Value));
         }
      }
   }
}

Now, run the producer to post the message to the local Kafka server. This should add the given message to the topic "testtopic". Later, run the consumer to pull the messages from the topic "testtopic". Along with the message, it also will give back information such as the offset id and partition id of the consumed message. Figure 2 shows the consumed message from the Kafka broker, along with the offset and partition information.

Kafka2
Figure 2: Receiving a response from Kafka

Consume Messages from a Particular Offset

A message on the Kafka server can be uniquely identified by combining the partition id and the offset id. If you have to reprocess the messages that you have already consumed, there is an option where you can provide the offset id and the partition id from where the message consumption will start with. Following is a sample code to highlight reprocessing from a given offset and partition.

         OffsetPosition[] offsetPositions = new OffsetPosition[]
         {
            new OffsetPosition()
            {
               Offset = 10,
               PartitionId = 0
            }
         };

         var consumer = new KafkaNet.Consumer(new ConsumerOptions("test1",
            new BrokerRouter(options)), offsetPositions);

I hope this article enlightened the readers on how to produce messages to a Kafka broker and consume the same messages, all using .NET applications.

Happy reading!



Related Articles

Comments

  • Unable to connect to kafka server

    Posted by Charan on 11/19/2016 02:58am

    Hi,i have installed kafka following this link https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-14-04 While consuming message from server not getting any response Please help me in resolving this issue

    Reply
  • Issue

    Posted by naveen on 10/27/2016 08:32am

    Hi, I am not getting any message in console window, when it checks for 'SendMessageAsync 'API I am not getting any response in producer API

    Reply
  • No message on console of consumer

    Posted by vyshak on 03/23/2016 02:24am

    Hi, Thanks a lot for the post.I do not see any message on the console. But I do see message received notification on the debug window. Any suggestions?

    Reply
Leave a Comment
  • Your email address will not be published. All fields are required.

Top White Papers and Webcasts

Most Popular Programming Stories

More for Developers

RSS Feeds

Thanks for your registration, follow us on our social networks to keep up-to-date