+1 vote
in .NET Core by

9 Answers

+1 vote
by
B1: Cài đặt zookeeper trong file docker-compose.yml
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    ports:
      - 2181:2181
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
    networks:
      - broker-kafka
+1 vote
by
B2: Cài đặt Kafka Broker trong file docker-compose.yml
  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    networks:
      - broker-kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_CLEANER_DELETE_RETENTION_MS: 5000
      KAFKA_BROKER_ID: 1
      KAFKA_MIN_INSYNC_REPLICAS: 1
+1 vote
by
B3: Cài đặt KafDrop trong file docker-compose.yml
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    networks:
      - broker-kafka
    depends_on:
      - kafka
    ports:
      - 19000:9000
    environment:
      KAFKA_BROKERCONNECT: kafka:29092
+1 vote
by
B4: Add network cho kafka broker, zookeper và kafdrop

networks:
  broker-kafka:
    driver: bridge
+1 vote
by
B5: Add nuget "Confluent.Kafka" trong service cần gửi message vào kafka
+1 vote
by
B6: Add Producer để gửi message vào topic

public class KafkaProducerService
{
    private readonly IProducer<string, string> _producer;
    public KafkaProducerService(string bootstrapServers)
    {
        var config = new ProducerConfig { BootstrapServers = bootstrapServers };
        _producer = new ProducerBuilder<string, string>(config).Build();
    }

    public async Task SendMessageAsync(string topic, EmailRequest emailRequest)
    {
        var message = new Message<string, string>
        {
            Key = emailRequest.RequestId,
            Value = JsonSerializer.Serialize(emailRequest, typeof(EmailRequest))
        };
        await _producer.ProduceAsync(topic, message);
    }
}
+1 vote
by
B7: Add kafka broker port và đăng ký producer trong Program.cs

var bootstrapServers = "kafka:29092";
builder.Services.AddSingleton(new KafkaProducerService(bootstrapServers));
builder.Services.AddSingleton(new RequestStatusService());
+1 vote
by
B8: Tạo api để test send message vào topic kafka

        [HttpPost("send-email")]
        public async Task<IActionResult> SendEmail([FromBody] EmailRequest emailRequest)
        {
            var requestId = Guid.NewGuid().ToString();
            emailRequest.RequestId = requestId;
            await _kafkaProducerService.SendMessageAsync(_emailTopic, emailRequest);
            return Ok(new { requestId });
        }
+1 vote
by
B9: Kiểm tra message đã insert hay chưa trên KafDrop
vd: http://localhost:19000
Welcome to Qtsd Q&A, where you can ask questions and receive answers from other members of the community.
...