diff --git a/lib/policy_service/application.ex b/lib/policy_service/application.ex index c9a8b23..c9a2a41 100644 --- a/lib/policy_service/application.ex +++ b/lib/policy_service/application.ex @@ -11,7 +11,8 @@ defmodule PolicyService.Application do PolicyService.CommandedApp, PolicyService.Handlers.QuoteRequestHandler, PolicyService.Handlers.SolicitationRequestHandler, - # PolicyService.Consumers.QuoteReceivedConsumer, + PolicyService.Consumers.QuoteTaskConsumer, + # PolicyService.Consumers.SolicitationTaskConsumer, # PolicyService.Consumers.PolicyIssuedConsumer, PolicyService.Projectors.PolicyProjector, PolicyServiceWeb.Telemetry, diff --git a/lib/policy_service/consumers/quote_received.ex b/lib/policy_service/consumers/quote_received.ex deleted file mode 100644 index d16fdfb..0000000 --- a/lib/policy_service/consumers/quote_received.ex +++ /dev/null @@ -1,118 +0,0 @@ -defmodule PolicyService.Consumers.QuoteReceivedConsumer do - use GenServer - - require Logger - - alias PolicyService.CommandedApp - alias PolicyService.Commands.CarPolicy - alias PolicyService.Aggregates.PolicyId - - @exchange "policy_service.events.quote_received" - @queue "policy_service.quote_received" - @routing_key "quote.received" - - def start_link(opts \\ []) do - GenServer.start_link(__MODULE__, opts, name: __MODULE__) - end - - def init(_opts) do - amqp_url = Application.fetch_env!(:policy_service, :amqp_url) - - {:ok, conn} = AMQP.Connection.open(amqp_url) - {:ok, channel} = AMQP.Channel.open(conn) - - AMQP.Queue.declare(channel, @queue, durable: true) - AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key) - - AMQP.Basic.consume(channel, @queue, nil, no_ack: false) - - Logger.info("QuoteReceivedConsumer started, listening on #{@queue}") - - {:ok, %{conn: conn, channel: channel}} - end - - # --------------------------------------------------------------------------- - # AMQP callbacks - # --------------------------------------------------------------------------- - - def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state} - def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state} - def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state} - - def handle_info({:basic_deliver, payload, %{delivery_tag: tag}}, state) do - case process(payload) do - :ok -> - AMQP.Basic.ack(state.channel, tag) - - {:error, reason} -> - Logger.error("Failed to process quote.received: #{inspect(reason)}") - AMQP.Basic.nack(state.channel, tag, requeue: false) - end - - {:noreply, state} - end - - # --------------------------------------------------------------------------- - # Processing - # --------------------------------------------------------------------------- - - defp process(payload) do - with {:ok, event} <- Jason.decode(payload), - {:ok, cmd} <- build_command(event), - :ok <- CommandedApp.dispatch(cmd, consistency: :strong) do - :ok - end - end - - defp build_command(event) do - case event["policy_type"] do - "car" -> build_car_command(event) - type -> {:error, {:unsupported_policy_type, type}} - end - end - - defp build_car_command(event) do - %{policy_type: policy_type} = PolicyId.parse!(event["id"]) - - case policy_type do - "car" -> - cmd = %CarPolicy.RecordProviderQuote{ - id: PolicyId.parse!(event["id"]), - recorded_by: event["entered_by"], - provider_id: event["provider_id"], - quote_id: event["quote_id"], - valid_until: parse_date(event["valid_until"]), - plans: parse_plans(event["plans"]) - } - - {:ok, cmd} - end - rescue - e -> {:error, e} - end - - defp parse_plans(nil), do: [] - - defp parse_plans(plans) when is_list(plans) do - Enum.map(plans, fn p -> - %{ - plan_id: p["plan_id"], - name: p["name"], - premium: p["premium"], - coverage_details: p["coverage_details"], - deductible: p["deductible"], - coverage_limit: p["coverage_limit"] - } - end) - end - - defp parse_date(nil), do: nil - defp parse_date(%Date{} = d), do: d - - defp parse_date(s) when is_binary(s) do - case Date.from_iso8601(s) do - {:ok, d} -> d - _ -> nil - end - end -end diff --git a/lib/policy_service/consumers/quote_task_consumer.ex b/lib/policy_service/consumers/quote_task_consumer.ex new file mode 100644 index 0000000..1203684 --- /dev/null +++ b/lib/policy_service/consumers/quote_task_consumer.ex @@ -0,0 +1,70 @@ +defmodule PolicyService.Consumers.QuoteTaskConsumer do + use GenServer + + require Logger + + alias PolicyService.CommandedApp + alias PolicyService.Commands.{CarPolicy, FirePolicy} + alias PolicyService.Aggregates.PolicyId + + @exchange "workload_service.events.quote_task_completed" + @queue "policy_service.quote_task_consumer" + @routing_key "quote_task_completed" + + def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__) + + def init(_opts) do + {:ok, conn} = AMQP.Connection.open(Application.fetch_env!(:policy_service, :amqp_url)) + {:ok, channel} = AMQP.Channel.open(conn) + AMQP.Queue.declare(channel, @queue, durable: true) + AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key) + AMQP.Basic.consume(channel, @queue, nil, no_ack: false) + Logger.info("QuoteTaskConsumer started, listening on #{@queue}") + {:ok, %{channel: channel}} + end + + def handle_info({:basic_deliver, payload, %{delivery_tag: tag}}, %{channel: ch} = state) do + with {:ok, event} <- Jason.decode(payload), + {:ok, _} <- dispatch(event) do + AMQP.Basic.ack(ch, tag) + else + {:error, reason} -> + Logger.error("QuoteTaskConsumer failed: #{inspect(reason)}") + AMQP.Basic.nack(ch, tag, requeue: false) + end + {:noreply, state} + end + + def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state} + def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state} + def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state} + + defp dispatch(%{ + "application_id" => %{"org_id" => org_id, "policy_type" => policy_type, "application_id" => app_id}, + "task_info" => %{"provider_id" => provider_id}, + "submission" => %{"quote_id" => quote_id, "recorded_by" => recorded_by, "valid_until" => valid_until, "plans" => plans} + }) do + cmd = case policy_type do + "car" -> %CarPolicy.RecordProviderQuote{ + id: PolicyId.new(org_id, policy_type, app_id), + recorded_by: recorded_by || "system", + provider_id: provider_id, + quote_id: quote_id, + valid_until: parse_date(valid_until), + plans: Enum.map(plans || [], &%{plan_id: &1["plan_id"], name: &1["name"], premium: &1["premium"], coverage_details: &1["coverage_details"]}) + } + "fire" -> %FirePolicy.RecordProviderQuote{ + id: PolicyId.new(org_id, policy_type, app_id), + recorded_by: recorded_by || "system", + provider_id: provider_id, + quote_id: quote_id, + valid_until: parse_date(valid_until), + plans: Enum.map(plans || [], &%{plan_id: &1["plan_id"], name: &1["name"], premium: &1["premium"], coverage_details: &1["coverage_details"]}) + } + end + CommandedApp.dispatch(cmd, consistency: :strong) + end + + defp parse_date(nil), do: nil + defp parse_date(s) when is_binary(s), do: Date.from_iso8601(s) |> elem(1) +end \ No newline at end of file diff --git a/lib/policy_service/events/policy.ex b/lib/policy_service/events/policy.ex index f793735..490b45f 100644 --- a/lib/policy_service/events/policy.ex +++ b/lib/policy_service/events/policy.ex @@ -1,7 +1,6 @@ -defmodule PolicyService.Events.Policy do +defmodule PolicyService.Events do @moduledoc """ - Policy domain events. - Contains helpers for common event functionality. + Events macro for adding JsonDecoder to domain events. """ alias PolicyService.Aggregates.PolicyId @@ -17,90 +16,52 @@ defmodule PolicyService.Events.Policy do end end end +end + +defmodule PolicyService.Events.Policy do + @moduledoc """ + Policy domain events. + """ defmodule PolicyApplicationSubmitted do - use PolicyService.Events.Policy + use PolicyService.Events @derive Jason.Encoder - defstruct [ - :id, - :submitted_by, - :applicant_info, - :policy_details, - :selected_providers, - :submitted_at - ] + defstruct [:id, :submitted_by, :applicant_info, :policy_details, :selected_providers, :submitted_at] end defmodule QuoteRequestSent do - use PolicyService.Events.Policy + use PolicyService.Events @derive Jason.Encoder - defstruct [ - :id, - :provider_id, - :provider_email, - :applicant_info, - :policy_details, - :requested_at - ] + defstruct [:id, :provider_id, :provider_email, :applicant_info, :policy_details, :requested_at] end defmodule ProviderQuoteReceived do - use PolicyService.Events.Policy + use PolicyService.Events @derive Jason.Encoder - defstruct [ - :id, - :recorded_by, - :provider_id, - :quote_id, - :premium, - :coverage_details, - :valid_until, - :plans, - :received_at - ] + defstruct [:id, :recorded_by, :provider_id, :quote_id, :premium, :coverage_details, :valid_until, :plans, :received_at] end defmodule AllQuotesReceived do - use PolicyService.Events.Policy + use PolicyService.Events @derive Jason.Encoder defstruct [:id, :quote_count] end defmodule QuoteAccepted do - use PolicyService.Events.Policy + use PolicyService.Events @derive Jason.Encoder - defstruct [ - :id, - :accepted_by, - :quote, - :plan, - :provider, - :accepted_at - ] + defstruct [:id, :accepted_by, :quote, :plan, :provider, :accepted_at] end defmodule SolicitationSent do - use PolicyService.Events.Policy + use PolicyService.Events @derive Jason.Encoder - defstruct [ - :id, - :solicitation_id, - :provider_id, - :template_id, - :s3_key, - :sent_at - ] + defstruct [:id, :solicitation_id, :provider_id, :template_id, :s3_key, :sent_at] end defmodule PolicyIssued do - use PolicyService.Events.Policy + use PolicyService.Events @derive Jason.Encoder - defstruct [ - :id, - :policy_number, - :effective_date, - :expiry_date, - :issued_at - ] + defstruct [:id, :policy_number, :effective_date, :expiry_date, :issued_at] end end