All checks were successful
Build and Publish / build-release (push) Successful in 1m25s
119 lines
3.3 KiB
Elixir
119 lines
3.3 KiB
Elixir
defmodule PolicyService.Consumers.QuoteReceivedConsumer do
|
|
use GenServer
|
|
|
|
require Logger
|
|
|
|
alias PolicyService.CommandedApp
|
|
alias PolicyService.Commands.CarPolicy
|
|
alias PolicyService.Aggregates.PolicyId
|
|
|
|
@exchange "policy_service.events.quote_received"
|
|
@queue "policy_service.quote_received"
|
|
@routing_key "quote.received"
|
|
|
|
def start_link(opts \\ []) do
|
|
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
|
|
end
|
|
|
|
def init(_opts) do
|
|
amqp_url = Application.fetch_env!(:policy_service, :amqp_url)
|
|
|
|
{:ok, conn} = AMQP.Connection.open(amqp_url)
|
|
{:ok, channel} = AMQP.Channel.open(conn)
|
|
|
|
AMQP.Queue.declare(channel, @queue, durable: true)
|
|
AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key)
|
|
|
|
AMQP.Basic.consume(channel, @queue, nil, no_ack: false)
|
|
|
|
Logger.info("QuoteReceivedConsumer started, listening on #{@queue}")
|
|
|
|
{:ok, %{conn: conn, channel: channel}}
|
|
end
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AMQP callbacks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state}
|
|
def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state}
|
|
def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state}
|
|
|
|
def handle_info({:basic_deliver, payload, %{delivery_tag: tag}}, state) do
|
|
case process(payload) do
|
|
:ok ->
|
|
AMQP.Basic.ack(state.channel, tag)
|
|
|
|
{:error, reason} ->
|
|
Logger.error("Failed to process quote.received: #{inspect(reason)}")
|
|
AMQP.Basic.nack(state.channel, tag, requeue: false)
|
|
end
|
|
|
|
{:noreply, state}
|
|
end
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Processing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
defp process(payload) do
|
|
with {:ok, event} <- Jason.decode(payload),
|
|
{:ok, cmd} <- build_command(event),
|
|
:ok <- CommandedApp.dispatch(cmd, consistency: :strong) do
|
|
:ok
|
|
end
|
|
end
|
|
|
|
defp build_command(event) do
|
|
case event["policy_type"] do
|
|
"car" -> build_car_command(event)
|
|
type -> {:error, {:unsupported_policy_type, type}}
|
|
end
|
|
end
|
|
|
|
defp build_car_command(event) do
|
|
%{policy_type: policy_type} = PolicyId.parse!(event["id"])
|
|
|
|
case policy_type do
|
|
"car" ->
|
|
cmd = %CarPolicy.RecordProviderQuote{
|
|
id: PolicyId.parse!(event["id"]),
|
|
recorded_by: event["entered_by"],
|
|
provider_id: event["provider_id"],
|
|
quote_id: event["quote_id"],
|
|
valid_until: parse_date(event["valid_until"]),
|
|
plans: parse_plans(event["plans"])
|
|
}
|
|
|
|
{:ok, cmd}
|
|
end
|
|
rescue
|
|
e -> {:error, e}
|
|
end
|
|
|
|
defp parse_plans(nil), do: []
|
|
|
|
defp parse_plans(plans) when is_list(plans) do
|
|
Enum.map(plans, fn p ->
|
|
%{
|
|
plan_id: p["plan_id"],
|
|
name: p["name"],
|
|
premium: p["premium"],
|
|
coverage_details: p["coverage_details"],
|
|
deductible: p["deductible"],
|
|
coverage_limit: p["coverage_limit"]
|
|
}
|
|
end)
|
|
end
|
|
|
|
defp parse_date(nil), do: nil
|
|
defp parse_date(%Date{} = d), do: d
|
|
|
|
defp parse_date(s) when is_binary(s) do
|
|
case Date.from_iso8601(s) do
|
|
{:ok, d} -> d
|
|
_ -> nil
|
|
end
|
|
end
|
|
end
|