Some checks failed
Build and Publish / build-release (push) Failing after 35s
96 lines
2.8 KiB
Elixir
96 lines
2.8 KiB
Elixir
defmodule WorkloadService.Consumers.QuoteRequestedConsumer do
  @moduledoc """
  GenServer that consumes quote-request messages from RabbitMQ and turns each
  one into a `QuoteTask.CreateTask` command.

  On start the process opens its own AMQP connection and channel, declares the
  durable queue `policy_service.quote_requested`, binds it to the exchange
  `policy_service.events.quote_requested` with routing key `quote.requested`,
  and subscribes to the queue.

  Every delivery is JSON-decoded and dispatched through
  `WorkloadService.CommandedApp`. A delivery is acked when the dispatch returns
  `:ok`; otherwise the failure is logged and the delivery is rejected without
  requeue.
  """

  use GenServer

  require Logger

  alias WorkloadService.CommandedApp
  alias WorkloadService.Commands.QuoteTask

  @exchange "policy_service.events.quote_requested"
  @queue "policy_service.quote_requested"
  @routing_key "quote.requested"

  @doc """
  Starts the consumer and registers it under this module's name.

  `opts` is handed to `init/1`, which currently ignores it.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    {:ok, conn} = AMQP.Connection.open(amqp_url())
    {:ok, channel} = AMQP.Channel.open(conn)

    # Match the results assertively, like the surrounding calls, so a failed
    # declare or bind stops startup here instead of surfacing later as a less
    # obvious error from `AMQP.Basic.consume/2`.
    {:ok, _queue_info} = AMQP.Queue.declare(channel, @queue, durable: true)
    :ok = AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key)
    {:ok, _tag} = AMQP.Basic.consume(channel, @queue)

    Logger.info("QuoteRequestedConsumer started, listening on #{@queue}")

    # The connection is kept in state so terminate/2 can close it.
    {:ok, %{conn: conn, channel: channel}}
  end

  @impl true
  def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state}

  # The consumer was cancelled (message comes from the broker side): stop so
  # whoever supervises this process can decide to restart it.
  def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state}

  def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state}

  def handle_info({:basic_deliver, payload, meta}, state) do
    case process(payload) do
      :ok ->
        AMQP.Basic.ack(state.channel, meta.delivery_tag)

      {:error, reason} ->
        Logger.error("QuoteRequestedConsumer: failed to process: #{inspect(reason)}")
        # requeue: false — a message that failed once is not redelivered to
        # this queue by the reject itself.
        AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false)
    end

    {:noreply, state}
  end

  @impl true
  def terminate(_reason, %{conn: %{pid: pid} = conn}) do
    # The connection was opened privately in init/1 and nothing else uses it,
    # so close it when this process stops (e.g. after :basic_cancel) rather
    # than leaving it open. Skip the call when the connection is already gone.
    if Process.alive?(pid), do: AMQP.Connection.close(conn)

    :ok
  end

  def terminate(_reason, _state), do: :ok

  # Decodes the JSON payload and hands the decoded term to handle_event/1.
  # Returns :ok on success; a decode or handling failure is returned as-is.
  defp process(payload) do
    with {:ok, event} <- Jason.decode(payload),
         :ok <- handle_event(event) do
      :ok
    end
  end

  # Builds and dispatches a CreateTask command for an event that carries the
  # required keys "id", "org_id", "provider_id" and "policy_type". The
  # remaining keys are optional and fall back to the defaults given below.
  defp handle_event(
         %{
           "id" => application_id,
           "org_id" => org_id,
           "provider_id" => provider_id,
           "policy_type" => policy_type
         } = event
       ) do
    task_id = WorkloadService.Aggregates.TaskId.new(org_id, "quote", Ecto.UUID.generate())

    command = %QuoteTask.CreateTask{
      id: task_id,
      # Uses the value bound from the event's "id" key in the function head.
      application_id: application_id,
      provider_id: provider_id,
      provider_name: Map.get(event, "provider_name", ""),
      task_info: %{
        "policy_type" => policy_type,
        "provider_email" => Map.get(event, "provider_email"),
        "applicant_info" => Map.get(event, "applicant_info", %{}),
        "car_details" => Map.get(event, "car_details", %{}),
        "building_details" => Map.get(event, "building_details", %{}),
        "life_details" => Map.get(event, "life_details", %{})
      }
    }

    case CommandedApp.dispatch(command) do
      :ok ->
        Logger.info("QuoteRequestedConsumer: created task #{task_id}")
        :ok

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Anything that lacks the required keys (or is not a map at all) is reported
  # as an invalid event so the delivery gets rejected.
  defp handle_event(event) do
    {:error, {:invalid_event, event}}
  end

  # Broker URL, read at runtime from the :workload_service application env;
  # falls back to a local guest connection when unset.
  defp amqp_url do
    Application.get_env(:workload_service, :amqp_url, "amqp://guest:guest@localhost:5672")
  end
end
|