diff --git a/lib/policy_service/consumers/quote_task_consumer.ex b/lib/policy_service/consumers/quote_task_consumer.ex index 96af53a..84f3e17 100644 --- a/lib/policy_service/consumers/quote_task_consumer.ex +++ b/lib/policy_service/consumers/quote_task_consumer.ex @@ -28,22 +28,27 @@ defmodule PolicyService.Consumers.QuoteTaskConsumer do {:ok, %{channel: channel}} end - def handle_info({:basic_deliver, payload, %{delivery_tag: tag}}, %{channel: ch} = state) do - with {:ok, event} <- Jason.decode(payload), - :ok <- dispatch(event) do - AMQP.Basic.ack(ch, tag) - else - {:ok, _} -> - AMQP.Basic.ack(ch, tag) + def handle_info({:basic_deliver, payload, meta}, state) do + :ok = + case process(payload) do + :ok -> + AMQP.Basic.ack(state.channel, meta.delivery_tag) - {:error, reason} -> - Logger.error("QuoteTaskConsumer failed: #{inspect(reason)}") - AMQP.Basic.nack(ch, tag, requeue: false) - end + {:error, reason} -> + Logger.error("QuoteTaskConsumer: failed to process: #{inspect(reason)}") + AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false) + end {:noreply, state} end + defp process(payload) do + with {:ok, event} <- Jason.decode(payload), + :ok <- dispatch(event) do + :ok + end + end + def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state} def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state} def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state}