add authentication with zitadel
Some checks failed
Build and Publish / build-release (push) Failing after 1m49s
Some checks failed
Build and Publish / build-release (push) Failing after 1m49s
This commit is contained in:
184
lib/policy_service_web/plugs/authorization_plug.ex
Normal file
184
lib/policy_service_web/plugs/authorization_plug.ex
Normal file
@@ -0,0 +1,184 @@
|
||||
defmodule PolicyServiceWeb.Plugs.AuthorizationPlug do
|
||||
@moduledoc """
|
||||
Authorization plug for enforcing role-based access control.
|
||||
|
||||
This plug checks if authenticated users have the required roles
|
||||
and scopes to access protected resources.
|
||||
"""
|
||||
|
||||
import Plug.Conn
|
||||
require Logger
|
||||
|
||||
@doc """
|
||||
Initializes the authorization plug.
|
||||
|
||||
## Options
|
||||
- :required_roles - List of roles that can access the resource (optional)
|
||||
- :required_scopes - List of scopes required to access the resource (optional)
|
||||
- :resource_owner_check - Function to check if user owns the resource (optional)
|
||||
"""
|
||||
def init(opts) do
|
||||
%{
|
||||
required_roles: Keyword.get(opts, :required_roles, []),
|
||||
required_scopes: Keyword.get(opts, :required_scopes, []),
|
||||
resource_owner_check: Keyword.get(opts, :resource_owner_check, nil)
|
||||
}
|
||||
end
|
||||
|
||||
@doc """
|
||||
Authorizes the request by checking roles and scopes.
|
||||
|
||||
Verifies that the authenticated user has the required roles and scopes
|
||||
to access the requested resource.
|
||||
"""
|
||||
def call(conn, config) do
|
||||
user_roles = conn.assigns[:roles] || []
|
||||
user_scopes = conn.assigns[:scopes] || []
|
||||
user_id = conn.assigns[:user_id]
|
||||
org_id = conn.assigns[:org_id]
|
||||
|
||||
with :ok <- check_roles(user_roles, config.required_roles),
|
||||
:ok <- check_scopes(user_scopes, config.required_scopes),
|
||||
:ok <- check_resource_ownership(conn, config.resource_owner_check, user_id, org_id) do
|
||||
conn
|
||||
else
|
||||
{:error, :insufficient_role} ->
|
||||
handle_insufficient_role(conn, config.required_roles)
|
||||
|
||||
{:error, :insufficient_scope} ->
|
||||
handle_insufficient_scope(conn, config.required_scopes)
|
||||
|
||||
{:error, :not_owner} ->
|
||||
handle_not_owner(conn)
|
||||
end
|
||||
end
|
||||
|
||||
defp check_roles(_user_roles, required_roles) when required_roles == [] do
|
||||
:ok
|
||||
end
|
||||
|
||||
defp check_roles(user_roles, required_roles) do
|
||||
if has_any_role?(user_roles, required_roles) do
|
||||
:ok
|
||||
else
|
||||
Logger.warning(
|
||||
"User with roles #{inspect(user_roles)} lacks required roles: #{inspect(required_roles)}"
|
||||
)
|
||||
|
||||
{:error, :insufficient_role}
|
||||
end
|
||||
end
|
||||
|
||||
defp check_scopes(_user_scopes, required_scopes) when required_scopes == [] do
|
||||
:ok
|
||||
end
|
||||
|
||||
defp check_scopes(user_scopes, required_scopes) do
|
||||
if has_all_scopes?(user_scopes, required_scopes) do
|
||||
:ok
|
||||
else
|
||||
Logger.warning(
|
||||
"User with scopes #{inspect(user_scopes)} lacks required scopes: #{inspect(required_scopes)}"
|
||||
)
|
||||
|
||||
{:error, :insufficient_scope}
|
||||
end
|
||||
end
|
||||
|
||||
defp check_resource_ownership(_conn, nil, _user_id, _org_id) do
|
||||
:ok
|
||||
end
|
||||
|
||||
defp check_resource_ownership(conn, check_function, user_id, org_id) do
|
||||
try do
|
||||
if check_function.(conn, user_id, org_id) do
|
||||
:ok
|
||||
else
|
||||
{:error, :not_owner}
|
||||
end
|
||||
rescue
|
||||
error ->
|
||||
Logger.error("Resource ownership check failed: #{inspect(error)}")
|
||||
{:error, :not_owner}
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Checks if the user has any of the required roles.
|
||||
|
||||
Supports role hierarchy where admin has all permissions.
|
||||
"""
|
||||
def has_any_role?(user_roles, required_roles) do
|
||||
cond do
|
||||
"admin" in user_roles ->
|
||||
true
|
||||
|
||||
Enum.any?(required_roles, fn role -> role in user_roles end) ->
|
||||
true
|
||||
|
||||
true ->
|
||||
false
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Checks if the user has all of the required scopes.
|
||||
"""
|
||||
def has_all_scopes?(user_scopes, required_scopes) do
|
||||
Enum.all?(required_scopes, fn scope -> scope in user_scopes end)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Checks if the user can access a specific resource based on ownership.
|
||||
|
||||
Admins can access any resource. Other users can only access their own resources.
|
||||
"""
|
||||
def can_access_resource?(user_roles, resource_user_id, current_user_id) do
|
||||
cond do
|
||||
"admin" in user_roles ->
|
||||
true
|
||||
|
||||
resource_user_id == current_user_id ->
|
||||
true
|
||||
|
||||
true ->
|
||||
false
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_insufficient_role(conn, required_roles) do
|
||||
conn
|
||||
|> put_resp_content_type("application/json")
|
||||
|> send_resp(
|
||||
403,
|
||||
Jason.encode!(%{
|
||||
error: "Insufficient permissions",
|
||||
required_roles: required_roles
|
||||
})
|
||||
)
|
||||
|> halt()
|
||||
end
|
||||
|
||||
defp handle_insufficient_scope(conn, required_scopes) do
|
||||
conn
|
||||
|> put_resp_content_type("application/json")
|
||||
|> send_resp(
|
||||
403,
|
||||
Jason.encode!(%{
|
||||
error: "Insufficient permissions",
|
||||
required_scopes: required_scopes
|
||||
})
|
||||
)
|
||||
|> halt()
|
||||
end
|
||||
|
||||
defp handle_not_owner(conn) do
|
||||
conn
|
||||
|> put_resp_content_type("application/json")
|
||||
|> send_resp(
|
||||
403,
|
||||
Jason.encode!(%{error: "You do not have permission to access this resource"})
|
||||
)
|
||||
|> halt()
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user