Some checks failed
Build and Publish / build-release (push) Failing after 1m49s
128 lines
3.2 KiB
Elixir
128 lines
3.2 KiB
Elixir
defmodule PolicyServiceWeb.Plugs.ClaimsExtractor do
  @moduledoc """
  Extracts and normalizes Zitadel claims from validated JWT tokens.

  This module handles the extraction of organization ID, user ID, roles,
  and scopes from Zitadel standard claims and normalizes them for use
  throughout the application.

  All functions operate on the decoded claims map (string keys), except
  `extract_claims/1`, which takes the token and reads its `claims` field.
  """

  # Claim names are kept in one place so the lookups below cannot drift apart.
  @org_id_claim "urn:zitadel:iam:user:resourceowner:id"
  @project_roles_claim "urn:zitadel:iam:org:project:roles"

  # Keys that describe the organization rather than naming a role; they are
  # dropped when role keys are collected from list entries.
  @role_metadata_keys ["id", "primaryDomain"]

  @typedoc "Decoded JWT claims, keyed by claim name."
  @type claims :: %{optional(String.t()) => term()}

  @typedoc "Normalized view of the claims returned by `extract_claims/1`."
  @type normalized_claims :: %{
          org_id: term(),
          user_id: term(),
          roles: [String.t()],
          scopes: [String.t()]
        }

  @doc """
  Extracts and normalizes claims from a validated OIDCC token.

  ## Parameters

    - token: The validated OIDCC token struct; only its `claims` field is read

  ## Returns

  A map containing normalized claims:

    - org_id: Organization ID from Zitadel claims
    - user_id: User ID from Zitadel claims
    - roles: List of user roles (lowercased, de-duplicated)
    - scopes: List of granted scopes

  Raises `RuntimeError` when the `sub` claim is missing (see `get_user_id/1`).
  """
  @spec extract_claims(%{:claims => claims(), optional(atom()) => any()}) ::
          normalized_claims()
  def extract_claims(token) do
    claims = token.claims

    %{
      org_id: get_org_id(claims),
      user_id: get_user_id(claims),
      roles: get_roles(claims),
      scopes: get_scopes(claims)
    }
  end

  @doc """
  Extracts organization ID from Zitadel claims.

  Uses the Zitadel-specific claim 'urn:zitadel:iam:user:resourceowner:id',
  falls back to the standard 'azp' (authorized party) claim, and finally to
  the literal string "default". Because of that last fallback this function
  never returns `nil` and never raises for a map of claims.
  """
  @spec get_org_id(claims()) :: term()
  def get_org_id(claims) do
    claims[@org_id_claim] || claims["azp"] || "default"
  end

  @doc """
  Extracts user ID from Zitadel claims.

  Uses the standard 'sub' (subject) claim as the primary user identifier.

  Raises `RuntimeError` with the message "Missing required user_id claim"
  when the claim is absent, `nil`, or `false`.
  """
  @spec get_user_id(claims()) :: term()
  def get_user_id(claims) do
    claims["sub"] || raise "Missing required user_id claim"
  end

  @doc """
  Extracts roles from Zitadel claims.

  Reads the 'urn:zitadel:iam:org:project:roles' claim and delegates to
  `extract_roles_from_project_roles/1`. Returns `[]` when the claim is
  missing or has an unsupported shape.
  """
  @spec get_roles(claims()) :: [String.t()]
  def get_roles(claims) do
    extract_roles_from_project_roles(claims[@project_roles_claim])
  end

  @doc """
  Extracts roles from Zitadel's nested project roles structure.

  Two shapes are accepted.

  A list of maps whose keys are role names:

      [
        %{"role1" => %{"id" => "org1", "primaryDomain" => "domain1"}},
        %{"role2" => %{"id" => "org2", "primaryDomain" => "domain2"}}
      ]

  A single map keyed by role name (NOTE(review): this is the shape Zitadel
  is understood to emit in issued tokens - confirm against a real token):

      %{"role1" => %{"org1" => "domain1"}}

  In the list shape the keys "id" and "primaryDomain" are treated as
  metadata and skipped, and list entries that are not maps are ignored.
  Non-string keys are ignored in both shapes. Role names are lowercased and
  de-duplicated. Any other input (including `nil`) yields `[]`.
  """
  @spec extract_roles_from_project_roles(term()) :: [String.t()]
  def extract_roles_from_project_roles(project_roles) when is_list(project_roles) do
    project_roles
    |> Enum.flat_map(fn
      role_map when is_map(role_map) -> Map.keys(role_map)
      # A non-map entry carries no role keys; skipping it avoids a
      # BadMapError on a malformed claim.
      _other -> []
    end)
    |> Enum.reject(&(&1 in @role_metadata_keys))
    |> normalize_role_keys()
  end

  def extract_roles_from_project_roles(project_roles) when is_map(project_roles) do
    # Top-level keys are the role names themselves, so no metadata filtering
    # is applied here.
    project_roles
    |> Map.keys()
    |> normalize_role_keys()
  end

  def extract_roles_from_project_roles(_), do: []

  @doc """
  Extracts scopes from Zitadel claims.

  Uses the standard 'scope' claim and splits it on single spaces into a
  list, dropping empty segments. A missing claim yields `[]`.
  """
  @spec get_scopes(claims()) :: [String.t()]
  def get_scopes(claims) do
    # `trim: true` drops the empty strings produced by leading, trailing or
    # repeated spaces.
    String.split(claims["scope"] || "", " ", trim: true)
  end

  @doc """
  Checks if the given claims contain a specific role.

  The comparison is case-insensitive: `role` is lowercased before it is
  looked up in the (already lowercased) result of `get_roles/1`.
  """
  @spec has_role?(claims(), String.t()) :: boolean()
  def has_role?(claims, role) when is_binary(role) do
    String.downcase(role) in get_roles(claims)
  end

  @doc """
  Checks if the given claims contain a specific scope.

  The comparison is exact (case-sensitive) against `get_scopes/1`.
  """
  @spec has_scope?(claims(), String.t()) :: boolean()
  def has_scope?(claims, scope) when is_binary(scope) do
    scope in get_scopes(claims)
  end

  @doc """
  Validates that all required claims are present.

  Returns `:ok`, or raises `RuntimeError` when the `sub` claim is missing.

  NOTE(review): `get_org_id/1` falls back to "default", so the organization
  check below cannot fail; only the user ID is effectively enforced.
  """
  @spec validate_claims!(claims()) :: :ok
  def validate_claims!(claims) do
    _user_id = get_user_id(claims)
    _org_id = get_org_id(claims)
    :ok
  end

  # Lowercases and de-duplicates role keys, ignoring keys that are not strings.
  defp normalize_role_keys(keys) do
    keys
    |> Enum.filter(&is_binary/1)
    |> Enum.map(&String.downcase/1)
    |> Enum.uniq()
  end
end
|