Files
policy-service/lib/policy_service/consumers/quote_task_consumer.ex
HaimKortovich 5a98549a24
All checks were successful
Build and Publish / build-release (push) Successful in 1m27s
consume commands correctly
2026-04-27 14:14:19 -05:00

155 lines
4.7 KiB
Elixir

defmodule PolicyService.Consumers.QuoteTaskConsumer do
use GenServer
require Logger
alias PolicyService.CommandedApp
alias PolicyService.Commands.{CarPolicy, LifePolicy, FireStructurePolicy, FireContentsPolicy}
alias PolicyService.Aggregates.PolicyId
@exchange "workload_service.events.quote_task_completed"
@queue "policy_service.quote_task_completed"
@routing_key "quote_task.completed"
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
{:ok, conn} = AMQP.Connection.open(Application.fetch_env!(:policy_service, :amqp_url))
{:ok, channel} = AMQP.Channel.open(conn)
AMQP.Queue.declare(channel, @queue, durable: true)
AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key)
{:ok, _tag} = AMQP.Basic.consume(channel, @queue)
Logger.info("QuoteTaskConsumer started, listening on #{@queue}")
{:ok, %{channel: channel}}
end
def handle_info({:basic_deliver, payload, meta}, state) do
:ok =
case process(payload) do
:ok ->
AMQP.Basic.ack(state.channel, meta.delivery_tag)
{:error, reason} ->
Logger.error("QuoteTaskConsumer: failed to process: #{inspect(reason)}")
AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false)
end
{:noreply, state}
end
def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state}
def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state}
def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state}
defp process(payload) do
with {:ok, event} <- Jason.decode(payload),
:ok <- dispatch(event) do
:ok
end
end
defp dispatch(%{
"application_id" => %{
"org_id" => org_id,
"policy_type" => policy_type,
"application_id" => app_id
},
"task_info" => %{"provider_id" => provider_id},
"submission" => %{
"quote_id" => quote_id,
"recorded_by" => recorded_by,
"valid_until" => valid_until,
"plans" => plans
}
}) do
cmd =
case policy_type do
"car" ->
%CarPolicy.RecordProviderQuote{
id: PolicyId.new(org_id, policy_type, app_id),
recorded_by: recorded_by || "system",
provider_id: provider_id,
quote_id: quote_id,
valid_until: parse_date(valid_until),
plans:
Enum.map(
plans || [],
&%{
plan_id: &1["plan_id"],
name: &1["name"],
premium: &1["premium"],
coverage_details: &1["coverage_details"]
}
)
}
"life" ->
%LifePolicy.RecordProviderQuote{
id: PolicyId.new(org_id, policy_type, app_id),
recorded_by: recorded_by || "system",
provider_id: provider_id,
quote_id: quote_id,
valid_until: parse_date(valid_until),
plans:
Enum.map(
plans || [],
&%{
plan_id: &1["plan_id"],
name: &1["name"],
premium: &1["premium"],
coverage_details: &1["coverage_details"]
}
)
}
"fire_structure" ->
%FireStructurePolicy.RecordProviderQuote{
id: PolicyId.new(org_id, policy_type, app_id),
recorded_by: recorded_by || "system",
provider_id: provider_id,
quote_id: quote_id,
valid_until: parse_date(valid_until),
plans:
Enum.map(
plans || [],
&%{
plan_id: &1["plan_id"],
name: &1["name"],
premium: &1["premium"],
coverage_details: &1["coverage_details"]
}
)
}
"fire_contents" ->
%FireContentsPolicy.RecordProviderQuote{
id: PolicyId.new(org_id, policy_type, app_id),
recorded_by: recorded_by || "system",
provider_id: provider_id,
quote_id: quote_id,
valid_until: parse_date(valid_until),
plans:
Enum.map(
plans || [],
&%{
plan_id: &1["plan_id"],
name: &1["name"],
premium: &1["premium"],
coverage_details: &1["coverage_details"]
}
)
}
end
CommandedApp.dispatch(cmd, consistency: :strong)
end
defp parse_date(nil), do: nil
defp parse_date(s) when is_binary(s), do: Date.from_iso8601(s) |> elem(1)
end