This commit is contained in:
@@ -28,22 +28,27 @@ defmodule PolicyService.Consumers.QuoteTaskConsumer do
|
|||||||
{:ok, %{channel: channel}}
|
{:ok, %{channel: channel}}
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_info({:basic_deliver, payload, %{delivery_tag: tag}}, %{channel: ch} = state) do
|
def handle_info({:basic_deliver, payload, meta}, state) do
|
||||||
with {:ok, event} <- Jason.decode(payload),
|
:ok =
|
||||||
:ok <- dispatch(event) do
|
case process(payload) do
|
||||||
AMQP.Basic.ack(ch, tag)
|
:ok ->
|
||||||
else
|
AMQP.Basic.ack(state.channel, meta.delivery_tag)
|
||||||
{:ok, _} ->
|
|
||||||
AMQP.Basic.ack(ch, tag)
|
|
||||||
|
|
||||||
{:error, reason} ->
|
{:error, reason} ->
|
||||||
Logger.error("QuoteTaskConsumer failed: #{inspect(reason)}")
|
Logger.error("QuoteTaskConsumer: failed to process: #{inspect(reason)}")
|
||||||
AMQP.Basic.nack(ch, tag, requeue: false)
|
AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false)
|
||||||
end
|
end
|
||||||
|
|
||||||
{:noreply, state}
|
{:noreply, state}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp process(payload) do
|
||||||
|
with {:ok, event} <- Jason.decode(payload),
|
||||||
|
:ok <- dispatch(event) do
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state}
|
def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state}
|
||||||
def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state}
|
def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state}
|
||||||
def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state}
|
def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state}
|
||||||
|
|||||||
Reference in New Issue
Block a user