+1 vote

4 Answers

+1 vote
by
B1: Tạo service để ghi nhận trạng thái của từng request (lưu trong bộ nhớ; việc ghi status vào topic Kafka được thực hiện ở B2)

/// <summary>
/// Keeps the latest known status of each request in an in-memory,
/// thread-safe dictionary keyed by request ID.
/// </summary>
public class RequestStatusService
{
    private readonly ConcurrentDictionary<string, RequestStatus> _requestStatuses = new ConcurrentDictionary<string, RequestStatus>();

    /// <summary>
    /// Overwrites the status and message of an already-tracked request.
    /// Request IDs that are not present in the dictionary are ignored.
    /// </summary>
    /// <param name="requestId">Key of the request to update.</param>
    /// <param name="status">New status value.</param>
    /// <param name="message">New status message.</param>
    public void UpdateRequestStatus(string requestId, string status, string message)
    {
        // A single TryGetValue replaces ContainsKey + two indexer reads: the entry
        // could be removed between those calls, which would make the indexer throw
        // KeyNotFoundException.
        if (_requestStatuses.TryGetValue(requestId, out var requestStatus))
        {
            requestStatus.Status = status;
            requestStatus.Message = message;
        }
    }
}
+1 vote
by
B2: Tạo action để xử lý gửi mail và ghi nhận trạng thái

/// <summary>
/// Sends the e-mail described by <paramref name="emailRequest"/> over SMTP and
/// then records the outcome ("Completed" or "Failed") via UpdateRequestStatus.
/// </summary>
/// <param name="emailRequest">Recipient, subject, HTML body and request ID of the e-mail to send.</param>
/// <returns>A task that completes once the status update has been published.</returns>
public async Task HandleEmailAsync(EmailRequest emailRequest)
{
    string status;
    string statusMessage;

    try
    {
        // Both types are IDisposable; dispose them so the SMTP connection and
        // message resources are released as soon as the send attempt is over.
        using var mail = new MailMessage();
        using var smtpClient = new SmtpClient();

        // Build the message
        mail.From = new MailAddress(FromAddress);
        mail.To.Add(emailRequest.To);
        mail.Subject = emailRequest.Subject;
        mail.Body = emailRequest.Body;
        mail.IsBodyHtml = true;

        // Configure the SMTP client
        smtpClient.Host = Host;
        smtpClient.EnableSsl = false;
        smtpClient.UseDefaultCredentials = false;
        smtpClient.Credentials = new NetworkCredential(User, Pass);

        // Asynchronous send: does not block the calling thread while waiting on the server.
        await smtpClient.SendMailAsync(mail);

        // E-mail sent successfully
        status = "Completed";
        statusMessage = "Email sent successfully.";
    }
    catch (Exception ex)
    {
        // E-mail sending failed
        status = "Failed";
        statusMessage = ex.Message;
    }

    // Publish the outcome outside the try block: a failure while publishing the
    // status must not relabel an e-mail that was actually delivered as "Failed".
    await UpdateRequestStatus(emailRequest.RequestId, status, statusMessage);
}

/// <summary>
/// Publishes a status update for a request to the Kafka topic
/// "email_status_updates" and then forwards the same values to the
/// request status service.
/// </summary>
/// <param name="requestId">Request ID; also used as the Kafka message key.</param>
/// <param name="status">Status value to publish.</param>
/// <param name="message">Status message to publish.</param>
private async Task UpdateRequestStatus(string requestId, string status, string message)
{
    // Serialize the update as a JSON object with RequestId / Status / Message properties.
    var payload = JsonSerializer.Serialize(new
    {
        RequestId = requestId,
        Status = status,
        Message = message
    });

    var kafkaMessage = new Message<string, string>
    {
        Key = requestId,
        Value = payload
    };

    // Push the status update to the Kafka topic "email_status_updates".
    await _producer.ProduceAsync("email_status_updates", kafkaMessage);

    // Mirror the status into RequestStatusService (if needed).
    _requestStatusService.UpdateRequestStatus(requestId, status, message);
}
+1 vote
by
edited
B3: Tạo background service để lắng nghe khi topic có phát sinh message mới và xử lý

/// <summary>
/// Subscribes to the configured topic and, until the host requests shutdown,
/// consumes messages, deserializes each one into an EmailRequest and passes it
/// to the e-mail handler.
/// </summary>
/// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
/// <returns>A task that completes when the consume loop has ended and the consumer is closed.</returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    _logger.LogInformation("Starting email consumer service for topic {Topic}", _topic);
    _consumer.Subscribe(_topic);

    // Consume() blocks synchronously. Yield first so the blocking loop does not
    // run inside the host's start-up call.
    await Task.Yield();

    try
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    _logger.LogInformation("Waiting for messages...");
                    var result = _consumer.Consume(stoppingToken);
                    if (result == null)
                    {
                        continue;
                    }

                    _logger.LogInformation("Message received: {MessageValue}", result.Message.Value);
                    var emailRequest = JsonSerializer.Deserialize<EmailRequest>(result.Message.Value);
                    if (emailRequest == null)
                    {
                        _logger.LogWarning("Received null email request");
                        continue;
                    }

                    _logger.LogInformation("Handling email request {RequestId}", emailRequest.RequestId);
                    await _emailHandler.HandleEmailAsync(emailRequest);
                }
                catch (ConsumeException ex)
                {
                    _logger.LogError(ex, "Consume error: {Reason}", ex.Error.Reason);

                    // A fatal client error cannot be recovered by retrying; leave the
                    // loop instead of spinning on the same failure.
                    if (ex.Error.IsFatal)
                    {
                        break;
                    }
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Only a host-initiated stop ends the loop; the consumer is closed below.
                    _logger.LogInformation("Cancellation requested, closing consumer");
                    break;
                }
                catch (Exception ex)
                {
                    // Keep the loop alive when a single message fails.
                    _logger.LogError(ex, "Error handling email request: {ErrorMessage}", ex.Message);
                }
            }
        }
        finally
        {
            // Close on every exit path, not only when Consume() observes the cancellation.
            _consumer.Close();
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "An error occurred in the email consumer service: {ErrorMessage}", ex.Message);
    }
}
+1 vote
by
edited
B4: Đăng ký background service trong Program.cs

// Register EmailConsumerService as a hosted service. A factory is used because
// the constructor takes plain values (bootstrapServers, groupId, emailTopic)
// alongside services resolved from the container.
builder.Services.AddHostedService(serviceProvider =>
{
    var consumerLogger = serviceProvider.GetRequiredService<ILogger<EmailConsumerService>>();
    var emailHandler = serviceProvider.GetRequiredService<EmailHandler>();
    var requestStatusService = serviceProvider.GetRequiredService<RequestStatusService>();

    return new EmailConsumerService(
        consumerLogger,
        bootstrapServers,
        groupId,
        emailTopic,
        emailHandler,
        requestStatusService);
});
Welcome to Qtsd Q&A, where you can ask questions and receive answers from other members of the community.
...