In this article, we are going to create a Kafka application with the help of Confluent Cloud and an Azure Function App, and walk through its step-by-step implementation using .NET Core 7.
Agenda
- Overview of Confluent Cloud
- Setup of Confluent Cloud
- Step-by-step implementation using .NET Core 7 Application
If you don’t know about Kafka and its architecture, I recommend you read this article for a better understanding.
Overview of Confluent Cloud
- Confluent Cloud is a fully managed data streaming platform that enables us to access, manage, and store data.
- Confluent Kafka provides different connectors to connect easily with different data sources.
- Confluent Cloud provides many features to scale, maintain, and monitor our Kafka cluster with high availability and zero downtime.
Setup of Confluent Cloud
Step 1
Open the Confluent Cloud site and log in.
Step 2
Add a new Kafka Cluster
Step 3
Select the Basic plan.
Step 4
Next, choose Azure Cloud and region.
Step 5
Skip the payment section.
Step 6
Enter the Kafka Cluster name.
Step 7
Afterward, go to the cluster section, open the Topics section from the side navigation, and create a new topic.
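As an alternative to the UI, here is a minimal sketch using the Confluent CLI, assuming it is installed, logged in, and pointed at your cluster:

confluent kafka topic create topic_1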
Step 8
Click on API Keys and add a new key, which we will need inside the .NET Core application.
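If you prefer the CLI here as well, an API key can be created for the cluster (the resource ID lkc-xxxxxx below is a hypothetical placeholder for your cluster ID):

confluent api-key create --resource lkc-xxxxxx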
Step-by-step implementation using .NET Core 7 Application
Step 1
Create a new .NET Core Web API Application
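If you prefer the command line over Visual Studio, here is a minimal sketch using the .NET CLI (the project name ConfluentKafkaDemo matches the namespaces used later in this article):

dotnet new webapi -n ConfluentKafkaDemo
cd ConfluentKafkaDemo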
Step 2
Configure the application
Step 3
Provide the additional details
Step 4
Install the following NuGet packages, which are used by the code below: Confluent.Kafka and Newtonsoft.Json.
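The same packages can also be installed from the .NET CLI:

dotnet add package Confluent.Kafka
dotnet add package Newtonsoft.Json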
Step 5
Create a new CarDetails class.
namespace ConfluentKafkaDemo.Entities
{
    public class CarDetails
    {
        public int CarId { get; set; }
        public string CarName { get; set; }
        public string BookingStatus { get; set; }
    }
}
Step 6
Add the Confluent Cloud Kafka configuration details inside the appsettings.json file.
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "KafkaConfiguration": {
    "BootstrapServers": "<bootstrap server url>",
    "SaslUsername": "<API Key>",
    "SaslPassword": "<API Password>",
    "SecurityProtocol": "SaslSsl",
    "SaslMechanism": "Plain"
  },
  "TopicName": "topic_1"
}
Step 7
Open the Program file and register the Kafka services as shown below.
using Confluent.Kafka;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
var producerConfiguration = new ProducerConfig();
builder.Configuration.Bind("KafkaConfiguration", producerConfiguration);
builder.Services.AddSingleton<ProducerConfig>(producerConfiguration);

builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
Step 8
Next, create a car details controller (CarsController).
using Confluent.Kafka;
using ConfluentKafkaDemo.Entities;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;

namespace ConfluentKafkaDemo.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class CarsController : ControllerBase
    {
        private readonly ProducerConfig _configuration;
        private readonly IConfiguration _config;

        public CarsController(ProducerConfig configuration, IConfiguration config)
        {
            _configuration = configuration;
            _config = config;
        }

        [HttpPost("sendBookingDetails")]
        public async Task<ActionResult> SendBookingDetails([FromBody] CarDetails carDetails)
        {
            // Serialize the booking details and publish them to the configured topic.
            string serializedData = JsonConvert.SerializeObject(carDetails);
            var topic = _config.GetSection("TopicName").Value;
            using (var producer = new ProducerBuilder<Null, string>(_configuration).Build())
            {
                await producer.ProduceAsync(topic, new Message<Null, string> { Value = serializedData });
                producer.Flush(TimeSpan.FromSeconds(10));
                return Ok(true);
            }
        }
    }
}
Step 9
Add a new Azure Function App project inside the same solution.
Step 10
Configure the function app
Step 11
Provide the additional details and select the Kafka trigger as the function template.
Step 12
Open the Kafka receiver and provide the Confluent Cloud details, like the API key, password, and bootstrap server URL. Here I have just hard-coded the values in the function, but you can store them inside the JSON settings file and access them from there (see the sketch after the code).
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;

namespace KafkaFunctionApp
{
    public class KafkaReceiver
    {
        // KafkaTrigger sample
        // Consume the message from "topic" on the LocalBroker.
        // Add `BrokerList` and `KafkaPassword` to the local.settings.json
        // For EventHubs
        // "BrokerList": "{EVENT_HUBS_NAMESPACE}.servicebus.windows.net:9093"
        // "KafkaPassword": "{EVENT_HUBS_CONNECTION_STRING}"
        [FunctionName("Function1")]
        public void Run(
            [KafkaTrigger("<bootstrap server url>",
                "topic_1",
                Username = "<API Key>",
                Password = "<API Password>",
                Protocol = BrokerProtocol.SaslSsl,
                AuthenticationMode = BrokerAuthenticationMode.Plain,
                ConsumerGroup = "$Default")] KafkaEventData<string>[] events,
            ILogger log)
        {
            foreach (KafkaEventData<string> eventData in events)
            {
                log.LogInformation($"C# Kafka trigger function processed a message: {eventData.Value}");
            }
        }
    }
}
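As a minimal sketch of the JSON-file approach mentioned above, the secrets could live in local.settings.json (the setting names KafkaUsername and KafkaPassword here are assumptions, not fixed by the extension), and the trigger attribute can then reference them with %...% binding expressions, e.g. Username = "%KafkaUsername%":

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "BrokerList": "<bootstrap server url>",
    "KafkaUsername": "<API Key>",
    "KafkaPassword": "<API Password>"
  }
}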
Step 13
Set both projects as startup projects in the solution and run them.
Step 14
Execute the POST request with a few details.
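For example, with curl (the port and the booking values below are hypothetical; use the port from your launch profile):

curl -X POST https://localhost:7001/api/Cars/sendBookingDetails \
  -H "Content-Type: application/json" \
  -d '{"carId": 1, "carName": "Tesla Model 3", "bookingStatus": "Confirmed"}'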
Step 15
Open the Messages section of the Confluent Cloud topic to see the message we sent.
Step 16
After the message is sent, our Kafka receiver will consume it.
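Based on the LogInformation call in the function, the console output should contain a line similar to the following (using the hypothetical booking from Step 14):

C# Kafka trigger function processed a message: {"CarId":1,"CarName":"Tesla Model 3","BookingStatus":"Confirmed"}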
GitHub URL
https://github.com/Jaydeep-007/ConfluentKafkaDemo
Conclusion
In this article, we discussed Confluent Cloud and the Azure Functions Kafka trigger with the help of a practical implementation using .NET Core 7.
Happy Coding!!!