First commit

This commit is contained in:
Andros Fenollosa
2025-01-24 09:39:17 +01:00
commit aad8c7f415
25 changed files with 802 additions and 0 deletions

33
src/core/decorators.py Normal file
View File

@ -0,0 +1,33 @@
from functools import wraps
from pydantic import ValidationError
from src.core.entity.ResponseTypes import ResponseTypes
def check_params(Model):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
params = kwargs.get('params', None)
if params is not None:
try:
Model.model_validate(params, strict=True)
except ValidationError as e:
errors = []
for error in e.errors():
errors.append(
{
'field': error['loc'][0],
'message': error['msg'],
}
)
return {
'type': ResponseTypes.PARAMETERS_ERROR,
'errors': errors,
}
return func(*args, **kwargs)
return wrapper
return decorator

View File

@ -0,0 +1,5 @@
from enum import Enum
class DocumentsType(Enum):
DIRECTORY = 'directory'
FILE = 'file'

View File

@ -0,0 +1,8 @@
class ResponseTypes:
SUCCESS = 'Success' # The process ended correctly
PARAMETERS_ERROR = 'ParametersError' # Missing or invalid parameters
ACCESS_DENIED = 'AccessDenied' # The user does not have permission to access the resource
RESOURCE_ERROR = 'ResourceError' # The process ended correctly but the resource is not available (DB, file, etc)
SYSTEM_ERROR = (
'SystemError' # The process ended with an error. Python error
)

View File

View File

@ -0,0 +1,19 @@
from pydantic import BaseModel, StrictStr
from src.core.decorators import check_params
class DocumentListModel(BaseModel):
app_name: StrictStr
client_id: StrictStr
@check_params(DocumentListModel)
def documents_list(storage, params) -> dict:
"""
List documents
:param storage: Storage
:return: dict
"""
return storage.list_files(params)

View File

@ -0,0 +1,4 @@
FROM base-core-app
# launcher
ENTRYPOINT fastapi dev main.py --host 0.0.0.0

View File

@ -0,0 +1,135 @@
import os
from enum import Enum
from functools import wraps
from core_auth.core.entity.ResponseTypes import ResponseTypes
from src.core.use_case import (
clients_use_case,
users_use_case,
)
from src.infra.api.fastapi.tools import (
get_auth,
)
class Permission(Enum):
CAN_READ_USERS = 'can_read_users'
CAN_EDIT_USER_DATA = 'can_edit_user_data'
CAN_EDIT_USER_PASSWORD = 'can_edit_user_password'
CAN_DELETE_USERS = 'can_delete_users'
def get_access_token(access_token: str) -> dict:
auth = get_auth()
try:
token_introspect = auth.token_introspect(
params={
'access_token': access_token,
'app_name': os.environ.get('APP_NAME'),
}
)
return {
'type': ResponseTypes.SUCCESS,
'data': token_introspect['data'],
}
except Exception as e:
return {
'type': ResponseTypes.ACCESS_DENIED,
'errors': [{'field': 'access_token', 'message': str(e)}],
'data': None,
}
def auth_required(func):
@wraps(func)
async def wrapper(*args, **kwargs):
if os.environ.get('API_DISABLE_AUTH', 'False').lower() != 'true':
access_token = kwargs['request'].headers.get('Authorization', None)
if not access_token:
return {
'type': ResponseTypes.ACCESS_DENIED,
'errors': [
{
'field': 'access_token',
'message': 'Missing access token',
}
],
'data': None,
}
access_token = access_token.split(' ')[1]
access_token_data = get_access_token(access_token)
if access_token_data.get('data', {}).get('active', False) is False:
return {
'type': ResponseTypes.ACCESS_DENIED,
'errors': [
{
'field': 'access_token',
'message': 'Access token is not active. Check that the domain is correct and the token is not expired.',
}
],
'data': None,
}
if access_token_data['type'] == ResponseTypes.SUCCESS:
kwargs['access_token_data'] = access_token_data['data']
else:
kwargs['access_token_data'] = None
return await func(*args, **kwargs)
return wrapper
def check_permissions(permissions: list[str]):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
if os.environ.get('API_DISABLE_AUTH', 'False').lower() != 'true':
access_token_data = kwargs['access_token_data']
auth = get_auth()
group_name = access_token_data.get('group', [None])[0]
user_data = users_use_case.get(
auth=auth,
params={
'app_name': os.environ.get('APP_NAME'),
'client_id': clients_use_case.get_client_by_name(
auth=auth,
params={
'app_name': os.environ.get('APP_NAME'),
'client_name': group_name,
},
)['data']['id'],
'user_id': access_token_data['sub'],
},
)
if user_data['type'] != ResponseTypes.SUCCESS:
return {
'type': ResponseTypes.ACCESS_DENIED,
'errors': [
{
'field': 'check permissions',
'message': 'We could not find the user associated with the access token when checking permissions',
}
],
'data': None,
}
user_permissions = [
permission['name']
for permission in user_data['data']['role']['permissions']
]
for permission in permissions:
if permission not in user_permissions:
return {
'type': ResponseTypes.ACCESS_DENIED,
'errors': [
{
'field': 'permissions',
'message': 'You do not have permission to access this resource',
}
],
'data': None,
}
return await func(*args, **kwargs)
return wrapper
return decorator

View File

@ -0,0 +1,45 @@
import os
from src.core.entity.ResponseTypes import ResponseTypes
from src.infra.api.fastapi.middleware import (
CorrectHTTPCodeMiddleware,
ErrorsToCamelCase,
ResponseToCamelCase,
)
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from src.infra.api.fastapi.routers import documents
app = FastAPI()
origins = list(
filter(
lambda url: f"{url.strip()}",
os.environ.get("API_CORS_ORIGINS", "*").split(","),
)
)
# Middlewares
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.add_middleware(CorrectHTTPCodeMiddleware)
app.add_middleware(ResponseToCamelCase)
app.add_middleware(ErrorsToCamelCase)
# Routers
app.include_router(documents.router)
@app.get("/")
async def index():
return {
"type": ResponseTypes.SUCCESS,
"error": [],
"data": "Welcome to the Core Auth API. Check the documentation at /docs.",
}

View File

@ -0,0 +1,69 @@
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from src.core.entity.ResponseTypes import ResponseTypes
from src.infra.api.fastapi.tools import (
convert_all_to_camel_case,
convert_keys_to_camel_case,
get_body,
set_body,
)
from fastapi import status
class CorrectHTTPCodeMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
"""
Adjust the HTTP code based on the response type
There will be delay and large use of RAM due to the size of the files that are returned.
Source: https://stackoverflow.com/questions/69670125/how-to-log-raw-http-request-response-in-python-fastapi/73464007#73464007
If the files are too large, consider using a different approach. For example, saving the files in a temporary folder and returning the path to the client or a decorator.
"""
response = await call_next(request)
response_json = await get_body(response)
if response_json:
match response_json.get("type", None):
case ResponseTypes.PARAMETERS_ERROR:
response.status_code = status.HTTP_400_BAD_REQUEST
case ResponseTypes.ACCESS_DENIED:
response.status_code = status.HTTP_401_UNAUTHORIZED
case ResponseTypes.RESOURCE_ERROR:
response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
case ResponseTypes.SYSTEM_ERROR:
response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
case _:
response.status_code = status.HTTP_200_OK
else:
response.status_code = status.HTTP_200_OK
return response
class ResponseToCamelCase(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
"""
Convert snake_case to camelCase
"""
response = await call_next(request)
response_json = await get_body(response)
if response_json:
response_json = convert_keys_to_camel_case(response_json)
return set_body(response, response_json)
return response
class ErrorsToCamelCase(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
"""
Convert error from snake_case to camelCase
"""
response = await call_next(request)
response_json = await get_body(response)
if (
response_json
and "type" in response_json
and response_json["type"] != ResponseTypes.SUCCESS
):
response_json["errors"] = convert_all_to_camel_case(response_json["errors"])
return set_body(response, response_json)
return response

View File

@ -0,0 +1,3 @@
fastapi[standard]
pyhumps
pyjwt

View File

@ -0,0 +1,37 @@
import os
from typing import Any
from pydantic import BaseModel
from src.infra.api.fastapi.tools import (
convert_keys_to_snake_case,
)
from fastapi import APIRouter, Depends, Request, Response
from src.core.use_case import documents_use_case
from src.infra.storage.AzureStorage import AzureStorage
router = APIRouter(
prefix="/api/v1/documents",
tags=["documents"],
)
class DocumentListModel(BaseModel):
appName: Any | None = None
clientId: Any | None = None
@router.get(
"/",
summary="Get list documents",
description="Get list documents",
)
async def list_documents(
request: Request,
response: Response,
query: DocumentListModel = Depends(),
) -> dict:
storage = AzureStorage(
account_name=os.getenv("AZURE_STORAGE_ACCOUNT_NAME", ""),
account_key=os.getenv("AZURE_STORAGE_ACCOUNT_KEY", ""),
)
params = convert_keys_to_snake_case(query.dict(exclude_none=True))
return documents_use_case.documents_list(storage=storage, params=params)

View File

@ -0,0 +1,105 @@
import json
from typing import Any
from starlette.concurrency import iterate_in_threadpool
from fastapi import Response
def is_json(myjson):
try:
json.loads(myjson)
except ValueError:
return False
return True
async def get_body(response):
response_body = [chunk async for chunk in response.body_iterator]
response.body_iterator = iterate_in_threadpool(iter(response_body))
if not response_body:
return None
decode_str = response_body[0].decode()
if is_json(decode_str):
return json.loads(decode_str)
return None
def set_body(response: Response, body: dict[str, Any]) -> Response:
"""
Set the body of the response
"""
response.body = json.dumps(body).encode("utf-8")
response.headers["Content-Length"] = str(len(response.body))
return Response(
content=json.dumps(body).encode("utf-8"),
status_code=response.status_code,
headers=dict(response.headers),
media_type=response.media_type,
)
def to_camel_case(input_str) -> str:
"""
Convert snake_case to camelCase
"""
components = input_str.split("_")
return components[0] + "".join(x.title() for x in components[1:])
def convert_keys_to_camel_case(item: dict[str, Any]) -> dict[str, Any]:
"""
Convert the keys of the dictionary to camel case (snake_case to camelCase) in all levels
"""
if isinstance(item, dict):
new_dict = {}
for k, v in item.items():
new_key = to_camel_case(k)
new_dict[new_key] = convert_keys_to_camel_case(v)
return new_dict
elif isinstance(item, list):
return [convert_keys_to_camel_case(item) for item in item]
else:
return item
def convert_all_to_camel_case(item: dict[str, Any]) -> dict[str, Any]:
"""
Convert all keys to camel case (snake_case to camelCase) in all levels
"""
if isinstance(item, dict):
new_dict = {}
for k, v in item.items():
new_key = to_camel_case(k)
new_value = to_camel_case(v) if isinstance(v, str) else v
new_dict[new_key] = convert_all_to_camel_case(new_value)
return new_dict
elif isinstance(item, list):
return [convert_all_to_camel_case(item) for item in item]
else:
return item
def to_snake_case(input_str) -> str:
"""
Convert camelCase to snake_case
"""
return "".join(["_" + i.lower() if i.isupper() else i for i in input_str]).lstrip(
"_"
)
def convert_keys_to_snake_case(item: dict[str, Any]) -> dict[str, Any]:
"""
Convert the keys of the dictionary to snake case (camelCase to snake_case) in all levels
"""
if isinstance(item, dict):
new_dict = {}
for k, v in item.items():
new_key = to_snake_case(k)
new_dict[new_key] = v
return new_dict
elif isinstance(item, list):
return [convert_keys_to_snake_case(item) for item in item]
else:
return item

View File

@ -0,0 +1,4 @@
FROM base-core-app
# launcher
ENTRYPOINT flask --app main run --host=0.0.0.0 --debug

View File

@ -0,0 +1,21 @@
import os
from flask import Flask, request
from src.core.use_case import documents_use_case
from src.infra.storage.AzureStorage import AzureStorage
from src.infra.api.flask.middlewares import register_middlewares
app = Flask(__name__)
register_middlewares(app)
@app.route("/api/v1/documents/")
def documents_list():
storage = AzureStorage(
account_name=os.getenv("AZURE_STORAGE_ACCOUNT_NAME", ""),
account_key=os.getenv("AZURE_STORAGE_ACCOUNT_KEY", ""),
)
params = request.args
return documents_use_case.documents_list(storage=storage, params=params)

View File

@ -0,0 +1,92 @@
import json
from flask import request
from typing import Any
def to_snake_case(input_str) -> str:
"""
Convert camelCase to snake_case
"""
return "".join(["_" + i.lower() if i.isupper() else i for i in input_str]).lstrip(
"_"
)
def to_camel_case(input_str) -> str:
"""
Convert snake_case to camelCase
"""
components = input_str.split("_")
return components[0] + "".join(x.title() for x in components[1:])
def convert_keys_to_snake_case(item: dict[str, Any]) -> dict[str, Any]:
"""
Convert the keys of the dictionary to snake case (camelCase to snake_case) in all levels
"""
if isinstance(item, dict):
new_dict = {}
for k, v in item.items():
new_key = to_snake_case(k)
new_dict[new_key] = v
return new_dict
elif isinstance(item, list):
return [convert_keys_to_snake_case(item) for item in item]
else:
return item
def convert_keys_to_camel_case(item: dict[str, Any]) -> dict[str, Any]:
"""
Convert the keys of the dictionary to camel case (snake_case to camelCase) in all levels
"""
if isinstance(item, dict):
new_dict = {}
for k, v in item.items():
new_key = to_camel_case(k)
new_dict[new_key] = convert_keys_to_camel_case(v)
return new_dict
elif isinstance(item, list):
return [convert_keys_to_camel_case(item) for item in item]
else:
return item
def register_middlewares(app):
"""
Registra los middlewares a la instancia de Flask
"""
@app.before_request
def convert_request_keys_to_snake_case_middleware():
"""
Convert the keys of the request to snake case (camelCase to snake_case) in all levels
"""
request.args = convert_keys_to_snake_case(request.args)
@app.after_request
def convert_response_to_camel_case_middleware(response):
"""
Convert the keys of the response to camel case (snake_case to camelCase) in all levels
"""
response.data = json.dumps(
convert_keys_to_camel_case(json.loads(response.data))
)
return response
@app.after_request
def convert_response_to_json_middleware(response):
"""
Convert the response to JSON format
"""
response.headers["Content-Type"] = "application/json"
return response
@app.after_request
def add_cors_headers_middleware(response):
"""
Add CORS headers to the response
"""
response.headers["Access-Control-Allow-Origin"] = "*"
response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE"
return response

View File

@ -0,0 +1 @@
flask

View File

@ -0,0 +1,35 @@
from src.core.entity.ResponseTypes import ResponseTypes
from src.core.decorators import check_params
from src.core.use_case.documents_use_case import DocumentListModel
from src.core.entity.Documents import DocumentsType
from random import randint
from faker import Faker
fake = Faker()
class AzureStorage:
def __init__(self, account_name, account_key):
self.account_name = account_name
self.account_key = account_key
def put_file(self, container_name, file_path):
pass
def download_file(self, container_name, file_path):
pass
@check_params(DocumentListModel)
def list_files(self, container_name):
return {
'type': ResponseTypes.SUCCESS,
'errors': [],
'data': [
{
'id': id,
'name': fake.file_name(),
'type': DocumentsType.FILE.value if randint(0, 1) else DocumentsType.DIRECTORY.value,
'public_url': fake.url() + fake.file_name(),
'created_at': fake.date_time_this_year(),
'parent_id': randint(1, 10) if randint(0, 1) else None,
}
for id in range(1, 10)],
}