All checks were successful
Build and Publish / build-release (push) Successful in 1m33s
70 lines
2.8 KiB
Elixir
70 lines
2.8 KiB
Elixir
defmodule PolicyService.Consumers.QuoteTaskConsumer do
  @moduledoc """
  RabbitMQ consumer that turns "quote task completed" events into
  `RecordProviderQuote` commands.

  On start the process opens an AMQP connection and channel, declares the
  durable queue `policy_service.quote_task_consumer`, binds it to the
  `workload_service.events.quote_task_completed` exchange with routing key
  `quote_task_completed`, and subscribes itself as the consumer.

  Each delivery is JSON-decoded, converted into a car or fire
  `RecordProviderQuote` command and dispatched through
  `PolicyService.CommandedApp` with strong consistency. A delivery is
  acknowledged when the dispatch succeeds and rejected without requeue
  otherwise (malformed JSON, unexpected event shape, unsupported policy type,
  invalid `valid_until`, or a dispatch error).
  """

  use GenServer

  require Logger

  alias PolicyService.Aggregates.PolicyId
  alias PolicyService.CommandedApp
  alias PolicyService.Commands.{CarPolicy, FirePolicy}

  @exchange "workload_service.events.quote_task_completed"
  @queue "policy_service.quote_task_consumer"
  @routing_key "quote_task_completed"

  @doc """
  Starts the consumer and registers it under the module name.

  `opts` is accepted for supervisor compatibility and is currently unused.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    amqp_url = Application.fetch_env!(:policy_service, :amqp_url)

    case AMQP.Connection.open(amqp_url) do
      {:ok, conn} ->
        case subscribe(conn) do
          {:ok, channel} ->
            # The consumer is useless without its connection; monitoring lets
            # us stop (and be restarted by the supervisor) when it goes away.
            Process.monitor(conn.pid)
            Logger.info("QuoteTaskConsumer started, listening on #{@queue}")
            {:ok, %{conn: conn, channel: channel}}

          {:error, reason} ->
            Logger.error("QuoteTaskConsumer failed to subscribe: #{inspect(reason)}")
            # Do not leave a half-configured connection behind.
            AMQP.Connection.close(conn)
            {:stop, reason}
        end

      {:error, reason} ->
        Logger.error("QuoteTaskConsumer failed to connect: #{inspect(reason)}")
        {:stop, reason}
    end
  end

  @impl true
  def handle_info({:basic_deliver, payload, %{delivery_tag: tag}}, %{channel: ch} = state) do
    case dispatch(payload) do
      :ok ->
        AMQP.Basic.ack(ch, tag)

      {:error, reason} ->
        Logger.error("QuoteTaskConsumer failed: #{inspect(reason)}")
        # requeue: false — a message that failed once is not redelivered here.
        AMQP.Basic.nack(ch, tag, requeue: false)
    end

    {:noreply, state}
  end

  def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state}
  def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state}
  def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state}

  # The monitored AMQP connection process died: stop so a fresh consumer can
  # be started with a new connection.
  def handle_info({:DOWN, _ref, :process, pid, reason}, %{conn: %{pid: pid}} = state) do
    {:stop, {:connection_lost, reason}, state}
  end

  # Opens a channel on `conn`, declares and binds the queue, and registers the
  # calling process as consumer. Returns `{:ok, channel}` or the first
  # `{:error, reason}` encountered.
  # NOTE(review): the exchange itself is not declared here; it is presumably
  # created by the publishing service — confirm.
  defp subscribe(conn) do
    with {:ok, channel} <- AMQP.Channel.open(conn),
         {:ok, _queue_info} <- AMQP.Queue.declare(channel, @queue, durable: true),
         :ok <- AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key),
         {:ok, _consumer_tag} <- AMQP.Basic.consume(channel, @queue, nil, no_ack: false) do
      {:ok, channel}
    end
  end

  # Decodes a raw payload, builds the command and dispatches it.
  # Returns `:ok` on success or `{:error, reason}`.
  defp dispatch(payload) do
    with {:ok, event} <- Jason.decode(payload),
         {:ok, command} <- build_command(event) do
      # Commanded returns a bare `:ok` unless a `returning:` option is given,
      # so both success shapes are accepted.
      case CommandedApp.dispatch(command, consistency: :strong) do
        :ok -> :ok
        {:ok, _result} -> :ok
        {:error, reason} -> {:error, reason}
        other -> {:error, {:unexpected_dispatch_result, other}}
      end
    end
  end

  # Maps a decoded event onto the `RecordProviderQuote` command for its policy
  # type. Returns `{:ok, command}` or `{:error, reason}`.
  defp build_command(%{
         "application_id" => %{
           "org_id" => org_id,
           "policy_type" => policy_type,
           "application_id" => app_id
         },
         "task_info" => %{"provider_id" => provider_id},
         "submission" => %{
           "quote_id" => quote_id,
           "recorded_by" => recorded_by,
           "valid_until" => valid_until,
           "plans" => plans
         }
       }) do
    with {:ok, command_module} <- command_module(policy_type),
         {:ok, valid_until_date} <- parse_date(valid_until),
         {:ok, plan_list} <- build_plans(plans) do
      command =
        struct!(command_module,
          id: PolicyId.new(org_id, policy_type, app_id),
          recorded_by: recorded_by || "system",
          provider_id: provider_id,
          quote_id: quote_id,
          valid_until: valid_until_date,
          plans: plan_list
        )

      {:ok, command}
    end
  end

  # Anything that does not have the expected nested keys is rejected rather
  # than crashing the consumer.
  defp build_command(_event), do: {:error, :unrecognised_event}

  # Selects the command struct for a policy type.
  defp command_module("car"), do: {:ok, CarPolicy.RecordProviderQuote}
  defp command_module("fire"), do: {:ok, FirePolicy.RecordProviderQuote}
  defp command_module(other), do: {:error, {:unsupported_policy_type, other}}

  # Converts the decoded plan list into maps with atom keys. A missing (`nil`)
  # list becomes `[]`; anything that is not a list of maps is an error.
  defp build_plans(nil), do: {:ok, []}

  defp build_plans(plans) when is_list(plans) do
    if Enum.all?(plans, &is_map/1) do
      {:ok, Enum.map(plans, &to_plan/1)}
    else
      {:error, :invalid_plans}
    end
  end

  defp build_plans(_other), do: {:error, :invalid_plans}

  defp to_plan(plan) do
    %{
      plan_id: plan["plan_id"],
      name: plan["name"],
      premium: plan["premium"],
      coverage_details: plan["coverage_details"]
    }
  end

  # Parses an optional ISO 8601 calendar date. `nil` is passed through; an
  # unparsable or non-string value yields an error instead of leaking the
  # parser's error atom into the command.
  defp parse_date(nil), do: {:ok, nil}

  defp parse_date(value) when is_binary(value) do
    case Date.from_iso8601(value) do
      {:ok, date} -> {:ok, date}
      {:error, reason} -> {:error, {:invalid_valid_until, reason}}
    end
  end

  defp parse_date(_other), do: {:error, {:invalid_valid_until, :not_a_string}}
end