# This Python file uses the following encoding: utf-8 import os from pathlib import Path import sys import yaml import re from subprocess import run import requests from textwrap import wrap import time from PySide6.QtWidgets import ( QApplication, QWidget, QDialog, QLineEdit, QPushButton, QVBoxLayout, QMessageBox, QHBoxLayout, QComboBox, QTextEdit, ) from PySide6.QtGui import QCursor from PySide6.QtCore import QFile, QDate, QDir, Qt, QTimer from PySide6.QtUiTools import QUiLoader BASE_URL = "https://bugs.mageia.org/rest/bug/" class folded_str(str): pass class literal_str(str): pass class LineDialog(QDialog): def __init__(self, title, init="", parent=None): super().__init__(parent) self.setWindowTitle(title) self.name_ql = QLineEdit() self.name_ql.setText(init) apply_bt = QPushButton("Apply") layout = QHBoxLayout() layout.addWidget(self.name_ql) layout.addWidget(apply_bt) self.setLayout(layout) apply_bt.clicked.connect(self.apply) def apply(self): self.accept() class SrcDialog(QDialog): def __init__(self, parent=None): super().__init__(parent) self.setWindowTitle("Add source") self.name_ql = QLineEdit() self.release = QComboBox() self.release.addItems(["9", "8"]) self.repo = QComboBox() self.repo.addItems(["core", "nonfree", "tainted"]) apply_bt = QPushButton("Apply") layout = QHBoxLayout() layout.addWidget(self.release) layout.addWidget(self.repo) layout.addWidget(self.name_ql) layout.addWidget(apply_bt) self.setLayout(layout) apply_bt.clicked.connect(self.apply) def apply(self): self.accept() class Widget(QWidget): def __init__(self, parent=None): super().__init__(parent) self.load_ui() self.setWindowTitle("Mageia advisor") def load_ui(self): loader = QUiLoader() path = Path(__file__).resolve().parent / "form.ui" ui_file = QFile(path) ui_file.open(QFile.ReadOnly) self.ui = loader.load(ui_file, self) ui_file.close() self.ui.retrieve_pb.clicked.connect(self.retrieve) self.ui.add_cve_pb.clicked.connect(self.add_cve) self.ui.remove_cve_pb.clicked.connect(self.remove_cve) self.ui.add_src_pb.clicked.connect(self.add_src) self.ui.remove_src_pb.clicked.connect(self.remove_src) self.ui.add_ref_pb.clicked.connect(self.add_reference) self.ui.remove_ref_pb.clicked.connect(self.remove_reference) self.ui.export_pb.clicked.connect(self.export) self.ui.cancel_pb.clicked.connect(self.cancel) self.ui.preview_pb.clicked.connect(self.preview) self.ui.bug_le.editingFinished.connect(self.valid_number) self.ui.bug_le.returnPressed.connect(self.retrieve) def retrieve(self): """ Retrieve CVEs, URLs and Source package name from the bug report given by its number """ if self.ui.bug_le.text() == "": mb = QMessageBox(QMessageBox.Warning, "Retrieve info", "Provide a bug number first", QMessageBox.Ok) mb.exec() return self.valid_number() QApplication.setOverrideCursor(QCursor(Qt.WaitCursor)) self.clean() self.ui.status.setText("Loading...") self.ui.status.repaint() url = os.path.join(BASE_URL, self.ui.bug_le.text()) + "?include_fields=cf_rpmpkg,cf_cve,url,component" headers = {'Accept': 'application/json'} r = requests.get(url, headers=headers) if r.status_code == 200 and r.json()["faults"] == []: desc ="" for pkg in re.split(';|,| ', r.json()['bugs'][0]['cf_rpmpkg']): pkg = pkg.strip() if pkg == "": continue analyze = re.search(r"([\w\-\+_]+)-\d", pkg) if analyze is not None: pkg = analyze.group(1) sources = self.src_populate(pkg) for source in sources: suffix = ".mga" + source["mga_release"] if source["repo"] in ("tainted", "nonfree"): suffix += "." + source["repo"] self.ui.list_src.addItem( " ".join((source["mga_release"], source["repo"], source["package"] + "-" + source["version"] + "-" + source["release"] + suffix, ))) desc += f" {pkg}" for cve in re.split(';|,| ', r.json()['bugs'][0]['cf_cve']): cve = cve.strip() if cve != "": self.ui.list_cve.addItem(cve) self.ui.list_ref.addItem(os.path.join(BASE_URL, self.ui.bug_le.text()) ) for url in re.split(';|,| ', r.json()['bugs'][0]['url']): url = url.strip() if url != "": self.ui.list_ref.addItem(url) if "component" in r.json()['bugs'][0].keys(): if r.json()['bugs'][0]["component"] == "Security": self.ui.security_rb.setChecked(True) self.ui.subject_le.setText(f"Updated{desc} packages fix security vulnerabilities") else: self.ui.bugfix_rb.setChecked(True) self.ui.subject_le.setText(f"Updated{desc} packages fix ") self.ui.status.setText("") else: self.ui.status.setText("No info retreived") QTimer.singleShot(5000, self.clean_status) QApplication.restoreOverrideCursor() def clean_status(self): self.ui.status.setText("") def clean(self): # Delete all fields except bug number self.ui.list_ref.clear() self.ui.list_src.clear() self.ui.subject_le.setText("") self.ui.list_cve.clear() self.ui.description_te.clear() self.ui.list_ref.repaint() self.ui.list_src.repaint() self.ui.subject_le.repaint() self.ui.list_cve.repaint() self.ui.description_te.repaint() def add_src(self): dl = SrcDialog() name = dl.exec() self.ui.list_src.addItem( " ".join((dl.release.currentText(), dl.repo.currentText(), self.sanitize_line(dl.name_ql.text()) )) ) def remove_src(self): self.ui.list_src.takeItem(self.ui.list_src.currentRow()) def add_reference(self): dl = LineDialog("Add reference") dl.exec() self.ui.list_ref.addItem(self.sanitize_line(dl.name_ql.text())) def remove_reference(self): self.ui.list_ref.takeItem(self.ui.list_ref.currentRow()) def add_cve(self): init_value = f"CVE-{QDate.currentDate().year()}-" dl = LineDialog("Add CVE", init=init_value) dl.exec() self.ui.list_cve.addItem(self.sanitize_line(dl.name_ql.text())) def remove_cve(self): self.ui.list_cve.takeItem(self.ui.list_cve.currentRow()) def adv_text(self): # https://stackoverflow.com/questions/74955147/how-to-add-literal-string-pattern-in-pyyaml-with-chomping-indicator def change_style(style, representer): def new_representer(dumper, data): scalar = representer(dumper, data) scalar.style = style return scalar return new_representer from yaml.representer import SafeRepresenter represent_folded_str = change_style('>', SafeRepresenter.represent_str) represent_literal_str = change_style('|', SafeRepresenter.represent_str) yaml.add_representer(folded_str, represent_folded_str) yaml.add_representer(literal_str, represent_literal_str) data = dict() if self.ui.bugfix_rb.isChecked(): data['type'] = 'bugfix' if self.ui.security_rb.isChecked(): data['type'] = 'security' if self.ui.subject_le.text() != "": data['subject'] = self.sanitize_line(self.ui.subject_le.text()) cves = [] if self.ui.list_cve.count() != 0: for n in range(0, self.ui.list_cve.count()): cves.append(self.ui.list_cve.item(n).text()) data['CVE'] = cves srcs = {} if self.ui.list_src.count() != 0: for n in range(0, self.ui.list_src.count()): release, repo, name = self.ui.list_src.item(n).text().split(" ") rel = int(release) if not rel in srcs.keys(): srcs[rel] = {} srcs[rel][repo] = [] else: if not repo in srcs[rel].keys(): srcs[rel][repo] = [] srcs[rel][repo].append(self.sanitize_line(name)) data['src'] = srcs if len(self.ui.description_te.toPlainText()) != 0: paragraphs = self.ui.description_te.toPlainText().split("\n") lines_desc = [] for paragraph in paragraphs: lines_desc += wrap(paragraph, width=72, break_on_hyphens=False) data['description'] = literal_str("\n".join(lines_desc)+"\n") refs = [] if self.ui.list_ref.count(): for n in range(0, self.ui.list_ref.count()): refs.append(self.ui.list_ref.item(n).text()) data['references'] = refs return yaml.dump(data, default_flow_style=False, sort_keys=False, width=75, allow_unicode=True) def export(self): if QDir().mkpath(QDir().homePath() + "/mageia-advisories/advisories"): if self.ui.bug_le.text() != "": filename = f"{QDir().homePath()}/mageia-advisories/advisories/{self.ui.bug_le.text()}.adv" if os.path.exists(filename): response = QMessageBox.question(self, 'File exists', f'The file {filename} already exists. Do you want to override it ?', QMessageBox.Yes | QMessageBox.No) if response == QMessageBox.No: return with open(filename, 'w') as f: f.write(self.adv_text()) QMessageBox.information(self, 'Success', f'The file {filename} has been written!') def cancel(self): self.close() def preview(self): dl = QDialog() dl.setWindowTitle("Advisory preview") te = QTextEdit() te.setPlainText(self.adv_text()) te.setReadOnly(True) te.setMinimumSize(600, 0) ok_bt = QPushButton("OK") layout = QVBoxLayout() layout.addWidget(te) layout.addWidget(ok_bt) dl.setLayout(layout) ok_bt.clicked.connect(dl.close) dl.exec() def sanitize_line(self, line): if len(line) != 0: return line.splitlines()[0].strip() else: return "" def valid_number(self): self.ui.bug_le.setText(re.sub(r'\D', '', self.ui.bug_le.text())) def src_populate(self, package): # retrieve information with repo, release from package name cmd = ["mgarepo", "rpmlog"] sources = [] for mga_release in range(9,10): source = {} repo = "" p = run(cmd + [f"{mga_release}/{package}"], capture_output=True, text=True) if p.returncode == 0: # example to parse : * Tue Aug 15 2023 squidf 116.0.5845.96-1.mga9.tainted line1 = p.stdout.split('\n')[0] analyze = re.search(r"^\*.*\s(\w*[:\.\w]+)-([\.\d]+)", line1) if analyze is not None: # get version, remove epoch version = analyze.group(1).split(":")[-1] release = analyze.group(2)[:-1] source["mga_release"] = str(mga_release) source['package'] = package source["version"] = version source["release"] = release first = True for line in p.stdout.split('\n'): # Skip the first one which is badly formatted by mgarepo rpmlog if first: first = False else: analyze = re.search(r"^\*.*\s(\w*[:\.\d]+)-([\.\d]+).mga\d*(.*)", line) if analyze is not None: if analyze.group(3): source["repo"] = analyze.group(3)[1:] else: source["repo"] = "core" sources.append(source) break return sources if __name__ == "__main__": app = QApplication(sys.argv) widget = Widget() widget.show() sys.exit(app.exec())