init commit

This commit is contained in:
2026-04-16 14:20:58 -05:00
commit cc973cc11c
51 changed files with 2447 additions and 0 deletions

View File

@@ -0,0 +1,9 @@
defmodule WorkloadService.Aggregates.QuoteTask do
use WorkloadService.Aggregates.Task,
task_type: "quote",
commands: WorkloadService.Commands.QuoteTask
def validate_submission(_) do
:ok
end
end

View File

@@ -0,0 +1,9 @@
defmodule WorkloadService.Aggregates.SolicitationTask do
use WorkloadService.Aggregates.Task,
task_type: "solicitation",
commands: WorkloadService.Commands.SolicitationTask
def validate_submission(_) do
:ok
end
end

View File

@@ -0,0 +1,145 @@
defmodule WorkloadService.Aggregates.Task do
@moduledoc """
Behaviour and __using__ macro for task aggregates.
Each task type can override validation and add specific behavior.
Usage:
defmodule WorkloadService.Aggregates.QuoteTask do
use WorkloadService.Aggregates.Task,
task_type: "quote"
end
"""
@callback validate_submission(map()) :: :ok | {:error, term()}
defmacro __using__(opts) do
task_type = Keyword.fetch!(opts, :task_type)
commands_module = Keyword.get(opts, :commands, WorkloadService.Commands.Task)
quote do
@behaviour Commanded.Aggregates.Aggregate
@task_type unquote(task_type)
alias unquote(commands_module).CreateTask
alias unquote(commands_module).SubmitResponse
alias unquote(commands_module).ApproveSubmission
alias unquote(commands_module).CompleteTask
alias Commanded.Aggregates.Aggregate
defstruct [
:id,
:application_id,
:provider_id,
:provider_name,
:task_info,
:submission,
:attachments,
:status,
:version
]
@impl Aggregate
def execute(%__MODULE__{status: nil}, %CreateTask{} = cmd) do
%WorkloadService.Events.TaskCreated{
id: cmd.id,
application_id: cmd.application_id,
provider_id: cmd.provider_id,
provider_name: cmd.provider_name,
task_info: cmd.task_info || %{},
attachments: cmd.attachments || []
}
end
@impl Aggregate
def execute(%__MODULE__{}, %CreateTask{}) do
{:error, :task_already_exists}
end
@impl Aggregate
def execute(%__MODULE__{status: status}, %SubmitResponse{} = cmd)
when status in [nil, "created", "draft", "approved"] do
with :ok <- validate_submission(cmd.submission) do
new_status = if status == "approved", do: "draft", else: "draft"
%WorkloadService.Events.SubmissionUpdated{
id: cmd.id,
submission: cmd.submission,
attachments: cmd.attachments
}
end
end
@impl Aggregate
def execute(%__MODULE__{status: "draft"}, %ApproveSubmission{}) do
%WorkloadService.Events.SubmissionApproved{
id: nil
}
end
@impl Aggregate
def execute(%__MODULE__{status: status}, %ApproveSubmission{}) do
{:error, {:invalid_state, "cannot approve in state: #{status}"}}
end
@impl Aggregate
def execute(%__MODULE__{status: "approved"}, %CompleteTask{} = cmd) do
%WorkloadService.Events.TaskCompleted{
id: cmd.id,
completed_by: cmd.completed_by
}
end
@impl Aggregate
def execute(%__MODULE__{status: status}, %CompleteTask{}) do
{:error, {:invalid_state, "cannot complete in state: #{status}"}}
end
@impl Aggregate
def apply(%__MODULE__{} = agg, %WorkloadService.Events.TaskCreated{} = e) do
%{
agg
| id: e.id,
application_id: e.application_id,
provider_id: e.provider_id,
provider_name: e.provider_name,
task_info: e.task_info,
attachments: e.attachments,
status: "created",
version: agg.version + 1
}
end
@impl Aggregate
def apply(%__MODULE__{} = agg, %WorkloadService.Events.SubmissionUpdated{} = e) do
%{
agg
| submission: e.submission,
attachments: e.attachments || [],
status: "draft",
version: agg.version + 1
}
end
@impl Aggregate
def apply(%__MODULE__{} = agg, %WorkloadService.Events.SubmissionApproved{}) do
%{
agg
| status: "approved",
version: agg.version + 1
}
end
@impl Aggregate
def apply(%__MODULE__{} = agg, %WorkloadService.Events.TaskCompleted{}) do
%{
agg
| status: "completed",
version: agg.version + 1
}
end
defoverridable execute: 2, apply: 2
end
end
end

View File

@@ -0,0 +1,48 @@
defmodule WorkloadService.Aggregates.TaskId do
@moduledoc """
Task identifier with org_id, type and task_id.
ID format: "org_id:type:task_id" (e.g., "test:quote:uuid")
"""
@derive Jason.Encoder
defstruct [:org_id, :type, :task_id]
def new(org_id, type, task_id) when type in ["quote", "solicitation"] do
%__MODULE__{
org_id: org_id,
type: type,
task_id: task_id
}
end
def parse(string) when is_binary(string) do
case String.split(string, ":", parts: 3) do
[org_id, type, task_id] when type in ["quote", "solicitation"] ->
{:ok, new(org_id, type, task_id)}
_ ->
{:error, :invalid_task_id}
end
end
def parse!(string) do
case parse(string) do
{:ok, id} -> id
{:error, reason} -> raise ArgumentError, "invalid task id #{inspect(string)}: #{reason}"
end
end
defimpl String.Chars do
def to_string(%WorkloadService.Aggregates.TaskId{
org_id: org_id,
type: type,
task_id: task_id
}) do
"#{org_id}:#{type}:#{task_id}"
end
end
defimpl Commanded.Serialization.JsonDecoder do
def decode(id), do: id
end
end

View File

@@ -0,0 +1,29 @@
defmodule WorkloadService.Application do
@moduledoc false
use Application
@impl true
def start(_type, _args) do
children = [
WorkloadService.CommandedApp,
WorkloadService.Consumers.QuoteRequestedConsumer,
WorkloadService.Consumers.SolicitationRequestedConsumer,
WorkloadService.Projectors.TaskProjector,
WorkloadService.Repo,
WorkloadServiceWeb.Telemetry,
{DNSCluster, query: Application.get_env(:workload_service, :dns_cluster_query) || :ignore},
{Phoenix.PubSub, name: WorkloadService.PubSub},
WorkloadServiceWeb.Endpoint
]
opts = [strategy: :one_for_one, name: WorkloadService.Supervisor]
Supervisor.start_link(children, opts)
end
@impl true
def config_change(changed, _new, removed) do
WorkloadServiceWeb.Endpoint.config_change(changed, removed)
:ok
end
end

View File

@@ -0,0 +1,32 @@
defmodule WorkloadService.Router do
use Commanded.Commands.Router
dispatch(
[
WorkloadService.Commands.QuoteTask.CreateTask,
WorkloadService.Commands.QuoteTask.SubmitResponse,
WorkloadService.Commands.QuoteTask.ApproveSubmission,
WorkloadService.Commands.QuoteTask.CompleteTask
],
to: WorkloadService.Aggregates.QuoteTask,
identity: :id
)
dispatch(
[
WorkloadService.Commands.SolicitationTask.CreateTask,
WorkloadService.Commands.SolicitationTask.SubmitResponse,
WorkloadService.Commands.SolicitationTask.ApproveSubmission,
WorkloadService.Commands.SolicitationTask.CompleteTask
],
to: WorkloadService.Aggregates.SolicitationTask,
identity: :id
)
end
defmodule WorkloadService.CommandedApp do
use Commanded.Application,
otp_app: :workload_service
router(WorkloadService.Router)
end

View File

@@ -0,0 +1,53 @@
defmodule WorkloadService.Commands.QuoteTask do
@moduledoc """
Quote task commands.
"""
defmodule CreateTask do
@moduledoc """
Command to create a new quote task.
"""
@derive Jason.Encoder
defstruct [:id, :application_id, :provider_id, :provider_name, :task_info, :attachments]
def new(attrs) do
struct(__MODULE__, attrs)
end
end
defmodule SubmitResponse do
@moduledoc """
Command to submit response for a quote task.
"""
@derive Jason.Encoder
defstruct [:id, :submission, :attachments]
def new(attrs) do
struct(__MODULE__, attrs)
end
end
defmodule ApproveSubmission do
@moduledoc """
Command to approve submission for a quote task.
"""
@derive Jason.Encoder
defstruct [:id]
def new(attrs) do
struct(__MODULE__, attrs)
end
end
defmodule CompleteTask do
@moduledoc """
Command to complete a quote task.
"""
@derive Jason.Encoder
defstruct [:id, :completed_by]
def new(attrs) do
struct(__MODULE__, attrs)
end
end
end

View File

@@ -0,0 +1,53 @@
defmodule WorkloadService.Commands.SolicitationTask do
@moduledoc """
Solicitation task commands.
"""
defmodule CreateTask do
@moduledoc """
Command to create a new solicitation task.
"""
@derive Jason.Encoder
defstruct [:id, :application_id, :provider_id, :provider_name, :task_info, :attachments]
def new(attrs) do
struct(__MODULE__, attrs)
end
end
defmodule SubmitResponse do
@moduledoc """
Command to submit response for a solicitation task.
"""
@derive Jason.Encoder
defstruct [:id, :submission, :attachments]
def new(attrs) do
struct(__MODULE__, attrs)
end
end
defmodule ApproveSubmission do
@moduledoc """
Command to approve submission for a solicitation task.
"""
@derive Jason.Encoder
defstruct [:id]
def new(attrs) do
struct(__MODULE__, attrs)
end
end
defmodule CompleteTask do
@moduledoc """
Command to complete a solicitation task.
"""
@derive Jason.Encoder
defstruct [:id, :completed_by]
def new(attrs) do
struct(__MODULE__, attrs)
end
end
end

View File

@@ -0,0 +1,93 @@
defmodule WorkloadService.Consumers.QuoteRequestedConsumer do
use GenServer
require Logger
alias WorkloadService.CommandedApp
alias WorkloadService.Commands.QuoteTask
@exchange "workload_service.events.quote_requested"
@queue "workload_service.quote_requested"
@routing_key "quote.requested"
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
{:ok, conn} = AMQP.Connection.open(amqp_url())
{:ok, channel} = AMQP.Channel.open(conn)
AMQP.Queue.declare(channel, @queue, durable: true)
AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key)
{:ok, _tag} = AMQP.Basic.consume(channel, @queue)
Logger.info("QuoteRequestedConsumer started, listening on #{@queue}")
{:ok, %{channel: channel}}
end
def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state}
def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state}
def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state}
def handle_info({:basic_deliver, payload, meta}, state) do
case process(payload) do
:ok ->
AMQP.Basic.ack(state.channel, meta.delivery_tag)
{:error, reason} ->
Logger.error("QuoteRequestedConsumer: failed to process: #{inspect(reason)}")
AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false)
end
{:noreply, state}
end
defp process(payload) do
with {:ok, event} <- Jason.decode(payload),
:ok <- handle_event(event) do
:ok
end
end
defp handle_event(%{
"application_id" => application_id,
"org_id" => org_id,
"provider_id" => provider_id,
"policy_type" => policy_type
} = event) do
task_id = WorkloadService.Aggregates.TaskId.new(org_id, "quote", Ecto.UUID.generate())
command = %QuoteTask.CreateTask{
id: task_id,
application_id: application_id,
provider_id: provider_id,
provider_name: Map.get(event, "provider_name", ""),
task_info: %{
"policy_type" => policy_type,
"provider_email" => Map.get(event, "provider_email"),
"applicant_info" => Map.get(event, "applicant_info", %{}),
"car_details" => Map.get(event, "car_details", %{}),
"building_details" => Map.get(event, "building_details", %{}),
"life_details" => Map.get(event, "life_details", %{})
}
}
case CommandedApp.dispatch(command) do
:ok ->
Logger.info("QuoteRequestedConsumer: created task #{task_id}")
:ok
{:error, reason} ->
{:error, reason}
end
end
defp handle_event(event) do
{:error, {:invalid_event, event}}
end
defp amqp_url do
Application.get_env(:workload_service, :amqp_url, "amqp://guest:guest@localhost:5672")
end
end

View File

@@ -0,0 +1,138 @@
defmodule WorkloadService.Consumers.SolicitationRequestedConsumer do
use GenServer
require Logger
alias WorkloadService.CommandedApp
alias WorkloadService.Commands.SolicitationTask
@exchange "workload_service.events.solicitation_requested"
@queue "workload_service.solicitation_requested"
@routing_key "solicitation.requested"
@provider_service_url "http://localhost:4002"
@solicitation_service_url "http://localhost:8081"
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
{:ok, conn} = AMQP.Connection.open(amqp_url())
{:ok, channel} = AMQP.Channel.open(conn)
AMQP.Queue.declare(channel, @queue, durable: true)
AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key)
{:ok, _tag} = AMQP.Basic.consume(channel, @queue)
Logger.info("SolicitationRequestedConsumer started, listening on #{@queue}")
{:ok, %{channel: channel}}
end
def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state}
def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state}
def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state}
def handle_info({:basic_deliver, payload, meta}, state) do
case process(payload) do
:ok ->
AMQP.Basic.ack(state.channel, meta.delivery_tag)
{:error, reason} ->
Logger.error("SolicitationRequestedConsumer: failed to process: #{inspect(reason)}")
AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false)
end
{:noreply, state}
end
defp process(payload) do
with {:ok, event} <- Jason.decode(payload),
{:ok, provider} <- get_provider(event["provider_id"]),
{:ok, template} <- get_active_template(provider, event["policy_type"]),
{:ok, result} <- generate_solicitation(event, template),
:ok <- create_task(event, result) do
:ok
end
end
defp get_provider(provider_id) do
url = "#{@provider_service_url}/api/v1/providers/#{provider_id}"
case Req.get(url) do
{:ok, %{status: 200, body: %{"data" => provider}}} -> {:ok, provider}
{:ok, %{status: 404}} -> {:error, :provider_not_found}
{:error, reason} -> {:error, reason}
end
end
defp get_active_template(provider, policy_type) do
templates = get_in(provider, ["templates", policy_type]) || []
default_id = get_in(provider, ["default_templates", policy_type])
template =
if default_id do
Enum.find(templates, &(&1["template_id"] == default_id))
else
Enum.find(templates, &(&1["active"] == true))
end
case template do
nil -> {:error, :no_active_template}
t -> {:ok, t}
end
end
defp generate_solicitation(event, template) do
url = "#{@solicitation_service_url}/api/solicitations/generate"
body = %{
org_id: event["org_id"],
id: event["id"],
template_document_url: template["document_url"],
fields: event["fields"] || %{}
}
case Req.post(url, json: body) do
{:ok, %{status: 200, body: result}} ->
{:ok, result}
{:ok, %{status: status, body: body}} ->
{:error, "solicitation_service returned #{status}: #{inspect(body)}"}
{:error, reason} ->
{:error, reason}
end
end
defp create_task(event, result) do
org_id = event["org_id"]
task_id = WorkloadService.Aggregates.TaskId.new(org_id, "solicitation", Ecto.UUID.generate())
command = %SolicitationTask.CreateTask{
id: task_id,
application_id: event["id"],
provider_id: event["provider_id"],
provider_name: event["provider_name"] || "",
task_info: %{
"quote_id" => event["quote_id"],
"plan_id" => event["plan_id"],
"document_url" => result["document_url"]
},
attachments: [result["document_url"]] |> Enum.filter(& &1)
}
case CommandedApp.dispatch(command) do
:ok ->
Logger.info("SolicitationRequestedConsumer: created task #{task_id}")
:ok
{:error, reason} ->
{:error, reason}
end
end
defp amqp_url do
Application.get_env(:workload_service, :amqp_url, "amqp://guest:guest@localhost:5672")
end
end

View File

@@ -0,0 +1,3 @@
defmodule WorkloadService.EventStore do
use EventStore, otp_app: :workload_service
end

View File

@@ -0,0 +1,47 @@
defmodule WorkloadService.Events do
@moduledoc """
All domain events for the workload service.
"""
defmodule TaskCreated do
@moduledoc """
Emitted when a new task is created (quote or solicitation).
ID format: "org_id:type:task_id" (e.g., "test:quote:uuid") - TaskId struct
"""
@derive Jason.Encoder
defstruct [:id, :application_id, :provider_id, :provider_name, :task_info, :attachments]
end
defmodule SubmissionUpdated do
@moduledoc """
Emitted when submission is updated (user provides response data).
"""
@derive Jason.Encoder
defstruct [:id, :submission, :attachments]
end
defmodule SubmissionApproved do
@moduledoc """
Emitted when submission is approved and ready to send.
"""
@derive Jason.Encoder
defstruct [:id]
end
defmodule TaskCompleted do
@moduledoc """
Emitted when task is completed and sent to policy-service.
Triggers RabbitMQ publish.
"""
@derive Jason.Encoder
defstruct [:id, :completed_by]
end
defmodule QuoteAccepted do
@moduledoc """
Emitted when a quote is accepted - triggers solicitation generation.
"""
@derive Jason.Encoder
defstruct [:id, :provider_id, :quote_id, :plan_id]
end
end

View File

@@ -0,0 +1,86 @@
defmodule WorkloadService.MessageBus do
@moduledoc false
use GenServer
require Logger
alias AMQP.{Connection, Channel, Exchange}
@reconnect_interval 5_000
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def publish(routing_key, payload) when is_binary(payload) do
GenServer.cast(__MODULE__, {:publish, routing_key, payload})
end
def publish(routing_key, payload) do
GenServer.cast(__MODULE__, {:publish, routing_key, Jason.encode!(payload)})
end
@impl true
def init(_opts) do
send(self(), :connect)
{:ok, %{channel: nil, connection: nil}}
end
@impl true
def handle_info(:connect, _state) do
case connect() do
{:ok, connection, channel} ->
{:noreply, %{connection: connection, channel: channel}}
{:error, reason} ->
Logger.error("Failed to connect to RabbitMQ: #{inspect(reason)}")
schedule_reconnect()
{:noreply, %{channel: nil, connection: nil}}
end
end
@impl true
def handle_info({:DOWN, _, :process, _pid, reason}, _state) do
Logger.error("RabbitMQ connection lost: #{inspect(reason)}")
schedule_reconnect()
{:noreply, %{channel: nil, connection: nil}}
end
@impl true
def handle_cast({:publish, _routing_key, _payload}, %{channel: nil} = state) do
Logger.warning("RabbitMQ not connected, message dropped")
{:noreply, state}
end
@impl true
def handle_cast({:publish, routing_key, payload}, %{channel: channel} = state) do
Exchange.direct(channel, exchange_name(), durable: true)
AMQP.Basic.publish(channel, exchange_name(), routing_key, payload,
content_type: "application/json",
persistent: true
)
{:noreply, state}
end
defp connect do
config = Application.get_env(:workload_service, WorkloadService.MessageBus, [])
with {:ok, connection} <- Connection.open(config),
{:ok, channel} <- Channel.open(connection) do
Exchange.declare(channel, exchange_name(), :direct, durable: true)
Process.monitor(connection.pid)
{:ok, connection, channel}
end
end
defp exchange_name do
Application.get_env(:workload_service, WorkloadService.MessageBus, [])
|> Keyword.get(:exchange, "workload_service.events")
end
defp schedule_reconnect do
Process.send_after(self(), :connect, @reconnect_interval)
end
end

View File

@@ -0,0 +1,61 @@
defmodule WorkloadService.Projections.Task do
@moduledoc """
Ecto schema for the task read model.
ID format: "org_id:type:task_id" (e.g., "test:quote:uuid")
"""
use Ecto.Schema
import Ecto.Changeset
@derive {Flop.Schema,
filterable: [:status, :org_id, :application_id, :provider_id],
sortable: [:inserted_at, :updated_at, :status]}
@primary_key {:id, :string, []}
schema "tasks" do
field(:org_id, :string)
field(:application_id, :string)
field(:provider_id, :string)
field(:provider_name, :string)
field(:task_info, :map)
field(:submission, :map)
field(:attachments, {:array, :string})
field(:status, :string)
field(:version, :integer, default: 1)
timestamps(type: :utc_datetime)
end
@statuses ["created", "draft", "approved", "completed"]
def status_changeset(task, attrs) do
task
|> cast(attrs, [:status, :task_info, :submission, :attachments, :updated_at])
|> validate_inclusion(:status, @statuses)
end
def create_changeset(task, attrs) do
task
|> cast(attrs, [
:id,
:org_id,
:application_id,
:provider_id,
:provider_name,
:task_info,
:submission,
:attachments,
:status,
:version
])
|> validate_required([
:id,
:org_id,
:application_id,
:provider_id,
:provider_name,
:status
])
|> validate_inclusion(:status, @statuses)
end
end

View File

@@ -0,0 +1,61 @@
defmodule WorkloadService.Projectors.TaskProjector do
use Commanded.Projections.Ecto,
application: WorkloadService.CommandedApp,
repo: WorkloadService.Repo,
name: "TaskProjector"
alias WorkloadService.Aggregates.TaskId
alias WorkloadService.Events
alias WorkloadService.Projections.Task
import Ecto.Query
project(%Events.TaskCreated{} = e, _meta, fn multi ->
task_id = TaskId.parse!(to_string(e.id))
%{org_id: org_id} = task_id
Ecto.Multi.insert(multi, :task, %Task{
id: to_string(e.id),
org_id: org_id,
application_id: e.application_id,
provider_id: e.provider_id,
provider_name: e.provider_name,
task_info: e.task_info,
attachments: e.attachments || [],
status: "created"
})
end)
project(%Events.SubmissionUpdated{} = e, _meta, fn multi ->
multi
|> Ecto.Multi.run(:fetch, fn repo, _ ->
{:ok, repo.get(Task, to_string(e.id))}
end)
|> Ecto.Multi.update(:task, fn %{fetch: task} ->
Ecto.Changeset.change(task, %{
status: "draft",
submission: e.submission,
attachments: e.attachments || []
})
end)
end)
project(%Events.SubmissionApproved{} = e, _meta, fn multi ->
multi
|> Ecto.Multi.run(:fetch, fn repo, _ ->
{:ok, repo.get(Task, to_string(e.id))}
end)
|> Ecto.Multi.update(:task, fn %{fetch: task} ->
Ecto.Changeset.change(task, %{status: "approved"})
end)
end)
project(%Events.TaskCompleted{} = e, _meta, fn multi ->
multi
|> Ecto.Multi.run(:fetch, fn repo, _ ->
{:ok, repo.get(Task, to_string(e.id))}
end)
|> Ecto.Multi.update(:task, fn %{fetch: task} ->
Ecto.Changeset.change(task, %{status: "completed"})
end)
end)
end

View File

@@ -0,0 +1,37 @@
defmodule WorkloadService.Release do
@moduledoc """
Used for executing DB release tasks when run in production without Mix
installed.
"""
@app :workload_service
def migrate do
load_app()
init_event_store()
for repo <- repos() do
{:ok, _, _} = Ecto.Migrator.with_repo(repo, &Ecto.Migrator.run(&1, :up, all: true))
end
end
def rollback(repo, version) do
load_app()
{:ok, _, _} = Ecto.Migrator.with_repo(repo, &Ecto.Migrator.run(&1, :down, to: version))
end
defp repos do
Application.fetch_env!(@app, :ecto_repos)
end
defp load_app do
Application.ensure_all_started(:ssl)
Application.ensure_all_started(:postgrex)
Application.ensure_loaded(@app)
end
def init_event_store do
config = WorkloadService.EventStore.config()
:ok = EventStore.Tasks.Init.exec(config, [])
end
end

View File

@@ -0,0 +1,7 @@
defmodule WorkloadService.Repo do
@moduledoc false
use Ecto.Repo,
otp_app: :workload_service,
adapter: Ecto.Adapters.Postgres
end

View File

@@ -0,0 +1,38 @@
defmodule WorkloadService.Workload.Queries do
@moduledoc """
Database query functions for tasks.
"""
import Ecto.Query
alias WorkloadService.Projections.Task
alias WorkloadService.Repo
def list_tasks(params \\ %{}) do
base_query()
|> Flop.validate_and_run(params, for: Task)
end
def list_tasks_by_org(org_id, params \\ %{}) do
base_query()
|> where(org_id: ^org_id)
|> Flop.validate_and_run(params, for: Task)
end
def get_task_by_id(id) do
case Repo.get(Task, id) do
nil -> {:error, :not_found}
task -> {:ok, task}
end
end
def get_task_by_org_and_application(org_id, application_id) do
case Repo.get_by(Task, org_id: org_id, application_id: application_id) do
nil -> {:error, :not_found}
task -> {:ok, task}
end
end
defp base_query do
from(t in Task, as: :task)
end
end

View File

@@ -0,0 +1,46 @@
defmodule WorkloadServiceWeb do
@moduledoc """
The entrypoint for defining your web interface, such as controllers, components, channels, and so on.
This can be used in your application as:
use WorkloadServiceWeb, :controller
use WorkloadServiceWeb, :html
The definitions below will be executed for every controller,
component, etc, so keep them short and clean, focused
on imports, uses and aliases.
Do NOT define functions inside the quoted expressions
below. Instead, define additional modules and import
those modules here.
"""
def static_paths, do: ~w(assets fonts images favicon.ico robots.txt)
def router do
quote do
use Phoenix.Router, helpers: false
import Plug.Conn
import Phoenix.Controller
end
end
def controller do
quote do
use Phoenix.Controller, formats: [:json]
import Plug.Conn
alias WorkloadServiceWeb.Router.Helpers, as: Routes
end
end
@doc """
When used, dispatch to the appropriate controller/live_view/etc.
"""
defmacro __using__(which) when is_atom(which) do
apply(__MODULE__, which, [])
end
end

View File

@@ -0,0 +1,20 @@
defmodule WorkloadServiceWeb.ApiSpec do
alias OpenApiSpex.{Info, OpenApi, Server}
alias WorkloadServiceWeb.Endpoint
@behaviour OpenApi
@impl OpenApi
def spec do
%OpenApi{
servers: [
Server.from_endpoint(Endpoint)
],
info: %Info{
title: "Workload Service",
version: "1.0"
},
paths: OpenApiSpex.Paths.from_router(WorkloadServiceWeb.Router)
}
|> OpenApiSpex.resolve_schema_modules()
end
end

View File

@@ -0,0 +1,3 @@
defmodule WorkloadServiceWeb.Controllers do
@moduledoc false
end

View File

@@ -0,0 +1,28 @@
defmodule WorkloadServiceWeb.FallbackController do
use WorkloadServiceWeb, :controller
def call(conn, {:error, :not_found}) do
conn
|> put_status(:not_found)
|> put_view(json: WorkloadServiceWeb.ErrorJSON)
|> render(:"404")
end
def call(conn, {:error, reason}) when is_atom(reason) do
conn
|> put_status(:unprocessable_entity)
|> json(%{error: Atom.to_string(reason)})
end
def call(conn, {:error, %{errors: errors}}) do
conn
|> put_status(:unprocessable_entity)
|> json(%{errors: errors})
end
def call(conn, {:error, reason}) do
conn
|> put_status(:unprocessable_entity)
|> json(%{error: inspect(reason)})
end
end

View File

@@ -0,0 +1,15 @@
defmodule WorkloadServiceWeb.HealthController do
use WorkloadServiceWeb, :controller
def health(conn, _params) do
conn
|> put_status(:ok)
|> json(%{status: "ok"})
end
def ready(conn, _params) do
conn
|> put_status(:ok)
|> json(%{status: "ready"})
end
end

View File

@@ -0,0 +1,249 @@
defmodule WorkloadServiceWeb.TaskController do
use WorkloadServiceWeb, :controller
use OpenApiSpex.ControllerSpecs
alias WorkloadService.CommandedApp
alias WorkloadService.Workload.Queries
alias WorkloadServiceWeb.Schemas.Task, as: S
tags(["Tasks"])
operation(:list,
summary: "List tasks",
parameters: [
page: [in: :query, type: :integer, required: false, example: 1],
page_size: [in: :query, type: :integer, required: false, example: 20],
status: [in: :query, type: :string, required: false],
policy_type: [in: :query, type: :string, required: false],
org_id: [in: :query, type: :string, required: false],
application_id: [in: :query, type: :string, required: false],
provider_id: [in: :query, type: :string, required: false]
],
responses: [
ok: {"Task list", "application/json", S.TaskListResponse},
bad_request: {"Invalid params", "application/json", S.ErrorResponse}
]
)
def list(conn, params) do
case Queries.list_tasks(params) do
{:ok, {tasks, meta}} ->
conn
|> put_status(:ok)
|> json(%{
data: Enum.map(tasks, &task_summary/1),
meta: meta_json(meta)
})
{:error, _} ->
conn |> put_status(:bad_request) |> json(%{error: "invalid parameters"})
end
end
operation(:show,
summary: "Get task by ID",
parameters: [
id: [in: :path, type: :string, required: true]
],
responses: [
ok: {"Task detail", "application/json", S.TaskDetailResponse},
not_found: {"Not found", "application/json", S.ErrorResponse}
]
)
def show(conn, %{"id" => id}) do
case Queries.get_task_by_id(id) do
{:error, :not_found} ->
conn |> put_status(:not_found) |> json(%{error: "task not found"})
{:ok, task} ->
conn |> put_status(:ok) |> json(%{data: task_detail(task)})
end
end
operation(:respond_to_quote,
summary: "Record quote response",
parameters: [
id: [in: :path, type: :string, required: true]
],
request_body: {"Quote response", "application/json", S.QuoteResponseRequest, required: true},
responses: [
ok: {"Quote response recorded", "application/json", S.TaskDetailResponse},
not_found: {"Not found", "application/json", S.ErrorResponse},
unprocessable_entity: {"Error", "application/json", S.ErrorResponse}
]
)
def respond_to_quote(conn, %{"id" => id} = params) do
case Queries.get_task_by_id(id) do
{:error, :not_found} ->
conn |> put_status(:not_found) |> json(%{error: "task not found"})
{:ok, %{status: "created"} = _task} ->
command = %WorkloadService.Commands.QuoteTask.SubmitResponse{
id: id,
submission: %{
"quote_id" => params["quote_id"],
"plans" => params["plans"],
"valid_until" => params["valid_until"],
"responded_by" => params["responded_by"],
"document_data" => params["document_data"]
},
attachments: [params["document_url"]] |> Enum.filter(& &1)
}
case CommandedApp.dispatch(command) do
:ok ->
{:ok, task} = Queries.get_task_by_id(id)
conn |> put_status(:ok) |> json(%{data: task_detail(task)})
{:error, reason} ->
conn |> put_status(:unprocessable_entity) |> json(%{error: inspect(reason)})
end
{:ok, %{status: "draft"} = _task} ->
command = %WorkloadService.Commands.QuoteTask.ApproveSubmission{
id: id
}
case CommandedApp.dispatch(command) do
:ok ->
{:ok, task} = Queries.get_task_by_id(id)
conn |> put_status(:ok) |> json(%{data: task_detail(task)})
{:error, reason} ->
conn |> put_status(:unprocessable_entity) |> json(%{error: inspect(reason)})
end
{:ok, %{status: "approved"} = _task} ->
command = %WorkloadService.Commands.QuoteTask.CompleteTask{
id: id,
completed_by: params["completed_by"] || "system"
}
case CommandedApp.dispatch(command) do
:ok ->
{:ok, task} = Queries.get_task_by_id(id)
conn |> put_status(:ok) |> json(%{data: task_detail(task)})
{:error, reason} ->
conn |> put_status(:unprocessable_entity) |> json(%{error: inspect(reason)})
end
{:ok, _task} ->
conn |> put_status(:unprocessable_entity) |> json(%{error: "invalid state"})
end
end
operation(:confirm_delivery,
summary: "Confirm solicitation delivery",
parameters: [
id: [in: :path, type: :string, required: true]
],
request_body:
{"Delivery confirmation", "application/json", S.ConfirmDeliveryRequest, required: false},
responses: [
ok: {"Delivery confirmed", "application/json", S.TaskDetailResponse},
not_found: {"Not found", "application/json", S.ErrorResponse},
unprocessable_entity: {"Error", "application/json", S.ErrorResponse}
]
)
def confirm_delivery(conn, %{"id" => id} = params) do
case Queries.get_task_by_id(id) do
{:error, :not_found} ->
conn |> put_status(:not_found) |> json(%{error: "task not found"})
{:ok, %{status: "created"} = _task} ->
command = %WorkloadService.Commands.SolicitationTask.SubmitResponse{
id: id,
submission: %{
"delivery_confirmed_by" => params["delivery_confirmed_by"] || "system"
},
attachments: []
}
case CommandedApp.dispatch(command) do
:ok ->
{:ok, task} = Queries.get_task_by_id(id)
conn |> put_status(:ok) |> json(%{data: task_detail(task)})
{:error, reason} ->
conn |> put_status(:unprocessable_entity) |> json(%{error: inspect(reason)})
end
{:ok, %{status: "draft"} = _task} ->
command = %WorkloadService.Commands.SolicitationTask.ApproveSubmission{
id: id
}
case CommandedApp.dispatch(command) do
:ok ->
{:ok, task} = Queries.get_task_by_id(id)
conn |> put_status(:ok) |> json(%{data: task_detail(task)})
{:error, reason} ->
conn |> put_status(:unprocessable_entity) |> json(%{error: inspect(reason)})
end
{:ok, %{status: "approved"} = _task} ->
command = %WorkloadService.Commands.SolicitationTask.CompleteTask{
id: id,
completed_by: params["completed_by"] || "system"
}
case CommandedApp.dispatch(command) do
:ok ->
{:ok, task} = Queries.get_task_by_id(id)
conn |> put_status(:ok) |> json(%{data: task_detail(task)})
{:error, reason} ->
conn |> put_status(:unprocessable_entity) |> json(%{error: inspect(reason)})
end
{:ok, _task} ->
conn |> put_status(:unprocessable_entity) |> json(%{error: "invalid state"})
end
end
defp task_summary(t) do
%{
id: t.id,
org_id: t.org_id,
application_id: t.application_id,
provider_id: t.provider_id,
provider_name: t.provider_name,
task_info: t.task_info,
status: t.status,
created_at: t.inserted_at
}
end
defp task_detail(t) do
%{
id: t.id,
org_id: t.org_id,
application_id: t.application_id,
provider_id: t.provider_id,
provider_name: t.provider_name,
task_info: t.task_info,
submission: t.submission,
attachments: t.attachments,
status: t.status,
version: t.version,
created_at: t.inserted_at,
updated_at: t.updated_at
}
end
defp meta_json(meta) do
%{
total_count: meta.total_count,
total_pages: meta.total_pages,
current_page: meta.current_page,
page_size: meta.page_size,
has_next: meta.has_next_page?,
has_prev: meta.has_previous_page?
}
end
end

View File

@@ -0,0 +1,33 @@
defmodule WorkloadServiceWeb.Endpoint do
use Phoenix.Endpoint, otp_app: :workload_service
@session_options [
store: :cookie,
key: "_workload_service_key",
signing_salt: "workload_salt",
same_site: "Lax"
]
plug Plug.Static,
at: "/",
from: :workload_service,
gzip: false,
only: WorkloadServiceWeb.static_paths()
if code_reloading? do
plug Phoenix.CodeReloader
end
plug Plug.RequestId
plug Plug.Telemetry, event_prefix: [:phoenix, :endpoint]
plug Plug.Parsers,
parsers: [:urlencoded, :multipart, :json],
pass: ["*/*"],
json_decoder: Phoenix.json_library()
plug Plug.MethodOverride
plug Plug.Head
plug Plug.Session, @session_options
plug WorkloadServiceWeb.Router
end

View File

@@ -0,0 +1,5 @@
defmodule WorkloadServiceWeb.ErrorJSON do
def render(template, _assigns) do
%{errors: %{detail: Phoenix.Controller.status_message_from_template(template)}}
end
end

View File

@@ -0,0 +1,32 @@
defmodule WorkloadServiceWeb.Router do
use WorkloadServiceWeb, :router
alias WorkloadServiceWeb.TaskController
alias WorkloadServiceWeb.HealthController
pipeline :api do
plug OpenApiSpex.Plug.PutApiSpec, module: WorkloadServiceWeb.ApiSpec
end
scope "/api" do
pipe_through [:api]
get "/health", HealthController, :health
get "/health/ready", HealthController, :ready
get "/openapi", OpenApiSpex.Plug.RenderSpec, []
scope "/v1" do
get "/tasks", TaskController, :list
get "/tasks/:id", TaskController, :show
post "/tasks/:id/respond-quote", TaskController, :respond_to_quote
post "/tasks/:id/confirm-delivery", TaskController, :confirm_delivery
end
end
if Mix.env() == :dev do
scope "/swaggerui" do
get "/", OpenApiSpex.Plug.SwaggerUI, path: "/api/openapi"
end
end
end

View File

@@ -0,0 +1,146 @@
defmodule WorkloadServiceWeb.Schemas.Task do
alias OpenApiSpex.Schema
defmodule PaginationMeta do
require OpenApiSpex
OpenApiSpex.schema(%{
title: "PaginationMeta",
type: :object,
properties: %{
total_count: %Schema{type: :integer},
total_pages: %Schema{type: :integer},
current_page: %Schema{type: :integer},
page_size: %Schema{type: :integer},
has_next: %Schema{type: :boolean},
has_prev: %Schema{type: :boolean}
}
})
end
defmodule Plan do
require OpenApiSpex
OpenApiSpex.schema(%{
title: "Plan",
type: :object,
properties: %{
plan_id: %Schema{type: :string},
plan_name: %Schema{type: :string},
premium: %Schema{type: :number},
coverage_details: %Schema{type: :string},
deductible: %Schema{type: :number, nullable: true},
coverage_limit: %Schema{type: :number, nullable: true}
}
})
end
defmodule TaskSummary do
require OpenApiSpex
OpenApiSpex.schema(%{
title: "TaskSummary",
type: :object,
properties: %{
id: %Schema{type: :string},
org_id: %Schema{type: :string},
application_id: %Schema{type: :string},
provider_id: %Schema{type: :string},
provider_name: %Schema{type: :string},
task_info: %Schema{type: :object},
status: %Schema{type: :string, enum: ["created", "draft", "approved", "completed"]},
created_at: %Schema{type: :string, format: :"date-time"}
}
})
end
defmodule TaskDetail do
require OpenApiSpex
OpenApiSpex.schema(%{
title: "TaskDetail",
type: :object,
properties: %{
id: %Schema{type: :string},
org_id: %Schema{type: :string},
application_id: %Schema{type: :string},
provider_id: %Schema{type: :string},
provider_name: %Schema{type: :string},
task_info: %Schema{type: :object},
submission: %Schema{type: :object, nullable: true},
attachments: %Schema{type: :array, items: %Schema{type: :string}},
status: %Schema{type: :string, enum: ["created", "draft", "approved", "completed"]},
version: %Schema{type: :integer},
created_at: %Schema{type: :string, format: :"date-time"},
updated_at: %Schema{type: :string, format: :"date-time"}
}
})
end
defmodule QuoteResponseRequest do
require OpenApiSpex
OpenApiSpex.schema(%{
title: "QuoteResponseRequest",
type: :object,
required: [:quote_id, :document_url],
properties: %{
quote_id: %Schema{type: :string},
plans: %Schema{type: :array, items: Plan},
valid_until: %Schema{type: :string, format: :date},
responded_by: %Schema{type: :string},
document_url: %Schema{type: :string},
document_data: %Schema{type: :object, additionalProperties: true}
}
})
end
defmodule ConfirmDeliveryRequest do
require OpenApiSpex
OpenApiSpex.schema(%{
title: "ConfirmDeliveryRequest",
type: :object,
properties: %{
delivery_confirmed_by: %Schema{type: :string}
}
})
end
defmodule TaskListResponse do
require OpenApiSpex
OpenApiSpex.schema(%{
title: "TaskListResponse",
type: :object,
properties: %{
data: %Schema{type: :array, items: TaskSummary},
meta: PaginationMeta
}
})
end
defmodule TaskDetailResponse do
require OpenApiSpex
OpenApiSpex.schema(%{
title: "TaskDetailResponse",
type: :object,
properties: %{
data: TaskDetail
}
})
end
defmodule ErrorResponse do
require OpenApiSpex
OpenApiSpex.schema(%{
title: "ErrorResponse",
type: :object,
properties: %{
error: %Schema{type: :string}
}
})
end
end

View File

@@ -0,0 +1,51 @@
defmodule WorkloadServiceWeb.Telemetry do
use Supervisor
import Telemetry.Metrics
def start_link(arg) do
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
end
@impl true
def init(_arg) do
children = [
{:telemetry_poller, measurements: periodic_measurements(), period: 10_000}
]
Supervisor.init(children, strategy: :one_for_one)
end
def metrics do
[
summary("phoenix.endpoint.start.system_time",
unit: {:native, :millisecond}
),
summary("phoenix.endpoint.stop.duration",
unit: {:native, :millisecond}
),
summary("phoenix.router_dispatch.start.system_time",
tags: [:route],
unit: {:native, :millisecond}
),
summary("phoenix.router_dispatch.exception.duration",
tags: [:route],
unit: {:native, :millisecond}
),
summary("phoenix.router_dispatch.stop.duration",
tags: [:route],
unit: {:native, :millisecond}
),
summary("workload_service.repo.query.total_time",
unit: {:native, :millisecond}
),
summary("vm.memory.total", unit: {:byte, :kilobyte}),
summary("vm.total_run_queue_lengths.total"),
summary("vm.total_run_queue_lengths.cpu"),
summary("vm.total_run_queue_lengths.io")
]
end
defp periodic_measurements do
[]
end
end