Adding Jira DataProcessor
This commit is contained in:
225
src/core/jira.py
Normal file
225
src/core/jira.py
Normal file
@@ -0,0 +1,225 @@
|
||||
import json
|
||||
import logging
|
||||
|
||||
import requests
|
||||
from requests.auth import HTTPBasicAuth
|
||||
|
||||
from core.Expando import Expando
|
||||
|
||||
JIRA_ROOT = "https://altares.atlassian.net/rest/api/2"
|
||||
DEFAULT_HEADERS = {"Accept": "application/json"}
|
||||
|
||||
logger = logging.getLogger("jql")
|
||||
|
||||
|
||||
class NotFound(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class Jira:
|
||||
"""Manage default operation to JIRA"""
|
||||
|
||||
def __init__(self, user_name: str, api_token: str):
|
||||
"""
|
||||
Prepare a connection to JIRA
|
||||
The initialisation do not to anything,
|
||||
It only stores the user_name and the api_token
|
||||
Note that user_name and api_token is the recommended way to connect,
|
||||
therefore, the only supported here
|
||||
:param user_name:
|
||||
:param api_token:
|
||||
"""
|
||||
self.user_name = user_name
|
||||
self.api_token = api_token
|
||||
self.auth = HTTPBasicAuth(self.user_name, self.api_token)
|
||||
|
||||
def issue(self, issue_id: str) -> Expando:
|
||||
"""
|
||||
Retrieve an issue
|
||||
:param issue_id:
|
||||
:return:
|
||||
"""
|
||||
url = f"{JIRA_ROOT}/issue/{issue_id}"
|
||||
|
||||
response = requests.request(
|
||||
"GET",
|
||||
url,
|
||||
headers=DEFAULT_HEADERS,
|
||||
auth=self.auth
|
||||
)
|
||||
|
||||
return Expando(json.loads(response.text))
|
||||
|
||||
def fields(self) -> list[Expando]:
|
||||
"""
|
||||
Retrieve the list of all fields for an issue
|
||||
:return:
|
||||
"""
|
||||
url = f"{JIRA_ROOT}/field"
|
||||
|
||||
response = requests.request(
|
||||
"GET",
|
||||
url,
|
||||
headers=DEFAULT_HEADERS,
|
||||
auth=self.auth
|
||||
)
|
||||
|
||||
as_dict = json.loads(response.text)
|
||||
return [Expando(field) for field in as_dict]
|
||||
|
||||
def jql(self, jql: str, fields="summary,status,assignee") -> list[Expando]:
|
||||
"""
|
||||
Executes a JQL and returns the list of issues
|
||||
:param jql:
|
||||
:param fields: list of fields to retrieve
|
||||
:return:
|
||||
"""
|
||||
logger.debug(f"Processing jql '{jql}'")
|
||||
url = f"{JIRA_ROOT}/search"
|
||||
|
||||
headers = DEFAULT_HEADERS.copy()
|
||||
headers["Content-Type"] = "application/json"
|
||||
|
||||
payload = {
|
||||
"fields": [f.strip() for f in fields.split(",")],
|
||||
"fieldsByKeys": False,
|
||||
"jql": jql,
|
||||
"maxResults": 500, # Does not seem to be used. It's always 100 !
|
||||
"startAt": 0
|
||||
}
|
||||
|
||||
result = []
|
||||
while True:
|
||||
logger.debug(f"Request startAt '{payload['startAt']}'")
|
||||
response = requests.request(
|
||||
"POST",
|
||||
url,
|
||||
data=json.dumps(payload),
|
||||
headers=headers,
|
||||
auth=self.auth
|
||||
)
|
||||
|
||||
if response.status_code != 200:
|
||||
raise Exception(self._format_error(response))
|
||||
|
||||
as_dict = json.loads(response.text)
|
||||
result += as_dict["issues"]
|
||||
|
||||
if as_dict["startAt"] + as_dict["maxResults"] >= as_dict["total"]:
|
||||
# We retrieve more than the total nuber of items
|
||||
break
|
||||
|
||||
payload["startAt"] += as_dict["maxResults"]
|
||||
|
||||
return [Expando(issue) for issue in result]
|
||||
|
||||
def extract(self, jql, mappings, updates=None) -> list[dict]:
|
||||
"""
|
||||
Executes a jql and returns list of dict
|
||||
The <code>issue</code> object, returned by the <ref>jql</ref> methods
|
||||
contains all the fields for Jira. They are not all necessary
|
||||
This method selects the required fields
|
||||
:param jql:
|
||||
:param mappings:
|
||||
:param updates: List of updates (lambda on issue) to perform
|
||||
:return:
|
||||
"""
|
||||
logger.debug(f"Processing extract using mapping {mappings}")
|
||||
|
||||
def _get_field(mapping):
|
||||
"""Returns the meaningful jira field, for the mapping description path"""
|
||||
fields = mapping.split(".")
|
||||
return fields[1] if len(fields) > 1 and fields[0] == "fields" else fields[0]
|
||||
|
||||
# retrieve the list of requested fields from what was asked in the mapping
|
||||
jira_fields = [_get_field(mapping) for mapping in mappings]
|
||||
as_string = ", ".join(jira_fields)
|
||||
issues = self.jql(jql, as_string)
|
||||
|
||||
for issue in issues:
|
||||
# apply updates if needed
|
||||
if updates:
|
||||
for update in updates:
|
||||
update(issue)
|
||||
|
||||
row = {cvs_col: issue.get(jira_path) for jira_path, cvs_col in mappings.items() if cvs_col is not None}
|
||||
yield row
|
||||
|
||||
def get_versions(self, project_key):
|
||||
"""
|
||||
Given a project name and a version name
|
||||
returns fixVersion number in JIRA
|
||||
:param project_key:
|
||||
:param version_name:
|
||||
:return:
|
||||
"""
|
||||
|
||||
url = f"{JIRA_ROOT}/project/{project_key}/versions"
|
||||
|
||||
response = requests.request(
|
||||
"GET",
|
||||
url,
|
||||
headers=DEFAULT_HEADERS,
|
||||
auth=self.auth
|
||||
)
|
||||
|
||||
if response.status_code != 200:
|
||||
raise NotFound()
|
||||
|
||||
as_list = json.loads(response.text)
|
||||
return [Expando(version) for version in as_list]
|
||||
|
||||
def get_version(self, project_key, version_name):
|
||||
"""
|
||||
Given a project name and a version name
|
||||
returns fixVersion number in JIRA
|
||||
:param project_key:
|
||||
:param version_name:
|
||||
:return:
|
||||
"""
|
||||
|
||||
for version in self.get_versions(project_key):
|
||||
if version.name == version_name:
|
||||
return version
|
||||
|
||||
raise NotFound()
|
||||
|
||||
def get_all_fields(self):
|
||||
"""
|
||||
Helper function that returns the list of all field that can be requested in an issue
|
||||
:return:
|
||||
"""
|
||||
url = f"{JIRA_ROOT}/field"
|
||||
response = requests.request(
|
||||
"GET",
|
||||
url,
|
||||
headers=DEFAULT_HEADERS,
|
||||
auth=self.auth
|
||||
)
|
||||
|
||||
as_dict = json.loads(response.text)
|
||||
return [Expando(issue) for issue in as_dict]
|
||||
|
||||
@staticmethod
|
||||
def update_customer_refs(issue: Expando, bug_only=True, link_name=None):
|
||||
issue["ticket_customer_refs"] = []
|
||||
if bug_only and issue.fields.issuetype.name != "Bug":
|
||||
return
|
||||
|
||||
for issue_link in issue.fields.issuelinks: # [i_link for i_link in issue.fields.issuelinks if i_link["type"]["name"] == "Relates"]:
|
||||
if link_name and issue_link["type"]["name"] not in link_name:
|
||||
continue
|
||||
|
||||
direction = "inwardIssue" if "inwardIssue" in issue_link else "outwardIssue"
|
||||
related_issue_key = issue_link[direction]["key"]
|
||||
if related_issue_key.startswith("ITSUP"):
|
||||
issue.ticket_customer_refs.append(related_issue_key)
|
||||
continue
|
||||
|
||||
@staticmethod
|
||||
def _format_error(response):
|
||||
if "errorMessages" in response.text:
|
||||
error_messages = json.loads(response.text)["errorMessages"]
|
||||
else:
|
||||
error_messages = response.text
|
||||
return f"Error {response.status_code} : {response.reason} : {error_messages}"
|
||||
Reference in New Issue
Block a user