Files
workload-service/lib/workload_service/consumers/quote_requested_consumer.ex
HaimKortovich 01ad2270bc
All checks were successful
Build and Publish / build-release (push) Successful in 1m23s
simplify task api
2026-04-17 11:42:19 -05:00

92 lines
2.6 KiB
Elixir

defmodule WorkloadService.Consumers.QuoteRequestedConsumer do
use GenServer
require Logger
alias WorkloadService.CommandedApp
alias WorkloadService.Commands.QuoteTask
@exchange "policy_service.events.quote_requested"
@queue "policy_service.quote_requested"
@routing_key "quote.requested"
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
{:ok, conn} = AMQP.Connection.open(amqp_url())
{:ok, channel} = AMQP.Channel.open(conn)
AMQP.Queue.declare(channel, @queue, durable: true)
AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key)
{:ok, _tag} = AMQP.Basic.consume(channel, @queue)
Logger.info("QuoteRequestedConsumer started, listening on #{@queue}")
{:ok, %{channel: channel}}
end
def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state}
def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state}
def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state}
def handle_info({:basic_deliver, payload, meta}, state) do
case process(payload) do
:ok ->
AMQP.Basic.ack(state.channel, meta.delivery_tag)
{:error, reason} ->
Logger.error("QuoteRequestedConsumer: failed to process: #{inspect(reason)}")
AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false)
end
{:noreply, state}
end
defp process(payload) do
with {:ok, event} <- Jason.decode(payload),
:ok <- handle_event(event) do
:ok
end
end
defp handle_event(
%{
"id" => %{"org_id" => org_id} = application_id,
"provider_id" => provider_id,
"policy_details" => policy_details,
"applicant_info" => applicant_info
} = event
) do
task_id = WorkloadService.Aggregates.TaskId.new(org_id, "quote", Ecto.UUID.generate())
command = %QuoteTask.CreateTask{
id: task_id,
application_id: application_id,
task_info: %{
"provider_id" => provider_id,
"applicant_info" => applicant_info,
"policy_details" => policy_details,
"provider_email" => Map.get(event, "provider_email")
}
}
case CommandedApp.dispatch(command) do
:ok ->
Logger.info("QuoteRequestedConsumer: created task #{task_id}")
:ok
{:error, reason} ->
{:error, reason}
end
end
defp handle_event(event) do
{:error, {:invalid_event, event}}
end
defp amqp_url do
Application.get_env(:workload_service, :amqp_url, "amqp://guest:guest@localhost:5672")
end
end