defmodule PolicyService.Consumers.QuoteTaskConsumer do use GenServer require Logger alias PolicyService.CommandedApp alias PolicyService.Commands.{CarPolicy, FirePolicy} alias PolicyService.Aggregates.PolicyId @exchange "workload_service.events.quote_task_completed" @queue "policy_service.quote_task_completed" @routing_key "quote_task.completed" def start_link(opts \\ []) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end def init(_opts) do {:ok, conn} = AMQP.Connection.open(Application.fetch_env!(:policy_service, :amqp_url)) {:ok, channel} = AMQP.Channel.open(conn) AMQP.Queue.declare(channel, @queue, durable: true) AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key) {:ok, _tag} = AMQP.Basic.consume(channel, @queue) Logger.info("QuoteTaskConsumer started, listening on #{@queue}") {:ok, %{channel: channel}} end def handle_info({:basic_deliver, payload, meta}, state) do :ok = case process(payload) do :ok -> AMQP.Basic.ack(state.channel, meta.delivery_tag) {:error, reason} -> Logger.error("QuoteTaskConsumer: failed to process: #{inspect(reason)}") AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false) end {:noreply, state} end defp process(payload) do with {:ok, event} <- Jason.decode(payload), :ok <- dispatch(event) do :ok end end def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state} def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state} def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state} defp dispatch(%{ "application_id" => %{ "org_id" => org_id, "policy_type" => policy_type, "application_id" => app_id }, "task_info" => %{"provider_id" => provider_id}, "submission" => %{ "quote_id" => quote_id, "recorded_by" => recorded_by, "valid_until" => valid_until, "plans" => plans } }) do cmd = case policy_type do "car" -> %CarPolicy.RecordProviderQuote{ id: PolicyId.new(org_id, policy_type, app_id), recorded_by: recorded_by || "system", provider_id: provider_id, quote_id: quote_id, valid_until: parse_date(valid_until), plans: Enum.map( plans || [], &%{ plan_id: &1["plan_id"], name: &1["name"], premium: &1["premium"], coverage_details: &1["coverage_details"] } ) } "fire" -> %FirePolicy.RecordProviderQuote{ id: PolicyId.new(org_id, policy_type, app_id), recorded_by: recorded_by || "system", provider_id: provider_id, quote_id: quote_id, valid_until: parse_date(valid_until), plans: Enum.map( plans || [], &%{ plan_id: &1["plan_id"], name: &1["name"], premium: &1["premium"], coverage_details: &1["coverage_details"] } ) } end CommandedApp.dispatch(cmd, consistency: :strong) end defp parse_date(nil), do: nil defp parse_date(s) when is_binary(s), do: Date.from_iso8601(s) |> elem(1) end