diff --git a/lib/workload_service/aggregates/quote_task.ex b/lib/workload_service/aggregates/quote_task.ex index 7b1f421..234d0ec 100644 --- a/lib/workload_service/aggregates/quote_task.ex +++ b/lib/workload_service/aggregates/quote_task.ex @@ -4,7 +4,66 @@ defmodule WorkloadService.Aggregates.QuoteTask do commands: WorkloadService.Commands.QuoteTask, submission_type: map() - def validate_submission(_) do - :ok + def validate_submission(submission) when is_map(submission) do + with {:ok, quote_id} <- validate_required_string(submission, "quote_id"), + {:ok, recorded_by} <- validate_required_string(submission, "recorded_by"), + {:ok, valid_until} <- validate_date_string(submission, "valid_until"), + {:ok, plans} <- validate_plans(submission["plans"]) do + :ok + end end + + def validate_submission(_), do: {:error, :invalid_submission_format} + + defp validate_required_string(submission, field) do + case Map.get(submission, field) do + nil -> {:error, {:missing_field, field}} + value when is_binary(value) and byte_size(value) > 0 -> {:ok, value} + _ -> {:error, {:invalid_field, field}} + end + end + + defp validate_date_string(submission, field) do + case Map.get(submission, field) do + nil -> {:error, {:missing_field, field}} + value when is_binary(value) -> + case Date.from_iso8601(value) do + {:ok, _date} -> {:ok, value} + {:error, _} -> {:error, {:invalid_date_format, field}} + end + _ -> {:error, {:invalid_field, field}} + end + end + + defp validate_plans(plans) when is_list(plans) do + if length(plans) == 0 do + {:error, :no_plans_provided} + else + case Enum.reduce_while(plans, :ok, fn plan, _acc -> + validate_plan(plan) + end) do + :ok -> {:ok, plans} + error -> error + end + end + end + + defp validate_plans(_), do: {:error, :invalid_plans_format} + + defp validate_plan(plan) when is_map(plan) do + with {:ok, plan_id} <- validate_required_string(plan, "plan_id"), + {:ok, name} <- validate_required_string(plan, "name"), + {:ok, premium} <- validate_premium(plan["premium"]), + {:ok, coverage_details} <- validate_coverage_details(plan["coverage_details"]) do + {:cont, :ok} + end + end + + defp validate_plan(_), do: {:halt, {:error, :invalid_plan_format}} + + defp validate_premium(premium) when is_number(premium) and premium > 0, do: {:ok, premium} + defp validate_premium(_), do: {:error, :invalid_premium} + + defp validate_coverage_details(details) when is_map(details), do: {:ok, details} + defp validate_coverage_details(_), do: {:error, :invalid_coverage_details} end diff --git a/lib/workload_service/aggregates/solicitation_task.ex b/lib/workload_service/aggregates/solicitation_task.ex index 4c53478..c2714d4 100644 --- a/lib/workload_service/aggregates/solicitation_task.ex +++ b/lib/workload_service/aggregates/solicitation_task.ex @@ -4,7 +4,47 @@ defmodule WorkloadService.Aggregates.SolicitationTask do commands: WorkloadService.Commands.SolicitationTask, submission_type: map() - def validate_submission(_) do - :ok + def validate_submission(submission) when is_map(submission) do + with {:ok, provider_policy_number} <- validate_required_string(submission, "provider_policy_number"), + {:ok, effective_date} <- validate_date_string(submission, "effective_date"), + {:ok, expiry_date} <- validate_date_string(submission, "expiry_date"), + {:ok, _} <- validate_expiry_after_effective(effective_date, expiry_date) do + :ok + end + end + + def validate_submission(_), do: {:error, :invalid_submission_format} + + defp validate_required_string(submission, field) do + case Map.get(submission, field) do + nil -> {:error, {:missing_field, field}} + value when is_binary(value) and byte_size(value) > 0 -> {:ok, value} + _ -> {:error, {:invalid_field, field}} + end + end + + defp validate_date_string(submission, field) do + case Map.get(submission, field) do + nil -> {:error, {:missing_field, field}} + value when is_binary(value) -> + case Date.from_iso8601(value) do + {:ok, _date} -> {:ok, value} + {:error, _} -> {:error, {:invalid_date_format, field}} + end + _ -> {:error, {:invalid_field, field}} + end + end + + defp validate_expiry_after_effective(effective_date, expiry_date) do + case {Date.from_iso8601(effective_date), Date.from_iso8601(expiry_date)} do + {{:ok, effective}, {:ok, expiry}} -> + if Date.compare(expiry, effective) == :gt do + {:ok, :valid_date_range} + else + {:error, :expiry_must_be_after_effective} + end + _ -> + {:error, :invalid_date_comparison} + end end end diff --git a/lib/workload_service/application.ex b/lib/workload_service/application.ex index f19090e..705067e 100644 --- a/lib/workload_service/application.ex +++ b/lib/workload_service/application.ex @@ -8,8 +8,8 @@ defmodule WorkloadService.Application do children = [ WorkloadService.CommandedApp, WorkloadService.Consumers.QuoteRequestedConsumer, + WorkloadService.Consumers.SolicitationRequestedConsumer, WorkloadService.Handlers.TaskCompletedHandler, - # WorkloadService.Consumers.SolicitationRequestedConsumer, WorkloadService.Projectors.TaskProjector, WorkloadService.Repo, WorkloadServiceWeb.Telemetry, diff --git a/lib/workload_service/consumers/solicitation_requested_consumer.ex b/lib/workload_service/consumers/solicitation_requested_consumer.ex index de3a4db..bd6ad2d 100644 --- a/lib/workload_service/consumers/solicitation_requested_consumer.ex +++ b/lib/workload_service/consumers/solicitation_requested_consumer.ex @@ -1,137 +1,96 @@ -# defmodule WorkloadService.Consumers.SolicitationRequestedConsumer do -# use GenServer -# require Logger +defmodule WorkloadService.Consumers.SolicitationRequestedConsumer do + use GenServer + require Logger -# alias WorkloadService.CommandedApp -# alias WorkloadService.Commands.SolicitationTask + alias WorkloadService.CommandedApp + alias WorkloadService.Commands.SolicitationTask -# @exchange "workload_service.events.solicitation_requested" -# @queue "workload_service.solicitation_requested" -# @routing_key "solicitation.requested" + @exchange "policy_service.events.solicitation_requested" + @queue "workload_service.solicitation_requested" + @routing_key "solicitation.requested" -# @provider_service_url "http://localhost:4002" -# @solicitation_service_url "http://localhost:8081" + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end -# def start_link(opts \\ []) do -# GenServer.start_link(__MODULE__, opts, name: __MODULE__) -# end + def init(_opts) do + {:ok, conn} = AMQP.Connection.open(amqp_url()) + {:ok, channel} = AMQP.Channel.open(conn) -# def init(_opts) do -# {:ok, conn} = AMQP.Connection.open(amqp_url()) -# {:ok, channel} = AMQP.Channel.open(conn) + {:ok, _} = AMQP.Queue.declare(channel, @queue, durable: true) + :ok = AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key) + {:ok, _tag} = AMQP.Basic.consume(channel, @queue) -# AMQP.Queue.declare(channel, @queue, durable: true) -# AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key) -# {:ok, _tag} = AMQP.Basic.consume(channel, @queue) + Logger.info("SolicitationRequestedConsumer started, listening on #{@queue}") -# Logger.info("SolicitationRequestedConsumer started, listening on #{@queue}") + {:ok, %{channel: channel}} + end -# {:ok, %{channel: channel}} -# end + def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state} + def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state} + def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state} -# def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state} -# def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state} -# def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state} + def handle_info({:basic_deliver, payload, meta}, state) do + :ok = + case process(payload) do + :ok -> + AMQP.Basic.ack(state.channel, meta.delivery_tag) -# def handle_info({:basic_deliver, payload, meta}, state) do -# case process(payload) do -# :ok -> -# AMQP.Basic.ack(state.channel, meta.delivery_tag) + {:error, reason} -> + Logger.error("SolicitationRequestedConsumer: failed to process: #{inspect(reason)}") + AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false) + end -# {:error, reason} -> -# Logger.error("SolicitationRequestedConsumer: failed to process: #{inspect(reason)}") -# AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false) -# end + {:noreply, state} + end -# {:noreply, state} -# end + defp process(payload) do + with {:ok, event} <- Jason.decode(payload), + :ok <- handle_event(event) do + :ok + end + end -# defp process(payload) do -# with {:ok, event} <- Jason.decode(payload), -# {:ok, provider} <- get_provider(event["provider_id"]), -# {:ok, template} <- get_active_template(provider, event["policy_type"]), -# {:ok, result} <- generate_solicitation(event, template), -# :ok <- create_task(event, result) do -# :ok -# end -# end + defp handle_event( + %{ + "id" => %{"org_id" => org_id, "application_id" => app_id, "policy_type" => policy_type}, + "plan" => plan, + "provider_id" => provider_id + } = event + ) do + task_id = WorkloadService.Aggregates.TaskId.new(org_id, "solicitation", Ecto.UUID.generate()) + app_id_struct = WorkloadService.Aggregates.ApplicationId.new(org_id, app_id, policy_type) -# defp get_provider(provider_id) do -# url = "#{@provider_service_url}/api/v1/providers/#{provider_id}" + command = %SolicitationTask.CreateTask{ + id: task_id, + application_id: app_id_struct, + attachments: [], + task_info: %{ + "provider_id" => provider_id, + "plan_id" => Map.get(plan, "plan_id"), + "plan_name" => Map.get(plan, "name"), + "premium" => Map.get(plan, "premium"), + "coverage_details" => Map.get(plan, "coverage_details") + } + } -# case Req.get(url) do -# {:ok, %{status: 200, body: %{"data" => provider}}} -> {:ok, provider} -# {:ok, %{status: 404}} -> {:error, :provider_not_found} -# {:error, reason} -> {:error, reason} -# end -# end + case CommandedApp.dispatch(command) do + :ok -> + Logger.info("SolicitationRequestedConsumer: created task #{task_id}") + :ok -# defp get_active_template(provider, policy_type) do -# templates = get_in(provider, ["templates", policy_type]) || [] -# default_id = get_in(provider, ["default_templates", policy_type]) + {:error, reason} -> + Logger.error("SolicitationRequestedConsumer: failed to create task - #{inspect(reason)}") + {:error, reason} + end + end -# template = -# if default_id do -# Enum.find(templates, &(&1["template_id"] == default_id)) -# else -# Enum.find(templates, &(&1["active"] == true)) -# end + defp handle_event(event) do + Logger.warning("SolicitationRequestedConsumer: unhandled event #{inspect(event)}") + {:error, {:invalid_event, event}} + end -# case template do -# nil -> {:error, :no_active_template} -# t -> {:ok, t} -# end -# end - -# defp generate_solicitation(event, template) do -# url = "#{@solicitation_service_url}/api/solicitations/generate" - -# body = %{ -# org_id: event["org_id"], -# id: event["id"], -# template_document_url: template["document_url"], -# fields: event["fields"] || %{} -# } - -# case Req.post(url, json: body) do -# {:ok, %{status: 200, body: result}} -> -# {:ok, result} - -# {:ok, %{status: status, body: body}} -> -# {:error, "solicitation_service returned #{status}: #{inspect(body)}"} - -# {:error, reason} -> -# {:error, reason} -# end -# end - -# defp create_task(event, result) do -# org_id = event["org_id"] -# task_id = WorkloadService.Aggregates.TaskId.new(org_id, "solicitation", Ecto.UUID.generate()) - -# command = %SolicitationTask.CreateTask{ -# id: task_id, -# application_id: event["id"], -# provider_id: event["provider_id"], -# provider_name: event["provider_name"] || "", -# task_info: %{ -# "quote" => event["quote"], -# "plan" => event["plan"] -# }, -# attachments: [result["document_url"]] |> Enum.filter(& &1) -# } - -# case CommandedApp.dispatch(command) do -# :ok -> -# Logger.info("SolicitationRequestedConsumer: created task #{task_id}") -# :ok - -# {:error, reason} -> -# {:error, reason} -# end -# end - -# defp amqp_url do -# Application.get_env(:workload_service, :amqp_url, "amqp://guest:guest@localhost:5672") -# end -# end + defp amqp_url do + Application.get_env(:workload_service, :amqp_url, "amqp://guest:guest@localhost:5672") + end +end diff --git a/lib/workload_service/handlers/task_completed_handler.ex b/lib/workload_service/handlers/task_completed_handler.ex index cf388c3..ccddca2 100644 --- a/lib/workload_service/handlers/task_completed_handler.ex +++ b/lib/workload_service/handlers/task_completed_handler.ex @@ -14,11 +14,11 @@ defmodule WorkloadService.Handlers.TaskCompletedHandler do SolicitationTask } - def handle(%TaskCompleted{} = event, _metadata) do +def handle(%TaskCompleted{} = event, _metadata) do aggregate_module = case event.id.type do "quote" -> {:ok, QuoteTask} - # "solicitation" -> SolicitationTask + "solicitation" -> {:ok, SolicitationTask} _ -> {:error, "aggregate module not found for event type #{event.id}"} end @@ -36,9 +36,21 @@ defmodule WorkloadService.Handlers.TaskCompletedHandler do Logger.warning("TaskCompletedHandler: aggregate not found for #{event.id}") state -> + exchange = + case event.id.type do + "quote" -> "workload_service.events.quote_task_completed" + "solicitation" -> "workload_service.events.solicitation_task_completed" + end + + routing_key = + case event.id.type do + "quote" -> "quote_task.completed" + "solicitation" -> "solicitation_task.completed" + end + MessageBus.publish( - "workload_service.events.quote_task_completed", - "quote_task.completed", + exchange, + routing_key, state ) diff --git a/lib/workload_service_web/controllers/task_controller.ex b/lib/workload_service_web/controllers/task_controller.ex index 75fa96f..62004cf 100644 --- a/lib/workload_service_web/controllers/task_controller.ex +++ b/lib/workload_service_web/controllers/task_controller.ex @@ -13,7 +13,7 @@ defmodule WorkloadServiceWeb.TaskController do operation(:list, summary: "List tasks", parameters: QueryHelpers.flop( - [:status, :application_id], + [:status, :application_id, :policy_type], [:created_at, :updated_at, :status] ), responses: [ @@ -122,6 +122,9 @@ defmodule WorkloadServiceWeb.TaskController do command = %WorkloadService.Commands.SolicitationTask.SubmitResponse{ id: task_id, submission: %{ + "provider_policy_number" => params["provider_policy_number"], + "effective_date" => params["effective_date"], + "expiry_date" => params["expiry_date"], "recorded_by" => params["recorded_by"] || "system" }, attachments: [] @@ -248,6 +251,7 @@ defmodule WorkloadServiceWeb.TaskController do id: t.id, org_id: t.org_id, application_id: t.application_id, + policy_type: t.policy_type, task_info: t.task_info, status: t.status, created_at: t.inserted_at @@ -259,6 +263,7 @@ defmodule WorkloadServiceWeb.TaskController do id: t.id, org_id: t.org_id, application_id: t.application_id, + policy_type: t.policy_type, task_info: t.task_info, submission: t.submission, attachments: t.attachments, diff --git a/lib/workload_service_web/schemas/task.ex b/lib/workload_service_web/schemas/task.ex index 2721ac1..8fba301 100644 --- a/lib/workload_service_web/schemas/task.ex +++ b/lib/workload_service_web/schemas/task.ex @@ -24,29 +24,96 @@ defmodule WorkloadServiceWeb.Schemas.Task do OpenApiSpex.schema(%{ title: "Plan", type: :object, + required: [:plan_id, :name, :premium, :coverage_details], properties: %{ plan_id: %Schema{type: :string}, name: %Schema{type: :string}, + premium: %Schema{type: :number, exclusiveMinimum: 0}, + coverage_details: %Schema{type: :object, additionalProperties: true} + } + }) + end + + defmodule QuoteTaskInfo do + require OpenApiSpex + + OpenApiSpex.schema(%{ + title: "QuoteTaskInfo", + type: :object, + required: [:provider_id, :applicant_info, :policy_details], + properties: %{ + provider_id: %Schema{type: :string}, + applicant_info: %Schema{type: :object, additionalProperties: true}, + policy_details: %Schema{type: :object, additionalProperties: true}, + provider_email: %Schema{type: :string, nullable: true} + } + }) + end + + defmodule SolicitationTaskInfo do + require OpenApiSpex + + OpenApiSpex.schema(%{ + title: "SolicitationTaskInfo", + type: :object, + required: [:provider_id, :plan_id, :plan_name, :premium, :coverage_details], + properties: %{ + provider_id: %Schema{type: :string}, + plan_id: %Schema{type: :string}, + plan_name: %Schema{type: :string}, premium: %Schema{type: :number}, coverage_details: %Schema{type: :object, additionalProperties: true} } }) end + defmodule QuoteSubmissionDetail do + require OpenApiSpex + + OpenApiSpex.schema(%{ + title: "QuoteSubmissionDetail", + type: :object, + required: [:quote_id, :recorded_by, :valid_until, :plans], + properties: %{ + quote_id: %Schema{type: :string}, + recorded_by: %Schema{type: :string}, + valid_until: %Schema{type: :string, format: :date}, + plans: %Schema{type: :array, items: Plan, minItems: 1}, + document_data: %Schema{type: :object, additionalProperties: true, nullable: true}, + document_url: %Schema{type: :string, nullable: true} + } + }) + end + + defmodule SolicitationSubmissionDetail do + require OpenApiSpex + + OpenApiSpex.schema(%{ + title: "SolicitationSubmissionDetail", + type: :object, + required: [:provider_policy_number, :effective_date, :expiry_date], + properties: %{ + provider_policy_number: %Schema{type: :string}, + effective_date: %Schema{type: :string, format: :date}, + expiry_date: %Schema{type: :string, format: :date} + } + }) + end + defmodule QuoteSubmission do require OpenApiSpex OpenApiSpex.schema(%{ title: "QuoteSubmission", type: :object, - required: [:recorded_by, :quote_id], + required: [:quote_id, :recorded_by, :valid_until, :plans], properties: %{ - recorded_by: %Schema{type: :string}, quote_id: %Schema{type: :string}, + recorded_by: %Schema{type: :string}, valid_until: %Schema{type: :string, format: :date}, - plans: %Schema{type: :array, items: Plan}, - document_url: %Schema{type: :string}, - document_data: %Schema{type: :object, additionalProperties: true} + plans: %Schema{type: :array, items: Plan, minItems: 1}, + document_data: %Schema{type: :object, additionalProperties: true, nullable: true}, + document_url: %Schema{type: :string, nullable: true} } }) end @@ -57,9 +124,11 @@ defmodule WorkloadServiceWeb.Schemas.Task do OpenApiSpex.schema(%{ title: "SolicitationSubmission", type: :object, - required: [:recorded_by], + required: [:provider_policy_number, :effective_date, :expiry_date], properties: %{ - recorded_by: %Schema{type: :string} + provider_policy_number: %Schema{type: :string}, + effective_date: %Schema{type: :string, format: :date}, + expiry_date: %Schema{type: :string, format: :date} } }) end @@ -74,7 +143,8 @@ defmodule WorkloadServiceWeb.Schemas.Task do id: %Schema{type: :string}, org_id: %Schema{type: :string}, application_id: %Schema{type: :string}, - task_info: %Schema{type: :object}, + policy_type: %Schema{type: :string, enum: ["car", "life", "fire"]}, + task_info: %Schema{oneOf: [QuoteTaskInfo, SolicitationTaskInfo]}, status: %Schema{type: :string, enum: ["created", "draft", "approved", "completed"]}, created_at: %Schema{type: :string, format: :"date-time"} } @@ -91,8 +161,9 @@ defmodule WorkloadServiceWeb.Schemas.Task do id: %Schema{type: :string}, org_id: %Schema{type: :string}, application_id: %Schema{type: :string}, - task_info: %Schema{type: :object}, - submission: %Schema{type: :object, nullable: true}, + policy_type: %Schema{type: :string, enum: ["car", "life", "fire"]}, + task_info: %Schema{oneOf: [QuoteTaskInfo, SolicitationTaskInfo]}, + submission: %Schema{oneOf: [QuoteSubmissionDetail, SolicitationSubmissionDetail], nullable: true}, attachments: %Schema{type: :array, items: %Schema{type: :string}}, status: %Schema{type: :string, enum: ["created", "draft", "approved", "completed"]}, created_at: %Schema{type: :string, format: :"date-time"}, diff --git a/ops/chart/values.yaml b/ops/chart/values.yaml index f390793..03ac917 100644 --- a/ops/chart/values.yaml +++ b/ops/chart/values.yaml @@ -155,6 +155,21 @@ rawResources: name: rabbitmq namespace: rabbitmq + exchange-solicitation-task-completed: + enabled: true + apiVersion: rabbitmq.com/v1beta1 + kind: Exchange + suffix: exchange-solicitation-task-completed + spec: + spec: + name: workload_service.events.solicitation_task_completed + type: topic + durable: true + vhost: "application" + rabbitmqClusterReference: + name: rabbitmq + namespace: rabbitmq + password-generator: enabled: true apiVersion: generators.external-secrets.io/v1alpha1