add solictation consumer
All checks were successful
Build and Publish / build-release (push) Successful in 1m38s

This commit is contained in:
2026-04-23 10:32:20 -05:00
parent 33bba5b453
commit c8a58c3f58
5 changed files with 108 additions and 5 deletions

View File

@@ -158,7 +158,8 @@ defmodule PolicyService.Aggregates.PolicyApplication do
}, },
%SolicitationRequestSent{ %SolicitationRequestSent{
id: agg.id, id: agg.id,
plan: result.plan plan: result.plan,
provider_id: result.provider
} }
] ]
end end

View File

@@ -12,7 +12,7 @@ defmodule PolicyService.Application do
PolicyService.Handlers.QuoteRequestHandler, PolicyService.Handlers.QuoteRequestHandler,
PolicyService.Handlers.SolicitationRequestHandler, PolicyService.Handlers.SolicitationRequestHandler,
PolicyService.Consumers.QuoteTaskConsumer, PolicyService.Consumers.QuoteTaskConsumer,
# PolicyService.Consumers.SolicitationTaskConsumer, PolicyService.Consumers.SolicitationTaskConsumer,
# PolicyService.Consumers.PolicyIssuedConsumer, # PolicyService.Consumers.PolicyIssuedConsumer,
PolicyService.Projectors.PolicyProjector, PolicyService.Projectors.PolicyProjector,
PolicyServiceWeb.Telemetry, PolicyServiceWeb.Telemetry,

View File

@@ -0,0 +1,87 @@
defmodule PolicyService.Consumers.SolicitationTaskConsumer do
use GenServer
require Logger
alias PolicyService.CommandedApp
alias PolicyService.Commands.CarPolicy
alias PolicyService.Aggregates.PolicyId
@exchange "workload_service.events.solicitation_task_completed"
@queue "policy_service.solicitation_task_completed"
@routing_key "solicitation_task.completed"
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
{:ok, conn} = AMQP.Connection.open(Application.fetch_env!(:policy_service, :amqp_url))
{:ok, channel} = AMQP.Channel.open(conn)
AMQP.Queue.declare(channel, @queue, durable: true)
AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key)
{:ok, _tag} = AMQP.Basic.consume(channel, @queue)
Logger.info("SolicitationTaskConsumer started, listening on #{@queue}")
{:ok, %{channel: channel}}
end
def handle_info({:basic_deliver, payload, meta}, state) do
:ok =
case process(payload) do
:ok ->
AMQP.Basic.ack(state.channel, meta.delivery_tag)
{:error, reason} ->
Logger.error("SolicitationTaskConsumer: failed to process: #{inspect(reason)}")
AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false)
end
{:noreply, state}
end
defp process(payload) do
with {:ok, event} <- Jason.decode(payload),
:ok <- dispatch(event) do
:ok
end
end
def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state}
def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state}
def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state}
defp dispatch(%{
"application_id" => %{
"org_id" => org_id,
"policy_type" => policy_type,
"application_id" => app_id
},
"task_info" => task_info,
"submission" => submission
}) do
cmd =
case policy_type do
"car" ->
%CarPolicy.RecordPolicyIssued{
id: PolicyId.new(org_id, policy_type, app_id),
provider_policy_number: Map.get(submission, "provider_policy_number"),
effective_date: Map.get(submission, "effective_date"),
expiry_date: Map.get(submission, "expiry_date"),
issued_at:
Map.get(submission, "issued_at") || DateTime.utc_now() |> DateTime.to_iso8601()
}
end
case CommandedApp.dispatch(cmd, consistency: :strong) do
:ok ->
Logger.info("SolicitationTaskConsumer: issued policy for #{app_id}")
:ok
{:error, reason} ->
Logger.error("SolicitationTaskConsumer: failed to issue policy - #{inspect(reason)}")
{:error, reason}
end
end
end

View File

@@ -56,7 +56,7 @@ defmodule PolicyService.Events.Policy do
defmodule SolicitationRequestSent do defmodule SolicitationRequestSent do
use PolicyService.Events use PolicyService.Events
@derive Jason.Encoder @derive Jason.Encoder
defstruct [:id, :plan] defstruct [:id, :plan, :provider_id]
end end
defmodule PolicyIssued do defmodule PolicyIssued do

View File

@@ -155,14 +155,29 @@ rawResources:
name: rabbitmq name: rabbitmq
namespace: rabbitmq namespace: rabbitmq
exchange-solictation-requested: exchange-solicitation-requested:
enabled: true enabled: true
apiVersion: rabbitmq.com/v1beta1 apiVersion: rabbitmq.com/v1beta1
kind: Exchange kind: Exchange
suffix: exchange-solicitation-requested suffix: exchange-solicitation-requested
spec: spec:
spec: spec:
name: policy_service.events.solictation_requested name: policy_service.events.solicitation_requested
type: topic
durable: true
vhost: "application"
rabbitmqClusterReference:
name: rabbitmq
namespace: rabbitmq
exchange-solicitation-task-completed:
enabled: true
apiVersion: rabbitmq.com/v1beta1
kind: Exchange
suffix: exchange-solicitation-task-completed
spec:
spec:
name: workload_service.events.solicitation_task_completed
type: topic type: topic
durable: true durable: true
vhost: "application" vhost: "application"