# This Python file uses the following encoding: utf-8
"""Mageia advisor: a small Qt form for composing Mageia advisory files.

The form collects a bug number, CVE identifiers, source packages, references,
a subject and a description, and serialises them as YAML into
``~/mageia-advisories/advisories/<bug number>.adv``.  Part of the data can be
pre-filled from the Mageia Bugzilla REST API and from ``mgarepo rpmlog``.
"""
import os
from pathlib import Path
import sys
import yaml
import re
from subprocess import run
import requests
from textwrap import wrap

from PySide6.QtWidgets import (
    QApplication,
    QWidget,
    QDialog,
    QLineEdit,
    QPushButton,
    QVBoxLayout,
    QMessageBox,
    QHBoxLayout,
    QComboBox,
    QTextEdit,
)
from PySide6.QtGui import QCursor
from PySide6.QtCore import QFile, QDate, QDir, Qt, QTimer
from PySide6.QtUiTools import QUiLoader

BASE_URL = "https://bugs.mageia.org/rest/bug/"

# Mageia releases queried by src_populate() and offered in SrcDialog.
SUPPORTED_RELEASES = (8, 9)

# Seconds to wait for Bugzilla before giving up, so the GUI cannot hang forever.
REQUEST_TIMEOUT = 30

# Separators accepted between values in the Bugzilla fields read by retrieve().
_FIELD_SEPARATORS = re.compile(r"[;, ]")

# Everything before the first "-<digit>" is taken as the package name.
_NAME_BEFORE_VERSION = re.compile(r"^(.+?)-\d")


def _split_field(value):
    """Split a ';', ',' or space separated field into stripped, non-empty items."""
    if not value:
        return []
    parts = (part.strip() for part in _FIELD_SEPARATORS.split(value))
    return [part for part in parts if part]


def _package_name(pkg):
    """Return *pkg* without a trailing ``-<version>...`` part, if it has one.

    This is a heuristic: the name is cut at the first hyphen that is directly
    followed by a digit, so hyphenated names such as ``python-django-4.1-1``
    yield ``python-django``.  A string without such a hyphen is returned as is.
    """
    found = _NAME_BEFORE_VERSION.match(pkg)
    return found.group(1) if found else pkg


class folded_str(str):
    """Marker type: strings of this class are dumped in YAML folded ('>') style."""


class literal_str(str):
    """Marker type: strings of this class are dumped in YAML literal ('|') style."""


class LineDialog(QDialog):
    """Modal dialog made of one line edit (``name_ql``) and an "Apply" button."""

    def __init__(self, title, init="", parent=None):
        """Build the dialog with window title *title* and initial text *init*."""
        super().__init__(parent)
        self.setWindowTitle(title)
        self.name_ql = QLineEdit()
        self.name_ql.setText(init)
        apply_bt = QPushButton("Apply")
        layout = QHBoxLayout()
        layout.addWidget(self.name_ql)
        layout.addWidget(apply_bt)
        self.setLayout(layout)
        apply_bt.clicked.connect(self.apply)

    def apply(self):
        """Close the dialog with an accepted result."""
        self.accept()


class SrcDialog(QDialog):
    """Modal dialog asking for a release, a repository and a source package name."""

    def __init__(self, parent=None):
        """Build the dialog; the newest supported release is listed first."""
        super().__init__(parent)
        self.setWindowTitle("Add source")
        self.name_ql = QLineEdit()
        self.release = QComboBox()
        self.release.addItems([str(rel) for rel in reversed(SUPPORTED_RELEASES)])
        self.repo = QComboBox()
        self.repo.addItems(["core", "nonfree", "tainted"])
        apply_bt = QPushButton("Apply")
        layout = QHBoxLayout()
        layout.addWidget(self.release)
        layout.addWidget(self.repo)
        layout.addWidget(self.name_ql)
        layout.addWidget(apply_bt)
        self.setLayout(layout)
        apply_bt.clicked.connect(self.apply)

    def apply(self):
        """Close the dialog with an accepted result."""
        self.accept()


class Widget(QWidget):
    """Main window: loads ``form.ui`` and wires its buttons to the actions below."""

    def __init__(self, parent=None):
        super().__init__(parent)
        self.load_ui()
        self.setWindowTitle("Mageia advisor")

    def load_ui(self):
        """Load ``form.ui`` from this file's directory and connect its signals.

        Raises:
            RuntimeError: if the UI file cannot be opened.
        """
        loader = QUiLoader()
        path = Path(__file__).resolve().parent / "form.ui"
        ui_file = QFile(path)
        if not ui_file.open(QFile.ReadOnly):
            # Fail here with a clear message instead of an AttributeError on
            # self.ui further down.
            raise RuntimeError(f"Cannot open {path}: {ui_file.errorString()}")
        self.ui = loader.load(ui_file, self)
        ui_file.close()
        self.ui.retrieve_pb.clicked.connect(self.retrieve)
        self.ui.add_cve_pb.clicked.connect(self.add_cve)
        self.ui.remove_cve_pb.clicked.connect(self.remove_cve)
        self.ui.add_src_pb.clicked.connect(self.add_src)
        self.ui.remove_src_pb.clicked.connect(self.remove_src)
        self.ui.add_ref_pb.clicked.connect(self.add_reference)
        self.ui.remove_ref_pb.clicked.connect(self.remove_reference)
        self.ui.export_pb.clicked.connect(self.export)
        self.ui.cancel_pb.clicked.connect(self.cancel)
        self.ui.preview_pb.clicked.connect(self.preview)
        self.ui.bug_le.editingFinished.connect(self.valid_number)

    def retrieve(self):
        """
        Retrieve CVEs, URLs and Source package name from the bug report
        given by its number
        """
        bug_number = self.ui.bug_le.text()
        if bug_number == "":
            mb = QMessageBox(QMessageBox.Warning,
                             "Retrieve info",
                             "Provide a bug number first",
                             QMessageBox.Ok)
            mb.exec()
            return
        QApplication.setOverrideCursor(QCursor(Qt.WaitCursor))
        try:
            bug = self._fetch_bug(bug_number)
            if bug is None:
                self.ui.status.setText("No info retreived")
                QTimer.singleShot(5000, self.clean_status)
                return
            for pkg in _split_field(bug.get('cf_rpmpkg')):
                for source in self.src_populate(_package_name(pkg)):
                    suffix = ".mga" + source["mga_release"]
                    if source["repo"] in ("tainted", "nonfree"):
                        suffix += "." + source["repo"]
                    self.ui.list_src.addItem(
                        " ".join((source["mga_release"],
                                  source["repo"],
                                  source["package"] + "-" + source["version"]
                                  + "-" + source["release"] + suffix,
                                  )))
            for cve in _split_field(bug.get('cf_cve')):
                self.ui.list_cve.addItem(cve)
            for url in _split_field(bug.get('url')):
                self.ui.list_ref.addItem(url)
        finally:
            # Always undo the wait cursor, even when something above raised.
            QApplication.restoreOverrideCursor()

    def _fetch_bug(self, bug_number):
        """Return the bug's dict from the Bugzilla REST API, or None on any failure.

        None is returned for a network error, a non-200 status, a body that is
        not JSON, a non-empty ``faults`` list, or an empty ``bugs`` list.
        """
        # Plain concatenation: BASE_URL already ends with "/", and os.path.join
        # is meant for filesystem paths, not URLs.
        url = f"{BASE_URL}{bug_number}?include_fields=cf_rpmpkg,cf_cve,url"
        headers = {'Accept': 'application/json'}
        try:
            r = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
            if r.status_code != 200:
                return None
            payload = r.json()
        except (requests.RequestException, ValueError):
            return None
        if not isinstance(payload, dict) or payload.get("faults"):
            return None
        bugs = payload.get("bugs")
        if not bugs:
            return None
        return bugs[0]

    def clean_status(self):
        """Clear the status label."""
        self.ui.status.setText("")

    def add_src(self):
        """Ask for a source package and append it as "<release> <repo> <name>"."""
        dl = SrcDialog()
        # exec() returns 0 when the dialog is dismissed without "Apply".
        if not dl.exec():
            return
        name = self.sanitize_line(dl.name_ql.text())
        if name == "":
            return
        self.ui.list_src.addItem(
            " ".join((dl.release.currentText(),
                      dl.repo.currentText(),
                      name
                      ))
        )

    def remove_src(self):
        """Remove the currently selected source package."""
        self.ui.list_src.takeItem(self.ui.list_src.currentRow())

    def _prompt_line(self, title, init=""):
        """Show a LineDialog; return the sanitized text, or "" if it was dismissed."""
        dl = LineDialog(title, init=init)
        if not dl.exec():
            return ""
        return self.sanitize_line(dl.name_ql.text())

    def add_reference(self):
        """Ask for a reference and append it to the reference list."""
        text = self._prompt_line("Add reference")
        if text != "":
            self.ui.list_ref.addItem(text)

    def remove_reference(self):
        """Remove the currently selected reference."""
        self.ui.list_ref.takeItem(self.ui.list_ref.currentRow())

    def add_cve(self):
        """Ask for a CVE id, pre-filled with "CVE-<current year>-", and append it."""
        init_value = f"CVE-{QDate.currentDate().year()}-"
        text = self._prompt_line("Add CVE", init=init_value)
        if text != "":
            self.ui.list_cve.addItem(text)

    def remove_cve(self):
        """Remove the currently selected CVE."""
        self.ui.list_cve.takeItem(self.ui.list_cve.currentRow())

    def adv_text(self):
        """Return the advisory described by the form as a YAML string.

        Keys are emitted in the order type, subject, CVE, src, description,
        references; a key is left out when the form has no value for it.
        """
        def change_style(style, representer):
            def new_representer(dumper, data):
                scalar = representer(dumper, data)
                scalar.style = style
                return scalar
            return new_representer

        from yaml.representer import SafeRepresenter
        represent_folded_str = change_style('>', SafeRepresenter.represent_str)
        represent_literal_str = change_style('|', SafeRepresenter.represent_str)
        yaml.add_representer(folded_str, represent_folded_str)
        yaml.add_representer(literal_str, represent_literal_str)

        data = {}
        # isChecked(), not isEnabled(): an enabled-but-unselected radio button
        # must not decide the type.  NOTE(review): assumes bugfix_rb and
        # security_rb are radio buttons in form.ui - confirm.
        if self.ui.bugfix_rb.isChecked():
            data['type'] = 'bugfix'
        if self.ui.security_rb.isChecked():
            data['type'] = 'security'
        if self.ui.subject_le.text() != "":
            data['subject'] = self.sanitize_line(self.ui.subject_le.text())

        cve_list = self.ui.list_cve
        if cve_list.count() != 0:
            data['CVE'] = [cve_list.item(n).text()
                           for n in range(cve_list.count())]

        src_list = self.ui.list_src
        if src_list.count() != 0:
            srcs = {}
            for n in range(src_list.count()):
                # At most two splits, so a name containing a space stays whole.
                release, repo, name = src_list.item(n).text().split(" ", 2)
                # The release key is stored as an int; index with that same
                # key for every lookup.
                by_repo = srcs.setdefault(int(release), {})
                by_repo.setdefault(repo, []).append(self.sanitize_line(name))
            data['src'] = srcs

        description = self.ui.description_te.toPlainText()
        if len(description) != 0:
            lines_desc = wrap(description,
                              width=72,
                              break_on_hyphens=False)
            data['description'] = literal_str("\n".join(lines_desc))

        ref_list = self.ui.list_ref
        if ref_list.count():
            data['references'] = [ref_list.item(n).text()
                                  for n in range(ref_list.count())]

        return yaml.dump(data,
                         default_flow_style=False,
                         sort_keys=False,
                         width=75,
                         allow_unicode=True)

    def export(self):
        """Write the advisory to ~/mageia-advisories/advisories/<bug number>.adv.

        Nothing is written when the directory cannot be created or when no
        bug number has been entered.
        """
        target_dir = QDir().homePath() + "/mageia-advisories/advisories"
        if not QDir().mkpath(target_dir):
            return
        bug_number = self.ui.bug_le.text()
        if bug_number == "":
            return
        # TODO check that it exists
        # adv_text() dumps with allow_unicode=True, so write UTF-8 explicitly
        # rather than relying on the locale's default encoding.
        with open(f"{target_dir}/{bug_number}.adv", 'w', encoding="utf-8") as f:
            f.write(self.adv_text())

    def cancel(self):
        """Close the main window."""
        self.close()

    def preview(self):
        """Show the generated advisory text in a read-only modal dialog."""
        dl = QDialog()
        dl.setWindowTitle("Advisory preview")
        te = QTextEdit()
        te.setPlainText(self.adv_text())
        te.setReadOnly(True)
        te.setMinimumSize(600, 0)
        ok_bt = QPushButton("OK")
        layout = QVBoxLayout()
        layout.addWidget(te)
        layout.addWidget(ok_bt)
        dl.setLayout(layout)
        ok_bt.clicked.connect(dl.close)
        dl.exec()

    def sanitize_line(self, line):
        """Return the first line of *line*, stripped; "" when *line* is empty."""
        if len(line) != 0:
            return line.splitlines()[0].strip()
        return ""

    def valid_number(self):
        """Strip every non-digit character from the bug number field."""
        self.ui.bug_le.setText(re.sub(r'\D', '', self.ui.bug_le.text()))

    def src_populate(self, package):
        """Look up *package* in each supported release with ``mgarepo rpmlog``.

        Returns:
            A list of dicts with the string keys ``mga_release``, ``package``,
            ``repo``, ``version`` and ``release``: one per release for which
            the command succeeded and its first output line could be parsed.
            The list is empty when ``mgarepo`` cannot be started.
        """
        cmd = ["mgarepo", "rpmlog"]
        sources = []
        for mga_release in SUPPORTED_RELEASES:
            try:
                p = run(cmd + [f"{mga_release}/{package}"],
                        capture_output=True, text=True)
            except OSError:
                # mgarepo is missing or not executable; later releases would
                # fail in the same way.
                break
            if p.returncode != 0:
                continue
            line1 = p.stdout.split('\n')[0]
            analyze = re.search(r"(\w*[\.\d]+)-([\.\d]+)", line1)
            if analyze is None:
                continue
            # The last matched character is dropped; presumably it is the "."
            # that precedes the "mgaN" suffix - verify against rpmlog output.
            release = analyze.group(2)[:-1]
            repo = "core"
            if "tainted" in line1:
                repo = "tainted"
            elif "nonfree" in line1:
                repo = "nonfree"
            sources.append({
                "mga_release": str(mga_release),
                "package": package,
                "repo": repo,
                "version": analyze.group(1),
                "release": release,
            })
        return sources


if __name__ == "__main__":
    app = QApplication(sys.argv)
    widget = Widget()
    widget.show()
    sys.exit(app.exec())