Skip to content

RefreshServicesUsecase

fastapi_gateway_auto_generate.domain.usecases.RefreshServicesUsecase.RefreshServicesUsecase()

The usecase for updating and verifying all connected services.

Source code in fastapi_gateway_auto_generate/domain/usecases/RefreshServicesUsecase.py
def __init__(self):
    pass

execute(config)

Launch execution of usecase

Parameters:

Name Type Description Default
config Config

The Config object with its configuration.

required
Source code in fastapi_gateway_auto_generate/domain/usecases/RefreshServicesUsecase.py
def execute(self, config: Config) -> None:
    """Launch execution of usecase
    Args:
        config (Config): The Config object with its configuration.
    """

    get_all_info_services_model: GetAllInfoServices = GetAllInfoServices(
        page=1)


    services, err = GetAllServices(db_url=config.db_url).get_all_services(
        get_all_info_services_model=get_all_info_services_model)

    if not err is None:
        return None

    count_page: int = services["metadata"]["count_page"]

    routes = config.fast_api_app.router.routes

    for _ in range(0, count_page):
        for service in services["services"]:
            for url in service["urls"]:
                for i, r in enumerate(routes):
                    if r.path == f"/{service['name']}{url}":
                        del config.fast_api_app.routes[i]
                        logger.debug(r)

            if service['delete']:
                # print(service['id'])
                DeleteService(db_url=config.db_url).delete_service(
                    delete_service_model=delete_service_model(id_service=service['id'])
                )

    services_result = BuildRouteModelsUsecase().execute(
        config=config
    )

    BuildRoutesUsecase().execute(
        services_result=services_result,
        fast_api_app=config.fast_api_app
    )

    logger.success("Services have been successfully updated")