defmodule PolicyServiceWeb.Plugs.AuthorizeRoles do @moduledoc """ Authorize request based on Zitadel role permissions. After token introspection, checks if the user holds any of the `required_permissions` roles for the organization identified by `X-Organization-Id` header. The Zitadel roles claim structure is: %{"urn:zitadel:iam:org:project::roles": { "": { "": "" }, "": { "": "" } }} """ @behaviour Plug import Plug.Conn @impl Plug def init(opts), do: opts |> Keyword.validate!([ :roles_claim, :required_permissions ]) @impl Plug def call(conn, opts) do if authorized?(conn, opts.roles_claim, opts.required_permissions) do conn else conn |> put_resp_content_type("application/json") |> halt() |> send_resp( :forbidden, %{error: "Forbidden", reason: "Missing required role"} ) end end defp authorized?(conn, roles_claim, required_permissions) do org_id = conn.private[PolicyServiceWeb.Plugs.ExtractOrganizationId] with true <- org_id_given?(org_id), roles_map <- get_roles_map(conn, roles_claim), true <- has_any_role?(roles_map, org_id, required_permissions) do true else _ -> false end end defp org_id_given?(org_id), do: not is_nil(org_id) defp get_roles_map(conn, roles_claim) do case conn.private[Oidcc.Plug.IntrospectToken] do %{extra: %{^roles_claim => %{} = roles_map}} -> Map.get(roles_map, roles_claim, %{}) _ -> %{} end end defp has_any_role?(roles_map, org_id, required_permissions) do Enum.any?(required_permissions, fn role -> role_orgs = Map.get(roles_map, role, %{}) Map.has_key?(role_orgs, org_id) end) end end