remove policy issued
All checks were successful
Build and Publish / build-release (push) Successful in 2m24s
All checks were successful
Build and Publish / build-release (push) Successful in 2m24s
This commit is contained in:
@@ -15,7 +15,6 @@ defmodule PolicyService.Application do
|
|||||||
PolicyService.Handlers.SolicitationRequestHandler,
|
PolicyService.Handlers.SolicitationRequestHandler,
|
||||||
PolicyService.Consumers.QuoteTaskConsumer,
|
PolicyService.Consumers.QuoteTaskConsumer,
|
||||||
PolicyService.Consumers.SolicitationTaskConsumer,
|
PolicyService.Consumers.SolicitationTaskConsumer,
|
||||||
PolicyService.Consumers.PolicyIssuedConsumer,
|
|
||||||
PolicyService.Projectors.PolicyProjector,
|
PolicyService.Projectors.PolicyProjector,
|
||||||
PolicyServiceWeb.Telemetry,
|
PolicyServiceWeb.Telemetry,
|
||||||
PolicyService.Repo,
|
PolicyService.Repo,
|
||||||
|
|||||||
@@ -1,72 +0,0 @@
|
|||||||
defmodule PolicyService.Consumers.PolicyIssuedConsumer do
|
|
||||||
use GenServer
|
|
||||||
require Logger
|
|
||||||
|
|
||||||
alias PolicyService.CommandedApp
|
|
||||||
alias PolicyService.Commands.CarPolicy
|
|
||||||
alias PolicyService.Aggregates.PolicyId
|
|
||||||
|
|
||||||
@exchange "policy_service.events.policy_issued"
|
|
||||||
@queue "policy_service.policy_issued"
|
|
||||||
@routing_key "policy.issued"
|
|
||||||
|
|
||||||
def start_link(_opts), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
|
|
||||||
|
|
||||||
def init(_) do
|
|
||||||
{:ok, conn} = AMQP.Connection.open(amqp_url())
|
|
||||||
{:ok, channel} = AMQP.Channel.open(conn)
|
|
||||||
|
|
||||||
AMQP.Queue.declare(channel, @queue, durable: true)
|
|
||||||
AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key)
|
|
||||||
AMQP.Basic.qos(channel, prefetch_count: 10)
|
|
||||||
{:ok, _tag} = AMQP.Basic.consume(channel, @queue)
|
|
||||||
|
|
||||||
{:ok, %{channel: channel}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state}
|
|
||||||
def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state}
|
|
||||||
def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state}
|
|
||||||
|
|
||||||
def handle_info({:basic_deliver, payload, meta}, state) do
|
|
||||||
case Jason.decode(payload) do
|
|
||||||
{:ok, event} ->
|
|
||||||
process(event, meta, state)
|
|
||||||
|
|
||||||
{:error, _} ->
|
|
||||||
Logger.error("PolicyIssuedConsumer: failed to decode payload")
|
|
||||||
AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false)
|
|
||||||
end
|
|
||||||
|
|
||||||
{:noreply, state}
|
|
||||||
end
|
|
||||||
|
|
||||||
defp process(event, meta, state) do
|
|
||||||
%{policy_type: policy_type} = PolicyId.parse!(event["id"])
|
|
||||||
|
|
||||||
command =
|
|
||||||
case policy_type do
|
|
||||||
"car" ->
|
|
||||||
%CarPolicy.RecordPolicyIssued{
|
|
||||||
id: event["id"],
|
|
||||||
provider_policy_number: event["provider_policy_number"],
|
|
||||||
effective_date: event["effective_date"],
|
|
||||||
expiry_date: event["expiry_date"],
|
|
||||||
issued_at: DateTime.utc_now()
|
|
||||||
}
|
|
||||||
end
|
|
||||||
|
|
||||||
case CommandedApp.dispatch(command) do
|
|
||||||
:ok ->
|
|
||||||
AMQP.Basic.ack(state.channel, meta.delivery_tag)
|
|
||||||
|
|
||||||
{:error, reason} ->
|
|
||||||
Logger.error("PolicyIssuedConsumer: dispatch failed: #{inspect(reason)}")
|
|
||||||
AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: true)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
defp amqp_url do
|
|
||||||
Application.get_env(:policy_service, :amqp_url, "amqp://guest:guest@localhost:5672")
|
|
||||||
end
|
|
||||||
end
|
|
||||||
Reference in New Issue
Block a user