from typing import Any, Iterable, Optional, Pattern, Union
import yaml
+from filecache import DAY, filecache
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from graphql import DocumentNode
transport=self._transport, fetch_schema_from_transport=True
)
+ @filecache(DAY)
def query(
self, gql_file: Union[Path, str], params: dict[str, Any]
) -> dict[str, Any]:
# Execute the query on the transport
return self.client.execute(query, variable_values=params)
+ def invalidate_query_cache(self):
+ self.query._db.clear()
+
def create_job_needs_dag(
gl_gql: GitlabGQL, params
content = Path(gitlab_yml_file).read_text()
params["content"] = content
raw_response = gl_gql.query("job_details.gql", params)
- merged_yaml = raw_response["ciConfig"]["mergedYaml"]
- assert merged_yaml, """
+ if merged_yaml := raw_response["ciConfig"]["mergedYaml"]:
+ return yaml.safe_load(merged_yaml)
+
+ gl_gql.invalidate_query_cache()
+ raise ValueError(
+ """
Could not fetch any content for merged YAML,
please verify if the git SHA exists in remote.
- Maybe you forgot to `git push`?"""
- return yaml.safe_load(merged_yaml)
+ Maybe you forgot to `git push`? """
+ )
def recursive_fill(job, relationship_field, target_data, acc_data: dict, merged_yaml):