partition by org_id and add auth
All checks were successful
Build and Publish / build-release (push) Successful in 3m7s

This commit is contained in:
2026-05-15 10:08:54 -05:00
parent a0b5e0c0b3
commit 4519f797fd
26 changed files with 687 additions and 112 deletions

View File

@@ -0,0 +1,48 @@
defmodule CustomerService.Aggregates.CustomerId do
@type t :: %__MODULE__{
org_id: String.t(),
customer_type: String.t(),
customer_id: String.t()
}
@derive {Jason.Encoder, only: [:org_id, :customer_type, :customer_id]}
defstruct [:org_id, :customer_type, :customer_id]
def new(org_id, customer_type, customer_id)
when is_binary(org_id) and is_binary(customer_type) and is_binary(customer_id) do
%__MODULE__{
org_id: org_id,
customer_type: customer_type,
customer_id: customer_id
}
end
def parse(<<_::binary>> = string) do
case String.split(string, ":", parts: 3) do
[org_id, customer_type, customer_id] ->
{:ok, %__MODULE__{org_id: org_id, customer_type: customer_type, customer_id: customer_id}}
_ ->
{:error, :invalid_customer_id}
end
end
def parse! do
{:error, :invalid_customer_id}
end
def to_string(%__MODULE__{
org_id: org_id,
customer_type: customer_type,
customer_id: customer_id
}) do
org_id <> ":" <> customer_type <> ":" <> customer_id
end
defimpl Commanded.Serialization.JsonDecoder do
def decode(%{org_id: org_id, customer_type: customer_type, customer_id: customer_id}) do
CustomerService.Aggregates.CustomerId.new(org_id, customer_type, customer_id)
end
def decode(id), do: id
end
end

View File

@@ -0,0 +1,39 @@
defmodule CustomerService.Aggregates.LeadId do
@type t :: %__MODULE__{
org_id: String.t(),
type: String.t(),
lead_id: String.t()
}
@derive {Jason.Encoder, only: [:org_id, :type, :lead_id]}
defstruct [:org_id, :type, :lead_id]
def new(org_id, lead_id) when is_binary(org_id) and is_binary(lead_id) do
%__MODULE__{
org_id: org_id,
type: "lead",
lead_id: lead_id
}
end
def parse(<<_::binary>> = string) do
case String.split(string, ":", parts: 3) do
[org_id, "lead", lead_id] ->
{:ok, %__MODULE__{org_id: org_id, type: "lead", lead_id: lead_id}}
_ ->
{:error, :invalid_lead_id}
end
end
def to_string(%__MODULE__{org_id: org_id, type: "lead", lead_id: lead_id}) do
org_id <> ":" <> "lead" <> ":" <> lead_id
end
defimpl Commanded.Serialization.JsonDecoder do
def decode(%{org_id: org_id, type: "lead", lead_id: lead_id}) do
CustomerService.Aggregates.LeadId.new(org_id, lead_id)
end
def decode(id), do: id
end
end

View File

@@ -7,6 +7,21 @@ defmodule CustomerService.Application do
@impl true
def start(_type, _args) do
oidcc_child =
case Application.get_env(:customer_service, :zitadel) do
nil ->
[]
cfg ->
[
{Oidcc.ProviderConfiguration.Worker,
%{
issuer: cfg[:issuer],
name: CustomerService.ZitadelProvider
}}
]
end
children = [
CustomerService.CommandedApp,
CustomerService.Repo,
@@ -14,21 +29,14 @@ defmodule CustomerService.Application do
CustomerService.Projectors.QuickLead,
CustomerServiceWeb.Telemetry,
{DNSCluster, query: Application.get_env(:customer_service, :dns_cluster_query) || :ignore},
{Phoenix.PubSub, name: CustomerService.PubSub},
# Start a worker by calling: CustomerService.Worker.start_link(arg)
# {CustomerService.Worker, arg},
# Start to serve requests, typically the last entry
CustomerServiceWeb.Endpoint
{Phoenix.PubSub, name: CustomerService.PubSub}
| oidcc_child ++ [CustomerServiceWeb.Endpoint]
]
# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: CustomerService.Supervisor]
Supervisor.start_link(children, opts)
end
# Tell Phoenix to update the endpoint configuration
# whenever the application is updated.
@impl true
def config_change(changed, _new, removed) do
CustomerServiceWeb.Endpoint.config_change(changed, removed)

View File

@@ -2,12 +2,13 @@ defmodule CustomerService.Customer.Queries do
alias CustomerService.Projections.Customer
alias CustomerService.Repo
def list_customers(params \\ %{}) do
def list_by_org(org_id, params \\ %{}) when is_binary(org_id) do
params = Map.put(params, :org_id, org_id)
Flop.validate_and_run(Customer, params, for: Customer)
end
def get_customer(id) do
case Repo.get(Customer, id) do
def get_by_org(org_id, customer_id) when is_binary(org_id) and is_binary(customer_id) do
case Repo.get_by(Customer, org_id: org_id, customer_id: customer_id) do
nil -> {:error, :not_found}
customer -> {:ok, customer}
end

View File

@@ -1,4 +1,30 @@
defmodule CustomerService.Events do
@moduledoc """
Events macro for adding JsonDecoder to domain events.
"""
alias CustomerService.Aggregates.CustomerId
alias CustomerService.Aggregates.LeadId
defmacro __using__(_opts) do
quote do
defimpl Commanded.Serialization.JsonDecoder do
def decode(%{id: %CustomerId{} = id} = event) do
%{event | id: id}
end
def decode(%{id: %LeadId{} = id} = event) do
%{event | id: id}
end
def decode(event), do: event
end
end
end
end
defmodule CustomerService.Events.CustomerCreated do
use CustomerService.Events
@derive Jason.Encoder
defstruct [
:id,
@@ -14,6 +40,7 @@ defmodule CustomerService.Events.CustomerCreated do
end
defmodule CustomerService.Events.CorporateCustomerCreated do
use CustomerService.Events
@derive Jason.Encoder
defstruct [
:id,
@@ -29,6 +56,7 @@ defmodule CustomerService.Events.CorporateCustomerCreated do
end
defmodule CustomerService.Events.CustomerUpdated do
use CustomerService.Events
@derive Jason.Encoder
defstruct [
:id,
@@ -44,6 +72,7 @@ defmodule CustomerService.Events.CustomerUpdated do
end
defmodule CustomerService.Events.CorporateCustomerUpdated do
use CustomerService.Events
@derive Jason.Encoder
defstruct [
:id,
@@ -59,6 +88,7 @@ defmodule CustomerService.Events.CorporateCustomerUpdated do
end
defmodule CustomerService.Events.QuickLeadCreated do
use CustomerService.Events
@derive Jason.Encoder
defstruct [
:id,
@@ -77,6 +107,7 @@ defmodule CustomerService.Events.QuickLeadCreated do
end
defmodule CustomerService.Events.QuickLeadUpdated do
use CustomerService.Events
@derive Jason.Encoder
defstruct [
:id,
@@ -92,6 +123,7 @@ defmodule CustomerService.Events.QuickLeadUpdated do
end
defmodule CustomerService.Events.LeadStatusUpdated do
use CustomerService.Events
@derive Jason.Encoder
defstruct [:id, :status, :previous_status, :updated_at]
end

View File

@@ -2,6 +2,18 @@ defmodule CustomerService.Lead.Queries do
alias CustomerService.Projections.QuickLead
alias CustomerService.Repo
def list_by_org(org_id, params \\ %{}) when is_binary(org_id) do
params = Map.put(params, :org_id, org_id)
Flop.validate_and_run(QuickLead, params, for: QuickLead)
end
def get_by_org(org_id, lead_id) when is_binary(org_id) and is_binary(lead_id) do
case Repo.get_by(QuickLead, org_id: org_id, lead_id: lead_id) do
nil -> {:error, :not_found}
lead -> {:ok, lead}
end
end
def list_leads(params \\ %{}) do
Flop.validate_and_run(QuickLead, params, for: QuickLead)
end

View File

@@ -4,6 +4,8 @@ defmodule CustomerService.Projections.Customer do
@derive {Jason.Encoder,
only: [
:id,
:org_id,
:customer_id,
:customer_type,
# individual
:first_name,
@@ -27,7 +29,7 @@ defmodule CustomerService.Projections.Customer do
@derive {
Flop.Schema,
filterable: [:customer_type, :email, :phone, :document_id, :ruc, :search],
filterable: [:org_id, :customer_type, :email, :phone, :document_id, :ruc, :search],
sortable: [:last_name, :legal_name, :inserted_at],
default_limit: 20,
max_limit: 100,
@@ -40,10 +42,13 @@ defmodule CustomerService.Projections.Customer do
]
}
@primary_key {:id, :binary_id, autogenerate: false}
@primary_key {:id, :string, autogenerate: false}
@timestamps_opts [type: :utc_datetime_usec]
schema "customers" do
field :org_id, :string
field :customer_id, :string
field :customer_type, :string, default: "individual"
# individual fields

View File

@@ -4,6 +4,8 @@ defmodule CustomerService.Projections.QuickLead do
@derive {Jason.Encoder,
only: [
:id,
:org_id,
:lead_id,
:name,
:email,
:phone,
@@ -22,7 +24,7 @@ defmodule CustomerService.Projections.QuickLead do
@derive {
Flop.Schema,
filterable: [:status, :priority, :source, :assigned_to, :search],
filterable: [:org_id, :status, :priority, :source, :assigned_to, :search],
sortable: [:name, :company_name, :status, :priority, :inserted_at],
default_limit: 20,
max_limit: 100,
@@ -35,10 +37,13 @@ defmodule CustomerService.Projections.QuickLead do
]
}
@primary_key {:id, :binary_id, autogenerate: false}
@primary_key {:id, :string, autogenerate: false}
@timestamps_opts [type: :utc_datetime_usec]
schema "quick_leads" do
field :org_id, :string
field :lead_id, :string
field :name, :string
field :email, :string
field :phone, :string

View File

@@ -7,10 +7,15 @@ defmodule CustomerService.Projectors.Customer do
alias CustomerService.Events
alias CustomerService.Projections.Customer
alias CustomerService.Aggregates.CustomerId
project(%Events.CustomerCreated{} = event, fn multi ->
%CustomerService.Aggregates.CustomerId{org_id: org_id, customer_id: customer_id} = event.id
Ecto.Multi.insert(multi, :customer, %Customer{
id: event.id,
id: CustomerId.to_string(event.id),
org_id: org_id,
customer_id: customer_id,
customer_type: "individual",
first_name: event.first_name,
last_name: event.last_name,
@@ -34,8 +39,12 @@ defmodule CustomerService.Projectors.Customer do
end
project(%Events.CorporateCustomerCreated{} = e, _meta, fn multi ->
%CustomerService.Aggregates.CustomerId{org_id: org_id, customer_id: customer_id} = e.id
Ecto.Multi.insert(multi, :customer, %Customer{
id: e.id,
id: CustomerId.to_string(e.id),
org_id: org_id,
customer_id: customer_id,
customer_type: "corporate",
legal_name: e.legal_name,
commercial_name: e.commercial_name,
@@ -49,7 +58,9 @@ defmodule CustomerService.Projectors.Customer do
end)
project(%Events.CustomerUpdated{} = e, _meta, fn multi ->
Ecto.Multi.update_all(multi, :customer, from(c in Customer, where: c.id == ^e.id),
composite_id = CustomerId.to_string(e.id)
Ecto.Multi.update_all(multi, :customer, from(c in Customer, where: c.id == ^composite_id),
set: [
first_name: e.first_name,
last_name: e.last_name,
@@ -64,7 +75,9 @@ defmodule CustomerService.Projectors.Customer do
end)
project(%Events.CorporateCustomerUpdated{} = e, _meta, fn multi ->
Ecto.Multi.update_all(multi, :customer, from(c in Customer, where: c.id == ^e.id),
composite_id = CustomerId.to_string(e.id)
Ecto.Multi.update_all(multi, :customer, from(c in Customer, where: c.id == ^composite_id),
set: [
legal_name: e.legal_name,
commercial_name: e.commercial_name,

View File

@@ -7,10 +7,15 @@ defmodule CustomerService.Projectors.QuickLead do
alias CustomerService.Events
alias CustomerService.Projections.QuickLead
alias CustomerService.Aggregates.LeadId
project(%Events.QuickLeadCreated{} = event, fn multi ->
%CustomerService.Aggregates.LeadId{org_id: org_id, lead_id: lead_id} = event.id
Ecto.Multi.insert(multi, :quick_lead, %QuickLead{
id: event.id,
id: LeadId.to_string(event.id),
org_id: org_id,
lead_id: lead_id,
name: event.name,
email: event.email,
phone: event.phone,
@@ -33,7 +38,9 @@ defmodule CustomerService.Projectors.QuickLead do
end)
project(%Events.QuickLeadUpdated{} = event, _meta, fn multi ->
Ecto.Multi.update_all(multi, :quick_lead, from(q in QuickLead, where: q.id == ^event.id),
composite_id = LeadId.to_string(event.id)
Ecto.Multi.update_all(multi, :quick_lead, from(q in QuickLead, where: q.id == ^composite_id),
set: [
name: event.name,
email: event.email,
@@ -48,7 +55,9 @@ defmodule CustomerService.Projectors.QuickLead do
end)
project(%Events.LeadStatusUpdated{} = event, _meta, fn multi ->
Ecto.Multi.update_all(multi, :quick_lead, from(q in QuickLead, where: q.id == ^event.id),
composite_id = LeadId.to_string(event.id)
Ecto.Multi.update_all(multi, :quick_lead, from(q in QuickLead, where: q.id == ^composite_id),
set: [
status: to_string(event.status)
]

View File

@@ -1,5 +1,5 @@
defmodule CustomerServiceWeb.ApiSpec do
alias OpenApiSpex.{OpenApi, Info, Server}
alias OpenApiSpex.{OpenApi, Info, Server, Components, SecurityScheme}
alias OpenApiSpex.{Info, OpenApi, Paths, Server}
alias CustomerServiceWeb.{Endpoint, Router}
@behaviour OpenApi
@@ -8,17 +8,31 @@ defmodule CustomerServiceWeb.ApiSpec do
def spec do
%OpenApi{
servers: [
# Populate the Server info from a phoenix endpoint
Server.from_endpoint(Endpoint)
],
info: %Info{
title: "Customer Service",
version: "1.0"
},
# Populate the paths from a phoenix router
paths: Paths.from_router(Router)
paths: Paths.from_router(Router),
components: %Components{
securitySchemes: %{
"bearerAuth" => %SecurityScheme{
type: "http",
scheme: "bearer",
bearerFormat: "JWT",
description: "Zitadel JWT bearer token for authentication"
},
"x-organization-id" => %SecurityScheme{
type: "apiKey",
in: "header",
name: "x-organization-id",
description: "Organization identifier for tenant isolation"
}
}
},
security: [%{"bearerAuth" => [], "x-organization-id" => []}]
}
# Discover request/response schemas from path specs
|> OpenApiSpex.resolve_schema_modules()
end
end

View File

@@ -12,6 +12,7 @@ defmodule CustomerServiceWeb.CustomerController do
alias CustomerService.Customer.Queries, as: CustomerQueries
alias CustomerServiceWeb.Schemas.Customer, as: CustomerSchemas
alias CustomerServiceWeb.QueryHelpers
alias CustomerService.Aggregates.CustomerId
operation(:index,
summary: "List customers",
@@ -27,7 +28,9 @@ defmodule CustomerServiceWeb.CustomerController do
)
def index(conn, params) do
case CustomerQueries.list_customers(params) do
org_id = conn.private[CustomerServiceWeb.Plugs.ExtractOrganizationId]
case CustomerQueries.list_by_org(org_id, params) do
{:ok, {customers, meta}} ->
conn
|> put_status(:ok)
@@ -62,12 +65,17 @@ defmodule CustomerServiceWeb.CustomerController do
)
def show(conn, %{"id" => id}) do
case CustomerQueries.get_customer(id) do
{:ok, customer} ->
conn |> put_status(:ok) |> json(%{data: customer_json(customer)})
org_id = conn.private[CustomerServiceWeb.Plugs.ExtractOrganizationId]
with {:ok, %CustomerId{customer_id: local_id}} <- CustomerId.parse(id),
{:ok, customer} <- CustomerQueries.get_by_org(org_id, local_id) do
conn |> put_status(:ok) |> json(%{data: customer_json(customer)})
else
{:error, :not_found} ->
conn |> put_status(:not_found) |> json(%{error: "not found"})
:error ->
conn |> put_status(:bad_request) |> json(%{error: "invalid customer id"})
end
end
@@ -81,7 +89,10 @@ defmodule CustomerServiceWeb.CustomerController do
)
def create(conn, params) do
customer_id = Ecto.UUID.generate()
org_id = conn.private[CustomerServiceWeb.Plugs.ExtractOrganizationId]
customer_type = "individual"
customer_uuid = Ecto.UUID.generate()
customer_id = CustomerId.new(org_id, customer_type, customer_uuid)
command = %CreateCustomer{
id: customer_id,
@@ -108,7 +119,10 @@ defmodule CustomerServiceWeb.CustomerController do
)
def create_corporate(conn, params) do
customer_id = Ecto.UUID.generate()
org_id = conn.private[CustomerServiceWeb.Plugs.ExtractOrganizationId]
customer_type = "corporate"
customer_uuid = Ecto.UUID.generate()
customer_id = CustomerId.new(org_id, customer_type, customer_uuid)
command = %CreateCorporateCustomer{
id: customer_id,
@@ -138,19 +152,30 @@ defmodule CustomerServiceWeb.CustomerController do
)
def update(conn, %{"id" => id} = params) do
command = %UpdateCustomer{
id: id,
first_name: params["first_name"],
last_name: params["last_name"],
birth_date: parse_date(params["birth_date"]),
gender: params["gender"],
email: params["email"],
phone: params["phone"],
document_id: params["document_id"],
address: params["address"]
}
org_id = conn.private[CustomerServiceWeb.Plugs.ExtractOrganizationId]
dispatch_and_return(conn, command, id)
with {:ok, %CustomerId{customer_id: local_id}} <- CustomerId.parse(id),
{:ok, _customer} <- CustomerQueries.get_by_org(org_id, local_id) do
command = %UpdateCustomer{
id: CustomerId.new(org_id, "individual", local_id),
first_name: params["first_name"],
last_name: params["last_name"],
birth_date: parse_date(params["birth_date"]),
gender: params["gender"],
email: params["email"],
phone: params["phone"],
document_id: params["document_id"],
address: params["address"]
}
dispatch_and_return(conn, command, CustomerId.new(org_id, "individual", local_id))
else
{:error, :not_found} ->
conn |> put_status(:not_found) |> json(%{error: "not found"})
:error ->
conn |> put_status(:bad_request) |> json(%{error: "invalid customer id"})
end
end
operation(:update_corporate,
@@ -167,29 +192,55 @@ defmodule CustomerServiceWeb.CustomerController do
)
def update_corporate(conn, %{"id" => id} = params) do
command = %UpdateCorporateCustomer{
id: id,
legal_name: params["legal_name"],
commercial_name: params["commercial_name"],
ruc: params["ruc"],
legal_rep_name: params["legal_rep_name"],
legal_rep_document_id: params["legal_rep_document_id"],
email: params["email"],
phone: params["phone"],
address: params["address"]
}
org_id = conn.private[CustomerServiceWeb.Plugs.ExtractOrganizationId]
dispatch_and_return(conn, command, id)
with {:ok, %CustomerId{customer_id: local_id}} <- CustomerId.parse(id),
{:ok, _customer} <- CustomerQueries.get_by_org(org_id, local_id) do
command = %UpdateCorporateCustomer{
id: CustomerId.new(org_id, "corporate", local_id),
legal_name: params["legal_name"],
commercial_name: params["commercial_name"],
ruc: params["ruc"],
legal_rep_name: params["legal_rep_name"],
legal_rep_document_id: params["legal_rep_document_id"],
email: params["email"],
phone: params["phone"],
address: params["address"]
}
dispatch_and_return(conn, command, CustomerId.new(org_id, "corporate", local_id))
else
{:error, :not_found} ->
conn |> put_status(:not_found) |> json(%{error: "not found"})
:error ->
conn |> put_status(:bad_request) |> json(%{error: "invalid customer id"})
end
end
# ---------------------------------------------------------------------------
# Private
# ---------------------------------------------------------------------------
defp dispatch_and_return(conn, command, customer_id) do
defp dispatch_and_return(conn, command, %CustomerId{} = customer_id) do
case CustomerService.CommandedApp.dispatch(command, consistency: :strong) do
:ok ->
case CustomerQueries.get_customer(customer_id) do
case CustomerQueries.get_customer(CustomerId.to_string(customer_id)) do
{:ok, customer} ->
conn |> put_status(:ok) |> json(%{data: customer_json(customer)})
{:error, :not_found} ->
conn
|> put_status(:internal_server_error)
|> json(%{error: "customer created but not found in projection"})
end
{:error, reason} ->
conn |> put_status(:unprocessable_entity) |> json(%{error: inspect(reason)})
end
end
defp dispatch_and_return(conn, command, customer_id_string)
when is_binary(customer_id_string) do
case CustomerService.CommandedApp.dispatch(command, consistency: :strong) do
:ok ->
case CustomerQueries.get_customer(customer_id_string) do
{:ok, customer} ->
conn |> put_status(:ok) |> json(%{data: customer_json(customer)})

View File

@@ -6,6 +6,7 @@ defmodule CustomerServiceWeb.LeadController do
alias CustomerService.Lead.Queries, as: LeadQueries
alias CustomerServiceWeb.Schemas.Lead, as: LeadSchemas
alias CustomerServiceWeb.QueryHelpers
alias CustomerService.Aggregates.LeadId
operation(:index,
summary: "List leads",
@@ -23,7 +24,9 @@ defmodule CustomerServiceWeb.LeadController do
)
def index(conn, params) do
case LeadQueries.list_leads(params) do
org_id = conn.private[CustomerServiceWeb.Plugs.ExtractOrganizationId]
case LeadQueries.list_by_org(org_id, params) do
{:ok, {leads, meta}} ->
conn
|> put_status(:ok)
@@ -58,12 +61,17 @@ defmodule CustomerServiceWeb.LeadController do
)
def show(conn, %{"id" => id}) do
case LeadQueries.get_lead(id) do
{:ok, lead} ->
conn |> put_status(:ok) |> json(%{data: lead_json(lead)})
org_id = conn.private[CustomerServiceWeb.Plugs.ExtractOrganizationId]
with {:ok, %LeadId{lead_id: local_id}} <- LeadId.parse(id),
{:ok, lead} <- LeadQueries.get_by_org(org_id, local_id) do
conn |> put_status(:ok) |> json(%{data: lead_json(lead)})
else
{:error, :not_found} ->
conn |> put_status(:not_found) |> json(%{error: "not found"})
:error ->
conn |> put_status(:bad_request) |> json(%{error: "invalid lead id"})
end
end
@@ -76,7 +84,9 @@ defmodule CustomerServiceWeb.LeadController do
)
def create(conn, params) do
lead_id = Ecto.UUID.generate()
org_id = conn.private[CustomerServiceWeb.Plugs.ExtractOrganizationId]
lead_uuid = Ecto.UUID.generate()
lead_id = LeadId.new(org_id, lead_uuid)
command = %CreateQuickLead{
id: lead_id,
@@ -108,19 +118,30 @@ defmodule CustomerServiceWeb.LeadController do
)
def update(conn, %{"id" => id} = params) do
command = %UpdateQuickLead{
id: id,
name: params["name"],
email: params["email"],
phone: params["phone"],
company_name: params["company_name"],
notes: params["notes"],
assigned_to: params["assigned_to"],
estimated_value: parse_decimal(params["estimated_value"]),
expected_close_date: parse_date(params["expected_close_date"])
}
org_id = conn.private[CustomerServiceWeb.Plugs.ExtractOrganizationId]
dispatch_and_return(conn, command, id)
with {:ok, %LeadId{lead_id: local_id}} <- LeadId.parse(id),
{:ok, _lead} <- LeadQueries.get_by_org(org_id, local_id) do
command = %UpdateQuickLead{
id: LeadId.new(org_id, local_id),
name: params["name"],
email: params["email"],
phone: params["phone"],
company_name: params["company_name"],
notes: params["notes"],
assigned_to: params["assigned_to"],
estimated_value: parse_decimal(params["estimated_value"]),
expected_close_date: parse_date(params["expected_close_date"])
}
dispatch_and_return(conn, command, LeadId.new(org_id, local_id))
else
{:error, :not_found} ->
conn |> put_status(:not_found) |> json(%{error: "not found"})
:error ->
conn |> put_status(:bad_request) |> json(%{error: "invalid lead id"})
end
end
operation(:update_status,
@@ -136,18 +157,47 @@ defmodule CustomerServiceWeb.LeadController do
)
def update_status(conn, %{"id" => id} = params) do
command = %UpdateLeadStatus{
id: id,
status: String.to_existing_atom(params["status"])
}
org_id = conn.private[CustomerServiceWeb.Plugs.ExtractOrganizationId]
dispatch_and_return(conn, command, id)
with {:ok, %LeadId{lead_id: local_id}} <- LeadId.parse(id),
{:ok, _lead} <- LeadQueries.get_by_org(org_id, local_id) do
command = %UpdateLeadStatus{
id: LeadId.new(org_id, local_id),
status: String.to_existing_atom(params["status"])
}
dispatch_and_return(conn, command, LeadId.new(org_id, local_id))
else
{:error, :not_found} ->
conn |> put_status(:not_found) |> json(%{error: "not found"})
:error ->
conn |> put_status(:bad_request) |> json(%{error: "invalid lead id"})
end
end
defp dispatch_and_return(conn, command, lead_id) do
defp dispatch_and_return(conn, command, %LeadId{} = lead_id) do
case CustomerService.CommandedApp.dispatch(command, consistency: :strong) do
:ok ->
case LeadQueries.get_lead(lead_id) do
case LeadQueries.get_lead(LeadId.to_string(lead_id)) do
{:ok, lead} ->
conn |> put_status(:ok) |> json(%{data: lead_json(lead)})
{:error, :not_found} ->
conn
|> put_status(:internal_server_error)
|> json(%{error: "lead created but not found in projection"})
end
{:error, reason} ->
conn |> put_status(:unprocessable_entity) |> json(%{error: inspect(reason)})
end
end
defp dispatch_and_return(conn, command, lead_id_string) when is_binary(lead_id_string) do
case CustomerService.CommandedApp.dispatch(command, consistency: :strong) do
:ok ->
case LeadQueries.get_lead(lead_id_string) do
{:ok, lead} ->
conn |> put_status(:ok) |> json(%{data: lead_json(lead)})

View File

@@ -45,6 +45,10 @@ defmodule CustomerServiceWeb.Endpoint do
plug Plug.MethodOverride
plug Plug.Head
plug Plug.Session, @session_options
plug CORSPlug
plug CORSPlug,
origin: ["*"],
headers: ["*"]
plug CustomerServiceWeb.Router
end

View File

@@ -0,0 +1,81 @@
defmodule CustomerServiceWeb.Plugs.AuthorizeRoles do
@moduledoc """
Authorize request based on Zitadel role permissions.
After token introspection, checks if the user holds any of the
`required_permissions` roles for the organization identified by
`X-Organization-Id` header.
The Zitadel roles claim structure is:
%{"urn:zitadel:iam:org:project:<project_id>:roles": {
"<role>": {
"<org_id>": "<org_domain>"
},
"<role>": {
"<org_id>": "<org_domain>"
}
}}
"""
@behaviour Plug
import Plug.Conn
@impl Plug
def init(opts),
do:
opts
|> Keyword.validate!([
:roles_claim
])
@impl Plug
def call(conn, opts) do
if authorized?(
conn,
Keyword.get(opts, :roles_claim),
Keyword.get(opts, :required_permissions)
) do
conn
else
conn
|> put_resp_content_type("application/json")
|> halt()
|> send_resp(
:forbidden,
Jason.encode_to_iodata!(%{error: "Forbidden", reason: "Missing required role"})
)
end
end
defp authorized?(conn, roles_claim, required_permissions) do
org_id = conn.private[CustomerServiceWeb.Plugs.ExtractOrganizationId]
with true <- org_id_given?(org_id),
roles_map <- get_roles_map(conn, roles_claim),
true <- has_any_role?(roles_map, org_id, required_permissions) do
true
else
_ -> false
end
end
defp org_id_given?(org_id), do: not is_nil(org_id)
defp get_roles_map(conn, roles_claim) do
case conn.private[Oidcc.Plug.IntrospectToken] do
%Oidcc.TokenIntrospection{extra: extra} ->
Map.get(extra, roles_claim, %{})
_ ->
%{}
end
end
defp has_any_role?(roles_map, org_id, required_permissions) do
Enum.any?(required_permissions, fn role ->
role_orgs = Map.get(roles_map, role, %{})
Map.has_key?(role_orgs, org_id)
end)
end
end

View File

@@ -0,0 +1,22 @@
defmodule CustomerServiceWeb.Plugs.ExtractOrganizationId do
@moduledoc """
Extract `X-Organization-Id` request header.
Stores the organization identifier in conn.private[__MODULE__] for downstream authorization checks.
"""
@behaviour Plug
import Plug.Conn, only: [get_req_header: 2, put_private: 3]
@impl Plug
def init(_opts), do: %{}
@impl Plug
def call(conn, _opts) do
case get_req_header(conn, "x-organization-id") do
[org_id | _rest] -> put_private(conn, __MODULE__, org_id)
[] -> put_private(conn, __MODULE__, nil)
end
end
end

View File

@@ -0,0 +1,27 @@
defmodule CustomerServiceWeb.Plugs.RequireOrganizationId do
@moduledoc """
Ensure `X-Organization-Id` header is provided.
This plug must be used after `CustomerServiceWeb.Plugs.ExtractOrganizationId`.
"""
@behaviour Plug
import Plug.Conn, only: [get_req_header: 2, halt: 1, send_resp: 3]
@impl Plug
def init(_opts), do: %{}
@impl Plug
def call(conn, _opts) do
case get_req_header(conn, "x-organization-id") do
[] ->
conn
|> halt()
|> send_resp(:bad_request, "The organization id is required")
[_org_id] ->
conn
end
end
end

View File

@@ -1,5 +1,6 @@
defmodule CustomerServiceWeb.Router do
use CustomerServiceWeb, :router
alias CustomerServiceWeb.{CustomerController, LeadController}
pipeline :api do
@@ -7,28 +8,86 @@ defmodule CustomerServiceWeb.Router do
plug OpenApiSpex.Plug.PutApiSpec, module: CustomerServiceWeb.ApiSpec
end
get("/health", CustomerServiceWeb.HealthController, :health)
get("/health/ready", CustomerServiceWeb.HealthController, :ready)
pipeline :auth do
plug Oidcc.Plug.ExtractAuthorization
plug Oidcc.Plug.RequireAuthorization
plug CustomerServiceWeb.Plugs.RequireOrganizationId
plug CustomerServiceWeb.Plugs.ExtractOrganizationId
plug :introspect
end
pipeline :customer_create do
plug :authorize_roles, required_permissions: ["customer:create"]
end
pipeline :customer_read do
plug :authorize_roles, required_permissions: ["customer:read"]
end
pipeline :customer_update do
plug :authorize_roles, required_permissions: ["customer:update"]
end
pipeline :lead_create do
plug :authorize_roles, required_permissions: ["lead:create"]
end
pipeline :lead_read do
plug :authorize_roles, required_permissions: ["lead:read"]
end
pipeline :lead_update do
plug :authorize_roles, required_permissions: ["lead:update"]
end
get "/health", CustomerServiceWeb.HealthController, :health
get "/health/ready", CustomerServiceWeb.HealthController, :ready
scope "/api" do
pipe_through :api
pipe_through [:api]
get "/openapi", OpenApiSpex.Plug.RenderSpec, []
scope "/v1" do
post "/customers", CustomerController, :create
post "/customers/individual", CustomerController, :create
post "/customers/corporate", CustomerController, :create_corporate
get "/customers", CustomerController, :index
get "/customers/:id", CustomerController, :show
put "/customers/individual/:id", CustomerController, :update
put "/customers/corporate/:id", CustomerController, :update_corporate
pipe_through [:auth]
post "/leads", LeadController, :create
get "/leads", LeadController, :index
get "/leads/:id", LeadController, :show
put "/leads/:id", LeadController, :update
put "/leads/:id/status", LeadController, :update_status
scope "/" do
pipe_through [:customer_create]
post "/customers", CustomerController, :create
post "/customers/individual", CustomerController, :create
post "/customers/corporate", CustomerController, :create_corporate
end
scope "/" do
pipe_through [:customer_read]
get "/customers", CustomerController, :index
get "/customers/:id", CustomerController, :show
end
scope "/" do
pipe_through [:customer_update]
put "/customers/individual/:id", CustomerController, :update
put "/customers/corporate/:id", CustomerController, :update_corporate
end
scope "/" do
pipe_through [:lead_create]
post "/leads", LeadController, :create
end
scope "/" do
pipe_through [:lead_read]
get "/leads", LeadController, :index
get "/leads/:id", LeadController, :show
end
scope "/" do
pipe_through [:lead_update]
put "/leads/:id", LeadController, :update
put "/leads/:id/status", LeadController, :update_status
end
end
end
@@ -37,4 +96,27 @@ defmodule CustomerServiceWeb.Router do
get "/", OpenApiSpex.Plug.SwaggerUI, path: "/api/openapi"
end
end
def introspect(conn, _opts) do
zitadel = Application.get_env(:customer_service, :zitadel)
opts =
Oidcc.Plug.IntrospectToken.init(
provider: CustomerService.ZitadelProvider,
client_id: zitadel[:client_id],
client_secret: zitadel[:client_secret],
token_introspection_opts: %{client_self_only: false}
)
Oidcc.Plug.IntrospectToken.call(conn, opts)
end
def authorize_roles(conn, opts) do
zitadel = Application.get_env(:customer_service, :zitadel)
o =
CustomerServiceWeb.Plugs.AuthorizeRoles.init(roles_claim: zitadel[:roles_claim])
CustomerServiceWeb.Plugs.AuthorizeRoles.call(conn, Keyword.merge(opts, o))
end
end